From 042abc805a84d09231174e41edb0e498baaf7295 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 28 May 2025 10:36:05 +0200 Subject: feat: add support for channel peek Add support for peeking into the front of the channel if the value implements Clone. This can be useful in single-receiver situations where you don't want to remove the item from the queue until you've successfully processed it. --- embassy-sync/src/channel.rs | 78 ++++++++++++++++++++++++++++++++++++ embassy-sync/src/priority_channel.rs | 64 +++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..a229c52e7 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -208,6 +208,16 @@ where self.channel.try_receive() } + /// Peek at the next value without removing it from the queue. + /// + /// See [`Channel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek() + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] @@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { self.channel.try_receive_with_context(None) } + /// Peek at the next value without removing it from the queue. + /// + /// See [`Channel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek_with_context(None) + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] @@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel { fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone; + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; @@ -505,6 +529,31 @@ impl ChannelState { self.try_receive_with_context(None) } + fn try_peek(&mut self) -> Result + where + T: Clone, + { + self.try_peek_with_context(None) + } + + fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.front() { + Ok(message.clone()) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryReceiveError::Empty) + } + } + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { if self.queue.is_full() { self.senders_waker.wake(); @@ -634,6 +683,13 @@ where self.lock(|c| c.try_receive_with_context(cx)) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek_with_context(cx)) + } + /// Poll the channel for the next message pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { self.lock(|c| c.poll_receive(cx)) @@ -722,6 +778,17 @@ where self.lock(|c| c.try_receive()) } + /// Peek at the next value without removing it from the queue. + /// + /// This method will either receive a copy of the message from the channel immediately or return + /// an error if the channel is empty. + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek()) + } + /// Returns the maximum number of elements the channel can hold. pub const fn capacity(&self) -> usize { N @@ -769,6 +836,13 @@ where Channel::try_receive_with_context(self, cx) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + Channel::try_peek_with_context(self, cx) + } + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_send(self, cx) } @@ -851,6 +925,8 @@ mod tests { fn simple_send_and_receive() { let c = Channel::::new(); assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_peek().unwrap(), 1); + assert_eq!(c.try_peek().unwrap(), 1); assert_eq!(c.try_receive().unwrap(), 1); } @@ -881,6 +957,8 @@ mod tests { let r = c.dyn_receiver(); assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_peek().unwrap(), 1); + assert_eq!(r.try_peek().unwrap(), 1); assert_eq!(r.try_receive().unwrap(), 1); } diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 36959204f..623c52993 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -175,6 +175,16 @@ where self.channel.try_receive() } + /// Peek at the next value without removing it from the queue. + /// + /// See [`PriorityChannel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek_with_context(None) + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`PriorityChannel::poll_ready_to_receive()`] @@ -343,6 +353,31 @@ where self.try_receive_with_context(None) } + fn try_peek(&mut self) -> Result + where + T: Clone, + { + self.try_peek_with_context(None) + } + + fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + if self.queue.len() == self.queue.capacity() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.peek() { + Ok(message.clone()) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryReceiveError::Empty) + } + } + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { if self.queue.len() == self.queue.capacity() { self.senders_waker.wake(); @@ -478,6 +513,13 @@ where self.lock(|c| c.try_receive_with_context(cx)) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek_with_context(cx)) + } + /// Poll the channel for the next message pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { self.lock(|c| c.poll_receive(cx)) @@ -548,6 +590,17 @@ where self.lock(|c| c.try_receive()) } + /// Peek at the next value without removing it from the queue. + /// + /// This method will either receive a copy of the message from the channel immediately or return + /// an error if the channel is empty. + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek()) + } + /// Removes elements from the channel based on the given predicate. pub fn remove_if(&self, predicate: F) where @@ -617,6 +670,13 @@ where PriorityChannel::try_receive_with_context(self, cx) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + PriorityChannel::try_peek_with_context(self, cx) + } + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { PriorityChannel::poll_ready_to_send(self, cx) } @@ -705,6 +765,8 @@ mod tests { fn simple_send_and_receive() { let c = PriorityChannel::::new(); assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_peek().unwrap(), 1); + assert_eq!(c.try_peek().unwrap(), 1); assert_eq!(c.try_receive().unwrap(), 1); } @@ -725,6 +787,8 @@ mod tests { let r: DynamicReceiver<'_, u32> = c.receiver().into(); assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_peek().unwrap(), 1); + assert_eq!(r.try_peek().unwrap(), 1); assert_eq!(r.try_receive().unwrap(), 1); } -- cgit From 277f6f7331684a0008930a43b6705ba52873d1f5 Mon Sep 17 00:00:00 2001 From: Corey Schuhen Date: Wed, 28 May 2025 18:15:15 +1000 Subject: Make Sync capable versions of DynamicSender and DynamicReceiver. DynamicSender and DynamicReceiver, just seem to be a fat pointer to a Channel which is already protected by it's own Mutex already. In fact, you can share the Channel already betwen threads and create Dynamic*er's in the target threads. It should be safe to share the Dynamic*er's directly. Can only be used when Mutex M of channel supoorts Sync. --- embassy-sync/src/channel.rs | 106 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..1b053ecad 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -164,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> { } } +/// Send-only access to a [`Channel`] without knowing channel size. +/// This version can be sent between threads but can only be created if the underlying mutex is Sync. +pub struct SendDynamicSender<'ch, T> { + pub(crate) channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for SendDynamicSender<'ch, T> { + fn clone(&self) -> Self { + *self + } +} + +impl<'ch, T> Copy for SendDynamicSender<'ch, T> {} +unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {} +unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {} + +impl<'ch, M, T, const N: usize> From> for SendDynamicSender<'ch, T> +where + M: RawMutex + Sync + Send, +{ + fn from(s: Sender<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, T> SendDynamicSender<'ch, T> { + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`Channel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_send(cx) + } +} + /// Receive-only access to a [`Channel`]. pub struct Receiver<'ch, M, T, const N: usize> where @@ -317,6 +368,61 @@ where } } +/// Receive-only access to a [`Channel`] without knowing channel size. +/// This version can be sent between threads but can only be created if the underlying mutex is Sync. +pub struct SendableDynamicReceiver<'ch, T> { + pub(crate) channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> { + fn clone(&self) -> Self { + *self + } +} + +impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {} + +impl<'ch, T> SendableDynamicReceiver<'ch, T> { + /// Receive the next value. + /// + /// See [`Channel::receive()`]. + pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { + DynamicReceiveFuture { channel: self.channel } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive_with_context(None) + } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`Channel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_receive(cx) + } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } +} + +impl<'ch, M, T, const N: usize> From> for SendableDynamicReceiver<'ch, T> +where + M: RawMutex + Sync + Send, +{ + fn from(s: Receiver<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> where M: RawMutex, -- cgit From 5730b57094d6e2da3645326596532a091b47ec86 Mon Sep 17 00:00:00 2001 From: Corey Schuhen Date: Thu, 29 May 2025 08:30:21 +1000 Subject: Rename SendableDynamicReceiver to SendDynamicReceiver --- embassy-sync/src/channel.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index f97cb2b8e..856551417 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -390,21 +390,21 @@ where /// Receive-only access to a [`Channel`] without knowing channel size. /// This version can be sent between threads but can only be created if the underlying mutex is Sync. -pub struct SendableDynamicReceiver<'ch, T> { +pub struct SendDynamicReceiver<'ch, T> { pub(crate) channel: &'ch dyn DynamicChannel, } -impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> { +impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> { fn clone(&self) -> Self { *self } } -impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {} -unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {} -unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {} +impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {} -impl<'ch, T> SendableDynamicReceiver<'ch, T> { +impl<'ch, T> SendDynamicReceiver<'ch, T> { /// Receive the next value. /// /// See [`Channel::receive()`]. @@ -434,7 +434,7 @@ impl<'ch, T> SendableDynamicReceiver<'ch, T> { } } -impl<'ch, M, T, const N: usize> From> for SendableDynamicReceiver<'ch, T> +impl<'ch, M, T, const N: usize> From> for SendDynamicReceiver<'ch, T> where M: RawMutex + Sync + Send, { -- cgit From a0d17ea5ca0bd76ef4d4398c28bc8f98c4e50065 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Jun 2025 13:57:19 +0200 Subject: Remove futures-util where unnecessary --- embassy-sync/src/channel.rs | 4 ++-- embassy-sync/src/pubsub/subscriber.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 856551417..dda91c651 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -443,7 +443,7 @@ where } } -impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> +impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N> where M: RawMutex, { @@ -962,7 +962,7 @@ where } } -impl futures_util::Stream for Channel +impl futures_core::Stream for Channel where M: RawMutex, { diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 6ad660cb3..649382cf1 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -115,7 +115,7 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} /// Warning: The stream implementation ignores lag results and returns all messages. /// This might miss some messages without you knowing it. -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -- cgit From 051c63fea2fdb9cc9773c3bd311e6c423f3d1cd2 Mon Sep 17 00:00:00 2001 From: Melvin Wang Date: Wed, 18 Jun 2025 15:38:57 -0700 Subject: fix missing sync bounds --- embassy-sync/src/lazy_lock.rs | 7 ++++++- embassy-sync/src/once_lock.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs index 18e3c2019..f1bd88b61 100644 --- a/embassy-sync/src/lazy_lock.rs +++ b/embassy-sync/src/lazy_lock.rs @@ -31,7 +31,12 @@ union Data { f: ManuallyDrop, } -unsafe impl Sync for LazyLock {} +unsafe impl Sync for LazyLock +where + T: Sync, + F: Sync, +{ +} impl T> LazyLock { /// Create a new uninitialized `StaticLock`. diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index cd05b986d..1e848685a 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -42,7 +42,7 @@ pub struct OnceLock { data: Cell>, } -unsafe impl Sync for OnceLock {} +unsafe impl Sync for OnceLock where T: Sync {} impl OnceLock { /// Create a new uninitialized `OnceLock`. -- cgit From 72248a601a9ea28ac696f186e2cbe4c2f128a133 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sun, 29 Jun 2025 22:37:11 +0200 Subject: Update Rust nightly, stable. --- embassy-sync/src/pubsub/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 606efff0a..9206b9383 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -88,7 +88,7 @@ impl Result, Error> { + pub fn subscriber(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -120,7 +120,7 @@ impl Result, Error> { + pub fn publisher(&self) -> Result, Error> { self.inner.lock(|inner| { let mut s = inner.borrow_mut(); @@ -151,13 +151,13 @@ impl ImmediatePublisher { + pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> { ImmediatePublisher(ImmediatePub::new(self)) } /// Create a new publisher that can only send immediate messages. /// This kind of publisher does not take up a publisher slot. - pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { + pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> { DynImmediatePublisher(ImmediatePub::new(self)) } -- cgit From e4aa539708781af2474240c5c16456ffe554754b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 9 Jul 2025 14:02:20 +0200 Subject: add embassy sync channel example for message passing between interrupt and task --- embassy-sync/src/channel.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index dda91c651..a0e39fcb5 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -17,6 +17,31 @@ //! messages that it can store, and if this limit is reached, trying to send //! another message will result in an error being returned. //! +//! # Example: Message passing between task and interrupt handler +//! +//! ```rust +//! use embassy_sync::channel::Channel; +//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +//! +//! static SHARED_CHANNEL: Channel = Channel::new(); +//! +//! fn my_interrupt_handler() { +//! // Do some work.. +//! // ... +//! if let Err(e) = SHARED_CHANNEL.sender().try_send(42) { +//! // Channel is full.. +//! } +//! } +//! +//! async fn my_async_task() { +//! // ... +//! let receiver = SHARED_CHANNEL.receiver(); +//! loop { +//! let data_from_interrupt = receiver.receive().await; +//! // Do something with the data. +//! } +//! } +//! ``` use core::cell::RefCell; use core::future::Future; -- cgit From 42c8379c5a571aa76214cdd73ef05a2c720eeb6e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 9 Jul 2025 14:21:19 +0200 Subject: some minor documentation fixes --- embassy-sync/src/mutex.rs | 2 +- embassy-sync/src/pipe.rs | 8 ++++---- embassy-sync/src/priority_channel.rs | 4 ++-- embassy-sync/src/pubsub/publisher.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 7528a9f68..67c682704 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -23,7 +23,7 @@ struct State { /// Async mutex. /// -/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). +/// The mutex is generic over a blocking [RawMutex]. /// The raw mutex is used to guard access to the internal "is locked" flag. It /// is held for very short periods only, while locking and unlocking. It is *not* held /// for the entire time the async Mutex is locked. diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 2598652d2..df3b28b45 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -152,7 +152,7 @@ where impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} -/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. +/// Future returned by [`Reader::fill_buf`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct FillBufFuture<'p, M, const N: usize> where @@ -587,7 +587,7 @@ where } } -/// Write-only access to a [`DynamicPipe`]. +/// Write-only access to the dynamic pipe. pub struct DynamicWriter<'p> { pipe: &'p dyn DynamicPipe, } @@ -657,7 +657,7 @@ where } } -/// Read-only access to a [`DynamicPipe`]. +/// Read-only access to a dynamic pipe. pub struct DynamicReader<'p> { pipe: &'p dyn DynamicPipe, } @@ -742,7 +742,7 @@ where } } -/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`]. +/// Future returned by [`DynamicReader::fill_buf`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DynamicFillBufFuture<'p> { pipe: Option<&'p dyn DynamicPipe>, diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 623c52993..a6fbe8def 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -1,7 +1,7 @@ //! A queue for sending values between asynchronous tasks. //! //! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. -//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel. +//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [Kind] parameter of the channel. use core::cell::RefCell; use core::future::Future; @@ -473,7 +473,7 @@ where /// received from the channel. /// /// Sent data may be reordered based on their priority within the channel. -/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] +/// For example, in a [Max][PriorityChannel] /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. pub struct PriorityChannel where diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 7a1ab66de..2af1a9334 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -75,7 +75,7 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.is_full() } - /// Create a [`futures::Sink`] adapter for this publisher. + /// Create a [futures_sink::Sink] adapter for this publisher. #[inline] pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { PubSink { publ: self, fut: None } -- cgit From da392ed942bbf78117f1dbba32208458de7cdea8 Mon Sep 17 00:00:00 2001 From: Robin Mueller <31589589+robamu@users.noreply.github.com> Date: Wed, 9 Jul 2025 14:26:20 +0200 Subject: Update embassy-sync/src/mutex.rs Co-authored-by: James Munns --- embassy-sync/src/mutex.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 67c682704..8496f34bf 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -23,7 +23,7 @@ struct State { /// Async mutex. /// -/// The mutex is generic over a blocking [RawMutex]. +/// The mutex is generic over a blocking [`RawMutex`]. /// The raw mutex is used to guard access to the internal "is locked" flag. It /// is held for very short periods only, while locking and unlocking. It is *not* held /// for the entire time the async Mutex is locked. -- cgit From 9892963da946aa896d9387916ee2b5d509b63e3b Mon Sep 17 00:00:00 2001 From: Robin Mueller <31589589+robamu@users.noreply.github.com> Date: Wed, 9 Jul 2025 14:28:18 +0200 Subject: Update embassy-sync/src/priority_channel.rs Co-authored-by: James Munns --- embassy-sync/src/priority_channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index a6fbe8def..6765a1503 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -1,7 +1,7 @@ //! A queue for sending values between asynchronous tasks. //! //! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. -//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [Kind] parameter of the channel. +//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`] parameter of the channel. use core::cell::RefCell; use core::future::Future; -- cgit From 554fbef571cf9f4692d6361d66f962df926615df Mon Sep 17 00:00:00 2001 From: Robin Mueller <31589589+robamu@users.noreply.github.com> Date: Wed, 9 Jul 2025 14:31:04 +0200 Subject: Update embassy-sync/src/priority_channel.rs Co-authored-by: James Munns --- embassy-sync/src/priority_channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 6765a1503..715a20e86 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -473,7 +473,7 @@ where /// received from the channel. /// /// Sent data may be reordered based on their priority within the channel. -/// For example, in a [Max][PriorityChannel] +/// For example, in a [`Max`] [`PriorityChannel`] /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. pub struct PriorityChannel where -- cgit From fa0f6bc670c29b799e0ef8812de041727c5d86f3 Mon Sep 17 00:00:00 2001 From: Robin Mueller <31589589+robamu@users.noreply.github.com> Date: Wed, 9 Jul 2025 14:31:42 +0200 Subject: Update embassy-sync/src/pubsub/publisher.rs Co-authored-by: James Munns --- embassy-sync/src/pubsub/publisher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 2af1a9334..52a52f926 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -75,7 +75,7 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.is_full() } - /// Create a [futures_sink::Sink] adapter for this publisher. + /// Create a [`futures_sink::Sink`] adapter for this publisher. #[inline] pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { PubSink { publ: self, fut: None } -- cgit From 03b60dd5619bb65dff697cf9dd96f57ccc23f35e Mon Sep 17 00:00:00 2001 From: Anthony Grondin <104731965+AnthonyGrondin@users.noreply.github.com> Date: Wed, 30 Jul 2025 16:01:11 -0400 Subject: feat(embassy-sync): Add `get_mut` for `LazyLock` --- embassy-sync/src/lazy_lock.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs index f1bd88b61..a919f0037 100644 --- a/embassy-sync/src/lazy_lock.rs +++ b/embassy-sync/src/lazy_lock.rs @@ -57,6 +57,14 @@ impl T> LazyLock { unsafe { &(*self.data.get()).value } } + /// Get a mutable reference to the underlying value, initializing it if it + /// has not been done already. + #[inline] + pub fn get_mut(&mut self) -> &mut T { + self.ensure_init_fast(); + unsafe { &mut (*self.data.get()).value } + } + /// Consume the `LazyLock`, returning the underlying value. The /// initialization function will be called if it has not been /// already. @@ -122,6 +130,13 @@ mod tests { assert_eq!(reference, &20); } #[test] + fn test_lazy_lock_mutation() { + let mut value: LazyLock = LazyLock::new(|| 20); + *value.get_mut() = 21; + let reference = value.get(); + assert_eq!(reference, &21); + } + #[test] fn test_lazy_lock_into_inner() { let lazy: LazyLock = LazyLock::new(|| 20); let value = lazy.into_inner(); -- cgit From 89d52827564b7997f0900614c7b0eb67664c121a Mon Sep 17 00:00:00 2001 From: Brezak Date: Fri, 1 Aug 2025 18:42:25 +0200 Subject: embassy-sync: Update `MultiWakerRegistration::register` docs In 3081ecf301a54f8ed3d0f72350dd21f8ac9e1b18 `register` was changed to clear the buffer when it's full, but the docs weren't updated. --- embassy-sync/src/waitqueue/multi_waker.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 0384d6bed..1c05f8eaf 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -15,7 +15,9 @@ impl MultiWakerRegistration { Self { wakers: Vec::new() } } - /// Register a waker. If the buffer is full the function returns it in the error + /// Register a waker. + /// + /// If the buffer is full, [wakes all the wakers](Self::wake), clears its buffer and registers the waker. pub fn register(&mut self, w: &Waker) { // If we already have some waker that wakes the same task as `w`, do nothing. // This avoids cloning wakers, and avoids unnecessary mass-wakes. -- cgit From c7b9060a7443cd004d366586c418a3d95bf3447a Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 12 Aug 2025 20:23:20 +0200 Subject: fix: prepare embassy-sync 0.7.1 release * Add newtype for moved type to preserve API compat --- embassy-sync/src/channel.rs | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index a0e39fcb5..8e9fcc234 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -419,6 +419,11 @@ pub struct SendDynamicReceiver<'ch, T> { pub(crate) channel: &'ch dyn DynamicChannel, } +/// Receive-only access to a [`Channel`] without knowing channel size. +/// This version can be sent between threads but can only be created if the underlying mutex is Sync. +#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")] +pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>; + impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> { fn clone(&self) -> Self { *self -- cgit From 368738bef44dbba1a178383d878a6d9423b1ccd9 Mon Sep 17 00:00:00 2001 From: Curly Date: Tue, 19 Aug 2025 22:30:53 -0700 Subject: chore: add more `Debug` impls to `embassy-sync`, particularly on `OnceLock` All tests green --- embassy-sync/src/blocking_mutex/mod.rs | 1 + embassy-sync/src/blocking_mutex/raw.rs | 2 ++ embassy-sync/src/channel.rs | 7 +++++++ embassy-sync/src/lazy_lock.rs | 2 ++ embassy-sync/src/mutex.rs | 1 + embassy-sync/src/once_lock.rs | 10 ++++++++++ embassy-sync/src/pipe.rs | 8 ++++++++ embassy-sync/src/pubsub/mod.rs | 2 ++ embassy-sync/src/pubsub/publisher.rs | 6 ++++++ embassy-sync/src/pubsub/subscriber.rs | 3 +++ embassy-sync/src/ring_buffer.rs | 1 + embassy-sync/src/rwlock.rs | 1 + embassy-sync/src/semaphore.rs | 6 ++++++ embassy-sync/src/signal.rs | 1 + embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 1 + embassy-sync/src/waitqueue/multi_waker.rs | 1 + embassy-sync/src/watch.rs | 6 ++++++ embassy-sync/src/zerocopy_channel.rs | 5 +++++ 18 files changed, 64 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs index a41bc3569..11809c763 100644 --- a/embassy-sync/src/blocking_mutex/mod.rs +++ b/embassy-sync/src/blocking_mutex/mod.rs @@ -22,6 +22,7 @@ use self::raw::RawMutex; /// /// In all cases, the blocking mutex is intended to be short lived and not held across await points. /// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. +#[derive(Debug)] pub struct Mutex { // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets // to run BEFORE dropping `data`. diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs index a8afcad34..50f965e00 100644 --- a/embassy-sync/src/blocking_mutex/raw.rs +++ b/embassy-sync/src/blocking_mutex/raw.rs @@ -37,6 +37,7 @@ pub unsafe trait RawMutex { /// # Safety /// /// This mutex is safe to share between different executors and interrupts. +#[derive(Debug)] pub struct CriticalSectionRawMutex { _phantom: PhantomData<()>, } @@ -65,6 +66,7 @@ unsafe impl RawMutex for CriticalSectionRawMutex { /// # Safety /// /// **This Mutex is only safe within a single executor.** +#[derive(Debug)] pub struct NoopRawMutex { _phantom: PhantomData<*mut ()>, } diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 8e9fcc234..de437cc52 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -55,6 +55,7 @@ use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; /// Send-only access to a [`Channel`]. +#[derive(Debug)] pub struct Sender<'ch, M, T, const N: usize> where M: RawMutex, @@ -241,6 +242,7 @@ impl<'ch, T> SendDynamicSender<'ch, T> { } /// Receive-only access to a [`Channel`]. +#[derive(Debug)] pub struct Receiver<'ch, M, T, const N: usize> where M: RawMutex, @@ -486,6 +488,7 @@ where /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct ReceiveFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -506,6 +509,7 @@ where /// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -549,6 +553,7 @@ impl<'ch, M: RawMutex, T, const N: usize> From> for /// Future returned by [`Channel::send`] and [`Sender::send`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct SendFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -646,6 +651,7 @@ pub enum TrySendError { Full(T), } +#[derive(Debug)] struct ChannelState { queue: Deque, receiver_waker: WakerRegistration, @@ -785,6 +791,7 @@ impl ChannelState { /// received from the channel. /// /// All data sent will become available in the same order as it was sent. +#[derive(Debug)] pub struct Channel where M: RawMutex, diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs index a919f0037..945560a80 100644 --- a/embassy-sync/src/lazy_lock.rs +++ b/embassy-sync/src/lazy_lock.rs @@ -21,6 +21,7 @@ use core::sync::atomic::{AtomicBool, Ordering}; /// let reference = VALUE.get(); /// assert_eq!(reference, &20); /// ``` +#[derive(Debug)] pub struct LazyLock T> { init: AtomicBool, data: UnsafeCell>, @@ -144,6 +145,7 @@ mod tests { } static DROP_CHECKER: AtomicU32 = AtomicU32::new(0); + #[derive(Debug)] struct DropCheck; impl Drop for DropCheck { diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 8496f34bf..4ce6dd987 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration; #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct TryLockError; +#[derive(Debug)] struct State { locked: bool, waker: WakerRegistration, diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index 1e848685a..73edfea9a 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -1,6 +1,7 @@ //! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. use core::cell::Cell; +use core::fmt::{Debug, Formatter}; use core::future::{poll_fn, Future}; use core::mem::MaybeUninit; use core::sync::atomic::{AtomicBool, Ordering}; @@ -42,6 +43,15 @@ pub struct OnceLock { data: Cell>, } +impl Debug for OnceLock { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + f.debug_struct("OnceLock") + .field("init", &self.init) + .field("data", &"Cell>") + .finish() + } +} + unsafe impl Sync for OnceLock where T: Sync {} impl OnceLock { diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index df3b28b45..6d624979a 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -13,6 +13,7 @@ use crate::ring_buffer::RingBuffer; use crate::waitqueue::WakerRegistration; /// Write-only access to a [`Pipe`]. +#[derive(Debug)] pub struct Writer<'p, M, const N: usize> where M: RawMutex, @@ -52,6 +53,7 @@ where /// Future returned by [`Pipe::write`] and [`Writer::write`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct WriteFuture<'p, M, const N: usize> where M: RawMutex, @@ -77,6 +79,7 @@ where impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} /// Read-only access to a [`Pipe`]. +#[derive(Debug)] pub struct Reader<'p, M, const N: usize> where M: RawMutex, @@ -128,6 +131,7 @@ where /// Future returned by [`Pipe::read`] and [`Reader::read`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct ReadFuture<'p, M, const N: usize> where M: RawMutex, @@ -154,6 +158,7 @@ impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} /// Future returned by [`Reader::fill_buf`]. #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct FillBufFuture<'p, M, const N: usize> where M: RawMutex, @@ -199,6 +204,7 @@ pub enum TryWriteError { Full, } +#[derive(Debug)] struct PipeState { buffer: RingBuffer, read_waker: WakerRegistration, @@ -206,6 +212,7 @@ struct PipeState { } #[repr(transparent)] +#[derive(Debug)] struct Buffer(UnsafeCell<[u8; N]>); impl Buffer { @@ -230,6 +237,7 @@ unsafe impl Sync for Buffer {} /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. /// /// All data written will become available in the same order as it was written. +#[derive(Debug)] pub struct Pipe where M: RawMutex, diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 9206b9383..ad9402f5a 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -71,6 +71,7 @@ pub use subscriber::{DynSubscriber, Subscriber}; /// # block_on(test); /// ``` /// +#[derive(Debug)] pub struct PubSubChannel { inner: Mutex>>, } @@ -297,6 +298,7 @@ impl { /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it queue: Deque<(T, usize), CAP>, diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 52a52f926..2a67a0002 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel}; use crate::blocking_mutex::raw::RawMutex; /// A publisher to a channel +#[derive(Debug)] pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The channel we are a publisher for channel: &'a PSB, @@ -106,6 +107,7 @@ impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { } /// A publisher that holds a generic reference to the channel +#[derive(Debug)] pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( pub(super) Pub<'a, PubSubChannel, T>, ); @@ -130,6 +132,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. /// (So an infinite amount is possible) +#[derive(Debug)] pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The channel we are a publisher for channel: &'a PSB, @@ -205,6 +208,7 @@ impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { } /// An immediate publisher that holds a generic reference to the channel +#[derive(Debug)] pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( pub(super) ImmediatePub<'a, PubSubChannel, T>, ); @@ -229,6 +233,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: #[must_use = "Sinks do nothing unless polled"] /// [`futures_sink::Sink`] adapter for [`Pub`]. +#[derive(Debug)] pub struct PubSink<'a, 'p, PSB, T> where T: Clone, @@ -290,6 +295,7 @@ where /// Future for the publisher wait action #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 649382cf1..356de23f6 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel, WaitResult}; use crate::blocking_mutex::raw::RawMutex; /// A subscriber to a channel +#[derive(Debug)] pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message id of the next message we are yet to receive next_message_id: u64, @@ -151,6 +152,7 @@ impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { } /// A subscriber that holds a generic reference to the channel +#[derive(Debug)] pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( pub(super) Sub<'a, PubSubChannel, T>, ); @@ -175,6 +177,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: /// Future for the subscriber wait action #[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index 81e60c42b..f03b7dd8f 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs @@ -1,5 +1,6 @@ use core::ops::Range; +#[derive(Debug)] pub struct RingBuffer { start: usize, end: usize, diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs index deeadd167..0d784a7dc 100644 --- a/embassy-sync/src/rwlock.rs +++ b/embassy-sync/src/rwlock.rs @@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration; #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct TryLockError; +#[derive(Debug)] struct State { readers: usize, writer: bool, diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs index d30eee30b..4e82b0fcd 100644 --- a/embassy-sync/src/semaphore.rs +++ b/embassy-sync/src/semaphore.rs @@ -46,6 +46,7 @@ pub trait Semaphore: Sized { /// A representation of a number of acquired permits. /// /// The acquired permits will be released back to the [`Semaphore`] when this is dropped. +#[derive(Debug)] pub struct SemaphoreReleaser<'a, S: Semaphore> { semaphore: &'a S, permits: usize, @@ -181,6 +182,7 @@ impl Semaphore for GreedySemaphore { } } +#[derive(Debug)] struct SemaphoreState { permits: usize, waker: WakerRegistration, @@ -221,6 +223,7 @@ impl SemaphoreState { /// /// Up to `N` tasks may attempt to acquire permits concurrently. If additional /// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. +#[derive(Debug)] pub struct FairSemaphore where M: RawMutex, @@ -341,6 +344,7 @@ impl Semaphore for FairSemaphore { } } +#[derive(Debug)] struct FairAcquire<'a, M: RawMutex, const N: usize> { sema: &'a FairSemaphore, permits: usize, @@ -364,6 +368,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M } } +#[derive(Debug)] struct FairAcquireAll<'a, M: RawMutex, const N: usize> { sema: &'a FairSemaphore, min: usize, @@ -387,6 +392,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a } } +#[derive(Debug)] struct FairSemaphoreState { permits: usize, next_ticket: usize, diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index e7095401e..d96e36245 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -39,6 +39,7 @@ where state: Mutex>>, } +#[derive(Debug)] enum State { None, Waiting(Waker), diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs index c06b83056..a45adeab8 100644 --- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs @@ -7,6 +7,7 @@ use core::task::Waker; /// If a waker is registered, registering another waker will replace the previous one without waking it. /// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected, /// that multiple tasks register try to register a waker simultaneously. +#[derive(Debug)] pub struct AtomicWaker { waker: AtomicPtr<()>, } diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 1c05f8eaf..56c0cd1b2 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -5,6 +5,7 @@ use heapless::Vec; /// Utility struct to register and wake multiple wakers. /// Queue of wakers with a maximum length of `N`. /// Intended for waking multiple tasks. +#[derive(Debug)] pub struct MultiWakerRegistration { wakers: Vec, } diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 08d6a833d..332ab5405 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -65,10 +65,12 @@ use crate::waitqueue::MultiWakerRegistration; /// }; /// block_on(f); /// ``` +#[derive(Debug)] pub struct Watch { mutex: Mutex>>, } +#[derive(Debug)] struct WatchState { data: Option, current_id: u64, @@ -392,6 +394,7 @@ impl Watch { } /// A receiver can `.await` a change in the `Watch` value. +#[derive(Debug)] pub struct Snd<'a, T: Clone, W: WatchBehavior + ?Sized> { watch: &'a W, _phantom: PhantomData, @@ -467,6 +470,7 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { /// /// For a simpler type definition, consider [`DynSender`] at the expense of /// some runtime performance due to dynamic dispatch. +#[derive(Debug)] pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch>); impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { @@ -622,6 +626,7 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Drop for Rcv<'a, T, W> { } /// A anonymous receiver can NOT `.await` a change in the `Watch` value. +#[derive(Debug)] pub struct AnonRcv<'a, T: Clone, W: WatchBehavior + ?Sized> { watch: &'a W, at_id: u64, @@ -726,6 +731,7 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { } /// A receiver of a `Watch` channel that cannot `.await` values. +#[derive(Debug)] pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch>); impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index e3e5b2538..b3f7dbe8c 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs @@ -34,6 +34,7 @@ use crate::waitqueue::WakerRegistration; /// /// The channel requires a buffer of recyclable elements. Writing to the channel is done through /// an `&mut T`. +#[derive(Debug)] pub struct Channel<'a, M: RawMutex, T> { buf: BufferPtr, phantom: PhantomData<&'a mut T>, @@ -95,6 +96,7 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { } #[repr(transparent)] +#[derive(Debug)] struct BufferPtr(*mut T); impl BufferPtr { @@ -107,6 +109,7 @@ unsafe impl Send for BufferPtr {} unsafe impl Sync for BufferPtr {} /// Send-only access to a [`Channel`]. +#[derive(Debug)] pub struct Sender<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } @@ -190,6 +193,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { } /// Receive-only access to a [`Channel`]. +#[derive(Debug)] pub struct Receiver<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } @@ -272,6 +276,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { } } +#[derive(Debug)] struct State { /// Maximum number of elements the channel can hold. capacity: usize, -- cgit From fcf659fbe5c0cd6acf328281089c35c999f5514a Mon Sep 17 00:00:00 2001 From: Matthew Tran <0e4ef622@gmail.com> Date: Thu, 28 Aug 2025 10:35:27 -0500 Subject: embassy-sync: Don't drop wakers in Signal::reset --- embassy-sync/src/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index d96e36245..229b1fa99 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -83,7 +83,7 @@ where /// Remove the queued value in this `Signal`, if any. pub fn reset(&self) { - self.state.lock(|cell| cell.set(State::None)); + self.try_take(); } fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { -- cgit From 78d5d3f2dde14fcbf4879de19076eb89d9b9ef8b Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Thu, 11 Sep 2025 14:40:29 -0700 Subject: Remove `Sized` bound from `MutexGuard::map` Since `MutexGuard` has `T: ?Sized`, `U` does not need to be restricted to `Sized` types. This now allows using `map` to cast from `MutexGuard<'_, M, ImplsTrait>` to `MutexGuard<'_, M, dyn Trait>`. --- embassy-sync/src/mutex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 4ce6dd987..aea682899 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -187,7 +187,7 @@ where T: ?Sized, { /// Returns a locked view over a portion of the locked data. - pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { + pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { let mutex = this.mutex; let value = fun(unsafe { &mut *this.mutex.inner.get() }); // Don't run the `drop` method for MutexGuard. The ownership of the underlying @@ -279,7 +279,7 @@ where T: ?Sized, { /// Returns a locked view over a portion of the locked data. - pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { + pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { let state = this.state; let value = fun(unsafe { &mut *this.value }); // Don't run the `drop` method for MutexGuard. The ownership of the underlying -- cgit