diff options
| -rw-r--r-- | embassy-sync/src/lib.rs | 1 | ||||
| -rw-r--r-- | embassy-sync/src/multi_signal.rs | 285 |
2 files changed, 286 insertions, 0 deletions
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index d88c76db5..f02985564 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -12,6 +12,7 @@ mod ring_buffer; | |||
| 12 | 12 | ||
| 13 | pub mod blocking_mutex; | 13 | pub mod blocking_mutex; |
| 14 | pub mod channel; | 14 | pub mod channel; |
| 15 | pub mod multi_signal; | ||
| 15 | pub mod mutex; | 16 | pub mod mutex; |
| 16 | pub mod pipe; | 17 | pub mod pipe; |
| 17 | pub mod priority_channel; | 18 | pub mod priority_channel; |
diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs new file mode 100644 index 000000000..db858f269 --- /dev/null +++ b/embassy-sync/src/multi_signal.rs | |||
| @@ -0,0 +1,285 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** tasks. | ||
| 2 | use core::{ | ||
| 3 | cell::RefCell, | ||
| 4 | marker::PhantomData, | ||
| 5 | ops::{Deref, DerefMut}, | ||
| 6 | pin::Pin, | ||
| 7 | task::{Context, Poll}, | ||
| 8 | }; | ||
| 9 | |||
| 10 | use futures_util::Future; | ||
| 11 | |||
| 12 | use crate::{ | ||
| 13 | blocking_mutex::{raw::RawMutex, Mutex}, | ||
| 14 | waitqueue::MultiWakerRegistration, | ||
| 15 | }; | ||
| 16 | |||
| 17 | /// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. | ||
| 18 | /// | ||
| 19 | /// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to | ||
| 20 | /// `.await` the latest value, and all receive it. | ||
| 21 | /// | ||
| 22 | /// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except | ||
| 23 | /// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead | ||
| 24 | /// of waiting for the receivers to pop the previous value. | ||
| 25 | /// | ||
| 26 | /// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other | ||
| 27 | /// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for | ||
| 28 | /// [`Receiver`]s to "lose" stale values. | ||
| 29 | /// | ||
| 30 | /// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared | ||
| 31 | /// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] | ||
| 32 | /// with [`MultiSignal::receiver`] which has async methods. | ||
| 33 | /// ``` | ||
| 34 | /// | ||
| 35 | /// use futures_executor::block_on; | ||
| 36 | /// use embassy_sync::multi_signal::MultiSignal; | ||
| 37 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 38 | /// | ||
| 39 | /// let f = async { | ||
| 40 | /// | ||
| 41 | /// static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0); | ||
| 42 | /// | ||
| 43 | /// // Obtain Receivers | ||
| 44 | /// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); | ||
| 45 | /// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); | ||
| 46 | /// assert!(SOME_SIGNAL.receiver().is_err()); | ||
| 47 | /// | ||
| 48 | /// SOME_SIGNAL.write(10); | ||
| 49 | /// | ||
| 50 | /// // Receive the new value | ||
| 51 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 52 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 53 | /// | ||
| 54 | /// // No update | ||
| 55 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 56 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 57 | /// | ||
| 58 | /// SOME_SIGNAL.write(20); | ||
| 59 | /// | ||
| 60 | /// // Receive new value with predicate | ||
| 61 | /// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); | ||
| 62 | /// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); | ||
| 63 | /// | ||
| 64 | /// // Anyone can peek the current value | ||
| 65 | /// assert_eq!(rcv0.peek(), 20); | ||
| 66 | /// assert_eq!(rcv1.peek(), 20); | ||
| 67 | /// assert_eq!(SOME_SIGNAL.peek(), 20); | ||
| 68 | /// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); | ||
| 69 | /// }; | ||
| 70 | /// block_on(f); | ||
| 71 | /// ``` | ||
| 72 | pub struct MultiSignal<'a, M: RawMutex, T: Clone, const N: usize> { | ||
| 73 | mutex: Mutex<M, RefCell<MultiSignalState<N, T>>>, | ||
| 74 | _phantom: PhantomData<&'a ()>, | ||
| 75 | } | ||
| 76 | |||
| 77 | struct MultiSignalState<const N: usize, T: Clone> { | ||
| 78 | data: T, | ||
| 79 | current_id: u64, | ||
| 80 | wakers: MultiWakerRegistration<N>, | ||
| 81 | receiver_count: usize, | ||
| 82 | } | ||
| 83 | |||
| 84 | #[derive(Debug)] | ||
| 85 | /// An error that can occur when a `MultiSignal` returns a `Result`. | ||
| 86 | pub enum Error { | ||
| 87 | /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. | ||
| 88 | MaximumReceiversReached, | ||
| 89 | } | ||
| 90 | |||
| 91 | impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { | ||
| 92 | /// Create a new `MultiSignal` initialized with the given value. | ||
| 93 | pub const fn new(init: T) -> Self { | ||
| 94 | Self { | ||
| 95 | mutex: Mutex::new(RefCell::new(MultiSignalState { | ||
| 96 | data: init, | ||
| 97 | current_id: 1, | ||
| 98 | wakers: MultiWakerRegistration::new(), | ||
| 99 | receiver_count: 0, | ||
| 100 | })), | ||
| 101 | _phantom: PhantomData, | ||
| 102 | } | ||
| 103 | } | ||
| 104 | |||
| 105 | /// Get a [`Receiver`] for the `MultiSignal`. | ||
| 106 | pub fn receiver(&'a self) -> Result<Receiver<'a, M, T, N>, Error> { | ||
| 107 | self.mutex.lock(|state| { | ||
| 108 | let mut s = state.borrow_mut(); | ||
| 109 | if s.receiver_count < N { | ||
| 110 | s.receiver_count += 1; | ||
| 111 | Ok(Receiver(Rcv::new(self))) | ||
| 112 | } else { | ||
| 113 | Err(Error::MaximumReceiversReached) | ||
| 114 | } | ||
| 115 | }) | ||
| 116 | } | ||
| 117 | |||
| 118 | /// Update the value of the `MultiSignal`. | ||
| 119 | pub fn write(&self, data: T) { | ||
| 120 | self.mutex.lock(|state| { | ||
| 121 | let mut s = state.borrow_mut(); | ||
| 122 | s.data = data; | ||
| 123 | s.current_id += 1; | ||
| 124 | s.wakers.wake(); | ||
| 125 | }) | ||
| 126 | } | ||
| 127 | |||
| 128 | /// Peek the current value of the `MultiSignal`. | ||
| 129 | pub fn peek(&self) -> T { | ||
| 130 | self.mutex.lock(|state| state.borrow().data.clone()) | ||
| 131 | } | ||
| 132 | |||
| 133 | /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. | ||
| 134 | pub fn peek_and(&self, f: fn(&T) -> bool) -> Option<T> { | ||
| 135 | self.mutex.lock(|state| { | ||
| 136 | let s = state.borrow(); | ||
| 137 | if f(&s.data) { | ||
| 138 | Some(s.data.clone()) | ||
| 139 | } else { | ||
| 140 | None | ||
| 141 | } | ||
| 142 | }) | ||
| 143 | } | ||
| 144 | |||
| 145 | /// Get the ID of the current value of the `MultiSignal`. | ||
| 146 | /// This method is mostly for testing purposes. | ||
| 147 | #[allow(dead_code)] | ||
| 148 | fn get_id(&self) -> u64 { | ||
| 149 | self.mutex.lock(|state| state.borrow().current_id) | ||
| 150 | } | ||
| 151 | |||
| 152 | /// Poll the `MultiSignal` with an optional context. | ||
| 153 | fn get_with_context(&'a self, waker: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll<T> { | ||
| 154 | self.mutex.lock(|state| { | ||
| 155 | let mut s = state.borrow_mut(); | ||
| 156 | match (s.current_id > waker.at_id, waker.predicate) { | ||
| 157 | (true, None) => { | ||
| 158 | waker.at_id = s.current_id; | ||
| 159 | Poll::Ready(s.data.clone()) | ||
| 160 | } | ||
| 161 | (true, Some(f)) if f(&s.data) => { | ||
| 162 | waker.at_id = s.current_id; | ||
| 163 | Poll::Ready(s.data.clone()) | ||
| 164 | } | ||
| 165 | _ => { | ||
| 166 | if let Some(cx) = cx { | ||
| 167 | s.wakers.register(cx.waker()); | ||
| 168 | } | ||
| 169 | Poll::Pending | ||
| 170 | } | ||
| 171 | } | ||
| 172 | }) | ||
| 173 | } | ||
| 174 | } | ||
| 175 | |||
| 176 | /// A receiver is able to `.await` a changed `MultiSignal` value. | ||
| 177 | pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { | ||
| 178 | multi_sig: &'a MultiSignal<'a, M, T, N>, | ||
| 179 | predicate: Option<fn(&T) -> bool>, | ||
| 180 | at_id: u64, | ||
| 181 | } | ||
| 182 | |||
| 183 | // f: Option<impl FnMut(&T) -> bool> | ||
| 184 | impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { | ||
| 185 | /// Create a new `Receiver` with a reference the given `MultiSignal`. | ||
| 186 | fn new(multi_sig: &'a MultiSignal<'a, M, T, N>) -> Self { | ||
| 187 | Self { | ||
| 188 | multi_sig, | ||
| 189 | predicate: None, | ||
| 190 | at_id: 0, | ||
| 191 | } | ||
| 192 | } | ||
| 193 | |||
| 194 | /// Wait for a change to the value of the corresponding `MultiSignal`. | ||
| 195 | pub fn changed<'s>(&'s mut self) -> ReceiverFuture<'s, 'a, M, T, N> { | ||
| 196 | self.predicate = None; | ||
| 197 | ReceiverFuture { subscriber: self } | ||
| 198 | } | ||
| 199 | |||
| 200 | /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. | ||
| 201 | pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> { | ||
| 202 | self.predicate = Some(f); | ||
| 203 | ReceiverFuture { subscriber: self } | ||
| 204 | } | ||
| 205 | |||
| 206 | /// Try to get a changed value of the corresponding `MultiSignal`. | ||
| 207 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 208 | self.multi_sig.mutex.lock(|state| { | ||
| 209 | let s = state.borrow(); | ||
| 210 | match s.current_id > self.at_id { | ||
| 211 | true => { | ||
| 212 | self.at_id = s.current_id; | ||
| 213 | Some(s.data.clone()) | ||
| 214 | } | ||
| 215 | false => None, | ||
| 216 | } | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | |||
| 220 | /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. | ||
| 221 | pub fn try_changed_and(&mut self, f: fn(&T) -> bool) -> Option<T> { | ||
| 222 | self.multi_sig.mutex.lock(|state| { | ||
| 223 | let s = state.borrow(); | ||
| 224 | match s.current_id > self.at_id && f(&s.data) { | ||
| 225 | true => { | ||
| 226 | self.at_id = s.current_id; | ||
| 227 | Some(s.data.clone()) | ||
| 228 | } | ||
| 229 | false => None, | ||
| 230 | } | ||
| 231 | }) | ||
| 232 | } | ||
| 233 | |||
| 234 | /// Peek the current value of the corresponding `MultiSignal`. | ||
| 235 | pub fn peek(&self) -> T { | ||
| 236 | self.multi_sig.peek() | ||
| 237 | } | ||
| 238 | |||
| 239 | /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. | ||
| 240 | pub fn peek_and(&self, f: fn(&T) -> bool) -> Option<T> { | ||
| 241 | self.multi_sig.peek_and(f) | ||
| 242 | } | ||
| 243 | |||
| 244 | /// Check if the value of the corresponding `MultiSignal` has changed. | ||
| 245 | pub fn has_changed(&mut self) -> bool { | ||
| 246 | self.multi_sig | ||
| 247 | .mutex | ||
| 248 | .lock(|state| state.borrow().current_id > self.at_id) | ||
| 249 | } | ||
| 250 | } | ||
| 251 | |||
| 252 | /// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. | ||
| 253 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); | ||
| 254 | |||
| 255 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 256 | type Target = Rcv<'a, M, T, N>; | ||
| 257 | |||
| 258 | fn deref(&self) -> &Self::Target { | ||
| 259 | &self.0 | ||
| 260 | } | ||
| 261 | } | ||
| 262 | |||
| 263 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 264 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 265 | &mut self.0 | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | /// Future for the `Receiver` wait action | ||
| 270 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 271 | pub struct ReceiverFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { | ||
| 272 | subscriber: &'s mut Rcv<'a, M, T, N>, | ||
| 273 | } | ||
| 274 | |||
| 275 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverFuture<'s, 'a, M, T, N> { | ||
| 276 | type Output = T; | ||
| 277 | |||
| 278 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 279 | self.subscriber | ||
| 280 | .multi_sig | ||
| 281 | .get_with_context(&mut self.subscriber, Some(cx)) | ||
| 282 | } | ||
| 283 | } | ||
| 284 | |||
| 285 | impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverFuture<'s, 'a, M, T, N> {} | ||
