aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/pubsub/mod.rs42
-rw-r--r--embassy/src/waitqueue/multi_waker.rs2
2 files changed, 44 insertions, 0 deletions
diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs
index e421df58d..12f0d24ab 100644
--- a/embassy/src/channel/pubsub/mod.rs
+++ b/embassy/src/channel/pubsub/mod.rs
@@ -29,6 +29,48 @@ pub use subscriber::{DynSubscriber, Subscriber};
29/// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message 29/// - With [Publisher::publish_immediate] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive 30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged. 31/// an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy::channel::pubsub::WaitResult;
38/// # use embassy::channel::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
32pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { 74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
33 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, 75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
34} 76}
diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs
index 6e8710cb4..325d2cb3a 100644
--- a/embassy/src/waitqueue/multi_waker.rs
+++ b/embassy/src/waitqueue/multi_waker.rs
@@ -2,11 +2,13 @@ use core::task::Waker;
2 2
3use super::WakerRegistration; 3use super::WakerRegistration;
4 4
5/// Utility struct to register and wake multiple wakers.
5pub struct MultiWakerRegistration<const N: usize> { 6pub struct MultiWakerRegistration<const N: usize> {
6 wakers: [WakerRegistration; N], 7 wakers: [WakerRegistration; N],
7} 8}
8 9
9impl<const N: usize> MultiWakerRegistration<N> { 10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance
10 pub const fn new() -> Self { 12 pub const fn new() -> Self {
11 const WAKER: WakerRegistration = WakerRegistration::new(); 13 const WAKER: WakerRegistration = WakerRegistration::new();
12 Self { wakers: [WAKER; N] } 14 Self { wakers: [WAKER; N] }