aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2023-05-26 13:35:53 +0000
committerGitHub <[email protected]>2023-05-26 13:35:53 +0000
commitc5c5b6472993434d62993102903db9fd6fac95a0 (patch)
tree9b0e2fd56ba0affcd82b06e5141967000e73566e
parent31b364b9b0a18a9ebb341747861441b11f621ea0 (diff)
parent3081ecf301a54f8ed3d0f72350dd21f8ac9e1b18 (diff)
Merge #1490
1490: sync: do will_wake check in MultiWakerRegistration. r=Dirbaio a=Dirbaio Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy-sync/src/pubsub/mod.rs34
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs49
2 files changed, 40 insertions, 43 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 59e701c58..6afd54af5 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -4,7 +4,7 @@
4 4
5use core::cell::RefCell; 5use core::cell::RefCell;
6use core::fmt::Debug; 6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker}; 7use core::task::{Context, Poll};
8 8
9use heapless::Deque; 9use heapless::Deque;
10 10
@@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
179 // No, so we need to reregister our waker and sleep again 179 // No, so we need to reregister our waker and sleep again
180 None => { 180 None => {
181 if let Some(cx) = cx { 181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker()); 182 s.subscriber_wakers.register(cx.waker());
183 } 183 }
184 Poll::Pending 184 Poll::Pending
185 } 185 }
@@ -206,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
206 // The queue is full, so we need to reregister our waker and go to sleep 206 // The queue is full, so we need to reregister our waker and go to sleep
207 Err(message) => { 207 Err(message) => {
208 if let Some(cx) = cx { 208 if let Some(cx) = cx {
209 s.register_publisher_waker(cx.waker()); 209 s.publisher_wakers.register(cx.waker());
210 } 210 }
211 Err(message) 211 Err(message)
212 } 212 }
@@ -335,34 +335,6 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
335 Some(WaitResult::Message(message)) 335 Some(WaitResult::Message(message))
336 } 336 }
337 337
338 fn register_subscriber_waker(&mut self, waker: &Waker) {
339 match self.subscriber_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.subscriber_wakers.wake();
347 self.subscriber_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn register_publisher_waker(&mut self, waker: &Waker) {
353 match self.publisher_wakers.register(waker) {
354 Ok(()) => {}
355 Err(_) => {
356 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
357 // We need to throw it away. It's a bit inefficient, but we can wake everything.
358 // Any future that is still active will simply reregister.
359 // This won't happen a lot, so it's ok.
360 self.publisher_wakers.wake();
361 self.publisher_wakers.register(waker).unwrap();
362 }
363 }
364 }
365
366 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { 338 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
367 self.subscriber_count -= 1; 339 self.subscriber_count -= 1;
368 340
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 325d2cb3a..824d192da 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -1,33 +1,58 @@
1use core::task::Waker; 1use core::task::Waker;
2 2
3use super::WakerRegistration; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> { 6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N], 7 wakers: Vec<Waker, N>,
8} 8}
9 9
10impl<const N: usize> MultiWakerRegistration<N> { 10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance 11 /// Create a new empty instance
12 pub const fn new() -> Self { 12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new(); 13 Self { wakers: Vec::new() }
14 Self { wakers: [WAKER; N] }
15 } 14 }
16 15
17 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { 17 pub fn register<'a>(&mut self, w: &'a Waker) {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { 18 // If we already have some waker that wakes the same task as `w`, do nothing.
20 waker_slot.register(w); 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
21 Ok(()) 20 for w2 in &self.wakers {
22 } else { 21 if w.will_wake(w2) {
23 Err(w) 22 return;
23 }
24 }
25
26 if self.wakers.is_full() {
27 // All waker slots were full. It's a bit inefficient, but we can wake everything.
28 // Any future that is still active will simply reregister.
29 // This won't happen a lot, so it's ok.
30 self.wake();
31 }
32
33 if self.wakers.push(w.clone()).is_err() {
34 // This can't happen unless N=0
35 // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
36 panic!("tried to push a waker to a zero-length MultiWakerRegistration")
24 } 37 }
25 } 38 }
26 39
27 /// Wake all registered wakers. This clears the buffer 40 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) { 41 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() { 42 // heapless::Vec has no `drain()`, do it unsafely ourselves...
30 waker_slot.wake() 43
44 // First set length to 0, without dropping the contents.
45 // This is necessary for soundness: if wake() panics and we're using panic=unwind.
46 // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
47 // (it'll leak wakers, but that's not UB)
48 let len = self.wakers.len();
49 unsafe { self.wakers.set_len(0) }
50
51 for i in 0..len {
52 // Move a waker out of the vec.
53 let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
54 // Wake it by value, which consumes (drops) it.
55 waker.wake();
31 } 56 }
32 } 57 }
33} 58}