aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/blocking_mutex/mod.rs23
-rw-r--r--embassy-sync/src/blocking_mutex/raw.rs6
-rw-r--r--embassy-sync/src/channel.rs360
-rw-r--r--embassy-sync/src/fmt.rs31
-rw-r--r--embassy-sync/src/lazy_lock.rs174
-rw-r--r--embassy-sync/src/lib.rs4
-rw-r--r--embassy-sync/src/mutex.rs17
-rw-r--r--embassy-sync/src/once_lock.rs17
-rw-r--r--embassy-sync/src/pipe.rs285
-rw-r--r--embassy-sync/src/priority_channel.rs218
-rw-r--r--embassy-sync/src/pubsub/mod.rs101
-rw-r--r--embassy-sync/src/pubsub/publisher.rs73
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs5
-rw-r--r--embassy-sync/src/ring_buffer.rs7
-rw-r--r--embassy-sync/src/rwlock.rs386
-rw-r--r--embassy-sync/src/semaphore.rs10
-rw-r--r--embassy-sync/src/signal.rs12
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs45
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs4
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs7
-rw-r--r--embassy-sync/src/waitqueue/waker_registration.rs4
-rw-r--r--embassy-sync/src/watch.rs1127
-rw-r--r--embassy-sync/src/zerocopy_channel.rs143
23 files changed, 2924 insertions, 135 deletions
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs
index 8a4a4c642..62bfc26fb 100644
--- a/embassy-sync/src/blocking_mutex/mod.rs
+++ b/embassy-sync/src/blocking_mutex/mod.rs
@@ -22,6 +22,7 @@ use self::raw::RawMutex;
22/// 22///
23/// In all cases, the blocking mutex is intended to be short lived and not held across await points. 23/// In all cases, the blocking mutex is intended to be short lived and not held across await points.
24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. 24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points.
25#[derive(Debug)]
25pub struct Mutex<R, T: ?Sized> { 26pub struct Mutex<R, T: ?Sized> {
26 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets 27 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets
27 // to run BEFORE dropping `data`. 28 // to run BEFORE dropping `data`.
@@ -50,6 +51,23 @@ impl<R: RawMutex, T> Mutex<R, T> {
50 f(inner) 51 f(inner)
51 }) 52 })
52 } 53 }
54
55 /// Creates a critical section and grants temporary mutable access to the protected data.
56 ///
57 /// # Safety
58 ///
59 /// This method is marked unsafe because calling this method re-entrantly, i.e. within
60 /// another `lock_mut` or `lock` closure, violates Rust's aliasing rules. Calling this
61 /// method at the same time from different tasks is safe. For a safe alternative with
62 /// mutable access that never causes UB, use a `RefCell` in a `Mutex`.
63 pub unsafe fn lock_mut<U>(&self, f: impl FnOnce(&mut T) -> U) -> U {
64 self.raw.lock(|| {
65 let ptr = self.data.get() as *mut T;
66 // Safety: we have exclusive access to the data, as long as this mutex is not locked re-entrantly
67 let inner = unsafe { &mut *ptr };
68 f(inner)
69 })
70 }
53} 71}
54 72
55impl<R, T> Mutex<R, T> { 73impl<R, T> Mutex<R, T> {
@@ -104,6 +122,7 @@ impl<T> Mutex<raw::CriticalSectionRawMutex, T> {
104 122
105impl<T> Mutex<raw::NoopRawMutex, T> { 123impl<T> Mutex<raw::NoopRawMutex, T> {
106 /// Borrows the data 124 /// Borrows the data
125 #[allow(clippy::should_implement_trait)]
107 pub fn borrow(&self) -> &T { 126 pub fn borrow(&self) -> &T {
108 let ptr = self.data.get() as *const T; 127 let ptr = self.data.get() as *const T;
109 unsafe { &*ptr } 128 unsafe { &*ptr }
@@ -116,9 +135,9 @@ impl<T> Mutex<raw::NoopRawMutex, T> {
116// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), 135// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example),
117// but that will require T: Send even though it shouldn't be needed. 136// but that will require T: Send even though it shouldn't be needed.
118 137
119#[cfg(any(cortex_m, feature = "std"))] 138#[cfg(any(cortex_m, doc, feature = "std"))]
120pub use thread_mode_mutex::*; 139pub use thread_mode_mutex::*;
121#[cfg(any(cortex_m, feature = "std"))] 140#[cfg(any(cortex_m, doc, feature = "std"))]
122mod thread_mode_mutex { 141mod thread_mode_mutex {
123 use super::*; 142 use super::*;
124 143
diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs
index a8afcad34..fbb9ece15 100644
--- a/embassy-sync/src/blocking_mutex/raw.rs
+++ b/embassy-sync/src/blocking_mutex/raw.rs
@@ -37,6 +37,7 @@ pub unsafe trait RawMutex {
37/// # Safety 37/// # Safety
38/// 38///
39/// This mutex is safe to share between different executors and interrupts. 39/// This mutex is safe to share between different executors and interrupts.
40#[derive(Debug)]
40pub struct CriticalSectionRawMutex { 41pub struct CriticalSectionRawMutex {
41 _phantom: PhantomData<()>, 42 _phantom: PhantomData<()>,
42} 43}
@@ -65,6 +66,7 @@ unsafe impl RawMutex for CriticalSectionRawMutex {
65/// # Safety 66/// # Safety
66/// 67///
67/// **This Mutex is only safe within a single executor.** 68/// **This Mutex is only safe within a single executor.**
69#[derive(Debug)]
68pub struct NoopRawMutex { 70pub struct NoopRawMutex {
69 _phantom: PhantomData<*mut ()>, 71 _phantom: PhantomData<*mut ()>,
70} 72}
@@ -87,7 +89,7 @@ unsafe impl RawMutex for NoopRawMutex {
87 89
88// ================ 90// ================
89 91
90#[cfg(any(cortex_m, feature = "std"))] 92#[cfg(any(cortex_m, doc, feature = "std"))]
91mod thread_mode { 93mod thread_mode {
92 use super::*; 94 use super::*;
93 95
@@ -145,5 +147,5 @@ mod thread_mode {
145 return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; 147 return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0;
146 } 148 }
147} 149}
148#[cfg(any(cortex_m, feature = "std"))] 150#[cfg(any(cortex_m, doc, feature = "std"))]
149pub use thread_mode::*; 151pub use thread_mode::*;
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 55ac5fb66..dbd24a6c7 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -17,6 +17,31 @@
17//! messages that it can store, and if this limit is reached, trying to send 17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned. 18//! another message will result in an error being returned.
19//! 19//!
20//! # Example: Message passing between task and interrupt handler
21//!
22//! ```rust
23//! use embassy_sync::channel::Channel;
24//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
25//!
26//! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new();
27//!
28//! fn my_interrupt_handler() {
29//! // Do some work..
30//! // ...
31//! if let Err(e) = SHARED_CHANNEL.sender().try_send(42) {
32//! // Channel is full..
33//! }
34//! }
35//!
36//! async fn my_async_task() {
37//! // ...
38//! let receiver = SHARED_CHANNEL.receiver();
39//! loop {
40//! let data_from_interrupt = receiver.receive().await;
41//! // Do something with the data.
42//! }
43//! }
44//! ```
20 45
21use core::cell::RefCell; 46use core::cell::RefCell;
22use core::future::Future; 47use core::future::Future;
@@ -25,11 +50,12 @@ use core::task::{Context, Poll};
25 50
26use heapless::Deque; 51use heapless::Deque;
27 52
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex; 53use crate::blocking_mutex::Mutex;
54use crate::blocking_mutex::raw::RawMutex;
30use crate::waitqueue::WakerRegistration; 55use crate::waitqueue::WakerRegistration;
31 56
32/// Send-only access to a [`Channel`]. 57/// Send-only access to a [`Channel`].
58#[derive(Debug)]
33pub struct Sender<'ch, M, T, const N: usize> 59pub struct Sender<'ch, M, T, const N: usize>
34where 60where
35 M: RawMutex, 61 M: RawMutex,
@@ -72,6 +98,48 @@ where
72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 98 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx) 99 self.channel.poll_ready_to_send(cx)
74 } 100 }
101
102 /// Returns the maximum number of elements the channel can hold.
103 ///
104 /// See [`Channel::capacity()`]
105 pub const fn capacity(&self) -> usize {
106 self.channel.capacity()
107 }
108
109 /// Returns the free capacity of the channel.
110 ///
111 /// See [`Channel::free_capacity()`]
112 pub fn free_capacity(&self) -> usize {
113 self.channel.free_capacity()
114 }
115
116 /// Clears all elements in the channel.
117 ///
118 /// See [`Channel::clear()`]
119 pub fn clear(&self) {
120 self.channel.clear();
121 }
122
123 /// Returns the number of elements currently in the channel.
124 ///
125 /// See [`Channel::len()`]
126 pub fn len(&self) -> usize {
127 self.channel.len()
128 }
129
130 /// Returns whether the channel is empty.
131 ///
132 /// See [`Channel::is_empty()`]
133 pub fn is_empty(&self) -> bool {
134 self.channel.is_empty()
135 }
136
137 /// Returns whether the channel is full.
138 ///
139 /// See [`Channel::is_full()`]
140 pub fn is_full(&self) -> bool {
141 self.channel.is_full()
142 }
75} 143}
76 144
77/// Send-only access to a [`Channel`] without knowing channel size. 145/// Send-only access to a [`Channel`] without knowing channel size.
@@ -122,7 +190,59 @@ impl<'ch, T> DynamicSender<'ch, T> {
122 } 190 }
123} 191}
124 192
193/// Send-only access to a [`Channel`] without knowing channel size.
194/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
195pub struct SendDynamicSender<'ch, T> {
196 pub(crate) channel: &'ch dyn DynamicChannel<T>,
197}
198
199impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
200 fn clone(&self) -> Self {
201 *self
202 }
203}
204
205impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
206unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
207unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
208
209impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
210where
211 M: RawMutex + Sync + Send,
212{
213 fn from(s: Sender<'ch, M, T, N>) -> Self {
214 Self { channel: s.channel }
215 }
216}
217
218impl<'ch, T> SendDynamicSender<'ch, T> {
219 /// Sends a value.
220 ///
221 /// See [`Channel::send()`]
222 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
223 DynamicSendFuture {
224 channel: self.channel,
225 message: Some(message),
226 }
227 }
228
229 /// Attempt to immediately send a message.
230 ///
231 /// See [`Channel::send()`]
232 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
233 self.channel.try_send_with_context(message, None)
234 }
235
236 /// Allows a poll_fn to poll until the channel is ready to send
237 ///
238 /// See [`Channel::poll_ready_to_send()`]
239 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
240 self.channel.poll_ready_to_send(cx)
241 }
242}
243
125/// Receive-only access to a [`Channel`]. 244/// Receive-only access to a [`Channel`].
245#[derive(Debug)]
126pub struct Receiver<'ch, M, T, const N: usize> 246pub struct Receiver<'ch, M, T, const N: usize>
127where 247where
128 M: RawMutex, 248 M: RawMutex,
@@ -166,6 +286,16 @@ where
166 self.channel.try_receive() 286 self.channel.try_receive()
167 } 287 }
168 288
289 /// Peek at the next value without removing it from the queue.
290 ///
291 /// See [`Channel::try_peek()`]
292 pub fn try_peek(&self) -> Result<T, TryReceiveError>
293 where
294 T: Clone,
295 {
296 self.channel.try_peek()
297 }
298
169 /// Allows a poll_fn to poll until the channel is ready to receive 299 /// Allows a poll_fn to poll until the channel is ready to receive
170 /// 300 ///
171 /// See [`Channel::poll_ready_to_receive()`] 301 /// See [`Channel::poll_ready_to_receive()`]
@@ -179,6 +309,48 @@ where
179 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 309 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
180 self.channel.poll_receive(cx) 310 self.channel.poll_receive(cx)
181 } 311 }
312
313 /// Returns the maximum number of elements the channel can hold.
314 ///
315 /// See [`Channel::capacity()`]
316 pub const fn capacity(&self) -> usize {
317 self.channel.capacity()
318 }
319
320 /// Returns the free capacity of the channel.
321 ///
322 /// See [`Channel::free_capacity()`]
323 pub fn free_capacity(&self) -> usize {
324 self.channel.free_capacity()
325 }
326
327 /// Clears all elements in the channel.
328 ///
329 /// See [`Channel::clear()`]
330 pub fn clear(&self) {
331 self.channel.clear();
332 }
333
334 /// Returns the number of elements currently in the channel.
335 ///
336 /// See [`Channel::len()`]
337 pub fn len(&self) -> usize {
338 self.channel.len()
339 }
340
341 /// Returns whether the channel is empty.
342 ///
343 /// See [`Channel::is_empty()`]
344 pub fn is_empty(&self) -> bool {
345 self.channel.is_empty()
346 }
347
348 /// Returns whether the channel is full.
349 ///
350 /// See [`Channel::is_full()`]
351 pub fn is_full(&self) -> bool {
352 self.channel.is_full()
353 }
182} 354}
183 355
184/// Receive-only access to a [`Channel`] without knowing channel size. 356/// Receive-only access to a [`Channel`] without knowing channel size.
@@ -209,6 +381,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
209 self.channel.try_receive_with_context(None) 381 self.channel.try_receive_with_context(None)
210 } 382 }
211 383
384 /// Peek at the next value without removing it from the queue.
385 ///
386 /// See [`Channel::try_peek()`]
387 pub fn try_peek(&self) -> Result<T, TryReceiveError>
388 where
389 T: Clone,
390 {
391 self.channel.try_peek_with_context(None)
392 }
393
212 /// Allows a poll_fn to poll until the channel is ready to receive 394 /// Allows a poll_fn to poll until the channel is ready to receive
213 /// 395 ///
214 /// See [`Channel::poll_ready_to_receive()`] 396 /// See [`Channel::poll_ready_to_receive()`]
@@ -233,8 +415,80 @@ where
233 } 415 }
234} 416}
235 417
418/// Receive-only access to a [`Channel`] without knowing channel size.
419/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
420pub struct SendDynamicReceiver<'ch, T> {
421 pub(crate) channel: &'ch dyn DynamicChannel<T>,
422}
423
424/// Receive-only access to a [`Channel`] without knowing channel size.
425/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
426#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
427pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
428
429impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
430 fn clone(&self) -> Self {
431 *self
432 }
433}
434
435impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
436unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
437unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
438
439impl<'ch, T> SendDynamicReceiver<'ch, T> {
440 /// Receive the next value.
441 ///
442 /// See [`Channel::receive()`].
443 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
444 DynamicReceiveFuture { channel: self.channel }
445 }
446
447 /// Attempt to immediately receive the next value.
448 ///
449 /// See [`Channel::try_receive()`]
450 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
451 self.channel.try_receive_with_context(None)
452 }
453
454 /// Allows a poll_fn to poll until the channel is ready to receive
455 ///
456 /// See [`Channel::poll_ready_to_receive()`]
457 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
458 self.channel.poll_ready_to_receive(cx)
459 }
460
461 /// Poll the channel for the next item
462 ///
463 /// See [`Channel::poll_receive()`]
464 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
465 self.channel.poll_receive(cx)
466 }
467}
468
469impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
470where
471 M: RawMutex + Sync + Send,
472{
473 fn from(s: Receiver<'ch, M, T, N>) -> Self {
474 Self { channel: s.channel }
475 }
476}
477
478impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
479where
480 M: RawMutex,
481{
482 type Item = T;
483
484 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485 self.channel.poll_receive(cx).map(Some)
486 }
487}
488
236/// Future returned by [`Channel::receive`] and [`Receiver::receive`]. 489/// Future returned by [`Channel::receive`] and [`Receiver::receive`].
237#[must_use = "futures do nothing unless you `.await` or poll them"] 490#[must_use = "futures do nothing unless you `.await` or poll them"]
491#[derive(Debug)]
238pub struct ReceiveFuture<'ch, M, T, const N: usize> 492pub struct ReceiveFuture<'ch, M, T, const N: usize>
239where 493where
240 M: RawMutex, 494 M: RawMutex,
@@ -255,6 +509,7 @@ where
255 509
256/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. 510/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`].
257#[must_use = "futures do nothing unless you `.await` or poll them"] 511#[must_use = "futures do nothing unless you `.await` or poll them"]
512#[derive(Debug)]
258pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> 513pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
259where 514where
260 M: RawMutex, 515 M: RawMutex,
@@ -298,6 +553,7 @@ impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for
298 553
299/// Future returned by [`Channel::send`] and [`Sender::send`]. 554/// Future returned by [`Channel::send`] and [`Sender::send`].
300#[must_use = "futures do nothing unless you `.await` or poll them"] 555#[must_use = "futures do nothing unless you `.await` or poll them"]
556#[derive(Debug)]
301pub struct SendFuture<'ch, M, T, const N: usize> 557pub struct SendFuture<'ch, M, T, const N: usize>
302where 558where
303 M: RawMutex, 559 M: RawMutex,
@@ -368,6 +624,10 @@ pub(crate) trait DynamicChannel<T> {
368 624
369 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 625 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
370 626
627 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
628 where
629 T: Clone;
630
371 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 631 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
372 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 632 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
373 633
@@ -391,6 +651,7 @@ pub enum TrySendError<T> {
391 Full(T), 651 Full(T),
392} 652}
393 653
654#[derive(Debug)]
394struct ChannelState<T, const N: usize> { 655struct ChannelState<T, const N: usize> {
395 queue: Deque<T, N>, 656 queue: Deque<T, N>,
396 receiver_waker: WakerRegistration, 657 receiver_waker: WakerRegistration,
@@ -410,6 +671,31 @@ impl<T, const N: usize> ChannelState<T, N> {
410 self.try_receive_with_context(None) 671 self.try_receive_with_context(None)
411 } 672 }
412 673
674 fn try_peek(&mut self) -> Result<T, TryReceiveError>
675 where
676 T: Clone,
677 {
678 self.try_peek_with_context(None)
679 }
680
681 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
682 where
683 T: Clone,
684 {
685 if self.queue.is_full() {
686 self.senders_waker.wake();
687 }
688
689 if let Some(message) = self.queue.front() {
690 Ok(message.clone())
691 } else {
692 if let Some(cx) = cx {
693 self.receiver_waker.register(cx.waker());
694 }
695 Err(TryReceiveError::Empty)
696 }
697 }
698
413 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 699 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
414 if self.queue.is_full() { 700 if self.queue.is_full() {
415 self.senders_waker.wake(); 701 self.senders_waker.wake();
@@ -478,6 +764,9 @@ impl<T, const N: usize> ChannelState<T, N> {
478 } 764 }
479 765
480 fn clear(&mut self) { 766 fn clear(&mut self) {
767 if self.queue.is_full() {
768 self.senders_waker.wake();
769 }
481 self.queue.clear(); 770 self.queue.clear();
482 } 771 }
483 772
@@ -502,6 +791,7 @@ impl<T, const N: usize> ChannelState<T, N> {
502/// received from the channel. 791/// received from the channel.
503/// 792///
504/// All data sent will become available in the same order as it was sent. 793/// All data sent will become available in the same order as it was sent.
794#[derive(Debug)]
505pub struct Channel<M, T, const N: usize> 795pub struct Channel<M, T, const N: usize>
506where 796where
507 M: RawMutex, 797 M: RawMutex,
@@ -536,6 +826,13 @@ where
536 self.lock(|c| c.try_receive_with_context(cx)) 826 self.lock(|c| c.try_receive_with_context(cx))
537 } 827 }
538 828
829 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
830 where
831 T: Clone,
832 {
833 self.lock(|c| c.try_peek_with_context(cx))
834 }
835
539 /// Poll the channel for the next message 836 /// Poll the channel for the next message
540 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 837 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
541 self.lock(|c| c.poll_receive(cx)) 838 self.lock(|c| c.poll_receive(cx))
@@ -624,6 +921,17 @@ where
624 self.lock(|c| c.try_receive()) 921 self.lock(|c| c.try_receive())
625 } 922 }
626 923
924 /// Peek at the next value without removing it from the queue.
925 ///
926 /// This method will either receive a copy of the message from the channel immediately or return
927 /// an error if the channel is empty.
928 pub fn try_peek(&self) -> Result<T, TryReceiveError>
929 where
930 T: Clone,
931 {
932 self.lock(|c| c.try_peek())
933 }
934
627 /// Returns the maximum number of elements the channel can hold. 935 /// Returns the maximum number of elements the channel can hold.
628 pub const fn capacity(&self) -> usize { 936 pub const fn capacity(&self) -> usize {
629 N 937 N
@@ -671,6 +979,13 @@ where
671 Channel::try_receive_with_context(self, cx) 979 Channel::try_receive_with_context(self, cx)
672 } 980 }
673 981
982 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
983 where
984 T: Clone,
985 {
986 Channel::try_peek_with_context(self, cx)
987 }
988
674 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 989 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
675 Channel::poll_ready_to_send(self, cx) 990 Channel::poll_ready_to_send(self, cx)
676 } 991 }
@@ -684,6 +999,17 @@ where
684 } 999 }
685} 1000}
686 1001
1002impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
1003where
1004 M: RawMutex,
1005{
1006 type Item = T;
1007
1008 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1009 self.poll_receive(cx).map(Some)
1010 }
1011}
1012
687#[cfg(test)] 1013#[cfg(test)]
688mod tests { 1014mod tests {
689 use core::time::Duration; 1015 use core::time::Duration;
@@ -742,6 +1068,8 @@ mod tests {
742 fn simple_send_and_receive() { 1068 fn simple_send_and_receive() {
743 let c = Channel::<NoopRawMutex, u32, 3>::new(); 1069 let c = Channel::<NoopRawMutex, u32, 3>::new();
744 assert!(c.try_send(1).is_ok()); 1070 assert!(c.try_send(1).is_ok());
1071 assert_eq!(c.try_peek().unwrap(), 1);
1072 assert_eq!(c.try_peek().unwrap(), 1);
745 assert_eq!(c.try_receive().unwrap(), 1); 1073 assert_eq!(c.try_receive().unwrap(), 1);
746 } 1074 }
747 1075
@@ -772,6 +1100,8 @@ mod tests {
772 let r = c.dyn_receiver(); 1100 let r = c.dyn_receiver();
773 1101
774 assert!(s.try_send(1).is_ok()); 1102 assert!(s.try_send(1).is_ok());
1103 assert_eq!(r.try_peek().unwrap(), 1);
1104 assert_eq!(r.try_peek().unwrap(), 1);
775 assert_eq!(r.try_receive().unwrap(), 1); 1105 assert_eq!(r.try_receive().unwrap(), 1);
776 } 1106 }
777 1107
@@ -782,11 +1112,13 @@ mod tests {
782 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); 1112 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
783 let c = &*CHANNEL.init(Channel::new()); 1113 let c = &*CHANNEL.init(Channel::new());
784 let c2 = c; 1114 let c2 = c;
785 assert!(executor 1115 assert!(
786 .spawn(async move { 1116 executor
787 assert!(c2.try_send(1).is_ok()); 1117 .spawn(async move {
788 }) 1118 assert!(c2.try_send(1).is_ok());
789 .is_ok()); 1119 })
1120 .is_ok()
1121 );
790 assert_eq!(c.receive().await, 1); 1122 assert_eq!(c.receive().await, 1);
791 } 1123 }
792 1124
@@ -813,13 +1145,15 @@ mod tests {
813 // However, I've used the debugger to observe that the send does indeed wait. 1145 // However, I've used the debugger to observe that the send does indeed wait.
814 Delay::new(Duration::from_millis(500)).await; 1146 Delay::new(Duration::from_millis(500)).await;
815 assert_eq!(c.receive().await, 1); 1147 assert_eq!(c.receive().await, 1);
816 assert!(executor 1148 assert!(
817 .spawn(async move { 1149 executor
818 loop { 1150 .spawn(async move {
819 c.receive().await; 1151 loop {
820 } 1152 c.receive().await;
821 }) 1153 }
822 .is_ok()); 1154 })
1155 .is_ok()
1156 );
823 send_task_1.unwrap().await; 1157 send_task_1.unwrap().await;
824 send_task_2.unwrap().await; 1158 send_task_2.unwrap().await;
825 } 1159 }
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index 2ac42c557..8ca61bc39 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -6,6 +6,7 @@ use core::fmt::{Debug, Display, LowerHex};
6#[cfg(all(feature = "defmt", feature = "log"))] 6#[cfg(all(feature = "defmt", feature = "log"))]
7compile_error!("You may not enable both `defmt` and `log` features."); 7compile_error!("You may not enable both `defmt` and `log` features.");
8 8
9#[collapse_debuginfo(yes)]
9macro_rules! assert { 10macro_rules! assert {
10 ($($x:tt)*) => { 11 ($($x:tt)*) => {
11 { 12 {
@@ -17,6 +18,7 @@ macro_rules! assert {
17 }; 18 };
18} 19}
19 20
21#[collapse_debuginfo(yes)]
20macro_rules! assert_eq { 22macro_rules! assert_eq {
21 ($($x:tt)*) => { 23 ($($x:tt)*) => {
22 { 24 {
@@ -28,6 +30,7 @@ macro_rules! assert_eq {
28 }; 30 };
29} 31}
30 32
33#[collapse_debuginfo(yes)]
31macro_rules! assert_ne { 34macro_rules! assert_ne {
32 ($($x:tt)*) => { 35 ($($x:tt)*) => {
33 { 36 {
@@ -39,6 +42,7 @@ macro_rules! assert_ne {
39 }; 42 };
40} 43}
41 44
45#[collapse_debuginfo(yes)]
42macro_rules! debug_assert { 46macro_rules! debug_assert {
43 ($($x:tt)*) => { 47 ($($x:tt)*) => {
44 { 48 {
@@ -50,6 +54,7 @@ macro_rules! debug_assert {
50 }; 54 };
51} 55}
52 56
57#[collapse_debuginfo(yes)]
53macro_rules! debug_assert_eq { 58macro_rules! debug_assert_eq {
54 ($($x:tt)*) => { 59 ($($x:tt)*) => {
55 { 60 {
@@ -61,6 +66,7 @@ macro_rules! debug_assert_eq {
61 }; 66 };
62} 67}
63 68
69#[collapse_debuginfo(yes)]
64macro_rules! debug_assert_ne { 70macro_rules! debug_assert_ne {
65 ($($x:tt)*) => { 71 ($($x:tt)*) => {
66 { 72 {
@@ -72,6 +78,7 @@ macro_rules! debug_assert_ne {
72 }; 78 };
73} 79}
74 80
81#[collapse_debuginfo(yes)]
75macro_rules! todo { 82macro_rules! todo {
76 ($($x:tt)*) => { 83 ($($x:tt)*) => {
77 { 84 {
@@ -83,20 +90,19 @@ macro_rules! todo {
83 }; 90 };
84} 91}
85 92
86#[cfg(not(feature = "defmt"))] 93#[collapse_debuginfo(yes)]
87macro_rules! unreachable { 94macro_rules! unreachable {
88 ($($x:tt)*) => { 95 ($($x:tt)*) => {
89 ::core::unreachable!($($x)*) 96 {
90 }; 97 #[cfg(not(feature = "defmt"))]
91} 98 ::core::unreachable!($($x)*);
92 99 #[cfg(feature = "defmt")]
93#[cfg(feature = "defmt")] 100 ::defmt::unreachable!($($x)*);
94macro_rules! unreachable { 101 }
95 ($($x:tt)*) => {
96 ::defmt::unreachable!($($x)*)
97 }; 102 };
98} 103}
99 104
105#[collapse_debuginfo(yes)]
100macro_rules! panic { 106macro_rules! panic {
101 ($($x:tt)*) => { 107 ($($x:tt)*) => {
102 { 108 {
@@ -108,6 +114,7 @@ macro_rules! panic {
108 }; 114 };
109} 115}
110 116
117#[collapse_debuginfo(yes)]
111macro_rules! trace { 118macro_rules! trace {
112 ($s:literal $(, $x:expr)* $(,)?) => { 119 ($s:literal $(, $x:expr)* $(,)?) => {
113 { 120 {
@@ -121,6 +128,7 @@ macro_rules! trace {
121 }; 128 };
122} 129}
123 130
131#[collapse_debuginfo(yes)]
124macro_rules! debug { 132macro_rules! debug {
125 ($s:literal $(, $x:expr)* $(,)?) => { 133 ($s:literal $(, $x:expr)* $(,)?) => {
126 { 134 {
@@ -134,6 +142,7 @@ macro_rules! debug {
134 }; 142 };
135} 143}
136 144
145#[collapse_debuginfo(yes)]
137macro_rules! info { 146macro_rules! info {
138 ($s:literal $(, $x:expr)* $(,)?) => { 147 ($s:literal $(, $x:expr)* $(,)?) => {
139 { 148 {
@@ -147,6 +156,7 @@ macro_rules! info {
147 }; 156 };
148} 157}
149 158
159#[collapse_debuginfo(yes)]
150macro_rules! warn { 160macro_rules! warn {
151 ($s:literal $(, $x:expr)* $(,)?) => { 161 ($s:literal $(, $x:expr)* $(,)?) => {
152 { 162 {
@@ -160,6 +170,7 @@ macro_rules! warn {
160 }; 170 };
161} 171}
162 172
173#[collapse_debuginfo(yes)]
163macro_rules! error { 174macro_rules! error {
164 ($s:literal $(, $x:expr)* $(,)?) => { 175 ($s:literal $(, $x:expr)* $(,)?) => {
165 { 176 {
@@ -174,6 +185,7 @@ macro_rules! error {
174} 185}
175 186
176#[cfg(feature = "defmt")] 187#[cfg(feature = "defmt")]
188#[collapse_debuginfo(yes)]
177macro_rules! unwrap { 189macro_rules! unwrap {
178 ($($x:tt)*) => { 190 ($($x:tt)*) => {
179 ::defmt::unwrap!($($x)*) 191 ::defmt::unwrap!($($x)*)
@@ -181,6 +193,7 @@ macro_rules! unwrap {
181} 193}
182 194
183#[cfg(not(feature = "defmt"))] 195#[cfg(not(feature = "defmt"))]
196#[collapse_debuginfo(yes)]
184macro_rules! unwrap { 197macro_rules! unwrap {
185 ($arg:expr) => { 198 ($arg:expr) => {
186 match $crate::fmt::Try::into_result($arg) { 199 match $crate::fmt::Try::into_result($arg) {
diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs
new file mode 100644
index 000000000..945560a80
--- /dev/null
+++ b/embassy-sync/src/lazy_lock.rs
@@ -0,0 +1,174 @@
1//! Synchronization primitive for initializing a value once, allowing others to get a reference to the value.
2
3use core::cell::UnsafeCell;
4use core::mem::ManuallyDrop;
5use core::sync::atomic::{AtomicBool, Ordering};
6
7/// The `LazyLock` is a synchronization primitive that allows for
8/// initializing a value once, and allowing others to obtain a
9/// reference to the value. This is useful for lazy initialization of
10/// a static value.
11///
12/// # Example
13/// ```
14/// use futures_executor::block_on;
15/// use embassy_sync::lazy_lock::LazyLock;
16///
17/// // Define a static value that will be lazily initialized
18/// // at runtime at the first access.
19/// static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
20///
21/// let reference = VALUE.get();
22/// assert_eq!(reference, &20);
23/// ```
24#[derive(Debug)]
25pub struct LazyLock<T, F = fn() -> T> {
26 init: AtomicBool,
27 data: UnsafeCell<Data<T, F>>,
28}
29
30union Data<T, F> {
31 value: ManuallyDrop<T>,
32 f: ManuallyDrop<F>,
33}
34
35unsafe impl<T, F> Sync for LazyLock<T, F>
36where
37 T: Sync,
38 F: Sync,
39{
40}
41
42impl<T, F: FnOnce() -> T> LazyLock<T, F> {
43 /// Create a new uninitialized `StaticLock`.
44 pub const fn new(init_fn: F) -> Self {
45 Self {
46 init: AtomicBool::new(false),
47 data: UnsafeCell::new(Data {
48 f: ManuallyDrop::new(init_fn),
49 }),
50 }
51 }
52
53 /// Get a reference to the underlying value, initializing it if it
54 /// has not been done already.
55 #[inline]
56 pub fn get(&self) -> &T {
57 self.ensure_init_fast();
58 unsafe { &(*self.data.get()).value }
59 }
60
61 /// Get a mutable reference to the underlying value, initializing it if it
62 /// has not been done already.
63 #[inline]
64 pub fn get_mut(&mut self) -> &mut T {
65 self.ensure_init_fast();
66 unsafe { &mut (*self.data.get()).value }
67 }
68
69 /// Consume the `LazyLock`, returning the underlying value. The
70 /// initialization function will be called if it has not been
71 /// already.
72 #[inline]
73 pub fn into_inner(self) -> T {
74 self.ensure_init_fast();
75 let this = ManuallyDrop::new(self);
76 let data = unsafe { core::ptr::read(&this.data) }.into_inner();
77
78 ManuallyDrop::into_inner(unsafe { data.value })
79 }
80
81 /// Initialize the `LazyLock` if it has not been initialized yet.
82 /// This function is a fast track to [`Self::ensure_init`]
83 /// which does not require a critical section in most cases when
84 /// the value has been initialized already.
85 /// When this function returns, `self.data` is guaranteed to be
86 /// initialized and visible on the current core.
87 #[inline]
88 fn ensure_init_fast(&self) {
89 if !self.init.load(Ordering::Acquire) {
90 self.ensure_init();
91 }
92 }
93
94 /// Initialize the `LazyLock` if it has not been initialized yet.
95 /// When this function returns, `self.data` is guaranteed to be
96 /// initialized and visible on the current core.
97 fn ensure_init(&self) {
98 critical_section::with(|_| {
99 if !self.init.load(Ordering::Acquire) {
100 let data = unsafe { &mut *self.data.get() };
101 let f = unsafe { ManuallyDrop::take(&mut data.f) };
102 let value = f();
103 data.value = ManuallyDrop::new(value);
104
105 self.init.store(true, Ordering::Release);
106 }
107 });
108 }
109}
110
111impl<T, F> Drop for LazyLock<T, F> {
112 fn drop(&mut self) {
113 if self.init.load(Ordering::Acquire) {
114 unsafe { ManuallyDrop::drop(&mut self.data.get_mut().value) };
115 } else {
116 unsafe { ManuallyDrop::drop(&mut self.data.get_mut().f) };
117 }
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use core::sync::atomic::{AtomicU32, Ordering};
124
125 use super::*;
126
127 #[test]
128 fn test_lazy_lock() {
129 static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
130 let reference = VALUE.get();
131 assert_eq!(reference, &20);
132 }
133 #[test]
134 fn test_lazy_lock_mutation() {
135 let mut value: LazyLock<u32> = LazyLock::new(|| 20);
136 *value.get_mut() = 21;
137 let reference = value.get();
138 assert_eq!(reference, &21);
139 }
140 #[test]
141 fn test_lazy_lock_into_inner() {
142 let lazy: LazyLock<u32> = LazyLock::new(|| 20);
143 let value = lazy.into_inner();
144 assert_eq!(value, 20);
145 }
146
147 static DROP_CHECKER: AtomicU32 = AtomicU32::new(0);
148 #[derive(Debug)]
149 struct DropCheck;
150
151 impl Drop for DropCheck {
152 fn drop(&mut self) {
153 DROP_CHECKER.fetch_add(1, Ordering::Acquire);
154 }
155 }
156
157 #[test]
158 fn test_lazy_drop() {
159 let lazy: LazyLock<DropCheck> = LazyLock::new(|| DropCheck);
160 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 0);
161 lazy.get();
162 drop(lazy);
163 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
164
165 let dropper = DropCheck;
166 let lazy_fn: LazyLock<u32, _> = LazyLock::new(move || {
167 let _a = dropper;
168 20
169 });
170 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
171 drop(lazy_fn);
172 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 2);
173 }
174}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index a5eee8d02..1cfde8b10 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -1,6 +1,7 @@
1#![cfg_attr(not(feature = "std"), no_std)] 1#![cfg_attr(not(feature = "std"), no_std)]
2#![allow(async_fn_in_trait)] 2#![allow(async_fn_in_trait)]
3#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
4#![allow(unsafe_op_in_unsafe_fn)]
4#![doc = include_str!("../README.md")] 5#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 6#![warn(missing_docs)]
6 7
@@ -12,12 +13,15 @@ mod ring_buffer;
12 13
13pub mod blocking_mutex; 14pub mod blocking_mutex;
14pub mod channel; 15pub mod channel;
16pub mod lazy_lock;
15pub mod mutex; 17pub mod mutex;
16pub mod once_lock; 18pub mod once_lock;
17pub mod pipe; 19pub mod pipe;
18pub mod priority_channel; 20pub mod priority_channel;
19pub mod pubsub; 21pub mod pubsub;
22pub mod rwlock;
20pub mod semaphore; 23pub mod semaphore;
21pub mod signal; 24pub mod signal;
22pub mod waitqueue; 25pub mod waitqueue;
26pub mod watch;
23pub mod zerocopy_channel; 27pub mod zerocopy_channel;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 8c3a3af9f..96b834f02 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -2,13 +2,13 @@
2//! 2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. 3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell}; 4use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn; 5use core::future::{Future, poll_fn};
6use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
7use core::task::Poll; 7use core::task::Poll;
8use core::{fmt, mem}; 8use core::{fmt, mem};
9 9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex; 10use crate::blocking_mutex::Mutex as BlockingMutex;
11use crate::blocking_mutex::raw::RawMutex;
12use crate::waitqueue::WakerRegistration; 12use crate::waitqueue::WakerRegistration;
13 13
14/// Error returned by [`Mutex::try_lock`] 14/// Error returned by [`Mutex::try_lock`]
@@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration;
16#[cfg_attr(feature = "defmt", derive(defmt::Format))] 16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError; 17pub struct TryLockError;
18 18
19#[derive(Debug)]
19struct State { 20struct State {
20 locked: bool, 21 locked: bool,
21 waker: WakerRegistration, 22 waker: WakerRegistration,
@@ -23,7 +24,7 @@ struct State {
23 24
24/// Async mutex. 25/// Async mutex.
25/// 26///
26/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). 27/// The mutex is generic over a blocking [`RawMutex`].
27/// The raw mutex is used to guard access to the internal "is locked" flag. It 28/// The raw mutex is used to guard access to the internal "is locked" flag. It
28/// is held for very short periods only, while locking and unlocking. It is *not* held 29/// is held for very short periods only, while locking and unlocking. It is *not* held
29/// for the entire time the async Mutex is locked. 30/// for the entire time the async Mutex is locked.
@@ -73,7 +74,7 @@ where
73 /// Lock the mutex. 74 /// Lock the mutex.
74 /// 75 ///
75 /// This will wait for the mutex to be unlocked if it's already locked. 76 /// This will wait for the mutex to be unlocked if it's already locked.
76 pub async fn lock(&self) -> MutexGuard<'_, M, T> { 77 pub fn lock(&self) -> impl Future<Output = MutexGuard<'_, M, T>> {
77 poll_fn(|cx| { 78 poll_fn(|cx| {
78 let ready = self.state.lock(|s| { 79 let ready = self.state.lock(|s| {
79 let mut s = s.borrow_mut(); 80 let mut s = s.borrow_mut();
@@ -92,7 +93,6 @@ where
92 Poll::Pending 93 Poll::Pending
93 } 94 }
94 }) 95 })
95 .await
96 } 96 }
97 97
98 /// Attempt to immediately lock the mutex. 98 /// Attempt to immediately lock the mutex.
@@ -138,7 +138,7 @@ impl<M: RawMutex, T> From<T> for Mutex<M, T> {
138impl<M, T> Default for Mutex<M, T> 138impl<M, T> Default for Mutex<M, T>
139where 139where
140 M: RawMutex, 140 M: RawMutex,
141 T: ?Sized + Default, 141 T: Default,
142{ 142{
143 fn default() -> Self { 143 fn default() -> Self {
144 Self::new(Default::default()) 144 Self::new(Default::default())
@@ -172,6 +172,7 @@ where
172/// 172///
173/// Dropping it unlocks the mutex. 173/// Dropping it unlocks the mutex.
174#[clippy::has_significant_drop] 174#[clippy::has_significant_drop]
175#[must_use = "if unused the Mutex will immediately unlock"]
175pub struct MutexGuard<'a, M, T> 176pub struct MutexGuard<'a, M, T>
176where 177where
177 M: RawMutex, 178 M: RawMutex,
@@ -186,7 +187,7 @@ where
186 T: ?Sized, 187 T: ?Sized,
187{ 188{
188 /// Returns a locked view over a portion of the locked data. 189 /// Returns a locked view over a portion of the locked data.
189 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { 190 pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
190 let mutex = this.mutex; 191 let mutex = this.mutex;
191 let value = fun(unsafe { &mut *this.mutex.inner.get() }); 192 let value = fun(unsafe { &mut *this.mutex.inner.get() });
192 // Don't run the `drop` method for MutexGuard. The ownership of the underlying 193 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
@@ -278,7 +279,7 @@ where
278 T: ?Sized, 279 T: ?Sized,
279{ 280{
280 /// Returns a locked view over a portion of the locked data. 281 /// Returns a locked view over a portion of the locked data.
281 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { 282 pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
282 let state = this.state; 283 let state = this.state;
283 let value = fun(unsafe { &mut *this.value }); 284 let value = fun(unsafe { &mut *this.value });
284 // Don't run the `drop` method for MutexGuard. The ownership of the underlying 285 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
index 55608ba32..2af19ca20 100644
--- a/embassy-sync/src/once_lock.rs
+++ b/embassy-sync/src/once_lock.rs
@@ -1,7 +1,8 @@
1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. 1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2 2
3use core::cell::Cell; 3use core::cell::Cell;
4use core::future::poll_fn; 4use core::fmt::{Debug, Formatter};
5use core::future::{Future, poll_fn};
5use core::mem::MaybeUninit; 6use core::mem::MaybeUninit;
6use core::sync::atomic::{AtomicBool, Ordering}; 7use core::sync::atomic::{AtomicBool, Ordering};
7use core::task::Poll; 8use core::task::Poll;
@@ -42,7 +43,16 @@ pub struct OnceLock<T> {
42 data: Cell<MaybeUninit<T>>, 43 data: Cell<MaybeUninit<T>>,
43} 44}
44 45
45unsafe impl<T> Sync for OnceLock<T> {} 46impl<T> Debug for OnceLock<T> {
47 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
48 f.debug_struct("OnceLock")
49 .field("init", &self.init)
50 .field("data", &"Cell<MaybeUninit<{unprintable}>>")
51 .finish()
52 }
53}
54
55unsafe impl<T> Sync for OnceLock<T> where T: Sync {}
46 56
47impl<T> OnceLock<T> { 57impl<T> OnceLock<T> {
48 /// Create a new uninitialized `OnceLock`. 58 /// Create a new uninitialized `OnceLock`.
@@ -55,7 +65,7 @@ impl<T> OnceLock<T> {
55 65
56 /// Get a reference to the underlying value, waiting for it to be set. 66 /// Get a reference to the underlying value, waiting for it to be set.
57 /// If the value is already set, this will return immediately. 67 /// If the value is already set, this will return immediately.
58 pub async fn get(&self) -> &T { 68 pub fn get(&self) -> impl Future<Output = &T> {
59 poll_fn(|cx| match self.try_get() { 69 poll_fn(|cx| match self.try_get() {
60 Some(data) => Poll::Ready(data), 70 Some(data) => Poll::Ready(data),
61 None => { 71 None => {
@@ -63,7 +73,6 @@ impl<T> OnceLock<T> {
63 Poll::Pending 73 Poll::Pending
64 } 74 }
65 }) 75 })
66 .await
67 } 76 }
68 77
69 /// Try to get a reference to the underlying value if it exists. 78 /// Try to get a reference to the underlying value if it exists.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index cd5b8ed75..215a556d9 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -7,12 +7,13 @@ use core::ops::Range;
7use core::pin::Pin; 7use core::pin::Pin;
8use core::task::{Context, Poll}; 8use core::task::{Context, Poll};
9 9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex; 10use crate::blocking_mutex::Mutex;
11use crate::blocking_mutex::raw::RawMutex;
12use crate::ring_buffer::RingBuffer; 12use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration; 13use crate::waitqueue::WakerRegistration;
14 14
15/// Write-only access to a [`Pipe`]. 15/// Write-only access to a [`Pipe`].
16#[derive(Debug)]
16pub struct Writer<'p, M, const N: usize> 17pub struct Writer<'p, M, const N: usize>
17where 18where
18 M: RawMutex, 19 M: RawMutex,
@@ -52,6 +53,7 @@ where
52 53
53/// Future returned by [`Pipe::write`] and [`Writer::write`]. 54/// Future returned by [`Pipe::write`] and [`Writer::write`].
54#[must_use = "futures do nothing unless you `.await` or poll them"] 55#[must_use = "futures do nothing unless you `.await` or poll them"]
56#[derive(Debug)]
55pub struct WriteFuture<'p, M, const N: usize> 57pub struct WriteFuture<'p, M, const N: usize>
56where 58where
57 M: RawMutex, 59 M: RawMutex,
@@ -77,6 +79,7 @@ where
77impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} 79impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
78 80
79/// Read-only access to a [`Pipe`]. 81/// Read-only access to a [`Pipe`].
82#[derive(Debug)]
80pub struct Reader<'p, M, const N: usize> 83pub struct Reader<'p, M, const N: usize>
81where 84where
82 M: RawMutex, 85 M: RawMutex,
@@ -128,6 +131,7 @@ where
128 131
129/// Future returned by [`Pipe::read`] and [`Reader::read`]. 132/// Future returned by [`Pipe::read`] and [`Reader::read`].
130#[must_use = "futures do nothing unless you `.await` or poll them"] 133#[must_use = "futures do nothing unless you `.await` or poll them"]
134#[derive(Debug)]
131pub struct ReadFuture<'p, M, const N: usize> 135pub struct ReadFuture<'p, M, const N: usize>
132where 136where
133 M: RawMutex, 137 M: RawMutex,
@@ -152,8 +156,9 @@ where
152 156
153impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} 157impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
154 158
155/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. 159/// Future returned by [`Reader::fill_buf`].
156#[must_use = "futures do nothing unless you `.await` or poll them"] 160#[must_use = "futures do nothing unless you `.await` or poll them"]
161#[derive(Debug)]
157pub struct FillBufFuture<'p, M, const N: usize> 162pub struct FillBufFuture<'p, M, const N: usize>
158where 163where
159 M: RawMutex, 164 M: RawMutex,
@@ -199,6 +204,7 @@ pub enum TryWriteError {
199 Full, 204 Full,
200} 205}
201 206
207#[derive(Debug)]
202struct PipeState<const N: usize> { 208struct PipeState<const N: usize> {
203 buffer: RingBuffer<N>, 209 buffer: RingBuffer<N>,
204 read_waker: WakerRegistration, 210 read_waker: WakerRegistration,
@@ -206,6 +212,7 @@ struct PipeState<const N: usize> {
206} 212}
207 213
208#[repr(transparent)] 214#[repr(transparent)]
215#[derive(Debug)]
209struct Buffer<const N: usize>(UnsafeCell<[u8; N]>); 216struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
210 217
211impl<const N: usize> Buffer<N> { 218impl<const N: usize> Buffer<N> {
@@ -230,6 +237,7 @@ unsafe impl<const N: usize> Sync for Buffer<N> {}
230/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. 237/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
231/// 238///
232/// All data written will become available in the same order as it was written. 239/// All data written will become available in the same order as it was written.
240#[derive(Debug)]
233pub struct Pipe<M, const N: usize> 241pub struct Pipe<M, const N: usize>
234where 242where
235 M: RawMutex, 243 M: RawMutex,
@@ -532,6 +540,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N>
532 } 540 }
533} 541}
534 542
543//
544// Type-erased variants
545//
546
547pub(crate) trait DynamicPipe {
548 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
549 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
550
551 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
552 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
553
554 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
555 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
556
557 fn consume(&self, amt: usize);
558 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
559}
560
561impl<M, const N: usize> DynamicPipe for Pipe<M, N>
562where
563 M: RawMutex,
564{
565 fn consume(&self, amt: usize) {
566 Pipe::consume(self, amt)
567 }
568
569 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
570 Pipe::try_fill_buf_with_context(self, cx)
571 }
572
573 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
574 Pipe::write(self, buf).into()
575 }
576
577 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
578 Pipe::read(self, buf).into()
579 }
580
581 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
582 Pipe::try_read(self, buf)
583 }
584
585 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
586 Pipe::try_write(self, buf)
587 }
588
589 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
590 Pipe::try_write_with_context(self, cx, buf)
591 }
592
593 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
594 Pipe::try_read_with_context(self, cx, buf)
595 }
596}
597
598/// Write-only access to the dynamic pipe.
599pub struct DynamicWriter<'p> {
600 pipe: &'p dyn DynamicPipe,
601}
602
603impl<'p> Clone for DynamicWriter<'p> {
604 fn clone(&self) -> Self {
605 *self
606 }
607}
608
609impl<'p> Copy for DynamicWriter<'p> {}
610
611impl<'p> DynamicWriter<'p> {
612 /// Write some bytes to the pipe.
613 ///
614 /// See [`Pipe::write()`]
615 pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
616 self.pipe.write(buf)
617 }
618
619 /// Attempt to immediately write some bytes to the pipe.
620 ///
621 /// See [`Pipe::try_write()`]
622 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
623 self.pipe.try_write(buf)
624 }
625}
626
627impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
628where
629 M: RawMutex,
630{
631 fn from(value: Writer<'p, M, N>) -> Self {
632 Self { pipe: value.pipe }
633 }
634}
635
636/// Future returned by [`DynamicWriter::write`].
637#[must_use = "futures do nothing unless you `.await` or poll them"]
638pub struct DynamicWriteFuture<'p> {
639 pipe: &'p dyn DynamicPipe,
640 buf: &'p [u8],
641}
642
643impl<'p> Future for DynamicWriteFuture<'p> {
644 type Output = usize;
645
646 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647 match self.pipe.try_write_with_context(Some(cx), self.buf) {
648 Ok(n) => Poll::Ready(n),
649 Err(TryWriteError::Full) => Poll::Pending,
650 }
651 }
652}
653
654impl<'p> Unpin for DynamicWriteFuture<'p> {}
655
656impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
657where
658 M: RawMutex,
659{
660 fn from(value: WriteFuture<'p, M, N>) -> Self {
661 Self {
662 pipe: value.pipe,
663 buf: value.buf,
664 }
665 }
666}
667
668/// Read-only access to a dynamic pipe.
669pub struct DynamicReader<'p> {
670 pipe: &'p dyn DynamicPipe,
671}
672
673impl<'p> DynamicReader<'p> {
674 /// Read some bytes from the pipe.
675 ///
676 /// See [`Pipe::read()`]
677 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
678 self.pipe.read(buf)
679 }
680
681 /// Attempt to immediately read some bytes from the pipe.
682 ///
683 /// See [`Pipe::try_read()`]
684 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
685 self.pipe.try_read(buf)
686 }
687
688 /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
689 ///
690 /// If no bytes are currently available to read, this function waits until at least one byte is available.
691 ///
692 /// If the reader is at end-of-file (EOF), an empty slice is returned.
693 pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
694 DynamicFillBufFuture { pipe: Some(self.pipe) }
695 }
696
697 /// Try returning contents of the internal buffer.
698 ///
699 /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
700 ///
701 /// If the reader is at end-of-file (EOF), an empty slice is returned.
702 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
703 unsafe { self.pipe.try_fill_buf_with_context(None) }
704 }
705
706 /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
707 pub fn consume(&mut self, amt: usize) {
708 self.pipe.consume(amt)
709 }
710}
711
712impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
713where
714 M: RawMutex,
715{
716 fn from(value: Reader<'p, M, N>) -> Self {
717 Self { pipe: value.pipe }
718 }
719}
720
721/// Future returned by [`Pipe::read`] and [`Reader::read`].
722#[must_use = "futures do nothing unless you `.await` or poll them"]
723pub struct DynamicReadFuture<'p> {
724 pipe: &'p dyn DynamicPipe,
725 buf: &'p mut [u8],
726}
727
728impl<'p> Future for DynamicReadFuture<'p> {
729 type Output = usize;
730
731 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
732 match self.pipe.try_read_with_context(Some(cx), self.buf) {
733 Ok(n) => Poll::Ready(n),
734 Err(TryReadError::Empty) => Poll::Pending,
735 }
736 }
737}
738
739impl<'p> Unpin for DynamicReadFuture<'p> {}
740
741impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
742where
743 M: RawMutex,
744{
745 fn from(value: ReadFuture<'p, M, N>) -> Self {
746 Self {
747 pipe: value.pipe,
748 buf: value.buf,
749 }
750 }
751}
752
753/// Future returned by [`DynamicReader::fill_buf`].
754#[must_use = "futures do nothing unless you `.await` or poll them"]
755pub struct DynamicFillBufFuture<'p> {
756 pipe: Option<&'p dyn DynamicPipe>,
757}
758
759impl<'p> Future for DynamicFillBufFuture<'p> {
760 type Output = &'p [u8];
761
762 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
763 let pipe = self.pipe.take().unwrap();
764 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
765 Ok(buf) => Poll::Ready(buf),
766 Err(TryReadError::Empty) => {
767 self.pipe = Some(pipe);
768 Poll::Pending
769 }
770 }
771 }
772}
773
774impl<'p> Unpin for DynamicFillBufFuture<'p> {}
775
776impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
777where
778 M: RawMutex,
779{
780 fn from(value: FillBufFuture<'p, M, N>) -> Self {
781 Self {
782 pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
783 }
784 }
785}
786
535#[cfg(test)] 787#[cfg(test)]
536mod tests { 788mod tests {
537 use futures_executor::ThreadPool; 789 use futures_executor::ThreadPool;
@@ -619,6 +871,35 @@ mod tests {
619 let _ = w.clone(); 871 let _ = w.clone();
620 } 872 }
621 873
874 #[test]
875 fn dynamic_dispatch_pipe() {
876 let mut c = Pipe::<NoopRawMutex, 3>::new();
877 let (r, w) = c.split();
878 let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
879
880 assert!(w.try_write(&[42, 43]).is_ok());
881 let buf = r.try_fill_buf().unwrap();
882 assert_eq!(buf, &[42, 43]);
883 let buf = r.try_fill_buf().unwrap();
884 assert_eq!(buf, &[42, 43]);
885 r.consume(1);
886 let buf = r.try_fill_buf().unwrap();
887 assert_eq!(buf, &[43]);
888 r.consume(1);
889 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
890 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
891 assert_eq!(w.try_write(&[45, 46]), Ok(2));
892 let buf = r.try_fill_buf().unwrap();
893 assert_eq!(buf, &[44]); // only one byte due to wraparound.
894 r.consume(1);
895 let buf = r.try_fill_buf().unwrap();
896 assert_eq!(buf, &[45, 46]);
897 assert!(w.try_write(&[47]).is_ok());
898 let buf = r.try_fill_buf().unwrap();
899 assert_eq!(buf, &[45, 46, 47]);
900 r.consume(3);
901 }
902
622 #[futures_test::test] 903 #[futures_test::test]
623 async fn receiver_receives_given_try_write_async() { 904 async fn receiver_receives_given_try_write_async() {
624 let executor = ThreadPool::new().unwrap(); 905 let executor = ThreadPool::new().unwrap();
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 24c6c5a7f..1af7d9221 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -1,18 +1,18 @@
1//! A queue for sending values between asynchronous tasks. 1//! A queue for sending values between asynchronous tasks.
2//! 2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. 3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel. 4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`] parameter of the channel.
5 5
6use core::cell::RefCell; 6use core::cell::RefCell;
7use core::future::Future; 7use core::future::Future;
8use core::pin::Pin; 8use core::pin::Pin;
9use core::task::{Context, Poll}; 9use core::task::{Context, Poll};
10 10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap; 11use heapless::BinaryHeap;
12pub use heapless::binary_heap::{Kind, Max, Min};
13 13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex; 14use crate::blocking_mutex::Mutex;
15use crate::blocking_mutex::raw::RawMutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError}; 16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration; 17use crate::waitqueue::WakerRegistration;
18 18
@@ -71,6 +71,48 @@ where
71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx) 72 self.channel.poll_ready_to_send(cx)
73 } 73 }
74
75 /// Returns the maximum number of elements the channel can hold.
76 ///
77 /// See [`PriorityChannel::capacity()`]
78 pub const fn capacity(&self) -> usize {
79 self.channel.capacity()
80 }
81
82 /// Returns the free capacity of the channel.
83 ///
84 /// See [`PriorityChannel::free_capacity()`]
85 pub fn free_capacity(&self) -> usize {
86 self.channel.free_capacity()
87 }
88
89 /// Clears all elements in the channel.
90 ///
91 /// See [`PriorityChannel::clear()`]
92 pub fn clear(&self) {
93 self.channel.clear();
94 }
95
96 /// Returns the number of elements currently in the channel.
97 ///
98 /// See [`PriorityChannel::len()`]
99 pub fn len(&self) -> usize {
100 self.channel.len()
101 }
102
103 /// Returns whether the channel is empty.
104 ///
105 /// See [`PriorityChannel::is_empty()`]
106 pub fn is_empty(&self) -> bool {
107 self.channel.is_empty()
108 }
109
110 /// Returns whether the channel is full.
111 ///
112 /// See [`PriorityChannel::is_full()`]
113 pub fn is_full(&self) -> bool {
114 self.channel.is_full()
115 }
74} 116}
75 117
76impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> 118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
@@ -133,6 +175,16 @@ where
133 self.channel.try_receive() 175 self.channel.try_receive()
134 } 176 }
135 177
178 /// Peek at the next value without removing it from the queue.
179 ///
180 /// See [`PriorityChannel::try_peek()`]
181 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
136 /// Allows a poll_fn to poll until the channel is ready to receive 188 /// Allows a poll_fn to poll until the channel is ready to receive
137 /// 189 ///
138 /// See [`PriorityChannel::poll_ready_to_receive()`] 190 /// See [`PriorityChannel::poll_ready_to_receive()`]
@@ -146,6 +198,59 @@ where
146 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 198 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
147 self.channel.poll_receive(cx) 199 self.channel.poll_receive(cx)
148 } 200 }
201
202 /// Removes the elements from the channel that satisfy the predicate.
203 ///
204 /// See [`PriorityChannel::remove_if()`]
205 pub fn remove_if<F>(&self, predicate: F)
206 where
207 F: Fn(&T) -> bool,
208 T: Clone,
209 {
210 self.channel.remove_if(predicate)
211 }
212
213 /// Returns the maximum number of elements the channel can hold.
214 ///
215 /// See [`PriorityChannel::capacity()`]
216 pub const fn capacity(&self) -> usize {
217 self.channel.capacity()
218 }
219
220 /// Returns the free capacity of the channel.
221 ///
222 /// See [`PriorityChannel::free_capacity()`]
223 pub fn free_capacity(&self) -> usize {
224 self.channel.free_capacity()
225 }
226
227 /// Clears all elements in the channel.
228 ///
229 /// See [`PriorityChannel::clear()`]
230 pub fn clear(&self) {
231 self.channel.clear();
232 }
233
234 /// Returns the number of elements currently in the channel.
235 ///
236 /// See [`PriorityChannel::len()`]
237 pub fn len(&self) -> usize {
238 self.channel.len()
239 }
240
241 /// Returns whether the channel is empty.
242 ///
243 /// See [`PriorityChannel::is_empty()`]
244 pub fn is_empty(&self) -> bool {
245 self.channel.is_empty()
246 }
247
248 /// Returns whether the channel is full.
249 ///
250 /// See [`PriorityChannel::is_full()`]
251 pub fn is_full(&self) -> bool {
252 self.channel.is_full()
253 }
149} 254}
150 255
151impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> 256impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
@@ -248,6 +353,31 @@ where
248 self.try_receive_with_context(None) 353 self.try_receive_with_context(None)
249 } 354 }
250 355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
251 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
252 if self.queue.len() == self.queue.capacity() { 382 if self.queue.len() == self.queue.capacity() {
253 self.senders_waker.wake(); 383 self.senders_waker.wake();
@@ -316,6 +446,9 @@ where
316 } 446 }
317 447
318 fn clear(&mut self) { 448 fn clear(&mut self) {
449 if self.queue.len() == self.queue.capacity() {
450 self.senders_waker.wake();
451 }
319 self.queue.clear(); 452 self.queue.clear();
320 } 453 }
321 454
@@ -340,7 +473,7 @@ where
340/// received from the channel. 473/// received from the channel.
341/// 474///
342/// Sent data may be reordered based on their priority within the channel. 475/// Sent data may be reordered based on their priority within the channel.
343/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 476/// For example, in a [`Max`] [`PriorityChannel`]
344/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. 477/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
345pub struct PriorityChannel<M, T, K, const N: usize> 478pub struct PriorityChannel<M, T, K, const N: usize>
346where 479where
@@ -380,6 +513,13 @@ where
380 self.lock(|c| c.try_receive_with_context(cx)) 513 self.lock(|c| c.try_receive_with_context(cx))
381 } 514 }
382 515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
383 /// Poll the channel for the next message 523 /// Poll the channel for the next message
384 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 524 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
385 self.lock(|c| c.poll_receive(cx)) 525 self.lock(|c| c.poll_receive(cx))
@@ -450,6 +590,37 @@ where
450 self.lock(|c| c.try_receive()) 590 self.lock(|c| c.try_receive())
451 } 591 }
452 592
593 /// Peek at the next value without removing it from the queue.
594 ///
595 /// This method will either receive a copy of the message from the channel immediately or return
596 /// an error if the channel is empty.
597 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
604 /// Removes elements from the channel based on the given predicate.
605 pub fn remove_if<F>(&self, predicate: F)
606 where
607 F: Fn(&T) -> bool,
608 T: Clone,
609 {
610 self.lock(|c| {
611 let mut new_heap = BinaryHeap::<T, K, N>::new();
612 for item in c.queue.iter() {
613 if !predicate(item) {
614 match new_heap.push(item.clone()) {
615 Ok(_) => (),
616 Err(_) => panic!("Error pushing item to heap"),
617 }
618 }
619 }
620 c.queue = new_heap;
621 });
622 }
623
453 /// Returns the maximum number of elements the channel can hold. 624 /// Returns the maximum number of elements the channel can hold.
454 pub const fn capacity(&self) -> usize { 625 pub const fn capacity(&self) -> usize {
455 N 626 N
@@ -499,6 +670,13 @@ where
499 PriorityChannel::try_receive_with_context(self, cx) 670 PriorityChannel::try_receive_with_context(self, cx)
500 } 671 }
501 672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
502 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
503 PriorityChannel::poll_ready_to_send(self, cx) 681 PriorityChannel::poll_ready_to_send(self, cx)
504 } 682 }
@@ -587,6 +765,8 @@ mod tests {
587 fn simple_send_and_receive() { 765 fn simple_send_and_receive() {
588 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); 766 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
589 assert!(c.try_send(1).is_ok()); 767 assert!(c.try_send(1).is_ok());
768 assert_eq!(c.try_peek().unwrap(), 1);
769 assert_eq!(c.try_peek().unwrap(), 1);
590 assert_eq!(c.try_receive().unwrap(), 1); 770 assert_eq!(c.try_receive().unwrap(), 1);
591 } 771 }
592 772
@@ -607,6 +787,8 @@ mod tests {
607 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 787 let r: DynamicReceiver<'_, u32> = c.receiver().into();
608 788
609 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
790 assert_eq!(r.try_peek().unwrap(), 1);
791 assert_eq!(r.try_peek().unwrap(), 1);
610 assert_eq!(r.try_receive().unwrap(), 1); 792 assert_eq!(r.try_receive().unwrap(), 1);
611 } 793 }
612 794
@@ -617,11 +799,13 @@ mod tests {
617 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new(); 799 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
618 let c = &*CHANNEL.init(PriorityChannel::new()); 800 let c = &*CHANNEL.init(PriorityChannel::new());
619 let c2 = c; 801 let c2 = c;
620 assert!(executor 802 assert!(
621 .spawn(async move { 803 executor
622 assert!(c2.try_send(1).is_ok()); 804 .spawn(async move {
623 }) 805 assert!(c2.try_send(1).is_ok());
624 .is_ok()); 806 })
807 .is_ok()
808 );
625 assert_eq!(c.receive().await, 1); 809 assert_eq!(c.receive().await, 1);
626 } 810 }
627 811
@@ -648,13 +832,15 @@ mod tests {
648 // However, I've used the debugger to observe that the send does indeed wait. 832 // However, I've used the debugger to observe that the send does indeed wait.
649 Delay::new(Duration::from_millis(500)).await; 833 Delay::new(Duration::from_millis(500)).await;
650 assert_eq!(c.receive().await, 1); 834 assert_eq!(c.receive().await, 1);
651 assert!(executor 835 assert!(
652 .spawn(async move { 836 executor
653 loop { 837 .spawn(async move {
654 c.receive().await; 838 loop {
655 } 839 c.receive().await;
656 }) 840 }
657 .is_ok()); 841 })
842 .is_ok()
843 );
658 send_task_1.unwrap().await; 844 send_task_1.unwrap().await;
659 send_task_2.unwrap().await; 845 send_task_2.unwrap().await;
660 } 846 }
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index a97eb7d5b..127a208f1 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -10,8 +10,8 @@ use heapless::Deque;
10 10
11use self::publisher::{ImmediatePub, Pub}; 11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub; 12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex; 13use crate::blocking_mutex::Mutex;
14use crate::blocking_mutex::raw::RawMutex;
15use crate::waitqueue::MultiWakerRegistration; 15use crate::waitqueue::MultiWakerRegistration;
16 16
17pub mod publisher; 17pub mod publisher;
@@ -27,8 +27,8 @@ pub use subscriber::{DynSubscriber, Subscriber};
27/// 27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. 28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message 29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive 30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged. 31/// an error to indicate that it has lagged.
32/// 32///
33/// ## Example 33/// ## Example
34/// 34///
@@ -71,6 +71,7 @@ pub use subscriber::{DynSubscriber, Subscriber};
71/// # block_on(test); 71/// # block_on(test);
72/// ``` 72/// ```
73/// 73///
74#[derive(Debug)]
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { 75pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, 76 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76} 77}
@@ -88,7 +89,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
88 /// Create a new subscriber. It will only receive messages that are published after its creation. 89 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 /// 90 ///
90 /// If there are no subscriber slots left, an error will be returned. 91 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { 92 pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| { 93 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut(); 94 let mut s = inner.borrow_mut();
94 95
@@ -120,7 +121,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
120 /// Create a new publisher 121 /// Create a new publisher
121 /// 122 ///
122 /// If there are no publisher slots left, an error will be returned. 123 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { 124 pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| { 125 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut(); 126 let mut s = inner.borrow_mut();
126 127
@@ -151,13 +152,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
151 152
152 /// Create a new publisher that can only send immediate messages. 153 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot. 154 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { 155 pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self)) 156 ImmediatePublisher(ImmediatePub::new(self))
156 } 157 }
157 158
158 /// Create a new publisher that can only send immediate messages. 159 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot. 160 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 161 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> {
161 DynImmediatePublisher(ImmediatePub::new(self)) 162 DynImmediatePublisher(ImmediatePub::new(self))
162 } 163 }
163 164
@@ -194,6 +195,25 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
194 } 195 }
195} 196}
196 197
198impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
199 for PubSubChannel<M, T, CAP, SUBS, PUBS>
200{
201 fn publish_immediate(&self, message: T) {
202 self.inner.lock(|s| {
203 let mut s = s.borrow_mut();
204 s.publish_immediate(message)
205 })
206 }
207
208 fn capacity(&self) -> usize {
209 self.capacity()
210 }
211
212 fn is_full(&self) -> bool {
213 self.is_full()
214 }
215}
216
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> 217impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS> 218 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{ 219{
@@ -246,13 +266,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
246 }) 266 })
247 } 267 }
248 268
249 fn publish_immediate(&self, message: T) {
250 self.inner.lock(|s| {
251 let mut s = s.borrow_mut();
252 s.publish_immediate(message)
253 })
254 }
255
256 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 269 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
257 self.inner.lock(|s| { 270 self.inner.lock(|s| {
258 let mut s = s.borrow_mut(); 271 let mut s = s.borrow_mut();
@@ -267,10 +280,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
267 }) 280 })
268 } 281 }
269 282
270 fn capacity(&self) -> usize {
271 self.capacity()
272 }
273
274 fn free_capacity(&self) -> usize { 283 fn free_capacity(&self) -> usize {
275 self.free_capacity() 284 self.free_capacity()
276 } 285 }
@@ -286,13 +295,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
286 fn is_empty(&self) -> bool { 295 fn is_empty(&self) -> bool {
287 self.is_empty() 296 self.is_empty()
288 } 297 }
289
290 fn is_full(&self) -> bool {
291 self.is_full()
292 }
293} 298}
294 299
295/// Internal state for the PubSub channel 300/// Internal state for the PubSub channel
301#[derive(Debug)]
296struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { 302struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
297 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it 303 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
298 queue: Deque<(T, usize), CAP>, 304 queue: Deque<(T, usize), CAP>,
@@ -417,6 +423,9 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
417 } 423 }
418 424
419 fn clear(&mut self) { 425 fn clear(&mut self) {
426 if self.is_full() {
427 self.publisher_wakers.wake();
428 }
420 self.queue.clear(); 429 self.queue.clear();
421 } 430 }
422 431
@@ -445,8 +454,6 @@ pub enum Error {
445 MaximumPublishersReached, 454 MaximumPublishersReached,
446} 455}
447 456
448/// 'Middle level' behaviour of the pubsub channel.
449/// This trait is used so that Sub and Pub can be generic over the channel.
450trait SealedPubSubBehavior<T> { 457trait SealedPubSubBehavior<T> {
451 /// Try to get a message from the queue with the given message id. 458 /// Try to get a message from the queue with the given message id.
452 /// 459 ///
@@ -462,12 +469,6 @@ trait SealedPubSubBehavior<T> {
462 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 469 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
463 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; 470 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
464 471
465 /// Publish a message immediately
466 fn publish_immediate(&self, message: T);
467
468 /// Returns the maximum number of elements the channel can hold.
469 fn capacity(&self) -> usize;
470
471 /// Returns the free capacity of the channel. 472 /// Returns the free capacity of the channel.
472 /// 473 ///
473 /// This is equivalent to `capacity() - len()` 474 /// This is equivalent to `capacity() - len()`
@@ -482,9 +483,6 @@ trait SealedPubSubBehavior<T> {
482 /// Returns whether the channel is empty. 483 /// Returns whether the channel is empty.
483 fn is_empty(&self) -> bool; 484 fn is_empty(&self) -> bool;
484 485
485 /// Returns whether the channel is full.
486 fn is_full(&self) -> bool;
487
488 /// Let the channel know that a subscriber has dropped 486 /// Let the channel know that a subscriber has dropped
489 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 487 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
490 488
@@ -495,9 +493,16 @@ trait SealedPubSubBehavior<T> {
495/// 'Middle level' behaviour of the pubsub channel. 493/// 'Middle level' behaviour of the pubsub channel.
496/// This trait is used so that Sub and Pub can be generic over the channel. 494/// This trait is used so that Sub and Pub can be generic over the channel.
497#[allow(private_bounds)] 495#[allow(private_bounds)]
498pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {} 496pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
497 /// Publish a message immediately
498 fn publish_immediate(&self, message: T);
499 499
500impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {} 500 /// Returns the maximum number of elements the channel can hold.
501 fn capacity(&self) -> usize;
502
503 /// Returns whether the channel is full.
504 fn is_full(&self) -> bool;
505}
501 506
502/// The result of the subscriber wait procedure 507/// The result of the subscriber wait procedure
503#[derive(Debug, Clone, PartialEq, Eq)] 508#[derive(Debug, Clone, PartialEq, Eq)]
@@ -755,4 +760,30 @@ mod tests {
755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0); 760 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0); 761 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
757 } 762 }
763
764 #[futures_test::test]
765 async fn publisher_sink() {
766 use futures_util::{SinkExt, StreamExt};
767
768 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
769
770 let mut sub = channel.subscriber().unwrap();
771
772 let publ = channel.publisher().unwrap();
773 let mut sink = publ.sink();
774
775 sink.send(0).await.unwrap();
776 assert_eq!(0, sub.try_next_message_pure().unwrap());
777
778 sink.send(1).await.unwrap();
779 assert_eq!(1, sub.try_next_message_pure().unwrap());
780
781 sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
782 .await
783 .unwrap();
784 assert_eq!(0, sub.try_next_message_pure().unwrap());
785 assert_eq!(1, sub.try_next_message_pure().unwrap());
786 assert_eq!(2, sub.try_next_message_pure().unwrap());
787 assert_eq!(3, sub.try_next_message_pure().unwrap());
788 }
758} 789}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e66b3b1db..2a67a0002 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
11 11
12/// A publisher to a channel 12/// A publisher to a channel
13#[derive(Debug)]
13pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 14pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The channel we are a publisher for 15 /// The channel we are a publisher for
15 channel: &'a PSB, 16 channel: &'a PSB,
@@ -74,6 +75,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
74 pub fn is_full(&self) -> bool { 75 pub fn is_full(&self) -> bool {
75 self.channel.is_full() 76 self.channel.is_full()
76 } 77 }
78
79 /// Create a [`futures_sink::Sink`] adapter for this publisher.
80 #[inline]
81 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
82 PubSink { publ: self, fut: None }
83 }
77} 84}
78 85
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 86impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -100,6 +107,7 @@ impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
100} 107}
101 108
102/// A publisher that holds a generic reference to the channel 109/// A publisher that holds a generic reference to the channel
110#[derive(Debug)]
103pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 111pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
104 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 112 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
105); 113);
@@ -124,6 +132,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
124 132
125/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. 133/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
126/// (So an infinite amount is possible) 134/// (So an infinite amount is possible)
135#[derive(Debug)]
127pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 136pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
128 /// The channel we are a publisher for 137 /// The channel we are a publisher for
129 channel: &'a PSB, 138 channel: &'a PSB,
@@ -199,6 +208,7 @@ impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
199} 208}
200 209
201/// An immediate publisher that holds a generic reference to the channel 210/// An immediate publisher that holds a generic reference to the channel
211#[derive(Debug)]
202pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 212pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
203 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 213 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
204); 214);
@@ -221,8 +231,71 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
221 } 231 }
222} 232}
223 233
234#[must_use = "Sinks do nothing unless polled"]
235/// [`futures_sink::Sink`] adapter for [`Pub`].
236#[derive(Debug)]
237pub struct PubSink<'a, 'p, PSB, T>
238where
239 T: Clone,
240 PSB: PubSubBehavior<T> + ?Sized,
241{
242 publ: &'p Pub<'a, PSB, T>,
243 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
244}
245
246impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
247where
248 PSB: PubSubBehavior<T> + ?Sized,
249 T: Clone,
250{
251 /// Try to make progress on the pending future if we have one.
252 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
253 let Some(mut fut) = self.fut.take() else {
254 return Poll::Ready(());
255 };
256
257 if Pin::new(&mut fut).poll(cx).is_pending() {
258 self.fut = Some(fut);
259 return Poll::Pending;
260 }
261
262 Poll::Ready(())
263 }
264}
265
266impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
267where
268 PSB: PubSubBehavior<T> + ?Sized,
269 T: Clone,
270{
271 type Error = core::convert::Infallible;
272
273 #[inline]
274 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275 self.poll(cx).map(Ok)
276 }
277
278 #[inline]
279 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
280 self.fut = Some(self.publ.publish(item));
281
282 Ok(())
283 }
284
285 #[inline]
286 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289
290 #[inline]
291 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
292 self.poll(cx).map(Ok)
293 }
294}
295
224/// Future for the publisher wait action 296/// Future for the publisher wait action
225#[must_use = "futures do nothing unless you `.await` or poll them"] 297#[must_use = "futures do nothing unless you `.await` or poll them"]
298#[derive(Debug)]
226pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 299pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
227 /// The message we need to publish 300 /// The message we need to publish
228 message: Option<T>, 301 message: Option<T>,
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index 6ad660cb3..356de23f6 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
11 11
12/// A subscriber to a channel 12/// A subscriber to a channel
13#[derive(Debug)]
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 14pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive 15 /// The message id of the next message we are yet to receive
15 next_message_id: u64, 16 next_message_id: u64,
@@ -115,7 +116,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
115 116
116/// Warning: The stream implementation ignores lag results and returns all messages. 117/// Warning: The stream implementation ignores lag results and returns all messages.
117/// This might miss some messages without you knowing it. 118/// This might miss some messages without you knowing it.
118impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { 119impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> {
119 type Item = T; 120 type Item = T;
120 121
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -151,6 +152,7 @@ impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
151} 152}
152 153
153/// A subscriber that holds a generic reference to the channel 154/// A subscriber that holds a generic reference to the channel
155#[derive(Debug)]
154pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 156pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
155 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 157 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
156); 158);
@@ -175,6 +177,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
175 177
176/// Future for the subscriber wait action 178/// Future for the subscriber wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"] 179#[must_use = "futures do nothing unless you `.await` or poll them"]
180#[derive(Debug)]
178pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 181pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
179 subscriber: &'s mut Sub<'a, PSB, T>, 182 subscriber: &'s mut Sub<'a, PSB, T>,
180} 183}
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs
index 81e60c42b..608447cd6 100644
--- a/embassy-sync/src/ring_buffer.rs
+++ b/embassy-sync/src/ring_buffer.rs
@@ -1,5 +1,6 @@
1use core::ops::Range; 1use core::ops::Range;
2 2
3#[derive(Debug)]
3pub struct RingBuffer<const N: usize> { 4pub struct RingBuffer<const N: usize> {
4 start: usize, 5 start: usize,
5 end: usize, 6 end: usize,
@@ -94,11 +95,7 @@ impl<const N: usize> RingBuffer<N> {
94 95
95 fn wrap(&self, n: usize) -> usize { 96 fn wrap(&self, n: usize) -> usize {
96 assert!(n <= N); 97 assert!(n <= N);
97 if n == N { 98 if n == N { 0 } else { n }
98 0
99 } else {
100 n
101 }
102 } 99 }
103} 100}
104 101
diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs
new file mode 100644
index 000000000..918a6aa41
--- /dev/null
+++ b/embassy-sync/src/rwlock.rs
@@ -0,0 +1,386 @@
1//! Async read-write lock.
2//!
3//! This module provides a read-write lock that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell};
5use core::fmt;
6use core::future::{Future, poll_fn};
7use core::ops::{Deref, DerefMut};
8use core::task::Poll;
9
10use crate::blocking_mutex::Mutex as BlockingMutex;
11use crate::blocking_mutex::raw::RawMutex;
12use crate::waitqueue::WakerRegistration;
13
14/// Error returned by [`RwLock::try_read`] and [`RwLock::try_write`] when the lock is already held.
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError;
18
19#[derive(Debug)]
20struct State {
21 readers: usize,
22 writer: bool,
23 waker: WakerRegistration,
24}
25
26/// Async read-write lock.
27///
28/// The read-write lock is generic over the raw mutex implementation `M` and the data `T` it protects.
29/// The raw read-write lock is used to guard access to the internal state. It
30/// is held for very short periods only, while locking and unlocking. It is *not* held
31/// for the entire time the async RwLock is locked.
32///
33/// Which implementation you select depends on the context in which you're using the read-write lock.
34///
35/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts.
36///
37/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor.
38///
39/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton.
40pub struct RwLock<M, T>
41where
42 M: RawMutex,
43 T: ?Sized,
44{
45 state: BlockingMutex<M, RefCell<State>>,
46 inner: UnsafeCell<T>,
47}
48
49unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for RwLock<M, T> {}
50unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for RwLock<M, T> {}
51
52/// Async read-write lock.
53impl<M, T> RwLock<M, T>
54where
55 M: RawMutex,
56{
57 /// Create a new read-write lock with the given value.
58 pub const fn new(value: T) -> Self {
59 Self {
60 inner: UnsafeCell::new(value),
61 state: BlockingMutex::new(RefCell::new(State {
62 readers: 0,
63 writer: false,
64 waker: WakerRegistration::new(),
65 })),
66 }
67 }
68}
69
70impl<M, T> RwLock<M, T>
71where
72 M: RawMutex,
73 T: ?Sized,
74{
75 /// Lock the read-write lock for reading.
76 ///
77 /// This will wait for the lock to be available if it's already locked for writing.
78 pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, M, T>> {
79 poll_fn(|cx| {
80 let ready = self.state.lock(|s| {
81 let mut s = s.borrow_mut();
82 if s.writer {
83 s.waker.register(cx.waker());
84 false
85 } else {
86 s.readers += 1;
87 true
88 }
89 });
90
91 if ready {
92 Poll::Ready(RwLockReadGuard { rwlock: self })
93 } else {
94 Poll::Pending
95 }
96 })
97 }
98
99 /// Lock the read-write lock for writing.
100 ///
101 /// This will wait for the lock to be available if it's already locked for reading or writing.
102 pub fn write(&self) -> impl Future<Output = RwLockWriteGuard<'_, M, T>> {
103 poll_fn(|cx| {
104 let ready = self.state.lock(|s| {
105 let mut s = s.borrow_mut();
106 if s.writer || s.readers > 0 {
107 s.waker.register(cx.waker());
108 false
109 } else {
110 s.writer = true;
111 true
112 }
113 });
114
115 if ready {
116 Poll::Ready(RwLockWriteGuard { rwlock: self })
117 } else {
118 Poll::Pending
119 }
120 })
121 }
122
123 /// Attempt to immediately lock the rwlock.
124 ///
125 /// If the rwlock is already locked, this will return an error instead of waiting.
126 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, M, T>, TryLockError> {
127 self.state
128 .lock(|s| {
129 let mut s = s.borrow_mut();
130 if s.writer {
131 return Err(());
132 }
133 s.readers += 1;
134 Ok(())
135 })
136 .map_err(|_| TryLockError)?;
137
138 Ok(RwLockReadGuard { rwlock: self })
139 }
140
141 /// Attempt to immediately lock the rwlock.
142 ///
143 /// If the rwlock is already locked, this will return an error instead of waiting.
144 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, M, T>, TryLockError> {
145 self.state
146 .lock(|s| {
147 let mut s = s.borrow_mut();
148 if s.writer || s.readers > 0 {
149 return Err(());
150 }
151 s.writer = true;
152 Ok(())
153 })
154 .map_err(|_| TryLockError)?;
155
156 Ok(RwLockWriteGuard { rwlock: self })
157 }
158
159 /// Consumes this read-write lock, returning the underlying data.
160 pub fn into_inner(self) -> T
161 where
162 T: Sized,
163 {
164 self.inner.into_inner()
165 }
166
167 /// Returns a mutable reference to the underlying data.
168 ///
169 /// Since this call borrows the RwLock mutably, no actual locking needs to
170 /// take place -- the mutable borrow statically guarantees no locks exist.
171 pub fn get_mut(&mut self) -> &mut T {
172 self.inner.get_mut()
173 }
174}
175
176impl<M: RawMutex, T> From<T> for RwLock<M, T> {
177 fn from(from: T) -> Self {
178 Self::new(from)
179 }
180}
181
182impl<M, T> Default for RwLock<M, T>
183where
184 M: RawMutex,
185 T: Default,
186{
187 fn default() -> Self {
188 Self::new(Default::default())
189 }
190}
191
192impl<M, T> fmt::Debug for RwLock<M, T>
193where
194 M: RawMutex,
195 T: ?Sized + fmt::Debug,
196{
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 let mut d = f.debug_struct("RwLock");
199 match self.try_read() {
200 Ok(guard) => d.field("inner", &&*guard),
201 Err(TryLockError) => d.field("inner", &"Locked"),
202 }
203 .finish_non_exhaustive()
204 }
205}
206
207/// Async read lock guard.
208///
209/// Owning an instance of this type indicates having
210/// successfully locked the read-write lock for reading, and grants access to the contents.
211///
212/// Dropping it unlocks the read-write lock.
213#[clippy::has_significant_drop]
214#[must_use = "if unused the RwLock will immediately unlock"]
215pub struct RwLockReadGuard<'a, R, T>
216where
217 R: RawMutex,
218 T: ?Sized,
219{
220 rwlock: &'a RwLock<R, T>,
221}
222
223impl<'a, M, T> Drop for RwLockReadGuard<'a, M, T>
224where
225 M: RawMutex,
226 T: ?Sized,
227{
228 fn drop(&mut self) {
229 self.rwlock.state.lock(|s| {
230 let mut s = unwrap!(s.try_borrow_mut());
231 s.readers -= 1;
232 if s.readers == 0 {
233 s.waker.wake();
234 }
235 })
236 }
237}
238
239impl<'a, M, T> Deref for RwLockReadGuard<'a, M, T>
240where
241 M: RawMutex,
242 T: ?Sized,
243{
244 type Target = T;
245 fn deref(&self) -> &Self::Target {
246 // Safety: the RwLockReadGuard represents shared access to the contents
247 // of the read-write lock, so it's OK to get it.
248 unsafe { &*(self.rwlock.inner.get() as *const T) }
249 }
250}
251
252impl<'a, M, T> fmt::Debug for RwLockReadGuard<'a, M, T>
253where
254 M: RawMutex,
255 T: ?Sized + fmt::Debug,
256{
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 fmt::Debug::fmt(&**self, f)
259 }
260}
261
262impl<'a, M, T> fmt::Display for RwLockReadGuard<'a, M, T>
263where
264 M: RawMutex,
265 T: ?Sized + fmt::Display,
266{
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 fmt::Display::fmt(&**self, f)
269 }
270}
271
272/// Async write lock guard.
273///
274/// Owning an instance of this type indicates having
275/// successfully locked the read-write lock for writing, and grants access to the contents.
276///
277/// Dropping it unlocks the read-write lock.
278#[clippy::has_significant_drop]
279#[must_use = "if unused the RwLock will immediately unlock"]
280pub struct RwLockWriteGuard<'a, R, T>
281where
282 R: RawMutex,
283 T: ?Sized,
284{
285 rwlock: &'a RwLock<R, T>,
286}
287
288impl<'a, R, T> Drop for RwLockWriteGuard<'a, R, T>
289where
290 R: RawMutex,
291 T: ?Sized,
292{
293 fn drop(&mut self) {
294 self.rwlock.state.lock(|s| {
295 let mut s = unwrap!(s.try_borrow_mut());
296 s.writer = false;
297 s.waker.wake();
298 })
299 }
300}
301
302impl<'a, R, T> Deref for RwLockWriteGuard<'a, R, T>
303where
304 R: RawMutex,
305 T: ?Sized,
306{
307 type Target = T;
308 fn deref(&self) -> &Self::Target {
309 // Safety: the RwLockWriteGuard represents exclusive access to the contents
310 // of the read-write lock, so it's OK to get it.
311 unsafe { &*(self.rwlock.inner.get() as *mut T) }
312 }
313}
314
315impl<'a, R, T> DerefMut for RwLockWriteGuard<'a, R, T>
316where
317 R: RawMutex,
318 T: ?Sized,
319{
320 fn deref_mut(&mut self) -> &mut Self::Target {
321 // Safety: the RwLockWriteGuard represents exclusive access to the contents
322 // of the read-write lock, so it's OK to get it.
323 unsafe { &mut *(self.rwlock.inner.get()) }
324 }
325}
326
327impl<'a, R, T> fmt::Debug for RwLockWriteGuard<'a, R, T>
328where
329 R: RawMutex,
330 T: ?Sized + fmt::Debug,
331{
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 fmt::Debug::fmt(&**self, f)
334 }
335}
336
337impl<'a, R, T> fmt::Display for RwLockWriteGuard<'a, R, T>
338where
339 R: RawMutex,
340 T: ?Sized + fmt::Display,
341{
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 fmt::Display::fmt(&**self, f)
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use crate::blocking_mutex::raw::NoopRawMutex;
350 use crate::rwlock::RwLock;
351
352 #[futures_test::test]
353 async fn read_guard_releases_lock_when_dropped() {
354 let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]);
355
356 {
357 let guard = rwlock.read().await;
358 assert_eq!(*guard, [0, 1]);
359 }
360
361 {
362 let guard = rwlock.read().await;
363 assert_eq!(*guard, [0, 1]);
364 }
365
366 assert_eq!(*rwlock.read().await, [0, 1]);
367 }
368
369 #[futures_test::test]
370 async fn write_guard_releases_lock_when_dropped() {
371 let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]);
372
373 {
374 let mut guard = rwlock.write().await;
375 assert_eq!(*guard, [0, 1]);
376 guard[1] = 2;
377 }
378
379 {
380 let guard = rwlock.read().await;
381 assert_eq!(*guard, [0, 2]);
382 }
383
384 assert_eq!(*rwlock.read().await, [0, 2]);
385 }
386}
diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs
index d30eee30b..8d2413931 100644
--- a/embassy-sync/src/semaphore.rs
+++ b/embassy-sync/src/semaphore.rs
@@ -1,13 +1,13 @@
1//! A synchronization primitive for controlling access to a pool of resources. 1//! A synchronization primitive for controlling access to a pool of resources.
2use core::cell::{Cell, RefCell}; 2use core::cell::{Cell, RefCell};
3use core::convert::Infallible; 3use core::convert::Infallible;
4use core::future::{poll_fn, Future}; 4use core::future::{Future, poll_fn};
5use core::task::{Poll, Waker}; 5use core::task::{Poll, Waker};
6 6
7use heapless::Deque; 7use heapless::Deque;
8 8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex; 9use crate::blocking_mutex::Mutex;
10use crate::blocking_mutex::raw::RawMutex;
11use crate::waitqueue::WakerRegistration; 11use crate::waitqueue::WakerRegistration;
12 12
13/// An asynchronous semaphore. 13/// An asynchronous semaphore.
@@ -46,6 +46,7 @@ pub trait Semaphore: Sized {
46/// A representation of a number of acquired permits. 46/// A representation of a number of acquired permits.
47/// 47///
48/// The acquired permits will be released back to the [`Semaphore`] when this is dropped. 48/// The acquired permits will be released back to the [`Semaphore`] when this is dropped.
49#[derive(Debug)]
49pub struct SemaphoreReleaser<'a, S: Semaphore> { 50pub struct SemaphoreReleaser<'a, S: Semaphore> {
50 semaphore: &'a S, 51 semaphore: &'a S,
51 permits: usize, 52 permits: usize,
@@ -181,6 +182,7 @@ impl<M: RawMutex> Semaphore for GreedySemaphore<M> {
181 } 182 }
182} 183}
183 184
185#[derive(Debug)]
184struct SemaphoreState { 186struct SemaphoreState {
185 permits: usize, 187 permits: usize,
186 waker: WakerRegistration, 188 waker: WakerRegistration,
@@ -221,6 +223,7 @@ impl SemaphoreState {
221/// 223///
222/// Up to `N` tasks may attempt to acquire permits concurrently. If additional 224/// Up to `N` tasks may attempt to acquire permits concurrently. If additional
223/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. 225/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned.
226#[derive(Debug)]
224pub struct FairSemaphore<M, const N: usize> 227pub struct FairSemaphore<M, const N: usize>
225where 228where
226 M: RawMutex, 229 M: RawMutex,
@@ -341,6 +344,7 @@ impl<M: RawMutex, const N: usize> Semaphore for FairSemaphore<M, N> {
341 } 344 }
342} 345}
343 346
347#[derive(Debug)]
344struct FairAcquire<'a, M: RawMutex, const N: usize> { 348struct FairAcquire<'a, M: RawMutex, const N: usize> {
345 sema: &'a FairSemaphore<M, N>, 349 sema: &'a FairSemaphore<M, N>,
346 permits: usize, 350 permits: usize,
@@ -364,6 +368,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M
364 } 368 }
365} 369}
366 370
371#[derive(Debug)]
367struct FairAcquireAll<'a, M: RawMutex, const N: usize> { 372struct FairAcquireAll<'a, M: RawMutex, const N: usize> {
368 sema: &'a FairSemaphore<M, N>, 373 sema: &'a FairSemaphore<M, N>,
369 min: usize, 374 min: usize,
@@ -387,6 +392,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a
387 } 392 }
388} 393}
389 394
395#[derive(Debug)]
390struct FairSemaphoreState<const N: usize> { 396struct FairSemaphoreState<const N: usize> {
391 permits: usize, 397 permits: usize,
392 next_ticket: usize, 398 next_ticket: usize,
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index a0f4b5a74..cc02228cf 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -1,12 +1,12 @@
1//! A synchronization primitive for passing the latest value to a task. 1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::Cell; 2use core::cell::Cell;
3use core::future::{poll_fn, Future}; 3use core::future::{Future, poll_fn};
4use core::task::{Context, Poll, Waker}; 4use core::task::{Context, Poll, Waker};
5 5
6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex; 6use crate::blocking_mutex::Mutex;
7use crate::blocking_mutex::raw::RawMutex;
8 8
9/// Single-slot signaling primitive. 9/// Single-slot signaling primitive for a _single_ consumer.
10/// 10///
11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except 11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except
12/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead 12/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead
@@ -17,6 +17,7 @@ use crate::blocking_mutex::Mutex;
17/// updates. 17/// updates.
18/// 18///
19/// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. 19/// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead.
20/// For multiple consumers, use [`Watch`](crate::watch::Watch) instead.
20/// 21///
21/// Signals are generally declared as `static`s and then borrowed as required. 22/// Signals are generally declared as `static`s and then borrowed as required.
22/// 23///
@@ -38,6 +39,7 @@ where
38 state: Mutex<M, Cell<State<T>>>, 39 state: Mutex<M, Cell<State<T>>>,
39} 40}
40 41
42#[derive(Debug)]
41enum State<T> { 43enum State<T> {
42 None, 44 None,
43 Waiting(Waker), 45 Waiting(Waker),
@@ -81,7 +83,7 @@ where
81 83
82 /// Remove the queued value in this `Signal`, if any. 84 /// Remove the queued value in this `Signal`, if any.
83 pub fn reset(&self) { 85 pub fn reset(&self) {
84 self.state.lock(|cell| cell.set(State::None)); 86 self.try_take();
85 } 87 }
86 88
87 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { 89 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
@@ -106,7 +108,7 @@ where
106 }) 108 })
107 } 109 }
108 110
109 /// Future that completes when this Signal has been signaled. 111 /// Future that completes when this Signal has been signaled, taking the value out of the signal.
110 pub fn wait(&self) -> impl Future<Output = T> + '_ { 112 pub fn wait(&self) -> impl Future<Output = T> + '_ {
111 poll_fn(move |cx| self.poll_wait(cx)) 113 poll_fn(move |cx| self.poll_wait(cx))
112 } 114 }
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
index 63fe04a6e..d2bf890e5 100644
--- a/embassy-sync/src/waitqueue/atomic_waker.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -1,26 +1,28 @@
1use core::cell::Cell; 1use core::cell::Cell;
2use core::task::Waker; 2use core::task::Waker;
3 3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex;
5use crate::blocking_mutex::Mutex; 4use crate::blocking_mutex::Mutex;
5use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
6 6
7/// Utility struct to register and wake a waker. 7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker { 8/// If a waker is registered, registering another waker will replace the previous one without waking it.
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, 9/// Intended to wake a task from an interrupt. Therefore, it is generally not expected,
10/// that multiple tasks register try to register a waker simultaneously.
11pub struct GenericAtomicWaker<M: RawMutex> {
12 waker: Mutex<M, Cell<Option<Waker>>>,
10} 13}
11 14
12impl AtomicWaker { 15impl<M: RawMutex> GenericAtomicWaker<M> {
13 /// Create a new `AtomicWaker`. 16 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self { 17 pub const fn new(mutex: M) -> Self {
15 Self { 18 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), 19 waker: Mutex::const_new(mutex, Cell::new(None)),
17 } 20 }
18 } 21 }
19 22
20 /// Register a waker. Overwrites the previous waker, if any. 23 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) { 24 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| { 25 self.waker.lock(|cell| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) { 26 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2), 27 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()), 28 _ => Some(w.clone()),
@@ -30,8 +32,7 @@ impl AtomicWaker {
30 32
31 /// Wake the registered waker, if any. 33 /// Wake the registered waker, if any.
32 pub fn wake(&self) { 34 pub fn wake(&self) {
33 critical_section::with(|cs| { 35 self.waker.lock(|cell| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) { 36 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref(); 37 w.wake_by_ref();
37 cell.set(Some(w)); 38 cell.set(Some(w));
@@ -39,3 +40,27 @@ impl AtomicWaker {
39 }) 40 })
40 } 41 }
41} 42}
43
44/// Utility struct to register and wake a waker.
45pub struct AtomicWaker {
46 waker: GenericAtomicWaker<CriticalSectionRawMutex>,
47}
48
49impl AtomicWaker {
50 /// Create a new `AtomicWaker`.
51 pub const fn new() -> Self {
52 Self {
53 waker: GenericAtomicWaker::new(CriticalSectionRawMutex::new()),
54 }
55 }
56
57 /// Register a waker. Overwrites the previous waker, if any.
58 pub fn register(&self, w: &Waker) {
59 self.waker.register(w);
60 }
61
62 /// Wake the registered waker, if any.
63 pub fn wake(&self) {
64 self.waker.wake();
65 }
66}
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
index 5c6a96ec8..a45adeab8 100644
--- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -4,6 +4,10 @@ use core::sync::atomic::{AtomicPtr, Ordering};
4use core::task::Waker; 4use core::task::Waker;
5 5
6/// Utility struct to register and wake a waker. 6/// Utility struct to register and wake a waker.
7/// If a waker is registered, registering another waker will replace the previous one without waking it.
8/// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected,
9/// that multiple tasks register try to register a waker simultaneously.
10#[derive(Debug)]
7pub struct AtomicWaker { 11pub struct AtomicWaker {
8 waker: AtomicPtr<()>, 12 waker: AtomicPtr<()>,
9} 13}
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 0e520bf40..56c0cd1b2 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -3,6 +3,9 @@ use core::task::Waker;
3use heapless::Vec; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6/// Queue of wakers with a maximum length of `N`.
7/// Intended for waking multiple tasks.
8#[derive(Debug)]
6pub struct MultiWakerRegistration<const N: usize> { 9pub struct MultiWakerRegistration<const N: usize> {
7 wakers: Vec<Waker, N>, 10 wakers: Vec<Waker, N>,
8} 11}
@@ -13,7 +16,9 @@ impl<const N: usize> MultiWakerRegistration<N> {
13 Self { wakers: Vec::new() } 16 Self { wakers: Vec::new() }
14 } 17 }
15 18
16 /// Register a waker. If the buffer is full the function returns it in the error 19 /// Register a waker.
20 ///
21 /// If the buffer is full, [wakes all the wakers](Self::wake), clears its buffer and registers the waker.
17 pub fn register(&mut self, w: &Waker) { 22 pub fn register(&mut self, w: &Waker) {
18 // If we already have some waker that wakes the same task as `w`, do nothing. 23 // If we already have some waker that wakes the same task as `w`, do nothing.
19 // This avoids cloning wakers, and avoids unnecessary mass-wakes. 24 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs
index 9b666e7c4..7f24f8fb6 100644
--- a/embassy-sync/src/waitqueue/waker_registration.rs
+++ b/embassy-sync/src/waitqueue/waker_registration.rs
@@ -2,6 +2,10 @@ use core::mem;
2use core::task::Waker; 2use core::task::Waker;
3 3
4/// Utility struct to register and wake a waker. 4/// Utility struct to register and wake a waker.
5/// If a waker is registered, registering another waker will replace the previous one.
6/// The previous waker will be woken in this case, giving it a chance to reregister itself.
7/// Although it is possible to wake multiple tasks this way,
8/// this will cause them to wake each other in a loop registering themselves.
5#[derive(Debug, Default)] 9#[derive(Debug, Default)]
6pub struct WakerRegistration { 10pub struct WakerRegistration {
7 waker: Option<Waker>, 11 waker: Option<Waker>,
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
new file mode 100644
index 000000000..0f8a8d679
--- /dev/null
+++ b/embassy-sync/src/watch.rs
@@ -0,0 +1,1127 @@
1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2
3use core::cell::RefCell;
4use core::future::{Future, poll_fn};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::Mutex;
10use crate::blocking_mutex::raw::RawMutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// The `Watch` is a single-slot signaling primitive that allows _multiple_ (`N`) receivers to concurrently await
14/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
15/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
16/// value when a new one is sent, without waiting for all receivers to read the previous value.
17///
18/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
19/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
20/// always provided with the latest value.
21///
22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
24/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
25/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
26/// ```
27///
28/// use futures_executor::block_on;
29/// use embassy_sync::watch::Watch;
30/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
31///
32/// let f = async {
33///
34/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
35///
36/// // Obtain receivers and sender
37/// let mut rcv0 = WATCH.receiver().unwrap();
38/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
39/// let mut snd = WATCH.sender();
40///
41/// // No more receivers, and no update
42/// assert!(WATCH.receiver().is_none());
43/// assert_eq!(rcv1.try_changed(), None);
44///
45/// snd.send(10);
46///
47/// // Receive the new value (async or try)
48/// assert_eq!(rcv0.changed().await, 10);
49/// assert_eq!(rcv1.try_changed(), Some(10));
50///
51/// // No update
52/// assert_eq!(rcv0.try_changed(), None);
53/// assert_eq!(rcv1.try_changed(), None);
54///
55/// snd.send(20);
56///
57/// // Using `get` marks the value as seen
58/// assert_eq!(rcv1.get().await, 20);
59/// assert_eq!(rcv1.try_changed(), None);
60///
61/// // But `get` also returns when unchanged
62/// assert_eq!(rcv1.get().await, 20);
63/// assert_eq!(rcv1.get().await, 20);
64///
65/// };
66/// block_on(f);
67/// ```
68#[derive(Debug)]
69pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
70 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
71}
72
73#[derive(Debug)]
74struct WatchState<T: Clone, const N: usize> {
75 data: Option<T>,
76 current_id: u64,
77 wakers: MultiWakerRegistration<N>,
78 receiver_count: usize,
79}
80
81trait SealedWatchBehavior<T> {
82 /// Poll the `Watch` for the current value, making it as seen.
83 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
84
85 /// Poll the `Watch` for the value if it matches the predicate function
86 /// `f`, making it as seen.
87 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
88
89 /// Poll the `Watch` for a changed value, marking it as seen, if an id is given.
90 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
91
92 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
93 fn try_changed(&self, id: &mut u64) -> Option<T>;
94
95 /// Poll the `Watch` for a changed value that matches the predicate function
96 /// `f`, marking it as seen.
97 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
98
99 /// Tries to retrieve the value of the `Watch` if it has changed and matches the
100 /// predicate function `f`, marking it as seen.
101 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
102
103 /// Used when a receiver is dropped to decrement the receiver count.
104 ///
105 /// ## This method should not be called by the user.
106 fn drop_receiver(&self);
107
108 /// Clears the value of the `Watch`.
109 fn clear(&self);
110
111 /// Sends a new value to the `Watch`.
112 fn send(&self, val: T);
113
114 /// Modify the value of the `Watch` using a closure. Returns `false` if the
115 /// `Watch` does not already contain a value.
116 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
117
118 /// Modify the value of the `Watch` using a closure. Returns `false` if the
119 /// `Watch` does not already contain a value.
120 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
121}
122
123/// A trait representing the 'inner' behavior of the `Watch`.
124#[allow(private_bounds)]
125pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
126 /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
127 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
128
129 /// Tries to get the value of the `Watch` if it matches the predicate function
130 /// `f`, marking it as seen.
131 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
132
133 /// Checks if the `Watch` is been initialized with a value.
134 fn contains_value(&self) -> bool;
135}
136
137impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
138 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
139 self.mutex.lock(|state| {
140 let mut s = state.borrow_mut();
141 match &s.data {
142 Some(data) => {
143 *id = s.current_id;
144 Poll::Ready(data.clone())
145 }
146 None => {
147 s.wakers.register(cx.waker());
148 Poll::Pending
149 }
150 }
151 })
152 }
153
154 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
155 self.mutex.lock(|state| {
156 let mut s = state.borrow_mut();
157 match s.data {
158 Some(ref data) if f(data) => {
159 *id = s.current_id;
160 Poll::Ready(data.clone())
161 }
162 _ => {
163 s.wakers.register(cx.waker());
164 Poll::Pending
165 }
166 }
167 })
168 }
169
170 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
171 self.mutex.lock(|state| {
172 let mut s = state.borrow_mut();
173 match (&s.data, s.current_id > *id) {
174 (Some(data), true) => {
175 *id = s.current_id;
176 Poll::Ready(data.clone())
177 }
178 _ => {
179 s.wakers.register(cx.waker());
180 Poll::Pending
181 }
182 }
183 })
184 }
185
186 fn try_changed(&self, id: &mut u64) -> Option<T> {
187 self.mutex.lock(|state| {
188 let s = state.borrow();
189 match s.current_id > *id {
190 true => {
191 *id = s.current_id;
192 s.data.clone()
193 }
194 false => None,
195 }
196 })
197 }
198
199 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
200 self.mutex.lock(|state| {
201 let mut s = state.borrow_mut();
202 match (&s.data, s.current_id > *id) {
203 (Some(data), true) if f(data) => {
204 *id = s.current_id;
205 Poll::Ready(data.clone())
206 }
207 _ => {
208 s.wakers.register(cx.waker());
209 Poll::Pending
210 }
211 }
212 })
213 }
214
215 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
216 self.mutex.lock(|state| {
217 let s = state.borrow();
218 match (&s.data, s.current_id > *id) {
219 (Some(data), true) if f(data) => {
220 *id = s.current_id;
221 s.data.clone()
222 }
223 _ => None,
224 }
225 })
226 }
227
228 fn drop_receiver(&self) {
229 self.mutex.lock(|state| {
230 let mut s = state.borrow_mut();
231 s.receiver_count -= 1;
232 })
233 }
234
235 fn clear(&self) {
236 self.mutex.lock(|state| {
237 let mut s = state.borrow_mut();
238 s.data = None;
239 })
240 }
241
242 fn send(&self, val: T) {
243 self.mutex.lock(|state| {
244 let mut s = state.borrow_mut();
245 s.data = Some(val);
246 s.current_id += 1;
247 s.wakers.wake();
248 })
249 }
250
251 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
252 self.mutex.lock(|state| {
253 let mut s = state.borrow_mut();
254 f(&mut s.data);
255 s.current_id += 1;
256 s.wakers.wake();
257 })
258 }
259
260 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
261 self.mutex.lock(|state| {
262 let mut s = state.borrow_mut();
263 if f(&mut s.data) {
264 s.current_id += 1;
265 s.wakers.wake();
266 }
267 })
268 }
269}
270
271impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
272 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
273 self.mutex.lock(|state| {
274 let s = state.borrow();
275 if let Some(id) = id {
276 *id = s.current_id;
277 }
278 s.data.clone()
279 })
280 }
281
282 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
283 self.mutex.lock(|state| {
284 let s = state.borrow();
285 match s.data {
286 Some(ref data) if f(data) => {
287 if let Some(id) = id {
288 *id = s.current_id;
289 }
290 Some(data.clone())
291 }
292 _ => None,
293 }
294 })
295 }
296
297 fn contains_value(&self) -> bool {
298 self.mutex.lock(|state| state.borrow().data.is_some())
299 }
300}
301
302impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
303 /// Create a new `Watch` channel for `N` receivers.
304 pub const fn new() -> Self {
305 Self {
306 mutex: Mutex::new(RefCell::new(WatchState {
307 data: None,
308 current_id: 0,
309 wakers: MultiWakerRegistration::new(),
310 receiver_count: 0,
311 })),
312 }
313 }
314
315 /// Create a new `Watch` channel with default data.
316 pub const fn new_with(data: T) -> Self {
317 Self {
318 mutex: Mutex::new(RefCell::new(WatchState {
319 data: Some(data),
320 current_id: 0,
321 wakers: MultiWakerRegistration::new(),
322 receiver_count: 0,
323 })),
324 }
325 }
326
327 /// Create a new [`Sender`] for the `Watch`.
328 pub fn sender(&self) -> Sender<'_, M, T, N> {
329 Sender(Snd::new(self))
330 }
331
332 /// Create a new [`DynSender`] for the `Watch`.
333 pub fn dyn_sender(&self) -> DynSender<'_, T> {
334 DynSender(Snd::new(self))
335 }
336
337 /// Try to create a new [`Receiver`] for the `Watch`. If the
338 /// maximum number of receivers has been reached, `None` is returned.
339 pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
340 self.mutex.lock(|state| {
341 let mut s = state.borrow_mut();
342 if s.receiver_count < N {
343 s.receiver_count += 1;
344 Some(Receiver(Rcv::new(self, 0)))
345 } else {
346 None
347 }
348 })
349 }
350
351 /// Try to create a new [`DynReceiver`] for the `Watch`. If the
352 /// maximum number of receivers has been reached, `None` is returned.
353 pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
354 self.mutex.lock(|state| {
355 let mut s = state.borrow_mut();
356 if s.receiver_count < N {
357 s.receiver_count += 1;
358 Some(DynReceiver(Rcv::new(self, 0)))
359 } else {
360 None
361 }
362 })
363 }
364
365 /// Try to create a new [`AnonReceiver`] for the `Watch`.
366 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
367 AnonReceiver(AnonRcv::new(self, 0))
368 }
369
370 /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
371 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
372 DynAnonReceiver(AnonRcv::new(self, 0))
373 }
374
375 /// Returns the message ID of the latest message sent to the `Watch`.
376 ///
377 /// This counter is monotonic, and is incremented every time a new message is sent.
378 pub fn get_msg_id(&self) -> u64 {
379 self.mutex.lock(|state| state.borrow().current_id)
380 }
381
382 /// Tries to get the value of the `Watch`.
383 pub fn try_get(&self) -> Option<T> {
384 WatchBehavior::try_get(self, None)
385 }
386
387 /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
388 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
389 where
390 F: Fn(&T) -> bool,
391 {
392 WatchBehavior::try_get_and(self, None, &mut f)
393 }
394}
395
396/// A receiver can `.await` a change in the `Watch` value.
397#[derive(Debug)]
398pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
399 watch: &'a W,
400 _phantom: PhantomData<T>,
401}
402
403impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
404 fn clone(&self) -> Self {
405 Self {
406 watch: self.watch,
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
413 /// Creates a new `Receiver` with a reference to the `Watch`.
414 fn new(watch: &'a W) -> Self {
415 Self {
416 watch,
417 _phantom: PhantomData,
418 }
419 }
420
421 /// Sends a new value to the `Watch`.
422 pub fn send(&self, val: T) {
423 self.watch.send(val)
424 }
425
426 /// Clears the value of the `Watch`.
427 /// This will cause calls to [`Rcv::get`] to be pending.
428 pub fn clear(&self) {
429 self.watch.clear()
430 }
431
432 /// Tries to retrieve the value of the `Watch`.
433 pub fn try_get(&self) -> Option<T> {
434 self.watch.try_get(None)
435 }
436
437 /// Tries to peek the current value of the `Watch` if it matches the predicate
438 /// function `f`.
439 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
440 where
441 F: Fn(&T) -> bool,
442 {
443 self.watch.try_get_and(None, &mut f)
444 }
445
446 /// Returns true if the `Watch` contains a value.
447 pub fn contains_value(&self) -> bool {
448 self.watch.contains_value()
449 }
450
451 /// Modify the value of the `Watch` using a closure.
452 pub fn send_modify<F>(&self, mut f: F)
453 where
454 F: Fn(&mut Option<T>),
455 {
456 self.watch.send_modify(&mut f)
457 }
458
459 /// Modify the value of the `Watch` using a closure. The closure must return
460 /// `true` if the value was modified, which notifies all receivers.
461 pub fn send_if_modified<F>(&self, mut f: F)
462 where
463 F: Fn(&mut Option<T>) -> bool,
464 {
465 self.watch.send_if_modified(&mut f)
466 }
467}
468
469/// A sender of a `Watch` channel.
470///
471/// For a simpler type definition, consider [`DynSender`] at the expense of
472/// some runtime performance due to dynamic dispatch.
473#[derive(Debug)]
474pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
475
476impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
477 fn clone(&self) -> Self {
478 Self(self.0.clone())
479 }
480}
481
482impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
483 /// Converts the `Sender` into a [`DynSender`].
484 pub fn as_dyn(self) -> DynSender<'a, T> {
485 DynSender(Snd::new(self.watch))
486 }
487}
488
489impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
490 fn into(self) -> DynSender<'a, T> {
491 self.as_dyn()
492 }
493}
494
495impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
496 type Target = Snd<'a, T, Watch<M, T, N>>;
497
498 fn deref(&self) -> &Self::Target {
499 &self.0
500 }
501}
502
503impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
504 fn deref_mut(&mut self) -> &mut Self::Target {
505 &mut self.0
506 }
507}
508
509/// A sender which holds a **dynamic** reference to a `Watch` channel.
510///
511/// This is an alternative to [`Sender`] with a simpler type definition,
512pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
513
514impl<'a, T: Clone> Clone for DynSender<'a, T> {
515 fn clone(&self) -> Self {
516 Self(self.0.clone())
517 }
518}
519
520impl<'a, T: Clone> Deref for DynSender<'a, T> {
521 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
522
523 fn deref(&self) -> &Self::Target {
524 &self.0
525 }
526}
527
528impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
529 fn deref_mut(&mut self) -> &mut Self::Target {
530 &mut self.0
531 }
532}
533
534/// A receiver can `.await` a change in the `Watch` value.
535pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
536 watch: &'a W,
537 at_id: u64,
538 _phantom: PhantomData<T>,
539}
540
541impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
542 /// Creates a new `Receiver` with a reference to the `Watch`.
543 fn new(watch: &'a W, at_id: u64) -> Self {
544 Self {
545 watch,
546 at_id,
547 _phantom: PhantomData,
548 }
549 }
550
551 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
552 ///
553 /// **Note**: Futures do nothing unless you `.await` or poll them.
554 pub fn get(&mut self) -> impl Future<Output = T> + '_ {
555 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
556 }
557
558 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
559 pub fn try_get(&mut self) -> Option<T> {
560 self.watch.try_get(Some(&mut self.at_id))
561 }
562
563 /// Returns the value of the `Watch` if it matches the predicate function `f`,
564 /// or waits for it to match, marking it as seen.
565 ///
566 /// **Note**: Futures do nothing unless you `.await` or poll them.
567 pub async fn get_and<F>(&mut self, mut f: F) -> T
568 where
569 F: Fn(&T) -> bool,
570 {
571 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
572 }
573
574 /// Tries to get the current value of the `Watch` if it matches the predicate
575 /// function `f` without waiting, marking it as seen.
576 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
577 where
578 F: Fn(&T) -> bool,
579 {
580 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
581 }
582
583 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
584 ///
585 /// **Note**: Futures do nothing unless you `.await` or poll them.
586 pub async fn changed(&mut self) -> T {
587 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
588 }
589
590 /// Tries to get the new value of the watch without waiting, marking it as seen.
591 pub fn try_changed(&mut self) -> Option<T> {
592 self.watch.try_changed(&mut self.at_id)
593 }
594
595 /// Waits for the `Watch` to change to a value which satisfies the predicate
596 /// function `f` and returns the new value, marking it as seen.
597 ///
598 /// **Note**: Futures do nothing unless you `.await` or poll them.
599 pub async fn changed_and<F>(&mut self, mut f: F) -> T
600 where
601 F: Fn(&T) -> bool,
602 {
603 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
604 }
605
606 /// Tries to get the new value of the watch which satisfies the predicate
607 /// function `f` and returns the new value without waiting, marking it as seen.
608 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
609 where
610 F: Fn(&T) -> bool,
611 {
612 self.watch.try_changed_and(&mut self.at_id, &mut f)
613 }
614
615 /// Checks if the `Watch` contains a value. If this returns true,
616 /// then awaiting [`Rcv::get`] will return immediately.
617 pub fn contains_value(&self) -> bool {
618 self.watch.contains_value()
619 }
620}
621
622impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
623 fn drop(&mut self) {
624 self.watch.drop_receiver();
625 }
626}
627
628/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
629#[derive(Debug)]
630pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
631 watch: &'a W,
632 at_id: u64,
633 _phantom: PhantomData<T>,
634}
635
636impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
637 /// Creates a new `Receiver` with a reference to the `Watch`.
638 fn new(watch: &'a W, at_id: u64) -> Self {
639 Self {
640 watch,
641 at_id,
642 _phantom: PhantomData,
643 }
644 }
645
646 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
647 pub fn try_get(&mut self) -> Option<T> {
648 self.watch.try_get(Some(&mut self.at_id))
649 }
650
651 /// Tries to get the current value of the `Watch` if it matches the predicate
652 /// function `f` without waiting, marking it as seen.
653 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
654 where
655 F: Fn(&T) -> bool,
656 {
657 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
658 }
659
660 /// Tries to get the new value of the watch without waiting, marking it as seen.
661 pub fn try_changed(&mut self) -> Option<T> {
662 self.watch.try_changed(&mut self.at_id)
663 }
664
665 /// Tries to get the new value of the watch which satisfies the predicate
666 /// function `f` and returns the new value without waiting, marking it as seen.
667 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
668 where
669 F: Fn(&T) -> bool,
670 {
671 self.watch.try_changed_and(&mut self.at_id, &mut f)
672 }
673
674 /// Checks if the `Watch` contains a value. If this returns true,
675 /// then awaiting [`Rcv::get`] will return immediately.
676 pub fn contains_value(&self) -> bool {
677 self.watch.contains_value()
678 }
679}
680
681/// A receiver of a `Watch` channel.
682pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
683
684impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
685 /// Converts the `Receiver` into a [`DynReceiver`].
686 pub fn as_dyn(self) -> DynReceiver<'a, T> {
687 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
688 core::mem::forget(self); // Ensures the destructor is not called
689 rcv
690 }
691}
692
693impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
694 fn into(self) -> DynReceiver<'a, T> {
695 self.as_dyn()
696 }
697}
698
699impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
700 type Target = Rcv<'a, T, Watch<M, T, N>>;
701
702 fn deref(&self) -> &Self::Target {
703 &self.0
704 }
705}
706
707impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
708 fn deref_mut(&mut self) -> &mut Self::Target {
709 &mut self.0
710 }
711}
712
713/// A receiver which holds a **dynamic** reference to a `Watch` channel.
714///
715/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
716/// some runtime performance due to dynamic dispatch.
717pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
718
719impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
720 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
721
722 fn deref(&self) -> &Self::Target {
723 &self.0
724 }
725}
726
727impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
728 fn deref_mut(&mut self) -> &mut Self::Target {
729 &mut self.0
730 }
731}
732
733/// A receiver of a `Watch` channel that cannot `.await` values.
734#[derive(Debug)]
735pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
736
737impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
738 /// Converts the `Receiver` into a [`DynReceiver`].
739 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
740 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
741 core::mem::forget(self); // Ensures the destructor is not called
742 rcv
743 }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
747 fn into(self) -> DynAnonReceiver<'a, T> {
748 self.as_dyn()
749 }
750}
751
752impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
753 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
754
755 fn deref(&self) -> &Self::Target {
756 &self.0
757 }
758}
759
760impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
761 fn deref_mut(&mut self) -> &mut Self::Target {
762 &mut self.0
763 }
764}
765
766/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
767///
768/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
769/// some runtime performance due to dynamic dispatch.
770pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
771
772impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
773 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
774
775 fn deref(&self) -> &Self::Target {
776 &self.0
777 }
778}
779
780impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
781 fn deref_mut(&mut self) -> &mut Self::Target {
782 &mut self.0
783 }
784}
785
786#[cfg(test)]
787mod tests {
788 use futures_executor::block_on;
789
790 use super::Watch;
791 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
792
793 #[test]
794 fn multiple_sends() {
795 let f = async {
796 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
797
798 // Obtain receiver and sender
799 let mut rcv = WATCH.receiver().unwrap();
800 let snd = WATCH.sender();
801
802 // Not initialized
803 assert_eq!(rcv.try_changed(), None);
804
805 // Receive the new value
806 snd.send(10);
807 assert_eq!(rcv.changed().await, 10);
808
809 // Receive another value
810 snd.send(20);
811 assert_eq!(rcv.try_changed(), Some(20));
812
813 // No update
814 assert_eq!(rcv.try_changed(), None);
815 };
816 block_on(f);
817 }
818
819 #[test]
820 fn all_try_get() {
821 let f = async {
822 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
823
824 // Obtain receiver and sender
825 let mut rcv = WATCH.receiver().unwrap();
826 let snd = WATCH.sender();
827
828 // Not initialized
829 assert_eq!(WATCH.try_get(), None);
830 assert_eq!(rcv.try_get(), None);
831 assert_eq!(snd.try_get(), None);
832
833 // Receive the new value
834 snd.send(10);
835 assert_eq!(WATCH.try_get(), Some(10));
836 assert_eq!(rcv.try_get(), Some(10));
837 assert_eq!(snd.try_get(), Some(10));
838
839 assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
840 assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
841 assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
842
843 assert_eq!(WATCH.try_get_and(|x| x < &5), None);
844 assert_eq!(rcv.try_get_and(|x| x < &5), None);
845 assert_eq!(snd.try_get_and(|x| x < &5), None);
846 };
847 block_on(f);
848 }
849
850 #[test]
851 fn once_lock_like() {
852 let f = async {
853 static CONFIG0: u8 = 10;
854 static CONFIG1: u8 = 20;
855
856 static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
857
858 // Obtain receiver and sender
859 let mut rcv = WATCH.receiver().unwrap();
860 let snd = WATCH.sender();
861
862 // Not initialized
863 assert_eq!(rcv.try_changed(), None);
864
865 // Receive the new value
866 snd.send(&CONFIG0);
867 let rcv0 = rcv.changed().await;
868 assert_eq!(rcv0, &10);
869
870 // Receive another value
871 snd.send(&CONFIG1);
872 let rcv1 = rcv.try_changed();
873 assert_eq!(rcv1, Some(&20));
874
875 // No update
876 assert_eq!(rcv.try_changed(), None);
877
878 // Ensure similarity with original static
879 assert_eq!(rcv0, &CONFIG0);
880 assert_eq!(rcv1, Some(&CONFIG1));
881 };
882 block_on(f);
883 }
884
885 #[test]
886 fn sender_modify() {
887 let f = async {
888 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
889
890 // Obtain receiver and sender
891 let mut rcv = WATCH.receiver().unwrap();
892 let snd = WATCH.sender();
893
894 // Receive the new value
895 snd.send(10);
896 assert_eq!(rcv.try_changed(), Some(10));
897
898 // Modify the value inplace
899 snd.send_modify(|opt| {
900 if let Some(inner) = opt {
901 *inner += 5;
902 }
903 });
904
905 // Get the modified value
906 assert_eq!(rcv.try_changed(), Some(15));
907 assert_eq!(rcv.try_changed(), None);
908 };
909 block_on(f);
910 }
911
912 #[test]
913 fn predicate_fn() {
914 let f = async {
915 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
916
917 // Obtain receiver and sender
918 let mut rcv = WATCH.receiver().unwrap();
919 let snd = WATCH.sender();
920
921 snd.send(15);
922 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
923 assert_eq!(rcv.try_get_and(|x| x < &5), None);
924 assert!(rcv.try_changed().is_none());
925
926 snd.send(20);
927 assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
928 assert_eq!(rcv.try_changed_and(|x| x > &5), None);
929
930 snd.send(25);
931 assert_eq!(rcv.try_changed_and(|x| x < &5), None);
932 assert_eq!(rcv.try_changed(), Some(25));
933
934 snd.send(30);
935 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
936 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
937 };
938 block_on(f);
939 }
940
941 #[test]
942 fn receive_after_create() {
943 let f = async {
944 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
945
946 // Obtain sender and send value
947 let snd = WATCH.sender();
948 snd.send(10);
949
950 // Obtain receiver and receive value
951 let mut rcv = WATCH.receiver().unwrap();
952 assert_eq!(rcv.try_changed(), Some(10));
953 };
954 block_on(f);
955 }
956
957 #[test]
958 fn max_receivers_drop() {
959 let f = async {
960 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
961
962 // Try to create 3 receivers (only 2 can exist at once)
963 let rcv0 = WATCH.receiver();
964 let rcv1 = WATCH.receiver();
965 let rcv2 = WATCH.receiver();
966
967 // Ensure the first two are successful and the third is not
968 assert!(rcv0.is_some());
969 assert!(rcv1.is_some());
970 assert!(rcv2.is_none());
971
972 // Drop the first receiver
973 drop(rcv0);
974
975 // Create another receiver and ensure it is successful
976 let rcv3 = WATCH.receiver();
977 assert!(rcv3.is_some());
978 };
979 block_on(f);
980 }
981
982 #[test]
983 fn multiple_receivers() {
984 let f = async {
985 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
986
987 // Obtain receivers and sender
988 let mut rcv0 = WATCH.receiver().unwrap();
989 let mut rcv1 = WATCH.anon_receiver();
990 let snd = WATCH.sender();
991
992 // No update for both
993 assert_eq!(rcv0.try_changed(), None);
994 assert_eq!(rcv1.try_changed(), None);
995
996 // Send a new value
997 snd.send(0);
998
999 // Both receivers receive the new value
1000 assert_eq!(rcv0.try_changed(), Some(0));
1001 assert_eq!(rcv1.try_changed(), Some(0));
1002 };
1003 block_on(f);
1004 }
1005
1006 #[test]
1007 fn clone_senders() {
1008 let f = async {
1009 // Obtain different ways to send
1010 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1011 let snd0 = WATCH.sender();
1012 let snd1 = snd0.clone();
1013
1014 // Obtain Receiver
1015 let mut rcv = WATCH.receiver().unwrap().as_dyn();
1016
1017 // Send a value from first sender
1018 snd0.send(10);
1019 assert_eq!(rcv.try_changed(), Some(10));
1020
1021 // Send a value from second sender
1022 snd1.send(20);
1023 assert_eq!(rcv.try_changed(), Some(20));
1024 };
1025 block_on(f);
1026 }
1027
1028 #[test]
1029 fn use_dynamics() {
1030 let f = async {
1031 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1032
1033 // Obtain receiver and sender
1034 let mut anon_rcv = WATCH.dyn_anon_receiver();
1035 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1036 let dyn_snd = WATCH.dyn_sender();
1037
1038 // Send a value
1039 dyn_snd.send(10);
1040
1041 // Ensure the dynamic receiver receives the value
1042 assert_eq!(anon_rcv.try_changed(), Some(10));
1043 assert_eq!(dyn_rcv.try_changed(), Some(10));
1044 assert_eq!(dyn_rcv.try_changed(), None);
1045 };
1046 block_on(f);
1047 }
1048
1049 #[test]
1050 fn convert_to_dyn() {
1051 let f = async {
1052 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1053
1054 // Obtain receiver and sender
1055 let anon_rcv = WATCH.anon_receiver();
1056 let rcv = WATCH.receiver().unwrap();
1057 let snd = WATCH.sender();
1058
1059 // Convert to dynamic
1060 let mut dyn_anon_rcv = anon_rcv.as_dyn();
1061 let mut dyn_rcv = rcv.as_dyn();
1062 let dyn_snd = snd.as_dyn();
1063
1064 // Send a value
1065 dyn_snd.send(10);
1066
1067 // Ensure the dynamic receiver receives the value
1068 assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1069 assert_eq!(dyn_rcv.try_changed(), Some(10));
1070 assert_eq!(dyn_rcv.try_changed(), None);
1071 };
1072 block_on(f);
1073 }
1074
1075 #[test]
1076 fn dynamic_receiver_count() {
1077 let f = async {
1078 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1079
1080 // Obtain receiver and sender
1081 let rcv0 = WATCH.receiver();
1082 let rcv1 = WATCH.receiver();
1083 let rcv2 = WATCH.receiver();
1084
1085 // Ensure the first two are successful and the third is not
1086 assert!(rcv0.is_some());
1087 assert!(rcv1.is_some());
1088 assert!(rcv2.is_none());
1089
1090 // Convert to dynamic
1091 let dyn_rcv0 = rcv0.unwrap().as_dyn();
1092
1093 // Drop the (now dynamic) receiver
1094 drop(dyn_rcv0);
1095
1096 // Create another receiver and ensure it is successful
1097 let rcv3 = WATCH.receiver();
1098 let rcv4 = WATCH.receiver();
1099 assert!(rcv3.is_some());
1100 assert!(rcv4.is_none());
1101 };
1102 block_on(f);
1103 }
1104
1105 #[test]
1106 fn contains_value() {
1107 let f = async {
1108 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1109
1110 // Obtain receiver and sender
1111 let rcv = WATCH.receiver().unwrap();
1112 let snd = WATCH.sender();
1113
1114 // check if the watch contains a value
1115 assert_eq!(rcv.contains_value(), false);
1116 assert_eq!(snd.contains_value(), false);
1117
1118 // Send a value
1119 snd.send(10);
1120
1121 // check if the watch contains a value
1122 assert_eq!(rcv.contains_value(), true);
1123 assert_eq!(snd.contains_value(), true);
1124 };
1125 block_on(f);
1126 }
1127}
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index cfce9a571..c572592b8 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -15,12 +15,12 @@
15//! another message will result in an error being returned. 15//! another message will result in an error being returned.
16 16
17use core::cell::RefCell; 17use core::cell::RefCell;
18use core::future::poll_fn; 18use core::future::{Future, poll_fn};
19use core::marker::PhantomData; 19use core::marker::PhantomData;
20use core::task::{Context, Poll}; 20use core::task::{Context, Poll};
21 21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex; 22use crate::blocking_mutex::Mutex;
23use crate::blocking_mutex::raw::RawMutex;
24use crate::waitqueue::WakerRegistration; 24use crate::waitqueue::WakerRegistration;
25 25
26/// A bounded zero-copy channel for communicating between asynchronous tasks 26/// A bounded zero-copy channel for communicating between asynchronous tasks
@@ -34,8 +34,9 @@ use crate::waitqueue::WakerRegistration;
34/// 34///
35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through 35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
36/// an `&mut T`. 36/// an `&mut T`.
37#[derive(Debug)]
37pub struct Channel<'a, M: RawMutex, T> { 38pub struct Channel<'a, M: RawMutex, T> {
38 buf: *mut T, 39 buf: BufferPtr<T>,
39 phantom: PhantomData<&'a mut T>, 40 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>, 41 state: Mutex<M, RefCell<State>>,
41} 42}
@@ -50,10 +51,10 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
50 assert!(len != 0); 51 assert!(len != 0);
51 52
52 Self { 53 Self {
53 buf: buf.as_mut_ptr(), 54 buf: BufferPtr(buf.as_mut_ptr()),
54 phantom: PhantomData, 55 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State { 56 state: Mutex::new(RefCell::new(State {
56 len, 57 capacity: len,
57 front: 0, 58 front: 0,
58 back: 0, 59 back: 0,
59 full: false, 60 full: false,
@@ -70,9 +71,45 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { 71 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self }) 72 (Sender { channel: self }, Receiver { channel: self })
72 } 73 }
74
75 /// Clears all elements in the channel.
76 pub fn clear(&mut self) {
77 self.state.lock(|s| {
78 s.borrow_mut().clear();
79 });
80 }
81
82 /// Returns the number of elements currently in the channel.
83 pub fn len(&self) -> usize {
84 self.state.lock(|s| s.borrow().len())
85 }
86
87 /// Returns whether the channel is empty.
88 pub fn is_empty(&self) -> bool {
89 self.state.lock(|s| s.borrow().is_empty())
90 }
91
92 /// Returns whether the channel is full.
93 pub fn is_full(&self) -> bool {
94 self.state.lock(|s| s.borrow().is_full())
95 }
96}
97
98#[repr(transparent)]
99#[derive(Debug)]
100struct BufferPtr<T>(*mut T);
101
102impl<T> BufferPtr<T> {
103 unsafe fn add(&self, count: usize) -> *mut T {
104 self.0.add(count)
105 }
73} 106}
74 107
108unsafe impl<T> Send for BufferPtr<T> {}
109unsafe impl<T> Sync for BufferPtr<T> {}
110
75/// Send-only access to a [`Channel`]. 111/// Send-only access to a [`Channel`].
112#[derive(Debug)]
76pub struct Sender<'a, M: RawMutex, T> { 113pub struct Sender<'a, M: RawMutex, T> {
77 channel: &'a Channel<'a, M, T>, 114 channel: &'a Channel<'a, M, T>,
78} 115}
@@ -109,12 +146,15 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
109 } 146 }
110 147
111 /// Asynchronously send a value over the channel. 148 /// Asynchronously send a value over the channel.
112 pub async fn send(&mut self) -> &mut T { 149 pub fn send(&mut self) -> impl Future<Output = &mut T> {
113 let i = poll_fn(|cx| { 150 poll_fn(|cx| {
114 self.channel.state.lock(|s| { 151 self.channel.state.lock(|s| {
115 let s = &mut *s.borrow_mut(); 152 let s = &mut *s.borrow_mut();
116 match s.push_index() { 153 match s.push_index() {
117 Some(i) => Poll::Ready(i), 154 Some(i) => {
155 let r = unsafe { &mut *self.channel.buf.add(i) };
156 Poll::Ready(r)
157 }
118 None => { 158 None => {
119 s.receive_waker.register(cx.waker()); 159 s.receive_waker.register(cx.waker());
120 Poll::Pending 160 Poll::Pending
@@ -122,23 +162,44 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
122 } 162 }
123 }) 163 })
124 }) 164 })
125 .await;
126 unsafe { &mut *self.channel.buf.add(i) }
127 } 165 }
128 166
129 /// Notify the channel that the sending of the value has been finalized. 167 /// Notify the channel that the sending of the value has been finalized.
130 pub fn send_done(&mut self) { 168 pub fn send_done(&mut self) {
131 self.channel.state.lock(|s| s.borrow_mut().push_done()) 169 self.channel.state.lock(|s| s.borrow_mut().push_done())
132 } 170 }
171
172 /// Clears all elements in the channel.
173 pub fn clear(&mut self) {
174 self.channel.state.lock(|s| {
175 s.borrow_mut().clear();
176 });
177 }
178
179 /// Returns the number of elements currently in the channel.
180 pub fn len(&self) -> usize {
181 self.channel.state.lock(|s| s.borrow().len())
182 }
183
184 /// Returns whether the channel is empty.
185 pub fn is_empty(&self) -> bool {
186 self.channel.state.lock(|s| s.borrow().is_empty())
187 }
188
189 /// Returns whether the channel is full.
190 pub fn is_full(&self) -> bool {
191 self.channel.state.lock(|s| s.borrow().is_full())
192 }
133} 193}
134 194
135/// Receive-only access to a [`Channel`]. 195/// Receive-only access to a [`Channel`].
196#[derive(Debug)]
136pub struct Receiver<'a, M: RawMutex, T> { 197pub struct Receiver<'a, M: RawMutex, T> {
137 channel: &'a Channel<'a, M, T>, 198 channel: &'a Channel<'a, M, T>,
138} 199}
139 200
140impl<'a, M: RawMutex, T> Receiver<'a, M, T> { 201impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
141 /// Creates one further [`Sender`] over the same channel. 202 /// Creates one further [`Receiver`] over the same channel.
142 pub fn borrow(&mut self) -> Receiver<'_, M, T> { 203 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
143 Receiver { channel: self.channel } 204 Receiver { channel: self.channel }
144 } 205 }
@@ -169,12 +230,15 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
169 } 230 }
170 231
171 /// Asynchronously receive a value over the channel. 232 /// Asynchronously receive a value over the channel.
172 pub async fn receive(&mut self) -> &mut T { 233 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
173 let i = poll_fn(|cx| { 234 poll_fn(|cx| {
174 self.channel.state.lock(|s| { 235 self.channel.state.lock(|s| {
175 let s = &mut *s.borrow_mut(); 236 let s = &mut *s.borrow_mut();
176 match s.pop_index() { 237 match s.pop_index() {
177 Some(i) => Poll::Ready(i), 238 Some(i) => {
239 let r = unsafe { &mut *self.channel.buf.add(i) };
240 Poll::Ready(r)
241 }
178 None => { 242 None => {
179 s.send_waker.register(cx.waker()); 243 s.send_waker.register(cx.waker());
180 Poll::Pending 244 Poll::Pending
@@ -182,18 +246,40 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
182 } 246 }
183 }) 247 })
184 }) 248 })
185 .await;
186 unsafe { &mut *self.channel.buf.add(i) }
187 } 249 }
188 250
189 /// Notify the channel that the receiving of the value has been finalized. 251 /// Notify the channel that the receiving of the value has been finalized.
190 pub fn receive_done(&mut self) { 252 pub fn receive_done(&mut self) {
191 self.channel.state.lock(|s| s.borrow_mut().pop_done()) 253 self.channel.state.lock(|s| s.borrow_mut().pop_done())
192 } 254 }
255
256 /// Clears all elements in the channel.
257 pub fn clear(&mut self) {
258 self.channel.state.lock(|s| {
259 s.borrow_mut().clear();
260 });
261 }
262
263 /// Returns the number of elements currently in the channel.
264 pub fn len(&self) -> usize {
265 self.channel.state.lock(|s| s.borrow().len())
266 }
267
268 /// Returns whether the channel is empty.
269 pub fn is_empty(&self) -> bool {
270 self.channel.state.lock(|s| s.borrow().is_empty())
271 }
272
273 /// Returns whether the channel is full.
274 pub fn is_full(&self) -> bool {
275 self.channel.state.lock(|s| s.borrow().is_full())
276 }
193} 277}
194 278
279#[derive(Debug)]
195struct State { 280struct State {
196 len: usize, 281 /// Maximum number of elements the channel can hold.
282 capacity: usize,
197 283
198 /// Front index. Always 0..=(N-1) 284 /// Front index. Always 0..=(N-1)
199 front: usize, 285 front: usize,
@@ -210,10 +296,27 @@ struct State {
210 296
211impl State { 297impl State {
212 fn increment(&self, i: usize) -> usize { 298 fn increment(&self, i: usize) -> usize {
213 if i + 1 == self.len { 299 if i + 1 == self.capacity { 0 } else { i + 1 }
214 0 300 }
301
302 fn clear(&mut self) {
303 if self.full {
304 self.receive_waker.wake();
305 }
306 self.front = 0;
307 self.back = 0;
308 self.full = false;
309 }
310
311 fn len(&self) -> usize {
312 if !self.full {
313 if self.back >= self.front {
314 self.back - self.front
315 } else {
316 self.capacity + self.back - self.front
317 }
215 } else { 318 } else {
216 i + 1 319 self.capacity
217 } 320 }
218 } 321 }
219 322