aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/watch.rs440
1 files changed, 289 insertions, 151 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
index 298c09d43..1b4a8b589 100644
--- a/embassy-sync/src/watch.rs
+++ b/embassy-sync/src/watch.rs
@@ -20,7 +20,9 @@ use crate::waitqueue::MultiWakerRegistration;
20/// always provided with the latest value. 20/// always provided with the latest value.
21/// 21///
22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] 22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program. 23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
24/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
25/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
24/// ``` 26/// ```
25/// 27///
26/// use futures_executor::block_on; 28/// use futures_executor::block_on;
@@ -41,25 +43,25 @@ use crate::waitqueue::MultiWakerRegistration;
41/// assert_eq!(rcv1.try_changed(), None); 43/// assert_eq!(rcv1.try_changed(), None);
42/// 44///
43/// snd.send(10); 45/// snd.send(10);
44/// 46///
45/// // Receive the new value (async or try) 47/// // Receive the new value (async or try)
46/// assert_eq!(rcv0.changed().await, 10); 48/// assert_eq!(rcv0.changed().await, 10);
47/// assert_eq!(rcv1.try_changed(), Some(10)); 49/// assert_eq!(rcv1.try_changed(), Some(10));
48/// 50///
49/// // No update 51/// // No update
50/// assert_eq!(rcv0.try_changed(), None); 52/// assert_eq!(rcv0.try_changed(), None);
51/// assert_eq!(rcv1.try_changed(), None); 53/// assert_eq!(rcv1.try_changed(), None);
52/// 54///
53/// snd.send(20); 55/// snd.send(20);
54/// 56///
55/// // Peek does not mark the value as seen 57/// // Using `get` marks the value as seen
56/// assert_eq!(rcv0.peek().await, 20);
57/// assert_eq!(rcv0.try_changed(), Some(20));
58///
59/// // Get marks the value as seen
60/// assert_eq!(rcv1.get().await, 20); 58/// assert_eq!(rcv1.get().await, 20);
61/// assert_eq!(rcv1.try_changed(), None); 59/// assert_eq!(rcv1.try_changed(), None);
62/// 60///
61/// // But `get` also returns when unchanged
62/// assert_eq!(rcv1.get().await, 20);
63/// assert_eq!(rcv1.get().await, 20);
64///
63/// }; 65/// };
64/// block_on(f); 66/// block_on(f);
65/// ``` 67/// ```
@@ -82,24 +84,11 @@ pub trait WatchBehavior<T: Clone> {
82 /// Clears the value of the `Watch`. 84 /// Clears the value of the `Watch`.
83 fn clear(&self); 85 fn clear(&self);
84 86
85 /// Poll the `Watch` for the current value, **without** making it as seen.
86 fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
87
88 /// Tries to peek the value of the `Watch`, **without** marking it as seen.
89 fn try_peek(&self) -> Option<T>;
90
91 /// Poll the `Watch` for the value if it matches the predicate function
92 /// `f`, **without** making it as seen.
93 fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
94
95 /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen.
96 fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
97
98 /// Poll the `Watch` for the current value, making it as seen. 87 /// Poll the `Watch` for the current value, making it as seen.
99 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 88 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
100 89
101 /// Tries to get the value of the `Watch`, marking it as seen. 90 /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
102 fn try_get(&self, id: &mut u64) -> Option<T>; 91 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
103 92
104 /// Poll the `Watch` for the value if it matches the predicate function 93 /// Poll the `Watch` for the value if it matches the predicate function
105 /// `f`, making it as seen. 94 /// `f`, making it as seen.
@@ -107,9 +96,9 @@ pub trait WatchBehavior<T: Clone> {
107 96
108 /// Tries to get the value of the `Watch` if it matches the predicate function 97 /// Tries to get the value of the `Watch` if it matches the predicate function
109 /// `f`, marking it as seen. 98 /// `f`, marking it as seen.
110 fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; 99 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
111 100
112 /// Poll the `Watch` for a changed value, marking it as seen. 101 /// Poll the `Watch` for a changed value, marking it as seen, if an id is given.
113 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 102 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
114 103
115 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. 104 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
@@ -128,7 +117,11 @@ pub trait WatchBehavior<T: Clone> {
128 117
129 /// Modify the value of the `Watch` using a closure. Returns `false` if the 118 /// Modify the value of the `Watch` using a closure. Returns `false` if the
130 /// `Watch` does not already contain a value. 119 /// `Watch` does not already contain a value.
131 fn modify(&self, f: &mut dyn Fn(&mut Option<T>)); 120 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
121
122 /// Modify the value of the `Watch` using a closure. Returns `false` if the
123 /// `Watch` does not already contain a value.
124 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
132 125
133 /// Used when a receiver is dropped to decrement the receiver count. 126 /// Used when a receiver is dropped to decrement the receiver count.
134 /// 127 ///
@@ -153,46 +146,6 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
153 }) 146 })
154 } 147 }
155 148
156 fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
157 self.mutex.lock(|state| {
158 let mut s = state.borrow_mut();
159 match &s.data {
160 Some(data) => Poll::Ready(data.clone()),
161 None => {
162 s.wakers.register(cx.waker());
163 Poll::Pending
164 }
165 }
166 })
167 }
168
169 fn try_peek(&self) -> Option<T> {
170 self.mutex.lock(|state| state.borrow().data.clone())
171 }
172
173 fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
174 self.mutex.lock(|state| {
175 let mut s = state.borrow_mut();
176 match s.data {
177 Some(ref data) if f(data) => Poll::Ready(data.clone()),
178 _ => {
179 s.wakers.register(cx.waker());
180 Poll::Pending
181 }
182 }
183 })
184 }
185
186 fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
187 self.mutex.lock(|state| {
188 let s = state.borrow();
189 match s.data {
190 Some(ref data) if f(data) => Some(data.clone()),
191 _ => None,
192 }
193 })
194 }
195
196 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { 149 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
197 self.mutex.lock(|state| { 150 self.mutex.lock(|state| {
198 let mut s = state.borrow_mut(); 151 let mut s = state.borrow_mut();
@@ -209,11 +162,13 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
209 }) 162 })
210 } 163 }
211 164
212 fn try_get(&self, id: &mut u64) -> Option<T> { 165 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
213 self.mutex.lock(|state| { 166 self.mutex.lock(|state| {
214 let s = state.borrow(); 167 let s = state.borrow();
215 *id = s.current_id; 168 if let Some(id) = id {
216 state.borrow().data.clone() 169 *id = s.current_id;
170 }
171 s.data.clone()
217 }) 172 })
218 } 173 }
219 174
@@ -233,12 +188,14 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
233 }) 188 })
234 } 189 }
235 190
236 fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { 191 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
237 self.mutex.lock(|state| { 192 self.mutex.lock(|state| {
238 let s = state.borrow(); 193 let s = state.borrow();
239 match s.data { 194 match s.data {
240 Some(ref data) if f(data) => { 195 Some(ref data) if f(data) => {
241 *id = s.current_id; 196 if let Some(id) = id {
197 *id = s.current_id;
198 }
242 Some(data.clone()) 199 Some(data.clone())
243 } 200 }
244 _ => None, 201 _ => None,
@@ -315,7 +272,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
315 }) 272 })
316 } 273 }
317 274
318 fn modify(&self, f: &mut dyn Fn(&mut Option<T>)) { 275 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
319 self.mutex.lock(|state| { 276 self.mutex.lock(|state| {
320 let mut s = state.borrow_mut(); 277 let mut s = state.borrow_mut();
321 f(&mut s.data); 278 f(&mut s.data);
@@ -323,6 +280,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
323 s.wakers.wake(); 280 s.wakers.wake();
324 }) 281 })
325 } 282 }
283
284 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
285 self.mutex.lock(|state| {
286 let mut s = state.borrow_mut();
287 if f(&mut s.data) {
288 s.current_id += 1;
289 s.wakers.wake();
290 }
291 })
292 }
326} 293}
327 294
328impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { 295impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
@@ -375,6 +342,60 @@ impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
375 } 342 }
376 }) 343 })
377 } 344 }
345
346 /// Try to create a new [`AnonReceiver`] for the `Watch`.
347 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
348 AnonReceiver(AnonRcv::new(self, 0))
349 }
350
351 /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
352 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
353 DynAnonReceiver(AnonRcv::new(self, 0))
354 }
355
356 /// Returns the message ID of the latest message sent to the `Watch`.
357 ///
358 /// This counter is monotonic, and is incremented every time a new message is sent.
359 pub fn get_msg_id(&self) -> u64 {
360 self.mutex.lock(|state| state.borrow().current_id)
361 }
362
363 /// Waits for the `Watch` to be initialized with a value using a busy-wait mechanism.
364 ///
365 /// This is useful for initialization code where receivers may only be interested in
366 /// awaiting the value once in the lifetime of the program. It is therefore a temporaryily
367 /// CPU-inefficient operation, while being more memory efficient than using a `Receiver`.
368 ///
369 /// **Note** Be careful about using this within an InterruptExecutor, as it will starve
370 /// tasks in lower-priority executors.
371 pub async fn spin_get(&self) -> T {
372 poll_fn(|cx| {
373 self.mutex.lock(|state| {
374 let s = state.borrow();
375 match &s.data {
376 Some(data) => Poll::Ready(data.clone()),
377 None => {
378 cx.waker().wake_by_ref();
379 Poll::Pending
380 }
381 }
382 })
383 })
384 .await
385 }
386
387 /// Tries to get the value of the `Watch`.
388 pub fn try_get(&self) -> Option<T> {
389 WatchBehavior::try_get(self, None)
390 }
391
392 /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
393 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
394 where
395 F: Fn(&T) -> bool,
396 {
397 WatchBehavior::try_get_and(self, None, &mut f)
398 }
378} 399}
379 400
380/// A receiver can `.await` a change in the `Watch` value. 401/// A receiver can `.await` a change in the `Watch` value.
@@ -407,23 +428,23 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
407 } 428 }
408 429
409 /// Clears the value of the `Watch`. 430 /// Clears the value of the `Watch`.
410 /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. 431 /// This will cause calls to [`Rcv::get`] to be pending.
411 pub fn clear(&self) { 432 pub fn clear(&self) {
412 self.watch.clear() 433 self.watch.clear()
413 } 434 }
414 435
415 /// Tries to retrieve the value of the `Watch`. 436 /// Tries to retrieve the value of the `Watch`.
416 pub fn try_peek(&self) -> Option<T> { 437 pub fn try_get(&self) -> Option<T> {
417 self.watch.try_peek() 438 self.watch.try_get(None)
418 } 439 }
419 440
420 /// Tries to peek the current value of the `Watch` if it matches the predicate 441 /// Tries to peek the current value of the `Watch` if it matches the predicate
421 /// function `f`. 442 /// function `f`.
422 pub fn try_peek_and<F>(&self, mut f: F) -> Option<T> 443 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
423 where 444 where
424 F: Fn(&T) -> bool, 445 F: Fn(&T) -> bool,
425 { 446 {
426 self.watch.try_peek_and(&mut f) 447 self.watch.try_get_and(None, &mut f)
427 } 448 }
428 449
429 /// Returns true if the `Watch` contains a value. 450 /// Returns true if the `Watch` contains a value.
@@ -432,11 +453,20 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
432 } 453 }
433 454
434 /// Modify the value of the `Watch` using a closure. 455 /// Modify the value of the `Watch` using a closure.
435 pub fn modify<F>(&self, mut f: F) 456 pub fn send_modify<F>(&self, mut f: F)
436 where 457 where
437 F: Fn(&mut Option<T>), 458 F: Fn(&mut Option<T>),
438 { 459 {
439 self.watch.modify(&mut f) 460 self.watch.send_modify(&mut f)
461 }
462
463 /// Modify the value of the `Watch` using a closure. The closure must return
464 /// `true` if the value was modified, which notifies all receivers.
465 pub fn send_if_modified<F>(&self, mut f: F)
466 where
467 F: Fn(&mut Option<T>) -> bool,
468 {
469 self.watch.send_if_modified(&mut f)
440 } 470 }
441} 471}
442 472
@@ -521,38 +551,6 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
521 } 551 }
522 } 552 }
523 553
524 /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen.
525 ///
526 /// **Note**: Futures do nothing unless you `.await` or poll them.
527 pub async fn peek(&self) -> T {
528 poll_fn(|cx| self.watch.poll_peek(cx)).await
529 }
530
531 /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
532 pub fn try_peek(&self) -> Option<T> {
533 self.watch.try_peek()
534 }
535
536 /// Returns the current value of the `Watch` if it matches the predicate function `f`,
537 /// or waits for it to match, **without** marking it as seen.
538 ///
539 /// **Note**: Futures do nothing unless you `.await` or poll them.
540 pub async fn peek_and<F>(&self, mut f: F) -> T
541 where
542 F: Fn(&T) -> bool,
543 {
544 poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await
545 }
546
547 /// Tries to peek the current value of the `Watch` if it matches the predicate
548 /// function `f` without waiting, and **without** marking it as seen.
549 pub fn try_peek_and<F>(&self, mut f: F) -> Option<T>
550 where
551 F: Fn(&T) -> bool,
552 {
553 self.watch.try_peek_and(&mut f)
554 }
555
556 /// Returns the current value of the `Watch` once it is initialized, marking it as seen. 554 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
557 /// 555 ///
558 /// **Note**: Futures do nothing unless you `.await` or poll them. 556 /// **Note**: Futures do nothing unless you `.await` or poll them.
@@ -562,7 +560,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
562 560
563 /// Tries to get the current value of the `Watch` without waiting, marking it as seen. 561 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
564 pub fn try_get(&mut self) -> Option<T> { 562 pub fn try_get(&mut self) -> Option<T> {
565 self.watch.try_get(&mut self.at_id) 563 self.watch.try_get(Some(&mut self.at_id))
566 } 564 }
567 565
568 /// Returns the value of the `Watch` if it matches the predicate function `f`, 566 /// Returns the value of the `Watch` if it matches the predicate function `f`,
@@ -582,7 +580,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
582 where 580 where
583 F: Fn(&T) -> bool, 581 F: Fn(&T) -> bool,
584 { 582 {
585 self.watch.try_get_and(&mut self.at_id, &mut f) 583 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
586 } 584 }
587 585
588 /// Waits for the `Watch` to change and returns the new value, marking it as seen. 586 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
@@ -618,7 +616,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
618 } 616 }
619 617
620 /// Checks if the `Watch` contains a value. If this returns true, 618 /// Checks if the `Watch` contains a value. If this returns true,
621 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. 619 /// then awaiting [`Rcv::get`] will return immediately.
622 pub fn contains_value(&self) -> bool { 620 pub fn contains_value(&self) -> bool {
623 self.watch.contains_value() 621 self.watch.contains_value()
624 } 622 }
@@ -630,6 +628,58 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
630 } 628 }
631} 629}
632 630
631/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
632pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
633 watch: &'a W,
634 at_id: u64,
635 _phantom: PhantomData<T>,
636}
637
638impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
639 /// Creates a new `Receiver` with a reference to the `Watch`.
640 fn new(watch: &'a W, at_id: u64) -> Self {
641 Self {
642 watch,
643 at_id,
644 _phantom: PhantomData,
645 }
646 }
647
648 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
649 pub fn try_get(&mut self) -> Option<T> {
650 self.watch.try_get(Some(&mut self.at_id))
651 }
652
653 /// Tries to get the current value of the `Watch` if it matches the predicate
654 /// function `f` without waiting, marking it as seen.
655 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
656 where
657 F: Fn(&T) -> bool,
658 {
659 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
660 }
661
662 /// Tries to get the new value of the watch without waiting, marking it as seen.
663 pub fn try_changed(&mut self) -> Option<T> {
664 self.watch.try_changed(&mut self.at_id)
665 }
666
667 /// Tries to get the new value of the watch which satisfies the predicate
668 /// function `f` and returns the new value without waiting, marking it as seen.
669 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
670 where
671 F: Fn(&T) -> bool,
672 {
673 self.watch.try_changed_and(&mut self.at_id, &mut f)
674 }
675
676 /// Checks if the `Watch` contains a value. If this returns true,
677 /// then awaiting [`Rcv::get`] will return immediately.
678 pub fn contains_value(&self) -> bool {
679 self.watch.contains_value()
680 }
681}
682
633/// A receiver of a `Watch` channel. 683/// A receiver of a `Watch` channel.
634pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); 684pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
635 685
@@ -682,6 +732,58 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
682 } 732 }
683} 733}
684 734
735/// A receiver of a `Watch` channel that cannot `.await` values.
736pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
737
738impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
739 /// Converts the `Receiver` into a [`DynReceiver`].
740 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
741 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
742 core::mem::forget(self); // Ensures the destructor is not called
743 rcv
744 }
745}
746
747impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
748 fn into(self) -> DynAnonReceiver<'a, T> {
749 self.as_dyn()
750 }
751}
752
753impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
754 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
755
756 fn deref(&self) -> &Self::Target {
757 &self.0
758 }
759}
760
761impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
762 fn deref_mut(&mut self) -> &mut Self::Target {
763 &mut self.0
764 }
765}
766
767/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
768///
769/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
770/// some runtime performance due to dynamic dispatch.
771pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
772
773impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
774 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
775
776 fn deref(&self) -> &Self::Target {
777 &self.0
778 }
779}
780
781impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
782 fn deref_mut(&mut self) -> &mut Self::Target {
783 &mut self.0
784 }
785}
786
685#[cfg(test)] 787#[cfg(test)]
686mod tests { 788mod tests {
687 use futures_executor::block_on; 789 use futures_executor::block_on;
@@ -716,6 +818,72 @@ mod tests {
716 } 818 }
717 819
718 #[test] 820 #[test]
821 fn all_try_get() {
822 let f = async {
823 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
824
825 // Obtain receiver and sender
826 let mut rcv = WATCH.receiver().unwrap();
827 let snd = WATCH.sender();
828
829 // Not initialized
830 assert_eq!(WATCH.try_get(), None);
831 assert_eq!(rcv.try_get(), None);
832 assert_eq!(snd.try_get(), None);
833
834 // Receive the new value
835 snd.send(10);
836 assert_eq!(WATCH.try_get(), Some(10));
837 assert_eq!(rcv.try_get(), Some(10));
838 assert_eq!(snd.try_get(), Some(10));
839
840 assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
841 assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
842 assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
843
844 assert_eq!(WATCH.try_get_and(|x| x < &5), None);
845 assert_eq!(rcv.try_get_and(|x| x < &5), None);
846 assert_eq!(snd.try_get_and(|x| x < &5), None);
847 };
848 block_on(f);
849 }
850
851 #[test]
852 fn once_lock_like() {
853 let f = async {
854 static CONFIG0: u8 = 10;
855 static CONFIG1: u8 = 20;
856
857 static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
858
859 // Obtain receiver and sender
860 let mut rcv = WATCH.receiver().unwrap();
861 let snd = WATCH.sender();
862
863 // Not initialized
864 assert_eq!(rcv.try_changed(), None);
865
866 // Receive the new value
867 snd.send(&CONFIG0);
868 let rcv0 = rcv.changed().await;
869 assert_eq!(rcv0, &10);
870
871 // Receive another value
872 snd.send(&CONFIG1);
873 let rcv1 = rcv.try_changed();
874 assert_eq!(rcv1, Some(&20));
875
876 // No update
877 assert_eq!(rcv.try_changed(), None);
878
879 // Ensure similarity with original static
880 assert_eq!(rcv0, &CONFIG0);
881 assert_eq!(rcv1, Some(&CONFIG1));
882 };
883 block_on(f);
884 }
885
886 #[test]
719 fn sender_modify() { 887 fn sender_modify() {
720 let f = async { 888 let f = async {
721 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); 889 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
@@ -729,7 +897,7 @@ mod tests {
729 assert_eq!(rcv.try_changed(), Some(10)); 897 assert_eq!(rcv.try_changed(), Some(10));
730 898
731 // Modify the value inplace 899 // Modify the value inplace
732 snd.modify(|opt| { 900 snd.send_modify(|opt| {
733 if let Some(inner) = opt { 901 if let Some(inner) = opt {
734 *inner += 5; 902 *inner += 5;
735 } 903 }
@@ -751,11 +919,6 @@ mod tests {
751 let mut rcv = WATCH.receiver().unwrap(); 919 let mut rcv = WATCH.receiver().unwrap();
752 let snd = WATCH.sender(); 920 let snd = WATCH.sender();
753 921
754 snd.send(10);
755 assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10));
756 assert_eq!(rcv.try_peek_and(|x| x < &5), None);
757 assert!(rcv.try_changed().is_some());
758
759 snd.send(15); 922 snd.send(15);
760 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); 923 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
761 assert_eq!(rcv.try_get_and(|x| x < &5), None); 924 assert_eq!(rcv.try_get_and(|x| x < &5), None);
@@ -771,7 +934,6 @@ mod tests {
771 934
772 snd.send(30); 935 snd.send(30);
773 assert_eq!(rcv.changed_and(|x| x > &5).await, 30); 936 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
774 assert_eq!(rcv.peek_and(|x| x > &5).await, 30);
775 assert_eq!(rcv.get_and(|x| x > &5).await, 30); 937 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
776 }; 938 };
777 block_on(f); 939 block_on(f);
@@ -825,7 +987,7 @@ mod tests {
825 987
826 // Obtain receivers and sender 988 // Obtain receivers and sender
827 let mut rcv0 = WATCH.receiver().unwrap(); 989 let mut rcv0 = WATCH.receiver().unwrap();
828 let mut rcv1 = WATCH.receiver().unwrap(); 990 let mut rcv1 = WATCH.anon_receiver();
829 let snd = WATCH.sender(); 991 let snd = WATCH.sender();
830 992
831 // No update for both 993 // No update for both
@@ -865,40 +1027,12 @@ mod tests {
865 } 1027 }
866 1028
867 #[test] 1029 #[test]
868 fn peek_get_changed() {
869 let f = async {
870 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
871
872 // Obtain receiver and sender
873 let mut rcv = WATCH.receiver().unwrap();
874 let snd = WATCH.sender();
875
876 // Send a value
877 snd.send(10);
878
879 // Ensure peek does not mark as seen
880 assert_eq!(rcv.peek().await, 10);
881 assert_eq!(rcv.try_changed(), Some(10));
882 assert_eq!(rcv.try_changed(), None);
883 assert_eq!(rcv.try_peek(), Some(10));
884
885 // Send a value
886 snd.send(20);
887
888 // Ensure get does mark as seen
889 assert_eq!(rcv.get().await, 20);
890 assert_eq!(rcv.try_changed(), None);
891 assert_eq!(rcv.try_get(), Some(20));
892 };
893 block_on(f);
894 }
895
896 #[test]
897 fn use_dynamics() { 1030 fn use_dynamics() {
898 let f = async { 1031 let f = async {
899 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 1032 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
900 1033
901 // Obtain receiver and sender 1034 // Obtain receiver and sender
1035 let mut anon_rcv = WATCH.dyn_anon_receiver();
902 let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); 1036 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
903 let dyn_snd = WATCH.dyn_sender(); 1037 let dyn_snd = WATCH.dyn_sender();
904 1038
@@ -906,6 +1040,7 @@ mod tests {
906 dyn_snd.send(10); 1040 dyn_snd.send(10);
907 1041
908 // Ensure the dynamic receiver receives the value 1042 // Ensure the dynamic receiver receives the value
1043 assert_eq!(anon_rcv.try_changed(), Some(10));
909 assert_eq!(dyn_rcv.try_changed(), Some(10)); 1044 assert_eq!(dyn_rcv.try_changed(), Some(10));
910 assert_eq!(dyn_rcv.try_changed(), None); 1045 assert_eq!(dyn_rcv.try_changed(), None);
911 }; 1046 };
@@ -918,10 +1053,12 @@ mod tests {
918 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 1053 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
919 1054
920 // Obtain receiver and sender 1055 // Obtain receiver and sender
1056 let anon_rcv = WATCH.anon_receiver();
921 let rcv = WATCH.receiver().unwrap(); 1057 let rcv = WATCH.receiver().unwrap();
922 let snd = WATCH.sender(); 1058 let snd = WATCH.sender();
923 1059
924 // Convert to dynamic 1060 // Convert to dynamic
1061 let mut dyn_anon_rcv = anon_rcv.as_dyn();
925 let mut dyn_rcv = rcv.as_dyn(); 1062 let mut dyn_rcv = rcv.as_dyn();
926 let dyn_snd = snd.as_dyn(); 1063 let dyn_snd = snd.as_dyn();
927 1064
@@ -929,6 +1066,7 @@ mod tests {
929 dyn_snd.send(10); 1066 dyn_snd.send(10);
930 1067
931 // Ensure the dynamic receiver receives the value 1068 // Ensure the dynamic receiver receives the value
1069 assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
932 assert_eq!(dyn_rcv.try_changed(), Some(10)); 1070 assert_eq!(dyn_rcv.try_changed(), Some(10));
933 assert_eq!(dyn_rcv.try_changed(), None); 1071 assert_eq!(dyn_rcv.try_changed(), None);
934 }; 1072 };