diff options
| author | Quentin Smith <[email protected]> | 2023-07-17 21:31:43 -0400 |
|---|---|---|
| committer | Quentin Smith <[email protected]> | 2023-07-17 21:31:43 -0400 |
| commit | 6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch) | |
| tree | 748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-sync | |
| parent | d896f80405aa8963877049ed999e4aba25d6e2bb (diff) | |
| parent | 6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff) | |
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/CHANGELOG.md | 22 | ||||
| -rw-r--r-- | embassy-sync/Cargo.toml | 22 | ||||
| -rw-r--r-- | embassy-sync/README.md | 24 | ||||
| -rw-r--r-- | embassy-sync/src/channel.rs | 4 | ||||
| -rw-r--r-- | embassy-sync/src/fmt.rs | 3 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/mutex.rs | 19 | ||||
| -rw-r--r-- | embassy-sync/src/pipe.rs | 155 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 186 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 17 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 6 | ||||
| -rw-r--r-- | embassy-sync/src/signal.rs | 90 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker.rs | 41 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 30 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/mod.rs | 8 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/multi_waker.rs | 49 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/waker_registration.rs (renamed from embassy-sync/src/waitqueue/waker.rs) | 42 |
17 files changed, 503 insertions, 217 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md new file mode 100644 index 000000000..a60f3f7c4 --- /dev/null +++ b/embassy-sync/CHANGELOG.md | |||
| @@ -0,0 +1,22 @@ | |||
| 1 | # Changelog | ||
| 2 | |||
| 3 | All notable changes to this project will be documented in this file. | ||
| 4 | |||
| 5 | The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), | ||
| 6 | and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). | ||
| 7 | |||
| 8 | ## 0.2.0 - 2023-04-13 | ||
| 9 | |||
| 10 | - pubsub: Fix messages not getting popped when the last subscriber that needed them gets dropped. | ||
| 11 | - pubsub: Move instead of clone messages when the last subscriber pops them. | ||
| 12 | - pubsub: Pop messages which count is 0 after unsubscribe. | ||
| 13 | - Update `embedded-io` from `0.3` to `0.4` (uses `async fn` in traits). | ||
| 14 | - impl `Default` for `WakerRegistration` | ||
| 15 | - impl `Default` for `Signal` | ||
| 16 | - Remove unnecessary uses of `atomic-polyfill` | ||
| 17 | - Add `#[must_use]` to all futures. | ||
| 18 | |||
| 19 | |||
| 20 | ## 0.1.0 - 2022-08-26 | ||
| 21 | |||
| 22 | - First release \ No newline at end of file | ||
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 0d14bba55..340724eab 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml | |||
| @@ -1,7 +1,17 @@ | |||
| 1 | [package] | 1 | [package] |
| 2 | name = "embassy-sync" | 2 | name = "embassy-sync" |
| 3 | version = "0.1.0" | 3 | version = "0.2.0" |
| 4 | edition = "2021" | 4 | edition = "2021" |
| 5 | description = "no-std, no-alloc synchronization primitives with async support" | ||
| 6 | repository = "https://github.com/embassy-rs/embassy" | ||
| 7 | readme = "README.md" | ||
| 8 | license = "MIT OR Apache-2.0" | ||
| 9 | categories = [ | ||
| 10 | "embedded", | ||
| 11 | "no-std", | ||
| 12 | "concurrency", | ||
| 13 | "asynchronous", | ||
| 14 | ] | ||
| 5 | 15 | ||
| 6 | [package.metadata.embassy_docs] | 16 | [package.metadata.embassy_docs] |
| 7 | src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" | 17 | src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" |
| @@ -9,19 +19,23 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/ | |||
| 9 | features = ["nightly"] | 19 | features = ["nightly"] |
| 10 | target = "thumbv7em-none-eabi" | 20 | target = "thumbv7em-none-eabi" |
| 11 | 21 | ||
| 22 | [package.metadata.docs.rs] | ||
| 23 | features = ["nightly"] | ||
| 24 | |||
| 12 | [features] | 25 | [features] |
| 13 | nightly = ["embedded-io/async"] | 26 | nightly = ["embedded-io/async"] |
| 27 | std = [] | ||
| 28 | turbowakers = [] | ||
| 14 | 29 | ||
| 15 | [dependencies] | 30 | [dependencies] |
| 16 | defmt = { version = "0.3", optional = true } | 31 | defmt = { version = "0.3", optional = true } |
| 17 | log = { version = "0.4.14", optional = true } | 32 | log = { version = "0.4.14", optional = true } |
| 18 | 33 | ||
| 19 | futures-util = { version = "0.3.17", default-features = false } | 34 | futures-util = { version = "0.3.17", default-features = false } |
| 20 | atomic-polyfill = "1.0.1" | ||
| 21 | critical-section = "1.1" | 35 | critical-section = "1.1" |
| 22 | heapless = "0.7.5" | 36 | heapless = "0.7.5" |
| 23 | cfg-if = "1.0.0" | 37 | cfg-if = "1.0.0" |
| 24 | embedded-io = "0.3.0" | 38 | embedded-io = "0.4.0" |
| 25 | 39 | ||
| 26 | [dev-dependencies] | 40 | [dev-dependencies] |
| 27 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } | 41 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } |
| @@ -31,4 +45,4 @@ futures-util = { version = "0.3.17", features = [ "channel" ] } | |||
| 31 | 45 | ||
| 32 | # Enable critical-section implementation for std, for tests | 46 | # Enable critical-section implementation for std, for tests |
| 33 | critical-section = { version = "1.1", features = ["std"] } | 47 | critical-section = { version = "1.1", features = ["std"] } |
| 34 | static_cell = "1.0" | 48 | static_cell = "1.1" |
diff --git a/embassy-sync/README.md b/embassy-sync/README.md index 106295c0d..cc65cf6ef 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md | |||
| @@ -1,12 +1,32 @@ | |||
| 1 | # embassy-sync | 1 | # embassy-sync |
| 2 | 2 | ||
| 3 | Synchronization primitives and data structures with an async API: | 3 | An [Embassy](https://embassy.dev) project. |
| 4 | |||
| 5 | Synchronization primitives and data structures with async support: | ||
| 4 | 6 | ||
| 5 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. | 7 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. |
| 6 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. | 8 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. |
| 7 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. | 9 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. |
| 8 | - [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. | 10 | - [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. |
| 9 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. | 11 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. |
| 10 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. | 12 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. |
| 11 | - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. | 13 | - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. |
| 12 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. | 14 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. |
| 15 | |||
| 16 | ## Interoperability | ||
| 17 | |||
| 18 | Futures from this crate can run on any executor. | ||
| 19 | |||
| 20 | ## Minimum supported Rust version (MSRV) | ||
| 21 | |||
| 22 | Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release. | ||
| 23 | |||
| 24 | ## License | ||
| 25 | |||
| 26 | This work is licensed under either of | ||
| 27 | |||
| 28 | - Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or | ||
| 29 | <http://www.apache.org/licenses/LICENSE-2.0>) | ||
| 30 | - MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>) | ||
| 31 | |||
| 32 | at your option. | ||
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 76f42d0e7..77352874d 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -181,6 +181,7 @@ where | |||
| 181 | } | 181 | } |
| 182 | 182 | ||
| 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. | 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. |
| 184 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 184 | pub struct RecvFuture<'ch, M, T, const N: usize> | 185 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 185 | where | 186 | where |
| 186 | M: RawMutex, | 187 | M: RawMutex, |
| @@ -203,6 +204,7 @@ where | |||
| 203 | } | 204 | } |
| 204 | 205 | ||
| 205 | /// Future returned by [`DynamicReceiver::recv`]. | 206 | /// Future returned by [`DynamicReceiver::recv`]. |
| 207 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 206 | pub struct DynamicRecvFuture<'ch, T> { | 208 | pub struct DynamicRecvFuture<'ch, T> { |
| 207 | channel: &'ch dyn DynamicChannel<T>, | 209 | channel: &'ch dyn DynamicChannel<T>, |
| 208 | } | 210 | } |
| @@ -219,6 +221,7 @@ impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | |||
| 219 | } | 221 | } |
| 220 | 222 | ||
| 221 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | 223 | /// Future returned by [`Channel::send`] and [`Sender::send`]. |
| 224 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 222 | pub struct SendFuture<'ch, M, T, const N: usize> | 225 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 223 | where | 226 | where |
| 224 | M: RawMutex, | 227 | M: RawMutex, |
| @@ -250,6 +253,7 @@ where | |||
| 250 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | 253 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
| 251 | 254 | ||
| 252 | /// Future returned by [`DynamicSender::send`]. | 255 | /// Future returned by [`DynamicSender::send`]. |
| 256 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 253 | pub struct DynamicSendFuture<'ch, T> { | 257 | pub struct DynamicSendFuture<'ch, T> { |
| 254 | channel: &'ch dyn DynamicChannel<T>, | 258 | channel: &'ch dyn DynamicChannel<T>, |
| 255 | message: Option<T>, | 259 | message: Option<T>, |
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index f8bb0a035..066970813 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs | |||
| @@ -195,9 +195,6 @@ macro_rules! unwrap { | |||
| 195 | } | 195 | } |
| 196 | } | 196 | } |
| 197 | 197 | ||
| 198 | #[cfg(feature = "defmt-timestamp-uptime")] | ||
| 199 | defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } | ||
| 200 | |||
| 201 | #[derive(Debug, Copy, Clone, Eq, PartialEq)] | 198 | #[derive(Debug, Copy, Clone, Eq, PartialEq)] |
| 202 | pub struct NoneError; | 199 | pub struct NoneError; |
| 203 | 200 | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 25150e8aa..53d95d081 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] | 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] |
| 2 | #![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] | 2 | #![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))] |
| 3 | #![allow(clippy::new_without_default)] | 3 | #![allow(clippy::new_without_default)] |
| 4 | #![doc = include_str!("../README.md")] | 4 | #![doc = include_str!("../README.md")] |
| 5 | #![warn(missing_docs)] | 5 | #![warn(missing_docs)] |
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 75a6e8dd3..fcf056d36 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs | |||
| @@ -2,11 +2,10 @@ | |||
| 2 | //! | 2 | //! |
| 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. | 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. |
| 4 | use core::cell::{RefCell, UnsafeCell}; | 4 | use core::cell::{RefCell, UnsafeCell}; |
| 5 | use core::future::poll_fn; | ||
| 5 | use core::ops::{Deref, DerefMut}; | 6 | use core::ops::{Deref, DerefMut}; |
| 6 | use core::task::Poll; | 7 | use core::task::Poll; |
| 7 | 8 | ||
| 8 | use futures_util::future::poll_fn; | ||
| 9 | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | 9 | use crate::blocking_mutex::raw::RawMutex; |
| 11 | use crate::blocking_mutex::Mutex as BlockingMutex; | 10 | use crate::blocking_mutex::Mutex as BlockingMutex; |
| 12 | use crate::waitqueue::WakerRegistration; | 11 | use crate::waitqueue::WakerRegistration; |
| @@ -111,6 +110,22 @@ where | |||
| 111 | 110 | ||
| 112 | Ok(MutexGuard { mutex: self }) | 111 | Ok(MutexGuard { mutex: self }) |
| 113 | } | 112 | } |
| 113 | |||
| 114 | /// Consumes this mutex, returning the underlying data. | ||
| 115 | pub fn into_inner(self) -> T | ||
| 116 | where | ||
| 117 | T: Sized, | ||
| 118 | { | ||
| 119 | self.inner.into_inner() | ||
| 120 | } | ||
| 121 | |||
| 122 | /// Returns a mutable reference to the underlying data. | ||
| 123 | /// | ||
| 124 | /// Since this call borrows the Mutex mutably, no actual locking needs to | ||
| 125 | /// take place -- the mutable borrow statically guarantees no locks exist. | ||
| 126 | pub fn get_mut(&mut self) -> &mut T { | ||
| 127 | self.inner.get_mut() | ||
| 128 | } | ||
| 114 | } | 129 | } |
| 115 | 130 | ||
| 116 | /// Async mutex guard. | 131 | /// Async mutex guard. |
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 7d64b648e..13bf4ef01 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs | |||
| @@ -32,22 +32,23 @@ impl<'p, M, const N: usize> Writer<'p, M, N> | |||
| 32 | where | 32 | where |
| 33 | M: RawMutex, | 33 | M: RawMutex, |
| 34 | { | 34 | { |
| 35 | /// Writes a value. | 35 | /// Write some bytes to the pipe. |
| 36 | /// | 36 | /// |
| 37 | /// See [`Pipe::write()`] | 37 | /// See [`Pipe::write()`] |
| 38 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | 38 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { |
| 39 | self.pipe.write(buf) | 39 | self.pipe.write(buf) |
| 40 | } | 40 | } |
| 41 | 41 | ||
| 42 | /// Attempt to immediately write a message. | 42 | /// Attempt to immediately write some bytes to the pipe. |
| 43 | /// | 43 | /// |
| 44 | /// See [`Pipe::write()`] | 44 | /// See [`Pipe::try_write()`] |
| 45 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | 45 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { |
| 46 | self.pipe.try_write(buf) | 46 | self.pipe.try_write(buf) |
| 47 | } | 47 | } |
| 48 | } | 48 | } |
| 49 | 49 | ||
| 50 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. | 50 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. |
| 51 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 51 | pub struct WriteFuture<'p, M, const N: usize> | 52 | pub struct WriteFuture<'p, M, const N: usize> |
| 52 | where | 53 | where |
| 53 | M: RawMutex, | 54 | M: RawMutex, |
| @@ -94,22 +95,23 @@ impl<'p, M, const N: usize> Reader<'p, M, N> | |||
| 94 | where | 95 | where |
| 95 | M: RawMutex, | 96 | M: RawMutex, |
| 96 | { | 97 | { |
| 97 | /// Reads a value. | 98 | /// Read some bytes from the pipe. |
| 98 | /// | 99 | /// |
| 99 | /// See [`Pipe::read()`] | 100 | /// See [`Pipe::read()`] |
| 100 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | 101 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { |
| 101 | self.pipe.read(buf) | 102 | self.pipe.read(buf) |
| 102 | } | 103 | } |
| 103 | 104 | ||
| 104 | /// Attempt to immediately read a message. | 105 | /// Attempt to immediately read some bytes from the pipe. |
| 105 | /// | 106 | /// |
| 106 | /// See [`Pipe::read()`] | 107 | /// See [`Pipe::try_read()`] |
| 107 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | 108 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { |
| 108 | self.pipe.try_read(buf) | 109 | self.pipe.try_read(buf) |
| 109 | } | 110 | } |
| 110 | } | 111 | } |
| 111 | 112 | ||
| 112 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | 113 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. |
| 114 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 113 | pub struct ReadFuture<'p, M, const N: usize> | 115 | pub struct ReadFuture<'p, M, const N: usize> |
| 114 | where | 116 | where |
| 115 | M: RawMutex, | 117 | M: RawMutex, |
| @@ -219,12 +221,11 @@ impl<const N: usize> PipeState<N> { | |||
| 219 | } | 221 | } |
| 220 | } | 222 | } |
| 221 | 223 | ||
| 222 | /// A bounded pipe for communicating between asynchronous tasks | 224 | /// A bounded byte-oriented pipe for communicating between asynchronous tasks |
| 223 | /// with backpressure. | 225 | /// with backpressure. |
| 224 | /// | 226 | /// |
| 225 | /// The pipe will buffer up to the provided number of messages. Once the | 227 | /// The pipe will buffer up to the provided number of bytes. Once the |
| 226 | /// buffer is full, attempts to `write` new messages will wait until a message is | 228 | /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. |
| 227 | /// read from the pipe. | ||
| 228 | /// | 229 | /// |
| 229 | /// All data written will become available in the same order as it was written. | 230 | /// All data written will become available in the same order as it was written. |
| 230 | pub struct Pipe<M, const N: usize> | 231 | pub struct Pipe<M, const N: usize> |
| @@ -275,40 +276,66 @@ where | |||
| 275 | Reader { pipe: self } | 276 | Reader { pipe: self } |
| 276 | } | 277 | } |
| 277 | 278 | ||
| 278 | /// Write a value, waiting until there is capacity. | 279 | /// Write some bytes to the pipe. |
| 280 | /// | ||
| 281 | /// This method writes a nonzero amount of bytes from `buf` into the pipe, and | ||
| 282 | /// returns the amount of bytes written. | ||
| 279 | /// | 283 | /// |
| 280 | /// Writeing completes when the value has been pushed to the pipe's queue. | 284 | /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full, |
| 281 | /// This doesn't mean the value has been read yet. | 285 | /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that |
| 286 | /// returns an error instead of waiting. | ||
| 287 | /// | ||
| 288 | /// It is not guaranteed that all bytes in the buffer are written, even if there's enough | ||
| 289 | /// free space in the pipe buffer for all. In other words, it is possible for `write` to return | ||
| 290 | /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave | ||
| 291 | /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like | ||
| 292 | /// `write_all` from the `embedded-io` crate. | ||
| 282 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | 293 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { |
| 283 | WriteFuture { pipe: self, buf } | 294 | WriteFuture { pipe: self, buf } |
| 284 | } | 295 | } |
| 285 | 296 | ||
| 286 | /// Attempt to immediately write a message. | 297 | /// Write all bytes to the pipe. |
| 287 | /// | ||
| 288 | /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's | ||
| 289 | /// buffer is full, instead of waiting. | ||
| 290 | /// | 298 | /// |
| 291 | /// # Errors | 299 | /// This method writes all bytes from `buf` into the pipe |
| 300 | pub async fn write_all(&self, mut buf: &[u8]) { | ||
| 301 | while !buf.is_empty() { | ||
| 302 | let n = self.write(buf).await; | ||
| 303 | buf = &buf[n..]; | ||
| 304 | } | ||
| 305 | } | ||
| 306 | |||
| 307 | /// Attempt to immediately write some bytes to the pipe. | ||
| 292 | /// | 308 | /// |
| 293 | /// If the pipe capacity has been reached, i.e., the pipe has `n` | 309 | /// This method will either write a nonzero amount of bytes to the pipe immediately, |
| 294 | /// buffered values where `n` is the argument passed to [`Pipe`], then an | 310 | /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant |
| 295 | /// error is returned. | 311 | /// that waits instead of returning an error. |
| 296 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | 312 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { |
| 297 | self.lock(|c| c.try_write(buf)) | 313 | self.lock(|c| c.try_write(buf)) |
| 298 | } | 314 | } |
| 299 | 315 | ||
| 300 | /// Receive the next value. | 316 | /// Read some bytes from the pipe. |
| 317 | /// | ||
| 318 | /// This method reads a nonzero amount of bytes from the pipe into `buf` and | ||
| 319 | /// returns the amount of bytes read. | ||
| 320 | /// | ||
| 321 | /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty, | ||
| 322 | /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that | ||
| 323 | /// returns an error instead of waiting. | ||
| 301 | /// | 324 | /// |
| 302 | /// If there are no messages in the pipe's buffer, this method will | 325 | /// It is not guaranteed that all bytes in the buffer are read, even if there's enough |
| 303 | /// wait until a message is written. | 326 | /// space in `buf` for all. In other words, it is possible for `read` to return |
| 327 | /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes | ||
| 328 | /// in the pipe buffer. You should always `read` in a loop, or use helpers like | ||
| 329 | /// `read_exact` from the `embedded-io` crate. | ||
| 304 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | 330 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { |
| 305 | ReadFuture { pipe: self, buf } | 331 | ReadFuture { pipe: self, buf } |
| 306 | } | 332 | } |
| 307 | 333 | ||
| 308 | /// Attempt to immediately read a message. | 334 | /// Attempt to immediately read some bytes from the pipe. |
| 309 | /// | 335 | /// |
| 310 | /// This method will either read a message from the pipe immediately or return an error | 336 | /// This method will either read a nonzero amount of bytes from the pipe immediately, |
| 311 | /// if the pipe is empty. | 337 | /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant |
| 338 | /// that waits instead of returning an error. | ||
| 312 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | 339 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { |
| 313 | self.lock(|c| c.try_read(buf)) | 340 | self.lock(|c| c.try_read(buf)) |
| 314 | } | 341 | } |
| @@ -352,8 +379,6 @@ where | |||
| 352 | mod io_impls { | 379 | mod io_impls { |
| 353 | use core::convert::Infallible; | 380 | use core::convert::Infallible; |
| 354 | 381 | ||
| 355 | use futures_util::FutureExt; | ||
| 356 | |||
| 357 | use super::*; | 382 | use super::*; |
| 358 | 383 | ||
| 359 | impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { | 384 | impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { |
| @@ -361,30 +386,18 @@ mod io_impls { | |||
| 361 | } | 386 | } |
| 362 | 387 | ||
| 363 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { | 388 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { |
| 364 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 389 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 365 | where | 390 | Ok(Pipe::read(self, buf).await) |
| 366 | Self: 'a; | ||
| 367 | |||
| 368 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 369 | Pipe::read(self, buf).map(Ok) | ||
| 370 | } | 391 | } |
| 371 | } | 392 | } |
| 372 | 393 | ||
| 373 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { | 394 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { |
| 374 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 395 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 375 | where | 396 | Ok(Pipe::write(self, buf).await) |
| 376 | Self: 'a; | ||
| 377 | |||
| 378 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 379 | Pipe::write(self, buf).map(Ok) | ||
| 380 | } | 397 | } |
| 381 | 398 | ||
| 382 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 399 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 383 | where | 400 | Ok(()) |
| 384 | Self: 'a; | ||
| 385 | |||
| 386 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 387 | futures_util::future::ready(Ok(())) | ||
| 388 | } | 401 | } |
| 389 | } | 402 | } |
| 390 | 403 | ||
| @@ -393,30 +406,18 @@ mod io_impls { | |||
| 393 | } | 406 | } |
| 394 | 407 | ||
| 395 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { | 408 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { |
| 396 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 409 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 397 | where | 410 | Ok(Pipe::read(self, buf).await) |
| 398 | Self: 'a; | ||
| 399 | |||
| 400 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 401 | Pipe::read(self, buf).map(Ok) | ||
| 402 | } | 411 | } |
| 403 | } | 412 | } |
| 404 | 413 | ||
| 405 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { | 414 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { |
| 406 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 415 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 407 | where | 416 | Ok(Pipe::write(self, buf).await) |
| 408 | Self: 'a; | ||
| 409 | |||
| 410 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 411 | Pipe::write(self, buf).map(Ok) | ||
| 412 | } | 417 | } |
| 413 | 418 | ||
| 414 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 419 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 415 | where | 420 | Ok(()) |
| 416 | Self: 'a; | ||
| 417 | |||
| 418 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 419 | futures_util::future::ready(Ok(())) | ||
| 420 | } | 421 | } |
| 421 | } | 422 | } |
| 422 | 423 | ||
| @@ -425,12 +426,8 @@ mod io_impls { | |||
| 425 | } | 426 | } |
| 426 | 427 | ||
| 427 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { | 428 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { |
| 428 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 429 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 429 | where | 430 | Ok(Reader::read(self, buf).await) |
| 430 | Self: 'a; | ||
| 431 | |||
| 432 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 433 | Reader::read(self, buf).map(Ok) | ||
| 434 | } | 431 | } |
| 435 | } | 432 | } |
| 436 | 433 | ||
| @@ -439,20 +436,12 @@ mod io_impls { | |||
| 439 | } | 436 | } |
| 440 | 437 | ||
| 441 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { | 438 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { |
| 442 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 439 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 443 | where | 440 | Ok(Writer::write(self, buf).await) |
| 444 | Self: 'a; | ||
| 445 | |||
| 446 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 447 | Writer::write(self, buf).map(Ok) | ||
| 448 | } | 441 | } |
| 449 | 442 | ||
| 450 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 443 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 451 | where | 444 | Ok(()) |
| 452 | Self: 'a; | ||
| 453 | |||
| 454 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 455 | futures_util::future::ready(Ok(())) | ||
| 456 | } | 445 | } |
| 457 | } | 446 | } |
| 458 | } | 447 | } |
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..6afd54af5 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -4,7 +4,7 @@ | |||
| 4 | 4 | ||
| 5 | use core::cell::RefCell; | 5 | use core::cell::RefCell; |
| 6 | use core::fmt::Debug; | 6 | use core::fmt::Debug; |
| 7 | use core::task::{Context, Poll, Waker}; | 7 | use core::task::{Context, Poll}; |
| 8 | 8 | ||
| 9 | use heapless::Deque; | 9 | use heapless::Deque; |
| 10 | 10 | ||
| @@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 179 | // No, so we need to reregister our waker and sleep again | 179 | // No, so we need to reregister our waker and sleep again |
| 180 | None => { | 180 | None => { |
| 181 | if let Some(cx) = cx { | 181 | if let Some(cx) = cx { |
| 182 | s.register_subscriber_waker(cx.waker()); | 182 | s.subscriber_wakers.register(cx.waker()); |
| 183 | } | 183 | } |
| 184 | Poll::Pending | 184 | Poll::Pending |
| 185 | } | 185 | } |
| @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 192 | }) | 192 | }) |
| 193 | } | 193 | } |
| 194 | 194 | ||
| 195 | fn available(&self, next_message_id: u64) -> u64 { | ||
| 196 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||
| 197 | } | ||
| 198 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 199 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 196 | self.inner.lock(|s| { | 200 | self.inner.lock(|s| { |
| 197 | let mut s = s.borrow_mut(); | 201 | let mut s = s.borrow_mut(); |
| @@ -202,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 202 | // The queue is full, so we need to reregister our waker and go to sleep | 206 | // The queue is full, so we need to reregister our waker and go to sleep |
| 203 | Err(message) => { | 207 | Err(message) => { |
| 204 | if let Some(cx) = cx { | 208 | if let Some(cx) = cx { |
| 205 | s.register_publisher_waker(cx.waker()); | 209 | s.publisher_wakers.register(cx.waker()); |
| 206 | } | 210 | } |
| 207 | Err(message) | 211 | Err(message) |
| 208 | } | 212 | } |
| @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 221 | }) |
| 218 | } | 222 | } |
| 219 | 223 | ||
| 224 | fn space(&self) -> usize { | ||
| 225 | self.inner.lock(|s| { | ||
| 226 | let s = s.borrow(); | ||
| 227 | s.queue.capacity() - s.queue.len() | ||
| 228 | }) | ||
| 229 | } | ||
| 230 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 231 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 221 | self.inner.lock(|s| { | 232 | self.inner.lock(|s| { |
| 222 | let mut s = s.borrow_mut(); | 233 | let mut s = s.borrow_mut(); |
| @@ -311,44 +322,19 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 311 | 322 | ||
| 312 | // We're reading this item, so decrement the counter | 323 | // We're reading this item, so decrement the counter |
| 313 | queue_item.1 -= 1; | 324 | queue_item.1 -= 1; |
| 314 | let message = queue_item.0.clone(); | ||
| 315 | 325 | ||
| 316 | if current_message_index == 0 && queue_item.1 == 0 { | 326 | let message = if current_message_index == 0 && queue_item.1 == 0 { |
| 317 | self.queue.pop_front(); | 327 | let (message, _) = self.queue.pop_front().unwrap(); |
| 318 | self.publisher_wakers.wake(); | 328 | self.publisher_wakers.wake(); |
| 319 | } | 329 | // Return pop'd message without clone |
| 330 | message | ||
| 331 | } else { | ||
| 332 | queue_item.0.clone() | ||
| 333 | }; | ||
| 320 | 334 | ||
| 321 | Some(WaitResult::Message(message)) | 335 | Some(WaitResult::Message(message)) |
| 322 | } | 336 | } |
| 323 | 337 | ||
| 324 | fn register_subscriber_waker(&mut self, waker: &Waker) { | ||
| 325 | match self.subscriber_wakers.register(waker) { | ||
| 326 | Ok(()) => {} | ||
| 327 | Err(_) => { | ||
| 328 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. | ||
| 329 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 330 | // Any future that is still active will simply reregister. | ||
| 331 | // This won't happen a lot, so it's ok. | ||
| 332 | self.subscriber_wakers.wake(); | ||
| 333 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | fn register_publisher_waker(&mut self, waker: &Waker) { | ||
| 339 | match self.publisher_wakers.register(waker) { | ||
| 340 | Ok(()) => {} | ||
| 341 | Err(_) => { | ||
| 342 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 343 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 344 | // Any future that is still active will simply reregister. | ||
| 345 | // This won't happen a lot, so it's ok. | ||
| 346 | self.publisher_wakers.wake(); | ||
| 347 | self.publisher_wakers.register(waker).unwrap(); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | } | ||
| 351 | |||
| 352 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { | 338 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { |
| 353 | self.subscriber_count -= 1; | 339 | self.subscriber_count -= 1; |
| 354 | 340 | ||
| @@ -360,6 +346,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 360 | .iter_mut() | 346 | .iter_mut() |
| 361 | .skip(current_message_index) | 347 | .skip(current_message_index) |
| 362 | .for_each(|(_, counter)| *counter -= 1); | 348 | .for_each(|(_, counter)| *counter -= 1); |
| 349 | |||
| 350 | let mut wake_publishers = false; | ||
| 351 | while let Some((_, count)) = self.queue.front() { | ||
| 352 | if *count == 0 { | ||
| 353 | self.queue.pop_front().unwrap(); | ||
| 354 | wake_publishers = true; | ||
| 355 | } else { | ||
| 356 | break; | ||
| 357 | } | ||
| 358 | } | ||
| 359 | |||
| 360 | if wake_publishers { | ||
| 361 | self.publisher_wakers.wake(); | ||
| 362 | } | ||
| 363 | } | 363 | } |
| 364 | } | 364 | } |
| 365 | 365 | ||
| @@ -388,6 +388,10 @@ pub trait PubSubBehavior<T> { | |||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. |
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 390 | 390 | ||
| 391 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. | ||
| 392 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. | ||
| 393 | fn available(&self, next_message_id: u64) -> u64; | ||
| 394 | |||
| 391 | /// Try to publish a message to the queue. | 395 | /// Try to publish a message to the queue. |
| 392 | /// | 396 | /// |
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 397 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| @@ -396,6 +400,9 @@ pub trait PubSubBehavior<T> { | |||
| 396 | /// Publish a message immediately | 400 | /// Publish a message immediately |
| 397 | fn publish_immediate(&self, message: T); | 401 | fn publish_immediate(&self, message: T); |
| 398 | 402 | ||
| 403 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 404 | fn space(&self) -> usize; | ||
| 405 | |||
| 399 | /// Let the channel know that a subscriber has dropped | 406 | /// Let the channel know that a subscriber has dropped |
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 407 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 401 | 408 | ||
| @@ -539,4 +546,113 @@ mod tests { | |||
| 539 | 546 | ||
| 540 | drop(sub0); | 547 | drop(sub0); |
| 541 | } | 548 | } |
| 549 | |||
| 550 | #[futures_test::test] | ||
| 551 | async fn correct_available() { | ||
| 552 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 553 | |||
| 554 | let sub0 = channel.subscriber().unwrap(); | ||
| 555 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 556 | let pub0 = channel.publisher().unwrap(); | ||
| 557 | |||
| 558 | assert_eq!(sub0.available(), 0); | ||
| 559 | assert_eq!(sub1.available(), 0); | ||
| 560 | |||
| 561 | pub0.publish(42).await; | ||
| 562 | |||
| 563 | assert_eq!(sub0.available(), 1); | ||
| 564 | assert_eq!(sub1.available(), 1); | ||
| 565 | |||
| 566 | sub1.next_message().await; | ||
| 567 | |||
| 568 | assert_eq!(sub1.available(), 0); | ||
| 569 | |||
| 570 | pub0.publish(42).await; | ||
| 571 | |||
| 572 | assert_eq!(sub0.available(), 2); | ||
| 573 | assert_eq!(sub1.available(), 1); | ||
| 574 | } | ||
| 575 | |||
| 576 | #[futures_test::test] | ||
| 577 | async fn correct_space() { | ||
| 578 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 579 | |||
| 580 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 581 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 582 | let pub0 = channel.publisher().unwrap(); | ||
| 583 | |||
| 584 | assert_eq!(pub0.space(), 4); | ||
| 585 | |||
| 586 | pub0.publish(42).await; | ||
| 587 | |||
| 588 | assert_eq!(pub0.space(), 3); | ||
| 589 | |||
| 590 | pub0.publish(42).await; | ||
| 591 | |||
| 592 | assert_eq!(pub0.space(), 2); | ||
| 593 | |||
| 594 | sub0.next_message().await; | ||
| 595 | sub0.next_message().await; | ||
| 596 | |||
| 597 | assert_eq!(pub0.space(), 2); | ||
| 598 | |||
| 599 | sub1.next_message().await; | ||
| 600 | assert_eq!(pub0.space(), 3); | ||
| 601 | sub1.next_message().await; | ||
| 602 | assert_eq!(pub0.space(), 4); | ||
| 603 | } | ||
| 604 | |||
| 605 | #[futures_test::test] | ||
| 606 | async fn empty_channel_when_last_subscriber_is_dropped() { | ||
| 607 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 608 | |||
| 609 | let pub0 = channel.publisher().unwrap(); | ||
| 610 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 611 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 612 | |||
| 613 | assert_eq!(4, pub0.space()); | ||
| 614 | |||
| 615 | pub0.publish(1).await; | ||
| 616 | pub0.publish(2).await; | ||
| 617 | |||
| 618 | assert_eq!(2, channel.space()); | ||
| 619 | |||
| 620 | assert_eq!(1, sub0.try_next_message_pure().unwrap()); | ||
| 621 | assert_eq!(2, sub0.try_next_message_pure().unwrap()); | ||
| 622 | |||
| 623 | assert_eq!(2, channel.space()); | ||
| 624 | |||
| 625 | drop(sub0); | ||
| 626 | |||
| 627 | assert_eq!(2, channel.space()); | ||
| 628 | |||
| 629 | assert_eq!(1, sub1.try_next_message_pure().unwrap()); | ||
| 630 | |||
| 631 | assert_eq!(3, channel.space()); | ||
| 632 | |||
| 633 | drop(sub1); | ||
| 634 | |||
| 635 | assert_eq!(4, channel.space()); | ||
| 636 | } | ||
| 637 | |||
| 638 | struct CloneCallCounter(usize); | ||
| 639 | |||
| 640 | impl Clone for CloneCallCounter { | ||
| 641 | fn clone(&self) -> Self { | ||
| 642 | Self(self.0 + 1) | ||
| 643 | } | ||
| 644 | } | ||
| 645 | |||
| 646 | #[futures_test::test] | ||
| 647 | async fn skip_clone_for_last_message() { | ||
| 648 | let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new(); | ||
| 649 | let pub0 = channel.publisher().unwrap(); | ||
| 650 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 651 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 652 | |||
| 653 | pub0.publish(CloneCallCounter(0)).await; | ||
| 654 | |||
| 655 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); | ||
| 656 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); | ||
| 657 | } | ||
| 542 | } | 658 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..e1edc9eb9 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 43 | self.channel.publish_with_context(message, None) | 43 | self.channel.publish_with_context(message, None) |
| 44 | } | 44 | } |
| 45 | |||
| 46 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 47 | /// | ||
| 48 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 49 | /// So checking doesn't give any guarantees.* | ||
| 50 | pub fn space(&self) -> usize { | ||
| 51 | self.channel.space() | ||
| 52 | } | ||
| 45 | } | 53 | } |
| 46 | 54 | ||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 55 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | |||
| 115 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 123 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 116 | self.channel.publish_with_context(message, None) | 124 | self.channel.publish_with_context(message, None) |
| 117 | } | 125 | } |
| 126 | |||
| 127 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 128 | /// | ||
| 129 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 130 | /// So checking doesn't give any guarantees.* | ||
| 131 | pub fn space(&self) -> usize { | ||
| 132 | self.channel.space() | ||
| 133 | } | ||
| 118 | } | 134 | } |
| 119 | 135 | ||
| 120 | /// An immediate publisher that holds a dynamic reference to the channel | 136 | /// An immediate publisher that holds a dynamic reference to the channel |
| @@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 158 | } | 174 | } |
| 159 | 175 | ||
| 160 | /// Future for the publisher wait action | 176 | /// Future for the publisher wait action |
| 177 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 178 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 162 | /// The message we need to publish | 179 | /// The message we need to publish |
| 163 | message: Option<T>, | 180 | message: Option<T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 64 | } | 64 | } |
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | |||
| 68 | /// The amount of messages this subscriber hasn't received yet | ||
| 69 | pub fn available(&self) -> u64 { | ||
| 70 | self.channel.available(self.next_message_id) | ||
| 71 | } | ||
| 67 | } | 72 | } |
| 68 | 73 | ||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | 74 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| @@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 135 | } | 140 | } |
| 136 | 141 | ||
| 137 | /// Future for the subscriber wait action | 142 | /// Future for the subscriber wait action |
| 143 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 144 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | 145 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 140 | } | 146 | } |
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index f6ebeb9b9..bea67d8be 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs | |||
| @@ -1,9 +1,11 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to a task. | 1 | //! A synchronization primitive for passing the latest value to a task. |
| 2 | use core::cell::UnsafeCell; | 2 | use core::cell::Cell; |
| 3 | use core::future::Future; | 3 | use core::future::{poll_fn, Future}; |
| 4 | use core::mem; | ||
| 5 | use core::task::{Context, Poll, Waker}; | 4 | use core::task::{Context, Poll, Waker}; |
| 6 | 5 | ||
| 6 | use crate::blocking_mutex::raw::RawMutex; | ||
| 7 | use crate::blocking_mutex::Mutex; | ||
| 8 | |||
| 7 | /// Single-slot signaling primitive. | 9 | /// Single-slot signaling primitive. |
| 8 | /// | 10 | /// |
| 9 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except | 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except |
| @@ -20,16 +22,20 @@ use core::task::{Context, Poll, Waker}; | |||
| 20 | /// | 22 | /// |
| 21 | /// ``` | 23 | /// ``` |
| 22 | /// use embassy_sync::signal::Signal; | 24 | /// use embassy_sync::signal::Signal; |
| 25 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 23 | /// | 26 | /// |
| 24 | /// enum SomeCommand { | 27 | /// enum SomeCommand { |
| 25 | /// On, | 28 | /// On, |
| 26 | /// Off, | 29 | /// Off, |
| 27 | /// } | 30 | /// } |
| 28 | /// | 31 | /// |
| 29 | /// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); | 32 | /// static SOME_SIGNAL: Signal<CriticalSectionRawMutex, SomeCommand> = Signal::new(); |
| 30 | /// ``` | 33 | /// ``` |
| 31 | pub struct Signal<T> { | 34 | pub struct Signal<M, T> |
| 32 | state: UnsafeCell<State<T>>, | 35 | where |
| 36 | M: RawMutex, | ||
| 37 | { | ||
| 38 | state: Mutex<M, Cell<State<T>>>, | ||
| 33 | } | 39 | } |
| 34 | 40 | ||
| 35 | enum State<T> { | 41 | enum State<T> { |
| @@ -38,24 +44,36 @@ enum State<T> { | |||
| 38 | Signaled(T), | 44 | Signaled(T), |
| 39 | } | 45 | } |
| 40 | 46 | ||
| 41 | unsafe impl<T: Send> Send for Signal<T> {} | 47 | impl<M, T> Signal<M, T> |
| 42 | unsafe impl<T: Send> Sync for Signal<T> {} | 48 | where |
| 43 | 49 | M: RawMutex, | |
| 44 | impl<T> Signal<T> { | 50 | { |
| 45 | /// Create a new `Signal`. | 51 | /// Create a new `Signal`. |
| 46 | pub const fn new() -> Self { | 52 | pub const fn new() -> Self { |
| 47 | Self { | 53 | Self { |
| 48 | state: UnsafeCell::new(State::None), | 54 | state: Mutex::new(Cell::new(State::None)), |
| 49 | } | 55 | } |
| 50 | } | 56 | } |
| 51 | } | 57 | } |
| 52 | 58 | ||
| 53 | impl<T: Send> Signal<T> { | 59 | impl<M, T> Default for Signal<M, T> |
| 60 | where | ||
| 61 | M: RawMutex, | ||
| 62 | { | ||
| 63 | fn default() -> Self { | ||
| 64 | Self::new() | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl<M, T: Send> Signal<M, T> | ||
| 69 | where | ||
| 70 | M: RawMutex, | ||
| 71 | { | ||
| 54 | /// Mark this Signal as signaled. | 72 | /// Mark this Signal as signaled. |
| 55 | pub fn signal(&self, val: T) { | 73 | pub fn signal(&self, val: T) { |
| 56 | critical_section::with(|_| unsafe { | 74 | self.state.lock(|cell| { |
| 57 | let state = &mut *self.state.get(); | 75 | let state = cell.replace(State::Signaled(val)); |
| 58 | if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { | 76 | if let State::Waiting(waker) = state { |
| 59 | waker.wake(); | 77 | waker.wake(); |
| 60 | } | 78 | } |
| 61 | }) | 79 | }) |
| @@ -63,38 +81,46 @@ impl<T: Send> Signal<T> { | |||
| 63 | 81 | ||
| 64 | /// Remove the queued value in this `Signal`, if any. | 82 | /// Remove the queued value in this `Signal`, if any. |
| 65 | pub fn reset(&self) { | 83 | pub fn reset(&self) { |
| 66 | critical_section::with(|_| unsafe { | 84 | self.state.lock(|cell| cell.set(State::None)); |
| 67 | let state = &mut *self.state.get(); | ||
| 68 | *state = State::None | ||
| 69 | }) | ||
| 70 | } | 85 | } |
| 71 | 86 | ||
| 72 | /// Manually poll the Signal future. | 87 | fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 73 | pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { | 88 | self.state.lock(|cell| { |
| 74 | critical_section::with(|_| unsafe { | 89 | let state = cell.replace(State::None); |
| 75 | let state = &mut *self.state.get(); | ||
| 76 | match state { | 90 | match state { |
| 77 | State::None => { | 91 | State::None => { |
| 78 | *state = State::Waiting(cx.waker().clone()); | 92 | cell.set(State::Waiting(cx.waker().clone())); |
| 93 | Poll::Pending | ||
| 94 | } | ||
| 95 | State::Waiting(w) if w.will_wake(cx.waker()) => { | ||
| 96 | cell.set(State::Waiting(w)); | ||
| 97 | Poll::Pending | ||
| 98 | } | ||
| 99 | State::Waiting(w) => { | ||
| 100 | cell.set(State::Waiting(cx.waker().clone())); | ||
| 101 | w.wake(); | ||
| 79 | Poll::Pending | 102 | Poll::Pending |
| 80 | } | 103 | } |
| 81 | State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, | 104 | State::Signaled(res) => Poll::Ready(res), |
| 82 | State::Waiting(_) => panic!("waker overflow"), | ||
| 83 | State::Signaled(_) => match mem::replace(state, State::None) { | ||
| 84 | State::Signaled(res) => Poll::Ready(res), | ||
| 85 | _ => unreachable!(), | ||
| 86 | }, | ||
| 87 | } | 105 | } |
| 88 | }) | 106 | }) |
| 89 | } | 107 | } |
| 90 | 108 | ||
| 91 | /// Future that completes when this Signal has been signaled. | 109 | /// Future that completes when this Signal has been signaled. |
| 92 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | 110 | pub fn wait(&self) -> impl Future<Output = T> + '_ { |
| 93 | futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) | 111 | poll_fn(move |cx| self.poll_wait(cx)) |
| 94 | } | 112 | } |
| 95 | 113 | ||
| 96 | /// non-blocking method to check whether this signal has been signaled. | 114 | /// non-blocking method to check whether this signal has been signaled. |
| 97 | pub fn signaled(&self) -> bool { | 115 | pub fn signaled(&self) -> bool { |
| 98 | critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) | 116 | self.state.lock(|cell| { |
| 117 | let state = cell.replace(State::None); | ||
| 118 | |||
| 119 | let res = matches!(state, State::Signaled(_)); | ||
| 120 | |||
| 121 | cell.set(state); | ||
| 122 | |||
| 123 | res | ||
| 124 | }) | ||
| 99 | } | 125 | } |
| 100 | } | 126 | } |
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs new file mode 100644 index 000000000..63fe04a6e --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker.rs | |||
| @@ -0,0 +1,41 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::task::Waker; | ||
| 3 | |||
| 4 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 5 | use crate::blocking_mutex::Mutex; | ||
| 6 | |||
| 7 | /// Utility struct to register and wake a waker. | ||
| 8 | pub struct AtomicWaker { | ||
| 9 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | ||
| 10 | } | ||
| 11 | |||
| 12 | impl AtomicWaker { | ||
| 13 | /// Create a new `AtomicWaker`. | ||
| 14 | pub const fn new() -> Self { | ||
| 15 | Self { | ||
| 16 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | ||
| 17 | } | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 21 | pub fn register(&self, w: &Waker) { | ||
| 22 | critical_section::with(|cs| { | ||
| 23 | let cell = self.waker.borrow(cs); | ||
| 24 | cell.set(match cell.replace(None) { | ||
| 25 | Some(w2) if (w2.will_wake(w)) => Some(w2), | ||
| 26 | _ => Some(w.clone()), | ||
| 27 | }) | ||
| 28 | }) | ||
| 29 | } | ||
| 30 | |||
| 31 | /// Wake the registered waker, if any. | ||
| 32 | pub fn wake(&self) { | ||
| 33 | critical_section::with(|cs| { | ||
| 34 | let cell = self.waker.borrow(cs); | ||
| 35 | if let Some(w) = cell.replace(None) { | ||
| 36 | w.wake_by_ref(); | ||
| 37 | cell.set(Some(w)); | ||
| 38 | } | ||
| 39 | }) | ||
| 40 | } | ||
| 41 | } | ||
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs new file mode 100644 index 000000000..5c6a96ec8 --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs | |||
| @@ -0,0 +1,30 @@ | |||
| 1 | use core::ptr; | ||
| 2 | use core::ptr::NonNull; | ||
| 3 | use core::sync::atomic::{AtomicPtr, Ordering}; | ||
| 4 | use core::task::Waker; | ||
| 5 | |||
| 6 | /// Utility struct to register and wake a waker. | ||
| 7 | pub struct AtomicWaker { | ||
| 8 | waker: AtomicPtr<()>, | ||
| 9 | } | ||
| 10 | |||
| 11 | impl AtomicWaker { | ||
| 12 | /// Create a new `AtomicWaker`. | ||
| 13 | pub const fn new() -> Self { | ||
| 14 | Self { | ||
| 15 | waker: AtomicPtr::new(ptr::null_mut()), | ||
| 16 | } | ||
| 17 | } | ||
| 18 | |||
| 19 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 20 | pub fn register(&self, w: &Waker) { | ||
| 21 | self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release); | ||
| 22 | } | ||
| 23 | |||
| 24 | /// Wake the registered waker, if any. | ||
| 25 | pub fn wake(&self) { | ||
| 26 | if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) { | ||
| 27 | unsafe { Waker::from_turbo_ptr(ptr) }.wake(); | ||
| 28 | } | ||
| 29 | } | ||
| 30 | } | ||
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs index 6661a6b61..6b0b0c64e 100644 --- a/embassy-sync/src/waitqueue/mod.rs +++ b/embassy-sync/src/waitqueue/mod.rs | |||
| @@ -1,7 +1,11 @@ | |||
| 1 | //! Async low-level wait queues | 1 | //! Async low-level wait queues |
| 2 | 2 | ||
| 3 | mod waker; | 3 | #[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")] |
| 4 | pub use waker::*; | 4 | mod atomic_waker; |
| 5 | pub use atomic_waker::*; | ||
| 6 | |||
| 7 | mod waker_registration; | ||
| 8 | pub use waker_registration::*; | ||
| 5 | 9 | ||
| 6 | mod multi_waker; | 10 | mod multi_waker; |
| 7 | pub use multi_waker::*; | 11 | pub use multi_waker::*; |
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 325d2cb3a..824d192da 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs | |||
| @@ -1,33 +1,58 @@ | |||
| 1 | use core::task::Waker; | 1 | use core::task::Waker; |
| 2 | 2 | ||
| 3 | use super::WakerRegistration; | 3 | use heapless::Vec; |
| 4 | 4 | ||
| 5 | /// Utility struct to register and wake multiple wakers. | 5 | /// Utility struct to register and wake multiple wakers. |
| 6 | pub struct MultiWakerRegistration<const N: usize> { | 6 | pub struct MultiWakerRegistration<const N: usize> { |
| 7 | wakers: [WakerRegistration; N], | 7 | wakers: Vec<Waker, N>, |
| 8 | } | 8 | } |
| 9 | 9 | ||
| 10 | impl<const N: usize> MultiWakerRegistration<N> { | 10 | impl<const N: usize> MultiWakerRegistration<N> { |
| 11 | /// Create a new empty instance | 11 | /// Create a new empty instance |
| 12 | pub const fn new() -> Self { | 12 | pub const fn new() -> Self { |
| 13 | const WAKER: WakerRegistration = WakerRegistration::new(); | 13 | Self { wakers: Vec::new() } |
| 14 | Self { wakers: [WAKER; N] } | ||
| 15 | } | 14 | } |
| 16 | 15 | ||
| 17 | /// Register a waker. If the buffer is full the function returns it in the error | 16 | /// Register a waker. If the buffer is full the function returns it in the error |
| 18 | pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { | 17 | pub fn register<'a>(&mut self, w: &'a Waker) { |
| 19 | if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { | 18 | // If we already have some waker that wakes the same task as `w`, do nothing. |
| 20 | waker_slot.register(w); | 19 | // This avoids cloning wakers, and avoids unnecessary mass-wakes. |
| 21 | Ok(()) | 20 | for w2 in &self.wakers { |
| 22 | } else { | 21 | if w.will_wake(w2) { |
| 23 | Err(w) | 22 | return; |
| 23 | } | ||
| 24 | } | ||
| 25 | |||
| 26 | if self.wakers.is_full() { | ||
| 27 | // All waker slots were full. It's a bit inefficient, but we can wake everything. | ||
| 28 | // Any future that is still active will simply reregister. | ||
| 29 | // This won't happen a lot, so it's ok. | ||
| 30 | self.wake(); | ||
| 31 | } | ||
| 32 | |||
| 33 | if self.wakers.push(w.clone()).is_err() { | ||
| 34 | // This can't happen unless N=0 | ||
| 35 | // (Either `wakers` wasn't full, or it was in which case `wake()` empied it) | ||
| 36 | panic!("tried to push a waker to a zero-length MultiWakerRegistration") | ||
| 24 | } | 37 | } |
| 25 | } | 38 | } |
| 26 | 39 | ||
| 27 | /// Wake all registered wakers. This clears the buffer | 40 | /// Wake all registered wakers. This clears the buffer |
| 28 | pub fn wake(&mut self) { | 41 | pub fn wake(&mut self) { |
| 29 | for waker_slot in self.wakers.iter_mut() { | 42 | // heapless::Vec has no `drain()`, do it unsafely ourselves... |
| 30 | waker_slot.wake() | 43 | |
| 44 | // First set length to 0, without dropping the contents. | ||
| 45 | // This is necessary for soundness: if wake() panics and we're using panic=unwind. | ||
| 46 | // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state. | ||
| 47 | // (it'll leak wakers, but that's not UB) | ||
| 48 | let len = self.wakers.len(); | ||
| 49 | unsafe { self.wakers.set_len(0) } | ||
| 50 | |||
| 51 | for i in 0..len { | ||
| 52 | // Move a waker out of the vec. | ||
| 53 | let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() }; | ||
| 54 | // Wake it by value, which consumes (drops) it. | ||
| 55 | waker.wake(); | ||
| 31 | } | 56 | } |
| 32 | } | 57 | } |
| 33 | } | 58 | } |
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker_registration.rs index 64e300eb8..9b666e7c4 100644 --- a/embassy-sync/src/waitqueue/waker.rs +++ b/embassy-sync/src/waitqueue/waker_registration.rs | |||
| @@ -1,12 +1,8 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::mem; | 1 | use core::mem; |
| 3 | use core::task::Waker; | 2 | use core::task::Waker; |
| 4 | 3 | ||
| 5 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 6 | use crate::blocking_mutex::Mutex; | ||
| 7 | |||
| 8 | /// Utility struct to register and wake a waker. | 4 | /// Utility struct to register and wake a waker. |
| 9 | #[derive(Debug)] | 5 | #[derive(Debug, Default)] |
| 10 | pub struct WakerRegistration { | 6 | pub struct WakerRegistration { |
| 11 | waker: Option<Waker>, | 7 | waker: Option<Waker>, |
| 12 | } | 8 | } |
| @@ -54,39 +50,3 @@ impl WakerRegistration { | |||
| 54 | self.waker.is_some() | 50 | self.waker.is_some() |
| 55 | } | 51 | } |
| 56 | } | 52 | } |
| 57 | |||
| 58 | /// Utility struct to register and wake a waker. | ||
| 59 | pub struct AtomicWaker { | ||
| 60 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | ||
| 61 | } | ||
| 62 | |||
| 63 | impl AtomicWaker { | ||
| 64 | /// Create a new `AtomicWaker`. | ||
| 65 | pub const fn new() -> Self { | ||
| 66 | Self { | ||
| 67 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | ||
| 68 | } | ||
| 69 | } | ||
| 70 | |||
| 71 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 72 | pub fn register(&self, w: &Waker) { | ||
| 73 | critical_section::with(|cs| { | ||
| 74 | let cell = self.waker.borrow(cs); | ||
| 75 | cell.set(match cell.replace(None) { | ||
| 76 | Some(w2) if (w2.will_wake(w)) => Some(w2), | ||
| 77 | _ => Some(w.clone()), | ||
| 78 | }) | ||
| 79 | }) | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Wake the registered waker, if any. | ||
| 83 | pub fn wake(&self) { | ||
| 84 | critical_section::with(|cs| { | ||
| 85 | let cell = self.waker.borrow(cs); | ||
| 86 | if let Some(w) = cell.replace(None) { | ||
| 87 | w.wake_by_ref(); | ||
| 88 | cell.set(Some(w)); | ||
| 89 | } | ||
| 90 | }) | ||
| 91 | } | ||
| 92 | } | ||
