aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2022-06-17 15:06:41 +0200
committerDion Dokter <[email protected]>2022-06-17 15:06:41 +0200
commit949b548d45910ca8ad7a3a5d4f65f9b230431c9e (patch)
tree761eeb36a092fefce21ec3bf78cbe3d407c9492e
parent4a5127aead15b913976138f94226f09026e04771 (diff)
Refactor pub/sub impls into their own files
-rw-r--r--embassy/src/channel/pubsub/mod.rs350
-rw-r--r--embassy/src/channel/pubsub/publisher.rs183
-rw-r--r--embassy/src/channel/pubsub/subscriber.rs153
3 files changed, 363 insertions, 323 deletions
diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs
index 334401884..fdd675362 100644
--- a/embassy/src/channel/pubsub/mod.rs
+++ b/embassy/src/channel/pubsub/mod.rs
@@ -1,19 +1,25 @@
1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. 1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2 2
3#![deny(missing_docs)]
4
3use core::cell::RefCell; 5use core::cell::RefCell;
4use core::fmt::Debug; 6use core::fmt::Debug;
5use core::future::Future;
6use core::marker::PhantomData;
7use core::ops::{Deref, DerefMut};
8use core::pin::Pin;
9use core::task::{Context, Poll, Waker}; 7use core::task::{Context, Poll, Waker};
10 8
11use heapless::Deque; 9use heapless::Deque;
12 10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex; 13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex; 14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration; 15use crate::waitqueue::MultiWakerRegistration;
16 16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
17/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers 23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
18/// 24///
19/// Any published message can be read by all subscribers. 25/// Any published message can be read by all subscribers.
@@ -48,11 +54,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
48 Err(Error::MaximumSubscribersReached) 54 Err(Error::MaximumSubscribersReached)
49 } else { 55 } else {
50 s.subscriber_count += 1; 56 s.subscriber_count += 1;
51 Ok(Subscriber(Sub { 57 Ok(Subscriber(Sub::new(s.next_message_id, self)))
52 next_message_id: s.next_message_id,
53 channel: self,
54 _phantom: Default::default(),
55 }))
56 } 58 }
57 }) 59 })
58 } 60 }
@@ -68,11 +70,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
68 Err(Error::MaximumSubscribersReached) 70 Err(Error::MaximumSubscribersReached)
69 } else { 71 } else {
70 s.subscriber_count += 1; 72 s.subscriber_count += 1;
71 Ok(DynSubscriber(Sub { 73 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
72 next_message_id: s.next_message_id,
73 channel: self as _,
74 _phantom: Default::default(),
75 }))
76 } 74 }
77 }) 75 })
78 } 76 }
@@ -88,10 +86,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
88 Err(Error::MaximumPublishersReached) 86 Err(Error::MaximumPublishersReached)
89 } else { 87 } else {
90 s.publisher_count += 1; 88 s.publisher_count += 1;
91 Ok(Publisher(Pub { 89 Ok(Publisher(Pub::new(self)))
92 channel: self,
93 _phantom: Default::default(),
94 }))
95 } 90 }
96 }) 91 })
97 } 92 }
@@ -107,10 +102,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
107 Err(Error::MaximumPublishersReached) 102 Err(Error::MaximumPublishersReached)
108 } else { 103 } else {
109 s.publisher_count += 1; 104 s.publisher_count += 1;
110 Ok(DynPublisher(Pub { 105 Ok(DynPublisher(Pub::new(self)))
111 channel: self,
112 _phantom: Default::default(),
113 }))
114 } 106 }
115 }) 107 })
116 } 108 }
@@ -118,19 +110,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
118 /// Create a new publisher that can only send immediate messages. 110 /// Create a new publisher that can only send immediate messages.
119 /// This kind of publisher does not take up a publisher slot. 111 /// This kind of publisher does not take up a publisher slot.
120 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { 112 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
121 ImmediatePublisher(ImmediatePub { 113 ImmediatePublisher(ImmediatePub::new(self))
122 channel: self,
123 _phantom: Default::default(),
124 })
125 } 114 }
126 115
127 /// Create a new publisher that can only send immediate messages. 116 /// Create a new publisher that can only send immediate messages.
128 /// This kind of publisher does not take up a publisher slot. 117 /// This kind of publisher does not take up a publisher slot.
129 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 118 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
130 DynImmediatePublisher(ImmediatePub { 119 DynImmediatePublisher(ImmediatePub::new(self))
131 channel: self,
132 _phantom: Default::default(),
133 })
134 } 120 }
135} 121}
136 122
@@ -340,258 +326,6 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
340 } 326 }
341} 327}
342 328
343/// A subscriber to a channel
344///
345/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
346/// generics are erased on this subscriber
347pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
348 /// The message id of the next message we are yet to receive
349 next_message_id: u64,
350 /// The channel we are a subscriber to
351 channel: &'a PSB,
352 _phantom: PhantomData<T>,
353}
354
355impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
356 /// Wait for a published message
357 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
358 SubscriberWaitFuture { subscriber: self }
359 }
360
361 /// Wait for a published message (ignoring lag results)
362 pub async fn next_message_pure(&mut self) -> T {
363 loop {
364 match self.next_message().await {
365 WaitResult::Lagged(_) => continue,
366 WaitResult::Message(message) => break message,
367 }
368 }
369 }
370
371 /// Try to see if there's a published message we haven't received yet.
372 ///
373 /// This function does not peek. The message is received if there is one.
374 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
375 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
376 Poll::Ready(result) => Some(result),
377 Poll::Pending => None,
378 }
379 }
380
381 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
382 ///
383 /// This function does not peek. The message is received if there is one.
384 pub fn try_next_message_pure(&mut self) -> Option<T> {
385 loop {
386 match self.try_next_message() {
387 Some(WaitResult::Lagged(_)) => continue,
388 Some(WaitResult::Message(message)) => break Some(message),
389 None => break None,
390 }
391 }
392 }
393}
394
395impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
396 fn drop(&mut self) {
397 self.channel.unregister_subscriber(self.next_message_id)
398 }
399}
400
401impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
402
403pub struct DynSubscriber<'a, T: Clone>(Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
404
405impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
406 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
407
408 fn deref(&self) -> &Self::Target {
409 &self.0
410 }
411}
412
413impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
414 fn deref_mut(&mut self) -> &mut Self::Target {
415 &mut self.0
416 }
417}
418
419pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
420 Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
421);
422
423impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
424 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
425{
426 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
427
428 fn deref(&self) -> &Self::Target {
429 &self.0
430 }
431}
432
433impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
434 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
435{
436 fn deref_mut(&mut self) -> &mut Self::Target {
437 &mut self.0
438 }
439}
440
441/// Warning: The stream implementation ignores lag results and returns all messages.
442/// This might miss some messages without you knowing it.
443impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> {
444 type Item = T;
445
446 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
447 match self
448 .channel
449 .get_message_with_context(&mut self.next_message_id, Some(cx))
450 {
451 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
452 Poll::Ready(WaitResult::Lagged(_)) => {
453 cx.waker().wake_by_ref();
454 Poll::Pending
455 }
456 Poll::Pending => Poll::Pending,
457 }
458 }
459}
460
461/// A publisher to a channel
462///
463/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
464/// generics are erased on this subscriber
465pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
466 /// The channel we are a publisher for
467 channel: &'a PSB,
468 _phantom: PhantomData<T>,
469}
470
471impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
472 /// Publish a message right now even when the queue is full.
473 /// This may cause a subscriber to miss an older message.
474 pub fn publish_immediate(&self, message: T) {
475 self.channel.publish_immediate(message)
476 }
477
478 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
479 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
480 PublisherWaitFuture {
481 message: Some(message),
482 publisher: self,
483 }
484 }
485
486 /// Publish a message if there is space in the message queue
487 pub fn try_publish(&self, message: T) -> Result<(), T> {
488 self.channel.publish_with_context(message, None)
489 }
490}
491
492impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
493 fn drop(&mut self) {
494 self.channel.unregister_publisher()
495 }
496}
497
498pub struct DynPublisher<'a, T: Clone>(Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
499
500impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
501 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
502
503 fn deref(&self) -> &Self::Target {
504 &self.0
505 }
506}
507
508impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
509 fn deref_mut(&mut self) -> &mut Self::Target {
510 &mut self.0
511 }
512}
513
514pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
515 Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
516);
517
518impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
519 for Publisher<'a, M, T, CAP, SUBS, PUBS>
520{
521 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
522
523 fn deref(&self) -> &Self::Target {
524 &self.0
525 }
526}
527
528impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
529 for Publisher<'a, M, T, CAP, SUBS, PUBS>
530{
531 fn deref_mut(&mut self) -> &mut Self::Target {
532 &mut self.0
533 }
534}
535
536/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
537/// (So an infinite amount is possible)
538pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
539 /// The channel we are a publisher for
540 channel: &'a PSB,
541 _phantom: PhantomData<T>,
542}
543
544impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
545 /// Publish the message right now even when the queue is full.
546 /// This may cause a subscriber to miss an older message.
547 pub fn publish_immediate(&mut self, message: T) {
548 self.channel.publish_immediate(message)
549 }
550
551 /// Publish a message if there is space in the message queue
552 pub fn try_publish(&self, message: T) -> Result<(), T> {
553 self.channel.publish_with_context(message, None)
554 }
555}
556
557pub struct DynImmediatePublisher<'a, T: Clone>(ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
558
559impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
560 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
561
562 fn deref(&self) -> &Self::Target {
563 &self.0
564 }
565}
566
567impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
568 fn deref_mut(&mut self) -> &mut Self::Target {
569 &mut self.0
570 }
571}
572
573pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
574 ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
575);
576
577impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
578 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
579{
580 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
581
582 fn deref(&self) -> &Self::Target {
583 &self.0
584 }
585}
586
587impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
588 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
589{
590 fn deref_mut(&mut self) -> &mut Self::Target {
591 &mut self.0
592 }
593}
594
595/// Error type for the [PubSubChannel] 329/// Error type for the [PubSubChannel]
596#[derive(Debug, PartialEq, Clone)] 330#[derive(Debug, PartialEq, Clone)]
597pub enum Error { 331pub enum Error {
@@ -603,59 +337,29 @@ pub enum Error {
603 MaximumPublishersReached, 337 MaximumPublishersReached,
604} 338}
605 339
340/// 'Middle level' behaviour of the pubsub channel.
341/// This trait is used so that Sub and Pub can be generic over the channel.
606pub trait PubSubBehavior<T> { 342pub trait PubSubBehavior<T> {
343 /// Try to get a message from the queue with the given message id.
344 ///
345 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
607 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 346 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
608 347
348 /// Try to publish a message to the queue.
349 ///
350 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
609 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; 351 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
610 352
353 /// Publish a message immediately
611 fn publish_immediate(&self, message: T); 354 fn publish_immediate(&self, message: T);
612 355
356 /// Let the channel know that a subscriber has dropped
613 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 357 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
614 358
359 /// Let the channel know that a publisher has dropped
615 fn unregister_publisher(&self); 360 fn unregister_publisher(&self);
616} 361}
617 362
618/// Future for the subscriber wait action
619pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
620 subscriber: &'s mut Sub<'a, PSB, T>,
621}
622
623impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
624 type Output = WaitResult<T>;
625
626 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
627 self.subscriber
628 .channel
629 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
630 }
631}
632
633impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}
634
635/// Future for the publisher wait action
636pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
637 /// The message we need to publish
638 message: Option<T>,
639 publisher: &'s Pub<'a, PSB, T>,
640}
641
642impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
643 type Output = ();
644
645 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
646 let message = self.message.take().unwrap();
647 match self.publisher.channel.publish_with_context(message, Some(cx)) {
648 Ok(()) => Poll::Ready(()),
649 Err(message) => {
650 self.message = Some(message);
651 Poll::Pending
652 }
653 }
654 }
655}
656
657impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
658
659/// The result of the subscriber wait procedure 363/// The result of the subscriber wait procedure
660#[derive(Debug, Clone, PartialEq)] 364#[derive(Debug, Clone, PartialEq)]
661pub enum WaitResult<T> { 365pub enum WaitResult<T> {
diff --git a/embassy/src/channel/pubsub/publisher.rs b/embassy/src/channel/pubsub/publisher.rs
new file mode 100644
index 000000000..89a0b9247
--- /dev/null
+++ b/embassy/src/channel/pubsub/publisher.rs
@@ -0,0 +1,183 @@
1//! Implementation of anything directly publisher related
2
3use core::marker::PhantomData;
4use core::ops::{Deref, DerefMut};
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures::Future;
9
10use super::{PubSubBehavior, PubSubChannel};
11use crate::blocking_mutex::raw::RawMutex;
12
13/// A publisher to a channel
14pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15 /// The channel we are a publisher for
16 channel: &'a PSB,
17 _phantom: PhantomData<T>,
18}
19
20impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
21 pub(super) fn new(channel: &'a PSB) -> Self {
22 Self {
23 channel,
24 _phantom: Default::default(),
25 }
26 }
27
28 /// Publish a message right now even when the queue is full.
29 /// This may cause a subscriber to miss an older message.
30 pub fn publish_immediate(&self, message: T) {
31 self.channel.publish_immediate(message)
32 }
33
34 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
35 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
36 PublisherWaitFuture {
37 message: Some(message),
38 publisher: self,
39 }
40 }
41
42 /// Publish a message if there is space in the message queue
43 pub fn try_publish(&self, message: T) -> Result<(), T> {
44 self.channel.publish_with_context(message, None)
45 }
46}
47
48impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
49 fn drop(&mut self) {
50 self.channel.unregister_publisher()
51 }
52}
53
54/// A publisher that holds a dynamic reference to the channel
55pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
56
57impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
58 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
66 fn deref_mut(&mut self) -> &mut Self::Target {
67 &mut self.0
68 }
69}
70
71/// A publisher that holds a generic reference to the channel
72pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
73 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
74);
75
76impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
77 for Publisher<'a, M, T, CAP, SUBS, PUBS>
78{
79 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
80
81 fn deref(&self) -> &Self::Target {
82 &self.0
83 }
84}
85
86impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
87 for Publisher<'a, M, T, CAP, SUBS, PUBS>
88{
89 fn deref_mut(&mut self) -> &mut Self::Target {
90 &mut self.0
91 }
92}
93
94/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
95/// (So an infinite amount is possible)
96pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
97 /// The channel we are a publisher for
98 channel: &'a PSB,
99 _phantom: PhantomData<T>,
100}
101
102impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
103 pub(super) fn new(channel: &'a PSB) -> Self {
104 Self {
105 channel,
106 _phantom: Default::default(),
107 }
108 }
109 /// Publish the message right now even when the queue is full.
110 /// This may cause a subscriber to miss an older message.
111 pub fn publish_immediate(&mut self, message: T) {
112 self.channel.publish_immediate(message)
113 }
114
115 /// Publish a message if there is space in the message queue
116 pub fn try_publish(&self, message: T) -> Result<(), T> {
117 self.channel.publish_with_context(message, None)
118 }
119}
120
121/// An immediate publisher that holds a dynamic reference to the channel
122pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
123
124impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
125 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
126
127 fn deref(&self) -> &Self::Target {
128 &self.0
129 }
130}
131
132impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
133 fn deref_mut(&mut self) -> &mut Self::Target {
134 &mut self.0
135 }
136}
137
138/// An immediate publisher that holds a generic reference to the channel
139pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
140 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
141);
142
143impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
144 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
145{
146 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
147
148 fn deref(&self) -> &Self::Target {
149 &self.0
150 }
151}
152
153impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
154 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
155{
156 fn deref_mut(&mut self) -> &mut Self::Target {
157 &mut self.0
158 }
159}
160
161/// Future for the publisher wait action
162pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
163 /// The message we need to publish
164 message: Option<T>,
165 publisher: &'s Pub<'a, PSB, T>,
166}
167
168impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
169 type Output = ();
170
171 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172 let message = self.message.take().unwrap();
173 match self.publisher.channel.publish_with_context(message, Some(cx)) {
174 Ok(()) => Poll::Ready(()),
175 Err(message) => {
176 self.message = Some(message);
177 Poll::Pending
178 }
179 }
180 }
181}
182
183impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy/src/channel/pubsub/subscriber.rs b/embassy/src/channel/pubsub/subscriber.rs
new file mode 100644
index 000000000..23c4938d9
--- /dev/null
+++ b/embassy/src/channel/pubsub/subscriber.rs
@@ -0,0 +1,153 @@
1//! Implementation of anything directly subscriber related
2
3use core::marker::PhantomData;
4use core::ops::{Deref, DerefMut};
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures::Future;
9
10use super::{PubSubBehavior, PubSubChannel, WaitResult};
11use crate::blocking_mutex::raw::RawMutex;
12
13/// A subscriber to a channel
14pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15 /// The message id of the next message we are yet to receive
16 next_message_id: u64,
17 /// The channel we are a subscriber to
18 channel: &'a PSB,
19 _phantom: PhantomData<T>,
20}
21
22impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
23 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
24 Self {
25 next_message_id,
26 channel,
27 _phantom: Default::default(),
28 }
29 }
30
31 /// Wait for a published message
32 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
33 SubscriberWaitFuture { subscriber: self }
34 }
35
36 /// Wait for a published message (ignoring lag results)
37 pub async fn next_message_pure(&mut self) -> T {
38 loop {
39 match self.next_message().await {
40 WaitResult::Lagged(_) => continue,
41 WaitResult::Message(message) => break message,
42 }
43 }
44 }
45
46 /// Try to see if there's a published message we haven't received yet.
47 ///
48 /// This function does not peek. The message is received if there is one.
49 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
50 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
51 Poll::Ready(result) => Some(result),
52 Poll::Pending => None,
53 }
54 }
55
56 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
57 ///
58 /// This function does not peek. The message is received if there is one.
59 pub fn try_next_message_pure(&mut self) -> Option<T> {
60 loop {
61 match self.try_next_message() {
62 Some(WaitResult::Lagged(_)) => continue,
63 Some(WaitResult::Message(message)) => break Some(message),
64 None => break None,
65 }
66 }
67 }
68}
69
70impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
71 fn drop(&mut self) {
72 self.channel.unregister_subscriber(self.next_message_id)
73 }
74}
75
76impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
77
78/// Warning: The stream implementation ignores lag results and returns all messages.
79/// This might miss some messages without you knowing it.
80impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> {
81 type Item = T;
82
83 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84 match self
85 .channel
86 .get_message_with_context(&mut self.next_message_id, Some(cx))
87 {
88 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
89 Poll::Ready(WaitResult::Lagged(_)) => {
90 cx.waker().wake_by_ref();
91 Poll::Pending
92 }
93 Poll::Pending => Poll::Pending,
94 }
95 }
96}
97
98/// A subscriber that holds a dynamic reference to the channel
99pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
100
101impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
102 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
103
104 fn deref(&self) -> &Self::Target {
105 &self.0
106 }
107}
108
109impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
110 fn deref_mut(&mut self) -> &mut Self::Target {
111 &mut self.0
112 }
113}
114
115/// A subscriber that holds a generic reference to the channel
116pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
117 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
118);
119
120impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
121 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
122{
123 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
124
125 fn deref(&self) -> &Self::Target {
126 &self.0
127 }
128}
129
130impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
131 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
132{
133 fn deref_mut(&mut self) -> &mut Self::Target {
134 &mut self.0
135 }
136}
137
138/// Future for the subscriber wait action
139pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
140 subscriber: &'s mut Sub<'a, PSB, T>,
141}
142
143impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
144 type Output = WaitResult<T>;
145
146 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147 self.subscriber
148 .channel
149 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
150 }
151}
152
153impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}