diff options
| author | Ulf Lilleengen <[email protected]> | 2025-05-28 10:36:05 +0200 |
|---|---|---|
| committer | Ulf Lilleengen <[email protected]> | 2025-05-28 11:34:57 +0200 |
| commit | 042abc805a84d09231174e41edb0e498baaf7295 (patch) | |
| tree | bcde4ea3fbb5aacbd2e7bfaf31c6f064a5c3e314 /embassy-sync/src | |
| parent | 645883d8748995beb8b07f5cee93ac960f6d6a9f (diff) | |
feat: add support for channel peek
Add support for peeking into the front of the channel if the value
implements Clone. This can be useful in single-receiver situations where
you don't want to remove the item from the queue until you've
successfully processed it.
Diffstat (limited to 'embassy-sync/src')
| -rw-r--r-- | embassy-sync/src/channel.rs | 78 | ||||
| -rw-r--r-- | embassy-sync/src/priority_channel.rs | 64 |
2 files changed, 142 insertions, 0 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..a229c52e7 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -208,6 +208,16 @@ where | |||
| 208 | self.channel.try_receive() | 208 | self.channel.try_receive() |
| 209 | } | 209 | } |
| 210 | 210 | ||
| 211 | /// Peek at the next value without removing it from the queue. | ||
| 212 | /// | ||
| 213 | /// See [`Channel::try_peek()`] | ||
| 214 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 215 | where | ||
| 216 | T: Clone, | ||
| 217 | { | ||
| 218 | self.channel.try_peek() | ||
| 219 | } | ||
| 220 | |||
| 211 | /// Allows a poll_fn to poll until the channel is ready to receive | 221 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 212 | /// | 222 | /// |
| 213 | /// See [`Channel::poll_ready_to_receive()`] | 223 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { | |||
| 293 | self.channel.try_receive_with_context(None) | 303 | self.channel.try_receive_with_context(None) |
| 294 | } | 304 | } |
| 295 | 305 | ||
| 306 | /// Peek at the next value without removing it from the queue. | ||
| 307 | /// | ||
| 308 | /// See [`Channel::try_peek()`] | ||
| 309 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 310 | where | ||
| 311 | T: Clone, | ||
| 312 | { | ||
| 313 | self.channel.try_peek_with_context(None) | ||
| 314 | } | ||
| 315 | |||
| 296 | /// Allows a poll_fn to poll until the channel is ready to receive | 316 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 297 | /// | 317 | /// |
| 298 | /// See [`Channel::poll_ready_to_receive()`] | 318 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel<T> { | |||
| 463 | 483 | ||
| 464 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; | 484 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; |
| 465 | 485 | ||
| 486 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 487 | where | ||
| 488 | T: Clone; | ||
| 489 | |||
| 466 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; | 490 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 467 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; | 491 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 468 | 492 | ||
| @@ -505,6 +529,31 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 505 | self.try_receive_with_context(None) | 529 | self.try_receive_with_context(None) |
| 506 | } | 530 | } |
| 507 | 531 | ||
| 532 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 533 | where | ||
| 534 | T: Clone, | ||
| 535 | { | ||
| 536 | self.try_peek_with_context(None) | ||
| 537 | } | ||
| 538 | |||
| 539 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 540 | where | ||
| 541 | T: Clone, | ||
| 542 | { | ||
| 543 | if self.queue.is_full() { | ||
| 544 | self.senders_waker.wake(); | ||
| 545 | } | ||
| 546 | |||
| 547 | if let Some(message) = self.queue.front() { | ||
| 548 | Ok(message.clone()) | ||
| 549 | } else { | ||
| 550 | if let Some(cx) = cx { | ||
| 551 | self.receiver_waker.register(cx.waker()); | ||
| 552 | } | ||
| 553 | Err(TryReceiveError::Empty) | ||
| 554 | } | ||
| 555 | } | ||
| 556 | |||
| 508 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 557 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 509 | if self.queue.is_full() { | 558 | if self.queue.is_full() { |
| 510 | self.senders_waker.wake(); | 559 | self.senders_waker.wake(); |
| @@ -634,6 +683,13 @@ where | |||
| 634 | self.lock(|c| c.try_receive_with_context(cx)) | 683 | self.lock(|c| c.try_receive_with_context(cx)) |
| 635 | } | 684 | } |
| 636 | 685 | ||
| 686 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 687 | where | ||
| 688 | T: Clone, | ||
| 689 | { | ||
| 690 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 691 | } | ||
| 692 | |||
| 637 | /// Poll the channel for the next message | 693 | /// Poll the channel for the next message |
| 638 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 694 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 639 | self.lock(|c| c.poll_receive(cx)) | 695 | self.lock(|c| c.poll_receive(cx)) |
| @@ -722,6 +778,17 @@ where | |||
| 722 | self.lock(|c| c.try_receive()) | 778 | self.lock(|c| c.try_receive()) |
| 723 | } | 779 | } |
| 724 | 780 | ||
| 781 | /// Peek at the next value without removing it from the queue. | ||
| 782 | /// | ||
| 783 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 784 | /// an error if the channel is empty. | ||
| 785 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 786 | where | ||
| 787 | T: Clone, | ||
| 788 | { | ||
| 789 | self.lock(|c| c.try_peek()) | ||
| 790 | } | ||
| 791 | |||
| 725 | /// Returns the maximum number of elements the channel can hold. | 792 | /// Returns the maximum number of elements the channel can hold. |
| 726 | pub const fn capacity(&self) -> usize { | 793 | pub const fn capacity(&self) -> usize { |
| 727 | N | 794 | N |
| @@ -769,6 +836,13 @@ where | |||
| 769 | Channel::try_receive_with_context(self, cx) | 836 | Channel::try_receive_with_context(self, cx) |
| 770 | } | 837 | } |
| 771 | 838 | ||
| 839 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 840 | where | ||
| 841 | T: Clone, | ||
| 842 | { | ||
| 843 | Channel::try_peek_with_context(self, cx) | ||
| 844 | } | ||
| 845 | |||
| 772 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 846 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 773 | Channel::poll_ready_to_send(self, cx) | 847 | Channel::poll_ready_to_send(self, cx) |
| 774 | } | 848 | } |
| @@ -851,6 +925,8 @@ mod tests { | |||
| 851 | fn simple_send_and_receive() { | 925 | fn simple_send_and_receive() { |
| 852 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | 926 | let c = Channel::<NoopRawMutex, u32, 3>::new(); |
| 853 | assert!(c.try_send(1).is_ok()); | 927 | assert!(c.try_send(1).is_ok()); |
| 928 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 929 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 854 | assert_eq!(c.try_receive().unwrap(), 1); | 930 | assert_eq!(c.try_receive().unwrap(), 1); |
| 855 | } | 931 | } |
| 856 | 932 | ||
| @@ -881,6 +957,8 @@ mod tests { | |||
| 881 | let r = c.dyn_receiver(); | 957 | let r = c.dyn_receiver(); |
| 882 | 958 | ||
| 883 | assert!(s.try_send(1).is_ok()); | 959 | assert!(s.try_send(1).is_ok()); |
| 960 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 961 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 884 | assert_eq!(r.try_receive().unwrap(), 1); | 962 | assert_eq!(r.try_receive().unwrap(), 1); |
| 885 | } | 963 | } |
| 886 | 964 | ||
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 36959204f..623c52993 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs | |||
| @@ -175,6 +175,16 @@ where | |||
| 175 | self.channel.try_receive() | 175 | self.channel.try_receive() |
| 176 | } | 176 | } |
| 177 | 177 | ||
| 178 | /// Peek at the next value without removing it from the queue. | ||
| 179 | /// | ||
| 180 | /// See [`PriorityChannel::try_peek()`] | ||
| 181 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 182 | where | ||
| 183 | T: Clone, | ||
| 184 | { | ||
| 185 | self.channel.try_peek_with_context(None) | ||
| 186 | } | ||
| 187 | |||
| 178 | /// Allows a poll_fn to poll until the channel is ready to receive | 188 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 179 | /// | 189 | /// |
| 180 | /// See [`PriorityChannel::poll_ready_to_receive()`] | 190 | /// See [`PriorityChannel::poll_ready_to_receive()`] |
| @@ -343,6 +353,31 @@ where | |||
| 343 | self.try_receive_with_context(None) | 353 | self.try_receive_with_context(None) |
| 344 | } | 354 | } |
| 345 | 355 | ||
| 356 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 357 | where | ||
| 358 | T: Clone, | ||
| 359 | { | ||
| 360 | self.try_peek_with_context(None) | ||
| 361 | } | ||
| 362 | |||
| 363 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 364 | where | ||
| 365 | T: Clone, | ||
| 366 | { | ||
| 367 | if self.queue.len() == self.queue.capacity() { | ||
| 368 | self.senders_waker.wake(); | ||
| 369 | } | ||
| 370 | |||
| 371 | if let Some(message) = self.queue.peek() { | ||
| 372 | Ok(message.clone()) | ||
| 373 | } else { | ||
| 374 | if let Some(cx) = cx { | ||
| 375 | self.receiver_waker.register(cx.waker()); | ||
| 376 | } | ||
| 377 | Err(TryReceiveError::Empty) | ||
| 378 | } | ||
| 379 | } | ||
| 380 | |||
| 346 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 381 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 347 | if self.queue.len() == self.queue.capacity() { | 382 | if self.queue.len() == self.queue.capacity() { |
| 348 | self.senders_waker.wake(); | 383 | self.senders_waker.wake(); |
| @@ -478,6 +513,13 @@ where | |||
| 478 | self.lock(|c| c.try_receive_with_context(cx)) | 513 | self.lock(|c| c.try_receive_with_context(cx)) |
| 479 | } | 514 | } |
| 480 | 515 | ||
| 516 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 517 | where | ||
| 518 | T: Clone, | ||
| 519 | { | ||
| 520 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 521 | } | ||
| 522 | |||
| 481 | /// Poll the channel for the next message | 523 | /// Poll the channel for the next message |
| 482 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 524 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 483 | self.lock(|c| c.poll_receive(cx)) | 525 | self.lock(|c| c.poll_receive(cx)) |
| @@ -548,6 +590,17 @@ where | |||
| 548 | self.lock(|c| c.try_receive()) | 590 | self.lock(|c| c.try_receive()) |
| 549 | } | 591 | } |
| 550 | 592 | ||
| 593 | /// Peek at the next value without removing it from the queue. | ||
| 594 | /// | ||
| 595 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 596 | /// an error if the channel is empty. | ||
| 597 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 598 | where | ||
| 599 | T: Clone, | ||
| 600 | { | ||
| 601 | self.lock(|c| c.try_peek()) | ||
| 602 | } | ||
| 603 | |||
| 551 | /// Removes elements from the channel based on the given predicate. | 604 | /// Removes elements from the channel based on the given predicate. |
| 552 | pub fn remove_if<F>(&self, predicate: F) | 605 | pub fn remove_if<F>(&self, predicate: F) |
| 553 | where | 606 | where |
| @@ -617,6 +670,13 @@ where | |||
| 617 | PriorityChannel::try_receive_with_context(self, cx) | 670 | PriorityChannel::try_receive_with_context(self, cx) |
| 618 | } | 671 | } |
| 619 | 672 | ||
| 673 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 674 | where | ||
| 675 | T: Clone, | ||
| 676 | { | ||
| 677 | PriorityChannel::try_peek_with_context(self, cx) | ||
| 678 | } | ||
| 679 | |||
| 620 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 680 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 621 | PriorityChannel::poll_ready_to_send(self, cx) | 681 | PriorityChannel::poll_ready_to_send(self, cx) |
| 622 | } | 682 | } |
| @@ -705,6 +765,8 @@ mod tests { | |||
| 705 | fn simple_send_and_receive() { | 765 | fn simple_send_and_receive() { |
| 706 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); | 766 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); |
| 707 | assert!(c.try_send(1).is_ok()); | 767 | assert!(c.try_send(1).is_ok()); |
| 768 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 769 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 708 | assert_eq!(c.try_receive().unwrap(), 1); | 770 | assert_eq!(c.try_receive().unwrap(), 1); |
| 709 | } | 771 | } |
| 710 | 772 | ||
| @@ -725,6 +787,8 @@ mod tests { | |||
| 725 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 787 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| 726 | 788 | ||
| 727 | assert!(s.try_send(1).is_ok()); | 789 | assert!(s.try_send(1).is_ok()); |
| 790 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 791 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 728 | assert_eq!(r.try_receive().unwrap(), 1); | 792 | assert_eq!(r.try_receive().unwrap(), 1); |
| 729 | } | 793 | } |
| 730 | 794 | ||
