aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-sync
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/CHANGELOG.md22
-rw-r--r--embassy-sync/Cargo.toml22
-rw-r--r--embassy-sync/README.md24
-rw-r--r--embassy-sync/src/channel.rs4
-rw-r--r--embassy-sync/src/fmt.rs3
-rw-r--r--embassy-sync/src/lib.rs2
-rw-r--r--embassy-sync/src/mutex.rs19
-rw-r--r--embassy-sync/src/pipe.rs155
-rw-r--r--embassy-sync/src/pubsub/mod.rs186
-rw-r--r--embassy-sync/src/pubsub/publisher.rs17
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs6
-rw-r--r--embassy-sync/src/signal.rs90
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs41
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs30
-rw-r--r--embassy-sync/src/waitqueue/mod.rs8
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs49
-rw-r--r--embassy-sync/src/waitqueue/waker_registration.rs (renamed from embassy-sync/src/waitqueue/waker.rs)42
17 files changed, 503 insertions, 217 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
new file mode 100644
index 000000000..a60f3f7c4
--- /dev/null
+++ b/embassy-sync/CHANGELOG.md
@@ -0,0 +1,22 @@
1# Changelog
2
3All notable changes to this project will be documented in this file.
4
5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7
8## 0.2.0 - 2023-04-13
9
10- pubsub: Fix messages not getting popped when the last subscriber that needed them gets dropped.
11- pubsub: Move instead of clone messages when the last subscriber pops them.
12- pubsub: Pop messages which count is 0 after unsubscribe.
13- Update `embedded-io` from `0.3` to `0.4` (uses `async fn` in traits).
14- impl `Default` for `WakerRegistration`
15- impl `Default` for `Signal`
16- Remove unnecessary uses of `atomic-polyfill`
17- Add `#[must_use]` to all futures.
18
19
20## 0.1.0 - 2022-08-26
21
22- First release \ No newline at end of file
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 0d14bba55..340724eab 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -1,7 +1,17 @@
1[package] 1[package]
2name = "embassy-sync" 2name = "embassy-sync"
3version = "0.1.0" 3version = "0.2.0"
4edition = "2021" 4edition = "2021"
5description = "no-std, no-alloc synchronization primitives with async support"
6repository = "https://github.com/embassy-rs/embassy"
7readme = "README.md"
8license = "MIT OR Apache-2.0"
9categories = [
10 "embedded",
11 "no-std",
12 "concurrency",
13 "asynchronous",
14]
5 15
6[package.metadata.embassy_docs] 16[package.metadata.embassy_docs]
7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" 17src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/"
@@ -9,19 +19,23 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/
9features = ["nightly"] 19features = ["nightly"]
10target = "thumbv7em-none-eabi" 20target = "thumbv7em-none-eabi"
11 21
22[package.metadata.docs.rs]
23features = ["nightly"]
24
12[features] 25[features]
13nightly = ["embedded-io/async"] 26nightly = ["embedded-io/async"]
27std = []
28turbowakers = []
14 29
15[dependencies] 30[dependencies]
16defmt = { version = "0.3", optional = true } 31defmt = { version = "0.3", optional = true }
17log = { version = "0.4.14", optional = true } 32log = { version = "0.4.14", optional = true }
18 33
19futures-util = { version = "0.3.17", default-features = false } 34futures-util = { version = "0.3.17", default-features = false }
20atomic-polyfill = "1.0.1"
21critical-section = "1.1" 35critical-section = "1.1"
22heapless = "0.7.5" 36heapless = "0.7.5"
23cfg-if = "1.0.0" 37cfg-if = "1.0.0"
24embedded-io = "0.3.0" 38embedded-io = "0.4.0"
25 39
26[dev-dependencies] 40[dev-dependencies]
27futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } 41futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
@@ -31,4 +45,4 @@ futures-util = { version = "0.3.17", features = [ "channel" ] }
31 45
32# Enable critical-section implementation for std, for tests 46# Enable critical-section implementation for std, for tests
33critical-section = { version = "1.1", features = ["std"] } 47critical-section = { version = "1.1", features = ["std"] }
34static_cell = "1.0" 48static_cell = "1.1"
diff --git a/embassy-sync/README.md b/embassy-sync/README.md
index 106295c0d..cc65cf6ef 100644
--- a/embassy-sync/README.md
+++ b/embassy-sync/README.md
@@ -1,12 +1,32 @@
1# embassy-sync 1# embassy-sync
2 2
3Synchronization primitives and data structures with an async API: 3An [Embassy](https://embassy.dev) project.
4
5Synchronization primitives and data structures with async support:
4 6
5- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. 7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer.
6- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. 8- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers.
7- [`Signal`](signal::Signal) - Signalling latest value to a single consumer. 9- [`Signal`](signal::Signal) - Signalling latest value to a single consumer.
8- [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. 10- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks.
9- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. 11- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits.
10- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. 12- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`.
11- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. 13- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API.
12- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. 14- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s.
15
16## Interoperability
17
18Futures from this crate can run on any executor.
19
20## Minimum supported Rust version (MSRV)
21
22Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release.
23
24## License
25
26This work is licensed under either of
27
28- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or
29 <http://www.apache.org/licenses/LICENSE-2.0>)
30- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
31
32at your option.
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 76f42d0e7..77352874d 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -181,6 +181,7 @@ where
181} 181}
182 182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. 183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184#[must_use = "futures do nothing unless you `.await` or poll them"]
184pub struct RecvFuture<'ch, M, T, const N: usize> 185pub struct RecvFuture<'ch, M, T, const N: usize>
185where 186where
186 M: RawMutex, 187 M: RawMutex,
@@ -203,6 +204,7 @@ where
203} 204}
204 205
205/// Future returned by [`DynamicReceiver::recv`]. 206/// Future returned by [`DynamicReceiver::recv`].
207#[must_use = "futures do nothing unless you `.await` or poll them"]
206pub struct DynamicRecvFuture<'ch, T> { 208pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>, 209 channel: &'ch dyn DynamicChannel<T>,
208} 210}
@@ -219,6 +221,7 @@ impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
219} 221}
220 222
221/// Future returned by [`Channel::send`] and [`Sender::send`]. 223/// Future returned by [`Channel::send`] and [`Sender::send`].
224#[must_use = "futures do nothing unless you `.await` or poll them"]
222pub struct SendFuture<'ch, M, T, const N: usize> 225pub struct SendFuture<'ch, M, T, const N: usize>
223where 226where
224 M: RawMutex, 227 M: RawMutex,
@@ -250,6 +253,7 @@ where
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} 253impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251 254
252/// Future returned by [`DynamicSender::send`]. 255/// Future returned by [`DynamicSender::send`].
256#[must_use = "futures do nothing unless you `.await` or poll them"]
253pub struct DynamicSendFuture<'ch, T> { 257pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>, 258 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>, 259 message: Option<T>,
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index f8bb0a035..066970813 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -195,9 +195,6 @@ macro_rules! unwrap {
195 } 195 }
196} 196}
197 197
198#[cfg(feature = "defmt-timestamp-uptime")]
199defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
200
201#[derive(Debug, Copy, Clone, Eq, PartialEq)] 198#[derive(Debug, Copy, Clone, Eq, PartialEq)]
202pub struct NoneError; 199pub struct NoneError;
203 200
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 25150e8aa..53d95d081 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -1,5 +1,5 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] 2#![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))]
3#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
4#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 5#![warn(missing_docs)]
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 75a6e8dd3..fcf056d36 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -2,11 +2,10 @@
2//! 2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. 3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell}; 4use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn;
5use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
6use core::task::Poll; 7use core::task::Poll;
7 8
8use futures_util::future::poll_fn;
9
10use crate::blocking_mutex::raw::RawMutex; 9use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex; 10use crate::blocking_mutex::Mutex as BlockingMutex;
12use crate::waitqueue::WakerRegistration; 11use crate::waitqueue::WakerRegistration;
@@ -111,6 +110,22 @@ where
111 110
112 Ok(MutexGuard { mutex: self }) 111 Ok(MutexGuard { mutex: self })
113 } 112 }
113
114 /// Consumes this mutex, returning the underlying data.
115 pub fn into_inner(self) -> T
116 where
117 T: Sized,
118 {
119 self.inner.into_inner()
120 }
121
122 /// Returns a mutable reference to the underlying data.
123 ///
124 /// Since this call borrows the Mutex mutably, no actual locking needs to
125 /// take place -- the mutable borrow statically guarantees no locks exist.
126 pub fn get_mut(&mut self) -> &mut T {
127 self.inner.get_mut()
128 }
114} 129}
115 130
116/// Async mutex guard. 131/// Async mutex guard.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 7d64b648e..13bf4ef01 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -32,22 +32,23 @@ impl<'p, M, const N: usize> Writer<'p, M, N>
32where 32where
33 M: RawMutex, 33 M: RawMutex,
34{ 34{
35 /// Writes a value. 35 /// Write some bytes to the pipe.
36 /// 36 ///
37 /// See [`Pipe::write()`] 37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { 38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf) 39 self.pipe.write(buf)
40 } 40 }
41 41
42 /// Attempt to immediately write a message. 42 /// Attempt to immediately write some bytes to the pipe.
43 /// 43 ///
44 /// See [`Pipe::write()`] 44 /// See [`Pipe::try_write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { 45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf) 46 self.pipe.try_write(buf)
47 } 47 }
48} 48}
49 49
50/// Future returned by [`Pipe::write`] and [`Writer::write`]. 50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51#[must_use = "futures do nothing unless you `.await` or poll them"]
51pub struct WriteFuture<'p, M, const N: usize> 52pub struct WriteFuture<'p, M, const N: usize>
52where 53where
53 M: RawMutex, 54 M: RawMutex,
@@ -94,22 +95,23 @@ impl<'p, M, const N: usize> Reader<'p, M, N>
94where 95where
95 M: RawMutex, 96 M: RawMutex,
96{ 97{
97 /// Reads a value. 98 /// Read some bytes from the pipe.
98 /// 99 ///
99 /// See [`Pipe::read()`] 100 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { 101 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf) 102 self.pipe.read(buf)
102 } 103 }
103 104
104 /// Attempt to immediately read a message. 105 /// Attempt to immediately read some bytes from the pipe.
105 /// 106 ///
106 /// See [`Pipe::read()`] 107 /// See [`Pipe::try_read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 108 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf) 109 self.pipe.try_read(buf)
109 } 110 }
110} 111}
111 112
112/// Future returned by [`Pipe::read`] and [`Reader::read`]. 113/// Future returned by [`Pipe::read`] and [`Reader::read`].
114#[must_use = "futures do nothing unless you `.await` or poll them"]
113pub struct ReadFuture<'p, M, const N: usize> 115pub struct ReadFuture<'p, M, const N: usize>
114where 116where
115 M: RawMutex, 117 M: RawMutex,
@@ -219,12 +221,11 @@ impl<const N: usize> PipeState<N> {
219 } 221 }
220} 222}
221 223
222/// A bounded pipe for communicating between asynchronous tasks 224/// A bounded byte-oriented pipe for communicating between asynchronous tasks
223/// with backpressure. 225/// with backpressure.
224/// 226///
225/// The pipe will buffer up to the provided number of messages. Once the 227/// The pipe will buffer up to the provided number of bytes. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is 228/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
227/// read from the pipe.
228/// 229///
229/// All data written will become available in the same order as it was written. 230/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize> 231pub struct Pipe<M, const N: usize>
@@ -275,40 +276,66 @@ where
275 Reader { pipe: self } 276 Reader { pipe: self }
276 } 277 }
277 278
278 /// Write a value, waiting until there is capacity. 279 /// Write some bytes to the pipe.
280 ///
281 /// This method writes a nonzero amount of bytes from `buf` into the pipe, and
282 /// returns the amount of bytes written.
279 /// 283 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue. 284 /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full,
281 /// This doesn't mean the value has been read yet. 285 /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that
286 /// returns an error instead of waiting.
287 ///
288 /// It is not guaranteed that all bytes in the buffer are written, even if there's enough
289 /// free space in the pipe buffer for all. In other words, it is possible for `write` to return
290 /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave
291 /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like
292 /// `write_all` from the `embedded-io` crate.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { 293 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf } 294 WriteFuture { pipe: self, buf }
284 } 295 }
285 296
286 /// Attempt to immediately write a message. 297 /// Write all bytes to the pipe.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 /// 298 ///
291 /// # Errors 299 /// This method writes all bytes from `buf` into the pipe
300 pub async fn write_all(&self, mut buf: &[u8]) {
301 while !buf.is_empty() {
302 let n = self.write(buf).await;
303 buf = &buf[n..];
304 }
305 }
306
307 /// Attempt to immediately write some bytes to the pipe.
292 /// 308 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n` 309 /// This method will either write a nonzero amount of bytes to the pipe immediately,
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an 310 /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant
295 /// error is returned. 311 /// that waits instead of returning an error.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { 312 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf)) 313 self.lock(|c| c.try_write(buf))
298 } 314 }
299 315
300 /// Receive the next value. 316 /// Read some bytes from the pipe.
317 ///
318 /// This method reads a nonzero amount of bytes from the pipe into `buf` and
319 /// returns the amount of bytes read.
320 ///
321 /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty,
322 /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that
323 /// returns an error instead of waiting.
301 /// 324 ///
302 /// If there are no messages in the pipe's buffer, this method will 325 /// It is not guaranteed that all bytes in the buffer are read, even if there's enough
303 /// wait until a message is written. 326 /// space in `buf` for all. In other words, it is possible for `read` to return
327 /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes
328 /// in the pipe buffer. You should always `read` in a loop, or use helpers like
329 /// `read_exact` from the `embedded-io` crate.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { 330 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf } 331 ReadFuture { pipe: self, buf }
306 } 332 }
307 333
308 /// Attempt to immediately read a message. 334 /// Attempt to immediately read some bytes from the pipe.
309 /// 335 ///
310 /// This method will either read a message from the pipe immediately or return an error 336 /// This method will either read a nonzero amount of bytes from the pipe immediately,
311 /// if the pipe is empty. 337 /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
338 /// that waits instead of returning an error.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 339 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf)) 340 self.lock(|c| c.try_read(buf))
314 } 341 }
@@ -352,8 +379,6 @@ where
352mod io_impls { 379mod io_impls {
353 use core::convert::Infallible; 380 use core::convert::Infallible;
354 381
355 use futures_util::FutureExt;
356
357 use super::*; 382 use super::*;
358 383
359 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { 384 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
@@ -361,30 +386,18 @@ mod io_impls {
361 } 386 }
362 387
363 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { 388 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
364 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 389 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
365 where 390 Ok(Pipe::read(self, buf).await)
366 Self: 'a;
367
368 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
369 Pipe::read(self, buf).map(Ok)
370 } 391 }
371 } 392 }
372 393
373 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { 394 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
374 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 395 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
375 where 396 Ok(Pipe::write(self, buf).await)
376 Self: 'a;
377
378 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
379 Pipe::write(self, buf).map(Ok)
380 } 397 }
381 398
382 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 399 async fn flush(&mut self) -> Result<(), Self::Error> {
383 where 400 Ok(())
384 Self: 'a;
385
386 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
387 futures_util::future::ready(Ok(()))
388 } 401 }
389 } 402 }
390 403
@@ -393,30 +406,18 @@ mod io_impls {
393 } 406 }
394 407
395 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { 408 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
396 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 409 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
397 where 410 Ok(Pipe::read(self, buf).await)
398 Self: 'a;
399
400 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
401 Pipe::read(self, buf).map(Ok)
402 } 411 }
403 } 412 }
404 413
405 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { 414 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
406 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 415 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
407 where 416 Ok(Pipe::write(self, buf).await)
408 Self: 'a;
409
410 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
411 Pipe::write(self, buf).map(Ok)
412 } 417 }
413 418
414 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 419 async fn flush(&mut self) -> Result<(), Self::Error> {
415 where 420 Ok(())
416 Self: 'a;
417
418 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
419 futures_util::future::ready(Ok(()))
420 } 421 }
421 } 422 }
422 423
@@ -425,12 +426,8 @@ mod io_impls {
425 } 426 }
426 427
427 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { 428 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
428 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 429 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
429 where 430 Ok(Reader::read(self, buf).await)
430 Self: 'a;
431
432 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
433 Reader::read(self, buf).map(Ok)
434 } 431 }
435 } 432 }
436 433
@@ -439,20 +436,12 @@ mod io_impls {
439 } 436 }
440 437
441 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { 438 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
442 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 439 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
443 where 440 Ok(Writer::write(self, buf).await)
444 Self: 'a;
445
446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
447 Writer::write(self, buf).map(Ok)
448 } 441 }
449 442
450 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 443 async fn flush(&mut self) -> Result<(), Self::Error> {
451 where 444 Ok(())
452 Self: 'a;
453
454 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
455 futures_util::future::ready(Ok(()))
456 } 445 }
457 } 446 }
458} 447}
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 62a9e4763..6afd54af5 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -4,7 +4,7 @@
4 4
5use core::cell::RefCell; 5use core::cell::RefCell;
6use core::fmt::Debug; 6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker}; 7use core::task::{Context, Poll};
8 8
9use heapless::Deque; 9use heapless::Deque;
10 10
@@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
179 // No, so we need to reregister our waker and sleep again 179 // No, so we need to reregister our waker and sleep again
180 None => { 180 None => {
181 if let Some(cx) = cx { 181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker()); 182 s.subscriber_wakers.register(cx.waker());
183 } 183 }
184 Poll::Pending 184 Poll::Pending
185 } 185 }
@@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
192 }) 192 })
193 } 193 }
194 194
195 fn available(&self, next_message_id: u64) -> u64 {
196 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
197 }
198
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { 199 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| { 200 self.inner.lock(|s| {
197 let mut s = s.borrow_mut(); 201 let mut s = s.borrow_mut();
@@ -202,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
202 // The queue is full, so we need to reregister our waker and go to sleep 206 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => { 207 Err(message) => {
204 if let Some(cx) = cx { 208 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker()); 209 s.publisher_wakers.register(cx.waker());
206 } 210 }
207 Err(message) 211 Err(message)
208 } 212 }
@@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
217 }) 221 })
218 } 222 }
219 223
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| { 232 self.inner.lock(|s| {
222 let mut s = s.borrow_mut(); 233 let mut s = s.borrow_mut();
@@ -311,44 +322,19 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
311 322
312 // We're reading this item, so decrement the counter 323 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1; 324 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315 325
316 if current_message_index == 0 && queue_item.1 == 0 { 326 let message = if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front(); 327 let (message, _) = self.queue.pop_front().unwrap();
318 self.publisher_wakers.wake(); 328 self.publisher_wakers.wake();
319 } 329 // Return pop'd message without clone
330 message
331 } else {
332 queue_item.0.clone()
333 };
320 334
321 Some(WaitResult::Message(message)) 335 Some(WaitResult::Message(message))
322 } 336 }
323 337
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { 338 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1; 339 self.subscriber_count -= 1;
354 340
@@ -360,6 +346,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
360 .iter_mut() 346 .iter_mut()
361 .skip(current_message_index) 347 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1); 348 .for_each(|(_, counter)| *counter -= 1);
349
350 let mut wake_publishers = false;
351 while let Some((_, count)) = self.queue.front() {
352 if *count == 0 {
353 self.queue.pop_front().unwrap();
354 wake_publishers = true;
355 } else {
356 break;
357 }
358 }
359
360 if wake_publishers {
361 self.publisher_wakers.wake();
362 }
363 } 363 }
364 } 364 }
365 365
@@ -388,6 +388,10 @@ pub trait PubSubBehavior<T> {
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 390
391 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
392 /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
393 fn available(&self, next_message_id: u64) -> u64;
394
391 /// Try to publish a message to the queue. 395 /// Try to publish a message to the queue.
392 /// 396 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 397 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
@@ -396,6 +400,9 @@ pub trait PubSubBehavior<T> {
396 /// Publish a message immediately 400 /// Publish a message immediately
397 fn publish_immediate(&self, message: T); 401 fn publish_immediate(&self, message: T);
398 402
403 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
404 fn space(&self) -> usize;
405
399 /// Let the channel know that a subscriber has dropped 406 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 407 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401 408
@@ -539,4 +546,113 @@ mod tests {
539 546
540 drop(sub0); 547 drop(sub0);
541 } 548 }
549
550 #[futures_test::test]
551 async fn correct_available() {
552 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
553
554 let sub0 = channel.subscriber().unwrap();
555 let mut sub1 = channel.subscriber().unwrap();
556 let pub0 = channel.publisher().unwrap();
557
558 assert_eq!(sub0.available(), 0);
559 assert_eq!(sub1.available(), 0);
560
561 pub0.publish(42).await;
562
563 assert_eq!(sub0.available(), 1);
564 assert_eq!(sub1.available(), 1);
565
566 sub1.next_message().await;
567
568 assert_eq!(sub1.available(), 0);
569
570 pub0.publish(42).await;
571
572 assert_eq!(sub0.available(), 2);
573 assert_eq!(sub1.available(), 1);
574 }
575
576 #[futures_test::test]
577 async fn correct_space() {
578 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
579
580 let mut sub0 = channel.subscriber().unwrap();
581 let mut sub1 = channel.subscriber().unwrap();
582 let pub0 = channel.publisher().unwrap();
583
584 assert_eq!(pub0.space(), 4);
585
586 pub0.publish(42).await;
587
588 assert_eq!(pub0.space(), 3);
589
590 pub0.publish(42).await;
591
592 assert_eq!(pub0.space(), 2);
593
594 sub0.next_message().await;
595 sub0.next_message().await;
596
597 assert_eq!(pub0.space(), 2);
598
599 sub1.next_message().await;
600 assert_eq!(pub0.space(), 3);
601 sub1.next_message().await;
602 assert_eq!(pub0.space(), 4);
603 }
604
605 #[futures_test::test]
606 async fn empty_channel_when_last_subscriber_is_dropped() {
607 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
608
609 let pub0 = channel.publisher().unwrap();
610 let mut sub0 = channel.subscriber().unwrap();
611 let mut sub1 = channel.subscriber().unwrap();
612
613 assert_eq!(4, pub0.space());
614
615 pub0.publish(1).await;
616 pub0.publish(2).await;
617
618 assert_eq!(2, channel.space());
619
620 assert_eq!(1, sub0.try_next_message_pure().unwrap());
621 assert_eq!(2, sub0.try_next_message_pure().unwrap());
622
623 assert_eq!(2, channel.space());
624
625 drop(sub0);
626
627 assert_eq!(2, channel.space());
628
629 assert_eq!(1, sub1.try_next_message_pure().unwrap());
630
631 assert_eq!(3, channel.space());
632
633 drop(sub1);
634
635 assert_eq!(4, channel.space());
636 }
637
638 struct CloneCallCounter(usize);
639
640 impl Clone for CloneCallCounter {
641 fn clone(&self) -> Self {
642 Self(self.0 + 1)
643 }
644 }
645
646 #[futures_test::test]
647 async fn skip_clone_for_last_message() {
648 let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
649 let pub0 = channel.publisher().unwrap();
650 let mut sub0 = channel.subscriber().unwrap();
651 let mut sub1 = channel.subscriber().unwrap();
652
653 pub0.publish(CloneCallCounter(0)).await;
654
655 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
656 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
657 }
542} 658}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index 705797f60..e1edc9eb9 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
42 pub fn try_publish(&self, message: T) -> Result<(), T> { 42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
47 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
49 /// So checking doesn't give any guarantees.*
50 pub fn space(&self) -> usize {
51 self.channel.space()
52 }
45} 53}
46 54
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 55impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
115 pub fn try_publish(&self, message: T) -> Result<(), T> { 123 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None) 124 self.channel.publish_with_context(message, None)
117 } 125 }
126
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
128 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
130 /// So checking doesn't give any guarantees.*
131 pub fn space(&self) -> usize {
132 self.channel.space()
133 }
118} 134}
119 135
120/// An immediate publisher that holds a dynamic reference to the channel 136/// An immediate publisher that holds a dynamic reference to the channel
@@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
158} 174}
159 175
160/// Future for the publisher wait action 176/// Future for the publisher wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"]
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 178pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish 179 /// The message we need to publish
163 message: Option<T>, 180 message: Option<T>,
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index b9a2cbe18..f420a75f0 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
64 } 64 }
65 } 65 }
66 } 66 }
67
68 /// The amount of messages this subscriber hasn't received yet
69 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id)
71 }
67} 72}
68 73
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
@@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
135} 140}
136 141
137/// Future for the subscriber wait action 142/// Future for the subscriber wait action
143#[must_use = "futures do nothing unless you `.await` or poll them"]
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 144pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>, 145 subscriber: &'s mut Sub<'a, PSB, T>,
140} 146}
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index f6ebeb9b9..bea67d8be 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -1,9 +1,11 @@
1//! A synchronization primitive for passing the latest value to a task. 1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::UnsafeCell; 2use core::cell::Cell;
3use core::future::Future; 3use core::future::{poll_fn, Future};
4use core::mem;
5use core::task::{Context, Poll, Waker}; 4use core::task::{Context, Poll, Waker};
6 5
6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex;
8
7/// Single-slot signaling primitive. 9/// Single-slot signaling primitive.
8/// 10///
9/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except 11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except
@@ -20,16 +22,20 @@ use core::task::{Context, Poll, Waker};
20/// 22///
21/// ``` 23/// ```
22/// use embassy_sync::signal::Signal; 24/// use embassy_sync::signal::Signal;
25/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
23/// 26///
24/// enum SomeCommand { 27/// enum SomeCommand {
25/// On, 28/// On,
26/// Off, 29/// Off,
27/// } 30/// }
28/// 31///
29/// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); 32/// static SOME_SIGNAL: Signal<CriticalSectionRawMutex, SomeCommand> = Signal::new();
30/// ``` 33/// ```
31pub struct Signal<T> { 34pub struct Signal<M, T>
32 state: UnsafeCell<State<T>>, 35where
36 M: RawMutex,
37{
38 state: Mutex<M, Cell<State<T>>>,
33} 39}
34 40
35enum State<T> { 41enum State<T> {
@@ -38,24 +44,36 @@ enum State<T> {
38 Signaled(T), 44 Signaled(T),
39} 45}
40 46
41unsafe impl<T: Send> Send for Signal<T> {} 47impl<M, T> Signal<M, T>
42unsafe impl<T: Send> Sync for Signal<T> {} 48where
43 49 M: RawMutex,
44impl<T> Signal<T> { 50{
45 /// Create a new `Signal`. 51 /// Create a new `Signal`.
46 pub const fn new() -> Self { 52 pub const fn new() -> Self {
47 Self { 53 Self {
48 state: UnsafeCell::new(State::None), 54 state: Mutex::new(Cell::new(State::None)),
49 } 55 }
50 } 56 }
51} 57}
52 58
53impl<T: Send> Signal<T> { 59impl<M, T> Default for Signal<M, T>
60where
61 M: RawMutex,
62{
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl<M, T: Send> Signal<M, T>
69where
70 M: RawMutex,
71{
54 /// Mark this Signal as signaled. 72 /// Mark this Signal as signaled.
55 pub fn signal(&self, val: T) { 73 pub fn signal(&self, val: T) {
56 critical_section::with(|_| unsafe { 74 self.state.lock(|cell| {
57 let state = &mut *self.state.get(); 75 let state = cell.replace(State::Signaled(val));
58 if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { 76 if let State::Waiting(waker) = state {
59 waker.wake(); 77 waker.wake();
60 } 78 }
61 }) 79 })
@@ -63,38 +81,46 @@ impl<T: Send> Signal<T> {
63 81
64 /// Remove the queued value in this `Signal`, if any. 82 /// Remove the queued value in this `Signal`, if any.
65 pub fn reset(&self) { 83 pub fn reset(&self) {
66 critical_section::with(|_| unsafe { 84 self.state.lock(|cell| cell.set(State::None));
67 let state = &mut *self.state.get();
68 *state = State::None
69 })
70 } 85 }
71 86
72 /// Manually poll the Signal future. 87 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
73 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { 88 self.state.lock(|cell| {
74 critical_section::with(|_| unsafe { 89 let state = cell.replace(State::None);
75 let state = &mut *self.state.get();
76 match state { 90 match state {
77 State::None => { 91 State::None => {
78 *state = State::Waiting(cx.waker().clone()); 92 cell.set(State::Waiting(cx.waker().clone()));
93 Poll::Pending
94 }
95 State::Waiting(w) if w.will_wake(cx.waker()) => {
96 cell.set(State::Waiting(w));
97 Poll::Pending
98 }
99 State::Waiting(w) => {
100 cell.set(State::Waiting(cx.waker().clone()));
101 w.wake();
79 Poll::Pending 102 Poll::Pending
80 } 103 }
81 State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, 104 State::Signaled(res) => Poll::Ready(res),
82 State::Waiting(_) => panic!("waker overflow"),
83 State::Signaled(_) => match mem::replace(state, State::None) {
84 State::Signaled(res) => Poll::Ready(res),
85 _ => unreachable!(),
86 },
87 } 105 }
88 }) 106 })
89 } 107 }
90 108
91 /// Future that completes when this Signal has been signaled. 109 /// Future that completes when this Signal has been signaled.
92 pub fn wait(&self) -> impl Future<Output = T> + '_ { 110 pub fn wait(&self) -> impl Future<Output = T> + '_ {
93 futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) 111 poll_fn(move |cx| self.poll_wait(cx))
94 } 112 }
95 113
96 /// non-blocking method to check whether this signal has been signaled. 114 /// non-blocking method to check whether this signal has been signaled.
97 pub fn signaled(&self) -> bool { 115 pub fn signaled(&self) -> bool {
98 critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) 116 self.state.lock(|cell| {
117 let state = cell.replace(State::None);
118
119 let res = matches!(state, State::Signaled(_));
120
121 cell.set(state);
122
123 res
124 })
99 } 125 }
100} 126}
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
new file mode 100644
index 000000000..63fe04a6e
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -0,0 +1,41 @@
1use core::cell::Cell;
2use core::task::Waker;
3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex;
5use crate::blocking_mutex::Mutex;
6
7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker {
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
10}
11
12impl AtomicWaker {
13 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self {
15 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
17 }
18 }
19
20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()),
27 })
28 })
29 }
30
31 /// Wake the registered waker, if any.
32 pub fn wake(&self) {
33 critical_section::with(|cs| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref();
37 cell.set(Some(w));
38 }
39 })
40 }
41}
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
new file mode 100644
index 000000000..5c6a96ec8
--- /dev/null
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -0,0 +1,30 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4use core::task::Waker;
5
6/// Utility struct to register and wake a waker.
7pub struct AtomicWaker {
8 waker: AtomicPtr<()>,
9}
10
11impl AtomicWaker {
12 /// Create a new `AtomicWaker`.
13 pub const fn new() -> Self {
14 Self {
15 waker: AtomicPtr::new(ptr::null_mut()),
16 }
17 }
18
19 /// Register a waker. Overwrites the previous waker, if any.
20 pub fn register(&self, w: &Waker) {
21 self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release);
22 }
23
24 /// Wake the registered waker, if any.
25 pub fn wake(&self) {
26 if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) {
27 unsafe { Waker::from_turbo_ptr(ptr) }.wake();
28 }
29 }
30}
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs
index 6661a6b61..6b0b0c64e 100644
--- a/embassy-sync/src/waitqueue/mod.rs
+++ b/embassy-sync/src/waitqueue/mod.rs
@@ -1,7 +1,11 @@
1//! Async low-level wait queues 1//! Async low-level wait queues
2 2
3mod waker; 3#[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")]
4pub use waker::*; 4mod atomic_waker;
5pub use atomic_waker::*;
6
7mod waker_registration;
8pub use waker_registration::*;
5 9
6mod multi_waker; 10mod multi_waker;
7pub use multi_waker::*; 11pub use multi_waker::*;
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 325d2cb3a..824d192da 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -1,33 +1,58 @@
1use core::task::Waker; 1use core::task::Waker;
2 2
3use super::WakerRegistration; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> { 6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N], 7 wakers: Vec<Waker, N>,
8} 8}
9 9
10impl<const N: usize> MultiWakerRegistration<N> { 10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance 11 /// Create a new empty instance
12 pub const fn new() -> Self { 12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new(); 13 Self { wakers: Vec::new() }
14 Self { wakers: [WAKER; N] }
15 } 14 }
16 15
17 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { 17 pub fn register<'a>(&mut self, w: &'a Waker) {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { 18 // If we already have some waker that wakes the same task as `w`, do nothing.
20 waker_slot.register(w); 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
21 Ok(()) 20 for w2 in &self.wakers {
22 } else { 21 if w.will_wake(w2) {
23 Err(w) 22 return;
23 }
24 }
25
26 if self.wakers.is_full() {
27 // All waker slots were full. It's a bit inefficient, but we can wake everything.
28 // Any future that is still active will simply reregister.
29 // This won't happen a lot, so it's ok.
30 self.wake();
31 }
32
33 if self.wakers.push(w.clone()).is_err() {
34 // This can't happen unless N=0
35 // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
36 panic!("tried to push a waker to a zero-length MultiWakerRegistration")
24 } 37 }
25 } 38 }
26 39
27 /// Wake all registered wakers. This clears the buffer 40 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) { 41 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() { 42 // heapless::Vec has no `drain()`, do it unsafely ourselves...
30 waker_slot.wake() 43
44 // First set length to 0, without dropping the contents.
45 // This is necessary for soundness: if wake() panics and we're using panic=unwind.
46 // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
47 // (it'll leak wakers, but that's not UB)
48 let len = self.wakers.len();
49 unsafe { self.wakers.set_len(0) }
50
51 for i in 0..len {
52 // Move a waker out of the vec.
53 let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
54 // Wake it by value, which consumes (drops) it.
55 waker.wake();
31 } 56 }
32 } 57 }
33} 58}
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker_registration.rs
index 64e300eb8..9b666e7c4 100644
--- a/embassy-sync/src/waitqueue/waker.rs
+++ b/embassy-sync/src/waitqueue/waker_registration.rs
@@ -1,12 +1,8 @@
1use core::cell::Cell;
2use core::mem; 1use core::mem;
3use core::task::Waker; 2use core::task::Waker;
4 3
5use crate::blocking_mutex::raw::CriticalSectionRawMutex;
6use crate::blocking_mutex::Mutex;
7
8/// Utility struct to register and wake a waker. 4/// Utility struct to register and wake a waker.
9#[derive(Debug)] 5#[derive(Debug, Default)]
10pub struct WakerRegistration { 6pub struct WakerRegistration {
11 waker: Option<Waker>, 7 waker: Option<Waker>,
12} 8}
@@ -54,39 +50,3 @@ impl WakerRegistration {
54 self.waker.is_some() 50 self.waker.is_some()
55 } 51 }
56} 52}
57
58/// Utility struct to register and wake a waker.
59pub struct AtomicWaker {
60 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
61}
62
63impl AtomicWaker {
64 /// Create a new `AtomicWaker`.
65 pub const fn new() -> Self {
66 Self {
67 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
68 }
69 }
70
71 /// Register a waker. Overwrites the previous waker, if any.
72 pub fn register(&self, w: &Waker) {
73 critical_section::with(|cs| {
74 let cell = self.waker.borrow(cs);
75 cell.set(match cell.replace(None) {
76 Some(w2) if (w2.will_wake(w)) => Some(w2),
77 _ => Some(w.clone()),
78 })
79 })
80 }
81
82 /// Wake the registered waker, if any.
83 pub fn wake(&self) {
84 critical_section::with(|cs| {
85 let cell = self.waker.borrow(cs);
86 if let Some(w) = cell.replace(None) {
87 w.wake_by_ref();
88 cell.set(Some(w));
89 }
90 })
91 }
92}