diff options
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 73 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 9 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 6 |
3 files changed, 88 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..faaf99dc6 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 192 | }) | 192 | }) |
| 193 | } | 193 | } |
| 194 | 194 | ||
| 195 | fn available(&self, next_message_id: u64) -> u64 { | ||
| 196 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||
| 197 | } | ||
| 198 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 199 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 196 | self.inner.lock(|s| { | 200 | self.inner.lock(|s| { |
| 197 | let mut s = s.borrow_mut(); | 201 | let mut s = s.borrow_mut(); |
| @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 221 | }) |
| 218 | } | 222 | } |
| 219 | 223 | ||
| 224 | fn space(&self) -> usize { | ||
| 225 | self.inner.lock(|s| { | ||
| 226 | let s = s.borrow(); | ||
| 227 | s.queue.capacity() - s.queue.len() | ||
| 228 | }) | ||
| 229 | } | ||
| 230 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 231 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 221 | self.inner.lock(|s| { | 232 | self.inner.lock(|s| { |
| 222 | let mut s = s.borrow_mut(); | 233 | let mut s = s.borrow_mut(); |
| @@ -388,6 +399,10 @@ pub trait PubSubBehavior<T> { | |||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | 399 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. |
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | 400 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 390 | 401 | ||
| 402 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. | ||
| 403 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. | ||
| 404 | fn available(&self, next_message_id: u64) -> u64; | ||
| 405 | |||
| 391 | /// Try to publish a message to the queue. | 406 | /// Try to publish a message to the queue. |
| 392 | /// | 407 | /// |
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 408 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| @@ -396,6 +411,9 @@ pub trait PubSubBehavior<T> { | |||
| 396 | /// Publish a message immediately | 411 | /// Publish a message immediately |
| 397 | fn publish_immediate(&self, message: T); | 412 | fn publish_immediate(&self, message: T); |
| 398 | 413 | ||
| 414 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 415 | fn space(&self) -> usize; | ||
| 416 | |||
| 399 | /// Let the channel know that a subscriber has dropped | 417 | /// Let the channel know that a subscriber has dropped |
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 418 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 401 | 419 | ||
| @@ -539,4 +557,59 @@ mod tests { | |||
| 539 | 557 | ||
| 540 | drop(sub0); | 558 | drop(sub0); |
| 541 | } | 559 | } |
| 560 | |||
| 561 | #[futures_test::test] | ||
| 562 | async fn correct_available() { | ||
| 563 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 564 | |||
| 565 | let sub0 = channel.subscriber().unwrap(); | ||
| 566 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 567 | let pub0 = channel.publisher().unwrap(); | ||
| 568 | |||
| 569 | assert_eq!(sub0.available(), 0); | ||
| 570 | assert_eq!(sub1.available(), 0); | ||
| 571 | |||
| 572 | pub0.publish(42).await; | ||
| 573 | |||
| 574 | assert_eq!(sub0.available(), 1); | ||
| 575 | assert_eq!(sub1.available(), 1); | ||
| 576 | |||
| 577 | sub1.next_message().await; | ||
| 578 | |||
| 579 | assert_eq!(sub1.available(), 0); | ||
| 580 | |||
| 581 | pub0.publish(42).await; | ||
| 582 | |||
| 583 | assert_eq!(sub0.available(), 2); | ||
| 584 | assert_eq!(sub1.available(), 1); | ||
| 585 | } | ||
| 586 | |||
| 587 | #[futures_test::test] | ||
| 588 | async fn correct_space() { | ||
| 589 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 590 | |||
| 591 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 592 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 593 | let pub0 = channel.publisher().unwrap(); | ||
| 594 | |||
| 595 | assert_eq!(pub0.space(), 4); | ||
| 596 | |||
| 597 | pub0.publish(42).await; | ||
| 598 | |||
| 599 | assert_eq!(pub0.space(), 3); | ||
| 600 | |||
| 601 | pub0.publish(42).await; | ||
| 602 | |||
| 603 | assert_eq!(pub0.space(), 2); | ||
| 604 | |||
| 605 | sub0.next_message().await; | ||
| 606 | sub0.next_message().await; | ||
| 607 | |||
| 608 | assert_eq!(pub0.space(), 2); | ||
| 609 | |||
| 610 | sub1.next_message().await; | ||
| 611 | assert_eq!(pub0.space(), 3); | ||
| 612 | sub1.next_message().await; | ||
| 613 | assert_eq!(pub0.space(), 4); | ||
| 614 | } | ||
| 542 | } | 615 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..faa67d947 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 43 | self.channel.publish_with_context(message, None) | 43 | self.channel.publish_with_context(message, None) |
| 44 | } | 44 | } |
| 45 | |||
| 46 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 47 | /// | ||
| 48 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 49 | /// So checking doesn't give any guarantees.* | ||
| 50 | pub fn space(&self) -> usize { | ||
| 51 | self.channel.space() | ||
| 52 | } | ||
| 45 | } | 53 | } |
| 46 | 54 | ||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 55 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -158,6 +166,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 158 | } | 166 | } |
| 159 | 167 | ||
| 160 | /// Future for the publisher wait action | 168 | /// Future for the publisher wait action |
| 169 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 170 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 162 | /// The message we need to publish | 171 | /// The message we need to publish |
| 163 | message: Option<T>, | 172 | message: Option<T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 64 | } | 64 | } |
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | |||
| 68 | /// The amount of messages this subscriber hasn't received yet | ||
| 69 | pub fn available(&self) -> u64 { | ||
| 70 | self.channel.available(self.next_message_id) | ||
| 71 | } | ||
| 67 | } | 72 | } |
| 68 | 73 | ||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | 74 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| @@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 135 | } | 140 | } |
| 136 | 141 | ||
| 137 | /// Future for the subscriber wait action | 142 | /// Future for the subscriber wait action |
| 143 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 144 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | 145 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 140 | } | 146 | } |
