diff options
| author | Peter Krull <[email protected]> | 2024-09-23 20:09:35 +0200 |
|---|---|---|
| committer | Peter Krull <[email protected]> | 2024-09-23 20:09:35 +0200 |
| commit | a669611d7c8fb17666f35086ea20c476b6029854 (patch) | |
| tree | 6e33d16acca31da04324fbf2903395950b913f76 /embassy-sync/src/watch.rs | |
| parent | a2c473306f4a7c8e99add2546450ab3a7a97436e (diff) | |
Discontinue peek, add AnonReceiver
Diffstat (limited to 'embassy-sync/src/watch.rs')
| -rw-r--r-- | embassy-sync/src/watch.rs | 440 |
1 files changed, 289 insertions, 151 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 298c09d43..1b4a8b589 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs | |||
| @@ -20,7 +20,9 @@ use crate::waitqueue::MultiWakerRegistration; | |||
| 20 | /// always provided with the latest value. | 20 | /// always provided with the latest value. |
| 21 | /// | 21 | /// |
| 22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] | 22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] |
| 23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program. | 23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] |
| 24 | /// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the | ||
| 25 | /// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. | ||
| 24 | /// ``` | 26 | /// ``` |
| 25 | /// | 27 | /// |
| 26 | /// use futures_executor::block_on; | 28 | /// use futures_executor::block_on; |
| @@ -41,25 +43,25 @@ use crate::waitqueue::MultiWakerRegistration; | |||
| 41 | /// assert_eq!(rcv1.try_changed(), None); | 43 | /// assert_eq!(rcv1.try_changed(), None); |
| 42 | /// | 44 | /// |
| 43 | /// snd.send(10); | 45 | /// snd.send(10); |
| 44 | /// | 46 | /// |
| 45 | /// // Receive the new value (async or try) | 47 | /// // Receive the new value (async or try) |
| 46 | /// assert_eq!(rcv0.changed().await, 10); | 48 | /// assert_eq!(rcv0.changed().await, 10); |
| 47 | /// assert_eq!(rcv1.try_changed(), Some(10)); | 49 | /// assert_eq!(rcv1.try_changed(), Some(10)); |
| 48 | /// | 50 | /// |
| 49 | /// // No update | 51 | /// // No update |
| 50 | /// assert_eq!(rcv0.try_changed(), None); | 52 | /// assert_eq!(rcv0.try_changed(), None); |
| 51 | /// assert_eq!(rcv1.try_changed(), None); | 53 | /// assert_eq!(rcv1.try_changed(), None); |
| 52 | /// | 54 | /// |
| 53 | /// snd.send(20); | 55 | /// snd.send(20); |
| 54 | /// | 56 | /// |
| 55 | /// // Peek does not mark the value as seen | 57 | /// // Using `get` marks the value as seen |
| 56 | /// assert_eq!(rcv0.peek().await, 20); | ||
| 57 | /// assert_eq!(rcv0.try_changed(), Some(20)); | ||
| 58 | /// | ||
| 59 | /// // Get marks the value as seen | ||
| 60 | /// assert_eq!(rcv1.get().await, 20); | 58 | /// assert_eq!(rcv1.get().await, 20); |
| 61 | /// assert_eq!(rcv1.try_changed(), None); | 59 | /// assert_eq!(rcv1.try_changed(), None); |
| 62 | /// | 60 | /// |
| 61 | /// // But `get` also returns when unchanged | ||
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// assert_eq!(rcv1.get().await, 20); | ||
| 64 | /// | ||
| 63 | /// }; | 65 | /// }; |
| 64 | /// block_on(f); | 66 | /// block_on(f); |
| 65 | /// ``` | 67 | /// ``` |
| @@ -82,24 +84,11 @@ pub trait WatchBehavior<T: Clone> { | |||
| 82 | /// Clears the value of the `Watch`. | 84 | /// Clears the value of the `Watch`. |
| 83 | fn clear(&self); | 85 | fn clear(&self); |
| 84 | 86 | ||
| 85 | /// Poll the `Watch` for the current value, **without** making it as seen. | ||
| 86 | fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; | ||
| 87 | |||
| 88 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. | ||
| 89 | fn try_peek(&self) -> Option<T>; | ||
| 90 | |||
| 91 | /// Poll the `Watch` for the value if it matches the predicate function | ||
| 92 | /// `f`, **without** making it as seen. | ||
| 93 | fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 94 | |||
| 95 | /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen. | ||
| 96 | fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 97 | |||
| 98 | /// Poll the `Watch` for the current value, making it as seen. | 87 | /// Poll the `Watch` for the current value, making it as seen. |
| 99 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 88 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 100 | 89 | ||
| 101 | /// Tries to get the value of the `Watch`, marking it as seen. | 90 | /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. |
| 102 | fn try_get(&self, id: &mut u64) -> Option<T>; | 91 | fn try_get(&self, id: Option<&mut u64>) -> Option<T>; |
| 103 | 92 | ||
| 104 | /// Poll the `Watch` for the value if it matches the predicate function | 93 | /// Poll the `Watch` for the value if it matches the predicate function |
| 105 | /// `f`, making it as seen. | 94 | /// `f`, making it as seen. |
| @@ -107,9 +96,9 @@ pub trait WatchBehavior<T: Clone> { | |||
| 107 | 96 | ||
| 108 | /// Tries to get the value of the `Watch` if it matches the predicate function | 97 | /// Tries to get the value of the `Watch` if it matches the predicate function |
| 109 | /// `f`, marking it as seen. | 98 | /// `f`, marking it as seen. |
| 110 | fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | 99 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>; |
| 111 | 100 | ||
| 112 | /// Poll the `Watch` for a changed value, marking it as seen. | 101 | /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. |
| 113 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 102 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 114 | 103 | ||
| 115 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | 104 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. |
| @@ -128,7 +117,11 @@ pub trait WatchBehavior<T: Clone> { | |||
| 128 | 117 | ||
| 129 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | 118 | /// Modify the value of the `Watch` using a closure. Returns `false` if the |
| 130 | /// `Watch` does not already contain a value. | 119 | /// `Watch` does not already contain a value. |
| 131 | fn modify(&self, f: &mut dyn Fn(&mut Option<T>)); | 120 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)); |
| 121 | |||
| 122 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 123 | /// `Watch` does not already contain a value. | ||
| 124 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool); | ||
| 132 | 125 | ||
| 133 | /// Used when a receiver is dropped to decrement the receiver count. | 126 | /// Used when a receiver is dropped to decrement the receiver count. |
| 134 | /// | 127 | /// |
| @@ -153,46 +146,6 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 153 | }) | 146 | }) |
| 154 | } | 147 | } |
| 155 | 148 | ||
| 156 | fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 157 | self.mutex.lock(|state| { | ||
| 158 | let mut s = state.borrow_mut(); | ||
| 159 | match &s.data { | ||
| 160 | Some(data) => Poll::Ready(data.clone()), | ||
| 161 | None => { | ||
| 162 | s.wakers.register(cx.waker()); | ||
| 163 | Poll::Pending | ||
| 164 | } | ||
| 165 | } | ||
| 166 | }) | ||
| 167 | } | ||
| 168 | |||
| 169 | fn try_peek(&self) -> Option<T> { | ||
| 170 | self.mutex.lock(|state| state.borrow().data.clone()) | ||
| 171 | } | ||
| 172 | |||
| 173 | fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 174 | self.mutex.lock(|state| { | ||
| 175 | let mut s = state.borrow_mut(); | ||
| 176 | match s.data { | ||
| 177 | Some(ref data) if f(data) => Poll::Ready(data.clone()), | ||
| 178 | _ => { | ||
| 179 | s.wakers.register(cx.waker()); | ||
| 180 | Poll::Pending | ||
| 181 | } | ||
| 182 | } | ||
| 183 | }) | ||
| 184 | } | ||
| 185 | |||
| 186 | fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 187 | self.mutex.lock(|state| { | ||
| 188 | let s = state.borrow(); | ||
| 189 | match s.data { | ||
| 190 | Some(ref data) if f(data) => Some(data.clone()), | ||
| 191 | _ => None, | ||
| 192 | } | ||
| 193 | }) | ||
| 194 | } | ||
| 195 | |||
| 196 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | 149 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
| 197 | self.mutex.lock(|state| { | 150 | self.mutex.lock(|state| { |
| 198 | let mut s = state.borrow_mut(); | 151 | let mut s = state.borrow_mut(); |
| @@ -209,11 +162,13 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 209 | }) | 162 | }) |
| 210 | } | 163 | } |
| 211 | 164 | ||
| 212 | fn try_get(&self, id: &mut u64) -> Option<T> { | 165 | fn try_get(&self, id: Option<&mut u64>) -> Option<T> { |
| 213 | self.mutex.lock(|state| { | 166 | self.mutex.lock(|state| { |
| 214 | let s = state.borrow(); | 167 | let s = state.borrow(); |
| 215 | *id = s.current_id; | 168 | if let Some(id) = id { |
| 216 | state.borrow().data.clone() | 169 | *id = s.current_id; |
| 170 | } | ||
| 171 | s.data.clone() | ||
| 217 | }) | 172 | }) |
| 218 | } | 173 | } |
| 219 | 174 | ||
| @@ -233,12 +188,14 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 233 | }) | 188 | }) |
| 234 | } | 189 | } |
| 235 | 190 | ||
| 236 | fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | 191 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> { |
| 237 | self.mutex.lock(|state| { | 192 | self.mutex.lock(|state| { |
| 238 | let s = state.borrow(); | 193 | let s = state.borrow(); |
| 239 | match s.data { | 194 | match s.data { |
| 240 | Some(ref data) if f(data) => { | 195 | Some(ref data) if f(data) => { |
| 241 | *id = s.current_id; | 196 | if let Some(id) = id { |
| 197 | *id = s.current_id; | ||
| 198 | } | ||
| 242 | Some(data.clone()) | 199 | Some(data.clone()) |
| 243 | } | 200 | } |
| 244 | _ => None, | 201 | _ => None, |
| @@ -315,7 +272,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 315 | }) | 272 | }) |
| 316 | } | 273 | } |
| 317 | 274 | ||
| 318 | fn modify(&self, f: &mut dyn Fn(&mut Option<T>)) { | 275 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) { |
| 319 | self.mutex.lock(|state| { | 276 | self.mutex.lock(|state| { |
| 320 | let mut s = state.borrow_mut(); | 277 | let mut s = state.borrow_mut(); |
| 321 | f(&mut s.data); | 278 | f(&mut s.data); |
| @@ -323,6 +280,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 323 | s.wakers.wake(); | 280 | s.wakers.wake(); |
| 324 | }) | 281 | }) |
| 325 | } | 282 | } |
| 283 | |||
| 284 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) { | ||
| 285 | self.mutex.lock(|state| { | ||
| 286 | let mut s = state.borrow_mut(); | ||
| 287 | if f(&mut s.data) { | ||
| 288 | s.current_id += 1; | ||
| 289 | s.wakers.wake(); | ||
| 290 | } | ||
| 291 | }) | ||
| 292 | } | ||
| 326 | } | 293 | } |
| 327 | 294 | ||
| 328 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | 295 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { |
| @@ -375,6 +342,60 @@ impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | |||
| 375 | } | 342 | } |
| 376 | }) | 343 | }) |
| 377 | } | 344 | } |
| 345 | |||
| 346 | /// Try to create a new [`AnonReceiver`] for the `Watch`. | ||
| 347 | pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { | ||
| 348 | AnonReceiver(AnonRcv::new(self, 0)) | ||
| 349 | } | ||
| 350 | |||
| 351 | /// Try to create a new [`DynAnonReceiver`] for the `Watch`. | ||
| 352 | pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { | ||
| 353 | DynAnonReceiver(AnonRcv::new(self, 0)) | ||
| 354 | } | ||
| 355 | |||
| 356 | /// Returns the message ID of the latest message sent to the `Watch`. | ||
| 357 | /// | ||
| 358 | /// This counter is monotonic, and is incremented every time a new message is sent. | ||
| 359 | pub fn get_msg_id(&self) -> u64 { | ||
| 360 | self.mutex.lock(|state| state.borrow().current_id) | ||
| 361 | } | ||
| 362 | |||
| 363 | /// Waits for the `Watch` to be initialized with a value using a busy-wait mechanism. | ||
| 364 | /// | ||
| 365 | /// This is useful for initialization code where receivers may only be interested in | ||
| 366 | /// awaiting the value once in the lifetime of the program. It is therefore a temporaryily | ||
| 367 | /// CPU-inefficient operation, while being more memory efficient than using a `Receiver`. | ||
| 368 | /// | ||
| 369 | /// **Note** Be careful about using this within an InterruptExecutor, as it will starve | ||
| 370 | /// tasks in lower-priority executors. | ||
| 371 | pub async fn spin_get(&self) -> T { | ||
| 372 | poll_fn(|cx| { | ||
| 373 | self.mutex.lock(|state| { | ||
| 374 | let s = state.borrow(); | ||
| 375 | match &s.data { | ||
| 376 | Some(data) => Poll::Ready(data.clone()), | ||
| 377 | None => { | ||
| 378 | cx.waker().wake_by_ref(); | ||
| 379 | Poll::Pending | ||
| 380 | } | ||
| 381 | } | ||
| 382 | }) | ||
| 383 | }) | ||
| 384 | .await | ||
| 385 | } | ||
| 386 | |||
| 387 | /// Tries to get the value of the `Watch`. | ||
| 388 | pub fn try_get(&self) -> Option<T> { | ||
| 389 | WatchBehavior::try_get(self, None) | ||
| 390 | } | ||
| 391 | |||
| 392 | /// Tries to get the value of the `Watch` if it matches the predicate function `f`. | ||
| 393 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> | ||
| 394 | where | ||
| 395 | F: Fn(&T) -> bool, | ||
| 396 | { | ||
| 397 | WatchBehavior::try_get_and(self, None, &mut f) | ||
| 398 | } | ||
| 378 | } | 399 | } |
| 379 | 400 | ||
| 380 | /// A receiver can `.await` a change in the `Watch` value. | 401 | /// A receiver can `.await` a change in the `Watch` value. |
| @@ -407,23 +428,23 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | |||
| 407 | } | 428 | } |
| 408 | 429 | ||
| 409 | /// Clears the value of the `Watch`. | 430 | /// Clears the value of the `Watch`. |
| 410 | /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. | 431 | /// This will cause calls to [`Rcv::get`] to be pending. |
| 411 | pub fn clear(&self) { | 432 | pub fn clear(&self) { |
| 412 | self.watch.clear() | 433 | self.watch.clear() |
| 413 | } | 434 | } |
| 414 | 435 | ||
| 415 | /// Tries to retrieve the value of the `Watch`. | 436 | /// Tries to retrieve the value of the `Watch`. |
| 416 | pub fn try_peek(&self) -> Option<T> { | 437 | pub fn try_get(&self) -> Option<T> { |
| 417 | self.watch.try_peek() | 438 | self.watch.try_get(None) |
| 418 | } | 439 | } |
| 419 | 440 | ||
| 420 | /// Tries to peek the current value of the `Watch` if it matches the predicate | 441 | /// Tries to peek the current value of the `Watch` if it matches the predicate |
| 421 | /// function `f`. | 442 | /// function `f`. |
| 422 | pub fn try_peek_and<F>(&self, mut f: F) -> Option<T> | 443 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> |
| 423 | where | 444 | where |
| 424 | F: Fn(&T) -> bool, | 445 | F: Fn(&T) -> bool, |
| 425 | { | 446 | { |
| 426 | self.watch.try_peek_and(&mut f) | 447 | self.watch.try_get_and(None, &mut f) |
| 427 | } | 448 | } |
| 428 | 449 | ||
| 429 | /// Returns true if the `Watch` contains a value. | 450 | /// Returns true if the `Watch` contains a value. |
| @@ -432,11 +453,20 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | |||
| 432 | } | 453 | } |
| 433 | 454 | ||
| 434 | /// Modify the value of the `Watch` using a closure. | 455 | /// Modify the value of the `Watch` using a closure. |
| 435 | pub fn modify<F>(&self, mut f: F) | 456 | pub fn send_modify<F>(&self, mut f: F) |
| 436 | where | 457 | where |
| 437 | F: Fn(&mut Option<T>), | 458 | F: Fn(&mut Option<T>), |
| 438 | { | 459 | { |
| 439 | self.watch.modify(&mut f) | 460 | self.watch.send_modify(&mut f) |
| 461 | } | ||
| 462 | |||
| 463 | /// Modify the value of the `Watch` using a closure. The closure must return | ||
| 464 | /// `true` if the value was modified, which notifies all receivers. | ||
| 465 | pub fn send_if_modified<F>(&self, mut f: F) | ||
| 466 | where | ||
| 467 | F: Fn(&mut Option<T>) -> bool, | ||
| 468 | { | ||
| 469 | self.watch.send_if_modified(&mut f) | ||
| 440 | } | 470 | } |
| 441 | } | 471 | } |
| 442 | 472 | ||
| @@ -521,38 +551,6 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 521 | } | 551 | } |
| 522 | } | 552 | } |
| 523 | 553 | ||
| 524 | /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen. | ||
| 525 | /// | ||
| 526 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 527 | pub async fn peek(&self) -> T { | ||
| 528 | poll_fn(|cx| self.watch.poll_peek(cx)).await | ||
| 529 | } | ||
| 530 | |||
| 531 | /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. | ||
| 532 | pub fn try_peek(&self) -> Option<T> { | ||
| 533 | self.watch.try_peek() | ||
| 534 | } | ||
| 535 | |||
| 536 | /// Returns the current value of the `Watch` if it matches the predicate function `f`, | ||
| 537 | /// or waits for it to match, **without** marking it as seen. | ||
| 538 | /// | ||
| 539 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 540 | pub async fn peek_and<F>(&self, mut f: F) -> T | ||
| 541 | where | ||
| 542 | F: Fn(&T) -> bool, | ||
| 543 | { | ||
| 544 | poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await | ||
| 545 | } | ||
| 546 | |||
| 547 | /// Tries to peek the current value of the `Watch` if it matches the predicate | ||
| 548 | /// function `f` without waiting, and **without** marking it as seen. | ||
| 549 | pub fn try_peek_and<F>(&self, mut f: F) -> Option<T> | ||
| 550 | where | ||
| 551 | F: Fn(&T) -> bool, | ||
| 552 | { | ||
| 553 | self.watch.try_peek_and(&mut f) | ||
| 554 | } | ||
| 555 | |||
| 556 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. | 554 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. |
| 557 | /// | 555 | /// |
| 558 | /// **Note**: Futures do nothing unless you `.await` or poll them. | 556 | /// **Note**: Futures do nothing unless you `.await` or poll them. |
| @@ -562,7 +560,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 562 | 560 | ||
| 563 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | 561 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. |
| 564 | pub fn try_get(&mut self) -> Option<T> { | 562 | pub fn try_get(&mut self) -> Option<T> { |
| 565 | self.watch.try_get(&mut self.at_id) | 563 | self.watch.try_get(Some(&mut self.at_id)) |
| 566 | } | 564 | } |
| 567 | 565 | ||
| 568 | /// Returns the value of the `Watch` if it matches the predicate function `f`, | 566 | /// Returns the value of the `Watch` if it matches the predicate function `f`, |
| @@ -582,7 +580,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 582 | where | 580 | where |
| 583 | F: Fn(&T) -> bool, | 581 | F: Fn(&T) -> bool, |
| 584 | { | 582 | { |
| 585 | self.watch.try_get_and(&mut self.at_id, &mut f) | 583 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) |
| 586 | } | 584 | } |
| 587 | 585 | ||
| 588 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | 586 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. |
| @@ -618,7 +616,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | |||
| 618 | } | 616 | } |
| 619 | 617 | ||
| 620 | /// Checks if the `Watch` contains a value. If this returns true, | 618 | /// Checks if the `Watch` contains a value. If this returns true, |
| 621 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. | 619 | /// then awaiting [`Rcv::get`] will return immediately. |
| 622 | pub fn contains_value(&self) -> bool { | 620 | pub fn contains_value(&self) -> bool { |
| 623 | self.watch.contains_value() | 621 | self.watch.contains_value() |
| 624 | } | 622 | } |
| @@ -630,6 +628,58 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> { | |||
| 630 | } | 628 | } |
| 631 | } | 629 | } |
| 632 | 630 | ||
| 631 | /// A anonymous receiver can NOT `.await` a change in the `Watch` value. | ||
| 632 | pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 633 | watch: &'a W, | ||
| 634 | at_id: u64, | ||
| 635 | _phantom: PhantomData<T>, | ||
| 636 | } | ||
| 637 | |||
| 638 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> { | ||
| 639 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 640 | fn new(watch: &'a W, at_id: u64) -> Self { | ||
| 641 | Self { | ||
| 642 | watch, | ||
| 643 | at_id, | ||
| 644 | _phantom: PhantomData, | ||
| 645 | } | ||
| 646 | } | ||
| 647 | |||
| 648 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 649 | pub fn try_get(&mut self) -> Option<T> { | ||
| 650 | self.watch.try_get(Some(&mut self.at_id)) | ||
| 651 | } | ||
| 652 | |||
| 653 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 654 | /// function `f` without waiting, marking it as seen. | ||
| 655 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 656 | where | ||
| 657 | F: Fn(&T) -> bool, | ||
| 658 | { | ||
| 659 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) | ||
| 660 | } | ||
| 661 | |||
| 662 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 663 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 664 | self.watch.try_changed(&mut self.at_id) | ||
| 665 | } | ||
| 666 | |||
| 667 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 668 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 669 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 670 | where | ||
| 671 | F: Fn(&T) -> bool, | ||
| 672 | { | ||
| 673 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 674 | } | ||
| 675 | |||
| 676 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 677 | /// then awaiting [`Rcv::get`] will return immediately. | ||
| 678 | pub fn contains_value(&self) -> bool { | ||
| 679 | self.watch.contains_value() | ||
| 680 | } | ||
| 681 | } | ||
| 682 | |||
| 633 | /// A receiver of a `Watch` channel. | 683 | /// A receiver of a `Watch` channel. |
| 634 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | 684 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); |
| 635 | 685 | ||
| @@ -682,6 +732,58 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | |||
| 682 | } | 732 | } |
| 683 | } | 733 | } |
| 684 | 734 | ||
| 735 | /// A receiver of a `Watch` channel that cannot `.await` values. | ||
| 736 | pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>); | ||
| 737 | |||
| 738 | impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { | ||
| 739 | /// Converts the `Receiver` into a [`DynReceiver`]. | ||
| 740 | pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { | ||
| 741 | let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); | ||
| 742 | core::mem::forget(self); // Ensures the destructor is not called | ||
| 743 | rcv | ||
| 744 | } | ||
| 745 | } | ||
| 746 | |||
| 747 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> { | ||
| 748 | fn into(self) -> DynAnonReceiver<'a, T> { | ||
| 749 | self.as_dyn() | ||
| 750 | } | ||
| 751 | } | ||
| 752 | |||
| 753 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { | ||
| 754 | type Target = AnonRcv<'a, T, Watch<M, T, N>>; | ||
| 755 | |||
| 756 | fn deref(&self) -> &Self::Target { | ||
| 757 | &self.0 | ||
| 758 | } | ||
| 759 | } | ||
| 760 | |||
| 761 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { | ||
| 762 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 763 | &mut self.0 | ||
| 764 | } | ||
| 765 | } | ||
| 766 | |||
| 767 | /// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel. | ||
| 768 | /// | ||
| 769 | /// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of | ||
| 770 | /// some runtime performance due to dynamic dispatch. | ||
| 771 | pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 772 | |||
| 773 | impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { | ||
| 774 | type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 775 | |||
| 776 | fn deref(&self) -> &Self::Target { | ||
| 777 | &self.0 | ||
| 778 | } | ||
| 779 | } | ||
| 780 | |||
| 781 | impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { | ||
| 782 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 783 | &mut self.0 | ||
| 784 | } | ||
| 785 | } | ||
| 786 | |||
| 685 | #[cfg(test)] | 787 | #[cfg(test)] |
| 686 | mod tests { | 788 | mod tests { |
| 687 | use futures_executor::block_on; | 789 | use futures_executor::block_on; |
| @@ -716,6 +818,72 @@ mod tests { | |||
| 716 | } | 818 | } |
| 717 | 819 | ||
| 718 | #[test] | 820 | #[test] |
| 821 | fn all_try_get() { | ||
| 822 | let f = async { | ||
| 823 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 824 | |||
| 825 | // Obtain receiver and sender | ||
| 826 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 827 | let snd = WATCH.sender(); | ||
| 828 | |||
| 829 | // Not initialized | ||
| 830 | assert_eq!(WATCH.try_get(), None); | ||
| 831 | assert_eq!(rcv.try_get(), None); | ||
| 832 | assert_eq!(snd.try_get(), None); | ||
| 833 | |||
| 834 | // Receive the new value | ||
| 835 | snd.send(10); | ||
| 836 | assert_eq!(WATCH.try_get(), Some(10)); | ||
| 837 | assert_eq!(rcv.try_get(), Some(10)); | ||
| 838 | assert_eq!(snd.try_get(), Some(10)); | ||
| 839 | |||
| 840 | assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10)); | ||
| 841 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(10)); | ||
| 842 | assert_eq!(snd.try_get_and(|x| x > &5), Some(10)); | ||
| 843 | |||
| 844 | assert_eq!(WATCH.try_get_and(|x| x < &5), None); | ||
| 845 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 846 | assert_eq!(snd.try_get_and(|x| x < &5), None); | ||
| 847 | }; | ||
| 848 | block_on(f); | ||
| 849 | } | ||
| 850 | |||
| 851 | #[test] | ||
| 852 | fn once_lock_like() { | ||
| 853 | let f = async { | ||
| 854 | static CONFIG0: u8 = 10; | ||
| 855 | static CONFIG1: u8 = 20; | ||
| 856 | |||
| 857 | static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new(); | ||
| 858 | |||
| 859 | // Obtain receiver and sender | ||
| 860 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 861 | let snd = WATCH.sender(); | ||
| 862 | |||
| 863 | // Not initialized | ||
| 864 | assert_eq!(rcv.try_changed(), None); | ||
| 865 | |||
| 866 | // Receive the new value | ||
| 867 | snd.send(&CONFIG0); | ||
| 868 | let rcv0 = rcv.changed().await; | ||
| 869 | assert_eq!(rcv0, &10); | ||
| 870 | |||
| 871 | // Receive another value | ||
| 872 | snd.send(&CONFIG1); | ||
| 873 | let rcv1 = rcv.try_changed(); | ||
| 874 | assert_eq!(rcv1, Some(&20)); | ||
| 875 | |||
| 876 | // No update | ||
| 877 | assert_eq!(rcv.try_changed(), None); | ||
| 878 | |||
| 879 | // Ensure similarity with original static | ||
| 880 | assert_eq!(rcv0, &CONFIG0); | ||
| 881 | assert_eq!(rcv1, Some(&CONFIG1)); | ||
| 882 | }; | ||
| 883 | block_on(f); | ||
| 884 | } | ||
| 885 | |||
| 886 | #[test] | ||
| 719 | fn sender_modify() { | 887 | fn sender_modify() { |
| 720 | let f = async { | 888 | let f = async { |
| 721 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | 889 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); |
| @@ -729,7 +897,7 @@ mod tests { | |||
| 729 | assert_eq!(rcv.try_changed(), Some(10)); | 897 | assert_eq!(rcv.try_changed(), Some(10)); |
| 730 | 898 | ||
| 731 | // Modify the value inplace | 899 | // Modify the value inplace |
| 732 | snd.modify(|opt| { | 900 | snd.send_modify(|opt| { |
| 733 | if let Some(inner) = opt { | 901 | if let Some(inner) = opt { |
| 734 | *inner += 5; | 902 | *inner += 5; |
| 735 | } | 903 | } |
| @@ -751,11 +919,6 @@ mod tests { | |||
| 751 | let mut rcv = WATCH.receiver().unwrap(); | 919 | let mut rcv = WATCH.receiver().unwrap(); |
| 752 | let snd = WATCH.sender(); | 920 | let snd = WATCH.sender(); |
| 753 | 921 | ||
| 754 | snd.send(10); | ||
| 755 | assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10)); | ||
| 756 | assert_eq!(rcv.try_peek_and(|x| x < &5), None); | ||
| 757 | assert!(rcv.try_changed().is_some()); | ||
| 758 | |||
| 759 | snd.send(15); | 922 | snd.send(15); |
| 760 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); | 923 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); |
| 761 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | 924 | assert_eq!(rcv.try_get_and(|x| x < &5), None); |
| @@ -771,7 +934,6 @@ mod tests { | |||
| 771 | 934 | ||
| 772 | snd.send(30); | 935 | snd.send(30); |
| 773 | assert_eq!(rcv.changed_and(|x| x > &5).await, 30); | 936 | assert_eq!(rcv.changed_and(|x| x > &5).await, 30); |
| 774 | assert_eq!(rcv.peek_and(|x| x > &5).await, 30); | ||
| 775 | assert_eq!(rcv.get_and(|x| x > &5).await, 30); | 937 | assert_eq!(rcv.get_and(|x| x > &5).await, 30); |
| 776 | }; | 938 | }; |
| 777 | block_on(f); | 939 | block_on(f); |
| @@ -825,7 +987,7 @@ mod tests { | |||
| 825 | 987 | ||
| 826 | // Obtain receivers and sender | 988 | // Obtain receivers and sender |
| 827 | let mut rcv0 = WATCH.receiver().unwrap(); | 989 | let mut rcv0 = WATCH.receiver().unwrap(); |
| 828 | let mut rcv1 = WATCH.receiver().unwrap(); | 990 | let mut rcv1 = WATCH.anon_receiver(); |
| 829 | let snd = WATCH.sender(); | 991 | let snd = WATCH.sender(); |
| 830 | 992 | ||
| 831 | // No update for both | 993 | // No update for both |
| @@ -865,40 +1027,12 @@ mod tests { | |||
| 865 | } | 1027 | } |
| 866 | 1028 | ||
| 867 | #[test] | 1029 | #[test] |
| 868 | fn peek_get_changed() { | ||
| 869 | let f = async { | ||
| 870 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 871 | |||
| 872 | // Obtain receiver and sender | ||
| 873 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 874 | let snd = WATCH.sender(); | ||
| 875 | |||
| 876 | // Send a value | ||
| 877 | snd.send(10); | ||
| 878 | |||
| 879 | // Ensure peek does not mark as seen | ||
| 880 | assert_eq!(rcv.peek().await, 10); | ||
| 881 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 882 | assert_eq!(rcv.try_changed(), None); | ||
| 883 | assert_eq!(rcv.try_peek(), Some(10)); | ||
| 884 | |||
| 885 | // Send a value | ||
| 886 | snd.send(20); | ||
| 887 | |||
| 888 | // Ensure get does mark as seen | ||
| 889 | assert_eq!(rcv.get().await, 20); | ||
| 890 | assert_eq!(rcv.try_changed(), None); | ||
| 891 | assert_eq!(rcv.try_get(), Some(20)); | ||
| 892 | }; | ||
| 893 | block_on(f); | ||
| 894 | } | ||
| 895 | |||
| 896 | #[test] | ||
| 897 | fn use_dynamics() { | 1030 | fn use_dynamics() { |
| 898 | let f = async { | 1031 | let f = async { |
| 899 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 1032 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 900 | 1033 | ||
| 901 | // Obtain receiver and sender | 1034 | // Obtain receiver and sender |
| 1035 | let mut anon_rcv = WATCH.dyn_anon_receiver(); | ||
| 902 | let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); | 1036 | let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); |
| 903 | let dyn_snd = WATCH.dyn_sender(); | 1037 | let dyn_snd = WATCH.dyn_sender(); |
| 904 | 1038 | ||
| @@ -906,6 +1040,7 @@ mod tests { | |||
| 906 | dyn_snd.send(10); | 1040 | dyn_snd.send(10); |
| 907 | 1041 | ||
| 908 | // Ensure the dynamic receiver receives the value | 1042 | // Ensure the dynamic receiver receives the value |
| 1043 | assert_eq!(anon_rcv.try_changed(), Some(10)); | ||
| 909 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | 1044 | assert_eq!(dyn_rcv.try_changed(), Some(10)); |
| 910 | assert_eq!(dyn_rcv.try_changed(), None); | 1045 | assert_eq!(dyn_rcv.try_changed(), None); |
| 911 | }; | 1046 | }; |
| @@ -918,10 +1053,12 @@ mod tests { | |||
| 918 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 1053 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 919 | 1054 | ||
| 920 | // Obtain receiver and sender | 1055 | // Obtain receiver and sender |
| 1056 | let anon_rcv = WATCH.anon_receiver(); | ||
| 921 | let rcv = WATCH.receiver().unwrap(); | 1057 | let rcv = WATCH.receiver().unwrap(); |
| 922 | let snd = WATCH.sender(); | 1058 | let snd = WATCH.sender(); |
| 923 | 1059 | ||
| 924 | // Convert to dynamic | 1060 | // Convert to dynamic |
| 1061 | let mut dyn_anon_rcv = anon_rcv.as_dyn(); | ||
| 925 | let mut dyn_rcv = rcv.as_dyn(); | 1062 | let mut dyn_rcv = rcv.as_dyn(); |
| 926 | let dyn_snd = snd.as_dyn(); | 1063 | let dyn_snd = snd.as_dyn(); |
| 927 | 1064 | ||
| @@ -929,6 +1066,7 @@ mod tests { | |||
| 929 | dyn_snd.send(10); | 1066 | dyn_snd.send(10); |
| 930 | 1067 | ||
| 931 | // Ensure the dynamic receiver receives the value | 1068 | // Ensure the dynamic receiver receives the value |
| 1069 | assert_eq!(dyn_anon_rcv.try_changed(), Some(10)); | ||
| 932 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | 1070 | assert_eq!(dyn_rcv.try_changed(), Some(10)); |
| 933 | assert_eq!(dyn_rcv.try_changed(), None); | 1071 | assert_eq!(dyn_rcv.try_changed(), None); |
| 934 | }; | 1072 | }; |
