aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-sync/src/pubsub/mod.rs32
1 files changed, 28 insertions, 4 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 5989e86ec..59e701c58 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -322,12 +322,15 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
322 322
323 // We're reading this item, so decrement the counter 323 // We're reading this item, so decrement the counter
324 queue_item.1 -= 1; 324 queue_item.1 -= 1;
325 let message = queue_item.0.clone();
326 325
327 if current_message_index == 0 && queue_item.1 == 0 { 326 let message = if current_message_index == 0 && queue_item.1 == 0 {
328 self.queue.pop_front(); 327 let (message, _) = self.queue.pop_front().unwrap();
329 self.publisher_wakers.wake(); 328 self.publisher_wakers.wake();
330 } 329 // Return pop'd message without clone
330 message
331 } else {
332 queue_item.0.clone()
333 };
331 334
332 Some(WaitResult::Message(message)) 335 Some(WaitResult::Message(message))
333 } 336 }
@@ -659,4 +662,25 @@ mod tests {
659 662
660 assert_eq!(4, channel.space()); 663 assert_eq!(4, channel.space());
661 } 664 }
665
666 struct CloneCallCounter(usize);
667
668 impl Clone for CloneCallCounter {
669 fn clone(&self) -> Self {
670 Self(self.0 + 1)
671 }
672 }
673
674 #[futures_test::test]
675 async fn skip_clone_for_last_message() {
676 let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
677 let pub0 = channel.publisher().unwrap();
678 let mut sub0 = channel.subscriber().unwrap();
679 let mut sub1 = channel.subscriber().unwrap();
680
681 pub0.publish(CloneCallCounter(0)).await;
682
683 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
684 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
685 }
662} 686}