aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-sync/src/channel.rs2
-rw-r--r--embassy-sync/src/priority_channel.rs100
2 files changed, 41 insertions, 61 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index a512e0c41..aa267471c 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -321,7 +321,7 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
321 321
322impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} 322impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
323 323
324trait DynamicChannel<T> { 324pub(crate) trait DynamicChannel<T> {
325 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; 325 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
326 326
327 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; 327 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index 04148b56f..13c407c21 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -7,17 +7,19 @@ use core::future::Future;
7use core::pin::Pin; 7use core::pin::Pin;
8use core::task::{Context, Poll}; 8use core::task::{Context, Poll};
9 9
10use heapless::binary_heap::Kind;
10use heapless::BinaryHeap; 11use heapless::BinaryHeap;
11 12
12use crate::blocking_mutex::raw::RawMutex; 13use crate::blocking_mutex::raw::RawMutex;
13use crate::blocking_mutex::Mutex; 14use crate::blocking_mutex::Mutex;
15use crate::channel::{DynamicChannel, TryReceiveError, TrySendError};
14use crate::waitqueue::WakerRegistration; 16use crate::waitqueue::WakerRegistration;
15 17
16/// Send-only access to a [`PriorityChannel`]. 18/// Send-only access to a [`PriorityChannel`].
17pub struct Sender<'ch, M, T, K, const N: usize> 19pub struct Sender<'ch, M, T, K, const N: usize>
18where 20where
19 T: Ord, 21 T: Ord,
20 K: heapless::binary_heap::Kind, 22 K: Kind,
21 M: RawMutex, 23 M: RawMutex,
22{ 24{
23 channel: &'ch PriorityChannel<M, T, K, N>, 25 channel: &'ch PriorityChannel<M, T, K, N>,
@@ -26,7 +28,7 @@ where
26impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N> 28impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
27where 29where
28 T: Ord, 30 T: Ord,
29 K: heapless::binary_heap::Kind, 31 K: Kind,
30 M: RawMutex, 32 M: RawMutex,
31{ 33{
32 fn clone(&self) -> Self { 34 fn clone(&self) -> Self {
@@ -37,7 +39,7 @@ where
37impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N> 39impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
38where 40where
39 T: Ord, 41 T: Ord,
40 K: heapless::binary_heap::Kind, 42 K: Kind,
41 M: RawMutex, 43 M: RawMutex,
42{ 44{
43} 45}
@@ -45,7 +47,7 @@ where
45impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N> 47impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
46where 48where
47 T: Ord, 49 T: Ord,
48 K: heapless::binary_heap::Kind, 50 K: Kind,
49 M: RawMutex, 51 M: RawMutex,
50{ 52{
51 /// Sends a value. 53 /// Sends a value.
@@ -86,7 +88,7 @@ impl<'ch, T> Copy for DynamicSender<'ch, T> {}
86impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> 88impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
87where 89where
88 T: Ord, 90 T: Ord,
89 K: heapless::binary_heap::Kind, 91 K: Kind,
90 M: RawMutex, 92 M: RawMutex,
91{ 93{
92 fn from(s: Sender<'ch, M, T, K, N>) -> Self { 94 fn from(s: Sender<'ch, M, T, K, N>) -> Self {
@@ -124,7 +126,7 @@ impl<'ch, T> DynamicSender<'ch, T> {
124pub struct Receiver<'ch, M, T, K, const N: usize> 126pub struct Receiver<'ch, M, T, K, const N: usize>
125where 127where
126 T: Ord, 128 T: Ord,
127 K: heapless::binary_heap::Kind, 129 K: Kind,
128 M: RawMutex, 130 M: RawMutex,
129{ 131{
130 channel: &'ch PriorityChannel<M, T, K, N>, 132 channel: &'ch PriorityChannel<M, T, K, N>,
@@ -133,7 +135,7 @@ where
133impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N> 135impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
134where 136where
135 T: Ord, 137 T: Ord,
136 K: heapless::binary_heap::Kind, 138 K: Kind,
137 M: RawMutex, 139 M: RawMutex,
138{ 140{
139 fn clone(&self) -> Self { 141 fn clone(&self) -> Self {
@@ -144,7 +146,7 @@ where
144impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N> 146impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
145where 147where
146 T: Ord, 148 T: Ord,
147 K: heapless::binary_heap::Kind, 149 K: Kind,
148 M: RawMutex, 150 M: RawMutex,
149{ 151{
150} 152}
@@ -152,7 +154,7 @@ where
152impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N> 154impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
153where 155where
154 T: Ord, 156 T: Ord,
155 K: heapless::binary_heap::Kind, 157 K: Kind,
156 M: RawMutex, 158 M: RawMutex,
157{ 159{
158 /// Receive the next value. 160 /// Receive the next value.
@@ -230,7 +232,7 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
230impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> 232impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
231where 233where
232 T: Ord, 234 T: Ord,
233 K: heapless::binary_heap::Kind, 235 K: Kind,
234 M: RawMutex, 236 M: RawMutex,
235{ 237{
236 fn from(s: Receiver<'ch, M, T, K, N>) -> Self { 238 fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
@@ -243,7 +245,7 @@ where
243pub struct ReceiveFuture<'ch, M, T, K, const N: usize> 245pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
244where 246where
245 T: Ord, 247 T: Ord,
246 K: heapless::binary_heap::Kind, 248 K: Kind,
247 M: RawMutex, 249 M: RawMutex,
248{ 250{
249 channel: &'ch PriorityChannel<M, T, K, N>, 251 channel: &'ch PriorityChannel<M, T, K, N>,
@@ -252,7 +254,7 @@ where
252impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N> 254impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
253where 255where
254 T: Ord, 256 T: Ord,
255 K: heapless::binary_heap::Kind, 257 K: Kind,
256 M: RawMutex, 258 M: RawMutex,
257{ 259{
258 type Output = T; 260 type Output = T;
@@ -284,7 +286,7 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
284pub struct SendFuture<'ch, M, T, K, const N: usize> 286pub struct SendFuture<'ch, M, T, K, const N: usize>
285where 287where
286 T: Ord, 288 T: Ord,
287 K: heapless::binary_heap::Kind, 289 K: Kind,
288 M: RawMutex, 290 M: RawMutex,
289{ 291{
290 channel: &'ch PriorityChannel<M, T, K, N>, 292 channel: &'ch PriorityChannel<M, T, K, N>,
@@ -294,7 +296,7 @@ where
294impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N> 296impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
295where 297where
296 T: Ord, 298 T: Ord,
297 K: heapless::binary_heap::Kind, 299 K: Kind,
298 M: RawMutex, 300 M: RawMutex,
299{ 301{
300 type Output = (); 302 type Output = ();
@@ -316,7 +318,7 @@ where
316impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N> 318impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
317where 319where
318 T: Ord, 320 T: Ord,
319 K: heapless::binary_heap::Kind, 321 K: Kind,
320 M: RawMutex, 322 M: RawMutex,
321{ 323{
322} 324}
@@ -347,34 +349,6 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
347 349
348impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} 350impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
349 351
350trait DynamicChannel<T> {
351 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
352
353 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
354
355 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
356 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
357
358 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
359}
360
361/// Error returned by [`try_receive`](PriorityChannel::try_receive).
362#[derive(PartialEq, Eq, Clone, Copy, Debug)]
363#[cfg_attr(feature = "defmt", derive(defmt::Format))]
364pub enum TryReceiveError {
365 /// A message could not be received because the channel is empty.
366 Empty,
367}
368
369/// Error returned by [`try_send`](PriorityChannel::try_send).
370#[derive(PartialEq, Eq, Clone, Copy, Debug)]
371#[cfg_attr(feature = "defmt", derive(defmt::Format))]
372pub enum TrySendError<T> {
373 /// The data could not be sent on the channel because the channel is
374 /// currently full and sending would require blocking.
375 Full(T),
376}
377
378struct ChannelState<T, K, const N: usize> { 352struct ChannelState<T, K, const N: usize> {
379 queue: BinaryHeap<T, K, N>, 353 queue: BinaryHeap<T, K, N>,
380 receiver_waker: WakerRegistration, 354 receiver_waker: WakerRegistration,
@@ -384,7 +358,7 @@ struct ChannelState<T, K, const N: usize> {
384impl<T, K, const N: usize> ChannelState<T, K, N> 358impl<T, K, const N: usize> ChannelState<T, K, N>
385where 359where
386 T: Ord, 360 T: Ord,
387 K: heapless::binary_heap::Kind, 361 K: Kind,
388{ 362{
389 const fn new() -> Self { 363 const fn new() -> Self {
390 ChannelState { 364 ChannelState {
@@ -477,7 +451,7 @@ where
477pub struct PriorityChannel<M, T, K, const N: usize> 451pub struct PriorityChannel<M, T, K, const N: usize>
478where 452where
479 T: Ord, 453 T: Ord,
480 K: heapless::binary_heap::Kind, 454 K: Kind,
481 M: RawMutex, 455 M: RawMutex,
482{ 456{
483 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>, 457 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
@@ -486,17 +460,18 @@ where
486impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N> 460impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
487where 461where
488 T: Ord, 462 T: Ord,
489 K: heapless::binary_heap::Kind, 463 K: Kind,
490 M: RawMutex, 464 M: RawMutex,
491{ 465{
492 /// Establish a new bounded channel. For example, to create one with a NoopMutex: 466 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
493 /// 467 ///
494 /// ``` 468 /// ```
495 /// use embassy_sync::channel::PriorityChannel; 469 /// # use heapless::binary_heap::Max;
470 /// use embassy_sync::priority_channel::PriorityChannel;
496 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; 471 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
497 /// 472 ///
498 /// // Declare a bounded channel of 3 u32s. 473 /// // Declare a bounded channel of 3 u32s.
499 /// let mut channel = PriorityChannel::<NoopRawMutex, u32, 3>::new(); 474 /// let mut channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
500 /// ``` 475 /// ```
501 pub const fn new() -> Self { 476 pub const fn new() -> Self {
502 Self { 477 Self {
@@ -588,7 +563,7 @@ where
588impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N> 563impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
589where 564where
590 T: Ord, 565 T: Ord,
591 K: heapless::binary_heap::Kind, 566 K: Kind,
592 M: RawMutex, 567 M: RawMutex,
593{ 568{
594 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { 569 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
@@ -619,25 +594,30 @@ mod tests {
619 use futures_executor::ThreadPool; 594 use futures_executor::ThreadPool;
620 use futures_timer::Delay; 595 use futures_timer::Delay;
621 use futures_util::task::SpawnExt; 596 use futures_util::task::SpawnExt;
597 use heapless::binary_heap::{Kind, Max};
622 use static_cell::StaticCell; 598 use static_cell::StaticCell;
623 599
624 use super::*; 600 use super::*;
625 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; 601 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
626 602
627 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { 603 fn capacity<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
604 where
605 T: Ord,
606 K: Kind,
607 {
628 c.queue.capacity() - c.queue.len() 608 c.queue.capacity() - c.queue.len()
629 } 609 }
630 610
631 #[test] 611 #[test]
632 fn sending_once() { 612 fn sending_once() {
633 let mut c = ChannelState::<u32, 3>::new(); 613 let mut c = ChannelState::<u32, Max, 3>::new();
634 assert!(c.try_send(1).is_ok()); 614 assert!(c.try_send(1).is_ok());
635 assert_eq!(capacity(&c), 2); 615 assert_eq!(capacity(&c), 2);
636 } 616 }
637 617
638 #[test] 618 #[test]
639 fn sending_when_full() { 619 fn sending_when_full() {
640 let mut c = ChannelState::<u32, 3>::new(); 620 let mut c = ChannelState::<u32, Max, 3>::new();
641 let _ = c.try_send(1); 621 let _ = c.try_send(1);
642 let _ = c.try_send(1); 622 let _ = c.try_send(1);
643 let _ = c.try_send(1); 623 let _ = c.try_send(1);
@@ -650,7 +630,7 @@ mod tests {
650 630
651 #[test] 631 #[test]
652 fn receiving_once_with_one_send() { 632 fn receiving_once_with_one_send() {
653 let mut c = ChannelState::<u32, 3>::new(); 633 let mut c = ChannelState::<u32, Max, 3>::new();
654 assert!(c.try_send(1).is_ok()); 634 assert!(c.try_send(1).is_ok());
655 assert_eq!(c.try_receive().unwrap(), 1); 635 assert_eq!(c.try_receive().unwrap(), 1);
656 assert_eq!(capacity(&c), 3); 636 assert_eq!(capacity(&c), 3);
@@ -658,7 +638,7 @@ mod tests {
658 638
659 #[test] 639 #[test]
660 fn receiving_when_empty() { 640 fn receiving_when_empty() {
661 let mut c = ChannelState::<u32, 3>::new(); 641 let mut c = ChannelState::<u32, Max, 3>::new();
662 match c.try_receive() { 642 match c.try_receive() {
663 Err(TryReceiveError::Empty) => assert!(true), 643 Err(TryReceiveError::Empty) => assert!(true),
664 _ => assert!(false), 644 _ => assert!(false),
@@ -668,14 +648,14 @@ mod tests {
668 648
669 #[test] 649 #[test]
670 fn simple_send_and_receive() { 650 fn simple_send_and_receive() {
671 let c = PriorityChannel::<NoopRawMutex, u32, 3>::new(); 651 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
672 assert!(c.try_send(1).is_ok()); 652 assert!(c.try_send(1).is_ok());
673 assert_eq!(c.try_receive().unwrap(), 1); 653 assert_eq!(c.try_receive().unwrap(), 1);
674 } 654 }
675 655
676 #[test] 656 #[test]
677 fn cloning() { 657 fn cloning() {
678 let c = PriorityChannel::<NoopRawMutex, u32, 3>::new(); 658 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
679 let r1 = c.receiver(); 659 let r1 = c.receiver();
680 let s1 = c.sender(); 660 let s1 = c.sender();
681 661
@@ -685,7 +665,7 @@ mod tests {
685 665
686 #[test] 666 #[test]
687 fn dynamic_dispatch() { 667 fn dynamic_dispatch() {
688 let c = PriorityChannel::<NoopRawMutex, u32, 3>::new(); 668 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
689 let s: DynamicSender<'_, u32> = c.sender().into(); 669 let s: DynamicSender<'_, u32> = c.sender().into();
690 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 670 let r: DynamicReceiver<'_, u32> = c.receiver().into();
691 671
@@ -697,7 +677,7 @@ mod tests {
697 async fn receiver_receives_given_try_send_async() { 677 async fn receiver_receives_given_try_send_async() {
698 let executor = ThreadPool::new().unwrap(); 678 let executor = ThreadPool::new().unwrap();
699 679
700 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); 680 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
701 let c = &*CHANNEL.init(PriorityChannel::new()); 681 let c = &*CHANNEL.init(PriorityChannel::new());
702 let c2 = c; 682 let c2 = c;
703 assert!(executor 683 assert!(executor
@@ -710,7 +690,7 @@ mod tests {
710 690
711 #[futures_test::test] 691 #[futures_test::test]
712 async fn sender_send_completes_if_capacity() { 692 async fn sender_send_completes_if_capacity() {
713 let c = PriorityChannel::<CriticalSectionRawMutex, u32, 1>::new(); 693 let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
714 c.send(1).await; 694 c.send(1).await;
715 assert_eq!(c.receive().await, 1); 695 assert_eq!(c.receive().await, 1);
716 } 696 }
@@ -719,7 +699,7 @@ mod tests {
719 async fn senders_sends_wait_until_capacity() { 699 async fn senders_sends_wait_until_capacity() {
720 let executor = ThreadPool::new().unwrap(); 700 let executor = ThreadPool::new().unwrap();
721 701
722 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new(); 702 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
723 let c = &*CHANNEL.init(PriorityChannel::new()); 703 let c = &*CHANNEL.init(PriorityChannel::new());
724 assert!(c.try_send(1).is_ok()); 704 assert!(c.try_send(1).is_ok());
725 705