aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuntc <[email protected]>2021-07-15 12:08:35 +1000
committerhuntc <[email protected]>2021-07-15 12:31:53 +1000
commit3778f55d80f70b336f6ca846f365cf619032a685 (patch)
treec8208386107aac157217758b838da4789f85186a
parent6f78527aeb7a0bacb02ca3264edd04d37550ea02 (diff)
Provides a cleaner construction of the channel with the common "new" naming
-rw-r--r--embassy/src/util/mpsc.rs122
-rw-r--r--embassy/src/util/mutex.rs14
-rw-r--r--examples/nrf/src/bin/mpsc.rs3
3 files changed, 51 insertions, 88 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index 246bd27e4..cc9e2a5dd 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -581,75 +581,27 @@ where
581 581
582pub type WithCriticalSections = CriticalSectionMutex<()>; 582pub type WithCriticalSections = CriticalSectionMutex<()>;
583 583
584impl<T, const N: usize> Channel<WithCriticalSections, T, N> {
585 /// Establish a new bounded channel using critical sections. Critical sections
586 /// should be used only single core targets where communication is required
587 /// from exception mode e.g. interrupt handlers. To create one:
588 ///
589 /// ```
590 /// use embassy::util::mpsc;
591 /// use embassy::util::mpsc::{Channel, WithCriticalSections};
592 ///
593 /// // Declare a bounded channel of 3 u32s.
594 /// let mut channel = Channel::<WithCriticalSections, u32, 3>::with_critical_sections();
595 /// // once we have a channel, obtain its sender and receiver
596 /// let (sender, receiver) = mpsc::split(&mut channel);
597 /// ```
598 pub const fn with_critical_sections() -> Self {
599 let mutex = CriticalSectionMutex::new(());
600 let state = ChannelState::new();
601 let channel_cell = ChannelCell { mutex, state };
602 Channel {
603 channel_cell: UnsafeCell::new(channel_cell),
604 receiver_consumed: PhantomData,
605 }
606 }
607}
608
609pub type WithThreadModeOnly = ThreadModeMutex<()>; 584pub type WithThreadModeOnly = ThreadModeMutex<()>;
610 585
611impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
612 /// Establish a new bounded channel for use in Cortex-M thread mode. Thread
613 /// mode is intended for application threads on a single core, not interrupts.
614 /// As such, only one task at a time can acquire a resource and so this
615 /// channel avoids all locks. To create one:
616 ///
617 /// ``` no_run
618 /// use embassy::util::mpsc;
619 /// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
620 ///
621 /// // Declare a bounded channel of 3 u32s.
622 /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
623 /// // once we have a channel, obtain its sender and receiver
624 /// let (sender, receiver) = mpsc::split(&mut channel);
625 /// ```
626 pub const fn with_thread_mode_only() -> Self {
627 let mutex = ThreadModeMutex::new(());
628 let state = ChannelState::new();
629 let channel_cell = ChannelCell { mutex, state };
630 Channel {
631 channel_cell: UnsafeCell::new(channel_cell),
632 receiver_consumed: PhantomData,
633 }
634 }
635}
636
637pub type WithNoThreads = NoopMutex<()>; 586pub type WithNoThreads = NoopMutex<()>;
638 587
639impl<T, const N: usize> Channel<WithNoThreads, T, N> { 588impl<M, T, const N: usize> Channel<M, T, N>
640 /// Establish a new bounded channel for within a single thread. To create one: 589where
590 M: Mutex<Data = ()>,
591{
592 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
641 /// 593 ///
642 /// ``` 594 /// ```
643 /// use embassy::util::mpsc; 595 /// use embassy::util::mpsc;
644 /// use embassy::util::mpsc::{Channel, WithNoThreads}; 596 /// use embassy::util::mpsc::{Channel, WithNoThreads};
645 /// 597 ///
646 /// // Declare a bounded channel of 3 u32s. 598 /// // Declare a bounded channel of 3 u32s.
647 /// let mut channel = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 599 /// let mut channel = Channel::<WithNoThreads, u32, 3>::new();
648 /// // once we have a channel, obtain its sender and receiver 600 /// // once we have a channel, obtain its sender and receiver
649 /// let (sender, receiver) = mpsc::split(&mut channel); 601 /// let (sender, receiver) = mpsc::split(&mut channel);
650 /// ``` 602 /// ```
651 pub const fn with_no_threads() -> Self { 603 pub fn new() -> Self {
652 let mutex = NoopMutex::new(()); 604 let mutex = M::new(());
653 let state = ChannelState::new(); 605 let state = ChannelState::new();
654 let channel_cell = ChannelCell { mutex, state }; 606 let channel_cell = ChannelCell { mutex, state };
655 Channel { 607 Channel {
@@ -657,12 +609,7 @@ impl<T, const N: usize> Channel<WithNoThreads, T, N> {
657 receiver_consumed: PhantomData, 609 receiver_consumed: PhantomData,
658 } 610 }
659 } 611 }
660}
661 612
662impl<M, T, const N: usize> Channel<M, T, N>
663where
664 M: Mutex<Data = ()>,
665{
666 fn lock<R>( 613 fn lock<R>(
667 channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, 614 channel_cell: &UnsafeCell<ChannelCell<M, T, N>>,
668 f: impl FnOnce(&mut ChannelState<T, N>) -> R, 615 f: impl FnOnce(&mut ChannelState<T, N>) -> R,
@@ -684,6 +631,8 @@ mod tests {
684 use futures_executor::ThreadPool; 631 use futures_executor::ThreadPool;
685 use futures_timer::Delay; 632 use futures_timer::Delay;
686 633
634 use crate::util::Forever;
635
687 use super::*; 636 use super::*;
688 637
689 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { 638 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
@@ -758,7 +707,7 @@ mod tests {
758 707
759 #[test] 708 #[test]
760 fn simple_send_and_receive() { 709 fn simple_send_and_receive() {
761 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 710 let mut c = Channel::<WithNoThreads, u32, 3>::new();
762 let (s, r) = split(&mut c); 711 let (s, r) = split(&mut c);
763 assert!(s.clone().try_send(1).is_ok()); 712 assert!(s.clone().try_send(1).is_ok());
764 assert_eq!(r.try_recv().unwrap(), 1); 713 assert_eq!(r.try_recv().unwrap(), 1);
@@ -766,7 +715,7 @@ mod tests {
766 715
767 #[test] 716 #[test]
768 fn should_close_without_sender() { 717 fn should_close_without_sender() {
769 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 718 let mut c = Channel::<WithNoThreads, u32, 3>::new();
770 let (s, r) = split(&mut c); 719 let (s, r) = split(&mut c);
771 drop(s); 720 drop(s);
772 match r.try_recv() { 721 match r.try_recv() {
@@ -777,7 +726,7 @@ mod tests {
777 726
778 #[test] 727 #[test]
779 fn should_close_once_drained() { 728 fn should_close_once_drained() {
780 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 729 let mut c = Channel::<WithNoThreads, u32, 3>::new();
781 let (s, r) = split(&mut c); 730 let (s, r) = split(&mut c);
782 assert!(s.try_send(1).is_ok()); 731 assert!(s.try_send(1).is_ok());
783 drop(s); 732 drop(s);
@@ -790,7 +739,7 @@ mod tests {
790 739
791 #[test] 740 #[test]
792 fn should_reject_send_when_receiver_dropped() { 741 fn should_reject_send_when_receiver_dropped() {
793 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 742 let mut c = Channel::<WithNoThreads, u32, 3>::new();
794 let (s, r) = split(&mut c); 743 let (s, r) = split(&mut c);
795 drop(r); 744 drop(r);
796 match s.try_send(1) { 745 match s.try_send(1) {
@@ -801,7 +750,7 @@ mod tests {
801 750
802 #[test] 751 #[test]
803 fn should_reject_send_when_channel_closed() { 752 fn should_reject_send_when_channel_closed() {
804 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 753 let mut c = Channel::<WithNoThreads, u32, 3>::new();
805 let (s, mut r) = split(&mut c); 754 let (s, mut r) = split(&mut c);
806 assert!(s.try_send(1).is_ok()); 755 assert!(s.try_send(1).is_ok());
807 r.close(); 756 r.close();
@@ -817,9 +766,9 @@ mod tests {
817 async fn receiver_closes_when_sender_dropped_async() { 766 async fn receiver_closes_when_sender_dropped_async() {
818 let executor = ThreadPool::new().unwrap(); 767 let executor = ThreadPool::new().unwrap();
819 768
820 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = 769 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
821 Channel::with_critical_sections(); 770 let c = CHANNEL.put(Channel::new());
822 let (s, mut r) = split(unsafe { &mut CHANNEL }); 771 let (s, mut r) = split(c);
823 assert!(executor 772 assert!(executor
824 .spawn(async move { 773 .spawn(async move {
825 drop(s); 774 drop(s);
@@ -832,9 +781,9 @@ mod tests {
832 async fn receiver_receives_given_try_send_async() { 781 async fn receiver_receives_given_try_send_async() {
833 let executor = ThreadPool::new().unwrap(); 782 let executor = ThreadPool::new().unwrap();
834 783
835 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = 784 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
836 Channel::with_critical_sections(); 785 let c = CHANNEL.put(Channel::new());
837 let (s, mut r) = split(unsafe { &mut CHANNEL }); 786 let (s, mut r) = split(c);
838 assert!(executor 787 assert!(executor
839 .spawn(async move { 788 .spawn(async move {
840 assert!(s.try_send(1).is_ok()); 789 assert!(s.try_send(1).is_ok());
@@ -845,18 +794,17 @@ mod tests {
845 794
846 #[futures_test::test] 795 #[futures_test::test]
847 async fn sender_send_completes_if_capacity() { 796 async fn sender_send_completes_if_capacity() {
848 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 797 let mut c = Channel::<WithCriticalSections, u32, 1>::new();
849 Channel::with_critical_sections(); 798 let (s, mut r) = split(&mut c);
850 let (s, mut r) = split(unsafe { &mut CHANNEL });
851 assert!(s.send(1).await.is_ok()); 799 assert!(s.send(1).await.is_ok());
852 assert_eq!(r.recv().await, Some(1)); 800 assert_eq!(r.recv().await, Some(1));
853 } 801 }
854 802
855 #[futures_test::test] 803 #[futures_test::test]
856 async fn sender_send_completes_if_closed() { 804 async fn sender_send_completes_if_closed() {
857 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 805 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
858 Channel::with_critical_sections(); 806 let c = CHANNEL.put(Channel::new());
859 let (s, r) = split(unsafe { &mut CHANNEL }); 807 let (s, r) = split(c);
860 drop(r); 808 drop(r);
861 match s.send(1).await { 809 match s.send(1).await {
862 Err(SendError(1)) => assert!(true), 810 Err(SendError(1)) => assert!(true),
@@ -868,9 +816,9 @@ mod tests {
868 async fn senders_sends_wait_until_capacity() { 816 async fn senders_sends_wait_until_capacity() {
869 let executor = ThreadPool::new().unwrap(); 817 let executor = ThreadPool::new().unwrap();
870 818
871 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 819 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
872 Channel::with_critical_sections(); 820 let c = CHANNEL.put(Channel::new());
873 let (s0, mut r) = split(unsafe { &mut CHANNEL }); 821 let (s0, mut r) = split(c);
874 assert!(s0.try_send(1).is_ok()); 822 assert!(s0.try_send(1).is_ok());
875 let s1 = s0.clone(); 823 let s1 = s0.clone();
876 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); 824 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
@@ -888,18 +836,18 @@ mod tests {
888 836
889 #[futures_test::test] 837 #[futures_test::test]
890 async fn sender_close_completes_if_closing() { 838 async fn sender_close_completes_if_closing() {
891 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 839 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
892 Channel::with_critical_sections(); 840 let c = CHANNEL.put(Channel::new());
893 let (s, mut r) = split(unsafe { &mut CHANNEL }); 841 let (s, mut r) = split(c);
894 r.close(); 842 r.close();
895 s.closed().await; 843 s.closed().await;
896 } 844 }
897 845
898 #[futures_test::test] 846 #[futures_test::test]
899 async fn sender_close_completes_if_closed() { 847 async fn sender_close_completes_if_closed() {
900 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 848 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
901 Channel::with_critical_sections(); 849 let c = CHANNEL.put(Channel::new());
902 let (s, r) = split(unsafe { &mut CHANNEL }); 850 let (s, r) = split(c);
903 drop(r); 851 drop(r);
904 s.closed().await; 852 s.closed().await;
905 } 853 }
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs
index c8fe84026..0506ffe6f 100644
--- a/embassy/src/util/mutex.rs
+++ b/embassy/src/util/mutex.rs
@@ -8,6 +8,8 @@ pub trait Mutex {
8 /// Data protected by the mutex. 8 /// Data protected by the mutex.
9 type Data; 9 type Data;
10 10
11 fn new(data: Self::Data) -> Self;
12
11 /// Creates a critical section and grants temporary access to the protected data. 13 /// Creates a critical section and grants temporary access to the protected data.
12 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; 14 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R;
13} 15}
@@ -47,6 +49,10 @@ impl<T> CriticalSectionMutex<T> {
47impl<T> Mutex for CriticalSectionMutex<T> { 49impl<T> Mutex for CriticalSectionMutex<T> {
48 type Data = T; 50 type Data = T;
49 51
52 fn new(data: T) -> Self {
53 Self::new(data)
54 }
55
50 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 56 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
51 critical_section::with(|cs| f(self.borrow(cs))) 57 critical_section::with(|cs| f(self.borrow(cs)))
52 } 58 }
@@ -92,6 +98,10 @@ impl<T> ThreadModeMutex<T> {
92impl<T> Mutex for ThreadModeMutex<T> { 98impl<T> Mutex for ThreadModeMutex<T> {
93 type Data = T; 99 type Data = T;
94 100
101 fn new(data: T) -> Self {
102 Self::new(data)
103 }
104
95 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 105 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
96 f(self.borrow()) 106 f(self.borrow())
97 } 107 }
@@ -126,6 +136,10 @@ impl<T> NoopMutex<T> {
126impl<T> Mutex for NoopMutex<T> { 136impl<T> Mutex for NoopMutex<T> {
127 type Data = T; 137 type Data = T;
128 138
139 fn new(data: T) -> Self {
140 Self::new(data)
141 }
142
129 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 143 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
130 f(self.borrow()) 144 f(self.borrow())
131 } 145 }
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index c2cb107e1..443955239 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -37,9 +37,10 @@ async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) {
37 37
38#[embassy::main] 38#[embassy::main]
39async fn main(spawner: Spawner, p: Peripherals) { 39async fn main(spawner: Spawner, p: Peripherals) {
40
40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); 41 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
41 42
42 let channel = CHANNEL.put(Channel::with_no_threads()); 43 let channel = CHANNEL.put(Channel::new());
43 let (sender, mut receiver) = mpsc::split(channel); 44 let (sender, mut receiver) = mpsc::split(channel);
44 45
45 spawner.spawn(my_task(sender)).unwrap(); 46 spawner.spawn(my_task(sender)).unwrap();