aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2024-10-21 10:48:07 +0200
committerGitHub <[email protected]>2024-10-21 10:48:07 +0200
commit379a59329158febc8edce1d62f62c07d71f22105 (patch)
tree9088ec503b56f284ba2747da3ccdb46a16b5b6b3 /embassy-sync
parente8ba9696f140e604344d84fba93bd9854de56c0a (diff)
parent89bad07e817dec482d385f765da5be3b6d2d0e4c (diff)
Merge pull request #3358 from mammothbane/main
embassy_sync: `Sink` adapter for `pubsub::Pub`
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/Cargo.toml3
-rw-r--r--embassy-sync/src/pubsub/mod.rs26
-rw-r--r--embassy-sync/src/pubsub/publisher.rs67
3 files changed, 95 insertions, 1 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 7b7d2bf8e..2f049b6bc 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -27,6 +27,7 @@ turbowakers = []
27defmt = { version = "0.3", optional = true } 27defmt = { version = "0.3", optional = true }
28log = { version = "0.4.14", optional = true } 28log = { version = "0.4.14", optional = true }
29 29
30futures-sink = { version = "0.3", default-features = false, features = [] }
30futures-util = { version = "0.3.17", default-features = false } 31futures-util = { version = "0.3.17", default-features = false }
31critical-section = "1.1" 32critical-section = "1.1"
32heapless = "0.8" 33heapless = "0.8"
@@ -37,7 +38,7 @@ embedded-io-async = { version = "0.6.1" }
37futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } 38futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
38futures-test = "0.3.17" 39futures-test = "0.3.17"
39futures-timer = "3.0.2" 40futures-timer = "3.0.2"
40futures-util = { version = "0.3.17", features = [ "channel" ] } 41futures-util = { version = "0.3.17", features = [ "channel", "sink" ] }
41 42
42# Enable critical-section implementation for std, for tests 43# Enable critical-section implementation for std, for tests
43critical-section = { version = "1.1", features = ["std"] } 44critical-section = { version = "1.1", features = ["std"] }
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index ae5951829..a2360a1d8 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -755,4 +755,30 @@ mod tests {
755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0); 755 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0); 756 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
757 } 757 }
758
759 #[futures_test::test]
760 async fn publisher_sink() {
761 use futures_util::{SinkExt, StreamExt};
762
763 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
764
765 let mut sub = channel.subscriber().unwrap();
766
767 let publ = channel.publisher().unwrap();
768 let mut sink = publ.sink();
769
770 sink.send(0).await.unwrap();
771 assert_eq!(0, sub.try_next_message_pure().unwrap());
772
773 sink.send(1).await.unwrap();
774 assert_eq!(1, sub.try_next_message_pure().unwrap());
775
776 sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
777 .await
778 .unwrap();
779 assert_eq!(0, sub.try_next_message_pure().unwrap());
780 assert_eq!(1, sub.try_next_message_pure().unwrap());
781 assert_eq!(2, sub.try_next_message_pure().unwrap());
782 assert_eq!(3, sub.try_next_message_pure().unwrap());
783 }
758} 784}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e66b3b1db..7a1ab66de 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -74,6 +74,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
74 pub fn is_full(&self) -> bool { 74 pub fn is_full(&self) -> bool {
75 self.channel.is_full() 75 self.channel.is_full()
76 } 76 }
77
78 /// Create a [`futures::Sink`] adapter for this publisher.
79 #[inline]
80 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
81 PubSink { publ: self, fut: None }
82 }
77} 83}
78 84
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 85impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -221,6 +227,67 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
221 } 227 }
222} 228}
223 229
230#[must_use = "Sinks do nothing unless polled"]
231/// [`futures_sink::Sink`] adapter for [`Pub`].
232pub struct PubSink<'a, 'p, PSB, T>
233where
234 T: Clone,
235 PSB: PubSubBehavior<T> + ?Sized,
236{
237 publ: &'p Pub<'a, PSB, T>,
238 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
239}
240
241impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
242where
243 PSB: PubSubBehavior<T> + ?Sized,
244 T: Clone,
245{
246 /// Try to make progress on the pending future if we have one.
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
248 let Some(mut fut) = self.fut.take() else {
249 return Poll::Ready(());
250 };
251
252 if Pin::new(&mut fut).poll(cx).is_pending() {
253 self.fut = Some(fut);
254 return Poll::Pending;
255 }
256
257 Poll::Ready(())
258 }
259}
260
261impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
262where
263 PSB: PubSubBehavior<T> + ?Sized,
264 T: Clone,
265{
266 type Error = core::convert::Infallible;
267
268 #[inline]
269 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270 self.poll(cx).map(Ok)
271 }
272
273 #[inline]
274 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
275 self.fut = Some(self.publ.publish(item));
276
277 Ok(())
278 }
279
280 #[inline]
281 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 self.poll(cx).map(Ok)
283 }
284
285 #[inline]
286 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289}
290
224/// Future for the publisher wait action 291/// Future for the publisher wait action
225#[must_use = "futures do nothing unless you `.await` or poll them"] 292#[must_use = "futures do nothing unless you `.await` or poll them"]
226pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 293pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {