diff options
| author | Peter Krull <[email protected]> | 2024-02-28 20:59:38 +0100 |
|---|---|---|
| committer | Peter Krull <[email protected]> | 2024-02-28 20:59:38 +0100 |
| commit | 6defb4fed98432dee948634f3b2001cb4ea7ec5b (patch) | |
| tree | a51aeb731240e41cb106054131b45f8e643b3690 | |
| parent | 2f58d1968a7310335a0dac4d947c6972a7707ed5 (diff) | |
Changed name to `Watch`, added `DynReceiver`, `get`-method and more reworks.
| -rw-r--r-- | embassy-sync/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/multi_signal.rs | 529 | ||||
| -rw-r--r-- | embassy-sync/src/watch.rs | 515 |
3 files changed, 516 insertions, 530 deletions
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index f02985564..8a69541a5 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -12,11 +12,11 @@ mod ring_buffer; | |||
| 12 | 12 | ||
| 13 | pub mod blocking_mutex; | 13 | pub mod blocking_mutex; |
| 14 | pub mod channel; | 14 | pub mod channel; |
| 15 | pub mod multi_signal; | ||
| 16 | pub mod mutex; | 15 | pub mod mutex; |
| 17 | pub mod pipe; | 16 | pub mod pipe; |
| 18 | pub mod priority_channel; | 17 | pub mod priority_channel; |
| 19 | pub mod pubsub; | 18 | pub mod pubsub; |
| 20 | pub mod signal; | 19 | pub mod signal; |
| 21 | pub mod waitqueue; | 20 | pub mod waitqueue; |
| 21 | pub mod watch; | ||
| 22 | pub mod zerocopy_channel; | 22 | pub mod zerocopy_channel; |
diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs deleted file mode 100644 index ff9f72f2e..000000000 --- a/embassy-sync/src/multi_signal.rs +++ /dev/null | |||
| @@ -1,529 +0,0 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** tasks. | ||
| 2 | use core::cell::RefCell; | ||
| 3 | use core::ops::{Deref, DerefMut}; | ||
| 4 | use core::pin::Pin; | ||
| 5 | use core::task::{Context, Poll}; | ||
| 6 | |||
| 7 | use futures_util::Future; | ||
| 8 | |||
| 9 | use crate::blocking_mutex::raw::RawMutex; | ||
| 10 | use crate::blocking_mutex::Mutex; | ||
| 11 | use crate::waitqueue::MultiWakerRegistration; | ||
| 12 | |||
| 13 | /// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. | ||
| 14 | /// | ||
| 15 | /// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to | ||
| 16 | /// `.await` the latest value, and all receive it. | ||
| 17 | /// | ||
| 18 | /// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except | ||
| 19 | /// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead | ||
| 20 | /// of waiting for the receivers to pop the previous value. | ||
| 21 | /// | ||
| 22 | /// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other | ||
| 23 | /// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for | ||
| 24 | /// [`Receiver`]s to "lose" stale values. | ||
| 25 | /// | ||
| 26 | /// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared | ||
| 27 | /// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] | ||
| 28 | /// with [`MultiSignal::receiver`] which has async methods. | ||
| 29 | /// ``` | ||
| 30 | /// | ||
| 31 | /// use futures_executor::block_on; | ||
| 32 | /// use embassy_sync::multi_signal::MultiSignal; | ||
| 33 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 34 | /// | ||
| 35 | /// let f = async { | ||
| 36 | /// | ||
| 37 | /// static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 38 | /// | ||
| 39 | /// // Obtain Receivers | ||
| 40 | /// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 41 | /// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 42 | /// assert!(SOME_SIGNAL.receiver().is_err()); | ||
| 43 | /// | ||
| 44 | /// SOME_SIGNAL.write(10); | ||
| 45 | /// | ||
| 46 | /// // Receive the new value | ||
| 47 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 48 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 49 | /// | ||
| 50 | /// // No update | ||
| 51 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 52 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 53 | /// | ||
| 54 | /// SOME_SIGNAL.write(20); | ||
| 55 | /// | ||
| 56 | /// // Receive new value with predicate | ||
| 57 | /// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); | ||
| 58 | /// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); | ||
| 59 | /// | ||
| 60 | /// // Anyone can peek the current value | ||
| 61 | /// assert_eq!(rcv0.peek(), 20); | ||
| 62 | /// assert_eq!(rcv1.peek(), 20); | ||
| 63 | /// assert_eq!(SOME_SIGNAL.peek(), 20); | ||
| 64 | /// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); | ||
| 65 | /// }; | ||
| 66 | /// block_on(f); | ||
| 67 | /// ``` | ||
| 68 | pub struct MultiSignal<M: RawMutex, T: Clone, const N: usize> { | ||
| 69 | mutex: Mutex<M, RefCell<MultiSignalState<N, T>>>, | ||
| 70 | } | ||
| 71 | |||
| 72 | struct MultiSignalState<const N: usize, T: Clone> { | ||
| 73 | data: T, | ||
| 74 | current_id: u64, | ||
| 75 | wakers: MultiWakerRegistration<N>, | ||
| 76 | receiver_count: usize, | ||
| 77 | } | ||
| 78 | |||
| 79 | #[derive(Debug)] | ||
| 80 | /// An error that can occur when a `MultiSignal` returns a `Result`. | ||
| 81 | pub enum Error { | ||
| 82 | /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. | ||
| 83 | MaximumReceiversReached, | ||
| 84 | } | ||
| 85 | |||
| 86 | impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<M, T, N> { | ||
| 87 | /// Create a new `MultiSignal` initialized with the given value. | ||
| 88 | pub const fn new(init: T) -> Self { | ||
| 89 | Self { | ||
| 90 | mutex: Mutex::new(RefCell::new(MultiSignalState { | ||
| 91 | data: init, | ||
| 92 | current_id: 1, | ||
| 93 | wakers: MultiWakerRegistration::new(), | ||
| 94 | receiver_count: 0, | ||
| 95 | })), | ||
| 96 | } | ||
| 97 | } | ||
| 98 | |||
| 99 | /// Get a [`Receiver`] for the `MultiSignal`. | ||
| 100 | pub fn receiver<'s>(&'a self) -> Result<Receiver<'a, M, T, N>, Error> { | ||
| 101 | self.mutex.lock(|state| { | ||
| 102 | let mut s = state.borrow_mut(); | ||
| 103 | if s.receiver_count < N { | ||
| 104 | s.receiver_count += 1; | ||
| 105 | Ok(Receiver(Rcv::new(self))) | ||
| 106 | } else { | ||
| 107 | Err(Error::MaximumReceiversReached) | ||
| 108 | } | ||
| 109 | }) | ||
| 110 | } | ||
| 111 | |||
| 112 | /// Update the value of the `MultiSignal`. | ||
| 113 | pub fn write(&self, data: T) { | ||
| 114 | self.mutex.lock(|state| { | ||
| 115 | let mut s = state.borrow_mut(); | ||
| 116 | s.data = data; | ||
| 117 | s.current_id += 1; | ||
| 118 | s.wakers.wake(); | ||
| 119 | }) | ||
| 120 | } | ||
| 121 | |||
| 122 | /// Peek the current value of the `MultiSignal`. | ||
| 123 | pub fn peek(&self) -> T { | ||
| 124 | self.mutex.lock(|state| state.borrow().data.clone()) | ||
| 125 | } | ||
| 126 | |||
| 127 | /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. | ||
| 128 | pub fn peek_and(&self, mut f: impl FnMut(&T) -> bool) -> Option<T> { | ||
| 129 | self.mutex.lock(|state| { | ||
| 130 | let s = state.borrow(); | ||
| 131 | if f(&s.data) { | ||
| 132 | Some(s.data.clone()) | ||
| 133 | } else { | ||
| 134 | None | ||
| 135 | } | ||
| 136 | }) | ||
| 137 | } | ||
| 138 | |||
| 139 | /// Get the ID of the current value of the `MultiSignal`. | ||
| 140 | /// This method is mostly for testing purposes. | ||
| 141 | #[allow(dead_code)] | ||
| 142 | fn get_id(&self) -> u64 { | ||
| 143 | self.mutex.lock(|state| state.borrow().current_id) | ||
| 144 | } | ||
| 145 | } | ||
| 146 | |||
| 147 | /// A receiver is able to `.await` a changed `MultiSignal` value. | ||
| 148 | pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { | ||
| 149 | multi_sig: &'a MultiSignal<M, T, N>, | ||
| 150 | at_id: u64, | ||
| 151 | } | ||
| 152 | |||
| 153 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { | ||
| 154 | /// Create a new `Receiver` with a reference the given `MultiSignal`. | ||
| 155 | fn new(multi_sig: &'a MultiSignal<M, T, N>) -> Self { | ||
| 156 | Self { multi_sig, at_id: 0 } | ||
| 157 | } | ||
| 158 | |||
| 159 | /// Wait for a change to the value of the corresponding `MultiSignal`. | ||
| 160 | pub async fn changed(&mut self) -> T { | ||
| 161 | ReceiverWaitFuture { subscriber: self }.await | ||
| 162 | } | ||
| 163 | |||
| 164 | /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. | ||
| 165 | pub async fn changed_and<F>(&mut self, f: F) -> T | ||
| 166 | where | ||
| 167 | F: FnMut(&T) -> bool, | ||
| 168 | { | ||
| 169 | ReceiverPredFuture { | ||
| 170 | subscriber: self, | ||
| 171 | predicate: f, | ||
| 172 | } | ||
| 173 | .await | ||
| 174 | } | ||
| 175 | |||
| 176 | /// Try to get a changed value of the corresponding `MultiSignal`. | ||
| 177 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 178 | self.multi_sig.mutex.lock(|state| { | ||
| 179 | let s = state.borrow(); | ||
| 180 | match s.current_id > self.at_id { | ||
| 181 | true => { | ||
| 182 | self.at_id = s.current_id; | ||
| 183 | Some(s.data.clone()) | ||
| 184 | } | ||
| 185 | false => None, | ||
| 186 | } | ||
| 187 | }) | ||
| 188 | } | ||
| 189 | |||
| 190 | /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. | ||
| 191 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 192 | where | ||
| 193 | F: FnMut(&T) -> bool, | ||
| 194 | { | ||
| 195 | self.multi_sig.mutex.lock(|state| { | ||
| 196 | let s = state.borrow(); | ||
| 197 | match s.current_id > self.at_id && f(&s.data) { | ||
| 198 | true => { | ||
| 199 | self.at_id = s.current_id; | ||
| 200 | Some(s.data.clone()) | ||
| 201 | } | ||
| 202 | false => None, | ||
| 203 | } | ||
| 204 | }) | ||
| 205 | } | ||
| 206 | |||
| 207 | /// Peek the current value of the corresponding `MultiSignal`. | ||
| 208 | pub fn peek(&self) -> T { | ||
| 209 | self.multi_sig.peek() | ||
| 210 | } | ||
| 211 | |||
| 212 | /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. | ||
| 213 | pub fn peek_and<F>(&self, f: F) -> Option<T> | ||
| 214 | where | ||
| 215 | F: FnMut(&T) -> bool, | ||
| 216 | { | ||
| 217 | self.multi_sig.peek_and(f) | ||
| 218 | } | ||
| 219 | |||
| 220 | /// Check if the value of the corresponding `MultiSignal` has changed. | ||
| 221 | pub fn has_changed(&mut self) -> bool { | ||
| 222 | self.multi_sig | ||
| 223 | .mutex | ||
| 224 | .lock(|state| state.borrow().current_id > self.at_id) | ||
| 225 | } | ||
| 226 | } | ||
| 227 | |||
| 228 | /// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. | ||
| 229 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); | ||
| 230 | |||
| 231 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 232 | type Target = Rcv<'a, M, T, N>; | ||
| 233 | |||
| 234 | fn deref(&self) -> &Self::Target { | ||
| 235 | &self.0 | ||
| 236 | } | ||
| 237 | } | ||
| 238 | |||
| 239 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 240 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 241 | &mut self.0 | ||
| 242 | } | ||
| 243 | } | ||
| 244 | |||
| 245 | /// Future for the `Receiver` wait action | ||
| 246 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 247 | pub struct ReceiverWaitFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { | ||
| 248 | subscriber: &'s mut Rcv<'a, M, T, N>, | ||
| 249 | } | ||
| 250 | |||
| 251 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverWaitFuture<'s, 'a, M, T, N> {} | ||
| 252 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverWaitFuture<'s, 'a, M, T, N> { | ||
| 253 | type Output = T; | ||
| 254 | |||
| 255 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 256 | self.get_with_context(Some(cx)) | ||
| 257 | } | ||
| 258 | } | ||
| 259 | |||
| 260 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> ReceiverWaitFuture<'s, 'a, M, T, N> { | ||
| 261 | /// Poll the `MultiSignal` with an optional context. | ||
| 262 | fn get_with_context(&mut self, cx: Option<&mut Context>) -> Poll<T> { | ||
| 263 | self.subscriber.multi_sig.mutex.lock(|state| { | ||
| 264 | let mut s = state.borrow_mut(); | ||
| 265 | match s.current_id > self.subscriber.at_id { | ||
| 266 | true => { | ||
| 267 | self.subscriber.at_id = s.current_id; | ||
| 268 | Poll::Ready(s.data.clone()) | ||
| 269 | } | ||
| 270 | _ => { | ||
| 271 | if let Some(cx) = cx { | ||
| 272 | s.wakers.register(cx.waker()); | ||
| 273 | } | ||
| 274 | Poll::Pending | ||
| 275 | } | ||
| 276 | } | ||
| 277 | }) | ||
| 278 | } | ||
| 279 | } | ||
| 280 | |||
| 281 | /// Future for the `Receiver` wait action, with the ability to filter the value with a predicate. | ||
| 282 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 283 | pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> bool, const N: usize> { | ||
| 284 | subscriber: &'s mut Rcv<'a, M, T, N>, | ||
| 285 | predicate: F, | ||
| 286 | } | ||
| 287 | |||
| 288 | impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin | ||
| 289 | for ReceiverPredFuture<'s, 'a, M, T, F, N> | ||
| 290 | { | ||
| 291 | } | ||
| 292 | impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future | ||
| 293 | for ReceiverPredFuture<'s, 'a, M, T, F, N> | ||
| 294 | { | ||
| 295 | type Output = T; | ||
| 296 | |||
| 297 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 298 | self.get_with_context_pred(Some(cx)) | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> ReceiverPredFuture<'s, 'a, M, T, F, N> { | ||
| 303 | /// Poll the `MultiSignal` with an optional context. | ||
| 304 | fn get_with_context_pred(&mut self, cx: Option<&mut Context>) -> Poll<T> { | ||
| 305 | self.subscriber.multi_sig.mutex.lock(|state| { | ||
| 306 | let mut s = state.borrow_mut(); | ||
| 307 | match s.current_id > self.subscriber.at_id { | ||
| 308 | true if (self.predicate)(&s.data) => { | ||
| 309 | self.subscriber.at_id = s.current_id; | ||
| 310 | Poll::Ready(s.data.clone()) | ||
| 311 | } | ||
| 312 | _ => { | ||
| 313 | if let Some(cx) = cx { | ||
| 314 | s.wakers.register(cx.waker()); | ||
| 315 | } | ||
| 316 | Poll::Pending | ||
| 317 | } | ||
| 318 | } | ||
| 319 | }) | ||
| 320 | } | ||
| 321 | } | ||
| 322 | |||
| 323 | #[cfg(test)] | ||
| 324 | mod tests { | ||
| 325 | use futures_executor::block_on; | ||
| 326 | |||
| 327 | use super::*; | ||
| 328 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 329 | |||
| 330 | #[test] | ||
| 331 | fn multiple_writes() { | ||
| 332 | let f = async { | ||
| 333 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 334 | |||
| 335 | // Obtain Receivers | ||
| 336 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 337 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 338 | |||
| 339 | SOME_SIGNAL.write(10); | ||
| 340 | |||
| 341 | // Receive the new value | ||
| 342 | assert_eq!(rcv0.changed().await, 10); | ||
| 343 | assert_eq!(rcv1.changed().await, 10); | ||
| 344 | |||
| 345 | // No update | ||
| 346 | assert_eq!(rcv0.try_changed(), None); | ||
| 347 | assert_eq!(rcv1.try_changed(), None); | ||
| 348 | |||
| 349 | SOME_SIGNAL.write(20); | ||
| 350 | |||
| 351 | assert_eq!(rcv0.changed().await, 20); | ||
| 352 | assert_eq!(rcv1.changed().await, 20); | ||
| 353 | }; | ||
| 354 | block_on(f); | ||
| 355 | } | ||
| 356 | |||
| 357 | #[test] | ||
| 358 | fn max_receivers() { | ||
| 359 | let f = async { | ||
| 360 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 361 | |||
| 362 | // Obtain Receivers | ||
| 363 | let _ = SOME_SIGNAL.receiver().unwrap(); | ||
| 364 | let _ = SOME_SIGNAL.receiver().unwrap(); | ||
| 365 | assert!(SOME_SIGNAL.receiver().is_err()); | ||
| 366 | }; | ||
| 367 | block_on(f); | ||
| 368 | } | ||
| 369 | |||
| 370 | // Really weird edge case, but it's possible to have a receiver that never gets a value. | ||
| 371 | #[test] | ||
| 372 | fn receive_initial() { | ||
| 373 | let f = async { | ||
| 374 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 375 | |||
| 376 | // Obtain Receivers | ||
| 377 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 378 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 379 | |||
| 380 | assert_eq!(rcv0.try_changed(), Some(0)); | ||
| 381 | assert_eq!(rcv1.try_changed(), Some(0)); | ||
| 382 | |||
| 383 | assert_eq!(rcv0.try_changed(), None); | ||
| 384 | assert_eq!(rcv1.try_changed(), None); | ||
| 385 | }; | ||
| 386 | block_on(f); | ||
| 387 | } | ||
| 388 | |||
| 389 | #[test] | ||
| 390 | fn count_ids() { | ||
| 391 | let f = async { | ||
| 392 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 393 | |||
| 394 | // Obtain Receivers | ||
| 395 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 396 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 397 | |||
| 398 | SOME_SIGNAL.write(10); | ||
| 399 | |||
| 400 | assert_eq!(rcv0.changed().await, 10); | ||
| 401 | assert_eq!(rcv1.changed().await, 10); | ||
| 402 | |||
| 403 | assert_eq!(rcv0.try_changed(), None); | ||
| 404 | assert_eq!(rcv1.try_changed(), None); | ||
| 405 | |||
| 406 | SOME_SIGNAL.write(20); | ||
| 407 | SOME_SIGNAL.write(20); | ||
| 408 | SOME_SIGNAL.write(20); | ||
| 409 | |||
| 410 | assert_eq!(rcv0.changed().await, 20); | ||
| 411 | assert_eq!(rcv1.changed().await, 20); | ||
| 412 | |||
| 413 | assert_eq!(rcv0.try_changed(), None); | ||
| 414 | assert_eq!(rcv1.try_changed(), None); | ||
| 415 | |||
| 416 | assert_eq!(SOME_SIGNAL.get_id(), 5); | ||
| 417 | }; | ||
| 418 | block_on(f); | ||
| 419 | } | ||
| 420 | |||
| 421 | #[test] | ||
| 422 | fn peek_still_await() { | ||
| 423 | let f = async { | ||
| 424 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 425 | |||
| 426 | // Obtain Receivers | ||
| 427 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 428 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 429 | |||
| 430 | SOME_SIGNAL.write(10); | ||
| 431 | |||
| 432 | assert_eq!(rcv0.peek(), 10); | ||
| 433 | assert_eq!(rcv1.peek(), 10); | ||
| 434 | |||
| 435 | assert_eq!(rcv0.changed().await, 10); | ||
| 436 | assert_eq!(rcv1.changed().await, 10); | ||
| 437 | }; | ||
| 438 | block_on(f); | ||
| 439 | } | ||
| 440 | |||
| 441 | #[test] | ||
| 442 | fn predicate() { | ||
| 443 | let f = async { | ||
| 444 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 445 | |||
| 446 | // Obtain Receivers | ||
| 447 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 448 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 449 | |||
| 450 | SOME_SIGNAL.write(20); | ||
| 451 | |||
| 452 | assert_eq!(rcv0.changed_and(|x| x > &10).await, 20); | ||
| 453 | assert_eq!(rcv1.try_changed_and(|x| x > &30), None); | ||
| 454 | }; | ||
| 455 | block_on(f); | ||
| 456 | } | ||
| 457 | |||
| 458 | #[test] | ||
| 459 | fn mutable_predicate() { | ||
| 460 | let f = async { | ||
| 461 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 462 | |||
| 463 | // Obtain Receivers | ||
| 464 | let mut rcv = SOME_SIGNAL.receiver().unwrap(); | ||
| 465 | |||
| 466 | SOME_SIGNAL.write(10); | ||
| 467 | |||
| 468 | let mut largest = 0; | ||
| 469 | let mut predicate = |x: &u8| { | ||
| 470 | if *x > largest { | ||
| 471 | largest = *x; | ||
| 472 | } | ||
| 473 | true | ||
| 474 | }; | ||
| 475 | |||
| 476 | assert_eq!(rcv.changed_and(&mut predicate).await, 10); | ||
| 477 | |||
| 478 | SOME_SIGNAL.write(20); | ||
| 479 | |||
| 480 | assert_eq!(rcv.changed_and(&mut predicate).await, 20); | ||
| 481 | |||
| 482 | SOME_SIGNAL.write(5); | ||
| 483 | |||
| 484 | assert_eq!(rcv.changed_and(&mut predicate).await, 5); | ||
| 485 | |||
| 486 | assert_eq!(largest, 20) | ||
| 487 | }; | ||
| 488 | block_on(f); | ||
| 489 | } | ||
| 490 | |||
| 491 | #[test] | ||
| 492 | fn peek_and() { | ||
| 493 | let f = async { | ||
| 494 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 495 | |||
| 496 | // Obtain Receivers | ||
| 497 | let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 498 | let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 499 | |||
| 500 | SOME_SIGNAL.write(20); | ||
| 501 | |||
| 502 | assert_eq!(rcv0.peek_and(|x| x > &10), Some(20)); | ||
| 503 | assert_eq!(rcv1.peek_and(|x| x > &30), None); | ||
| 504 | |||
| 505 | assert_eq!(rcv0.changed().await, 20); | ||
| 506 | assert_eq!(rcv1.changed().await, 20); | ||
| 507 | }; | ||
| 508 | block_on(f); | ||
| 509 | } | ||
| 510 | |||
| 511 | #[test] | ||
| 512 | fn peek_with_static() { | ||
| 513 | let f = async { | ||
| 514 | static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 515 | |||
| 516 | // Obtain Receivers | ||
| 517 | let rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 518 | let rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 519 | |||
| 520 | SOME_SIGNAL.write(20); | ||
| 521 | |||
| 522 | assert_eq!(rcv0.peek(), 20); | ||
| 523 | assert_eq!(rcv1.peek(), 20); | ||
| 524 | assert_eq!(SOME_SIGNAL.peek(), 20); | ||
| 525 | assert_eq!(SOME_SIGNAL.peek_and(|x| x > &30), None); | ||
| 526 | }; | ||
| 527 | block_on(f); | ||
| 528 | } | ||
| 529 | } | ||
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 000000000..7e5e92741 --- /dev/null +++ b/embassy-sync/src/watch.rs | |||
| @@ -0,0 +1,515 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** tasks. | ||
| 2 | |||
| 3 | use core::cell::RefCell; | ||
| 4 | use core::future::poll_fn; | ||
| 5 | use core::marker::PhantomData; | ||
| 6 | use core::ops::{Deref, DerefMut}; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use crate::blocking_mutex::raw::RawMutex; | ||
| 10 | use crate::blocking_mutex::Mutex; | ||
| 11 | use crate::waitqueue::MultiWakerRegistration; | ||
| 12 | |||
| 13 | /// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. | ||
| 14 | /// | ||
| 15 | /// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to | ||
| 16 | /// `.await` the latest value, and all receive it. | ||
| 17 | /// | ||
| 18 | /// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except | ||
| 19 | /// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead | ||
| 20 | /// of waiting for the receivers to pop the previous value. | ||
| 21 | /// | ||
| 22 | /// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other | ||
| 23 | /// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for | ||
| 24 | /// [`Receiver`]s to "lose" stale values. | ||
| 25 | /// | ||
| 26 | /// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared | ||
| 27 | /// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] | ||
| 28 | /// with [`Watch::receiver`] which has async methods. | ||
| 29 | /// ``` | ||
| 30 | /// | ||
| 31 | /// use futures_executor::block_on; | ||
| 32 | /// use embassy_sync::watch::Watch; | ||
| 33 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 34 | /// | ||
| 35 | /// let f = async { | ||
| 36 | /// | ||
| 37 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 38 | /// | ||
| 39 | /// // Obtain Receivers | ||
| 40 | /// let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 41 | /// let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 42 | /// assert!(WATCH.receiver().is_err()); | ||
| 43 | /// | ||
| 44 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 45 | /// | ||
| 46 | /// WATCH.write(10); | ||
| 47 | /// assert_eq!(WATCH.try_peek(), Some(10)); | ||
| 48 | /// | ||
| 49 | /// | ||
| 50 | /// // Receive the new value | ||
| 51 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 52 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 53 | /// | ||
| 54 | /// // No update | ||
| 55 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 56 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 57 | /// | ||
| 58 | /// WATCH.write(20); | ||
| 59 | /// | ||
| 60 | /// // Defference `between` peek `get`. | ||
| 61 | /// assert_eq!(rcv0.peek().await, 20); | ||
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// | ||
| 64 | /// assert_eq!(rcv0.try_changed(), Some(20)); | ||
| 65 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 66 | /// | ||
| 67 | /// }; | ||
| 68 | /// block_on(f); | ||
| 69 | /// ``` | ||
| 70 | pub struct Watch<M: RawMutex, T: Clone, const N: usize> { | ||
| 71 | mutex: Mutex<M, RefCell<WatchState<N, T>>>, | ||
| 72 | } | ||
| 73 | |||
| 74 | struct WatchState<const N: usize, T: Clone> { | ||
| 75 | data: Option<T>, | ||
| 76 | current_id: u64, | ||
| 77 | wakers: MultiWakerRegistration<N>, | ||
| 78 | receiver_count: usize, | ||
| 79 | } | ||
| 80 | |||
| 81 | /// A trait representing the 'inner' behavior of the `Watch`. | ||
| 82 | pub trait WatchBehavior<T: Clone> { | ||
| 83 | /// Poll the `Watch` for the current value, **without** making it as seen. | ||
| 84 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; | ||
| 85 | |||
| 86 | /// Tries to peek the value of the `Watch`, **without** marking it as seen. | ||
| 87 | fn inner_try_peek(&self) -> Option<T>; | ||
| 88 | |||
| 89 | /// Poll the `Watch` for the current value, making it as seen. | ||
| 90 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 91 | |||
| 92 | /// Tries to get the value of the `Watch`, marking it as seen. | ||
| 93 | fn inner_try_get(&self, id: &mut u64) -> Option<T>; | ||
| 94 | |||
| 95 | /// Poll the `Watch` for a changed value, marking it as seen. | ||
| 96 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 97 | |||
| 98 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | ||
| 99 | fn inner_try_changed(&self, id: &mut u64) -> Option<T>; | ||
| 100 | |||
| 101 | /// Checks if the `Watch` is been initialized with a value. | ||
| 102 | fn inner_contains_value(&self) -> bool; | ||
| 103 | } | ||
| 104 | |||
| 105 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { | ||
| 106 | fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 107 | self.mutex.lock(|state| { | ||
| 108 | let mut s = state.borrow_mut(); | ||
| 109 | match &s.data { | ||
| 110 | Some(data) => Poll::Ready(data.clone()), | ||
| 111 | None => { | ||
| 112 | s.wakers.register(cx.waker()); | ||
| 113 | Poll::Pending | ||
| 114 | } | ||
| 115 | } | ||
| 116 | }) | ||
| 117 | } | ||
| 118 | |||
| 119 | fn inner_try_peek(&self) -> Option<T> { | ||
| 120 | self.mutex.lock(|state| state.borrow().data.clone()) | ||
| 121 | } | ||
| 122 | |||
| 123 | fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 124 | self.mutex.lock(|state| { | ||
| 125 | let mut s = state.borrow_mut(); | ||
| 126 | match &s.data { | ||
| 127 | Some(data) => { | ||
| 128 | *id = s.current_id; | ||
| 129 | Poll::Ready(data.clone()) | ||
| 130 | } | ||
| 131 | None => { | ||
| 132 | s.wakers.register(cx.waker()); | ||
| 133 | Poll::Pending | ||
| 134 | } | ||
| 135 | } | ||
| 136 | }) | ||
| 137 | } | ||
| 138 | |||
| 139 | fn inner_try_get(&self, id: &mut u64) -> Option<T> { | ||
| 140 | self.mutex.lock(|state| { | ||
| 141 | let s = state.borrow(); | ||
| 142 | *id = s.current_id; | ||
| 143 | state.borrow().data.clone() | ||
| 144 | }) | ||
| 145 | } | ||
| 146 | |||
| 147 | fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 148 | self.mutex.lock(|state| { | ||
| 149 | let mut s = state.borrow_mut(); | ||
| 150 | match (&s.data, s.current_id > *id) { | ||
| 151 | (Some(data), true) => { | ||
| 152 | *id = s.current_id; | ||
| 153 | Poll::Ready(data.clone()) | ||
| 154 | } | ||
| 155 | _ => { | ||
| 156 | s.wakers.register(cx.waker()); | ||
| 157 | Poll::Pending | ||
| 158 | } | ||
| 159 | } | ||
| 160 | }) | ||
| 161 | } | ||
| 162 | |||
| 163 | fn inner_try_changed(&self, id: &mut u64) -> Option<T> { | ||
| 164 | self.mutex.lock(|state| { | ||
| 165 | let s = state.borrow(); | ||
| 166 | match s.current_id > *id { | ||
| 167 | true => { | ||
| 168 | *id = s.current_id; | ||
| 169 | state.borrow().data.clone() | ||
| 170 | } | ||
| 171 | false => None, | ||
| 172 | } | ||
| 173 | }) | ||
| 174 | } | ||
| 175 | |||
| 176 | fn inner_contains_value(&self) -> bool { | ||
| 177 | self.mutex.lock(|state| state.borrow().data.is_some()) | ||
| 178 | } | ||
| 179 | } | ||
| 180 | |||
| 181 | #[derive(Debug)] | ||
| 182 | /// An error that can occur when a `Watch` returns a `Result::Err(_)`. | ||
| 183 | pub enum Error { | ||
| 184 | /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached. | ||
| 185 | MaximumReceiversReached, | ||
| 186 | } | ||
| 187 | |||
| 188 | impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | ||
| 189 | /// Create a new `Watch` channel. | ||
| 190 | pub const fn new() -> Self { | ||
| 191 | Self { | ||
| 192 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 193 | data: None, | ||
| 194 | current_id: 0, | ||
| 195 | wakers: MultiWakerRegistration::new(), | ||
| 196 | receiver_count: 0, | ||
| 197 | })), | ||
| 198 | } | ||
| 199 | } | ||
| 200 | |||
| 201 | /// Write a new value to the `Watch`. | ||
| 202 | pub fn write(&self, val: T) { | ||
| 203 | self.mutex.lock(|state| { | ||
| 204 | let mut s = state.borrow_mut(); | ||
| 205 | s.data = Some(val); | ||
| 206 | s.current_id += 1; | ||
| 207 | s.wakers.wake(); | ||
| 208 | }) | ||
| 209 | } | ||
| 210 | |||
| 211 | /// Create a new [`Receiver`] for the `Watch`. | ||
| 212 | pub fn receiver(&self) -> Result<Receiver<'_, M, T, N>, Error> { | ||
| 213 | self.mutex.lock(|state| { | ||
| 214 | let mut s = state.borrow_mut(); | ||
| 215 | if s.receiver_count < N { | ||
| 216 | s.receiver_count += 1; | ||
| 217 | Ok(Receiver(Rcv::new(self))) | ||
| 218 | } else { | ||
| 219 | Err(Error::MaximumReceiversReached) | ||
| 220 | } | ||
| 221 | }) | ||
| 222 | } | ||
| 223 | |||
| 224 | /// Create a new [`DynReceiver`] for the `Watch`. | ||
| 225 | pub fn dyn_receiver(&self) -> Result<DynReceiver<'_, T>, Error> { | ||
| 226 | self.mutex.lock(|state| { | ||
| 227 | let mut s = state.borrow_mut(); | ||
| 228 | if s.receiver_count < N { | ||
| 229 | s.receiver_count += 1; | ||
| 230 | Ok(DynReceiver(Rcv::new(self))) | ||
| 231 | } else { | ||
| 232 | Err(Error::MaximumReceiversReached) | ||
| 233 | } | ||
| 234 | }) | ||
| 235 | } | ||
| 236 | |||
| 237 | /// Tries to retrieve the value of the `Watch`. | ||
| 238 | pub fn try_peek(&self) -> Option<T> { | ||
| 239 | self.inner_try_peek() | ||
| 240 | } | ||
| 241 | |||
| 242 | /// Returns true if the `Watch` contains a value. | ||
| 243 | pub fn contains_value(&self) -> bool { | ||
| 244 | self.inner_contains_value() | ||
| 245 | } | ||
| 246 | |||
| 247 | /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. | ||
| 248 | pub fn clear(&self) { | ||
| 249 | self.mutex.lock(|state| { | ||
| 250 | let mut s = state.borrow_mut(); | ||
| 251 | s.data = None; | ||
| 252 | }) | ||
| 253 | } | ||
| 254 | } | ||
| 255 | |||
| 256 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 257 | pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 258 | watch: &'a W, | ||
| 259 | at_id: u64, | ||
| 260 | _phantom: PhantomData<T>, | ||
| 261 | } | ||
| 262 | |||
| 263 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | ||
| 264 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 265 | fn new(watch: &'a W) -> Self { | ||
| 266 | Self { | ||
| 267 | watch, | ||
| 268 | at_id: 0, | ||
| 269 | _phantom: PhantomData, | ||
| 270 | } | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. | ||
| 274 | pub async fn peek(&self) -> T { | ||
| 275 | poll_fn(|cx| self.watch.inner_poll_peek(cx)).await | ||
| 276 | } | ||
| 277 | |||
| 278 | /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. | ||
| 279 | pub fn try_peek(&self) -> Option<T> { | ||
| 280 | self.watch.inner_try_peek() | ||
| 281 | } | ||
| 282 | |||
| 283 | /// Returns the current value of the `Watch` if it is initialized, marking it as seen. | ||
| 284 | pub async fn get(&mut self) -> T { | ||
| 285 | poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await | ||
| 286 | } | ||
| 287 | |||
| 288 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 289 | pub fn try_get(&mut self) -> Option<T> { | ||
| 290 | self.watch.inner_try_get(&mut self.at_id) | ||
| 291 | } | ||
| 292 | |||
| 293 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | ||
| 294 | pub async fn changed(&mut self) -> T { | ||
| 295 | poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await | ||
| 296 | } | ||
| 297 | |||
| 298 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 299 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 300 | self.watch.inner_try_changed(&mut self.at_id) | ||
| 301 | } | ||
| 302 | |||
| 303 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 304 | /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. | ||
| 305 | pub fn contains_value(&self) -> bool { | ||
| 306 | self.watch.inner_contains_value() | ||
| 307 | } | ||
| 308 | } | ||
| 309 | |||
| 310 | /// A receiver of a `Watch` channel. | ||
| 311 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | ||
| 312 | |||
| 313 | /// A receiver which holds a **reference** to a `Watch` channel. | ||
| 314 | /// | ||
| 315 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | ||
| 316 | /// some runtime performance due to dynamic dispatch. | ||
| 317 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 318 | |||
| 319 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 320 | type Target = Rcv<'a, T, Watch<M, T, N>>; | ||
| 321 | |||
| 322 | fn deref(&self) -> &Self::Target { | ||
| 323 | &self.0 | ||
| 324 | } | ||
| 325 | } | ||
| 326 | |||
| 327 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 328 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 329 | &mut self.0 | ||
| 330 | } | ||
| 331 | } | ||
| 332 | |||
| 333 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { | ||
| 334 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 335 | |||
| 336 | fn deref(&self) -> &Self::Target { | ||
| 337 | &self.0 | ||
| 338 | } | ||
| 339 | } | ||
| 340 | |||
| 341 | impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | ||
| 342 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 343 | &mut self.0 | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | #[cfg(test)] | ||
| 348 | mod tests { | ||
| 349 | use futures_executor::block_on; | ||
| 350 | |||
| 351 | use super::*; | ||
| 352 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 353 | |||
| 354 | #[test] | ||
| 355 | fn multiple_writes() { | ||
| 356 | let f = async { | ||
| 357 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 358 | |||
| 359 | // Obtain Receivers | ||
| 360 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 361 | let mut rcv1 = WATCH.dyn_receiver().unwrap(); | ||
| 362 | |||
| 363 | WATCH.write(10); | ||
| 364 | |||
| 365 | // Receive the new value | ||
| 366 | assert_eq!(rcv0.changed().await, 10); | ||
| 367 | assert_eq!(rcv1.changed().await, 10); | ||
| 368 | |||
| 369 | // No update | ||
| 370 | assert_eq!(rcv0.try_changed(), None); | ||
| 371 | assert_eq!(rcv1.try_changed(), None); | ||
| 372 | |||
| 373 | WATCH.write(20); | ||
| 374 | |||
| 375 | assert_eq!(rcv0.changed().await, 20); | ||
| 376 | assert_eq!(rcv1.changed().await, 20); | ||
| 377 | }; | ||
| 378 | block_on(f); | ||
| 379 | } | ||
| 380 | |||
| 381 | #[test] | ||
| 382 | fn max_receivers() { | ||
| 383 | let f = async { | ||
| 384 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 385 | |||
| 386 | // Obtain Receivers | ||
| 387 | let _ = WATCH.receiver().unwrap(); | ||
| 388 | let _ = WATCH.receiver().unwrap(); | ||
| 389 | assert!(WATCH.receiver().is_err()); | ||
| 390 | }; | ||
| 391 | block_on(f); | ||
| 392 | } | ||
| 393 | |||
| 394 | #[test] | ||
| 395 | fn receive_initial() { | ||
| 396 | let f = async { | ||
| 397 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 398 | |||
| 399 | // Obtain Receivers | ||
| 400 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 401 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 402 | |||
| 403 | assert_eq!(rcv0.contains_value(), false); | ||
| 404 | |||
| 405 | assert_eq!(rcv0.try_changed(), None); | ||
| 406 | assert_eq!(rcv1.try_changed(), None); | ||
| 407 | |||
| 408 | WATCH.write(0); | ||
| 409 | |||
| 410 | assert_eq!(rcv0.contains_value(), true); | ||
| 411 | |||
| 412 | assert_eq!(rcv0.try_changed(), Some(0)); | ||
| 413 | assert_eq!(rcv1.try_changed(), Some(0)); | ||
| 414 | }; | ||
| 415 | block_on(f); | ||
| 416 | } | ||
| 417 | |||
| 418 | #[test] | ||
| 419 | fn peek_get_changed() { | ||
| 420 | let f = async { | ||
| 421 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 422 | |||
| 423 | // Obtain Receivers | ||
| 424 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 425 | |||
| 426 | WATCH.write(10); | ||
| 427 | |||
| 428 | // Ensure peek does not mark as seen | ||
| 429 | assert_eq!(rcv0.peek().await, 10); | ||
| 430 | assert_eq!(rcv0.try_changed(), Some(10)); | ||
| 431 | assert_eq!(rcv0.try_changed(), None); | ||
| 432 | assert_eq!(rcv0.peek().await, 10); | ||
| 433 | |||
| 434 | WATCH.write(20); | ||
| 435 | |||
| 436 | // Ensure get does mark as seen | ||
| 437 | assert_eq!(rcv0.get().await, 20); | ||
| 438 | assert_eq!(rcv0.try_changed(), None); | ||
| 439 | assert_eq!(rcv0.try_get(), Some(20)); | ||
| 440 | }; | ||
| 441 | block_on(f); | ||
| 442 | } | ||
| 443 | |||
| 444 | #[test] | ||
| 445 | fn count_ids() { | ||
| 446 | let f = async { | ||
| 447 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 448 | |||
| 449 | // Obtain Receivers | ||
| 450 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 451 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 452 | |||
| 453 | let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); | ||
| 454 | |||
| 455 | WATCH.write(10); | ||
| 456 | |||
| 457 | assert_eq!(rcv0.changed().await, 10); | ||
| 458 | assert_eq!(rcv1.changed().await, 10); | ||
| 459 | |||
| 460 | assert_eq!(rcv0.try_changed(), None); | ||
| 461 | assert_eq!(rcv1.try_changed(), None); | ||
| 462 | |||
| 463 | WATCH.write(20); | ||
| 464 | WATCH.write(20); | ||
| 465 | WATCH.write(20); | ||
| 466 | |||
| 467 | assert_eq!(rcv0.changed().await, 20); | ||
| 468 | assert_eq!(rcv1.changed().await, 20); | ||
| 469 | |||
| 470 | assert_eq!(rcv0.try_changed(), None); | ||
| 471 | assert_eq!(rcv1.try_changed(), None); | ||
| 472 | |||
| 473 | assert_eq!(get_id(), 4); | ||
| 474 | }; | ||
| 475 | block_on(f); | ||
| 476 | } | ||
| 477 | |||
| 478 | #[test] | ||
| 479 | fn peek_still_await() { | ||
| 480 | let f = async { | ||
| 481 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 482 | |||
| 483 | // Obtain Receivers | ||
| 484 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 485 | let mut rcv1 = WATCH.receiver().unwrap(); | ||
| 486 | |||
| 487 | WATCH.write(10); | ||
| 488 | |||
| 489 | assert_eq!(rcv0.peek().await, 10); | ||
| 490 | assert_eq!(rcv1.try_peek(), Some(10)); | ||
| 491 | |||
| 492 | assert_eq!(rcv0.changed().await, 10); | ||
| 493 | assert_eq!(rcv1.changed().await, 10); | ||
| 494 | }; | ||
| 495 | block_on(f); | ||
| 496 | } | ||
| 497 | |||
| 498 | #[test] | ||
| 499 | fn peek_with_static() { | ||
| 500 | let f = async { | ||
| 501 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 502 | |||
| 503 | // Obtain Receivers | ||
| 504 | let rcv0 = WATCH.receiver().unwrap(); | ||
| 505 | let rcv1 = WATCH.receiver().unwrap(); | ||
| 506 | |||
| 507 | WATCH.write(20); | ||
| 508 | |||
| 509 | assert_eq!(rcv0.peek().await, 20); | ||
| 510 | assert_eq!(rcv1.peek().await, 20); | ||
| 511 | assert_eq!(WATCH.try_peek(), Some(20)); | ||
| 512 | }; | ||
| 513 | block_on(f); | ||
| 514 | } | ||
| 515 | } | ||
