aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pubsub/subscriber.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/pubsub/subscriber.rs')
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs152
1 files changed, 152 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
new file mode 100644
index 000000000..b9a2cbe18
--- /dev/null
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -0,0 +1,152 @@
1//! Implementation of anything directly subscriber related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive
15 next_message_id: u64,
16 /// The channel we are a subscriber to
17 channel: &'a PSB,
18 _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23 Self {
24 next_message_id,
25 channel,
26 _phantom: Default::default(),
27 }
28 }
29
30 /// Wait for a published message
31 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32 SubscriberWaitFuture { subscriber: self }
33 }
34
35 /// Wait for a published message (ignoring lag results)
36 pub async fn next_message_pure(&mut self) -> T {
37 loop {
38 match self.next_message().await {
39 WaitResult::Lagged(_) => continue,
40 WaitResult::Message(message) => break message,
41 }
42 }
43 }
44
45 /// Try to see if there's a published message we haven't received yet.
46 ///
47 /// This function does not peek. The message is received if there is one.
48 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50 Poll::Ready(result) => Some(result),
51 Poll::Pending => None,
52 }
53 }
54
55 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
56 ///
57 /// This function does not peek. The message is received if there is one.
58 pub fn try_next_message_pure(&mut self) -> Option<T> {
59 loop {
60 match self.try_next_message() {
61 Some(WaitResult::Lagged(_)) => continue,
62 Some(WaitResult::Message(message)) => break Some(message),
63 None => break None,
64 }
65 }
66 }
67}
68
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
70 fn drop(&mut self) {
71 self.channel.unregister_subscriber(self.next_message_id)
72 }
73}
74
75impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
76
77/// Warning: The stream implementation ignores lag results and returns all messages.
78/// This might miss some messages without you knowing it.
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
80 type Item = T;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 match self
84 .channel
85 .get_message_with_context(&mut self.next_message_id, Some(cx))
86 {
87 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
88 Poll::Ready(WaitResult::Lagged(_)) => {
89 cx.waker().wake_by_ref();
90 Poll::Pending
91 }
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
97/// A subscriber that holds a dynamic reference to the channel
98pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
99
100impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
101 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
109 fn deref_mut(&mut self) -> &mut Self::Target {
110 &mut self.0
111 }
112}
113
114/// A subscriber that holds a generic reference to the channel
115pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
116 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
117);
118
119impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
120 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
121{
122 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
123
124 fn deref(&self) -> &Self::Target {
125 &self.0
126 }
127}
128
129impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
130 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
131{
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// Future for the subscriber wait action
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>,
140}
141
142impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
143 type Output = WaitResult<T>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 self.subscriber
147 .channel
148 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
149 }
150}
151
152impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}