aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorPeter Krull <[email protected]>2024-02-29 16:45:44 +0100
committerPeter Krull <[email protected]>2024-02-29 16:45:44 +0100
commitae2f10992149279884ea564b00eb18c8bf1f464e (patch)
treea5f69cb268570fcd7844b6e4f79da5ebd4872be6 /embassy-sync
parent6defb4fed98432dee948634f3b2001cb4ea7ec5b (diff)
Added sender types, support for dropping receivers, converting to dyn-types, revised tests.
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/watch.rs521
1 files changed, 374 insertions, 147 deletions
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs
index 7e5e92741..3e22b1e7b 100644
--- a/embassy-sync/src/watch.rs
+++ b/embassy-sync/src/watch.rs
@@ -1,4 +1,4 @@
1//! A synchronization primitive for passing the latest value to **multiple** tasks. 1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2 2
3use core::cell::RefCell; 3use core::cell::RefCell;
4use core::future::poll_fn; 4use core::future::poll_fn;
@@ -10,22 +10,17 @@ use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex; 10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration; 11use crate::waitqueue::MultiWakerRegistration;
12 12
13/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. 13/// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await
14/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
15/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
16/// value when a new one is sent, without waiting for all receivers to read the previous value.
14/// 17///
15/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to 18/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
16/// `.await` the latest value, and all receive it. 19/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
20/// always provided with the latest value.
17/// 21///
18/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except 22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
19/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead 23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program.
20/// of waiting for the receivers to pop the previous value.
21///
22/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other
23/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for
24/// [`Receiver`]s to "lose" stale values.
25///
26/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared
27/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`]
28/// with [`Watch::receiver`] which has async methods.
29/// ``` 24/// ```
30/// 25///
31/// use futures_executor::block_on; 26/// use futures_executor::block_on;
@@ -36,18 +31,18 @@ use crate::waitqueue::MultiWakerRegistration;
36/// 31///
37/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 32/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
38/// 33///
39/// // Obtain Receivers 34/// // Obtain receivers and sender
40/// let mut rcv0 = WATCH.receiver().unwrap(); 35/// let mut rcv0 = WATCH.receiver().unwrap();
41/// let mut rcv1 = WATCH.receiver().unwrap(); 36/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
42/// assert!(WATCH.receiver().is_err()); 37/// let mut snd = WATCH.sender();
43/// 38///
39/// // No more receivers, and no update
40/// assert!(WATCH.receiver().is_err());
44/// assert_eq!(rcv1.try_changed(), None); 41/// assert_eq!(rcv1.try_changed(), None);
45/// 42///
46/// WATCH.write(10); 43/// snd.send(10);
47/// assert_eq!(WATCH.try_peek(), Some(10));
48///
49/// 44///
50/// // Receive the new value 45/// // Receive the new value (async or try)
51/// assert_eq!(rcv0.changed().await, 10); 46/// assert_eq!(rcv0.changed().await, 10);
52/// assert_eq!(rcv1.try_changed(), Some(10)); 47/// assert_eq!(rcv1.try_changed(), Some(10));
53/// 48///
@@ -55,13 +50,14 @@ use crate::waitqueue::MultiWakerRegistration;
55/// assert_eq!(rcv0.try_changed(), None); 50/// assert_eq!(rcv0.try_changed(), None);
56/// assert_eq!(rcv1.try_changed(), None); 51/// assert_eq!(rcv1.try_changed(), None);
57/// 52///
58/// WATCH.write(20); 53/// snd.send(20);
59/// 54///
60/// // Defference `between` peek `get`. 55/// // Peek does not mark the value as seen
61/// assert_eq!(rcv0.peek().await, 20); 56/// assert_eq!(rcv0.peek().await, 20);
62/// assert_eq!(rcv1.get().await, 20);
63///
64/// assert_eq!(rcv0.try_changed(), Some(20)); 57/// assert_eq!(rcv0.try_changed(), Some(20));
58///
59/// // Get marks the value as seen
60/// assert_eq!(rcv1.get().await, 20);
65/// assert_eq!(rcv1.try_changed(), None); 61/// assert_eq!(rcv1.try_changed(), None);
66/// 62///
67/// }; 63/// };
@@ -80,30 +76,57 @@ struct WatchState<const N: usize, T: Clone> {
80 76
81/// A trait representing the 'inner' behavior of the `Watch`. 77/// A trait representing the 'inner' behavior of the `Watch`.
82pub trait WatchBehavior<T: Clone> { 78pub trait WatchBehavior<T: Clone> {
79 /// Sends a new value to the `Watch`.
80 fn send(&self, val: T);
81
82 /// Clears the value of the `Watch`.
83 fn clear(&self);
84
83 /// Poll the `Watch` for the current value, **without** making it as seen. 85 /// Poll the `Watch` for the current value, **without** making it as seen.
84 fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>; 86 fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
85 87
86 /// Tries to peek the value of the `Watch`, **without** marking it as seen. 88 /// Tries to peek the value of the `Watch`, **without** marking it as seen.
87 fn inner_try_peek(&self) -> Option<T>; 89 fn try_peek(&self) -> Option<T>;
88 90
89 /// Poll the `Watch` for the current value, making it as seen. 91 /// Poll the `Watch` for the current value, making it as seen.
90 fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 92 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
91 93
92 /// Tries to get the value of the `Watch`, marking it as seen. 94 /// Tries to get the value of the `Watch`, marking it as seen.
93 fn inner_try_get(&self, id: &mut u64) -> Option<T>; 95 fn try_get(&self, id: &mut u64) -> Option<T>;
94 96
95 /// Poll the `Watch` for a changed value, marking it as seen. 97 /// Poll the `Watch` for a changed value, marking it as seen.
96 fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; 98 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
97 99
98 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. 100 /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
99 fn inner_try_changed(&self, id: &mut u64) -> Option<T>; 101 fn try_changed(&self, id: &mut u64) -> Option<T>;
100 102
101 /// Checks if the `Watch` is been initialized with a value. 103 /// Checks if the `Watch` is been initialized with a value.
102 fn inner_contains_value(&self) -> bool; 104 fn contains_value(&self) -> bool;
105
106 /// Used when a receiver is dropped to decrement the receiver count.
107 ///
108 /// ## This method should not be called by the user.
109 fn drop_receiver(&self);
103} 110}
104 111
105impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { 112impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
106 fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> { 113 fn send(&self, val: T) {
114 self.mutex.lock(|state| {
115 let mut s = state.borrow_mut();
116 s.data = Some(val);
117 s.current_id += 1;
118 s.wakers.wake();
119 })
120 }
121
122 fn clear(&self) {
123 self.mutex.lock(|state| {
124 let mut s = state.borrow_mut();
125 s.data = None;
126 })
127 }
128
129 fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
107 self.mutex.lock(|state| { 130 self.mutex.lock(|state| {
108 let mut s = state.borrow_mut(); 131 let mut s = state.borrow_mut();
109 match &s.data { 132 match &s.data {
@@ -116,11 +139,11 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
116 }) 139 })
117 } 140 }
118 141
119 fn inner_try_peek(&self) -> Option<T> { 142 fn try_peek(&self) -> Option<T> {
120 self.mutex.lock(|state| state.borrow().data.clone()) 143 self.mutex.lock(|state| state.borrow().data.clone())
121 } 144 }
122 145
123 fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { 146 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
124 self.mutex.lock(|state| { 147 self.mutex.lock(|state| {
125 let mut s = state.borrow_mut(); 148 let mut s = state.borrow_mut();
126 match &s.data { 149 match &s.data {
@@ -136,7 +159,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
136 }) 159 })
137 } 160 }
138 161
139 fn inner_try_get(&self, id: &mut u64) -> Option<T> { 162 fn try_get(&self, id: &mut u64) -> Option<T> {
140 self.mutex.lock(|state| { 163 self.mutex.lock(|state| {
141 let s = state.borrow(); 164 let s = state.borrow();
142 *id = s.current_id; 165 *id = s.current_id;
@@ -144,7 +167,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
144 }) 167 })
145 } 168 }
146 169
147 fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { 170 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
148 self.mutex.lock(|state| { 171 self.mutex.lock(|state| {
149 let mut s = state.borrow_mut(); 172 let mut s = state.borrow_mut();
150 match (&s.data, s.current_id > *id) { 173 match (&s.data, s.current_id > *id) {
@@ -160,7 +183,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
160 }) 183 })
161 } 184 }
162 185
163 fn inner_try_changed(&self, id: &mut u64) -> Option<T> { 186 fn try_changed(&self, id: &mut u64) -> Option<T> {
164 self.mutex.lock(|state| { 187 self.mutex.lock(|state| {
165 let s = state.borrow(); 188 let s = state.borrow();
166 match s.current_id > *id { 189 match s.current_id > *id {
@@ -173,9 +196,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
173 }) 196 })
174 } 197 }
175 198
176 fn inner_contains_value(&self) -> bool { 199 fn contains_value(&self) -> bool {
177 self.mutex.lock(|state| state.borrow().data.is_some()) 200 self.mutex.lock(|state| state.borrow().data.is_some())
178 } 201 }
202
203 fn drop_receiver(&self) {
204 self.mutex.lock(|state| {
205 let mut s = state.borrow_mut();
206 s.receiver_count -= 1;
207 })
208 }
179} 209}
180 210
181#[derive(Debug)] 211#[derive(Debug)]
@@ -198,14 +228,14 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
198 } 228 }
199 } 229 }
200 230
201 /// Write a new value to the `Watch`. 231 /// Create a new [`Receiver`] for the `Watch`.
202 pub fn write(&self, val: T) { 232 pub fn sender(&self) -> Sender<'_, M, T, N> {
203 self.mutex.lock(|state| { 233 Sender(Snd::new(self))
204 let mut s = state.borrow_mut(); 234 }
205 s.data = Some(val); 235
206 s.current_id += 1; 236 /// Create a new [`DynReceiver`] for the `Watch`.
207 s.wakers.wake(); 237 pub fn dyn_sender(&self) -> DynSender<'_, T> {
208 }) 238 DynSender(Snd::new(self))
209 } 239 }
210 240
211 /// Create a new [`Receiver`] for the `Watch`. 241 /// Create a new [`Receiver`] for the `Watch`.
@@ -214,7 +244,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
214 let mut s = state.borrow_mut(); 244 let mut s = state.borrow_mut();
215 if s.receiver_count < N { 245 if s.receiver_count < N {
216 s.receiver_count += 1; 246 s.receiver_count += 1;
217 Ok(Receiver(Rcv::new(self))) 247 Ok(Receiver(Rcv::new(self, 0)))
218 } else { 248 } else {
219 Err(Error::MaximumReceiversReached) 249 Err(Error::MaximumReceiversReached)
220 } 250 }
@@ -227,29 +257,121 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
227 let mut s = state.borrow_mut(); 257 let mut s = state.borrow_mut();
228 if s.receiver_count < N { 258 if s.receiver_count < N {
229 s.receiver_count += 1; 259 s.receiver_count += 1;
230 Ok(DynReceiver(Rcv::new(self))) 260 Ok(DynReceiver(Rcv::new(self, 0)))
231 } else { 261 } else {
232 Err(Error::MaximumReceiversReached) 262 Err(Error::MaximumReceiversReached)
233 } 263 }
234 }) 264 })
235 } 265 }
266}
267
268/// A receiver can `.await` a change in the `Watch` value.
269pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
270 watch: &'a W,
271 _phantom: PhantomData<T>,
272}
273
274impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
275 fn clone(&self) -> Self {
276 Self {
277 watch: self.watch,
278 _phantom: PhantomData,
279 }
280 }
281}
282
283impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
284 /// Creates a new `Receiver` with a reference to the `Watch`.
285 fn new(watch: &'a W) -> Self {
286 Self {
287 watch,
288 _phantom: PhantomData,
289 }
290 }
291
292 /// Sends a new value to the `Watch`.
293 pub fn send(&self, val: T) {
294 self.watch.send(val)
295 }
296
297 /// Clears the value of the `Watch`.
298 /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending.
299 pub fn clear(&self) {
300 self.watch.clear()
301 }
236 302
237 /// Tries to retrieve the value of the `Watch`. 303 /// Tries to retrieve the value of the `Watch`.
238 pub fn try_peek(&self) -> Option<T> { 304 pub fn try_peek(&self) -> Option<T> {
239 self.inner_try_peek() 305 self.watch.try_peek()
240 } 306 }
241 307
242 /// Returns true if the `Watch` contains a value. 308 /// Returns true if the `Watch` contains a value.
243 pub fn contains_value(&self) -> bool { 309 pub fn contains_value(&self) -> bool {
244 self.inner_contains_value() 310 self.watch.contains_value()
245 } 311 }
312}
246 313
247 /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. 314/// A sender of a `Watch` channel.
248 pub fn clear(&self) { 315///
249 self.mutex.lock(|state| { 316/// For a simpler type definition, consider [`DynSender`] at the expense of
250 let mut s = state.borrow_mut(); 317/// some runtime performance due to dynamic dispatch.
251 s.data = None; 318pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
252 }) 319
320impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
321 fn clone(&self) -> Self {
322 Self(self.0.clone())
323 }
324}
325
326impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
327 /// Converts the `Sender` into a [`DynSender`].
328 pub fn as_dyn(self) -> DynSender<'a, T> {
329 DynSender(Snd::new(self.watch))
330 }
331}
332
333impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
334 fn into(self) -> DynSender<'a, T> {
335 self.as_dyn()
336 }
337}
338
339impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
340 type Target = Snd<'a, T, Watch<M, T, N>>;
341
342 fn deref(&self) -> &Self::Target {
343 &self.0
344 }
345}
346
347impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
348 fn deref_mut(&mut self) -> &mut Self::Target {
349 &mut self.0
350 }
351}
352
353/// A sender which holds a **dynamic** reference to a `Watch` channel.
354///
355/// This is an alternative to [`Sender`] with a simpler type definition,
356pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
357
358impl<'a, T: Clone> Clone for DynSender<'a, T> {
359 fn clone(&self) -> Self {
360 Self(self.0.clone())
361 }
362}
363
364impl<'a, T: Clone> Deref for DynSender<'a, T> {
365 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
366
367 fn deref(&self) -> &Self::Target {
368 &self.0
369 }
370}
371
372impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
373 fn deref_mut(&mut self) -> &mut Self::Target {
374 &mut self.0
253 } 375 }
254} 376}
255 377
@@ -262,59 +384,83 @@ pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
262 384
263impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { 385impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
264 /// Creates a new `Receiver` with a reference to the `Watch`. 386 /// Creates a new `Receiver` with a reference to the `Watch`.
265 fn new(watch: &'a W) -> Self { 387 fn new(watch: &'a W, at_id: u64) -> Self {
266 Self { 388 Self {
267 watch, 389 watch,
268 at_id: 0, 390 at_id,
269 _phantom: PhantomData, 391 _phantom: PhantomData,
270 } 392 }
271 } 393 }
272 394
273 /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. 395 /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen.
396 ///
397 /// **Note**: Futures do nothing unless you `.await` or poll them.
274 pub async fn peek(&self) -> T { 398 pub async fn peek(&self) -> T {
275 poll_fn(|cx| self.watch.inner_poll_peek(cx)).await 399 poll_fn(|cx| self.watch.poll_peek(cx)).await
276 } 400 }
277 401
278 /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. 402 /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
279 pub fn try_peek(&self) -> Option<T> { 403 pub fn try_peek(&self) -> Option<T> {
280 self.watch.inner_try_peek() 404 self.watch.try_peek()
281 } 405 }
282 406
283 /// Returns the current value of the `Watch` if it is initialized, marking it as seen. 407 /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
408 ///
409 /// **Note**: Futures do nothing unless you `.await` or poll them.
284 pub async fn get(&mut self) -> T { 410 pub async fn get(&mut self) -> T {
285 poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await 411 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)).await
286 } 412 }
287 413
288 /// Tries to get the current value of the `Watch` without waiting, marking it as seen. 414 /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
289 pub fn try_get(&mut self) -> Option<T> { 415 pub fn try_get(&mut self) -> Option<T> {
290 self.watch.inner_try_get(&mut self.at_id) 416 self.watch.try_get(&mut self.at_id)
291 } 417 }
292 418
293 /// Waits for the `Watch` to change and returns the new value, marking it as seen. 419 /// Waits for the `Watch` to change and returns the new value, marking it as seen.
420 ///
421 /// **Note**: Futures do nothing unless you `.await` or poll them.
294 pub async fn changed(&mut self) -> T { 422 pub async fn changed(&mut self) -> T {
295 poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await 423 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
296 } 424 }
297 425
298 /// Tries to get the new value of the watch without waiting, marking it as seen. 426 /// Tries to get the new value of the watch without waiting, marking it as seen.
299 pub fn try_changed(&mut self) -> Option<T> { 427 pub fn try_changed(&mut self) -> Option<T> {
300 self.watch.inner_try_changed(&mut self.at_id) 428 self.watch.try_changed(&mut self.at_id)
301 } 429 }
302 430
303 /// Checks if the `Watch` contains a value. If this returns true, 431 /// Checks if the `Watch` contains a value. If this returns true,
304 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. 432 /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
305 pub fn contains_value(&self) -> bool { 433 pub fn contains_value(&self) -> bool {
306 self.watch.inner_contains_value() 434 self.watch.contains_value()
435 }
436}
437
438impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
439 fn drop(&mut self) {
440 self.watch.drop_receiver();
307 } 441 }
308} 442}
309 443
310/// A receiver of a `Watch` channel. 444/// A receiver of a `Watch` channel.
311pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); 445pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
312 446
313/// A receiver which holds a **reference** to a `Watch` channel. 447impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
314/// 448 /// Converts the `Receiver` into a [`DynReceiver`].
315/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of 449 pub fn as_dyn(self) -> DynReceiver<'a, T> {
316/// some runtime performance due to dynamic dispatch. 450 // We need to increment the receiver count since the original
317pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); 451 // receiver is being dropped, which decrements the count.
452 self.watch.mutex.lock(|state| {
453 state.borrow_mut().receiver_count += 1;
454 });
455 DynReceiver(Rcv::new(self.0.watch, self.at_id))
456 }
457}
458
459impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
460 fn into(self) -> DynReceiver<'a, T> {
461 self.as_dyn()
462 }
463}
318 464
319impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { 465impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
320 type Target = Rcv<'a, T, Watch<M, T, N>>; 466 type Target = Rcv<'a, T, Watch<M, T, N>>;
@@ -330,6 +476,12 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T,
330 } 476 }
331} 477}
332 478
479/// A receiver which holds a **dynamic** reference to a `Watch` channel.
480///
481/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
482/// some runtime performance due to dynamic dispatch.
483pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
484
333impl<'a, T: Clone> Deref for DynReceiver<'a, T> { 485impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
334 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; 486 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
335 487
@@ -348,67 +500,94 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
348mod tests { 500mod tests {
349 use futures_executor::block_on; 501 use futures_executor::block_on;
350 502
351 use super::*; 503 use super::Watch;
352 use crate::blocking_mutex::raw::CriticalSectionRawMutex; 504 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
353 505
354 #[test] 506 #[test]
355 fn multiple_writes() { 507 fn multiple_sends() {
356 let f = async { 508 let f = async {
357 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 509 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
358 510
359 // Obtain Receivers 511 // Obtain receiver and sender
360 let mut rcv0 = WATCH.receiver().unwrap(); 512 let mut rcv = WATCH.receiver().unwrap();
361 let mut rcv1 = WATCH.dyn_receiver().unwrap(); 513 let snd = WATCH.sender();
362 514
363 WATCH.write(10); 515 // Not initialized
516 assert_eq!(rcv.try_changed(), None);
364 517
365 // Receive the new value 518 // Receive the new value
366 assert_eq!(rcv0.changed().await, 10); 519 snd.send(10);
367 assert_eq!(rcv1.changed().await, 10); 520 assert_eq!(rcv.changed().await, 10);
521
522 // Receive another value
523 snd.send(20);
524 assert_eq!(rcv.try_changed(), Some(20));
368 525
369 // No update 526 // No update
370 assert_eq!(rcv0.try_changed(), None); 527 assert_eq!(rcv.try_changed(), None);
371 assert_eq!(rcv1.try_changed(), None); 528 };
529 block_on(f);
530 }
372 531
373 WATCH.write(20); 532 #[test]
533 fn receive_after_create() {
534 let f = async {
535 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
536
537 // Obtain sender and send value
538 let snd = WATCH.sender();
539 snd.send(10);
374 540
375 assert_eq!(rcv0.changed().await, 20); 541 // Obtain receiver and receive value
376 assert_eq!(rcv1.changed().await, 20); 542 let mut rcv = WATCH.receiver().unwrap();
543 assert_eq!(rcv.try_changed(), Some(10));
377 }; 544 };
378 block_on(f); 545 block_on(f);
379 } 546 }
380 547
381 #[test] 548 #[test]
382 fn max_receivers() { 549 fn max_receivers_drop() {
383 let f = async { 550 let f = async {
384 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 551 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
385 552
386 // Obtain Receivers 553 // Try to create 3 receivers (only 2 can exist at once)
387 let _ = WATCH.receiver().unwrap(); 554 let rcv0 = WATCH.receiver();
388 let _ = WATCH.receiver().unwrap(); 555 let rcv1 = WATCH.receiver();
389 assert!(WATCH.receiver().is_err()); 556 let rcv2 = WATCH.receiver();
557
558 // Ensure the first two are successful and the third is not
559 assert!(rcv0.is_ok());
560 assert!(rcv1.is_ok());
561 assert!(rcv2.is_err());
562
563 // Drop the first receiver
564 drop(rcv0);
565
566 // Create another receiver and ensure it is successful
567 let rcv3 = WATCH.receiver();
568 assert!(rcv3.is_ok());
390 }; 569 };
391 block_on(f); 570 block_on(f);
392 } 571 }
393 572
394 #[test] 573 #[test]
395 fn receive_initial() { 574 fn multiple_receivers() {
396 let f = async { 575 let f = async {
397 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 576 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
398 577
399 // Obtain Receivers 578 // Obtain receivers and sender
400 let mut rcv0 = WATCH.receiver().unwrap(); 579 let mut rcv0 = WATCH.receiver().unwrap();
401 let mut rcv1 = WATCH.receiver().unwrap(); 580 let mut rcv1 = WATCH.receiver().unwrap();
581 let snd = WATCH.sender();
402 582
403 assert_eq!(rcv0.contains_value(), false); 583 // No update for both
404
405 assert_eq!(rcv0.try_changed(), None); 584 assert_eq!(rcv0.try_changed(), None);
406 assert_eq!(rcv1.try_changed(), None); 585 assert_eq!(rcv1.try_changed(), None);
407 586
408 WATCH.write(0); 587 // Send a new value
409 588 snd.send(0);
410 assert_eq!(rcv0.contains_value(), true);
411 589
590 // Both receivers receive the new value
412 assert_eq!(rcv0.try_changed(), Some(0)); 591 assert_eq!(rcv0.try_changed(), Some(0));
413 assert_eq!(rcv1.try_changed(), Some(0)); 592 assert_eq!(rcv1.try_changed(), Some(0));
414 }; 593 };
@@ -416,99 +595,147 @@ mod tests {
416 } 595 }
417 596
418 #[test] 597 #[test]
598 fn clone_senders() {
599 let f = async {
600 // Obtain different ways to send
601 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
602 let snd0 = WATCH.sender();
603 let snd1 = snd0.clone();
604
605 // Obtain Receiver
606 let mut rcv = WATCH.receiver().unwrap().as_dyn();
607
608 // Send a value from first sender
609 snd0.send(10);
610 assert_eq!(rcv.try_changed(), Some(10));
611
612 // Send a value from second sender
613 snd1.send(20);
614 assert_eq!(rcv.try_changed(), Some(20));
615 };
616 block_on(f);
617 }
618
619 #[test]
419 fn peek_get_changed() { 620 fn peek_get_changed() {
420 let f = async { 621 let f = async {
421 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 622 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
422 623
423 // Obtain Receivers 624 // Obtain receiver and sender
424 let mut rcv0 = WATCH.receiver().unwrap(); 625 let mut rcv = WATCH.receiver().unwrap();
626 let snd = WATCH.sender();
425 627
426 WATCH.write(10); 628 // Send a value
629 snd.send(10);
427 630
428 // Ensure peek does not mark as seen 631 // Ensure peek does not mark as seen
429 assert_eq!(rcv0.peek().await, 10); 632 assert_eq!(rcv.peek().await, 10);
430 assert_eq!(rcv0.try_changed(), Some(10)); 633 assert_eq!(rcv.try_changed(), Some(10));
431 assert_eq!(rcv0.try_changed(), None); 634 assert_eq!(rcv.try_changed(), None);
432 assert_eq!(rcv0.peek().await, 10); 635 assert_eq!(rcv.try_peek(), Some(10));
433 636
434 WATCH.write(20); 637 // Send a value
638 snd.send(20);
435 639
436 // Ensure get does mark as seen 640 // Ensure get does mark as seen
437 assert_eq!(rcv0.get().await, 20); 641 assert_eq!(rcv.get().await, 20);
438 assert_eq!(rcv0.try_changed(), None); 642 assert_eq!(rcv.try_changed(), None);
439 assert_eq!(rcv0.try_get(), Some(20)); 643 assert_eq!(rcv.try_get(), Some(20));
440 }; 644 };
441 block_on(f); 645 block_on(f);
442 } 646 }
443 647
444 #[test] 648 #[test]
445 fn count_ids() { 649 fn use_dynamics() {
446 let f = async { 650 let f = async {
447 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 651 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
448 652
449 // Obtain Receivers 653 // Obtain receiver and sender
450 let mut rcv0 = WATCH.receiver().unwrap(); 654 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
451 let mut rcv1 = WATCH.receiver().unwrap(); 655 let dyn_snd = WATCH.dyn_sender();
452 656
453 let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); 657 // Send a value
658 dyn_snd.send(10);
454 659
455 WATCH.write(10); 660 // Ensure the dynamic receiver receives the value
456 661 assert_eq!(dyn_rcv.try_changed(), Some(10));
457 assert_eq!(rcv0.changed().await, 10); 662 assert_eq!(dyn_rcv.try_changed(), None);
458 assert_eq!(rcv1.changed().await, 10); 663 };
664 block_on(f);
665 }
459 666
460 assert_eq!(rcv0.try_changed(), None); 667 #[test]
461 assert_eq!(rcv1.try_changed(), None); 668 fn convert_to_dyn() {
669 let f = async {
670 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
462 671
463 WATCH.write(20); 672 // Obtain receiver and sender
464 WATCH.write(20); 673 let rcv = WATCH.receiver().unwrap();
465 WATCH.write(20); 674 let snd = WATCH.sender();
466 675
467 assert_eq!(rcv0.changed().await, 20); 676 // Convert to dynamic
468 assert_eq!(rcv1.changed().await, 20); 677 let mut dyn_rcv = rcv.as_dyn();
678 let dyn_snd = snd.as_dyn();
469 679
470 assert_eq!(rcv0.try_changed(), None); 680 // Send a value
471 assert_eq!(rcv1.try_changed(), None); 681 dyn_snd.send(10);
472 682
473 assert_eq!(get_id(), 4); 683 // Ensure the dynamic receiver receives the value
684 assert_eq!(dyn_rcv.try_changed(), Some(10));
685 assert_eq!(dyn_rcv.try_changed(), None);
474 }; 686 };
475 block_on(f); 687 block_on(f);
476 } 688 }
477 689
478 #[test] 690 #[test]
479 fn peek_still_await() { 691 fn dynamic_receiver_count() {
480 let f = async { 692 let f = async {
481 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 693 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
482 694
483 // Obtain Receivers 695 // Obtain receiver and sender
484 let mut rcv0 = WATCH.receiver().unwrap(); 696 let rcv0 = WATCH.receiver();
485 let mut rcv1 = WATCH.receiver().unwrap(); 697 let rcv1 = WATCH.receiver();
698 let rcv2 = WATCH.receiver();
486 699
487 WATCH.write(10); 700 // Ensure the first two are successful and the third is not
701 assert!(rcv0.is_ok());
702 assert!(rcv1.is_ok());
703 assert!(rcv2.is_err());
488 704
489 assert_eq!(rcv0.peek().await, 10); 705 // Convert to dynamic
490 assert_eq!(rcv1.try_peek(), Some(10)); 706 let dyn_rcv0 = rcv0.unwrap().as_dyn();
491 707
492 assert_eq!(rcv0.changed().await, 10); 708 // Drop the (now dynamic) receiver
493 assert_eq!(rcv1.changed().await, 10); 709 drop(dyn_rcv0);
710
711 // Create another receiver and ensure it is successful
712 let rcv3 = WATCH.receiver();
713 let rcv4 = WATCH.receiver();
714 assert!(rcv3.is_ok());
715 assert!(rcv4.is_err());
494 }; 716 };
495 block_on(f); 717 block_on(f);
496 } 718 }
497 719
498 #[test] 720 #[test]
499 fn peek_with_static() { 721 fn contains_value() {
500 let f = async { 722 let f = async {
501 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); 723 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
502 724
503 // Obtain Receivers 725 // Obtain receiver and sender
504 let rcv0 = WATCH.receiver().unwrap(); 726 let rcv = WATCH.receiver().unwrap();
505 let rcv1 = WATCH.receiver().unwrap(); 727 let snd = WATCH.sender();
728
729 // check if the watch contains a value
730 assert_eq!(rcv.contains_value(), false);
731 assert_eq!(snd.contains_value(), false);
506 732
507 WATCH.write(20); 733 // Send a value
734 snd.send(10);
508 735
509 assert_eq!(rcv0.peek().await, 20); 736 // check if the watch contains a value
510 assert_eq!(rcv1.peek().await, 20); 737 assert_eq!(rcv.contains_value(), true);
511 assert_eq!(WATCH.try_peek(), Some(20)); 738 assert_eq!(snd.contains_value(), true);
512 }; 739 };
513 block_on(f); 740 block_on(f);
514 } 741 }