aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorMathias <[email protected]>2022-10-26 11:47:00 +0200
committerMathias <[email protected]>2022-10-26 11:47:00 +0200
commitd1eee5262580f4bb9f8ce357d8315ca7f4064900 (patch)
tree7465b2456066cbf9aeb269dda705ad9f074b7c8e /embassy-sync
parent38faae26e5fc2d2ec10ac4d513628bded4c628ef (diff)
parentce1cba761c2942b7faa27f4098487c6468784729 (diff)
Merge branch 'master' of https://github.com/embassy-rs/embassy into embassy-stm32/uart-flowcontrol
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/Cargo.toml10
-rw-r--r--embassy-sync/README.md24
-rw-r--r--embassy-sync/src/pubsub/mod.rs73
-rw-r--r--embassy-sync/src/pubsub/publisher.rs17
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs6
5 files changed, 128 insertions, 2 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 14ab1d003..584d5ba9f 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -2,6 +2,16 @@
2name = "embassy-sync" 2name = "embassy-sync"
3version = "0.1.0" 3version = "0.1.0"
4edition = "2021" 4edition = "2021"
5description = "no-std, no-alloc synchronization primitives with async support"
6repository = "https://github.com/embassy-rs/embassy"
7readme = "README.md"
8license = "MIT OR Apache-2.0"
9categories = [
10 "embedded",
11 "no-std",
12 "concurrency",
13 "asynchronous",
14]
5 15
6[package.metadata.embassy_docs] 16[package.metadata.embassy_docs]
7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/" 17src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/"
diff --git a/embassy-sync/README.md b/embassy-sync/README.md
index 106295c0d..cc65cf6ef 100644
--- a/embassy-sync/README.md
+++ b/embassy-sync/README.md
@@ -1,12 +1,32 @@
1# embassy-sync 1# embassy-sync
2 2
3Synchronization primitives and data structures with an async API: 3An [Embassy](https://embassy.dev) project.
4
5Synchronization primitives and data structures with async support:
4 6
5- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. 7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer.
6- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. 8- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers.
7- [`Signal`](signal::Signal) - Signalling latest value to a single consumer. 9- [`Signal`](signal::Signal) - Signalling latest value to a single consumer.
8- [`Mutex`](mutex::Mutex) - A Mutex for synchronizing state between asynchronous tasks. 10- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks.
9- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits. 11- [`Pipe`](pipe::Pipe) - Byte stream implementing `embedded_io` traits.
10- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. 12- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`.
11- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. 13- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API.
12- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. 14- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s.
15
16## Interoperability
17
18Futures from this crate can run on any executor.
19
20## Minimum supported Rust version (MSRV)
21
22Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release.
23
24## License
25
26This work is licensed under either of
27
28- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or
29 <http://www.apache.org/licenses/LICENSE-2.0>)
30- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
31
32at your option.
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 62a9e4763..faaf99dc6 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
192 }) 192 })
193 } 193 }
194 194
195 fn available(&self, next_message_id: u64) -> u64 {
196 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
197 }
198
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { 199 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| { 200 self.inner.lock(|s| {
197 let mut s = s.borrow_mut(); 201 let mut s = s.borrow_mut();
@@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
217 }) 221 })
218 } 222 }
219 223
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| { 232 self.inner.lock(|s| {
222 let mut s = s.borrow_mut(); 233 let mut s = s.borrow_mut();
@@ -388,6 +399,10 @@ pub trait PubSubBehavior<T> {
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 399 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 400 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 401
402 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
403 /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
404 fn available(&self, next_message_id: u64) -> u64;
405
391 /// Try to publish a message to the queue. 406 /// Try to publish a message to the queue.
392 /// 407 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 408 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
@@ -396,6 +411,9 @@ pub trait PubSubBehavior<T> {
396 /// Publish a message immediately 411 /// Publish a message immediately
397 fn publish_immediate(&self, message: T); 412 fn publish_immediate(&self, message: T);
398 413
414 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
415 fn space(&self) -> usize;
416
399 /// Let the channel know that a subscriber has dropped 417 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 418 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401 419
@@ -539,4 +557,59 @@ mod tests {
539 557
540 drop(sub0); 558 drop(sub0);
541 } 559 }
560
561 #[futures_test::test]
562 async fn correct_available() {
563 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
564
565 let sub0 = channel.subscriber().unwrap();
566 let mut sub1 = channel.subscriber().unwrap();
567 let pub0 = channel.publisher().unwrap();
568
569 assert_eq!(sub0.available(), 0);
570 assert_eq!(sub1.available(), 0);
571
572 pub0.publish(42).await;
573
574 assert_eq!(sub0.available(), 1);
575 assert_eq!(sub1.available(), 1);
576
577 sub1.next_message().await;
578
579 assert_eq!(sub1.available(), 0);
580
581 pub0.publish(42).await;
582
583 assert_eq!(sub0.available(), 2);
584 assert_eq!(sub1.available(), 1);
585 }
586
587 #[futures_test::test]
588 async fn correct_space() {
589 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
590
591 let mut sub0 = channel.subscriber().unwrap();
592 let mut sub1 = channel.subscriber().unwrap();
593 let pub0 = channel.publisher().unwrap();
594
595 assert_eq!(pub0.space(), 4);
596
597 pub0.publish(42).await;
598
599 assert_eq!(pub0.space(), 3);
600
601 pub0.publish(42).await;
602
603 assert_eq!(pub0.space(), 2);
604
605 sub0.next_message().await;
606 sub0.next_message().await;
607
608 assert_eq!(pub0.space(), 2);
609
610 sub1.next_message().await;
611 assert_eq!(pub0.space(), 3);
612 sub1.next_message().await;
613 assert_eq!(pub0.space(), 4);
614 }
542} 615}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index 705797f60..e1edc9eb9 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
42 pub fn try_publish(&self, message: T) -> Result<(), T> { 42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
47 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
49 /// So checking doesn't give any guarantees.*
50 pub fn space(&self) -> usize {
51 self.channel.space()
52 }
45} 53}
46 54
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { 55impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
@@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
115 pub fn try_publish(&self, message: T) -> Result<(), T> { 123 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None) 124 self.channel.publish_with_context(message, None)
117 } 125 }
126
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers
128 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something.
130 /// So checking doesn't give any guarantees.*
131 pub fn space(&self) -> usize {
132 self.channel.space()
133 }
118} 134}
119 135
120/// An immediate publisher that holds a dynamic reference to the channel 136/// An immediate publisher that holds a dynamic reference to the channel
@@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
158} 174}
159 175
160/// Future for the publisher wait action 176/// Future for the publisher wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"]
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 178pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish 179 /// The message we need to publish
163 message: Option<T>, 180 message: Option<T>,
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index b9a2cbe18..f420a75f0 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
64 } 64 }
65 } 65 }
66 } 66 }
67
68 /// The amount of messages this subscriber hasn't received yet
69 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id)
71 }
67} 72}
68 73
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
@@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS:
135} 140}
136 141
137/// Future for the subscriber wait action 142/// Future for the subscriber wait action
143#[must_use = "futures do nothing unless you `.await` or poll them"]
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { 144pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>, 145 subscriber: &'s mut Sub<'a, PSB, T>,
140} 146}