diff options
| author | jrmoulton <[email protected]> | 2025-06-10 15:47:54 -0600 |
|---|---|---|
| committer | jrmoulton <[email protected]> | 2025-06-10 15:48:36 -0600 |
| commit | cfad9798ff99d4de0571a512d156b5fe1ef1d427 (patch) | |
| tree | fc3bf670f82d139de19466cddad1e909db7f3d2e /embassy-sync | |
| parent | fc342915e6155dec7bafa3e135da7f37a9a07f5c (diff) | |
| parent | 6186d111a5c150946ee5b7e9e68d987a38c1a463 (diff) | |
merge new embassy changes
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/CHANGELOG.md | 46 | ||||
| -rw-r--r-- | embassy-sync/Cargo.toml | 9 | ||||
| -rw-r--r-- | embassy-sync/README.md | 5 | ||||
| -rw-r--r-- | embassy-sync/src/blocking_mutex/mod.rs | 18 | ||||
| -rw-r--r-- | embassy-sync/src/channel.rs | 293 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/mutex.rs | 8 | ||||
| -rw-r--r-- | embassy-sync/src/once_lock.rs | 5 | ||||
| -rw-r--r-- | embassy-sync/src/pipe.rs | 273 | ||||
| -rw-r--r-- | embassy-sync/src/priority_channel.rs | 182 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 89 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 67 | ||||
| -rw-r--r-- | embassy-sync/src/rwlock.rs | 387 | ||||
| -rw-r--r-- | embassy-sync/src/signal.rs | 5 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker.rs | 45 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 3 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/multi_waker.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/waker_registration.rs | 4 | ||||
| -rw-r--r-- | embassy-sync/src/watch.rs | 1121 | ||||
| -rw-r--r-- | embassy-sync/src/zerocopy_channel.rs | 136 |
20 files changed, 2619 insertions, 81 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md index 8f2d26fe0..89684de0c 100644 --- a/embassy-sync/CHANGELOG.md +++ b/embassy-sync/CHANGELOG.md | |||
| @@ -7,7 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| 7 | 7 | ||
| 8 | ## Unreleased | 8 | ## Unreleased |
| 9 | 9 | ||
| 10 | - Add LazyLock sync primitive. | 10 | ## 0.7.0 - 2025-05-28 |
| 11 | |||
| 12 | - Add `remove_if` to `priority_channel::{Receiver, PriorityChannel}`. | ||
| 13 | - impl `Stream` for `channel::{Receiver, Channel}`. | ||
| 14 | - Fix channels to wake senders on `clear()`. | ||
| 15 | For `Channel`, `PriorityChannel`, `PubSub`, `zerocopy_channel::Channel`. | ||
| 16 | - Allow `zerocopy_channel::Channel` to auto-implement `Sync`/`Send`. | ||
| 17 | - Add `must_use` to `MutexGuard`. | ||
| 18 | - Add a `RwLock`. | ||
| 19 | - Add `lock_mut` to `blocking_mutex::Mutex`. | ||
| 20 | - Don't select a critical-section implementation when `std` feature is enabled. | ||
| 21 | - Improve waker documentation. | ||
| 22 | - Improve `Signal` and `Watch` documentation. | ||
| 23 | - Update to defmt 1.0. This remains compatible with latest defmt 0.3. | ||
| 24 | - Add `peek` method on `channel` and `priority_channel`. | ||
| 25 | - Add dynamic sender and receiver that are Send + Sync for `channel`. | ||
| 26 | |||
| 27 | ## 0.6.2 - 2025-01-15 | ||
| 28 | |||
| 29 | - Add dynamic dispatch variant of `Pipe`. | ||
| 30 | |||
| 31 | ## 0.6.1 - 2024-11-22 | ||
| 32 | |||
| 33 | - Add `LazyLock` sync primitive. | ||
| 34 | - Add `Watch` sync primitive. | ||
| 35 | - Add `clear`, `len`, `is_empty` and `is_full` functions to `zerocopy_channel`. | ||
| 36 | - Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `channel::{Sender, Receiver}`. | ||
| 37 | - Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `priority_channel::{Sender, Receiver}`. | ||
| 38 | - Add `GenericAtomicWaker` utility. | ||
| 11 | 39 | ||
| 12 | ## 0.6.0 - 2024-05-29 | 40 | ## 0.6.0 - 2024-05-29 |
| 13 | 41 | ||
| @@ -16,20 +44,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| 16 | - Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`. | 44 | - Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`. |
| 17 | - Made `PubSubBehavior` sealed | 45 | - Made `PubSubBehavior` sealed |
| 18 | - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)` | 46 | - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)` |
| 19 | - Add OnceLock sync primitive. | 47 | - Add `OnceLock` sync primitive. |
| 20 | - Add constructor for DynamicChannel | 48 | - Add constructor for `DynamicChannel` |
| 21 | - Add ready_to_receive functions to Channel and Receiver. | 49 | - Add ready_to_receive functions to `Channel` and `Receiver`. |
| 22 | 50 | ||
| 23 | ## 0.5.0 - 2023-12-04 | 51 | ## 0.5.0 - 2023-12-04 |
| 24 | 52 | ||
| 25 | - Add a PriorityChannel. | 53 | - Add a `PriorityChannel`. |
| 26 | - Remove nightly and unstable-traits features in preparation for 1.75. | 54 | - Remove `nightly` and `unstable-traits` features in preparation for 1.75. |
| 27 | - Upgrade heapless to 0.8. | 55 | - Upgrade `heapless` to 0.8. |
| 28 | - Upgrade static-cell to 2.0. | 56 | - Upgrade `static-cell` to 2.0. |
| 29 | 57 | ||
| 30 | ## 0.4.0 - 2023-10-31 | 58 | ## 0.4.0 - 2023-10-31 |
| 31 | 59 | ||
| 32 | - Re-add impl_trait_projections | 60 | - Re-add `impl_trait_projections` |
| 33 | - switch to `embedded-io 0.6` | 61 | - switch to `embedded-io 0.6` |
| 34 | 62 | ||
| 35 | ## 0.3.0 - 2023-09-14 | 63 | ## 0.3.0 - 2023-09-14 |
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 7b7d2bf8e..99962f9f6 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml | |||
| @@ -1,6 +1,6 @@ | |||
| 1 | [package] | 1 | [package] |
| 2 | name = "embassy-sync" | 2 | name = "embassy-sync" |
| 3 | version = "0.6.0" | 3 | version = "0.7.0" |
| 4 | edition = "2021" | 4 | edition = "2021" |
| 5 | description = "no-std, no-alloc synchronization primitives with async support" | 5 | description = "no-std, no-alloc synchronization primitives with async support" |
| 6 | repository = "https://github.com/embassy-rs/embassy" | 6 | repository = "https://github.com/embassy-rs/embassy" |
| @@ -20,13 +20,14 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/ | |||
| 20 | target = "thumbv7em-none-eabi" | 20 | target = "thumbv7em-none-eabi" |
| 21 | 21 | ||
| 22 | [features] | 22 | [features] |
| 23 | std = ["critical-section/std"] | 23 | std = [] |
| 24 | turbowakers = [] | 24 | turbowakers = [] |
| 25 | 25 | ||
| 26 | [dependencies] | 26 | [dependencies] |
| 27 | defmt = { version = "0.3", optional = true } | 27 | defmt = { version = "1.0.1", optional = true } |
| 28 | log = { version = "0.4.14", optional = true } | 28 | log = { version = "0.4.14", optional = true } |
| 29 | 29 | ||
| 30 | futures-sink = { version = "0.3", default-features = false, features = [] } | ||
| 30 | futures-util = { version = "0.3.17", default-features = false } | 31 | futures-util = { version = "0.3.17", default-features = false } |
| 31 | critical-section = "1.1" | 32 | critical-section = "1.1" |
| 32 | heapless = "0.8" | 33 | heapless = "0.8" |
| @@ -37,7 +38,7 @@ embedded-io-async = { version = "0.6.1" } | |||
| 37 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } | 38 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } |
| 38 | futures-test = "0.3.17" | 39 | futures-test = "0.3.17" |
| 39 | futures-timer = "3.0.2" | 40 | futures-timer = "3.0.2" |
| 40 | futures-util = { version = "0.3.17", features = [ "channel" ] } | 41 | futures-util = { version = "0.3.17", features = [ "channel", "sink" ] } |
| 41 | 42 | ||
| 42 | # Enable critical-section implementation for std, for tests | 43 | # Enable critical-section implementation for std, for tests |
| 43 | critical-section = { version = "1.1", features = ["std"] } | 44 | critical-section = { version = "1.1", features = ["std"] } |
diff --git a/embassy-sync/README.md b/embassy-sync/README.md index dec1fbc32..91c59884f 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md | |||
| @@ -5,13 +5,14 @@ An [Embassy](https://embassy.dev) project. | |||
| 5 | Synchronization primitives and data structures with async support: | 5 | Synchronization primitives and data structures with async support: |
| 6 | 6 | ||
| 7 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. | 7 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. |
| 8 | - [`PriorityChannel`](channel::priority::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel. | 8 | - [`PriorityChannel`](priority_channel::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel. |
| 9 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. | 9 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. |
| 10 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. | 10 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. |
| 11 | - [`Watch`](watch::Watch) - Signalling latest value to multiple consumers. | ||
| 11 | - [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. | 12 | - [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. |
| 12 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. | 13 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. |
| 13 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. | 14 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. |
| 14 | - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. | 15 | - [`AtomicWaker`](waitqueue::AtomicWaker) - Utility to register and wake a `Waker` from interrupt context. |
| 15 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. | 16 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. |
| 16 | - [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access | 17 | - [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access |
| 17 | 18 | ||
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs index 8a4a4c642..a41bc3569 100644 --- a/embassy-sync/src/blocking_mutex/mod.rs +++ b/embassy-sync/src/blocking_mutex/mod.rs | |||
| @@ -50,6 +50,23 @@ impl<R: RawMutex, T> Mutex<R, T> { | |||
| 50 | f(inner) | 50 | f(inner) |
| 51 | }) | 51 | }) |
| 52 | } | 52 | } |
| 53 | |||
| 54 | /// Creates a critical section and grants temporary mutable access to the protected data. | ||
| 55 | /// | ||
| 56 | /// # Safety | ||
| 57 | /// | ||
| 58 | /// This method is marked unsafe because calling this method re-entrantly, i.e. within | ||
| 59 | /// another `lock_mut` or `lock` closure, violates Rust's aliasing rules. Calling this | ||
| 60 | /// method at the same time from different tasks is safe. For a safe alternative with | ||
| 61 | /// mutable access that never causes UB, use a `RefCell` in a `Mutex`. | ||
| 62 | pub unsafe fn lock_mut<U>(&self, f: impl FnOnce(&mut T) -> U) -> U { | ||
| 63 | self.raw.lock(|| { | ||
| 64 | let ptr = self.data.get() as *mut T; | ||
| 65 | // Safety: we have exclusive access to the data, as long as this mutex is not locked re-entrantly | ||
| 66 | let inner = unsafe { &mut *ptr }; | ||
| 67 | f(inner) | ||
| 68 | }) | ||
| 69 | } | ||
| 53 | } | 70 | } |
| 54 | 71 | ||
| 55 | impl<R, T> Mutex<R, T> { | 72 | impl<R, T> Mutex<R, T> { |
| @@ -104,6 +121,7 @@ impl<T> Mutex<raw::CriticalSectionRawMutex, T> { | |||
| 104 | 121 | ||
| 105 | impl<T> Mutex<raw::NoopRawMutex, T> { | 122 | impl<T> Mutex<raw::NoopRawMutex, T> { |
| 106 | /// Borrows the data | 123 | /// Borrows the data |
| 124 | #[allow(clippy::should_implement_trait)] | ||
| 107 | pub fn borrow(&self) -> &T { | 125 | pub fn borrow(&self) -> &T { |
| 108 | let ptr = self.data.get() as *const T; | 126 | let ptr = self.data.get() as *const T; |
| 109 | unsafe { &*ptr } | 127 | unsafe { &*ptr } |
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 55ac5fb66..856551417 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -72,6 +72,48 @@ where | |||
| 72 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 72 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 73 | self.channel.poll_ready_to_send(cx) | 73 | self.channel.poll_ready_to_send(cx) |
| 74 | } | 74 | } |
| 75 | |||
| 76 | /// Returns the maximum number of elements the channel can hold. | ||
| 77 | /// | ||
| 78 | /// See [`Channel::capacity()`] | ||
| 79 | pub const fn capacity(&self) -> usize { | ||
| 80 | self.channel.capacity() | ||
| 81 | } | ||
| 82 | |||
| 83 | /// Returns the free capacity of the channel. | ||
| 84 | /// | ||
| 85 | /// See [`Channel::free_capacity()`] | ||
| 86 | pub fn free_capacity(&self) -> usize { | ||
| 87 | self.channel.free_capacity() | ||
| 88 | } | ||
| 89 | |||
| 90 | /// Clears all elements in the channel. | ||
| 91 | /// | ||
| 92 | /// See [`Channel::clear()`] | ||
| 93 | pub fn clear(&self) { | ||
| 94 | self.channel.clear(); | ||
| 95 | } | ||
| 96 | |||
| 97 | /// Returns the number of elements currently in the channel. | ||
| 98 | /// | ||
| 99 | /// See [`Channel::len()`] | ||
| 100 | pub fn len(&self) -> usize { | ||
| 101 | self.channel.len() | ||
| 102 | } | ||
| 103 | |||
| 104 | /// Returns whether the channel is empty. | ||
| 105 | /// | ||
| 106 | /// See [`Channel::is_empty()`] | ||
| 107 | pub fn is_empty(&self) -> bool { | ||
| 108 | self.channel.is_empty() | ||
| 109 | } | ||
| 110 | |||
| 111 | /// Returns whether the channel is full. | ||
| 112 | /// | ||
| 113 | /// See [`Channel::is_full()`] | ||
| 114 | pub fn is_full(&self) -> bool { | ||
| 115 | self.channel.is_full() | ||
| 116 | } | ||
| 75 | } | 117 | } |
| 76 | 118 | ||
| 77 | /// Send-only access to a [`Channel`] without knowing channel size. | 119 | /// Send-only access to a [`Channel`] without knowing channel size. |
| @@ -122,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> { | |||
| 122 | } | 164 | } |
| 123 | } | 165 | } |
| 124 | 166 | ||
| 167 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 168 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 169 | pub struct SendDynamicSender<'ch, T> { | ||
| 170 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 171 | } | ||
| 172 | |||
| 173 | impl<'ch, T> Clone for SendDynamicSender<'ch, T> { | ||
| 174 | fn clone(&self) -> Self { | ||
| 175 | *self | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | impl<'ch, T> Copy for SendDynamicSender<'ch, T> {} | ||
| 180 | unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {} | ||
| 181 | unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {} | ||
| 182 | |||
| 183 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T> | ||
| 184 | where | ||
| 185 | M: RawMutex + Sync + Send, | ||
| 186 | { | ||
| 187 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 188 | Self { channel: s.channel } | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | impl<'ch, T> SendDynamicSender<'ch, T> { | ||
| 193 | /// Sends a value. | ||
| 194 | /// | ||
| 195 | /// See [`Channel::send()`] | ||
| 196 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 197 | DynamicSendFuture { | ||
| 198 | channel: self.channel, | ||
| 199 | message: Some(message), | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 203 | /// Attempt to immediately send a message. | ||
| 204 | /// | ||
| 205 | /// See [`Channel::send()`] | ||
| 206 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 207 | self.channel.try_send_with_context(message, None) | ||
| 208 | } | ||
| 209 | |||
| 210 | /// Allows a poll_fn to poll until the channel is ready to send | ||
| 211 | /// | ||
| 212 | /// See [`Channel::poll_ready_to_send()`] | ||
| 213 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 214 | self.channel.poll_ready_to_send(cx) | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 125 | /// Receive-only access to a [`Channel`]. | 218 | /// Receive-only access to a [`Channel`]. |
| 126 | pub struct Receiver<'ch, M, T, const N: usize> | 219 | pub struct Receiver<'ch, M, T, const N: usize> |
| 127 | where | 220 | where |
| @@ -166,6 +259,16 @@ where | |||
| 166 | self.channel.try_receive() | 259 | self.channel.try_receive() |
| 167 | } | 260 | } |
| 168 | 261 | ||
| 262 | /// Peek at the next value without removing it from the queue. | ||
| 263 | /// | ||
| 264 | /// See [`Channel::try_peek()`] | ||
| 265 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 266 | where | ||
| 267 | T: Clone, | ||
| 268 | { | ||
| 269 | self.channel.try_peek() | ||
| 270 | } | ||
| 271 | |||
| 169 | /// Allows a poll_fn to poll until the channel is ready to receive | 272 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 170 | /// | 273 | /// |
| 171 | /// See [`Channel::poll_ready_to_receive()`] | 274 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -179,6 +282,48 @@ where | |||
| 179 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 282 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 180 | self.channel.poll_receive(cx) | 283 | self.channel.poll_receive(cx) |
| 181 | } | 284 | } |
| 285 | |||
| 286 | /// Returns the maximum number of elements the channel can hold. | ||
| 287 | /// | ||
| 288 | /// See [`Channel::capacity()`] | ||
| 289 | pub const fn capacity(&self) -> usize { | ||
| 290 | self.channel.capacity() | ||
| 291 | } | ||
| 292 | |||
| 293 | /// Returns the free capacity of the channel. | ||
| 294 | /// | ||
| 295 | /// See [`Channel::free_capacity()`] | ||
| 296 | pub fn free_capacity(&self) -> usize { | ||
| 297 | self.channel.free_capacity() | ||
| 298 | } | ||
| 299 | |||
| 300 | /// Clears all elements in the channel. | ||
| 301 | /// | ||
| 302 | /// See [`Channel::clear()`] | ||
| 303 | pub fn clear(&self) { | ||
| 304 | self.channel.clear(); | ||
| 305 | } | ||
| 306 | |||
| 307 | /// Returns the number of elements currently in the channel. | ||
| 308 | /// | ||
| 309 | /// See [`Channel::len()`] | ||
| 310 | pub fn len(&self) -> usize { | ||
| 311 | self.channel.len() | ||
| 312 | } | ||
| 313 | |||
| 314 | /// Returns whether the channel is empty. | ||
| 315 | /// | ||
| 316 | /// See [`Channel::is_empty()`] | ||
| 317 | pub fn is_empty(&self) -> bool { | ||
| 318 | self.channel.is_empty() | ||
| 319 | } | ||
| 320 | |||
| 321 | /// Returns whether the channel is full. | ||
| 322 | /// | ||
| 323 | /// See [`Channel::is_full()`] | ||
| 324 | pub fn is_full(&self) -> bool { | ||
| 325 | self.channel.is_full() | ||
| 326 | } | ||
| 182 | } | 327 | } |
| 183 | 328 | ||
| 184 | /// Receive-only access to a [`Channel`] without knowing channel size. | 329 | /// Receive-only access to a [`Channel`] without knowing channel size. |
| @@ -209,6 +354,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { | |||
| 209 | self.channel.try_receive_with_context(None) | 354 | self.channel.try_receive_with_context(None) |
| 210 | } | 355 | } |
| 211 | 356 | ||
| 357 | /// Peek at the next value without removing it from the queue. | ||
| 358 | /// | ||
| 359 | /// See [`Channel::try_peek()`] | ||
| 360 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 361 | where | ||
| 362 | T: Clone, | ||
| 363 | { | ||
| 364 | self.channel.try_peek_with_context(None) | ||
| 365 | } | ||
| 366 | |||
| 212 | /// Allows a poll_fn to poll until the channel is ready to receive | 367 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 213 | /// | 368 | /// |
| 214 | /// See [`Channel::poll_ready_to_receive()`] | 369 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -233,6 +388,72 @@ where | |||
| 233 | } | 388 | } |
| 234 | } | 389 | } |
| 235 | 390 | ||
| 391 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 392 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 393 | pub struct SendDynamicReceiver<'ch, T> { | ||
| 394 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 395 | } | ||
| 396 | |||
| 397 | impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> { | ||
| 398 | fn clone(&self) -> Self { | ||
| 399 | *self | ||
| 400 | } | ||
| 401 | } | ||
| 402 | |||
| 403 | impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {} | ||
| 404 | unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {} | ||
| 405 | unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {} | ||
| 406 | |||
| 407 | impl<'ch, T> SendDynamicReceiver<'ch, T> { | ||
| 408 | /// Receive the next value. | ||
| 409 | /// | ||
| 410 | /// See [`Channel::receive()`]. | ||
| 411 | pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { | ||
| 412 | DynamicReceiveFuture { channel: self.channel } | ||
| 413 | } | ||
| 414 | |||
| 415 | /// Attempt to immediately receive the next value. | ||
| 416 | /// | ||
| 417 | /// See [`Channel::try_receive()`] | ||
| 418 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { | ||
| 419 | self.channel.try_receive_with_context(None) | ||
| 420 | } | ||
| 421 | |||
| 422 | /// Allows a poll_fn to poll until the channel is ready to receive | ||
| 423 | /// | ||
| 424 | /// See [`Channel::poll_ready_to_receive()`] | ||
| 425 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 426 | self.channel.poll_ready_to_receive(cx) | ||
| 427 | } | ||
| 428 | |||
| 429 | /// Poll the channel for the next item | ||
| 430 | /// | ||
| 431 | /// See [`Channel::poll_receive()`] | ||
| 432 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 433 | self.channel.poll_receive(cx) | ||
| 434 | } | ||
| 435 | } | ||
| 436 | |||
| 437 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T> | ||
| 438 | where | ||
| 439 | M: RawMutex + Sync + Send, | ||
| 440 | { | ||
| 441 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 442 | Self { channel: s.channel } | ||
| 443 | } | ||
| 444 | } | ||
| 445 | |||
| 446 | impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> | ||
| 447 | where | ||
| 448 | M: RawMutex, | ||
| 449 | { | ||
| 450 | type Item = T; | ||
| 451 | |||
| 452 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 453 | self.channel.poll_receive(cx).map(Some) | ||
| 454 | } | ||
| 455 | } | ||
| 456 | |||
| 236 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. | 457 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. |
| 237 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 458 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 238 | pub struct ReceiveFuture<'ch, M, T, const N: usize> | 459 | pub struct ReceiveFuture<'ch, M, T, const N: usize> |
| @@ -368,6 +589,10 @@ pub(crate) trait DynamicChannel<T> { | |||
| 368 | 589 | ||
| 369 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; | 590 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; |
| 370 | 591 | ||
| 592 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 593 | where | ||
| 594 | T: Clone; | ||
| 595 | |||
| 371 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; | 596 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 372 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; | 597 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 373 | 598 | ||
| @@ -410,6 +635,31 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 410 | self.try_receive_with_context(None) | 635 | self.try_receive_with_context(None) |
| 411 | } | 636 | } |
| 412 | 637 | ||
| 638 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 639 | where | ||
| 640 | T: Clone, | ||
| 641 | { | ||
| 642 | self.try_peek_with_context(None) | ||
| 643 | } | ||
| 644 | |||
| 645 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 646 | where | ||
| 647 | T: Clone, | ||
| 648 | { | ||
| 649 | if self.queue.is_full() { | ||
| 650 | self.senders_waker.wake(); | ||
| 651 | } | ||
| 652 | |||
| 653 | if let Some(message) = self.queue.front() { | ||
| 654 | Ok(message.clone()) | ||
| 655 | } else { | ||
| 656 | if let Some(cx) = cx { | ||
| 657 | self.receiver_waker.register(cx.waker()); | ||
| 658 | } | ||
| 659 | Err(TryReceiveError::Empty) | ||
| 660 | } | ||
| 661 | } | ||
| 662 | |||
| 413 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 663 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 414 | if self.queue.is_full() { | 664 | if self.queue.is_full() { |
| 415 | self.senders_waker.wake(); | 665 | self.senders_waker.wake(); |
| @@ -478,6 +728,9 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 478 | } | 728 | } |
| 479 | 729 | ||
| 480 | fn clear(&mut self) { | 730 | fn clear(&mut self) { |
| 731 | if self.queue.is_full() { | ||
| 732 | self.senders_waker.wake(); | ||
| 733 | } | ||
| 481 | self.queue.clear(); | 734 | self.queue.clear(); |
| 482 | } | 735 | } |
| 483 | 736 | ||
| @@ -536,6 +789,13 @@ where | |||
| 536 | self.lock(|c| c.try_receive_with_context(cx)) | 789 | self.lock(|c| c.try_receive_with_context(cx)) |
| 537 | } | 790 | } |
| 538 | 791 | ||
| 792 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 793 | where | ||
| 794 | T: Clone, | ||
| 795 | { | ||
| 796 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 797 | } | ||
| 798 | |||
| 539 | /// Poll the channel for the next message | 799 | /// Poll the channel for the next message |
| 540 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 800 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 541 | self.lock(|c| c.poll_receive(cx)) | 801 | self.lock(|c| c.poll_receive(cx)) |
| @@ -624,6 +884,17 @@ where | |||
| 624 | self.lock(|c| c.try_receive()) | 884 | self.lock(|c| c.try_receive()) |
| 625 | } | 885 | } |
| 626 | 886 | ||
| 887 | /// Peek at the next value without removing it from the queue. | ||
| 888 | /// | ||
| 889 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 890 | /// an error if the channel is empty. | ||
| 891 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 892 | where | ||
| 893 | T: Clone, | ||
| 894 | { | ||
| 895 | self.lock(|c| c.try_peek()) | ||
| 896 | } | ||
| 897 | |||
| 627 | /// Returns the maximum number of elements the channel can hold. | 898 | /// Returns the maximum number of elements the channel can hold. |
| 628 | pub const fn capacity(&self) -> usize { | 899 | pub const fn capacity(&self) -> usize { |
| 629 | N | 900 | N |
| @@ -671,6 +942,13 @@ where | |||
| 671 | Channel::try_receive_with_context(self, cx) | 942 | Channel::try_receive_with_context(self, cx) |
| 672 | } | 943 | } |
| 673 | 944 | ||
| 945 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 946 | where | ||
| 947 | T: Clone, | ||
| 948 | { | ||
| 949 | Channel::try_peek_with_context(self, cx) | ||
| 950 | } | ||
| 951 | |||
| 674 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 952 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 675 | Channel::poll_ready_to_send(self, cx) | 953 | Channel::poll_ready_to_send(self, cx) |
| 676 | } | 954 | } |
| @@ -684,6 +962,17 @@ where | |||
| 684 | } | 962 | } |
| 685 | } | 963 | } |
| 686 | 964 | ||
| 965 | impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N> | ||
| 966 | where | ||
| 967 | M: RawMutex, | ||
| 968 | { | ||
| 969 | type Item = T; | ||
| 970 | |||
| 971 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 972 | self.poll_receive(cx).map(Some) | ||
| 973 | } | ||
| 974 | } | ||
| 975 | |||
| 687 | #[cfg(test)] | 976 | #[cfg(test)] |
| 688 | mod tests { | 977 | mod tests { |
| 689 | use core::time::Duration; | 978 | use core::time::Duration; |
| @@ -742,6 +1031,8 @@ mod tests { | |||
| 742 | fn simple_send_and_receive() { | 1031 | fn simple_send_and_receive() { |
| 743 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | 1032 | let c = Channel::<NoopRawMutex, u32, 3>::new(); |
| 744 | assert!(c.try_send(1).is_ok()); | 1033 | assert!(c.try_send(1).is_ok()); |
| 1034 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 1035 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 745 | assert_eq!(c.try_receive().unwrap(), 1); | 1036 | assert_eq!(c.try_receive().unwrap(), 1); |
| 746 | } | 1037 | } |
| 747 | 1038 | ||
| @@ -772,6 +1063,8 @@ mod tests { | |||
| 772 | let r = c.dyn_receiver(); | 1063 | let r = c.dyn_receiver(); |
| 773 | 1064 | ||
| 774 | assert!(s.try_send(1).is_ok()); | 1065 | assert!(s.try_send(1).is_ok()); |
| 1066 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 1067 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 775 | assert_eq!(r.try_receive().unwrap(), 1); | 1068 | assert_eq!(r.try_receive().unwrap(), 1); |
| 776 | } | 1069 | } |
| 777 | 1070 | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 014bf1d06..5d91b4d9c 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -18,7 +18,9 @@ pub mod once_lock; | |||
| 18 | pub mod pipe; | 18 | pub mod pipe; |
| 19 | pub mod priority_channel; | 19 | pub mod priority_channel; |
| 20 | pub mod pubsub; | 20 | pub mod pubsub; |
| 21 | pub mod rwlock; | ||
| 21 | pub mod semaphore; | 22 | pub mod semaphore; |
| 22 | pub mod signal; | 23 | pub mod signal; |
| 23 | pub mod waitqueue; | 24 | pub mod waitqueue; |
| 25 | pub mod watch; | ||
| 24 | pub mod zerocopy_channel; | 26 | pub mod zerocopy_channel; |
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 8c3a3af9f..7528a9f68 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs | |||
| @@ -2,7 +2,7 @@ | |||
| 2 | //! | 2 | //! |
| 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. | 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. |
| 4 | use core::cell::{RefCell, UnsafeCell}; | 4 | use core::cell::{RefCell, UnsafeCell}; |
| 5 | use core::future::poll_fn; | 5 | use core::future::{poll_fn, Future}; |
| 6 | use core::ops::{Deref, DerefMut}; | 6 | use core::ops::{Deref, DerefMut}; |
| 7 | use core::task::Poll; | 7 | use core::task::Poll; |
| 8 | use core::{fmt, mem}; | 8 | use core::{fmt, mem}; |
| @@ -73,7 +73,7 @@ where | |||
| 73 | /// Lock the mutex. | 73 | /// Lock the mutex. |
| 74 | /// | 74 | /// |
| 75 | /// This will wait for the mutex to be unlocked if it's already locked. | 75 | /// This will wait for the mutex to be unlocked if it's already locked. |
| 76 | pub async fn lock(&self) -> MutexGuard<'_, M, T> { | 76 | pub fn lock(&self) -> impl Future<Output = MutexGuard<'_, M, T>> { |
| 77 | poll_fn(|cx| { | 77 | poll_fn(|cx| { |
| 78 | let ready = self.state.lock(|s| { | 78 | let ready = self.state.lock(|s| { |
| 79 | let mut s = s.borrow_mut(); | 79 | let mut s = s.borrow_mut(); |
| @@ -92,7 +92,6 @@ where | |||
| 92 | Poll::Pending | 92 | Poll::Pending |
| 93 | } | 93 | } |
| 94 | }) | 94 | }) |
| 95 | .await | ||
| 96 | } | 95 | } |
| 97 | 96 | ||
| 98 | /// Attempt to immediately lock the mutex. | 97 | /// Attempt to immediately lock the mutex. |
| @@ -138,7 +137,7 @@ impl<M: RawMutex, T> From<T> for Mutex<M, T> { | |||
| 138 | impl<M, T> Default for Mutex<M, T> | 137 | impl<M, T> Default for Mutex<M, T> |
| 139 | where | 138 | where |
| 140 | M: RawMutex, | 139 | M: RawMutex, |
| 141 | T: ?Sized + Default, | 140 | T: Default, |
| 142 | { | 141 | { |
| 143 | fn default() -> Self { | 142 | fn default() -> Self { |
| 144 | Self::new(Default::default()) | 143 | Self::new(Default::default()) |
| @@ -172,6 +171,7 @@ where | |||
| 172 | /// | 171 | /// |
| 173 | /// Dropping it unlocks the mutex. | 172 | /// Dropping it unlocks the mutex. |
| 174 | #[clippy::has_significant_drop] | 173 | #[clippy::has_significant_drop] |
| 174 | #[must_use = "if unused the Mutex will immediately unlock"] | ||
| 175 | pub struct MutexGuard<'a, M, T> | 175 | pub struct MutexGuard<'a, M, T> |
| 176 | where | 176 | where |
| 177 | M: RawMutex, | 177 | M: RawMutex, |
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index 55608ba32..cd05b986d 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs | |||
| @@ -1,7 +1,7 @@ | |||
| 1 | //! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. | 1 | //! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. |
| 2 | 2 | ||
| 3 | use core::cell::Cell; | 3 | use core::cell::Cell; |
| 4 | use core::future::poll_fn; | 4 | use core::future::{poll_fn, Future}; |
| 5 | use core::mem::MaybeUninit; | 5 | use core::mem::MaybeUninit; |
| 6 | use core::sync::atomic::{AtomicBool, Ordering}; | 6 | use core::sync::atomic::{AtomicBool, Ordering}; |
| 7 | use core::task::Poll; | 7 | use core::task::Poll; |
| @@ -55,7 +55,7 @@ impl<T> OnceLock<T> { | |||
| 55 | 55 | ||
| 56 | /// Get a reference to the underlying value, waiting for it to be set. | 56 | /// Get a reference to the underlying value, waiting for it to be set. |
| 57 | /// If the value is already set, this will return immediately. | 57 | /// If the value is already set, this will return immediately. |
| 58 | pub async fn get(&self) -> &T { | 58 | pub fn get(&self) -> impl Future<Output = &T> { |
| 59 | poll_fn(|cx| match self.try_get() { | 59 | poll_fn(|cx| match self.try_get() { |
| 60 | Some(data) => Poll::Ready(data), | 60 | Some(data) => Poll::Ready(data), |
| 61 | None => { | 61 | None => { |
| @@ -63,7 +63,6 @@ impl<T> OnceLock<T> { | |||
| 63 | Poll::Pending | 63 | Poll::Pending |
| 64 | } | 64 | } |
| 65 | }) | 65 | }) |
| 66 | .await | ||
| 67 | } | 66 | } |
| 68 | 67 | ||
| 69 | /// Try to get a reference to the underlying value if it exists. | 68 | /// Try to get a reference to the underlying value if it exists. |
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index cd5b8ed75..2598652d2 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs | |||
| @@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> | |||
| 532 | } | 532 | } |
| 533 | } | 533 | } |
| 534 | 534 | ||
| 535 | // | ||
| 536 | // Type-erased variants | ||
| 537 | // | ||
| 538 | |||
| 539 | pub(crate) trait DynamicPipe { | ||
| 540 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>; | ||
| 541 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>; | ||
| 542 | |||
| 543 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 544 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 545 | |||
| 546 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 547 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 548 | |||
| 549 | fn consume(&self, amt: usize); | ||
| 550 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>; | ||
| 551 | } | ||
| 552 | |||
| 553 | impl<M, const N: usize> DynamicPipe for Pipe<M, N> | ||
| 554 | where | ||
| 555 | M: RawMutex, | ||
| 556 | { | ||
| 557 | fn consume(&self, amt: usize) { | ||
| 558 | Pipe::consume(self, amt) | ||
| 559 | } | ||
| 560 | |||
| 561 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { | ||
| 562 | Pipe::try_fill_buf_with_context(self, cx) | ||
| 563 | } | ||
| 564 | |||
| 565 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 566 | Pipe::write(self, buf).into() | ||
| 567 | } | ||
| 568 | |||
| 569 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 570 | Pipe::read(self, buf).into() | ||
| 571 | } | ||
| 572 | |||
| 573 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 574 | Pipe::try_read(self, buf) | ||
| 575 | } | ||
| 576 | |||
| 577 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 578 | Pipe::try_write(self, buf) | ||
| 579 | } | ||
| 580 | |||
| 581 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 582 | Pipe::try_write_with_context(self, cx, buf) | ||
| 583 | } | ||
| 584 | |||
| 585 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 586 | Pipe::try_read_with_context(self, cx, buf) | ||
| 587 | } | ||
| 588 | } | ||
| 589 | |||
| 590 | /// Write-only access to a [`DynamicPipe`]. | ||
| 591 | pub struct DynamicWriter<'p> { | ||
| 592 | pipe: &'p dyn DynamicPipe, | ||
| 593 | } | ||
| 594 | |||
| 595 | impl<'p> Clone for DynamicWriter<'p> { | ||
| 596 | fn clone(&self) -> Self { | ||
| 597 | *self | ||
| 598 | } | ||
| 599 | } | ||
| 600 | |||
| 601 | impl<'p> Copy for DynamicWriter<'p> {} | ||
| 602 | |||
| 603 | impl<'p> DynamicWriter<'p> { | ||
| 604 | /// Write some bytes to the pipe. | ||
| 605 | /// | ||
| 606 | /// See [`Pipe::write()`] | ||
| 607 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 608 | self.pipe.write(buf) | ||
| 609 | } | ||
| 610 | |||
| 611 | /// Attempt to immediately write some bytes to the pipe. | ||
| 612 | /// | ||
| 613 | /// See [`Pipe::try_write()`] | ||
| 614 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 615 | self.pipe.try_write(buf) | ||
| 616 | } | ||
| 617 | } | ||
| 618 | |||
| 619 | impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p> | ||
| 620 | where | ||
| 621 | M: RawMutex, | ||
| 622 | { | ||
| 623 | fn from(value: Writer<'p, M, N>) -> Self { | ||
| 624 | Self { pipe: value.pipe } | ||
| 625 | } | ||
| 626 | } | ||
| 627 | |||
| 628 | /// Future returned by [`DynamicWriter::write`]. | ||
| 629 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 630 | pub struct DynamicWriteFuture<'p> { | ||
| 631 | pipe: &'p dyn DynamicPipe, | ||
| 632 | buf: &'p [u8], | ||
| 633 | } | ||
| 634 | |||
| 635 | impl<'p> Future for DynamicWriteFuture<'p> { | ||
| 636 | type Output = usize; | ||
| 637 | |||
| 638 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 639 | match self.pipe.try_write_with_context(Some(cx), self.buf) { | ||
| 640 | Ok(n) => Poll::Ready(n), | ||
| 641 | Err(TryWriteError::Full) => Poll::Pending, | ||
| 642 | } | ||
| 643 | } | ||
| 644 | } | ||
| 645 | |||
| 646 | impl<'p> Unpin for DynamicWriteFuture<'p> {} | ||
| 647 | |||
| 648 | impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p> | ||
| 649 | where | ||
| 650 | M: RawMutex, | ||
| 651 | { | ||
| 652 | fn from(value: WriteFuture<'p, M, N>) -> Self { | ||
| 653 | Self { | ||
| 654 | pipe: value.pipe, | ||
| 655 | buf: value.buf, | ||
| 656 | } | ||
| 657 | } | ||
| 658 | } | ||
| 659 | |||
| 660 | /// Read-only access to a [`DynamicPipe`]. | ||
| 661 | pub struct DynamicReader<'p> { | ||
| 662 | pipe: &'p dyn DynamicPipe, | ||
| 663 | } | ||
| 664 | |||
| 665 | impl<'p> DynamicReader<'p> { | ||
| 666 | /// Read some bytes from the pipe. | ||
| 667 | /// | ||
| 668 | /// See [`Pipe::read()`] | ||
| 669 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 670 | self.pipe.read(buf) | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Attempt to immediately read some bytes from the pipe. | ||
| 674 | /// | ||
| 675 | /// See [`Pipe::try_read()`] | ||
| 676 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 677 | self.pipe.try_read(buf) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. | ||
| 681 | /// | ||
| 682 | /// If no bytes are currently available to read, this function waits until at least one byte is available. | ||
| 683 | /// | ||
| 684 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 685 | pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> { | ||
| 686 | DynamicFillBufFuture { pipe: Some(self.pipe) } | ||
| 687 | } | ||
| 688 | |||
| 689 | /// Try returning contents of the internal buffer. | ||
| 690 | /// | ||
| 691 | /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. | ||
| 692 | /// | ||
| 693 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 694 | pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { | ||
| 695 | unsafe { self.pipe.try_fill_buf_with_context(None) } | ||
| 696 | } | ||
| 697 | |||
| 698 | /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. | ||
| 699 | pub fn consume(&mut self, amt: usize) { | ||
| 700 | self.pipe.consume(amt) | ||
| 701 | } | ||
| 702 | } | ||
| 703 | |||
| 704 | impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p> | ||
| 705 | where | ||
| 706 | M: RawMutex, | ||
| 707 | { | ||
| 708 | fn from(value: Reader<'p, M, N>) -> Self { | ||
| 709 | Self { pipe: value.pipe } | ||
| 710 | } | ||
| 711 | } | ||
| 712 | |||
| 713 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | ||
| 714 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 715 | pub struct DynamicReadFuture<'p> { | ||
| 716 | pipe: &'p dyn DynamicPipe, | ||
| 717 | buf: &'p mut [u8], | ||
| 718 | } | ||
| 719 | |||
| 720 | impl<'p> Future for DynamicReadFuture<'p> { | ||
| 721 | type Output = usize; | ||
| 722 | |||
| 723 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 724 | match self.pipe.try_read_with_context(Some(cx), self.buf) { | ||
| 725 | Ok(n) => Poll::Ready(n), | ||
| 726 | Err(TryReadError::Empty) => Poll::Pending, | ||
| 727 | } | ||
| 728 | } | ||
| 729 | } | ||
| 730 | |||
| 731 | impl<'p> Unpin for DynamicReadFuture<'p> {} | ||
| 732 | |||
| 733 | impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p> | ||
| 734 | where | ||
| 735 | M: RawMutex, | ||
| 736 | { | ||
| 737 | fn from(value: ReadFuture<'p, M, N>) -> Self { | ||
| 738 | Self { | ||
| 739 | pipe: value.pipe, | ||
| 740 | buf: value.buf, | ||
| 741 | } | ||
| 742 | } | ||
| 743 | } | ||
| 744 | |||
| 745 | /// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`]. | ||
| 746 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 747 | pub struct DynamicFillBufFuture<'p> { | ||
| 748 | pipe: Option<&'p dyn DynamicPipe>, | ||
| 749 | } | ||
| 750 | |||
| 751 | impl<'p> Future for DynamicFillBufFuture<'p> { | ||
| 752 | type Output = &'p [u8]; | ||
| 753 | |||
| 754 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 755 | let pipe = self.pipe.take().unwrap(); | ||
| 756 | match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { | ||
| 757 | Ok(buf) => Poll::Ready(buf), | ||
| 758 | Err(TryReadError::Empty) => { | ||
| 759 | self.pipe = Some(pipe); | ||
| 760 | Poll::Pending | ||
| 761 | } | ||
| 762 | } | ||
| 763 | } | ||
| 764 | } | ||
| 765 | |||
| 766 | impl<'p> Unpin for DynamicFillBufFuture<'p> {} | ||
| 767 | |||
| 768 | impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p> | ||
| 769 | where | ||
| 770 | M: RawMutex, | ||
| 771 | { | ||
| 772 | fn from(value: FillBufFuture<'p, M, N>) -> Self { | ||
| 773 | Self { | ||
| 774 | pipe: value.pipe.map(|p| p as &dyn DynamicPipe), | ||
| 775 | } | ||
| 776 | } | ||
| 777 | } | ||
| 778 | |||
| 535 | #[cfg(test)] | 779 | #[cfg(test)] |
| 536 | mod tests { | 780 | mod tests { |
| 537 | use futures_executor::ThreadPool; | 781 | use futures_executor::ThreadPool; |
| @@ -619,6 +863,35 @@ mod tests { | |||
| 619 | let _ = w.clone(); | 863 | let _ = w.clone(); |
| 620 | } | 864 | } |
| 621 | 865 | ||
| 866 | #[test] | ||
| 867 | fn dynamic_dispatch_pipe() { | ||
| 868 | let mut c = Pipe::<NoopRawMutex, 3>::new(); | ||
| 869 | let (r, w) = c.split(); | ||
| 870 | let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into()); | ||
| 871 | |||
| 872 | assert!(w.try_write(&[42, 43]).is_ok()); | ||
| 873 | let buf = r.try_fill_buf().unwrap(); | ||
| 874 | assert_eq!(buf, &[42, 43]); | ||
| 875 | let buf = r.try_fill_buf().unwrap(); | ||
| 876 | assert_eq!(buf, &[42, 43]); | ||
| 877 | r.consume(1); | ||
| 878 | let buf = r.try_fill_buf().unwrap(); | ||
| 879 | assert_eq!(buf, &[43]); | ||
| 880 | r.consume(1); | ||
| 881 | assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); | ||
| 882 | assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); | ||
| 883 | assert_eq!(w.try_write(&[45, 46]), Ok(2)); | ||
| 884 | let buf = r.try_fill_buf().unwrap(); | ||
| 885 | assert_eq!(buf, &[44]); // only one byte due to wraparound. | ||
| 886 | r.consume(1); | ||
| 887 | let buf = r.try_fill_buf().unwrap(); | ||
| 888 | assert_eq!(buf, &[45, 46]); | ||
| 889 | assert!(w.try_write(&[47]).is_ok()); | ||
| 890 | let buf = r.try_fill_buf().unwrap(); | ||
| 891 | assert_eq!(buf, &[45, 46, 47]); | ||
| 892 | r.consume(3); | ||
| 893 | } | ||
| 894 | |||
| 622 | #[futures_test::test] | 895 | #[futures_test::test] |
| 623 | async fn receiver_receives_given_try_write_async() { | 896 | async fn receiver_receives_given_try_write_async() { |
| 624 | let executor = ThreadPool::new().unwrap(); | 897 | let executor = ThreadPool::new().unwrap(); |
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 24c6c5a7f..623c52993 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs | |||
| @@ -71,6 +71,48 @@ where | |||
| 71 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 71 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 72 | self.channel.poll_ready_to_send(cx) | 72 | self.channel.poll_ready_to_send(cx) |
| 73 | } | 73 | } |
| 74 | |||
| 75 | /// Returns the maximum number of elements the channel can hold. | ||
| 76 | /// | ||
| 77 | /// See [`PriorityChannel::capacity()`] | ||
| 78 | pub const fn capacity(&self) -> usize { | ||
| 79 | self.channel.capacity() | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Returns the free capacity of the channel. | ||
| 83 | /// | ||
| 84 | /// See [`PriorityChannel::free_capacity()`] | ||
| 85 | pub fn free_capacity(&self) -> usize { | ||
| 86 | self.channel.free_capacity() | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Clears all elements in the channel. | ||
| 90 | /// | ||
| 91 | /// See [`PriorityChannel::clear()`] | ||
| 92 | pub fn clear(&self) { | ||
| 93 | self.channel.clear(); | ||
| 94 | } | ||
| 95 | |||
| 96 | /// Returns the number of elements currently in the channel. | ||
| 97 | /// | ||
| 98 | /// See [`PriorityChannel::len()`] | ||
| 99 | pub fn len(&self) -> usize { | ||
| 100 | self.channel.len() | ||
| 101 | } | ||
| 102 | |||
| 103 | /// Returns whether the channel is empty. | ||
| 104 | /// | ||
| 105 | /// See [`PriorityChannel::is_empty()`] | ||
| 106 | pub fn is_empty(&self) -> bool { | ||
| 107 | self.channel.is_empty() | ||
| 108 | } | ||
| 109 | |||
| 110 | /// Returns whether the channel is full. | ||
| 111 | /// | ||
| 112 | /// See [`PriorityChannel::is_full()`] | ||
| 113 | pub fn is_full(&self) -> bool { | ||
| 114 | self.channel.is_full() | ||
| 115 | } | ||
| 74 | } | 116 | } |
| 75 | 117 | ||
| 76 | impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> | 118 | impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> |
| @@ -133,6 +175,16 @@ where | |||
| 133 | self.channel.try_receive() | 175 | self.channel.try_receive() |
| 134 | } | 176 | } |
| 135 | 177 | ||
| 178 | /// Peek at the next value without removing it from the queue. | ||
| 179 | /// | ||
| 180 | /// See [`PriorityChannel::try_peek()`] | ||
| 181 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 182 | where | ||
| 183 | T: Clone, | ||
| 184 | { | ||
| 185 | self.channel.try_peek_with_context(None) | ||
| 186 | } | ||
| 187 | |||
| 136 | /// Allows a poll_fn to poll until the channel is ready to receive | 188 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 137 | /// | 189 | /// |
| 138 | /// See [`PriorityChannel::poll_ready_to_receive()`] | 190 | /// See [`PriorityChannel::poll_ready_to_receive()`] |
| @@ -146,6 +198,59 @@ where | |||
| 146 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 198 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 147 | self.channel.poll_receive(cx) | 199 | self.channel.poll_receive(cx) |
| 148 | } | 200 | } |
| 201 | |||
| 202 | /// Removes the elements from the channel that satisfy the predicate. | ||
| 203 | /// | ||
| 204 | /// See [`PriorityChannel::remove_if()`] | ||
| 205 | pub fn remove_if<F>(&self, predicate: F) | ||
| 206 | where | ||
| 207 | F: Fn(&T) -> bool, | ||
| 208 | T: Clone, | ||
| 209 | { | ||
| 210 | self.channel.remove_if(predicate) | ||
| 211 | } | ||
| 212 | |||
| 213 | /// Returns the maximum number of elements the channel can hold. | ||
| 214 | /// | ||
| 215 | /// See [`PriorityChannel::capacity()`] | ||
| 216 | pub const fn capacity(&self) -> usize { | ||
| 217 | self.channel.capacity() | ||
| 218 | } | ||
| 219 | |||
| 220 | /// Returns the free capacity of the channel. | ||
| 221 | /// | ||
| 222 | /// See [`PriorityChannel::free_capacity()`] | ||
| 223 | pub fn free_capacity(&self) -> usize { | ||
| 224 | self.channel.free_capacity() | ||
| 225 | } | ||
| 226 | |||
| 227 | /// Clears all elements in the channel. | ||
| 228 | /// | ||
| 229 | /// See [`PriorityChannel::clear()`] | ||
| 230 | pub fn clear(&self) { | ||
| 231 | self.channel.clear(); | ||
| 232 | } | ||
| 233 | |||
| 234 | /// Returns the number of elements currently in the channel. | ||
| 235 | /// | ||
| 236 | /// See [`PriorityChannel::len()`] | ||
| 237 | pub fn len(&self) -> usize { | ||
| 238 | self.channel.len() | ||
| 239 | } | ||
| 240 | |||
| 241 | /// Returns whether the channel is empty. | ||
| 242 | /// | ||
| 243 | /// See [`PriorityChannel::is_empty()`] | ||
| 244 | pub fn is_empty(&self) -> bool { | ||
| 245 | self.channel.is_empty() | ||
| 246 | } | ||
| 247 | |||
| 248 | /// Returns whether the channel is full. | ||
| 249 | /// | ||
| 250 | /// See [`PriorityChannel::is_full()`] | ||
| 251 | pub fn is_full(&self) -> bool { | ||
| 252 | self.channel.is_full() | ||
| 253 | } | ||
| 149 | } | 254 | } |
| 150 | 255 | ||
| 151 | impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> | 256 | impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> |
| @@ -248,6 +353,31 @@ where | |||
| 248 | self.try_receive_with_context(None) | 353 | self.try_receive_with_context(None) |
| 249 | } | 354 | } |
| 250 | 355 | ||
| 356 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 357 | where | ||
| 358 | T: Clone, | ||
| 359 | { | ||
| 360 | self.try_peek_with_context(None) | ||
| 361 | } | ||
| 362 | |||
| 363 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 364 | where | ||
| 365 | T: Clone, | ||
| 366 | { | ||
| 367 | if self.queue.len() == self.queue.capacity() { | ||
| 368 | self.senders_waker.wake(); | ||
| 369 | } | ||
| 370 | |||
| 371 | if let Some(message) = self.queue.peek() { | ||
| 372 | Ok(message.clone()) | ||
| 373 | } else { | ||
| 374 | if let Some(cx) = cx { | ||
| 375 | self.receiver_waker.register(cx.waker()); | ||
| 376 | } | ||
| 377 | Err(TryReceiveError::Empty) | ||
| 378 | } | ||
| 379 | } | ||
| 380 | |||
| 251 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 381 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 252 | if self.queue.len() == self.queue.capacity() { | 382 | if self.queue.len() == self.queue.capacity() { |
| 253 | self.senders_waker.wake(); | 383 | self.senders_waker.wake(); |
| @@ -316,6 +446,9 @@ where | |||
| 316 | } | 446 | } |
| 317 | 447 | ||
| 318 | fn clear(&mut self) { | 448 | fn clear(&mut self) { |
| 449 | if self.queue.len() == self.queue.capacity() { | ||
| 450 | self.senders_waker.wake(); | ||
| 451 | } | ||
| 319 | self.queue.clear(); | 452 | self.queue.clear(); |
| 320 | } | 453 | } |
| 321 | 454 | ||
| @@ -380,6 +513,13 @@ where | |||
| 380 | self.lock(|c| c.try_receive_with_context(cx)) | 513 | self.lock(|c| c.try_receive_with_context(cx)) |
| 381 | } | 514 | } |
| 382 | 515 | ||
| 516 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 517 | where | ||
| 518 | T: Clone, | ||
| 519 | { | ||
| 520 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 521 | } | ||
| 522 | |||
| 383 | /// Poll the channel for the next message | 523 | /// Poll the channel for the next message |
| 384 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 524 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 385 | self.lock(|c| c.poll_receive(cx)) | 525 | self.lock(|c| c.poll_receive(cx)) |
| @@ -450,6 +590,37 @@ where | |||
| 450 | self.lock(|c| c.try_receive()) | 590 | self.lock(|c| c.try_receive()) |
| 451 | } | 591 | } |
| 452 | 592 | ||
| 593 | /// Peek at the next value without removing it from the queue. | ||
| 594 | /// | ||
| 595 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 596 | /// an error if the channel is empty. | ||
| 597 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 598 | where | ||
| 599 | T: Clone, | ||
| 600 | { | ||
| 601 | self.lock(|c| c.try_peek()) | ||
| 602 | } | ||
| 603 | |||
| 604 | /// Removes elements from the channel based on the given predicate. | ||
| 605 | pub fn remove_if<F>(&self, predicate: F) | ||
| 606 | where | ||
| 607 | F: Fn(&T) -> bool, | ||
| 608 | T: Clone, | ||
| 609 | { | ||
| 610 | self.lock(|c| { | ||
| 611 | let mut new_heap = BinaryHeap::<T, K, N>::new(); | ||
| 612 | for item in c.queue.iter() { | ||
| 613 | if !predicate(item) { | ||
| 614 | match new_heap.push(item.clone()) { | ||
| 615 | Ok(_) => (), | ||
| 616 | Err(_) => panic!("Error pushing item to heap"), | ||
| 617 | } | ||
| 618 | } | ||
| 619 | } | ||
| 620 | c.queue = new_heap; | ||
| 621 | }); | ||
| 622 | } | ||
| 623 | |||
| 453 | /// Returns the maximum number of elements the channel can hold. | 624 | /// Returns the maximum number of elements the channel can hold. |
| 454 | pub const fn capacity(&self) -> usize { | 625 | pub const fn capacity(&self) -> usize { |
| 455 | N | 626 | N |
| @@ -499,6 +670,13 @@ where | |||
| 499 | PriorityChannel::try_receive_with_context(self, cx) | 670 | PriorityChannel::try_receive_with_context(self, cx) |
| 500 | } | 671 | } |
| 501 | 672 | ||
| 673 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 674 | where | ||
| 675 | T: Clone, | ||
| 676 | { | ||
| 677 | PriorityChannel::try_peek_with_context(self, cx) | ||
| 678 | } | ||
| 679 | |||
| 502 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 680 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 503 | PriorityChannel::poll_ready_to_send(self, cx) | 681 | PriorityChannel::poll_ready_to_send(self, cx) |
| 504 | } | 682 | } |
| @@ -587,6 +765,8 @@ mod tests { | |||
| 587 | fn simple_send_and_receive() { | 765 | fn simple_send_and_receive() { |
| 588 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); | 766 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); |
| 589 | assert!(c.try_send(1).is_ok()); | 767 | assert!(c.try_send(1).is_ok()); |
| 768 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 769 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 590 | assert_eq!(c.try_receive().unwrap(), 1); | 770 | assert_eq!(c.try_receive().unwrap(), 1); |
| 591 | } | 771 | } |
| 592 | 772 | ||
| @@ -607,6 +787,8 @@ mod tests { | |||
| 607 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 787 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| 608 | 788 | ||
| 609 | assert!(s.try_send(1).is_ok()); | 789 | assert!(s.try_send(1).is_ok()); |
| 790 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 791 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 610 | assert_eq!(r.try_receive().unwrap(), 1); | 792 | assert_eq!(r.try_receive().unwrap(), 1); |
| 611 | } | 793 | } |
| 612 | 794 | ||
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index a97eb7d5b..606efff0a 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -27,8 +27,8 @@ pub use subscriber::{DynSubscriber, Subscriber}; | |||
| 27 | /// | 27 | /// |
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. | 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. |
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message | 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message |
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive |
| 31 | /// an error to indicate that it has lagged. | 31 | /// an error to indicate that it has lagged. |
| 32 | /// | 32 | /// |
| 33 | /// ## Example | 33 | /// ## Example |
| 34 | /// | 34 | /// |
| @@ -194,6 +194,25 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 194 | } | 194 | } |
| 195 | } | 195 | } |
| 196 | 196 | ||
| 197 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T> | ||
| 198 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 199 | { | ||
| 200 | fn publish_immediate(&self, message: T) { | ||
| 201 | self.inner.lock(|s| { | ||
| 202 | let mut s = s.borrow_mut(); | ||
| 203 | s.publish_immediate(message) | ||
| 204 | }) | ||
| 205 | } | ||
| 206 | |||
| 207 | fn capacity(&self) -> usize { | ||
| 208 | self.capacity() | ||
| 209 | } | ||
| 210 | |||
| 211 | fn is_full(&self) -> bool { | ||
| 212 | self.is_full() | ||
| 213 | } | ||
| 214 | } | ||
| 215 | |||
| 197 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> | 216 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> |
| 198 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | 217 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 199 | { | 218 | { |
| @@ -246,13 +265,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 246 | }) | 265 | }) |
| 247 | } | 266 | } |
| 248 | 267 | ||
| 249 | fn publish_immediate(&self, message: T) { | ||
| 250 | self.inner.lock(|s| { | ||
| 251 | let mut s = s.borrow_mut(); | ||
| 252 | s.publish_immediate(message) | ||
| 253 | }) | ||
| 254 | } | ||
| 255 | |||
| 256 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 268 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 257 | self.inner.lock(|s| { | 269 | self.inner.lock(|s| { |
| 258 | let mut s = s.borrow_mut(); | 270 | let mut s = s.borrow_mut(); |
| @@ -267,10 +279,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 267 | }) | 279 | }) |
| 268 | } | 280 | } |
| 269 | 281 | ||
| 270 | fn capacity(&self) -> usize { | ||
| 271 | self.capacity() | ||
| 272 | } | ||
| 273 | |||
| 274 | fn free_capacity(&self) -> usize { | 282 | fn free_capacity(&self) -> usize { |
| 275 | self.free_capacity() | 283 | self.free_capacity() |
| 276 | } | 284 | } |
| @@ -286,10 +294,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 286 | fn is_empty(&self) -> bool { | 294 | fn is_empty(&self) -> bool { |
| 287 | self.is_empty() | 295 | self.is_empty() |
| 288 | } | 296 | } |
| 289 | |||
| 290 | fn is_full(&self) -> bool { | ||
| 291 | self.is_full() | ||
| 292 | } | ||
| 293 | } | 297 | } |
| 294 | 298 | ||
| 295 | /// Internal state for the PubSub channel | 299 | /// Internal state for the PubSub channel |
| @@ -417,6 +421,9 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 417 | } | 421 | } |
| 418 | 422 | ||
| 419 | fn clear(&mut self) { | 423 | fn clear(&mut self) { |
| 424 | if self.is_full() { | ||
| 425 | self.publisher_wakers.wake(); | ||
| 426 | } | ||
| 420 | self.queue.clear(); | 427 | self.queue.clear(); |
| 421 | } | 428 | } |
| 422 | 429 | ||
| @@ -445,8 +452,6 @@ pub enum Error { | |||
| 445 | MaximumPublishersReached, | 452 | MaximumPublishersReached, |
| 446 | } | 453 | } |
| 447 | 454 | ||
| 448 | /// 'Middle level' behaviour of the pubsub channel. | ||
| 449 | /// This trait is used so that Sub and Pub can be generic over the channel. | ||
| 450 | trait SealedPubSubBehavior<T> { | 455 | trait SealedPubSubBehavior<T> { |
| 451 | /// Try to get a message from the queue with the given message id. | 456 | /// Try to get a message from the queue with the given message id. |
| 452 | /// | 457 | /// |
| @@ -462,12 +467,6 @@ trait SealedPubSubBehavior<T> { | |||
| 462 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 467 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| 463 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; | 468 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; |
| 464 | 469 | ||
| 465 | /// Publish a message immediately | ||
| 466 | fn publish_immediate(&self, message: T); | ||
| 467 | |||
| 468 | /// Returns the maximum number of elements the channel can hold. | ||
| 469 | fn capacity(&self) -> usize; | ||
| 470 | |||
| 471 | /// Returns the free capacity of the channel. | 470 | /// Returns the free capacity of the channel. |
| 472 | /// | 471 | /// |
| 473 | /// This is equivalent to `capacity() - len()` | 472 | /// This is equivalent to `capacity() - len()` |
| @@ -482,9 +481,6 @@ trait SealedPubSubBehavior<T> { | |||
| 482 | /// Returns whether the channel is empty. | 481 | /// Returns whether the channel is empty. |
| 483 | fn is_empty(&self) -> bool; | 482 | fn is_empty(&self) -> bool; |
| 484 | 483 | ||
| 485 | /// Returns whether the channel is full. | ||
| 486 | fn is_full(&self) -> bool; | ||
| 487 | |||
| 488 | /// Let the channel know that a subscriber has dropped | 484 | /// Let the channel know that a subscriber has dropped |
| 489 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 485 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 490 | 486 | ||
| @@ -495,9 +491,16 @@ trait SealedPubSubBehavior<T> { | |||
| 495 | /// 'Middle level' behaviour of the pubsub channel. | 491 | /// 'Middle level' behaviour of the pubsub channel. |
| 496 | /// This trait is used so that Sub and Pub can be generic over the channel. | 492 | /// This trait is used so that Sub and Pub can be generic over the channel. |
| 497 | #[allow(private_bounds)] | 493 | #[allow(private_bounds)] |
| 498 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {} | 494 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> { |
| 495 | /// Publish a message immediately | ||
| 496 | fn publish_immediate(&self, message: T); | ||
| 499 | 497 | ||
| 500 | impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {} | 498 | /// Returns the maximum number of elements the channel can hold. |
| 499 | fn capacity(&self) -> usize; | ||
| 500 | |||
| 501 | /// Returns whether the channel is full. | ||
| 502 | fn is_full(&self) -> bool; | ||
| 503 | } | ||
| 501 | 504 | ||
| 502 | /// The result of the subscriber wait procedure | 505 | /// The result of the subscriber wait procedure |
| 503 | #[derive(Debug, Clone, PartialEq, Eq)] | 506 | #[derive(Debug, Clone, PartialEq, Eq)] |
| @@ -755,4 +758,30 @@ mod tests { | |||
| 755 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); | 758 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); |
| 756 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); | 759 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); |
| 757 | } | 760 | } |
| 761 | |||
| 762 | #[futures_test::test] | ||
| 763 | async fn publisher_sink() { | ||
| 764 | use futures_util::{SinkExt, StreamExt}; | ||
| 765 | |||
| 766 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 767 | |||
| 768 | let mut sub = channel.subscriber().unwrap(); | ||
| 769 | |||
| 770 | let publ = channel.publisher().unwrap(); | ||
| 771 | let mut sink = publ.sink(); | ||
| 772 | |||
| 773 | sink.send(0).await.unwrap(); | ||
| 774 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 775 | |||
| 776 | sink.send(1).await.unwrap(); | ||
| 777 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 778 | |||
| 779 | sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok)) | ||
| 780 | .await | ||
| 781 | .unwrap(); | ||
| 782 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 783 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 784 | assert_eq!(2, sub.try_next_message_pure().unwrap()); | ||
| 785 | assert_eq!(3, sub.try_next_message_pure().unwrap()); | ||
| 786 | } | ||
| 758 | } | 787 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e66b3b1db..7a1ab66de 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 74 | pub fn is_full(&self) -> bool { | 74 | pub fn is_full(&self) -> bool { |
| 75 | self.channel.is_full() | 75 | self.channel.is_full() |
| 76 | } | 76 | } |
| 77 | |||
| 78 | /// Create a [`futures::Sink`] adapter for this publisher. | ||
| 79 | #[inline] | ||
| 80 | pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { | ||
| 81 | PubSink { publ: self, fut: None } | ||
| 82 | } | ||
| 77 | } | 83 | } |
| 78 | 84 | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 85 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 221 | } | 227 | } |
| 222 | } | 228 | } |
| 223 | 229 | ||
| 230 | #[must_use = "Sinks do nothing unless polled"] | ||
| 231 | /// [`futures_sink::Sink`] adapter for [`Pub`]. | ||
| 232 | pub struct PubSink<'a, 'p, PSB, T> | ||
| 233 | where | ||
| 234 | T: Clone, | ||
| 235 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 236 | { | ||
| 237 | publ: &'p Pub<'a, PSB, T>, | ||
| 238 | fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>, | ||
| 239 | } | ||
| 240 | |||
| 241 | impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T> | ||
| 242 | where | ||
| 243 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 244 | T: Clone, | ||
| 245 | { | ||
| 246 | /// Try to make progress on the pending future if we have one. | ||
| 247 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | ||
| 248 | let Some(mut fut) = self.fut.take() else { | ||
| 249 | return Poll::Ready(()); | ||
| 250 | }; | ||
| 251 | |||
| 252 | if Pin::new(&mut fut).poll(cx).is_pending() { | ||
| 253 | self.fut = Some(fut); | ||
| 254 | return Poll::Pending; | ||
| 255 | } | ||
| 256 | |||
| 257 | Poll::Ready(()) | ||
| 258 | } | ||
| 259 | } | ||
| 260 | |||
| 261 | impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T> | ||
| 262 | where | ||
| 263 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 264 | T: Clone, | ||
| 265 | { | ||
| 266 | type Error = core::convert::Infallible; | ||
| 267 | |||
| 268 | #[inline] | ||
| 269 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 270 | self.poll(cx).map(Ok) | ||
| 271 | } | ||
| 272 | |||
| 273 | #[inline] | ||
| 274 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
| 275 | self.fut = Some(self.publ.publish(item)); | ||
| 276 | |||
| 277 | Ok(()) | ||
| 278 | } | ||
| 279 | |||
| 280 | #[inline] | ||
| 281 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 282 | self.poll(cx).map(Ok) | ||
| 283 | } | ||
| 284 | |||
| 285 | #[inline] | ||
| 286 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 287 | self.poll(cx).map(Ok) | ||
| 288 | } | ||
| 289 | } | ||
| 290 | |||
| 224 | /// Future for the publisher wait action | 291 | /// Future for the publisher wait action |
| 225 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 292 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 226 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 293 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs new file mode 100644 index 000000000..deeadd167 --- /dev/null +++ b/embassy-sync/src/rwlock.rs | |||
| @@ -0,0 +1,387 @@ | |||
| 1 | //! Async read-write lock. | ||
| 2 | //! | ||
| 3 | //! This module provides a read-write lock that can be used to synchronize data between asynchronous tasks. | ||
| 4 | use core::cell::{RefCell, UnsafeCell}; | ||
| 5 | use core::fmt; | ||
| 6 | use core::future::{poll_fn, Future}; | ||
| 7 | use core::ops::{Deref, DerefMut}; | ||
| 8 | use core::task::Poll; | ||
| 9 | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::blocking_mutex::Mutex as BlockingMutex; | ||
| 12 | use crate::waitqueue::WakerRegistration; | ||
| 13 | |||
| 14 | /// Error returned by [`RwLock::try_read`] and [`RwLock::try_write`] when the lock is already held. | ||
| 15 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 16 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 17 | pub struct TryLockError; | ||
| 18 | |||
| 19 | struct State { | ||
| 20 | readers: usize, | ||
| 21 | writer: bool, | ||
| 22 | waker: WakerRegistration, | ||
| 23 | } | ||
| 24 | |||
| 25 | /// Async read-write lock. | ||
| 26 | /// | ||
| 27 | /// The read-write lock is generic over the raw mutex implementation `M` and the data `T` it protects. | ||
| 28 | /// The raw read-write lock is used to guard access to the internal state. It | ||
| 29 | /// is held for very short periods only, while locking and unlocking. It is *not* held | ||
| 30 | /// for the entire time the async RwLock is locked. | ||
| 31 | /// | ||
| 32 | /// Which implementation you select depends on the context in which you're using the read-write lock. | ||
| 33 | /// | ||
| 34 | /// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. | ||
| 35 | /// | ||
| 36 | /// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. | ||
| 37 | /// | ||
| 38 | /// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. | ||
| 39 | /// | ||
| 40 | |||
| 41 | pub struct RwLock<M, T> | ||
| 42 | where | ||
| 43 | M: RawMutex, | ||
| 44 | T: ?Sized, | ||
| 45 | { | ||
| 46 | state: BlockingMutex<M, RefCell<State>>, | ||
| 47 | inner: UnsafeCell<T>, | ||
| 48 | } | ||
| 49 | |||
| 50 | unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for RwLock<M, T> {} | ||
| 51 | unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for RwLock<M, T> {} | ||
| 52 | |||
| 53 | /// Async read-write lock. | ||
| 54 | impl<M, T> RwLock<M, T> | ||
| 55 | where | ||
| 56 | M: RawMutex, | ||
| 57 | { | ||
| 58 | /// Create a new read-write lock with the given value. | ||
| 59 | pub const fn new(value: T) -> Self { | ||
| 60 | Self { | ||
| 61 | inner: UnsafeCell::new(value), | ||
| 62 | state: BlockingMutex::new(RefCell::new(State { | ||
| 63 | readers: 0, | ||
| 64 | writer: false, | ||
| 65 | waker: WakerRegistration::new(), | ||
| 66 | })), | ||
| 67 | } | ||
| 68 | } | ||
| 69 | } | ||
| 70 | |||
| 71 | impl<M, T> RwLock<M, T> | ||
| 72 | where | ||
| 73 | M: RawMutex, | ||
| 74 | T: ?Sized, | ||
| 75 | { | ||
| 76 | /// Lock the read-write lock for reading. | ||
| 77 | /// | ||
| 78 | /// This will wait for the lock to be available if it's already locked for writing. | ||
| 79 | pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, M, T>> { | ||
| 80 | poll_fn(|cx| { | ||
| 81 | let ready = self.state.lock(|s| { | ||
| 82 | let mut s = s.borrow_mut(); | ||
| 83 | if s.writer { | ||
| 84 | s.waker.register(cx.waker()); | ||
| 85 | false | ||
| 86 | } else { | ||
| 87 | s.readers += 1; | ||
| 88 | true | ||
| 89 | } | ||
| 90 | }); | ||
| 91 | |||
| 92 | if ready { | ||
| 93 | Poll::Ready(RwLockReadGuard { rwlock: self }) | ||
| 94 | } else { | ||
| 95 | Poll::Pending | ||
| 96 | } | ||
| 97 | }) | ||
| 98 | } | ||
| 99 | |||
| 100 | /// Lock the read-write lock for writing. | ||
| 101 | /// | ||
| 102 | /// This will wait for the lock to be available if it's already locked for reading or writing. | ||
| 103 | pub fn write(&self) -> impl Future<Output = RwLockWriteGuard<'_, M, T>> { | ||
| 104 | poll_fn(|cx| { | ||
| 105 | let ready = self.state.lock(|s| { | ||
| 106 | let mut s = s.borrow_mut(); | ||
| 107 | if s.writer || s.readers > 0 { | ||
| 108 | s.waker.register(cx.waker()); | ||
| 109 | false | ||
| 110 | } else { | ||
| 111 | s.writer = true; | ||
| 112 | true | ||
| 113 | } | ||
| 114 | }); | ||
| 115 | |||
| 116 | if ready { | ||
| 117 | Poll::Ready(RwLockWriteGuard { rwlock: self }) | ||
| 118 | } else { | ||
| 119 | Poll::Pending | ||
| 120 | } | ||
| 121 | }) | ||
| 122 | } | ||
| 123 | |||
| 124 | /// Attempt to immediately lock the rwlock. | ||
| 125 | /// | ||
| 126 | /// If the rwlock is already locked, this will return an error instead of waiting. | ||
| 127 | pub fn try_read(&self) -> Result<RwLockReadGuard<'_, M, T>, TryLockError> { | ||
| 128 | self.state | ||
| 129 | .lock(|s| { | ||
| 130 | let mut s = s.borrow_mut(); | ||
| 131 | if s.writer { | ||
| 132 | return Err(()); | ||
| 133 | } | ||
| 134 | s.readers += 1; | ||
| 135 | Ok(()) | ||
| 136 | }) | ||
| 137 | .map_err(|_| TryLockError)?; | ||
| 138 | |||
| 139 | Ok(RwLockReadGuard { rwlock: self }) | ||
| 140 | } | ||
| 141 | |||
| 142 | /// Attempt to immediately lock the rwlock. | ||
| 143 | /// | ||
| 144 | /// If the rwlock is already locked, this will return an error instead of waiting. | ||
| 145 | pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, M, T>, TryLockError> { | ||
| 146 | self.state | ||
| 147 | .lock(|s| { | ||
| 148 | let mut s = s.borrow_mut(); | ||
| 149 | if s.writer || s.readers > 0 { | ||
| 150 | return Err(()); | ||
| 151 | } | ||
| 152 | s.writer = true; | ||
| 153 | Ok(()) | ||
| 154 | }) | ||
| 155 | .map_err(|_| TryLockError)?; | ||
| 156 | |||
| 157 | Ok(RwLockWriteGuard { rwlock: self }) | ||
| 158 | } | ||
| 159 | |||
| 160 | /// Consumes this read-write lock, returning the underlying data. | ||
| 161 | pub fn into_inner(self) -> T | ||
| 162 | where | ||
| 163 | T: Sized, | ||
| 164 | { | ||
| 165 | self.inner.into_inner() | ||
| 166 | } | ||
| 167 | |||
| 168 | /// Returns a mutable reference to the underlying data. | ||
| 169 | /// | ||
| 170 | /// Since this call borrows the RwLock mutably, no actual locking needs to | ||
| 171 | /// take place -- the mutable borrow statically guarantees no locks exist. | ||
| 172 | pub fn get_mut(&mut self) -> &mut T { | ||
| 173 | self.inner.get_mut() | ||
| 174 | } | ||
| 175 | } | ||
| 176 | |||
| 177 | impl<M: RawMutex, T> From<T> for RwLock<M, T> { | ||
| 178 | fn from(from: T) -> Self { | ||
| 179 | Self::new(from) | ||
| 180 | } | ||
| 181 | } | ||
| 182 | |||
| 183 | impl<M, T> Default for RwLock<M, T> | ||
| 184 | where | ||
| 185 | M: RawMutex, | ||
| 186 | T: Default, | ||
| 187 | { | ||
| 188 | fn default() -> Self { | ||
| 189 | Self::new(Default::default()) | ||
| 190 | } | ||
| 191 | } | ||
| 192 | |||
| 193 | impl<M, T> fmt::Debug for RwLock<M, T> | ||
| 194 | where | ||
| 195 | M: RawMutex, | ||
| 196 | T: ?Sized + fmt::Debug, | ||
| 197 | { | ||
| 198 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 199 | let mut d = f.debug_struct("RwLock"); | ||
| 200 | match self.try_read() { | ||
| 201 | Ok(guard) => d.field("inner", &&*guard), | ||
| 202 | Err(TryLockError) => d.field("inner", &"Locked"), | ||
| 203 | } | ||
| 204 | .finish_non_exhaustive() | ||
| 205 | } | ||
| 206 | } | ||
| 207 | |||
| 208 | /// Async read lock guard. | ||
| 209 | /// | ||
| 210 | /// Owning an instance of this type indicates having | ||
| 211 | /// successfully locked the read-write lock for reading, and grants access to the contents. | ||
| 212 | /// | ||
| 213 | /// Dropping it unlocks the read-write lock. | ||
| 214 | #[clippy::has_significant_drop] | ||
| 215 | #[must_use = "if unused the RwLock will immediately unlock"] | ||
| 216 | pub struct RwLockReadGuard<'a, R, T> | ||
| 217 | where | ||
| 218 | R: RawMutex, | ||
| 219 | T: ?Sized, | ||
| 220 | { | ||
| 221 | rwlock: &'a RwLock<R, T>, | ||
| 222 | } | ||
| 223 | |||
| 224 | impl<'a, M, T> Drop for RwLockReadGuard<'a, M, T> | ||
| 225 | where | ||
| 226 | M: RawMutex, | ||
| 227 | T: ?Sized, | ||
| 228 | { | ||
| 229 | fn drop(&mut self) { | ||
| 230 | self.rwlock.state.lock(|s| { | ||
| 231 | let mut s = unwrap!(s.try_borrow_mut()); | ||
| 232 | s.readers -= 1; | ||
| 233 | if s.readers == 0 { | ||
| 234 | s.waker.wake(); | ||
| 235 | } | ||
| 236 | }) | ||
| 237 | } | ||
| 238 | } | ||
| 239 | |||
| 240 | impl<'a, M, T> Deref for RwLockReadGuard<'a, M, T> | ||
| 241 | where | ||
| 242 | M: RawMutex, | ||
| 243 | T: ?Sized, | ||
| 244 | { | ||
| 245 | type Target = T; | ||
| 246 | fn deref(&self) -> &Self::Target { | ||
| 247 | // Safety: the RwLockReadGuard represents shared access to the contents | ||
| 248 | // of the read-write lock, so it's OK to get it. | ||
| 249 | unsafe { &*(self.rwlock.inner.get() as *const T) } | ||
| 250 | } | ||
| 251 | } | ||
| 252 | |||
| 253 | impl<'a, M, T> fmt::Debug for RwLockReadGuard<'a, M, T> | ||
| 254 | where | ||
| 255 | M: RawMutex, | ||
| 256 | T: ?Sized + fmt::Debug, | ||
| 257 | { | ||
| 258 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 259 | fmt::Debug::fmt(&**self, f) | ||
| 260 | } | ||
| 261 | } | ||
| 262 | |||
| 263 | impl<'a, M, T> fmt::Display for RwLockReadGuard<'a, M, T> | ||
| 264 | where | ||
| 265 | M: RawMutex, | ||
| 266 | T: ?Sized + fmt::Display, | ||
| 267 | { | ||
| 268 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 269 | fmt::Display::fmt(&**self, f) | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Async write lock guard. | ||
| 274 | /// | ||
| 275 | /// Owning an instance of this type indicates having | ||
| 276 | /// successfully locked the read-write lock for writing, and grants access to the contents. | ||
| 277 | /// | ||
| 278 | /// Dropping it unlocks the read-write lock. | ||
| 279 | #[clippy::has_significant_drop] | ||
| 280 | #[must_use = "if unused the RwLock will immediately unlock"] | ||
| 281 | pub struct RwLockWriteGuard<'a, R, T> | ||
| 282 | where | ||
| 283 | R: RawMutex, | ||
| 284 | T: ?Sized, | ||
| 285 | { | ||
| 286 | rwlock: &'a RwLock<R, T>, | ||
| 287 | } | ||
| 288 | |||
| 289 | impl<'a, R, T> Drop for RwLockWriteGuard<'a, R, T> | ||
| 290 | where | ||
| 291 | R: RawMutex, | ||
| 292 | T: ?Sized, | ||
| 293 | { | ||
| 294 | fn drop(&mut self) { | ||
| 295 | self.rwlock.state.lock(|s| { | ||
| 296 | let mut s = unwrap!(s.try_borrow_mut()); | ||
| 297 | s.writer = false; | ||
| 298 | s.waker.wake(); | ||
| 299 | }) | ||
| 300 | } | ||
| 301 | } | ||
| 302 | |||
| 303 | impl<'a, R, T> Deref for RwLockWriteGuard<'a, R, T> | ||
| 304 | where | ||
| 305 | R: RawMutex, | ||
| 306 | T: ?Sized, | ||
| 307 | { | ||
| 308 | type Target = T; | ||
| 309 | fn deref(&self) -> &Self::Target { | ||
| 310 | // Safety: the RwLockWriteGuard represents exclusive access to the contents | ||
| 311 | // of the read-write lock, so it's OK to get it. | ||
| 312 | unsafe { &*(self.rwlock.inner.get() as *mut T) } | ||
| 313 | } | ||
| 314 | } | ||
| 315 | |||
| 316 | impl<'a, R, T> DerefMut for RwLockWriteGuard<'a, R, T> | ||
| 317 | where | ||
| 318 | R: RawMutex, | ||
| 319 | T: ?Sized, | ||
| 320 | { | ||
| 321 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 322 | // Safety: the RwLockWriteGuard represents exclusive access to the contents | ||
| 323 | // of the read-write lock, so it's OK to get it. | ||
| 324 | unsafe { &mut *(self.rwlock.inner.get()) } | ||
| 325 | } | ||
| 326 | } | ||
| 327 | |||
| 328 | impl<'a, R, T> fmt::Debug for RwLockWriteGuard<'a, R, T> | ||
| 329 | where | ||
| 330 | R: RawMutex, | ||
| 331 | T: ?Sized + fmt::Debug, | ||
| 332 | { | ||
| 333 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 334 | fmt::Debug::fmt(&**self, f) | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | impl<'a, R, T> fmt::Display for RwLockWriteGuard<'a, R, T> | ||
| 339 | where | ||
| 340 | R: RawMutex, | ||
| 341 | T: ?Sized + fmt::Display, | ||
| 342 | { | ||
| 343 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 344 | fmt::Display::fmt(&**self, f) | ||
| 345 | } | ||
| 346 | } | ||
| 347 | |||
| 348 | #[cfg(test)] | ||
| 349 | mod tests { | ||
| 350 | use crate::blocking_mutex::raw::NoopRawMutex; | ||
| 351 | use crate::rwlock::RwLock; | ||
| 352 | |||
| 353 | #[futures_test::test] | ||
| 354 | async fn read_guard_releases_lock_when_dropped() { | ||
| 355 | let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]); | ||
| 356 | |||
| 357 | { | ||
| 358 | let guard = rwlock.read().await; | ||
| 359 | assert_eq!(*guard, [0, 1]); | ||
| 360 | } | ||
| 361 | |||
| 362 | { | ||
| 363 | let guard = rwlock.read().await; | ||
| 364 | assert_eq!(*guard, [0, 1]); | ||
| 365 | } | ||
| 366 | |||
| 367 | assert_eq!(*rwlock.read().await, [0, 1]); | ||
| 368 | } | ||
| 369 | |||
| 370 | #[futures_test::test] | ||
| 371 | async fn write_guard_releases_lock_when_dropped() { | ||
| 372 | let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]); | ||
| 373 | |||
| 374 | { | ||
| 375 | let mut guard = rwlock.write().await; | ||
| 376 | assert_eq!(*guard, [0, 1]); | ||
| 377 | guard[1] = 2; | ||
| 378 | } | ||
| 379 | |||
| 380 | { | ||
| 381 | let guard = rwlock.read().await; | ||
| 382 | assert_eq!(*guard, [0, 2]); | ||
| 383 | } | ||
| 384 | |||
| 385 | assert_eq!(*rwlock.read().await, [0, 2]); | ||
| 386 | } | ||
| 387 | } | ||
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index a0f4b5a74..e7095401e 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs | |||
| @@ -6,7 +6,7 @@ use core::task::{Context, Poll, Waker}; | |||
| 6 | use crate::blocking_mutex::raw::RawMutex; | 6 | use crate::blocking_mutex::raw::RawMutex; |
| 7 | use crate::blocking_mutex::Mutex; | 7 | use crate::blocking_mutex::Mutex; |
| 8 | 8 | ||
| 9 | /// Single-slot signaling primitive. | 9 | /// Single-slot signaling primitive for a _single_ consumer. |
| 10 | /// | 10 | /// |
| 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except | 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except |
| 12 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead | 12 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead |
| @@ -17,6 +17,7 @@ use crate::blocking_mutex::Mutex; | |||
| 17 | /// updates. | 17 | /// updates. |
| 18 | /// | 18 | /// |
| 19 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. | 19 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. |
| 20 | /// For multiple consumers, use [`Watch`](crate::watch::Watch) instead. | ||
| 20 | /// | 21 | /// |
| 21 | /// Signals are generally declared as `static`s and then borrowed as required. | 22 | /// Signals are generally declared as `static`s and then borrowed as required. |
| 22 | /// | 23 | /// |
| @@ -106,7 +107,7 @@ where | |||
| 106 | }) | 107 | }) |
| 107 | } | 108 | } |
| 108 | 109 | ||
| 109 | /// Future that completes when this Signal has been signaled. | 110 | /// Future that completes when this Signal has been signaled, taking the value out of the signal. |
| 110 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | 111 | pub fn wait(&self) -> impl Future<Output = T> + '_ { |
| 111 | poll_fn(move |cx| self.poll_wait(cx)) | 112 | poll_fn(move |cx| self.poll_wait(cx)) |
| 112 | } | 113 | } |
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs index 63fe04a6e..5a9910e7f 100644 --- a/embassy-sync/src/waitqueue/atomic_waker.rs +++ b/embassy-sync/src/waitqueue/atomic_waker.rs | |||
| @@ -1,26 +1,28 @@ | |||
| 1 | use core::cell::Cell; | 1 | use core::cell::Cell; |
| 2 | use core::task::Waker; | 2 | use core::task::Waker; |
| 3 | 3 | ||
| 4 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | 4 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; |
| 5 | use crate::blocking_mutex::Mutex; | 5 | use crate::blocking_mutex::Mutex; |
| 6 | 6 | ||
| 7 | /// Utility struct to register and wake a waker. | 7 | /// Utility struct to register and wake a waker. |
| 8 | pub struct AtomicWaker { | 8 | /// If a waker is registered, registering another waker will replace the previous one without waking it. |
| 9 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | 9 | /// Intended to wake a task from an interrupt. Therefore, it is generally not expected, |
| 10 | /// that multiple tasks register try to register a waker simultaneously. | ||
| 11 | pub struct GenericAtomicWaker<M: RawMutex> { | ||
| 12 | waker: Mutex<M, Cell<Option<Waker>>>, | ||
| 10 | } | 13 | } |
| 11 | 14 | ||
| 12 | impl AtomicWaker { | 15 | impl<M: RawMutex> GenericAtomicWaker<M> { |
| 13 | /// Create a new `AtomicWaker`. | 16 | /// Create a new `AtomicWaker`. |
| 14 | pub const fn new() -> Self { | 17 | pub const fn new(mutex: M) -> Self { |
| 15 | Self { | 18 | Self { |
| 16 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | 19 | waker: Mutex::const_new(mutex, Cell::new(None)), |
| 17 | } | 20 | } |
| 18 | } | 21 | } |
| 19 | 22 | ||
| 20 | /// Register a waker. Overwrites the previous waker, if any. | 23 | /// Register a waker. Overwrites the previous waker, if any. |
| 21 | pub fn register(&self, w: &Waker) { | 24 | pub fn register(&self, w: &Waker) { |
| 22 | critical_section::with(|cs| { | 25 | self.waker.lock(|cell| { |
| 23 | let cell = self.waker.borrow(cs); | ||
| 24 | cell.set(match cell.replace(None) { | 26 | cell.set(match cell.replace(None) { |
| 25 | Some(w2) if (w2.will_wake(w)) => Some(w2), | 27 | Some(w2) if (w2.will_wake(w)) => Some(w2), |
| 26 | _ => Some(w.clone()), | 28 | _ => Some(w.clone()), |
| @@ -30,8 +32,7 @@ impl AtomicWaker { | |||
| 30 | 32 | ||
| 31 | /// Wake the registered waker, if any. | 33 | /// Wake the registered waker, if any. |
| 32 | pub fn wake(&self) { | 34 | pub fn wake(&self) { |
| 33 | critical_section::with(|cs| { | 35 | self.waker.lock(|cell| { |
| 34 | let cell = self.waker.borrow(cs); | ||
| 35 | if let Some(w) = cell.replace(None) { | 36 | if let Some(w) = cell.replace(None) { |
| 36 | w.wake_by_ref(); | 37 | w.wake_by_ref(); |
| 37 | cell.set(Some(w)); | 38 | cell.set(Some(w)); |
| @@ -39,3 +40,27 @@ impl AtomicWaker { | |||
| 39 | }) | 40 | }) |
| 40 | } | 41 | } |
| 41 | } | 42 | } |
| 43 | |||
| 44 | /// Utility struct to register and wake a waker. | ||
| 45 | pub struct AtomicWaker { | ||
| 46 | waker: GenericAtomicWaker<CriticalSectionRawMutex>, | ||
| 47 | } | ||
| 48 | |||
| 49 | impl AtomicWaker { | ||
| 50 | /// Create a new `AtomicWaker`. | ||
| 51 | pub const fn new() -> Self { | ||
| 52 | Self { | ||
| 53 | waker: GenericAtomicWaker::new(CriticalSectionRawMutex::new()), | ||
| 54 | } | ||
| 55 | } | ||
| 56 | |||
| 57 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 58 | pub fn register(&self, w: &Waker) { | ||
| 59 | self.waker.register(w); | ||
| 60 | } | ||
| 61 | |||
| 62 | /// Wake the registered waker, if any. | ||
| 63 | pub fn wake(&self) { | ||
| 64 | self.waker.wake(); | ||
| 65 | } | ||
| 66 | } | ||
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs index 5c6a96ec8..c06b83056 100644 --- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs | |||
| @@ -4,6 +4,9 @@ use core::sync::atomic::{AtomicPtr, Ordering}; | |||
| 4 | use core::task::Waker; | 4 | use core::task::Waker; |
| 5 | 5 | ||
| 6 | /// Utility struct to register and wake a waker. | 6 | /// Utility struct to register and wake a waker. |
| 7 | /// If a waker is registered, registering another waker will replace the previous one without waking it. | ||
| 8 | /// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected, | ||
| 9 | /// that multiple tasks register try to register a waker simultaneously. | ||
| 7 | pub struct AtomicWaker { | 10 | pub struct AtomicWaker { |
| 8 | waker: AtomicPtr<()>, | 11 | waker: AtomicPtr<()>, |
| 9 | } | 12 | } |
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 0e520bf40..0384d6bed 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs | |||
| @@ -3,6 +3,8 @@ use core::task::Waker; | |||
| 3 | use heapless::Vec; | 3 | use heapless::Vec; |
| 4 | 4 | ||
| 5 | /// Utility struct to register and wake multiple wakers. | 5 | /// Utility struct to register and wake multiple wakers. |
| 6 | /// Queue of wakers with a maximum length of `N`. | ||
| 7 | /// Intended for waking multiple tasks. | ||
| 6 | pub struct MultiWakerRegistration<const N: usize> { | 8 | pub struct MultiWakerRegistration<const N: usize> { |
| 7 | wakers: Vec<Waker, N>, | 9 | wakers: Vec<Waker, N>, |
| 8 | } | 10 | } |
diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs index 9b666e7c4..7f24f8fb6 100644 --- a/embassy-sync/src/waitqueue/waker_registration.rs +++ b/embassy-sync/src/waitqueue/waker_registration.rs | |||
| @@ -2,6 +2,10 @@ use core::mem; | |||
| 2 | use core::task::Waker; | 2 | use core::task::Waker; |
| 3 | 3 | ||
| 4 | /// Utility struct to register and wake a waker. | 4 | /// Utility struct to register and wake a waker. |
| 5 | /// If a waker is registered, registering another waker will replace the previous one. | ||
| 6 | /// The previous waker will be woken in this case, giving it a chance to reregister itself. | ||
| 7 | /// Although it is possible to wake multiple tasks this way, | ||
| 8 | /// this will cause them to wake each other in a loop registering themselves. | ||
| 5 | #[derive(Debug, Default)] | 9 | #[derive(Debug, Default)] |
| 6 | pub struct WakerRegistration { | 10 | pub struct WakerRegistration { |
| 7 | waker: Option<Waker>, | 11 | waker: Option<Waker>, |
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 000000000..08d6a833d --- /dev/null +++ b/embassy-sync/src/watch.rs | |||
| @@ -0,0 +1,1121 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** receivers. | ||
| 2 | |||
| 3 | use core::cell::RefCell; | ||
| 4 | use core::future::{poll_fn, Future}; | ||
| 5 | use core::marker::PhantomData; | ||
| 6 | use core::ops::{Deref, DerefMut}; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use crate::blocking_mutex::raw::RawMutex; | ||
| 10 | use crate::blocking_mutex::Mutex; | ||
| 11 | use crate::waitqueue::MultiWakerRegistration; | ||
| 12 | |||
| 13 | /// The `Watch` is a single-slot signaling primitive that allows _multiple_ (`N`) receivers to concurrently await | ||
| 14 | /// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, | ||
| 15 | /// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous | ||
| 16 | /// value when a new one is sent, without waiting for all receivers to read the previous value. | ||
| 17 | /// | ||
| 18 | /// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks | ||
| 19 | /// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are | ||
| 20 | /// always provided with the latest value. | ||
| 21 | /// | ||
| 22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] | ||
| 23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] | ||
| 24 | /// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the | ||
| 25 | /// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. | ||
| 26 | /// ``` | ||
| 27 | /// | ||
| 28 | /// use futures_executor::block_on; | ||
| 29 | /// use embassy_sync::watch::Watch; | ||
| 30 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 31 | /// | ||
| 32 | /// let f = async { | ||
| 33 | /// | ||
| 34 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 35 | /// | ||
| 36 | /// // Obtain receivers and sender | ||
| 37 | /// let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 38 | /// let mut rcv1 = WATCH.dyn_receiver().unwrap(); | ||
| 39 | /// let mut snd = WATCH.sender(); | ||
| 40 | /// | ||
| 41 | /// // No more receivers, and no update | ||
| 42 | /// assert!(WATCH.receiver().is_none()); | ||
| 43 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 44 | /// | ||
| 45 | /// snd.send(10); | ||
| 46 | /// | ||
| 47 | /// // Receive the new value (async or try) | ||
| 48 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 49 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 50 | /// | ||
| 51 | /// // No update | ||
| 52 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 53 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 54 | /// | ||
| 55 | /// snd.send(20); | ||
| 56 | /// | ||
| 57 | /// // Using `get` marks the value as seen | ||
| 58 | /// assert_eq!(rcv1.get().await, 20); | ||
| 59 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 60 | /// | ||
| 61 | /// // But `get` also returns when unchanged | ||
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// assert_eq!(rcv1.get().await, 20); | ||
| 64 | /// | ||
| 65 | /// }; | ||
| 66 | /// block_on(f); | ||
| 67 | /// ``` | ||
| 68 | pub struct Watch<M: RawMutex, T: Clone, const N: usize> { | ||
| 69 | mutex: Mutex<M, RefCell<WatchState<T, N>>>, | ||
| 70 | } | ||
| 71 | |||
| 72 | struct WatchState<T: Clone, const N: usize> { | ||
| 73 | data: Option<T>, | ||
| 74 | current_id: u64, | ||
| 75 | wakers: MultiWakerRegistration<N>, | ||
| 76 | receiver_count: usize, | ||
| 77 | } | ||
| 78 | |||
| 79 | trait SealedWatchBehavior<T> { | ||
| 80 | /// Poll the `Watch` for the current value, making it as seen. | ||
| 81 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 82 | |||
| 83 | /// Poll the `Watch` for the value if it matches the predicate function | ||
| 84 | /// `f`, making it as seen. | ||
| 85 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 86 | |||
| 87 | /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. | ||
| 88 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 89 | |||
| 90 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | ||
| 91 | fn try_changed(&self, id: &mut u64) -> Option<T>; | ||
| 92 | |||
| 93 | /// Poll the `Watch` for a changed value that matches the predicate function | ||
| 94 | /// `f`, marking it as seen. | ||
| 95 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 96 | |||
| 97 | /// Tries to retrieve the value of the `Watch` if it has changed and matches the | ||
| 98 | /// predicate function `f`, marking it as seen. | ||
| 99 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 100 | |||
| 101 | /// Used when a receiver is dropped to decrement the receiver count. | ||
| 102 | /// | ||
| 103 | /// ## This method should not be called by the user. | ||
| 104 | fn drop_receiver(&self); | ||
| 105 | |||
| 106 | /// Clears the value of the `Watch`. | ||
| 107 | fn clear(&self); | ||
| 108 | |||
| 109 | /// Sends a new value to the `Watch`. | ||
| 110 | fn send(&self, val: T); | ||
| 111 | |||
| 112 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 113 | /// `Watch` does not already contain a value. | ||
| 114 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)); | ||
| 115 | |||
| 116 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 117 | /// `Watch` does not already contain a value. | ||
| 118 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool); | ||
| 119 | } | ||
| 120 | |||
| 121 | /// A trait representing the 'inner' behavior of the `Watch`. | ||
| 122 | #[allow(private_bounds)] | ||
| 123 | pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> { | ||
| 124 | /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. | ||
| 125 | fn try_get(&self, id: Option<&mut u64>) -> Option<T>; | ||
| 126 | |||
| 127 | /// Tries to get the value of the `Watch` if it matches the predicate function | ||
| 128 | /// `f`, marking it as seen. | ||
| 129 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 130 | |||
| 131 | /// Checks if the `Watch` is been initialized with a value. | ||
| 132 | fn contains_value(&self) -> bool; | ||
| 133 | } | ||
| 134 | |||
| 135 | impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> { | ||
| 136 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 137 | self.mutex.lock(|state| { | ||
| 138 | let mut s = state.borrow_mut(); | ||
| 139 | match &s.data { | ||
| 140 | Some(data) => { | ||
| 141 | *id = s.current_id; | ||
| 142 | Poll::Ready(data.clone()) | ||
| 143 | } | ||
| 144 | None => { | ||
| 145 | s.wakers.register(cx.waker()); | ||
| 146 | Poll::Pending | ||
| 147 | } | ||
| 148 | } | ||
| 149 | }) | ||
| 150 | } | ||
| 151 | |||
| 152 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 153 | self.mutex.lock(|state| { | ||
| 154 | let mut s = state.borrow_mut(); | ||
| 155 | match s.data { | ||
| 156 | Some(ref data) if f(data) => { | ||
| 157 | *id = s.current_id; | ||
| 158 | Poll::Ready(data.clone()) | ||
| 159 | } | ||
| 160 | _ => { | ||
| 161 | s.wakers.register(cx.waker()); | ||
| 162 | Poll::Pending | ||
| 163 | } | ||
| 164 | } | ||
| 165 | }) | ||
| 166 | } | ||
| 167 | |||
| 168 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 169 | self.mutex.lock(|state| { | ||
| 170 | let mut s = state.borrow_mut(); | ||
| 171 | match (&s.data, s.current_id > *id) { | ||
| 172 | (Some(data), true) => { | ||
| 173 | *id = s.current_id; | ||
| 174 | Poll::Ready(data.clone()) | ||
| 175 | } | ||
| 176 | _ => { | ||
| 177 | s.wakers.register(cx.waker()); | ||
| 178 | Poll::Pending | ||
| 179 | } | ||
| 180 | } | ||
| 181 | }) | ||
| 182 | } | ||
| 183 | |||
| 184 | fn try_changed(&self, id: &mut u64) -> Option<T> { | ||
| 185 | self.mutex.lock(|state| { | ||
| 186 | let s = state.borrow(); | ||
| 187 | match s.current_id > *id { | ||
| 188 | true => { | ||
| 189 | *id = s.current_id; | ||
| 190 | s.data.clone() | ||
| 191 | } | ||
| 192 | false => None, | ||
| 193 | } | ||
| 194 | }) | ||
| 195 | } | ||
| 196 | |||
| 197 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 198 | self.mutex.lock(|state| { | ||
| 199 | let mut s = state.borrow_mut(); | ||
| 200 | match (&s.data, s.current_id > *id) { | ||
| 201 | (Some(data), true) if f(data) => { | ||
| 202 | *id = s.current_id; | ||
| 203 | Poll::Ready(data.clone()) | ||
| 204 | } | ||
| 205 | _ => { | ||
| 206 | s.wakers.register(cx.waker()); | ||
| 207 | Poll::Pending | ||
| 208 | } | ||
| 209 | } | ||
| 210 | }) | ||
| 211 | } | ||
| 212 | |||
| 213 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 214 | self.mutex.lock(|state| { | ||
| 215 | let s = state.borrow(); | ||
| 216 | match (&s.data, s.current_id > *id) { | ||
| 217 | (Some(data), true) if f(data) => { | ||
| 218 | *id = s.current_id; | ||
| 219 | s.data.clone() | ||
| 220 | } | ||
| 221 | _ => None, | ||
| 222 | } | ||
| 223 | }) | ||
| 224 | } | ||
| 225 | |||
| 226 | fn drop_receiver(&self) { | ||
| 227 | self.mutex.lock(|state| { | ||
| 228 | let mut s = state.borrow_mut(); | ||
| 229 | s.receiver_count -= 1; | ||
| 230 | }) | ||
| 231 | } | ||
| 232 | |||
| 233 | fn clear(&self) { | ||
| 234 | self.mutex.lock(|state| { | ||
| 235 | let mut s = state.borrow_mut(); | ||
| 236 | s.data = None; | ||
| 237 | }) | ||
| 238 | } | ||
| 239 | |||
| 240 | fn send(&self, val: T) { | ||
| 241 | self.mutex.lock(|state| { | ||
| 242 | let mut s = state.borrow_mut(); | ||
| 243 | s.data = Some(val); | ||
| 244 | s.current_id += 1; | ||
| 245 | s.wakers.wake(); | ||
| 246 | }) | ||
| 247 | } | ||
| 248 | |||
| 249 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) { | ||
| 250 | self.mutex.lock(|state| { | ||
| 251 | let mut s = state.borrow_mut(); | ||
| 252 | f(&mut s.data); | ||
| 253 | s.current_id += 1; | ||
| 254 | s.wakers.wake(); | ||
| 255 | }) | ||
| 256 | } | ||
| 257 | |||
| 258 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) { | ||
| 259 | self.mutex.lock(|state| { | ||
| 260 | let mut s = state.borrow_mut(); | ||
| 261 | if f(&mut s.data) { | ||
| 262 | s.current_id += 1; | ||
| 263 | s.wakers.wake(); | ||
| 264 | } | ||
| 265 | }) | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { | ||
| 270 | fn try_get(&self, id: Option<&mut u64>) -> Option<T> { | ||
| 271 | self.mutex.lock(|state| { | ||
| 272 | let s = state.borrow(); | ||
| 273 | if let Some(id) = id { | ||
| 274 | *id = s.current_id; | ||
| 275 | } | ||
| 276 | s.data.clone() | ||
| 277 | }) | ||
| 278 | } | ||
| 279 | |||
| 280 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 281 | self.mutex.lock(|state| { | ||
| 282 | let s = state.borrow(); | ||
| 283 | match s.data { | ||
| 284 | Some(ref data) if f(data) => { | ||
| 285 | if let Some(id) = id { | ||
| 286 | *id = s.current_id; | ||
| 287 | } | ||
| 288 | Some(data.clone()) | ||
| 289 | } | ||
| 290 | _ => None, | ||
| 291 | } | ||
| 292 | }) | ||
| 293 | } | ||
| 294 | |||
| 295 | fn contains_value(&self) -> bool { | ||
| 296 | self.mutex.lock(|state| state.borrow().data.is_some()) | ||
| 297 | } | ||
| 298 | } | ||
| 299 | |||
| 300 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | ||
| 301 | /// Create a new `Watch` channel for `N` receivers. | ||
| 302 | pub const fn new() -> Self { | ||
| 303 | Self { | ||
| 304 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 305 | data: None, | ||
| 306 | current_id: 0, | ||
| 307 | wakers: MultiWakerRegistration::new(), | ||
| 308 | receiver_count: 0, | ||
| 309 | })), | ||
| 310 | } | ||
| 311 | } | ||
| 312 | |||
| 313 | /// Create a new `Watch` channel with default data. | ||
| 314 | pub const fn new_with(data: T) -> Self { | ||
| 315 | Self { | ||
| 316 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 317 | data: Some(data), | ||
| 318 | current_id: 0, | ||
| 319 | wakers: MultiWakerRegistration::new(), | ||
| 320 | receiver_count: 0, | ||
| 321 | })), | ||
| 322 | } | ||
| 323 | } | ||
| 324 | |||
| 325 | /// Create a new [`Sender`] for the `Watch`. | ||
| 326 | pub fn sender(&self) -> Sender<'_, M, T, N> { | ||
| 327 | Sender(Snd::new(self)) | ||
| 328 | } | ||
| 329 | |||
| 330 | /// Create a new [`DynSender`] for the `Watch`. | ||
| 331 | pub fn dyn_sender(&self) -> DynSender<'_, T> { | ||
| 332 | DynSender(Snd::new(self)) | ||
| 333 | } | ||
| 334 | |||
| 335 | /// Try to create a new [`Receiver`] for the `Watch`. If the | ||
| 336 | /// maximum number of receivers has been reached, `None` is returned. | ||
| 337 | pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> { | ||
| 338 | self.mutex.lock(|state| { | ||
| 339 | let mut s = state.borrow_mut(); | ||
| 340 | if s.receiver_count < N { | ||
| 341 | s.receiver_count += 1; | ||
| 342 | Some(Receiver(Rcv::new(self, 0))) | ||
| 343 | } else { | ||
| 344 | None | ||
| 345 | } | ||
| 346 | }) | ||
| 347 | } | ||
| 348 | |||
| 349 | /// Try to create a new [`DynReceiver`] for the `Watch`. If the | ||
| 350 | /// maximum number of receivers has been reached, `None` is returned. | ||
| 351 | pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> { | ||
| 352 | self.mutex.lock(|state| { | ||
| 353 | let mut s = state.borrow_mut(); | ||
| 354 | if s.receiver_count < N { | ||
| 355 | s.receiver_count += 1; | ||
| 356 | Some(DynReceiver(Rcv::new(self, 0))) | ||
| 357 | } else { | ||
| 358 | None | ||
| 359 | } | ||
| 360 | }) | ||
| 361 | } | ||
| 362 | |||
| 363 | /// Try to create a new [`AnonReceiver`] for the `Watch`. | ||
| 364 | pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { | ||
| 365 | AnonReceiver(AnonRcv::new(self, 0)) | ||
| 366 | } | ||
| 367 | |||
| 368 | /// Try to create a new [`DynAnonReceiver`] for the `Watch`. | ||
| 369 | pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { | ||
| 370 | DynAnonReceiver(AnonRcv::new(self, 0)) | ||
| 371 | } | ||
| 372 | |||
| 373 | /// Returns the message ID of the latest message sent to the `Watch`. | ||
| 374 | /// | ||
| 375 | /// This counter is monotonic, and is incremented every time a new message is sent. | ||
| 376 | pub fn get_msg_id(&self) -> u64 { | ||
| 377 | self.mutex.lock(|state| state.borrow().current_id) | ||
| 378 | } | ||
| 379 | |||
| 380 | /// Tries to get the value of the `Watch`. | ||
| 381 | pub fn try_get(&self) -> Option<T> { | ||
| 382 | WatchBehavior::try_get(self, None) | ||
| 383 | } | ||
| 384 | |||
| 385 | /// Tries to get the value of the `Watch` if it matches the predicate function `f`. | ||
| 386 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> | ||
| 387 | where | ||
| 388 | F: Fn(&T) -> bool, | ||
| 389 | { | ||
| 390 | WatchBehavior::try_get_and(self, None, &mut f) | ||
| 391 | } | ||
| 392 | } | ||
| 393 | |||
| 394 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 395 | pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 396 | watch: &'a W, | ||
| 397 | _phantom: PhantomData<T>, | ||
| 398 | } | ||
| 399 | |||
| 400 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> { | ||
| 401 | fn clone(&self) -> Self { | ||
| 402 | Self { | ||
| 403 | watch: self.watch, | ||
| 404 | _phantom: PhantomData, | ||
| 405 | } | ||
| 406 | } | ||
| 407 | } | ||
| 408 | |||
| 409 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | ||
| 410 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 411 | fn new(watch: &'a W) -> Self { | ||
| 412 | Self { | ||
| 413 | watch, | ||
| 414 | _phantom: PhantomData, | ||
| 415 | } | ||
| 416 | } | ||
| 417 | |||
| 418 | /// Sends a new value to the `Watch`. | ||
| 419 | pub fn send(&self, val: T) { | ||
| 420 | self.watch.send(val) | ||
| 421 | } | ||
| 422 | |||
| 423 | /// Clears the value of the `Watch`. | ||
| 424 | /// This will cause calls to [`Rcv::get`] to be pending. | ||
| 425 | pub fn clear(&self) { | ||
| 426 | self.watch.clear() | ||
| 427 | } | ||
| 428 | |||
| 429 | /// Tries to retrieve the value of the `Watch`. | ||
| 430 | pub fn try_get(&self) -> Option<T> { | ||
| 431 | self.watch.try_get(None) | ||
| 432 | } | ||
| 433 | |||
| 434 | /// Tries to peek the current value of the `Watch` if it matches the predicate | ||
| 435 | /// function `f`. | ||
| 436 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> | ||
| 437 | where | ||
| 438 | F: Fn(&T) -> bool, | ||
| 439 | { | ||
| 440 | self.watch.try_get_and(None, &mut f) | ||
| 441 | } | ||
| 442 | |||
| 443 | /// Returns true if the `Watch` contains a value. | ||
| 444 | pub fn contains_value(&self) -> bool { | ||
| 445 | self.watch.contains_value() | ||
| 446 | } | ||
| 447 | |||
| 448 | /// Modify the value of the `Watch` using a closure. | ||
| 449 | pub fn send_modify<F>(&self, mut f: F) | ||
| 450 | where | ||
| 451 | F: Fn(&mut Option<T>), | ||
| 452 | { | ||
| 453 | self.watch.send_modify(&mut f) | ||
| 454 | } | ||
| 455 | |||
| 456 | /// Modify the value of the `Watch` using a closure. The closure must return | ||
| 457 | /// `true` if the value was modified, which notifies all receivers. | ||
| 458 | pub fn send_if_modified<F>(&self, mut f: F) | ||
| 459 | where | ||
| 460 | F: Fn(&mut Option<T>) -> bool, | ||
| 461 | { | ||
| 462 | self.watch.send_if_modified(&mut f) | ||
| 463 | } | ||
| 464 | } | ||
| 465 | |||
| 466 | /// A sender of a `Watch` channel. | ||
| 467 | /// | ||
| 468 | /// For a simpler type definition, consider [`DynSender`] at the expense of | ||
| 469 | /// some runtime performance due to dynamic dispatch. | ||
| 470 | pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>); | ||
| 471 | |||
| 472 | impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { | ||
| 473 | fn clone(&self) -> Self { | ||
| 474 | Self(self.0.clone()) | ||
| 475 | } | ||
| 476 | } | ||
| 477 | |||
| 478 | impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { | ||
| 479 | /// Converts the `Sender` into a [`DynSender`]. | ||
| 480 | pub fn as_dyn(self) -> DynSender<'a, T> { | ||
| 481 | DynSender(Snd::new(self.watch)) | ||
| 482 | } | ||
| 483 | } | ||
| 484 | |||
| 485 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> { | ||
| 486 | fn into(self) -> DynSender<'a, T> { | ||
| 487 | self.as_dyn() | ||
| 488 | } | ||
| 489 | } | ||
| 490 | |||
| 491 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { | ||
| 492 | type Target = Snd<'a, T, Watch<M, T, N>>; | ||
| 493 | |||
| 494 | fn deref(&self) -> &Self::Target { | ||
| 495 | &self.0 | ||
| 496 | } | ||
| 497 | } | ||
| 498 | |||
| 499 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { | ||
| 500 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 501 | &mut self.0 | ||
| 502 | } | ||
| 503 | } | ||
| 504 | |||
| 505 | /// A sender which holds a **dynamic** reference to a `Watch` channel. | ||
| 506 | /// | ||
| 507 | /// This is an alternative to [`Sender`] with a simpler type definition, | ||
| 508 | pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 509 | |||
| 510 | impl<'a, T: Clone> Clone for DynSender<'a, T> { | ||
| 511 | fn clone(&self) -> Self { | ||
| 512 | Self(self.0.clone()) | ||
| 513 | } | ||
| 514 | } | ||
| 515 | |||
| 516 | impl<'a, T: Clone> Deref for DynSender<'a, T> { | ||
| 517 | type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 518 | |||
| 519 | fn deref(&self) -> &Self::Target { | ||
| 520 | &self.0 | ||
| 521 | } | ||
| 522 | } | ||
| 523 | |||
| 524 | impl<'a, T: Clone> DerefMut for DynSender<'a, T> { | ||
| 525 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 526 | &mut self.0 | ||
| 527 | } | ||
| 528 | } | ||
| 529 | |||
| 530 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 531 | pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 532 | watch: &'a W, | ||
| 533 | at_id: u64, | ||
| 534 | _phantom: PhantomData<T>, | ||
| 535 | } | ||
| 536 | |||
| 537 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | ||
| 538 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 539 | fn new(watch: &'a W, at_id: u64) -> Self { | ||
| 540 | Self { | ||
| 541 | watch, | ||
| 542 | at_id, | ||
| 543 | _phantom: PhantomData, | ||
| 544 | } | ||
| 545 | } | ||
| 546 | |||
| 547 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. | ||
| 548 | /// | ||
| 549 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 550 | pub fn get(&mut self) -> impl Future<Output = T> + '_ { | ||
| 551 | poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)) | ||
| 552 | } | ||
| 553 | |||
| 554 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 555 | pub fn try_get(&mut self) -> Option<T> { | ||
| 556 | self.watch.try_get(Some(&mut self.at_id)) | ||
| 557 | } | ||
| 558 | |||
| 559 | /// Returns the value of the `Watch` if it matches the predicate function `f`, | ||
| 560 | /// or waits for it to match, marking it as seen. | ||
| 561 | /// | ||
| 562 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 563 | pub async fn get_and<F>(&mut self, mut f: F) -> T | ||
| 564 | where | ||
| 565 | F: Fn(&T) -> bool, | ||
| 566 | { | ||
| 567 | poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await | ||
| 568 | } | ||
| 569 | |||
| 570 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 571 | /// function `f` without waiting, marking it as seen. | ||
| 572 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 573 | where | ||
| 574 | F: Fn(&T) -> bool, | ||
| 575 | { | ||
| 576 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) | ||
| 577 | } | ||
| 578 | |||
| 579 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | ||
| 580 | /// | ||
| 581 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 582 | pub async fn changed(&mut self) -> T { | ||
| 583 | poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await | ||
| 584 | } | ||
| 585 | |||
| 586 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 587 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 588 | self.watch.try_changed(&mut self.at_id) | ||
| 589 | } | ||
| 590 | |||
| 591 | /// Waits for the `Watch` to change to a value which satisfies the predicate | ||
| 592 | /// function `f` and returns the new value, marking it as seen. | ||
| 593 | /// | ||
| 594 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 595 | pub async fn changed_and<F>(&mut self, mut f: F) -> T | ||
| 596 | where | ||
| 597 | F: Fn(&T) -> bool, | ||
| 598 | { | ||
| 599 | poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await | ||
| 600 | } | ||
| 601 | |||
| 602 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 603 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 604 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 605 | where | ||
| 606 | F: Fn(&T) -> bool, | ||
| 607 | { | ||
| 608 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 609 | } | ||
| 610 | |||
| 611 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 612 | /// then awaiting [`Rcv::get`] will return immediately. | ||
| 613 | pub fn contains_value(&self) -> bool { | ||
| 614 | self.watch.contains_value() | ||
| 615 | } | ||
| 616 | } | ||
| 617 | |||
| 618 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> { | ||
| 619 | fn drop(&mut self) { | ||
| 620 | self.watch.drop_receiver(); | ||
| 621 | } | ||
| 622 | } | ||
| 623 | |||
| 624 | /// A anonymous receiver can NOT `.await` a change in the `Watch` value. | ||
| 625 | pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 626 | watch: &'a W, | ||
| 627 | at_id: u64, | ||
| 628 | _phantom: PhantomData<T>, | ||
| 629 | } | ||
| 630 | |||
| 631 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> { | ||
| 632 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 633 | fn new(watch: &'a W, at_id: u64) -> Self { | ||
| 634 | Self { | ||
| 635 | watch, | ||
| 636 | at_id, | ||
| 637 | _phantom: PhantomData, | ||
| 638 | } | ||
| 639 | } | ||
| 640 | |||
| 641 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 642 | pub fn try_get(&mut self) -> Option<T> { | ||
| 643 | self.watch.try_get(Some(&mut self.at_id)) | ||
| 644 | } | ||
| 645 | |||
| 646 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 647 | /// function `f` without waiting, marking it as seen. | ||
| 648 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 649 | where | ||
| 650 | F: Fn(&T) -> bool, | ||
| 651 | { | ||
| 652 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) | ||
| 653 | } | ||
| 654 | |||
| 655 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 656 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 657 | self.watch.try_changed(&mut self.at_id) | ||
| 658 | } | ||
| 659 | |||
| 660 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 661 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 662 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 663 | where | ||
| 664 | F: Fn(&T) -> bool, | ||
| 665 | { | ||
| 666 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 667 | } | ||
| 668 | |||
| 669 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 670 | /// then awaiting [`Rcv::get`] will return immediately. | ||
| 671 | pub fn contains_value(&self) -> bool { | ||
| 672 | self.watch.contains_value() | ||
| 673 | } | ||
| 674 | } | ||
| 675 | |||
| 676 | /// A receiver of a `Watch` channel. | ||
| 677 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | ||
| 678 | |||
| 679 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { | ||
| 680 | /// Converts the `Receiver` into a [`DynReceiver`]. | ||
| 681 | pub fn as_dyn(self) -> DynReceiver<'a, T> { | ||
| 682 | let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id)); | ||
| 683 | core::mem::forget(self); // Ensures the destructor is not called | ||
| 684 | rcv | ||
| 685 | } | ||
| 686 | } | ||
| 687 | |||
| 688 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> { | ||
| 689 | fn into(self) -> DynReceiver<'a, T> { | ||
| 690 | self.as_dyn() | ||
| 691 | } | ||
| 692 | } | ||
| 693 | |||
| 694 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 695 | type Target = Rcv<'a, T, Watch<M, T, N>>; | ||
| 696 | |||
| 697 | fn deref(&self) -> &Self::Target { | ||
| 698 | &self.0 | ||
| 699 | } | ||
| 700 | } | ||
| 701 | |||
| 702 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 703 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 704 | &mut self.0 | ||
| 705 | } | ||
| 706 | } | ||
| 707 | |||
| 708 | /// A receiver which holds a **dynamic** reference to a `Watch` channel. | ||
| 709 | /// | ||
| 710 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | ||
| 711 | /// some runtime performance due to dynamic dispatch. | ||
| 712 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 713 | |||
| 714 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { | ||
| 715 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 716 | |||
| 717 | fn deref(&self) -> &Self::Target { | ||
| 718 | &self.0 | ||
| 719 | } | ||
| 720 | } | ||
| 721 | |||
| 722 | impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | ||
| 723 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 724 | &mut self.0 | ||
| 725 | } | ||
| 726 | } | ||
| 727 | |||
| 728 | /// A receiver of a `Watch` channel that cannot `.await` values. | ||
| 729 | pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>); | ||
| 730 | |||
| 731 | impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { | ||
| 732 | /// Converts the `Receiver` into a [`DynReceiver`]. | ||
| 733 | pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { | ||
| 734 | let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); | ||
| 735 | core::mem::forget(self); // Ensures the destructor is not called | ||
| 736 | rcv | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> { | ||
| 741 | fn into(self) -> DynAnonReceiver<'a, T> { | ||
| 742 | self.as_dyn() | ||
| 743 | } | ||
| 744 | } | ||
| 745 | |||
| 746 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { | ||
| 747 | type Target = AnonRcv<'a, T, Watch<M, T, N>>; | ||
| 748 | |||
| 749 | fn deref(&self) -> &Self::Target { | ||
| 750 | &self.0 | ||
| 751 | } | ||
| 752 | } | ||
| 753 | |||
| 754 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { | ||
| 755 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 756 | &mut self.0 | ||
| 757 | } | ||
| 758 | } | ||
| 759 | |||
| 760 | /// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel. | ||
| 761 | /// | ||
| 762 | /// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of | ||
| 763 | /// some runtime performance due to dynamic dispatch. | ||
| 764 | pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 765 | |||
| 766 | impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { | ||
| 767 | type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 768 | |||
| 769 | fn deref(&self) -> &Self::Target { | ||
| 770 | &self.0 | ||
| 771 | } | ||
| 772 | } | ||
| 773 | |||
| 774 | impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { | ||
| 775 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 776 | &mut self.0 | ||
| 777 | } | ||
| 778 | } | ||
| 779 | |||
| 780 | #[cfg(test)] | ||
| 781 | mod tests { | ||
| 782 | use futures_executor::block_on; | ||
| 783 | |||
| 784 | use super::Watch; | ||
| 785 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 786 | |||
| 787 | #[test] | ||
| 788 | fn multiple_sends() { | ||
| 789 | let f = async { | ||
| 790 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 791 | |||
| 792 | // Obtain receiver and sender | ||
| 793 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 794 | let snd = WATCH.sender(); | ||
| 795 | |||
| 796 | // Not initialized | ||
| 797 | assert_eq!(rcv.try_changed(), None); | ||
| 798 | |||
| 799 | // Receive the new value | ||
| 800 | snd.send(10); | ||
| 801 | assert_eq!(rcv.changed().await, 10); | ||
| 802 | |||
| 803 | // Receive another value | ||
| 804 | snd.send(20); | ||
| 805 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 806 | |||
| 807 | // No update | ||
| 808 | assert_eq!(rcv.try_changed(), None); | ||
| 809 | }; | ||
| 810 | block_on(f); | ||
| 811 | } | ||
| 812 | |||
| 813 | #[test] | ||
| 814 | fn all_try_get() { | ||
| 815 | let f = async { | ||
| 816 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 817 | |||
| 818 | // Obtain receiver and sender | ||
| 819 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 820 | let snd = WATCH.sender(); | ||
| 821 | |||
| 822 | // Not initialized | ||
| 823 | assert_eq!(WATCH.try_get(), None); | ||
| 824 | assert_eq!(rcv.try_get(), None); | ||
| 825 | assert_eq!(snd.try_get(), None); | ||
| 826 | |||
| 827 | // Receive the new value | ||
| 828 | snd.send(10); | ||
| 829 | assert_eq!(WATCH.try_get(), Some(10)); | ||
| 830 | assert_eq!(rcv.try_get(), Some(10)); | ||
| 831 | assert_eq!(snd.try_get(), Some(10)); | ||
| 832 | |||
| 833 | assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10)); | ||
| 834 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(10)); | ||
| 835 | assert_eq!(snd.try_get_and(|x| x > &5), Some(10)); | ||
| 836 | |||
| 837 | assert_eq!(WATCH.try_get_and(|x| x < &5), None); | ||
| 838 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 839 | assert_eq!(snd.try_get_and(|x| x < &5), None); | ||
| 840 | }; | ||
| 841 | block_on(f); | ||
| 842 | } | ||
| 843 | |||
| 844 | #[test] | ||
| 845 | fn once_lock_like() { | ||
| 846 | let f = async { | ||
| 847 | static CONFIG0: u8 = 10; | ||
| 848 | static CONFIG1: u8 = 20; | ||
| 849 | |||
| 850 | static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new(); | ||
| 851 | |||
| 852 | // Obtain receiver and sender | ||
| 853 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 854 | let snd = WATCH.sender(); | ||
| 855 | |||
| 856 | // Not initialized | ||
| 857 | assert_eq!(rcv.try_changed(), None); | ||
| 858 | |||
| 859 | // Receive the new value | ||
| 860 | snd.send(&CONFIG0); | ||
| 861 | let rcv0 = rcv.changed().await; | ||
| 862 | assert_eq!(rcv0, &10); | ||
| 863 | |||
| 864 | // Receive another value | ||
| 865 | snd.send(&CONFIG1); | ||
| 866 | let rcv1 = rcv.try_changed(); | ||
| 867 | assert_eq!(rcv1, Some(&20)); | ||
| 868 | |||
| 869 | // No update | ||
| 870 | assert_eq!(rcv.try_changed(), None); | ||
| 871 | |||
| 872 | // Ensure similarity with original static | ||
| 873 | assert_eq!(rcv0, &CONFIG0); | ||
| 874 | assert_eq!(rcv1, Some(&CONFIG1)); | ||
| 875 | }; | ||
| 876 | block_on(f); | ||
| 877 | } | ||
| 878 | |||
| 879 | #[test] | ||
| 880 | fn sender_modify() { | ||
| 881 | let f = async { | ||
| 882 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 883 | |||
| 884 | // Obtain receiver and sender | ||
| 885 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 886 | let snd = WATCH.sender(); | ||
| 887 | |||
| 888 | // Receive the new value | ||
| 889 | snd.send(10); | ||
| 890 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 891 | |||
| 892 | // Modify the value inplace | ||
| 893 | snd.send_modify(|opt| { | ||
| 894 | if let Some(inner) = opt { | ||
| 895 | *inner += 5; | ||
| 896 | } | ||
| 897 | }); | ||
| 898 | |||
| 899 | // Get the modified value | ||
| 900 | assert_eq!(rcv.try_changed(), Some(15)); | ||
| 901 | assert_eq!(rcv.try_changed(), None); | ||
| 902 | }; | ||
| 903 | block_on(f); | ||
| 904 | } | ||
| 905 | |||
| 906 | #[test] | ||
| 907 | fn predicate_fn() { | ||
| 908 | let f = async { | ||
| 909 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 910 | |||
| 911 | // Obtain receiver and sender | ||
| 912 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 913 | let snd = WATCH.sender(); | ||
| 914 | |||
| 915 | snd.send(15); | ||
| 916 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); | ||
| 917 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 918 | assert!(rcv.try_changed().is_none()); | ||
| 919 | |||
| 920 | snd.send(20); | ||
| 921 | assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20)); | ||
| 922 | assert_eq!(rcv.try_changed_and(|x| x > &5), None); | ||
| 923 | |||
| 924 | snd.send(25); | ||
| 925 | assert_eq!(rcv.try_changed_and(|x| x < &5), None); | ||
| 926 | assert_eq!(rcv.try_changed(), Some(25)); | ||
| 927 | |||
| 928 | snd.send(30); | ||
| 929 | assert_eq!(rcv.changed_and(|x| x > &5).await, 30); | ||
| 930 | assert_eq!(rcv.get_and(|x| x > &5).await, 30); | ||
| 931 | }; | ||
| 932 | block_on(f); | ||
| 933 | } | ||
| 934 | |||
| 935 | #[test] | ||
| 936 | fn receive_after_create() { | ||
| 937 | let f = async { | ||
| 938 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 939 | |||
| 940 | // Obtain sender and send value | ||
| 941 | let snd = WATCH.sender(); | ||
| 942 | snd.send(10); | ||
| 943 | |||
| 944 | // Obtain receiver and receive value | ||
| 945 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 946 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 947 | }; | ||
| 948 | block_on(f); | ||
| 949 | } | ||
| 950 | |||
| 951 | #[test] | ||
| 952 | fn max_receivers_drop() { | ||
| 953 | let f = async { | ||
| 954 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 955 | |||
| 956 | // Try to create 3 receivers (only 2 can exist at once) | ||
| 957 | let rcv0 = WATCH.receiver(); | ||
| 958 | let rcv1 = WATCH.receiver(); | ||
| 959 | let rcv2 = WATCH.receiver(); | ||
| 960 | |||
| 961 | // Ensure the first two are successful and the third is not | ||
| 962 | assert!(rcv0.is_some()); | ||
| 963 | assert!(rcv1.is_some()); | ||
| 964 | assert!(rcv2.is_none()); | ||
| 965 | |||
| 966 | // Drop the first receiver | ||
| 967 | drop(rcv0); | ||
| 968 | |||
| 969 | // Create another receiver and ensure it is successful | ||
| 970 | let rcv3 = WATCH.receiver(); | ||
| 971 | assert!(rcv3.is_some()); | ||
| 972 | }; | ||
| 973 | block_on(f); | ||
| 974 | } | ||
| 975 | |||
| 976 | #[test] | ||
| 977 | fn multiple_receivers() { | ||
| 978 | let f = async { | ||
| 979 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 980 | |||
| 981 | // Obtain receivers and sender | ||
| 982 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 983 | let mut rcv1 = WATCH.anon_receiver(); | ||
| 984 | let snd = WATCH.sender(); | ||
| 985 | |||
| 986 | // No update for both | ||
| 987 | assert_eq!(rcv0.try_changed(), None); | ||
| 988 | assert_eq!(rcv1.try_changed(), None); | ||
| 989 | |||
| 990 | // Send a new value | ||
| 991 | snd.send(0); | ||
| 992 | |||
| 993 | // Both receivers receive the new value | ||
| 994 | assert_eq!(rcv0.try_changed(), Some(0)); | ||
| 995 | assert_eq!(rcv1.try_changed(), Some(0)); | ||
| 996 | }; | ||
| 997 | block_on(f); | ||
| 998 | } | ||
| 999 | |||
| 1000 | #[test] | ||
| 1001 | fn clone_senders() { | ||
| 1002 | let f = async { | ||
| 1003 | // Obtain different ways to send | ||
| 1004 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 1005 | let snd0 = WATCH.sender(); | ||
| 1006 | let snd1 = snd0.clone(); | ||
| 1007 | |||
| 1008 | // Obtain Receiver | ||
| 1009 | let mut rcv = WATCH.receiver().unwrap().as_dyn(); | ||
| 1010 | |||
| 1011 | // Send a value from first sender | ||
| 1012 | snd0.send(10); | ||
| 1013 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 1014 | |||
| 1015 | // Send a value from second sender | ||
| 1016 | snd1.send(20); | ||
| 1017 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 1018 | }; | ||
| 1019 | block_on(f); | ||
| 1020 | } | ||
| 1021 | |||
| 1022 | #[test] | ||
| 1023 | fn use_dynamics() { | ||
| 1024 | let f = async { | ||
| 1025 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1026 | |||
| 1027 | // Obtain receiver and sender | ||
| 1028 | let mut anon_rcv = WATCH.dyn_anon_receiver(); | ||
| 1029 | let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); | ||
| 1030 | let dyn_snd = WATCH.dyn_sender(); | ||
| 1031 | |||
| 1032 | // Send a value | ||
| 1033 | dyn_snd.send(10); | ||
| 1034 | |||
| 1035 | // Ensure the dynamic receiver receives the value | ||
| 1036 | assert_eq!(anon_rcv.try_changed(), Some(10)); | ||
| 1037 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | ||
| 1038 | assert_eq!(dyn_rcv.try_changed(), None); | ||
| 1039 | }; | ||
| 1040 | block_on(f); | ||
| 1041 | } | ||
| 1042 | |||
| 1043 | #[test] | ||
| 1044 | fn convert_to_dyn() { | ||
| 1045 | let f = async { | ||
| 1046 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1047 | |||
| 1048 | // Obtain receiver and sender | ||
| 1049 | let anon_rcv = WATCH.anon_receiver(); | ||
| 1050 | let rcv = WATCH.receiver().unwrap(); | ||
| 1051 | let snd = WATCH.sender(); | ||
| 1052 | |||
| 1053 | // Convert to dynamic | ||
| 1054 | let mut dyn_anon_rcv = anon_rcv.as_dyn(); | ||
| 1055 | let mut dyn_rcv = rcv.as_dyn(); | ||
| 1056 | let dyn_snd = snd.as_dyn(); | ||
| 1057 | |||
| 1058 | // Send a value | ||
| 1059 | dyn_snd.send(10); | ||
| 1060 | |||
| 1061 | // Ensure the dynamic receiver receives the value | ||
| 1062 | assert_eq!(dyn_anon_rcv.try_changed(), Some(10)); | ||
| 1063 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | ||
| 1064 | assert_eq!(dyn_rcv.try_changed(), None); | ||
| 1065 | }; | ||
| 1066 | block_on(f); | ||
| 1067 | } | ||
| 1068 | |||
| 1069 | #[test] | ||
| 1070 | fn dynamic_receiver_count() { | ||
| 1071 | let f = async { | ||
| 1072 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1073 | |||
| 1074 | // Obtain receiver and sender | ||
| 1075 | let rcv0 = WATCH.receiver(); | ||
| 1076 | let rcv1 = WATCH.receiver(); | ||
| 1077 | let rcv2 = WATCH.receiver(); | ||
| 1078 | |||
| 1079 | // Ensure the first two are successful and the third is not | ||
| 1080 | assert!(rcv0.is_some()); | ||
| 1081 | assert!(rcv1.is_some()); | ||
| 1082 | assert!(rcv2.is_none()); | ||
| 1083 | |||
| 1084 | // Convert to dynamic | ||
| 1085 | let dyn_rcv0 = rcv0.unwrap().as_dyn(); | ||
| 1086 | |||
| 1087 | // Drop the (now dynamic) receiver | ||
| 1088 | drop(dyn_rcv0); | ||
| 1089 | |||
| 1090 | // Create another receiver and ensure it is successful | ||
| 1091 | let rcv3 = WATCH.receiver(); | ||
| 1092 | let rcv4 = WATCH.receiver(); | ||
| 1093 | assert!(rcv3.is_some()); | ||
| 1094 | assert!(rcv4.is_none()); | ||
| 1095 | }; | ||
| 1096 | block_on(f); | ||
| 1097 | } | ||
| 1098 | |||
| 1099 | #[test] | ||
| 1100 | fn contains_value() { | ||
| 1101 | let f = async { | ||
| 1102 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1103 | |||
| 1104 | // Obtain receiver and sender | ||
| 1105 | let rcv = WATCH.receiver().unwrap(); | ||
| 1106 | let snd = WATCH.sender(); | ||
| 1107 | |||
| 1108 | // check if the watch contains a value | ||
| 1109 | assert_eq!(rcv.contains_value(), false); | ||
| 1110 | assert_eq!(snd.contains_value(), false); | ||
| 1111 | |||
| 1112 | // Send a value | ||
| 1113 | snd.send(10); | ||
| 1114 | |||
| 1115 | // check if the watch contains a value | ||
| 1116 | assert_eq!(rcv.contains_value(), true); | ||
| 1117 | assert_eq!(snd.contains_value(), true); | ||
| 1118 | }; | ||
| 1119 | block_on(f); | ||
| 1120 | } | ||
| 1121 | } | ||
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index cfce9a571..e3e5b2538 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs | |||
| @@ -15,7 +15,7 @@ | |||
| 15 | //! another message will result in an error being returned. | 15 | //! another message will result in an error being returned. |
| 16 | 16 | ||
| 17 | use core::cell::RefCell; | 17 | use core::cell::RefCell; |
| 18 | use core::future::poll_fn; | 18 | use core::future::{poll_fn, Future}; |
| 19 | use core::marker::PhantomData; | 19 | use core::marker::PhantomData; |
| 20 | use core::task::{Context, Poll}; | 20 | use core::task::{Context, Poll}; |
| 21 | 21 | ||
| @@ -35,7 +35,7 @@ use crate::waitqueue::WakerRegistration; | |||
| 35 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through | 35 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through |
| 36 | /// an `&mut T`. | 36 | /// an `&mut T`. |
| 37 | pub struct Channel<'a, M: RawMutex, T> { | 37 | pub struct Channel<'a, M: RawMutex, T> { |
| 38 | buf: *mut T, | 38 | buf: BufferPtr<T>, |
| 39 | phantom: PhantomData<&'a mut T>, | 39 | phantom: PhantomData<&'a mut T>, |
| 40 | state: Mutex<M, RefCell<State>>, | 40 | state: Mutex<M, RefCell<State>>, |
| 41 | } | 41 | } |
| @@ -50,10 +50,10 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { | |||
| 50 | assert!(len != 0); | 50 | assert!(len != 0); |
| 51 | 51 | ||
| 52 | Self { | 52 | Self { |
| 53 | buf: buf.as_mut_ptr(), | 53 | buf: BufferPtr(buf.as_mut_ptr()), |
| 54 | phantom: PhantomData, | 54 | phantom: PhantomData, |
| 55 | state: Mutex::new(RefCell::new(State { | 55 | state: Mutex::new(RefCell::new(State { |
| 56 | len, | 56 | capacity: len, |
| 57 | front: 0, | 57 | front: 0, |
| 58 | back: 0, | 58 | back: 0, |
| 59 | full: false, | 59 | full: false, |
| @@ -70,8 +70,42 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { | |||
| 70 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | 70 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { |
| 71 | (Sender { channel: self }, Receiver { channel: self }) | 71 | (Sender { channel: self }, Receiver { channel: self }) |
| 72 | } | 72 | } |
| 73 | |||
| 74 | /// Clears all elements in the channel. | ||
| 75 | pub fn clear(&mut self) { | ||
| 76 | self.state.lock(|s| { | ||
| 77 | s.borrow_mut().clear(); | ||
| 78 | }); | ||
| 79 | } | ||
| 80 | |||
| 81 | /// Returns the number of elements currently in the channel. | ||
| 82 | pub fn len(&self) -> usize { | ||
| 83 | self.state.lock(|s| s.borrow().len()) | ||
| 84 | } | ||
| 85 | |||
| 86 | /// Returns whether the channel is empty. | ||
| 87 | pub fn is_empty(&self) -> bool { | ||
| 88 | self.state.lock(|s| s.borrow().is_empty()) | ||
| 89 | } | ||
| 90 | |||
| 91 | /// Returns whether the channel is full. | ||
| 92 | pub fn is_full(&self) -> bool { | ||
| 93 | self.state.lock(|s| s.borrow().is_full()) | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | #[repr(transparent)] | ||
| 98 | struct BufferPtr<T>(*mut T); | ||
| 99 | |||
| 100 | impl<T> BufferPtr<T> { | ||
| 101 | unsafe fn add(&self, count: usize) -> *mut T { | ||
| 102 | self.0.add(count) | ||
| 103 | } | ||
| 73 | } | 104 | } |
| 74 | 105 | ||
| 106 | unsafe impl<T> Send for BufferPtr<T> {} | ||
| 107 | unsafe impl<T> Sync for BufferPtr<T> {} | ||
| 108 | |||
| 75 | /// Send-only access to a [`Channel`]. | 109 | /// Send-only access to a [`Channel`]. |
| 76 | pub struct Sender<'a, M: RawMutex, T> { | 110 | pub struct Sender<'a, M: RawMutex, T> { |
| 77 | channel: &'a Channel<'a, M, T>, | 111 | channel: &'a Channel<'a, M, T>, |
| @@ -109,12 +143,15 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 109 | } | 143 | } |
| 110 | 144 | ||
| 111 | /// Asynchronously send a value over the channel. | 145 | /// Asynchronously send a value over the channel. |
| 112 | pub async fn send(&mut self) -> &mut T { | 146 | pub fn send(&mut self) -> impl Future<Output = &mut T> { |
| 113 | let i = poll_fn(|cx| { | 147 | poll_fn(|cx| { |
| 114 | self.channel.state.lock(|s| { | 148 | self.channel.state.lock(|s| { |
| 115 | let s = &mut *s.borrow_mut(); | 149 | let s = &mut *s.borrow_mut(); |
| 116 | match s.push_index() { | 150 | match s.push_index() { |
| 117 | Some(i) => Poll::Ready(i), | 151 | Some(i) => { |
| 152 | let r = unsafe { &mut *self.channel.buf.add(i) }; | ||
| 153 | Poll::Ready(r) | ||
| 154 | } | ||
| 118 | None => { | 155 | None => { |
| 119 | s.receive_waker.register(cx.waker()); | 156 | s.receive_waker.register(cx.waker()); |
| 120 | Poll::Pending | 157 | Poll::Pending |
| @@ -122,14 +159,34 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 122 | } | 159 | } |
| 123 | }) | 160 | }) |
| 124 | }) | 161 | }) |
| 125 | .await; | ||
| 126 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 127 | } | 162 | } |
| 128 | 163 | ||
| 129 | /// Notify the channel that the sending of the value has been finalized. | 164 | /// Notify the channel that the sending of the value has been finalized. |
| 130 | pub fn send_done(&mut self) { | 165 | pub fn send_done(&mut self) { |
| 131 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | 166 | self.channel.state.lock(|s| s.borrow_mut().push_done()) |
| 132 | } | 167 | } |
| 168 | |||
| 169 | /// Clears all elements in the channel. | ||
| 170 | pub fn clear(&mut self) { | ||
| 171 | self.channel.state.lock(|s| { | ||
| 172 | s.borrow_mut().clear(); | ||
| 173 | }); | ||
| 174 | } | ||
| 175 | |||
| 176 | /// Returns the number of elements currently in the channel. | ||
| 177 | pub fn len(&self) -> usize { | ||
| 178 | self.channel.state.lock(|s| s.borrow().len()) | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Returns whether the channel is empty. | ||
| 182 | pub fn is_empty(&self) -> bool { | ||
| 183 | self.channel.state.lock(|s| s.borrow().is_empty()) | ||
| 184 | } | ||
| 185 | |||
| 186 | /// Returns whether the channel is full. | ||
| 187 | pub fn is_full(&self) -> bool { | ||
| 188 | self.channel.state.lock(|s| s.borrow().is_full()) | ||
| 189 | } | ||
| 133 | } | 190 | } |
| 134 | 191 | ||
| 135 | /// Receive-only access to a [`Channel`]. | 192 | /// Receive-only access to a [`Channel`]. |
| @@ -138,7 +195,7 @@ pub struct Receiver<'a, M: RawMutex, T> { | |||
| 138 | } | 195 | } |
| 139 | 196 | ||
| 140 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | 197 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { |
| 141 | /// Creates one further [`Sender`] over the same channel. | 198 | /// Creates one further [`Receiver`] over the same channel. |
| 142 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | 199 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { |
| 143 | Receiver { channel: self.channel } | 200 | Receiver { channel: self.channel } |
| 144 | } | 201 | } |
| @@ -169,12 +226,15 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 169 | } | 226 | } |
| 170 | 227 | ||
| 171 | /// Asynchronously receive a value over the channel. | 228 | /// Asynchronously receive a value over the channel. |
| 172 | pub async fn receive(&mut self) -> &mut T { | 229 | pub fn receive(&mut self) -> impl Future<Output = &mut T> { |
| 173 | let i = poll_fn(|cx| { | 230 | poll_fn(|cx| { |
| 174 | self.channel.state.lock(|s| { | 231 | self.channel.state.lock(|s| { |
| 175 | let s = &mut *s.borrow_mut(); | 232 | let s = &mut *s.borrow_mut(); |
| 176 | match s.pop_index() { | 233 | match s.pop_index() { |
| 177 | Some(i) => Poll::Ready(i), | 234 | Some(i) => { |
| 235 | let r = unsafe { &mut *self.channel.buf.add(i) }; | ||
| 236 | Poll::Ready(r) | ||
| 237 | } | ||
| 178 | None => { | 238 | None => { |
| 179 | s.send_waker.register(cx.waker()); | 239 | s.send_waker.register(cx.waker()); |
| 180 | Poll::Pending | 240 | Poll::Pending |
| @@ -182,18 +242,39 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 182 | } | 242 | } |
| 183 | }) | 243 | }) |
| 184 | }) | 244 | }) |
| 185 | .await; | ||
| 186 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 187 | } | 245 | } |
| 188 | 246 | ||
| 189 | /// Notify the channel that the receiving of the value has been finalized. | 247 | /// Notify the channel that the receiving of the value has been finalized. |
| 190 | pub fn receive_done(&mut self) { | 248 | pub fn receive_done(&mut self) { |
| 191 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | 249 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) |
| 192 | } | 250 | } |
| 251 | |||
| 252 | /// Clears all elements in the channel. | ||
| 253 | pub fn clear(&mut self) { | ||
| 254 | self.channel.state.lock(|s| { | ||
| 255 | s.borrow_mut().clear(); | ||
| 256 | }); | ||
| 257 | } | ||
| 258 | |||
| 259 | /// Returns the number of elements currently in the channel. | ||
| 260 | pub fn len(&self) -> usize { | ||
| 261 | self.channel.state.lock(|s| s.borrow().len()) | ||
| 262 | } | ||
| 263 | |||
| 264 | /// Returns whether the channel is empty. | ||
| 265 | pub fn is_empty(&self) -> bool { | ||
| 266 | self.channel.state.lock(|s| s.borrow().is_empty()) | ||
| 267 | } | ||
| 268 | |||
| 269 | /// Returns whether the channel is full. | ||
| 270 | pub fn is_full(&self) -> bool { | ||
| 271 | self.channel.state.lock(|s| s.borrow().is_full()) | ||
| 272 | } | ||
| 193 | } | 273 | } |
| 194 | 274 | ||
| 195 | struct State { | 275 | struct State { |
| 196 | len: usize, | 276 | /// Maximum number of elements the channel can hold. |
| 277 | capacity: usize, | ||
| 197 | 278 | ||
| 198 | /// Front index. Always 0..=(N-1) | 279 | /// Front index. Always 0..=(N-1) |
| 199 | front: usize, | 280 | front: usize, |
| @@ -210,13 +291,34 @@ struct State { | |||
| 210 | 291 | ||
| 211 | impl State { | 292 | impl State { |
| 212 | fn increment(&self, i: usize) -> usize { | 293 | fn increment(&self, i: usize) -> usize { |
| 213 | if i + 1 == self.len { | 294 | if i + 1 == self.capacity { |
| 214 | 0 | 295 | 0 |
| 215 | } else { | 296 | } else { |
| 216 | i + 1 | 297 | i + 1 |
| 217 | } | 298 | } |
| 218 | } | 299 | } |
| 219 | 300 | ||
| 301 | fn clear(&mut self) { | ||
| 302 | if self.full { | ||
| 303 | self.receive_waker.wake(); | ||
| 304 | } | ||
| 305 | self.front = 0; | ||
| 306 | self.back = 0; | ||
| 307 | self.full = false; | ||
| 308 | } | ||
| 309 | |||
| 310 | fn len(&self) -> usize { | ||
| 311 | if !self.full { | ||
| 312 | if self.back >= self.front { | ||
| 313 | self.back - self.front | ||
| 314 | } else { | ||
| 315 | self.capacity + self.back - self.front | ||
| 316 | } | ||
| 317 | } else { | ||
| 318 | self.capacity | ||
| 319 | } | ||
| 320 | } | ||
| 321 | |||
| 220 | fn is_full(&self) -> bool { | 322 | fn is_full(&self) -> bool { |
| 221 | self.full | 323 | self.full |
| 222 | } | 324 | } |
