diff options
| author | Dion Dokter <[email protected]> | 2022-09-29 14:24:42 +0200 |
|---|---|---|
| committer | Dion Dokter <[email protected]> | 2022-09-29 14:24:42 +0200 |
| commit | f4ebc36b638a081b4a8b68ae72c4cca5199c4c4c (patch) | |
| tree | f88ba15ae619bcdb4bd6cae7574206063ae289fc /embassy-sync/src/pubsub | |
| parent | 8b9f4ad259ebb2c4fd2258f1c12ac33a5a8f8976 (diff) | |
Futures in pub & sub are now awaited instead of returned for better user compiler diagnostics.
Added functions for reading how many messages are available
Diffstat (limited to 'embassy-sync/src/pubsub')
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 73 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 13 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 11 |
3 files changed, 92 insertions, 5 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..335d7e33e 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 192 | }) | 192 | }) |
| 193 | } | 193 | } |
| 194 | 194 | ||
| 195 | fn available(&self, next_message_id: u64) -> u64 { | ||
| 196 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||
| 197 | } | ||
| 198 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 199 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 196 | self.inner.lock(|s| { | 200 | self.inner.lock(|s| { |
| 197 | let mut s = s.borrow_mut(); | 201 | let mut s = s.borrow_mut(); |
| @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 221 | }) |
| 218 | } | 222 | } |
| 219 | 223 | ||
| 224 | fn space(&self) -> usize { | ||
| 225 | self.inner.lock(|s| { | ||
| 226 | let s = s.borrow(); | ||
| 227 | s.queue.capacity() - s.queue.len() | ||
| 228 | }) | ||
| 229 | } | ||
| 230 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 231 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 221 | self.inner.lock(|s| { | 232 | self.inner.lock(|s| { |
| 222 | let mut s = s.borrow_mut(); | 233 | let mut s = s.borrow_mut(); |
| @@ -388,6 +399,10 @@ pub trait PubSubBehavior<T> { | |||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | 399 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. |
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | 400 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 390 | 401 | ||
| 402 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. | ||
| 403 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. | ||
| 404 | fn available(&self, next_message_id: u64) -> u64; | ||
| 405 | |||
| 391 | /// Try to publish a message to the queue. | 406 | /// Try to publish a message to the queue. |
| 392 | /// | 407 | /// |
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 408 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| @@ -396,6 +411,9 @@ pub trait PubSubBehavior<T> { | |||
| 396 | /// Publish a message immediately | 411 | /// Publish a message immediately |
| 397 | fn publish_immediate(&self, message: T); | 412 | fn publish_immediate(&self, message: T); |
| 398 | 413 | ||
| 414 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 415 | fn space(&self) -> usize; | ||
| 416 | |||
| 399 | /// Let the channel know that a subscriber has dropped | 417 | /// Let the channel know that a subscriber has dropped |
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 418 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 401 | 419 | ||
| @@ -539,4 +557,59 @@ mod tests { | |||
| 539 | 557 | ||
| 540 | drop(sub0); | 558 | drop(sub0); |
| 541 | } | 559 | } |
| 560 | |||
| 561 | #[futures_test::test] | ||
| 562 | async fn correct_available() { | ||
| 563 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 564 | |||
| 565 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 566 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 567 | let pub0 = channel.publisher().unwrap(); | ||
| 568 | |||
| 569 | assert_eq!(sub0.available(), 0); | ||
| 570 | assert_eq!(sub1.available(), 0); | ||
| 571 | |||
| 572 | pub0.publish(42).await; | ||
| 573 | |||
| 574 | assert_eq!(sub0.available(), 1); | ||
| 575 | assert_eq!(sub1.available(), 1); | ||
| 576 | |||
| 577 | sub1.next_message().await; | ||
| 578 | |||
| 579 | assert_eq!(sub1.available(), 0); | ||
| 580 | |||
| 581 | pub0.publish(42).await; | ||
| 582 | |||
| 583 | assert_eq!(sub0.available(), 2); | ||
| 584 | assert_eq!(sub1.available(), 1); | ||
| 585 | } | ||
| 586 | |||
| 587 | #[futures_test::test] | ||
| 588 | async fn correct_space() { | ||
| 589 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 590 | |||
| 591 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 592 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 593 | let pub0 = channel.publisher().unwrap(); | ||
| 594 | |||
| 595 | assert_eq!(pub0.space(), 4); | ||
| 596 | |||
| 597 | pub0.publish(42).await; | ||
| 598 | |||
| 599 | assert_eq!(pub0.space(), 3); | ||
| 600 | |||
| 601 | pub0.publish(42).await; | ||
| 602 | |||
| 603 | assert_eq!(pub0.space(), 2); | ||
| 604 | |||
| 605 | sub0.next_message().await; | ||
| 606 | sub0.next_message().await; | ||
| 607 | |||
| 608 | assert_eq!(pub0.space(), 2); | ||
| 609 | |||
| 610 | sub1.next_message().await; | ||
| 611 | assert_eq!(pub0.space(), 3); | ||
| 612 | sub1.next_message().await; | ||
| 613 | assert_eq!(pub0.space(), 4); | ||
| 614 | } | ||
| 542 | } | 615 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..484f1dbfd 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -31,17 +31,26 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 31 | } | 31 | } |
| 32 | 32 | ||
| 33 | /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message | 33 | /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message |
| 34 | pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { | 34 | pub async fn publish<'s>(&'s self, message: T) { |
| 35 | PublisherWaitFuture { | 35 | PublisherWaitFuture { |
| 36 | message: Some(message), | 36 | message: Some(message), |
| 37 | publisher: self, | 37 | publisher: self, |
| 38 | } | 38 | } |
| 39 | .await | ||
| 39 | } | 40 | } |
| 40 | 41 | ||
| 41 | /// Publish a message if there is space in the message queue | 42 | /// Publish a message if there is space in the message queue |
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 43 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 43 | self.channel.publish_with_context(message, None) | 44 | self.channel.publish_with_context(message, None) |
| 44 | } | 45 | } |
| 46 | |||
| 47 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 48 | /// | ||
| 49 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 50 | /// So checking doesn't give any guarantees.* | ||
| 51 | pub fn space(&self) -> usize { | ||
| 52 | self.channel.space() | ||
| 53 | } | ||
| 45 | } | 54 | } |
| 46 | 55 | ||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 56 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -158,7 +167,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 158 | } | 167 | } |
| 159 | 168 | ||
| 160 | /// Future for the publisher wait action | 169 | /// Future for the publisher wait action |
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 170 | struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 162 | /// The message we need to publish | 171 | /// The message we need to publish |
| 163 | message: Option<T>, | 172 | message: Option<T>, |
| 164 | publisher: &'s Pub<'a, PSB, T>, | 173 | publisher: &'s Pub<'a, PSB, T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..8a8e9144b 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -28,8 +28,8 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 28 | } | 28 | } |
| 29 | 29 | ||
| 30 | /// Wait for a published message | 30 | /// Wait for a published message |
| 31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { | 31 | pub async fn next_message(&mut self) -> WaitResult<T> { |
| 32 | SubscriberWaitFuture { subscriber: self } | 32 | SubscriberWaitFuture { subscriber: self }.await |
| 33 | } | 33 | } |
| 34 | 34 | ||
| 35 | /// Wait for a published message (ignoring lag results) | 35 | /// Wait for a published message (ignoring lag results) |
| @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 64 | } | 64 | } |
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | |||
| 68 | /// The amount of messages this subscriber hasn't received yet | ||
| 69 | pub fn available(&self) -> u64 { | ||
| 70 | self.channel.available(self.next_message_id) | ||
| 71 | } | ||
| 67 | } | 72 | } |
| 68 | 73 | ||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | 74 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| @@ -135,7 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 135 | } | 140 | } |
| 136 | 141 | ||
| 137 | /// Future for the subscriber wait action | 142 | /// Future for the subscriber wait action |
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 143 | struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | 144 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 140 | } | 145 | } |
| 141 | 146 | ||
