diff options
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index faaf99dc6..5989e86ec 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -371,6 +371,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 371 | .iter_mut() | 371 | .iter_mut() |
| 372 | .skip(current_message_index) | 372 | .skip(current_message_index) |
| 373 | .for_each(|(_, counter)| *counter -= 1); | 373 | .for_each(|(_, counter)| *counter -= 1); |
| 374 | |||
| 375 | let mut wake_publishers = false; | ||
| 376 | while let Some((_, count)) = self.queue.front() { | ||
| 377 | if *count == 0 { | ||
| 378 | self.queue.pop_front().unwrap(); | ||
| 379 | wake_publishers = true; | ||
| 380 | } else { | ||
| 381 | break; | ||
| 382 | } | ||
| 383 | } | ||
| 384 | |||
| 385 | if wake_publishers { | ||
| 386 | self.publisher_wakers.wake(); | ||
| 387 | } | ||
| 374 | } | 388 | } |
| 375 | } | 389 | } |
| 376 | 390 | ||
| @@ -612,4 +626,37 @@ mod tests { | |||
| 612 | sub1.next_message().await; | 626 | sub1.next_message().await; |
| 613 | assert_eq!(pub0.space(), 4); | 627 | assert_eq!(pub0.space(), 4); |
| 614 | } | 628 | } |
| 629 | |||
| 630 | #[futures_test::test] | ||
| 631 | async fn empty_channel_when_last_subscriber_is_dropped() { | ||
| 632 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 633 | |||
| 634 | let pub0 = channel.publisher().unwrap(); | ||
| 635 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 636 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 637 | |||
| 638 | assert_eq!(4, pub0.space()); | ||
| 639 | |||
| 640 | pub0.publish(1).await; | ||
| 641 | pub0.publish(2).await; | ||
| 642 | |||
| 643 | assert_eq!(2, channel.space()); | ||
| 644 | |||
| 645 | assert_eq!(1, sub0.try_next_message_pure().unwrap()); | ||
| 646 | assert_eq!(2, sub0.try_next_message_pure().unwrap()); | ||
| 647 | |||
| 648 | assert_eq!(2, channel.space()); | ||
| 649 | |||
| 650 | drop(sub0); | ||
| 651 | |||
| 652 | assert_eq!(2, channel.space()); | ||
| 653 | |||
| 654 | assert_eq!(1, sub1.try_next_message_pure().unwrap()); | ||
| 655 | |||
| 656 | assert_eq!(3, channel.space()); | ||
| 657 | |||
| 658 | drop(sub1); | ||
| 659 | |||
| 660 | assert_eq!(4, channel.space()); | ||
| 661 | } | ||
| 615 | } | 662 | } |
