aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/pubsub.rs253
1 files changed, 223 insertions, 30 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs
index 41f275c4e..334401884 100644
--- a/embassy/src/channel/pubsub.rs
+++ b/embassy/src/channel/pubsub.rs
@@ -3,6 +3,8 @@
3use core::cell::RefCell; 3use core::cell::RefCell;
4use core::fmt::Debug; 4use core::fmt::Debug;
5use core::future::Future; 5use core::future::Future;
6use core::marker::PhantomData;
7use core::ops::{Deref, DerefMut};
6use core::pin::Pin; 8use core::pin::Pin;
7use core::task::{Context, Poll, Waker}; 9use core::task::{Context, Poll, Waker};
8 10
@@ -38,7 +40,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
38 /// Create a new subscriber. It will only receive messages that are published after its creation. 40 /// Create a new subscriber. It will only receive messages that are published after its creation.
39 /// 41 ///
40 /// If there are no subscriber slots left, an error will be returned. 42 /// If there are no subscriber slots left, an error will be returned.
41 pub fn subscriber(&self) -> Result<Subscriber<'_, T>, Error> { 43 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
42 self.inner.lock(|inner| { 44 self.inner.lock(|inner| {
43 let mut s = inner.borrow_mut(); 45 let mut s = inner.borrow_mut();
44 46
@@ -46,10 +48,31 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
46 Err(Error::MaximumSubscribersReached) 48 Err(Error::MaximumSubscribersReached)
47 } else { 49 } else {
48 s.subscriber_count += 1; 50 s.subscriber_count += 1;
49 Ok(Subscriber { 51 Ok(Subscriber(Sub {
50 next_message_id: s.next_message_id, 52 next_message_id: s.next_message_id,
51 channel: self, 53 channel: self,
52 }) 54 _phantom: Default::default(),
55 }))
56 }
57 })
58 }
59
60 /// Create a new subscriber. It will only receive messages that are published after its creation.
61 ///
62 /// If there are no subscriber slots left, an error will be returned.
63 pub fn dyn_subscriber<'a>(&'a self) -> Result<DynSubscriber<'a, T>, Error> {
64 self.inner.lock(|inner| {
65 let mut s = inner.borrow_mut();
66
67 if s.subscriber_count >= SUBS {
68 Err(Error::MaximumSubscribersReached)
69 } else {
70 s.subscriber_count += 1;
71 Ok(DynSubscriber(Sub {
72 next_message_id: s.next_message_id,
73 channel: self as _,
74 _phantom: Default::default(),
75 }))
53 } 76 }
54 }) 77 })
55 } 78 }
@@ -57,7 +80,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
57 /// Create a new publisher 80 /// Create a new publisher
58 /// 81 ///
59 /// If there are no publisher slots left, an error will be returned. 82 /// If there are no publisher slots left, an error will be returned.
60 pub fn publisher(&self) -> Result<Publisher<'_, T>, Error> { 83 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
61 self.inner.lock(|inner| { 84 self.inner.lock(|inner| {
62 let mut s = inner.borrow_mut(); 85 let mut s = inner.borrow_mut();
63 86
@@ -65,15 +88,49 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
65 Err(Error::MaximumPublishersReached) 88 Err(Error::MaximumPublishersReached)
66 } else { 89 } else {
67 s.publisher_count += 1; 90 s.publisher_count += 1;
68 Ok(Publisher { channel: self }) 91 Ok(Publisher(Pub {
92 channel: self,
93 _phantom: Default::default(),
94 }))
95 }
96 })
97 }
98
99 /// Create a new publisher
100 ///
101 /// If there are no publisher slots left, an error will be returned.
102 pub fn dyn_publisher<'a>(&'a self) -> Result<DynPublisher<'a, T>, Error> {
103 self.inner.lock(|inner| {
104 let mut s = inner.borrow_mut();
105
106 if s.publisher_count >= PUBS {
107 Err(Error::MaximumPublishersReached)
108 } else {
109 s.publisher_count += 1;
110 Ok(DynPublisher(Pub {
111 channel: self,
112 _phantom: Default::default(),
113 }))
69 } 114 }
70 }) 115 })
71 } 116 }
72 117
73 /// Create a new publisher that can only send immediate messages. 118 /// Create a new publisher that can only send immediate messages.
74 /// This kind of publisher does not take up a publisher slot. 119 /// This kind of publisher does not take up a publisher slot.
75 pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, T> { 120 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
76 ImmediatePublisher { channel: self } 121 ImmediatePublisher(ImmediatePub {
122 channel: self,
123 _phantom: Default::default(),
124 })
125 }
126
127 /// Create a new publisher that can only send immediate messages.
128 /// This kind of publisher does not take up a publisher slot.
129 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
130 DynImmediatePublisher(ImmediatePub {
131 channel: self,
132 _phantom: Default::default(),
133 })
77 } 134 }
78} 135}
79 136
@@ -287,16 +344,17 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
287/// 344///
288/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's 345/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
289/// generics are erased on this subscriber 346/// generics are erased on this subscriber
290pub struct Subscriber<'a, T: Clone> { 347pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
291 /// The message id of the next message we are yet to receive 348 /// The message id of the next message we are yet to receive
292 next_message_id: u64, 349 next_message_id: u64,
293 /// The channel we are a subscriber to 350 /// The channel we are a subscriber to
294 channel: &'a dyn PubSubBehavior<T>, 351 channel: &'a PSB,
352 _phantom: PhantomData<T>,
295} 353}
296 354
297impl<'a, T: Clone> Subscriber<'a, T> { 355impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
298 /// Wait for a published message 356 /// Wait for a published message
299 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { 357 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
300 SubscriberWaitFuture { subscriber: self } 358 SubscriberWaitFuture { subscriber: self }
301 } 359 }
302 360
@@ -334,15 +392,55 @@ impl<'a, T: Clone> Subscriber<'a, T> {
334 } 392 }
335} 393}
336 394
337impl<'a, T: Clone> Drop for Subscriber<'a, T> { 395impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
338 fn drop(&mut self) { 396 fn drop(&mut self) {
339 self.channel.unregister_subscriber(self.next_message_id) 397 self.channel.unregister_subscriber(self.next_message_id)
340 } 398 }
341} 399}
342 400
401impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
402
403pub struct DynSubscriber<'a, T: Clone>(Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
404
405impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
406 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
407
408 fn deref(&self) -> &Self::Target {
409 &self.0
410 }
411}
412
413impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
414 fn deref_mut(&mut self) -> &mut Self::Target {
415 &mut self.0
416 }
417}
418
419pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
420 Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
421);
422
423impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
424 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
425{
426 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
427
428 fn deref(&self) -> &Self::Target {
429 &self.0
430 }
431}
432
433impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
434 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
435{
436 fn deref_mut(&mut self) -> &mut Self::Target {
437 &mut self.0
438 }
439}
440
343/// Warning: The stream implementation ignores lag results and returns all messages. 441/// Warning: The stream implementation ignores lag results and returns all messages.
344/// This might miss some messages without you knowing it. 442/// This might miss some messages without you knowing it.
345impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { 443impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures::Stream for Sub<'a, PSB, T> {
346 type Item = T; 444 type Item = T;
347 445
348 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 446 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -364,12 +462,13 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
364/// 462///
365/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's 463/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
366/// generics are erased on this subscriber 464/// generics are erased on this subscriber
367pub struct Publisher<'a, T: Clone> { 465pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
368 /// The channel we are a publisher for 466 /// The channel we are a publisher for
369 channel: &'a dyn PubSubBehavior<T>, 467 channel: &'a PSB,
468 _phantom: PhantomData<T>,
370} 469}
371 470
372impl<'a, T: Clone> Publisher<'a, T> { 471impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
373 /// Publish a message right now even when the queue is full. 472 /// Publish a message right now even when the queue is full.
374 /// This may cause a subscriber to miss an older message. 473 /// This may cause a subscriber to miss an older message.
375 pub fn publish_immediate(&self, message: T) { 474 pub fn publish_immediate(&self, message: T) {
@@ -377,7 +476,7 @@ impl<'a, T: Clone> Publisher<'a, T> {
377 } 476 }
378 477
379 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message 478 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
380 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, T> { 479 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
381 PublisherWaitFuture { 480 PublisherWaitFuture {
382 message: Some(message), 481 message: Some(message),
383 publisher: self, 482 publisher: self,
@@ -390,20 +489,59 @@ impl<'a, T: Clone> Publisher<'a, T> {
390 } 489 }
391} 490}
392 491
393impl<'a, T: Clone> Drop for Publisher<'a, T> { 492impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
394 fn drop(&mut self) { 493 fn drop(&mut self) {
395 self.channel.unregister_publisher() 494 self.channel.unregister_publisher()
396 } 495 }
397} 496}
398 497
498pub struct DynPublisher<'a, T: Clone>(Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
499
500impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
501 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
502
503 fn deref(&self) -> &Self::Target {
504 &self.0
505 }
506}
507
508impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
509 fn deref_mut(&mut self) -> &mut Self::Target {
510 &mut self.0
511 }
512}
513
514pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
515 Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
516);
517
518impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
519 for Publisher<'a, M, T, CAP, SUBS, PUBS>
520{
521 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
522
523 fn deref(&self) -> &Self::Target {
524 &self.0
525 }
526}
527
528impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
529 for Publisher<'a, M, T, CAP, SUBS, PUBS>
530{
531 fn deref_mut(&mut self) -> &mut Self::Target {
532 &mut self.0
533 }
534}
535
399/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. 536/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
400/// (So an infinite amount is possible) 537/// (So an infinite amount is possible)
401pub struct ImmediatePublisher<'a, T: Clone> { 538pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
402 /// The channel we are a publisher for 539 /// The channel we are a publisher for
403 channel: &'a dyn PubSubBehavior<T>, 540 channel: &'a PSB,
541 _phantom: PhantomData<T>,
404} 542}
405 543
406impl<'a, T: Clone> ImmediatePublisher<'a, T> { 544impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
407 /// Publish the message right now even when the queue is full. 545 /// Publish the message right now even when the queue is full.
408 /// This may cause a subscriber to miss an older message. 546 /// This may cause a subscriber to miss an older message.
409 pub fn publish_immediate(&mut self, message: T) { 547 pub fn publish_immediate(&mut self, message: T) {
@@ -416,6 +554,44 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> {
416 } 554 }
417} 555}
418 556
557pub struct DynImmediatePublisher<'a, T: Clone>(ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
558
559impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
560 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
561
562 fn deref(&self) -> &Self::Target {
563 &self.0
564 }
565}
566
567impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
568 fn deref_mut(&mut self) -> &mut Self::Target {
569 &mut self.0
570 }
571}
572
573pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
574 ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
575);
576
577impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
578 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
579{
580 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
581
582 fn deref(&self) -> &Self::Target {
583 &self.0
584 }
585}
586
587impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
588 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
589{
590 fn deref_mut(&mut self) -> &mut Self::Target {
591 &mut self.0
592 }
593}
594
419/// Error type for the [PubSubChannel] 595/// Error type for the [PubSubChannel]
420#[derive(Debug, PartialEq, Clone)] 596#[derive(Debug, PartialEq, Clone)]
421pub enum Error { 597pub enum Error {
@@ -427,7 +603,7 @@ pub enum Error {
427 MaximumPublishersReached, 603 MaximumPublishersReached,
428} 604}
429 605
430trait PubSubBehavior<T> { 606pub trait PubSubBehavior<T> {
431 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 607 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
432 608
433 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; 609 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
@@ -440,11 +616,11 @@ trait PubSubBehavior<T> {
440} 616}
441 617
442/// Future for the subscriber wait action 618/// Future for the subscriber wait action
443pub struct SubscriberWaitFuture<'s, 'a, T: Clone> { 619pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
444 subscriber: &'s mut Subscriber<'a, T>, 620 subscriber: &'s mut Sub<'a, PSB, T>,
445} 621}
446 622
447impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { 623impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
448 type Output = WaitResult<T>; 624 type Output = WaitResult<T>;
449 625
450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 626 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -454,16 +630,16 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> {
454 } 630 }
455} 631}
456 632
457impl<'s, 'a, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, T> {} 633impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}
458 634
459/// Future for the publisher wait action 635/// Future for the publisher wait action
460pub struct PublisherWaitFuture<'s, 'a, T: Clone> { 636pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
461 /// The message we need to publish 637 /// The message we need to publish
462 message: Option<T>, 638 message: Option<T>,
463 publisher: &'s Publisher<'a, T>, 639 publisher: &'s Pub<'a, PSB, T>,
464} 640}
465 641
466impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { 642impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
467 type Output = (); 643 type Output = ();
468 644
469 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 645 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -478,7 +654,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> {
478 } 654 }
479} 655}
480 656
481impl<'s, 'a, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, T> {} 657impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
482 658
483/// The result of the subscriber wait procedure 659/// The result of the subscriber wait procedure
484#[derive(Debug, Clone, PartialEq)] 660#[derive(Debug, Clone, PartialEq)]
@@ -496,6 +672,23 @@ mod tests {
496 use crate::blocking_mutex::raw::NoopRawMutex; 672 use crate::blocking_mutex::raw::NoopRawMutex;
497 673
498 #[futures_test::test] 674 #[futures_test::test]
675 async fn dyn_pub_sub_works() {
676 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
677
678 let mut sub0 = channel.dyn_subscriber().unwrap();
679 let mut sub1 = channel.dyn_subscriber().unwrap();
680 let pub0 = channel.dyn_publisher().unwrap();
681
682 pub0.publish(42).await;
683
684 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
685 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
686
687 assert_eq!(sub0.try_next_message(), None);
688 assert_eq!(sub1.try_next_message(), None);
689 }
690
691 #[futures_test::test]
499 async fn all_subscribers_receive() { 692 async fn all_subscribers_receive() {
500 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 693 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
501 694