aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-sync/src/pubsub/mod.rs82
-rw-r--r--embassy-sync/src/pubsub/publisher.rs58
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs31
3 files changed, 138 insertions, 33 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index adbf8cf02..af3d6db2a 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -248,13 +248,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
248 }) 248 })
249 } 249 }
250 250
251 fn space(&self) -> usize {
252 self.inner.lock(|s| {
253 let s = s.borrow();
254 s.queue.capacity() - s.queue.len()
255 })
256 }
257
258 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 251 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
259 self.inner.lock(|s| { 252 self.inner.lock(|s| {
260 let mut s = s.borrow_mut(); 253 let mut s = s.borrow_mut();
@@ -268,6 +261,26 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
268 s.unregister_publisher() 261 s.unregister_publisher()
269 }) 262 })
270 } 263 }
264
265 fn capacity(&self) -> usize {
266 self.capacity()
267 }
268
269 fn free_capacity(&self) -> usize {
270 self.free_capacity()
271 }
272
273 fn len(&self) -> usize {
274 self.len()
275 }
276
277 fn is_empty(&self) -> bool {
278 self.is_empty()
279 }
280
281 fn is_full(&self) -> bool {
282 self.is_full()
283 }
271} 284}
272 285
273/// Internal state for the PubSub channel 286/// Internal state for the PubSub channel
@@ -439,8 +452,22 @@ trait SealedPubSubBehavior<T> {
439 /// Publish a message immediately 452 /// Publish a message immediately
440 fn publish_immediate(&self, message: T); 453 fn publish_immediate(&self, message: T);
441 454
442 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 455 /// Returns the maximum number of elements the channel can hold.
443 fn space(&self) -> usize; 456 fn capacity(&self) -> usize;
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 fn free_capacity(&self) -> usize;
462
463 /// Returns the number of elements currently in the channel.
464 fn len(&self) -> usize;
465
466 /// Returns whether the channel is empty.
467 fn is_empty(&self) -> bool;
468
469 /// Returns whether the channel is full.
470 fn is_full(&self) -> bool;
444 471
445 /// Let the channel know that a subscriber has dropped 472 /// Let the channel know that a subscriber has dropped
446 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 473 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -588,6 +615,7 @@ mod tests {
588 assert_eq!(pub0.try_publish(0), Ok(())); 615 assert_eq!(pub0.try_publish(0), Ok(()));
589 assert_eq!(pub0.try_publish(0), Ok(())); 616 assert_eq!(pub0.try_publish(0), Ok(()));
590 assert_eq!(pub0.try_publish(0), Ok(())); 617 assert_eq!(pub0.try_publish(0), Ok(()));
618 assert!(pub0.is_full());
591 assert_eq!(pub0.try_publish(0), Err(0)); 619 assert_eq!(pub0.try_publish(0), Err(0));
592 620
593 drop(sub0); 621 drop(sub0);
@@ -620,32 +648,42 @@ mod tests {
620 } 648 }
621 649
622 #[futures_test::test] 650 #[futures_test::test]
623 async fn correct_space() { 651 async fn correct_len() {
624 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 652 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
625 653
626 let mut sub0 = channel.subscriber().unwrap(); 654 let mut sub0 = channel.subscriber().unwrap();
627 let mut sub1 = channel.subscriber().unwrap(); 655 let mut sub1 = channel.subscriber().unwrap();
628 let pub0 = channel.publisher().unwrap(); 656 let pub0 = channel.publisher().unwrap();
629 657
630 assert_eq!(pub0.space(), 4); 658 assert!(sub0.is_empty());
659 assert!(sub1.is_empty());
660 assert!(pub0.is_empty());
661 assert_eq!(pub0.free_capacity(), 4);
662 assert_eq!(pub0.len(), 0);
631 663
632 pub0.publish(42).await; 664 pub0.publish(42).await;
633 665
634 assert_eq!(pub0.space(), 3); 666 assert_eq!(pub0.free_capacity(), 3);
667 assert_eq!(pub0.len(), 1);
635 668
636 pub0.publish(42).await; 669 pub0.publish(42).await;
637 670
638 assert_eq!(pub0.space(), 2); 671 assert_eq!(pub0.free_capacity(), 2);
672 assert_eq!(pub0.len(), 2);
639 673
640 sub0.next_message().await; 674 sub0.next_message().await;
641 sub0.next_message().await; 675 sub0.next_message().await;
642 676
643 assert_eq!(pub0.space(), 2); 677 assert_eq!(pub0.free_capacity(), 2);
678 assert_eq!(pub0.len(), 2);
644 679
645 sub1.next_message().await; 680 sub1.next_message().await;
646 assert_eq!(pub0.space(), 3); 681 assert_eq!(pub0.free_capacity(), 3);
682 assert_eq!(pub0.len(), 1);
683
647 sub1.next_message().await; 684 sub1.next_message().await;
648 assert_eq!(pub0.space(), 4); 685 assert_eq!(pub0.free_capacity(), 4);
686 assert_eq!(pub0.len(), 0);
649 } 687 }
650 688
651 #[futures_test::test] 689 #[futures_test::test]
@@ -656,29 +694,29 @@ mod tests {
656 let mut sub0 = channel.subscriber().unwrap(); 694 let mut sub0 = channel.subscriber().unwrap();
657 let mut sub1 = channel.subscriber().unwrap(); 695 let mut sub1 = channel.subscriber().unwrap();
658 696
659 assert_eq!(4, pub0.space()); 697 assert_eq!(4, pub0.free_capacity());
660 698
661 pub0.publish(1).await; 699 pub0.publish(1).await;
662 pub0.publish(2).await; 700 pub0.publish(2).await;
663 701
664 assert_eq!(2, channel.space()); 702 assert_eq!(2, channel.free_capacity());
665 703
666 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 704 assert_eq!(1, sub0.try_next_message_pure().unwrap());
667 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 705 assert_eq!(2, sub0.try_next_message_pure().unwrap());
668 706
669 assert_eq!(2, channel.space()); 707 assert_eq!(2, channel.free_capacity());
670 708
671 drop(sub0); 709 drop(sub0);
672 710
673 assert_eq!(2, channel.space()); 711 assert_eq!(2, channel.free_capacity());
674 712
675 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 713 assert_eq!(1, sub1.try_next_message_pure().unwrap());
676 714
677 assert_eq!(3, channel.space()); 715 assert_eq!(3, channel.free_capacity());
678 716
679 drop(sub1); 717 drop(sub1);
680 718
681 assert_eq!(4, channel.space()); 719 assert_eq!(4, channel.free_capacity());
682 } 720 }
683 721
684 struct CloneCallCounter(usize); 722 struct CloneCallCounter(usize);
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e1edc9eb9..26e2c63b7 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45 45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 46 /// Returns the maximum number of elements the ***channel*** can hold.
47 pub fn capacity(&self) -> usize {
48 self.channel.capacity()
49 }
50
51 /// Returns the free capacity of the ***channel***.
47 /// 52 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 53 /// This is equivalent to `capacity() - len()`
49 /// So checking doesn't give any guarantees.* 54 pub fn free_capacity(&self) -> usize {
50 pub fn space(&self) -> usize { 55 self.channel.free_capacity()
51 self.channel.space() 56 }
57
58 /// Returns the number of elements currently in the ***channel***.
59 pub fn len(&self) -> usize {
60 self.channel.len()
61 }
62
63 /// Returns whether the ***channel*** is empty.
64 pub fn is_empty(&self) -> bool {
65 self.channel.is_empty()
66 }
67
68 /// Returns whether the ***channel*** is full.
69 pub fn is_full(&self) -> bool {
70 self.channel.is_full()
52 } 71 }
53} 72}
54 73
@@ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
124 self.channel.publish_with_context(message, None) 143 self.channel.publish_with_context(message, None)
125 } 144 }
126 145
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 146 /// Returns the maximum number of elements the ***channel*** can hold.
147 pub fn capacity(&self) -> usize {
148 self.channel.capacity()
149 }
150
151 /// Returns the free capacity of the ***channel***.
128 /// 152 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 153 /// This is equivalent to `capacity() - len()`
130 /// So checking doesn't give any guarantees.* 154 pub fn free_capacity(&self) -> usize {
131 pub fn space(&self) -> usize { 155 self.channel.free_capacity()
132 self.channel.space() 156 }
157
158 /// Returns the number of elements currently in the ***channel***.
159 pub fn len(&self) -> usize {
160 self.channel.len()
161 }
162
163 /// Returns whether the ***channel*** is empty.
164 pub fn is_empty(&self) -> bool {
165 self.channel.is_empty()
166 }
167
168 /// Returns whether the ***channel*** is full.
169 pub fn is_full(&self) -> bool {
170 self.channel.is_full()
133 } 171 }
134} 172}
135 173
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index f420a75f0..2e2bd26a9 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
65 } 65 }
66 } 66 }
67 67
68 /// The amount of messages this subscriber hasn't received yet 68 /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69 /// for this subscriber.
69 pub fn available(&self) -> u64 { 70 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id) 71 self.channel.available(self.next_message_id)
71 } 72 }
73
74 /// Returns the maximum number of elements the ***channel*** can hold.
75 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 /// Returns the free capacity of the ***channel***.
80 ///
81 /// This is equivalent to `capacity() - len()`
82 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 /// Returns the number of elements currently in the ***channel***.
87 /// See [Self::available] for how many messages are available for this subscriber.
88 pub fn len(&self) -> usize {
89 self.channel.len()
90 }
91
92 /// Returns whether the ***channel*** is empty.
93 pub fn is_empty(&self) -> bool {
94 self.channel.is_empty()
95 }
96
97 /// Returns whether the ***channel*** is full.
98 pub fn is_full(&self) -> bool {
99 self.channel.is_full()
100 }
72} 101}
73 102
74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 103impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {