From ea5f2c71e063aff9fdc0bde04656f36a3883178d Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 12 Sep 2022 12:05:58 +0200 Subject: sync/signal: wake old waker on overflow instead of panicking. This makes behavior consistent with `WakerRegistration`. It allows canceling `wait` in one task and then calling `wait` in another. If two tasks are `wait`ing concurrently the signal will be received by only one of them, randomly. --- embassy-sync/src/signal.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index f6ebeb9b9..34201d03a 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -79,7 +79,11 @@ impl Signal { Poll::Pending } State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, - State::Waiting(_) => panic!("waker overflow"), + State::Waiting(w) => { + let w = mem::replace(w, cx.waker().clone()); + w.wake(); + Poll::Pending + } State::Signaled(_) => match mem::replace(state, State::None) { State::Signaled(res) => Poll::Ready(res), _ => unreachable!(), -- cgit From 70a3b85acc3b87abab5a66b1a02da033789b5e1a Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Fri, 16 Sep 2022 10:32:43 +0200 Subject: Add .into_inner() and .get_mut() to Mutex --- embassy-sync/src/mutex.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 75a6e8dd3..a792cf070 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -111,6 +111,20 @@ where Ok(MutexGuard { mutex: self }) } + + /// Consumes this mutex, returning the underlying data. + pub fn into_inner(self) -> T + where T: Sized { + self.inner.into_inner() + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the Mutex mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } } /// Async mutex guard. -- cgit From 79654510b71290632ee659dd2ae1851f33f48374 Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Fri, 16 Sep 2022 10:44:33 +0200 Subject: Make rustfmt happy --- embassy-sync/src/mutex.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index a792cf070..92101c6b5 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -114,7 +114,9 @@ where /// Consumes this mutex, returning the underlying data. pub fn into_inner(self) -> T - where T: Sized { + where + T: Sized, + { self.inner.into_inner() } -- cgit From 897b72c872183221e088611aa6f30989800afd2b Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 22 Sep 2022 16:28:56 +0200 Subject: Update Rust nightly. Removes feature(generic_associated_types) --- embassy-sync/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 25150e8aa..80bb907a3 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -1,5 +1,5 @@ #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] +#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] #![warn(missing_docs)] -- cgit From a0487380da42a71ab7532e2bc1befd1039c18a78 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 22 Sep 2022 16:42:49 +0200 Subject: Replace futures::future::poll_fn -> core::future::poll_fn. --- embassy-sync/src/mutex.rs | 3 +-- embassy-sync/src/signal.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 92101c6b5..fcf056d36 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -2,11 +2,10 @@ //! //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. use core::cell::{RefCell, UnsafeCell}; +use core::future::poll_fn; use core::ops::{Deref, DerefMut}; use core::task::Poll; -use futures_util::future::poll_fn; - use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex as BlockingMutex; use crate::waitqueue::WakerRegistration; diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index 34201d03a..b4d99513a 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -1,6 +1,6 @@ //! A synchronization primitive for passing the latest value to a task. use core::cell::UnsafeCell; -use core::future::Future; +use core::future::{poll_fn, Future}; use core::mem; use core::task::{Context, Poll, Waker}; @@ -94,7 +94,7 @@ impl Signal { /// Future that completes when this Signal has been signaled. pub fn wait(&self) -> impl Future + '_ { - futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) + poll_fn(move |cx| self.poll_wait(cx)) } /// non-blocking method to check whether this signal has been signaled. -- cgit From ca92302d038e3a8f1446085040228bac7a1d00e6 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Thu, 22 Sep 2022 09:05:40 +0300 Subject: Parameterize Signal with RawMutex --- embassy-sync/src/signal.rs | 69 +++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 28 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index b4d99513a..7c38637c3 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -1,9 +1,12 @@ //! A synchronization primitive for passing the latest value to a task. -use core::cell::UnsafeCell; +use core::cell::Cell; use core::future::{poll_fn, Future}; use core::mem; use core::task::{Context, Poll, Waker}; +use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; +use crate::blocking_mutex::Mutex; + /// Single-slot signaling primitive. /// /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except @@ -28,8 +31,11 @@ use core::task::{Context, Poll, Waker}; /// /// static SOME_SIGNAL: Signal = Signal::new(); /// ``` -pub struct Signal { - state: UnsafeCell>, +pub struct Signal +where + R: RawMutex, +{ + state: Mutex>>, } enum State { @@ -38,24 +44,27 @@ enum State { Signaled(T), } -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { +impl Signal +where + R: RawMutex, +{ /// Create a new `Signal`. pub const fn new() -> Self { Self { - state: UnsafeCell::new(State::None), + state: Mutex::new(Cell::new(State::None)), } } } -impl Signal { +impl Signal +where + R: RawMutex, +{ /// Mark this Signal as signaled. pub fn signal(&self, val: T) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { + self.state.lock(|cell| { + let state = cell.replace(State::Signaled(val)); + if let State::Waiting(waker) = state { waker.wake(); } }) @@ -63,31 +72,27 @@ impl Signal { /// Remove the queued value in this `Signal`, if any. pub fn reset(&self) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - *state = State::None - }) + self.state.lock(|cell| cell.set(State::None)); } - /// Manually poll the Signal future. - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); + fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { + self.state.lock(|cell| { + let state = cell.replace(State::None); match state { State::None => { - *state = State::Waiting(cx.waker().clone()); + cell.set(State::Waiting(cx.waker().clone())); + Poll::Pending + } + State::Waiting(w) if w.will_wake(cx.waker()) => { + cell.set(State::Waiting(w)); Poll::Pending } - State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, State::Waiting(w) => { - let w = mem::replace(w, cx.waker().clone()); + cell.set(State::Waiting(cx.waker().clone())); w.wake(); Poll::Pending } - State::Signaled(_) => match mem::replace(state, State::None) { - State::Signaled(res) => Poll::Ready(res), - _ => unreachable!(), - }, + State::Signaled(res) => Poll::Ready(res), } }) } @@ -99,6 +104,14 @@ impl Signal { /// non-blocking method to check whether this signal has been signaled. pub fn signaled(&self) -> bool { - critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) + self.state.lock(|cell| { + let state = cell.replace(State::None); + + let res = matches!(state, State::Signaled(_)); + + cell.set(state); + + res + }) } } -- cgit From 85366661489c09fa8dec1375e9b9beee522e5e9f Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Sat, 24 Sep 2022 12:08:46 +0300 Subject: Remove default, reorder generic params --- embassy-sync/src/signal.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index 7c38637c3..8cb832a2b 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -1,7 +1,6 @@ //! A synchronization primitive for passing the latest value to a task. use core::cell::Cell; use core::future::{poll_fn, Future}; -use core::mem; use core::task::{Context, Poll, Waker}; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; -- cgit From c5ce02b30e488aade19f9f859425aa127d085b92 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Sat, 24 Sep 2022 12:08:46 +0300 Subject: Remove default, reorder generic params --- embassy-sync/src/signal.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index 8cb832a2b..c3c10a8af 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -3,7 +3,7 @@ use core::cell::Cell; use core::future::{poll_fn, Future}; use core::task::{Context, Poll, Waker}; -use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; +use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; /// Single-slot signaling primitive. @@ -22,19 +22,20 @@ use crate::blocking_mutex::Mutex; /// /// ``` /// use embassy_sync::signal::Signal; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; /// /// enum SomeCommand { /// On, /// Off, /// } /// -/// static SOME_SIGNAL: Signal = Signal::new(); +/// static SOME_SIGNAL: Signal = Signal::new(); /// ``` -pub struct Signal +pub struct Signal where - R: RawMutex, + M: RawMutex, { - state: Mutex>>, + state: Mutex>>, } enum State { @@ -43,9 +44,9 @@ enum State { Signaled(T), } -impl Signal +impl Signal where - R: RawMutex, + M: RawMutex, { /// Create a new `Signal`. pub const fn new() -> Self { @@ -55,9 +56,9 @@ where } } -impl Signal +impl Signal where - R: RawMutex, + M: RawMutex, { /// Mark this Signal as signaled. pub fn signal(&self, val: T) { -- cgit From f4ebc36b638a081b4a8b68ae72c4cca5199c4c4c Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 29 Sep 2022 14:24:42 +0200 Subject: Futures in pub & sub are now awaited instead of returned for better user compiler diagnostics. Added functions for reading how many messages are available --- embassy-sync/src/pubsub/mod.rs | 73 +++++++++++++++++++++++++++++++++++ embassy-sync/src/pubsub/publisher.rs | 13 ++++++- embassy-sync/src/pubsub/subscriber.rs | 11 ++++-- 3 files changed, 92 insertions(+), 5 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..335d7e33e 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -192,6 +192,10 @@ impl u64 { + self.inner.lock(|s| s.borrow().next_message_id - next_message_id) + } + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -217,6 +221,13 @@ impl usize { + self.inner.lock(|s| { + let s = s.borrow(); + s.queue.capacity() - s.queue.len() + }) + } + fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -388,6 +399,10 @@ pub trait PubSubBehavior { /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; + /// Get the amount of messages that are between the given the next_message_id and the most recent message. + /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. + fn available(&self, next_message_id: u64) -> u64; + /// Try to publish a message to the queue. /// /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. @@ -396,6 +411,9 @@ pub trait PubSubBehavior { /// Publish a message immediately fn publish_immediate(&self, message: T); + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + fn space(&self) -> usize; + /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -539,4 +557,59 @@ mod tests { drop(sub0); } + + #[futures_test::test] + async fn correct_available() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(sub0.available(), 0); + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 1); + assert_eq!(sub1.available(), 1); + + sub1.next_message().await; + + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 2); + assert_eq!(sub1.available(), 1); + } + + #[futures_test::test] + async fn correct_space() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(pub0.space(), 4); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 3); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 2); + + sub0.next_message().await; + sub0.next_message().await; + + assert_eq!(pub0.space(), 2); + + sub1.next_message().await; + assert_eq!(pub0.space(), 3); + sub1.next_message().await; + assert_eq!(pub0.space(), 4); + } } diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..484f1dbfd 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -31,17 +31,26 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { + pub async fn publish<'s>(&'s self, message: T) { PublisherWaitFuture { message: Some(message), publisher: self, } + .await } /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { @@ -158,7 +167,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, publisher: &'s Pub<'a, PSB, T>, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..8a8e9144b 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -28,8 +28,8 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } + pub async fn next_message(&mut self) -> WaitResult { + SubscriberWaitFuture { subscriber: self }.await } /// Wait for a published message (ignoring lag results) @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } } } + + /// The amount of messages this subscriber hasn't received yet + pub fn available(&self) -> u64 { + self.channel.available(self.next_message_id) + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { @@ -135,7 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } -- cgit From 874384826d4a6f9c9a9c8d3abf41f99a662f58fb Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 29 Sep 2022 15:15:10 +0200 Subject: Went back to named futures but now with must_use --- embassy-sync/src/pubsub/mod.rs | 2 +- embassy-sync/src/pubsub/publisher.rs | 6 +++--- embassy-sync/src/pubsub/subscriber.rs | 7 ++++--- 3 files changed, 8 insertions(+), 7 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 335d7e33e..faaf99dc6 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -562,7 +562,7 @@ mod tests { async fn correct_available() { let channel = PubSubChannel::::new(); - let mut sub0 = channel.subscriber().unwrap(); + let sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); let pub0 = channel.publisher().unwrap(); diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 484f1dbfd..faa67d947 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -31,12 +31,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub async fn publish<'s>(&'s self, message: T) { + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { PublisherWaitFuture { message: Some(message), publisher: self, } - .await } /// Publish a message if there is space in the message queue @@ -167,7 +166,8 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the publisher wait action -struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, publisher: &'s Pub<'a, PSB, T>, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 8a8e9144b..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -28,8 +28,8 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } /// Wait for a published message - pub async fn next_message(&mut self) -> WaitResult { - SubscriberWaitFuture { subscriber: self }.await + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { + SubscriberWaitFuture { subscriber: self } } /// Wait for a published message (ignoring lag results) @@ -140,7 +140,8 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the subscriber wait action -struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } -- cgit From 530182d6683531f7c259448e6c54c866f35837c7 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Wed, 5 Oct 2022 15:15:03 +0200 Subject: Forgot to add space function to immediate publisher --- embassy-sync/src/pubsub/publisher.rs | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index faa67d947..e1edc9eb9 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -123,6 +123,14 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } /// An immediate publisher that holds a dynamic reference to the channel -- cgit From eeb072d9cbc457892c58670ca6fefacf8c80a32b Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 26 Oct 2022 16:47:29 +0200 Subject: Update Rust nightly. --- embassy-sync/src/pipe.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 7d64b648e..cd577f34f 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -361,7 +361,7 @@ mod io_impls { } impl embedded_io::asynch::Read for Pipe { - type ReadFuture<'a> = impl Future> + type ReadFuture<'a> = impl Future> + 'a where Self: 'a; @@ -371,7 +371,7 @@ mod io_impls { } impl embedded_io::asynch::Write for Pipe { - type WriteFuture<'a> = impl Future> + type WriteFuture<'a> = impl Future> + 'a where Self: 'a; @@ -379,7 +379,7 @@ mod io_impls { Pipe::write(self, buf).map(Ok) } - type FlushFuture<'a> = impl Future> + type FlushFuture<'a> = impl Future> + 'a where Self: 'a; @@ -393,7 +393,7 @@ mod io_impls { } impl embedded_io::asynch::Read for &Pipe { - type ReadFuture<'a> = impl Future> + type ReadFuture<'a> = impl Future> + 'a where Self: 'a; @@ -403,7 +403,7 @@ mod io_impls { } impl embedded_io::asynch::Write for &Pipe { - type WriteFuture<'a> = impl Future> + type WriteFuture<'a> = impl Future> + 'a where Self: 'a; @@ -411,7 +411,7 @@ mod io_impls { Pipe::write(self, buf).map(Ok) } - type FlushFuture<'a> = impl Future> + type FlushFuture<'a> = impl Future> + 'a where Self: 'a; @@ -425,7 +425,7 @@ mod io_impls { } impl embedded_io::asynch::Read for Reader<'_, M, N> { - type ReadFuture<'a> = impl Future> + type ReadFuture<'a> = impl Future> + 'a where Self: 'a; @@ -439,7 +439,7 @@ mod io_impls { } impl embedded_io::asynch::Write for Writer<'_, M, N> { - type WriteFuture<'a> = impl Future> + type WriteFuture<'a> = impl Future> + 'a where Self: 'a; @@ -447,7 +447,7 @@ mod io_impls { Writer::write(self, buf).map(Ok) } - type FlushFuture<'a> = impl Future> + type FlushFuture<'a> = impl Future> + 'a where Self: 'a; -- cgit From 14a2d1524080593f7795fe14950a3f0ee6e2b409 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Sat, 5 Nov 2022 22:55:04 +0800 Subject: Derive Default for WakerRegistration This simplifies creating arrays of WakerRegistrations --- embassy-sync/src/waitqueue/waker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs index 64e300eb8..9ce94a089 100644 --- a/embassy-sync/src/waitqueue/waker.rs +++ b/embassy-sync/src/waitqueue/waker.rs @@ -6,7 +6,7 @@ use crate::blocking_mutex::raw::CriticalSectionRawMutex; use crate::blocking_mutex::Mutex; /// Utility struct to register and wake a waker. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct WakerRegistration { waker: Option, } -- cgit From 536b6a2de5c5342a27dc1095f5642792fb6d860b Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Tue, 22 Nov 2022 21:55:10 +0800 Subject: sync/signal: Implement Default for Signal --- embassy-sync/src/signal.rs | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index c3c10a8af..bea67d8be 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs @@ -56,6 +56,15 @@ where } } +impl Default for Signal +where + M: RawMutex, +{ + fn default() -> Self { + Self::new() + } +} + impl Signal where M: RawMutex, -- cgit From d438d1b685acb41b29d01c64bc422836760cb3de Mon Sep 17 00:00:00 2001 From: Gabriel Smith Date: Sun, 27 Nov 2022 16:24:20 -0500 Subject: sync: Fix nightly feature compilation after upgrade to embedded-io 0.4.0 --- embassy-sync/src/lib.rs | 3 +- embassy-sync/src/pipe.rs | 74 ++++++++++++------------------------------------ 2 files changed, 20 insertions(+), 57 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 80bb907a3..f9435ecff 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] +#![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))] +#![cfg_attr(feature = "nightly", allow(incomplete_features))] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] #![warn(missing_docs)] diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index cd577f34f..905686acd 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -352,8 +352,6 @@ where mod io_impls { use core::convert::Infallible; - use futures_util::FutureExt; - use super::*; impl embedded_io::Io for Pipe { @@ -361,30 +359,18 @@ mod io_impls { } impl embedded_io::asynch::Read for Pipe { - type ReadFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) + async fn read(&mut self, buf: &mut [u8]) -> Result { + Ok(Pipe::read(self, buf).await) } } impl embedded_io::asynch::Write for Pipe { - type WriteFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) + async fn write(&mut self, buf: &[u8]) -> Result { + Ok(Pipe::write(self, buf).await) } - type FlushFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) + async fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) } } @@ -393,30 +379,18 @@ mod io_impls { } impl embedded_io::asynch::Read for &Pipe { - type ReadFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) + async fn read(&mut self, buf: &mut [u8]) -> Result { + Ok(Pipe::read(self, buf).await) } } impl embedded_io::asynch::Write for &Pipe { - type WriteFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) + async fn write(&mut self, buf: &[u8]) -> Result { + Ok(Pipe::write(self, buf).await) } - type FlushFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) + async fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) } } @@ -425,12 +399,8 @@ mod io_impls { } impl embedded_io::asynch::Read for Reader<'_, M, N> { - type ReadFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Reader::read(self, buf).map(Ok) + async fn read(&mut self, buf: &mut [u8]) -> Result { + Ok(Reader::read(self, buf).await) } } @@ -439,20 +409,12 @@ mod io_impls { } impl embedded_io::asynch::Write for Writer<'_, M, N> { - type WriteFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Writer::write(self, buf).map(Ok) + async fn write(&mut self, buf: &[u8]) -> Result { + Ok(Writer::write(self, buf).await) } - type FlushFuture<'a> = impl Future> + 'a - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) + async fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) } } } -- cgit From 7be4337de96de9948632bdc2fc5067d0c4a76b33 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Fri, 24 Feb 2023 13:01:41 -0600 Subject: Add `#[must_use]` to all futures --- embassy-sync/src/channel.rs | 4 ++++ embassy-sync/src/pipe.rs | 2 ++ 2 files changed, 6 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 76f42d0e7..77352874d 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -181,6 +181,7 @@ where } /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct RecvFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -203,6 +204,7 @@ where } /// Future returned by [`DynamicReceiver::recv`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DynamicRecvFuture<'ch, T> { channel: &'ch dyn DynamicChannel, } @@ -219,6 +221,7 @@ impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { } /// Future returned by [`Channel::send`] and [`Sender::send`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SendFuture<'ch, M, T, const N: usize> where M: RawMutex, @@ -250,6 +253,7 @@ where impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} /// Future returned by [`DynamicSender::send`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct DynamicSendFuture<'ch, T> { channel: &'ch dyn DynamicChannel, message: Option, diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 905686acd..1977005fb 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -48,6 +48,7 @@ where } /// Future returned by [`Pipe::write`] and [`Writer::write`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct WriteFuture<'p, M, const N: usize> where M: RawMutex, @@ -110,6 +111,7 @@ where } /// Future returned by [`Pipe::read`] and [`Reader::read`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ReadFuture<'p, M, const N: usize> where M: RawMutex, -- cgit From 472df3fad6dde82b15e8f4716291add8b1cae1ae Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Wed, 15 Mar 2023 16:45:18 +0100 Subject: fix(pubsub): Pop messages which count is 0 after unsubscribe --- embassy-sync/src/pubsub/mod.rs | 47 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index faaf99dc6..5989e86ec 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -371,6 +371,20 @@ impl PubSubSta .iter_mut() .skip(current_message_index) .for_each(|(_, counter)| *counter -= 1); + + let mut wake_publishers = false; + while let Some((_, count)) = self.queue.front() { + if *count == 0 { + self.queue.pop_front().unwrap(); + wake_publishers = true; + } else { + break; + } + } + + if wake_publishers { + self.publisher_wakers.wake(); + } } } @@ -612,4 +626,37 @@ mod tests { sub1.next_message().await; assert_eq!(pub0.space(), 4); } + + #[futures_test::test] + async fn empty_channel_when_last_subscriber_is_dropped() { + let channel = PubSubChannel::::new(); + + let pub0 = channel.publisher().unwrap(); + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + + assert_eq!(4, pub0.space()); + + pub0.publish(1).await; + pub0.publish(2).await; + + assert_eq!(2, channel.space()); + + assert_eq!(1, sub0.try_next_message_pure().unwrap()); + assert_eq!(2, sub0.try_next_message_pure().unwrap()); + + assert_eq!(2, channel.space()); + + drop(sub0); + + assert_eq!(2, channel.space()); + + assert_eq!(1, sub1.try_next_message_pure().unwrap()); + + assert_eq!(3, channel.space()); + + drop(sub1); + + assert_eq!(4, channel.space()); + } } -- cgit From ce7bd6955f85baf53b3e1091974be70c6cf88b49 Mon Sep 17 00:00:00 2001 From: Rasmus Melchior Jacobsen Date: Tue, 21 Mar 2023 13:25:49 +0100 Subject: perf(pubsub): Skip clone on last message --- embassy-sync/src/pubsub/mod.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 5989e86ec..59e701c58 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -322,12 +322,15 @@ impl PubSubSta // We're reading this item, so decrement the counter queue_item.1 -= 1; - let message = queue_item.0.clone(); - if current_message_index == 0 && queue_item.1 == 0 { - self.queue.pop_front(); + let message = if current_message_index == 0 && queue_item.1 == 0 { + let (message, _) = self.queue.pop_front().unwrap(); self.publisher_wakers.wake(); - } + // Return pop'd message without clone + message + } else { + queue_item.0.clone() + }; Some(WaitResult::Message(message)) } @@ -659,4 +662,25 @@ mod tests { assert_eq!(4, channel.space()); } + + struct CloneCallCounter(usize); + + impl Clone for CloneCallCounter { + fn clone(&self) -> Self { + Self(self.0 + 1) + } + } + + #[futures_test::test] + async fn skip_clone_for_last_message() { + let channel = PubSubChannel::::new(); + let pub0 = channel.publisher().unwrap(); + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + + pub0.publish(CloneCallCounter(0)).await; + + assert_eq!(1, sub0.try_next_message_pure().unwrap().0); + assert_eq!(0, sub1.try_next_message_pure().unwrap().0); + } } -- cgit From 2c45b5c5193adc27d865cab27e1ac000aaae7899 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sun, 26 Mar 2023 23:32:12 +0200 Subject: sync/pipe: update to clarify docs that it is byte-oriented. There was some language copypasted from Channel talking about "messages" or "values", that is not really accurate with Pipe. --- embassy-sync/src/pipe.rs | 71 +++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 28 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 1977005fb..ee27cdec8 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -32,16 +32,16 @@ impl<'p, M, const N: usize> Writer<'p, M, N> where M: RawMutex, { - /// Writes a value. + /// Write some bytes to the pipe. /// /// See [`Pipe::write()`] pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { self.pipe.write(buf) } - /// Attempt to immediately write a message. + /// Attempt to immediately write some bytes to the pipe. /// - /// See [`Pipe::write()`] + /// See [`Pipe::try_write()`] pub fn try_write(&self, buf: &[u8]) -> Result { self.pipe.try_write(buf) } @@ -95,16 +95,16 @@ impl<'p, M, const N: usize> Reader<'p, M, N> where M: RawMutex, { - /// Reads a value. + /// Read some bytes from the pipe. /// /// See [`Pipe::read()`] pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { self.pipe.read(buf) } - /// Attempt to immediately read a message. + /// Attempt to immediately read some bytes from the pipe. /// - /// See [`Pipe::read()`] + /// See [`Pipe::try_read()`] pub fn try_read(&self, buf: &mut [u8]) -> Result { self.pipe.try_read(buf) } @@ -221,12 +221,11 @@ impl PipeState { } } -/// A bounded pipe for communicating between asynchronous tasks +/// A bounded byte-oriented pipe for communicating between asynchronous tasks /// with backpressure. /// -/// The pipe will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `write` new messages will wait until a message is -/// read from the pipe. +/// The pipe will buffer up to the provided number of bytes. Once the +/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. /// /// All data written will become available in the same order as it was written. pub struct Pipe @@ -277,40 +276,56 @@ where Reader { pipe: self } } - /// Write a value, waiting until there is capacity. + /// Write some bytes to the pipe. + /// + /// This method writes a nonzero amount of bytes from `buf` into the pipe, and + /// returns the amount of bytes written. + /// + /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full, + /// this method will wait until it is. See [`try_write`](Self::try_write) for a variant that + /// returns an error instead of waiting. /// - /// Writeing completes when the value has been pushed to the pipe's queue. - /// This doesn't mean the value has been read yet. + /// It is not guaranteed that all bytes in the buffer are written, even if there's enough + /// free space in the pipe buffer for all. In other words, it is possible for `write` to return + /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave + /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like + /// `write_all` from the `embedded-io` crate. pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { WriteFuture { pipe: self, buf } } - /// Attempt to immediately write a message. + /// Attempt to immediately write some bytes to the pipe. /// - /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the pipe capacity has been reached, i.e., the pipe has `n` - /// buffered values where `n` is the argument passed to [`Pipe`], then an - /// error is returned. + /// This method will either write a nonzero amount of bytes to the pipe immediately, + /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant + /// that waits instead of returning an error. pub fn try_write(&self, buf: &[u8]) -> Result { self.lock(|c| c.try_write(buf)) } - /// Receive the next value. + /// Read some bytes from the pipe. + /// + /// This method reads a nonzero amount of bytes from the pipe into `buf` and + /// returns the amount of bytes read. + /// + /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty, + /// this method will wait until it is. See [`try_read`](Self::try_read) for a variant that + /// returns an error instead of waiting. /// - /// If there are no messages in the pipe's buffer, this method will - /// wait until a message is written. + /// It is not guaranteed that all bytes in the buffer are read, even if there's enough + /// space in `buf` for all. In other words, it is possible for `read` to return + /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes + /// in the pipe buffer. You should always `read` in a loop, or use helpers like + /// `read_exact` from the `embedded-io` crate. pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { ReadFuture { pipe: self, buf } } - /// Attempt to immediately read a message. + /// Attempt to immediately read some bytes from the pipe. /// - /// This method will either read a message from the pipe immediately or return an error - /// if the pipe is empty. + /// This method will either read a nonzero amount of bytes from the pipe immediately, + /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant + /// that waits instead of returning an error. pub fn try_read(&self, buf: &mut [u8]) -> Result { self.lock(|c| c.try_read(buf)) } -- cgit From 80972f1e0e6b6d409cc4d86202608c22e5ee3e5a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 30 Mar 2023 17:55:55 +0200 Subject: executor,sync: add support for turbo-wakers. This is a `core` patch to make wakers 1 word (the task pointer) instead of 2 (task pointer + vtable). It allows having the "waker optimization" we had a while back on `WakerRegistration/AtomicWaker`, but EVERYWHERE, without patching all crates. Advantages: - Less memory usage. - Faster. - `AtomicWaker` can actually use atomics to load/store the waker, No critical section needed. - No `dyn` call, which means `cargo-call-stack` can now see through wakes. Disadvantages: - You have to patch `core`... - Breaks all executors and other things that create wakers, unless they opt in to using the new `from_ptr` API. How to use: - Run this shell script to patch `core`. https://gist.github.com/Dirbaio/c67da7cf318515181539122c9d32b395 - Enable `build-std` - Enable `build-std-features = core/turbowakers` - Enable feature `turbowakers` in `embassy-executor`, `embassy-sync`. - Make sure you have no other crate creating wakers other than `embassy-executor`. These will panic at runtime. Note that the patched `core` is equivalent to the unpached one when the `turbowakers` feature is not enabled, so it should be fine to leave it there. --- embassy-sync/src/waitqueue/atomic_waker.rs | 41 +++++++++++ embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 30 ++++++++ embassy-sync/src/waitqueue/mod.rs | 8 ++- embassy-sync/src/waitqueue/waker.rs | 92 ------------------------ embassy-sync/src/waitqueue/waker_registration.rs | 52 ++++++++++++++ 5 files changed, 129 insertions(+), 94 deletions(-) create mode 100644 embassy-sync/src/waitqueue/atomic_waker.rs create mode 100644 embassy-sync/src/waitqueue/atomic_waker_turbo.rs delete mode 100644 embassy-sync/src/waitqueue/waker.rs create mode 100644 embassy-sync/src/waitqueue/waker_registration.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs new file mode 100644 index 000000000..63fe04a6e --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker.rs @@ -0,0 +1,41 @@ +use core::cell::Cell; +use core::task::Waker; + +use crate::blocking_mutex::raw::CriticalSectionRawMutex; +use crate::blocking_mutex::Mutex; + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: Mutex>>, +} + +impl AtomicWaker { + /// Create a new `AtomicWaker`. + pub const fn new() -> Self { + Self { + waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + cell.set(match cell.replace(None) { + Some(w2) if (w2.will_wake(w)) => Some(w2), + _ => Some(w.clone()), + }) + }) + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + if let Some(w) = cell.replace(None) { + w.wake_by_ref(); + cell.set(Some(w)); + } + }) + } +} diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs new file mode 100644 index 000000000..5c6a96ec8 --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs @@ -0,0 +1,30 @@ +use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::task::Waker; + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: AtomicPtr<()>, +} + +impl AtomicWaker { + /// Create a new `AtomicWaker`. + pub const fn new() -> Self { + Self { + waker: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release); + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) { + unsafe { Waker::from_turbo_ptr(ptr) }.wake(); + } + } +} diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs index 6661a6b61..6b0b0c64e 100644 --- a/embassy-sync/src/waitqueue/mod.rs +++ b/embassy-sync/src/waitqueue/mod.rs @@ -1,7 +1,11 @@ //! Async low-level wait queues -mod waker; -pub use waker::*; +#[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")] +mod atomic_waker; +pub use atomic_waker::*; + +mod waker_registration; +pub use waker_registration::*; mod multi_waker; pub use multi_waker::*; diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs deleted file mode 100644 index 9ce94a089..000000000 --- a/embassy-sync/src/waitqueue/waker.rs +++ /dev/null @@ -1,92 +0,0 @@ -use core::cell::Cell; -use core::mem; -use core::task::Waker; - -use crate::blocking_mutex::raw::CriticalSectionRawMutex; -use crate::blocking_mutex::Mutex; - -/// Utility struct to register and wake a waker. -#[derive(Debug, Default)] -pub struct WakerRegistration { - waker: Option, -} - -impl WakerRegistration { - /// Create a new `WakerRegistration`. - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake() - } - } - - /// Returns true if a waker is currently registered - pub fn occupied(&self) -> bool { - self.waker.is_some() - } -} - -/// Utility struct to register and wake a waker. -pub struct AtomicWaker { - waker: Mutex>>, -} - -impl AtomicWaker { - /// Create a new `AtomicWaker`. - pub const fn new() -> Self { - Self { - waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - cell.set(match cell.replace(None) { - Some(w2) if (w2.will_wake(w)) => Some(w2), - _ => Some(w.clone()), - }) - }) - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - if let Some(w) = cell.replace(None) { - w.wake_by_ref(); - cell.set(Some(w)); - } - }) - } -} diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs new file mode 100644 index 000000000..9b666e7c4 --- /dev/null +++ b/embassy-sync/src/waitqueue/waker_registration.rs @@ -0,0 +1,52 @@ +use core::mem; +use core::task::Waker; + +/// Utility struct to register and wake a waker. +#[derive(Debug, Default)] +pub struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + /// Create a new `WakerRegistration`. + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake() + } + } + + /// Returns true if a waker is currently registered + pub fn occupied(&self) -> bool { + self.waker.is_some() + } +} -- cgit From 9f7392474b6a6e3a2f20e6419743afb196456c66 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 19 May 2023 17:12:29 +0200 Subject: Update Rust nightly. --- embassy-sync/src/lib.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index f9435ecff..53d95d081 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -1,6 +1,5 @@ #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] #![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))] -#![cfg_attr(feature = "nightly", allow(incomplete_features))] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] #![warn(missing_docs)] -- cgit From 3081ecf301a54f8ed3d0f72350dd21f8ac9e1b18 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 26 May 2023 13:07:32 +0200 Subject: sync: do will_wake check in MultiWakerRegistration. --- embassy-sync/src/pubsub/mod.rs | 34 ++------------------- embassy-sync/src/waitqueue/multi_waker.rs | 49 +++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 43 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 59e701c58..6afd54af5 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -4,7 +4,7 @@ use core::cell::RefCell; use core::fmt::Debug; -use core::task::{Context, Poll, Waker}; +use core::task::{Context, Poll}; use heapless::Deque; @@ -179,7 +179,7 @@ impl { if let Some(cx) = cx { - s.register_subscriber_waker(cx.waker()); + s.subscriber_wakers.register(cx.waker()); } Poll::Pending } @@ -206,7 +206,7 @@ impl { if let Some(cx) = cx { - s.register_publisher_waker(cx.waker()); + s.publisher_wakers.register(cx.waker()); } Err(message) } @@ -335,34 +335,6 @@ impl PubSubSta Some(WaitResult::Message(message)) } - fn register_subscriber_waker(&mut self, waker: &Waker) { - match self.subscriber_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a subscriber that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.subscriber_wakers.wake(); - self.subscriber_wakers.register(waker).unwrap(); - } - } - } - - fn register_publisher_waker(&mut self, waker: &Waker) { - match self.publisher_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a publisher that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.publisher_wakers.wake(); - self.publisher_wakers.register(waker).unwrap(); - } - } - } - fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { self.subscriber_count -= 1; diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 325d2cb3a..824d192da 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -1,33 +1,58 @@ use core::task::Waker; -use super::WakerRegistration; +use heapless::Vec; /// Utility struct to register and wake multiple wakers. pub struct MultiWakerRegistration { - wakers: [WakerRegistration; N], + wakers: Vec, } impl MultiWakerRegistration { /// Create a new empty instance pub const fn new() -> Self { - const WAKER: WakerRegistration = WakerRegistration::new(); - Self { wakers: [WAKER; N] } + Self { wakers: Vec::new() } } /// Register a waker. If the buffer is full the function returns it in the error - pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { - if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { - waker_slot.register(w); - Ok(()) - } else { - Err(w) + pub fn register<'a>(&mut self, w: &'a Waker) { + // If we already have some waker that wakes the same task as `w`, do nothing. + // This avoids cloning wakers, and avoids unnecessary mass-wakes. + for w2 in &self.wakers { + if w.will_wake(w2) { + return; + } + } + + if self.wakers.is_full() { + // All waker slots were full. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.wake(); + } + + if self.wakers.push(w.clone()).is_err() { + // This can't happen unless N=0 + // (Either `wakers` wasn't full, or it was in which case `wake()` empied it) + panic!("tried to push a waker to a zero-length MultiWakerRegistration") } } /// Wake all registered wakers. This clears the buffer pub fn wake(&mut self) { - for waker_slot in self.wakers.iter_mut() { - waker_slot.wake() + // heapless::Vec has no `drain()`, do it unsafely ourselves... + + // First set length to 0, without dropping the contents. + // This is necessary for soundness: if wake() panics and we're using panic=unwind. + // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state. + // (it'll leak wakers, but that's not UB) + let len = self.wakers.len(); + unsafe { self.wakers.set_len(0) } + + for i in 0..len { + // Move a waker out of the vec. + let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() }; + // Wake it by value, which consumes (drops) it. + waker.wake(); } } } -- cgit From 24c4ea71b11ce6c07d92dd6876fec6a9b0f6ed10 Mon Sep 17 00:00:00 2001 From: ZhangYong Date: Sat, 3 Jun 2023 17:44:25 +0800 Subject: sync/pipe: write all user data to pipe sync/pipe: add write_all function --- embassy-sync/src/pipe.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index ee27cdec8..db6ebb08b 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -294,6 +294,16 @@ where WriteFuture { pipe: self, buf } } + /// Write all bytes to the pipe. + /// + /// This method writes all bytes from `buf` into the pipe + pub async fn write_all(&self, mut buf: &[u8]) { + while !buf.is_empty() { + let n = self.write(buf).await; + buf = &buf[n..]; + } + } + /// Attempt to immediately write some bytes to the pipe. /// /// This method will either write a nonzero amount of bytes to the pipe immediately, -- cgit From 3465452a93719cdb46a2af4b6d893da3aacc0a15 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 9 Jun 2023 03:33:39 +0200 Subject: fmt: remove unused defmt::timestamp! --- embassy-sync/src/fmt.rs | 3 --- 1 file changed, 3 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index f8bb0a035..066970813 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -195,9 +195,6 @@ macro_rules! unwrap { } } -#[cfg(feature = "defmt-timestamp-uptime")] -defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } - #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct NoneError; -- cgit From f5ca687e9bbeb81ce24f56db6cd7defbcb5c2db2 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 27 Jun 2023 23:49:12 +0200 Subject: sync/pipe: fix doc typos. --- embassy-sync/src/pipe.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index db6ebb08b..13bf4ef01 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -282,7 +282,7 @@ where /// returns the amount of bytes written. /// /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full, - /// this method will wait until it is. See [`try_write`](Self::try_write) for a variant that + /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that /// returns an error instead of waiting. /// /// It is not guaranteed that all bytes in the buffer are written, even if there's enough @@ -319,7 +319,7 @@ where /// returns the amount of bytes read. /// /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty, - /// this method will wait until it is. See [`try_read`](Self::try_read) for a variant that + /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that /// returns an error instead of waiting. /// /// It is not guaranteed that all bytes in the buffer are read, even if there's enough -- cgit