aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/channel.rs
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2025-05-28 10:36:05 +0200
committerUlf Lilleengen <[email protected]>2025-05-28 11:34:57 +0200
commit042abc805a84d09231174e41edb0e498baaf7295 (patch)
treebcde4ea3fbb5aacbd2e7bfaf31c6f064a5c3e314 /embassy-sync/src/channel.rs
parent645883d8748995beb8b07f5cee93ac960f6d6a9f (diff)
feat: add support for channel peek
Add support for peeking into the front of the channel if the value implements Clone. This can be useful in single-receiver situations where you don't want to remove the item from the queue until you've successfully processed it.
Diffstat (limited to 'embassy-sync/src/channel.rs')
-rw-r--r--embassy-sync/src/channel.rs78
1 files changed, 78 insertions, 0 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 4d1fa9e39..a229c52e7 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -208,6 +208,16 @@ where
208 self.channel.try_receive() 208 self.channel.try_receive()
209 } 209 }
210 210
211 /// Peek at the next value without removing it from the queue.
212 ///
213 /// See [`Channel::try_peek()`]
214 pub fn try_peek(&self) -> Result<T, TryReceiveError>
215 where
216 T: Clone,
217 {
218 self.channel.try_peek()
219 }
220
211 /// Allows a poll_fn to poll until the channel is ready to receive 221 /// Allows a poll_fn to poll until the channel is ready to receive
212 /// 222 ///
213 /// See [`Channel::poll_ready_to_receive()`] 223 /// See [`Channel::poll_ready_to_receive()`]
@@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
293 self.channel.try_receive_with_context(None) 303 self.channel.try_receive_with_context(None)
294 } 304 }
295 305
306 /// Peek at the next value without removing it from the queue.
307 ///
308 /// See [`Channel::try_peek()`]
309 pub fn try_peek(&self) -> Result<T, TryReceiveError>
310 where
311 T: Clone,
312 {
313 self.channel.try_peek_with_context(None)
314 }
315
296 /// Allows a poll_fn to poll until the channel is ready to receive 316 /// Allows a poll_fn to poll until the channel is ready to receive
297 /// 317 ///
298 /// See [`Channel::poll_ready_to_receive()`] 318 /// See [`Channel::poll_ready_to_receive()`]
@@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel<T> {
463 483
464 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 484 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
465 485
486 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
487 where
488 T: Clone;
489
466 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 490 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
467 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 491 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
468 492
@@ -505,6 +529,31 @@ impl<T, const N: usize> ChannelState<T, N> {
505 self.try_receive_with_context(None) 529 self.try_receive_with_context(None)
506 } 530 }
507 531
532 fn try_peek(&mut self) -> Result<T, TryReceiveError>
533 where
534 T: Clone,
535 {
536 self.try_peek_with_context(None)
537 }
538
539 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
540 where
541 T: Clone,
542 {
543 if self.queue.is_full() {
544 self.senders_waker.wake();
545 }
546
547 if let Some(message) = self.queue.front() {
548 Ok(message.clone())
549 } else {
550 if let Some(cx) = cx {
551 self.receiver_waker.register(cx.waker());
552 }
553 Err(TryReceiveError::Empty)
554 }
555 }
556
508 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 557 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
509 if self.queue.is_full() { 558 if self.queue.is_full() {
510 self.senders_waker.wake(); 559 self.senders_waker.wake();
@@ -634,6 +683,13 @@ where
634 self.lock(|c| c.try_receive_with_context(cx)) 683 self.lock(|c| c.try_receive_with_context(cx))
635 } 684 }
636 685
686 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
687 where
688 T: Clone,
689 {
690 self.lock(|c| c.try_peek_with_context(cx))
691 }
692
637 /// Poll the channel for the next message 693 /// Poll the channel for the next message
638 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 694 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
639 self.lock(|c| c.poll_receive(cx)) 695 self.lock(|c| c.poll_receive(cx))
@@ -722,6 +778,17 @@ where
722 self.lock(|c| c.try_receive()) 778 self.lock(|c| c.try_receive())
723 } 779 }
724 780
781 /// Peek at the next value without removing it from the queue.
782 ///
783 /// This method will either receive a copy of the message from the channel immediately or return
784 /// an error if the channel is empty.
785 pub fn try_peek(&self) -> Result<T, TryReceiveError>
786 where
787 T: Clone,
788 {
789 self.lock(|c| c.try_peek())
790 }
791
725 /// Returns the maximum number of elements the channel can hold. 792 /// Returns the maximum number of elements the channel can hold.
726 pub const fn capacity(&self) -> usize { 793 pub const fn capacity(&self) -> usize {
727 N 794 N
@@ -769,6 +836,13 @@ where
769 Channel::try_receive_with_context(self, cx) 836 Channel::try_receive_with_context(self, cx)
770 } 837 }
771 838
839 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
840 where
841 T: Clone,
842 {
843 Channel::try_peek_with_context(self, cx)
844 }
845
772 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 846 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
773 Channel::poll_ready_to_send(self, cx) 847 Channel::poll_ready_to_send(self, cx)
774 } 848 }
@@ -851,6 +925,8 @@ mod tests {
851 fn simple_send_and_receive() { 925 fn simple_send_and_receive() {
852 let c = Channel::<NoopRawMutex, u32, 3>::new(); 926 let c = Channel::<NoopRawMutex, u32, 3>::new();
853 assert!(c.try_send(1).is_ok()); 927 assert!(c.try_send(1).is_ok());
928 assert_eq!(c.try_peek().unwrap(), 1);
929 assert_eq!(c.try_peek().unwrap(), 1);
854 assert_eq!(c.try_receive().unwrap(), 1); 930 assert_eq!(c.try_receive().unwrap(), 1);
855 } 931 }
856 932
@@ -881,6 +957,8 @@ mod tests {
881 let r = c.dyn_receiver(); 957 let r = c.dyn_receiver();
882 958
883 assert!(s.try_send(1).is_ok()); 959 assert!(s.try_send(1).is_ok());
960 assert_eq!(r.try_peek().unwrap(), 1);
961 assert_eq!(r.try_peek().unwrap(), 1);
884 assert_eq!(r.try_receive().unwrap(), 1); 962 assert_eq!(r.try_receive().unwrap(), 1);
885 } 963 }
886 964