aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorjrmoulton <[email protected]>2025-06-10 15:47:54 -0600
committerjrmoulton <[email protected]>2025-06-10 15:48:36 -0600
commitcfad9798ff99d4de0571a512d156b5fe1ef1d427 (patch)
treefc3bf670f82d139de19466cddad1e909db7f3d2e /embassy-sync
parentfc342915e6155dec7bafa3e135da7f37a9a07f5c (diff)
parent6186d111a5c150946ee5b7e9e68d987a38c1a463 (diff)
merge new embassy changes
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/CHANGELOG.md46
-rw-r--r--embassy-sync/Cargo.toml9
-rw-r--r--embassy-sync/README.md5
-rw-r--r--embassy-sync/src/blocking_mutex/mod.rs18
-rw-r--r--embassy-sync/src/channel.rs293
-rw-r--r--embassy-sync/src/lib.rs2
-rw-r--r--embassy-sync/src/mutex.rs8
-rw-r--r--embassy-sync/src/once_lock.rs5
-rw-r--r--embassy-sync/src/pipe.rs273
-rw-r--r--embassy-sync/src/priority_channel.rs182
-rw-r--r--embassy-sync/src/pubsub/mod.rs89
-rw-r--r--embassy-sync/src/pubsub/publisher.rs67
-rw-r--r--embassy-sync/src/rwlock.rs387
-rw-r--r--embassy-sync/src/signal.rs5
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs45
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker_turbo.rs3
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs2
-rw-r--r--embassy-sync/src/waitqueue/waker_registration.rs4
-rw-r--r--embassy-sync/src/watch.rs1121
-rw-r--r--embassy-sync/src/zerocopy_channel.rs136
20 files changed, 2619 insertions, 81 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
index 8f2d26fe0..89684de0c 100644
--- a/embassy-sync/CHANGELOG.md
+++ b/embassy-sync/CHANGELOG.md
@@ -7,7 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7 7
8## Unreleased 8## Unreleased
9 9
10- Add LazyLock sync primitive. 10## 0.7.0 - 2025-05-28
11
12- Add `remove_if` to `priority_channel::{Receiver, PriorityChannel}`.
13- impl `Stream` for `channel::{Receiver, Channel}`.
14- Fix channels to wake senders on `clear()`.
15 For `Channel`, `PriorityChannel`, `PubSub`, `zerocopy_channel::Channel`.
16- Allow `zerocopy_channel::Channel` to auto-implement `Sync`/`Send`.
17- Add `must_use` to `MutexGuard`.
18- Add a `RwLock`.
19- Add `lock_mut` to `blocking_mutex::Mutex`.
20- Don't select a critical-section implementation when `std` feature is enabled.
21- Improve waker documentation.
22- Improve `Signal` and `Watch` documentation.
23- Update to defmt 1.0. This remains compatible with latest defmt 0.3.
24- Add `peek` method on `channel` and `priority_channel`.
25- Add dynamic sender and receiver that are Send + Sync for `channel`.
26
27## 0.6.2 - 2025-01-15
28
29- Add dynamic dispatch variant of `Pipe`.
30
31## 0.6.1 - 2024-11-22
32
33- Add `LazyLock` sync primitive.
34- Add `Watch` sync primitive.
35- Add `clear`, `len`, `is_empty` and `is_full` functions to `zerocopy_channel`.
36- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `channel::{Sender, Receiver}`.
37- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `priority_channel::{Sender, Receiver}`.
38- Add `GenericAtomicWaker` utility.
11 39
12## 0.6.0 - 2024-05-29 40## 0.6.0 - 2024-05-29
13 41
@@ -16,20 +44,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
16- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`. 44- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`.
17- Made `PubSubBehavior` sealed 45- Made `PubSubBehavior` sealed
18 - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)` 46 - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)`
19- Add OnceLock sync primitive. 47- Add `OnceLock` sync primitive.
20- Add constructor for DynamicChannel 48- Add constructor for `DynamicChannel`
21- Add ready_to_receive functions to Channel and Receiver. 49- Add ready_to_receive functions to `Channel` and `Receiver`.
22 50
23## 0.5.0 - 2023-12-04 51## 0.5.0 - 2023-12-04
24 52
25- Add a PriorityChannel. 53- Add a `PriorityChannel`.
26- Remove nightly and unstable-traits features in preparation for 1.75. 54- Remove `nightly` and `unstable-traits` features in preparation for 1.75.
27- Upgrade heapless to 0.8. 55- Upgrade `heapless` to 0.8.
28- Upgrade static-cell to 2.0. 56- Upgrade `static-cell` to 2.0.
29 57
30## 0.4.0 - 2023-10-31 58## 0.4.0 - 2023-10-31
31 59
32- Re-add impl_trait_projections 60- Re-add `impl_trait_projections`
33- switch to `embedded-io 0.6` 61- switch to `embedded-io 0.6`
34 62
35## 0.3.0 - 2023-09-14 63## 0.3.0 - 2023-09-14
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 7b7d2bf8e..99962f9f6 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -1,6 +1,6 @@
1[package] 1[package]
2name = "embassy-sync" 2name = "embassy-sync"
3version = "0.6.0" 3version = "0.7.0"
4edition = "2021" 4edition = "2021"
5description = "no-std, no-alloc synchronization primitives with async support" 5description = "no-std, no-alloc synchronization primitives with async support"
6repository = "https://github.com/embassy-rs/embassy" 6repository = "https://github.com/embassy-rs/embassy"
@@ -20,13 +20,14 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/
20target = "thumbv7em-none-eabi" 20target = "thumbv7em-none-eabi"
21 21
22[features] 22[features]
23std = ["critical-section/std"] 23std = []
24turbowakers = [] 24turbowakers = []
25 25
26[dependencies] 26[dependencies]
27defmt = { version = "0.3", optional = true } 27defmt = { version = "1.0.1", optional = true }
28log = { version = "0.4.14", optional = true } 28log = { version = "0.4.14", optional = true }
29 29
30futures-sink = { version = "0.3", default-features = false, features = [] }
30futures-util = { version = "0.3.17", default-features = false } 31futures-util = { version = "0.3.17", default-features = false }
31critical-section = "1.1" 32critical-section = "1.1"
32heapless = "0.8" 33heapless = "0.8"
@@ -37,7 +38,7 @@ embedded-io-async = { version = "0.6.1" }
37futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } 38futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
38futures-test = "0.3.17" 39futures-test = "0.3.17"
39futures-timer = "3.0.2" 40futures-timer = "3.0.2"
40futures-util = { version = "0.3.17", features = [ "channel" ] } 41futures-util = { version = "0.3.17", features = [ "channel", "sink" ] }
41 42
42# Enable critical-section implementation for std, for tests 43# Enable critical-section implementation for std, for tests
43critical-section = { version = "1.1", features = ["std"] } 44critical-section = { version = "1.1", features = ["std"] }
diff --git a/embassy-sync/README.md b/embassy-sync/README.md
index dec1fbc32..91c59884f 100644
--- a/embassy-sync/README.md
+++ b/embassy-sync/README.md
@@ -5,13 +5,14 @@ An [Embassy](https://embassy.dev) project.
5Synchronization primitives and data structures with async support: 5Synchronization primitives and data structures with async support:
6 6
7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. 7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer.
8- [`PriorityChannel`](channel::priority::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel. 8- [`PriorityChannel`](priority_channel::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel.
9- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. 9- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers.
10- [`Signal`](signal::Signal) - Signalling latest value to a single consumer. 10- [`Signal`](signal::Signal) - Signalling latest value to a single consumer.
11- [`Watch`](watch::Watch) - Signalling latest value to multiple consumers.
11- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. 12- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks.
12- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. 13- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits.
13- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. 14- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`.
14- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. 15- [`AtomicWaker`](waitqueue::AtomicWaker) - Utility to register and wake a `Waker` from interrupt context.
15- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. 16- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s.
16- [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access 17- [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access
17 18
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs
index 8a4a4c642..a41bc3569 100644
--- a/embassy-sync/src/blocking_mutex/mod.rs
+++ b/embassy-sync/src/blocking_mutex/mod.rs
@@ -50,6 +50,23 @@ impl<R: RawMutex, T> Mutex<R, T> {
50 f(inner) 50 f(inner)
51 }) 51 })
52 } 52 }
53
54 /// Creates a critical section and grants temporary mutable access to the protected data.
55 ///
56 /// # Safety
57 ///
58 /// This method is marked unsafe because calling this method re-entrantly, i.e. within
59 /// another `lock_mut` or `lock` closure, violates Rust's aliasing rules. Calling this
60 /// method at the same time from different tasks is safe. For a safe alternative with
61 /// mutable access that never causes UB, use a `RefCell` in a `Mutex`.
62 pub unsafe fn lock_mut<U>(&self, f: impl FnOnce(&mut T) -> U) -> U {
63 self.raw.lock(|| {
64 let ptr = self.data.get() as *mut T;
65 // Safety: we have exclusive access to the data, as long as this mutex is not locked re-entrantly
66 let inner = unsafe { &mut *ptr };
67 f(inner)
68 })
69 }
53} 70}
54 71
55impl<R, T> Mutex<R, T> { 72impl<R, T> Mutex<R, T> {
@@ -104,6 +121,7 @@ impl<T> Mutex<raw::CriticalSectionRawMutex, T> {
104 121
105impl<T> Mutex<raw::NoopRawMutex, T> { 122impl<T> Mutex<raw::NoopRawMutex, T> {
106 /// Borrows the data 123 /// Borrows the data
124 #[allow(clippy::should_implement_trait)]
107 pub fn borrow(&self) -> &T { 125 pub fn borrow(&self) -> &T {
108 let ptr = self.data.get() as *const T; 126 let ptr = self.data.get() as *const T;
109 unsafe { &*ptr } 127 unsafe { &*ptr }
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 55ac5fb66..856551417 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -72,6 +72,48 @@ where
72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx) 73 self.channel.poll_ready_to_send(cx)
74 } 74 }
75
76 /// Returns the maximum number of elements the channel can hold.
77 ///
78 /// See [`Channel::capacity()`]
79 pub const fn capacity(&self) -> usize {
80 self.channel.capacity()
81 }
82
83 /// Returns the free capacity of the channel.
84 ///
85 /// See [`Channel::free_capacity()`]
86 pub fn free_capacity(&self) -> usize {
87 self.channel.free_capacity()
88 }
89
90 /// Clears all elements in the channel.
91 ///
92 /// See [`Channel::clear()`]
93 pub fn clear(&self) {
94 self.channel.clear();
95 }
96
97 /// Returns the number of elements currently in the channel.
98 ///
99 /// See [`Channel::len()`]
100 pub fn len(&self) -> usize {
101 self.channel.len()
102 }
103
104 /// Returns whether the channel is empty.
105 ///
106 /// See [`Channel::is_empty()`]
107 pub fn is_empty(&self) -> bool {
108 self.channel.is_empty()
109 }
110
111 /// Returns whether the channel is full.
112 ///
113 /// See [`Channel::is_full()`]
114 pub fn is_full(&self) -> bool {
115 self.channel.is_full()
116 }
75} 117}
76 118
77/// Send-only access to a [`Channel`] without knowing channel size. 119/// Send-only access to a [`Channel`] without knowing channel size.
@@ -122,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> {
122 } 164 }
123} 165}
124 166
167/// Send-only access to a [`Channel`] without knowing channel size.
168/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
169pub struct SendDynamicSender<'ch, T> {
170 pub(crate) channel: &'ch dyn DynamicChannel<T>,
171}
172
173impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
174 fn clone(&self) -> Self {
175 *self
176 }
177}
178
179impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
180unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
181unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
182
183impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
184where
185 M: RawMutex + Sync + Send,
186{
187 fn from(s: Sender<'ch, M, T, N>) -> Self {
188 Self { channel: s.channel }
189 }
190}
191
192impl<'ch, T> SendDynamicSender<'ch, T> {
193 /// Sends a value.
194 ///
195 /// See [`Channel::send()`]
196 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
197 DynamicSendFuture {
198 channel: self.channel,
199 message: Some(message),
200 }
201 }
202
203 /// Attempt to immediately send a message.
204 ///
205 /// See [`Channel::send()`]
206 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
207 self.channel.try_send_with_context(message, None)
208 }
209
210 /// Allows a poll_fn to poll until the channel is ready to send
211 ///
212 /// See [`Channel::poll_ready_to_send()`]
213 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
214 self.channel.poll_ready_to_send(cx)
215 }
216}
217
125/// Receive-only access to a [`Channel`]. 218/// Receive-only access to a [`Channel`].
126pub struct Receiver<'ch, M, T, const N: usize> 219pub struct Receiver<'ch, M, T, const N: usize>
127where 220where
@@ -166,6 +259,16 @@ where
166 self.channel.try_receive() 259 self.channel.try_receive()
167 } 260 }
168 261
262 /// Peek at the next value without removing it from the queue.
263 ///
264 /// See [`Channel::try_peek()`]
265 pub fn try_peek(&self) -> Result<T, TryReceiveError>
266 where
267 T: Clone,
268 {
269 self.channel.try_peek()
270 }
271
169 /// Allows a poll_fn to poll until the channel is ready to receive 272 /// Allows a poll_fn to poll until the channel is ready to receive
170 /// 273 ///
171 /// See [`Channel::poll_ready_to_receive()`] 274 /// See [`Channel::poll_ready_to_receive()`]
@@ -179,6 +282,48 @@ where
179 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 282 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
180 self.channel.poll_receive(cx) 283 self.channel.poll_receive(cx)
181 } 284 }
285
286 /// Returns the maximum number of elements the channel can hold.
287 ///
288 /// See [`Channel::capacity()`]
289 pub const fn capacity(&self) -> usize {
290 self.channel.capacity()
291 }
292
293 /// Returns the free capacity of the channel.
294 ///
295 /// See [`Channel::free_capacity()`]
296 pub fn free_capacity(&self) -> usize {
297 self.channel.free_capacity()
298 }
299
300 /// Clears all elements in the channel.
301 ///
302 /// See [`Channel::clear()`]
303 pub fn clear(&self) {
304 self.channel.clear();
305 }
306
307 /// Returns the number of elements currently in the channel.
308 ///
309 /// See [`Channel::len()`]
310 pub fn len(&self) -> usize {
311 self.channel.len()
312 }
313
314 /// Returns whether the channel is empty.
315 ///
316 /// See [`Channel::is_empty()`]
317 pub fn is_empty(&self) -> bool {
318 self.channel.is_empty()
319 }
320
321 /// Returns whether the channel is full.
322 ///
323 /// See [`Channel::is_full()`]
324 pub fn is_full(&self) -> bool {
325 self.channel.is_full()
326 }
182} 327}
183 328
184/// Receive-only access to a [`Channel`] without knowing channel size. 329/// Receive-only access to a [`Channel`] without knowing channel size.
@@ -209,6 +354,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
209 self.channel.try_receive_with_context(None) 354 self.channel.try_receive_with_context(None)
210 } 355 }
211 356
357 /// Peek at the next value without removing it from the queue.
358 ///
359 /// See [`Channel::try_peek()`]
360 pub fn try_peek(&self) -> Result<T, TryReceiveError>
361 where
362 T: Clone,
363 {
364 self.channel.try_peek_with_context(None)
365 }
366
212 /// Allows a poll_fn to poll until the channel is ready to receive 367 /// Allows a poll_fn to poll until the channel is ready to receive
213 /// 368 ///
214 /// See [`Channel::poll_ready_to_receive()`] 369 /// See [`Channel::poll_ready_to_receive()`]
@@ -233,6 +388,72 @@ where
233 } 388 }
234} 389}
235 390
391/// Receive-only access to a [`Channel`] without knowing channel size.
392/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
393pub struct SendDynamicReceiver<'ch, T> {
394 pub(crate) channel: &'ch dyn DynamicChannel<T>,
395}
396
397impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
398 fn clone(&self) -> Self {
399 *self
400 }
401}
402
403impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
404unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
405unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
406
407impl<'ch, T> SendDynamicReceiver<'ch, T> {
408 /// Receive the next value.
409 ///
410 /// See [`Channel::receive()`].
411 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
412 DynamicReceiveFuture { channel: self.channel }
413 }
414
415 /// Attempt to immediately receive the next value.
416 ///
417 /// See [`Channel::try_receive()`]
418 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
419 self.channel.try_receive_with_context(None)
420 }
421
422 /// Allows a poll_fn to poll until the channel is ready to receive
423 ///
424 /// See [`Channel::poll_ready_to_receive()`]
425 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
426 self.channel.poll_ready_to_receive(cx)
427 }
428
429 /// Poll the channel for the next item
430 ///
431 /// See [`Channel::poll_receive()`]
432 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
433 self.channel.poll_receive(cx)
434 }
435}
436
437impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
438where
439 M: RawMutex + Sync + Send,
440{
441 fn from(s: Receiver<'ch, M, T, N>) -> Self {
442 Self { channel: s.channel }
443 }
444}
445
446impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
447where
448 M: RawMutex,
449{
450 type Item = T;
451
452 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453 self.channel.poll_receive(cx).map(Some)
454 }
455}
456
236/// Future returned by [`Channel::receive`] and [`Receiver::receive`]. 457/// Future returned by [`Channel::receive`] and [`Receiver::receive`].
237#[must_use = "futures do nothing unless you `.await` or poll them"] 458#[must_use = "futures do nothing unless you `.await` or poll them"]
238pub struct ReceiveFuture<'ch, M, T, const N: usize> 459pub struct ReceiveFuture<'ch, M, T, const N: usize>
@@ -368,6 +589,10 @@ pub(crate) trait DynamicChannel<T> {
368 589
369 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 590 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
370 591
592 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
593 where
594 T: Clone;
595
371 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 596 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
372 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 597 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
373 598
@@ -410,6 +635,31 @@ impl<T, const N: usize> ChannelState<T, N> {
410 self.try_receive_with_context(None) 635 self.try_receive_with_context(None)
411 } 636 }
412 637
638 fn try_peek(&mut self) -> Result<T, TryReceiveError>
639 where
640 T: Clone,
641 {
642 self.try_peek_with_context(None)
643 }
644
645 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
646 where
647 T: Clone,
648 {
649 if self.queue.is_full() {
650 self.senders_waker.wake();
651 }
652
653 if let Some(message) = self.queue.front() {
654 Ok(message.clone())
655 } else {
656 if let Some(cx) = cx {
657 self.receiver_waker.register(cx.waker());
658 }
659 Err(TryReceiveError::Empty)
660 }
661 }
662
413 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 663 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
414 if self.queue.is_full() { 664 if self.queue.is_full() {
415 self.senders_waker.wake(); 665 self.senders_waker.wake();
@@ -478,6 +728,9 @@ impl<T, const N: usize> ChannelState<T, N> {
478 } 728 }
479 729
480 fn clear(&mut self) { 730 fn clear(&mut self) {
731 if self.queue.is_full() {
732 self.senders_waker.wake();
733 }
481 self.queue.clear(); 734 self.queue.clear();
482 } 735 }
483 736
@@ -536,6 +789,13 @@ where
536 self.lock(|c| c.try_receive_with_context(cx)) 789 self.lock(|c| c.try_receive_with_context(cx))
537 } 790 }
538 791
792 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
793 where
794 T: Clone,
795 {
796 self.lock(|c| c.try_peek_with_context(cx))
797 }
798
539 /// Poll the channel for the next message 799 /// Poll the channel for the next message
540 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 800 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
541 self.lock(|c| c.poll_receive(cx)) 801 self.lock(|c| c.poll_receive(cx))
@@ -624,6 +884,17 @@ where
624 self.lock(|c| c.try_receive()) 884 self.lock(|c| c.try_receive())
625 } 885 }
626 886
887 /// Peek at the next value without removing it from the queue.
888 ///
889 /// This method will either receive a copy of the message from the channel immediately or return
890 /// an error if the channel is empty.
891 pub fn try_peek(&self) -> Result<T, TryReceiveError>
892 where
893 T: Clone,
894 {
895 self.lock(|c| c.try_peek())
896 }
897
627 /// Returns the maximum number of elements the channel can hold. 898 /// Returns the maximum number of elements the channel can hold.
628 pub const fn capacity(&self) -> usize { 899 pub const fn capacity(&self) -> usize {
629 N 900 N
@@ -671,6 +942,13 @@ where
671 Channel::try_receive_with_context(self, cx) 942 Channel::try_receive_with_context(self, cx)
672 } 943 }
673 944
945 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
946 where
947 T: Clone,
948 {
949 Channel::try_peek_with_context(self, cx)
950 }
951
674 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 952 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
675 Channel::poll_ready_to_send(self, cx) 953 Channel::poll_ready_to_send(self, cx)
676 } 954 }
@@ -684,6 +962,17 @@ where
684 } 962 }
685} 963}
686 964
965impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N>
966where
967 M: RawMutex,
968{
969 type Item = T;
970
971 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972 self.poll_receive(cx).map(Some)
973 }
974}
975
687#[cfg(test)] 976#[cfg(test)]
688mod tests { 977mod tests {
689 use core::time::Duration; 978 use core::time::Duration;
@@ -742,6 +1031,8 @@ mod tests {
742 fn simple_send_and_receive() { 1031 fn simple_send_and_receive() {
743 let c = Channel::<NoopRawMutex, u32, 3>::new(); 1032 let c = Channel::<NoopRawMutex, u32, 3>::new();
744 assert!(c.try_send(1).is_ok()); 1033 assert!(c.try_send(1).is_ok());
1034 assert_eq!(c.try_peek().unwrap(), 1);
1035 assert_eq!(c.try_peek().unwrap(), 1);
745 assert_eq!(c.try_receive().unwrap(), 1); 1036 assert_eq!(c.try_receive().unwrap(), 1);
746 } 1037 }
747 1038
@@ -772,6 +1063,8 @@ mod tests {
772 let r = c.dyn_receiver(); 1063 let r = c.dyn_receiver();
773 1064
774 assert!(s.try_send(1).is_ok()); 1065 assert!(s.try_send(1).is_ok());
1066 assert_eq!(r.try_peek().unwrap(), 1);
1067 assert_eq!(r.try_peek().unwrap(), 1);
775 assert_eq!(r.try_receive().unwrap(), 1); 1068 assert_eq!(r.try_receive().unwrap(), 1);
776 } 1069 }
777 1070
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 014bf1d06..5d91b4d9c 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -18,7 +18,9 @@ pub mod once_lock;
18pub mod pipe; 18pub mod pipe;
19pub mod priority_channel; 19pub mod priority_channel;
20pub mod pubsub; 20pub mod pubsub;
21pub mod rwlock;
21pub mod semaphore; 22pub mod semaphore;
22pub mod signal; 23pub mod signal;
23pub mod waitqueue; 24pub mod waitqueue;
25pub mod watch;
24pub mod zerocopy_channel; 26pub mod zerocopy_channel;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 8c3a3af9f..7528a9f68 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -2,7 +2,7 @@
2//! 2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. 3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell}; 4use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn; 5use core::future::{poll_fn, Future};
6use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
7use core::task::Poll; 7use core::task::Poll;
8use core::{fmt, mem}; 8use core::{fmt, mem};
@@ -73,7 +73,7 @@ where
73 /// Lock the mutex. 73 /// Lock the mutex.
74 /// 74 ///
75 /// This will wait for the mutex to be unlocked if it's already locked. 75 /// This will wait for the mutex to be unlocked if it's already locked.
76 pub async fn lock(&self) -> MutexGuard<'_, M, T> { 76 pub fn lock(&self) -> impl Future<Output = MutexGuard<'_, M, T>> {
77 poll_fn(|cx| { 77 poll_fn(|cx| {
78 let ready = self.state.lock(|s| { 78 let ready = self.state.lock(|s| {
79 let mut s = s.borrow_mut(); 79 let mut s = s.borrow_mut();
@@ -92,7 +92,6 @@ where
92 Poll::Pending 92 Poll::Pending
93 } 93 }
94 }) 94 })
95 .await
96 } 95 }
97 96
98 /// Attempt to immediately lock the mutex. 97 /// Attempt to immediately lock the mutex.
@@ -138,7 +137,7 @@ impl<M: RawMutex, T> From<T> for Mutex<M, T> {
138impl<M, T> Default for Mutex<M, T> 137impl<M, T> Default for Mutex<M, T>
139where 138where
140 M: RawMutex, 139 M: RawMutex,
141 T: ?Sized + Default, 140 T: Default,
142{ 141{
143 fn default() -> Self { 142 fn default() -> Self {
144 Self::new(Default::default()) 143 Self::new(Default::default())
@@ -172,6 +171,7 @@ where
172/// 171///
173/// Dropping it unlocks the mutex. 172/// Dropping it unlocks the mutex.
174#[clippy::has_significant_drop] 173#[clippy::has_significant_drop]
174#[must_use = "if unused the Mutex will immediately unlock"]
175pub struct MutexGuard<'a, M, T> 175pub struct MutexGuard<'a, M, T>
176where 176where
177 M: RawMutex, 177 M: RawMutex,
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
index 55608ba32..cd05b986d 100644
--- a/embassy-sync/src/once_lock.rs
+++ b/embassy-sync/src/once_lock.rs
@@ -1,7 +1,7 @@
1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. 1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2 2
3use core::cell::Cell; 3use core::cell::Cell;
4use core::future::poll_fn; 4use core::future::{poll_fn, Future};
5use core::mem::MaybeUninit; 5use core::mem::MaybeUninit;
6use core::sync::atomic::{AtomicBool, Ordering}; 6use core::sync::atomic::{AtomicBool, Ordering};
7use core::task::Poll; 7use core::task::Poll;
@@ -55,7 +55,7 @@ impl<T> OnceLock<T> {
55 55
56 /// Get a reference to the underlying value, waiting for it to be set. 56 /// Get a reference to the underlying value, waiting for it to be set.
57 /// If the value is already set, this will return immediately. 57 /// If the value is already set, this will return immediately.
58 pub async fn get(&self) -> &T { 58 pub fn get(&self) -> impl Future<Output = &T> {
59 poll_fn(|cx| match self.try_get() { 59 poll_fn(|cx| match self.try_get() {
60 Some(data) => Poll::Ready(data), 60 Some(data) => Poll::Ready(data),
61 None => { 61 None => {
@@ -63,7 +63,6 @@ impl<T> OnceLock<T> {
63 Poll::Pending 63 Poll::Pending
64 } 64 }
65 }) 65 })
66 .await
67 } 66 }
68 67
69 /// Try to get a reference to the underlying value if it exists. 68 /// Try to get a reference to the underlying value if it exists.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index cd5b8ed75..2598652d2 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N>
532 } 532 }
533} 533}
534 534
535//
536// Type-erased variants
537//
538
539pub(crate) trait DynamicPipe {
540 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
541 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
542
543 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
544 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
545
546 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
547 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
548
549 fn consume(&self, amt: usize);
550 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
551}
552
553impl<M, const N: usize> DynamicPipe for Pipe<M, N>
554where
555 M: RawMutex,
556{
557 fn consume(&self, amt: usize) {
558 Pipe::consume(self, amt)
559 }
560
561 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
562 Pipe::try_fill_buf_with_context(self, cx)
563 }
564
565 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
566 Pipe::write(self, buf).into()
567 }
568
569 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
570 Pipe::read(self, buf).into()
571 }
572
573 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
574 Pipe::try_read(self, buf)
575 }
576
577 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
578 Pipe::try_write(self, buf)
579 }
580
581 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
582 Pipe::try_write_with_context(self, cx, buf)
583 }
584
585 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
586 Pipe::try_read_with_context(self, cx, buf)
587 }
588}
589
590/// Write-only access to a [`DynamicPipe`].
591pub struct DynamicWriter<'p> {
592 pipe: &'p dyn DynamicPipe,
593}
594
595impl<'p> Clone for DynamicWriter<'p> {
596 fn clone(&self) -> Self {
597 *self
598 }
599}
600
601impl<'p> Copy for DynamicWriter<'p> {}
602
603impl<'p> DynamicWriter<'p> {
604 /// Write some bytes to the pipe.
605 ///
606 /// See [`Pipe::write()`]
607 pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
608 self.pipe.write(buf)
609 }
610
611 /// Attempt to immediately write some bytes to the pipe.
612 ///
613 /// See [`Pipe::try_write()`]
614 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
615 self.pipe.try_write(buf)
616 }
617}
618
619impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
620where
621 M: RawMutex,
622{
623 fn from(value: Writer<'p, M, N>) -> Self {
624 Self { pipe: value.pipe }
625 }
626}
627
628/// Future returned by [`DynamicWriter::write`].
629#[must_use = "futures do nothing unless you `.await` or poll them"]
630pub struct DynamicWriteFuture<'p> {
631 pipe: &'p dyn DynamicPipe,
632 buf: &'p [u8],
633}
634
635impl<'p> Future for DynamicWriteFuture<'p> {
636 type Output = usize;
637
638 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
639 match self.pipe.try_write_with_context(Some(cx), self.buf) {
640 Ok(n) => Poll::Ready(n),
641 Err(TryWriteError::Full) => Poll::Pending,
642 }
643 }
644}
645
646impl<'p> Unpin for DynamicWriteFuture<'p> {}
647
648impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
649where
650 M: RawMutex,
651{
652 fn from(value: WriteFuture<'p, M, N>) -> Self {
653 Self {
654 pipe: value.pipe,
655 buf: value.buf,
656 }
657 }
658}
659
660/// Read-only access to a [`DynamicPipe`].
661pub struct DynamicReader<'p> {
662 pipe: &'p dyn DynamicPipe,
663}
664
665impl<'p> DynamicReader<'p> {
666 /// Read some bytes from the pipe.
667 ///
668 /// See [`Pipe::read()`]
669 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
670 self.pipe.read(buf)
671 }
672
673 /// Attempt to immediately read some bytes from the pipe.
674 ///
675 /// See [`Pipe::try_read()`]
676 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
677 self.pipe.try_read(buf)
678 }
679
680 /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
681 ///
682 /// If no bytes are currently available to read, this function waits until at least one byte is available.
683 ///
684 /// If the reader is at end-of-file (EOF), an empty slice is returned.
685 pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
686 DynamicFillBufFuture { pipe: Some(self.pipe) }
687 }
688
689 /// Try returning contents of the internal buffer.
690 ///
691 /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
692 ///
693 /// If the reader is at end-of-file (EOF), an empty slice is returned.
694 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
695 unsafe { self.pipe.try_fill_buf_with_context(None) }
696 }
697
698 /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
699 pub fn consume(&mut self, amt: usize) {
700 self.pipe.consume(amt)
701 }
702}
703
704impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
705where
706 M: RawMutex,
707{
708 fn from(value: Reader<'p, M, N>) -> Self {
709 Self { pipe: value.pipe }
710 }
711}
712
713/// Future returned by [`Pipe::read`] and [`Reader::read`].
714#[must_use = "futures do nothing unless you `.await` or poll them"]
715pub struct DynamicReadFuture<'p> {
716 pipe: &'p dyn DynamicPipe,
717 buf: &'p mut [u8],
718}
719
720impl<'p> Future for DynamicReadFuture<'p> {
721 type Output = usize;
722
723 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
724 match self.pipe.try_read_with_context(Some(cx), self.buf) {
725 Ok(n) => Poll::Ready(n),
726 Err(TryReadError::Empty) => Poll::Pending,
727 }
728 }
729}
730
731impl<'p> Unpin for DynamicReadFuture<'p> {}
732
733impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
734where
735 M: RawMutex,
736{
737 fn from(value: ReadFuture<'p, M, N>) -> Self {
738 Self {
739 pipe: value.pipe,
740 buf: value.buf,
741 }
742 }
743}
744
745/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`].
746#[must_use = "futures do nothing unless you `.await` or poll them"]
747pub struct DynamicFillBufFuture<'p> {
748 pipe: Option<&'p dyn DynamicPipe>,
749}
750
751impl<'p> Future for DynamicFillBufFuture<'p> {
752 type Output = &'p [u8];
753
754 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755 let pipe = self.pipe.take().unwrap();
756 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
757 Ok(buf) => Poll::Ready(buf),
758 Err(TryReadError::Empty) => {
759 self.pipe = Some(pipe);
760 Poll::Pending
761 }
762 }
763 }
764}
765
766impl<'p> Unpin for DynamicFillBufFuture<'p> {}
767
768impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
769where
770 M: RawMutex,
771{
772 fn from(value: FillBufFuture<'p, M, N>) -> Self {
773 Self {
774 pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
775 }
776 }
777}
778
535#[cfg(test)] 779#[cfg(test)]
536mod tests { 780mod tests {
537 use futures_executor::ThreadPool; 781 use futures_executor::ThreadPool;
@@ -619,6 +863,35 @@ mod tests {
619 let _ = w.clone(); 863 let _ = w.clone();
620 } 864 }
621 865
866 #[test]
867 fn dynamic_dispatch_pipe() {
868 let mut c = Pipe::<NoopRawMutex, 3>::new();
869 let (r, w) = c.split();
870 let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
871
872 assert!(w.try_write(&[42, 43]).is_ok());
873 let buf = r.try_fill_buf().unwrap();
874 assert_eq!(buf, &[42, 43]);
875 let buf = r.try_fill_buf().unwrap();
876 assert_eq!(buf, &[42, 43]);
877 r.consume(1);
878 let buf = r.try_fill_buf().unwrap();
879 assert_eq!(buf, &[43]);
880 r.consume(1);
881 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
882 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
883 assert_eq!(w.try_write(&[45, 46]), Ok(2));
884 let buf = r.try_fill_buf().unwrap();
885 assert_eq!(buf, &[44]); // only one byte due to wraparound.
886 r.consume(1);
887 let buf = r.try_fill_buf().unwrap();
888 assert_eq!(buf, &[45, 46]);
889 assert!(w.try_write(&[47]).is_ok());
890 let buf = r.try_fill_buf().unwrap();
891 assert_eq!(buf, &[45, 46, 47]);
892 r.consume(3);
893 }
894
622 #[futures_test::test] 895 #[futures_test::test]
623 async fn receiver_receives_given_try_write_async() { 896 async fn receiver_receives_given_try_write_async() {
624 let executor = ThreadPool::new().unwrap(); 897 let executor = ThreadPool::new().unwrap();
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 24c6c5a7f..623c52993 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -71,6 +71,48 @@ where
71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx) 72 self.channel.poll_ready_to_send(cx)
73 } 73 }
74
75 /// Returns the maximum number of elements the channel can hold.
76 ///
77 /// See [`PriorityChannel::capacity()`]
78 pub const fn capacity(&self) -> usize {
79 self.channel.capacity()
80 }
81
82 /// Returns the free capacity of the channel.
83 ///
84 /// See [`PriorityChannel::free_capacity()`]
85 pub fn free_capacity(&self) -> usize {
86 self.channel.free_capacity()
87 }
88
89 /// Clears all elements in the channel.
90 ///
91 /// See [`PriorityChannel::clear()`]
92 pub fn clear(&self) {
93 self.channel.clear();
94 }
95
96 /// Returns the number of elements currently in the channel.
97 ///
98 /// See [`PriorityChannel::len()`]
99 pub fn len(&self) -> usize {
100 self.channel.len()
101 }
102
103 /// Returns whether the channel is empty.
104 ///
105 /// See [`PriorityChannel::is_empty()`]
106 pub fn is_empty(&self) -> bool {
107 self.channel.is_empty()
108 }
109
110 /// Returns whether the channel is full.
111 ///
112 /// See [`PriorityChannel::is_full()`]
113 pub fn is_full(&self) -> bool {
114 self.channel.is_full()
115 }
74} 116}
75 117
76impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> 118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
@@ -133,6 +175,16 @@ where
133 self.channel.try_receive() 175 self.channel.try_receive()
134 } 176 }
135 177
178 /// Peek at the next value without removing it from the queue.
179 ///
180 /// See [`PriorityChannel::try_peek()`]
181 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
136 /// Allows a poll_fn to poll until the channel is ready to receive 188 /// Allows a poll_fn to poll until the channel is ready to receive
137 /// 189 ///
138 /// See [`PriorityChannel::poll_ready_to_receive()`] 190 /// See [`PriorityChannel::poll_ready_to_receive()`]
@@ -146,6 +198,59 @@ where
146 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 198 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
147 self.channel.poll_receive(cx) 199 self.channel.poll_receive(cx)
148 } 200 }
201
202 /// Removes the elements from the channel that satisfy the predicate.
203 ///
204 /// See [`PriorityChannel::remove_if()`]
205 pub fn remove_if<F>(&self, predicate: F)
206 where
207 F: Fn(&T) -> bool,
208 T: Clone,
209 {
210 self.channel.remove_if(predicate)
211 }
212
213 /// Returns the maximum number of elements the channel can hold.
214 ///
215 /// See [`PriorityChannel::capacity()`]
216 pub const fn capacity(&self) -> usize {
217 self.channel.capacity()
218 }
219
220 /// Returns the free capacity of the channel.
221 ///
222 /// See [`PriorityChannel::free_capacity()`]
223 pub fn free_capacity(&self) -> usize {
224 self.channel.free_capacity()
225 }
226
227 /// Clears all elements in the channel.
228 ///
229 /// See [`PriorityChannel::clear()`]
230 pub fn clear(&self) {
231 self.channel.clear();
232 }
233
234 /// Returns the number of elements currently in the channel.
235 ///
236 /// See [`PriorityChannel::len()`]
237 pub fn len(&self) -> usize {
238 self.channel.len()
239 }
240
241 /// Returns whether the channel is empty.
242 ///
243 /// See [`PriorityChannel::is_empty()`]
244 pub fn is_empty(&self) -> bool {
245 self.channel.is_empty()
246 }
247
248 /// Returns whether the channel is full.
249 ///
250 /// See [`PriorityChannel::is_full()`]
251 pub fn is_full(&self) -> bool {
252 self.channel.is_full()
253 }
149} 254}
150 255
151impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> 256impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
@@ -248,6 +353,31 @@ where
248 self.try_receive_with_context(None) 353 self.try_receive_with_context(None)
249 } 354 }
250 355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
251 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
252 if self.queue.len() == self.queue.capacity() { 382 if self.queue.len() == self.queue.capacity() {
253 self.senders_waker.wake(); 383 self.senders_waker.wake();
@@ -316,6 +446,9 @@ where
316 } 446 }
317 447
318 fn clear(&mut self) { 448 fn clear(&mut self) {
449 if self.queue.len() == self.queue.capacity() {
450 self.senders_waker.wake();
451 }
319 self.queue.clear(); 452 self.queue.clear();
320 } 453 }
321 454
@@ -380,6 +513,13 @@ where
380 self.lock(|c| c.try_receive_with_context(cx)) 513 self.lock(|c| c.try_receive_with_context(cx))
381 } 514 }
382 515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
383 /// Poll the channel for the next message 523 /// Poll the channel for the next message
384 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 524 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
385 self.lock(|c| c.poll_receive(cx)) 525 self.lock(|c| c.poll_receive(cx))
@@ -450,6 +590,37 @@ where
450 self.lock(|c| c.try_receive()) 590 self.lock(|c| c.try_receive())
451 } 591 }
452 592
593 /// Peek at the next value without removing it from the queue.
594 ///
595 /// This method will either receive a copy of the message from the channel immediately or return
596 /// an error if the channel is empty.
597 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
604 /// Removes elements from the channel based on the given predicate.
605 pub fn remove_if<F>(&self, predicate: F)
606 where
607 F: Fn(&T) -> bool,
608 T: Clone,
609 {
610 self.lock(|c| {
611 let mut new_heap = BinaryHeap::<T, K, N>::new();
612 for item in c.queue.iter() {
613 if !predicate(item) {
614 match new_heap.push(item.clone()) {
615 Ok(_) => (),
616 Err(_) => panic!("Error pushing item to heap"),
617 }
618 }
619 }
620 c.queue = new_heap;
621 });
622 }
623
453 /// Returns the maximum number of elements the channel can hold. 624 /// Returns the maximum number of elements the channel can hold.
454 pub const fn capacity(&self) -> usize { 625 pub const fn capacity(&self) -> usize {
455 N 626 N
@@ -499,6 +670,13 @@ where
499 PriorityChannel::try_receive_with_context(self, cx) 670 PriorityChannel::try_receive_with_context(self, cx)
500 } 671 }
501 672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
502 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
503 PriorityChannel::poll_ready_to_send(self, cx) 681 PriorityChannel::poll_ready_to_send(self, cx)
504 } 682 }
@@ -587,6 +765,8 @@ mod tests {
587 fn simple_send_and_receive() { 765 fn simple_send_and_receive() {
588 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); 766 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
589 assert!(c.try_send(1).is_ok()); 767 assert!(c.try_send(1).is_ok());
768 assert_eq!(c.try_peek().unwrap(), 1);
769 assert_eq!(c.try_peek().unwrap(), 1);
590 assert_eq!(c.try_receive().unwrap(), 1); 770 assert_eq!(c.try_receive().unwrap(), 1);
591 } 771 }
592 772
@@ -607,6 +787,8 @@ mod tests {
607 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 787 let r: DynamicReceiver<'_, u32> = c.receiver().into();
608 788
609 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
790 assert_eq!(r.try_peek().unwrap(), 1);
791 assert_eq!(r.try_peek().unwrap(), 1);
610 assert_eq!(r.try_receive().unwrap(), 1); 792 assert_eq!(r.try_receive().unwrap(), 1);
611 } 793 }
612 794
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index a97eb7d5b..606efff0a 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -27,8 +27,8 @@ pub use subscriber::{DynSubscriber, Subscriber};
27/// 27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. 28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message 29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive 30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged. 31/// an error to indicate that it has lagged.
32/// 32///
33/// ## Example 33/// ## Example
34/// 34///
@@ -194,6 +194,25 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
194 } 194 }
195} 195}
196 196
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{
200 fn publish_immediate(&self, message: T) {
201 self.inner.lock(|s| {
202 let mut s = s.borrow_mut();
203 s.publish_immediate(message)
204 })
205 }
206
207 fn capacity(&self) -> usize {
208 self.capacity()
209 }
210
211 fn is_full(&self) -> bool {
212 self.is_full()
213 }
214}
215
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> 216impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS> 217 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{ 218{
@@ -246,13 +265,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
246 }) 265 })
247 } 266 }
248 267
249 fn publish_immediate(&self, message: T) {
250 self.inner.lock(|s| {
251 let mut s = s.borrow_mut();
252 s.publish_immediate(message)
253 })
254 }
255
256 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 268 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
257 self.inner.lock(|s| { 269 self.inner.lock(|s| {
258 let mut s = s.borrow_mut(); 270 let mut s = s.borrow_mut();
@@ -267,10 +279,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
267 }) 279 })
268 } 280 }
269 281
270 fn capacity(&self) -> usize {
271 self.capacity()
272 }
273
274 fn free_capacity(&self) -> usize { 282 fn free_capacity(&self) -> usize {
275 self.free_capacity() 283 self.free_capacity()
276 } 284 }
@@ -286,10 +294,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
286 fn is_empty(&self) -> bool { 294 fn is_empty(&self) -> bool {
287 self.is_empty() 295 self.is_empty()
288 } 296 }
289
290 fn is_full(&self) -> bool {
291 self.is_full()
292 }
293} 297}
294 298
295/// Internal state for the PubSub channel 299/// Internal state for the PubSub channel
@@ -417,6 +421,9 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
417 } 421 }
418 422
419 fn clear(&mut self) { 423 fn clear(&mut self) {
424 if self.is_full() {
425 self.publisher_wakers.wake();
426 }
420 self.queue.clear(); 427 self.queue.clear();
421 } 428 }
422 429
@@ -445,8 +452,6 @@ pub enum Error {
445 MaximumPublishersReached, 452 MaximumPublishersReached,
446} 453}
447 454
448/// 'Middle level' behaviour of the pubsub channel.
449/// This trait is used so that Sub and Pub can be generic over the channel.
450trait SealedPubSubBehavior<T> { 455trait SealedPubSubBehavior<T> {
451 /// Try to get a message from the queue with the given message id. 456 /// Try to get a message from the queue with the given message id.
452 /// 457 ///
@@ -462,12 +467,6 @@ trait SealedPubSubBehavior<T> {
462 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 467 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
463 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; 468 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
464 469
465 /// Publish a message immediately
466 fn publish_immediate(&self, message: T);
467
468 /// Returns the maximum number of elements the channel can hold.
469 fn capacity(&self) -> usize;
470
471 /// Returns the free capacity of the channel. 470 /// Returns the free capacity of the channel.
472 /// 471 ///
473 /// This is equivalent to `capacity() - len()` 472 /// This is equivalent to `capacity() - len()`
@@ -482,9 +481,6 @@ trait SealedPubSubBehavior<T> {
482 /// Returns whether the channel is empty. 481 /// Returns whether the channel is empty.
483 fn is_empty(&self) -> bool; 482 fn is_empty(&self) -> bool;
484 483
485 /// Returns whether the channel is full.
486 fn is_full(&self) -> bool;
487
488 /// Let the channel know that a subscriber has dropped 484 /// Let the channel know that a subscriber has dropped
489 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 485 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
490 486
@@ -495,9 +491,16 @@ trait SealedPubSubBehavior<T> {
495/// 'Middle level' behaviour of the pubsub channel. 491/// 'Middle level' behaviour of the pubsub channel.
496/// This trait is used so that Sub and Pub can be generic over the channel. 492/// This trait is used so that Sub and Pub can be generic over the channel.
497#[allow(private_bounds)] 493#[allow(private_bounds)]
498pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {} 494pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
495 /// Publish a message immediately
496 fn publish_immediate(&self, message: T);
499 497
500impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {} 498 /// Returns the maximum number of elements the channel can hold.
499 fn capacity(&self) -> usize;
500
501 /// Returns whether the channel is full.
502 fn is_full(&self) -> bool;
503}
501 504
502/// The result of the subscriber wait procedure 505/// The result of the subscriber wait procedure
503#[derive(Debug, Clone, PartialEq, Eq)] 506#[derive(Debug, Clone, PartialEq, Eq)]
@@ -755,4 +758,30 @@ mod tests {
755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0); 758 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0); 759 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
757 } 760 }
761
762 #[futures_test::test]
763 async fn publisher_sink() {
764 use futures_util::{SinkExt, StreamExt};
765
766 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
767
768 let mut sub = channel.subscriber().unwrap();
769
770 let publ = channel.publisher().unwrap();
771 let mut sink = publ.sink();
772
773 sink.send(0).await.unwrap();
774 assert_eq!(0, sub.try_next_message_pure().unwrap());
775
776 sink.send(1).await.unwrap();
777 assert_eq!(1, sub.try_next_message_pure().unwrap());
778
779 sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
780 .await
781 .unwrap();
782 assert_eq!(0, sub.try_next_message_pure().unwrap());
783 assert_eq!(1, sub.try_next_message_pure().unwrap());
784 assert_eq!(2, sub.try_next_message_pure().unwrap());
785 assert_eq!(3, sub.try_next_message_pure().unwrap());
786 }
758} 787}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e66b3b1db..7a1ab66de 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
74 pub fn is_full(&self) -> bool { 74 pub fn is_full(&self) -> bool {
75 self.channel.is_full() 75 self.channel.is_full()
76 } 76 }
77
78 /// Create a [`futures::Sink`] adapter for this publisher.
79 #[inline]
80 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
81 PubSink { publ: self, fut: None }
82 }
77} 83}
78 84
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 85impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
221 } 227 }
222} 228}
223 229
230#[must_use = "Sinks do nothing unless polled"]
231/// [`futures_sink::Sink`] adapter for [`Pub`].
232pub struct PubSink<'a, 'p, PSB, T>
233where
234 T: Clone,
235 PSB: PubSubBehavior<T> + ?Sized,
236{
237 publ: &'p Pub<'a, PSB, T>,
238 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
239}
240
241impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
242where
243 PSB: PubSubBehavior<T> + ?Sized,
244 T: Clone,
245{
246 /// Try to make progress on the pending future if we have one.
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
248 let Some(mut fut) = self.fut.take() else {
249 return Poll::Ready(());
250 };
251
252 if Pin::new(&mut fut).poll(cx).is_pending() {
253 self.fut = Some(fut);
254 return Poll::Pending;
255 }
256
257 Poll::Ready(())
258 }
259}
260
261impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
262where
263 PSB: PubSubBehavior<T> + ?Sized,
264 T: Clone,
265{
266 type Error = core::convert::Infallible;
267
268 #[inline]
269 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270 self.poll(cx).map(Ok)
271 }
272
273 #[inline]
274 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
275 self.fut = Some(self.publ.publish(item));
276
277 Ok(())
278 }
279
280 #[inline]
281 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 self.poll(cx).map(Ok)
283 }
284
285 #[inline]
286 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289}
290
224/// Future for the publisher wait action 291/// Future for the publisher wait action
225#[must_use = "futures do nothing unless you `.await` or poll them"] 292#[must_use = "futures do nothing unless you `.await` or poll them"]
226pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 293pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs
new file mode 100644
index 000000000..deeadd167
--- /dev/null
+++ b/embassy-sync/src/rwlock.rs
@@ -0,0 +1,387 @@
1//! Async read-write lock.
2//!
3//! This module provides a read-write lock that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell};
5use core::fmt;
6use core::future::{poll_fn, Future};
7use core::ops::{Deref, DerefMut};
8use core::task::Poll;
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex;
12use crate::waitqueue::WakerRegistration;
13
14/// Error returned by [`RwLock::try_read`] and [`RwLock::try_write`] when the lock is already held.
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError;
18
19struct State {
20 readers: usize,
21 writer: bool,
22 waker: WakerRegistration,
23}
24
25/// Async read-write lock.
26///
27/// The read-write lock is generic over the raw mutex implementation `M` and the data `T` it protects.
28/// The raw read-write lock is used to guard access to the internal state. It
29/// is held for very short periods only, while locking and unlocking. It is *not* held
30/// for the entire time the async RwLock is locked.
31///
32/// Which implementation you select depends on the context in which you're using the read-write lock.
33///
34/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts.
35///
36/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor.
37///
38/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton.
39///
40
41pub struct RwLock<M, T>
42where
43 M: RawMutex,
44 T: ?Sized,
45{
46 state: BlockingMutex<M, RefCell<State>>,
47 inner: UnsafeCell<T>,
48}
49
50unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for RwLock<M, T> {}
51unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for RwLock<M, T> {}
52
53/// Async read-write lock.
54impl<M, T> RwLock<M, T>
55where
56 M: RawMutex,
57{
58 /// Create a new read-write lock with the given value.
59 pub const fn new(value: T) -> Self {
60 Self {
61 inner: UnsafeCell::new(value),
62 state: BlockingMutex::new(RefCell::new(State {
63 readers: 0,
64 writer: false,
65 waker: WakerRegistration::new(),
66 })),
67 }
68 }
69}
70
71impl<M, T> RwLock<M, T>
72where
73 M: RawMutex,
74 T: ?Sized,
75{
76 /// Lock the read-write lock for reading.
77 ///
78 /// This will wait for the lock to be available if it's already locked for writing.
79 pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, M, T>> {
80 poll_fn(|cx| {
81 let ready = self.state.lock(|s| {
82 let mut s = s.borrow_mut();
83 if s.writer {
84 s.waker.register(cx.waker());
85 false
86 } else {
87 s.readers += 1;
88 true
89 }
90 });
91
92 if ready {
93 Poll::Ready(RwLockReadGuard { rwlock: self })
94 } else {
95 Poll::Pending
96 }
97 })
98 }
99
100 /// Lock the read-write lock for writing.
101 ///
102 /// This will wait for the lock to be available if it's already locked for reading or writing.
103 pub fn write(&self) -> impl Future<Output = RwLockWriteGuard<'_, M, T>> {
104 poll_fn(|cx| {
105 let ready = self.state.lock(|s| {
106 let mut s = s.borrow_mut();
107 if s.writer || s.readers > 0 {
108 s.waker.register(cx.waker());
109 false
110 } else {
111 s.writer = true;
112 true
113 }
114 });
115
116 if ready {
117 Poll::Ready(RwLockWriteGuard { rwlock: self })
118 } else {
119 Poll::Pending
120 }
121 })
122 }
123
124 /// Attempt to immediately lock the rwlock.
125 ///
126 /// If the rwlock is already locked, this will return an error instead of waiting.
127 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, M, T>, TryLockError> {
128 self.state
129 .lock(|s| {
130 let mut s = s.borrow_mut();
131 if s.writer {
132 return Err(());
133 }
134 s.readers += 1;
135 Ok(())
136 })
137 .map_err(|_| TryLockError)?;
138
139 Ok(RwLockReadGuard { rwlock: self })
140 }
141
142 /// Attempt to immediately lock the rwlock.
143 ///
144 /// If the rwlock is already locked, this will return an error instead of waiting.
145 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, M, T>, TryLockError> {
146 self.state
147 .lock(|s| {
148 let mut s = s.borrow_mut();
149 if s.writer || s.readers > 0 {
150 return Err(());
151 }
152 s.writer = true;
153 Ok(())
154 })
155 .map_err(|_| TryLockError)?;
156
157 Ok(RwLockWriteGuard { rwlock: self })
158 }
159
160 /// Consumes this read-write lock, returning the underlying data.
161 pub fn into_inner(self) -> T
162 where
163 T: Sized,
164 {
165 self.inner.into_inner()
166 }
167
168 /// Returns a mutable reference to the underlying data.
169 ///
170 /// Since this call borrows the RwLock mutably, no actual locking needs to
171 /// take place -- the mutable borrow statically guarantees no locks exist.
172 pub fn get_mut(&mut self) -> &mut T {
173 self.inner.get_mut()
174 }
175}
176
177impl<M: RawMutex, T> From<T> for RwLock<M, T> {
178 fn from(from: T) -> Self {
179 Self::new(from)
180 }
181}
182
183impl<M, T> Default for RwLock<M, T>
184where
185 M: RawMutex,
186 T: Default,
187{
188 fn default() -> Self {
189 Self::new(Default::default())
190 }
191}
192
193impl<M, T> fmt::Debug for RwLock<M, T>
194where
195 M: RawMutex,
196 T: ?Sized + fmt::Debug,
197{
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 let mut d = f.debug_struct("RwLock");
200 match self.try_read() {
201 Ok(guard) => d.field("inner", &&*guard),
202 Err(TryLockError) => d.field("inner", &"Locked"),
203 }
204 .finish_non_exhaustive()
205 }
206}
207
208/// Async read lock guard.
209///
210/// Owning an instance of this type indicates having
211/// successfully locked the read-write lock for reading, and grants access to the contents.
212///
213/// Dropping it unlocks the read-write lock.
214#[clippy::has_significant_drop]
215#[must_use = "if unused the RwLock will immediately unlock"]
216pub struct RwLockReadGuard<'a, R, T>
217where
218 R: RawMutex,
219 T: ?Sized,
220{
221 rwlock: &'a RwLock<R, T>,
222}
223
224impl<'a, M, T> Drop for RwLockReadGuard<'a, M, T>
225where
226 M: RawMutex,
227 T: ?Sized,
228{
229 fn drop(&mut self) {
230 self.rwlock.state.lock(|s| {
231 let mut s = unwrap!(s.try_borrow_mut());
232 s.readers -= 1;
233 if s.readers == 0 {
234 s.waker.wake();
235 }
236 })
237 }
238}
239
240impl<'a, M, T> Deref for RwLockReadGuard<'a, M, T>
241where
242 M: RawMutex,
243 T: ?Sized,
244{
245 type Target = T;
246 fn deref(&self) -> &Self::Target {
247 // Safety: the RwLockReadGuard represents shared access to the contents
248 // of the read-write lock, so it's OK to get it.
249 unsafe { &*(self.rwlock.inner.get() as *const T) }
250 }
251}
252
253impl<'a, M, T> fmt::Debug for RwLockReadGuard<'a, M, T>
254where
255 M: RawMutex,
256 T: ?Sized + fmt::Debug,
257{
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 fmt::Debug::fmt(&**self, f)
260 }
261}
262
263impl<'a, M, T> fmt::Display for RwLockReadGuard<'a, M, T>
264where
265 M: RawMutex,
266 T: ?Sized + fmt::Display,
267{
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 fmt::Display::fmt(&**self, f)
270 }
271}
272
273/// Async write lock guard.
274///
275/// Owning an instance of this type indicates having
276/// successfully locked the read-write lock for writing, and grants access to the contents.
277///
278/// Dropping it unlocks the read-write lock.
279#[clippy::has_significant_drop]
280#[must_use = "if unused the RwLock will immediately unlock"]
281pub struct RwLockWriteGuard<'a, R, T>
282where
283 R: RawMutex,
284 T: ?Sized,
285{
286 rwlock: &'a RwLock<R, T>,
287}
288
289impl<'a, R, T> Drop for RwLockWriteGuard<'a, R, T>
290where
291 R: RawMutex,
292 T: ?Sized,
293{
294 fn drop(&mut self) {
295 self.rwlock.state.lock(|s| {
296 let mut s = unwrap!(s.try_borrow_mut());
297 s.writer = false;
298 s.waker.wake();
299 })
300 }
301}
302
303impl<'a, R, T> Deref for RwLockWriteGuard<'a, R, T>
304where
305 R: RawMutex,
306 T: ?Sized,
307{
308 type Target = T;
309 fn deref(&self) -> &Self::Target {
310 // Safety: the RwLockWriteGuard represents exclusive access to the contents
311 // of the read-write lock, so it's OK to get it.
312 unsafe { &*(self.rwlock.inner.get() as *mut T) }
313 }
314}
315
316impl<'a, R, T> DerefMut for RwLockWriteGuard<'a, R, T>
317where
318 R: RawMutex,
319 T: ?Sized,
320{
321 fn deref_mut(&mut self) -> &mut Self::Target {
322 // Safety: the RwLockWriteGuard represents exclusive access to the contents
323 // of the read-write lock, so it's OK to get it.
324 unsafe { &mut *(self.rwlock.inner.get()) }
325 }
326}
327
328impl<'a, R, T> fmt::Debug for RwLockWriteGuard<'a, R, T>
329where
330 R: RawMutex,
331 T: ?Sized + fmt::Debug,
332{
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 fmt::Debug::fmt(&**self, f)
335 }
336}
337
338impl<'a, R, T> fmt::Display for RwLockWriteGuard<'a, R, T>
339where
340 R: RawMutex,
341 T: ?Sized + fmt::Display,
342{
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 fmt::Display::fmt(&**self, f)
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use crate::blocking_mutex::raw::NoopRawMutex;
351 use crate::rwlock::RwLock;
352
353 #[futures_test::test]
354 async fn read_guard_releases_lock_when_dropped() {
355 let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]);
356
357 {
358 let guard = rwlock.read().await;
359 assert_eq!(*guard, [0, 1]);
360 }
361
362 {
363 let guard = rwlock.read().await;
364 assert_eq!(*guard, [0, 1]);
365 }
366
367 assert_eq!(*rwlock.read().await, [0, 1]);
368 }
369
370 #[futures_test::test]
371 async fn write_guard_releases_lock_when_dropped() {
372 let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]);
373
374 {
375 let mut guard = rwlock.write().await;
376 assert_eq!(*guard, [0, 1]);
377 guard[1] = 2;
378 }
379
380 {
381 let guard = rwlock.read().await;
382 assert_eq!(*guard, [0, 2]);
383 }
384
385 assert_eq!(*rwlock.read().await, [0, 2]);
386 }
387}
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index a0f4b5a74..e7095401e 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -6,7 +6,7 @@ use core::task::{Context, Poll, Waker};
6use crate::blocking_mutex::raw::RawMutex; 6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex; 7use crate::blocking_mutex::Mutex;
8 8
9/// Single-slot signaling primitive. 9/// Single-slot signaling primitive for a _single_ consumer.
10/// 10///
11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except 11/// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except
12/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead 12/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead
@@ -17,6 +17,7 @@ use crate::blocking_mutex::Mutex;
17/// updates. 17/// updates.
18/// 18///
19/// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. 19/// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead.
20/// For multiple consumers, use [`Watch`](crate::watch::Watch) instead.
20/// 21///
21/// Signals are generally declared as `static`s and then borrowed as required. 22/// Signals are generally declared as `static`s and then borrowed as required.
22/// 23///
@@ -106,7 +107,7 @@ where
106 }) 107 })
107 } 108 }
108 109
109 /// Future that completes when this Signal has been signaled. 110 /// Future that completes when this Signal has been signaled, taking the value out of the signal.
110 pub fn wait(&self) -> impl Future<Output = T> + '_ { 111 pub fn wait(&self) -> impl Future<Output = T> + '_ {
111 poll_fn(move |cx| self.poll_wait(cx)) 112 poll_fn(move |cx| self.poll_wait(cx))
112 } 113 }
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
index 63fe04a6e..5a9910e7f 100644
--- a/embassy-sync/src/waitqueue/atomic_waker.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -1,26 +1,28 @@
1use core::cell::Cell; 1use core::cell::Cell;
2use core::task::Waker; 2use core::task::Waker;
3 3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex; 4use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
5use crate::blocking_mutex::Mutex; 5use crate::blocking_mutex::Mutex;
6 6
7/// Utility struct to register and wake a waker. 7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker { 8/// If a waker is registered, registering another waker will replace the previous one without waking it.
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, 9/// Intended to wake a task from an interrupt. Therefore, it is generally not expected,
10/// that multiple tasks register try to register a waker simultaneously.
11pub struct GenericAtomicWaker<M: RawMutex> {
12 waker: Mutex<M, Cell<Option<Waker>>>,
10} 13}
11 14
12impl AtomicWaker { 15impl<M: RawMutex> GenericAtomicWaker<M> {
13 /// Create a new `AtomicWaker`. 16 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self { 17 pub const fn new(mutex: M) -> Self {
15 Self { 18 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), 19 waker: Mutex::const_new(mutex, Cell::new(None)),
17 } 20 }
18 } 21 }
19 22
20 /// Register a waker. Overwrites the previous waker, if any. 23 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) { 24 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| { 25 self.waker.lock(|cell| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) { 26 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2), 27 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()), 28 _ => Some(w.clone()),
@@ -30,8 +32,7 @@ impl AtomicWaker {
30 32
31 /// Wake the registered waker, if any. 33 /// Wake the registered waker, if any.
32 pub fn wake(&self) { 34 pub fn wake(&self) {
33 critical_section::with(|cs| { 35 self.waker.lock(|cell| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) { 36 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref(); 37 w.wake_by_ref();
37 cell.set(Some(w)); 38 cell.set(Some(w));
@@ -39,3 +40,27 @@ impl AtomicWaker {
39 }) 40 })
40 } 41 }
41} 42}
43
44/// Utility struct to register and wake a waker.
45pub struct AtomicWaker {
46 waker: GenericAtomicWaker<CriticalSectionRawMutex>,
47}
48
49impl AtomicWaker {
50 /// Create a new `AtomicWaker`.
51 pub const fn new() -> Self {
52 Self {
53 waker: GenericAtomicWaker::new(CriticalSectionRawMutex::new()),
54 }
55 }
56
57 /// Register a waker. Overwrites the previous waker, if any.
58 pub fn register(&self, w: &Waker) {
59 self.waker.register(w);
60 }
61
62 /// Wake the registered waker, if any.
63 pub fn wake(&self) {
64 self.waker.wake();
65 }
66}
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
index 5c6a96ec8..c06b83056 100644
--- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs
@@ -4,6 +4,9 @@ use core::sync::atomic::{AtomicPtr, Ordering};
4use core::task::Waker; 4use core::task::Waker;
5 5
6/// Utility struct to register and wake a waker. 6/// Utility struct to register and wake a waker.
7/// If a waker is registered, registering another waker will replace the previous one without waking it.
8/// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected,
9/// that multiple tasks register try to register a waker simultaneously.
7pub struct AtomicWaker { 10pub struct AtomicWaker {
8 waker: AtomicPtr<()>, 11 waker: AtomicPtr<()>,
9} 12}
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 0e520bf40..0384d6bed 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -3,6 +3,8 @@ use core::task::Waker;
3use heapless::Vec; 3use heapless::Vec;
4 4
5/// Utility struct to register and wake multiple wakers. 5/// Utility struct to register and wake multiple wakers.
6/// Queue of wakers with a maximum length of `N`.
7/// Intended for waking multiple tasks.
6pub struct MultiWakerRegistration<const N: usize> { 8pub struct MultiWakerRegistration<const N: usize> {
7 wakers: Vec<Waker, N>, 9 wakers: Vec<Waker, N>,
8} 10}
diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs
index 9b666e7c4..7f24f8fb6 100644
--- a/embassy-sync/src/waitqueue/waker_registration.rs
+++ b/embassy-sync/src/waitqueue/waker_registration.rs
@@ -2,6 +2,10 @@ use core::mem;
2use core::task::Waker; 2use core::task::Waker;
3 3
4/// Utility struct to register and wake a waker. 4/// Utility struct to register and wake a waker.
5/// If a waker is registered, registering another waker will replace the previous one.
6/// The previous waker will be woken in this case, giving it a chance to reregister itself.
7/// Although it is possible to wake multiple tasks this way,
8/// this will cause them to wake each other in a loop registering themselves.
5#[derive(Debug, Default)] 9#[derive(Debug, Default)]
6pub struct WakerRegistration { 10pub struct WakerRegistration {
7 waker: Option<Waker>, 11 waker: Option<Waker>,
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
new file mode 100644
index 000000000..08d6a833d
--- /dev/null
+++ b/embassy-sync/src/watch.rs
@@ -0,0 +1,1121 @@
1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2
3use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// The `Watch` is a single-slot signaling primitive that allows _multiple_ (`N`) receivers to concurrently await
14/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
15/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
16/// value when a new one is sent, without waiting for all receivers to read the previous value.
17///
18/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
19/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
20/// always provided with the latest value.
21///
22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
24/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
25/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
26/// ```
27///
28/// use futures_executor::block_on;
29/// use embassy_sync::watch::Watch;
30/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
31///
32/// let f = async {
33///
34/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
35///
36/// // Obtain receivers and sender
37/// let mut rcv0 = WATCH.receiver().unwrap();
38/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
39/// let mut snd = WATCH.sender();
40///
41/// // No more receivers, and no update
42/// assert!(WATCH.receiver().is_none());
43/// assert_eq!(rcv1.try_changed(), None);
44///
45/// snd.send(10);
46///
47/// // Receive the new value (async or try)
48/// assert_eq!(rcv0.changed().await, 10);
49/// assert_eq!(rcv1.try_changed(), Some(10));
50///
51/// // No update
52/// assert_eq!(rcv0.try_changed(), None);
53/// assert_eq!(rcv1.try_changed(), None);
54///
55/// snd.send(20);
56///
57/// // Using `get` marks the value as seen
58/// assert_eq!(rcv1.get().await, 20);
59/// assert_eq!(rcv1.try_changed(), None);
60///
61/// // But `get` also returns when unchanged
62/// assert_eq!(rcv1.get().await, 20);
63/// assert_eq!(rcv1.get().await, 20);
64///
65/// };
66/// block_on(f);
67/// ```
68pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
69 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
70}
71
72struct WatchState<T: Clone, const N: usize> {
73 data: Option<T>,
74 current_id: u64,
75 wakers: MultiWakerRegistration<N>,
76 receiver_count: usize,
77}
78
79trait SealedWatchBehavior<T> {
80 /// Poll the `Watch` for the current value, making it as seen.
81 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
82
83 /// Poll the `Watch` for the value if it matches the predicate function
84 /// `f`, making it as seen.
85 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
86
87 /// Poll the `Watch` for a changed value, marking it as seen, if an id is given.
88 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
89
90 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
91 fn try_changed(&self, id: &mut u64) -> Option<T>;
92
93 /// Poll the `Watch` for a changed value that matches the predicate function
94 /// `f`, marking it as seen.
95 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
96
97 /// Tries to retrieve the value of the `Watch` if it has changed and matches the
98 /// predicate function `f`, marking it as seen.
99 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
100
101 /// Used when a receiver is dropped to decrement the receiver count.
102 ///
103 /// ## This method should not be called by the user.
104 fn drop_receiver(&self);
105
106 /// Clears the value of the `Watch`.
107 fn clear(&self);
108
109 /// Sends a new value to the `Watch`.
110 fn send(&self, val: T);
111
112 /// Modify the value of the `Watch` using a closure. Returns `false` if the
113 /// `Watch` does not already contain a value.
114 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
115
116 /// Modify the value of the `Watch` using a closure. Returns `false` if the
117 /// `Watch` does not already contain a value.
118 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
119}
120
121/// A trait representing the 'inner' behavior of the `Watch`.
122#[allow(private_bounds)]
123pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
124 /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
125 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
126
127 /// Tries to get the value of the `Watch` if it matches the predicate function
128 /// `f`, marking it as seen.
129 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
130
131 /// Checks if the `Watch` is been initialized with a value.
132 fn contains_value(&self) -> bool;
133}
134
135impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
136 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
137 self.mutex.lock(|state| {
138 let mut s = state.borrow_mut();
139 match &s.data {
140 Some(data) => {
141 *id = s.current_id;
142 Poll::Ready(data.clone())
143 }
144 None => {
145 s.wakers.register(cx.waker());
146 Poll::Pending
147 }
148 }
149 })
150 }
151
152 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
153 self.mutex.lock(|state| {
154 let mut s = state.borrow_mut();
155 match s.data {
156 Some(ref data) if f(data) => {
157 *id = s.current_id;
158 Poll::Ready(data.clone())
159 }
160 _ => {
161 s.wakers.register(cx.waker());
162 Poll::Pending
163 }
164 }
165 })
166 }
167
168 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
169 self.mutex.lock(|state| {
170 let mut s = state.borrow_mut();
171 match (&s.data, s.current_id > *id) {
172 (Some(data), true) => {
173 *id = s.current_id;
174 Poll::Ready(data.clone())
175 }
176 _ => {
177 s.wakers.register(cx.waker());
178 Poll::Pending
179 }
180 }
181 })
182 }
183
184 fn try_changed(&self, id: &mut u64) -> Option<T> {
185 self.mutex.lock(|state| {
186 let s = state.borrow();
187 match s.current_id > *id {
188 true => {
189 *id = s.current_id;
190 s.data.clone()
191 }
192 false => None,
193 }
194 })
195 }
196
197 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
198 self.mutex.lock(|state| {
199 let mut s = state.borrow_mut();
200 match (&s.data, s.current_id > *id) {
201 (Some(data), true) if f(data) => {
202 *id = s.current_id;
203 Poll::Ready(data.clone())
204 }
205 _ => {
206 s.wakers.register(cx.waker());
207 Poll::Pending
208 }
209 }
210 })
211 }
212
213 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
214 self.mutex.lock(|state| {
215 let s = state.borrow();
216 match (&s.data, s.current_id > *id) {
217 (Some(data), true) if f(data) => {
218 *id = s.current_id;
219 s.data.clone()
220 }
221 _ => None,
222 }
223 })
224 }
225
226 fn drop_receiver(&self) {
227 self.mutex.lock(|state| {
228 let mut s = state.borrow_mut();
229 s.receiver_count -= 1;
230 })
231 }
232
233 fn clear(&self) {
234 self.mutex.lock(|state| {
235 let mut s = state.borrow_mut();
236 s.data = None;
237 })
238 }
239
240 fn send(&self, val: T) {
241 self.mutex.lock(|state| {
242 let mut s = state.borrow_mut();
243 s.data = Some(val);
244 s.current_id += 1;
245 s.wakers.wake();
246 })
247 }
248
249 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
250 self.mutex.lock(|state| {
251 let mut s = state.borrow_mut();
252 f(&mut s.data);
253 s.current_id += 1;
254 s.wakers.wake();
255 })
256 }
257
258 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
259 self.mutex.lock(|state| {
260 let mut s = state.borrow_mut();
261 if f(&mut s.data) {
262 s.current_id += 1;
263 s.wakers.wake();
264 }
265 })
266 }
267}
268
269impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
270 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
271 self.mutex.lock(|state| {
272 let s = state.borrow();
273 if let Some(id) = id {
274 *id = s.current_id;
275 }
276 s.data.clone()
277 })
278 }
279
280 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
281 self.mutex.lock(|state| {
282 let s = state.borrow();
283 match s.data {
284 Some(ref data) if f(data) => {
285 if let Some(id) = id {
286 *id = s.current_id;
287 }
288 Some(data.clone())
289 }
290 _ => None,
291 }
292 })
293 }
294
295 fn contains_value(&self) -> bool {
296 self.mutex.lock(|state| state.borrow().data.is_some())
297 }
298}
299
300impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
301 /// Create a new `Watch` channel for `N` receivers.
302 pub const fn new() -> Self {
303 Self {
304 mutex: Mutex::new(RefCell::new(WatchState {
305 data: None,
306 current_id: 0,
307 wakers: MultiWakerRegistration::new(),
308 receiver_count: 0,
309 })),
310 }
311 }
312
313 /// Create a new `Watch` channel with default data.
314 pub const fn new_with(data: T) -> Self {
315 Self {
316 mutex: Mutex::new(RefCell::new(WatchState {
317 data: Some(data),
318 current_id: 0,
319 wakers: MultiWakerRegistration::new(),
320 receiver_count: 0,
321 })),
322 }
323 }
324
325 /// Create a new [`Sender`] for the `Watch`.
326 pub fn sender(&self) -> Sender<'_, M, T, N> {
327 Sender(Snd::new(self))
328 }
329
330 /// Create a new [`DynSender`] for the `Watch`.
331 pub fn dyn_sender(&self) -> DynSender<'_, T> {
332 DynSender(Snd::new(self))
333 }
334
335 /// Try to create a new [`Receiver`] for the `Watch`. If the
336 /// maximum number of receivers has been reached, `None` is returned.
337 pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
338 self.mutex.lock(|state| {
339 let mut s = state.borrow_mut();
340 if s.receiver_count < N {
341 s.receiver_count += 1;
342 Some(Receiver(Rcv::new(self, 0)))
343 } else {
344 None
345 }
346 })
347 }
348
349 /// Try to create a new [`DynReceiver`] for the `Watch`. If the
350 /// maximum number of receivers has been reached, `None` is returned.
351 pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
352 self.mutex.lock(|state| {
353 let mut s = state.borrow_mut();
354 if s.receiver_count < N {
355 s.receiver_count += 1;
356 Some(DynReceiver(Rcv::new(self, 0)))
357 } else {
358 None
359 }
360 })
361 }
362
363 /// Try to create a new [`AnonReceiver`] for the `Watch`.
364 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
365 AnonReceiver(AnonRcv::new(self, 0))
366 }
367
368 /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
369 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
370 DynAnonReceiver(AnonRcv::new(self, 0))
371 }
372
373 /// Returns the message ID of the latest message sent to the `Watch`.
374 ///
375 /// This counter is monotonic, and is incremented every time a new message is sent.
376 pub fn get_msg_id(&self) -> u64 {
377 self.mutex.lock(|state| state.borrow().current_id)
378 }
379
380 /// Tries to get the value of the `Watch`.
381 pub fn try_get(&self) -> Option<T> {
382 WatchBehavior::try_get(self, None)
383 }
384
385 /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
386 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
387 where
388 F: Fn(&T) -> bool,
389 {
390 WatchBehavior::try_get_and(self, None, &mut f)
391 }
392}
393
394/// A receiver can `.await` a change in the `Watch` value.
395pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
396 watch: &'a W,
397 _phantom: PhantomData<T>,
398}
399
400impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
401 fn clone(&self) -> Self {
402 Self {
403 watch: self.watch,
404 _phantom: PhantomData,
405 }
406 }
407}
408
409impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
410 /// Creates a new `Receiver` with a reference to the `Watch`.
411 fn new(watch: &'a W) -> Self {
412 Self {
413 watch,
414 _phantom: PhantomData,
415 }
416 }
417
418 /// Sends a new value to the `Watch`.
419 pub fn send(&self, val: T) {
420 self.watch.send(val)
421 }
422
423 /// Clears the value of the `Watch`.
424 /// This will cause calls to [`Rcv::get`] to be pending.
425 pub fn clear(&self) {
426 self.watch.clear()
427 }
428
429 /// Tries to retrieve the value of the `Watch`.
430 pub fn try_get(&self) -> Option<T> {
431 self.watch.try_get(None)
432 }
433
434 /// Tries to peek the current value of the `Watch` if it matches the predicate
435 /// function `f`.
436 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
437 where
438 F: Fn(&T) -> bool,
439 {
440 self.watch.try_get_and(None, &mut f)
441 }
442
443 /// Returns true if the `Watch` contains a value.
444 pub fn contains_value(&self) -> bool {
445 self.watch.contains_value()
446 }
447
448 /// Modify the value of the `Watch` using a closure.
449 pub fn send_modify<F>(&self, mut f: F)
450 where
451 F: Fn(&mut Option<T>),
452 {
453 self.watch.send_modify(&mut f)
454 }
455
456 /// Modify the value of the `Watch` using a closure. The closure must return
457 /// `true` if the value was modified, which notifies all receivers.
458 pub fn send_if_modified<F>(&self, mut f: F)
459 where
460 F: Fn(&mut Option<T>) -> bool,
461 {
462 self.watch.send_if_modified(&mut f)
463 }
464}
465
466/// A sender of a `Watch` channel.
467///
468/// For a simpler type definition, consider [`DynSender`] at the expense of
469/// some runtime performance due to dynamic dispatch.
470pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
473 fn clone(&self) -> Self {
474 Self(self.0.clone())
475 }
476}
477
478impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
479 /// Converts the `Sender` into a [`DynSender`].
480 pub fn as_dyn(self) -> DynSender<'a, T> {
481 DynSender(Snd::new(self.watch))
482 }
483}
484
485impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
486 fn into(self) -> DynSender<'a, T> {
487 self.as_dyn()
488 }
489}
490
491impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
492 type Target = Snd<'a, T, Watch<M, T, N>>;
493
494 fn deref(&self) -> &Self::Target {
495 &self.0
496 }
497}
498
499impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
500 fn deref_mut(&mut self) -> &mut Self::Target {
501 &mut self.0
502 }
503}
504
505/// A sender which holds a **dynamic** reference to a `Watch` channel.
506///
507/// This is an alternative to [`Sender`] with a simpler type definition,
508pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509
510impl<'a, T: Clone> Clone for DynSender<'a, T> {
511 fn clone(&self) -> Self {
512 Self(self.0.clone())
513 }
514}
515
516impl<'a, T: Clone> Deref for DynSender<'a, T> {
517 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
518
519 fn deref(&self) -> &Self::Target {
520 &self.0
521 }
522}
523
524impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
525 fn deref_mut(&mut self) -> &mut Self::Target {
526 &mut self.0
527 }
528}
529
530/// A receiver can `.await` a change in the `Watch` value.
531pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
532 watch: &'a W,
533 at_id: u64,
534 _phantom: PhantomData<T>,
535}
536
537impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
538 /// Creates a new `Receiver` with a reference to the `Watch`.
539 fn new(watch: &'a W, at_id: u64) -> Self {
540 Self {
541 watch,
542 at_id,
543 _phantom: PhantomData,
544 }
545 }
546
547 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
548 ///
549 /// **Note**: Futures do nothing unless you `.await` or poll them.
550 pub fn get(&mut self) -> impl Future<Output = T> + '_ {
551 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
552 }
553
554 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
555 pub fn try_get(&mut self) -> Option<T> {
556 self.watch.try_get(Some(&mut self.at_id))
557 }
558
559 /// Returns the value of the `Watch` if it matches the predicate function `f`,
560 /// or waits for it to match, marking it as seen.
561 ///
562 /// **Note**: Futures do nothing unless you `.await` or poll them.
563 pub async fn get_and<F>(&mut self, mut f: F) -> T
564 where
565 F: Fn(&T) -> bool,
566 {
567 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
568 }
569
570 /// Tries to get the current value of the `Watch` if it matches the predicate
571 /// function `f` without waiting, marking it as seen.
572 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
573 where
574 F: Fn(&T) -> bool,
575 {
576 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
577 }
578
579 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
580 ///
581 /// **Note**: Futures do nothing unless you `.await` or poll them.
582 pub async fn changed(&mut self) -> T {
583 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
584 }
585
586 /// Tries to get the new value of the watch without waiting, marking it as seen.
587 pub fn try_changed(&mut self) -> Option<T> {
588 self.watch.try_changed(&mut self.at_id)
589 }
590
591 /// Waits for the `Watch` to change to a value which satisfies the predicate
592 /// function `f` and returns the new value, marking it as seen.
593 ///
594 /// **Note**: Futures do nothing unless you `.await` or poll them.
595 pub async fn changed_and<F>(&mut self, mut f: F) -> T
596 where
597 F: Fn(&T) -> bool,
598 {
599 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
600 }
601
602 /// Tries to get the new value of the watch which satisfies the predicate
603 /// function `f` and returns the new value without waiting, marking it as seen.
604 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
605 where
606 F: Fn(&T) -> bool,
607 {
608 self.watch.try_changed_and(&mut self.at_id, &mut f)
609 }
610
611 /// Checks if the `Watch` contains a value. If this returns true,
612 /// then awaiting [`Rcv::get`] will return immediately.
613 pub fn contains_value(&self) -> bool {
614 self.watch.contains_value()
615 }
616}
617
618impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
619 fn drop(&mut self) {
620 self.watch.drop_receiver();
621 }
622}
623
624/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
625pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
626 watch: &'a W,
627 at_id: u64,
628 _phantom: PhantomData<T>,
629}
630
631impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
632 /// Creates a new `Receiver` with a reference to the `Watch`.
633 fn new(watch: &'a W, at_id: u64) -> Self {
634 Self {
635 watch,
636 at_id,
637 _phantom: PhantomData,
638 }
639 }
640
641 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
642 pub fn try_get(&mut self) -> Option<T> {
643 self.watch.try_get(Some(&mut self.at_id))
644 }
645
646 /// Tries to get the current value of the `Watch` if it matches the predicate
647 /// function `f` without waiting, marking it as seen.
648 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
649 where
650 F: Fn(&T) -> bool,
651 {
652 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
653 }
654
655 /// Tries to get the new value of the watch without waiting, marking it as seen.
656 pub fn try_changed(&mut self) -> Option<T> {
657 self.watch.try_changed(&mut self.at_id)
658 }
659
660 /// Tries to get the new value of the watch which satisfies the predicate
661 /// function `f` and returns the new value without waiting, marking it as seen.
662 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
663 where
664 F: Fn(&T) -> bool,
665 {
666 self.watch.try_changed_and(&mut self.at_id, &mut f)
667 }
668
669 /// Checks if the `Watch` contains a value. If this returns true,
670 /// then awaiting [`Rcv::get`] will return immediately.
671 pub fn contains_value(&self) -> bool {
672 self.watch.contains_value()
673 }
674}
675
676/// A receiver of a `Watch` channel.
677pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678
679impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
680 /// Converts the `Receiver` into a [`DynReceiver`].
681 pub fn as_dyn(self) -> DynReceiver<'a, T> {
682 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
683 core::mem::forget(self); // Ensures the destructor is not called
684 rcv
685 }
686}
687
688impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
689 fn into(self) -> DynReceiver<'a, T> {
690 self.as_dyn()
691 }
692}
693
694impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
695 type Target = Rcv<'a, T, Watch<M, T, N>>;
696
697 fn deref(&self) -> &Self::Target {
698 &self.0
699 }
700}
701
702impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
703 fn deref_mut(&mut self) -> &mut Self::Target {
704 &mut self.0
705 }
706}
707
708/// A receiver which holds a **dynamic** reference to a `Watch` channel.
709///
710/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
711/// some runtime performance due to dynamic dispatch.
712pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713
714impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
715 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
716
717 fn deref(&self) -> &Self::Target {
718 &self.0
719 }
720}
721
722impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
723 fn deref_mut(&mut self) -> &mut Self::Target {
724 &mut self.0
725 }
726}
727
728/// A receiver of a `Watch` channel that cannot `.await` values.
729pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
732 /// Converts the `Receiver` into a [`DynReceiver`].
733 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
734 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
735 core::mem::forget(self); // Ensures the destructor is not called
736 rcv
737 }
738}
739
740impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
741 fn into(self) -> DynAnonReceiver<'a, T> {
742 self.as_dyn()
743 }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
747 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
748
749 fn deref(&self) -> &Self::Target {
750 &self.0
751 }
752}
753
754impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
755 fn deref_mut(&mut self) -> &mut Self::Target {
756 &mut self.0
757 }
758}
759
760/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
761///
762/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
763/// some runtime performance due to dynamic dispatch.
764pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765
766impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
767 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
768
769 fn deref(&self) -> &Self::Target {
770 &self.0
771 }
772}
773
774impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
775 fn deref_mut(&mut self) -> &mut Self::Target {
776 &mut self.0
777 }
778}
779
780#[cfg(test)]
781mod tests {
782 use futures_executor::block_on;
783
784 use super::Watch;
785 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
786
787 #[test]
788 fn multiple_sends() {
789 let f = async {
790 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
791
792 // Obtain receiver and sender
793 let mut rcv = WATCH.receiver().unwrap();
794 let snd = WATCH.sender();
795
796 // Not initialized
797 assert_eq!(rcv.try_changed(), None);
798
799 // Receive the new value
800 snd.send(10);
801 assert_eq!(rcv.changed().await, 10);
802
803 // Receive another value
804 snd.send(20);
805 assert_eq!(rcv.try_changed(), Some(20));
806
807 // No update
808 assert_eq!(rcv.try_changed(), None);
809 };
810 block_on(f);
811 }
812
813 #[test]
814 fn all_try_get() {
815 let f = async {
816 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
817
818 // Obtain receiver and sender
819 let mut rcv = WATCH.receiver().unwrap();
820 let snd = WATCH.sender();
821
822 // Not initialized
823 assert_eq!(WATCH.try_get(), None);
824 assert_eq!(rcv.try_get(), None);
825 assert_eq!(snd.try_get(), None);
826
827 // Receive the new value
828 snd.send(10);
829 assert_eq!(WATCH.try_get(), Some(10));
830 assert_eq!(rcv.try_get(), Some(10));
831 assert_eq!(snd.try_get(), Some(10));
832
833 assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
834 assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
835 assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
836
837 assert_eq!(WATCH.try_get_and(|x| x < &5), None);
838 assert_eq!(rcv.try_get_and(|x| x < &5), None);
839 assert_eq!(snd.try_get_and(|x| x < &5), None);
840 };
841 block_on(f);
842 }
843
844 #[test]
845 fn once_lock_like() {
846 let f = async {
847 static CONFIG0: u8 = 10;
848 static CONFIG1: u8 = 20;
849
850 static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
851
852 // Obtain receiver and sender
853 let mut rcv = WATCH.receiver().unwrap();
854 let snd = WATCH.sender();
855
856 // Not initialized
857 assert_eq!(rcv.try_changed(), None);
858
859 // Receive the new value
860 snd.send(&CONFIG0);
861 let rcv0 = rcv.changed().await;
862 assert_eq!(rcv0, &10);
863
864 // Receive another value
865 snd.send(&CONFIG1);
866 let rcv1 = rcv.try_changed();
867 assert_eq!(rcv1, Some(&20));
868
869 // No update
870 assert_eq!(rcv.try_changed(), None);
871
872 // Ensure similarity with original static
873 assert_eq!(rcv0, &CONFIG0);
874 assert_eq!(rcv1, Some(&CONFIG1));
875 };
876 block_on(f);
877 }
878
879 #[test]
880 fn sender_modify() {
881 let f = async {
882 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
883
884 // Obtain receiver and sender
885 let mut rcv = WATCH.receiver().unwrap();
886 let snd = WATCH.sender();
887
888 // Receive the new value
889 snd.send(10);
890 assert_eq!(rcv.try_changed(), Some(10));
891
892 // Modify the value inplace
893 snd.send_modify(|opt| {
894 if let Some(inner) = opt {
895 *inner += 5;
896 }
897 });
898
899 // Get the modified value
900 assert_eq!(rcv.try_changed(), Some(15));
901 assert_eq!(rcv.try_changed(), None);
902 };
903 block_on(f);
904 }
905
906 #[test]
907 fn predicate_fn() {
908 let f = async {
909 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
910
911 // Obtain receiver and sender
912 let mut rcv = WATCH.receiver().unwrap();
913 let snd = WATCH.sender();
914
915 snd.send(15);
916 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
917 assert_eq!(rcv.try_get_and(|x| x < &5), None);
918 assert!(rcv.try_changed().is_none());
919
920 snd.send(20);
921 assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
922 assert_eq!(rcv.try_changed_and(|x| x > &5), None);
923
924 snd.send(25);
925 assert_eq!(rcv.try_changed_and(|x| x < &5), None);
926 assert_eq!(rcv.try_changed(), Some(25));
927
928 snd.send(30);
929 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
930 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
931 };
932 block_on(f);
933 }
934
935 #[test]
936 fn receive_after_create() {
937 let f = async {
938 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
939
940 // Obtain sender and send value
941 let snd = WATCH.sender();
942 snd.send(10);
943
944 // Obtain receiver and receive value
945 let mut rcv = WATCH.receiver().unwrap();
946 assert_eq!(rcv.try_changed(), Some(10));
947 };
948 block_on(f);
949 }
950
951 #[test]
952 fn max_receivers_drop() {
953 let f = async {
954 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
955
956 // Try to create 3 receivers (only 2 can exist at once)
957 let rcv0 = WATCH.receiver();
958 let rcv1 = WATCH.receiver();
959 let rcv2 = WATCH.receiver();
960
961 // Ensure the first two are successful and the third is not
962 assert!(rcv0.is_some());
963 assert!(rcv1.is_some());
964 assert!(rcv2.is_none());
965
966 // Drop the first receiver
967 drop(rcv0);
968
969 // Create another receiver and ensure it is successful
970 let rcv3 = WATCH.receiver();
971 assert!(rcv3.is_some());
972 };
973 block_on(f);
974 }
975
976 #[test]
977 fn multiple_receivers() {
978 let f = async {
979 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
980
981 // Obtain receivers and sender
982 let mut rcv0 = WATCH.receiver().unwrap();
983 let mut rcv1 = WATCH.anon_receiver();
984 let snd = WATCH.sender();
985
986 // No update for both
987 assert_eq!(rcv0.try_changed(), None);
988 assert_eq!(rcv1.try_changed(), None);
989
990 // Send a new value
991 snd.send(0);
992
993 // Both receivers receive the new value
994 assert_eq!(rcv0.try_changed(), Some(0));
995 assert_eq!(rcv1.try_changed(), Some(0));
996 };
997 block_on(f);
998 }
999
1000 #[test]
1001 fn clone_senders() {
1002 let f = async {
1003 // Obtain different ways to send
1004 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1005 let snd0 = WATCH.sender();
1006 let snd1 = snd0.clone();
1007
1008 // Obtain Receiver
1009 let mut rcv = WATCH.receiver().unwrap().as_dyn();
1010
1011 // Send a value from first sender
1012 snd0.send(10);
1013 assert_eq!(rcv.try_changed(), Some(10));
1014
1015 // Send a value from second sender
1016 snd1.send(20);
1017 assert_eq!(rcv.try_changed(), Some(20));
1018 };
1019 block_on(f);
1020 }
1021
1022 #[test]
1023 fn use_dynamics() {
1024 let f = async {
1025 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1026
1027 // Obtain receiver and sender
1028 let mut anon_rcv = WATCH.dyn_anon_receiver();
1029 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1030 let dyn_snd = WATCH.dyn_sender();
1031
1032 // Send a value
1033 dyn_snd.send(10);
1034
1035 // Ensure the dynamic receiver receives the value
1036 assert_eq!(anon_rcv.try_changed(), Some(10));
1037 assert_eq!(dyn_rcv.try_changed(), Some(10));
1038 assert_eq!(dyn_rcv.try_changed(), None);
1039 };
1040 block_on(f);
1041 }
1042
1043 #[test]
1044 fn convert_to_dyn() {
1045 let f = async {
1046 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1047
1048 // Obtain receiver and sender
1049 let anon_rcv = WATCH.anon_receiver();
1050 let rcv = WATCH.receiver().unwrap();
1051 let snd = WATCH.sender();
1052
1053 // Convert to dynamic
1054 let mut dyn_anon_rcv = anon_rcv.as_dyn();
1055 let mut dyn_rcv = rcv.as_dyn();
1056 let dyn_snd = snd.as_dyn();
1057
1058 // Send a value
1059 dyn_snd.send(10);
1060
1061 // Ensure the dynamic receiver receives the value
1062 assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1063 assert_eq!(dyn_rcv.try_changed(), Some(10));
1064 assert_eq!(dyn_rcv.try_changed(), None);
1065 };
1066 block_on(f);
1067 }
1068
1069 #[test]
1070 fn dynamic_receiver_count() {
1071 let f = async {
1072 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1073
1074 // Obtain receiver and sender
1075 let rcv0 = WATCH.receiver();
1076 let rcv1 = WATCH.receiver();
1077 let rcv2 = WATCH.receiver();
1078
1079 // Ensure the first two are successful and the third is not
1080 assert!(rcv0.is_some());
1081 assert!(rcv1.is_some());
1082 assert!(rcv2.is_none());
1083
1084 // Convert to dynamic
1085 let dyn_rcv0 = rcv0.unwrap().as_dyn();
1086
1087 // Drop the (now dynamic) receiver
1088 drop(dyn_rcv0);
1089
1090 // Create another receiver and ensure it is successful
1091 let rcv3 = WATCH.receiver();
1092 let rcv4 = WATCH.receiver();
1093 assert!(rcv3.is_some());
1094 assert!(rcv4.is_none());
1095 };
1096 block_on(f);
1097 }
1098
1099 #[test]
1100 fn contains_value() {
1101 let f = async {
1102 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1103
1104 // Obtain receiver and sender
1105 let rcv = WATCH.receiver().unwrap();
1106 let snd = WATCH.sender();
1107
1108 // check if the watch contains a value
1109 assert_eq!(rcv.contains_value(), false);
1110 assert_eq!(snd.contains_value(), false);
1111
1112 // Send a value
1113 snd.send(10);
1114
1115 // check if the watch contains a value
1116 assert_eq!(rcv.contains_value(), true);
1117 assert_eq!(snd.contains_value(), true);
1118 };
1119 block_on(f);
1120 }
1121}
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index cfce9a571..e3e5b2538 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -15,7 +15,7 @@
15//! another message will result in an error being returned. 15//! another message will result in an error being returned.
16 16
17use core::cell::RefCell; 17use core::cell::RefCell;
18use core::future::poll_fn; 18use core::future::{poll_fn, Future};
19use core::marker::PhantomData; 19use core::marker::PhantomData;
20use core::task::{Context, Poll}; 20use core::task::{Context, Poll};
21 21
@@ -35,7 +35,7 @@ use crate::waitqueue::WakerRegistration;
35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through 35/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
36/// an `&mut T`. 36/// an `&mut T`.
37pub struct Channel<'a, M: RawMutex, T> { 37pub struct Channel<'a, M: RawMutex, T> {
38 buf: *mut T, 38 buf: BufferPtr<T>,
39 phantom: PhantomData<&'a mut T>, 39 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>, 40 state: Mutex<M, RefCell<State>>,
41} 41}
@@ -50,10 +50,10 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
50 assert!(len != 0); 50 assert!(len != 0);
51 51
52 Self { 52 Self {
53 buf: buf.as_mut_ptr(), 53 buf: BufferPtr(buf.as_mut_ptr()),
54 phantom: PhantomData, 54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State { 55 state: Mutex::new(RefCell::new(State {
56 len, 56 capacity: len,
57 front: 0, 57 front: 0,
58 back: 0, 58 back: 0,
59 full: false, 59 full: false,
@@ -70,8 +70,42 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { 70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self }) 71 (Sender { channel: self }, Receiver { channel: self })
72 } 72 }
73
74 /// Clears all elements in the channel.
75 pub fn clear(&mut self) {
76 self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
80
81 /// Returns the number of elements currently in the channel.
82 pub fn len(&self) -> usize {
83 self.state.lock(|s| s.borrow().len())
84 }
85
86 /// Returns whether the channel is empty.
87 pub fn is_empty(&self) -> bool {
88 self.state.lock(|s| s.borrow().is_empty())
89 }
90
91 /// Returns whether the channel is full.
92 pub fn is_full(&self) -> bool {
93 self.state.lock(|s| s.borrow().is_full())
94 }
95}
96
97#[repr(transparent)]
98struct BufferPtr<T>(*mut T);
99
100impl<T> BufferPtr<T> {
101 unsafe fn add(&self, count: usize) -> *mut T {
102 self.0.add(count)
103 }
73} 104}
74 105
106unsafe impl<T> Send for BufferPtr<T> {}
107unsafe impl<T> Sync for BufferPtr<T> {}
108
75/// Send-only access to a [`Channel`]. 109/// Send-only access to a [`Channel`].
76pub struct Sender<'a, M: RawMutex, T> { 110pub struct Sender<'a, M: RawMutex, T> {
77 channel: &'a Channel<'a, M, T>, 111 channel: &'a Channel<'a, M, T>,
@@ -109,12 +143,15 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
109 } 143 }
110 144
111 /// Asynchronously send a value over the channel. 145 /// Asynchronously send a value over the channel.
112 pub async fn send(&mut self) -> &mut T { 146 pub fn send(&mut self) -> impl Future<Output = &mut T> {
113 let i = poll_fn(|cx| { 147 poll_fn(|cx| {
114 self.channel.state.lock(|s| { 148 self.channel.state.lock(|s| {
115 let s = &mut *s.borrow_mut(); 149 let s = &mut *s.borrow_mut();
116 match s.push_index() { 150 match s.push_index() {
117 Some(i) => Poll::Ready(i), 151 Some(i) => {
152 let r = unsafe { &mut *self.channel.buf.add(i) };
153 Poll::Ready(r)
154 }
118 None => { 155 None => {
119 s.receive_waker.register(cx.waker()); 156 s.receive_waker.register(cx.waker());
120 Poll::Pending 157 Poll::Pending
@@ -122,14 +159,34 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
122 } 159 }
123 }) 160 })
124 }) 161 })
125 .await;
126 unsafe { &mut *self.channel.buf.add(i) }
127 } 162 }
128 163
129 /// Notify the channel that the sending of the value has been finalized. 164 /// Notify the channel that the sending of the value has been finalized.
130 pub fn send_done(&mut self) { 165 pub fn send_done(&mut self) {
131 self.channel.state.lock(|s| s.borrow_mut().push_done()) 166 self.channel.state.lock(|s| s.borrow_mut().push_done())
132 } 167 }
168
169 /// Clears all elements in the channel.
170 pub fn clear(&mut self) {
171 self.channel.state.lock(|s| {
172 s.borrow_mut().clear();
173 });
174 }
175
176 /// Returns the number of elements currently in the channel.
177 pub fn len(&self) -> usize {
178 self.channel.state.lock(|s| s.borrow().len())
179 }
180
181 /// Returns whether the channel is empty.
182 pub fn is_empty(&self) -> bool {
183 self.channel.state.lock(|s| s.borrow().is_empty())
184 }
185
186 /// Returns whether the channel is full.
187 pub fn is_full(&self) -> bool {
188 self.channel.state.lock(|s| s.borrow().is_full())
189 }
133} 190}
134 191
135/// Receive-only access to a [`Channel`]. 192/// Receive-only access to a [`Channel`].
@@ -138,7 +195,7 @@ pub struct Receiver<'a, M: RawMutex, T> {
138} 195}
139 196
140impl<'a, M: RawMutex, T> Receiver<'a, M, T> { 197impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
141 /// Creates one further [`Sender`] over the same channel. 198 /// Creates one further [`Receiver`] over the same channel.
142 pub fn borrow(&mut self) -> Receiver<'_, M, T> { 199 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
143 Receiver { channel: self.channel } 200 Receiver { channel: self.channel }
144 } 201 }
@@ -169,12 +226,15 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
169 } 226 }
170 227
171 /// Asynchronously receive a value over the channel. 228 /// Asynchronously receive a value over the channel.
172 pub async fn receive(&mut self) -> &mut T { 229 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
173 let i = poll_fn(|cx| { 230 poll_fn(|cx| {
174 self.channel.state.lock(|s| { 231 self.channel.state.lock(|s| {
175 let s = &mut *s.borrow_mut(); 232 let s = &mut *s.borrow_mut();
176 match s.pop_index() { 233 match s.pop_index() {
177 Some(i) => Poll::Ready(i), 234 Some(i) => {
235 let r = unsafe { &mut *self.channel.buf.add(i) };
236 Poll::Ready(r)
237 }
178 None => { 238 None => {
179 s.send_waker.register(cx.waker()); 239 s.send_waker.register(cx.waker());
180 Poll::Pending 240 Poll::Pending
@@ -182,18 +242,39 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
182 } 242 }
183 }) 243 })
184 }) 244 })
185 .await;
186 unsafe { &mut *self.channel.buf.add(i) }
187 } 245 }
188 246
189 /// Notify the channel that the receiving of the value has been finalized. 247 /// Notify the channel that the receiving of the value has been finalized.
190 pub fn receive_done(&mut self) { 248 pub fn receive_done(&mut self) {
191 self.channel.state.lock(|s| s.borrow_mut().pop_done()) 249 self.channel.state.lock(|s| s.borrow_mut().pop_done())
192 } 250 }
251
252 /// Clears all elements in the channel.
253 pub fn clear(&mut self) {
254 self.channel.state.lock(|s| {
255 s.borrow_mut().clear();
256 });
257 }
258
259 /// Returns the number of elements currently in the channel.
260 pub fn len(&self) -> usize {
261 self.channel.state.lock(|s| s.borrow().len())
262 }
263
264 /// Returns whether the channel is empty.
265 pub fn is_empty(&self) -> bool {
266 self.channel.state.lock(|s| s.borrow().is_empty())
267 }
268
269 /// Returns whether the channel is full.
270 pub fn is_full(&self) -> bool {
271 self.channel.state.lock(|s| s.borrow().is_full())
272 }
193} 273}
194 274
195struct State { 275struct State {
196 len: usize, 276 /// Maximum number of elements the channel can hold.
277 capacity: usize,
197 278
198 /// Front index. Always 0..=(N-1) 279 /// Front index. Always 0..=(N-1)
199 front: usize, 280 front: usize,
@@ -210,13 +291,34 @@ struct State {
210 291
211impl State { 292impl State {
212 fn increment(&self, i: usize) -> usize { 293 fn increment(&self, i: usize) -> usize {
213 if i + 1 == self.len { 294 if i + 1 == self.capacity {
214 0 295 0
215 } else { 296 } else {
216 i + 1 297 i + 1
217 } 298 }
218 } 299 }
219 300
301 fn clear(&mut self) {
302 if self.full {
303 self.receive_waker.wake();
304 }
305 self.front = 0;
306 self.back = 0;
307 self.full = false;
308 }
309
310 fn len(&self) -> usize {
311 if !self.full {
312 if self.back >= self.front {
313 self.back - self.front
314 } else {
315 self.capacity + self.back - self.front
316 }
317 } else {
318 self.capacity
319 }
320 }
321
220 fn is_full(&self) -> bool { 322 fn is_full(&self) -> bool {
221 self.full 323 self.full
222 } 324 }