diff options
| author | Dario Nieuwenhuis <[email protected]> | 2022-08-22 22:00:06 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2022-08-22 22:18:13 +0200 |
| commit | 5677b13a86beca58aa57ecfd7cea0db7ceb189fa (patch) | |
| tree | 0c7425dae57acb94cb6ddca27def7e77609369b3 /embassy-sync/src/pubsub/subscriber.rs | |
| parent | 21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 (diff) | |
sync: flatten module structure.
Diffstat (limited to 'embassy-sync/src/pubsub/subscriber.rs')
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs new file mode 100644 index 000000000..b9a2cbe18 --- /dev/null +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -0,0 +1,152 @@ | |||
| 1 | //! Implementation of anything directly subscriber related | ||
| 2 | |||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::pin::Pin; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use super::{PubSubBehavior, PubSubChannel, WaitResult}; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | |||
| 12 | /// A subscriber to a channel | ||
| 13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 14 | /// The message id of the next message we are yet to receive | ||
| 15 | next_message_id: u64, | ||
| 16 | /// The channel we are a subscriber to | ||
| 17 | channel: &'a PSB, | ||
| 18 | _phantom: PhantomData<T>, | ||
| 19 | } | ||
| 20 | |||
| 21 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | ||
| 22 | pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { | ||
| 23 | Self { | ||
| 24 | next_message_id, | ||
| 25 | channel, | ||
| 26 | _phantom: Default::default(), | ||
| 27 | } | ||
| 28 | } | ||
| 29 | |||
| 30 | /// Wait for a published message | ||
| 31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 32 | SubscriberWaitFuture { subscriber: self } | ||
| 33 | } | ||
| 34 | |||
| 35 | /// Wait for a published message (ignoring lag results) | ||
| 36 | pub async fn next_message_pure(&mut self) -> T { | ||
| 37 | loop { | ||
| 38 | match self.next_message().await { | ||
| 39 | WaitResult::Lagged(_) => continue, | ||
| 40 | WaitResult::Message(message) => break message, | ||
| 41 | } | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Try to see if there's a published message we haven't received yet. | ||
| 46 | /// | ||
| 47 | /// This function does not peek. The message is received if there is one. | ||
| 48 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { | ||
| 49 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { | ||
| 50 | Poll::Ready(result) => Some(result), | ||
| 51 | Poll::Pending => None, | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | /// Try to see if there's a published message we haven't received yet (ignoring lag results). | ||
| 56 | /// | ||
| 57 | /// This function does not peek. The message is received if there is one. | ||
| 58 | pub fn try_next_message_pure(&mut self) -> Option<T> { | ||
| 59 | loop { | ||
| 60 | match self.try_next_message() { | ||
| 61 | Some(WaitResult::Lagged(_)) => continue, | ||
| 62 | Some(WaitResult::Message(message)) => break Some(message), | ||
| 63 | None => break None, | ||
| 64 | } | ||
| 65 | } | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | ||
| 70 | fn drop(&mut self) { | ||
| 71 | self.channel.unregister_subscriber(self.next_message_id) | ||
| 72 | } | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} | ||
| 76 | |||
| 77 | /// Warning: The stream implementation ignores lag results and returns all messages. | ||
| 78 | /// This might miss some messages without you knowing it. | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { | ||
| 80 | type Item = T; | ||
| 81 | |||
| 82 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 83 | match self | ||
| 84 | .channel | ||
| 85 | .get_message_with_context(&mut self.next_message_id, Some(cx)) | ||
| 86 | { | ||
| 87 | Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), | ||
| 88 | Poll::Ready(WaitResult::Lagged(_)) => { | ||
| 89 | cx.waker().wake_by_ref(); | ||
| 90 | Poll::Pending | ||
| 91 | } | ||
| 92 | Poll::Pending => Poll::Pending, | ||
| 93 | } | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | /// A subscriber that holds a dynamic reference to the channel | ||
| 98 | pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 99 | |||
| 100 | impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { | ||
| 101 | type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 102 | |||
| 103 | fn deref(&self) -> &Self::Target { | ||
| 104 | &self.0 | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { | ||
| 109 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 110 | &mut self.0 | ||
| 111 | } | ||
| 112 | } | ||
| 113 | |||
| 114 | /// A subscriber that holds a generic reference to the channel | ||
| 115 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 116 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 117 | ); | ||
| 118 | |||
| 119 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 120 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 121 | { | ||
| 122 | type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 123 | |||
| 124 | fn deref(&self) -> &Self::Target { | ||
| 125 | &self.0 | ||
| 126 | } | ||
| 127 | } | ||
| 128 | |||
| 129 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 130 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 131 | { | ||
| 132 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 133 | &mut self.0 | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | /// Future for the subscriber wait action | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | ||
| 140 | } | ||
| 141 | |||
| 142 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 143 | type Output = WaitResult<T>; | ||
| 144 | |||
| 145 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 146 | self.subscriber | ||
| 147 | .channel | ||
| 148 | .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} | ||
