diff options
| author | Mathias <[email protected]> | 2022-10-24 12:14:26 +0200 |
|---|---|---|
| committer | Mathias <[email protected]> | 2022-10-24 12:14:26 +0200 |
| commit | 8d809c96ecf2fabf77f0fb72f2a59acd18306bf2 (patch) | |
| tree | da3e28e491bbaadbc448b9a021291e2164b7531e /embassy-sync | |
| parent | 7152031229da19005e5b0d52c8c72d359d3e0daa (diff) | |
| parent | ce1cba761c2942b7faa27f4098487c6468784729 (diff) | |
Merge branch 'master' of https://github.com/embassy-rs/embassy into embassy-rp/flash
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/Cargo.toml | 10 | ||||
| -rw-r--r-- | embassy-sync/README.md | 24 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 73 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 17 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 6 |
5 files changed, 128 insertions, 2 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 14ab1d003..584d5ba9f 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml | |||
| @@ -2,6 +2,16 @@ | |||
| 2 | name = "embassy-sync" | 2 | name = "embassy-sync" |
| 3 | version = "0.1.0" | 3 | version = "0.1.0" |
| 4 | edition = "2021" | 4 | edition = "2021" |
| 5 | description = "no-std, no-alloc synchronization primitives with async support" | ||
| 6 | repository = "https://github.com/embassy-rs/embassy" | ||
| 7 | readme = "README.md" | ||
| 8 | license = "MIT OR Apache-2.0" | ||
| 9 | categories = [ | ||
| 10 | "embedded", | ||
| 11 | "no-std", | ||
| 12 | "concurrency", | ||
| 13 | "asynchronous", | ||
| 14 | ] | ||
| 5 | 15 | ||
| 6 | [package.metadata.embassy_docs] | 16 | [package.metadata.embassy_docs] |
| 7 | src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" | 17 | src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" |
diff --git a/embassy-sync/README.md b/embassy-sync/README.md index 106295c0d..cc65cf6ef 100644 --- a/embassy-sync/README.md +++ b/embassy-sync/README.md | |||
| @@ -1,12 +1,32 @@ | |||
| 1 | # embassy-sync | 1 | # embassy-sync |
| 2 | 2 | ||
| 3 | Synchronization primitives and data structures with an async API: | 3 | An [Embassy](https://embassy.dev) project. |
| 4 | |||
| 5 | Synchronization primitives and data structures with async support: | ||
| 4 | 6 | ||
| 5 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. | 7 | - [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. |
| 6 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. | 8 | - [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. |
| 7 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. | 9 | - [`Signal`](signal::Signal) - Signalling latest value to a single consumer. |
| 8 | - [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. | 10 | - [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. |
| 9 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. | 11 | - [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. |
| 10 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. | 12 | - [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. |
| 11 | - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. | 13 | - [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. |
| 12 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. | 14 | - [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. |
| 15 | |||
| 16 | ## Interoperability | ||
| 17 | |||
| 18 | Futures from this crate can run on any executor. | ||
| 19 | |||
| 20 | ## Minimum supported Rust version (MSRV) | ||
| 21 | |||
| 22 | Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release. | ||
| 23 | |||
| 24 | ## License | ||
| 25 | |||
| 26 | This work is licensed under either of | ||
| 27 | |||
| 28 | - Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or | ||
| 29 | <http://www.apache.org/licenses/LICENSE-2.0>) | ||
| 30 | - MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>) | ||
| 31 | |||
| 32 | at your option. | ||
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..faaf99dc6 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 192 | }) | 192 | }) |
| 193 | } | 193 | } |
| 194 | 194 | ||
| 195 | fn available(&self, next_message_id: u64) -> u64 { | ||
| 196 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||
| 197 | } | ||
| 198 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 199 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 196 | self.inner.lock(|s| { | 200 | self.inner.lock(|s| { |
| 197 | let mut s = s.borrow_mut(); | 201 | let mut s = s.borrow_mut(); |
| @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 221 | }) |
| 218 | } | 222 | } |
| 219 | 223 | ||
| 224 | fn space(&self) -> usize { | ||
| 225 | self.inner.lock(|s| { | ||
| 226 | let s = s.borrow(); | ||
| 227 | s.queue.capacity() - s.queue.len() | ||
| 228 | }) | ||
| 229 | } | ||
| 230 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 231 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 221 | self.inner.lock(|s| { | 232 | self.inner.lock(|s| { |
| 222 | let mut s = s.borrow_mut(); | 233 | let mut s = s.borrow_mut(); |
| @@ -388,6 +399,10 @@ pub trait PubSubBehavior<T> { | |||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | 399 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. |
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | 400 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 390 | 401 | ||
| 402 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. | ||
| 403 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. | ||
| 404 | fn available(&self, next_message_id: u64) -> u64; | ||
| 405 | |||
| 391 | /// Try to publish a message to the queue. | 406 | /// Try to publish a message to the queue. |
| 392 | /// | 407 | /// |
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 408 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| @@ -396,6 +411,9 @@ pub trait PubSubBehavior<T> { | |||
| 396 | /// Publish a message immediately | 411 | /// Publish a message immediately |
| 397 | fn publish_immediate(&self, message: T); | 412 | fn publish_immediate(&self, message: T); |
| 398 | 413 | ||
| 414 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 415 | fn space(&self) -> usize; | ||
| 416 | |||
| 399 | /// Let the channel know that a subscriber has dropped | 417 | /// Let the channel know that a subscriber has dropped |
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 418 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 401 | 419 | ||
| @@ -539,4 +557,59 @@ mod tests { | |||
| 539 | 557 | ||
| 540 | drop(sub0); | 558 | drop(sub0); |
| 541 | } | 559 | } |
| 560 | |||
| 561 | #[futures_test::test] | ||
| 562 | async fn correct_available() { | ||
| 563 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 564 | |||
| 565 | let sub0 = channel.subscriber().unwrap(); | ||
| 566 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 567 | let pub0 = channel.publisher().unwrap(); | ||
| 568 | |||
| 569 | assert_eq!(sub0.available(), 0); | ||
| 570 | assert_eq!(sub1.available(), 0); | ||
| 571 | |||
| 572 | pub0.publish(42).await; | ||
| 573 | |||
| 574 | assert_eq!(sub0.available(), 1); | ||
| 575 | assert_eq!(sub1.available(), 1); | ||
| 576 | |||
| 577 | sub1.next_message().await; | ||
| 578 | |||
| 579 | assert_eq!(sub1.available(), 0); | ||
| 580 | |||
| 581 | pub0.publish(42).await; | ||
| 582 | |||
| 583 | assert_eq!(sub0.available(), 2); | ||
| 584 | assert_eq!(sub1.available(), 1); | ||
| 585 | } | ||
| 586 | |||
| 587 | #[futures_test::test] | ||
| 588 | async fn correct_space() { | ||
| 589 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 590 | |||
| 591 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 592 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 593 | let pub0 = channel.publisher().unwrap(); | ||
| 594 | |||
| 595 | assert_eq!(pub0.space(), 4); | ||
| 596 | |||
| 597 | pub0.publish(42).await; | ||
| 598 | |||
| 599 | assert_eq!(pub0.space(), 3); | ||
| 600 | |||
| 601 | pub0.publish(42).await; | ||
| 602 | |||
| 603 | assert_eq!(pub0.space(), 2); | ||
| 604 | |||
| 605 | sub0.next_message().await; | ||
| 606 | sub0.next_message().await; | ||
| 607 | |||
| 608 | assert_eq!(pub0.space(), 2); | ||
| 609 | |||
| 610 | sub1.next_message().await; | ||
| 611 | assert_eq!(pub0.space(), 3); | ||
| 612 | sub1.next_message().await; | ||
| 613 | assert_eq!(pub0.space(), 4); | ||
| 614 | } | ||
| 542 | } | 615 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..e1edc9eb9 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 43 | self.channel.publish_with_context(message, None) | 43 | self.channel.publish_with_context(message, None) |
| 44 | } | 44 | } |
| 45 | |||
| 46 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 47 | /// | ||
| 48 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 49 | /// So checking doesn't give any guarantees.* | ||
| 50 | pub fn space(&self) -> usize { | ||
| 51 | self.channel.space() | ||
| 52 | } | ||
| 45 | } | 53 | } |
| 46 | 54 | ||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 55 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | |||
| 115 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 123 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 116 | self.channel.publish_with_context(message, None) | 124 | self.channel.publish_with_context(message, None) |
| 117 | } | 125 | } |
| 126 | |||
| 127 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 128 | /// | ||
| 129 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 130 | /// So checking doesn't give any guarantees.* | ||
| 131 | pub fn space(&self) -> usize { | ||
| 132 | self.channel.space() | ||
| 133 | } | ||
| 118 | } | 134 | } |
| 119 | 135 | ||
| 120 | /// An immediate publisher that holds a dynamic reference to the channel | 136 | /// An immediate publisher that holds a dynamic reference to the channel |
| @@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 158 | } | 174 | } |
| 159 | 175 | ||
| 160 | /// Future for the publisher wait action | 176 | /// Future for the publisher wait action |
| 177 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 178 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 162 | /// The message we need to publish | 179 | /// The message we need to publish |
| 163 | message: Option<T>, | 180 | message: Option<T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 64 | } | 64 | } |
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | |||
| 68 | /// The amount of messages this subscriber hasn't received yet | ||
| 69 | pub fn available(&self) -> u64 { | ||
| 70 | self.channel.available(self.next_message_id) | ||
| 71 | } | ||
| 67 | } | 72 | } |
| 68 | 73 | ||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | 74 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| @@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 135 | } | 140 | } |
| 136 | 141 | ||
| 137 | /// Future for the subscriber wait action | 142 | /// Future for the subscriber wait action |
| 143 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 144 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | 145 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 140 | } | 146 | } |
