diff options
| -rw-r--r-- | embassy/src/channel/pubsub.rs | 55 |
1 files changed, 43 insertions, 12 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index fb8d0ef5f..5d81431ec 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs | |||
| @@ -270,14 +270,14 @@ pub struct Subscriber<'a, T: Clone> { | |||
| 270 | 270 | ||
| 271 | impl<'a, T: Clone> Subscriber<'a, T> { | 271 | impl<'a, T: Clone> Subscriber<'a, T> { |
| 272 | /// Wait for a published message | 272 | /// Wait for a published message |
| 273 | pub fn wait<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { | 273 | pub fn next<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { |
| 274 | SubscriberWaitFuture { subscriber: self } | 274 | SubscriberWaitFuture { subscriber: self } |
| 275 | } | 275 | } |
| 276 | 276 | ||
| 277 | /// Try to see if there's a published message we haven't received yet. | 277 | /// Try to see if there's a published message we haven't received yet. |
| 278 | /// | 278 | /// |
| 279 | /// This function does not peek. The message is received if there is one. | 279 | /// This function does not peek. The message is received if there is one. |
| 280 | pub fn check(&mut self) -> Option<WaitResult<T>> { | 280 | pub fn try_next(&mut self) -> Option<WaitResult<T>> { |
| 281 | match self.channel.get_message(self.next_message_id) { | 281 | match self.channel.get_message(self.next_message_id) { |
| 282 | Some(WaitResult::Lagged(amount)) => { | 282 | Some(WaitResult::Lagged(amount)) => { |
| 283 | self.next_message_id += amount; | 283 | self.next_message_id += amount; |
| @@ -300,6 +300,37 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { | |||
| 300 | } | 300 | } |
| 301 | } | 301 | } |
| 302 | 302 | ||
| 303 | impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | ||
| 304 | type Item = WaitResult<T>; | ||
| 305 | |||
| 306 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 307 | let this = unsafe { self.get_unchecked_mut() }; | ||
| 308 | |||
| 309 | // Check if we can read a message | ||
| 310 | match this.channel.get_message(this.next_message_id) { | ||
| 311 | // Yes, so we are done polling | ||
| 312 | Some(WaitResult::Message(message)) => { | ||
| 313 | this.next_message_id += 1; | ||
| 314 | Poll::Ready(Some(WaitResult::Message(message))) | ||
| 315 | } | ||
| 316 | // No, so we need to reregister our waker and sleep again | ||
| 317 | None => { | ||
| 318 | unsafe { | ||
| 319 | this | ||
| 320 | .channel | ||
| 321 | .register_subscriber_waker(this.subscriber_index, cx.waker()); | ||
| 322 | } | ||
| 323 | Poll::Pending | ||
| 324 | } | ||
| 325 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | ||
| 326 | Some(WaitResult::Lagged(amount)) => { | ||
| 327 | this.next_message_id += amount; | ||
| 328 | Poll::Ready(Some(WaitResult::Lagged(amount))) | ||
| 329 | } | ||
| 330 | } | ||
| 331 | } | ||
| 332 | } | ||
| 333 | |||
| 303 | /// A publisher to a channel | 334 | /// A publisher to a channel |
| 304 | /// | 335 | /// |
| 305 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's | 336 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's |
| @@ -494,11 +525,11 @@ mod tests { | |||
| 494 | 525 | ||
| 495 | pub0.publish(42).await; | 526 | pub0.publish(42).await; |
| 496 | 527 | ||
| 497 | assert_eq!(sub0.wait().await, WaitResult::Message(42)); | 528 | assert_eq!(sub0.next().await, WaitResult::Message(42)); |
| 498 | assert_eq!(sub1.wait().await, WaitResult::Message(42)); | 529 | assert_eq!(sub1.next().await, WaitResult::Message(42)); |
| 499 | 530 | ||
| 500 | assert_eq!(sub0.check(), None); | 531 | assert_eq!(sub0.try_next(), None); |
| 501 | assert_eq!(sub1.check(), None); | 532 | assert_eq!(sub1.try_next(), None); |
| 502 | } | 533 | } |
| 503 | 534 | ||
| 504 | #[futures_test::test] | 535 | #[futures_test::test] |
| @@ -515,12 +546,12 @@ mod tests { | |||
| 515 | pub0.publish_immediate(46); | 546 | pub0.publish_immediate(46); |
| 516 | pub0.publish_immediate(47); | 547 | pub0.publish_immediate(47); |
| 517 | 548 | ||
| 518 | assert_eq!(sub0.check(), Some(WaitResult::Lagged(2))); | 549 | assert_eq!(sub0.try_next(), Some(WaitResult::Lagged(2))); |
| 519 | assert_eq!(sub0.wait().await, WaitResult::Message(44)); | 550 | assert_eq!(sub0.next().await, WaitResult::Message(44)); |
| 520 | assert_eq!(sub0.wait().await, WaitResult::Message(45)); | 551 | assert_eq!(sub0.next().await, WaitResult::Message(45)); |
| 521 | assert_eq!(sub0.wait().await, WaitResult::Message(46)); | 552 | assert_eq!(sub0.next().await, WaitResult::Message(46)); |
| 522 | assert_eq!(sub0.wait().await, WaitResult::Message(47)); | 553 | assert_eq!(sub0.next().await, WaitResult::Message(47)); |
| 523 | assert_eq!(sub0.check(), None); | 554 | assert_eq!(sub0.try_next(), None); |
| 524 | } | 555 | } |
| 525 | 556 | ||
| 526 | #[test] | 557 | #[test] |
