aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/pubsub.rs55
1 files changed, 43 insertions, 12 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs
index fb8d0ef5f..5d81431ec 100644
--- a/embassy/src/channel/pubsub.rs
+++ b/embassy/src/channel/pubsub.rs
@@ -270,14 +270,14 @@ pub struct Subscriber<'a, T: Clone> {
270 270
271impl<'a, T: Clone> Subscriber<'a, T> { 271impl<'a, T: Clone> Subscriber<'a, T> {
272 /// Wait for a published message 272 /// Wait for a published message
273 pub fn wait<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> { 273 pub fn next<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, T> {
274 SubscriberWaitFuture { subscriber: self } 274 SubscriberWaitFuture { subscriber: self }
275 } 275 }
276 276
277 /// Try to see if there's a published message we haven't received yet. 277 /// Try to see if there's a published message we haven't received yet.
278 /// 278 ///
279 /// This function does not peek. The message is received if there is one. 279 /// This function does not peek. The message is received if there is one.
280 pub fn check(&mut self) -> Option<WaitResult<T>> { 280 pub fn try_next(&mut self) -> Option<WaitResult<T>> {
281 match self.channel.get_message(self.next_message_id) { 281 match self.channel.get_message(self.next_message_id) {
282 Some(WaitResult::Lagged(amount)) => { 282 Some(WaitResult::Lagged(amount)) => {
283 self.next_message_id += amount; 283 self.next_message_id += amount;
@@ -300,6 +300,37 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> {
300 } 300 }
301} 301}
302 302
303impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
304 type Item = WaitResult<T>;
305
306 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
307 let this = unsafe { self.get_unchecked_mut() };
308
309 // Check if we can read a message
310 match this.channel.get_message(this.next_message_id) {
311 // Yes, so we are done polling
312 Some(WaitResult::Message(message)) => {
313 this.next_message_id += 1;
314 Poll::Ready(Some(WaitResult::Message(message)))
315 }
316 // No, so we need to reregister our waker and sleep again
317 None => {
318 unsafe {
319 this
320 .channel
321 .register_subscriber_waker(this.subscriber_index, cx.waker());
322 }
323 Poll::Pending
324 }
325 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
326 Some(WaitResult::Lagged(amount)) => {
327 this.next_message_id += amount;
328 Poll::Ready(Some(WaitResult::Lagged(amount)))
329 }
330 }
331 }
332}
333
303/// A publisher to a channel 334/// A publisher to a channel
304/// 335///
305/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's 336/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
@@ -494,11 +525,11 @@ mod tests {
494 525
495 pub0.publish(42).await; 526 pub0.publish(42).await;
496 527
497 assert_eq!(sub0.wait().await, WaitResult::Message(42)); 528 assert_eq!(sub0.next().await, WaitResult::Message(42));
498 assert_eq!(sub1.wait().await, WaitResult::Message(42)); 529 assert_eq!(sub1.next().await, WaitResult::Message(42));
499 530
500 assert_eq!(sub0.check(), None); 531 assert_eq!(sub0.try_next(), None);
501 assert_eq!(sub1.check(), None); 532 assert_eq!(sub1.try_next(), None);
502 } 533 }
503 534
504 #[futures_test::test] 535 #[futures_test::test]
@@ -515,12 +546,12 @@ mod tests {
515 pub0.publish_immediate(46); 546 pub0.publish_immediate(46);
516 pub0.publish_immediate(47); 547 pub0.publish_immediate(47);
517 548
518 assert_eq!(sub0.check(), Some(WaitResult::Lagged(2))); 549 assert_eq!(sub0.try_next(), Some(WaitResult::Lagged(2)));
519 assert_eq!(sub0.wait().await, WaitResult::Message(44)); 550 assert_eq!(sub0.next().await, WaitResult::Message(44));
520 assert_eq!(sub0.wait().await, WaitResult::Message(45)); 551 assert_eq!(sub0.next().await, WaitResult::Message(45));
521 assert_eq!(sub0.wait().await, WaitResult::Message(46)); 552 assert_eq!(sub0.next().await, WaitResult::Message(46));
522 assert_eq!(sub0.wait().await, WaitResult::Message(47)); 553 assert_eq!(sub0.next().await, WaitResult::Message(47));
523 assert_eq!(sub0.check(), None); 554 assert_eq!(sub0.try_next(), None);
524 } 555 }
525 556
526 #[test] 557 #[test]