aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
author1-rafael-1 <[email protected]>2025-09-15 20:07:18 +0200
committer1-rafael-1 <[email protected]>2025-09-15 20:07:18 +0200
commit6bb3d2c0720fa082f27d3cdb70f516058497ec87 (patch)
tree5a1e255cff999b00800f203b91a759c720c973e5 /embassy-sync/src
parenteb685574601d98c44faed9a3534d056199b46e20 (diff)
parent92a6fd2946f2cbb15359290f68aa360953da2ff7 (diff)
Merge branch 'main' into rp2040-rtc-alarm
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/blocking_mutex/mod.rs1
-rw-r--r--embassy-sync/src/blocking_mutex/raw.rs2
-rw-r--r--embassy-sync/src/channel.rs225
-rw-r--r--embassy-sync/src/lazy_lock.rs24
-rw-r--r--embassy-sync/src/mutex.rs7
-rw-r--r--embassy-sync/src/once_lock.rs12
-rw-r--r--embassy-sync/src/pipe.rs16
-rw-r--r--embassy-sync/src/priority_channel.rs68
-rw-r--r--embassy-sync/src/pubsub/mod.rs10
-rw-r--r--embassy-sync/src/pubsub/publisher.rs8
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs5
-rw-r--r--embassy-sync/src/ring_buffer.rs1
-rw-r--r--embassy-sync/src/rwlock.rs1
-rw-r--r--embassy-sync/src/semaphore.rs6
-rw-r--r--embassy-sync/src/signal.rs3
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs1
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs5
-rw-r--r--embassy-sync/src/watch.rs6
-rw-r--r--embassy-sync/src/zerocopy_channel.rs5
19 files changed, 385 insertions, 21 deletions
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs
index a41bc3569..11809c763 100644
--- a/embassy-sync/src/blocking_mutex/mod.rs
+++ b/embassy-sync/src/blocking_mutex/mod.rs
@@ -22,6 +22,7 @@ use self::raw::RawMutex;
22/// 22///
23/// In all cases, the blocking mutex is intended to be short lived and not held across await points. 23/// In all cases, the blocking mutex is intended to be short lived and not held across await points.
24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. 24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points.
25#[derive(Debug)]
25pub struct Mutex<R, T: ?Sized> { 26pub struct Mutex<R, T: ?Sized> {
26 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets 27 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets
27 // to run BEFORE dropping `data`. 28 // to run BEFORE dropping `data`.
diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs
index a8afcad34..50f965e00 100644
--- a/embassy-sync/src/blocking_mutex/raw.rs
+++ b/embassy-sync/src/blocking_mutex/raw.rs
@@ -37,6 +37,7 @@ pub unsafe trait RawMutex {
37/// # Safety 37/// # Safety
38/// 38///
39/// This mutex is safe to share between different executors and interrupts. 39/// This mutex is safe to share between different executors and interrupts.
40#[derive(Debug)]
40pub struct CriticalSectionRawMutex { 41pub struct CriticalSectionRawMutex {
41 _phantom: PhantomData<()>, 42 _phantom: PhantomData<()>,
42} 43}
@@ -65,6 +66,7 @@ unsafe impl RawMutex for CriticalSectionRawMutex {
65/// # Safety 66/// # Safety
66/// 67///
67/// **This Mutex is only safe within a single executor.** 68/// **This Mutex is only safe within a single executor.**
69#[derive(Debug)]
68pub struct NoopRawMutex { 70pub struct NoopRawMutex {
69 _phantom: PhantomData<*mut ()>, 71 _phantom: PhantomData<*mut ()>,
70} 72}
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 4d1fa9e39..de437cc52 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -17,6 +17,31 @@
17//! messages that it can store, and if this limit is reached, trying to send 17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned. 18//! another message will result in an error being returned.
19//! 19//!
20//! # Example: Message passing between task and interrupt handler
21//!
22//! ```rust
23//! use embassy_sync::channel::Channel;
24//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
25//!
26//! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new();
27//!
28//! fn my_interrupt_handler() {
29//! // Do some work..
30//! // ...
31//! if let Err(e) = SHARED_CHANNEL.sender().try_send(42) {
32//! // Channel is full..
33//! }
34//! }
35//!
36//! async fn my_async_task() {
37//! // ...
38//! let receiver = SHARED_CHANNEL.receiver();
39//! loop {
40//! let data_from_interrupt = receiver.receive().await;
41//! // Do something with the data.
42//! }
43//! }
44//! ```
20 45
21use core::cell::RefCell; 46use core::cell::RefCell;
22use core::future::Future; 47use core::future::Future;
@@ -30,6 +55,7 @@ use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration; 55use crate::waitqueue::WakerRegistration;
31 56
32/// Send-only access to a [`Channel`]. 57/// Send-only access to a [`Channel`].
58#[derive(Debug)]
33pub struct Sender<'ch, M, T, const N: usize> 59pub struct Sender<'ch, M, T, const N: usize>
34where 60where
35 M: RawMutex, 61 M: RawMutex,
@@ -164,7 +190,59 @@ impl<'ch, T> DynamicSender<'ch, T> {
164 } 190 }
165} 191}
166 192
193/// Send-only access to a [`Channel`] without knowing channel size.
194/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
195pub struct SendDynamicSender<'ch, T> {
196 pub(crate) channel: &'ch dyn DynamicChannel<T>,
197}
198
199impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
200 fn clone(&self) -> Self {
201 *self
202 }
203}
204
205impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
206unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
207unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
208
209impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
210where
211 M: RawMutex + Sync + Send,
212{
213 fn from(s: Sender<'ch, M, T, N>) -> Self {
214 Self { channel: s.channel }
215 }
216}
217
218impl<'ch, T> SendDynamicSender<'ch, T> {
219 /// Sends a value.
220 ///
221 /// See [`Channel::send()`]
222 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
223 DynamicSendFuture {
224 channel: self.channel,
225 message: Some(message),
226 }
227 }
228
229 /// Attempt to immediately send a message.
230 ///
231 /// See [`Channel::send()`]
232 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
233 self.channel.try_send_with_context(message, None)
234 }
235
236 /// Allows a poll_fn to poll until the channel is ready to send
237 ///
238 /// See [`Channel::poll_ready_to_send()`]
239 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
240 self.channel.poll_ready_to_send(cx)
241 }
242}
243
167/// Receive-only access to a [`Channel`]. 244/// Receive-only access to a [`Channel`].
245#[derive(Debug)]
168pub struct Receiver<'ch, M, T, const N: usize> 246pub struct Receiver<'ch, M, T, const N: usize>
169where 247where
170 M: RawMutex, 248 M: RawMutex,
@@ -208,6 +286,16 @@ where
208 self.channel.try_receive() 286 self.channel.try_receive()
209 } 287 }
210 288
289 /// Peek at the next value without removing it from the queue.
290 ///
291 /// See [`Channel::try_peek()`]
292 pub fn try_peek(&self) -> Result<T, TryReceiveError>
293 where
294 T: Clone,
295 {
296 self.channel.try_peek()
297 }
298
211 /// Allows a poll_fn to poll until the channel is ready to receive 299 /// Allows a poll_fn to poll until the channel is ready to receive
212 /// 300 ///
213 /// See [`Channel::poll_ready_to_receive()`] 301 /// See [`Channel::poll_ready_to_receive()`]
@@ -293,6 +381,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
293 self.channel.try_receive_with_context(None) 381 self.channel.try_receive_with_context(None)
294 } 382 }
295 383
384 /// Peek at the next value without removing it from the queue.
385 ///
386 /// See [`Channel::try_peek()`]
387 pub fn try_peek(&self) -> Result<T, TryReceiveError>
388 where
389 T: Clone,
390 {
391 self.channel.try_peek_with_context(None)
392 }
393
296 /// Allows a poll_fn to poll until the channel is ready to receive 394 /// Allows a poll_fn to poll until the channel is ready to receive
297 /// 395 ///
298 /// See [`Channel::poll_ready_to_receive()`] 396 /// See [`Channel::poll_ready_to_receive()`]
@@ -317,7 +415,67 @@ where
317 } 415 }
318} 416}
319 417
320impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> 418/// Receive-only access to a [`Channel`] without knowing channel size.
419/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
420pub struct SendDynamicReceiver<'ch, T> {
421 pub(crate) channel: &'ch dyn DynamicChannel<T>,
422}
423
424/// Receive-only access to a [`Channel`] without knowing channel size.
425/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
426#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
427pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
428
429impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
430 fn clone(&self) -> Self {
431 *self
432 }
433}
434
435impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
436unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
437unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
438
439impl<'ch, T> SendDynamicReceiver<'ch, T> {
440 /// Receive the next value.
441 ///
442 /// See [`Channel::receive()`].
443 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
444 DynamicReceiveFuture { channel: self.channel }
445 }
446
447 /// Attempt to immediately receive the next value.
448 ///
449 /// See [`Channel::try_receive()`]
450 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
451 self.channel.try_receive_with_context(None)
452 }
453
454 /// Allows a poll_fn to poll until the channel is ready to receive
455 ///
456 /// See [`Channel::poll_ready_to_receive()`]
457 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
458 self.channel.poll_ready_to_receive(cx)
459 }
460
461 /// Poll the channel for the next item
462 ///
463 /// See [`Channel::poll_receive()`]
464 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
465 self.channel.poll_receive(cx)
466 }
467}
468
469impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
470where
471 M: RawMutex + Sync + Send,
472{
473 fn from(s: Receiver<'ch, M, T, N>) -> Self {
474 Self { channel: s.channel }
475 }
476}
477
478impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
321where 479where
322 M: RawMutex, 480 M: RawMutex,
323{ 481{
@@ -330,6 +488,7 @@ where
330 488
331/// Future returned by [`Channel::receive`] and [`Receiver::receive`]. 489/// Future returned by [`Channel::receive`] and [`Receiver::receive`].
332#[must_use = "futures do nothing unless you `.await` or poll them"] 490#[must_use = "futures do nothing unless you `.await` or poll them"]
491#[derive(Debug)]
333pub struct ReceiveFuture<'ch, M, T, const N: usize> 492pub struct ReceiveFuture<'ch, M, T, const N: usize>
334where 493where
335 M: RawMutex, 494 M: RawMutex,
@@ -350,6 +509,7 @@ where
350 509
351/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. 510/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`].
352#[must_use = "futures do nothing unless you `.await` or poll them"] 511#[must_use = "futures do nothing unless you `.await` or poll them"]
512#[derive(Debug)]
353pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> 513pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
354where 514where
355 M: RawMutex, 515 M: RawMutex,
@@ -393,6 +553,7 @@ impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for
393 553
394/// Future returned by [`Channel::send`] and [`Sender::send`]. 554/// Future returned by [`Channel::send`] and [`Sender::send`].
395#[must_use = "futures do nothing unless you `.await` or poll them"] 555#[must_use = "futures do nothing unless you `.await` or poll them"]
556#[derive(Debug)]
396pub struct SendFuture<'ch, M, T, const N: usize> 557pub struct SendFuture<'ch, M, T, const N: usize>
397where 558where
398 M: RawMutex, 559 M: RawMutex,
@@ -463,6 +624,10 @@ pub(crate) trait DynamicChannel<T> {
463 624
464 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 625 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
465 626
627 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
628 where
629 T: Clone;
630
466 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 631 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
467 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 632 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
468 633
@@ -486,6 +651,7 @@ pub enum TrySendError<T> {
486 Full(T), 651 Full(T),
487} 652}
488 653
654#[derive(Debug)]
489struct ChannelState<T, const N: usize> { 655struct ChannelState<T, const N: usize> {
490 queue: Deque<T, N>, 656 queue: Deque<T, N>,
491 receiver_waker: WakerRegistration, 657 receiver_waker: WakerRegistration,
@@ -505,6 +671,31 @@ impl<T, const N: usize> ChannelState<T, N> {
505 self.try_receive_with_context(None) 671 self.try_receive_with_context(None)
506 } 672 }
507 673
674 fn try_peek(&mut self) -> Result<T, TryReceiveError>
675 where
676 T: Clone,
677 {
678 self.try_peek_with_context(None)
679 }
680
681 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
682 where
683 T: Clone,
684 {
685 if self.queue.is_full() {
686 self.senders_waker.wake();
687 }
688
689 if let Some(message) = self.queue.front() {
690 Ok(message.clone())
691 } else {
692 if let Some(cx) = cx {
693 self.receiver_waker.register(cx.waker());
694 }
695 Err(TryReceiveError::Empty)
696 }
697 }
698
508 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 699 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
509 if self.queue.is_full() { 700 if self.queue.is_full() {
510 self.senders_waker.wake(); 701 self.senders_waker.wake();
@@ -600,6 +791,7 @@ impl<T, const N: usize> ChannelState<T, N> {
600/// received from the channel. 791/// received from the channel.
601/// 792///
602/// All data sent will become available in the same order as it was sent. 793/// All data sent will become available in the same order as it was sent.
794#[derive(Debug)]
603pub struct Channel<M, T, const N: usize> 795pub struct Channel<M, T, const N: usize>
604where 796where
605 M: RawMutex, 797 M: RawMutex,
@@ -634,6 +826,13 @@ where
634 self.lock(|c| c.try_receive_with_context(cx)) 826 self.lock(|c| c.try_receive_with_context(cx))
635 } 827 }
636 828
829 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
830 where
831 T: Clone,
832 {
833 self.lock(|c| c.try_peek_with_context(cx))
834 }
835
637 /// Poll the channel for the next message 836 /// Poll the channel for the next message
638 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 837 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
639 self.lock(|c| c.poll_receive(cx)) 838 self.lock(|c| c.poll_receive(cx))
@@ -722,6 +921,17 @@ where
722 self.lock(|c| c.try_receive()) 921 self.lock(|c| c.try_receive())
723 } 922 }
724 923
924 /// Peek at the next value without removing it from the queue.
925 ///
926 /// This method will either receive a copy of the message from the channel immediately or return
927 /// an error if the channel is empty.
928 pub fn try_peek(&self) -> Result<T, TryReceiveError>
929 where
930 T: Clone,
931 {
932 self.lock(|c| c.try_peek())
933 }
934
725 /// Returns the maximum number of elements the channel can hold. 935 /// Returns the maximum number of elements the channel can hold.
726 pub const fn capacity(&self) -> usize { 936 pub const fn capacity(&self) -> usize {
727 N 937 N
@@ -769,6 +979,13 @@ where
769 Channel::try_receive_with_context(self, cx) 979 Channel::try_receive_with_context(self, cx)
770 } 980 }
771 981
982 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
983 where
984 T: Clone,
985 {
986 Channel::try_peek_with_context(self, cx)
987 }
988
772 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 989 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
773 Channel::poll_ready_to_send(self, cx) 990 Channel::poll_ready_to_send(self, cx)
774 } 991 }
@@ -782,7 +999,7 @@ where
782 } 999 }
783} 1000}
784 1001
785impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N> 1002impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
786where 1003where
787 M: RawMutex, 1004 M: RawMutex,
788{ 1005{
@@ -851,6 +1068,8 @@ mod tests {
851 fn simple_send_and_receive() { 1068 fn simple_send_and_receive() {
852 let c = Channel::<NoopRawMutex, u32, 3>::new(); 1069 let c = Channel::<NoopRawMutex, u32, 3>::new();
853 assert!(c.try_send(1).is_ok()); 1070 assert!(c.try_send(1).is_ok());
1071 assert_eq!(c.try_peek().unwrap(), 1);
1072 assert_eq!(c.try_peek().unwrap(), 1);
854 assert_eq!(c.try_receive().unwrap(), 1); 1073 assert_eq!(c.try_receive().unwrap(), 1);
855 } 1074 }
856 1075
@@ -881,6 +1100,8 @@ mod tests {
881 let r = c.dyn_receiver(); 1100 let r = c.dyn_receiver();
882 1101
883 assert!(s.try_send(1).is_ok()); 1102 assert!(s.try_send(1).is_ok());
1103 assert_eq!(r.try_peek().unwrap(), 1);
1104 assert_eq!(r.try_peek().unwrap(), 1);
884 assert_eq!(r.try_receive().unwrap(), 1); 1105 assert_eq!(r.try_receive().unwrap(), 1);
885 } 1106 }
886 1107
diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs
index 18e3c2019..945560a80 100644
--- a/embassy-sync/src/lazy_lock.rs
+++ b/embassy-sync/src/lazy_lock.rs
@@ -21,6 +21,7 @@ use core::sync::atomic::{AtomicBool, Ordering};
21/// let reference = VALUE.get(); 21/// let reference = VALUE.get();
22/// assert_eq!(reference, &20); 22/// assert_eq!(reference, &20);
23/// ``` 23/// ```
24#[derive(Debug)]
24pub struct LazyLock<T, F = fn() -> T> { 25pub struct LazyLock<T, F = fn() -> T> {
25 init: AtomicBool, 26 init: AtomicBool,
26 data: UnsafeCell<Data<T, F>>, 27 data: UnsafeCell<Data<T, F>>,
@@ -31,7 +32,12 @@ union Data<T, F> {
31 f: ManuallyDrop<F>, 32 f: ManuallyDrop<F>,
32} 33}
33 34
34unsafe impl<T, F> Sync for LazyLock<T, F> {} 35unsafe impl<T, F> Sync for LazyLock<T, F>
36where
37 T: Sync,
38 F: Sync,
39{
40}
35 41
36impl<T, F: FnOnce() -> T> LazyLock<T, F> { 42impl<T, F: FnOnce() -> T> LazyLock<T, F> {
37 /// Create a new uninitialized `StaticLock`. 43 /// Create a new uninitialized `StaticLock`.
@@ -52,6 +58,14 @@ impl<T, F: FnOnce() -> T> LazyLock<T, F> {
52 unsafe { &(*self.data.get()).value } 58 unsafe { &(*self.data.get()).value }
53 } 59 }
54 60
61 /// Get a mutable reference to the underlying value, initializing it if it
62 /// has not been done already.
63 #[inline]
64 pub fn get_mut(&mut self) -> &mut T {
65 self.ensure_init_fast();
66 unsafe { &mut (*self.data.get()).value }
67 }
68
55 /// Consume the `LazyLock`, returning the underlying value. The 69 /// Consume the `LazyLock`, returning the underlying value. The
56 /// initialization function will be called if it has not been 70 /// initialization function will be called if it has not been
57 /// already. 71 /// already.
@@ -117,6 +131,13 @@ mod tests {
117 assert_eq!(reference, &20); 131 assert_eq!(reference, &20);
118 } 132 }
119 #[test] 133 #[test]
134 fn test_lazy_lock_mutation() {
135 let mut value: LazyLock<u32> = LazyLock::new(|| 20);
136 *value.get_mut() = 21;
137 let reference = value.get();
138 assert_eq!(reference, &21);
139 }
140 #[test]
120 fn test_lazy_lock_into_inner() { 141 fn test_lazy_lock_into_inner() {
121 let lazy: LazyLock<u32> = LazyLock::new(|| 20); 142 let lazy: LazyLock<u32> = LazyLock::new(|| 20);
122 let value = lazy.into_inner(); 143 let value = lazy.into_inner();
@@ -124,6 +145,7 @@ mod tests {
124 } 145 }
125 146
126 static DROP_CHECKER: AtomicU32 = AtomicU32::new(0); 147 static DROP_CHECKER: AtomicU32 = AtomicU32::new(0);
148 #[derive(Debug)]
127 struct DropCheck; 149 struct DropCheck;
128 150
129 impl Drop for DropCheck { 151 impl Drop for DropCheck {
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 7528a9f68..aea682899 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration;
16#[cfg_attr(feature = "defmt", derive(defmt::Format))] 16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError; 17pub struct TryLockError;
18 18
19#[derive(Debug)]
19struct State { 20struct State {
20 locked: bool, 21 locked: bool,
21 waker: WakerRegistration, 22 waker: WakerRegistration,
@@ -23,7 +24,7 @@ struct State {
23 24
24/// Async mutex. 25/// Async mutex.
25/// 26///
26/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). 27/// The mutex is generic over a blocking [`RawMutex`].
27/// The raw mutex is used to guard access to the internal "is locked" flag. It 28/// The raw mutex is used to guard access to the internal "is locked" flag. It
28/// is held for very short periods only, while locking and unlocking. It is *not* held 29/// is held for very short periods only, while locking and unlocking. It is *not* held
29/// for the entire time the async Mutex is locked. 30/// for the entire time the async Mutex is locked.
@@ -186,7 +187,7 @@ where
186 T: ?Sized, 187 T: ?Sized,
187{ 188{
188 /// Returns a locked view over a portion of the locked data. 189 /// Returns a locked view over a portion of the locked data.
189 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { 190 pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
190 let mutex = this.mutex; 191 let mutex = this.mutex;
191 let value = fun(unsafe { &mut *this.mutex.inner.get() }); 192 let value = fun(unsafe { &mut *this.mutex.inner.get() });
192 // Don't run the `drop` method for MutexGuard. The ownership of the underlying 193 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
@@ -278,7 +279,7 @@ where
278 T: ?Sized, 279 T: ?Sized,
279{ 280{
280 /// Returns a locked view over a portion of the locked data. 281 /// Returns a locked view over a portion of the locked data.
281 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { 282 pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
282 let state = this.state; 283 let state = this.state;
283 let value = fun(unsafe { &mut *this.value }); 284 let value = fun(unsafe { &mut *this.value });
284 // Don't run the `drop` method for MutexGuard. The ownership of the underlying 285 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
index cd05b986d..73edfea9a 100644
--- a/embassy-sync/src/once_lock.rs
+++ b/embassy-sync/src/once_lock.rs
@@ -1,6 +1,7 @@
1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. 1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2 2
3use core::cell::Cell; 3use core::cell::Cell;
4use core::fmt::{Debug, Formatter};
4use core::future::{poll_fn, Future}; 5use core::future::{poll_fn, Future};
5use core::mem::MaybeUninit; 6use core::mem::MaybeUninit;
6use core::sync::atomic::{AtomicBool, Ordering}; 7use core::sync::atomic::{AtomicBool, Ordering};
@@ -42,7 +43,16 @@ pub struct OnceLock<T> {
42 data: Cell<MaybeUninit<T>>, 43 data: Cell<MaybeUninit<T>>,
43} 44}
44 45
45unsafe impl<T> Sync for OnceLock<T> {} 46impl<T> Debug for OnceLock<T> {
47 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
48 f.debug_struct("OnceLock")
49 .field("init", &self.init)
50 .field("data", &"Cell<MaybeUninit<{unprintable}>>")
51 .finish()
52 }
53}
54
55unsafe impl<T> Sync for OnceLock<T> where T: Sync {}
46 56
47impl<T> OnceLock<T> { 57impl<T> OnceLock<T> {
48 /// Create a new uninitialized `OnceLock`. 58 /// Create a new uninitialized `OnceLock`.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 2598652d2..6d624979a 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -13,6 +13,7 @@ use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration; 13use crate::waitqueue::WakerRegistration;
14 14
15/// Write-only access to a [`Pipe`]. 15/// Write-only access to a [`Pipe`].
16#[derive(Debug)]
16pub struct Writer<'p, M, const N: usize> 17pub struct Writer<'p, M, const N: usize>
17where 18where
18 M: RawMutex, 19 M: RawMutex,
@@ -52,6 +53,7 @@ where
52 53
53/// Future returned by [`Pipe::write`] and [`Writer::write`]. 54/// Future returned by [`Pipe::write`] and [`Writer::write`].
54#[must_use = "futures do nothing unless you `.await` or poll them"] 55#[must_use = "futures do nothing unless you `.await` or poll them"]
56#[derive(Debug)]
55pub struct WriteFuture<'p, M, const N: usize> 57pub struct WriteFuture<'p, M, const N: usize>
56where 58where
57 M: RawMutex, 59 M: RawMutex,
@@ -77,6 +79,7 @@ where
77impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} 79impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
78 80
79/// Read-only access to a [`Pipe`]. 81/// Read-only access to a [`Pipe`].
82#[derive(Debug)]
80pub struct Reader<'p, M, const N: usize> 83pub struct Reader<'p, M, const N: usize>
81where 84where
82 M: RawMutex, 85 M: RawMutex,
@@ -128,6 +131,7 @@ where
128 131
129/// Future returned by [`Pipe::read`] and [`Reader::read`]. 132/// Future returned by [`Pipe::read`] and [`Reader::read`].
130#[must_use = "futures do nothing unless you `.await` or poll them"] 133#[must_use = "futures do nothing unless you `.await` or poll them"]
134#[derive(Debug)]
131pub struct ReadFuture<'p, M, const N: usize> 135pub struct ReadFuture<'p, M, const N: usize>
132where 136where
133 M: RawMutex, 137 M: RawMutex,
@@ -152,8 +156,9 @@ where
152 156
153impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} 157impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
154 158
155/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. 159/// Future returned by [`Reader::fill_buf`].
156#[must_use = "futures do nothing unless you `.await` or poll them"] 160#[must_use = "futures do nothing unless you `.await` or poll them"]
161#[derive(Debug)]
157pub struct FillBufFuture<'p, M, const N: usize> 162pub struct FillBufFuture<'p, M, const N: usize>
158where 163where
159 M: RawMutex, 164 M: RawMutex,
@@ -199,6 +204,7 @@ pub enum TryWriteError {
199 Full, 204 Full,
200} 205}
201 206
207#[derive(Debug)]
202struct PipeState<const N: usize> { 208struct PipeState<const N: usize> {
203 buffer: RingBuffer<N>, 209 buffer: RingBuffer<N>,
204 read_waker: WakerRegistration, 210 read_waker: WakerRegistration,
@@ -206,6 +212,7 @@ struct PipeState<const N: usize> {
206} 212}
207 213
208#[repr(transparent)] 214#[repr(transparent)]
215#[derive(Debug)]
209struct Buffer<const N: usize>(UnsafeCell<[u8; N]>); 216struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
210 217
211impl<const N: usize> Buffer<N> { 218impl<const N: usize> Buffer<N> {
@@ -230,6 +237,7 @@ unsafe impl<const N: usize> Sync for Buffer<N> {}
230/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. 237/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
231/// 238///
232/// All data written will become available in the same order as it was written. 239/// All data written will become available in the same order as it was written.
240#[derive(Debug)]
233pub struct Pipe<M, const N: usize> 241pub struct Pipe<M, const N: usize>
234where 242where
235 M: RawMutex, 243 M: RawMutex,
@@ -587,7 +595,7 @@ where
587 } 595 }
588} 596}
589 597
590/// Write-only access to a [`DynamicPipe`]. 598/// Write-only access to the dynamic pipe.
591pub struct DynamicWriter<'p> { 599pub struct DynamicWriter<'p> {
592 pipe: &'p dyn DynamicPipe, 600 pipe: &'p dyn DynamicPipe,
593} 601}
@@ -657,7 +665,7 @@ where
657 } 665 }
658} 666}
659 667
660/// Read-only access to a [`DynamicPipe`]. 668/// Read-only access to a dynamic pipe.
661pub struct DynamicReader<'p> { 669pub struct DynamicReader<'p> {
662 pipe: &'p dyn DynamicPipe, 670 pipe: &'p dyn DynamicPipe,
663} 671}
@@ -742,7 +750,7 @@ where
742 } 750 }
743} 751}
744 752
745/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`]. 753/// Future returned by [`DynamicReader::fill_buf`].
746#[must_use = "futures do nothing unless you `.await` or poll them"] 754#[must_use = "futures do nothing unless you `.await` or poll them"]
747pub struct DynamicFillBufFuture<'p> { 755pub struct DynamicFillBufFuture<'p> {
748 pipe: Option<&'p dyn DynamicPipe>, 756 pipe: Option<&'p dyn DynamicPipe>,
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 36959204f..715a20e86 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -1,7 +1,7 @@
1//! A queue for sending values between asynchronous tasks. 1//! A queue for sending values between asynchronous tasks.
2//! 2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. 3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel. 4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`] parameter of the channel.
5 5
6use core::cell::RefCell; 6use core::cell::RefCell;
7use core::future::Future; 7use core::future::Future;
@@ -175,6 +175,16 @@ where
175 self.channel.try_receive() 175 self.channel.try_receive()
176 } 176 }
177 177
178 /// Peek at the next value without removing it from the queue.
179 ///
180 /// See [`PriorityChannel::try_peek()`]
181 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
178 /// Allows a poll_fn to poll until the channel is ready to receive 188 /// Allows a poll_fn to poll until the channel is ready to receive
179 /// 189 ///
180 /// See [`PriorityChannel::poll_ready_to_receive()`] 190 /// See [`PriorityChannel::poll_ready_to_receive()`]
@@ -343,6 +353,31 @@ where
343 self.try_receive_with_context(None) 353 self.try_receive_with_context(None)
344 } 354 }
345 355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
346 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
347 if self.queue.len() == self.queue.capacity() { 382 if self.queue.len() == self.queue.capacity() {
348 self.senders_waker.wake(); 383 self.senders_waker.wake();
@@ -438,7 +473,7 @@ where
438/// received from the channel. 473/// received from the channel.
439/// 474///
440/// Sent data may be reordered based on their priority within the channel. 475/// Sent data may be reordered based on their priority within the channel.
441/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 476/// For example, in a [`Max`] [`PriorityChannel`]
442/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. 477/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
443pub struct PriorityChannel<M, T, K, const N: usize> 478pub struct PriorityChannel<M, T, K, const N: usize>
444where 479where
@@ -478,6 +513,13 @@ where
478 self.lock(|c| c.try_receive_with_context(cx)) 513 self.lock(|c| c.try_receive_with_context(cx))
479 } 514 }
480 515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
481 /// Poll the channel for the next message 523 /// Poll the channel for the next message
482 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 524 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
483 self.lock(|c| c.poll_receive(cx)) 525 self.lock(|c| c.poll_receive(cx))
@@ -548,6 +590,17 @@ where
548 self.lock(|c| c.try_receive()) 590 self.lock(|c| c.try_receive())
549 } 591 }
550 592
593 /// Peek at the next value without removing it from the queue.
594 ///
595 /// This method will either receive a copy of the message from the channel immediately or return
596 /// an error if the channel is empty.
597 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
551 /// Removes elements from the channel based on the given predicate. 604 /// Removes elements from the channel based on the given predicate.
552 pub fn remove_if<F>(&self, predicate: F) 605 pub fn remove_if<F>(&self, predicate: F)
553 where 606 where
@@ -617,6 +670,13 @@ where
617 PriorityChannel::try_receive_with_context(self, cx) 670 PriorityChannel::try_receive_with_context(self, cx)
618 } 671 }
619 672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
620 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
621 PriorityChannel::poll_ready_to_send(self, cx) 681 PriorityChannel::poll_ready_to_send(self, cx)
622 } 682 }
@@ -705,6 +765,8 @@ mod tests {
705 fn simple_send_and_receive() { 765 fn simple_send_and_receive() {
706 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); 766 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
707 assert!(c.try_send(1).is_ok()); 767 assert!(c.try_send(1).is_ok());
768 assert_eq!(c.try_peek().unwrap(), 1);
769 assert_eq!(c.try_peek().unwrap(), 1);
708 assert_eq!(c.try_receive().unwrap(), 1); 770 assert_eq!(c.try_receive().unwrap(), 1);
709 } 771 }
710 772
@@ -725,6 +787,8 @@ mod tests {
725 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 787 let r: DynamicReceiver<'_, u32> = c.receiver().into();
726 788
727 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
790 assert_eq!(r.try_peek().unwrap(), 1);
791 assert_eq!(r.try_peek().unwrap(), 1);
728 assert_eq!(r.try_receive().unwrap(), 1); 792 assert_eq!(r.try_receive().unwrap(), 1);
729 } 793 }
730 794
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 606efff0a..ad9402f5a 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -71,6 +71,7 @@ pub use subscriber::{DynSubscriber, Subscriber};
71/// # block_on(test); 71/// # block_on(test);
72/// ``` 72/// ```
73/// 73///
74#[derive(Debug)]
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { 75pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, 76 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76} 77}
@@ -88,7 +89,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
88 /// Create a new subscriber. It will only receive messages that are published after its creation. 89 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 /// 90 ///
90 /// If there are no subscriber slots left, an error will be returned. 91 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { 92 pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| { 93 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut(); 94 let mut s = inner.borrow_mut();
94 95
@@ -120,7 +121,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
120 /// Create a new publisher 121 /// Create a new publisher
121 /// 122 ///
122 /// If there are no publisher slots left, an error will be returned. 123 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { 124 pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| { 125 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut(); 126 let mut s = inner.borrow_mut();
126 127
@@ -151,13 +152,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
151 152
152 /// Create a new publisher that can only send immediate messages. 153 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot. 154 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { 155 pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self)) 156 ImmediatePublisher(ImmediatePub::new(self))
156 } 157 }
157 158
158 /// Create a new publisher that can only send immediate messages. 159 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot. 160 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 161 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> {
161 DynImmediatePublisher(ImmediatePub::new(self)) 162 DynImmediatePublisher(ImmediatePub::new(self))
162 } 163 }
163 164
@@ -297,6 +298,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
297} 298}
298 299
299/// Internal state for the PubSub channel 300/// Internal state for the PubSub channel
301#[derive(Debug)]
300struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { 302struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
301 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it 303 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
302 queue: Deque<(T, usize), CAP>, 304 queue: Deque<(T, usize), CAP>,
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index 7a1ab66de..2a67a0002 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
11 11
12/// A publisher to a channel 12/// A publisher to a channel
13#[derive(Debug)]
13pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 14pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The channel we are a publisher for 15 /// The channel we are a publisher for
15 channel: &'a PSB, 16 channel: &'a PSB,
@@ -75,7 +76,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
75 self.channel.is_full() 76 self.channel.is_full()
76 } 77 }
77 78
78 /// Create a [`futures::Sink`] adapter for this publisher. 79 /// Create a [`futures_sink::Sink`] adapter for this publisher.
79 #[inline] 80 #[inline]
80 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { 81 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
81 PubSink { publ: self, fut: None } 82 PubSink { publ: self, fut: None }
@@ -106,6 +107,7 @@ impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
106} 107}
107 108
108/// A publisher that holds a generic reference to the channel 109/// A publisher that holds a generic reference to the channel
110#[derive(Debug)]
109pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 111pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
110 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 112 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
111); 113);
@@ -130,6 +132,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
130 132
131/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. 133/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
132/// (So an infinite amount is possible) 134/// (So an infinite amount is possible)
135#[derive(Debug)]
133pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 136pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
134 /// The channel we are a publisher for 137 /// The channel we are a publisher for
135 channel: &'a PSB, 138 channel: &'a PSB,
@@ -205,6 +208,7 @@ impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
205} 208}
206 209
207/// An immediate publisher that holds a generic reference to the channel 210/// An immediate publisher that holds a generic reference to the channel
211#[derive(Debug)]
208pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 212pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
209 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 213 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
210); 214);
@@ -229,6 +233,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
229 233
230#[must_use = "Sinks do nothing unless polled"] 234#[must_use = "Sinks do nothing unless polled"]
231/// [`futures_sink::Sink`] adapter for [`Pub`]. 235/// [`futures_sink::Sink`] adapter for [`Pub`].
236#[derive(Debug)]
232pub struct PubSink<'a, 'p, PSB, T> 237pub struct PubSink<'a, 'p, PSB, T>
233where 238where
234 T: Clone, 239 T: Clone,
@@ -290,6 +295,7 @@ where
290 295
291/// Future for the publisher wait action 296/// Future for the publisher wait action
292#[must_use = "futures do nothing unless you `.await` or poll them"] 297#[must_use = "futures do nothing unless you `.await` or poll them"]
298#[derive(Debug)]
293pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 299pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
294 /// The message we need to publish 300 /// The message we need to publish
295 message: Option<T>, 301 message: Option<T>,
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index 6ad660cb3..356de23f6 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
11 11
12/// A subscriber to a channel 12/// A subscriber to a channel
13#[derive(Debug)]
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 14pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive 15 /// The message id of the next message we are yet to receive
15 next_message_id: u64, 16 next_message_id: u64,
@@ -115,7 +116,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
115 116
116/// Warning: The stream implementation ignores lag results and returns all messages. 117/// Warning: The stream implementation ignores lag results and returns all messages.
117/// This might miss some messages without you knowing it. 118/// This might miss some messages without you knowing it.
118impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { 119impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> {
119 type Item = T; 120 type Item = T;
120 121
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -151,6 +152,7 @@ impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
151} 152}
152 153
153/// A subscriber that holds a generic reference to the channel 154/// A subscriber that holds a generic reference to the channel
155#[derive(Debug)]
154pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( 156pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
155 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, 157 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
156); 158);
@@ -175,6 +177,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
175 177
176/// Future for the subscriber wait action 178/// Future for the subscriber wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"] 179#[must_use = "futures do nothing unless you `.await` or poll them"]
180#[derive(Debug)]
178pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 181pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
179 subscriber: &'s mut Sub<'a, PSB, T>, 182 subscriber: &'s mut Sub<'a, PSB, T>,
180} 183}
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs
index 81e60c42b..f03b7dd8f 100644
--- a/embassy-sync/src/ring_buffer.rs
+++ b/embassy-sync/src/ring_buffer.rs
@@ -1,5 +1,6 @@
1use core::ops::Range; 1use core::ops::Range;
2 2
3#[derive(Debug)]
3pub struct RingBuffer<const N: usize> { 4pub struct RingBuffer<const N: usize> {
4 start: usize, 5 start: usize,
5 end: usize, 6 end: usize,
diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs
index deeadd167..0d784a7dc 100644
--- a/embassy-sync/src/rwlock.rs
+++ b/embassy-sync/src/rwlock.rs
@@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration;
16#[cfg_attr(feature = "defmt", derive(defmt::Format))] 16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError; 17pub struct TryLockError;
18 18
19#[derive(Debug)]
19struct State { 20struct State {
20 readers: usize, 21 readers: usize,
21 writer: bool, 22 writer: bool,
diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs
index d30eee30b..4e82b0fcd 100644
--- a/embassy-sync/src/semaphore.rs
+++ b/embassy-sync/src/semaphore.rs
@@ -46,6 +46,7 @@ pub trait Semaphore: Sized {
46/// A representation of a number of acquired permits. 46/// A representation of a number of acquired permits.
47/// 47///
48/// The acquired permits will be released back to the [`Semaphore`] when this is dropped. 48/// The acquired permits will be released back to the [`Semaphore`] when this is dropped.
49#[derive(Debug)]
49pub struct SemaphoreReleaser<'a, S: Semaphore> { 50pub struct SemaphoreReleaser<'a, S: Semaphore> {
50 semaphore: &'a S, 51 semaphore: &'a S,
51 permits: usize, 52 permits: usize,
@@ -181,6 +182,7 @@ impl<M: RawMutex> Semaphore for GreedySemaphore<M> {
181 } 182 }
182} 183}
183 184
185#[derive(Debug)]
184struct SemaphoreState { 186struct SemaphoreState {
185 permits: usize, 187 permits: usize,
186 waker: WakerRegistration, 188 waker: WakerRegistration,
@@ -221,6 +223,7 @@ impl SemaphoreState {
221/// 223///
222/// Up to `N` tasks may attempt to acquire permits concurrently. If additional 224/// Up to `N` tasks may attempt to acquire permits concurrently. If additional
223/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. 225/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned.
226#[derive(Debug)]
224pub struct FairSemaphore<M, const N: usize> 227pub struct FairSemaphore<M, const N: usize>
225where 228where
226 M: RawMutex, 229 M: RawMutex,
@@ -341,6 +344,7 @@ impl<M: RawMutex, const N: usize> Semaphore for FairSemaphore<M, N> {
341 } 344 }
342} 345}
343 346
347#[derive(Debug)]
344struct FairAcquire<'a, M: RawMutex, const N: usize> { 348struct FairAcquire<'a, M: RawMutex, const N: usize> {
345 sema: &'a FairSemaphore<M, N>, 349 sema: &'a FairSemaphore<M, N>,
346 permits: usize, 350 permits: usize,
@@ -364,6 +368,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M
364 } 368 }
365} 369}
366 370
371#[derive(Debug)]
367struct FairAcquireAll<'a, M: RawMutex, const N: usize> { 372struct FairAcquireAll<'a, M: RawMutex, const N: usize> {
368 sema: &'a FairSemaphore<M, N>, 373 sema: &'a FairSemaphore<M, N>,
369 min: usize, 374 min: usize,
@@ -387,6 +392,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a
387 } 392 }
388} 393}
389 394
395#[derive(Debug)]
390struct FairSemaphoreState<const N: usize> { 396struct FairSemaphoreState<const N: usize> {
391 permits: usize, 397 permits: usize,
392 next_ticket: usize, 398 next_ticket: usize,
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index e7095401e..229b1fa99 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -39,6 +39,7 @@ where
39 state: Mutex<M, Cell<State<T>>>, 39 state: Mutex<M, Cell<State<T>>>,
40} 40}
41 41
42#[derive(Debug)]
42enum State<T> { 43enum State<T> {
43 None, 44 None,
44 Waiting(Waker), 45 Waiting(Waker),
@@ -82,7 +83,7 @@ where
82 83
83 /// Remove the queued value in this `Signal`, if any. 84 /// Remove the queued value in this `Signal`, if any.
84 pub fn reset(&self) { 85 pub fn reset(&self) {
85 self.state.lock(|cell| cell.set(State::None)); 86 self.try_take();
86 } 87 }
87 88
88 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { 89 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
index c06b83056..a45adeab8 100644
--- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -7,6 +7,7 @@ use core::task::Waker;
7/// If a waker is registered, registering another waker will replace the previous one without waking it. 7/// If a waker is registered, registering another waker will replace the previous one without waking it.
8/// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected, 8/// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected,
9/// that multiple tasks register try to register a waker simultaneously. 9/// that multiple tasks register try to register a waker simultaneously.
10#[derive(Debug)]
10pub struct AtomicWaker { 11pub struct AtomicWaker {
11 waker: AtomicPtr<()>, 12 waker: AtomicPtr<()>,
12} 13}
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 0384d6bed..56c0cd1b2 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -5,6 +5,7 @@ use heapless::Vec;
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6/// Queue of wakers with a maximum length of `N`. 6/// Queue of wakers with a maximum length of `N`.
7/// Intended for waking multiple tasks. 7/// Intended for waking multiple tasks.
8#[derive(Debug)]
8pub struct MultiWakerRegistration<const N: usize> { 9pub struct MultiWakerRegistration<const N: usize> {
9 wakers: Vec<Waker, N>, 10 wakers: Vec<Waker, N>,
10} 11}
@@ -15,7 +16,9 @@ impl<const N: usize> MultiWakerRegistration<N> {
15 Self { wakers: Vec::new() } 16 Self { wakers: Vec::new() }
16 } 17 }
17 18
18 /// Register a waker. If the buffer is full the function returns it in the error 19 /// Register a waker.
20 ///
21 /// If the buffer is full, [wakes all the wakers](Self::wake), clears its buffer and registers the waker.
19 pub fn register(&mut self, w: &Waker) { 22 pub fn register(&mut self, w: &Waker) {
20 // If we already have some waker that wakes the same task as `w`, do nothing. 23 // If we already have some waker that wakes the same task as `w`, do nothing.
21 // This avoids cloning wakers, and avoids unnecessary mass-wakes. 24 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
index 08d6a833d..332ab5405 100644
--- a/embassy-sync/src/watch.rs
+++ b/embassy-sync/src/watch.rs
@@ -65,10 +65,12 @@ use crate::waitqueue::MultiWakerRegistration;
65/// }; 65/// };
66/// block_on(f); 66/// block_on(f);
67/// ``` 67/// ```
68#[derive(Debug)]
68pub struct Watch<M: RawMutex, T: Clone, const N: usize> { 69pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
69 mutex: Mutex<M, RefCell<WatchState<T, N>>>, 70 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
70} 71}
71 72
73#[derive(Debug)]
72struct WatchState<T: Clone, const N: usize> { 74struct WatchState<T: Clone, const N: usize> {
73 data: Option<T>, 75 data: Option<T>,
74 current_id: u64, 76 current_id: u64,
@@ -392,6 +394,7 @@ impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
392} 394}
393 395
394/// A receiver can `.await` a change in the `Watch` value. 396/// A receiver can `.await` a change in the `Watch` value.
397#[derive(Debug)]
395pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { 398pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
396 watch: &'a W, 399 watch: &'a W,
397 _phantom: PhantomData<T>, 400 _phantom: PhantomData<T>,
@@ -467,6 +470,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
467/// 470///
468/// For a simpler type definition, consider [`DynSender`] at the expense of 471/// For a simpler type definition, consider [`DynSender`] at the expense of
469/// some runtime performance due to dynamic dispatch. 472/// some runtime performance due to dynamic dispatch.
473#[derive(Debug)]
470pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>); 474pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471 475
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { 476impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
@@ -622,6 +626,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
622} 626}
623 627
624/// A anonymous receiver can NOT `.await` a change in the `Watch` value. 628/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
629#[derive(Debug)]
625pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { 630pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
626 watch: &'a W, 631 watch: &'a W,
627 at_id: u64, 632 at_id: u64,
@@ -726,6 +731,7 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
726} 731}
727 732
728/// A receiver of a `Watch` channel that cannot `.await` values. 733/// A receiver of a `Watch` channel that cannot `.await` values.
734#[derive(Debug)]
729pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>); 735pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730 736
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { 737impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index e3e5b2538..b3f7dbe8c 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -34,6 +34,7 @@ use crate::waitqueue::WakerRegistration;
34/// 34///
35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through 35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
36/// an `&mut T`. 36/// an `&mut T`.
37#[derive(Debug)]
37pub struct Channel<'a, M: RawMutex, T> { 38pub struct Channel<'a, M: RawMutex, T> {
38 buf: BufferPtr<T>, 39 buf: BufferPtr<T>,
39 phantom: PhantomData<&'a mut T>, 40 phantom: PhantomData<&'a mut T>,
@@ -95,6 +96,7 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
95} 96}
96 97
97#[repr(transparent)] 98#[repr(transparent)]
99#[derive(Debug)]
98struct BufferPtr<T>(*mut T); 100struct BufferPtr<T>(*mut T);
99 101
100impl<T> BufferPtr<T> { 102impl<T> BufferPtr<T> {
@@ -107,6 +109,7 @@ unsafe impl<T> Send for BufferPtr<T> {}
107unsafe impl<T> Sync for BufferPtr<T> {} 109unsafe impl<T> Sync for BufferPtr<T> {}
108 110
109/// Send-only access to a [`Channel`]. 111/// Send-only access to a [`Channel`].
112#[derive(Debug)]
110pub struct Sender<'a, M: RawMutex, T> { 113pub struct Sender<'a, M: RawMutex, T> {
111 channel: &'a Channel<'a, M, T>, 114 channel: &'a Channel<'a, M, T>,
112} 115}
@@ -190,6 +193,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
190} 193}
191 194
192/// Receive-only access to a [`Channel`]. 195/// Receive-only access to a [`Channel`].
196#[derive(Debug)]
193pub struct Receiver<'a, M: RawMutex, T> { 197pub struct Receiver<'a, M: RawMutex, T> {
194 channel: &'a Channel<'a, M, T>, 198 channel: &'a Channel<'a, M, T>,
195} 199}
@@ -272,6 +276,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
272 } 276 }
273} 277}
274 278
279#[derive(Debug)]
275struct State { 280struct State {
276 /// Maximum number of elements the channel can hold. 281 /// Maximum number of elements the channel can hold.
277 capacity: usize, 282 capacity: usize,