aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-sync/src
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/channel.rs4
-rw-r--r--embassy-sync/src/fmt.rs3
-rw-r--r--embassy-sync/src/lib.rs2
-rw-r--r--embassy-sync/src/mutex.rs19
-rw-r--r--embassy-sync/src/pipe.rs155
-rw-r--r--embassy-sync/src/pubsub/mod.rs186
-rw-r--r--embassy-sync/src/pubsub/publisher.rs17
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs6
-rw-r--r--embassy-sync/src/signal.rs90
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs41
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs30
-rw-r--r--embassy-sync/src/waitqueue/mod.rs8
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs49
-rw-r--r--embassy-sync/src/waitqueue/waker_registration.rs (renamed from embassy-sync/src/waitqueue/waker.rs)42
14 files changed, 441 insertions, 211 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 76f42d0e7..77352874d 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -181,6 +181,7 @@ where
181} 181}
182 182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. 183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184#[must_use = "futures do nothing unless you `.await` or poll them"]
184pub struct RecvFuture<'ch, M, T, const N: usize> 185pub struct RecvFuture<'ch, M, T, const N: usize>
185where 186where
186 M: RawMutex, 187 M: RawMutex,
@@ -203,6 +204,7 @@ where
203} 204}
204 205
205/// Future returned by [`DynamicReceiver::recv`]. 206/// Future returned by [`DynamicReceiver::recv`].
207#[must_use = "futures do nothing unless you `.await` or poll them"]
206pub struct DynamicRecvFuture<'ch, T> { 208pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>, 209 channel: &'ch dyn DynamicChannel<T>,
208} 210}
@@ -219,6 +221,7 @@ impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
219} 221}
220 222
221/// Future returned by [`Channel::send`] and [`Sender::send`]. 223/// Future returned by [`Channel::send`] and [`Sender::send`].
224#[must_use = "futures do nothing unless you `.await` or poll them"]
222pub struct SendFuture<'ch, M, T, const N: usize> 225pub struct SendFuture<'ch, M, T, const N: usize>
223where 226where
224 M: RawMutex, 227 M: RawMutex,
@@ -250,6 +253,7 @@ where
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} 253impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251 254
252/// Future returned by [`DynamicSender::send`]. 255/// Future returned by [`DynamicSender::send`].
256#[must_use = "futures do nothing unless you `.await` or poll them"]
253pub struct DynamicSendFuture<'ch, T> { 257pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>, 258 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>, 259 message: Option<T>,
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index f8bb0a035..066970813 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -195,9 +195,6 @@ macro_rules! unwrap {
195 } 195 }
196} 196}
197 197
198#[cfg(feature = "defmt-timestamp-uptime")]
199defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
200
201#[derive(Debug, Copy, Clone, Eq, PartialEq)] 198#[derive(Debug, Copy, Clone, Eq, PartialEq)]
202pub struct NoneError; 199pub struct NoneError;
203 200
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 25150e8aa..53d95d081 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -1,5 +1,5 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] 2#![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))]
3#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
4#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 5#![warn(missing_docs)]
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 75a6e8dd3..fcf056d36 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -2,11 +2,10 @@
2//! 2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. 3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell}; 4use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn;
5use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
6use core::task::Poll; 7use core::task::Poll;
7 8
8use futures_util::future::poll_fn;
9
10use crate::blocking_mutex::raw::RawMutex; 9use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex; 10use crate::blocking_mutex::Mutex as BlockingMutex;
12use crate::waitqueue::WakerRegistration; 11use crate::waitqueue::WakerRegistration;
@@ -111,6 +110,22 @@ where
111 110
112 Ok(MutexGuard { mutex: self }) 111 Ok(MutexGuard { mutex: self })
113 } 112 }
113
114 /// Consumes this mutex, returning the underlying data.
115 pub fn into_inner(self) -> T
116 where
117 T: Sized,
118 {
119 self.inner.into_inner()
120 }
121
122 /// Returns a mutable reference to the underlying data.
123 ///
124 /// Since this call borrows the Mutex mutably, no actual locking needs to
125 /// take place -- the mutable borrow statically guarantees no locks exist.
126 pub fn get_mut(&mut self) -> &mut T {
127 self.inner.get_mut()
128 }
114} 129}
115 130
116/// Async mutex guard. 131/// Async mutex guard.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 7d64b648e..13bf4ef01 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -32,22 +32,23 @@ impl<'p, M, const N: usize> Writer<'p, M, N>
32where 32where
33 M: RawMutex, 33 M: RawMutex,
34{ 34{
35 /// Writes a value. 35 /// Write some bytes to the pipe.
36 /// 36 ///
37 /// See [`Pipe::write()`] 37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { 38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf) 39 self.pipe.write(buf)
40 } 40 }
41 41
42 /// Attempt to immediately write a message. 42 /// Attempt to immediately write some bytes to the pipe.
43 /// 43 ///
44 /// See [`Pipe::write()`] 44 /// See [`Pipe::try_write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { 45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf) 46 self.pipe.try_write(buf)
47 } 47 }
48} 48}
49 49
50/// Future returned by [`Pipe::write`] and [`Writer::write`]. 50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51#[must_use = "futures do nothing unless you `.await` or poll them"]
51pub struct WriteFuture<'p, M, const N: usize> 52pub struct WriteFuture<'p, M, const N: usize>
52where 53where
53 M: RawMutex, 54 M: RawMutex,
@@ -94,22 +95,23 @@ impl<'p, M, const N: usize> Reader<'p, M, N>
94where 95where
95 M: RawMutex, 96 M: RawMutex,
96{ 97{
97 /// Reads a value. 98 /// Read some bytes from the pipe.
98 /// 99 ///
99 /// See [`Pipe::read()`] 100 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { 101 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf) 102 self.pipe.read(buf)
102 } 103 }
103 104
104 /// Attempt to immediately read a message. 105 /// Attempt to immediately read some bytes from the pipe.
105 /// 106 ///
106 /// See [`Pipe::read()`] 107 /// See [`Pipe::try_read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 108 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf) 109 self.pipe.try_read(buf)
109 } 110 }
110} 111}
111 112
112/// Future returned by [`Pipe::read`] and [`Reader::read`]. 113/// Future returned by [`Pipe::read`] and [`Reader::read`].
114#[must_use = "futures do nothing unless you `.await` or poll them"]
113pub struct ReadFuture<'p, M, const N: usize> 115pub struct ReadFuture<'p, M, const N: usize>
114where 116where
115 M: RawMutex, 117 M: RawMutex,
@@ -219,12 +221,11 @@ impl<const N: usize> PipeState<N> {
219 } 221 }
220} 222}
221 223
222/// A bounded pipe for communicating between asynchronous tasks 224/// A bounded byte-oriented pipe for communicating between asynchronous tasks
223/// with backpressure. 225/// with backpressure.
224/// 226///
225/// The pipe will buffer up to the provided number of messages. Once the 227/// The pipe will buffer up to the provided number of bytes. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is 228/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
227/// read from the pipe.
228/// 229///
229/// All data written will become available in the same order as it was written. 230/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize> 231pub struct Pipe<M, const N: usize>
@@ -275,40 +276,66 @@ where
275 Reader { pipe: self } 276 Reader { pipe: self }
276 } 277 }
277 278
278 /// Write a value, waiting until there is capacity. 279 /// Write some bytes to the pipe.
280 ///
281 /// This method writes a nonzero amount of bytes from `buf` into the pipe, and
282 /// returns the amount of bytes written.
279 /// 283 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue. 284 /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full,
281 /// This doesn't mean the value has been read yet. 285 /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that
286 /// returns an error instead of waiting.
287 ///
288 /// It is not guaranteed that all bytes in the buffer are written, even if there's enough
289 /// free space in the pipe buffer for all. In other words, it is possible for `write` to return
290 /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave
291 /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like
292 /// `write_all` from the `embedded-io` crate.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { 293 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf } 294 WriteFuture { pipe: self, buf }
284 } 295 }
285 296
286 /// Attempt to immediately write a message. 297 /// Write all bytes to the pipe.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 /// 298 ///
291 /// # Errors 299 /// This method writes all bytes from `buf` into the pipe
300 pub async fn write_all(&self, mut buf: &[u8]) {
301 while !buf.is_empty() {
302 let n = self.write(buf).await;
303 buf = &buf[n..];
304 }
305 }
306
307 /// Attempt to immediately write some bytes to the pipe.
292 /// 308 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n` 309 /// This method will either write a nonzero amount of bytes to the pipe immediately,
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an 310 /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant
295 /// error is returned. 311 /// that waits instead of returning an error.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { 312 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf)) 313 self.lock(|c| c.try_write(buf))
298 } 314 }
299 315
300 /// Receive the next value. 316 /// Read some bytes from the pipe.
317 ///
318 /// This method reads a nonzero amount of bytes from the pipe into `buf` and
319 /// returns the amount of bytes read.
320 ///
321 /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty,
322 /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that
323 /// returns an error instead of waiting.
301 /// 324 ///
302 /// If there are no messages in the pipe's buffer, this method will 325 /// It is not guaranteed that all bytes in the buffer are read, even if there's enough
303 /// wait until a message is written. 326 /// space in `buf` for all. In other words, it is possible for `read` to return
327 /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes
328 /// in the pipe buffer. You should always `read` in a loop, or use helpers like
329 /// `read_exact` from the `embedded-io` crate.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { 330 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf } 331 ReadFuture { pipe: self, buf }
306 } 332 }
307 333
308 /// Attempt to immediately read a message. 334 /// Attempt to immediately read some bytes from the pipe.
309 /// 335 ///
310 /// This method will either read a message from the pipe immediately or return an error 336 /// This method will either read a nonzero amount of bytes from the pipe immediately,
311 /// if the pipe is empty. 337 /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
338 /// that waits instead of returning an error.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 339 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf)) 340 self.lock(|c| c.try_read(buf))
314 } 341 }
@@ -352,8 +379,6 @@ where
352mod io_impls { 379mod io_impls {
353 use core::convert::Infallible; 380 use core::convert::Infallible;
354 381
355 use futures_util::FutureExt;
356
357 use super::*; 382 use super::*;
358 383
359 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { 384 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
@@ -361,30 +386,18 @@ mod io_impls {
361 } 386 }
362 387
363 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { 388 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
364 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 389 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
365 where 390 Ok(Pipe::read(self, buf).await)
366 Self: 'a;
367
368 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
369 Pipe::read(self, buf).map(Ok)
370 } 391 }
371 } 392 }
372 393
373 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { 394 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
374 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 395 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
375 where 396 Ok(Pipe::write(self, buf).await)
376 Self: 'a;
377
378 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
379 Pipe::write(self, buf).map(Ok)
380 } 397 }
381 398
382 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 399 async fn flush(&mut self) -> Result<(), Self::Error> {
383 where 400 Ok(())
384 Self: 'a;
385
386 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
387 futures_util::future::ready(Ok(()))
388 } 401 }
389 } 402 }
390 403
@@ -393,30 +406,18 @@ mod io_impls {
393 } 406 }
394 407
395 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { 408 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
396 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 409 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
397 where 410 Ok(Pipe::read(self, buf).await)
398 Self: 'a;
399
400 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
401 Pipe::read(self, buf).map(Ok)
402 } 411 }
403 } 412 }
404 413
405 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { 414 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
406 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 415 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
407 where 416 Ok(Pipe::write(self, buf).await)
408 Self: 'a;
409
410 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
411 Pipe::write(self, buf).map(Ok)
412 } 417 }
413 418
414 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 419 async fn flush(&mut self) -> Result<(), Self::Error> {
415 where 420 Ok(())
416 Self: 'a;
417
418 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
419 futures_util::future::ready(Ok(()))
420 } 421 }
421 } 422 }
422 423
@@ -425,12 +426,8 @@ mod io_impls {
425 } 426 }
426 427
427 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { 428 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
428 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 429 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
429 where 430 Ok(Reader::read(self, buf).await)
430 Self: 'a;
431
432 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
433 Reader::read(self, buf).map(Ok)
434 } 431 }
435 } 432 }
436 433
@@ -439,20 +436,12 @@ mod io_impls {
439 } 436 }
440 437
441 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { 438 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
442 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 439 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
443 where 440 Ok(Writer::write(self, buf).await)
444 Self: 'a;
445
446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
447 Writer::write(self, buf).map(Ok)
448 } 441 }
449 442
450 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 443 async fn flush(&mut self) -> Result<(), Self::Error> {
451 where 444 Ok(())
452 Self: 'a;
453
454 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
455 futures_util::future::ready(Ok(()))
456 } 445 }
457 } 446 }
458} 447}
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 62a9e4763..6afd54af5 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -4,7 +4,7 @@
4 4
5use core::cell::RefCell; 5use core::cell::RefCell;
6use core::fmt::Debug; 6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker}; 7use core::task::{Context, Poll};
8 8
9use heapless::Deque; 9use heapless::Deque;
10 10
@@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
179 // No, so we need to reregister our waker and sleep again 179 // No, so we need to reregister our waker and sleep again
180 None => { 180 None => {
181 if let Some(cx) = cx { 181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker()); 182 s.subscriber_wakers.register(cx.waker());
183 } 183 }
184 Poll::Pending 184 Poll::Pending
185 } 185 }
@@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
192 }) 192 })
193 } 193 }
194 194
195 fn available(&self, next_message_id: u64) -> u64 {
196 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
197 }
198
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { 199 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| { 200 self.inner.lock(|s| {
197 let mut s = s.borrow_mut(); 201 let mut s = s.borrow_mut();
@@ -202,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
202 // The queue is full, so we need to reregister our waker and go to sleep 206 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => { 207 Err(message) => {
204 if let Some(cx) = cx { 208 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker()); 209 s.publisher_wakers.register(cx.waker());
206 } 210 }
207 Err(message) 211 Err(message)
208 } 212 }
@@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
217 }) 221 })
218 } 222 }
219 223
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| { 232 self.inner.lock(|s| {
222 let mut s = s.borrow_mut(); 233 let mut s = s.borrow_mut();
@@ -311,44 +322,19 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
311 322
312 // We're reading this item, so decrement the counter 323 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1; 324 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315 325
316 if current_message_index == 0 && queue_item.1 == 0 { 326 let message = if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front(); 327 let (message, _) = self.queue.pop_front().unwrap();
318 self.publisher_wakers.wake(); 328 self.publisher_wakers.wake();
319 } 329 // Return pop'd message without clone
330 message
331 } else {
332 queue_item.0.clone()
333 };
320 334
321 Some(WaitResult::Message(message)) 335 Some(WaitResult::Message(message))
322 } 336 }
323 337
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { 338 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1; 339 self.subscriber_count -= 1;
354 340
@@ -360,6 +346,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
360 .iter_mut() 346 .iter_mut()
361 .skip(current_message_index) 347 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1); 348 .for_each(|(_, counter)| *counter -= 1);
349
350 let mut wake_publishers = false;
351 while let Some((_, count)) = self.queue.front() {
352 if *count == 0 {
353 self.queue.pop_front().unwrap();
354 wake_publishers = true;
355 } else {
356 break;
357 }
358 }
359
360 if wake_publishers {
361 self.publisher_wakers.wake();
362 }
363 } 363 }
364 } 364 }
365 365
@@ -388,6 +388,10 @@ pub trait PubSubBehavior<T> {
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 390
391 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
392 /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
393 fn available(&self, next_message_id: u64) -> u64;
394
391 /// Try to publish a message to the queue. 395 /// Try to publish a message to the queue.
392 /// 396 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 397 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
@@ -396,6 +400,9 @@ pub trait PubSubBehavior<T> {
396 /// Publish a message immediately 400 /// Publish a message immediately
397 fn publish_immediate(&self, message: T); 401 fn publish_immediate(&self, message: T);
398 402
403 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
404 fn space(&self) -> usize;
405
399 /// Let the channel know that a subscriber has dropped 406 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 407 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401 408
@@ -539,4 +546,113 @@ mod tests {
539 546
540 drop(sub0); 547 drop(sub0);
541 } 548 }
549
550 #[futures_test::test]
551 async fn correct_available() {
552 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
553
554 let sub0 = channel.subscriber().unwrap();
555 let mut sub1 = channel.subscriber().unwrap();
556 let pub0 = channel.publisher().unwrap();
557
558 assert_eq!(sub0.available(), 0);
559 assert_eq!(sub1.available(), 0);
560
561 pub0.publish(42).await;
562
563 assert_eq!(sub0.available(), 1);
564 assert_eq!(sub1.available(), 1);
565
566 sub1.next_message().await;
567
568 assert_eq!(sub1.available(), 0);
569
570 pub0.publish(42).await;
571
572 assert_eq!(sub0.available(), 2);
573 assert_eq!(sub1.available(), 1);
574 }
575
576 #[futures_test::test]
577 async fn correct_space() {
578 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
579
580 let mut sub0 = channel.subscriber().unwrap();
581 let mut sub1 = channel.subscriber().unwrap();
582 let pub0 = channel.publisher().unwrap();
583
584 assert_eq!(pub0.space(), 4);
585
586 pub0.publish(42).await;
587
588 assert_eq!(pub0.space(), 3);
589
590 pub0.publish(42).await;
591
592 assert_eq!(pub0.space(), 2);
593
594 sub0.next_message().await;
595 sub0.next_message().await;
596
597 assert_eq!(pub0.space(), 2);
598
599 sub1.next_message().await;
600 assert_eq!(pub0.space(), 3);
601 sub1.next_message().await;
602 assert_eq!(pub0.space(), 4);
603 }
604
605 #[futures_test::test]
606 async fn empty_channel_when_last_subscriber_is_dropped() {
607 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
608
609 let pub0 = channel.publisher().unwrap();
610 let mut sub0 = channel.subscriber().unwrap();
611 let mut sub1 = channel.subscriber().unwrap();
612
613 assert_eq!(4, pub0.space());
614
615 pub0.publish(1).await;
616 pub0.publish(2).await;
617
618 assert_eq!(2, channel.space());
619
620 assert_eq!(1, sub0.try_next_message_pure().unwrap());
621 assert_eq!(2, sub0.try_next_message_pure().unwrap());
622
623 assert_eq!(2, channel.space());
624
625 drop(sub0);
626
627 assert_eq!(2, channel.space());
628
629 assert_eq!(1, sub1.try_next_message_pure().unwrap());
630
631 assert_eq!(3, channel.space());
632
633 drop(sub1);
634
635 assert_eq!(4, channel.space());
636 }
637
638 struct CloneCallCounter(usize);
639
640 impl Clone for CloneCallCounter {
641 fn clone(&self) -> Self {
642 Self(self.0 + 1)
643 }
644 }
645
646 #[futures_test::test]
647 async fn skip_clone_for_last_message() {
648 let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
649 let pub0 = channel.publisher().unwrap();
650 let mut sub0 = channel.subscriber().unwrap();
651 let mut sub1 = channel.subscriber().unwrap();
652
653 pub0.publish(CloneCallCounter(0)).await;
654
655 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
656 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
657 }
542} 658}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index 705797f60..e1edc9eb9 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
42 pub fn try_publish(&self, message: T) -> Result<(), T> { 42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
47 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
49 /// So checking doesn't give any guarantees.*
50 pub fn space(&self) -> usize {
51 self.channel.space()
52 }
45} 53}
46 54
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 55impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
115 pub fn try_publish(&self, message: T) -> Result<(), T> { 123 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None) 124 self.channel.publish_with_context(message, None)
117 } 125 }
126
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
128 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
130 /// So checking doesn't give any guarantees.*
131 pub fn space(&self) -> usize {
132 self.channel.space()
133 }
118} 134}
119 135
120/// An immediate publisher that holds a dynamic reference to the channel 136/// An immediate publisher that holds a dynamic reference to the channel
@@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
158} 174}
159 175
160/// Future for the publisher wait action 176/// Future for the publisher wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"]
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 178pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish 179 /// The message we need to publish
163 message: Option<T>, 180 message: Option<T>,
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index b9a2cbe18..f420a75f0 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
64 } 64 }
65 } 65 }
66 } 66 }
67
68 /// The amount of messages this subscriber hasn't received yet
69 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id)
71 }
67} 72}
68 73
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
@@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
135} 140}
136 141
137/// Future for the subscriber wait action 142/// Future for the subscriber wait action
143#[must_use = "futures do nothing unless you `.await` or poll them"]
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 144pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>, 145 subscriber: &'s mut Sub<'a, PSB, T>,
140} 146}
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index f6ebeb9b9..bea67d8be 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -1,9 +1,11 @@
1//! A synchronization primitive for passing the latest value to a task. 1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::UnsafeCell; 2use core::cell::Cell;
3use core::future::Future; 3use core::future::{poll_fn, Future};
4use core::mem;
5use core::task::{Context, Poll, Waker}; 4use core::task::{Context, Poll, Waker};
6 5
6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex;
8
7/// Single-slot signaling primitive. 9/// Single-slot signaling primitive.
8/// 10///
9/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except 11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except
@@ -20,16 +22,20 @@ use core::task::{Context, Poll, Waker};
20/// 22///
21/// ``` 23/// ```
22/// use embassy_sync::signal::Signal; 24/// use embassy_sync::signal::Signal;
25/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
23/// 26///
24/// enum SomeCommand { 27/// enum SomeCommand {
25/// On, 28/// On,
26/// Off, 29/// Off,
27/// } 30/// }
28/// 31///
29/// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); 32/// static SOME_SIGNAL: Signal<CriticalSectionRawMutex, SomeCommand> = Signal::new();
30/// ``` 33/// ```
31pub struct Signal<T> { 34pub struct Signal<M, T>
32 state: UnsafeCell<State<T>>, 35where
36 M: RawMutex,
37{
38 state: Mutex<M, Cell<State<T>>>,
33} 39}
34 40
35enum State<T> { 41enum State<T> {
@@ -38,24 +44,36 @@ enum State<T> {
38 Signaled(T), 44 Signaled(T),
39} 45}
40 46
41unsafe impl<T: Send> Send for Signal<T> {} 47impl<M, T> Signal<M, T>
42unsafe impl<T: Send> Sync for Signal<T> {} 48where
43 49 M: RawMutex,
44impl<T> Signal<T> { 50{
45 /// Create a new `Signal`. 51 /// Create a new `Signal`.
46 pub const fn new() -> Self { 52 pub const fn new() -> Self {
47 Self { 53 Self {
48 state: UnsafeCell::new(State::None), 54 state: Mutex::new(Cell::new(State::None)),
49 } 55 }
50 } 56 }
51} 57}
52 58
53impl<T: Send> Signal<T> { 59impl<M, T> Default for Signal<M, T>
60where
61 M: RawMutex,
62{
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl<M, T: Send> Signal<M, T>
69where
70 M: RawMutex,
71{
54 /// Mark this Signal as signaled. 72 /// Mark this Signal as signaled.
55 pub fn signal(&self, val: T) { 73 pub fn signal(&self, val: T) {
56 critical_section::with(|_| unsafe { 74 self.state.lock(|cell| {
57 let state = &mut *self.state.get(); 75 let state = cell.replace(State::Signaled(val));
58 if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { 76 if let State::Waiting(waker) = state {
59 waker.wake(); 77 waker.wake();
60 } 78 }
61 }) 79 })
@@ -63,38 +81,46 @@ impl<T: Send> Signal<T> {
63 81
64 /// Remove the queued value in this `Signal`, if any. 82 /// Remove the queued value in this `Signal`, if any.
65 pub fn reset(&self) { 83 pub fn reset(&self) {
66 critical_section::with(|_| unsafe { 84 self.state.lock(|cell| cell.set(State::None));
67 let state = &mut *self.state.get();
68 *state = State::None
69 })
70 } 85 }
71 86
72 /// Manually poll the Signal future. 87 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
73 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { 88 self.state.lock(|cell| {
74 critical_section::with(|_| unsafe { 89 let state = cell.replace(State::None);
75 let state = &mut *self.state.get();
76 match state { 90 match state {
77 State::None => { 91 State::None => {
78 *state = State::Waiting(cx.waker().clone()); 92 cell.set(State::Waiting(cx.waker().clone()));
93 Poll::Pending
94 }
95 State::Waiting(w) if w.will_wake(cx.waker()) => {
96 cell.set(State::Waiting(w));
97 Poll::Pending
98 }
99 State::Waiting(w) => {
100 cell.set(State::Waiting(cx.waker().clone()));
101 w.wake();
79 Poll::Pending 102 Poll::Pending
80 } 103 }
81 State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, 104 State::Signaled(res) => Poll::Ready(res),
82 State::Waiting(_) => panic!("waker overflow"),
83 State::Signaled(_) => match mem::replace(state, State::None) {
84 State::Signaled(res) => Poll::Ready(res),
85 _ => unreachable!(),
86 },
87 } 105 }
88 }) 106 })
89 } 107 }
90 108
91 /// Future that completes when this Signal has been signaled. 109 /// Future that completes when this Signal has been signaled.
92 pub fn wait(&self) -> impl Future<Output = T> + '_ { 110 pub fn wait(&self) -> impl Future<Output = T> + '_ {
93 futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) 111 poll_fn(move |cx| self.poll_wait(cx))
94 } 112 }
95 113
96 /// non-blocking method to check whether this signal has been signaled. 114 /// non-blocking method to check whether this signal has been signaled.
97 pub fn signaled(&self) -> bool { 115 pub fn signaled(&self) -> bool {
98 critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) 116 self.state.lock(|cell| {
117 let state = cell.replace(State::None);
118
119 let res = matches!(state, State::Signaled(_));
120
121 cell.set(state);
122
123 res
124 })
99 } 125 }
100} 126}
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
new file mode 100644
index 000000000..63fe04a6e
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -0,0 +1,41 @@
1use core::cell::Cell;
2use core::task::Waker;
3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex;
5use crate::blocking_mutex::Mutex;
6
7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker {
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
10}
11
12impl AtomicWaker {
13 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self {
15 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
17 }
18 }
19
20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()),
27 })
28 })
29 }
30
31 /// Wake the registered waker, if any.
32 pub fn wake(&self) {
33 critical_section::with(|cs| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref();
37 cell.set(Some(w));
38 }
39 })
40 }
41}
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
new file mode 100644
index 000000000..5c6a96ec8
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -0,0 +1,30 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4use core::task::Waker;
5
6/// Utility struct to register and wake a waker.
7pub struct AtomicWaker {
8 waker: AtomicPtr<()>,
9}
10
11impl AtomicWaker {
12 /// Create a new `AtomicWaker`.
13 pub const fn new() -> Self {
14 Self {
15 waker: AtomicPtr::new(ptr::null_mut()),
16 }
17 }
18
19 /// Register a waker. Overwrites the previous waker, if any.
20 pub fn register(&self, w: &Waker) {
21 self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release);
22 }
23
24 /// Wake the registered waker, if any.
25 pub fn wake(&self) {
26 if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) {
27 unsafe { Waker::from_turbo_ptr(ptr) }.wake();
28 }
29 }
30}
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs
index 6661a6b61..6b0b0c64e 100644
--- a/embassy-sync/src/waitqueue/mod.rs
+++ b/embassy-sync/src/waitqueue/mod.rs
@@ -1,7 +1,11 @@
1//! Async low-level wait queues 1//! Async low-level wait queues
2 2
3mod waker; 3#[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")]
4pub use waker::*; 4mod atomic_waker;
5pub use atomic_waker::*;
6
7mod waker_registration;
8pub use waker_registration::*;
5 9
6mod multi_waker; 10mod multi_waker;
7pub use multi_waker::*; 11pub use multi_waker::*;
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 325d2cb3a..824d192da 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -1,33 +1,58 @@
1use core::task::Waker; 1use core::task::Waker;
2 2
3use super::WakerRegistration; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> { 6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N], 7 wakers: Vec<Waker, N>,
8} 8}
9 9
10impl<const N: usize> MultiWakerRegistration<N> { 10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance 11 /// Create a new empty instance
12 pub const fn new() -> Self { 12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new(); 13 Self { wakers: Vec::new() }
14 Self { wakers: [WAKER; N] }
15 } 14 }
16 15
17 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { 17 pub fn register<'a>(&mut self, w: &'a Waker) {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { 18 // If we already have some waker that wakes the same task as `w`, do nothing.
20 waker_slot.register(w); 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
21 Ok(()) 20 for w2 in &self.wakers {
22 } else { 21 if w.will_wake(w2) {
23 Err(w) 22 return;
23 }
24 }
25
26 if self.wakers.is_full() {
27 // All waker slots were full. It's a bit inefficient, but we can wake everything.
28 // Any future that is still active will simply reregister.
29 // This won't happen a lot, so it's ok.
30 self.wake();
31 }
32
33 if self.wakers.push(w.clone()).is_err() {
34 // This can't happen unless N=0
35 // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
36 panic!("tried to push a waker to a zero-length MultiWakerRegistration")
24 } 37 }
25 } 38 }
26 39
27 /// Wake all registered wakers. This clears the buffer 40 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) { 41 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() { 42 // heapless::Vec has no `drain()`, do it unsafely ourselves...
30 waker_slot.wake() 43
44 // First set length to 0, without dropping the contents.
45 // This is necessary for soundness: if wake() panics and we're using panic=unwind.
46 // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
47 // (it'll leak wakers, but that's not UB)
48 let len = self.wakers.len();
49 unsafe { self.wakers.set_len(0) }
50
51 for i in 0..len {
52 // Move a waker out of the vec.
53 let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
54 // Wake it by value, which consumes (drops) it.
55 waker.wake();
31 } 56 }
32 } 57 }
33} 58}
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker_registration.rs
index 64e300eb8..9b666e7c4 100644
--- a/embassy-sync/src/waitqueue/waker.rs
+++ b/embassy-sync/src/waitqueue/waker_registration.rs
@@ -1,12 +1,8 @@
1use core::cell::Cell;
2use core::mem; 1use core::mem;
3use core::task::Waker; 2use core::task::Waker;
4 3
5use crate::blocking_mutex::raw::CriticalSectionRawMutex;
6use crate::blocking_mutex::Mutex;
7
8/// Utility struct to register and wake a waker. 4/// Utility struct to register and wake a waker.
9#[derive(Debug)] 5#[derive(Debug, Default)]
10pub struct WakerRegistration { 6pub struct WakerRegistration {
11 waker: Option<Waker>, 7 waker: Option<Waker>,
12} 8}
@@ -54,39 +50,3 @@ impl WakerRegistration {
54 self.waker.is_some() 50 self.waker.is_some()
55 } 51 }
56} 52}
57
58/// Utility struct to register and wake a waker.
59pub struct AtomicWaker {
60 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
61}
62
63impl AtomicWaker {
64 /// Create a new `AtomicWaker`.
65 pub const fn new() -> Self {
66 Self {
67 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
68 }
69 }
70
71 /// Register a waker. Overwrites the previous waker, if any.
72 pub fn register(&self, w: &Waker) {
73 critical_section::with(|cs| {
74 let cell = self.waker.borrow(cs);
75 cell.set(match cell.replace(None) {
76 Some(w2) if (w2.will_wake(w)) => Some(w2),
77 _ => Some(w.clone()),
78 })
79 })
80 }
81
82 /// Wake the registered waker, if any.
83 pub fn wake(&self) {
84 critical_section::with(|cs| {
85 let cell = self.waker.borrow(cs);
86 if let Some(w) = cell.replace(None) {
87 w.wake_by_ref();
88 cell.set(Some(w));
89 }
90 })
91 }
92}