aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/channel')
-rw-r--r--embassy-sync/src/channel/mod.rs5
-rw-r--r--embassy-sync/src/channel/mpmc.rs596
-rw-r--r--embassy-sync/src/channel/pubsub/mod.rs542
-rw-r--r--embassy-sync/src/channel/pubsub/publisher.rs182
-rw-r--r--embassy-sync/src/channel/pubsub/subscriber.rs152
-rw-r--r--embassy-sync/src/channel/signal.rs100
6 files changed, 1577 insertions, 0 deletions
diff --git a/embassy-sync/src/channel/mod.rs b/embassy-sync/src/channel/mod.rs
new file mode 100644
index 000000000..5df1f5c5c
--- /dev/null
+++ b/embassy-sync/src/channel/mod.rs
@@ -0,0 +1,5 @@
1//! Async channels
2
3pub mod mpmc;
4pub mod pubsub;
5pub mod signal;
diff --git a/embassy-sync/src/channel/mpmc.rs b/embassy-sync/src/channel/mpmc.rs
new file mode 100644
index 000000000..7bebd3412
--- /dev/null
+++ b/embassy-sync/src/channel/mpmc.rs
@@ -0,0 +1,596 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33#[derive(Copy)]
34pub struct Sender<'ch, M, T, const N: usize>
35where
36 M: RawMutex,
37{
38 channel: &'ch Channel<M, T, N>,
39}
40
41impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
42where
43 M: RawMutex,
44{
45 fn clone(&self) -> Self {
46 Sender { channel: self.channel }
47 }
48}
49
50impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
51where
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`Channel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`Channel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67}
68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender { channel: self.channel }
78 }
79}
80
81impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
82where
83 M: RawMutex,
84{
85 fn from(s: Sender<'ch, M, T, N>) -> Self {
86 Self { channel: s.channel }
87 }
88}
89
90impl<'ch, T> DynamicSender<'ch, T> {
91 /// Sends a value.
92 ///
93 /// See [`Channel::send()`]
94 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
95 DynamicSendFuture {
96 channel: self.channel,
97 message: Some(message),
98 }
99 }
100
101 /// Attempt to immediately send a message.
102 ///
103 /// See [`Channel::send()`]
104 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
105 self.channel.try_send_with_context(message, None)
106 }
107}
108
109/// Receive-only access to a [`Channel`].
110#[derive(Copy)]
111pub struct Receiver<'ch, M, T, const N: usize>
112where
113 M: RawMutex,
114{
115 channel: &'ch Channel<M, T, N>,
116}
117
118impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
119where
120 M: RawMutex,
121{
122 fn clone(&self) -> Self {
123 Receiver { channel: self.channel }
124 }
125}
126
127impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
128where
129 M: RawMutex,
130{
131 /// Receive the next value.
132 ///
133 /// See [`Channel::recv()`].
134 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
135 self.channel.recv()
136 }
137
138 /// Attempt to immediately receive the next value.
139 ///
140 /// See [`Channel::try_recv()`]
141 pub fn try_recv(&self) -> Result<T, TryRecvError> {
142 self.channel.try_recv()
143 }
144}
145
146/// Receive-only access to a [`Channel`] without knowing channel size.
147#[derive(Copy)]
148pub struct DynamicReceiver<'ch, T> {
149 channel: &'ch dyn DynamicChannel<T>,
150}
151
152impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
153 fn clone(&self) -> Self {
154 DynamicReceiver { channel: self.channel }
155 }
156}
157
158impl<'ch, T> DynamicReceiver<'ch, T> {
159 /// Receive the next value.
160 ///
161 /// See [`Channel::recv()`].
162 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
163 DynamicRecvFuture { channel: self.channel }
164 }
165
166 /// Attempt to immediately receive the next value.
167 ///
168 /// See [`Channel::try_recv()`]
169 pub fn try_recv(&self) -> Result<T, TryRecvError> {
170 self.channel.try_recv_with_context(None)
171 }
172}
173
174impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
175where
176 M: RawMutex,
177{
178 fn from(s: Receiver<'ch, M, T, N>) -> Self {
179 Self { channel: s.channel }
180 }
181}
182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184pub struct RecvFuture<'ch, M, T, const N: usize>
185where
186 M: RawMutex,
187{
188 channel: &'ch Channel<M, T, N>,
189}
190
191impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
192where
193 M: RawMutex,
194{
195 type Output = T;
196
197 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
198 match self.channel.try_recv_with_context(Some(cx)) {
199 Ok(v) => Poll::Ready(v),
200 Err(TryRecvError::Empty) => Poll::Pending,
201 }
202 }
203}
204
205/// Future returned by [`DynamicReceiver::recv`].
206pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>,
208}
209
210impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
211 type Output = T;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
214 match self.channel.try_recv_with_context(Some(cx)) {
215 Ok(v) => Poll::Ready(v),
216 Err(TryRecvError::Empty) => Poll::Pending,
217 }
218 }
219}
220
221/// Future returned by [`Channel::send`] and [`Sender::send`].
222pub struct SendFuture<'ch, M, T, const N: usize>
223where
224 M: RawMutex,
225{
226 channel: &'ch Channel<M, T, N>,
227 message: Option<T>,
228}
229
230impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
231where
232 M: RawMutex,
233{
234 type Output = ();
235
236 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237 match self.message.take() {
238 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
239 Ok(..) => Poll::Ready(()),
240 Err(TrySendError::Full(m)) => {
241 self.message = Some(m);
242 Poll::Pending
243 }
244 },
245 None => panic!("Message cannot be None"),
246 }
247 }
248}
249
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251
252/// Future returned by [`DynamicSender::send`].
253pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>,
256}
257
258impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
259 type Output = ();
260
261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 match self.message.take() {
263 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
264 Ok(..) => Poll::Ready(()),
265 Err(TrySendError::Full(m)) => {
266 self.message = Some(m);
267 Poll::Pending
268 }
269 },
270 None => panic!("Message cannot be None"),
271 }
272 }
273}
274
275impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
276
277trait DynamicChannel<T> {
278 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
279
280 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
281}
282
283/// Error returned by [`try_recv`](Channel::try_recv).
284#[derive(PartialEq, Eq, Clone, Copy, Debug)]
285#[cfg_attr(feature = "defmt", derive(defmt::Format))]
286pub enum TryRecvError {
287 /// A message could not be received because the channel is empty.
288 Empty,
289}
290
291/// Error returned by [`try_send`](Channel::try_send).
292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
294pub enum TrySendError<T> {
295 /// The data could not be sent on the channel because the channel is
296 /// currently full and sending would require blocking.
297 Full(T),
298}
299
300struct ChannelState<T, const N: usize> {
301 queue: Deque<T, N>,
302 receiver_waker: WakerRegistration,
303 senders_waker: WakerRegistration,
304}
305
306impl<T, const N: usize> ChannelState<T, N> {
307 const fn new() -> Self {
308 ChannelState {
309 queue: Deque::new(),
310 receiver_waker: WakerRegistration::new(),
311 senders_waker: WakerRegistration::new(),
312 }
313 }
314
315 fn try_recv(&mut self) -> Result<T, TryRecvError> {
316 self.try_recv_with_context(None)
317 }
318
319 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
320 if self.queue.is_full() {
321 self.senders_waker.wake();
322 }
323
324 if let Some(message) = self.queue.pop_front() {
325 Ok(message)
326 } else {
327 if let Some(cx) = cx {
328 self.receiver_waker.register(cx.waker());
329 }
330 Err(TryRecvError::Empty)
331 }
332 }
333
334 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
335 self.try_send_with_context(message, None)
336 }
337
338 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
339 match self.queue.push_back(message) {
340 Ok(()) => {
341 self.receiver_waker.wake();
342 Ok(())
343 }
344 Err(message) => {
345 if let Some(cx) = cx {
346 self.senders_waker.register(cx.waker());
347 }
348 Err(TrySendError::Full(message))
349 }
350 }
351 }
352}
353
354/// A bounded channel for communicating between asynchronous tasks
355/// with backpressure.
356///
357/// The channel will buffer up to the provided number of messages. Once the
358/// buffer is full, attempts to `send` new messages will wait until a message is
359/// received from the channel.
360///
361/// All data sent will become available in the same order as it was sent.
362pub struct Channel<M, T, const N: usize>
363where
364 M: RawMutex,
365{
366 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
367}
368
369impl<M, T, const N: usize> Channel<M, T, N>
370where
371 M: RawMutex,
372{
373 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
374 ///
375 /// ```
376 /// use embassy_sync::channel::mpmc::Channel;
377 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
378 ///
379 /// // Declare a bounded channel of 3 u32s.
380 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
381 /// ```
382 pub const fn new() -> Self {
383 Self {
384 inner: Mutex::new(RefCell::new(ChannelState::new())),
385 }
386 }
387
388 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
389 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
390 }
391
392 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
393 self.lock(|c| c.try_recv_with_context(cx))
394 }
395
396 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
397 self.lock(|c| c.try_send_with_context(m, cx))
398 }
399
400 /// Get a sender for this channel.
401 pub fn sender(&self) -> Sender<'_, M, T, N> {
402 Sender { channel: self }
403 }
404
405 /// Get a receiver for this channel.
406 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
407 Receiver { channel: self }
408 }
409
410 /// Send a value, waiting until there is capacity.
411 ///
412 /// Sending completes when the value has been pushed to the channel's queue.
413 /// This doesn't mean the value has been received yet.
414 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
415 SendFuture {
416 channel: self,
417 message: Some(message),
418 }
419 }
420
421 /// Attempt to immediately send a message.
422 ///
423 /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
424 /// buffer is full, instead of waiting.
425 ///
426 /// # Errors
427 ///
428 /// If the channel capacity has been reached, i.e., the channel has `n`
429 /// buffered values where `n` is the argument passed to [`Channel`], then an
430 /// error is returned.
431 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
432 self.lock(|c| c.try_send(message))
433 }
434
435 /// Receive the next value.
436 ///
437 /// If there are no messages in the channel's buffer, this method will
438 /// wait until a message is sent.
439 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
440 RecvFuture { channel: self }
441 }
442
443 /// Attempt to immediately receive a message.
444 ///
445 /// This method will either receive a message from the channel immediately or return an error
446 /// if the channel is empty.
447 pub fn try_recv(&self) -> Result<T, TryRecvError> {
448 self.lock(|c| c.try_recv())
449 }
450}
451
452/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
453/// tradeoff cost of dynamic dispatch.
454impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
455where
456 M: RawMutex,
457{
458 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
459 Channel::try_send_with_context(self, m, cx)
460 }
461
462 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
463 Channel::try_recv_with_context(self, cx)
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use core::time::Duration;
470
471 use futures_executor::ThreadPool;
472 use futures_timer::Delay;
473 use futures_util::task::SpawnExt;
474 use static_cell::StaticCell;
475
476 use super::*;
477 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
478
479 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
480 c.queue.capacity() - c.queue.len()
481 }
482
483 #[test]
484 fn sending_once() {
485 let mut c = ChannelState::<u32, 3>::new();
486 assert!(c.try_send(1).is_ok());
487 assert_eq!(capacity(&c), 2);
488 }
489
490 #[test]
491 fn sending_when_full() {
492 let mut c = ChannelState::<u32, 3>::new();
493 let _ = c.try_send(1);
494 let _ = c.try_send(1);
495 let _ = c.try_send(1);
496 match c.try_send(2) {
497 Err(TrySendError::Full(2)) => assert!(true),
498 _ => assert!(false),
499 }
500 assert_eq!(capacity(&c), 0);
501 }
502
503 #[test]
504 fn receiving_once_with_one_send() {
505 let mut c = ChannelState::<u32, 3>::new();
506 assert!(c.try_send(1).is_ok());
507 assert_eq!(c.try_recv().unwrap(), 1);
508 assert_eq!(capacity(&c), 3);
509 }
510
511 #[test]
512 fn receiving_when_empty() {
513 let mut c = ChannelState::<u32, 3>::new();
514 match c.try_recv() {
515 Err(TryRecvError::Empty) => assert!(true),
516 _ => assert!(false),
517 }
518 assert_eq!(capacity(&c), 3);
519 }
520
521 #[test]
522 fn simple_send_and_receive() {
523 let c = Channel::<NoopRawMutex, u32, 3>::new();
524 assert!(c.try_send(1).is_ok());
525 assert_eq!(c.try_recv().unwrap(), 1);
526 }
527
528 #[test]
529 fn cloning() {
530 let c = Channel::<NoopRawMutex, u32, 3>::new();
531 let r1 = c.receiver();
532 let s1 = c.sender();
533
534 let _ = r1.clone();
535 let _ = s1.clone();
536 }
537
538 #[test]
539 fn dynamic_dispatch() {
540 let c = Channel::<NoopRawMutex, u32, 3>::new();
541 let s: DynamicSender<'_, u32> = c.sender().into();
542 let r: DynamicReceiver<'_, u32> = c.receiver().into();
543
544 assert!(s.try_send(1).is_ok());
545 assert_eq!(r.try_recv().unwrap(), 1);
546 }
547
548 #[futures_test::test]
549 async fn receiver_receives_given_try_send_async() {
550 let executor = ThreadPool::new().unwrap();
551
552 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
553 let c = &*CHANNEL.init(Channel::new());
554 let c2 = c;
555 assert!(executor
556 .spawn(async move {
557 assert!(c2.try_send(1).is_ok());
558 })
559 .is_ok());
560 assert_eq!(c.recv().await, 1);
561 }
562
563 #[futures_test::test]
564 async fn sender_send_completes_if_capacity() {
565 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
566 c.send(1).await;
567 assert_eq!(c.recv().await, 1);
568 }
569
570 #[futures_test::test]
571 async fn senders_sends_wait_until_capacity() {
572 let executor = ThreadPool::new().unwrap();
573
574 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
575 let c = &*CHANNEL.init(Channel::new());
576 assert!(c.try_send(1).is_ok());
577
578 let c2 = c;
579 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
580 let c2 = c;
581 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
582 // Wish I could think of a means of determining that the async send is waiting instead.
583 // However, I've used the debugger to observe that the send does indeed wait.
584 Delay::new(Duration::from_millis(500)).await;
585 assert_eq!(c.recv().await, 1);
586 assert!(executor
587 .spawn(async move {
588 loop {
589 c.recv().await;
590 }
591 })
592 .is_ok());
593 send_task_1.unwrap().await;
594 send_task_2.unwrap().await;
595 }
596}
diff --git a/embassy-sync/src/channel/pubsub/mod.rs b/embassy-sync/src/channel/pubsub/mod.rs
new file mode 100644
index 000000000..f62b4d118
--- /dev/null
+++ b/embassy-sync/src/channel/pubsub/mod.rs
@@ -0,0 +1,542 @@
1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_sync::channel::pubsub::WaitResult;
38/// # use embassy_sync::channel::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 /// Create a new channel
82 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 ///
90 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 /// Create a new subscriber. It will only receive messages that are published after its creation.
105 ///
106 /// If there are no subscriber slots left, an error will be returned.
107 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 /// Create a new publisher
121 ///
122 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 /// Create a new publisher
137 ///
138 /// If there are no publisher slots left, an error will be returned.
139 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163}
164
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
169 self.inner.lock(|s| {
170 let mut s = s.borrow_mut();
171
172 // Check if we can read a message
173 match s.get_message(*next_message_id) {
174 // Yes, so we are done polling
175 Some(WaitResult::Message(message)) => {
176 *next_message_id += 1;
177 Poll::Ready(WaitResult::Message(message))
178 }
179 // No, so we need to reregister our waker and sleep again
180 None => {
181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker());
183 }
184 Poll::Pending
185 }
186 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
187 Some(WaitResult::Lagged(amount)) => {
188 *next_message_id += amount;
189 Poll::Ready(WaitResult::Lagged(amount))
190 }
191 }
192 })
193 }
194
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| {
197 let mut s = s.borrow_mut();
198 // Try to publish the message
199 match s.try_publish(message) {
200 // We did it, we are ready
201 Ok(()) => Ok(()),
202 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => {
204 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker());
206 }
207 Err(message)
208 }
209 }
210 })
211 }
212
213 fn publish_immediate(&self, message: T) {
214 self.inner.lock(|s| {
215 let mut s = s.borrow_mut();
216 s.publish_immediate(message)
217 })
218 }
219
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| {
222 let mut s = s.borrow_mut();
223 s.unregister_subscriber(subscriber_next_message_id)
224 })
225 }
226
227 fn unregister_publisher(&self) {
228 self.inner.lock(|s| {
229 let mut s = s.borrow_mut();
230 s.unregister_publisher()
231 })
232 }
233}
234
235/// Internal state for the PubSub channel
236struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
237 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
238 queue: Deque<(T, usize), CAP>,
239 /// Every message has an id.
240 /// Don't worry, we won't run out.
241 /// If a million messages were published every second, then the ID's would run out in about 584942 years.
242 next_message_id: u64,
243 /// Collection of wakers for Subscribers that are waiting.
244 subscriber_wakers: MultiWakerRegistration<SUBS>,
245 /// Collection of wakers for Publishers that are waiting.
246 publisher_wakers: MultiWakerRegistration<PUBS>,
247 /// The amount of subscribers that are active
248 subscriber_count: usize,
249 /// The amount of publishers that are active
250 publisher_count: usize,
251}
252
253impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
254 /// Create a new internal channel state
255 const fn new() -> Self {
256 Self {
257 queue: Deque::new(),
258 next_message_id: 0,
259 subscriber_wakers: MultiWakerRegistration::new(),
260 publisher_wakers: MultiWakerRegistration::new(),
261 subscriber_count: 0,
262 publisher_count: 0,
263 }
264 }
265
266 fn try_publish(&mut self, message: T) -> Result<(), T> {
267 if self.subscriber_count == 0 {
268 // We don't need to publish anything because there is no one to receive it
269 return Ok(());
270 }
271
272 if self.queue.is_full() {
273 return Err(message);
274 }
275 // We just did a check for this
276 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
277
278 self.next_message_id += 1;
279
280 // Wake all of the subscribers
281 self.subscriber_wakers.wake();
282
283 Ok(())
284 }
285
286 fn publish_immediate(&mut self, message: T) {
287 // Make space in the queue if required
288 if self.queue.is_full() {
289 self.queue.pop_front();
290 }
291
292 // This will succeed because we made sure there is space
293 self.try_publish(message).ok().unwrap();
294 }
295
296 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
297 let start_id = self.next_message_id - self.queue.len() as u64;
298
299 if message_id < start_id {
300 return Some(WaitResult::Lagged(start_id - message_id));
301 }
302
303 let current_message_index = (message_id - start_id) as usize;
304
305 if current_message_index >= self.queue.len() {
306 return None;
307 }
308
309 // We've checked that the index is valid
310 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
311
312 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315
316 if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front();
318 self.publisher_wakers.wake();
319 }
320
321 Some(WaitResult::Message(message))
322 }
323
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1;
354
355 // All messages that haven't been read yet by this subscriber must have their counter decremented
356 let start_id = self.next_message_id - self.queue.len() as u64;
357 if subscriber_next_message_id >= start_id {
358 let current_message_index = (subscriber_next_message_id - start_id) as usize;
359 self.queue
360 .iter_mut()
361 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1);
363 }
364 }
365
366 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1;
368 }
369}
370
371/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Eq, Clone)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))]
374pub enum Error {
375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
376 /// the capacity of the channels must be increased.
377 MaximumSubscribersReached,
378 /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
379 /// the capacity of the channels must be increased.
380 MaximumPublishersReached,
381}
382
383/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id.
387 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390
391 /// Try to publish a message to the queue.
392 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
394 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
395
396 /// Publish a message immediately
397 fn publish_immediate(&self, message: T);
398
399 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401
402 /// Let the channel know that a publisher has dropped
403 fn unregister_publisher(&self);
404}
405
406/// The result of the subscriber wait procedure
407#[derive(Debug, Clone, PartialEq, Eq)]
408#[cfg_attr(feature = "defmt", derive(defmt::Format))]
409pub enum WaitResult<T> {
410 /// The subscriber did not receive all messages and lagged by the given amount of messages.
411 /// (This is the amount of messages that were missed)
412 Lagged(u64),
413 /// A message was received
414 Message(T),
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use crate::blocking_mutex::raw::NoopRawMutex;
421
422 #[futures_test::test]
423 async fn dyn_pub_sub_works() {
424 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
425
426 let mut sub0 = channel.dyn_subscriber().unwrap();
427 let mut sub1 = channel.dyn_subscriber().unwrap();
428 let pub0 = channel.dyn_publisher().unwrap();
429
430 pub0.publish(42).await;
431
432 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
433 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
434
435 assert_eq!(sub0.try_next_message(), None);
436 assert_eq!(sub1.try_next_message(), None);
437 }
438
439 #[futures_test::test]
440 async fn all_subscribers_receive() {
441 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
442
443 let mut sub0 = channel.subscriber().unwrap();
444 let mut sub1 = channel.subscriber().unwrap();
445 let pub0 = channel.publisher().unwrap();
446
447 pub0.publish(42).await;
448
449 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
450 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
451
452 assert_eq!(sub0.try_next_message(), None);
453 assert_eq!(sub1.try_next_message(), None);
454 }
455
456 #[futures_test::test]
457 async fn lag_when_queue_full_on_immediate_publish() {
458 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
459
460 let mut sub0 = channel.subscriber().unwrap();
461 let pub0 = channel.publisher().unwrap();
462
463 pub0.publish_immediate(42);
464 pub0.publish_immediate(43);
465 pub0.publish_immediate(44);
466 pub0.publish_immediate(45);
467 pub0.publish_immediate(46);
468 pub0.publish_immediate(47);
469
470 assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
471 assert_eq!(sub0.next_message().await, WaitResult::Message(44));
472 assert_eq!(sub0.next_message().await, WaitResult::Message(45));
473 assert_eq!(sub0.next_message().await, WaitResult::Message(46));
474 assert_eq!(sub0.next_message().await, WaitResult::Message(47));
475 assert_eq!(sub0.try_next_message(), None);
476 }
477
478 #[test]
479 fn limited_subs_and_pubs() {
480 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
481
482 let sub0 = channel.subscriber();
483 let sub1 = channel.subscriber();
484 let sub2 = channel.subscriber();
485 let sub3 = channel.subscriber();
486 let sub4 = channel.subscriber();
487
488 assert!(sub0.is_ok());
489 assert!(sub1.is_ok());
490 assert!(sub2.is_ok());
491 assert!(sub3.is_ok());
492 assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
493
494 drop(sub0);
495
496 let sub5 = channel.subscriber();
497 assert!(sub5.is_ok());
498
499 // publishers
500
501 let pub0 = channel.publisher();
502 let pub1 = channel.publisher();
503 let pub2 = channel.publisher();
504 let pub3 = channel.publisher();
505 let pub4 = channel.publisher();
506
507 assert!(pub0.is_ok());
508 assert!(pub1.is_ok());
509 assert!(pub2.is_ok());
510 assert!(pub3.is_ok());
511 assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
512
513 drop(pub0);
514
515 let pub5 = channel.publisher();
516 assert!(pub5.is_ok());
517 }
518
519 #[test]
520 fn publisher_wait_on_full_queue() {
521 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
522
523 let pub0 = channel.publisher().unwrap();
524
525 // There are no subscribers, so the queue will never be full
526 assert_eq!(pub0.try_publish(0), Ok(()));
527 assert_eq!(pub0.try_publish(0), Ok(()));
528 assert_eq!(pub0.try_publish(0), Ok(()));
529 assert_eq!(pub0.try_publish(0), Ok(()));
530 assert_eq!(pub0.try_publish(0), Ok(()));
531
532 let sub0 = channel.subscriber().unwrap();
533
534 assert_eq!(pub0.try_publish(0), Ok(()));
535 assert_eq!(pub0.try_publish(0), Ok(()));
536 assert_eq!(pub0.try_publish(0), Ok(()));
537 assert_eq!(pub0.try_publish(0), Ok(()));
538 assert_eq!(pub0.try_publish(0), Err(0));
539
540 drop(sub0);
541 }
542}
diff --git a/embassy-sync/src/channel/pubsub/publisher.rs b/embassy-sync/src/channel/pubsub/publisher.rs
new file mode 100644
index 000000000..705797f60
--- /dev/null
+++ b/embassy-sync/src/channel/pubsub/publisher.rs
@@ -0,0 +1,182 @@
1//! Implementation of anything directly publisher related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A publisher to a channel
13pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The channel we are a publisher for
15 channel: &'a PSB,
16 _phantom: PhantomData<T>,
17}
18
19impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
20 pub(super) fn new(channel: &'a PSB) -> Self {
21 Self {
22 channel,
23 _phantom: Default::default(),
24 }
25 }
26
27 /// Publish a message right now even when the queue is full.
28 /// This may cause a subscriber to miss an older message.
29 pub fn publish_immediate(&self, message: T) {
30 self.channel.publish_immediate(message)
31 }
32
33 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
34 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
35 PublisherWaitFuture {
36 message: Some(message),
37 publisher: self,
38 }
39 }
40
41 /// Publish a message if there is space in the message queue
42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None)
44 }
45}
46
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
48 fn drop(&mut self) {
49 self.channel.unregister_publisher()
50 }
51}
52
53/// A publisher that holds a dynamic reference to the channel
54pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
55
56impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
57 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
58
59 fn deref(&self) -> &Self::Target {
60 &self.0
61 }
62}
63
64impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
65 fn deref_mut(&mut self) -> &mut Self::Target {
66 &mut self.0
67 }
68}
69
70/// A publisher that holds a generic reference to the channel
71pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
72 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
73);
74
75impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
76 for Publisher<'a, M, T, CAP, SUBS, PUBS>
77{
78 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
79
80 fn deref(&self) -> &Self::Target {
81 &self.0
82 }
83}
84
85impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
86 for Publisher<'a, M, T, CAP, SUBS, PUBS>
87{
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.0
90 }
91}
92
93/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
94/// (So an infinite amount is possible)
95pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
96 /// The channel we are a publisher for
97 channel: &'a PSB,
98 _phantom: PhantomData<T>,
99}
100
101impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
102 pub(super) fn new(channel: &'a PSB) -> Self {
103 Self {
104 channel,
105 _phantom: Default::default(),
106 }
107 }
108 /// Publish the message right now even when the queue is full.
109 /// This may cause a subscriber to miss an older message.
110 pub fn publish_immediate(&self, message: T) {
111 self.channel.publish_immediate(message)
112 }
113
114 /// Publish a message if there is space in the message queue
115 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None)
117 }
118}
119
120/// An immediate publisher that holds a dynamic reference to the channel
121pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
122
123impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
124 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
125
126 fn deref(&self) -> &Self::Target {
127 &self.0
128 }
129}
130
131impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// An immediate publisher that holds a generic reference to the channel
138pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
139 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
140);
141
142impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
143 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
144{
145 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
146
147 fn deref(&self) -> &Self::Target {
148 &self.0
149 }
150}
151
152impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
153 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
154{
155 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.0
157 }
158}
159
160/// Future for the publisher wait action
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish
163 message: Option<T>,
164 publisher: &'s Pub<'a, PSB, T>,
165}
166
167impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
168 type Output = ();
169
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let message = self.message.take().unwrap();
172 match self.publisher.channel.publish_with_context(message, Some(cx)) {
173 Ok(()) => Poll::Ready(()),
174 Err(message) => {
175 self.message = Some(message);
176 Poll::Pending
177 }
178 }
179 }
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-sync/src/channel/pubsub/subscriber.rs b/embassy-sync/src/channel/pubsub/subscriber.rs
new file mode 100644
index 000000000..b9a2cbe18
--- /dev/null
+++ b/embassy-sync/src/channel/pubsub/subscriber.rs
@@ -0,0 +1,152 @@
1//! Implementation of anything directly subscriber related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive
15 next_message_id: u64,
16 /// The channel we are a subscriber to
17 channel: &'a PSB,
18 _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23 Self {
24 next_message_id,
25 channel,
26 _phantom: Default::default(),
27 }
28 }
29
30 /// Wait for a published message
31 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32 SubscriberWaitFuture { subscriber: self }
33 }
34
35 /// Wait for a published message (ignoring lag results)
36 pub async fn next_message_pure(&mut self) -> T {
37 loop {
38 match self.next_message().await {
39 WaitResult::Lagged(_) => continue,
40 WaitResult::Message(message) => break message,
41 }
42 }
43 }
44
45 /// Try to see if there's a published message we haven't received yet.
46 ///
47 /// This function does not peek. The message is received if there is one.
48 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50 Poll::Ready(result) => Some(result),
51 Poll::Pending => None,
52 }
53 }
54
55 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
56 ///
57 /// This function does not peek. The message is received if there is one.
58 pub fn try_next_message_pure(&mut self) -> Option<T> {
59 loop {
60 match self.try_next_message() {
61 Some(WaitResult::Lagged(_)) => continue,
62 Some(WaitResult::Message(message)) => break Some(message),
63 None => break None,
64 }
65 }
66 }
67}
68
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
70 fn drop(&mut self) {
71 self.channel.unregister_subscriber(self.next_message_id)
72 }
73}
74
75impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
76
77/// Warning: The stream implementation ignores lag results and returns all messages.
78/// This might miss some messages without you knowing it.
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
80 type Item = T;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 match self
84 .channel
85 .get_message_with_context(&mut self.next_message_id, Some(cx))
86 {
87 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
88 Poll::Ready(WaitResult::Lagged(_)) => {
89 cx.waker().wake_by_ref();
90 Poll::Pending
91 }
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
97/// A subscriber that holds a dynamic reference to the channel
98pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
99
100impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
101 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
109 fn deref_mut(&mut self) -> &mut Self::Target {
110 &mut self.0
111 }
112}
113
114/// A subscriber that holds a generic reference to the channel
115pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
116 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
117);
118
119impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
120 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
121{
122 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
123
124 fn deref(&self) -> &Self::Target {
125 &self.0
126 }
127}
128
129impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
130 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
131{
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// Future for the subscriber wait action
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>,
140}
141
142impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
143 type Output = WaitResult<T>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 self.subscriber
147 .channel
148 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
149 }
150}
151
152impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-sync/src/channel/signal.rs b/embassy-sync/src/channel/signal.rs
new file mode 100644
index 000000000..9279266c1
--- /dev/null
+++ b/embassy-sync/src/channel/signal.rs
@@ -0,0 +1,100 @@
1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::mem;
5use core::task::{Context, Poll, Waker};
6
7/// Single-slot signaling primitive.
8///
9/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except
10/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead
11/// of waiting for the receiver to pop the previous value.
12///
13/// It is useful for sending data between tasks when the receiver only cares about
14/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state"
15/// updates.
16///
17/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead.
18///
19/// Signals are generally declared as `static`s and then borrowed as required.
20///
21/// ```
22/// use embassy_sync::channel::signal::Signal;
23///
24/// enum SomeCommand {
25/// On,
26/// Off,
27/// }
28///
29/// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new();
30/// ```
31pub struct Signal<T> {
32 state: UnsafeCell<State<T>>,
33}
34
35enum State<T> {
36 None,
37 Waiting(Waker),
38 Signaled(T),
39}
40
41unsafe impl<T: Send> Send for Signal<T> {}
42unsafe impl<T: Send> Sync for Signal<T> {}
43
44impl<T> Signal<T> {
45 /// Create a new `Signal`.
46 pub const fn new() -> Self {
47 Self {
48 state: UnsafeCell::new(State::None),
49 }
50 }
51}
52
53impl<T: Send> Signal<T> {
54 /// Mark this Signal as signaled.
55 pub fn signal(&self, val: T) {
56 critical_section::with(|_| unsafe {
57 let state = &mut *self.state.get();
58 if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) {
59 waker.wake();
60 }
61 })
62 }
63
64 /// Remove the queued value in this `Signal`, if any.
65 pub fn reset(&self) {
66 critical_section::with(|_| unsafe {
67 let state = &mut *self.state.get();
68 *state = State::None
69 })
70 }
71
72 /// Manually poll the Signal future.
73 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
74 critical_section::with(|_| unsafe {
75 let state = &mut *self.state.get();
76 match state {
77 State::None => {
78 *state = State::Waiting(cx.waker().clone());
79 Poll::Pending
80 }
81 State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending,
82 State::Waiting(_) => panic!("waker overflow"),
83 State::Signaled(_) => match mem::replace(state, State::None) {
84 State::Signaled(res) => Poll::Ready(res),
85 _ => unreachable!(),
86 },
87 }
88 })
89 }
90
91 /// Future that completes when this Signal has been signaled.
92 pub fn wait(&self) -> impl Future<Output = T> + '_ {
93 futures_util::future::poll_fn(move |cx| self.poll_wait(cx))
94 }
95
96 /// non-blocking method to check whether this signal has been signaled.
97 pub fn signaled(&self) -> bool {
98 critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_)))
99 }
100}