aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pubsub/publisher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/pubsub/publisher.rs')
-rw-r--r--embassy-sync/src/pubsub/publisher.rs67
1 files changed, 67 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e66b3b1db..7a1ab66de 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
74 pub fn is_full(&self) -> bool { 74 pub fn is_full(&self) -> bool {
75 self.channel.is_full() 75 self.channel.is_full()
76 } 76 }
77
78 /// Create a [`futures::Sink`] adapter for this publisher.
79 #[inline]
80 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
81 PubSink { publ: self, fut: None }
82 }
77} 83}
78 84
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 85impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
221 } 227 }
222} 228}
223 229
230#[must_use = "Sinks do nothing unless polled"]
231/// [`futures_sink::Sink`] adapter for [`Pub`].
232pub struct PubSink<'a, 'p, PSB, T>
233where
234 T: Clone,
235 PSB: PubSubBehavior<T> + ?Sized,
236{
237 publ: &'p Pub<'a, PSB, T>,
238 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
239}
240
241impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
242where
243 PSB: PubSubBehavior<T> + ?Sized,
244 T: Clone,
245{
246 /// Try to make progress on the pending future if we have one.
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
248 let Some(mut fut) = self.fut.take() else {
249 return Poll::Ready(());
250 };
251
252 if Pin::new(&mut fut).poll(cx).is_pending() {
253 self.fut = Some(fut);
254 return Poll::Pending;
255 }
256
257 Poll::Ready(())
258 }
259}
260
261impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
262where
263 PSB: PubSubBehavior<T> + ?Sized,
264 T: Clone,
265{
266 type Error = core::convert::Infallible;
267
268 #[inline]
269 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270 self.poll(cx).map(Ok)
271 }
272
273 #[inline]
274 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
275 self.fut = Some(self.publ.publish(item));
276
277 Ok(())
278 }
279
280 #[inline]
281 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 self.poll(cx).map(Ok)
283 }
284
285 #[inline]
286 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289}
290
224/// Future for the publisher wait action 291/// Future for the publisher wait action
225#[must_use = "futures do nothing unless you `.await` or poll them"] 292#[must_use = "futures do nothing unless you `.await` or poll them"]
226pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 293pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {