diff options
| -rw-r--r-- | embassy/src/channel/pubsub/mod.rs | 42 | ||||
| -rw-r--r-- | embassy/src/waitqueue/multi_waker.rs | 2 |
2 files changed, 44 insertions, 0 deletions
diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index e421df58d..12f0d24ab 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs | |||
| @@ -29,6 +29,48 @@ pub use subscriber::{DynSubscriber, Subscriber}; | |||
| 29 | /// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message | 29 | /// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message |
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive |
| 31 | /// an error to indicate that it has lagged. | 31 | /// an error to indicate that it has lagged. |
| 32 | /// | ||
| 33 | /// ## Example | ||
| 34 | /// | ||
| 35 | /// ``` | ||
| 36 | /// # use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 37 | /// # use embassy::channel::pubsub::WaitResult; | ||
| 38 | /// # use embassy::channel::pubsub::PubSubChannel; | ||
| 39 | /// # use futures_executor::block_on; | ||
| 40 | /// # let test = async { | ||
| 41 | /// // Create the channel. This can be static as well | ||
| 42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 43 | /// | ||
| 44 | /// // This is a generic subscriber with a direct reference to the channel | ||
| 45 | /// let mut sub0 = channel.subscriber().unwrap(); | ||
| 46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel | ||
| 47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 48 | /// | ||
| 49 | /// let pub0 = channel.publisher().unwrap(); | ||
| 50 | /// | ||
| 51 | /// // Publish a message, but wait if the queue is full | ||
| 52 | /// pub0.publish(42).await; | ||
| 53 | /// | ||
| 54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. | ||
| 55 | /// // This may cause some subscribers to miss a message | ||
| 56 | /// pub0.publish_immediate(43); | ||
| 57 | /// | ||
| 58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result | ||
| 59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 61 | /// | ||
| 62 | /// // Wait again, but this time ignore any Lag results | ||
| 63 | /// assert_eq!(sub0.next_message_pure().await, 43); | ||
| 64 | /// assert_eq!(sub1.next_message_pure().await, 43); | ||
| 65 | /// | ||
| 66 | /// // There's also a polling interface | ||
| 67 | /// assert_eq!(sub0.try_next_message(), None); | ||
| 68 | /// assert_eq!(sub1.try_next_message(), None); | ||
| 69 | /// # }; | ||
| 70 | /// # | ||
| 71 | /// # block_on(test); | ||
| 72 | /// ``` | ||
| 73 | /// | ||
| 32 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
| 33 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, | 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, |
| 34 | } | 76 | } |
diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs index 6e8710cb4..325d2cb3a 100644 --- a/embassy/src/waitqueue/multi_waker.rs +++ b/embassy/src/waitqueue/multi_waker.rs | |||
| @@ -2,11 +2,13 @@ use core::task::Waker; | |||
| 2 | 2 | ||
| 3 | use super::WakerRegistration; | 3 | use super::WakerRegistration; |
| 4 | 4 | ||
| 5 | /// Utility struct to register and wake multiple wakers. | ||
| 5 | pub struct MultiWakerRegistration<const N: usize> { | 6 | pub struct MultiWakerRegistration<const N: usize> { |
| 6 | wakers: [WakerRegistration; N], | 7 | wakers: [WakerRegistration; N], |
| 7 | } | 8 | } |
| 8 | 9 | ||
| 9 | impl<const N: usize> MultiWakerRegistration<N> { | 10 | impl<const N: usize> MultiWakerRegistration<N> { |
| 11 | /// Create a new empty instance | ||
| 10 | pub const fn new() -> Self { | 12 | pub const fn new() -> Self { |
| 11 | const WAKER: WakerRegistration = WakerRegistration::new(); | 13 | const WAKER: WakerRegistration = WakerRegistration::new(); |
| 12 | Self { wakers: [WAKER; N] } | 14 | Self { wakers: [WAKER; N] } |
