aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2022-06-16 14:11:41 +0200
committerDion Dokter <[email protected]>2022-06-16 14:11:41 +0200
commit790426e2f67ac5e2197139c354c3c201d098a59c (patch)
tree2993d935bafdefcdef6e91c15de521f2f5d44a4a
parentc7cdecfc937185c9a8120ee1232c04cdedcab66d (diff)
Stream now ignores lag
-rw-r--r--embassy/src/channel/pubsub.rs12
1 files changed, 8 insertions, 4 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs
index d81e822a8..4c360fea9 100644
--- a/embassy/src/channel/pubsub.rs
+++ b/embassy/src/channel/pubsub.rs
@@ -300,8 +300,10 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> {
300 } 300 }
301} 301}
302 302
303/// Warning: The stream implementation ignores lag results and returns all messages.
304/// This might miss some messages without you knowing it.
303impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { 305impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
304 type Item = WaitResult<T>; 306 type Item = T;
305 307
306 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 308 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
307 let this = unsafe { self.get_unchecked_mut() }; 309 let this = unsafe { self.get_unchecked_mut() };
@@ -311,7 +313,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
311 // Yes, so we are done polling 313 // Yes, so we are done polling
312 Some(WaitResult::Message(message)) => { 314 Some(WaitResult::Message(message)) => {
313 this.next_message_id += 1; 315 this.next_message_id += 1;
314 Poll::Ready(Some(WaitResult::Message(message))) 316 Poll::Ready(Some(message))
315 } 317 }
316 // No, so we need to reregister our waker and sleep again 318 // No, so we need to reregister our waker and sleep again
317 None => { 319 None => {
@@ -321,10 +323,12 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
321 } 323 }
322 Poll::Pending 324 Poll::Pending
323 } 325 }
324 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged 326 // We missed a couple of messages. We must do our internal bookkeeping.
327 // This stream impl doesn't return lag results, so we just ignore and start over
325 Some(WaitResult::Lagged(amount)) => { 328 Some(WaitResult::Lagged(amount)) => {
326 this.next_message_id += amount; 329 this.next_message_id += amount;
327 Poll::Ready(Some(WaitResult::Lagged(amount))) 330 cx.waker().wake_by_ref();
331 Poll::Pending
328 } 332 }
329 } 333 }
330 } 334 }