aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/priority_channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/priority_channel.rs')
-rw-r--r--embassy-sync/src/priority_channel.rs64
1 files changed, 64 insertions, 0 deletions
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 36959204f..623c52993 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -175,6 +175,16 @@ where
175 self.channel.try_receive() 175 self.channel.try_receive()
176 } 176 }
177 177
178 /// Peek at the next value without removing it from the queue.
179 ///
180 /// See [`PriorityChannel::try_peek()`]
181 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
178 /// Allows a poll_fn to poll until the channel is ready to receive 188 /// Allows a poll_fn to poll until the channel is ready to receive
179 /// 189 ///
180 /// See [`PriorityChannel::poll_ready_to_receive()`] 190 /// See [`PriorityChannel::poll_ready_to_receive()`]
@@ -343,6 +353,31 @@ where
343 self.try_receive_with_context(None) 353 self.try_receive_with_context(None)
344 } 354 }
345 355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
346 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
347 if self.queue.len() == self.queue.capacity() { 382 if self.queue.len() == self.queue.capacity() {
348 self.senders_waker.wake(); 383 self.senders_waker.wake();
@@ -478,6 +513,13 @@ where
478 self.lock(|c| c.try_receive_with_context(cx)) 513 self.lock(|c| c.try_receive_with_context(cx))
479 } 514 }
480 515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
481 /// Poll the channel for the next message 523 /// Poll the channel for the next message
482 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { 524 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
483 self.lock(|c| c.poll_receive(cx)) 525 self.lock(|c| c.poll_receive(cx))
@@ -548,6 +590,17 @@ where
548 self.lock(|c| c.try_receive()) 590 self.lock(|c| c.try_receive())
549 } 591 }
550 592
593 /// Peek at the next value without removing it from the queue.
594 ///
595 /// This method will either receive a copy of the message from the channel immediately or return
596 /// an error if the channel is empty.
597 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
551 /// Removes elements from the channel based on the given predicate. 604 /// Removes elements from the channel based on the given predicate.
552 pub fn remove_if<F>(&self, predicate: F) 605 pub fn remove_if<F>(&self, predicate: F)
553 where 606 where
@@ -617,6 +670,13 @@ where
617 PriorityChannel::try_receive_with_context(self, cx) 670 PriorityChannel::try_receive_with_context(self, cx)
618 } 671 }
619 672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
620 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { 680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
621 PriorityChannel::poll_ready_to_send(self, cx) 681 PriorityChannel::poll_ready_to_send(self, cx)
622 } 682 }
@@ -705,6 +765,8 @@ mod tests {
705 fn simple_send_and_receive() { 765 fn simple_send_and_receive() {
706 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); 766 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
707 assert!(c.try_send(1).is_ok()); 767 assert!(c.try_send(1).is_ok());
768 assert_eq!(c.try_peek().unwrap(), 1);
769 assert_eq!(c.try_peek().unwrap(), 1);
708 assert_eq!(c.try_receive().unwrap(), 1); 770 assert_eq!(c.try_receive().unwrap(), 1);
709 } 771 }
710 772
@@ -725,6 +787,8 @@ mod tests {
725 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 787 let r: DynamicReceiver<'_, u32> = c.receiver().into();
726 788
727 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
790 assert_eq!(r.try_peek().unwrap(), 1);
791 assert_eq!(r.try_peek().unwrap(), 1);
728 assert_eq!(r.try_receive().unwrap(), 1); 792 assert_eq!(r.try_receive().unwrap(), 1);
729 } 793 }
730 794