diff options
Diffstat (limited to 'embassy-sync/src/priority_channel.rs')
| -rw-r--r-- | embassy-sync/src/priority_channel.rs | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 36959204f..623c52993 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs | |||
| @@ -175,6 +175,16 @@ where | |||
| 175 | self.channel.try_receive() | 175 | self.channel.try_receive() |
| 176 | } | 176 | } |
| 177 | 177 | ||
| 178 | /// Peek at the next value without removing it from the queue. | ||
| 179 | /// | ||
| 180 | /// See [`PriorityChannel::try_peek()`] | ||
| 181 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 182 | where | ||
| 183 | T: Clone, | ||
| 184 | { | ||
| 185 | self.channel.try_peek_with_context(None) | ||
| 186 | } | ||
| 187 | |||
| 178 | /// Allows a poll_fn to poll until the channel is ready to receive | 188 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 179 | /// | 189 | /// |
| 180 | /// See [`PriorityChannel::poll_ready_to_receive()`] | 190 | /// See [`PriorityChannel::poll_ready_to_receive()`] |
| @@ -343,6 +353,31 @@ where | |||
| 343 | self.try_receive_with_context(None) | 353 | self.try_receive_with_context(None) |
| 344 | } | 354 | } |
| 345 | 355 | ||
| 356 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 357 | where | ||
| 358 | T: Clone, | ||
| 359 | { | ||
| 360 | self.try_peek_with_context(None) | ||
| 361 | } | ||
| 362 | |||
| 363 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 364 | where | ||
| 365 | T: Clone, | ||
| 366 | { | ||
| 367 | if self.queue.len() == self.queue.capacity() { | ||
| 368 | self.senders_waker.wake(); | ||
| 369 | } | ||
| 370 | |||
| 371 | if let Some(message) = self.queue.peek() { | ||
| 372 | Ok(message.clone()) | ||
| 373 | } else { | ||
| 374 | if let Some(cx) = cx { | ||
| 375 | self.receiver_waker.register(cx.waker()); | ||
| 376 | } | ||
| 377 | Err(TryReceiveError::Empty) | ||
| 378 | } | ||
| 379 | } | ||
| 380 | |||
| 346 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 381 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 347 | if self.queue.len() == self.queue.capacity() { | 382 | if self.queue.len() == self.queue.capacity() { |
| 348 | self.senders_waker.wake(); | 383 | self.senders_waker.wake(); |
| @@ -478,6 +513,13 @@ where | |||
| 478 | self.lock(|c| c.try_receive_with_context(cx)) | 513 | self.lock(|c| c.try_receive_with_context(cx)) |
| 479 | } | 514 | } |
| 480 | 515 | ||
| 516 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 517 | where | ||
| 518 | T: Clone, | ||
| 519 | { | ||
| 520 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 521 | } | ||
| 522 | |||
| 481 | /// Poll the channel for the next message | 523 | /// Poll the channel for the next message |
| 482 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 524 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 483 | self.lock(|c| c.poll_receive(cx)) | 525 | self.lock(|c| c.poll_receive(cx)) |
| @@ -548,6 +590,17 @@ where | |||
| 548 | self.lock(|c| c.try_receive()) | 590 | self.lock(|c| c.try_receive()) |
| 549 | } | 591 | } |
| 550 | 592 | ||
| 593 | /// Peek at the next value without removing it from the queue. | ||
| 594 | /// | ||
| 595 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 596 | /// an error if the channel is empty. | ||
| 597 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 598 | where | ||
| 599 | T: Clone, | ||
| 600 | { | ||
| 601 | self.lock(|c| c.try_peek()) | ||
| 602 | } | ||
| 603 | |||
| 551 | /// Removes elements from the channel based on the given predicate. | 604 | /// Removes elements from the channel based on the given predicate. |
| 552 | pub fn remove_if<F>(&self, predicate: F) | 605 | pub fn remove_if<F>(&self, predicate: F) |
| 553 | where | 606 | where |
| @@ -617,6 +670,13 @@ where | |||
| 617 | PriorityChannel::try_receive_with_context(self, cx) | 670 | PriorityChannel::try_receive_with_context(self, cx) |
| 618 | } | 671 | } |
| 619 | 672 | ||
| 673 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 674 | where | ||
| 675 | T: Clone, | ||
| 676 | { | ||
| 677 | PriorityChannel::try_peek_with_context(self, cx) | ||
| 678 | } | ||
| 679 | |||
| 620 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 680 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 621 | PriorityChannel::poll_ready_to_send(self, cx) | 681 | PriorityChannel::poll_ready_to_send(self, cx) |
| 622 | } | 682 | } |
| @@ -705,6 +765,8 @@ mod tests { | |||
| 705 | fn simple_send_and_receive() { | 765 | fn simple_send_and_receive() { |
| 706 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); | 766 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); |
| 707 | assert!(c.try_send(1).is_ok()); | 767 | assert!(c.try_send(1).is_ok()); |
| 768 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 769 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 708 | assert_eq!(c.try_receive().unwrap(), 1); | 770 | assert_eq!(c.try_receive().unwrap(), 1); |
| 709 | } | 771 | } |
| 710 | 772 | ||
| @@ -725,6 +787,8 @@ mod tests { | |||
| 725 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 787 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| 726 | 788 | ||
| 727 | assert!(s.try_send(1).is_ok()); | 789 | assert!(s.try_send(1).is_ok()); |
| 790 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 791 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 728 | assert_eq!(r.try_receive().unwrap(), 1); | 792 | assert_eq!(r.try_receive().unwrap(), 1); |
| 729 | } | 793 | } |
| 730 | 794 | ||
