diff options
| author | Peter Krull <[email protected]> | 2024-03-02 00:14:11 +0100 |
|---|---|---|
| committer | Peter Krull <[email protected]> | 2024-03-02 00:14:11 +0100 |
| commit | 311ab07a9af0029060813779038220481d1bf1c5 (patch) | |
| tree | 6658f1db44bfda4ec0e5dc61f2220ba40e1d804a /embassy-sync | |
| parent | df282aa23d00b2f3116081be2b07ba0c9f810fc3 (diff) | |
Reintroduce predicate methods. Add ability for sender to modify value in-place.
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/src/watch.rs | 267 |
1 files changed, 260 insertions, 7 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 01d82def4..520696f7d 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs | |||
| @@ -88,21 +88,48 @@ pub trait WatchBehavior<T: Clone> { | |||
| 88 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. | 88 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. |
| 89 | fn try_peek(&self) -> Option<T>; | 89 | fn try_peek(&self) -> Option<T>; |
| 90 | 90 | ||
| 91 | /// Poll the `Watch` for the value if it matches the predicate function | ||
| 92 | /// `f`, **without** making it as seen. | ||
| 93 | fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 94 | |||
| 95 | /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen. | ||
| 96 | fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 97 | |||
| 91 | /// Poll the `Watch` for the current value, making it as seen. | 98 | /// Poll the `Watch` for the current value, making it as seen. |
| 92 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 99 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 93 | 100 | ||
| 94 | /// Tries to get the value of the `Watch`, marking it as seen. | 101 | /// Tries to get the value of the `Watch`, marking it as seen. |
| 95 | fn try_get(&self, id: &mut u64) -> Option<T>; | 102 | fn try_get(&self, id: &mut u64) -> Option<T>; |
| 96 | 103 | ||
| 104 | /// Poll the `Watch` for the value if it matches the predicate function | ||
| 105 | /// `f`, making it as seen. | ||
| 106 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 107 | |||
| 108 | /// Tries to get the value of the `Watch` if it matches the predicate function | ||
| 109 | /// `f`, marking it as seen. | ||
| 110 | fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 111 | |||
| 97 | /// Poll the `Watch` for a changed value, marking it as seen. | 112 | /// Poll the `Watch` for a changed value, marking it as seen. |
| 98 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 113 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 99 | 114 | ||
| 100 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | 115 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. |
| 101 | fn try_changed(&self, id: &mut u64) -> Option<T>; | 116 | fn try_changed(&self, id: &mut u64) -> Option<T>; |
| 102 | 117 | ||
| 118 | /// Poll the `Watch` for a changed value that matches the predicate function | ||
| 119 | /// `f`, marking it as seen. | ||
| 120 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 121 | |||
| 122 | /// Tries to retrieve the value of the `Watch` if it has changed and matches the | ||
| 123 | /// predicate function `f`, marking it as seen. | ||
| 124 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 125 | |||
| 103 | /// Checks if the `Watch` is been initialized with a value. | 126 | /// Checks if the `Watch` is been initialized with a value. |
| 104 | fn contains_value(&self) -> bool; | 127 | fn contains_value(&self) -> bool; |
| 105 | 128 | ||
| 129 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 130 | /// `Watch` does not already contain a value. | ||
| 131 | fn modify(&self, f: &mut dyn Fn(&mut Option<T>)); | ||
| 132 | |||
| 106 | /// Used when a receiver is dropped to decrement the receiver count. | 133 | /// Used when a receiver is dropped to decrement the receiver count. |
| 107 | /// | 134 | /// |
| 108 | /// ## This method should not be called by the user. | 135 | /// ## This method should not be called by the user. |
| @@ -143,6 +170,29 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 143 | self.mutex.lock(|state| state.borrow().data.clone()) | 170 | self.mutex.lock(|state| state.borrow().data.clone()) |
| 144 | } | 171 | } |
| 145 | 172 | ||
| 173 | fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 174 | self.mutex.lock(|state| { | ||
| 175 | let mut s = state.borrow_mut(); | ||
| 176 | match s.data { | ||
| 177 | Some(ref data) if f(data) => Poll::Ready(data.clone()), | ||
| 178 | _ => { | ||
| 179 | s.wakers.register(cx.waker()); | ||
| 180 | Poll::Pending | ||
| 181 | } | ||
| 182 | } | ||
| 183 | }) | ||
| 184 | } | ||
| 185 | |||
| 186 | fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 187 | self.mutex.lock(|state| { | ||
| 188 | let s = state.borrow(); | ||
| 189 | match s.data { | ||
| 190 | Some(ref data) if f(data) => Some(data.clone()), | ||
| 191 | _ => None, | ||
| 192 | } | ||
| 193 | }) | ||
| 194 | } | ||
| 195 | |||
| 146 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | 196 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
| 147 | self.mutex.lock(|state| { | 197 | self.mutex.lock(|state| { |
| 148 | let mut s = state.borrow_mut(); | 198 | let mut s = state.borrow_mut(); |
| @@ -167,6 +217,35 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 167 | }) | 217 | }) |
| 168 | } | 218 | } |
| 169 | 219 | ||
| 220 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 221 | self.mutex.lock(|state| { | ||
| 222 | let mut s = state.borrow_mut(); | ||
| 223 | match s.data { | ||
| 224 | Some(ref data) if f(data) => { | ||
| 225 | *id = s.current_id; | ||
| 226 | Poll::Ready(data.clone()) | ||
| 227 | } | ||
| 228 | _ => { | ||
| 229 | s.wakers.register(cx.waker()); | ||
| 230 | Poll::Pending | ||
| 231 | } | ||
| 232 | } | ||
| 233 | }) | ||
| 234 | } | ||
| 235 | |||
| 236 | fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 237 | self.mutex.lock(|state| { | ||
| 238 | let s = state.borrow(); | ||
| 239 | match s.data { | ||
| 240 | Some(ref data) if f(data) => { | ||
| 241 | *id = s.current_id; | ||
| 242 | Some(data.clone()) | ||
| 243 | } | ||
| 244 | _ => None, | ||
| 245 | } | ||
| 246 | }) | ||
| 247 | } | ||
| 248 | |||
| 170 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | 249 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
| 171 | self.mutex.lock(|state| { | 250 | self.mutex.lock(|state| { |
| 172 | let mut s = state.borrow_mut(); | 251 | let mut s = state.borrow_mut(); |
| @@ -189,13 +268,42 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 189 | match s.current_id > *id { | 268 | match s.current_id > *id { |
| 190 | true => { | 269 | true => { |
| 191 | *id = s.current_id; | 270 | *id = s.current_id; |
| 192 | state.borrow().data.clone() | 271 | s.data.clone() |
| 193 | } | 272 | } |
| 194 | false => None, | 273 | false => None, |
| 195 | } | 274 | } |
| 196 | }) | 275 | }) |
| 197 | } | 276 | } |
| 198 | 277 | ||
| 278 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 279 | self.mutex.lock(|state| { | ||
| 280 | let mut s = state.borrow_mut(); | ||
| 281 | match (&s.data, s.current_id > *id) { | ||
| 282 | (Some(data), true) if f(data) => { | ||
| 283 | *id = s.current_id; | ||
| 284 | Poll::Ready(data.clone()) | ||
| 285 | } | ||
| 286 | _ => { | ||
| 287 | s.wakers.register(cx.waker()); | ||
| 288 | Poll::Pending | ||
| 289 | } | ||
| 290 | } | ||
| 291 | }) | ||
| 292 | } | ||
| 293 | |||
| 294 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 295 | self.mutex.lock(|state| { | ||
| 296 | let s = state.borrow(); | ||
| 297 | match (&s.data, s.current_id > *id) { | ||
| 298 | (Some(data), true) if f(data) => { | ||
| 299 | *id = s.current_id; | ||
| 300 | s.data.clone() | ||
| 301 | } | ||
| 302 | _ => None, | ||
| 303 | } | ||
| 304 | }) | ||
| 305 | } | ||
| 306 | |||
| 199 | fn contains_value(&self) -> bool { | 307 | fn contains_value(&self) -> bool { |
| 200 | self.mutex.lock(|state| state.borrow().data.is_some()) | 308 | self.mutex.lock(|state| state.borrow().data.is_some()) |
| 201 | } | 309 | } |
| @@ -206,6 +314,15 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 206 | s.receiver_count -= 1; | 314 | s.receiver_count -= 1; |
| 207 | }) | 315 | }) |
| 208 | } | 316 | } |
| 317 | |||
| 318 | fn modify(&self, f: &mut dyn Fn(&mut Option<T>)) { | ||
| 319 | self.mutex.lock(|state| { | ||
| 320 | let mut s = state.borrow_mut(); | ||
| 321 | f(&mut s.data); | ||
| 322 | s.current_id += 1; | ||
| 323 | s.wakers.wake(); | ||
| 324 | }) | ||
| 325 | } | ||
| 209 | } | 326 | } |
| 210 | 327 | ||
| 211 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | 328 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { |
| @@ -300,10 +417,27 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | |||
| 300 | self.watch.try_peek() | 417 | self.watch.try_peek() |
| 301 | } | 418 | } |
| 302 | 419 | ||
| 420 | /// Tries to peek the current value of the `Watch` if it matches the predicate | ||
| 421 | /// function `f`. | ||
| 422 | pub fn try_peek_and<F>(&self, mut f: F) -> Option<T> | ||
| 423 | where | ||
| 424 | F: Fn(&T) -> bool, | ||
| 425 | { | ||
| 426 | self.watch.try_peek_and(&mut f) | ||
| 427 | } | ||
| 428 | |||
| 303 | /// Returns true if the `Watch` contains a value. | 429 | /// Returns true if the `Watch` contains a value. |
| 304 | pub fn contains_value(&self) -> bool { | 430 | pub fn contains_value(&self) -> bool { |
| 305 | self.watch.contains_value() | 431 | self.watch.contains_value() |
| 306 | } | 432 | } |
| 433 | |||
| 434 | /// Modify the value of the `Watch` using a closure. | ||
| 435 | pub fn modify<F>(&self, mut f: F) | ||
| 436 | where | ||
| 437 | F: Fn(&mut Option<T>), | ||
| 438 | { | ||
| 439 | self.watch.modify(&mut f) | ||
| 440 | } | ||
| 307 | } | 441 | } |
| 308 | 442 | ||
| 309 | /// A sender of a `Watch` channel. | 443 | /// A sender of a `Watch` channel. |
| @@ -399,6 +533,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 399 | self.watch.try_peek() | 533 | self.watch.try_peek() |
| 400 | } | 534 | } |
| 401 | 535 | ||
| 536 | /// Returns the current value of the `Watch` if it matches the predicate function `f`, | ||
| 537 | /// or waits for it to match, **without** marking it as seen. | ||
| 538 | /// | ||
| 539 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 540 | pub async fn peek_and<F>(&self, mut f: F) -> T | ||
| 541 | where | ||
| 542 | F: Fn(&T) -> bool, | ||
| 543 | { | ||
| 544 | poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await | ||
| 545 | } | ||
| 546 | |||
| 547 | /// Tries to peek the current value of the `Watch` if it matches the predicate | ||
| 548 | /// function `f` without waiting, and **without** marking it as seen. | ||
| 549 | pub fn try_peek_and<F>(&self, mut f: F) -> Option<T> | ||
| 550 | where | ||
| 551 | F: Fn(&T) -> bool, | ||
| 552 | { | ||
| 553 | self.watch.try_peek_and(&mut f) | ||
| 554 | } | ||
| 555 | |||
| 402 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. | 556 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. |
| 403 | /// | 557 | /// |
| 404 | /// **Note**: Futures do nothing unless you `.await` or poll them. | 558 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
| @@ -411,6 +565,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 411 | self.watch.try_get(&mut self.at_id) | 565 | self.watch.try_get(&mut self.at_id) |
| 412 | } | 566 | } |
| 413 | 567 | ||
| 568 | /// Returns the value of the `Watch` if it matches the predicate function `f`, | ||
| 569 | /// or waits for it to match, marking it as seen. | ||
| 570 | /// | ||
| 571 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 572 | pub async fn get_and<F>(&mut self, mut f: F) -> T | ||
| 573 | where | ||
| 574 | F: Fn(&T) -> bool, | ||
| 575 | { | ||
| 576 | poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await | ||
| 577 | } | ||
| 578 | |||
| 579 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 580 | /// function `f` without waiting, marking it as seen. | ||
| 581 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 582 | where | ||
| 583 | F: Fn(&T) -> bool, | ||
| 584 | { | ||
| 585 | self.watch.try_get_and(&mut self.at_id, &mut f) | ||
| 586 | } | ||
| 587 | |||
| 414 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | 588 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. |
| 415 | /// | 589 | /// |
| 416 | /// **Note**: Futures do nothing unless you `.await` or poll them. | 590 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
| @@ -423,6 +597,26 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 423 | self.watch.try_changed(&mut self.at_id) | 597 | self.watch.try_changed(&mut self.at_id) |
| 424 | } | 598 | } |
| 425 | 599 | ||
| 600 | /// Waits for the `Watch` to change to a value which satisfies the predicate | ||
| 601 | /// function `f` and returns the new value, marking it as seen. | ||
| 602 | /// | ||
| 603 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 604 | pub async fn changed_and<F>(&mut self, mut f: F) -> T | ||
| 605 | where | ||
| 606 | F: Fn(&T) -> bool, | ||
| 607 | { | ||
| 608 | poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await | ||
| 609 | } | ||
| 610 | |||
| 611 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 612 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 613 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 614 | where | ||
| 615 | F: Fn(&T) -> bool, | ||
| 616 | { | ||
| 617 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 618 | } | ||
| 619 | |||
| 426 | /// Checks if the `Watch` contains a value. If this returns true, | 620 | /// Checks if the `Watch` contains a value. If this returns true, |
| 427 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. | 621 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. |
| 428 | pub fn contains_value(&self) -> bool { | 622 | pub fn contains_value(&self) -> bool { |
| @@ -442,12 +636,9 @@ pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch< | |||
| 442 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { | 636 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { |
| 443 | /// Converts the `Receiver` into a [`DynReceiver`]. | 637 | /// Converts the `Receiver` into a [`DynReceiver`]. |
| 444 | pub fn as_dyn(self) -> DynReceiver<'a, T> { | 638 | pub fn as_dyn(self) -> DynReceiver<'a, T> { |
| 445 | // We need to increment the receiver count since the original | 639 | let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id)); |
| 446 | // receiver is being dropped, which decrements the count. | 640 | core::mem::forget(self); // Ensures the destructor is not called |
| 447 | self.watch.mutex.lock(|state| { | 641 | rcv |
| 448 | state.borrow_mut().receiver_count += 1; | ||
| 449 | }); | ||
| 450 | DynReceiver(Rcv::new(self.0.watch, self.at_id)) | ||
| 451 | } | 642 | } |
| 452 | } | 643 | } |
| 453 | 644 | ||
| @@ -525,6 +716,68 @@ mod tests { | |||
| 525 | } | 716 | } |
| 526 | 717 | ||
| 527 | #[test] | 718 | #[test] |
| 719 | fn sender_modify() { | ||
| 720 | let f = async { | ||
| 721 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 722 | |||
| 723 | // Obtain receiver and sender | ||
| 724 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 725 | let snd = WATCH.sender(); | ||
| 726 | |||
| 727 | // Receive the new value | ||
| 728 | snd.send(10); | ||
| 729 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 730 | |||
| 731 | // Modify the value inplace | ||
| 732 | snd.modify(|opt|{ | ||
| 733 | if let Some(inner) = opt { | ||
| 734 | *inner += 5; | ||
| 735 | } | ||
| 736 | }); | ||
| 737 | |||
| 738 | // Get the modified value | ||
| 739 | assert_eq!(rcv.try_changed(), Some(15)); | ||
| 740 | assert_eq!(rcv.try_changed(), None); | ||
| 741 | }; | ||
| 742 | block_on(f); | ||
| 743 | } | ||
| 744 | |||
| 745 | #[test] | ||
| 746 | fn predicate_fn() { | ||
| 747 | let f = async { | ||
| 748 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 749 | |||
| 750 | // Obtain receiver and sender | ||
| 751 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 752 | let snd = WATCH.sender(); | ||
| 753 | |||
| 754 | snd.send(10); | ||
| 755 | assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10)); | ||
| 756 | assert_eq!(rcv.try_peek_and(|x| x < &5), None); | ||
| 757 | assert!(rcv.try_changed().is_some()); | ||
| 758 | |||
| 759 | snd.send(15); | ||
| 760 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); | ||
| 761 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 762 | assert!(rcv.try_changed().is_none()); | ||
| 763 | |||
| 764 | snd.send(20); | ||
| 765 | assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20)); | ||
| 766 | assert_eq!(rcv.try_changed_and(|x| x > &5), None); | ||
| 767 | |||
| 768 | snd.send(25); | ||
| 769 | assert_eq!(rcv.try_changed_and(|x| x < &5), None); | ||
| 770 | assert_eq!(rcv.try_changed(), Some(25)); | ||
| 771 | |||
| 772 | snd.send(30); | ||
| 773 | assert_eq!(rcv.changed_and(|x| x > &5).await, 30); | ||
| 774 | assert_eq!(rcv.peek_and(|x| x > &5).await, 30); | ||
| 775 | assert_eq!(rcv.get_and(|x| x > &5).await, 30); | ||
| 776 | }; | ||
| 777 | block_on(f); | ||
| 778 | } | ||
| 779 | |||
| 780 | #[test] | ||
| 528 | fn receive_after_create() { | 781 | fn receive_after_create() { |
| 529 | let f = async { | 782 | let f = async { |
| 530 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | 783 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); |
