diff options
| author | Peter Krull <[email protected]> | 2024-02-29 16:45:44 +0100 |
|---|---|---|
| committer | Peter Krull <[email protected]> | 2024-02-29 16:45:44 +0100 |
| commit | ae2f10992149279884ea564b00eb18c8bf1f464e (patch) | |
| tree | a5f69cb268570fcd7844b6e4f79da5ebd4872be6 /embassy-sync/src/watch.rs | |
| parent | 6defb4fed98432dee948634f3b2001cb4ea7ec5b (diff) | |
Added sender types, support for dropping receivers, converting to dyn-types, revised tests.
Diffstat (limited to 'embassy-sync/src/watch.rs')
| -rw-r--r-- | embassy-sync/src/watch.rs | 521 |
1 files changed, 374 insertions, 147 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 7e5e92741..3e22b1e7b 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs | |||
| @@ -1,4 +1,4 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** tasks. | 1 | //! A synchronization primitive for passing the latest value to **multiple** receivers. |
| 2 | 2 | ||
| 3 | use core::cell::RefCell; | 3 | use core::cell::RefCell; |
| 4 | use core::future::poll_fn; | 4 | use core::future::poll_fn; |
| @@ -10,22 +10,17 @@ use crate::blocking_mutex::raw::RawMutex; | |||
| 10 | use crate::blocking_mutex::Mutex; | 10 | use crate::blocking_mutex::Mutex; |
| 11 | use crate::waitqueue::MultiWakerRegistration; | 11 | use crate::waitqueue::MultiWakerRegistration; |
| 12 | 12 | ||
| 13 | /// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. | 13 | /// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await |
| 14 | /// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, | ||
| 15 | /// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous | ||
| 16 | /// value when a new one is sent, without waiting for all receivers to read the previous value. | ||
| 14 | /// | 17 | /// |
| 15 | /// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to | 18 | /// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks |
| 16 | /// `.await` the latest value, and all receive it. | 19 | /// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are |
| 20 | /// always provided with the latest value. | ||
| 17 | /// | 21 | /// |
| 18 | /// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except | 22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] |
| 19 | /// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead | 23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program. |
| 20 | /// of waiting for the receivers to pop the previous value. | ||
| 21 | /// | ||
| 22 | /// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other | ||
| 23 | /// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for | ||
| 24 | /// [`Receiver`]s to "lose" stale values. | ||
| 25 | /// | ||
| 26 | /// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared | ||
| 27 | /// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] | ||
| 28 | /// with [`Watch::receiver`] which has async methods. | ||
| 29 | /// ``` | 24 | /// ``` |
| 30 | /// | 25 | /// |
| 31 | /// use futures_executor::block_on; | 26 | /// use futures_executor::block_on; |
| @@ -36,18 +31,18 @@ use crate::waitqueue::MultiWakerRegistration; | |||
| 36 | /// | 31 | /// |
| 37 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 32 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 38 | /// | 33 | /// |
| 39 | /// // Obtain Receivers | 34 | /// // Obtain receivers and sender |
| 40 | /// let mut rcv0 = WATCH.receiver().unwrap(); | 35 | /// let mut rcv0 = WATCH.receiver().unwrap(); |
| 41 | /// let mut rcv1 = WATCH.receiver().unwrap(); | 36 | /// let mut rcv1 = WATCH.dyn_receiver().unwrap(); |
| 42 | /// assert!(WATCH.receiver().is_err()); | 37 | /// let mut snd = WATCH.sender(); |
| 43 | /// | 38 | /// |
| 39 | /// // No more receivers, and no update | ||
| 40 | /// assert!(WATCH.receiver().is_err()); | ||
| 44 | /// assert_eq!(rcv1.try_changed(), None); | 41 | /// assert_eq!(rcv1.try_changed(), None); |
| 45 | /// | 42 | /// |
| 46 | /// WATCH.write(10); | 43 | /// snd.send(10); |
| 47 | /// assert_eq!(WATCH.try_peek(), Some(10)); | ||
| 48 | /// | ||
| 49 | /// | 44 | /// |
| 50 | /// // Receive the new value | 45 | /// // Receive the new value (async or try) |
| 51 | /// assert_eq!(rcv0.changed().await, 10); | 46 | /// assert_eq!(rcv0.changed().await, 10); |
| 52 | /// assert_eq!(rcv1.try_changed(), Some(10)); | 47 | /// assert_eq!(rcv1.try_changed(), Some(10)); |
| 53 | /// | 48 | /// |
| @@ -55,13 +50,14 @@ use crate::waitqueue::MultiWakerRegistration; | |||
| 55 | /// assert_eq!(rcv0.try_changed(), None); | 50 | /// assert_eq!(rcv0.try_changed(), None); |
| 56 | /// assert_eq!(rcv1.try_changed(), None); | 51 | /// assert_eq!(rcv1.try_changed(), None); |
| 57 | /// | 52 | /// |
| 58 | /// WATCH.write(20); | 53 | /// snd.send(20); |
| 59 | /// | 54 | /// |
| 60 | /// // Defference `between` peek `get`. | 55 | /// // Peek does not mark the value as seen |
| 61 | /// assert_eq!(rcv0.peek().await, 20); | 56 | /// assert_eq!(rcv0.peek().await, 20); |
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// | ||
| 64 | /// assert_eq!(rcv0.try_changed(), Some(20)); | 57 | /// assert_eq!(rcv0.try_changed(), Some(20)); |
| 58 | /// | ||
| 59 | /// // Get marks the value as seen | ||
| 60 | /// assert_eq!(rcv1.get().await, 20); | ||
| 65 | /// assert_eq!(rcv1.try_changed(), None); | 61 | /// assert_eq!(rcv1.try_changed(), None); |
| 66 | /// | 62 | /// |
| 67 | /// }; | 63 | /// }; |
| @@ -80,30 +76,57 @@ struct WatchState<const N: usize, T: Clone> { | |||
| 80 | 76 | ||
| 81 | /// A trait representing the 'inner' behavior of the `Watch`. | 77 | /// A trait representing the 'inner' behavior of the `Watch`. |
| 82 | pub trait WatchBehavior<T: Clone> { | 78 | pub trait WatchBehavior<T: Clone> { |
| 79 | /// Sends a new value to the `Watch`. | ||
| 80 | fn send(&self, val: T); | ||
| 81 | |||
| 82 | /// Clears the value of the `Watch`. | ||
| 83 | fn clear(&self); | ||
| 84 | |||
| 83 | /// Poll the `Watch` for the current value, **without** making it as seen. | 85 | /// Poll the `Watch` for the current value, **without** making it as seen. |
| 84 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; | 86 | fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; |
| 85 | 87 | ||
| 86 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. | 88 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. |
| 87 | fn inner_try_peek(&self) -> Option<T>; | 89 | fn try_peek(&self) -> Option<T>; |
| 88 | 90 | ||
| 89 | /// Poll the `Watch` for the current value, making it as seen. | 91 | /// Poll the `Watch` for the current value, making it as seen. |
| 90 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 92 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 91 | 93 | ||
| 92 | /// Tries to get the value of the `Watch`, marking it as seen. | 94 | /// Tries to get the value of the `Watch`, marking it as seen. |
| 93 | fn inner_try_get(&self, id: &mut u64) -> Option<T>; | 95 | fn try_get(&self, id: &mut u64) -> Option<T>; |
| 94 | 96 | ||
| 95 | /// Poll the `Watch` for a changed value, marking it as seen. | 97 | /// Poll the `Watch` for a changed value, marking it as seen. |
| 96 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | 98 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; |
| 97 | 99 | ||
| 98 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | 100 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. |
| 99 | fn inner_try_changed(&self, id: &mut u64) -> Option<T>; | 101 | fn try_changed(&self, id: &mut u64) -> Option<T>; |
| 100 | 102 | ||
| 101 | /// Checks if the `Watch` is been initialized with a value. | 103 | /// Checks if the `Watch` is been initialized with a value. |
| 102 | fn inner_contains_value(&self) -> bool; | 104 | fn contains_value(&self) -> bool; |
| 105 | |||
| 106 | /// Used when a receiver is dropped to decrement the receiver count. | ||
| 107 | /// | ||
| 108 | /// ## This method should not be called by the user. | ||
| 109 | fn drop_receiver(&self); | ||
| 103 | } | 110 | } |
| 104 | 111 | ||
| 105 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { | 112 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { |
| 106 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { | 113 | fn send(&self, val: T) { |
| 114 | self.mutex.lock(|state| { | ||
| 115 | let mut s = state.borrow_mut(); | ||
| 116 | s.data = Some(val); | ||
| 117 | s.current_id += 1; | ||
| 118 | s.wakers.wake(); | ||
| 119 | }) | ||
| 120 | } | ||
| 121 | |||
| 122 | fn clear(&self) { | ||
| 123 | self.mutex.lock(|state| { | ||
| 124 | let mut s = state.borrow_mut(); | ||
| 125 | s.data = None; | ||
| 126 | }) | ||
| 127 | } | ||
| 128 | |||
| 129 | fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 107 | self.mutex.lock(|state| { | 130 | self.mutex.lock(|state| { |
| 108 | let mut s = state.borrow_mut(); | 131 | let mut s = state.borrow_mut(); |
| 109 | match &s.data { | 132 | match &s.data { |
| @@ -116,11 +139,11 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 116 | }) | 139 | }) |
| 117 | } | 140 | } |
| 118 | 141 | ||
| 119 | fn inner_try_peek(&self) -> Option<T> { | 142 | fn try_peek(&self) -> Option<T> { |
| 120 | self.mutex.lock(|state| state.borrow().data.clone()) | 143 | self.mutex.lock(|state| state.borrow().data.clone()) |
| 121 | } | 144 | } |
| 122 | 145 | ||
| 123 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | 146 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
| 124 | self.mutex.lock(|state| { | 147 | self.mutex.lock(|state| { |
| 125 | let mut s = state.borrow_mut(); | 148 | let mut s = state.borrow_mut(); |
| 126 | match &s.data { | 149 | match &s.data { |
| @@ -136,7 +159,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 136 | }) | 159 | }) |
| 137 | } | 160 | } |
| 138 | 161 | ||
| 139 | fn inner_try_get(&self, id: &mut u64) -> Option<T> { | 162 | fn try_get(&self, id: &mut u64) -> Option<T> { |
| 140 | self.mutex.lock(|state| { | 163 | self.mutex.lock(|state| { |
| 141 | let s = state.borrow(); | 164 | let s = state.borrow(); |
| 142 | *id = s.current_id; | 165 | *id = s.current_id; |
| @@ -144,7 +167,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 144 | }) | 167 | }) |
| 145 | } | 168 | } |
| 146 | 169 | ||
| 147 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | 170 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { |
| 148 | self.mutex.lock(|state| { | 171 | self.mutex.lock(|state| { |
| 149 | let mut s = state.borrow_mut(); | 172 | let mut s = state.borrow_mut(); |
| 150 | match (&s.data, s.current_id > *id) { | 173 | match (&s.data, s.current_id > *id) { |
| @@ -160,7 +183,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 160 | }) | 183 | }) |
| 161 | } | 184 | } |
| 162 | 185 | ||
| 163 | fn inner_try_changed(&self, id: &mut u64) -> Option<T> { | 186 | fn try_changed(&self, id: &mut u64) -> Option<T> { |
| 164 | self.mutex.lock(|state| { | 187 | self.mutex.lock(|state| { |
| 165 | let s = state.borrow(); | 188 | let s = state.borrow(); |
| 166 | match s.current_id > *id { | 189 | match s.current_id > *id { |
| @@ -173,9 +196,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> | |||
| 173 | }) | 196 | }) |
| 174 | } | 197 | } |
| 175 | 198 | ||
| 176 | fn inner_contains_value(&self) -> bool { | 199 | fn contains_value(&self) -> bool { |
| 177 | self.mutex.lock(|state| state.borrow().data.is_some()) | 200 | self.mutex.lock(|state| state.borrow().data.is_some()) |
| 178 | } | 201 | } |
| 202 | |||
| 203 | fn drop_receiver(&self) { | ||
| 204 | self.mutex.lock(|state| { | ||
| 205 | let mut s = state.borrow_mut(); | ||
| 206 | s.receiver_count -= 1; | ||
| 207 | }) | ||
| 208 | } | ||
| 179 | } | 209 | } |
| 180 | 210 | ||
| 181 | #[derive(Debug)] | 211 | #[derive(Debug)] |
| @@ -198,14 +228,14 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | |||
| 198 | } | 228 | } |
| 199 | } | 229 | } |
| 200 | 230 | ||
| 201 | /// Write a new value to the `Watch`. | 231 | /// Create a new [`Receiver`] for the `Watch`. |
| 202 | pub fn write(&self, val: T) { | 232 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
| 203 | self.mutex.lock(|state| { | 233 | Sender(Snd::new(self)) |
| 204 | let mut s = state.borrow_mut(); | 234 | } |
| 205 | s.data = Some(val); | 235 | |
| 206 | s.current_id += 1; | 236 | /// Create a new [`DynReceiver`] for the `Watch`. |
| 207 | s.wakers.wake(); | 237 | pub fn dyn_sender(&self) -> DynSender<'_, T> { |
| 208 | }) | 238 | DynSender(Snd::new(self)) |
| 209 | } | 239 | } |
| 210 | 240 | ||
| 211 | /// Create a new [`Receiver`] for the `Watch`. | 241 | /// Create a new [`Receiver`] for the `Watch`. |
| @@ -214,7 +244,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | |||
| 214 | let mut s = state.borrow_mut(); | 244 | let mut s = state.borrow_mut(); |
| 215 | if s.receiver_count < N { | 245 | if s.receiver_count < N { |
| 216 | s.receiver_count += 1; | 246 | s.receiver_count += 1; |
| 217 | Ok(Receiver(Rcv::new(self))) | 247 | Ok(Receiver(Rcv::new(self, 0))) |
| 218 | } else { | 248 | } else { |
| 219 | Err(Error::MaximumReceiversReached) | 249 | Err(Error::MaximumReceiversReached) |
| 220 | } | 250 | } |
| @@ -227,29 +257,121 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | |||
| 227 | let mut s = state.borrow_mut(); | 257 | let mut s = state.borrow_mut(); |
| 228 | if s.receiver_count < N { | 258 | if s.receiver_count < N { |
| 229 | s.receiver_count += 1; | 259 | s.receiver_count += 1; |
| 230 | Ok(DynReceiver(Rcv::new(self))) | 260 | Ok(DynReceiver(Rcv::new(self, 0))) |
| 231 | } else { | 261 | } else { |
| 232 | Err(Error::MaximumReceiversReached) | 262 | Err(Error::MaximumReceiversReached) |
| 233 | } | 263 | } |
| 234 | }) | 264 | }) |
| 235 | } | 265 | } |
| 266 | } | ||
| 267 | |||
| 268 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 269 | pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 270 | watch: &'a W, | ||
| 271 | _phantom: PhantomData<T>, | ||
| 272 | } | ||
| 273 | |||
| 274 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> { | ||
| 275 | fn clone(&self) -> Self { | ||
| 276 | Self { | ||
| 277 | watch: self.watch, | ||
| 278 | _phantom: PhantomData, | ||
| 279 | } | ||
| 280 | } | ||
| 281 | } | ||
| 282 | |||
| 283 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | ||
| 284 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 285 | fn new(watch: &'a W) -> Self { | ||
| 286 | Self { | ||
| 287 | watch, | ||
| 288 | _phantom: PhantomData, | ||
| 289 | } | ||
| 290 | } | ||
| 291 | |||
| 292 | /// Sends a new value to the `Watch`. | ||
| 293 | pub fn send(&self, val: T) { | ||
| 294 | self.watch.send(val) | ||
| 295 | } | ||
| 296 | |||
| 297 | /// Clears the value of the `Watch`. | ||
| 298 | /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. | ||
| 299 | pub fn clear(&self) { | ||
| 300 | self.watch.clear() | ||
| 301 | } | ||
| 236 | 302 | ||
| 237 | /// Tries to retrieve the value of the `Watch`. | 303 | /// Tries to retrieve the value of the `Watch`. |
| 238 | pub fn try_peek(&self) -> Option<T> { | 304 | pub fn try_peek(&self) -> Option<T> { |
| 239 | self.inner_try_peek() | 305 | self.watch.try_peek() |
| 240 | } | 306 | } |
| 241 | 307 | ||
| 242 | /// Returns true if the `Watch` contains a value. | 308 | /// Returns true if the `Watch` contains a value. |
| 243 | pub fn contains_value(&self) -> bool { | 309 | pub fn contains_value(&self) -> bool { |
| 244 | self.inner_contains_value() | 310 | self.watch.contains_value() |
| 245 | } | 311 | } |
| 312 | } | ||
| 246 | 313 | ||
| 247 | /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. | 314 | /// A sender of a `Watch` channel. |
| 248 | pub fn clear(&self) { | 315 | /// |
| 249 | self.mutex.lock(|state| { | 316 | /// For a simpler type definition, consider [`DynSender`] at the expense of |
| 250 | let mut s = state.borrow_mut(); | 317 | /// some runtime performance due to dynamic dispatch. |
| 251 | s.data = None; | 318 | pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>); |
| 252 | }) | 319 | |
| 320 | impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { | ||
| 321 | fn clone(&self) -> Self { | ||
| 322 | Self(self.0.clone()) | ||
| 323 | } | ||
| 324 | } | ||
| 325 | |||
| 326 | impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { | ||
| 327 | /// Converts the `Sender` into a [`DynSender`]. | ||
| 328 | pub fn as_dyn(self) -> DynSender<'a, T> { | ||
| 329 | DynSender(Snd::new(self.watch)) | ||
| 330 | } | ||
| 331 | } | ||
| 332 | |||
| 333 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> { | ||
| 334 | fn into(self) -> DynSender<'a, T> { | ||
| 335 | self.as_dyn() | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 339 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { | ||
| 340 | type Target = Snd<'a, T, Watch<M, T, N>>; | ||
| 341 | |||
| 342 | fn deref(&self) -> &Self::Target { | ||
| 343 | &self.0 | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { | ||
| 348 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 349 | &mut self.0 | ||
| 350 | } | ||
| 351 | } | ||
| 352 | |||
| 353 | /// A sender which holds a **dynamic** reference to a `Watch` channel. | ||
| 354 | /// | ||
| 355 | /// This is an alternative to [`Sender`] with a simpler type definition, | ||
| 356 | pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 357 | |||
| 358 | impl<'a, T: Clone> Clone for DynSender<'a, T> { | ||
| 359 | fn clone(&self) -> Self { | ||
| 360 | Self(self.0.clone()) | ||
| 361 | } | ||
| 362 | } | ||
| 363 | |||
| 364 | impl<'a, T: Clone> Deref for DynSender<'a, T> { | ||
| 365 | type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 366 | |||
| 367 | fn deref(&self) -> &Self::Target { | ||
| 368 | &self.0 | ||
| 369 | } | ||
| 370 | } | ||
| 371 | |||
| 372 | impl<'a, T: Clone> DerefMut for DynSender<'a, T> { | ||
| 373 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 374 | &mut self.0 | ||
| 253 | } | 375 | } |
| 254 | } | 376 | } |
| 255 | 377 | ||
| @@ -262,59 +384,83 @@ pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | |||
| 262 | 384 | ||
| 263 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | 385 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { |
| 264 | /// Creates a new `Receiver` with a reference to the `Watch`. | 386 | /// Creates a new `Receiver` with a reference to the `Watch`. |
| 265 | fn new(watch: &'a W) -> Self { | 387 | fn new(watch: &'a W, at_id: u64) -> Self { |
| 266 | Self { | 388 | Self { |
| 267 | watch, | 389 | watch, |
| 268 | at_id: 0, | 390 | at_id, |
| 269 | _phantom: PhantomData, | 391 | _phantom: PhantomData, |
| 270 | } | 392 | } |
| 271 | } | 393 | } |
| 272 | 394 | ||
| 273 | /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. | 395 | /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen. |
| 396 | /// | ||
| 397 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 274 | pub async fn peek(&self) -> T { | 398 | pub async fn peek(&self) -> T { |
| 275 | poll_fn(|cx| self.watch.inner_poll_peek(cx)).await | 399 | poll_fn(|cx| self.watch.poll_peek(cx)).await |
| 276 | } | 400 | } |
| 277 | 401 | ||
| 278 | /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. | 402 | /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. |
| 279 | pub fn try_peek(&self) -> Option<T> { | 403 | pub fn try_peek(&self) -> Option<T> { |
| 280 | self.watch.inner_try_peek() | 404 | self.watch.try_peek() |
| 281 | } | 405 | } |
| 282 | 406 | ||
| 283 | /// Returns the current value of the `Watch` if it is initialized, marking it as seen. | 407 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. |
| 408 | /// | ||
| 409 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 284 | pub async fn get(&mut self) -> T { | 410 | pub async fn get(&mut self) -> T { |
| 285 | poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await | 411 | poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)).await |
| 286 | } | 412 | } |
| 287 | 413 | ||
| 288 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | 414 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. |
| 289 | pub fn try_get(&mut self) -> Option<T> { | 415 | pub fn try_get(&mut self) -> Option<T> { |
| 290 | self.watch.inner_try_get(&mut self.at_id) | 416 | self.watch.try_get(&mut self.at_id) |
| 291 | } | 417 | } |
| 292 | 418 | ||
| 293 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | 419 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. |
| 420 | /// | ||
| 421 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 294 | pub async fn changed(&mut self) -> T { | 422 | pub async fn changed(&mut self) -> T { |
| 295 | poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await | 423 | poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await |
| 296 | } | 424 | } |
| 297 | 425 | ||
| 298 | /// Tries to get the new value of the watch without waiting, marking it as seen. | 426 | /// Tries to get the new value of the watch without waiting, marking it as seen. |
| 299 | pub fn try_changed(&mut self) -> Option<T> { | 427 | pub fn try_changed(&mut self) -> Option<T> { |
| 300 | self.watch.inner_try_changed(&mut self.at_id) | 428 | self.watch.try_changed(&mut self.at_id) |
| 301 | } | 429 | } |
| 302 | 430 | ||
| 303 | /// Checks if the `Watch` contains a value. If this returns true, | 431 | /// Checks if the `Watch` contains a value. If this returns true, |
| 304 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. | 432 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. |
| 305 | pub fn contains_value(&self) -> bool { | 433 | pub fn contains_value(&self) -> bool { |
| 306 | self.watch.inner_contains_value() | 434 | self.watch.contains_value() |
| 435 | } | ||
| 436 | } | ||
| 437 | |||
| 438 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> { | ||
| 439 | fn drop(&mut self) { | ||
| 440 | self.watch.drop_receiver(); | ||
| 307 | } | 441 | } |
| 308 | } | 442 | } |
| 309 | 443 | ||
| 310 | /// A receiver of a `Watch` channel. | 444 | /// A receiver of a `Watch` channel. |
| 311 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | 445 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); |
| 312 | 446 | ||
| 313 | /// A receiver which holds a **reference** to a `Watch` channel. | 447 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { |
| 314 | /// | 448 | /// Converts the `Receiver` into a [`DynReceiver`]. |
| 315 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | 449 | pub fn as_dyn(self) -> DynReceiver<'a, T> { |
| 316 | /// some runtime performance due to dynamic dispatch. | 450 | // We need to increment the receiver count since the original |
| 317 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | 451 | // receiver is being dropped, which decrements the count. |
| 452 | self.watch.mutex.lock(|state| { | ||
| 453 | state.borrow_mut().receiver_count += 1; | ||
| 454 | }); | ||
| 455 | DynReceiver(Rcv::new(self.0.watch, self.at_id)) | ||
| 456 | } | ||
| 457 | } | ||
| 458 | |||
| 459 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> { | ||
| 460 | fn into(self) -> DynReceiver<'a, T> { | ||
| 461 | self.as_dyn() | ||
| 462 | } | ||
| 463 | } | ||
| 318 | 464 | ||
| 319 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | 465 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { |
| 320 | type Target = Rcv<'a, T, Watch<M, T, N>>; | 466 | type Target = Rcv<'a, T, Watch<M, T, N>>; |
| @@ -330,6 +476,12 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, | |||
| 330 | } | 476 | } |
| 331 | } | 477 | } |
| 332 | 478 | ||
| 479 | /// A receiver which holds a **dynamic** reference to a `Watch` channel. | ||
| 480 | /// | ||
| 481 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | ||
| 482 | /// some runtime performance due to dynamic dispatch. | ||
| 483 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 484 | |||
| 333 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { | 485 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { |
| 334 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; | 486 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; |
| 335 | 487 | ||
| @@ -348,67 +500,94 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | |||
| 348 | mod tests { | 500 | mod tests { |
| 349 | use futures_executor::block_on; | 501 | use futures_executor::block_on; |
| 350 | 502 | ||
| 351 | use super::*; | 503 | use super::Watch; |
| 352 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | 504 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; |
| 353 | 505 | ||
| 354 | #[test] | 506 | #[test] |
| 355 | fn multiple_writes() { | 507 | fn multiple_sends() { |
| 356 | let f = async { | 508 | let f = async { |
| 357 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 509 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); |
| 358 | 510 | ||
| 359 | // Obtain Receivers | 511 | // Obtain receiver and sender |
| 360 | let mut rcv0 = WATCH.receiver().unwrap(); | 512 | let mut rcv = WATCH.receiver().unwrap(); |
| 361 | let mut rcv1 = WATCH.dyn_receiver().unwrap(); | 513 | let snd = WATCH.sender(); |
| 362 | 514 | ||
| 363 | WATCH.write(10); | 515 | // Not initialized |
| 516 | assert_eq!(rcv.try_changed(), None); | ||
| 364 | 517 | ||
| 365 | // Receive the new value | 518 | // Receive the new value |
| 366 | assert_eq!(rcv0.changed().await, 10); | 519 | snd.send(10); |
| 367 | assert_eq!(rcv1.changed().await, 10); | 520 | assert_eq!(rcv.changed().await, 10); |
| 521 | |||
| 522 | // Receive another value | ||
| 523 | snd.send(20); | ||
| 524 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 368 | 525 | ||
| 369 | // No update | 526 | // No update |
| 370 | assert_eq!(rcv0.try_changed(), None); | 527 | assert_eq!(rcv.try_changed(), None); |
| 371 | assert_eq!(rcv1.try_changed(), None); | 528 | }; |
| 529 | block_on(f); | ||
| 530 | } | ||
| 372 | 531 | ||
| 373 | WATCH.write(20); | 532 | #[test] |
| 533 | fn receive_after_create() { | ||
| 534 | let f = async { | ||
| 535 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 536 | |||
| 537 | // Obtain sender and send value | ||
| 538 | let snd = WATCH.sender(); | ||
| 539 | snd.send(10); | ||
| 374 | 540 | ||
| 375 | assert_eq!(rcv0.changed().await, 20); | 541 | // Obtain receiver and receive value |
| 376 | assert_eq!(rcv1.changed().await, 20); | 542 | let mut rcv = WATCH.receiver().unwrap(); |
| 543 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 377 | }; | 544 | }; |
| 378 | block_on(f); | 545 | block_on(f); |
| 379 | } | 546 | } |
| 380 | 547 | ||
| 381 | #[test] | 548 | #[test] |
| 382 | fn max_receivers() { | 549 | fn max_receivers_drop() { |
| 383 | let f = async { | 550 | let f = async { |
| 384 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 551 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 385 | 552 | ||
| 386 | // Obtain Receivers | 553 | // Try to create 3 receivers (only 2 can exist at once) |
| 387 | let _ = WATCH.receiver().unwrap(); | 554 | let rcv0 = WATCH.receiver(); |
| 388 | let _ = WATCH.receiver().unwrap(); | 555 | let rcv1 = WATCH.receiver(); |
| 389 | assert!(WATCH.receiver().is_err()); | 556 | let rcv2 = WATCH.receiver(); |
| 557 | |||
| 558 | // Ensure the first two are successful and the third is not | ||
| 559 | assert!(rcv0.is_ok()); | ||
| 560 | assert!(rcv1.is_ok()); | ||
| 561 | assert!(rcv2.is_err()); | ||
| 562 | |||
| 563 | // Drop the first receiver | ||
| 564 | drop(rcv0); | ||
| 565 | |||
| 566 | // Create another receiver and ensure it is successful | ||
| 567 | let rcv3 = WATCH.receiver(); | ||
| 568 | assert!(rcv3.is_ok()); | ||
| 390 | }; | 569 | }; |
| 391 | block_on(f); | 570 | block_on(f); |
| 392 | } | 571 | } |
| 393 | 572 | ||
| 394 | #[test] | 573 | #[test] |
| 395 | fn receive_initial() { | 574 | fn multiple_receivers() { |
| 396 | let f = async { | 575 | let f = async { |
| 397 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 576 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 398 | 577 | ||
| 399 | // Obtain Receivers | 578 | // Obtain receivers and sender |
| 400 | let mut rcv0 = WATCH.receiver().unwrap(); | 579 | let mut rcv0 = WATCH.receiver().unwrap(); |
| 401 | let mut rcv1 = WATCH.receiver().unwrap(); | 580 | let mut rcv1 = WATCH.receiver().unwrap(); |
| 581 | let snd = WATCH.sender(); | ||
| 402 | 582 | ||
| 403 | assert_eq!(rcv0.contains_value(), false); | 583 | // No update for both |
| 404 | |||
| 405 | assert_eq!(rcv0.try_changed(), None); | 584 | assert_eq!(rcv0.try_changed(), None); |
| 406 | assert_eq!(rcv1.try_changed(), None); | 585 | assert_eq!(rcv1.try_changed(), None); |
| 407 | 586 | ||
| 408 | WATCH.write(0); | 587 | // Send a new value |
| 409 | 588 | snd.send(0); | |
| 410 | assert_eq!(rcv0.contains_value(), true); | ||
| 411 | 589 | ||
| 590 | // Both receivers receive the new value | ||
| 412 | assert_eq!(rcv0.try_changed(), Some(0)); | 591 | assert_eq!(rcv0.try_changed(), Some(0)); |
| 413 | assert_eq!(rcv1.try_changed(), Some(0)); | 592 | assert_eq!(rcv1.try_changed(), Some(0)); |
| 414 | }; | 593 | }; |
| @@ -416,99 +595,147 @@ mod tests { | |||
| 416 | } | 595 | } |
| 417 | 596 | ||
| 418 | #[test] | 597 | #[test] |
| 598 | fn clone_senders() { | ||
| 599 | let f = async { | ||
| 600 | // Obtain different ways to send | ||
| 601 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 602 | let snd0 = WATCH.sender(); | ||
| 603 | let snd1 = snd0.clone(); | ||
| 604 | |||
| 605 | // Obtain Receiver | ||
| 606 | let mut rcv = WATCH.receiver().unwrap().as_dyn(); | ||
| 607 | |||
| 608 | // Send a value from first sender | ||
| 609 | snd0.send(10); | ||
| 610 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 611 | |||
| 612 | // Send a value from second sender | ||
| 613 | snd1.send(20); | ||
| 614 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 615 | }; | ||
| 616 | block_on(f); | ||
| 617 | } | ||
| 618 | |||
| 619 | #[test] | ||
| 419 | fn peek_get_changed() { | 620 | fn peek_get_changed() { |
| 420 | let f = async { | 621 | let f = async { |
| 421 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 622 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 422 | 623 | ||
| 423 | // Obtain Receivers | 624 | // Obtain receiver and sender |
| 424 | let mut rcv0 = WATCH.receiver().unwrap(); | 625 | let mut rcv = WATCH.receiver().unwrap(); |
| 626 | let snd = WATCH.sender(); | ||
| 425 | 627 | ||
| 426 | WATCH.write(10); | 628 | // Send a value |
| 629 | snd.send(10); | ||
| 427 | 630 | ||
| 428 | // Ensure peek does not mark as seen | 631 | // Ensure peek does not mark as seen |
| 429 | assert_eq!(rcv0.peek().await, 10); | 632 | assert_eq!(rcv.peek().await, 10); |
| 430 | assert_eq!(rcv0.try_changed(), Some(10)); | 633 | assert_eq!(rcv.try_changed(), Some(10)); |
| 431 | assert_eq!(rcv0.try_changed(), None); | 634 | assert_eq!(rcv.try_changed(), None); |
| 432 | assert_eq!(rcv0.peek().await, 10); | 635 | assert_eq!(rcv.try_peek(), Some(10)); |
| 433 | 636 | ||
| 434 | WATCH.write(20); | 637 | // Send a value |
| 638 | snd.send(20); | ||
| 435 | 639 | ||
| 436 | // Ensure get does mark as seen | 640 | // Ensure get does mark as seen |
| 437 | assert_eq!(rcv0.get().await, 20); | 641 | assert_eq!(rcv.get().await, 20); |
| 438 | assert_eq!(rcv0.try_changed(), None); | 642 | assert_eq!(rcv.try_changed(), None); |
| 439 | assert_eq!(rcv0.try_get(), Some(20)); | 643 | assert_eq!(rcv.try_get(), Some(20)); |
| 440 | }; | 644 | }; |
| 441 | block_on(f); | 645 | block_on(f); |
| 442 | } | 646 | } |
| 443 | 647 | ||
| 444 | #[test] | 648 | #[test] |
| 445 | fn count_ids() { | 649 | fn use_dynamics() { |
| 446 | let f = async { | 650 | let f = async { |
| 447 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 651 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 448 | 652 | ||
| 449 | // Obtain Receivers | 653 | // Obtain receiver and sender |
| 450 | let mut rcv0 = WATCH.receiver().unwrap(); | 654 | let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); |
| 451 | let mut rcv1 = WATCH.receiver().unwrap(); | 655 | let dyn_snd = WATCH.dyn_sender(); |
| 452 | 656 | ||
| 453 | let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); | 657 | // Send a value |
| 658 | dyn_snd.send(10); | ||
| 454 | 659 | ||
| 455 | WATCH.write(10); | 660 | // Ensure the dynamic receiver receives the value |
| 456 | 661 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | |
| 457 | assert_eq!(rcv0.changed().await, 10); | 662 | assert_eq!(dyn_rcv.try_changed(), None); |
| 458 | assert_eq!(rcv1.changed().await, 10); | 663 | }; |
| 664 | block_on(f); | ||
| 665 | } | ||
| 459 | 666 | ||
| 460 | assert_eq!(rcv0.try_changed(), None); | 667 | #[test] |
| 461 | assert_eq!(rcv1.try_changed(), None); | 668 | fn convert_to_dyn() { |
| 669 | let f = async { | ||
| 670 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 462 | 671 | ||
| 463 | WATCH.write(20); | 672 | // Obtain receiver and sender |
| 464 | WATCH.write(20); | 673 | let rcv = WATCH.receiver().unwrap(); |
| 465 | WATCH.write(20); | 674 | let snd = WATCH.sender(); |
| 466 | 675 | ||
| 467 | assert_eq!(rcv0.changed().await, 20); | 676 | // Convert to dynamic |
| 468 | assert_eq!(rcv1.changed().await, 20); | 677 | let mut dyn_rcv = rcv.as_dyn(); |
| 678 | let dyn_snd = snd.as_dyn(); | ||
| 469 | 679 | ||
| 470 | assert_eq!(rcv0.try_changed(), None); | 680 | // Send a value |
| 471 | assert_eq!(rcv1.try_changed(), None); | 681 | dyn_snd.send(10); |
| 472 | 682 | ||
| 473 | assert_eq!(get_id(), 4); | 683 | // Ensure the dynamic receiver receives the value |
| 684 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | ||
| 685 | assert_eq!(dyn_rcv.try_changed(), None); | ||
| 474 | }; | 686 | }; |
| 475 | block_on(f); | 687 | block_on(f); |
| 476 | } | 688 | } |
| 477 | 689 | ||
| 478 | #[test] | 690 | #[test] |
| 479 | fn peek_still_await() { | 691 | fn dynamic_receiver_count() { |
| 480 | let f = async { | 692 | let f = async { |
| 481 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 693 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 482 | 694 | ||
| 483 | // Obtain Receivers | 695 | // Obtain receiver and sender |
| 484 | let mut rcv0 = WATCH.receiver().unwrap(); | 696 | let rcv0 = WATCH.receiver(); |
| 485 | let mut rcv1 = WATCH.receiver().unwrap(); | 697 | let rcv1 = WATCH.receiver(); |
| 698 | let rcv2 = WATCH.receiver(); | ||
| 486 | 699 | ||
| 487 | WATCH.write(10); | 700 | // Ensure the first two are successful and the third is not |
| 701 | assert!(rcv0.is_ok()); | ||
| 702 | assert!(rcv1.is_ok()); | ||
| 703 | assert!(rcv2.is_err()); | ||
| 488 | 704 | ||
| 489 | assert_eq!(rcv0.peek().await, 10); | 705 | // Convert to dynamic |
| 490 | assert_eq!(rcv1.try_peek(), Some(10)); | 706 | let dyn_rcv0 = rcv0.unwrap().as_dyn(); |
| 491 | 707 | ||
| 492 | assert_eq!(rcv0.changed().await, 10); | 708 | // Drop the (now dynamic) receiver |
| 493 | assert_eq!(rcv1.changed().await, 10); | 709 | drop(dyn_rcv0); |
| 710 | |||
| 711 | // Create another receiver and ensure it is successful | ||
| 712 | let rcv3 = WATCH.receiver(); | ||
| 713 | let rcv4 = WATCH.receiver(); | ||
| 714 | assert!(rcv3.is_ok()); | ||
| 715 | assert!(rcv4.is_err()); | ||
| 494 | }; | 716 | }; |
| 495 | block_on(f); | 717 | block_on(f); |
| 496 | } | 718 | } |
| 497 | 719 | ||
| 498 | #[test] | 720 | #[test] |
| 499 | fn peek_with_static() { | 721 | fn contains_value() { |
| 500 | let f = async { | 722 | let f = async { |
| 501 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | 723 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); |
| 502 | 724 | ||
| 503 | // Obtain Receivers | 725 | // Obtain receiver and sender |
| 504 | let rcv0 = WATCH.receiver().unwrap(); | 726 | let rcv = WATCH.receiver().unwrap(); |
| 505 | let rcv1 = WATCH.receiver().unwrap(); | 727 | let snd = WATCH.sender(); |
| 728 | |||
| 729 | // check if the watch contains a value | ||
| 730 | assert_eq!(rcv.contains_value(), false); | ||
| 731 | assert_eq!(snd.contains_value(), false); | ||
| 506 | 732 | ||
| 507 | WATCH.write(20); | 733 | // Send a value |
| 734 | snd.send(10); | ||
| 508 | 735 | ||
| 509 | assert_eq!(rcv0.peek().await, 20); | 736 | // check if the watch contains a value |
| 510 | assert_eq!(rcv1.peek().await, 20); | 737 | assert_eq!(rcv.contains_value(), true); |
| 511 | assert_eq!(WATCH.try_peek(), Some(20)); | 738 | assert_eq!(snd.contains_value(), true); |
| 512 | }; | 739 | }; |
| 513 | block_on(f); | 740 | block_on(f); |
| 514 | } | 741 | } |
