diff options
| author | Dion Dokter <[email protected]> | 2022-06-16 14:11:41 +0200 |
|---|---|---|
| committer | Dion Dokter <[email protected]> | 2022-06-16 14:11:41 +0200 |
| commit | 790426e2f67ac5e2197139c354c3c201d098a59c (patch) | |
| tree | 2993d935bafdefcdef6e91c15de521f2f5d44a4a | |
| parent | c7cdecfc937185c9a8120ee1232c04cdedcab66d (diff) | |
Stream now ignores lag
| -rw-r--r-- | embassy/src/channel/pubsub.rs | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index d81e822a8..4c360fea9 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs | |||
| @@ -300,8 +300,10 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> { | |||
| 300 | } | 300 | } |
| 301 | } | 301 | } |
| 302 | 302 | ||
| 303 | /// Warning: The stream implementation ignores lag results and returns all messages. | ||
| 304 | /// This might miss some messages without you knowing it. | ||
| 303 | impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | 305 | impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { |
| 304 | type Item = WaitResult<T>; | 306 | type Item = T; |
| 305 | 307 | ||
| 306 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | 308 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 307 | let this = unsafe { self.get_unchecked_mut() }; | 309 | let this = unsafe { self.get_unchecked_mut() }; |
| @@ -311,7 +313,7 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | |||
| 311 | // Yes, so we are done polling | 313 | // Yes, so we are done polling |
| 312 | Some(WaitResult::Message(message)) => { | 314 | Some(WaitResult::Message(message)) => { |
| 313 | this.next_message_id += 1; | 315 | this.next_message_id += 1; |
| 314 | Poll::Ready(Some(WaitResult::Message(message))) | 316 | Poll::Ready(Some(message)) |
| 315 | } | 317 | } |
| 316 | // No, so we need to reregister our waker and sleep again | 318 | // No, so we need to reregister our waker and sleep again |
| 317 | None => { | 319 | None => { |
| @@ -321,10 +323,12 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | |||
| 321 | } | 323 | } |
| 322 | Poll::Pending | 324 | Poll::Pending |
| 323 | } | 325 | } |
| 324 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | 326 | // We missed a couple of messages. We must do our internal bookkeeping. |
| 327 | // This stream impl doesn't return lag results, so we just ignore and start over | ||
| 325 | Some(WaitResult::Lagged(amount)) => { | 328 | Some(WaitResult::Lagged(amount)) => { |
| 326 | this.next_message_id += amount; | 329 | this.next_message_id += amount; |
| 327 | Poll::Ready(Some(WaitResult::Lagged(amount))) | 330 | cx.waker().wake_by_ref(); |
| 331 | Poll::Pending | ||
| 328 | } | 332 | } |
| 329 | } | 333 | } |
| 330 | } | 334 | } |
