From f4ebc36b638a081b4a8b68ae72c4cca5199c4c4c Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 29 Sep 2022 14:24:42 +0200 Subject: Futures in pub & sub are now awaited instead of returned for better user compiler diagnostics. Added functions for reading how many messages are available --- embassy-sync/src/pubsub/mod.rs | 73 +++++++++++++++++++++++++++++++++++ embassy-sync/src/pubsub/publisher.rs | 13 ++++++- embassy-sync/src/pubsub/subscriber.rs | 11 ++++-- 3 files changed, 92 insertions(+), 5 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..335d7e33e 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -192,6 +192,10 @@ impl u64 { + self.inner.lock(|s| s.borrow().next_message_id - next_message_id) + } + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -217,6 +221,13 @@ impl usize { + self.inner.lock(|s| { + let s = s.borrow(); + s.queue.capacity() - s.queue.len() + }) + } + fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -388,6 +399,10 @@ pub trait PubSubBehavior { /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; + /// Get the amount of messages that are between the given the next_message_id and the most recent message. + /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. + fn available(&self, next_message_id: u64) -> u64; + /// Try to publish a message to the queue. /// /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. @@ -396,6 +411,9 @@ pub trait PubSubBehavior { /// Publish a message immediately fn publish_immediate(&self, message: T); + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + fn space(&self) -> usize; + /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -539,4 +557,59 @@ mod tests { drop(sub0); } + + #[futures_test::test] + async fn correct_available() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(sub0.available(), 0); + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 1); + assert_eq!(sub1.available(), 1); + + sub1.next_message().await; + + assert_eq!(sub1.available(), 0); + + pub0.publish(42).await; + + assert_eq!(sub0.available(), 2); + assert_eq!(sub1.available(), 1); + } + + #[futures_test::test] + async fn correct_space() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + assert_eq!(pub0.space(), 4); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 3); + + pub0.publish(42).await; + + assert_eq!(pub0.space(), 2); + + sub0.next_message().await; + sub0.next_message().await; + + assert_eq!(pub0.space(), 2); + + sub1.next_message().await; + assert_eq!(pub0.space(), 3); + sub1.next_message().await; + assert_eq!(pub0.space(), 4); + } } diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..484f1dbfd 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -31,17 +31,26 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { + pub async fn publish<'s>(&'s self, message: T) { PublisherWaitFuture { message: Some(message), publisher: self, } + .await } /// Publish a message if there is space in the message queue pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { @@ -158,7 +167,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, publisher: &'s Pub<'a, PSB, T>, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..8a8e9144b 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -28,8 +28,8 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } + pub async fn next_message(&mut self) -> WaitResult { + SubscriberWaitFuture { subscriber: self }.await } /// Wait for a published message (ignoring lag results) @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } } } + + /// The amount of messages this subscriber hasn't received yet + pub fn available(&self) -> u64 { + self.channel.available(self.next_message_id) + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { @@ -135,7 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } -- cgit From 874384826d4a6f9c9a9c8d3abf41f99a662f58fb Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Thu, 29 Sep 2022 15:15:10 +0200 Subject: Went back to named futures but now with must_use --- embassy-sync/src/pubsub/mod.rs | 2 +- embassy-sync/src/pubsub/publisher.rs | 6 +++--- embassy-sync/src/pubsub/subscriber.rs | 7 ++++--- 3 files changed, 8 insertions(+), 7 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 335d7e33e..faaf99dc6 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -562,7 +562,7 @@ mod tests { async fn correct_available() { let channel = PubSubChannel::::new(); - let mut sub0 = channel.subscriber().unwrap(); + let sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); let pub0 = channel.publisher().unwrap(); diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 484f1dbfd..faa67d947 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -31,12 +31,11 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { } /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub async fn publish<'s>(&'s self, message: T) { + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { PublisherWaitFuture { message: Some(message), publisher: self, } - .await } /// Publish a message if there is space in the message queue @@ -167,7 +166,8 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the publisher wait action -struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { /// The message we need to publish message: Option, publisher: &'s Pub<'a, PSB, T>, diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 8a8e9144b..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -28,8 +28,8 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } /// Wait for a published message - pub async fn next_message(&mut self) -> WaitResult { - SubscriberWaitFuture { subscriber: self }.await + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { + SubscriberWaitFuture { subscriber: self } } /// Wait for a published message (ignoring lag results) @@ -140,7 +140,8 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: } /// Future for the subscriber wait action -struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { subscriber: &'s mut Sub<'a, PSB, T>, } -- cgit From 59765590e0339e8f4294719b6e99a6ab110266d8 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 4 Oct 2022 16:38:11 +0200 Subject: Add required info to embassy-sync package Updates the README.md based on embassy-futures structure. --- embassy-sync/Cargo.toml | 10 ++++++++++ embassy-sync/README.md | 24 ++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 14ab1d003..584d5ba9f 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml @@ -2,6 +2,16 @@ name = "embassy-sync" version = "0.1.0" edition = "2021" +description = "no-std, no-alloc synchronization primitives with async support" +repository = "https://github.com/embassy-rs/embassy" +readme = "README.md" +license = "MIT OR Apache-2.0" +categories = [ + "embedded", + "no-std", + "concurrency", + "asynchronous", +] [package.metadata.embassy_docs] src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" diff --git a/embassy-sync/README.md b/embassy-sync/README.md index 106295c0d..cc65cf6ef 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md @@ -1,12 +1,32 @@ # embassy-sync -Synchronization primitives and data structures with an async API: +An [Embassy](https://embassy.dev) project. + +Synchronization primitives and data structures with async support: - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. -- [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. +- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. + +## Interoperability + +Futures from this crate can run on any executor. + +## Minimum supported Rust version (MSRV) + +Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release. + +## License + +This work is licensed under either of + +- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or + ) +- MIT license ([LICENSE-MIT](LICENSE-MIT) or ) + +at your option. -- cgit From 530182d6683531f7c259448e6c54c866f35837c7 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Wed, 5 Oct 2022 15:15:03 +0200 Subject: Forgot to add space function to immediate publisher --- embassy-sync/src/pubsub/publisher.rs | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'embassy-sync') diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index faa67d947..e1edc9eb9 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -123,6 +123,14 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { pub fn try_publish(&self, message: T) -> Result<(), T> { self.channel.publish_with_context(message, None) } + + /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// + /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. + /// So checking doesn't give any guarantees.* + pub fn space(&self) -> usize { + self.channel.space() + } } /// An immediate publisher that holds a dynamic reference to the channel -- cgit