aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/waitqueue
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/waitqueue')
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs41
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs30
-rw-r--r--embassy-sync/src/waitqueue/mod.rs8
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs49
-rw-r--r--embassy-sync/src/waitqueue/waker_registration.rs (renamed from embassy-sync/src/waitqueue/waker.rs)42
5 files changed, 115 insertions, 55 deletions
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
new file mode 100644
index 000000000..63fe04a6e
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -0,0 +1,41 @@
1use core::cell::Cell;
2use core::task::Waker;
3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex;
5use crate::blocking_mutex::Mutex;
6
7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker {
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
10}
11
12impl AtomicWaker {
13 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self {
15 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
17 }
18 }
19
20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()),
27 })
28 })
29 }
30
31 /// Wake the registered waker, if any.
32 pub fn wake(&self) {
33 critical_section::with(|cs| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref();
37 cell.set(Some(w));
38 }
39 })
40 }
41}
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
new file mode 100644
index 000000000..5c6a96ec8
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -0,0 +1,30 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4use core::task::Waker;
5
6/// Utility struct to register and wake a waker.
7pub struct AtomicWaker {
8 waker: AtomicPtr<()>,
9}
10
11impl AtomicWaker {
12 /// Create a new `AtomicWaker`.
13 pub const fn new() -> Self {
14 Self {
15 waker: AtomicPtr::new(ptr::null_mut()),
16 }
17 }
18
19 /// Register a waker. Overwrites the previous waker, if any.
20 pub fn register(&self, w: &Waker) {
21 self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release);
22 }
23
24 /// Wake the registered waker, if any.
25 pub fn wake(&self) {
26 if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) {
27 unsafe { Waker::from_turbo_ptr(ptr) }.wake();
28 }
29 }
30}
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs
index 6661a6b61..6b0b0c64e 100644
--- a/embassy-sync/src/waitqueue/mod.rs
+++ b/embassy-sync/src/waitqueue/mod.rs
@@ -1,7 +1,11 @@
1//! Async low-level wait queues 1//! Async low-level wait queues
2 2
3mod waker; 3#[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")]
4pub use waker::*; 4mod atomic_waker;
5pub use atomic_waker::*;
6
7mod waker_registration;
8pub use waker_registration::*;
5 9
6mod multi_waker; 10mod multi_waker;
7pub use multi_waker::*; 11pub use multi_waker::*;
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 325d2cb3a..824d192da 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -1,33 +1,58 @@
1use core::task::Waker; 1use core::task::Waker;
2 2
3use super::WakerRegistration; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> { 6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N], 7 wakers: Vec<Waker, N>,
8} 8}
9 9
10impl<const N: usize> MultiWakerRegistration<N> { 10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance 11 /// Create a new empty instance
12 pub const fn new() -> Self { 12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new(); 13 Self { wakers: Vec::new() }
14 Self { wakers: [WAKER; N] }
15 } 14 }
16 15
17 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { 17 pub fn register<'a>(&mut self, w: &'a Waker) {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { 18 // If we already have some waker that wakes the same task as `w`, do nothing.
20 waker_slot.register(w); 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
21 Ok(()) 20 for w2 in &self.wakers {
22 } else { 21 if w.will_wake(w2) {
23 Err(w) 22 return;
23 }
24 }
25
26 if self.wakers.is_full() {
27 // All waker slots were full. It's a bit inefficient, but we can wake everything.
28 // Any future that is still active will simply reregister.
29 // This won't happen a lot, so it's ok.
30 self.wake();
31 }
32
33 if self.wakers.push(w.clone()).is_err() {
34 // This can't happen unless N=0
35 // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
36 panic!("tried to push a waker to a zero-length MultiWakerRegistration")
24 } 37 }
25 } 38 }
26 39
27 /// Wake all registered wakers. This clears the buffer 40 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) { 41 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() { 42 // heapless::Vec has no `drain()`, do it unsafely ourselves...
30 waker_slot.wake() 43
44 // First set length to 0, without dropping the contents.
45 // This is necessary for soundness: if wake() panics and we're using panic=unwind.
46 // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
47 // (it'll leak wakers, but that's not UB)
48 let len = self.wakers.len();
49 unsafe { self.wakers.set_len(0) }
50
51 for i in 0..len {
52 // Move a waker out of the vec.
53 let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
54 // Wake it by value, which consumes (drops) it.
55 waker.wake();
31 } 56 }
32 } 57 }
33} 58}
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker_registration.rs
index 64e300eb8..9b666e7c4 100644
--- a/embassy-sync/src/waitqueue/waker.rs
+++ b/embassy-sync/src/waitqueue/waker_registration.rs
@@ -1,12 +1,8 @@
1use core::cell::Cell;
2use core::mem; 1use core::mem;
3use core::task::Waker; 2use core::task::Waker;
4 3
5use crate::blocking_mutex::raw::CriticalSectionRawMutex;
6use crate::blocking_mutex::Mutex;
7
8/// Utility struct to register and wake a waker. 4/// Utility struct to register and wake a waker.
9#[derive(Debug)] 5#[derive(Debug, Default)]
10pub struct WakerRegistration { 6pub struct WakerRegistration {
11 waker: Option<Waker>, 7 waker: Option<Waker>,
12} 8}
@@ -54,39 +50,3 @@ impl WakerRegistration {
54 self.waker.is_some() 50 self.waker.is_some()
55 } 51 }
56} 52}
57
58/// Utility struct to register and wake a waker.
59pub struct AtomicWaker {
60 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
61}
62
63impl AtomicWaker {
64 /// Create a new `AtomicWaker`.
65 pub const fn new() -> Self {
66 Self {
67 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
68 }
69 }
70
71 /// Register a waker. Overwrites the previous waker, if any.
72 pub fn register(&self, w: &Waker) {
73 critical_section::with(|cs| {
74 let cell = self.waker.borrow(cs);
75 cell.set(match cell.replace(None) {
76 Some(w2) if (w2.will_wake(w)) => Some(w2),
77 _ => Some(w.clone()),
78 })
79 })
80 }
81
82 /// Wake the registered waker, if any.
83 pub fn wake(&self) {
84 critical_section::with(|cs| {
85 let cell = self.waker.borrow(cs);
86 if let Some(w) = cell.replace(None) {
87 w.wake_by_ref();
88 cell.set(Some(w));
89 }
90 })
91 }
92}