aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pubsub/mod.rs
diff options
context:
space:
mode:
authorRasmus Melchior Jacobsen <[email protected]>2023-03-15 16:45:18 +0100
committerRasmus Melchior Jacobsen <[email protected]>2023-03-15 16:45:18 +0100
commit472df3fad6dde82b15e8f4716291add8b1cae1ae (patch)
tree433717fec4e91f8585ccb36b1931df896edf1a85 /embassy-sync/src/pubsub/mod.rs
parent2c9f289f40b0c5dcbbcdb24cc7beda37a79a7287 (diff)
fix(pubsub): Pop messages which count is 0 after unsubscribe
Diffstat (limited to 'embassy-sync/src/pubsub/mod.rs')
-rw-r--r--embassy-sync/src/pubsub/mod.rs47
1 files changed, 47 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index faaf99dc6..5989e86ec 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -371,6 +371,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
371 .iter_mut() 371 .iter_mut()
372 .skip(current_message_index) 372 .skip(current_message_index)
373 .for_each(|(_, counter)| *counter -= 1); 373 .for_each(|(_, counter)| *counter -= 1);
374
375 let mut wake_publishers = false;
376 while let Some((_, count)) = self.queue.front() {
377 if *count == 0 {
378 self.queue.pop_front().unwrap();
379 wake_publishers = true;
380 } else {
381 break;
382 }
383 }
384
385 if wake_publishers {
386 self.publisher_wakers.wake();
387 }
374 } 388 }
375 } 389 }
376 390
@@ -612,4 +626,37 @@ mod tests {
612 sub1.next_message().await; 626 sub1.next_message().await;
613 assert_eq!(pub0.space(), 4); 627 assert_eq!(pub0.space(), 4);
614 } 628 }
629
630 #[futures_test::test]
631 async fn empty_channel_when_last_subscriber_is_dropped() {
632 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
633
634 let pub0 = channel.publisher().unwrap();
635 let mut sub0 = channel.subscriber().unwrap();
636 let mut sub1 = channel.subscriber().unwrap();
637
638 assert_eq!(4, pub0.space());
639
640 pub0.publish(1).await;
641 pub0.publish(2).await;
642
643 assert_eq!(2, channel.space());
644
645 assert_eq!(1, sub0.try_next_message_pure().unwrap());
646 assert_eq!(2, sub0.try_next_message_pure().unwrap());
647
648 assert_eq!(2, channel.space());
649
650 drop(sub0);
651
652 assert_eq!(2, channel.space());
653
654 assert_eq!(1, sub1.try_next_message_pure().unwrap());
655
656 assert_eq!(3, channel.space());
657
658 drop(sub1);
659
660 assert_eq!(4, channel.space());
661 }
615} 662}