aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
authorGustav Toft <[email protected]>2024-05-30 09:56:09 +0200
committerGustav Toft <[email protected]>2024-05-30 09:56:09 +0200
commitd3c3670a966cd68b8d2d46a732ab971390ec3006 (patch)
treee0815debd51e1baa5b019049e0ea1b1a286f7742 /embassy-sync/src
parentab36329dce653a2ee20d32e9a5345799d9595202 (diff)
parent50210e8cdc95c3c8bea150541cd8f15482450b1e (diff)
Merge branch 'main' of https://github.com/embassy-rs/embassy into fix_main
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/channel.rs64
-rw-r--r--embassy-sync/src/mutex.rs209
-rw-r--r--embassy-sync/src/once_lock.rs8
-rw-r--r--embassy-sync/src/pipe.rs2
-rw-r--r--embassy-sync/src/priority_channel.rs54
-rw-r--r--embassy-sync/src/pubsub/mod.rs150
-rw-r--r--embassy-sync/src/pubsub/publisher.rs68
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs36
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs2
9 files changed, 544 insertions, 49 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 18be462cb..55ac5fb66 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -42,7 +42,7 @@ where
42 M: RawMutex, 42 M: RawMutex,
43{ 43{
44 fn clone(&self) -> Self { 44 fn clone(&self) -> Self {
45 Sender { channel: self.channel } 45 *self
46 } 46 }
47} 47}
48 48
@@ -81,7 +81,7 @@ pub struct DynamicSender<'ch, T> {
81 81
82impl<'ch, T> Clone for DynamicSender<'ch, T> { 82impl<'ch, T> Clone for DynamicSender<'ch, T> {
83 fn clone(&self) -> Self { 83 fn clone(&self) -> Self {
84 DynamicSender { channel: self.channel } 84 *self
85 } 85 }
86} 86}
87 87
@@ -135,7 +135,7 @@ where
135 M: RawMutex, 135 M: RawMutex,
136{ 136{
137 fn clone(&self) -> Self { 137 fn clone(&self) -> Self {
138 Receiver { channel: self.channel } 138 *self
139 } 139 }
140} 140}
141 141
@@ -152,6 +152,13 @@ where
152 self.channel.receive() 152 self.channel.receive()
153 } 153 }
154 154
155 /// Is a value ready to be received in the channel
156 ///
157 /// See [`Channel::ready_to_receive()`].
158 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
159 self.channel.ready_to_receive()
160 }
161
155 /// Attempt to immediately receive the next value. 162 /// Attempt to immediately receive the next value.
156 /// 163 ///
157 /// See [`Channel::try_receive()`] 164 /// See [`Channel::try_receive()`]
@@ -181,7 +188,7 @@ pub struct DynamicReceiver<'ch, T> {
181 188
182impl<'ch, T> Clone for DynamicReceiver<'ch, T> { 189impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
183 fn clone(&self) -> Self { 190 fn clone(&self) -> Self {
184 DynamicReceiver { channel: self.channel } 191 *self
185 } 192 }
186} 193}
187 194
@@ -246,6 +253,26 @@ where
246 } 253 }
247} 254}
248 255
256/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`].
257#[must_use = "futures do nothing unless you `.await` or poll them"]
258pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
259where
260 M: RawMutex,
261{
262 channel: &'ch Channel<M, T, N>,
263}
264
265impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
266where
267 M: RawMutex,
268{
269 type Output = ();
270
271 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
272 self.channel.poll_ready_to_receive(cx)
273 }
274}
275
249/// Future returned by [`DynamicReceiver::receive`]. 276/// Future returned by [`DynamicReceiver::receive`].
250#[must_use = "futures do nothing unless you `.await` or poll them"] 277#[must_use = "futures do nothing unless you `.await` or poll them"]
251pub struct DynamicReceiveFuture<'ch, T> { 278pub struct DynamicReceiveFuture<'ch, T> {
@@ -450,6 +477,10 @@ impl<T, const N: usize> ChannelState<T, N> {
450 } 477 }
451 } 478 }
452 479
480 fn clear(&mut self) {
481 self.queue.clear();
482 }
483
453 fn len(&self) -> usize { 484 fn len(&self) -> usize {
454 self.queue.len() 485 self.queue.len()
455 } 486 }
@@ -577,6 +608,14 @@ where
577 ReceiveFuture { channel: self } 608 ReceiveFuture { channel: self }
578 } 609 }
579 610
611 /// Is a value ready to be received in the channel
612 ///
613 /// If there are no messages in the channel's buffer, this method will
614 /// wait until there is at least one
615 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
616 ReceiveReadyFuture { channel: self }
617 }
618
580 /// Attempt to immediately receive a message. 619 /// Attempt to immediately receive a message.
581 /// 620 ///
582 /// This method will either receive a message from the channel immediately or return an error 621 /// This method will either receive a message from the channel immediately or return an error
@@ -585,6 +624,23 @@ where
585 self.lock(|c| c.try_receive()) 624 self.lock(|c| c.try_receive())
586 } 625 }
587 626
627 /// Returns the maximum number of elements the channel can hold.
628 pub const fn capacity(&self) -> usize {
629 N
630 }
631
632 /// Returns the free capacity of the channel.
633 ///
634 /// This is equivalent to `capacity() - len()`
635 pub fn free_capacity(&self) -> usize {
636 N - self.len()
637 }
638
639 /// Clears all elements in the channel.
640 pub fn clear(&self) {
641 self.lock(|c| c.clear());
642 }
643
588 /// Returns the number of elements currently in the channel. 644 /// Returns the number of elements currently in the channel.
589 pub fn len(&self) -> usize { 645 pub fn len(&self) -> usize {
590 self.lock(|c| c.len()) 646 self.lock(|c| c.len())
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 72459d660..8c3a3af9f 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -5,6 +5,7 @@ use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn; 5use core::future::poll_fn;
6use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
7use core::task::Poll; 7use core::task::Poll;
8use core::{fmt, mem};
8 9
9use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex as BlockingMutex; 11use crate::blocking_mutex::Mutex as BlockingMutex;
@@ -128,12 +129,49 @@ where
128 } 129 }
129} 130}
130 131
132impl<M: RawMutex, T> From<T> for Mutex<M, T> {
133 fn from(from: T) -> Self {
134 Self::new(from)
135 }
136}
137
138impl<M, T> Default for Mutex<M, T>
139where
140 M: RawMutex,
141 T: ?Sized + Default,
142{
143 fn default() -> Self {
144 Self::new(Default::default())
145 }
146}
147
148impl<M, T> fmt::Debug for Mutex<M, T>
149where
150 M: RawMutex,
151 T: ?Sized + fmt::Debug,
152{
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 let mut d = f.debug_struct("Mutex");
155 match self.try_lock() {
156 Ok(value) => {
157 d.field("inner", &&*value);
158 }
159 Err(TryLockError) => {
160 d.field("inner", &format_args!("<locked>"));
161 }
162 }
163
164 d.finish_non_exhaustive()
165 }
166}
167
131/// Async mutex guard. 168/// Async mutex guard.
132/// 169///
133/// Owning an instance of this type indicates having 170/// Owning an instance of this type indicates having
134/// successfully locked the mutex, and grants access to the contents. 171/// successfully locked the mutex, and grants access to the contents.
135/// 172///
136/// Dropping it unlocks the mutex. 173/// Dropping it unlocks the mutex.
174#[clippy::has_significant_drop]
137pub struct MutexGuard<'a, M, T> 175pub struct MutexGuard<'a, M, T>
138where 176where
139 M: RawMutex, 177 M: RawMutex,
@@ -142,6 +180,25 @@ where
142 mutex: &'a Mutex<M, T>, 180 mutex: &'a Mutex<M, T>,
143} 181}
144 182
183impl<'a, M, T> MutexGuard<'a, M, T>
184where
185 M: RawMutex,
186 T: ?Sized,
187{
188 /// Returns a locked view over a portion of the locked data.
189 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
190 let mutex = this.mutex;
191 let value = fun(unsafe { &mut *this.mutex.inner.get() });
192 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
193 // locked state is being moved to the returned MappedMutexGuard.
194 mem::forget(this);
195 MappedMutexGuard {
196 state: &mutex.state,
197 value,
198 }
199 }
200}
201
145impl<'a, M, T> Drop for MutexGuard<'a, M, T> 202impl<'a, M, T> Drop for MutexGuard<'a, M, T>
146where 203where
147 M: RawMutex, 204 M: RawMutex,
@@ -180,3 +237,155 @@ where
180 unsafe { &mut *(self.mutex.inner.get()) } 237 unsafe { &mut *(self.mutex.inner.get()) }
181 } 238 }
182} 239}
240
241impl<'a, M, T> fmt::Debug for MutexGuard<'a, M, T>
242where
243 M: RawMutex,
244 T: ?Sized + fmt::Debug,
245{
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 fmt::Debug::fmt(&**self, f)
248 }
249}
250
251impl<'a, M, T> fmt::Display for MutexGuard<'a, M, T>
252where
253 M: RawMutex,
254 T: ?Sized + fmt::Display,
255{
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 fmt::Display::fmt(&**self, f)
258 }
259}
260
261/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`] or
262/// [`MappedMutexGuard::map`].
263///
264/// This can be used to hold a subfield of the protected data.
265#[clippy::has_significant_drop]
266pub struct MappedMutexGuard<'a, M, T>
267where
268 M: RawMutex,
269 T: ?Sized,
270{
271 state: &'a BlockingMutex<M, RefCell<State>>,
272 value: *mut T,
273}
274
275impl<'a, M, T> MappedMutexGuard<'a, M, T>
276where
277 M: RawMutex,
278 T: ?Sized,
279{
280 /// Returns a locked view over a portion of the locked data.
281 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
282 let state = this.state;
283 let value = fun(unsafe { &mut *this.value });
284 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
285 // locked state is being moved to the returned MappedMutexGuard.
286 mem::forget(this);
287 MappedMutexGuard { state, value }
288 }
289}
290
291impl<'a, M, T> Deref for MappedMutexGuard<'a, M, T>
292where
293 M: RawMutex,
294 T: ?Sized,
295{
296 type Target = T;
297 fn deref(&self) -> &Self::Target {
298 // Safety: the MutexGuard represents exclusive access to the contents
299 // of the mutex, so it's OK to get it.
300 unsafe { &*self.value }
301 }
302}
303
304impl<'a, M, T> DerefMut for MappedMutexGuard<'a, M, T>
305where
306 M: RawMutex,
307 T: ?Sized,
308{
309 fn deref_mut(&mut self) -> &mut Self::Target {
310 // Safety: the MutexGuard represents exclusive access to the contents
311 // of the mutex, so it's OK to get it.
312 unsafe { &mut *self.value }
313 }
314}
315
316impl<'a, M, T> Drop for MappedMutexGuard<'a, M, T>
317where
318 M: RawMutex,
319 T: ?Sized,
320{
321 fn drop(&mut self) {
322 self.state.lock(|s| {
323 let mut s = unwrap!(s.try_borrow_mut());
324 s.locked = false;
325 s.waker.wake();
326 })
327 }
328}
329
330unsafe impl<M, T> Send for MappedMutexGuard<'_, M, T>
331where
332 M: RawMutex + Sync,
333 T: Send + ?Sized,
334{
335}
336
337unsafe impl<M, T> Sync for MappedMutexGuard<'_, M, T>
338where
339 M: RawMutex + Sync,
340 T: Sync + ?Sized,
341{
342}
343
344impl<'a, M, T> fmt::Debug for MappedMutexGuard<'a, M, T>
345where
346 M: RawMutex,
347 T: ?Sized + fmt::Debug,
348{
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 fmt::Debug::fmt(&**self, f)
351 }
352}
353
354impl<'a, M, T> fmt::Display for MappedMutexGuard<'a, M, T>
355where
356 M: RawMutex,
357 T: ?Sized + fmt::Display,
358{
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 fmt::Display::fmt(&**self, f)
361 }
362}
363
364#[cfg(test)]
365mod tests {
366 use crate::blocking_mutex::raw::NoopRawMutex;
367 use crate::mutex::{Mutex, MutexGuard};
368
369 #[futures_test::test]
370 async fn mapped_guard_releases_lock_when_dropped() {
371 let mutex: Mutex<NoopRawMutex, [i32; 2]> = Mutex::new([0, 1]);
372
373 {
374 let guard = mutex.lock().await;
375 assert_eq!(*guard, [0, 1]);
376 let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
377 assert_eq!(*mapped, 1);
378 *mapped = 2;
379 }
380
381 {
382 let guard = mutex.lock().await;
383 assert_eq!(*guard, [0, 2]);
384 let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
385 assert_eq!(*mapped, 2);
386 *mapped = 3;
387 }
388
389 assert_eq!(*mutex.lock().await, [0, 3]);
390 }
391}
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
index 31cc99711..55608ba32 100644
--- a/embassy-sync/src/once_lock.rs
+++ b/embassy-sync/src/once_lock.rs
@@ -1,4 +1,4 @@
1//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value. 1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2 2
3use core::cell::Cell; 3use core::cell::Cell;
4use core::future::poll_fn; 4use core::future::poll_fn;
@@ -13,7 +13,7 @@ use core::task::Poll;
13/// 13///
14/// **Note**: this implementation uses a busy loop to poll the value, 14/// **Note**: this implementation uses a busy loop to poll the value,
15/// which is not as efficient as registering a dedicated `Waker`. 15/// which is not as efficient as registering a dedicated `Waker`.
16/// However, the if the usecase for is to initialize a static variable 16/// However, if the usecase for it is to initialize a static variable
17/// relatively early in the program life cycle, it should be fine. 17/// relatively early in the program life cycle, it should be fine.
18/// 18///
19/// # Example 19/// # Example
@@ -78,7 +78,7 @@ impl<T> OnceLock<T> {
78 /// Set the underlying value. If the value is already set, this will return an error with the given value. 78 /// Set the underlying value. If the value is already set, this will return an error with the given value.
79 pub fn init(&self, value: T) -> Result<(), T> { 79 pub fn init(&self, value: T) -> Result<(), T> {
80 // Critical section is required to ensure that the value is 80 // Critical section is required to ensure that the value is
81 // not simultaniously initialized elsewhere at the same time. 81 // not simultaneously initialized elsewhere at the same time.
82 critical_section::with(|_| { 82 critical_section::with(|_| {
83 // If the value is not set, set it and return Ok. 83 // If the value is not set, set it and return Ok.
84 if !self.init.load(Ordering::Relaxed) { 84 if !self.init.load(Ordering::Relaxed) {
@@ -99,7 +99,7 @@ impl<T> OnceLock<T> {
99 F: FnOnce() -> T, 99 F: FnOnce() -> T,
100 { 100 {
101 // Critical section is required to ensure that the value is 101 // Critical section is required to ensure that the value is
102 // not simultaniously initialized elsewhere at the same time. 102 // not simultaneously initialized elsewhere at the same time.
103 critical_section::with(|_| { 103 critical_section::with(|_| {
104 // If the value is not set, set it. 104 // If the value is not set, set it.
105 if !self.init.load(Ordering::Relaxed) { 105 if !self.init.load(Ordering::Relaxed) {
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 42fe8ebd0..cd5b8ed75 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -25,7 +25,7 @@ where
25 M: RawMutex, 25 M: RawMutex,
26{ 26{
27 fn clone(&self) -> Self { 27 fn clone(&self) -> Self {
28 Writer { pipe: self.pipe } 28 *self
29 } 29 }
30} 30}
31 31
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index e77678c24..24c6c5a7f 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -33,7 +33,7 @@ where
33 M: RawMutex, 33 M: RawMutex,
34{ 34{
35 fn clone(&self) -> Self { 35 fn clone(&self) -> Self {
36 Sender { channel: self.channel } 36 *self
37 } 37 }
38} 38}
39 39
@@ -101,7 +101,7 @@ where
101 M: RawMutex, 101 M: RawMutex,
102{ 102{
103 fn clone(&self) -> Self { 103 fn clone(&self) -> Self {
104 Receiver { channel: self.channel } 104 *self
105 } 105 }
106} 106}
107 107
@@ -314,6 +314,22 @@ where
314 Poll::Pending 314 Poll::Pending
315 } 315 }
316 } 316 }
317
318 fn clear(&mut self) {
319 self.queue.clear();
320 }
321
322 fn len(&self) -> usize {
323 self.queue.len()
324 }
325
326 fn is_empty(&self) -> bool {
327 self.queue.is_empty()
328 }
329
330 fn is_full(&self) -> bool {
331 self.queue.len() == self.queue.capacity()
332 }
317} 333}
318 334
319/// A bounded channel for communicating between asynchronous tasks 335/// A bounded channel for communicating between asynchronous tasks
@@ -323,7 +339,7 @@ where
323/// buffer is full, attempts to `send` new messages will wait until a message is 339/// buffer is full, attempts to `send` new messages will wait until a message is
324/// received from the channel. 340/// received from the channel.
325/// 341///
326/// Sent data may be reordered based on their priorty within the channel. 342/// Sent data may be reordered based on their priority within the channel.
327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 343/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. 344/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
329pub struct PriorityChannel<M, T, K, const N: usize> 345pub struct PriorityChannel<M, T, K, const N: usize>
@@ -433,6 +449,38 @@ where
433 pub fn try_receive(&self) -> Result<T, TryReceiveError> { 449 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
434 self.lock(|c| c.try_receive()) 450 self.lock(|c| c.try_receive())
435 } 451 }
452
453 /// Returns the maximum number of elements the channel can hold.
454 pub const fn capacity(&self) -> usize {
455 N
456 }
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 pub fn free_capacity(&self) -> usize {
462 N - self.len()
463 }
464
465 /// Clears all elements in the channel.
466 pub fn clear(&self) {
467 self.lock(|c| c.clear());
468 }
469
470 /// Returns the number of elements currently in the channel.
471 pub fn len(&self) -> usize {
472 self.lock(|c| c.len())
473 }
474
475 /// Returns whether the channel is empty.
476 pub fn is_empty(&self) -> bool {
477 self.lock(|c| c.is_empty())
478 }
479
480 /// Returns whether the channel is full.
481 pub fn is_full(&self) -> bool {
482 self.lock(|c| c.is_full())
483 }
436} 484}
437 485
438/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the 486/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 6afd54af5..66c9b0017 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -160,9 +160,41 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self)) 161 DynImmediatePublisher(ImmediatePub::new(self))
162 } 162 }
163
164 /// Returns the maximum number of elements the channel can hold.
165 pub const fn capacity(&self) -> usize {
166 CAP
167 }
168
169 /// Returns the free capacity of the channel.
170 ///
171 /// This is equivalent to `capacity() - len()`
172 pub fn free_capacity(&self) -> usize {
173 CAP - self.len()
174 }
175
176 /// Clears all elements in the channel.
177 pub fn clear(&self) {
178 self.inner.lock(|inner| inner.borrow_mut().clear());
179 }
180
181 /// Returns the number of elements currently in the channel.
182 pub fn len(&self) -> usize {
183 self.inner.lock(|inner| inner.borrow().len())
184 }
185
186 /// Returns whether the channel is empty.
187 pub fn is_empty(&self) -> bool {
188 self.inner.lock(|inner| inner.borrow().is_empty())
189 }
190
191 /// Returns whether the channel is full.
192 pub fn is_full(&self) -> bool {
193 self.inner.lock(|inner| inner.borrow().is_full())
194 }
163} 195}
164 196
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> 197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS> 198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{ 199{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { 200 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
@@ -221,13 +253,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
221 }) 253 })
222 } 254 }
223 255
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 256 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
232 self.inner.lock(|s| { 257 self.inner.lock(|s| {
233 let mut s = s.borrow_mut(); 258 let mut s = s.borrow_mut();
@@ -241,6 +266,30 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
241 s.unregister_publisher() 266 s.unregister_publisher()
242 }) 267 })
243 } 268 }
269
270 fn capacity(&self) -> usize {
271 self.capacity()
272 }
273
274 fn free_capacity(&self) -> usize {
275 self.free_capacity()
276 }
277
278 fn clear(&self) {
279 self.clear();
280 }
281
282 fn len(&self) -> usize {
283 self.len()
284 }
285
286 fn is_empty(&self) -> bool {
287 self.is_empty()
288 }
289
290 fn is_full(&self) -> bool {
291 self.is_full()
292 }
244} 293}
245 294
246/// Internal state for the PubSub channel 295/// Internal state for the PubSub channel
@@ -366,6 +415,22 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
366 fn unregister_publisher(&mut self) { 415 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1; 416 self.publisher_count -= 1;
368 } 417 }
418
419 fn clear(&mut self) {
420 self.queue.clear();
421 }
422
423 fn len(&self) -> usize {
424 self.queue.len()
425 }
426
427 fn is_empty(&self) -> bool {
428 self.queue.is_empty()
429 }
430
431 fn is_full(&self) -> bool {
432 self.queue.is_full()
433 }
369} 434}
370 435
371/// Error type for the [PubSubChannel] 436/// Error type for the [PubSubChannel]
@@ -382,10 +447,10 @@ pub enum Error {
382 447
383/// 'Middle level' behaviour of the pubsub channel. 448/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel. 449/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> { 450trait SealedPubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id. 451 /// Try to get a message from the queue with the given message id.
387 /// 452 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 453 /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 454 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 455
391 /// Get the amount of messages that are between the given the next_message_id and the most recent message. 456 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
@@ -400,8 +465,25 @@ pub trait PubSubBehavior<T> {
400 /// Publish a message immediately 465 /// Publish a message immediately
401 fn publish_immediate(&self, message: T); 466 fn publish_immediate(&self, message: T);
402 467
403 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 468 /// Returns the maximum number of elements the channel can hold.
404 fn space(&self) -> usize; 469 fn capacity(&self) -> usize;
470
471 /// Returns the free capacity of the channel.
472 ///
473 /// This is equivalent to `capacity() - len()`
474 fn free_capacity(&self) -> usize;
475
476 /// Clears all elements in the channel.
477 fn clear(&self);
478
479 /// Returns the number of elements currently in the channel.
480 fn len(&self) -> usize;
481
482 /// Returns whether the channel is empty.
483 fn is_empty(&self) -> bool;
484
485 /// Returns whether the channel is full.
486 fn is_full(&self) -> bool;
405 487
406 /// Let the channel know that a subscriber has dropped 488 /// Let the channel know that a subscriber has dropped
407 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 489 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -410,6 +492,13 @@ pub trait PubSubBehavior<T> {
410 fn unregister_publisher(&self); 492 fn unregister_publisher(&self);
411} 493}
412 494
495/// 'Middle level' behaviour of the pubsub channel.
496/// This trait is used so that Sub and Pub can be generic over the channel.
497#[allow(private_bounds)]
498pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {}
499
500impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {}
501
413/// The result of the subscriber wait procedure 502/// The result of the subscriber wait procedure
414#[derive(Debug, Clone, PartialEq, Eq)] 503#[derive(Debug, Clone, PartialEq, Eq)]
415#[cfg_attr(feature = "defmt", derive(defmt::Format))] 504#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -542,6 +631,7 @@ mod tests {
542 assert_eq!(pub0.try_publish(0), Ok(())); 631 assert_eq!(pub0.try_publish(0), Ok(()));
543 assert_eq!(pub0.try_publish(0), Ok(())); 632 assert_eq!(pub0.try_publish(0), Ok(()));
544 assert_eq!(pub0.try_publish(0), Ok(())); 633 assert_eq!(pub0.try_publish(0), Ok(()));
634 assert!(pub0.is_full());
545 assert_eq!(pub0.try_publish(0), Err(0)); 635 assert_eq!(pub0.try_publish(0), Err(0));
546 636
547 drop(sub0); 637 drop(sub0);
@@ -574,32 +664,42 @@ mod tests {
574 } 664 }
575 665
576 #[futures_test::test] 666 #[futures_test::test]
577 async fn correct_space() { 667 async fn correct_len() {
578 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 668 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
579 669
580 let mut sub0 = channel.subscriber().unwrap(); 670 let mut sub0 = channel.subscriber().unwrap();
581 let mut sub1 = channel.subscriber().unwrap(); 671 let mut sub1 = channel.subscriber().unwrap();
582 let pub0 = channel.publisher().unwrap(); 672 let pub0 = channel.publisher().unwrap();
583 673
584 assert_eq!(pub0.space(), 4); 674 assert!(sub0.is_empty());
675 assert!(sub1.is_empty());
676 assert!(pub0.is_empty());
677 assert_eq!(pub0.free_capacity(), 4);
678 assert_eq!(pub0.len(), 0);
585 679
586 pub0.publish(42).await; 680 pub0.publish(42).await;
587 681
588 assert_eq!(pub0.space(), 3); 682 assert_eq!(pub0.free_capacity(), 3);
683 assert_eq!(pub0.len(), 1);
589 684
590 pub0.publish(42).await; 685 pub0.publish(42).await;
591 686
592 assert_eq!(pub0.space(), 2); 687 assert_eq!(pub0.free_capacity(), 2);
688 assert_eq!(pub0.len(), 2);
593 689
594 sub0.next_message().await; 690 sub0.next_message().await;
595 sub0.next_message().await; 691 sub0.next_message().await;
596 692
597 assert_eq!(pub0.space(), 2); 693 assert_eq!(pub0.free_capacity(), 2);
694 assert_eq!(pub0.len(), 2);
598 695
599 sub1.next_message().await; 696 sub1.next_message().await;
600 assert_eq!(pub0.space(), 3); 697 assert_eq!(pub0.free_capacity(), 3);
698 assert_eq!(pub0.len(), 1);
699
601 sub1.next_message().await; 700 sub1.next_message().await;
602 assert_eq!(pub0.space(), 4); 701 assert_eq!(pub0.free_capacity(), 4);
702 assert_eq!(pub0.len(), 0);
603 } 703 }
604 704
605 #[futures_test::test] 705 #[futures_test::test]
@@ -610,29 +710,29 @@ mod tests {
610 let mut sub0 = channel.subscriber().unwrap(); 710 let mut sub0 = channel.subscriber().unwrap();
611 let mut sub1 = channel.subscriber().unwrap(); 711 let mut sub1 = channel.subscriber().unwrap();
612 712
613 assert_eq!(4, pub0.space()); 713 assert_eq!(4, pub0.free_capacity());
614 714
615 pub0.publish(1).await; 715 pub0.publish(1).await;
616 pub0.publish(2).await; 716 pub0.publish(2).await;
617 717
618 assert_eq!(2, channel.space()); 718 assert_eq!(2, channel.free_capacity());
619 719
620 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 720 assert_eq!(1, sub0.try_next_message_pure().unwrap());
621 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 721 assert_eq!(2, sub0.try_next_message_pure().unwrap());
622 722
623 assert_eq!(2, channel.space()); 723 assert_eq!(2, channel.free_capacity());
624 724
625 drop(sub0); 725 drop(sub0);
626 726
627 assert_eq!(2, channel.space()); 727 assert_eq!(2, channel.free_capacity());
628 728
629 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 729 assert_eq!(1, sub1.try_next_message_pure().unwrap());
630 730
631 assert_eq!(3, channel.space()); 731 assert_eq!(3, channel.free_capacity());
632 732
633 drop(sub1); 733 drop(sub1);
634 734
635 assert_eq!(4, channel.space()); 735 assert_eq!(4, channel.free_capacity());
636 } 736 }
637 737
638 struct CloneCallCounter(usize); 738 struct CloneCallCounter(usize);
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e1edc9eb9..e66b3b1db 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -43,12 +43,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45 45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 46 /// Returns the maximum number of elements the ***channel*** can hold.
47 pub fn capacity(&self) -> usize {
48 self.channel.capacity()
49 }
50
51 /// Returns the free capacity of the ***channel***.
47 /// 52 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 53 /// This is equivalent to `capacity() - len()`
49 /// So checking doesn't give any guarantees.* 54 pub fn free_capacity(&self) -> usize {
50 pub fn space(&self) -> usize { 55 self.channel.free_capacity()
51 self.channel.space() 56 }
57
58 /// Clears all elements in the ***channel***.
59 pub fn clear(&self) {
60 self.channel.clear();
61 }
62
63 /// Returns the number of elements currently in the ***channel***.
64 pub fn len(&self) -> usize {
65 self.channel.len()
66 }
67
68 /// Returns whether the ***channel*** is empty.
69 pub fn is_empty(&self) -> bool {
70 self.channel.is_empty()
71 }
72
73 /// Returns whether the ***channel*** is full.
74 pub fn is_full(&self) -> bool {
75 self.channel.is_full()
52 } 76 }
53} 77}
54 78
@@ -124,12 +148,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
124 self.channel.publish_with_context(message, None) 148 self.channel.publish_with_context(message, None)
125 } 149 }
126 150
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 151 /// Returns the maximum number of elements the ***channel*** can hold.
152 pub fn capacity(&self) -> usize {
153 self.channel.capacity()
154 }
155
156 /// Returns the free capacity of the ***channel***.
128 /// 157 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 158 /// This is equivalent to `capacity() - len()`
130 /// So checking doesn't give any guarantees.* 159 pub fn free_capacity(&self) -> usize {
131 pub fn space(&self) -> usize { 160 self.channel.free_capacity()
132 self.channel.space() 161 }
162
163 /// Clears all elements in the ***channel***.
164 pub fn clear(&self) {
165 self.channel.clear();
166 }
167
168 /// Returns the number of elements currently in the ***channel***.
169 pub fn len(&self) -> usize {
170 self.channel.len()
171 }
172
173 /// Returns whether the ***channel*** is empty.
174 pub fn is_empty(&self) -> bool {
175 self.channel.is_empty()
176 }
177
178 /// Returns whether the ***channel*** is full.
179 pub fn is_full(&self) -> bool {
180 self.channel.is_full()
133 } 181 }
134} 182}
135 183
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index f420a75f0..6ad660cb3 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -65,10 +65,44 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
65 } 65 }
66 } 66 }
67 67
68 /// The amount of messages this subscriber hasn't received yet 68 /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69 /// for this subscriber.
69 pub fn available(&self) -> u64 { 70 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id) 71 self.channel.available(self.next_message_id)
71 } 72 }
73
74 /// Returns the maximum number of elements the ***channel*** can hold.
75 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 /// Returns the free capacity of the ***channel***.
80 ///
81 /// This is equivalent to `capacity() - len()`
82 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 /// Clears all elements in the ***channel***.
87 pub fn clear(&self) {
88 self.channel.clear();
89 }
90
91 /// Returns the number of elements currently in the ***channel***.
92 /// See [Self::available] for how many messages are available for this subscriber.
93 pub fn len(&self) -> usize {
94 self.channel.len()
95 }
96
97 /// Returns whether the ***channel*** is empty.
98 pub fn is_empty(&self) -> bool {
99 self.channel.is_empty()
100 }
101
102 /// Returns whether the ***channel*** is full.
103 pub fn is_full(&self) -> bool {
104 self.channel.is_full()
105 }
72} 106}
73 107
74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 824d192da..0e520bf40 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -14,7 +14,7 @@ impl<const N: usize> MultiWakerRegistration<N> {
14 } 14 }
15 15
16 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
17 pub fn register<'a>(&mut self, w: &'a Waker) { 17 pub fn register(&mut self, w: &Waker) {
18 // If we already have some waker that wakes the same task as `w`, do nothing. 18 // If we already have some waker that wakes the same task as `w`, do nothing.
19 // This avoids cloning wakers, and avoids unnecessary mass-wakes. 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
20 for w2 in &self.wakers { 20 for w2 in &self.wakers {