diff options
| author | Peter Krull <[email protected]> | 2024-02-28 20:59:38 +0100 |
|---|---|---|
| committer | Peter Krull <[email protected]> | 2024-02-28 20:59:38 +0100 |
| commit | 6defb4fed98432dee948634f3b2001cb4ea7ec5b (patch) | |
| tree | a51aeb731240e41cb106054131b45f8e643b3690 /embassy-sync/src/watch.rs | |
| parent | 2f58d1968a7310335a0dac4d947c6972a7707ed5 (diff) | |
Changed name to `Watch`, added `DynReceiver`, `get`-method and more reworks.
Diffstat (limited to 'embassy-sync/src/watch.rs')
| -rw-r--r-- | embassy-sync/src/watch.rs | 515 |
1 files changed, 515 insertions, 0 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 000000000..7e5e92741 --- /dev/null +++ b/embassy-sync/src/watch.rs | |||
| @@ -0,0 +1,515 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** tasks. | ||
| 2 | |||
| 3 | use core::cell::RefCell; | ||
| 4 | use core::future::poll_fn; | ||
| 5 | use core::marker::PhantomData; | ||
| 6 | use core::ops::{Deref, DerefMut}; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use crate::blocking_mutex::raw::RawMutex; | ||
| 10 | use crate::blocking_mutex::Mutex; | ||
| 11 | use crate::waitqueue::MultiWakerRegistration; | ||
| 12 | |||
| 13 | /// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. | ||
| 14 | /// | ||
| 15 | /// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to | ||
| 16 | /// `.await` the latest value, and all receive it. | ||
| 17 | /// | ||
| 18 | /// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except | ||
| 19 | /// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead | ||
| 20 | /// of waiting for the receivers to pop the previous value. | ||
| 21 | /// | ||
| 22 | /// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other | ||
| 23 | /// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for | ||
| 24 | /// [`Receiver`]s to "lose" stale values. | ||
| 25 | /// | ||
| 26 | /// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared | ||
| 27 | /// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] | ||
| 28 | /// with [`Watch::receiver`] which has async methods. | ||
| 29 | /// ``` | ||
| 30 | /// | ||
| 31 | /// use futures_executor::block_on; | ||
| 32 | /// use embassy_sync::watch::Watch; | ||
| 33 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 34 | /// | ||
| 35 | /// let f = async { | ||
| 36 | /// | ||
| 37 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 38 | /// | ||
| 39 | /// // Obtain Receivers | ||
| 40 | /// let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 41 | /// let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 42 | /// assert!(WATCH.receiver().is_err()); | ||
| 43 | /// | ||
| 44 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 45 | /// | ||
| 46 | /// WATCH.write(10); | ||
| 47 | /// assert_eq!(WATCH.try_peek(), Some(10)); | ||
| 48 | /// | ||
| 49 | /// | ||
| 50 | /// // Receive the new value | ||
| 51 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 52 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 53 | /// | ||
| 54 | /// // No update | ||
| 55 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 56 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 57 | /// | ||
| 58 | /// WATCH.write(20); | ||
| 59 | /// | ||
| 60 | /// // Defference `between` peek `get`. | ||
| 61 | /// assert_eq!(rcv0.peek().await, 20); | ||
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// | ||
| 64 | /// assert_eq!(rcv0.try_changed(), Some(20)); | ||
| 65 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 66 | /// | ||
| 67 | /// }; | ||
| 68 | /// block_on(f); | ||
| 69 | /// ``` | ||
| 70 | pub struct Watch<M: RawMutex, T: Clone, const N: usize> { | ||
| 71 | mutex: Mutex<M, RefCell<WatchState<N, T>>>, | ||
| 72 | } | ||
| 73 | |||
| 74 | struct WatchState<const N: usize, T: Clone> { | ||
| 75 | data: Option<T>, | ||
| 76 | current_id: u64, | ||
| 77 | wakers: MultiWakerRegistration<N>, | ||
| 78 | receiver_count: usize, | ||
| 79 | } | ||
| 80 | |||
| 81 | /// A trait representing the 'inner' behavior of the `Watch`. | ||
| 82 | pub trait WatchBehavior<T: Clone> { | ||
| 83 | /// Poll the `Watch` for the current value, **without** making it as seen. | ||
| 84 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; | ||
| 85 | |||
| 86 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. | ||
| 87 | fn inner_try_peek(&self) -> Option<T>; | ||
| 88 | |||
| 89 | /// Poll the `Watch` for the current value, making it as seen. | ||
| 90 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 91 | |||
| 92 | /// Tries to get the value of the `Watch`, marking it as seen. | ||
| 93 | fn inner_try_get(&self, id: &mut u64) -> Option<T>; | ||
| 94 | |||
| 95 | /// Poll the `Watch` for a changed value, marking it as seen. | ||
| 96 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 97 | |||
| 98 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | ||
| 99 | fn inner_try_changed(&self, id: &mut u64) -> Option<T>; | ||
| 100 | |||
| 101 | /// Checks if the `Watch` is been initialized with a value. | ||
| 102 | fn inner_contains_value(&self) -> bool; | ||
| 103 | } | ||
| 104 | |||
| 105 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { | ||
| 106 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 107 | self.mutex.lock(|state| { | ||
| 108 | let mut s = state.borrow_mut(); | ||
| 109 | match &s.data { | ||
| 110 | Some(data) => Poll::Ready(data.clone()), | ||
| 111 | None => { | ||
| 112 | s.wakers.register(cx.waker()); | ||
| 113 | Poll::Pending | ||
| 114 | } | ||
| 115 | } | ||
| 116 | }) | ||
| 117 | } | ||
| 118 | |||
| 119 | fn inner_try_peek(&self) -> Option<T> { | ||
| 120 | self.mutex.lock(|state| state.borrow().data.clone()) | ||
| 121 | } | ||
| 122 | |||
| 123 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 124 | self.mutex.lock(|state| { | ||
| 125 | let mut s = state.borrow_mut(); | ||
| 126 | match &s.data { | ||
| 127 | Some(data) => { | ||
| 128 | *id = s.current_id; | ||
| 129 | Poll::Ready(data.clone()) | ||
| 130 | } | ||
| 131 | None => { | ||
| 132 | s.wakers.register(cx.waker()); | ||
| 133 | Poll::Pending | ||
| 134 | } | ||
| 135 | } | ||
| 136 | }) | ||
| 137 | } | ||
| 138 | |||
| 139 | fn inner_try_get(&self, id: &mut u64) -> Option<T> { | ||
| 140 | self.mutex.lock(|state| { | ||
| 141 | let s = state.borrow(); | ||
| 142 | *id = s.current_id; | ||
| 143 | state.borrow().data.clone() | ||
| 144 | }) | ||
| 145 | } | ||
| 146 | |||
| 147 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 148 | self.mutex.lock(|state| { | ||
| 149 | let mut s = state.borrow_mut(); | ||
| 150 | match (&s.data, s.current_id > *id) { | ||
| 151 | (Some(data), true) => { | ||
| 152 | *id = s.current_id; | ||
| 153 | Poll::Ready(data.clone()) | ||
| 154 | } | ||
| 155 | _ => { | ||
| 156 | s.wakers.register(cx.waker()); | ||
| 157 | Poll::Pending | ||
| 158 | } | ||
| 159 | } | ||
| 160 | }) | ||
| 161 | } | ||
| 162 | |||
| 163 | fn inner_try_changed(&self, id: &mut u64) -> Option<T> { | ||
| 164 | self.mutex.lock(|state| { | ||
| 165 | let s = state.borrow(); | ||
| 166 | match s.current_id > *id { | ||
| 167 | true => { | ||
| 168 | *id = s.current_id; | ||
| 169 | state.borrow().data.clone() | ||
| 170 | } | ||
| 171 | false => None, | ||
| 172 | } | ||
| 173 | }) | ||
| 174 | } | ||
| 175 | |||
| 176 | fn inner_contains_value(&self) -> bool { | ||
| 177 | self.mutex.lock(|state| state.borrow().data.is_some()) | ||
| 178 | } | ||
| 179 | } | ||
| 180 | |||
| 181 | #[derive(Debug)] | ||
| 182 | /// An error that can occur when a `Watch` returns a `Result::Err(_)`. | ||
| 183 | pub enum Error { | ||
| 184 | /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached. | ||
| 185 | MaximumReceiversReached, | ||
| 186 | } | ||
| 187 | |||
| 188 | impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | ||
| 189 | /// Create a new `Watch` channel. | ||
| 190 | pub const fn new() -> Self { | ||
| 191 | Self { | ||
| 192 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 193 | data: None, | ||
| 194 | current_id: 0, | ||
| 195 | wakers: MultiWakerRegistration::new(), | ||
| 196 | receiver_count: 0, | ||
| 197 | })), | ||
| 198 | } | ||
| 199 | } | ||
| 200 | |||
| 201 | /// Write a new value to the `Watch`. | ||
| 202 | pub fn write(&self, val: T) { | ||
| 203 | self.mutex.lock(|state| { | ||
| 204 | let mut s = state.borrow_mut(); | ||
| 205 | s.data = Some(val); | ||
| 206 | s.current_id += 1; | ||
| 207 | s.wakers.wake(); | ||
| 208 | }) | ||
| 209 | } | ||
| 210 | |||
| 211 | /// Create a new [`Receiver`] for the `Watch`. | ||
| 212 | pub fn receiver(&self) -> Result<Receiver<'_, M, T, N>, Error> { | ||
| 213 | self.mutex.lock(|state| { | ||
| 214 | let mut s = state.borrow_mut(); | ||
| 215 | if s.receiver_count < N { | ||
| 216 | s.receiver_count += 1; | ||
| 217 | Ok(Receiver(Rcv::new(self))) | ||
| 218 | } else { | ||
| 219 | Err(Error::MaximumReceiversReached) | ||
| 220 | } | ||
| 221 | }) | ||
| 222 | } | ||
| 223 | |||
| 224 | /// Create a new [`DynReceiver`] for the `Watch`. | ||
| 225 | pub fn dyn_receiver(&self) -> Result<DynReceiver<'_, T>, Error> { | ||
| 226 | self.mutex.lock(|state| { | ||
| 227 | let mut s = state.borrow_mut(); | ||
| 228 | if s.receiver_count < N { | ||
| 229 | s.receiver_count += 1; | ||
| 230 | Ok(DynReceiver(Rcv::new(self))) | ||
| 231 | } else { | ||
| 232 | Err(Error::MaximumReceiversReached) | ||
| 233 | } | ||
| 234 | }) | ||
| 235 | } | ||
| 236 | |||
| 237 | /// Tries to retrieve the value of the `Watch`. | ||
| 238 | pub fn try_peek(&self) -> Option<T> { | ||
| 239 | self.inner_try_peek() | ||
| 240 | } | ||
| 241 | |||
| 242 | /// Returns true if the `Watch` contains a value. | ||
| 243 | pub fn contains_value(&self) -> bool { | ||
| 244 | self.inner_contains_value() | ||
| 245 | } | ||
| 246 | |||
| 247 | /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. | ||
| 248 | pub fn clear(&self) { | ||
| 249 | self.mutex.lock(|state| { | ||
| 250 | let mut s = state.borrow_mut(); | ||
| 251 | s.data = None; | ||
| 252 | }) | ||
| 253 | } | ||
| 254 | } | ||
| 255 | |||
| 256 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 257 | pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 258 | watch: &'a W, | ||
| 259 | at_id: u64, | ||
| 260 | _phantom: PhantomData<T>, | ||
| 261 | } | ||
| 262 | |||
| 263 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | ||
| 264 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 265 | fn new(watch: &'a W) -> Self { | ||
| 266 | Self { | ||
| 267 | watch, | ||
| 268 | at_id: 0, | ||
| 269 | _phantom: PhantomData, | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. | ||
| 274 | pub async fn peek(&self) -> T { | ||
| 275 | poll_fn(|cx| self.watch.inner_poll_peek(cx)).await | ||
| 276 | } | ||
| 277 | |||
| 278 | /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. | ||
| 279 | pub fn try_peek(&self) -> Option<T> { | ||
| 280 | self.watch.inner_try_peek() | ||
| 281 | } | ||
| 282 | |||
| 283 | /// Returns the current value of the `Watch` if it is initialized, marking it as seen. | ||
| 284 | pub async fn get(&mut self) -> T { | ||
| 285 | poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await | ||
| 286 | } | ||
| 287 | |||
| 288 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 289 | pub fn try_get(&mut self) -> Option<T> { | ||
| 290 | self.watch.inner_try_get(&mut self.at_id) | ||
| 291 | } | ||
| 292 | |||
| 293 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | ||
| 294 | pub async fn changed(&mut self) -> T { | ||
| 295 | poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 299 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 300 | self.watch.inner_try_changed(&mut self.at_id) | ||
| 301 | } | ||
| 302 | |||
| 303 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 304 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. | ||
| 305 | pub fn contains_value(&self) -> bool { | ||
| 306 | self.watch.inner_contains_value() | ||
| 307 | } | ||
| 308 | } | ||
| 309 | |||
| 310 | /// A receiver of a `Watch` channel. | ||
| 311 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | ||
| 312 | |||
| 313 | /// A receiver which holds a **reference** to a `Watch` channel. | ||
| 314 | /// | ||
| 315 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | ||
| 316 | /// some runtime performance due to dynamic dispatch. | ||
| 317 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 318 | |||
| 319 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 320 | type Target = Rcv<'a, T, Watch<M, T, N>>; | ||
| 321 | |||
| 322 | fn deref(&self) -> &Self::Target { | ||
| 323 | &self.0 | ||
| 324 | } | ||
| 325 | } | ||
| 326 | |||
| 327 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 328 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 329 | &mut self.0 | ||
| 330 | } | ||
| 331 | } | ||
| 332 | |||
| 333 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { | ||
| 334 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 335 | |||
| 336 | fn deref(&self) -> &Self::Target { | ||
| 337 | &self.0 | ||
| 338 | } | ||
| 339 | } | ||
| 340 | |||
| 341 | impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | ||
| 342 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 343 | &mut self.0 | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | #[cfg(test)] | ||
| 348 | mod tests { | ||
| 349 | use futures_executor::block_on; | ||
| 350 | |||
| 351 | use super::*; | ||
| 352 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 353 | |||
| 354 | #[test] | ||
| 355 | fn multiple_writes() { | ||
| 356 | let f = async { | ||
| 357 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 358 | |||
| 359 | // Obtain Receivers | ||
| 360 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 361 | let mut rcv1 = WATCH.dyn_receiver().unwrap(); | ||
| 362 | |||
| 363 | WATCH.write(10); | ||
| 364 | |||
| 365 | // Receive the new value | ||
| 366 | assert_eq!(rcv0.changed().await, 10); | ||
| 367 | assert_eq!(rcv1.changed().await, 10); | ||
| 368 | |||
| 369 | // No update | ||
| 370 | assert_eq!(rcv0.try_changed(), None); | ||
| 371 | assert_eq!(rcv1.try_changed(), None); | ||
| 372 | |||
| 373 | WATCH.write(20); | ||
| 374 | |||
| 375 | assert_eq!(rcv0.changed().await, 20); | ||
| 376 | assert_eq!(rcv1.changed().await, 20); | ||
| 377 | }; | ||
| 378 | block_on(f); | ||
| 379 | } | ||
| 380 | |||
| 381 | #[test] | ||
| 382 | fn max_receivers() { | ||
| 383 | let f = async { | ||
| 384 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 385 | |||
| 386 | // Obtain Receivers | ||
| 387 | let _ = WATCH.receiver().unwrap(); | ||
| 388 | let _ = WATCH.receiver().unwrap(); | ||
| 389 | assert!(WATCH.receiver().is_err()); | ||
| 390 | }; | ||
| 391 | block_on(f); | ||
| 392 | } | ||
| 393 | |||
| 394 | #[test] | ||
| 395 | fn receive_initial() { | ||
| 396 | let f = async { | ||
| 397 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 398 | |||
| 399 | // Obtain Receivers | ||
| 400 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 401 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 402 | |||
| 403 | assert_eq!(rcv0.contains_value(), false); | ||
| 404 | |||
| 405 | assert_eq!(rcv0.try_changed(), None); | ||
| 406 | assert_eq!(rcv1.try_changed(), None); | ||
| 407 | |||
| 408 | WATCH.write(0); | ||
| 409 | |||
| 410 | assert_eq!(rcv0.contains_value(), true); | ||
| 411 | |||
| 412 | assert_eq!(rcv0.try_changed(), Some(0)); | ||
| 413 | assert_eq!(rcv1.try_changed(), Some(0)); | ||
| 414 | }; | ||
| 415 | block_on(f); | ||
| 416 | } | ||
| 417 | |||
| 418 | #[test] | ||
| 419 | fn peek_get_changed() { | ||
| 420 | let f = async { | ||
| 421 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 422 | |||
| 423 | // Obtain Receivers | ||
| 424 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 425 | |||
| 426 | WATCH.write(10); | ||
| 427 | |||
| 428 | // Ensure peek does not mark as seen | ||
| 429 | assert_eq!(rcv0.peek().await, 10); | ||
| 430 | assert_eq!(rcv0.try_changed(), Some(10)); | ||
| 431 | assert_eq!(rcv0.try_changed(), None); | ||
| 432 | assert_eq!(rcv0.peek().await, 10); | ||
| 433 | |||
| 434 | WATCH.write(20); | ||
| 435 | |||
| 436 | // Ensure get does mark as seen | ||
| 437 | assert_eq!(rcv0.get().await, 20); | ||
| 438 | assert_eq!(rcv0.try_changed(), None); | ||
| 439 | assert_eq!(rcv0.try_get(), Some(20)); | ||
| 440 | }; | ||
| 441 | block_on(f); | ||
| 442 | } | ||
| 443 | |||
| 444 | #[test] | ||
| 445 | fn count_ids() { | ||
| 446 | let f = async { | ||
| 447 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 448 | |||
| 449 | // Obtain Receivers | ||
| 450 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 451 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 452 | |||
| 453 | let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); | ||
| 454 | |||
| 455 | WATCH.write(10); | ||
| 456 | |||
| 457 | assert_eq!(rcv0.changed().await, 10); | ||
| 458 | assert_eq!(rcv1.changed().await, 10); | ||
| 459 | |||
| 460 | assert_eq!(rcv0.try_changed(), None); | ||
| 461 | assert_eq!(rcv1.try_changed(), None); | ||
| 462 | |||
| 463 | WATCH.write(20); | ||
| 464 | WATCH.write(20); | ||
| 465 | WATCH.write(20); | ||
| 466 | |||
| 467 | assert_eq!(rcv0.changed().await, 20); | ||
| 468 | assert_eq!(rcv1.changed().await, 20); | ||
| 469 | |||
| 470 | assert_eq!(rcv0.try_changed(), None); | ||
| 471 | assert_eq!(rcv1.try_changed(), None); | ||
| 472 | |||
| 473 | assert_eq!(get_id(), 4); | ||
| 474 | }; | ||
| 475 | block_on(f); | ||
| 476 | } | ||
| 477 | |||
| 478 | #[test] | ||
| 479 | fn peek_still_await() { | ||
| 480 | let f = async { | ||
| 481 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 482 | |||
| 483 | // Obtain Receivers | ||
| 484 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 485 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 486 | |||
| 487 | WATCH.write(10); | ||
| 488 | |||
| 489 | assert_eq!(rcv0.peek().await, 10); | ||
| 490 | assert_eq!(rcv1.try_peek(), Some(10)); | ||
| 491 | |||
| 492 | assert_eq!(rcv0.changed().await, 10); | ||
| 493 | assert_eq!(rcv1.changed().await, 10); | ||
| 494 | }; | ||
| 495 | block_on(f); | ||
| 496 | } | ||
| 497 | |||
| 498 | #[test] | ||
| 499 | fn peek_with_static() { | ||
| 500 | let f = async { | ||
| 501 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 502 | |||
| 503 | // Obtain Receivers | ||
| 504 | let rcv0 = WATCH.receiver().unwrap(); | ||
| 505 | let rcv1 = WATCH.receiver().unwrap(); | ||
| 506 | |||
| 507 | WATCH.write(20); | ||
| 508 | |||
| 509 | assert_eq!(rcv0.peek().await, 20); | ||
| 510 | assert_eq!(rcv1.peek().await, 20); | ||
| 511 | assert_eq!(WATCH.try_peek(), Some(20)); | ||
| 512 | }; | ||
| 513 | block_on(f); | ||
| 514 | } | ||
| 515 | } | ||
