diff options
Diffstat (limited to 'embassy-sync/src/pubsub/publisher.rs')
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e66b3b1db..7a1ab66de 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 74 | pub fn is_full(&self) -> bool { | 74 | pub fn is_full(&self) -> bool { |
| 75 | self.channel.is_full() | 75 | self.channel.is_full() |
| 76 | } | 76 | } |
| 77 | |||
| 78 | /// Create a [`futures::Sink`] adapter for this publisher. | ||
| 79 | #[inline] | ||
| 80 | pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { | ||
| 81 | PubSink { publ: self, fut: None } | ||
| 82 | } | ||
| 77 | } | 83 | } |
| 78 | 84 | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 85 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 221 | } | 227 | } |
| 222 | } | 228 | } |
| 223 | 229 | ||
| 230 | #[must_use = "Sinks do nothing unless polled"] | ||
| 231 | /// [`futures_sink::Sink`] adapter for [`Pub`]. | ||
| 232 | pub struct PubSink<'a, 'p, PSB, T> | ||
| 233 | where | ||
| 234 | T: Clone, | ||
| 235 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 236 | { | ||
| 237 | publ: &'p Pub<'a, PSB, T>, | ||
| 238 | fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>, | ||
| 239 | } | ||
| 240 | |||
| 241 | impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T> | ||
| 242 | where | ||
| 243 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 244 | T: Clone, | ||
| 245 | { | ||
| 246 | /// Try to make progress on the pending future if we have one. | ||
| 247 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | ||
| 248 | let Some(mut fut) = self.fut.take() else { | ||
| 249 | return Poll::Ready(()); | ||
| 250 | }; | ||
| 251 | |||
| 252 | if Pin::new(&mut fut).poll(cx).is_pending() { | ||
| 253 | self.fut = Some(fut); | ||
| 254 | return Poll::Pending; | ||
| 255 | } | ||
| 256 | |||
| 257 | Poll::Ready(()) | ||
| 258 | } | ||
| 259 | } | ||
| 260 | |||
| 261 | impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T> | ||
| 262 | where | ||
| 263 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 264 | T: Clone, | ||
| 265 | { | ||
| 266 | type Error = core::convert::Infallible; | ||
| 267 | |||
| 268 | #[inline] | ||
| 269 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 270 | self.poll(cx).map(Ok) | ||
| 271 | } | ||
| 272 | |||
| 273 | #[inline] | ||
| 274 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
| 275 | self.fut = Some(self.publ.publish(item)); | ||
| 276 | |||
| 277 | Ok(()) | ||
| 278 | } | ||
| 279 | |||
| 280 | #[inline] | ||
| 281 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 282 | self.poll(cx).map(Ok) | ||
| 283 | } | ||
| 284 | |||
| 285 | #[inline] | ||
| 286 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 287 | self.poll(cx).map(Ok) | ||
| 288 | } | ||
| 289 | } | ||
| 290 | |||
| 224 | /// Future for the publisher wait action | 291 | /// Future for the publisher wait action |
| 225 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 292 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 226 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 293 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
