aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
authorJoel Schulz-Andres <[email protected]>2024-05-23 15:34:16 +0200
committerGitHub <[email protected]>2024-05-23 15:34:16 +0200
commit0a5820e3ed423af1788072a5416e04ab86c44c2d (patch)
tree5a2b8d3d7a88835aebec0914da234d547e3b3dbe /embassy-sync/src
parent27e8ef6e7e720a3c74f7c696ab105915695431c5 (diff)
parentded1f9d33520fc847dce8fe72f2fb80f6fa86350 (diff)
Merge branch 'embassy-rs:main' into add-miso-pullup
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/channel.rs29
-rw-r--r--embassy-sync/src/once_lock.rs6
-rw-r--r--embassy-sync/src/pipe.rs2
-rw-r--r--embassy-sync/src/priority_channel.rs54
-rw-r--r--embassy-sync/src/pubsub/mod.rs150
-rw-r--r--embassy-sync/src/pubsub/publisher.rs68
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs36
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs2
8 files changed, 299 insertions, 48 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index c4267064c..55ac5fb66 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -42,7 +42,7 @@ where
42 M: RawMutex, 42 M: RawMutex,
43{ 43{
44 fn clone(&self) -> Self { 44 fn clone(&self) -> Self {
45 Sender { channel: self.channel } 45 *self
46 } 46 }
47} 47}
48 48
@@ -81,7 +81,7 @@ pub struct DynamicSender<'ch, T> {
81 81
82impl<'ch, T> Clone for DynamicSender<'ch, T> { 82impl<'ch, T> Clone for DynamicSender<'ch, T> {
83 fn clone(&self) -> Self { 83 fn clone(&self) -> Self {
84 DynamicSender { channel: self.channel } 84 *self
85 } 85 }
86} 86}
87 87
@@ -135,7 +135,7 @@ where
135 M: RawMutex, 135 M: RawMutex,
136{ 136{
137 fn clone(&self) -> Self { 137 fn clone(&self) -> Self {
138 Receiver { channel: self.channel } 138 *self
139 } 139 }
140} 140}
141 141
@@ -188,7 +188,7 @@ pub struct DynamicReceiver<'ch, T> {
188 188
189impl<'ch, T> Clone for DynamicReceiver<'ch, T> { 189impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
190 fn clone(&self) -> Self { 190 fn clone(&self) -> Self {
191 DynamicReceiver { channel: self.channel } 191 *self
192 } 192 }
193} 193}
194 194
@@ -477,6 +477,10 @@ impl<T, const N: usize> ChannelState<T, N> {
477 } 477 }
478 } 478 }
479 479
480 fn clear(&mut self) {
481 self.queue.clear();
482 }
483
480 fn len(&self) -> usize { 484 fn len(&self) -> usize {
481 self.queue.len() 485 self.queue.len()
482 } 486 }
@@ -620,6 +624,23 @@ where
620 self.lock(|c| c.try_receive()) 624 self.lock(|c| c.try_receive())
621 } 625 }
622 626
627 /// Returns the maximum number of elements the channel can hold.
628 pub const fn capacity(&self) -> usize {
629 N
630 }
631
632 /// Returns the free capacity of the channel.
633 ///
634 /// This is equivalent to `capacity() - len()`
635 pub fn free_capacity(&self) -> usize {
636 N - self.len()
637 }
638
639 /// Clears all elements in the channel.
640 pub fn clear(&self) {
641 self.lock(|c| c.clear());
642 }
643
623 /// Returns the number of elements currently in the channel. 644 /// Returns the number of elements currently in the channel.
624 pub fn len(&self) -> usize { 645 pub fn len(&self) -> usize {
625 self.lock(|c| c.len()) 646 self.lock(|c| c.len())
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
index 9332ecfaf..55608ba32 100644
--- a/embassy-sync/src/once_lock.rs
+++ b/embassy-sync/src/once_lock.rs
@@ -1,4 +1,4 @@
1//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value. 1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2 2
3use core::cell::Cell; 3use core::cell::Cell;
4use core::future::poll_fn; 4use core::future::poll_fn;
@@ -78,7 +78,7 @@ impl<T> OnceLock<T> {
78 /// Set the underlying value. If the value is already set, this will return an error with the given value. 78 /// Set the underlying value. If the value is already set, this will return an error with the given value.
79 pub fn init(&self, value: T) -> Result<(), T> { 79 pub fn init(&self, value: T) -> Result<(), T> {
80 // Critical section is required to ensure that the value is 80 // Critical section is required to ensure that the value is
81 // not simultaniously initialized elsewhere at the same time. 81 // not simultaneously initialized elsewhere at the same time.
82 critical_section::with(|_| { 82 critical_section::with(|_| {
83 // If the value is not set, set it and return Ok. 83 // If the value is not set, set it and return Ok.
84 if !self.init.load(Ordering::Relaxed) { 84 if !self.init.load(Ordering::Relaxed) {
@@ -99,7 +99,7 @@ impl<T> OnceLock<T> {
99 F: FnOnce() -> T, 99 F: FnOnce() -> T,
100 { 100 {
101 // Critical section is required to ensure that the value is 101 // Critical section is required to ensure that the value is
102 // not simultaniously initialized elsewhere at the same time. 102 // not simultaneously initialized elsewhere at the same time.
103 critical_section::with(|_| { 103 critical_section::with(|_| {
104 // If the value is not set, set it. 104 // If the value is not set, set it.
105 if !self.init.load(Ordering::Relaxed) { 105 if !self.init.load(Ordering::Relaxed) {
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 42fe8ebd0..cd5b8ed75 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -25,7 +25,7 @@ where
25 M: RawMutex, 25 M: RawMutex,
26{ 26{
27 fn clone(&self) -> Self { 27 fn clone(&self) -> Self {
28 Writer { pipe: self.pipe } 28 *self
29 } 29 }
30} 30}
31 31
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index e77678c24..24c6c5a7f 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -33,7 +33,7 @@ where
33 M: RawMutex, 33 M: RawMutex,
34{ 34{
35 fn clone(&self) -> Self { 35 fn clone(&self) -> Self {
36 Sender { channel: self.channel } 36 *self
37 } 37 }
38} 38}
39 39
@@ -101,7 +101,7 @@ where
101 M: RawMutex, 101 M: RawMutex,
102{ 102{
103 fn clone(&self) -> Self { 103 fn clone(&self) -> Self {
104 Receiver { channel: self.channel } 104 *self
105 } 105 }
106} 106}
107 107
@@ -314,6 +314,22 @@ where
314 Poll::Pending 314 Poll::Pending
315 } 315 }
316 } 316 }
317
318 fn clear(&mut self) {
319 self.queue.clear();
320 }
321
322 fn len(&self) -> usize {
323 self.queue.len()
324 }
325
326 fn is_empty(&self) -> bool {
327 self.queue.is_empty()
328 }
329
330 fn is_full(&self) -> bool {
331 self.queue.len() == self.queue.capacity()
332 }
317} 333}
318 334
319/// A bounded channel for communicating between asynchronous tasks 335/// A bounded channel for communicating between asynchronous tasks
@@ -323,7 +339,7 @@ where
323/// buffer is full, attempts to `send` new messages will wait until a message is 339/// buffer is full, attempts to `send` new messages will wait until a message is
324/// received from the channel. 340/// received from the channel.
325/// 341///
326/// Sent data may be reordered based on their priorty within the channel. 342/// Sent data may be reordered based on their priority within the channel.
327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 343/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. 344/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
329pub struct PriorityChannel<M, T, K, const N: usize> 345pub struct PriorityChannel<M, T, K, const N: usize>
@@ -433,6 +449,38 @@ where
433 pub fn try_receive(&self) -> Result<T, TryReceiveError> { 449 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
434 self.lock(|c| c.try_receive()) 450 self.lock(|c| c.try_receive())
435 } 451 }
452
453 /// Returns the maximum number of elements the channel can hold.
454 pub const fn capacity(&self) -> usize {
455 N
456 }
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 pub fn free_capacity(&self) -> usize {
462 N - self.len()
463 }
464
465 /// Clears all elements in the channel.
466 pub fn clear(&self) {
467 self.lock(|c| c.clear());
468 }
469
470 /// Returns the number of elements currently in the channel.
471 pub fn len(&self) -> usize {
472 self.lock(|c| c.len())
473 }
474
475 /// Returns whether the channel is empty.
476 pub fn is_empty(&self) -> bool {
477 self.lock(|c| c.is_empty())
478 }
479
480 /// Returns whether the channel is full.
481 pub fn is_full(&self) -> bool {
482 self.lock(|c| c.is_full())
483 }
436} 484}
437 485
438/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the 486/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 6afd54af5..66c9b0017 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -160,9 +160,41 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self)) 161 DynImmediatePublisher(ImmediatePub::new(self))
162 } 162 }
163
164 /// Returns the maximum number of elements the channel can hold.
165 pub const fn capacity(&self) -> usize {
166 CAP
167 }
168
169 /// Returns the free capacity of the channel.
170 ///
171 /// This is equivalent to `capacity() - len()`
172 pub fn free_capacity(&self) -> usize {
173 CAP - self.len()
174 }
175
176 /// Clears all elements in the channel.
177 pub fn clear(&self) {
178 self.inner.lock(|inner| inner.borrow_mut().clear());
179 }
180
181 /// Returns the number of elements currently in the channel.
182 pub fn len(&self) -> usize {
183 self.inner.lock(|inner| inner.borrow().len())
184 }
185
186 /// Returns whether the channel is empty.
187 pub fn is_empty(&self) -> bool {
188 self.inner.lock(|inner| inner.borrow().is_empty())
189 }
190
191 /// Returns whether the channel is full.
192 pub fn is_full(&self) -> bool {
193 self.inner.lock(|inner| inner.borrow().is_full())
194 }
163} 195}
164 196
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> 197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS> 198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{ 199{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { 200 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
@@ -221,13 +253,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
221 }) 253 })
222 } 254 }
223 255
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 256 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
232 self.inner.lock(|s| { 257 self.inner.lock(|s| {
233 let mut s = s.borrow_mut(); 258 let mut s = s.borrow_mut();
@@ -241,6 +266,30 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
241 s.unregister_publisher() 266 s.unregister_publisher()
242 }) 267 })
243 } 268 }
269
270 fn capacity(&self) -> usize {
271 self.capacity()
272 }
273
274 fn free_capacity(&self) -> usize {
275 self.free_capacity()
276 }
277
278 fn clear(&self) {
279 self.clear();
280 }
281
282 fn len(&self) -> usize {
283 self.len()
284 }
285
286 fn is_empty(&self) -> bool {
287 self.is_empty()
288 }
289
290 fn is_full(&self) -> bool {
291 self.is_full()
292 }
244} 293}
245 294
246/// Internal state for the PubSub channel 295/// Internal state for the PubSub channel
@@ -366,6 +415,22 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
366 fn unregister_publisher(&mut self) { 415 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1; 416 self.publisher_count -= 1;
368 } 417 }
418
419 fn clear(&mut self) {
420 self.queue.clear();
421 }
422
423 fn len(&self) -> usize {
424 self.queue.len()
425 }
426
427 fn is_empty(&self) -> bool {
428 self.queue.is_empty()
429 }
430
431 fn is_full(&self) -> bool {
432 self.queue.is_full()
433 }
369} 434}
370 435
371/// Error type for the [PubSubChannel] 436/// Error type for the [PubSubChannel]
@@ -382,10 +447,10 @@ pub enum Error {
382 447
383/// 'Middle level' behaviour of the pubsub channel. 448/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel. 449/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> { 450trait SealedPubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id. 451 /// Try to get a message from the queue with the given message id.
387 /// 452 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 453 /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 454 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 455
391 /// Get the amount of messages that are between the given the next_message_id and the most recent message. 456 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
@@ -400,8 +465,25 @@ pub trait PubSubBehavior<T> {
400 /// Publish a message immediately 465 /// Publish a message immediately
401 fn publish_immediate(&self, message: T); 466 fn publish_immediate(&self, message: T);
402 467
403 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 468 /// Returns the maximum number of elements the channel can hold.
404 fn space(&self) -> usize; 469 fn capacity(&self) -> usize;
470
471 /// Returns the free capacity of the channel.
472 ///
473 /// This is equivalent to `capacity() - len()`
474 fn free_capacity(&self) -> usize;
475
476 /// Clears all elements in the channel.
477 fn clear(&self);
478
479 /// Returns the number of elements currently in the channel.
480 fn len(&self) -> usize;
481
482 /// Returns whether the channel is empty.
483 fn is_empty(&self) -> bool;
484
485 /// Returns whether the channel is full.
486 fn is_full(&self) -> bool;
405 487
406 /// Let the channel know that a subscriber has dropped 488 /// Let the channel know that a subscriber has dropped
407 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 489 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -410,6 +492,13 @@ pub trait PubSubBehavior<T> {
410 fn unregister_publisher(&self); 492 fn unregister_publisher(&self);
411} 493}
412 494
495/// 'Middle level' behaviour of the pubsub channel.
496/// This trait is used so that Sub and Pub can be generic over the channel.
497#[allow(private_bounds)]
498pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {}
499
500impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {}
501
413/// The result of the subscriber wait procedure 502/// The result of the subscriber wait procedure
414#[derive(Debug, Clone, PartialEq, Eq)] 503#[derive(Debug, Clone, PartialEq, Eq)]
415#[cfg_attr(feature = "defmt", derive(defmt::Format))] 504#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -542,6 +631,7 @@ mod tests {
542 assert_eq!(pub0.try_publish(0), Ok(())); 631 assert_eq!(pub0.try_publish(0), Ok(()));
543 assert_eq!(pub0.try_publish(0), Ok(())); 632 assert_eq!(pub0.try_publish(0), Ok(()));
544 assert_eq!(pub0.try_publish(0), Ok(())); 633 assert_eq!(pub0.try_publish(0), Ok(()));
634 assert!(pub0.is_full());
545 assert_eq!(pub0.try_publish(0), Err(0)); 635 assert_eq!(pub0.try_publish(0), Err(0));
546 636
547 drop(sub0); 637 drop(sub0);
@@ -574,32 +664,42 @@ mod tests {
574 } 664 }
575 665
576 #[futures_test::test] 666 #[futures_test::test]
577 async fn correct_space() { 667 async fn correct_len() {
578 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 668 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
579 669
580 let mut sub0 = channel.subscriber().unwrap(); 670 let mut sub0 = channel.subscriber().unwrap();
581 let mut sub1 = channel.subscriber().unwrap(); 671 let mut sub1 = channel.subscriber().unwrap();
582 let pub0 = channel.publisher().unwrap(); 672 let pub0 = channel.publisher().unwrap();
583 673
584 assert_eq!(pub0.space(), 4); 674 assert!(sub0.is_empty());
675 assert!(sub1.is_empty());
676 assert!(pub0.is_empty());
677 assert_eq!(pub0.free_capacity(), 4);
678 assert_eq!(pub0.len(), 0);
585 679
586 pub0.publish(42).await; 680 pub0.publish(42).await;
587 681
588 assert_eq!(pub0.space(), 3); 682 assert_eq!(pub0.free_capacity(), 3);
683 assert_eq!(pub0.len(), 1);
589 684
590 pub0.publish(42).await; 685 pub0.publish(42).await;
591 686
592 assert_eq!(pub0.space(), 2); 687 assert_eq!(pub0.free_capacity(), 2);
688 assert_eq!(pub0.len(), 2);
593 689
594 sub0.next_message().await; 690 sub0.next_message().await;
595 sub0.next_message().await; 691 sub0.next_message().await;
596 692
597 assert_eq!(pub0.space(), 2); 693 assert_eq!(pub0.free_capacity(), 2);
694 assert_eq!(pub0.len(), 2);
598 695
599 sub1.next_message().await; 696 sub1.next_message().await;
600 assert_eq!(pub0.space(), 3); 697 assert_eq!(pub0.free_capacity(), 3);
698 assert_eq!(pub0.len(), 1);
699
601 sub1.next_message().await; 700 sub1.next_message().await;
602 assert_eq!(pub0.space(), 4); 701 assert_eq!(pub0.free_capacity(), 4);
702 assert_eq!(pub0.len(), 0);
603 } 703 }
604 704
605 #[futures_test::test] 705 #[futures_test::test]
@@ -610,29 +710,29 @@ mod tests {
610 let mut sub0 = channel.subscriber().unwrap(); 710 let mut sub0 = channel.subscriber().unwrap();
611 let mut sub1 = channel.subscriber().unwrap(); 711 let mut sub1 = channel.subscriber().unwrap();
612 712
613 assert_eq!(4, pub0.space()); 713 assert_eq!(4, pub0.free_capacity());
614 714
615 pub0.publish(1).await; 715 pub0.publish(1).await;
616 pub0.publish(2).await; 716 pub0.publish(2).await;
617 717
618 assert_eq!(2, channel.space()); 718 assert_eq!(2, channel.free_capacity());
619 719
620 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 720 assert_eq!(1, sub0.try_next_message_pure().unwrap());
621 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 721 assert_eq!(2, sub0.try_next_message_pure().unwrap());
622 722
623 assert_eq!(2, channel.space()); 723 assert_eq!(2, channel.free_capacity());
624 724
625 drop(sub0); 725 drop(sub0);
626 726
627 assert_eq!(2, channel.space()); 727 assert_eq!(2, channel.free_capacity());
628 728
629 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 729 assert_eq!(1, sub1.try_next_message_pure().unwrap());
630 730
631 assert_eq!(3, channel.space()); 731 assert_eq!(3, channel.free_capacity());
632 732
633 drop(sub1); 733 drop(sub1);
634 734
635 assert_eq!(4, channel.space()); 735 assert_eq!(4, channel.free_capacity());
636 } 736 }
637 737
638 struct CloneCallCounter(usize); 738 struct CloneCallCounter(usize);
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e1edc9eb9..e66b3b1db 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -43,12 +43,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45 45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 46 /// Returns the maximum number of elements the ***channel*** can hold.
47 pub fn capacity(&self) -> usize {
48 self.channel.capacity()
49 }
50
51 /// Returns the free capacity of the ***channel***.
47 /// 52 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 53 /// This is equivalent to `capacity() - len()`
49 /// So checking doesn't give any guarantees.* 54 pub fn free_capacity(&self) -> usize {
50 pub fn space(&self) -> usize { 55 self.channel.free_capacity()
51 self.channel.space() 56 }
57
58 /// Clears all elements in the ***channel***.
59 pub fn clear(&self) {
60 self.channel.clear();
61 }
62
63 /// Returns the number of elements currently in the ***channel***.
64 pub fn len(&self) -> usize {
65 self.channel.len()
66 }
67
68 /// Returns whether the ***channel*** is empty.
69 pub fn is_empty(&self) -> bool {
70 self.channel.is_empty()
71 }
72
73 /// Returns whether the ***channel*** is full.
74 pub fn is_full(&self) -> bool {
75 self.channel.is_full()
52 } 76 }
53} 77}
54 78
@@ -124,12 +148,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
124 self.channel.publish_with_context(message, None) 148 self.channel.publish_with_context(message, None)
125 } 149 }
126 150
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 151 /// Returns the maximum number of elements the ***channel*** can hold.
152 pub fn capacity(&self) -> usize {
153 self.channel.capacity()
154 }
155
156 /// Returns the free capacity of the ***channel***.
128 /// 157 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 158 /// This is equivalent to `capacity() - len()`
130 /// So checking doesn't give any guarantees.* 159 pub fn free_capacity(&self) -> usize {
131 pub fn space(&self) -> usize { 160 self.channel.free_capacity()
132 self.channel.space() 161 }
162
163 /// Clears all elements in the ***channel***.
164 pub fn clear(&self) {
165 self.channel.clear();
166 }
167
168 /// Returns the number of elements currently in the ***channel***.
169 pub fn len(&self) -> usize {
170 self.channel.len()
171 }
172
173 /// Returns whether the ***channel*** is empty.
174 pub fn is_empty(&self) -> bool {
175 self.channel.is_empty()
176 }
177
178 /// Returns whether the ***channel*** is full.
179 pub fn is_full(&self) -> bool {
180 self.channel.is_full()
133 } 181 }
134} 182}
135 183
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index f420a75f0..6ad660cb3 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -65,10 +65,44 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
65 } 65 }
66 } 66 }
67 67
68 /// The amount of messages this subscriber hasn't received yet 68 /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69 /// for this subscriber.
69 pub fn available(&self) -> u64 { 70 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id) 71 self.channel.available(self.next_message_id)
71 } 72 }
73
74 /// Returns the maximum number of elements the ***channel*** can hold.
75 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 /// Returns the free capacity of the ***channel***.
80 ///
81 /// This is equivalent to `capacity() - len()`
82 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 /// Clears all elements in the ***channel***.
87 pub fn clear(&self) {
88 self.channel.clear();
89 }
90
91 /// Returns the number of elements currently in the ***channel***.
92 /// See [Self::available] for how many messages are available for this subscriber.
93 pub fn len(&self) -> usize {
94 self.channel.len()
95 }
96
97 /// Returns whether the ***channel*** is empty.
98 pub fn is_empty(&self) -> bool {
99 self.channel.is_empty()
100 }
101
102 /// Returns whether the ***channel*** is full.
103 pub fn is_full(&self) -> bool {
104 self.channel.is_full()
105 }
72} 106}
73 107
74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 824d192da..0e520bf40 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -14,7 +14,7 @@ impl<const N: usize> MultiWakerRegistration<N> {
14 } 14 }
15 15
16 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
17 pub fn register<'a>(&mut self, w: &'a Waker) { 17 pub fn register(&mut self, w: &Waker) {
18 // If we already have some waker that wakes the same task as `w`, do nothing. 18 // If we already have some waker that wakes the same task as `w`, do nothing.
19 // This avoids cloning wakers, and avoids unnecessary mass-wakes. 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
20 for w2 in &self.wakers { 20 for w2 in &self.wakers {