aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/multi_signal.rs285
2 files changed, 286 insertions, 0 deletions
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index d88c76db5..f02985564 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -12,6 +12,7 @@ mod ring_buffer;
12 12
13pub mod blocking_mutex; 13pub mod blocking_mutex;
14pub mod channel; 14pub mod channel;
15pub mod multi_signal;
15pub mod mutex; 16pub mod mutex;
16pub mod pipe; 17pub mod pipe;
17pub mod priority_channel; 18pub mod priority_channel;
diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs
new file mode 100644
index 000000000..db858f269
--- /dev/null
+++ b/embassy-sync/src/multi_signal.rs
@@ -0,0 +1,285 @@
1//! A synchronization primitive for passing the latest value to **multiple** tasks.
2use core::{
3 cell::RefCell,
4 marker::PhantomData,
5 ops::{Deref, DerefMut},
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_util::Future;
11
12use crate::{
13 blocking_mutex::{raw::RawMutex, Mutex},
14 waitqueue::MultiWakerRegistration,
15};
16
17/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s.
18///
19/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to
20/// `.await` the latest value, and all receive it.
21///
22/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except
23/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead
24/// of waiting for the receivers to pop the previous value.
25///
26/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other
27/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for
28/// [`Receiver`]s to "lose" stale values.
29///
30/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared
31/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`]
32/// with [`MultiSignal::receiver`] which has async methods.
33/// ```
34///
35/// use futures_executor::block_on;
36/// use embassy_sync::multi_signal::MultiSignal;
37/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
38///
39/// let f = async {
40///
41/// static SOME_SIGNAL: MultiSignal<CriticalSectionRawMutex, u8, 2> = MultiSignal::new(0);
42///
43/// // Obtain Receivers
44/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap();
45/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap();
46/// assert!(SOME_SIGNAL.receiver().is_err());
47///
48/// SOME_SIGNAL.write(10);
49///
50/// // Receive the new value
51/// assert_eq!(rcv0.changed().await, 10);
52/// assert_eq!(rcv1.try_changed(), Some(10));
53///
54/// // No update
55/// assert_eq!(rcv0.try_changed(), None);
56/// assert_eq!(rcv1.try_changed(), None);
57///
58/// SOME_SIGNAL.write(20);
59///
60/// // Receive new value with predicate
61/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20);
62/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None);
63///
64/// // Anyone can peek the current value
65/// assert_eq!(rcv0.peek(), 20);
66/// assert_eq!(rcv1.peek(), 20);
67/// assert_eq!(SOME_SIGNAL.peek(), 20);
68/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None);
69/// };
70/// block_on(f);
71/// ```
72pub struct MultiSignal<'a, M: RawMutex, T: Clone, const N: usize> {
73 mutex: Mutex<M, RefCell<MultiSignalState<N, T>>>,
74 _phantom: PhantomData<&'a ()>,
75}
76
77struct MultiSignalState<const N: usize, T: Clone> {
78 data: T,
79 current_id: u64,
80 wakers: MultiWakerRegistration<N>,
81 receiver_count: usize,
82}
83
84#[derive(Debug)]
85/// An error that can occur when a `MultiSignal` returns a `Result`.
86pub enum Error {
87 /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached.
88 MaximumReceiversReached,
89}
90
91impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> {
92 /// Create a new `MultiSignal` initialized with the given value.
93 pub const fn new(init: T) -> Self {
94 Self {
95 mutex: Mutex::new(RefCell::new(MultiSignalState {
96 data: init,
97 current_id: 1,
98 wakers: MultiWakerRegistration::new(),
99 receiver_count: 0,
100 })),
101 _phantom: PhantomData,
102 }
103 }
104
105 /// Get a [`Receiver`] for the `MultiSignal`.
106 pub fn receiver(&'a self) -> Result<Receiver<'a, M, T, N>, Error> {
107 self.mutex.lock(|state| {
108 let mut s = state.borrow_mut();
109 if s.receiver_count < N {
110 s.receiver_count += 1;
111 Ok(Receiver(Rcv::new(self)))
112 } else {
113 Err(Error::MaximumReceiversReached)
114 }
115 })
116 }
117
118 /// Update the value of the `MultiSignal`.
119 pub fn write(&self, data: T) {
120 self.mutex.lock(|state| {
121 let mut s = state.borrow_mut();
122 s.data = data;
123 s.current_id += 1;
124 s.wakers.wake();
125 })
126 }
127
128 /// Peek the current value of the `MultiSignal`.
129 pub fn peek(&self) -> T {
130 self.mutex.lock(|state| state.borrow().data.clone())
131 }
132
133 /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`.
134 pub fn peek_and(&self, f: fn(&T) -> bool) -> Option<T> {
135 self.mutex.lock(|state| {
136 let s = state.borrow();
137 if f(&s.data) {
138 Some(s.data.clone())
139 } else {
140 None
141 }
142 })
143 }
144
145 /// Get the ID of the current value of the `MultiSignal`.
146 /// This method is mostly for testing purposes.
147 #[allow(dead_code)]
148 fn get_id(&self) -> u64 {
149 self.mutex.lock(|state| state.borrow().current_id)
150 }
151
152 /// Poll the `MultiSignal` with an optional context.
153 fn get_with_context(&'a self, waker: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll<T> {
154 self.mutex.lock(|state| {
155 let mut s = state.borrow_mut();
156 match (s.current_id > waker.at_id, waker.predicate) {
157 (true, None) => {
158 waker.at_id = s.current_id;
159 Poll::Ready(s.data.clone())
160 }
161 (true, Some(f)) if f(&s.data) => {
162 waker.at_id = s.current_id;
163 Poll::Ready(s.data.clone())
164 }
165 _ => {
166 if let Some(cx) = cx {
167 s.wakers.register(cx.waker());
168 }
169 Poll::Pending
170 }
171 }
172 })
173 }
174}
175
176/// A receiver is able to `.await` a changed `MultiSignal` value.
177pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> {
178 multi_sig: &'a MultiSignal<'a, M, T, N>,
179 predicate: Option<fn(&T) -> bool>,
180 at_id: u64,
181}
182
183// f: Option<impl FnMut(&T) -> bool>
184impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> {
185 /// Create a new `Receiver` with a reference the given `MultiSignal`.
186 fn new(multi_sig: &'a MultiSignal<'a, M, T, N>) -> Self {
187 Self {
188 multi_sig,
189 predicate: None,
190 at_id: 0,
191 }
192 }
193
194 /// Wait for a change to the value of the corresponding `MultiSignal`.
195 pub fn changed<'s>(&'s mut self) -> ReceiverFuture<'s, 'a, M, T, N> {
196 self.predicate = None;
197 ReceiverFuture { subscriber: self }
198 }
199
200 /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`.
201 pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> {
202 self.predicate = Some(f);
203 ReceiverFuture { subscriber: self }
204 }
205
206 /// Try to get a changed value of the corresponding `MultiSignal`.
207 pub fn try_changed(&mut self) -> Option<T> {
208 self.multi_sig.mutex.lock(|state| {
209 let s = state.borrow();
210 match s.current_id > self.at_id {
211 true => {
212 self.at_id = s.current_id;
213 Some(s.data.clone())
214 }
215 false => None,
216 }
217 })
218 }
219
220 /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`.
221 pub fn try_changed_and(&mut self, f: fn(&T) -> bool) -> Option<T> {
222 self.multi_sig.mutex.lock(|state| {
223 let s = state.borrow();
224 match s.current_id > self.at_id && f(&s.data) {
225 true => {
226 self.at_id = s.current_id;
227 Some(s.data.clone())
228 }
229 false => None,
230 }
231 })
232 }
233
234 /// Peek the current value of the corresponding `MultiSignal`.
235 pub fn peek(&self) -> T {
236 self.multi_sig.peek()
237 }
238
239 /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`.
240 pub fn peek_and(&self, f: fn(&T) -> bool) -> Option<T> {
241 self.multi_sig.peek_and(f)
242 }
243
244 /// Check if the value of the corresponding `MultiSignal` has changed.
245 pub fn has_changed(&mut self) -> bool {
246 self.multi_sig
247 .mutex
248 .lock(|state| state.borrow().current_id > self.at_id)
249 }
250}
251
252/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value.
253pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>);
254
255impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
256 type Target = Rcv<'a, M, T, N>;
257
258 fn deref(&self) -> &Self::Target {
259 &self.0
260 }
261}
262
263impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
264 fn deref_mut(&mut self) -> &mut Self::Target {
265 &mut self.0
266 }
267}
268
269/// Future for the `Receiver` wait action
270#[must_use = "futures do nothing unless you `.await` or poll them"]
271pub struct ReceiverFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> {
272 subscriber: &'s mut Rcv<'a, M, T, N>,
273}
274
275impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverFuture<'s, 'a, M, T, N> {
276 type Output = T;
277
278 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279 self.subscriber
280 .multi_sig
281 .get_with_context(&mut self.subscriber, Some(cx))
282 }
283}
284
285impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverFuture<'s, 'a, M, T, N> {}