aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2025-05-28 12:26:06 +0200
committerGitHub <[email protected]>2025-05-28 12:26:06 +0200
commit58db2f7d94031879a3a6ebd86ad8825f235b2620 (patch)
treebcde4ea3fbb5aacbd2e7bfaf31c6f064a5c3e314 /embassy-sync
parent645883d8748995beb8b07f5cee93ac960f6d6a9f (diff)
parent042abc805a84d09231174e41edb0e498baaf7295 (diff)
Merge pull request #4263 from embassy-rs/channel-peek
feat: add support for channel peek
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/channel.rs78
-rw-r--r--embassy-sync/src/priority_channel.rs64
2 files changed, 142 insertions, 0 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 4d1fa9e39..a229c52e7 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -208,6 +208,16 @@ where
208 self.channel.try_receive() 208 self.channel.try_receive()
209 } 209 }
210 210
211 /// Peek at the next value without removing it from the queue.
212 ///
213 /// See [`Channel::try_peek()`]
214 pub fn try_peek(&self) -> Result<T, TryReceiveError>
215 where
216 T: Clone,
217 {
218 self.channel.try_peek()
219 }
220
211 /// Allows a poll_fn to poll until the channel is ready to receive 221 /// Allows a poll_fn to poll until the channel is ready to receive
212 /// 222 ///
213 /// See [`Channel::poll_ready_to_receive()`] 223 /// See [`Channel::poll_ready_to_receive()`]
@@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
293 self.channel.try_receive_with_context(None) 303 self.channel.try_receive_with_context(None)
294 } 304 }
295 305
306 /// Peek at the next value without removing it from the queue.
307 ///
308 /// See [`Channel::try_peek()`]
309 pub fn try_peek(&self) -> Result<T, TryReceiveError>
310 where
311 T: Clone,
312 {
313 self.channel.try_peek_with_context(None)
314 }
315
296 /// Allows a poll_fn to poll until the channel is ready to receive 316 /// Allows a poll_fn to poll until the channel is ready to receive
297 /// 317 ///
298 /// See [`Channel::poll_ready_to_receive()`] 318 /// See [`Channel::poll_ready_to_receive()`]
@@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel<T> {
463 483
464 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 484 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
465 485
486 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
487 where
488 T: Clone;
489
466 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 490 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
467 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 491 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
468 492
@@ -505,6 +529,31 @@ impl<T, const N: usize> ChannelState<T, N> {
505 self.try_receive_with_context(None) 529 self.try_receive_with_context(None)
506 } 530 }
507 531
532 fn try_peek(&mut self) -> Result<T, TryReceiveError>
533 where
534 T: Clone,
535 {
536 self.try_peek_with_context(None)
537 }
538
539 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
540 where
541 T: Clone,
542 {
543 if self.queue.is_full() {
544 self.senders_waker.wake();
545 }
546
547 if let Some(message) = self.queue.front() {
548 Ok(message.clone())
549 } else {
550 if let Some(cx) = cx {
551 self.receiver_waker.register(cx.waker());
552 }
553 Err(TryReceiveError::Empty)
554 }
555 }
556
508 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 557 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
509 if self.queue.is_full() { 558 if self.queue.is_full() {
510 self.senders_waker.wake(); 559 self.senders_waker.wake();
@@ -634,6 +683,13 @@ where
634 self.lock(|c| c.try_receive_with_context(cx)) 683 self.lock(|c| c.try_receive_with_context(cx))
635 } 684 }
636 685
686 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
687 where
688 T: Clone,
689 {
690 self.lock(|c| c.try_peek_with_context(cx))
691 }
692
637 /// Poll the channel for the next message 693 /// Poll the channel for the next message
638 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 694 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
639 self.lock(|c| c.poll_receive(cx)) 695 self.lock(|c| c.poll_receive(cx))
@@ -722,6 +778,17 @@ where
722 self.lock(|c| c.try_receive()) 778 self.lock(|c| c.try_receive())
723 } 779 }
724 780
781 /// Peek at the next value without removing it from the queue.
782 ///
783 /// This method will either receive a copy of the message from the channel immediately or return
784 /// an error if the channel is empty.
785 pub fn try_peek(&self) -> Result<T, TryReceiveError>
786 where
787 T: Clone,
788 {
789 self.lock(|c| c.try_peek())
790 }
791
725 /// Returns the maximum number of elements the channel can hold. 792 /// Returns the maximum number of elements the channel can hold.
726 pub const fn capacity(&self) -> usize { 793 pub const fn capacity(&self) -> usize {
727 N 794 N
@@ -769,6 +836,13 @@ where
769 Channel::try_receive_with_context(self, cx) 836 Channel::try_receive_with_context(self, cx)
770 } 837 }
771 838
839 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
840 where
841 T: Clone,
842 {
843 Channel::try_peek_with_context(self, cx)
844 }
845
772 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 846 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
773 Channel::poll_ready_to_send(self, cx) 847 Channel::poll_ready_to_send(self, cx)
774 } 848 }
@@ -851,6 +925,8 @@ mod tests {
851 fn simple_send_and_receive() { 925 fn simple_send_and_receive() {
852 let c = Channel::<NoopRawMutex, u32, 3>::new(); 926 let c = Channel::<NoopRawMutex, u32, 3>::new();
853 assert!(c.try_send(1).is_ok()); 927 assert!(c.try_send(1).is_ok());
928 assert_eq!(c.try_peek().unwrap(), 1);
929 assert_eq!(c.try_peek().unwrap(), 1);
854 assert_eq!(c.try_receive().unwrap(), 1); 930 assert_eq!(c.try_receive().unwrap(), 1);
855 } 931 }
856 932
@@ -881,6 +957,8 @@ mod tests {
881 let r = c.dyn_receiver(); 957 let r = c.dyn_receiver();
882 958
883 assert!(s.try_send(1).is_ok()); 959 assert!(s.try_send(1).is_ok());
960 assert_eq!(r.try_peek().unwrap(), 1);
961 assert_eq!(r.try_peek().unwrap(), 1);
884 assert_eq!(r.try_receive().unwrap(), 1); 962 assert_eq!(r.try_receive().unwrap(), 1);
885 } 963 }
886 964
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 36959204f..623c52993 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -175,6 +175,16 @@ where
175 self.channel.try_receive() 175 self.channel.try_receive()
176 } 176 }
177 177
178 /// Peek at the next value without removing it from the queue.
179 ///
180 /// See [`PriorityChannel::try_peek()`]
181 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
178 /// Allows a poll_fn to poll until the channel is ready to receive 188 /// Allows a poll_fn to poll until the channel is ready to receive
179 /// 189 ///
180 /// See [`PriorityChannel::poll_ready_to_receive()`] 190 /// See [`PriorityChannel::poll_ready_to_receive()`]
@@ -343,6 +353,31 @@ where
343 self.try_receive_with_context(None) 353 self.try_receive_with_context(None)
344 } 354 }
345 355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
346 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
347 if self.queue.len() == self.queue.capacity() { 382 if self.queue.len() == self.queue.capacity() {
348 self.senders_waker.wake(); 383 self.senders_waker.wake();
@@ -478,6 +513,13 @@ where
478 self.lock(|c| c.try_receive_with_context(cx)) 513 self.lock(|c| c.try_receive_with_context(cx))
479 } 514 }
480 515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
481 /// Poll the channel for the next message 523 /// Poll the channel for the next message
482 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 524 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
483 self.lock(|c| c.poll_receive(cx)) 525 self.lock(|c| c.poll_receive(cx))
@@ -548,6 +590,17 @@ where
548 self.lock(|c| c.try_receive()) 590 self.lock(|c| c.try_receive())
549 } 591 }
550 592
593 /// Peek at the next value without removing it from the queue.
594 ///
595 /// This method will either receive a copy of the message from the channel immediately or return
596 /// an error if the channel is empty.
597 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
551 /// Removes elements from the channel based on the given predicate. 604 /// Removes elements from the channel based on the given predicate.
552 pub fn remove_if<F>(&self, predicate: F) 605 pub fn remove_if<F>(&self, predicate: F)
553 where 606 where
@@ -617,6 +670,13 @@ where
617 PriorityChannel::try_receive_with_context(self, cx) 670 PriorityChannel::try_receive_with_context(self, cx)
618 } 671 }
619 672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
620 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
621 PriorityChannel::poll_ready_to_send(self, cx) 681 PriorityChannel::poll_ready_to_send(self, cx)
622 } 682 }
@@ -705,6 +765,8 @@ mod tests {
705 fn simple_send_and_receive() { 765 fn simple_send_and_receive() {
706 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); 766 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
707 assert!(c.try_send(1).is_ok()); 767 assert!(c.try_send(1).is_ok());
768 assert_eq!(c.try_peek().unwrap(), 1);
769 assert_eq!(c.try_peek().unwrap(), 1);
708 assert_eq!(c.try_receive().unwrap(), 1); 770 assert_eq!(c.try_receive().unwrap(), 1);
709 } 771 }
710 772
@@ -725,6 +787,8 @@ mod tests {
725 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 787 let r: DynamicReceiver<'_, u32> = c.receiver().into();
726 788
727 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
790 assert_eq!(r.try_peek().unwrap(), 1);
791 assert_eq!(r.try_peek().unwrap(), 1);
728 assert_eq!(r.try_receive().unwrap(), 1); 792 assert_eq!(r.try_receive().unwrap(), 1);
729 } 793 }
730 794