diff options
| author | Dario Nieuwenhuis <[email protected]> | 2024-10-21 10:48:07 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-10-21 10:48:07 +0200 |
| commit | 379a59329158febc8edce1d62f62c07d71f22105 (patch) | |
| tree | 9088ec503b56f284ba2747da3ccdb46a16b5b6b3 /embassy-sync | |
| parent | e8ba9696f140e604344d84fba93bd9854de56c0a (diff) | |
| parent | 89bad07e817dec482d385f765da5be3b6d2d0e4c (diff) | |
Merge pull request #3358 from mammothbane/main
embassy_sync: `Sink` adapter for `pubsub::Pub`
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/Cargo.toml | 3 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 26 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 67 |
3 files changed, 95 insertions, 1 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 7b7d2bf8e..2f049b6bc 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml | |||
| @@ -27,6 +27,7 @@ turbowakers = [] | |||
| 27 | defmt = { version = "0.3", optional = true } | 27 | defmt = { version = "0.3", optional = true } |
| 28 | log = { version = "0.4.14", optional = true } | 28 | log = { version = "0.4.14", optional = true } |
| 29 | 29 | ||
| 30 | futures-sink = { version = "0.3", default-features = false, features = [] } | ||
| 30 | futures-util = { version = "0.3.17", default-features = false } | 31 | futures-util = { version = "0.3.17", default-features = false } |
| 31 | critical-section = "1.1" | 32 | critical-section = "1.1" |
| 32 | heapless = "0.8" | 33 | heapless = "0.8" |
| @@ -37,7 +38,7 @@ embedded-io-async = { version = "0.6.1" } | |||
| 37 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } | 38 | futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } |
| 38 | futures-test = "0.3.17" | 39 | futures-test = "0.3.17" |
| 39 | futures-timer = "3.0.2" | 40 | futures-timer = "3.0.2" |
| 40 | futures-util = { version = "0.3.17", features = [ "channel" ] } | 41 | futures-util = { version = "0.3.17", features = [ "channel", "sink" ] } |
| 41 | 42 | ||
| 42 | # Enable critical-section implementation for std, for tests | 43 | # Enable critical-section implementation for std, for tests |
| 43 | critical-section = { version = "1.1", features = ["std"] } | 44 | critical-section = { version = "1.1", features = ["std"] } |
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index ae5951829..a2360a1d8 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -755,4 +755,30 @@ mod tests { | |||
| 755 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); | 755 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); |
| 756 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); | 756 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); |
| 757 | } | 757 | } |
| 758 | |||
| 759 | #[futures_test::test] | ||
| 760 | async fn publisher_sink() { | ||
| 761 | use futures_util::{SinkExt, StreamExt}; | ||
| 762 | |||
| 763 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 764 | |||
| 765 | let mut sub = channel.subscriber().unwrap(); | ||
| 766 | |||
| 767 | let publ = channel.publisher().unwrap(); | ||
| 768 | let mut sink = publ.sink(); | ||
| 769 | |||
| 770 | sink.send(0).await.unwrap(); | ||
| 771 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 772 | |||
| 773 | sink.send(1).await.unwrap(); | ||
| 774 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 775 | |||
| 776 | sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok)) | ||
| 777 | .await | ||
| 778 | .unwrap(); | ||
| 779 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 780 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 781 | assert_eq!(2, sub.try_next_message_pure().unwrap()); | ||
| 782 | assert_eq!(3, sub.try_next_message_pure().unwrap()); | ||
| 783 | } | ||
| 758 | } | 784 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e66b3b1db..7a1ab66de 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 74 | pub fn is_full(&self) -> bool { | 74 | pub fn is_full(&self) -> bool { |
| 75 | self.channel.is_full() | 75 | self.channel.is_full() |
| 76 | } | 76 | } |
| 77 | |||
| 78 | /// Create a [`futures::Sink`] adapter for this publisher. | ||
| 79 | #[inline] | ||
| 80 | pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { | ||
| 81 | PubSink { publ: self, fut: None } | ||
| 82 | } | ||
| 77 | } | 83 | } |
| 78 | 84 | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 85 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 221 | } | 227 | } |
| 222 | } | 228 | } |
| 223 | 229 | ||
| 230 | #[must_use = "Sinks do nothing unless polled"] | ||
| 231 | /// [`futures_sink::Sink`] adapter for [`Pub`]. | ||
| 232 | pub struct PubSink<'a, 'p, PSB, T> | ||
| 233 | where | ||
| 234 | T: Clone, | ||
| 235 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 236 | { | ||
| 237 | publ: &'p Pub<'a, PSB, T>, | ||
| 238 | fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>, | ||
| 239 | } | ||
| 240 | |||
| 241 | impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T> | ||
| 242 | where | ||
| 243 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 244 | T: Clone, | ||
| 245 | { | ||
| 246 | /// Try to make progress on the pending future if we have one. | ||
| 247 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | ||
| 248 | let Some(mut fut) = self.fut.take() else { | ||
| 249 | return Poll::Ready(()); | ||
| 250 | }; | ||
| 251 | |||
| 252 | if Pin::new(&mut fut).poll(cx).is_pending() { | ||
| 253 | self.fut = Some(fut); | ||
| 254 | return Poll::Pending; | ||
| 255 | } | ||
| 256 | |||
| 257 | Poll::Ready(()) | ||
| 258 | } | ||
| 259 | } | ||
| 260 | |||
| 261 | impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T> | ||
| 262 | where | ||
| 263 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 264 | T: Clone, | ||
| 265 | { | ||
| 266 | type Error = core::convert::Infallible; | ||
| 267 | |||
| 268 | #[inline] | ||
| 269 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 270 | self.poll(cx).map(Ok) | ||
| 271 | } | ||
| 272 | |||
| 273 | #[inline] | ||
| 274 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
| 275 | self.fut = Some(self.publ.publish(item)); | ||
| 276 | |||
| 277 | Ok(()) | ||
| 278 | } | ||
| 279 | |||
| 280 | #[inline] | ||
| 281 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 282 | self.poll(cx).map(Ok) | ||
| 283 | } | ||
| 284 | |||
| 285 | #[inline] | ||
| 286 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 287 | self.poll(cx).map(Ok) | ||
| 288 | } | ||
| 289 | } | ||
| 290 | |||
| 224 | /// Future for the publisher wait action | 291 | /// Future for the publisher wait action |
| 225 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 292 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 226 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 293 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
