From f9d0daad80827dd1b379ca727a2e27870a497122 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 20 Dec 2023 08:37:15 +0100 Subject: feat(embassy-sync): Add try_take() to signal --- embassy-sync/src/signal.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index bea67d8be..97d76b463 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -111,6 +111,17 @@ where poll_fn(move |cx| self.poll_wait(cx)) } + /// non-blocking method to try and take the signal value. + pub fn try_take(&self) -> Option { + self.state.lock(|cell| { + let state = cell.replace(State::None); + match state { + State::Signaled(res) => Some(res), + _ => None, + } + }) + } + /// non-blocking method to check whether this signal has been signaled. pub fn signaled(&self) -> bool { self.state.lock(|cell| { -- cgit From 13c107e81582e2249df2fd940791b611a1ddbd62 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 20 Dec 2023 13:09:16 +0100 Subject: Put waiting state back if any --- embassy-sync/src/signal.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index 97d76b463..d75750ce7 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -117,7 +117,10 @@ where let state = cell.replace(State::None); match state { State::Signaled(res) => Some(res), - _ => None, + state => { + cell.set(state); + None + } } }) } -- cgit From fe0b21e21e8a1144a37ef07892927062682206a1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 10 Jan 2024 18:25:45 +0100 Subject: Remove nightly autodetects. --- embassy-sync/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index b0ccfde57..d88c76db5 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -1,6 +1,4 @@ #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(nightly, feature(async_fn_in_trait, impl_trait_projections))] -#![cfg_attr(nightly, allow(stable_features, unknown_lints))] #![allow(async_fn_in_trait)] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] -- cgit From 15e9b60abbceb8843781b6eec398dafb45cc111d Mon Sep 17 00:00:00 2001 From: Sam Lakerveld Date: Thu, 1 Feb 2024 13:47:07 +0100 Subject: sync/pipe: be able to be zero-initialized --- embassy-sync/src/ring_buffer.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index d95ffa7c9..81e60c42b 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs @@ -3,7 +3,7 @@ use core::ops::Range; pub struct RingBuffer { start: usize, end: usize, - empty: bool, + full: bool, } impl RingBuffer { @@ -11,13 +11,13 @@ impl RingBuffer { Self { start: 0, end: 0, - empty: true, + full: false, } } pub fn push_buf(&mut self) -> Range { - if self.start == self.end && !self.empty { - trace!(" ringbuf: push_buf empty"); + if self.is_full() { + trace!(" ringbuf: push_buf full"); return 0..0; } @@ -38,11 +38,11 @@ impl RingBuffer { } self.end = self.wrap(self.end + n); - self.empty = false; + self.full = self.start == self.end; } pub fn pop_buf(&mut self) -> Range { - if self.empty { + if self.is_empty() { trace!(" ringbuf: pop_buf empty"); return 0..0; } @@ -64,20 +64,20 @@ impl RingBuffer { } self.start = self.wrap(self.start + n); - self.empty = self.start == self.end; + self.full = false; } pub fn is_full(&self) -> bool { - self.start == self.end && !self.empty + self.full } pub fn is_empty(&self) -> bool { - self.empty + self.start == self.end && !self.full } #[allow(unused)] pub fn len(&self) -> usize { - if self.empty { + if self.is_empty() { 0 } else if self.start < self.end { self.end - self.start @@ -89,7 +89,7 @@ impl RingBuffer { pub fn clear(&mut self) { self.start = 0; self.end = 0; - self.empty = true; + self.full = false; } fn wrap(&self, n: usize) -> usize { -- cgit From b9d0069671b33107e35af6bdaa662e9c7be8e3f9 Mon Sep 17 00:00:00 2001 From: Stefan Gehr Date: Sat, 3 Feb 2024 14:56:31 +0100 Subject: correct spelling of the word "receive" --- embassy-sync/src/priority_channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index bd75c0135..e77678c24 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -325,7 +325,7 @@ where /// /// Sent data may be reordered based on their priorty within the channel. /// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] -/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be recieved as `[3, 2, 1]`. +/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. pub struct PriorityChannel where T: Ord, -- cgit From 69d37503c22be73c3d8283f39155cfa1559a37eb Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sat, 2 Mar 2024 13:13:26 +0100 Subject: Add constructor for dynamic channel --- embassy-sync/src/channel.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index ff7129303..01db0d09a 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -507,6 +507,16 @@ where Receiver { channel: self } } + /// Get a sender for this channel using dynamic dispatch. + pub fn dyn_sender(&self) -> DynamicSender<'_, T> { + DynamicSender { channel: self } + } + + /// Get a receiver for this channel using dynamic dispatch. + pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> { + DynamicReceiver { channel: self } + } + /// Send a value, waiting until there is capacity. /// /// Sending completes when the value has been pushed to the channel's queue. @@ -648,7 +658,7 @@ mod tests { } #[test] - fn dynamic_dispatch() { + fn dynamic_dispatch_into() { let c = Channel::::new(); let s: DynamicSender<'_, u32> = c.sender().into(); let r: DynamicReceiver<'_, u32> = c.receiver().into(); @@ -657,6 +667,16 @@ mod tests { assert_eq!(r.try_receive().unwrap(), 1); } + #[test] + fn dynamic_dispatch_constructor() { + let c = Channel::::new(); + let s = c.dyn_sender(); + let r = c.dyn_receiver(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_receive().unwrap(), 1); + } + #[futures_test::test] async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); -- cgit From 4bbcc2a7fbf1a363719d224dd80dfbcbbee6f26e Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sun, 3 Mar 2024 15:35:52 +0100 Subject: Add OnceLock sync primitive --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/once_lock.rs | 237 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) create mode 100644 embassy-sync/src/once_lock.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index d88c76db5..61b173e80 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -13,6 +13,7 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; pub mod mutex; +pub mod once_lock; pub mod pipe; pub mod priority_channel; pub mod pubsub; diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs new file mode 100644 index 000000000..f83577a6d --- /dev/null +++ b/embassy-sync/src/once_lock.rs @@ -0,0 +1,237 @@ +//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value. + +use core::cell::Cell; +use core::future::poll_fn; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicBool, Ordering}; +use core::task::Poll; + +/// The `OnceLock` is a synchronization primitive that allows for +/// initializing a value once, and allowing others to `.await` a +/// reference to the value. This is useful for lazy initialization of +/// a static value. +/// +/// **Note**: this implementation uses a busy loop to poll the value, +/// which is not as efficient as registering a dedicated `Waker`. +/// However, the if the usecase for is to initialize a static variable +/// relatively early in the program life cycle, it should be fine. +/// +/// # Example +/// ``` +/// use futures_executor::block_on; +/// use embassy_sync::once_lock::OnceLock; +/// +/// // Define a static value that will be lazily initialized +/// static VALUE: OnceLock = OnceLock::new(); +/// +/// let f = async { +/// +/// // Initialize the value +/// let reference = VALUE.get_or_init(|| 20); +/// assert_eq!(reference, &20); +/// +/// // Wait for the value to be initialized +/// // and get a static reference it +/// assert_eq!(VALUE.get().await, &20); +/// +/// }; +/// block_on(f) +/// ``` +pub struct OnceLock { + init: AtomicBool, + data: Cell>, +} + +unsafe impl Sync for OnceLock {} + +impl OnceLock { + /// Create a new uninitialized `OnceLock`. + pub const fn new() -> Self { + Self { + init: AtomicBool::new(false), + data: Cell::new(MaybeUninit::zeroed()), + } + } + + /// Get a reference to the underlying value, waiting for it to be set. + /// If the value is already set, this will return immediately. + pub async fn get(&self) -> &T { + + poll_fn(|cx| match self.try_get() { + Some(data) => Poll::Ready(data), + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await + } + + /// Try to get a reference to the underlying value if it exists. + pub fn try_get(&self) -> Option<&T> { + if self.init.load(Ordering::Relaxed) { + Some(unsafe { self.get_ref_unchecked() }) + } else { + None + } + } + + /// Set the underlying value. If the value is already set, this will return an error with the given value. + pub fn init(&self, value: T) -> Result<(), T> { + // Critical section is required to ensure that the value is + // not simultaniously initialized elsewhere at the same time. + critical_section::with(|_| { + // If the value is not set, set it and return Ok. + if !self.init.load(Ordering::Relaxed) { + self.data.set(MaybeUninit::new(value)); + self.init.store(true, Ordering::Relaxed); + Ok(()) + + // Otherwise return an error with the given value. + } else { + Err(value) + } + }) + } + + /// Get a reference to the underlying value, initializing it if it does not exist. + pub fn get_or_init(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + // Critical section is required to ensure that the value is + // not simultaniously initialized elsewhere at the same time. + critical_section::with(|_| { + // If the value is not set, set it. + if !self.init.load(Ordering::Relaxed) { + self.data.set(MaybeUninit::new(f())); + self.init.store(true, Ordering::Relaxed); + } + }); + + // Return a reference to the value. + unsafe { self.get_ref_unchecked() } + } + + /// Consume the `OnceLock`, returning the underlying value if it was initialized. + pub fn into_inner(self) -> Option { + if self.init.load(Ordering::Relaxed) { + Some(unsafe { self.data.into_inner().assume_init() }) + } else { + None + } + } + + /// Take the underlying value if it was initialized, uninitializing the `OnceLock` in the process. + pub fn take(&mut self) -> Option { + // If the value is set, uninitialize the lock and return the value. + critical_section::with(|_| { + if self.init.load(Ordering::Relaxed) { + let val = unsafe { self.data.replace(MaybeUninit::zeroed()).assume_init() }; + self.init.store(false, Ordering::Relaxed); + Some(val) + + // Otherwise return None. + } else { + None + } + }) + } + + /// Check if the value has been set. + pub fn is_set(&self) -> bool { + self.init.load(Ordering::Relaxed) + } + + /// Get a reference to the underlying value. + /// # Safety + /// Must only be used if a value has been set. + unsafe fn get_ref_unchecked(&self) -> &T { + (*self.data.as_ptr()).assume_init_ref() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn once_lock() { + let lock = OnceLock::new(); + assert_eq!(lock.try_get(), None); + assert_eq!(lock.is_set(), false); + + let v = 42; + assert_eq!(lock.init(v), Ok(())); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&v)); + assert_eq!(lock.try_get(), Some(&v)); + + let v = 43; + assert_eq!(lock.init(v), Err(v)); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + } + + #[test] + fn once_lock_get_or_init() { + let lock = OnceLock::new(); + assert_eq!(lock.try_get(), None); + assert_eq!(lock.is_set(), false); + + let v = lock.get_or_init(|| 42); + assert_eq!(v, &42); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + + let v = lock.get_or_init(|| 43); + assert_eq!(v, &42); + assert_eq!(lock.is_set(), true); + assert_eq!(lock.try_get(), Some(&42)); + } + + #[test] + fn once_lock_static() { + static LOCK: OnceLock = OnceLock::new(); + + let v: &'static i32 = LOCK.get_or_init(|| 42); + assert_eq!(v, &42); + + let v: &'static i32 = LOCK.get_or_init(|| 43); + assert_eq!(v, &42); + } + + #[futures_test::test] + async fn once_lock_async() { + static LOCK: OnceLock = OnceLock::new(); + + assert!(LOCK.init(42).is_ok()); + + let v: &'static i32 = LOCK.get().await; + assert_eq!(v, &42); + } + + #[test] + fn once_lock_into_inner() { + let lock: OnceLock = OnceLock::new(); + + let v = lock.get_or_init(|| 42); + assert_eq!(v, &42); + + assert_eq!(lock.into_inner(), Some(42)); + } + + #[test] + fn once_lock_take_init() { + let mut lock: OnceLock = OnceLock::new(); + + assert_eq!(lock.get_or_init(|| 42), &42); + assert_eq!(lock.is_set(), true); + + assert_eq!(lock.take(), Some(42)); + assert_eq!(lock.is_set(), false); + + assert_eq!(lock.get_or_init(|| 43), &43); + assert_eq!(lock.is_set(), true); + } +} -- cgit From 245e7d3bc20d66fa77f6763ce5e5bec0ea6ddeff Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sun, 3 Mar 2024 15:43:01 +0100 Subject: This one is for ci/rustfmt --- embassy-sync/src/once_lock.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index f83577a6d..31cc99711 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs @@ -33,7 +33,7 @@ use core::task::Poll; /// // Wait for the value to be initialized /// // and get a static reference it /// assert_eq!(VALUE.get().await, &20); -/// +/// /// }; /// block_on(f) /// ``` @@ -56,7 +56,6 @@ impl OnceLock { /// Get a reference to the underlying value, waiting for it to be set. /// If the value is already set, this will return immediately. pub async fn get(&self) -> &T { - poll_fn(|cx| match self.try_get() { Some(data) => Poll::Ready(data), None => { -- cgit From 2a257573773f3dabdf16ea6a44ba0dadef786f37 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 4 Mar 2024 18:36:34 +0100 Subject: docs: clarify capabilities of zerocopy channel --- embassy-sync/src/zerocopy_channel.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index f704cbd5d..cfce9a571 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs @@ -1,10 +1,7 @@ //! A zero-copy queue for sending values between asynchronous tasks. //! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. +//! It can be used concurrently by a producer (sender) and a +//! consumer (receiver), i.e. it is an "SPSC channel". //! //! This queue takes a Mutex type so that various //! targets can be attained. For example, a ThreadModeMutex can be used -- cgit From 8bb1fe1f653bd4bef6ad0760f33a07bcfe5d91fb Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 12 Mar 2024 15:27:52 +0100 Subject: Add conversion into dyn variants for channel futures --- embassy-sync/src/channel.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 01db0d09a..79d3e0378 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -263,6 +263,15 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { } } +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> +{ + fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { + Self { + channel: value.channel, + } + } +} + /// Future returned by [`Channel::send`] and [`Sender::send`]. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SendFuture<'ch, M, T, const N: usize> @@ -321,6 +330,16 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> +{ + fn from(value: SendFuture<'ch, M, T, N>) -> Self { + Self { + channel: value.channel, + message: value.message, + } + } +} + pub(crate) trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; -- cgit From c0d91600ea7b6847e2295a0fee819291c951132f Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 12 Mar 2024 15:37:53 +0100 Subject: rustfmt --- embassy-sync/src/channel.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 79d3e0378..48f4dafd6 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -263,12 +263,9 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { } } -impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> -{ +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicReceiveFuture<'ch, T> { fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { - Self { - channel: value.channel, - } + Self { channel: value.channel } } } @@ -330,8 +327,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} -impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> -{ +impl<'ch, M: RawMutex, T, const N: usize> From> for DynamicSendFuture<'ch, T> { fn from(value: SendFuture<'ch, M, T, N>) -> Self { Self { channel: value.channel, -- cgit From d06dbf332bc2557d83045c93f71a86163663d481 Mon Sep 17 00:00:00 2001 From: Noah Bliss Date: Wed, 20 Mar 2024 03:19:01 +0000 Subject: Doc update: signaled does not clear signal signaled does not clear signal (doc update) --- embassy-sync/src/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index d75750ce7..520f1a896 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -125,7 +125,7 @@ where }) } - /// non-blocking method to check whether this signal has been signaled. + /// non-blocking method to check whether this signal has been signaled. This does not clear the signal. pub fn signaled(&self) -> bool { self.state.lock(|cell| { let state = cell.replace(State::None); -- cgit From 3d842dac85a4ea21519f56d4ec6342b528805b8a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 20 Mar 2024 14:53:19 +0100 Subject: fmt: disable "unused" warnings. --- embassy-sync/src/fmt.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 78e583c1c..2ac42c557 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -1,5 +1,5 @@ #![macro_use] -#![allow(unused_macros)] +#![allow(unused)] use core::fmt::{Debug, Display, LowerHex}; @@ -229,7 +229,6 @@ impl Try for Result { } } -#[allow(unused)] pub(crate) struct Bytes<'a>(pub &'a [u8]); impl<'a> Debug for Bytes<'a> { -- cgit