aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-sync/src/lib.rs2
-rw-r--r--embassy-sync/src/multi_signal.rs529
-rw-r--r--embassy-sync/src/watch.rs515
3 files changed, 516 insertions, 530 deletions
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index f02985564..8a69541a5 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -12,11 +12,11 @@ mod ring_buffer;
12 12
13pub mod blocking_mutex; 13pub mod blocking_mutex;
14pub mod channel; 14pub mod channel;
15pub mod multi_signal;
16pub mod mutex; 15pub mod mutex;
17pub mod pipe; 16pub mod pipe;
18pub mod priority_channel; 17pub mod priority_channel;
19pub mod pubsub; 18pub mod pubsub;
20pub mod signal; 19pub mod signal;
21pub mod waitqueue; 20pub mod waitqueue;
21pub mod watch;
22pub mod zerocopy_channel; 22pub mod zerocopy_channel;
diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs
deleted file mode 100644
index ff9f72f2e..000000000
--- a/embassy-sync/src/multi_signal.rs
+++ /dev/null
@@ -1,529 +0,0 @@
1//! A synchronization primitive for passing the latest value to **multiple** tasks.
2use core::cell::RefCell;
3use core::ops::{Deref, DerefMut};
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use futures_util::Future;
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s.
14///
15/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to
16/// `.await` the latest value, and all receive it.
17///
18/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except
19/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead
20/// of waiting for the receivers to pop the previous value.
21///
22/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other
23/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for
24/// [`Receiver`]s to "lose" stale values.
25///
26/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared
27/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`]
28/// with [`MultiSignal::receiver`] which has async methods.
29/// ```
30///
31/// use futures_executor::block_on;
32/// use embassy_sync::multi_signal::MultiSignal;
33/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
34///
35/// let f = async {
36///
37/// static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
38///
39/// // Obtain Receivers
40/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
41/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
42/// assert!(SOME_SIGNAL.receiver().is_err());
43///
44/// SOME_SIGNAL.write(10);
45///
46/// // Receive the new value
47/// assert_eq!(rcv0.changed().await, 10);
48/// assert_eq!(rcv1.try_changed(), Some(10));
49///
50/// // No update
51/// assert_eq!(rcv0.try_changed(), None);
52/// assert_eq!(rcv1.try_changed(), None);
53///
54/// SOME_SIGNAL.write(20);
55///
56/// // Receive new value with predicate
57/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20);
58/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None);
59///
60/// // Anyone can peek the current value
61/// assert_eq!(rcv0.peek(), 20);
62/// assert_eq!(rcv1.peek(), 20);
63/// assert_eq!(SOME_SIGNAL.peek(), 20);
64/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None);
65/// };
66/// block_on(f);
67/// ```
68pub struct MultiSignal<M: RawMutex, T: Clone, const N: usize> {
69 mutex: Mutex<M, RefCell<MultiSignalState<N, T>>>,
70}
71
72struct MultiSignalState<const N: usize, T: Clone> {
73 data: T,
74 current_id: u64,
75 wakers: MultiWakerRegistration<N>,
76 receiver_count: usize,
77}
78
79#[derive(Debug)]
80/// An error that can occur when a `MultiSignal` returns a `Result`.
81pub enum Error {
82 /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached.
83 MaximumReceiversReached,
84}
85
86impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<M, T, N> {
87 /// Create a new `MultiSignal` initialized with the given value.
88 pub const fn new(init: T) -> Self {
89 Self {
90 mutex: Mutex::new(RefCell::new(MultiSignalState {
91 data: init,
92 current_id: 1,
93 wakers: MultiWakerRegistration::new(),
94 receiver_count: 0,
95 })),
96 }
97 }
98
99 /// Get a [`Receiver`] for the `MultiSignal`.
100 pub fn receiver<'s>(&'a self) -> Result<Receiver<'a, M, T, N>, Error> {
101 self.mutex.lock(|state| {
102 let mut s = state.borrow_mut();
103 if s.receiver_count < N {
104 s.receiver_count += 1;
105 Ok(Receiver(Rcv::new(self)))
106 } else {
107 Err(Error::MaximumReceiversReached)
108 }
109 })
110 }
111
112 /// Update the value of the `MultiSignal`.
113 pub fn write(&self, data: T) {
114 self.mutex.lock(|state| {
115 let mut s = state.borrow_mut();
116 s.data = data;
117 s.current_id += 1;
118 s.wakers.wake();
119 })
120 }
121
122 /// Peek the current value of the `MultiSignal`.
123 pub fn peek(&self) -> T {
124 self.mutex.lock(|state| state.borrow().data.clone())
125 }
126
127 /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`.
128 pub fn peek_and(&self, mut f: impl FnMut(&T) -> bool) -> Option<T> {
129 self.mutex.lock(|state| {
130 let s = state.borrow();
131 if f(&s.data) {
132 Some(s.data.clone())
133 } else {
134 None
135 }
136 })
137 }
138
139 /// Get the ID of the current value of the `MultiSignal`.
140 /// This method is mostly for testing purposes.
141 #[allow(dead_code)]
142 fn get_id(&self) -> u64 {
143 self.mutex.lock(|state| state.borrow().current_id)
144 }
145}
146
147/// A receiver is able to `.await` a changed `MultiSignal` value.
148pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> {
149 multi_sig: &'a MultiSignal<M, T, N>,
150 at_id: u64,
151}
152
153impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> {
154 /// Create a new `Receiver` with a reference the given `MultiSignal`.
155 fn new(multi_sig: &'a MultiSignal<M, T, N>) -> Self {
156 Self { multi_sig, at_id: 0 }
157 }
158
159 /// Wait for a change to the value of the corresponding `MultiSignal`.
160 pub async fn changed(&mut self) -> T {
161 ReceiverWaitFuture { subscriber: self }.await
162 }
163
164 /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`.
165 pub async fn changed_and<F>(&mut self, f: F) -> T
166 where
167 F: FnMut(&T) -> bool,
168 {
169 ReceiverPredFuture {
170 subscriber: self,
171 predicate: f,
172 }
173 .await
174 }
175
176 /// Try to get a changed value of the corresponding `MultiSignal`.
177 pub fn try_changed(&mut self) -> Option<T> {
178 self.multi_sig.mutex.lock(|state| {
179 let s = state.borrow();
180 match s.current_id > self.at_id {
181 true => {
182 self.at_id = s.current_id;
183 Some(s.data.clone())
184 }
185 false => None,
186 }
187 })
188 }
189
190 /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`.
191 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
192 where
193 F: FnMut(&T) -> bool,
194 {
195 self.multi_sig.mutex.lock(|state| {
196 let s = state.borrow();
197 match s.current_id > self.at_id && f(&s.data) {
198 true => {
199 self.at_id = s.current_id;
200 Some(s.data.clone())
201 }
202 false => None,
203 }
204 })
205 }
206
207 /// Peek the current value of the corresponding `MultiSignal`.
208 pub fn peek(&self) -> T {
209 self.multi_sig.peek()
210 }
211
212 /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`.
213 pub fn peek_and<F>(&self, f: F) -> Option<T>
214 where
215 F: FnMut(&T) -> bool,
216 {
217 self.multi_sig.peek_and(f)
218 }
219
220 /// Check if the value of the corresponding `MultiSignal` has changed.
221 pub fn has_changed(&mut self) -> bool {
222 self.multi_sig
223 .mutex
224 .lock(|state| state.borrow().current_id > self.at_id)
225 }
226}
227
228/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value.
229pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>);
230
231impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
232 type Target = Rcv<'a, M, T, N>;
233
234 fn deref(&self) -> &Self::Target {
235 &self.0
236 }
237}
238
239impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
240 fn deref_mut(&mut self) -> &mut Self::Target {
241 &mut self.0
242 }
243}
244
245/// Future for the `Receiver` wait action
246#[must_use = "futures do nothing unless you `.await` or poll them"]
247pub struct ReceiverWaitFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> {
248 subscriber: &'s mut Rcv<'a, M, T, N>,
249}
250
251impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverWaitFuture<'s, 'a, M, T, N> {}
252impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverWaitFuture<'s, 'a, M, T, N> {
253 type Output = T;
254
255 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 self.get_with_context(Some(cx))
257 }
258}
259
260impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> ReceiverWaitFuture<'s, 'a, M, T, N> {
261 /// Poll the `MultiSignal` with an optional context.
262 fn get_with_context(&mut self, cx: Option<&mut Context>) -> Poll<T> {
263 self.subscriber.multi_sig.mutex.lock(|state| {
264 let mut s = state.borrow_mut();
265 match s.current_id > self.subscriber.at_id {
266 true => {
267 self.subscriber.at_id = s.current_id;
268 Poll::Ready(s.data.clone())
269 }
270 _ => {
271 if let Some(cx) = cx {
272 s.wakers.register(cx.waker());
273 }
274 Poll::Pending
275 }
276 }
277 })
278 }
279}
280
281/// Future for the `Receiver` wait action, with the ability to filter the value with a predicate.
282#[must_use = "futures do nothing unless you `.await` or poll them"]
283pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> bool, const N: usize> {
284 subscriber: &'s mut Rcv<'a, M, T, N>,
285 predicate: F,
286}
287
288impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin
289 for ReceiverPredFuture<'s, 'a, M, T, F, N>
290{
291}
292impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future
293 for ReceiverPredFuture<'s, 'a, M, T, F, N>
294{
295 type Output = T;
296
297 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
298 self.get_with_context_pred(Some(cx))
299 }
300}
301
302impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> ReceiverPredFuture<'s, 'a, M, T, F, N> {
303 /// Poll the `MultiSignal` with an optional context.
304 fn get_with_context_pred(&mut self, cx: Option<&mut Context>) -> Poll<T> {
305 self.subscriber.multi_sig.mutex.lock(|state| {
306 let mut s = state.borrow_mut();
307 match s.current_id > self.subscriber.at_id {
308 true if (self.predicate)(&s.data) => {
309 self.subscriber.at_id = s.current_id;
310 Poll::Ready(s.data.clone())
311 }
312 _ => {
313 if let Some(cx) = cx {
314 s.wakers.register(cx.waker());
315 }
316 Poll::Pending
317 }
318 }
319 })
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use futures_executor::block_on;
326
327 use super::*;
328 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
329
330 #[test]
331 fn multiple_writes() {
332 let f = async {
333 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
334
335 // Obtain Receivers
336 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
337 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
338
339 SOME_SIGNAL.write(10);
340
341 // Receive the new value
342 assert_eq!(rcv0.changed().await, 10);
343 assert_eq!(rcv1.changed().await, 10);
344
345 // No update
346 assert_eq!(rcv0.try_changed(), None);
347 assert_eq!(rcv1.try_changed(), None);
348
349 SOME_SIGNAL.write(20);
350
351 assert_eq!(rcv0.changed().await, 20);
352 assert_eq!(rcv1.changed().await, 20);
353 };
354 block_on(f);
355 }
356
357 #[test]
358 fn max_receivers() {
359 let f = async {
360 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
361
362 // Obtain Receivers
363 let _ = SOME_SIGNAL.receiver().unwrap();
364 let _ = SOME_SIGNAL.receiver().unwrap();
365 assert!(SOME_SIGNAL.receiver().is_err());
366 };
367 block_on(f);
368 }
369
370 // Really weird edge case, but it's possible to have a receiver that never gets a value.
371 #[test]
372 fn receive_initial() {
373 let f = async {
374 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
375
376 // Obtain Receivers
377 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
378 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
379
380 assert_eq!(rcv0.try_changed(), Some(0));
381 assert_eq!(rcv1.try_changed(), Some(0));
382
383 assert_eq!(rcv0.try_changed(), None);
384 assert_eq!(rcv1.try_changed(), None);
385 };
386 block_on(f);
387 }
388
389 #[test]
390 fn count_ids() {
391 let f = async {
392 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
393
394 // Obtain Receivers
395 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
396 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
397
398 SOME_SIGNAL.write(10);
399
400 assert_eq!(rcv0.changed().await, 10);
401 assert_eq!(rcv1.changed().await, 10);
402
403 assert_eq!(rcv0.try_changed(), None);
404 assert_eq!(rcv1.try_changed(), None);
405
406 SOME_SIGNAL.write(20);
407 SOME_SIGNAL.write(20);
408 SOME_SIGNAL.write(20);
409
410 assert_eq!(rcv0.changed().await, 20);
411 assert_eq!(rcv1.changed().await, 20);
412
413 assert_eq!(rcv0.try_changed(), None);
414 assert_eq!(rcv1.try_changed(), None);
415
416 assert_eq!(SOME_SIGNAL.get_id(), 5);
417 };
418 block_on(f);
419 }
420
421 #[test]
422 fn peek_still_await() {
423 let f = async {
424 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
425
426 // Obtain Receivers
427 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
428 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
429
430 SOME_SIGNAL.write(10);
431
432 assert_eq!(rcv0.peek(), 10);
433 assert_eq!(rcv1.peek(), 10);
434
435 assert_eq!(rcv0.changed().await, 10);
436 assert_eq!(rcv1.changed().await, 10);
437 };
438 block_on(f);
439 }
440
441 #[test]
442 fn predicate() {
443 let f = async {
444 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
445
446 // Obtain Receivers
447 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
448 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
449
450 SOME_SIGNAL.write(20);
451
452 assert_eq!(rcv0.changed_and(|x| x > &10).await, 20);
453 assert_eq!(rcv1.try_changed_and(|x| x > &30), None);
454 };
455 block_on(f);
456 }
457
458 #[test]
459 fn mutable_predicate() {
460 let f = async {
461 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
462
463 // Obtain Receivers
464 let mut rcv = SOME_SIGNAL.receiver().unwrap();
465
466 SOME_SIGNAL.write(10);
467
468 let mut largest = 0;
469 let mut predicate = |x: &u8| {
470 if *x > largest {
471 largest = *x;
472 }
473 true
474 };
475
476 assert_eq!(rcv.changed_and(&mut predicate).await, 10);
477
478 SOME_SIGNAL.write(20);
479
480 assert_eq!(rcv.changed_and(&mut predicate).await, 20);
481
482 SOME_SIGNAL.write(5);
483
484 assert_eq!(rcv.changed_and(&mut predicate).await, 5);
485
486 assert_eq!(largest, 20)
487 };
488 block_on(f);
489 }
490
491 #[test]
492 fn peek_and() {
493 let f = async {
494 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
495
496 // Obtain Receivers
497 let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
498 let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
499
500 SOME_SIGNAL.write(20);
501
502 assert_eq!(rcv0.peek_and(|x| x > &10), Some(20));
503 assert_eq!(rcv1.peek_and(|x| x > &30), None);
504
505 assert_eq!(rcv0.changed().await, 20);
506 assert_eq!(rcv1.changed().await, 20);
507 };
508 block_on(f);
509 }
510
511 #[test]
512 fn peek_with_static() {
513 let f = async {
514 static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
515
516 // Obtain Receivers
517 let rcv0 = SOME_SIGNAL.receiver().unwrap();
518 let rcv1 = SOME_SIGNAL.receiver().unwrap();
519
520 SOME_SIGNAL.write(20);
521
522 assert_eq!(rcv0.peek(), 20);
523 assert_eq!(rcv1.peek(), 20);
524 assert_eq!(SOME_SIGNAL.peek(), 20);
525 assert_eq!(SOME_SIGNAL.peek_and(|x| x > &30), None);
526 };
527 block_on(f);
528 }
529}
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
new file mode 100644
index 000000000..7e5e92741
--- /dev/null
+++ b/embassy-sync/src/watch.rs
@@ -0,0 +1,515 @@
1//! A synchronization primitive for passing the latest value to **multiple** tasks.
2
3use core::cell::RefCell;
4use core::future::poll_fn;
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s.
14///
15/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to
16/// `.await` the latest value, and all receive it.
17///
18/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except
19/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead
20/// of waiting for the receivers to pop the previous value.
21///
22/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other
23/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for
24/// [`Receiver`]s to "lose" stale values.
25///
26/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared
27/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`]
28/// with [`Watch::receiver`] which has async methods.
29/// ```
30///
31/// use futures_executor::block_on;
32/// use embassy_sync::watch::Watch;
33/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
34///
35/// let f = async {
36///
37/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
38///
39/// // Obtain Receivers
40/// let mut rcv0 = WATCH.receiver().unwrap();
41/// let mut rcv1 = WATCH.receiver().unwrap();
42/// assert!(WATCH.receiver().is_err());
43///
44/// assert_eq!(rcv1.try_changed(), None);
45///
46/// WATCH.write(10);
47/// assert_eq!(WATCH.try_peek(), Some(10));
48///
49///
50/// // Receive the new value
51/// assert_eq!(rcv0.changed().await, 10);
52/// assert_eq!(rcv1.try_changed(), Some(10));
53///
54/// // No update
55/// assert_eq!(rcv0.try_changed(), None);
56/// assert_eq!(rcv1.try_changed(), None);
57///
58/// WATCH.write(20);
59///
60/// // Defference `between` peek `get`.
61/// assert_eq!(rcv0.peek().await, 20);
62/// assert_eq!(rcv1.get().await, 20);
63///
64/// assert_eq!(rcv0.try_changed(), Some(20));
65/// assert_eq!(rcv1.try_changed(), None);
66///
67/// };
68/// block_on(f);
69/// ```
70pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
71 mutex: Mutex<M, RefCell<WatchState<N, T>>>,
72}
73
74struct WatchState<const N: usize, T: Clone> {
75 data: Option<T>,
76 current_id: u64,
77 wakers: MultiWakerRegistration<N>,
78 receiver_count: usize,
79}
80
81/// A trait representing the 'inner' behavior of the `Watch`.
82pub trait WatchBehavior<T: Clone> {
83 /// Poll the `Watch` for the current value, **without** making it as seen.
84 fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
85
86 /// Tries to peek the value of the `Watch`, **without** marking it as seen.
87 fn inner_try_peek(&self) -> Option<T>;
88
89 /// Poll the `Watch` for the current value, making it as seen.
90 fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
91
92 /// Tries to get the value of the `Watch`, marking it as seen.
93 fn inner_try_get(&self, id: &mut u64) -> Option<T>;
94
95 /// Poll the `Watch` for a changed value, marking it as seen.
96 fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
97
98 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
99 fn inner_try_changed(&self, id: &mut u64) -> Option<T>;
100
101 /// Checks if the `Watch` is been initialized with a value.
102 fn inner_contains_value(&self) -> bool;
103}
104
105impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
106 fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
107 self.mutex.lock(|state| {
108 let mut s = state.borrow_mut();
109 match &s.data {
110 Some(data) => Poll::Ready(data.clone()),
111 None => {
112 s.wakers.register(cx.waker());
113 Poll::Pending
114 }
115 }
116 })
117 }
118
119 fn inner_try_peek(&self) -> Option<T> {
120 self.mutex.lock(|state| state.borrow().data.clone())
121 }
122
123 fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
124 self.mutex.lock(|state| {
125 let mut s = state.borrow_mut();
126 match &s.data {
127 Some(data) => {
128 *id = s.current_id;
129 Poll::Ready(data.clone())
130 }
131 None => {
132 s.wakers.register(cx.waker());
133 Poll::Pending
134 }
135 }
136 })
137 }
138
139 fn inner_try_get(&self, id: &mut u64) -> Option<T> {
140 self.mutex.lock(|state| {
141 let s = state.borrow();
142 *id = s.current_id;
143 state.borrow().data.clone()
144 })
145 }
146
147 fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
148 self.mutex.lock(|state| {
149 let mut s = state.borrow_mut();
150 match (&s.data, s.current_id > *id) {
151 (Some(data), true) => {
152 *id = s.current_id;
153 Poll::Ready(data.clone())
154 }
155 _ => {
156 s.wakers.register(cx.waker());
157 Poll::Pending
158 }
159 }
160 })
161 }
162
163 fn inner_try_changed(&self, id: &mut u64) -> Option<T> {
164 self.mutex.lock(|state| {
165 let s = state.borrow();
166 match s.current_id > *id {
167 true => {
168 *id = s.current_id;
169 state.borrow().data.clone()
170 }
171 false => None,
172 }
173 })
174 }
175
176 fn inner_contains_value(&self) -> bool {
177 self.mutex.lock(|state| state.borrow().data.is_some())
178 }
179}
180
181#[derive(Debug)]
182/// An error that can occur when a `Watch` returns a `Result::Err(_)`.
183pub enum Error {
184 /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached.
185 MaximumReceiversReached,
186}
187
188impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
189 /// Create a new `Watch` channel.
190 pub const fn new() -> Self {
191 Self {
192 mutex: Mutex::new(RefCell::new(WatchState {
193 data: None,
194 current_id: 0,
195 wakers: MultiWakerRegistration::new(),
196 receiver_count: 0,
197 })),
198 }
199 }
200
201 /// Write a new value to the `Watch`.
202 pub fn write(&self, val: T) {
203 self.mutex.lock(|state| {
204 let mut s = state.borrow_mut();
205 s.data = Some(val);
206 s.current_id += 1;
207 s.wakers.wake();
208 })
209 }
210
211 /// Create a new [`Receiver`] for the `Watch`.
212 pub fn receiver(&self) -> Result<Receiver<'_, M, T, N>, Error> {
213 self.mutex.lock(|state| {
214 let mut s = state.borrow_mut();
215 if s.receiver_count < N {
216 s.receiver_count += 1;
217 Ok(Receiver(Rcv::new(self)))
218 } else {
219 Err(Error::MaximumReceiversReached)
220 }
221 })
222 }
223
224 /// Create a new [`DynReceiver`] for the `Watch`.
225 pub fn dyn_receiver(&self) -> Result<DynReceiver<'_, T>, Error> {
226 self.mutex.lock(|state| {
227 let mut s = state.borrow_mut();
228 if s.receiver_count < N {
229 s.receiver_count += 1;
230 Ok(DynReceiver(Rcv::new(self)))
231 } else {
232 Err(Error::MaximumReceiversReached)
233 }
234 })
235 }
236
237 /// Tries to retrieve the value of the `Watch`.
238 pub fn try_peek(&self) -> Option<T> {
239 self.inner_try_peek()
240 }
241
242 /// Returns true if the `Watch` contains a value.
243 pub fn contains_value(&self) -> bool {
244 self.inner_contains_value()
245 }
246
247 /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending.
248 pub fn clear(&self) {
249 self.mutex.lock(|state| {
250 let mut s = state.borrow_mut();
251 s.data = None;
252 })
253 }
254}
255
256/// A receiver can `.await` a change in the `Watch` value.
257pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
258 watch: &'a W,
259 at_id: u64,
260 _phantom: PhantomData<T>,
261}
262
263impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
264 /// Creates a new `Receiver` with a reference to the `Watch`.
265 fn new(watch: &'a W) -> Self {
266 Self {
267 watch,
268 at_id: 0,
269 _phantom: PhantomData,
270 }
271 }
272
273 /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen.
274 pub async fn peek(&self) -> T {
275 poll_fn(|cx| self.watch.inner_poll_peek(cx)).await
276 }
277
278 /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
279 pub fn try_peek(&self) -> Option<T> {
280 self.watch.inner_try_peek()
281 }
282
283 /// Returns the current value of the `Watch` if it is initialized, marking it as seen.
284 pub async fn get(&mut self) -> T {
285 poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await
286 }
287
288 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
289 pub fn try_get(&mut self) -> Option<T> {
290 self.watch.inner_try_get(&mut self.at_id)
291 }
292
293 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
294 pub async fn changed(&mut self) -> T {
295 poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await
296 }
297
298 /// Tries to get the new value of the watch without waiting, marking it as seen.
299 pub fn try_changed(&mut self) -> Option<T> {
300 self.watch.inner_try_changed(&mut self.at_id)
301 }
302
303 /// Checks if the `Watch` contains a value. If this returns true,
304 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
305 pub fn contains_value(&self) -> bool {
306 self.watch.inner_contains_value()
307 }
308}
309
310/// A receiver of a `Watch` channel.
311pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
312
313/// A receiver which holds a **reference** to a `Watch` channel.
314///
315/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
316/// some runtime performance due to dynamic dispatch.
317pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
318
319impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
320 type Target = Rcv<'a, T, Watch<M, T, N>>;
321
322 fn deref(&self) -> &Self::Target {
323 &self.0
324 }
325}
326
327impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
328 fn deref_mut(&mut self) -> &mut Self::Target {
329 &mut self.0
330 }
331}
332
333impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
334 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
335
336 fn deref(&self) -> &Self::Target {
337 &self.0
338 }
339}
340
341impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
342 fn deref_mut(&mut self) -> &mut Self::Target {
343 &mut self.0
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use futures_executor::block_on;
350
351 use super::*;
352 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
353
354 #[test]
355 fn multiple_writes() {
356 let f = async {
357 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
358
359 // Obtain Receivers
360 let mut rcv0 = WATCH.receiver().unwrap();
361 let mut rcv1 = WATCH.dyn_receiver().unwrap();
362
363 WATCH.write(10);
364
365 // Receive the new value
366 assert_eq!(rcv0.changed().await, 10);
367 assert_eq!(rcv1.changed().await, 10);
368
369 // No update
370 assert_eq!(rcv0.try_changed(), None);
371 assert_eq!(rcv1.try_changed(), None);
372
373 WATCH.write(20);
374
375 assert_eq!(rcv0.changed().await, 20);
376 assert_eq!(rcv1.changed().await, 20);
377 };
378 block_on(f);
379 }
380
381 #[test]
382 fn max_receivers() {
383 let f = async {
384 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
385
386 // Obtain Receivers
387 let _ = WATCH.receiver().unwrap();
388 let _ = WATCH.receiver().unwrap();
389 assert!(WATCH.receiver().is_err());
390 };
391 block_on(f);
392 }
393
394 #[test]
395 fn receive_initial() {
396 let f = async {
397 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
398
399 // Obtain Receivers
400 let mut rcv0 = WATCH.receiver().unwrap();
401 let mut rcv1 = WATCH.receiver().unwrap();
402
403 assert_eq!(rcv0.contains_value(), false);
404
405 assert_eq!(rcv0.try_changed(), None);
406 assert_eq!(rcv1.try_changed(), None);
407
408 WATCH.write(0);
409
410 assert_eq!(rcv0.contains_value(), true);
411
412 assert_eq!(rcv0.try_changed(), Some(0));
413 assert_eq!(rcv1.try_changed(), Some(0));
414 };
415 block_on(f);
416 }
417
418 #[test]
419 fn peek_get_changed() {
420 let f = async {
421 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
422
423 // Obtain Receivers
424 let mut rcv0 = WATCH.receiver().unwrap();
425
426 WATCH.write(10);
427
428 // Ensure peek does not mark as seen
429 assert_eq!(rcv0.peek().await, 10);
430 assert_eq!(rcv0.try_changed(), Some(10));
431 assert_eq!(rcv0.try_changed(), None);
432 assert_eq!(rcv0.peek().await, 10);
433
434 WATCH.write(20);
435
436 // Ensure get does mark as seen
437 assert_eq!(rcv0.get().await, 20);
438 assert_eq!(rcv0.try_changed(), None);
439 assert_eq!(rcv0.try_get(), Some(20));
440 };
441 block_on(f);
442 }
443
444 #[test]
445 fn count_ids() {
446 let f = async {
447 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
448
449 // Obtain Receivers
450 let mut rcv0 = WATCH.receiver().unwrap();
451 let mut rcv1 = WATCH.receiver().unwrap();
452
453 let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id);
454
455 WATCH.write(10);
456
457 assert_eq!(rcv0.changed().await, 10);
458 assert_eq!(rcv1.changed().await, 10);
459
460 assert_eq!(rcv0.try_changed(), None);
461 assert_eq!(rcv1.try_changed(), None);
462
463 WATCH.write(20);
464 WATCH.write(20);
465 WATCH.write(20);
466
467 assert_eq!(rcv0.changed().await, 20);
468 assert_eq!(rcv1.changed().await, 20);
469
470 assert_eq!(rcv0.try_changed(), None);
471 assert_eq!(rcv1.try_changed(), None);
472
473 assert_eq!(get_id(), 4);
474 };
475 block_on(f);
476 }
477
478 #[test]
479 fn peek_still_await() {
480 let f = async {
481 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
482
483 // Obtain Receivers
484 let mut rcv0 = WATCH.receiver().unwrap();
485 let mut rcv1 = WATCH.receiver().unwrap();
486
487 WATCH.write(10);
488
489 assert_eq!(rcv0.peek().await, 10);
490 assert_eq!(rcv1.try_peek(), Some(10));
491
492 assert_eq!(rcv0.changed().await, 10);
493 assert_eq!(rcv1.changed().await, 10);
494 };
495 block_on(f);
496 }
497
498 #[test]
499 fn peek_with_static() {
500 let f = async {
501 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
502
503 // Obtain Receivers
504 let rcv0 = WATCH.receiver().unwrap();
505 let rcv1 = WATCH.receiver().unwrap();
506
507 WATCH.write(20);
508
509 assert_eq!(rcv0.peek().await, 20);
510 assert_eq!(rcv1.peek().await, 20);
511 assert_eq!(WATCH.try_peek(), Some(20));
512 };
513 block_on(f);
514 }
515}