From 14a2d1524080593f7795fe14950a3f0ee6e2b409 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Sat, 5 Nov 2022 22:55:04 +0800 Subject: Derive Default for WakerRegistration This simplifies creating arrays of WakerRegistrations --- embassy-sync/src/waitqueue/waker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src/waitqueue') diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs index 64e300eb8..9ce94a089 100644 --- a/embassy-sync/src/waitqueue/waker.rs +++ b/embassy-sync/src/waitqueue/waker.rs @@ -6,7 +6,7 @@ use crate::blocking_mutex::raw::CriticalSectionRawMutex; use crate::blocking_mutex::Mutex; /// Utility struct to register and wake a waker. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct WakerRegistration { waker: Option<Waker>, } -- cgit From 80972f1e0e6b6d409cc4d86202608c22e5ee3e5a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 30 Mar 2023 17:55:55 +0200 Subject: executor,sync: add support for turbo-wakers. This is a `core` patch to make wakers 1 word (the task pointer) instead of 2 (task pointer + vtable). It allows having the "waker optimization" we had a while back on `WakerRegistration/AtomicWaker`, but EVERYWHERE, without patching all crates. Advantages: - Less memory usage. - Faster. - `AtomicWaker` can actually use atomics to load/store the waker, No critical section needed. - No `dyn` call, which means `cargo-call-stack` can now see through wakes. Disadvantages: - You have to patch `core`... - Breaks all executors and other things that create wakers, unless they opt in to using the new `from_ptr` API. How to use: - Run this shell script to patch `core`. https://gist.github.com/Dirbaio/c67da7cf318515181539122c9d32b395 - Enable `build-std` - Enable `build-std-features = core/turbowakers` - Enable feature `turbowakers` in `embassy-executor`, `embassy-sync`. - Make sure you have no other crate creating wakers other than `embassy-executor`. These will panic at runtime. 
Note that the patched `core` is equivalent to the unpatched one when the `turbowakers` feature is not enabled, so it should be fine to leave it there. --- embassy-sync/src/waitqueue/atomic_waker.rs | 41 +++++++++++ embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 30 ++++++++ embassy-sync/src/waitqueue/mod.rs | 8 ++- embassy-sync/src/waitqueue/waker.rs | 92 ------------------------ embassy-sync/src/waitqueue/waker_registration.rs | 52 ++++++++++++++ 5 files changed, 129 insertions(+), 94 deletions(-) create mode 100644 embassy-sync/src/waitqueue/atomic_waker.rs create mode 100644 embassy-sync/src/waitqueue/atomic_waker_turbo.rs delete mode 100644 embassy-sync/src/waitqueue/waker.rs create mode 100644 embassy-sync/src/waitqueue/waker_registration.rs (limited to 'embassy-sync/src/waitqueue') diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs new file mode 100644 index 000000000..63fe04a6e --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker.rs @@ -0,0 +1,41 @@ +use core::cell::Cell; +use core::task::Waker; + +use crate::blocking_mutex::raw::CriticalSectionRawMutex; +use crate::blocking_mutex::Mutex; + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, +} + +impl AtomicWaker { + /// Create a new `AtomicWaker`. + pub const fn new() -> Self { + Self { + waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + cell.set(match cell.replace(None) { + Some(w2) if (w2.will_wake(w)) => Some(w2), + _ => Some(w.clone()), + }) + }) + } + + /// Wake the registered waker, if any. 
+ pub fn wake(&self) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + if let Some(w) = cell.replace(None) { + w.wake_by_ref(); + cell.set(Some(w)); + } + }) + } +} diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs new file mode 100644 index 000000000..5c6a96ec8 --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs @@ -0,0 +1,30 @@ +use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::task::Waker; + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: AtomicPtr<()>, +} + +impl AtomicWaker { + /// Create a new `AtomicWaker`. + pub const fn new() -> Self { + Self { + waker: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release); + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) { + unsafe { Waker::from_turbo_ptr(ptr) }.wake(); + } + } +} diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs index 6661a6b61..6b0b0c64e 100644 --- a/embassy-sync/src/waitqueue/mod.rs +++ b/embassy-sync/src/waitqueue/mod.rs @@ -1,7 +1,11 @@ //! 
Async low-level wait queues -mod waker; -pub use waker::*; +#[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")] +mod atomic_waker; +pub use atomic_waker::*; + +mod waker_registration; +pub use waker_registration::*; mod multi_waker; pub use multi_waker::*; diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs deleted file mode 100644 index 9ce94a089..000000000 --- a/embassy-sync/src/waitqueue/waker.rs +++ /dev/null @@ -1,92 +0,0 @@ -use core::cell::Cell; -use core::mem; -use core::task::Waker; - -use crate::blocking_mutex::raw::CriticalSectionRawMutex; -use crate::blocking_mutex::Mutex; - -/// Utility struct to register and wake a waker. -#[derive(Debug, Default)] -pub struct WakerRegistration { - waker: Option<Waker>, -} - -impl WakerRegistration { - /// Create a new `WakerRegistration`. - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. 
- old_waker.wake() - } - } - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake() - } - } - - /// Returns true if a waker is currently registered - pub fn occupied(&self) -> bool { - self.waker.is_some() - } -} - -/// Utility struct to register and wake a waker. -pub struct AtomicWaker { - waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, -} - -impl AtomicWaker { - /// Create a new `AtomicWaker`. - pub const fn new() -> Self { - Self { - waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - cell.set(match cell.replace(None) { - Some(w2) if (w2.will_wake(w)) => Some(w2), - _ => Some(w.clone()), - }) - }) - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - if let Some(w) = cell.replace(None) { - w.wake_by_ref(); - cell.set(Some(w)); - } - }) - } -} diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs new file mode 100644 index 000000000..9b666e7c4 --- /dev/null +++ b/embassy-sync/src/waitqueue/waker_registration.rs @@ -0,0 +1,52 @@ +use core::mem; +use core::task::Waker; + +/// Utility struct to register and wake a waker. +#[derive(Debug, Default)] +pub struct WakerRegistration { + waker: Option<Waker>, +} + +impl WakerRegistration { + /// Create a new `WakerRegistration`. + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). 
+ Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake() + } + } + + /// Returns true if a waker is currently registered + pub fn occupied(&self) -> bool { + self.waker.is_some() + } +} -- cgit From 3081ecf301a54f8ed3d0f72350dd21f8ac9e1b18 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 26 May 2023 13:07:32 +0200 Subject: sync: do will_wake check in MultiWakerRegistration. --- embassy-sync/src/waitqueue/multi_waker.rs | 49 +++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 12 deletions(-) (limited to 'embassy-sync/src/waitqueue') diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 325d2cb3a..824d192da 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs @@ -1,33 +1,58 @@ use core::task::Waker; -use super::WakerRegistration; +use heapless::Vec; /// Utility struct to register and wake multiple wakers. 
pub struct MultiWakerRegistration<const N: usize> { - wakers: [WakerRegistration; N], + wakers: Vec<Waker, N>, } impl<const N: usize> MultiWakerRegistration<N> { /// Create a new empty instance pub const fn new() -> Self { - const WAKER: WakerRegistration = WakerRegistration::new(); - Self { wakers: [WAKER; N] } + Self { wakers: Vec::new() } } /// Register a waker. If the buffer is full the function returns it in the error - pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { - if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { - waker_slot.register(w); - Ok(()) - } else { - Err(w) + pub fn register<'a>(&mut self, w: &'a Waker) { + // If we already have some waker that wakes the same task as `w`, do nothing. + // This avoids cloning wakers, and avoids unnecessary mass-wakes. + for w2 in &self.wakers { + if w.will_wake(w2) { + return; + } + } + + if self.wakers.is_full() { + // All waker slots were full. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.wake(); + } + + if self.wakers.push(w.clone()).is_err() { + // This can't happen unless N=0 + // (Either `wakers` wasn't full, or it was in which case `wake()` emptied it) + panic!("tried to push a waker to a zero-length MultiWakerRegistration") } } /// Wake all registered wakers. This clears the buffer pub fn wake(&mut self) { - for waker_slot in self.wakers.iter_mut() { - waker_slot.wake() + // heapless::Vec has no `drain()`, do it unsafely ourselves... + + // First set length to 0, without dropping the contents. + // This is necessary for soundness: if wake() panics and we're using panic=unwind. + // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state. + // (it'll leak wakers, but that's not UB) + let len = self.wakers.len(); + unsafe { self.wakers.set_len(0) } + + for i in 0..len { + // Move a waker out of the vec. 
+ let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() }; + // Wake it by value, which consumes (drops) it. + waker.wake(); } } } -- cgit