aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/util/mpsc.rs109
-rw-r--r--embassy/src/util/mutex.rs27
-rw-r--r--examples/nrf/src/bin/mpsc.rs8
3 files changed, 78 insertions, 66 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index 65e4bf7b7..e54c507c1 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -49,6 +49,7 @@ use futures::Future;
49 49
50use super::CriticalSectionMutex; 50use super::CriticalSectionMutex;
51use super::Mutex; 51use super::Mutex;
52use super::NoopMutex;
52use super::ThreadModeMutex; 53use super::ThreadModeMutex;
53 54
54/// Send values to the associated `Receiver`. 55/// Send values to the associated `Receiver`.
@@ -96,10 +97,10 @@ unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M:
96//// 97////
97/// ```compile_fail 98/// ```compile_fail
98/// use embassy::util::mpsc; 99/// use embassy::util::mpsc;
99/// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; 100/// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
100/// 101///
101/// let (sender, receiver) = { 102/// let (sender, receiver) = {
102/// let mut channel = ChannelCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); 103/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
103/// mpsc::split(&channel) 104/// mpsc::split(&channel)
104/// }; 105/// };
105/// ``` 106/// ```
@@ -464,10 +465,10 @@ impl<T, const N: usize> Channel<WithCriticalSections, T, N> {
464 /// 465 ///
465 /// ``` 466 /// ```
466 /// use embassy::util::mpsc; 467 /// use embassy::util::mpsc;
467 /// use embassy::util::mpsc::{Channel, ChannelCell, WithCriticalSections}; 468 /// use embassy::util::mpsc::{Channel, WithCriticalSections};
468 /// 469 ///
469 /// // Declare a bounded channel of 3 u32s. 470 /// // Declare a bounded channel of 3 u32s.
470 /// let mut channel = ChannelCell::new(mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections()); 471 /// let mut channel = Channel::<WithCriticalSections, u32, 3>::with_critical_sections();
471 /// // once we have a channel, obtain its sender and receiver 472 /// // once we have a channel, obtain its sender and receiver
472 /// let (sender, receiver) = mpsc::split(&channel); 473 /// let (sender, receiver) = mpsc::split(&channel);
473 /// ``` 474 /// ```
@@ -488,10 +489,10 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
488 /// 489 ///
489 /// ``` no_run 490 /// ``` no_run
490 /// use embassy::util::mpsc; 491 /// use embassy::util::mpsc;
491 /// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; 492 /// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
492 /// 493 ///
493 /// // Declare a bounded channel of 3 u32s. 494 /// // Declare a bounded channel of 3 u32s.
494 /// let mut channel = ChannelCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); 495 /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
495 /// // once we have a channel, obtain its sender and receiver 496 /// // once we have a channel, obtain its sender and receiver
496 /// let (sender, receiver) = mpsc::split(&channel); 497 /// let (sender, receiver) = mpsc::split(&channel);
497 /// ``` 498 /// ```
@@ -502,6 +503,27 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
502 } 503 }
503} 504}
504 505
506pub type WithNoThreads = NoopMutex<()>;
507
508impl<T, const N: usize> Channel<WithNoThreads, T, N> {
509 /// Establish a new bounded channel for within a single thread. To create one:
510 ///
511 /// ```
512 /// use embassy::util::mpsc;
513 /// use embassy::util::mpsc::{Channel, WithNoThreads};
514 ///
515 /// // Declare a bounded channel of 3 u32s.
516 /// let mut channel = Channel::<WithNoThreads, u32, 3>::with_no_threads();
517 /// // once we have a channel, obtain its sender and receiver
518 /// let (sender, receiver) = mpsc::split(&channel);
519 /// ```
520 pub const fn with_no_threads() -> Self {
521 let mutex = NoopMutex::new(());
522 let state = ChannelState::new();
523 Channel { mutex, state }
524 }
525}
526
505impl<M, T, const N: usize> Channel<M, T, N> 527impl<M, T, const N: usize> Channel<M, T, N>
506where 528where
507 M: Mutex<Data = ()>, 529 M: Mutex<Data = ()>,
@@ -675,43 +697,6 @@ mod tests {
675 } 697 }
676 } 698 }
677 699
678 /// A mutex that does nothing - useful for our testing purposes
679 pub struct NoopMutex<T> {
680 inner: UnsafeCell<T>,
681 }
682
683 impl<T> NoopMutex<T> {
684 pub const fn new(value: T) -> Self {
685 NoopMutex {
686 inner: UnsafeCell::new(value),
687 }
688 }
689 }
690
691 impl<T> NoopMutex<T> {
692 pub fn borrow(&self) -> &T {
693 unsafe { &*self.inner.get() }
694 }
695 }
696
697 impl<T> Mutex for NoopMutex<T> {
698 type Data = T;
699
700 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
701 f(self.borrow())
702 }
703 }
704
705 pub type WithNoThreads = NoopMutex<()>;
706
707 impl<T, const N: usize> Channel<WithNoThreads, T, N> {
708 pub const fn with_no_threads() -> Self {
709 let mutex = NoopMutex::new(());
710 let state = ChannelState::new();
711 Channel { mutex, state }
712 }
713 }
714
715 #[test] 700 #[test]
716 fn sending_once() { 701 fn sending_once() {
717 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 702 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
@@ -772,7 +757,7 @@ mod tests {
772 757
773 #[test] 758 #[test]
774 fn simple_send_and_receive() { 759 fn simple_send_and_receive() {
775 let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); 760 let c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
776 let (s, r) = split(&c); 761 let (s, r) = split(&c);
777 assert!(s.clone().try_send(1).is_ok()); 762 assert!(s.clone().try_send(1).is_ok());
778 assert_eq!(r.try_recv().unwrap(), 1); 763 assert_eq!(r.try_recv().unwrap(), 1);
@@ -780,7 +765,7 @@ mod tests {
780 765
781 #[test] 766 #[test]
782 fn should_close_without_sender() { 767 fn should_close_without_sender() {
783 let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); 768 let c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
784 let (s, r) = split(&c); 769 let (s, r) = split(&c);
785 drop(s); 770 drop(s);
786 match r.try_recv() { 771 match r.try_recv() {
@@ -791,7 +776,7 @@ mod tests {
791 776
792 #[test] 777 #[test]
793 fn should_close_once_drained() { 778 fn should_close_once_drained() {
794 let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); 779 let c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
795 let (s, r) = split(&c); 780 let (s, r) = split(&c);
796 assert!(s.try_send(1).is_ok()); 781 assert!(s.try_send(1).is_ok());
797 drop(s); 782 drop(s);
@@ -804,7 +789,7 @@ mod tests {
804 789
805 #[test] 790 #[test]
806 fn should_reject_send_when_receiver_dropped() { 791 fn should_reject_send_when_receiver_dropped() {
807 let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); 792 let c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
808 let (s, r) = split(&c); 793 let (s, r) = split(&c);
809 drop(r); 794 drop(r);
810 match s.try_send(1) { 795 match s.try_send(1) {
@@ -815,7 +800,7 @@ mod tests {
815 800
816 #[test] 801 #[test]
817 fn should_reject_send_when_channel_closed() { 802 fn should_reject_send_when_channel_closed() {
818 let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); 803 let c = Channel::<WithNoThreads, u32, 3>::with_no_threads();
819 let (s, mut r) = split(&c); 804 let (s, mut r) = split(&c);
820 assert!(s.try_send(1).is_ok()); 805 assert!(s.try_send(1).is_ok());
821 r.close(); 806 r.close();
@@ -831,8 +816,8 @@ mod tests {
831 async fn receiver_closes_when_sender_dropped_async() { 816 async fn receiver_closes_when_sender_dropped_async() {
832 let executor = ThreadPool::new().unwrap(); 817 let executor = ThreadPool::new().unwrap();
833 818
834 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 3>> = 819 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> =
835 ChannelCell::new(Channel::with_critical_sections()); 820 Channel::with_critical_sections();
836 let (s, mut r) = split(unsafe { &CHANNEL }); 821 let (s, mut r) = split(unsafe { &CHANNEL });
837 assert!(executor 822 assert!(executor
838 .spawn(async move { 823 .spawn(async move {
@@ -846,8 +831,8 @@ mod tests {
846 async fn receiver_receives_given_try_send_async() { 831 async fn receiver_receives_given_try_send_async() {
847 let executor = ThreadPool::new().unwrap(); 832 let executor = ThreadPool::new().unwrap();
848 833
849 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 3>> = 834 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> =
850 ChannelCell::new(Channel::with_critical_sections()); 835 Channel::with_critical_sections();
851 let (s, mut r) = split(unsafe { &CHANNEL }); 836 let (s, mut r) = split(unsafe { &CHANNEL });
852 assert!(executor 837 assert!(executor
853 .spawn(async move { 838 .spawn(async move {
@@ -859,8 +844,8 @@ mod tests {
859 844
860 #[futures_test::test] 845 #[futures_test::test]
861 async fn sender_send_completes_if_capacity() { 846 async fn sender_send_completes_if_capacity() {
862 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = 847 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
863 ChannelCell::new(Channel::with_critical_sections()); 848 Channel::with_critical_sections();
864 let (s, mut r) = split(unsafe { &CHANNEL }); 849 let (s, mut r) = split(unsafe { &CHANNEL });
865 assert!(s.send(1).await.is_ok()); 850 assert!(s.send(1).await.is_ok());
866 assert_eq!(r.recv().await, Some(1)); 851 assert_eq!(r.recv().await, Some(1));
@@ -868,8 +853,8 @@ mod tests {
868 853
869 #[futures_test::test] 854 #[futures_test::test]
870 async fn sender_send_completes_if_closed() { 855 async fn sender_send_completes_if_closed() {
871 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = 856 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
872 ChannelCell::new(Channel::with_critical_sections()); 857 Channel::with_critical_sections();
873 let (s, r) = split(unsafe { &CHANNEL }); 858 let (s, r) = split(unsafe { &CHANNEL });
874 drop(r); 859 drop(r);
875 match s.send(1).await { 860 match s.send(1).await {
@@ -882,8 +867,8 @@ mod tests {
882 async fn senders_sends_wait_until_capacity() { 867 async fn senders_sends_wait_until_capacity() {
883 let executor = ThreadPool::new().unwrap(); 868 let executor = ThreadPool::new().unwrap();
884 869
885 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = 870 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
886 ChannelCell::new(Channel::with_critical_sections()); 871 Channel::with_critical_sections();
887 let (s0, mut r) = split(unsafe { &CHANNEL }); 872 let (s0, mut r) = split(unsafe { &CHANNEL });
888 assert!(s0.try_send(1).is_ok()); 873 assert!(s0.try_send(1).is_ok());
889 let s1 = s0.clone(); 874 let s1 = s0.clone();
@@ -902,8 +887,8 @@ mod tests {
902 887
903 #[futures_test::test] 888 #[futures_test::test]
904 async fn sender_close_completes_if_closing() { 889 async fn sender_close_completes_if_closing() {
905 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = 890 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
906 ChannelCell::new(Channel::with_critical_sections()); 891 Channel::with_critical_sections();
907 let (s, mut r) = split(unsafe { &CHANNEL }); 892 let (s, mut r) = split(unsafe { &CHANNEL });
908 r.close(); 893 r.close();
909 s.closed().await; 894 s.closed().await;
@@ -911,8 +896,8 @@ mod tests {
911 896
912 #[futures_test::test] 897 #[futures_test::test]
913 async fn sender_close_completes_if_closed() { 898 async fn sender_close_completes_if_closed() {
914 static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = 899 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> =
915 ChannelCell::new(Channel::with_critical_sections()); 900 Channel::with_critical_sections();
916 let (s, r) = split(unsafe { &CHANNEL }); 901 let (s, r) = split(unsafe { &CHANNEL });
917 drop(r); 902 drop(r);
918 s.closed().await; 903 s.closed().await;
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs
index 682fcb39d..db3423cb3 100644
--- a/embassy/src/util/mutex.rs
+++ b/embassy/src/util/mutex.rs
@@ -105,3 +105,30 @@ pub fn in_thread_mode() -> bool {
105 return cortex_m::peripheral::SCB::vect_active() 105 return cortex_m::peripheral::SCB::vect_active()
106 == cortex_m::peripheral::scb::VectActive::ThreadMode; 106 == cortex_m::peripheral::scb::VectActive::ThreadMode;
107} 107}
108
109/// A "mutex" that does nothing and cannot be shared between threads.
110pub struct NoopMutex<T> {
111 inner: UnsafeCell<T>,
112}
113
114impl<T> NoopMutex<T> {
115 pub const fn new(value: T) -> Self {
116 NoopMutex {
117 inner: UnsafeCell::new(value),
118 }
119 }
120}
121
122impl<T> NoopMutex<T> {
123 pub fn borrow(&self) -> &T {
124 unsafe { &*self.inner.get() }
125 }
126}
127
128impl<T> Mutex for NoopMutex<T> {
129 type Data = T;
130
131 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
132 f(self.borrow())
133 }
134}
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index 6a0f8f471..c2cb107e1 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -16,17 +16,17 @@ use embassy::util::{mpsc, Forever};
16use embassy_nrf::gpio::{Level, Output, OutputDrive}; 16use embassy_nrf::gpio::{Level, Output, OutputDrive};
17use embassy_nrf::Peripherals; 17use embassy_nrf::Peripherals;
18use embedded_hal::digital::v2::OutputPin; 18use embedded_hal::digital::v2::OutputPin;
19use mpsc::{Channel, Sender, WithThreadModeOnly}; 19use mpsc::{Channel, Sender, WithNoThreads};
20 20
21enum LedState { 21enum LedState {
22 On, 22 On,
23 Off, 23 Off,
24} 24}
25 25
26static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new(); 26static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new();
27 27
28#[embassy::task(pool_size = 1)] 28#[embassy::task(pool_size = 1)]
29async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { 29async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) {
30 loop { 30 loop {
31 let _ = sender.send(LedState::On).await; 31 let _ = sender.send(LedState::On).await;
32 Timer::after(Duration::from_secs(1)).await; 32 Timer::after(Duration::from_secs(1)).await;
@@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) {
39async fn main(spawner: Spawner, p: Peripherals) { 39async fn main(spawner: Spawner, p: Peripherals) {
40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); 40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
41 41
42 let channel = CHANNEL.put(Channel::with_thread_mode_only()); 42 let channel = CHANNEL.put(Channel::with_no_threads());
43 let (sender, mut receiver) = mpsc::split(channel); 43 let (sender, mut receiver) = mpsc::split(channel);
44 44
45 spawner.spawn(my_task(sender)).unwrap(); 45 spawner.spawn(my_task(sender)).unwrap();