From 69d37503c22be73c3d8283f39155cfa1559a37eb Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sat, 2 Mar 2024 13:13:26 +0100 Subject: Add constructor for dynamic channel --- embassy-sync/src/channel.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index ff7129303..01db0d09a 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -507,6 +507,16 @@ where Receiver { channel: self } } + /// Get a sender for this channel using dynamic dispatch. + pub fn dyn_sender(&self) -> DynamicSender<'_, T> { + DynamicSender { channel: self } + } + + /// Get a receiver for this channel using dynamic dispatch. + pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> { + DynamicReceiver { channel: self } + } + /// Send a value, waiting until there is capacity. /// /// Sending completes when the value has been pushed to the channel's queue. @@ -648,7 +658,7 @@ mod tests { } #[test] - fn dynamic_dispatch() { + fn dynamic_dispatch_into() { let c = Channel::::new(); let s: DynamicSender<'_, u32> = c.sender().into(); let r: DynamicReceiver<'_, u32> = c.receiver().into(); @@ -657,6 +667,16 @@ mod tests { assert_eq!(r.try_receive().unwrap(), 1); } + #[test] + fn dynamic_dispatch_constructor() { + let c = Channel::::new(); + let s = c.dyn_sender(); + let r = c.dyn_receiver(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_receive().unwrap(), 1); + } + #[futures_test::test] async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); -- cgit From 4bbcc2a7fbf1a363719d224dd80dfbcbbee6f26e Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sun, 3 Mar 2024 15:35:52 +0100 Subject: Add OnceLock sync primitive --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/once_lock.rs | 237 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100644 embassy-sync/src/once_lock.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index d88c76db5..61b173e80 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -13,6 +13,7 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; pub mod mutex; +pub mod once_lock; pub mod pipe; pub mod priority_channel; pub mod pubsub; diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs new file mode 100644 index 000000000..f83577a6d --- /dev/null +++ b/embassy-sync/src/once_lock.rs @@ -0,0 +1,237 @@ +//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value. + +use core::cell::Cell; +use core::future::poll_fn; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use core::task::Poll; + +/// The `OnceLock` is a synchronization primitive that allows for +/// initializing a value once, and allowing others to `.await` a +/// reference to the value. This is useful for lazy initialization of +/// a static value. +/// +/// **Note**: this implementation uses a busy loop to poll the value, +/// which is not as efficient as registering a dedicated `Waker`. +/// However, the if the usecase for is to initialize a static variable +/// relatively early in the program life cycle, it should be fine. +/// +/// # Example +/// ``` +/// use futures_executor::block_on; +/// use embassy_sync::once_lock::OnceLock; +/// +/// // Define a static value that will be lazily initialized +/// static VALUE: OnceLock = OnceLock::new(); +/// +/// let f = async { +/// +/// // Initialize the value +/// let reference = VALUE.get_or_init(|| 20); +/// assert_eq!(reference, &20); +/// +/// // Wait for the value to be initialized +/// // and get a static reference it +/// assert_eq!(VALUE.get().await, &20); +/// +/// }; +/// block_on(f) +/// ``` +pub struct OnceLock { + init: AtomicBool, + data: Cell>, +} + +unsafe impl Sync for OnceLock {} + +impl OnceLock { + /// Create a new uninitialized `OnceLock`. + pub const fn new() -> Self { + Self { + init: AtomicBool::new(false), + data: Cell::new(MaybeUninit::zeroed()), + } + } + + /// Get a reference to the underlying value, waiting for it to be set. + /// If the value is already set, this will return immediately. + pub async fn get(&self) -> &T { + + poll_fn(|cx| match self.try_get() { + Some(data) => Poll::Ready(data), + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await + } + + /// Try to get a reference to the underlying value if it exists. + pub fn try_get(&self) -> Option<&T> { + if self.init.load(Ordering::Relaxed) { + Some(unsafe { self.get_ref_unchecked() }) + } else { + None + } + } + + /// Set the underlying value. If the value is already set, this will return an error with the given value. + pub fn init(&self, value: T) -> Result<(), T> { + // Critical section is required to ensure that the value is + // not simultaniously initialized elsewhere at the same time. + critical_section::with(|_| { + // If the value is not set, set it and return Ok. + if !self.init.load(Ordering::Relaxed) { + self.data.set(MaybeUninit::new(value)); + self.init.store(true, Ordering::Relaxed); + Ok(()) + + // Otherwise return an error with the given value. + } else { + Err(value) + } + }) + } + + /// Get a reference to the underlying value, initializing it if it does not exist. + pub fn get_or_init(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Critical section is required to ensure that the value is + // not simultaniously initialized elsewhere at the same time. + critical_section::with(|_| { + // If the value is not set, set it. + if !self.init.load(Ordering::Relaxed) { + self.data.set(MaybeUninit::new(f())); + self.init.store(true, Ordering::Relaxed); + } + }); + + // Return a reference to the value. + unsafe { self.get_ref_unchecked() } + } + + /// Consume the `OnceLock`, returning the underlying value if it was initialized. + pub fn into_inner(self) -> Option { + if self.init.load(Ordering::Relaxed) { + Some(unsafe { self.data.into_inner().assume_init() }) + } else { + None + } + } + + /// Take the underlying value if it was initialized, uninitializing the `OnceLock` in the process. + pub fn take(&mut self) -> Option { + // If the value is set, uninitialize the lock and return the value. + critical_section::with(|_| { + if self.init.load(Ordering::Relaxed) { + let val = unsafe { self.data.replace(MaybeUninit::zeroed()).assume_init() }; + self.init.store(false, Ordering::Relaxed); + Some(val) + + // Otherwise return None. + } else { + None + } + }) + } + + /// Check if the value has been set. + pub fn is_set(&self) -> bool { + self.init.load(Ordering::Relaxed) + } + + /// Get a reference to the underlying value. + /// # Safety + /// Must only be used if a value has been set. + unsafe fn get_ref_unchecked(&self) -> &T { + (*self.data.as_ptr()).assume_init_ref() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn once_lock() { + let lock = OnceLock::new(); + assert_eq!(lock.try_get(), None); + assert_eq!(lock.is_set(), false); + + let v = 42; + assert_eq!(lock.init(v), Ok(())); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&v)); + assert_eq!(lock.try_get(), Some(&v)); + + let v = 43; + assert_eq!(lock.init(v), Err(v)); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + } + + #[test] + fn once_lock_get_or_init() { + let lock = OnceLock::new(); + assert_eq!(lock.try_get(), None); + assert_eq!(lock.is_set(), false); + + let v = lock.get_or_init(|| 42); + assert_eq!(v, &42); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + + let v = lock.get_or_init(|| 43); + assert_eq!(v, &42); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + } + + #[test] + fn once_lock_static() { + static LOCK: OnceLock = OnceLock::new(); + + let v: &'static i32 = LOCK.get_or_init(|| 42); + assert_eq!(v, &42); + + let v: &'static i32 = LOCK.get_or_init(|| 43); + assert_eq!(v, &42); + } + + #[futures_test::test] + async fn once_lock_async() { + static LOCK: OnceLock = OnceLock::new(); + + assert!(LOCK.init(42).is_ok()); + + let v: &'static i32 = LOCK.get().await; + assert_eq!(v, &42); + } + + #[test] + fn once_lock_into_inner() { + let lock: OnceLock = OnceLock::new(); + + let v = lock.get_or_init(|| 42); + assert_eq!(v, &42); + + assert_eq!(lock.into_inner(), Some(42)); + } + + #[test] + fn once_lock_take_init() { + let mut lock: OnceLock = OnceLock::new(); + + assert_eq!(lock.get_or_init(|| 42), &42); + assert_eq!(lock.is_set(), true); + + assert_eq!(lock.take(), Some(42)); + assert_eq!(lock.is_set(), false); + + assert_eq!(lock.get_or_init(|| 43), &43); + assert_eq!(lock.is_set(), true); + } +} -- cgit From 245e7d3bc20d66fa77f6763ce5e5bec0ea6ddeff Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sun, 3 Mar 2024 15:43:01 +0100 Subject: This one is for ci/rustfmt --- embassy-sync/src/once_lock.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index f83577a6d..31cc99711 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -33,7 +33,7 @@ use core::task::Poll; /// // Wait for the value to be initialized /// // and get a static reference it /// assert_eq!(VALUE.get().await, &20); -/// +/// /// }; /// block_on(f) /// ``` @@ -56,7 +56,6 @@ impl OnceLock { /// Get a reference to the underlying value, waiting for it to be set. /// If the value is already set, this will return immediately. pub async fn get(&self) -> &T { - poll_fn(|cx| match self.try_get() { Some(data) => Poll::Ready(data), None => { -- cgit From 2a257573773f3dabdf16ea6a44ba0dadef786f37 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 4 Mar 2024 18:36:34 +0100 Subject: docs: clarify capabilities of zerocopy channel --- embassy-sync/src/zerocopy_channel.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index f704cbd5d..cfce9a571 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs @@ -1,10 +1,7 @@ //! A zero-copy queue for sending values between asynchronous tasks. //! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. +//! It can be used concurrently by a producer (sender) and a +//! consumer (receiver), i.e. it is an "SPSC channel". //! //! This queue takes a Mutex type so that various //! targets can be attained. For example, a ThreadModeMutex can be used -- cgit From 8bb1fe1f653bd4bef6ad0760f33a07bcfe5d91fb Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 12 Mar 2024 15:27:52 +0100 Subject: Add conversion into dyn variants for channel futures --- embassy-sync/src/channel.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 01db0d09a..79d3e0378 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -263,6 +263,15 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { } } +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> +{ + fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { + Self { + channel: value.channel, + } + } +} + /// Future returned by [`Channel::send`] and [`Sender::send`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SendFuture<'ch, M, T, const N: usize> @@ -321,6 +330,16 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> +{ + fn from(value: SendFuture<'ch, M, T, N>) -> Self { + Self { + channel: value.channel, + message: value.message, + } + } +} + pub(crate) trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; -- cgit From c0d91600ea7b6847e2295a0fee819291c951132f Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 12 Mar 2024 15:37:53 +0100 Subject: rustfmt --- embassy-sync/src/channel.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 79d3e0378..48f4dafd6 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -263,12 +263,9 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { } } -impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> -{ +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> { fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { - Self { - channel: value.channel, - } + Self { channel: value.channel } } } @@ -330,8 +327,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> -{ +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> { fn from(value: SendFuture<'ch, M, T, N>) -> Self { Self { channel: value.channel, -- cgit From d06dbf332bc2557d83045c93f71a86163663d481 Mon Sep 17 00:00:00 2001 From: Noah Bliss Date: Wed, 20 Mar 2024 03:19:01 +0000 Subject: Doc update: signaled does not clear signal signaled does not clear signal (doc update) --- embassy-sync/src/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index d75750ce7..520f1a896 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -125,7 +125,7 @@ where }) } - /// non-blocking method to check whether this signal has been signaled. + /// non-blocking method to check whether this signal has been signaled. This does not clear the signal. pub fn signaled(&self) -> bool { self.state.lock(|cell| { let state = cell.replace(State::None); -- cgit From 3d842dac85a4ea21519f56d4ec6342b528805b8a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 20 Mar 2024 14:53:19 +0100 Subject: fmt: disable "unused" warnings. --- embassy-sync/src/fmt.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 78e583c1c..2ac42c557 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -1,5 +1,5 @@ #![macro_use] -#![allow(unused_macros)] +#![allow(unused)] use core::fmt::{Debug, Display, LowerHex}; @@ -229,7 +229,6 @@ impl Try for Result { } } -#[allow(unused)] pub(crate) struct Bytes<'a>(pub &'a [u8]); impl<'a> Debug for Bytes<'a> { -- cgit From f8a6007e1cb3fe9b13db4a56872228155ed8f1cb Mon Sep 17 00:00:00 2001 From: Alex Moon Date: Sat, 30 Mar 2024 22:19:55 -0400 Subject: Semaphore synchronization primitive This provides both a "greedy" and "fair" async semaphore implementation. --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/semaphore.rs | 704 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 705 insertions(+) create mode 100644 embassy-sync/src/semaphore.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 61b173e80..1873483f9 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -17,6 +17,7 @@ pub mod once_lock; pub mod pipe; pub mod priority_channel; pub mod pubsub; +pub mod semaphore; pub mod signal; pub mod waitqueue; pub mod zerocopy_channel; diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs new file mode 100644 index 000000000..52c468b4a --- /dev/null +++ b/embassy-sync/src/semaphore.rs @@ -0,0 +1,704 @@ +//! A synchronization primitive for controlling access to a pool of resources. +use core::cell::{Cell, RefCell}; +use core::convert::Infallible; +use core::future::poll_fn; +use core::mem::MaybeUninit; +use core::task::{Poll, Waker}; + +use heapless::Deque; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// An asynchronous semaphore. +/// +/// A semaphore tracks a number of permits, typically representing a pool of shared resources. +/// Users can acquire permits to synchronize access to those resources. The semaphore does not +/// contain the resources themselves, only the count of available permits. +pub trait Semaphore: Sized { + /// The error returned when the semaphore is unable to acquire the requested permits. + type Error; + + /// Asynchronously acquire one or more permits from the semaphore. + async fn acquire(&self, permits: usize) -> Result, Self::Error>; + + /// Try to immediately acquire one or more permits from the semaphore. + fn try_acquire(&self, permits: usize) -> Option>; + + /// Asynchronously acquire all permits controlled by the semaphore. + /// + /// This method will wait until at least `min` permits are available, then acquire all available permits + /// from the semaphore. Note that other tasks may have already acquired some permits which could be released + /// back to the semaphore at any time. The number of permits actually acquired may be determined by calling + /// [`SemaphoreReleaser::permits`]. + async fn acquire_all(&self, min: usize) -> Result, Self::Error>; + + /// Try to immediately acquire all available permits from the semaphore, if at least `min` permits are available. + fn try_acquire_all(&self, min: usize) -> Option>; + + /// Release `permits` back to the semaphore, making them available to be acquired. + fn release(&self, permits: usize); + + /// Reset the number of available permints in the semaphore to `permits`. + fn set(&self, permits: usize); +} + +/// A representation of a number of acquired permits. +/// +/// The acquired permits will be released back to the [`Semaphore`] when this is dropped. +pub struct SemaphoreReleaser<'a, S: Semaphore> { + semaphore: &'a S, + permits: usize, +} + +impl<'a, S: Semaphore> Drop for SemaphoreReleaser<'a, S> { + fn drop(&mut self) { + self.semaphore.release(self.permits); + } +} + +impl<'a, S: Semaphore> SemaphoreReleaser<'a, S> { + /// The number of acquired permits. + pub fn permits(&self) -> usize { + self.permits + } + + /// Prevent the acquired permits from being released on drop. + /// + /// Returns the number of acquired permits. + pub fn disarm(self) -> usize { + let permits = self.permits; + core::mem::forget(self); + permits + } +} + +/// A greedy [`Semaphore`] implementation. +/// +/// Tasks can acquire permits as soon as they become available, even if another task +/// is waiting on a larger number of permits. +pub struct GreedySemaphore { + state: Mutex>, +} + +impl Default for GreedySemaphore { + fn default() -> Self { + Self::new(0) + } +} + +impl GreedySemaphore { + /// Create a new `Semaphore`. + pub const fn new(permits: usize) -> Self { + Self { + state: Mutex::new(Cell::new(SemaphoreState { + permits, + waker: WakerRegistration::new(), + })), + } + } + + #[cfg(test)] + fn permits(&self) -> usize { + self.state.lock(|cell| { + let state = cell.replace(SemaphoreState::EMPTY); + let permits = state.permits; + cell.replace(state); + permits + }) + } + + fn poll_acquire( + &self, + permits: usize, + acquire_all: bool, + waker: Option<&Waker>, + ) -> Poll, Infallible>> { + self.state.lock(|cell| { + let mut state = cell.replace(SemaphoreState::EMPTY); + if let Some(permits) = state.take(permits, acquire_all) { + cell.set(state); + Poll::Ready(Ok(SemaphoreReleaser { + semaphore: self, + permits, + })) + } else { + if let Some(waker) = waker { + state.register(waker); + } + cell.set(state); + Poll::Pending + } + }) + } +} + +impl Semaphore for GreedySemaphore { + type Error = Infallible; + + async fn acquire(&self, permits: usize) -> Result, Self::Error> { + poll_fn(|cx| self.poll_acquire(permits, false, Some(cx.waker()))).await + } + + fn try_acquire(&self, permits: usize) -> Option> { + match self.poll_acquire(permits, false, None) { + Poll::Ready(Ok(n)) => Some(n), + _ => None, + } + } + + async fn acquire_all(&self, min: usize) -> Result, Self::Error> { + poll_fn(|cx| self.poll_acquire(min, true, Some(cx.waker()))).await + } + + fn try_acquire_all(&self, min: usize) -> Option> { + match self.poll_acquire(min, true, None) { + Poll::Ready(Ok(n)) => Some(n), + _ => None, + } + } + + fn release(&self, permits: usize) { + if permits > 0 { + self.state.lock(|cell| { + let mut state = cell.replace(SemaphoreState::EMPTY); + state.permits += permits; + state.wake(); + cell.set(state); + }); + } + } + + fn set(&self, permits: usize) { + self.state.lock(|cell| { + let mut state = cell.replace(SemaphoreState::EMPTY); + if permits > state.permits { + state.wake(); + } + state.permits = permits; + cell.set(state); + }); + } +} + +struct SemaphoreState { + permits: usize, + waker: WakerRegistration, +} + +impl SemaphoreState { + const EMPTY: SemaphoreState = SemaphoreState { + permits: 0, + waker: WakerRegistration::new(), + }; + + fn register(&mut self, w: &Waker) { + self.waker.register(w); + } + + fn take(&mut self, mut permits: usize, acquire_all: bool) -> Option { + if self.permits < permits { + None + } else { + if acquire_all { + permits = self.permits; + } + self.permits -= permits; + Some(permits) + } + } + + fn wake(&mut self) { + self.waker.wake(); + } +} + +/// A fair [`Semaphore`] implementation. +/// +/// Tasks are allowed to acquire permits in FIFO order. A task waiting to acquire +/// a large number of permits will prevent other tasks from acquiring any permits +/// until its request is satisfied. +/// +/// Up to `N` tasks may attempt to acquire permits concurrently. If additional +/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. +pub struct FairSemaphore +where + M: RawMutex, +{ + state: Mutex>>, +} + +impl Default for FairSemaphore +where + M: RawMutex, +{ + fn default() -> Self { + Self::new(0) + } +} + +impl FairSemaphore +where + M: RawMutex, +{ + /// Create a new `FairSemaphore`. + pub const fn new(permits: usize) -> Self { + Self { + state: Mutex::new(RefCell::new(FairSemaphoreState::new(permits))), + } + } + + #[cfg(test)] + fn permits(&self) -> usize { + self.state.lock(|cell| cell.borrow().permits) + } + + fn poll_acquire( + &self, + permits: usize, + acquire_all: bool, + cx: Option<(&Cell>, &Waker)>, + ) -> Poll, WaitQueueFull>> { + let ticket = cx.as_ref().map(|(cell, _)| cell.get()).unwrap_or(None); + self.state.lock(|cell| { + let mut state = cell.borrow_mut(); + if let Some(permits) = state.take(ticket, permits, acquire_all) { + Poll::Ready(Ok(SemaphoreReleaser { + semaphore: self, + permits, + })) + } else if let Some((cell, waker)) = cx { + match state.register(ticket, waker) { + Ok(ticket) => { + cell.set(Some(ticket)); + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + } else { + Poll::Pending + } + }) + } +} + +/// An error indicating the [`FairSemaphore`]'s wait queue is full. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct WaitQueueFull; + +impl Semaphore for FairSemaphore { + type Error = WaitQueueFull; + + async fn acquire(&self, permits: usize) -> Result, Self::Error> { + let ticket = Cell::new(None); + let _guard = OnDrop::new(|| self.state.lock(|cell| cell.borrow_mut().cancel(ticket.get()))); + poll_fn(|cx| self.poll_acquire(permits, false, Some((&ticket, cx.waker())))).await + } + + fn try_acquire(&self, permits: usize) -> Option> { + match self.poll_acquire(permits, false, None) { + Poll::Ready(Ok(x)) => Some(x), + _ => None, + } + } + + async fn acquire_all(&self, min: usize) -> Result, Self::Error> { + let ticket = Cell::new(None); + let _guard = OnDrop::new(|| self.state.lock(|cell| cell.borrow_mut().cancel(ticket.get()))); + poll_fn(|cx| self.poll_acquire(min, true, Some((&ticket, cx.waker())))).await + } + + fn try_acquire_all(&self, min: usize) -> Option> { + match self.poll_acquire(min, true, None) { + Poll::Ready(Ok(x)) => Some(x), + _ => None, + } + } + + fn release(&self, permits: usize) { + if permits > 0 { + self.state.lock(|cell| { + let mut state = cell.borrow_mut(); + state.permits += permits; + state.wake(); + }); + } + } + + fn set(&self, permits: usize) { + self.state.lock(|cell| { + let mut state = cell.borrow_mut(); + if permits > state.permits { + state.wake(); + } + state.permits = permits; + }); + } +} + +struct FairSemaphoreState { + permits: usize, + next_ticket: usize, + wakers: Deque, N>, +} + +impl FairSemaphoreState { + /// Create a new empty instance + const fn new(permits: usize) -> Self { + Self { + permits, + next_ticket: 0, + wakers: Deque::new(), + } + } + + /// Register a waker. If the queue is full the function returns an error + fn register(&mut self, ticket: Option, w: &Waker) -> Result { + self.pop_canceled(); + + match ticket { + None => { + let ticket = self.next_ticket.wrapping_add(self.wakers.len()); + self.wakers.push_back(Some(w.clone())).or(Err(WaitQueueFull))?; + Ok(ticket) + } + Some(ticket) => { + self.set_waker(ticket, Some(w.clone())); + Ok(ticket) + } + } + } + + fn cancel(&mut self, ticket: Option) { + if let Some(ticket) = ticket { + self.set_waker(ticket, None); + } + } + + fn set_waker(&mut self, ticket: usize, waker: Option) { + let i = ticket.wrapping_sub(self.next_ticket); + if i < self.wakers.len() { + let (a, b) = self.wakers.as_mut_slices(); + let x = if i < a.len() { &mut a[i] } else { &mut b[i - a.len()] }; + *x = waker; + } + } + + fn take(&mut self, ticket: Option, mut permits: usize, acquire_all: bool) -> Option { + self.pop_canceled(); + + if permits > self.permits { + return None; + } + + match ticket { + Some(n) if n != self.next_ticket => return None, + None if !self.wakers.is_empty() => return None, + _ => (), + } + + if acquire_all { + permits = self.permits; + } + self.permits -= permits; + + if ticket.is_some() { + self.pop(); + } + + Some(permits) + } + + fn pop_canceled(&mut self) { + while let Some(None) = self.wakers.front() { + self.pop(); + } + } + + /// Panics if `self.wakers` is empty + fn pop(&mut self) { + self.wakers.pop_front().unwrap(); + self.next_ticket = self.next_ticket.wrapping_add(1); + } + + fn wake(&mut self) { + self.pop_canceled(); + + if let Some(Some(waker)) = self.wakers.front() { + waker.wake_by_ref(); + } + } +} + +/// A type to delay the drop handler invocation. +#[must_use = "to delay the drop handler invocation to the end of the scope"] +struct OnDrop { + f: MaybeUninit, +} + +impl OnDrop { + /// Create a new instance. + pub fn new(f: F) -> Self { + Self { f: MaybeUninit::new(f) } + } +} + +impl Drop for OnDrop { + fn drop(&mut self) { + unsafe { self.f.as_ptr().read()() } + } +} + +#[cfg(test)] +mod tests { + mod greedy { + use core::pin::pin; + + use futures_util::poll; + + use super::super::*; + use crate::blocking_mutex::raw::NoopRawMutex; + + #[test] + fn try_acquire() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + assert_eq!(a.permits(), 1); + assert_eq!(semaphore.permits(), 2); + + core::mem::drop(a); + assert_eq!(semaphore.permits(), 3); + } + + #[test] + fn disarm() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + assert_eq!(a.disarm(), 1); + assert_eq!(semaphore.permits(), 2); + } + + #[futures_test::test] + async fn acquire() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.acquire(1).await.unwrap(); + assert_eq!(a.permits(), 1); + assert_eq!(semaphore.permits(), 2); + + core::mem::drop(a); + assert_eq!(semaphore.permits(), 3); + } + + #[test] + fn try_acquire_all() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.try_acquire_all(1).unwrap(); + assert_eq!(a.permits(), 3); + assert_eq!(semaphore.permits(), 0); + } + + #[futures_test::test] + async fn acquire_all() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.acquire_all(1).await.unwrap(); + assert_eq!(a.permits(), 3); + assert_eq!(semaphore.permits(), 0); + } + + #[test] + fn release() { + let semaphore = GreedySemaphore::::new(3); + assert_eq!(semaphore.permits(), 3); + semaphore.release(2); + assert_eq!(semaphore.permits(), 5); + } + + #[test] + fn set() { + let semaphore = GreedySemaphore::::new(3); + assert_eq!(semaphore.permits(), 3); + semaphore.set(2); + assert_eq!(semaphore.permits(), 2); + } + + #[test] + fn contested() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + let b = semaphore.try_acquire(3); + assert!(b.is_none()); + + core::mem::drop(a); + + let b = semaphore.try_acquire(3); + assert!(b.is_some()); + } + + #[futures_test::test] + async fn greedy() { + let semaphore = GreedySemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + + let b_fut = semaphore.acquire(3); + let mut b_fut = pin!(b_fut); + let b = poll!(b_fut.as_mut()); + assert!(b.is_pending()); + + // Succeed even through `b` is waiting + let c = semaphore.try_acquire(1); + assert!(c.is_some()); + + let b = poll!(b_fut.as_mut()); + assert!(b.is_pending()); + + core::mem::drop(a); + + let b = poll!(b_fut.as_mut()); + assert!(b.is_pending()); + + core::mem::drop(c); + + let b = poll!(b_fut.as_mut()); + assert!(b.is_ready()); + } + } + + mod fair { + use core::pin::pin; + + use futures_util::poll; + + use super::super::*; + use crate::blocking_mutex::raw::NoopRawMutex; + + #[test] + fn try_acquire() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + assert_eq!(a.permits(), 1); + assert_eq!(semaphore.permits(), 2); + + core::mem::drop(a); + assert_eq!(semaphore.permits(), 3); + } + + #[test] + fn disarm() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + assert_eq!(a.disarm(), 1); + assert_eq!(semaphore.permits(), 2); + } + + #[futures_test::test] + async fn acquire() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.acquire(1).await.unwrap(); + assert_eq!(a.permits(), 1); + assert_eq!(semaphore.permits(), 2); + + core::mem::drop(a); + assert_eq!(semaphore.permits(), 3); + } + + #[test] + fn try_acquire_all() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.try_acquire_all(1).unwrap(); + assert_eq!(a.permits(), 3); + assert_eq!(semaphore.permits(), 0); + } + + #[futures_test::test] + async fn acquire_all() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.acquire_all(1).await.unwrap(); + assert_eq!(a.permits(), 3); + assert_eq!(semaphore.permits(), 0); + } + + #[test] + fn release() { + let semaphore = FairSemaphore::::new(3); + assert_eq!(semaphore.permits(), 3); + semaphore.release(2); + assert_eq!(semaphore.permits(), 5); + } + + #[test] + fn set() { + let semaphore = FairSemaphore::::new(3); + assert_eq!(semaphore.permits(), 3); + semaphore.set(2); + assert_eq!(semaphore.permits(), 2); + } + + #[test] + fn contested() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.try_acquire(1).unwrap(); + let b = semaphore.try_acquire(3); + assert!(b.is_none()); + + core::mem::drop(a); + + let b = semaphore.try_acquire(3); + assert!(b.is_some()); + } + + #[futures_test::test] + async fn fairness() { + let semaphore = FairSemaphore::::new(3); + + let a = semaphore.try_acquire(1); + assert!(a.is_some()); + + let b_fut = semaphore.acquire(3); + let mut b_fut = pin!(b_fut); + let b = poll!(b_fut.as_mut()); // Poll `b_fut` once so it is registered + assert!(b.is_pending()); + + let c = semaphore.try_acquire(1); + assert!(c.is_none()); + + let c_fut = semaphore.acquire(1); + let mut c_fut = pin!(c_fut); + let c = poll!(c_fut.as_mut()); // Poll `c_fut` once so it is registered + assert!(c.is_pending()); // `c` is blocked behind `b` + + let d = semaphore.acquire(1).await; + assert!(matches!(d, Err(WaitQueueFull))); + + core::mem::drop(a); + + let c = poll!(c_fut.as_mut()); + assert!(c.is_pending()); // `c` is still blocked behind `b` + + let b = poll!(b_fut.as_mut()); + assert!(b.is_ready()); + + let c = poll!(c_fut.as_mut()); + assert!(c.is_pending()); // `c` is still blocked behind `b` + + core::mem::drop(b); + + let c = poll!(c_fut.as_mut()); + assert!(c.is_ready()); + } + } +} -- cgit From c9acebf783c64784fe6b659a94b40fa080b6fbe8 Mon Sep 17 00:00:00 2001 From: Alex Moon Date: Wed, 3 Apr 2024 19:13:57 -0400 Subject: Fix `FairSemaphore` bugs - `acquire` and `acquire_all` futures were `!Send`, even for `M: RawMutex + Send` due to the captured `Cell`. - If multiple `acquire` tasks were queued, waking the first would not wake the second, even if there were permits remaining after the first `acquire` completed. --- embassy-sync/src/semaphore.rs | 136 +++++++++++++++++++++++++++++++----------- 1 file changed, 102 insertions(+), 34 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs index 52c468b4a..d30eee30b 100644 --- a/embassy-sync/src/semaphore.rs +++ b/embassy-sync/src/semaphore.rs @@ -1,8 +1,7 @@ //! A synchronization primitive for controlling access to a pool of resources. use core::cell::{Cell, RefCell}; use core::convert::Infallible; -use core::future::poll_fn; -use core::mem::MaybeUninit; +use core::future::{poll_fn, Future}; use core::task::{Poll, Waker}; use heapless::Deque; @@ -258,9 +257,9 @@ where &self, permits: usize, acquire_all: bool, - cx: Option<(&Cell>, &Waker)>, + cx: Option<(&mut Option, &Waker)>, ) -> Poll, WaitQueueFull>> { - let ticket = cx.as_ref().map(|(cell, _)| cell.get()).unwrap_or(None); + let ticket = cx.as_ref().map(|(x, _)| **x).unwrap_or(None); self.state.lock(|cell| { let mut state = cell.borrow_mut(); if let Some(permits) = state.take(ticket, permits, acquire_all) { @@ -268,10 +267,10 @@ where semaphore: self, permits, })) - } else if let Some((cell, waker)) = cx { + } else if let Some((ticket_ref, waker)) = cx { match state.register(ticket, waker) { Ok(ticket) => { - cell.set(Some(ticket)); + *ticket_ref = Some(ticket); Poll::Pending } Err(err) => Poll::Ready(Err(err)), @@ -291,10 +290,12 @@ pub struct WaitQueueFull; impl Semaphore for FairSemaphore { type Error = WaitQueueFull; - async fn acquire(&self, permits: usize) -> Result, Self::Error> { - let ticket = Cell::new(None); - let _guard = OnDrop::new(|| self.state.lock(|cell| cell.borrow_mut().cancel(ticket.get()))); - poll_fn(|cx| self.poll_acquire(permits, false, Some((&ticket, cx.waker())))).await + fn acquire(&self, permits: usize) -> impl Future, Self::Error>> { + FairAcquire { + sema: self, + permits, + ticket: None, + } } fn try_acquire(&self, permits: usize) -> Option> { @@ -304,10 +305,12 @@ impl Semaphore for FairSemaphore { } } - async fn acquire_all(&self, min: usize) -> Result, Self::Error> { - let ticket = Cell::new(None); - let _guard = OnDrop::new(|| self.state.lock(|cell| cell.borrow_mut().cancel(ticket.get()))); - poll_fn(|cx| self.poll_acquire(min, true, Some((&ticket, cx.waker())))).await + fn acquire_all(&self, min: usize) -> impl Future, Self::Error>> { + FairAcquireAll { + sema: self, + min, + ticket: None, + } } fn try_acquire_all(&self, min: usize) -> Option> { @@ -338,6 +341,52 @@ impl Semaphore for FairSemaphore { } } +struct FairAcquire<'a, M: RawMutex, const N: usize> { + sema: &'a FairSemaphore, + permits: usize, + ticket: Option, +} + +impl<'a, M: RawMutex, const N: usize> Drop for FairAcquire<'a, M, N> { + fn drop(&mut self) { + self.sema + .state + .lock(|cell| cell.borrow_mut().cancel(self.ticket.take())); + } +} + +impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M, N> { + type Output = Result>, WaitQueueFull>; + + fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + self.sema + .poll_acquire(self.permits, false, Some((&mut self.ticket, cx.waker()))) + } +} + +struct FairAcquireAll<'a, M: RawMutex, const N: usize> { + sema: &'a FairSemaphore, + min: usize, + ticket: Option, +} + +impl<'a, M: RawMutex, const N: usize> Drop for FairAcquireAll<'a, M, N> { + fn drop(&mut self) { + self.sema + .state + .lock(|cell| cell.borrow_mut().cancel(self.ticket.take())); + } +} + +impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a, M, N> { + type Output = Result>, WaitQueueFull>; + + fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + self.sema + .poll_acquire(self.min, true, Some((&mut self.ticket, cx.waker()))) + } +} + struct FairSemaphoreState { permits: usize, next_ticket: usize, @@ -406,6 +455,9 @@ impl FairSemaphoreState { if ticket.is_some() { self.pop(); + if self.permits > 0 { + self.wake(); + } } Some(permits) @@ -432,25 +484,6 @@ impl FairSemaphoreState { } } -/// A type to delay the drop handler invocation. -#[must_use = "to delay the drop handler invocation to the end of the scope"] -struct OnDrop { - f: MaybeUninit, -} - -impl OnDrop { - /// Create a new instance. - pub fn new(f: F) -> Self { - Self { f: MaybeUninit::new(f) } - } -} - -impl Drop for OnDrop { - fn drop(&mut self) { - unsafe { self.f.as_ptr().read()() } - } -} - #[cfg(test)] mod tests { mod greedy { @@ -574,11 +607,16 @@ mod tests { mod fair { use core::pin::pin; + use core::time::Duration; + use futures_executor::ThreadPool; + use futures_timer::Delay; use futures_util::poll; + use futures_util::task::SpawnExt; + use static_cell::StaticCell; use super::super::*; - use crate::blocking_mutex::raw::NoopRawMutex; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; #[test] fn try_acquire() { @@ -700,5 +738,35 @@ mod tests { let c = poll!(c_fut.as_mut()); assert!(c.is_ready()); } + + #[futures_test::test] + async fn wakers() { + let executor = ThreadPool::new().unwrap(); + + static SEMAPHORE: StaticCell> = StaticCell::new(); + let semaphore = &*SEMAPHORE.init(FairSemaphore::new(3)); + + let a = semaphore.try_acquire(2); + assert!(a.is_some()); + + let b_task = executor + .spawn_with_handle(async move { semaphore.acquire(2).await }) + .unwrap(); + while semaphore.state.lock(|x| x.borrow().wakers.is_empty()) { + Delay::new(Duration::from_millis(50)).await; + } + + let c_task = executor + .spawn_with_handle(async move { semaphore.acquire(1).await }) + .unwrap(); + + core::mem::drop(a); + + let b = b_task.await.unwrap(); + assert_eq!(b.permits(), 2); + + let c = c_task.await.unwrap(); + assert_eq!(c.permits(), 1); + } } } -- cgit From fa05256f0563716ffa6e865b73679f4d545bcff9 Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Mon, 8 Apr 2024 00:39:58 +0200 Subject: embassy-sync: Add len, is_empty and is_full functions to Channel. --- embassy-sync/src/channel.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 48f4dafd6..18be462cb 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -449,6 +449,18 @@ impl ChannelState { Poll::Pending } } + + fn len(&self) -> usize { + self.queue.len() + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + fn is_full(&self) -> bool { + self.queue.is_full() + } } /// A bounded channel for communicating between asynchronous tasks @@ -572,6 +584,21 @@ where pub fn try_receive(&self) -> Result { self.lock(|c| c.try_receive()) } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.lock(|c| c.len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.lock(|c| c.is_empty()) + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.lock(|c| c.is_full()) + } } /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the -- cgit From 86706bdc1438cdccf017d073f9b8f8ff2a0322fe Mon Sep 17 00:00:00 2001 From: Caio Date: Sun, 14 Apr 2024 19:35:59 -0300 Subject: Add map method --- embassy-sync/src/mutex.rs | 133 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 72459d660..b48a408c4 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -3,6 +3,7 @@ //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. use core::cell::{RefCell, UnsafeCell}; use core::future::poll_fn; +use core::mem; use core::ops::{Deref, DerefMut}; use core::task::Poll; @@ -134,6 +135,7 @@ where /// successfully locked the mutex, and grants access to the contents. /// /// Dropping it unlocks the mutex. +#[clippy::has_significant_drop] pub struct MutexGuard<'a, M, T> where M: RawMutex, @@ -142,6 +144,25 @@ where mutex: &'a Mutex, } +impl<'a, M, T> MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + /// Returns a locked view over a portion of the locked data. + pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { + let mutex = this.mutex; + let value = fun(unsafe { &mut *this.mutex.inner.get() }); + // Don't run the `drop` method for MutexGuard. The ownership of the underlying + // locked state is being moved to the returned MappedMutexGuard. + mem::forget(this); + MappedMutexGuard { + state: &mutex.state, + value, + } + } +} + impl<'a, M, T> Drop for MutexGuard<'a, M, T> where M: RawMutex, @@ -180,3 +201,115 @@ where unsafe { &mut *(self.mutex.inner.get()) } } } + +/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`] or +/// [`MappedMutexGuard::map`]. +/// +/// This can be used to hold a subfield of the protected data. +#[clippy::has_significant_drop] +pub struct MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + state: &'a BlockingMutex>, + value: *mut T, +} + +impl<'a, M, T> MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + /// Returns a locked view over a portion of the locked data. + pub fn map(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { + let state = this.state; + let value = fun(unsafe { &mut *this.value }); + // Don't run the `drop` method for MutexGuard. The ownership of the underlying + // locked state is being moved to the returned MappedMutexGuard. + mem::forget(this); + MappedMutexGuard { state, value } + } +} + +impl<'a, M, T> Deref for MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + type Target = T; + fn deref(&self) -> &Self::Target { + // Safety: the MutexGuard represents exclusive access to the contents + // of the mutex, so it's OK to get it. + unsafe { &*self.value } + } +} + +impl<'a, M, T> DerefMut for MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + // Safety: the MutexGuard represents exclusive access to the contents + // of the mutex, so it's OK to get it. + unsafe { &mut *self.value } + } +} + +impl<'a, M, T> Drop for MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized, +{ + fn drop(&mut self) { + self.state.lock(|s| { + let mut s = unwrap!(s.try_borrow_mut()); + s.locked = false; + s.waker.wake(); + }) + } +} + +unsafe impl Send for MappedMutexGuard<'_, M, T> +where + M: RawMutex + Sync, + T: Send + ?Sized, +{ +} + +unsafe impl Sync for MappedMutexGuard<'_, M, T> +where + M: RawMutex + Sync, + T: Sync + ?Sized, +{ +} + +#[cfg(test)] +mod tests { + use crate::blocking_mutex::raw::NoopRawMutex; + use crate::mutex::{Mutex, MutexGuard}; + + #[futures_test::test] + async fn mapped_guard_releases_lock_when_dropped() { + let mutex: Mutex = Mutex::new([0, 1]); + + { + let guard = mutex.lock().await; + assert_eq!(*guard, [0, 1]); + let mut mapped = MutexGuard::map(guard, |this| &mut this[1]); + assert_eq!(*mapped, 1); + *mapped = 2; + } + + { + let guard = mutex.lock().await; + assert_eq!(*guard, [0, 2]); + let mut mapped = MutexGuard::map(guard, |this| &mut this[1]); + assert_eq!(*mapped, 2); + *mapped = 3; + } + + assert_eq!(*mutex.lock().await, [0, 3]); + } +} -- cgit From 30dcc880933a6e37f4b4e3e297d0a4fab586befb Mon Sep 17 00:00:00 2001 From: nerwalt Date: Fri, 19 Apr 2024 12:16:12 -0600 Subject: Adding ready_to_receive to Channel and Receiver Adding ReceiveReadyFuture --- embassy-sync/src/channel.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 18be462cb..c4267064c 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -152,6 +152,13 @@ where self.channel.receive() } + /// Is a value ready to be received in the channel + /// + /// See [`Channel::ready_to_receive()`]. + pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> { + self.channel.ready_to_receive() + } + /// Attempt to immediately receive the next value. /// /// See [`Channel::try_receive()`] @@ -246,6 +253,26 @@ where } } +/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_receive(cx) + } +} + /// Future returned by [`DynamicReceiver::receive`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DynamicReceiveFuture<'ch, T> { @@ -577,6 +604,14 @@ where ReceiveFuture { channel: self } } + /// Is a value ready to be received in the channel + /// + /// If there are no messages in the channel's buffer, this method will + /// wait until there is at least one + pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> { + ReceiveReadyFuture { channel: self } + } + /// Attempt to immediately receive a message. /// /// This method will either receive a message from the channel immediately or return an error -- cgit From cc4ff9ef2d7b0a6a68e98cd3aa08855677ed8b90 Mon Sep 17 00:00:00 2001 From: W Etheredge Date: Sun, 5 May 2024 23:10:00 -0500 Subject: embassy_sync::Mutex: Implement traits to match std --- embassy-sync/src/mutex.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index b48a408c4..8c3a3af9f 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -3,9 +3,9 @@ //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. use core::cell::{RefCell, UnsafeCell}; use core::future::poll_fn; -use core::mem; use core::ops::{Deref, DerefMut}; use core::task::Poll; +use core::{fmt, mem}; use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex as BlockingMutex; @@ -129,6 +129,42 @@ where } } +impl From for Mutex { + fn from(from: T) -> Self { + Self::new(from) + } +} + +impl Default for Mutex +where + M: RawMutex, + T: ?Sized + Default, +{ + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl fmt::Debug for Mutex +where + M: RawMutex, + T: ?Sized + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut d = f.debug_struct("Mutex"); + match self.try_lock() { + Ok(value) => { + d.field("inner", &&*value); + } + Err(TryLockError) => { + d.field("inner", &format_args!("")); + } + } + + d.finish_non_exhaustive() + } +} + /// Async mutex guard. /// /// Owning an instance of this type indicates having @@ -202,6 +238,26 @@ where } } +impl<'a, M, T> fmt::Debug for MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, M, T> fmt::Display for MutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized + fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + /// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`] or /// [`MappedMutexGuard::map`]. /// @@ -285,6 +341,26 @@ where { } +impl<'a, M, T> fmt::Debug for MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, M, T> fmt::Display for MappedMutexGuard<'a, M, T> +where + M: RawMutex, + T: ?Sized + fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + #[cfg(test)] mod tests { use crate::blocking_mutex::raw::NoopRawMutex; -- cgit From fe68308958cbf39a12c8c4be560b0e449e70907a Mon Sep 17 00:00:00 2001 From: QWS Date: Fri, 10 May 2024 22:22:24 +0800 Subject: Fix: typo --- embassy-sync/src/once_lock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index 31cc99711..9332ecfaf 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -13,7 +13,7 @@ use core::task::Poll; /// /// **Note**: this implementation uses a busy loop to poll the value, /// which is not as efficient as registering a dedicated `Waker`. -/// However, the if the usecase for is to initialize a static variable +/// However, if the usecase for it is to initialize a static variable /// relatively early in the program life cycle, it should be fine. /// /// # Example -- cgit From 3d9b502c7af1f1dab92ee4a3aa07dc667a3bf103 Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Sat, 18 May 2024 13:37:51 +0200 Subject: embassy-sync: Add capacity and free_capacity functions to Channel --- embassy-sync/src/channel.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index c4267064c..f9f71d026 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -620,6 +620,18 @@ where self.lock(|c| c.try_receive()) } + /// Returns the maximum number of elements the channel can hold. + pub const fn capacity(&self) -> usize { + N + } + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + N - self.len() + } + /// Returns the number of elements currently in the channel. pub fn len(&self) -> usize { self.lock(|c| c.len()) -- cgit From f361c2e81cc2d595adc3896f8a8a198973392046 Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Sat, 18 May 2024 13:48:40 +0200 Subject: embassy-sync: Add capacity, free_capacity, len, is_empty and is_full functions to PriorityChannel --- embassy-sync/src/priority_channel.rs | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index e77678c24..4b9bd0515 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -314,6 +314,18 @@ where Poll::Pending } } + + fn len(&self) -> usize { + self.queue.len() + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + fn is_full(&self) -> bool { + self.queue.len() == self.queue.capacity() + } } /// A bounded channel for communicating between asynchronous tasks @@ -433,6 +445,33 @@ where pub fn try_receive(&self) -> Result { self.lock(|c| c.try_receive()) } + + /// Returns the maximum number of elements the channel can hold. + pub const fn capacity(&self) -> usize { + N + } + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + N - self.len() + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.lock(|c| c.len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.lock(|c| c.is_empty()) + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.lock(|c| c.is_full()) + } } /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the -- cgit From ab899934513d7b7b1eda1727bc145713856e4a9a Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Sat, 18 May 2024 14:01:23 +0200 Subject: embassy-sync: Add capacity, free_capacity, len, is_empty and is_full functions to PubSubChannel --- embassy-sync/src/pubsub/mod.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 6afd54af5..754747ab8 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -160,6 +160,33 @@ impl DynImmediatePublisher { DynImmediatePublisher(ImmediatePub::new(self)) } + + /// Returns the maximum number of elements the channel can hold. + pub const fn capacity(&self) -> usize { + CAP + } + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + CAP - self.len() + } + + /// Returns the number of elements currently in the channel. + pub fn len(&self) -> usize { + self.inner.lock(|inner| inner.borrow().len()) + } + + /// Returns whether the channel is empty. + pub fn is_empty(&self) -> bool { + self.inner.lock(|inner| inner.borrow().is_empty()) + } + + /// Returns whether the channel is full. + pub fn is_full(&self) -> bool { + self.inner.lock(|inner| inner.borrow().is_full()) + } } impl PubSubBehavior @@ -366,6 +393,18 @@ impl PubSubSta fn unregister_publisher(&mut self) { self.publisher_count -= 1; } + + fn len(&self) -> usize { + self.queue.len() + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + fn is_full(&self) -> bool { + self.queue.is_full() + } } /// Error type for the [PubSubChannel] -- cgit From 2a4a714060d4cdb976eab62a70f9c1242425714f Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Mon, 20 May 2024 15:18:10 +0200 Subject: Make behaviour trait sealed --- embassy-sync/src/pubsub/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 754747ab8..adbf8cf02 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -189,7 +189,7 @@ impl PubSubBehavior +impl SealedPubSubBehavior for PubSubChannel { fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { @@ -421,7 +421,7 @@ pub enum Error { /// 'Middle level' behaviour of the pubsub channel. /// This trait is used so that Sub and Pub can be generic over the channel. -pub trait PubSubBehavior { +trait SealedPubSubBehavior { /// Try to get a message from the queue with the given message id. /// /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. @@ -449,6 +449,13 @@ pub trait PubSubBehavior { fn unregister_publisher(&self); } +/// 'Middle level' behaviour of the pubsub channel. +/// This trait is used so that Sub and Pub can be generic over the channel. +#[allow(private_bounds)] +pub trait PubSubBehavior: SealedPubSubBehavior {} + +impl> PubSubBehavior for C {} + /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -- cgit From a76082b104edd4cbb7f10a4a589ef6773d8d6e4f Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Mon, 20 May 2024 15:34:03 +0200 Subject: Expose new length functions in the subs and pubs --- embassy-sync/src/pubsub/mod.rs | 82 +++++++++++++++++++++++++---------- embassy-sync/src/pubsub/publisher.rs | 58 ++++++++++++++++++++----- embassy-sync/src/pubsub/subscriber.rs | 31 ++++++++++++- 3 files changed, 138 insertions(+), 33 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index adbf8cf02..af3d6db2a 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -248,13 +248,6 @@ impl usize { - self.inner.lock(|s| { - let s = s.borrow(); - s.queue.capacity() - s.queue.len() - }) - } - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -268,6 +261,26 @@ impl usize { + self.capacity() + } + + fn free_capacity(&self) -> usize { + self.free_capacity() + } + + fn len(&self) -> usize { + self.len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn is_full(&self) -> bool { + self.is_full() + } } /// Internal state for the PubSub channel @@ -439,8 +452,22 @@ trait SealedPubSubBehavior { /// Publish a message immediately fn publish_immediate(&self, message: T); - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers - fn space(&self) -> usize; + /// Returns the maximum number of elements the channel can hold. + fn capacity(&self) -> usize; + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + fn free_capacity(&self) -> usize; + + /// Returns the number of elements currently in the channel. + fn len(&self) -> usize; + + /// Returns whether the channel is empty. + fn is_empty(&self) -> bool; + + /// Returns whether the channel is full. + fn is_full(&self) -> bool; /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -588,6 +615,7 @@ mod tests { assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); + assert!(pub0.is_full()); assert_eq!(pub0.try_publish(0), Err(0)); drop(sub0); @@ -620,32 +648,42 @@ mod tests { } #[futures_test::test] - async fn correct_space() { + async fn correct_len() { let channel = PubSubChannel::::new(); let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); let pub0 = channel.publisher().unwrap(); - assert_eq!(pub0.space(), 4); + assert!(sub0.is_empty()); + assert!(sub1.is_empty()); + assert!(pub0.is_empty()); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); pub0.publish(42).await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); pub0.publish(42).await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub0.next_message().await; sub0.next_message().await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub1.next_message().await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); + sub1.next_message().await; - assert_eq!(pub0.space(), 4); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); } #[futures_test::test] @@ -656,29 +694,29 @@ mod tests { let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); - assert_eq!(4, pub0.space()); + assert_eq!(4, pub0.free_capacity()); pub0.publish(1).await; pub0.publish(2).await; - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub0.try_next_message_pure().unwrap()); assert_eq!(2, sub0.try_next_message_pure().unwrap()); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); drop(sub0); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub1.try_next_message_pure().unwrap()); - assert_eq!(3, channel.space()); + assert_eq!(3, channel.free_capacity()); drop(sub1); - assert_eq!(4, channel.space()); + assert_eq!(4, channel.free_capacity()); } struct CloneCallCounter(usize); diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e1edc9eb9..26e2c63b7 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } @@ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index f420a75f0..2e2bd26a9 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } } - /// The amount of messages this subscriber hasn't received yet + /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically + /// for this subscriber. pub fn available(&self) -> u64 { self.channel.available(self.next_message_id) } + + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + /// See [Self::available] for how many messages are available for this subscriber. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { -- cgit From 73e523ab14b69494e324ba09fec88bc5eb0b3d2d Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Tue, 21 May 2024 22:59:39 +0200 Subject: embassy-sync: fixed some clippy warnings --- embassy-sync/src/channel.rs | 8 ++++---- embassy-sync/src/pipe.rs | 2 +- embassy-sync/src/priority_channel.rs | 4 ++-- embassy-sync/src/waitqueue/multi_waker.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index f9f71d026..b124a885f 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -42,7 +42,7 @@ where M: RawMutex, { fn clone(&self) -> Self { - Sender { channel: self.channel } + *self } } @@ -81,7 +81,7 @@ pub struct DynamicSender<'ch, T> { impl<'ch, T> Clone for DynamicSender<'ch, T> { fn clone(&self) -> Self { - DynamicSender { channel: self.channel } + *self } } @@ -135,7 +135,7 @@ where M: RawMutex, { fn clone(&self) -> Self { - Receiver { channel: self.channel } + *self } } @@ -188,7 +188,7 @@ pub struct DynamicReceiver<'ch, T> { impl<'ch, T> Clone for DynamicReceiver<'ch, T> { fn clone(&self) -> Self { - DynamicReceiver { channel: self.channel } + *self } } diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 42fe8ebd0..cd5b8ed75 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -25,7 +25,7 @@ where M: RawMutex, { fn clone(&self) -> Self { - Writer { pipe: self.pipe } + *self } } diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 4b9bd0515..8572d3608 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -33,7 +33,7 @@ where M: RawMutex, { fn clone(&self) -> Self { - Sender { channel: self.channel } + *self } } @@ -101,7 +101,7 @@ where M: RawMutex, { fn clone(&self) -> Self { - Receiver { channel: self.channel } + *self } } diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 824d192da..0e520bf40 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -14,7 +14,7 @@ impl MultiWakerRegistration { } /// Register a waker. If the buffer is full the function returns it in the error - pub fn register<'a>(&mut self, w: &'a Waker) { + pub fn register(&mut self, w: &Waker) { // If we already have some waker that wakes the same task as `w`, do nothing. // This avoids cloning wakers, and avoids unnecessary mass-wakes. for w2 in &self.wakers { -- cgit From aee9d5902a4feb4a5fbb8d0e719401c96f3f651e Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Wed, 22 May 2024 00:54:52 +0200 Subject: embassy-sync: fixed some documentation typos --- embassy-sync/src/once_lock.rs | 6 +++--- embassy-sync/src/priority_channel.rs | 2 +- embassy-sync/src/pubsub/mod.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index 9332ecfaf..55608ba32 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -1,4 +1,4 @@ -//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value. +//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. use core::cell::Cell; use core::future::poll_fn; @@ -78,7 +78,7 @@ impl OnceLock { /// Set the underlying value. If the value is already set, this will return an error with the given value. pub fn init(&self, value: T) -> Result<(), T> { // Critical section is required to ensure that the value is - // not simultaniously initialized elsewhere at the same time. + // not simultaneously initialized elsewhere at the same time. critical_section::with(|_| { // If the value is not set, set it and return Ok. if !self.init.load(Ordering::Relaxed) { @@ -99,7 +99,7 @@ impl OnceLock { F: FnOnce() -> T, { // Critical section is required to ensure that the value is - // not simultaniously initialized elsewhere at the same time. + // not simultaneously initialized elsewhere at the same time. critical_section::with(|_| { // If the value is not set, set it. if !self.init.load(Ordering::Relaxed) { diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 8572d3608..2954d1b76 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -335,7 +335,7 @@ where /// buffer is full, attempts to `send` new messages will wait until a message is /// received from the channel. /// -/// Sent data may be reordered based on their priorty within the channel. +/// Sent data may be reordered based on their priority within the channel. /// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. pub struct PriorityChannel diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index af3d6db2a..637336d9d 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -437,7 +437,7 @@ pub enum Error { trait SealedPubSubBehavior { /// Try to get a message from the queue with the given message id. /// - /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. + /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; /// Get the amount of messages that are between the given the next_message_id and the most recent message. -- cgit From bbeba7f014b383568e999650f0d954a33955ce73 Mon Sep 17 00:00:00 2001 From: Oliver Rockstedt Date: Wed, 22 May 2024 14:54:09 +0200 Subject: embassy-sync: Add clear function to all channels --- embassy-sync/src/channel.rs | 9 +++++++++ embassy-sync/src/priority_channel.rs | 9 +++++++++ embassy-sync/src/pubsub/mod.rs | 16 ++++++++++++++++ embassy-sync/src/pubsub/publisher.rs | 10 ++++++++++ embassy-sync/src/pubsub/subscriber.rs | 5 +++++ 5 files changed, 49 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index b124a885f..55ac5fb66 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -477,6 +477,10 @@ impl ChannelState { } } + fn clear(&mut self) { + self.queue.clear(); + } + fn len(&self) -> usize { self.queue.len() } @@ -632,6 +636,11 @@ where N - self.len() } + /// Clears all elements in the channel. + pub fn clear(&self) { + self.lock(|c| c.clear()); + } + /// Returns the number of elements currently in the channel. pub fn len(&self) -> usize { self.lock(|c| c.len()) diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 2954d1b76..24c6c5a7f 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -315,6 +315,10 @@ where } } + fn clear(&mut self) { + self.queue.clear(); + } + fn len(&self) -> usize { self.queue.len() } @@ -458,6 +462,11 @@ where N - self.len() } + /// Clears all elements in the channel. + pub fn clear(&self) { + self.lock(|c| c.clear()); + } + /// Returns the number of elements currently in the channel. pub fn len(&self) -> usize { self.lock(|c| c.len()) diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 637336d9d..66c9b0017 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -173,6 +173,11 @@ impl usize { self.inner.lock(|inner| inner.borrow().len()) @@ -270,6 +275,10 @@ impl usize { self.len() } @@ -407,6 +416,10 @@ impl PubSubSta self.publisher_count -= 1; } + fn clear(&mut self) { + self.queue.clear(); + } + fn len(&self) -> usize { self.queue.len() } @@ -460,6 +473,9 @@ trait SealedPubSubBehavior { /// This is equivalent to `capacity() - len()` fn free_capacity(&self) -> usize; + /// Clears all elements in the channel. + fn clear(&self); + /// Returns the number of elements currently in the channel. fn len(&self) -> usize; diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 26e2c63b7..e66b3b1db 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -55,6 +55,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.free_capacity() } + /// Clears all elements in the ***channel***. + pub fn clear(&self) { + self.channel.clear(); + } + /// Returns the number of elements currently in the ***channel***. pub fn len(&self) -> usize { self.channel.len() @@ -155,6 +160,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { self.channel.free_capacity() } + /// Clears all elements in the ***channel***. + pub fn clear(&self) { + self.channel.clear(); + } + /// Returns the number of elements currently in the ***channel***. pub fn len(&self) -> usize { self.channel.len() diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 2e2bd26a9..6ad660cb3 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -83,6 +83,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { self.channel.free_capacity() } + /// Clears all elements in the ***channel***. + pub fn clear(&self) { + self.channel.clear(); + } + /// Returns the number of elements currently in the ***channel***. /// See [Self::available] for how many messages are available for this subscriber. pub fn len(&self) -> usize { -- cgit From 368893c9cb1b192c9e0d45440cacb271d1039c29 Mon Sep 17 00:00:00 2001 From: Jan Špaček Date: Sat, 25 May 2024 21:44:37 +0200 Subject: Emit cargo:rustc-check-cfg instructions from build.rs --- embassy-sync/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 1873483f9..a5eee8d02 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] +#![cfg_attr(not(feature = "std"), no_std)] #![allow(async_fn_in_trait)] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] -- cgit From 3e85fb4f52678740d8fd2dc53ad4f5f3979028cc Mon Sep 17 00:00:00 2001 From: Jesse Stuart Date: Wed, 5 Jun 2024 20:22:48 -0400 Subject: embassy-sync: remove T: Send for Signal --- embassy-sync/src/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index 520f1a896..a0f4b5a74 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -65,7 +65,7 @@ where } } -impl Signal +impl Signal where M: RawMutex, { -- cgit From 871fe3a5493961e81ec41ddc8f000f32b0549e71 Mon Sep 17 00:00:00 2001 From: dvdsk Date: Thu, 6 Jun 2024 23:19:07 +0200 Subject: Add Clone and Copy to Error types None of them are `non-exaustative`, they are all small enough to be copy (I estimate none are larger than 4 bytes). --- embassy-sync/src/pubsub/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 66c9b0017..a97eb7d5b 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -434,7 +434,7 @@ impl PubSubSta } /// Error type for the [PubSubChannel] -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error { /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or -- cgit From 6a4ac5bd60693307721aa82c26909ffd05e2b193 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 17 Jun 2024 01:38:57 +0200 Subject: Add collapse_debuginfo to fmt.rs macros. This makes location info in defmt logs point to the code calling the macro, instead of always to fmt.rs as before. Fix works with nightlies starting with today's, and stable 1.81+. --- embassy-sync/src/fmt.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 2ac42c557..35b929fde 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -6,6 +6,7 @@ use core::fmt::{Debug, Display, LowerHex}; #[cfg(all(feature = "defmt", feature = "log"))] compile_error!("You may not enable both `defmt` and `log` features."); +#[collapse_debuginfo(yes)] macro_rules! assert { ($($x:tt)*) => { { @@ -17,6 +18,7 @@ macro_rules! assert { }; } +#[collapse_debuginfo(yes)] macro_rules! assert_eq { ($($x:tt)*) => { { @@ -28,6 +30,7 @@ macro_rules! assert_eq { }; } +#[collapse_debuginfo(yes)] macro_rules! assert_ne { ($($x:tt)*) => { { @@ -39,6 +42,7 @@ macro_rules! assert_ne { }; } +#[collapse_debuginfo(yes)] macro_rules! debug_assert { ($($x:tt)*) => { { @@ -50,6 +54,7 @@ macro_rules! debug_assert { }; } +#[collapse_debuginfo(yes)] macro_rules! debug_assert_eq { ($($x:tt)*) => { { @@ -61,6 +66,7 @@ macro_rules! debug_assert_eq { }; } +#[collapse_debuginfo(yes)] macro_rules! debug_assert_ne { ($($x:tt)*) => { { @@ -72,6 +78,7 @@ macro_rules! debug_assert_ne { }; } +#[collapse_debuginfo(yes)] macro_rules! todo { ($($x:tt)*) => { { @@ -84,6 +91,7 @@ macro_rules! todo { } #[cfg(not(feature = "defmt"))] +#[collapse_debuginfo(yes)] macro_rules! unreachable { ($($x:tt)*) => { ::core::unreachable!($($x)*) @@ -91,12 +99,14 @@ macro_rules! unreachable { } #[cfg(feature = "defmt")] +#[collapse_debuginfo(yes)] macro_rules! unreachable { ($($x:tt)*) => { ::defmt::unreachable!($($x)*) }; } +#[collapse_debuginfo(yes)] macro_rules! panic { ($($x:tt)*) => { { @@ -108,6 +118,7 @@ macro_rules! panic { }; } +#[collapse_debuginfo(yes)] macro_rules! trace { ($s:literal $(, $x:expr)* $(,)?) => { { @@ -121,6 +132,7 @@ macro_rules! trace { }; } +#[collapse_debuginfo(yes)] macro_rules! debug { ($s:literal $(, $x:expr)* $(,)?) => { { @@ -134,6 +146,7 @@ macro_rules! debug { }; } +#[collapse_debuginfo(yes)] macro_rules! info { ($s:literal $(, $x:expr)* $(,)?) => { { @@ -147,6 +160,7 @@ macro_rules! info { }; } +#[collapse_debuginfo(yes)] macro_rules! warn { ($s:literal $(, $x:expr)* $(,)?) => { { @@ -160,6 +174,7 @@ macro_rules! warn { }; } +#[collapse_debuginfo(yes)] macro_rules! error { ($s:literal $(, $x:expr)* $(,)?) => { { @@ -174,6 +189,7 @@ macro_rules! error { } #[cfg(feature = "defmt")] +#[collapse_debuginfo(yes)] macro_rules! unwrap { ($($x:tt)*) => { ::defmt::unwrap!($($x)*) @@ -181,6 +197,7 @@ macro_rules! unwrap { } #[cfg(not(feature = "defmt"))] +#[collapse_debuginfo(yes)] macro_rules! unwrap { ($arg:expr) => { match $crate::fmt::Try::into_result($arg) { -- cgit From a716a3f006e439bbd9d11ab858d8ee4a93ec48f8 Mon Sep 17 00:00:00 2001 From: Tarun Singh Date: Wed, 17 Jul 2024 17:05:52 -0400 Subject: Reduced define for 'unreachable!' to a single macro rule --- embassy-sync/src/fmt.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 35b929fde..8ca61bc39 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -90,19 +90,15 @@ macro_rules! todo { }; } -#[cfg(not(feature = "defmt"))] -#[collapse_debuginfo(yes)] -macro_rules! unreachable { - ($($x:tt)*) => { - ::core::unreachable!($($x)*) - }; -} - -#[cfg(feature = "defmt")] #[collapse_debuginfo(yes)] macro_rules! unreachable { ($($x:tt)*) => { - ::defmt::unreachable!($($x)*) + { + #[cfg(not(feature = "defmt"))] + ::core::unreachable!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::unreachable!($($x)*); + } }; } -- cgit From 05e0f128462ebdacd3dffc27f74502913d782589 Mon Sep 17 00:00:00 2001 From: Samuel Tardieu Date: Sat, 27 Jul 2024 11:49:02 +0200 Subject: embassy-sync: add LazyLock `LazyLock` is inspired by Rust 1.80.0's `std::sync::LazyLock` type. --- embassy-sync/src/lazy_lock.rs | 84 +++++++++++++++++++++++++++++++++++++++++++ embassy-sync/src/lib.rs | 1 + 2 files changed, 85 insertions(+) create mode 100644 embassy-sync/src/lazy_lock.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs new file mode 100644 index 000000000..2b5742491 --- /dev/null +++ b/embassy-sync/src/lazy_lock.rs @@ -0,0 +1,84 @@ +//! Synchronization primitive for initializing a value once, allowing others to get a reference to the value. + +use core::cell::Cell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; + +/// The `LazyLock` is a synchronization primitive that allows for +/// initializing a value once, and allowing others to obtain a +/// reference to the value. This is useful for lazy initialization of +/// a static value. +/// +/// # Example +/// ``` +/// use futures_executor::block_on; +/// use embassy_sync::lazy_lock::LazyLock; +/// +/// // Define a static value that will be lazily initialized +/// // at runtime at the first access. +/// static VALUE: LazyLock = LazyLock::new(|| 20); +/// +/// let reference = VALUE.get(); +/// assert_eq!(reference, &20); +/// ``` +pub struct LazyLock T> { + init: AtomicBool, + init_fn: Cell>, + data: Cell>, +} + +unsafe impl Sync for LazyLock {} + +impl T> LazyLock { + /// Create a new uninitialized `StaticLock`. + pub const fn new(init_fn: F) -> Self { + Self { + init: AtomicBool::new(false), + init_fn: Cell::new(Some(init_fn)), + data: Cell::new(MaybeUninit::zeroed()), + } + } + + /// Get a reference to the underlying value, initializing it if it + /// has not been done already. + #[inline] + pub fn get(&self) -> &T { + self.ensure_init_fast(); + unsafe { (*self.data.as_ptr()).assume_init_ref() } + } + + /// Consume the `LazyLock`, returning the underlying value. The + /// initialization function will be called if it has not been + /// already. + #[inline] + pub fn into_inner(self) -> T { + self.ensure_init_fast(); + unsafe { self.data.into_inner().assume_init() } + } + + /// Initialize the `LazyLock` if it has not been initialized yet. + /// This function is a fast track to [`Self::ensure_init`] + /// which does not require a critical section in most cases when + /// the value has been initialized already. + /// When this function returns, `self.data` is guaranteed to be + /// initialized and visible on the current core. + #[inline] + fn ensure_init_fast(&self) { + if !self.init.load(Ordering::Acquire) { + self.ensure_init(); + } + } + + /// Initialize the `LazyLock` if it has not been initialized yet. + /// When this function returns, `self.data` is guaranteed to be + /// initialized and visible on the current core. + fn ensure_init(&self) { + critical_section::with(|_| { + if !self.init.load(Ordering::Acquire) { + let init_fn = self.init_fn.take().unwrap(); + self.data.set(MaybeUninit::new(init_fn())); + self.init.store(true, Ordering::Release); + } + }); + } +} diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index a5eee8d02..014bf1d06 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -12,6 +12,7 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; +pub mod lazy_lock; pub mod mutex; pub mod once_lock; pub mod pipe; -- cgit From 93696c912e7264e101308a3f205272dcdd44e6b2 Mon Sep 17 00:00:00 2001 From: wanglei Date: Wed, 31 Jul 2024 00:24:39 +0800 Subject: embassy-sync: fix the data of LazyLock never drop Using `union` can save more space. And the `MaybeUninit` will never drop the T, when dropping the `MaybeUninit`. Fixed it. Signed-off-by: wanglei --- embassy-sync/src/lazy_lock.rs | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs index 2b5742491..cf88bfdf8 100644 --- a/embassy-sync/src/lazy_lock.rs +++ b/embassy-sync/src/lazy_lock.rs @@ -1,7 +1,7 @@ //! Synchronization primitive for initializing a value once, allowing others to get a reference to the value. -use core::cell::Cell; -use core::mem::MaybeUninit; +use core::cell::UnsafeCell; +use core::mem::ManuallyDrop; use core::sync::atomic::{AtomicBool, Ordering}; /// The `LazyLock` is a synchronization primitive that allows for @@ -23,8 +23,12 @@ use core::sync::atomic::{AtomicBool, Ordering}; /// ``` pub struct LazyLock T> { init: AtomicBool, - init_fn: Cell>, - data: Cell>, + data: UnsafeCell>, +} + +union Data { + value: ManuallyDrop, + f: ManuallyDrop, } unsafe impl Sync for LazyLock {} @@ -34,8 +38,9 @@ impl T> LazyLock { pub const fn new(init_fn: F) -> Self { Self { init: AtomicBool::new(false), - init_fn: Cell::new(Some(init_fn)), - data: Cell::new(MaybeUninit::zeroed()), + data: UnsafeCell::new(Data { + f: ManuallyDrop::new(init_fn), + }), } } @@ -44,7 +49,7 @@ impl T> LazyLock { #[inline] pub fn get(&self) -> &T { self.ensure_init_fast(); - unsafe { (*self.data.as_ptr()).assume_init_ref() } + unsafe { &(*self.data.get()).value } } /// Consume the `LazyLock`, returning the underlying value. The @@ -53,7 +58,10 @@ impl T> LazyLock { #[inline] pub fn into_inner(self) -> T { self.ensure_init_fast(); - unsafe { self.data.into_inner().assume_init() } + let this = ManuallyDrop::new(self); + let data = unsafe { core::ptr::read(&this.data) }.into_inner(); + + ManuallyDrop::into_inner(unsafe { data.value }) } /// Initialize the `LazyLock` if it has not been initialized yet. @@ -75,10 +83,23 @@ impl T> LazyLock { fn ensure_init(&self) { critical_section::with(|_| { if !self.init.load(Ordering::Acquire) { - let init_fn = self.init_fn.take().unwrap(); - self.data.set(MaybeUninit::new(init_fn())); + let data = unsafe { &mut *self.data.get() }; + let f = unsafe { ManuallyDrop::take(&mut data.f) }; + let value = f(); + data.value = ManuallyDrop::new(value); + self.init.store(true, Ordering::Release); } }); } } + +impl Drop for LazyLock { + fn drop(&mut self) { + if self.init.load(Ordering::Acquire) { + unsafe { ManuallyDrop::drop(&mut self.data.get_mut().value) }; + } else { + unsafe { ManuallyDrop::drop(&mut self.data.get_mut().f) }; + } + } +} -- cgit From 05562b92af53c742c6531ec616afd518112687b8 Mon Sep 17 00:00:00 2001 From: wanglei Date: Wed, 31 Jul 2024 00:25:28 +0800 Subject: embassy-sync: more unit-test for LazyLock Signed-off-by: wanglei --- embassy-sync/src/lazy_lock.rs | 47 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs index cf88bfdf8..18e3c2019 100644 --- a/embassy-sync/src/lazy_lock.rs +++ b/embassy-sync/src/lazy_lock.rs @@ -103,3 +103,50 @@ impl Drop for LazyLock { } } } + +#[cfg(test)] +mod tests { + use core::sync::atomic::{AtomicU32, Ordering}; + + use super::*; + + #[test] + fn test_lazy_lock() { + static VALUE: LazyLock = LazyLock::new(|| 20); + let reference = VALUE.get(); + assert_eq!(reference, &20); + } + #[test] + fn test_lazy_lock_into_inner() { + let lazy: LazyLock = LazyLock::new(|| 20); + let value = lazy.into_inner(); + assert_eq!(value, 20); + } + + static DROP_CHECKER: AtomicU32 = AtomicU32::new(0); + struct DropCheck; + + impl Drop for DropCheck { + fn drop(&mut self) { + DROP_CHECKER.fetch_add(1, Ordering::Acquire); + } + } + + #[test] + fn test_lazy_drop() { + let lazy: LazyLock = LazyLock::new(|| DropCheck); + assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 0); + lazy.get(); + drop(lazy); + assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1); + + let dropper = DropCheck; + let lazy_fn: LazyLock = LazyLock::new(move || { + let _a = dropper; + 20 + }); + assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1); + drop(lazy_fn); + assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 2); + } +} -- cgit From 893b8d79e8bab8dae0f46a0182443df9160592b5 Mon Sep 17 00:00:00 2001 From: Nathan Perry Date: Thu, 19 Sep 2024 08:17:33 -0400 Subject: embassy_sync/pubsub: fix PubSubBehavior visibility https://github.com/embassy-rs/embassy/pull/2969 appears to have broken direct `publish_immediate()` on `pubsub::Channel`, as it functionally made `PubSubBehavior` private and didn't delegate this method to the new (private) `SealedPubSubBehavior`. This change moves `publish_immediate`, `capacity`, and `is_full` from `SealedPubSubBehavior` to `PubSubBehavior` in order to restore them to `pub` visibility. --- embassy-sync/src/pubsub/mod.rs | 56 +++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 28 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index a97eb7d5b..812302e2b 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -194,6 +194,25 @@ impl crate::pubsub::PubSubBehavior + for PubSubChannel +{ + fn publish_immediate(&self, message: T) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.publish_immediate(message) + }) + } + + fn capacity(&self) -> usize { + self.capacity() + } + + fn is_full(&self) -> bool { + self.is_full() + } +} + impl SealedPubSubBehavior for PubSubChannel { @@ -246,13 +265,6 @@ impl usize { - self.capacity() - } - fn free_capacity(&self) -> usize { self.free_capacity() } @@ -286,10 +294,6 @@ impl bool { self.is_empty() } - - fn is_full(&self) -> bool { - self.is_full() - } } /// Internal state for the PubSub channel @@ -445,8 +449,6 @@ pub enum Error { MaximumPublishersReached, } -/// 'Middle level' behaviour of the pubsub channel. -/// This trait is used so that Sub and Pub can be generic over the channel. trait SealedPubSubBehavior { /// Try to get a message from the queue with the given message id. /// @@ -462,12 +464,6 @@ trait SealedPubSubBehavior { /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; - /// Publish a message immediately - fn publish_immediate(&self, message: T); - - /// Returns the maximum number of elements the channel can hold. - fn capacity(&self) -> usize; - /// Returns the free capacity of the channel. /// /// This is equivalent to `capacity() - len()` @@ -482,9 +478,6 @@ trait SealedPubSubBehavior { /// Returns whether the channel is empty. fn is_empty(&self) -> bool; - /// Returns whether the channel is full. - fn is_full(&self) -> bool; - /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -495,9 +488,16 @@ trait SealedPubSubBehavior { /// 'Middle level' behaviour of the pubsub channel. /// This trait is used so that Sub and Pub can be generic over the channel. #[allow(private_bounds)] -pub trait PubSubBehavior: SealedPubSubBehavior {} +pub trait PubSubBehavior: SealedPubSubBehavior { + /// Publish a message immediately + fn publish_immediate(&self, message: T); -impl> PubSubBehavior for C {} + /// Returns the maximum number of elements the channel can hold. + fn capacity(&self) -> usize; + + /// Returns whether the channel is full. + fn is_full(&self) -> bool; +} /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq, Eq)] -- cgit