aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pubsub/mod.rs
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2024-05-20 15:34:03 +0200
committerDion Dokter <[email protected]>2024-05-20 15:34:03 +0200
commita76082b104edd4cbb7f10a4a589ef6773d8d6e4f (patch)
treed91061e9bf142f0b51713f90ec52936ed0837d8f /embassy-sync/src/pubsub/mod.rs
parent2a4a714060d4cdb976eab62a70f9c1242425714f (diff)
Expose new length functions in the subs and pubs
Diffstat (limited to 'embassy-sync/src/pubsub/mod.rs')
-rw-r--r--embassy-sync/src/pubsub/mod.rs82
1 files changed, 60 insertions, 22 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index adbf8cf02..af3d6db2a 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -248,13 +248,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
248 }) 248 })
249 } 249 }
250 250
251 fn space(&self) -> usize {
252 self.inner.lock(|s| {
253 let s = s.borrow();
254 s.queue.capacity() - s.queue.len()
255 })
256 }
257
258 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 251 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
259 self.inner.lock(|s| { 252 self.inner.lock(|s| {
260 let mut s = s.borrow_mut(); 253 let mut s = s.borrow_mut();
@@ -268,6 +261,26 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
268 s.unregister_publisher() 261 s.unregister_publisher()
269 }) 262 })
270 } 263 }
264
265 fn capacity(&self) -> usize {
266 self.capacity()
267 }
268
269 fn free_capacity(&self) -> usize {
270 self.free_capacity()
271 }
272
273 fn len(&self) -> usize {
274 self.len()
275 }
276
277 fn is_empty(&self) -> bool {
278 self.is_empty()
279 }
280
281 fn is_full(&self) -> bool {
282 self.is_full()
283 }
271} 284}
272 285
273/// Internal state for the PubSub channel 286/// Internal state for the PubSub channel
@@ -439,8 +452,22 @@ trait SealedPubSubBehavior<T> {
439 /// Publish a message immediately 452 /// Publish a message immediately
440 fn publish_immediate(&self, message: T); 453 fn publish_immediate(&self, message: T);
441 454
442 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 455 /// Returns the maximum number of elements the channel can hold.
443 fn space(&self) -> usize; 456 fn capacity(&self) -> usize;
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 fn free_capacity(&self) -> usize;
462
463 /// Returns the number of elements currently in the channel.
464 fn len(&self) -> usize;
465
466 /// Returns whether the channel is empty.
467 fn is_empty(&self) -> bool;
468
469 /// Returns whether the channel is full.
470 fn is_full(&self) -> bool;
444 471
445 /// Let the channel know that a subscriber has dropped 472 /// Let the channel know that a subscriber has dropped
446 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 473 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -588,6 +615,7 @@ mod tests {
588 assert_eq!(pub0.try_publish(0), Ok(())); 615 assert_eq!(pub0.try_publish(0), Ok(()));
589 assert_eq!(pub0.try_publish(0), Ok(())); 616 assert_eq!(pub0.try_publish(0), Ok(()));
590 assert_eq!(pub0.try_publish(0), Ok(())); 617 assert_eq!(pub0.try_publish(0), Ok(()));
618 assert!(pub0.is_full());
591 assert_eq!(pub0.try_publish(0), Err(0)); 619 assert_eq!(pub0.try_publish(0), Err(0));
592 620
593 drop(sub0); 621 drop(sub0);
@@ -620,32 +648,42 @@ mod tests {
620 } 648 }
621 649
622 #[futures_test::test] 650 #[futures_test::test]
623 async fn correct_space() { 651 async fn correct_len() {
624 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 652 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
625 653
626 let mut sub0 = channel.subscriber().unwrap(); 654 let mut sub0 = channel.subscriber().unwrap();
627 let mut sub1 = channel.subscriber().unwrap(); 655 let mut sub1 = channel.subscriber().unwrap();
628 let pub0 = channel.publisher().unwrap(); 656 let pub0 = channel.publisher().unwrap();
629 657
630 assert_eq!(pub0.space(), 4); 658 assert!(sub0.is_empty());
659 assert!(sub1.is_empty());
660 assert!(pub0.is_empty());
661 assert_eq!(pub0.free_capacity(), 4);
662 assert_eq!(pub0.len(), 0);
631 663
632 pub0.publish(42).await; 664 pub0.publish(42).await;
633 665
634 assert_eq!(pub0.space(), 3); 666 assert_eq!(pub0.free_capacity(), 3);
667 assert_eq!(pub0.len(), 1);
635 668
636 pub0.publish(42).await; 669 pub0.publish(42).await;
637 670
638 assert_eq!(pub0.space(), 2); 671 assert_eq!(pub0.free_capacity(), 2);
672 assert_eq!(pub0.len(), 2);
639 673
640 sub0.next_message().await; 674 sub0.next_message().await;
641 sub0.next_message().await; 675 sub0.next_message().await;
642 676
643 assert_eq!(pub0.space(), 2); 677 assert_eq!(pub0.free_capacity(), 2);
678 assert_eq!(pub0.len(), 2);
644 679
645 sub1.next_message().await; 680 sub1.next_message().await;
646 assert_eq!(pub0.space(), 3); 681 assert_eq!(pub0.free_capacity(), 3);
682 assert_eq!(pub0.len(), 1);
683
647 sub1.next_message().await; 684 sub1.next_message().await;
648 assert_eq!(pub0.space(), 4); 685 assert_eq!(pub0.free_capacity(), 4);
686 assert_eq!(pub0.len(), 0);
649 } 687 }
650 688
651 #[futures_test::test] 689 #[futures_test::test]
@@ -656,29 +694,29 @@ mod tests {
656 let mut sub0 = channel.subscriber().unwrap(); 694 let mut sub0 = channel.subscriber().unwrap();
657 let mut sub1 = channel.subscriber().unwrap(); 695 let mut sub1 = channel.subscriber().unwrap();
658 696
659 assert_eq!(4, pub0.space()); 697 assert_eq!(4, pub0.free_capacity());
660 698
661 pub0.publish(1).await; 699 pub0.publish(1).await;
662 pub0.publish(2).await; 700 pub0.publish(2).await;
663 701
664 assert_eq!(2, channel.space()); 702 assert_eq!(2, channel.free_capacity());
665 703
666 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 704 assert_eq!(1, sub0.try_next_message_pure().unwrap());
667 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 705 assert_eq!(2, sub0.try_next_message_pure().unwrap());
668 706
669 assert_eq!(2, channel.space()); 707 assert_eq!(2, channel.free_capacity());
670 708
671 drop(sub0); 709 drop(sub0);
672 710
673 assert_eq!(2, channel.space()); 711 assert_eq!(2, channel.free_capacity());
674 712
675 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 713 assert_eq!(1, sub1.try_next_message_pure().unwrap());
676 714
677 assert_eq!(3, channel.space()); 715 assert_eq!(3, channel.free_capacity());
678 716
679 drop(sub1); 717 drop(sub1);
680 718
681 assert_eq!(4, channel.space()); 719 assert_eq!(4, channel.free_capacity());
682 } 720 }
683 721
684 struct CloneCallCounter(usize); 722 struct CloneCallCounter(usize);