aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorPeter Krull <[email protected]>2024-03-02 00:14:11 +0100
committerPeter Krull <[email protected]>2024-03-02 00:14:11 +0100
commit311ab07a9af0029060813779038220481d1bf1c5 (patch)
tree6658f1db44bfda4ec0e5dc61f2220ba40e1d804a /embassy-sync
parentdf282aa23d00b2f3116081be2b07ba0c9f810fc3 (diff)
Reintroduce predicate methods. Add ability for sender to modify value in-place.
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/watch.rs267
1 files changed, 260 insertions, 7 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
index 01d82def4..520696f7d 100644
--- a/embassy-sync/src/watch.rs
+++ b/embassy-sync/src/watch.rs
@@ -88,21 +88,48 @@ pub trait WatchBehavior<T: Clone> {
88 /// Tries to peek the value of the `Watch`, **without** marking it as seen. 88 /// Tries to peek the value of the `Watch`, **without** marking it as seen.
89 fn try_peek(&self) -> Option<T>; 89 fn try_peek(&self) -> Option<T>;
90 90
91 /// Poll the `Watch` for the value if it matches the predicate function
92 /// `f`, **without** making it as seen.
93 fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
94
95 /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen.
96 fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
97
91 /// Poll the `Watch` for the current value, making it as seen. 98 /// Poll the `Watch` for the current value, making it as seen.
92 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 99 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
93 100
94 /// Tries to get the value of the `Watch`, marking it as seen. 101 /// Tries to get the value of the `Watch`, marking it as seen.
95 fn try_get(&self, id: &mut u64) -> Option<T>; 102 fn try_get(&self, id: &mut u64) -> Option<T>;
96 103
104 /// Poll the `Watch` for the value if it matches the predicate function
105 /// `f`, making it as seen.
106 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
107
108 /// Tries to get the value of the `Watch` if it matches the predicate function
109 /// `f`, marking it as seen.
110 fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
111
97 /// Poll the `Watch` for a changed value, marking it as seen. 112 /// Poll the `Watch` for a changed value, marking it as seen.
98 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 113 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
99 114
100 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. 115 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
101 fn try_changed(&self, id: &mut u64) -> Option<T>; 116 fn try_changed(&self, id: &mut u64) -> Option<T>;
102 117
118 /// Poll the `Watch` for a changed value that matches the predicate function
119 /// `f`, marking it as seen.
120 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
121
122 /// Tries to retrieve the value of the `Watch` if it has changed and matches the
123 /// predicate function `f`, marking it as seen.
124 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
125
103 /// Checks if the `Watch` is been initialized with a value. 126 /// Checks if the `Watch` is been initialized with a value.
104 fn contains_value(&self) -> bool; 127 fn contains_value(&self) -> bool;
105 128
129 /// Modify the value of the `Watch` using a closure. Returns `false` if the
130 /// `Watch` does not already contain a value.
131 fn modify(&self, f: &mut dyn Fn(&mut Option<T>));
132
106 /// Used when a receiver is dropped to decrement the receiver count. 133 /// Used when a receiver is dropped to decrement the receiver count.
107 /// 134 ///
108 /// ## This method should not be called by the user. 135 /// ## This method should not be called by the user.
@@ -143,6 +170,29 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
143 self.mutex.lock(|state| state.borrow().data.clone()) 170 self.mutex.lock(|state| state.borrow().data.clone())
144 } 171 }
145 172
173 fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
174 self.mutex.lock(|state| {
175 let mut s = state.borrow_mut();
176 match s.data {
177 Some(ref data) if f(data) => Poll::Ready(data.clone()),
178 _ => {
179 s.wakers.register(cx.waker());
180 Poll::Pending
181 }
182 }
183 })
184 }
185
186 fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
187 self.mutex.lock(|state| {
188 let s = state.borrow();
189 match s.data {
190 Some(ref data) if f(data) => Some(data.clone()),
191 _ => None,
192 }
193 })
194 }
195
146 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { 196 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
147 self.mutex.lock(|state| { 197 self.mutex.lock(|state| {
148 let mut s = state.borrow_mut(); 198 let mut s = state.borrow_mut();
@@ -167,6 +217,35 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
167 }) 217 })
168 } 218 }
169 219
220 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
221 self.mutex.lock(|state| {
222 let mut s = state.borrow_mut();
223 match s.data {
224 Some(ref data) if f(data) => {
225 *id = s.current_id;
226 Poll::Ready(data.clone())
227 }
228 _ => {
229 s.wakers.register(cx.waker());
230 Poll::Pending
231 }
232 }
233 })
234 }
235
236 fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
237 self.mutex.lock(|state| {
238 let s = state.borrow();
239 match s.data {
240 Some(ref data) if f(data) => {
241 *id = s.current_id;
242 Some(data.clone())
243 }
244 _ => None,
245 }
246 })
247 }
248
170 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { 249 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
171 self.mutex.lock(|state| { 250 self.mutex.lock(|state| {
172 let mut s = state.borrow_mut(); 251 let mut s = state.borrow_mut();
@@ -189,13 +268,42 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
189 match s.current_id > *id { 268 match s.current_id > *id {
190 true => { 269 true => {
191 *id = s.current_id; 270 *id = s.current_id;
192 state.borrow().data.clone() 271 s.data.clone()
193 } 272 }
194 false => None, 273 false => None,
195 } 274 }
196 }) 275 })
197 } 276 }
198 277
278 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
279 self.mutex.lock(|state| {
280 let mut s = state.borrow_mut();
281 match (&s.data, s.current_id > *id) {
282 (Some(data), true) if f(data) => {
283 *id = s.current_id;
284 Poll::Ready(data.clone())
285 }
286 _ => {
287 s.wakers.register(cx.waker());
288 Poll::Pending
289 }
290 }
291 })
292 }
293
294 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
295 self.mutex.lock(|state| {
296 let s = state.borrow();
297 match (&s.data, s.current_id > *id) {
298 (Some(data), true) if f(data) => {
299 *id = s.current_id;
300 s.data.clone()
301 }
302 _ => None,
303 }
304 })
305 }
306
199 fn contains_value(&self) -> bool { 307 fn contains_value(&self) -> bool {
200 self.mutex.lock(|state| state.borrow().data.is_some()) 308 self.mutex.lock(|state| state.borrow().data.is_some())
201 } 309 }
@@ -206,6 +314,15 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
206 s.receiver_count -= 1; 314 s.receiver_count -= 1;
207 }) 315 })
208 } 316 }
317
318 fn modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
319 self.mutex.lock(|state| {
320 let mut s = state.borrow_mut();
321 f(&mut s.data);
322 s.current_id += 1;
323 s.wakers.wake();
324 })
325 }
209} 326}
210 327
211impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { 328impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
@@ -300,10 +417,27 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
300 self.watch.try_peek() 417 self.watch.try_peek()
301 } 418 }
302 419
420 /// Tries to peek the current value of the `Watch` if it matches the predicate
421 /// function `f`.
422 pub fn try_peek_and<F>(&self, mut f: F) -> Option<T>
423 where
424 F: Fn(&T) -> bool,
425 {
426 self.watch.try_peek_and(&mut f)
427 }
428
303 /// Returns true if the `Watch` contains a value. 429 /// Returns true if the `Watch` contains a value.
304 pub fn contains_value(&self) -> bool { 430 pub fn contains_value(&self) -> bool {
305 self.watch.contains_value() 431 self.watch.contains_value()
306 } 432 }
433
434 /// Modify the value of the `Watch` using a closure.
435 pub fn modify<F>(&self, mut f: F)
436 where
437 F: Fn(&mut Option<T>),
438 {
439 self.watch.modify(&mut f)
440 }
307} 441}
308 442
309/// A sender of a `Watch` channel. 443/// A sender of a `Watch` channel.
@@ -399,6 +533,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
399 self.watch.try_peek() 533 self.watch.try_peek()
400 } 534 }
401 535
536 /// Returns the current value of the `Watch` if it matches the predicate function `f`,
537 /// or waits for it to match, **without** marking it as seen.
538 ///
539 /// **Note**: Futures do nothing unless you `.await` or poll them.
540 pub async fn peek_and<F>(&self, mut f: F) -> T
541 where
542 F: Fn(&T) -> bool,
543 {
544 poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await
545 }
546
547 /// Tries to peek the current value of the `Watch` if it matches the predicate
548 /// function `f` without waiting, and **without** marking it as seen.
549 pub fn try_peek_and<F>(&self, mut f: F) -> Option<T>
550 where
551 F: Fn(&T) -> bool,
552 {
553 self.watch.try_peek_and(&mut f)
554 }
555
402 /// Returns the current value of the `Watch` once it is initialized, marking it as seen. 556 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
403 /// 557 ///
404 /// **Note**: Futures do nothing unless you `.await` or poll them. 558 /// **Note**: Futures do nothing unless you `.await` or poll them.
@@ -411,6 +565,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
411 self.watch.try_get(&mut self.at_id) 565 self.watch.try_get(&mut self.at_id)
412 } 566 }
413 567
568 /// Returns the value of the `Watch` if it matches the predicate function `f`,
569 /// or waits for it to match, marking it as seen.
570 ///
571 /// **Note**: Futures do nothing unless you `.await` or poll them.
572 pub async fn get_and<F>(&mut self, mut f: F) -> T
573 where
574 F: Fn(&T) -> bool,
575 {
576 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
577 }
578
579 /// Tries to get the current value of the `Watch` if it matches the predicate
580 /// function `f` without waiting, marking it as seen.
581 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
582 where
583 F: Fn(&T) -> bool,
584 {
585 self.watch.try_get_and(&mut self.at_id, &mut f)
586 }
587
414 /// Waits for the `Watch` to change and returns the new value, marking it as seen. 588 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
415 /// 589 ///
416 /// **Note**: Futures do nothing unless you `.await` or poll them. 590 /// **Note**: Futures do nothing unless you `.await` or poll them.
@@ -423,6 +597,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
423 self.watch.try_changed(&mut self.at_id) 597 self.watch.try_changed(&mut self.at_id)
424 } 598 }
425 599
600 /// Waits for the `Watch` to change to a value which satisfies the predicate
601 /// function `f` and returns the new value, marking it as seen.
602 ///
603 /// **Note**: Futures do nothing unless you `.await` or poll them.
604 pub async fn changed_and<F>(&mut self, mut f: F) -> T
605 where
606 F: Fn(&T) -> bool,
607 {
608 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
609 }
610
611 /// Tries to get the new value of the watch which satisfies the predicate
612 /// function `f` and returns the new value without waiting, marking it as seen.
613 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
614 where
615 F: Fn(&T) -> bool,
616 {
617 self.watch.try_changed_and(&mut self.at_id, &mut f)
618 }
619
426 /// Checks if the `Watch` contains a value. If this returns true, 620 /// Checks if the `Watch` contains a value. If this returns true,
427 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. 621 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
428 pub fn contains_value(&self) -> bool { 622 pub fn contains_value(&self) -> bool {
@@ -442,12 +636,9 @@ pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<
442impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { 636impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
443 /// Converts the `Receiver` into a [`DynReceiver`]. 637 /// Converts the `Receiver` into a [`DynReceiver`].
444 pub fn as_dyn(self) -> DynReceiver<'a, T> { 638 pub fn as_dyn(self) -> DynReceiver<'a, T> {
445 // We need to increment the receiver count since the original 639 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
446 // receiver is being dropped, which decrements the count. 640 core::mem::forget(self); // Ensures the destructor is not called
447 self.watch.mutex.lock(|state| { 641 rcv
448 state.borrow_mut().receiver_count += 1;
449 });
450 DynReceiver(Rcv::new(self.0.watch, self.at_id))
451 } 642 }
452} 643}
453 644
@@ -525,6 +716,68 @@ mod tests {
525 } 716 }
526 717
527 #[test] 718 #[test]
719 fn sender_modify() {
720 let f = async {
721 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
722
723 // Obtain receiver and sender
724 let mut rcv = WATCH.receiver().unwrap();
725 let snd = WATCH.sender();
726
727 // Receive the new value
728 snd.send(10);
729 assert_eq!(rcv.try_changed(), Some(10));
730
731 // Modify the value inplace
732 snd.modify(|opt|{
733 if let Some(inner) = opt {
734 *inner += 5;
735 }
736 });
737
738 // Get the modified value
739 assert_eq!(rcv.try_changed(), Some(15));
740 assert_eq!(rcv.try_changed(), None);
741 };
742 block_on(f);
743 }
744
745 #[test]
746 fn predicate_fn() {
747 let f = async {
748 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
749
750 // Obtain receiver and sender
751 let mut rcv = WATCH.receiver().unwrap();
752 let snd = WATCH.sender();
753
754 snd.send(10);
755 assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10));
756 assert_eq!(rcv.try_peek_and(|x| x < &5), None);
757 assert!(rcv.try_changed().is_some());
758
759 snd.send(15);
760 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
761 assert_eq!(rcv.try_get_and(|x| x < &5), None);
762 assert!(rcv.try_changed().is_none());
763
764 snd.send(20);
765 assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
766 assert_eq!(rcv.try_changed_and(|x| x > &5), None);
767
768 snd.send(25);
769 assert_eq!(rcv.try_changed_and(|x| x < &5), None);
770 assert_eq!(rcv.try_changed(), Some(25));
771
772 snd.send(30);
773 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
774 assert_eq!(rcv.peek_and(|x| x > &5).await, 30);
775 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
776 };
777 block_on(f);
778 }
779
780 #[test]
528 fn receive_after_create() { 781 fn receive_after_create() {
529 let f = async { 782 let f = async {
530 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); 783 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();