aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
authorOlof <[email protected]>2024-12-18 01:48:25 +0100
committerGitHub <[email protected]>2024-12-18 01:48:25 +0100
commit7cf96e4730964d085015320648c870a05fbaf431 (patch)
tree04072529b62082cb66443377b589fe08169f83be /embassy-sync/src
parent8678911028a591d72fd1d8418407b5885ed4c417 (diff)
parent341036a8b865609767fbf9015b482ea70ed4f23f (diff)
Merge branch 'embassy-rs:main' into u5_adc
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/blocking_mutex/mod.rs1
-rw-r--r--embassy-sync/src/channel.rs84
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/mutex.rs2
-rw-r--r--embassy-sync/src/priority_channel.rs84
-rw-r--r--embassy-sync/src/pubsub/mod.rs30
-rw-r--r--embassy-sync/src/pubsub/publisher.rs67
-rw-r--r--embassy-sync/src/waitqueue/atomic_waker.rs42
-rw-r--r--embassy-sync/src/watch.rs1121
-rw-r--r--embassy-sync/src/zerocopy_channel.rs91
10 files changed, 1507 insertions, 16 deletions
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs
index 8a4a4c642..beafdb43d 100644
--- a/embassy-sync/src/blocking_mutex/mod.rs
+++ b/embassy-sync/src/blocking_mutex/mod.rs
@@ -104,6 +104,7 @@ impl<T> Mutex<raw::CriticalSectionRawMutex, T> {
104 104
105impl<T> Mutex<raw::NoopRawMutex, T> { 105impl<T> Mutex<raw::NoopRawMutex, T> {
106 /// Borrows the data 106 /// Borrows the data
107 #[allow(clippy::should_implement_trait)]
107 pub fn borrow(&self) -> &T { 108 pub fn borrow(&self) -> &T {
108 let ptr = self.data.get() as *const T; 109 let ptr = self.data.get() as *const T;
109 unsafe { &*ptr } 110 unsafe { &*ptr }
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 55ac5fb66..18b053111 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -72,6 +72,48 @@ where
72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx) 73 self.channel.poll_ready_to_send(cx)
74 } 74 }
75
76 /// Returns the maximum number of elements the channel can hold.
77 ///
78 /// See [`Channel::capacity()`]
79 pub const fn capacity(&self) -> usize {
80 self.channel.capacity()
81 }
82
83 /// Returns the free capacity of the channel.
84 ///
85 /// See [`Channel::free_capacity()`]
86 pub fn free_capacity(&self) -> usize {
87 self.channel.free_capacity()
88 }
89
90 /// Clears all elements in the channel.
91 ///
92 /// See [`Channel::clear()`]
93 pub fn clear(&self) {
94 self.channel.clear();
95 }
96
97 /// Returns the number of elements currently in the channel.
98 ///
99 /// See [`Channel::len()`]
100 pub fn len(&self) -> usize {
101 self.channel.len()
102 }
103
104 /// Returns whether the channel is empty.
105 ///
106 /// See [`Channel::is_empty()`]
107 pub fn is_empty(&self) -> bool {
108 self.channel.is_empty()
109 }
110
111 /// Returns whether the channel is full.
112 ///
113 /// See [`Channel::is_full()`]
114 pub fn is_full(&self) -> bool {
115 self.channel.is_full()
116 }
75} 117}
76 118
77/// Send-only access to a [`Channel`] without knowing channel size. 119/// Send-only access to a [`Channel`] without knowing channel size.
@@ -179,6 +221,48 @@ where
179 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 221 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
180 self.channel.poll_receive(cx) 222 self.channel.poll_receive(cx)
181 } 223 }
224
225 /// Returns the maximum number of elements the channel can hold.
226 ///
227 /// See [`Channel::capacity()`]
228 pub const fn capacity(&self) -> usize {
229 self.channel.capacity()
230 }
231
232 /// Returns the free capacity of the channel.
233 ///
234 /// See [`Channel::free_capacity()`]
235 pub fn free_capacity(&self) -> usize {
236 self.channel.free_capacity()
237 }
238
239 /// Clears all elements in the channel.
240 ///
241 /// See [`Channel::clear()`]
242 pub fn clear(&self) {
243 self.channel.clear();
244 }
245
246 /// Returns the number of elements currently in the channel.
247 ///
248 /// See [`Channel::len()`]
249 pub fn len(&self) -> usize {
250 self.channel.len()
251 }
252
253 /// Returns whether the channel is empty.
254 ///
255 /// See [`Channel::is_empty()`]
256 pub fn is_empty(&self) -> bool {
257 self.channel.is_empty()
258 }
259
260 /// Returns whether the channel is full.
261 ///
262 /// See [`Channel::is_full()`]
263 pub fn is_full(&self) -> bool {
264 self.channel.is_full()
265 }
182} 266}
183 267
184/// Receive-only access to a [`Channel`] without knowing channel size. 268/// Receive-only access to a [`Channel`] without knowing channel size.
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 014bf1d06..df0f5e815 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -21,4 +21,5 @@ pub mod pubsub;
21pub mod semaphore; 21pub mod semaphore;
22pub mod signal; 22pub mod signal;
23pub mod waitqueue; 23pub mod waitqueue;
24pub mod watch;
24pub mod zerocopy_channel; 25pub mod zerocopy_channel;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 8c3a3af9f..08f66e374 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -138,7 +138,7 @@ impl<M: RawMutex, T> From<T> for Mutex<M, T> {
138impl<M, T> Default for Mutex<M, T> 138impl<M, T> Default for Mutex<M, T>
139where 139where
140 M: RawMutex, 140 M: RawMutex,
141 T: ?Sized + Default, 141 T: Default,
142{ 142{
143 fn default() -> Self { 143 fn default() -> Self {
144 Self::new(Default::default()) 144 Self::new(Default::default())
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 24c6c5a7f..1f4d8667c 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -71,6 +71,48 @@ where
71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx) 72 self.channel.poll_ready_to_send(cx)
73 } 73 }
74
75 /// Returns the maximum number of elements the channel can hold.
76 ///
77 /// See [`PriorityChannel::capacity()`]
78 pub const fn capacity(&self) -> usize {
79 self.channel.capacity()
80 }
81
82 /// Returns the free capacity of the channel.
83 ///
84 /// See [`PriorityChannel::free_capacity()`]
85 pub fn free_capacity(&self) -> usize {
86 self.channel.free_capacity()
87 }
88
89 /// Clears all elements in the channel.
90 ///
91 /// See [`PriorityChannel::clear()`]
92 pub fn clear(&self) {
93 self.channel.clear();
94 }
95
96 /// Returns the number of elements currently in the channel.
97 ///
98 /// See [`PriorityChannel::len()`]
99 pub fn len(&self) -> usize {
100 self.channel.len()
101 }
102
103 /// Returns whether the channel is empty.
104 ///
105 /// See [`PriorityChannel::is_empty()`]
106 pub fn is_empty(&self) -> bool {
107 self.channel.is_empty()
108 }
109
110 /// Returns whether the channel is full.
111 ///
112 /// See [`PriorityChannel::is_full()`]
113 pub fn is_full(&self) -> bool {
114 self.channel.is_full()
115 }
74} 116}
75 117
76impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> 118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
@@ -146,6 +188,48 @@ where
146 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 188 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
147 self.channel.poll_receive(cx) 189 self.channel.poll_receive(cx)
148 } 190 }
191
192 /// Returns the maximum number of elements the channel can hold.
193 ///
194 /// See [`PriorityChannel::capacity()`]
195 pub const fn capacity(&self) -> usize {
196 self.channel.capacity()
197 }
198
199 /// Returns the free capacity of the channel.
200 ///
201 /// See [`PriorityChannel::free_capacity()`]
202 pub fn free_capacity(&self) -> usize {
203 self.channel.free_capacity()
204 }
205
206 /// Clears all elements in the channel.
207 ///
208 /// See [`PriorityChannel::clear()`]
209 pub fn clear(&self) {
210 self.channel.clear();
211 }
212
213 /// Returns the number of elements currently in the channel.
214 ///
215 /// See [`PriorityChannel::len()`]
216 pub fn len(&self) -> usize {
217 self.channel.len()
218 }
219
220 /// Returns whether the channel is empty.
221 ///
222 /// See [`PriorityChannel::is_empty()`]
223 pub fn is_empty(&self) -> bool {
224 self.channel.is_empty()
225 }
226
227 /// Returns whether the channel is full.
228 ///
229 /// See [`PriorityChannel::is_full()`]
230 pub fn is_full(&self) -> bool {
231 self.channel.is_full()
232 }
149} 233}
150 234
151impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> 235impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 812302e2b..a2360a1d8 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -27,8 +27,8 @@ pub use subscriber::{DynSubscriber, Subscriber};
27/// 27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. 28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message 29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive 30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged. 31/// an error to indicate that it has lagged.
32/// 32///
33/// ## Example 33/// ## Example
34/// 34///
@@ -755,4 +755,30 @@ mod tests {
755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0); 755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0); 756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
757 } 757 }
758
759 #[futures_test::test]
760 async fn publisher_sink() {
761 use futures_util::{SinkExt, StreamExt};
762
763 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
764
765 let mut sub = channel.subscriber().unwrap();
766
767 let publ = channel.publisher().unwrap();
768 let mut sink = publ.sink();
769
770 sink.send(0).await.unwrap();
771 assert_eq!(0, sub.try_next_message_pure().unwrap());
772
773 sink.send(1).await.unwrap();
774 assert_eq!(1, sub.try_next_message_pure().unwrap());
775
776 sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
777 .await
778 .unwrap();
779 assert_eq!(0, sub.try_next_message_pure().unwrap());
780 assert_eq!(1, sub.try_next_message_pure().unwrap());
781 assert_eq!(2, sub.try_next_message_pure().unwrap());
782 assert_eq!(3, sub.try_next_message_pure().unwrap());
783 }
758} 784}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e66b3b1db..7a1ab66de 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
74 pub fn is_full(&self) -> bool { 74 pub fn is_full(&self) -> bool {
75 self.channel.is_full() 75 self.channel.is_full()
76 } 76 }
77
78 /// Create a [`futures::Sink`] adapter for this publisher.
79 #[inline]
80 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
81 PubSink { publ: self, fut: None }
82 }
77} 83}
78 84
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 85impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
221 } 227 }
222} 228}
223 229
230#[must_use = "Sinks do nothing unless polled"]
231/// [`futures_sink::Sink`] adapter for [`Pub`].
232pub struct PubSink<'a, 'p, PSB, T>
233where
234 T: Clone,
235 PSB: PubSubBehavior<T> + ?Sized,
236{
237 publ: &'p Pub<'a, PSB, T>,
238 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
239}
240
241impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
242where
243 PSB: PubSubBehavior<T> + ?Sized,
244 T: Clone,
245{
246 /// Try to make progress on the pending future if we have one.
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
248 let Some(mut fut) = self.fut.take() else {
249 return Poll::Ready(());
250 };
251
252 if Pin::new(&mut fut).poll(cx).is_pending() {
253 self.fut = Some(fut);
254 return Poll::Pending;
255 }
256
257 Poll::Ready(())
258 }
259}
260
261impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
262where
263 PSB: PubSubBehavior<T> + ?Sized,
264 T: Clone,
265{
266 type Error = core::convert::Infallible;
267
268 #[inline]
269 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270 self.poll(cx).map(Ok)
271 }
272
273 #[inline]
274 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
275 self.fut = Some(self.publ.publish(item));
276
277 Ok(())
278 }
279
280 #[inline]
281 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 self.poll(cx).map(Ok)
283 }
284
285 #[inline]
286 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289}
290
224/// Future for the publisher wait action 291/// Future for the publisher wait action
225#[must_use = "futures do nothing unless you `.await` or poll them"] 292#[must_use = "futures do nothing unless you `.await` or poll them"]
226pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 293pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs
index 63fe04a6e..231902c5a 100644
--- a/embassy-sync/src/waitqueue/atomic_waker.rs
+++ b/embassy-sync/src/waitqueue/atomic_waker.rs
@@ -1,26 +1,25 @@
1use core::cell::Cell; 1use core::cell::Cell;
2use core::task::Waker; 2use core::task::Waker;
3 3
4use crate::blocking_mutex::raw::CriticalSectionRawMutex; 4use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
5use crate::blocking_mutex::Mutex; 5use crate::blocking_mutex::Mutex;
6 6
7/// Utility struct to register and wake a waker. 7/// Utility struct to register and wake a waker.
8pub struct AtomicWaker { 8pub struct GenericAtomicWaker<M: RawMutex> {
9 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, 9 waker: Mutex<M, Cell<Option<Waker>>>,
10} 10}
11 11
12impl AtomicWaker { 12impl<M: RawMutex> GenericAtomicWaker<M> {
13 /// Create a new `AtomicWaker`. 13 /// Create a new `AtomicWaker`.
14 pub const fn new() -> Self { 14 pub const fn new(mutex: M) -> Self {
15 Self { 15 Self {
16 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), 16 waker: Mutex::const_new(mutex, Cell::new(None)),
17 } 17 }
18 } 18 }
19 19
20 /// Register a waker. Overwrites the previous waker, if any. 20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&self, w: &Waker) { 21 pub fn register(&self, w: &Waker) {
22 critical_section::with(|cs| { 22 self.waker.lock(|cell| {
23 let cell = self.waker.borrow(cs);
24 cell.set(match cell.replace(None) { 23 cell.set(match cell.replace(None) {
25 Some(w2) if (w2.will_wake(w)) => Some(w2), 24 Some(w2) if (w2.will_wake(w)) => Some(w2),
26 _ => Some(w.clone()), 25 _ => Some(w.clone()),
@@ -30,8 +29,7 @@ impl AtomicWaker {
30 29
31 /// Wake the registered waker, if any. 30 /// Wake the registered waker, if any.
32 pub fn wake(&self) { 31 pub fn wake(&self) {
33 critical_section::with(|cs| { 32 self.waker.lock(|cell| {
34 let cell = self.waker.borrow(cs);
35 if let Some(w) = cell.replace(None) { 33 if let Some(w) = cell.replace(None) {
36 w.wake_by_ref(); 34 w.wake_by_ref();
37 cell.set(Some(w)); 35 cell.set(Some(w));
@@ -39,3 +37,27 @@ impl AtomicWaker {
39 }) 37 })
40 } 38 }
41} 39}
40
41/// Utility struct to register and wake a waker.
42pub struct AtomicWaker {
43 waker: GenericAtomicWaker<CriticalSectionRawMutex>,
44}
45
46impl AtomicWaker {
47 /// Create a new `AtomicWaker`.
48 pub const fn new() -> Self {
49 Self {
50 waker: GenericAtomicWaker::new(CriticalSectionRawMutex::new()),
51 }
52 }
53
54 /// Register a waker. Overwrites the previous waker, if any.
55 pub fn register(&self, w: &Waker) {
56 self.waker.register(w);
57 }
58
59 /// Wake the registered waker, if any.
60 pub fn wake(&self) {
61 self.waker.wake();
62 }
63}
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
new file mode 100644
index 000000000..404e31714
--- /dev/null
+++ b/embassy-sync/src/watch.rs
@@ -0,0 +1,1121 @@
1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2
3use core::cell::RefCell;
4use core::future::poll_fn;
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await
14/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
15/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
16/// value when a new one is sent, without waiting for all receivers to read the previous value.
17///
18/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
19/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
20/// always provided with the latest value.
21///
22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
24/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
25/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
26/// ```
27///
28/// use futures_executor::block_on;
29/// use embassy_sync::watch::Watch;
30/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
31///
32/// let f = async {
33///
34/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
35///
36/// // Obtain receivers and sender
37/// let mut rcv0 = WATCH.receiver().unwrap();
38/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
39/// let mut snd = WATCH.sender();
40///
41/// // No more receivers, and no update
42/// assert!(WATCH.receiver().is_none());
43/// assert_eq!(rcv1.try_changed(), None);
44///
45/// snd.send(10);
46///
47/// // Receive the new value (async or try)
48/// assert_eq!(rcv0.changed().await, 10);
49/// assert_eq!(rcv1.try_changed(), Some(10));
50///
51/// // No update
52/// assert_eq!(rcv0.try_changed(), None);
53/// assert_eq!(rcv1.try_changed(), None);
54///
55/// snd.send(20);
56///
57/// // Using `get` marks the value as seen
58/// assert_eq!(rcv1.get().await, 20);
59/// assert_eq!(rcv1.try_changed(), None);
60///
61/// // But `get` also returns when unchanged
62/// assert_eq!(rcv1.get().await, 20);
63/// assert_eq!(rcv1.get().await, 20);
64///
65/// };
66/// block_on(f);
67/// ```
68pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
69 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
70}
71
72struct WatchState<T: Clone, const N: usize> {
73 data: Option<T>,
74 current_id: u64,
75 wakers: MultiWakerRegistration<N>,
76 receiver_count: usize,
77}
78
79trait SealedWatchBehavior<T> {
80 /// Poll the `Watch` for the current value, making it as seen.
81 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
82
83 /// Poll the `Watch` for the value if it matches the predicate function
84 /// `f`, making it as seen.
85 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
86
87 /// Poll the `Watch` for a changed value, marking it as seen, if an id is given.
88 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
89
90 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
91 fn try_changed(&self, id: &mut u64) -> Option<T>;
92
93 /// Poll the `Watch` for a changed value that matches the predicate function
94 /// `f`, marking it as seen.
95 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
96
97 /// Tries to retrieve the value of the `Watch` if it has changed and matches the
98 /// predicate function `f`, marking it as seen.
99 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
100
101 /// Used when a receiver is dropped to decrement the receiver count.
102 ///
103 /// ## This method should not be called by the user.
104 fn drop_receiver(&self);
105
106 /// Clears the value of the `Watch`.
107 fn clear(&self);
108
109 /// Sends a new value to the `Watch`.
110 fn send(&self, val: T);
111
112 /// Modify the value of the `Watch` using a closure. Returns `false` if the
113 /// `Watch` does not already contain a value.
114 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
115
116 /// Modify the value of the `Watch` using a closure. Returns `false` if the
117 /// `Watch` does not already contain a value.
118 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
119}
120
121/// A trait representing the 'inner' behavior of the `Watch`.
122#[allow(private_bounds)]
123pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
124 /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
125 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
126
127 /// Tries to get the value of the `Watch` if it matches the predicate function
128 /// `f`, marking it as seen.
129 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
130
131 /// Checks if the `Watch` is been initialized with a value.
132 fn contains_value(&self) -> bool;
133}
134
135impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
136 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
137 self.mutex.lock(|state| {
138 let mut s = state.borrow_mut();
139 match &s.data {
140 Some(data) => {
141 *id = s.current_id;
142 Poll::Ready(data.clone())
143 }
144 None => {
145 s.wakers.register(cx.waker());
146 Poll::Pending
147 }
148 }
149 })
150 }
151
152 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
153 self.mutex.lock(|state| {
154 let mut s = state.borrow_mut();
155 match s.data {
156 Some(ref data) if f(data) => {
157 *id = s.current_id;
158 Poll::Ready(data.clone())
159 }
160 _ => {
161 s.wakers.register(cx.waker());
162 Poll::Pending
163 }
164 }
165 })
166 }
167
168 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
169 self.mutex.lock(|state| {
170 let mut s = state.borrow_mut();
171 match (&s.data, s.current_id > *id) {
172 (Some(data), true) => {
173 *id = s.current_id;
174 Poll::Ready(data.clone())
175 }
176 _ => {
177 s.wakers.register(cx.waker());
178 Poll::Pending
179 }
180 }
181 })
182 }
183
184 fn try_changed(&self, id: &mut u64) -> Option<T> {
185 self.mutex.lock(|state| {
186 let s = state.borrow();
187 match s.current_id > *id {
188 true => {
189 *id = s.current_id;
190 s.data.clone()
191 }
192 false => None,
193 }
194 })
195 }
196
197 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
198 self.mutex.lock(|state| {
199 let mut s = state.borrow_mut();
200 match (&s.data, s.current_id > *id) {
201 (Some(data), true) if f(data) => {
202 *id = s.current_id;
203 Poll::Ready(data.clone())
204 }
205 _ => {
206 s.wakers.register(cx.waker());
207 Poll::Pending
208 }
209 }
210 })
211 }
212
213 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
214 self.mutex.lock(|state| {
215 let s = state.borrow();
216 match (&s.data, s.current_id > *id) {
217 (Some(data), true) if f(data) => {
218 *id = s.current_id;
219 s.data.clone()
220 }
221 _ => None,
222 }
223 })
224 }
225
226 fn drop_receiver(&self) {
227 self.mutex.lock(|state| {
228 let mut s = state.borrow_mut();
229 s.receiver_count -= 1;
230 })
231 }
232
233 fn clear(&self) {
234 self.mutex.lock(|state| {
235 let mut s = state.borrow_mut();
236 s.data = None;
237 })
238 }
239
240 fn send(&self, val: T) {
241 self.mutex.lock(|state| {
242 let mut s = state.borrow_mut();
243 s.data = Some(val);
244 s.current_id += 1;
245 s.wakers.wake();
246 })
247 }
248
249 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
250 self.mutex.lock(|state| {
251 let mut s = state.borrow_mut();
252 f(&mut s.data);
253 s.current_id += 1;
254 s.wakers.wake();
255 })
256 }
257
258 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
259 self.mutex.lock(|state| {
260 let mut s = state.borrow_mut();
261 if f(&mut s.data) {
262 s.current_id += 1;
263 s.wakers.wake();
264 }
265 })
266 }
267}
268
269impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
270 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
271 self.mutex.lock(|state| {
272 let s = state.borrow();
273 if let Some(id) = id {
274 *id = s.current_id;
275 }
276 s.data.clone()
277 })
278 }
279
280 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
281 self.mutex.lock(|state| {
282 let s = state.borrow();
283 match s.data {
284 Some(ref data) if f(data) => {
285 if let Some(id) = id {
286 *id = s.current_id;
287 }
288 Some(data.clone())
289 }
290 _ => None,
291 }
292 })
293 }
294
295 fn contains_value(&self) -> bool {
296 self.mutex.lock(|state| state.borrow().data.is_some())
297 }
298}
299
300impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
301 /// Create a new `Watch` channel.
302 pub const fn new() -> Self {
303 Self {
304 mutex: Mutex::new(RefCell::new(WatchState {
305 data: None,
306 current_id: 0,
307 wakers: MultiWakerRegistration::new(),
308 receiver_count: 0,
309 })),
310 }
311 }
312
313 /// Create a new `Watch` channel with default data.
314 pub const fn new_with(data: T) -> Self {
315 Self {
316 mutex: Mutex::new(RefCell::new(WatchState {
317 data: Some(data),
318 current_id: 0,
319 wakers: MultiWakerRegistration::new(),
320 receiver_count: 0,
321 })),
322 }
323 }
324
325 /// Create a new [`Sender`] for the `Watch`.
326 pub fn sender(&self) -> Sender<'_, M, T, N> {
327 Sender(Snd::new(self))
328 }
329
330 /// Create a new [`DynSender`] for the `Watch`.
331 pub fn dyn_sender(&self) -> DynSender<'_, T> {
332 DynSender(Snd::new(self))
333 }
334
335 /// Try to create a new [`Receiver`] for the `Watch`. If the
336 /// maximum number of receivers has been reached, `None` is returned.
337 pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
338 self.mutex.lock(|state| {
339 let mut s = state.borrow_mut();
340 if s.receiver_count < N {
341 s.receiver_count += 1;
342 Some(Receiver(Rcv::new(self, 0)))
343 } else {
344 None
345 }
346 })
347 }
348
349 /// Try to create a new [`DynReceiver`] for the `Watch`. If the
350 /// maximum number of receivers has been reached, `None` is returned.
351 pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
352 self.mutex.lock(|state| {
353 let mut s = state.borrow_mut();
354 if s.receiver_count < N {
355 s.receiver_count += 1;
356 Some(DynReceiver(Rcv::new(self, 0)))
357 } else {
358 None
359 }
360 })
361 }
362
363 /// Try to create a new [`AnonReceiver`] for the `Watch`.
364 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
365 AnonReceiver(AnonRcv::new(self, 0))
366 }
367
368 /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
369 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
370 DynAnonReceiver(AnonRcv::new(self, 0))
371 }
372
373 /// Returns the message ID of the latest message sent to the `Watch`.
374 ///
375 /// This counter is monotonic, and is incremented every time a new message is sent.
376 pub fn get_msg_id(&self) -> u64 {
377 self.mutex.lock(|state| state.borrow().current_id)
378 }
379
380 /// Tries to get the value of the `Watch`.
381 pub fn try_get(&self) -> Option<T> {
382 WatchBehavior::try_get(self, None)
383 }
384
385 /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
386 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
387 where
388 F: Fn(&T) -> bool,
389 {
390 WatchBehavior::try_get_and(self, None, &mut f)
391 }
392}
393
394/// A receiver can `.await` a change in the `Watch` value.
395pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
396 watch: &'a W,
397 _phantom: PhantomData<T>,
398}
399
400impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
401 fn clone(&self) -> Self {
402 Self {
403 watch: self.watch,
404 _phantom: PhantomData,
405 }
406 }
407}
408
409impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
410 /// Creates a new `Receiver` with a reference to the `Watch`.
411 fn new(watch: &'a W) -> Self {
412 Self {
413 watch,
414 _phantom: PhantomData,
415 }
416 }
417
418 /// Sends a new value to the `Watch`.
419 pub fn send(&self, val: T) {
420 self.watch.send(val)
421 }
422
423 /// Clears the value of the `Watch`.
424 /// This will cause calls to [`Rcv::get`] to be pending.
425 pub fn clear(&self) {
426 self.watch.clear()
427 }
428
429 /// Tries to retrieve the value of the `Watch`.
430 pub fn try_get(&self) -> Option<T> {
431 self.watch.try_get(None)
432 }
433
434 /// Tries to peek the current value of the `Watch` if it matches the predicate
435 /// function `f`.
436 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
437 where
438 F: Fn(&T) -> bool,
439 {
440 self.watch.try_get_and(None, &mut f)
441 }
442
443 /// Returns true if the `Watch` contains a value.
444 pub fn contains_value(&self) -> bool {
445 self.watch.contains_value()
446 }
447
448 /// Modify the value of the `Watch` using a closure.
449 pub fn send_modify<F>(&self, mut f: F)
450 where
451 F: Fn(&mut Option<T>),
452 {
453 self.watch.send_modify(&mut f)
454 }
455
456 /// Modify the value of the `Watch` using a closure. The closure must return
457 /// `true` if the value was modified, which notifies all receivers.
458 pub fn send_if_modified<F>(&self, mut f: F)
459 where
460 F: Fn(&mut Option<T>) -> bool,
461 {
462 self.watch.send_if_modified(&mut f)
463 }
464}
465
466/// A sender of a `Watch` channel.
467///
468/// For a simpler type definition, consider [`DynSender`] at the expense of
469/// some runtime performance due to dynamic dispatch.
470pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
473 fn clone(&self) -> Self {
474 Self(self.0.clone())
475 }
476}
477
478impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
479 /// Converts the `Sender` into a [`DynSender`].
480 pub fn as_dyn(self) -> DynSender<'a, T> {
481 DynSender(Snd::new(self.watch))
482 }
483}
484
485impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
486 fn into(self) -> DynSender<'a, T> {
487 self.as_dyn()
488 }
489}
490
491impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
492 type Target = Snd<'a, T, Watch<M, T, N>>;
493
494 fn deref(&self) -> &Self::Target {
495 &self.0
496 }
497}
498
499impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
500 fn deref_mut(&mut self) -> &mut Self::Target {
501 &mut self.0
502 }
503}
504
505/// A sender which holds a **dynamic** reference to a `Watch` channel.
506///
507/// This is an alternative to [`Sender`] with a simpler type definition,
508pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509
510impl<'a, T: Clone> Clone for DynSender<'a, T> {
511 fn clone(&self) -> Self {
512 Self(self.0.clone())
513 }
514}
515
516impl<'a, T: Clone> Deref for DynSender<'a, T> {
517 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
518
519 fn deref(&self) -> &Self::Target {
520 &self.0
521 }
522}
523
524impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
525 fn deref_mut(&mut self) -> &mut Self::Target {
526 &mut self.0
527 }
528}
529
530/// A receiver can `.await` a change in the `Watch` value.
531pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
532 watch: &'a W,
533 at_id: u64,
534 _phantom: PhantomData<T>,
535}
536
537impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
538 /// Creates a new `Receiver` with a reference to the `Watch`.
539 fn new(watch: &'a W, at_id: u64) -> Self {
540 Self {
541 watch,
542 at_id,
543 _phantom: PhantomData,
544 }
545 }
546
547 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
548 ///
549 /// **Note**: Futures do nothing unless you `.await` or poll them.
550 pub async fn get(&mut self) -> T {
551 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)).await
552 }
553
554 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
555 pub fn try_get(&mut self) -> Option<T> {
556 self.watch.try_get(Some(&mut self.at_id))
557 }
558
559 /// Returns the value of the `Watch` if it matches the predicate function `f`,
560 /// or waits for it to match, marking it as seen.
561 ///
562 /// **Note**: Futures do nothing unless you `.await` or poll them.
563 pub async fn get_and<F>(&mut self, mut f: F) -> T
564 where
565 F: Fn(&T) -> bool,
566 {
567 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
568 }
569
570 /// Tries to get the current value of the `Watch` if it matches the predicate
571 /// function `f` without waiting, marking it as seen.
572 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
573 where
574 F: Fn(&T) -> bool,
575 {
576 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
577 }
578
579 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
580 ///
581 /// **Note**: Futures do nothing unless you `.await` or poll them.
582 pub async fn changed(&mut self) -> T {
583 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
584 }
585
586 /// Tries to get the new value of the watch without waiting, marking it as seen.
587 pub fn try_changed(&mut self) -> Option<T> {
588 self.watch.try_changed(&mut self.at_id)
589 }
590
591 /// Waits for the `Watch` to change to a value which satisfies the predicate
592 /// function `f` and returns the new value, marking it as seen.
593 ///
594 /// **Note**: Futures do nothing unless you `.await` or poll them.
595 pub async fn changed_and<F>(&mut self, mut f: F) -> T
596 where
597 F: Fn(&T) -> bool,
598 {
599 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
600 }
601
602 /// Tries to get the new value of the watch which satisfies the predicate
603 /// function `f` and returns the new value without waiting, marking it as seen.
604 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
605 where
606 F: Fn(&T) -> bool,
607 {
608 self.watch.try_changed_and(&mut self.at_id, &mut f)
609 }
610
611 /// Checks if the `Watch` contains a value. If this returns true,
612 /// then awaiting [`Rcv::get`] will return immediately.
613 pub fn contains_value(&self) -> bool {
614 self.watch.contains_value()
615 }
616}
617
618impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
619 fn drop(&mut self) {
620 self.watch.drop_receiver();
621 }
622}
623
624/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
625pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
626 watch: &'a W,
627 at_id: u64,
628 _phantom: PhantomData<T>,
629}
630
631impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
632 /// Creates a new `Receiver` with a reference to the `Watch`.
633 fn new(watch: &'a W, at_id: u64) -> Self {
634 Self {
635 watch,
636 at_id,
637 _phantom: PhantomData,
638 }
639 }
640
641 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
642 pub fn try_get(&mut self) -> Option<T> {
643 self.watch.try_get(Some(&mut self.at_id))
644 }
645
646 /// Tries to get the current value of the `Watch` if it matches the predicate
647 /// function `f` without waiting, marking it as seen.
648 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
649 where
650 F: Fn(&T) -> bool,
651 {
652 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
653 }
654
655 /// Tries to get the new value of the watch without waiting, marking it as seen.
656 pub fn try_changed(&mut self) -> Option<T> {
657 self.watch.try_changed(&mut self.at_id)
658 }
659
660 /// Tries to get the new value of the watch which satisfies the predicate
661 /// function `f` and returns the new value without waiting, marking it as seen.
662 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
663 where
664 F: Fn(&T) -> bool,
665 {
666 self.watch.try_changed_and(&mut self.at_id, &mut f)
667 }
668
669 /// Checks if the `Watch` contains a value. If this returns true,
670 /// then awaiting [`Rcv::get`] will return immediately.
671 pub fn contains_value(&self) -> bool {
672 self.watch.contains_value()
673 }
674}
675
676/// A receiver of a `Watch` channel.
677pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678
679impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
680 /// Converts the `Receiver` into a [`DynReceiver`].
681 pub fn as_dyn(self) -> DynReceiver<'a, T> {
682 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
683 core::mem::forget(self); // Ensures the destructor is not called
684 rcv
685 }
686}
687
688impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
689 fn into(self) -> DynReceiver<'a, T> {
690 self.as_dyn()
691 }
692}
693
694impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
695 type Target = Rcv<'a, T, Watch<M, T, N>>;
696
697 fn deref(&self) -> &Self::Target {
698 &self.0
699 }
700}
701
702impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
703 fn deref_mut(&mut self) -> &mut Self::Target {
704 &mut self.0
705 }
706}
707
708/// A receiver which holds a **dynamic** reference to a `Watch` channel.
709///
710/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
711/// some runtime performance due to dynamic dispatch.
712pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713
714impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
715 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
716
717 fn deref(&self) -> &Self::Target {
718 &self.0
719 }
720}
721
722impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
723 fn deref_mut(&mut self) -> &mut Self::Target {
724 &mut self.0
725 }
726}
727
728/// A receiver of a `Watch` channel that cannot `.await` values.
729pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
732 /// Converts the `Receiver` into a [`DynReceiver`].
733 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
734 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
735 core::mem::forget(self); // Ensures the destructor is not called
736 rcv
737 }
738}
739
740impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
741 fn into(self) -> DynAnonReceiver<'a, T> {
742 self.as_dyn()
743 }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
747 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
748
749 fn deref(&self) -> &Self::Target {
750 &self.0
751 }
752}
753
754impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
755 fn deref_mut(&mut self) -> &mut Self::Target {
756 &mut self.0
757 }
758}
759
760/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
761///
762/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
763/// some runtime performance due to dynamic dispatch.
764pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765
766impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
767 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
768
769 fn deref(&self) -> &Self::Target {
770 &self.0
771 }
772}
773
774impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
775 fn deref_mut(&mut self) -> &mut Self::Target {
776 &mut self.0
777 }
778}
779
780#[cfg(test)]
781mod tests {
782 use futures_executor::block_on;
783
784 use super::Watch;
785 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
786
787 #[test]
788 fn multiple_sends() {
789 let f = async {
790 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
791
792 // Obtain receiver and sender
793 let mut rcv = WATCH.receiver().unwrap();
794 let snd = WATCH.sender();
795
796 // Not initialized
797 assert_eq!(rcv.try_changed(), None);
798
799 // Receive the new value
800 snd.send(10);
801 assert_eq!(rcv.changed().await, 10);
802
803 // Receive another value
804 snd.send(20);
805 assert_eq!(rcv.try_changed(), Some(20));
806
807 // No update
808 assert_eq!(rcv.try_changed(), None);
809 };
810 block_on(f);
811 }
812
813 #[test]
814 fn all_try_get() {
815 let f = async {
816 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
817
818 // Obtain receiver and sender
819 let mut rcv = WATCH.receiver().unwrap();
820 let snd = WATCH.sender();
821
822 // Not initialized
823 assert_eq!(WATCH.try_get(), None);
824 assert_eq!(rcv.try_get(), None);
825 assert_eq!(snd.try_get(), None);
826
827 // Receive the new value
828 snd.send(10);
829 assert_eq!(WATCH.try_get(), Some(10));
830 assert_eq!(rcv.try_get(), Some(10));
831 assert_eq!(snd.try_get(), Some(10));
832
833 assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
834 assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
835 assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
836
837 assert_eq!(WATCH.try_get_and(|x| x < &5), None);
838 assert_eq!(rcv.try_get_and(|x| x < &5), None);
839 assert_eq!(snd.try_get_and(|x| x < &5), None);
840 };
841 block_on(f);
842 }
843
844 #[test]
845 fn once_lock_like() {
846 let f = async {
847 static CONFIG0: u8 = 10;
848 static CONFIG1: u8 = 20;
849
850 static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
851
852 // Obtain receiver and sender
853 let mut rcv = WATCH.receiver().unwrap();
854 let snd = WATCH.sender();
855
856 // Not initialized
857 assert_eq!(rcv.try_changed(), None);
858
859 // Receive the new value
860 snd.send(&CONFIG0);
861 let rcv0 = rcv.changed().await;
862 assert_eq!(rcv0, &10);
863
864 // Receive another value
865 snd.send(&CONFIG1);
866 let rcv1 = rcv.try_changed();
867 assert_eq!(rcv1, Some(&20));
868
869 // No update
870 assert_eq!(rcv.try_changed(), None);
871
872 // Ensure similarity with original static
873 assert_eq!(rcv0, &CONFIG0);
874 assert_eq!(rcv1, Some(&CONFIG1));
875 };
876 block_on(f);
877 }
878
879 #[test]
880 fn sender_modify() {
881 let f = async {
882 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
883
884 // Obtain receiver and sender
885 let mut rcv = WATCH.receiver().unwrap();
886 let snd = WATCH.sender();
887
888 // Receive the new value
889 snd.send(10);
890 assert_eq!(rcv.try_changed(), Some(10));
891
892 // Modify the value inplace
893 snd.send_modify(|opt| {
894 if let Some(inner) = opt {
895 *inner += 5;
896 }
897 });
898
899 // Get the modified value
900 assert_eq!(rcv.try_changed(), Some(15));
901 assert_eq!(rcv.try_changed(), None);
902 };
903 block_on(f);
904 }
905
906 #[test]
907 fn predicate_fn() {
908 let f = async {
909 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
910
911 // Obtain receiver and sender
912 let mut rcv = WATCH.receiver().unwrap();
913 let snd = WATCH.sender();
914
915 snd.send(15);
916 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
917 assert_eq!(rcv.try_get_and(|x| x < &5), None);
918 assert!(rcv.try_changed().is_none());
919
920 snd.send(20);
921 assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
922 assert_eq!(rcv.try_changed_and(|x| x > &5), None);
923
924 snd.send(25);
925 assert_eq!(rcv.try_changed_and(|x| x < &5), None);
926 assert_eq!(rcv.try_changed(), Some(25));
927
928 snd.send(30);
929 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
930 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
931 };
932 block_on(f);
933 }
934
935 #[test]
936 fn receive_after_create() {
937 let f = async {
938 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
939
940 // Obtain sender and send value
941 let snd = WATCH.sender();
942 snd.send(10);
943
944 // Obtain receiver and receive value
945 let mut rcv = WATCH.receiver().unwrap();
946 assert_eq!(rcv.try_changed(), Some(10));
947 };
948 block_on(f);
949 }
950
951 #[test]
952 fn max_receivers_drop() {
953 let f = async {
954 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
955
956 // Try to create 3 receivers (only 2 can exist at once)
957 let rcv0 = WATCH.receiver();
958 let rcv1 = WATCH.receiver();
959 let rcv2 = WATCH.receiver();
960
961 // Ensure the first two are successful and the third is not
962 assert!(rcv0.is_some());
963 assert!(rcv1.is_some());
964 assert!(rcv2.is_none());
965
966 // Drop the first receiver
967 drop(rcv0);
968
969 // Create another receiver and ensure it is successful
970 let rcv3 = WATCH.receiver();
971 assert!(rcv3.is_some());
972 };
973 block_on(f);
974 }
975
976 #[test]
977 fn multiple_receivers() {
978 let f = async {
979 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
980
981 // Obtain receivers and sender
982 let mut rcv0 = WATCH.receiver().unwrap();
983 let mut rcv1 = WATCH.anon_receiver();
984 let snd = WATCH.sender();
985
986 // No update for both
987 assert_eq!(rcv0.try_changed(), None);
988 assert_eq!(rcv1.try_changed(), None);
989
990 // Send a new value
991 snd.send(0);
992
993 // Both receivers receive the new value
994 assert_eq!(rcv0.try_changed(), Some(0));
995 assert_eq!(rcv1.try_changed(), Some(0));
996 };
997 block_on(f);
998 }
999
1000 #[test]
1001 fn clone_senders() {
1002 let f = async {
1003 // Obtain different ways to send
1004 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1005 let snd0 = WATCH.sender();
1006 let snd1 = snd0.clone();
1007
1008 // Obtain Receiver
1009 let mut rcv = WATCH.receiver().unwrap().as_dyn();
1010
1011 // Send a value from first sender
1012 snd0.send(10);
1013 assert_eq!(rcv.try_changed(), Some(10));
1014
1015 // Send a value from second sender
1016 snd1.send(20);
1017 assert_eq!(rcv.try_changed(), Some(20));
1018 };
1019 block_on(f);
1020 }
1021
1022 #[test]
1023 fn use_dynamics() {
1024 let f = async {
1025 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1026
1027 // Obtain receiver and sender
1028 let mut anon_rcv = WATCH.dyn_anon_receiver();
1029 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1030 let dyn_snd = WATCH.dyn_sender();
1031
1032 // Send a value
1033 dyn_snd.send(10);
1034
1035 // Ensure the dynamic receiver receives the value
1036 assert_eq!(anon_rcv.try_changed(), Some(10));
1037 assert_eq!(dyn_rcv.try_changed(), Some(10));
1038 assert_eq!(dyn_rcv.try_changed(), None);
1039 };
1040 block_on(f);
1041 }
1042
1043 #[test]
1044 fn convert_to_dyn() {
1045 let f = async {
1046 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1047
1048 // Obtain receiver and sender
1049 let anon_rcv = WATCH.anon_receiver();
1050 let rcv = WATCH.receiver().unwrap();
1051 let snd = WATCH.sender();
1052
1053 // Convert to dynamic
1054 let mut dyn_anon_rcv = anon_rcv.as_dyn();
1055 let mut dyn_rcv = rcv.as_dyn();
1056 let dyn_snd = snd.as_dyn();
1057
1058 // Send a value
1059 dyn_snd.send(10);
1060
1061 // Ensure the dynamic receiver receives the value
1062 assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1063 assert_eq!(dyn_rcv.try_changed(), Some(10));
1064 assert_eq!(dyn_rcv.try_changed(), None);
1065 };
1066 block_on(f);
1067 }
1068
1069 #[test]
1070 fn dynamic_receiver_count() {
1071 let f = async {
1072 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1073
1074 // Obtain receiver and sender
1075 let rcv0 = WATCH.receiver();
1076 let rcv1 = WATCH.receiver();
1077 let rcv2 = WATCH.receiver();
1078
1079 // Ensure the first two are successful and the third is not
1080 assert!(rcv0.is_some());
1081 assert!(rcv1.is_some());
1082 assert!(rcv2.is_none());
1083
1084 // Convert to dynamic
1085 let dyn_rcv0 = rcv0.unwrap().as_dyn();
1086
1087 // Drop the (now dynamic) receiver
1088 drop(dyn_rcv0);
1089
1090 // Create another receiver and ensure it is successful
1091 let rcv3 = WATCH.receiver();
1092 let rcv4 = WATCH.receiver();
1093 assert!(rcv3.is_some());
1094 assert!(rcv4.is_none());
1095 };
1096 block_on(f);
1097 }
1098
1099 #[test]
1100 fn contains_value() {
1101 let f = async {
1102 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1103
1104 // Obtain receiver and sender
1105 let rcv = WATCH.receiver().unwrap();
1106 let snd = WATCH.sender();
1107
1108 // check if the watch contains a value
1109 assert_eq!(rcv.contains_value(), false);
1110 assert_eq!(snd.contains_value(), false);
1111
1112 // Send a value
1113 snd.send(10);
1114
1115 // check if the watch contains a value
1116 assert_eq!(rcv.contains_value(), true);
1117 assert_eq!(snd.contains_value(), true);
1118 };
1119 block_on(f);
1120 }
1121}
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index cfce9a571..fabb69bf6 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -53,7 +53,7 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
53 buf: buf.as_mut_ptr(), 53 buf: buf.as_mut_ptr(),
54 phantom: PhantomData, 54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State { 55 state: Mutex::new(RefCell::new(State {
56 len, 56 capacity: len,
57 front: 0, 57 front: 0,
58 back: 0, 58 back: 0,
59 full: false, 59 full: false,
@@ -70,6 +70,28 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { 70 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self }) 71 (Sender { channel: self }, Receiver { channel: self })
72 } 72 }
73
74 /// Clears all elements in the channel.
75 pub fn clear(&mut self) {
76 self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
80
81 /// Returns the number of elements currently in the channel.
82 pub fn len(&self) -> usize {
83 self.state.lock(|s| s.borrow().len())
84 }
85
86 /// Returns whether the channel is empty.
87 pub fn is_empty(&self) -> bool {
88 self.state.lock(|s| s.borrow().is_empty())
89 }
90
91 /// Returns whether the channel is full.
92 pub fn is_full(&self) -> bool {
93 self.state.lock(|s| s.borrow().is_full())
94 }
73} 95}
74 96
75/// Send-only access to a [`Channel`]. 97/// Send-only access to a [`Channel`].
@@ -130,6 +152,28 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
130 pub fn send_done(&mut self) { 152 pub fn send_done(&mut self) {
131 self.channel.state.lock(|s| s.borrow_mut().push_done()) 153 self.channel.state.lock(|s| s.borrow_mut().push_done())
132 } 154 }
155
156 /// Clears all elements in the channel.
157 pub fn clear(&mut self) {
158 self.channel.state.lock(|s| {
159 s.borrow_mut().clear();
160 });
161 }
162
163 /// Returns the number of elements currently in the channel.
164 pub fn len(&self) -> usize {
165 self.channel.state.lock(|s| s.borrow().len())
166 }
167
168 /// Returns whether the channel is empty.
169 pub fn is_empty(&self) -> bool {
170 self.channel.state.lock(|s| s.borrow().is_empty())
171 }
172
173 /// Returns whether the channel is full.
174 pub fn is_full(&self) -> bool {
175 self.channel.state.lock(|s| s.borrow().is_full())
176 }
133} 177}
134 178
135/// Receive-only access to a [`Channel`]. 179/// Receive-only access to a [`Channel`].
@@ -190,10 +234,33 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
190 pub fn receive_done(&mut self) { 234 pub fn receive_done(&mut self) {
191 self.channel.state.lock(|s| s.borrow_mut().pop_done()) 235 self.channel.state.lock(|s| s.borrow_mut().pop_done())
192 } 236 }
237
238 /// Clears all elements in the channel.
239 pub fn clear(&mut self) {
240 self.channel.state.lock(|s| {
241 s.borrow_mut().clear();
242 });
243 }
244
245 /// Returns the number of elements currently in the channel.
246 pub fn len(&self) -> usize {
247 self.channel.state.lock(|s| s.borrow().len())
248 }
249
250 /// Returns whether the channel is empty.
251 pub fn is_empty(&self) -> bool {
252 self.channel.state.lock(|s| s.borrow().is_empty())
253 }
254
255 /// Returns whether the channel is full.
256 pub fn is_full(&self) -> bool {
257 self.channel.state.lock(|s| s.borrow().is_full())
258 }
193} 259}
194 260
195struct State { 261struct State {
196 len: usize, 262 /// Maximum number of elements the channel can hold.
263 capacity: usize,
197 264
198 /// Front index. Always 0..=(N-1) 265 /// Front index. Always 0..=(N-1)
199 front: usize, 266 front: usize,
@@ -210,13 +277,31 @@ struct State {
210 277
211impl State { 278impl State {
212 fn increment(&self, i: usize) -> usize { 279 fn increment(&self, i: usize) -> usize {
213 if i + 1 == self.len { 280 if i + 1 == self.capacity {
214 0 281 0
215 } else { 282 } else {
216 i + 1 283 i + 1
217 } 284 }
218 } 285 }
219 286
287 fn clear(&mut self) {
288 self.front = 0;
289 self.back = 0;
290 self.full = false;
291 }
292
293 fn len(&self) -> usize {
294 if !self.full {
295 if self.back >= self.front {
296 self.back - self.front
297 } else {
298 self.capacity + self.back - self.front
299 }
300 } else {
301 self.capacity
302 }
303 }
304
220 fn is_full(&self) -> bool { 305 fn is_full(&self) -> bool {
221 self.full 306 self.full
222 } 307 }