diff options
| -rw-r--r-- | embassy/src/util/mpsc.rs | 109 | ||||
| -rw-r--r-- | embassy/src/util/mutex.rs | 27 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 8 |
3 files changed, 78 insertions, 66 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 65e4bf7b7..e54c507c1 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -49,6 +49,7 @@ use futures::Future; | |||
| 49 | 49 | ||
| 50 | use super::CriticalSectionMutex; | 50 | use super::CriticalSectionMutex; |
| 51 | use super::Mutex; | 51 | use super::Mutex; |
| 52 | use super::NoopMutex; | ||
| 52 | use super::ThreadModeMutex; | 53 | use super::ThreadModeMutex; |
| 53 | 54 | ||
| 54 | /// Send values to the associated `Receiver`. | 55 | /// Send values to the associated `Receiver`. |
| @@ -96,10 +97,10 @@ unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: | |||
| 96 | //// | 97 | //// |
| 97 | /// ```compile_fail | 98 | /// ```compile_fail |
| 98 | /// use embassy::util::mpsc; | 99 | /// use embassy::util::mpsc; |
| 99 | /// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; | 100 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; |
| 100 | /// | 101 | /// |
| 101 | /// let (sender, receiver) = { | 102 | /// let (sender, receiver) = { |
| 102 | /// let mut channel = ChannelCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); | 103 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); |
| 103 | /// mpsc::split(&channel) | 104 | /// mpsc::split(&channel) |
| 104 | /// }; | 105 | /// }; |
| 105 | /// ``` | 106 | /// ``` |
| @@ -464,10 +465,10 @@ impl<T, const N: usize> Channel<WithCriticalSections, T, N> { | |||
| 464 | /// | 465 | /// |
| 465 | /// ``` | 466 | /// ``` |
| 466 | /// use embassy::util::mpsc; | 467 | /// use embassy::util::mpsc; |
| 467 | /// use embassy::util::mpsc::{Channel, ChannelCell, WithCriticalSections}; | 468 | /// use embassy::util::mpsc::{Channel, WithCriticalSections}; |
| 468 | /// | 469 | /// |
| 469 | /// // Declare a bounded channel of 3 u32s. | 470 | /// // Declare a bounded channel of 3 u32s. |
| 470 | /// let mut channel = ChannelCell::new(mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections()); | 471 | /// let mut channel = Channel::<WithCriticalSections, u32, 3>::with_critical_sections(); |
| 471 | /// // once we have a channel, obtain its sender and receiver | 472 | /// // once we have a channel, obtain its sender and receiver |
| 472 | /// let (sender, receiver) = mpsc::split(&channel); | 473 | /// let (sender, receiver) = mpsc::split(&channel); |
| 473 | /// ``` | 474 | /// ``` |
| @@ -488,10 +489,10 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> { | |||
| 488 | /// | 489 | /// |
| 489 | /// ``` no_run | 490 | /// ``` no_run |
| 490 | /// use embassy::util::mpsc; | 491 | /// use embassy::util::mpsc; |
| 491 | /// use embassy::util::mpsc::{Channel, ChannelCell, WithThreadModeOnly}; | 492 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; |
| 492 | /// | 493 | /// |
| 493 | /// // Declare a bounded channel of 3 u32s. | 494 | /// // Declare a bounded channel of 3 u32s. |
| 494 | /// let mut channel = ChannelCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); | 495 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); |
| 495 | /// // once we have a channel, obtain its sender and receiver | 496 | /// // once we have a channel, obtain its sender and receiver |
| 496 | /// let (sender, receiver) = mpsc::split(&channel); | 497 | /// let (sender, receiver) = mpsc::split(&channel); |
| 497 | /// ``` | 498 | /// ``` |
| @@ -502,6 +503,27 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> { | |||
| 502 | } | 503 | } |
| 503 | } | 504 | } |
| 504 | 505 | ||
| 506 | pub type WithNoThreads = NoopMutex<()>; | ||
| 507 | |||
| 508 | impl<T, const N: usize> Channel<WithNoThreads, T, N> { | ||
| 509 | /// Establish a new bounded channel for within a single thread. To create one: | ||
| 510 | /// | ||
| 511 | /// ``` | ||
| 512 | /// use embassy::util::mpsc; | ||
| 513 | /// use embassy::util::mpsc::{Channel, WithNoThreads}; | ||
| 514 | /// | ||
| 515 | /// // Declare a bounded channel of 3 u32s. | ||
| 516 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | ||
| 517 | /// // once we have a channel, obtain its sender and receiver | ||
| 518 | /// let (sender, receiver) = mpsc::split(&channel); | ||
| 519 | /// ``` | ||
| 520 | pub const fn with_no_threads() -> Self { | ||
| 521 | let mutex = NoopMutex::new(()); | ||
| 522 | let state = ChannelState::new(); | ||
| 523 | Channel { mutex, state } | ||
| 524 | } | ||
| 525 | } | ||
| 526 | |||
| 505 | impl<M, T, const N: usize> Channel<M, T, N> | 527 | impl<M, T, const N: usize> Channel<M, T, N> |
| 506 | where | 528 | where |
| 507 | M: Mutex<Data = ()>, | 529 | M: Mutex<Data = ()>, |
| @@ -675,43 +697,6 @@ mod tests { | |||
| 675 | } | 697 | } |
| 676 | } | 698 | } |
| 677 | 699 | ||
| 678 | /// A mutex that does nothing - useful for our testing purposes | ||
| 679 | pub struct NoopMutex<T> { | ||
| 680 | inner: UnsafeCell<T>, | ||
| 681 | } | ||
| 682 | |||
| 683 | impl<T> NoopMutex<T> { | ||
| 684 | pub const fn new(value: T) -> Self { | ||
| 685 | NoopMutex { | ||
| 686 | inner: UnsafeCell::new(value), | ||
| 687 | } | ||
| 688 | } | ||
| 689 | } | ||
| 690 | |||
| 691 | impl<T> NoopMutex<T> { | ||
| 692 | pub fn borrow(&self) -> &T { | ||
| 693 | unsafe { &*self.inner.get() } | ||
| 694 | } | ||
| 695 | } | ||
| 696 | |||
| 697 | impl<T> Mutex for NoopMutex<T> { | ||
| 698 | type Data = T; | ||
| 699 | |||
| 700 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | ||
| 701 | f(self.borrow()) | ||
| 702 | } | ||
| 703 | } | ||
| 704 | |||
| 705 | pub type WithNoThreads = NoopMutex<()>; | ||
| 706 | |||
| 707 | impl<T, const N: usize> Channel<WithNoThreads, T, N> { | ||
| 708 | pub const fn with_no_threads() -> Self { | ||
| 709 | let mutex = NoopMutex::new(()); | ||
| 710 | let state = ChannelState::new(); | ||
| 711 | Channel { mutex, state } | ||
| 712 | } | ||
| 713 | } | ||
| 714 | |||
| 715 | #[test] | 700 | #[test] |
| 716 | fn sending_once() { | 701 | fn sending_once() { |
| 717 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 702 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| @@ -772,7 +757,7 @@ mod tests { | |||
| 772 | 757 | ||
| 773 | #[test] | 758 | #[test] |
| 774 | fn simple_send_and_receive() { | 759 | fn simple_send_and_receive() { |
| 775 | let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); | 760 | let c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| 776 | let (s, r) = split(&c); | 761 | let (s, r) = split(&c); |
| 777 | assert!(s.clone().try_send(1).is_ok()); | 762 | assert!(s.clone().try_send(1).is_ok()); |
| 778 | assert_eq!(r.try_recv().unwrap(), 1); | 763 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -780,7 +765,7 @@ mod tests { | |||
| 780 | 765 | ||
| 781 | #[test] | 766 | #[test] |
| 782 | fn should_close_without_sender() { | 767 | fn should_close_without_sender() { |
| 783 | let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); | 768 | let c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| 784 | let (s, r) = split(&c); | 769 | let (s, r) = split(&c); |
| 785 | drop(s); | 770 | drop(s); |
| 786 | match r.try_recv() { | 771 | match r.try_recv() { |
| @@ -791,7 +776,7 @@ mod tests { | |||
| 791 | 776 | ||
| 792 | #[test] | 777 | #[test] |
| 793 | fn should_close_once_drained() { | 778 | fn should_close_once_drained() { |
| 794 | let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); | 779 | let c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| 795 | let (s, r) = split(&c); | 780 | let (s, r) = split(&c); |
| 796 | assert!(s.try_send(1).is_ok()); | 781 | assert!(s.try_send(1).is_ok()); |
| 797 | drop(s); | 782 | drop(s); |
| @@ -804,7 +789,7 @@ mod tests { | |||
| 804 | 789 | ||
| 805 | #[test] | 790 | #[test] |
| 806 | fn should_reject_send_when_receiver_dropped() { | 791 | fn should_reject_send_when_receiver_dropped() { |
| 807 | let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); | 792 | let c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| 808 | let (s, r) = split(&c); | 793 | let (s, r) = split(&c); |
| 809 | drop(r); | 794 | drop(r); |
| 810 | match s.try_send(1) { | 795 | match s.try_send(1) { |
| @@ -815,7 +800,7 @@ mod tests { | |||
| 815 | 800 | ||
| 816 | #[test] | 801 | #[test] |
| 817 | fn should_reject_send_when_channel_closed() { | 802 | fn should_reject_send_when_channel_closed() { |
| 818 | let c = ChannelCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); | 803 | let c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); |
| 819 | let (s, mut r) = split(&c); | 804 | let (s, mut r) = split(&c); |
| 820 | assert!(s.try_send(1).is_ok()); | 805 | assert!(s.try_send(1).is_ok()); |
| 821 | r.close(); | 806 | r.close(); |
| @@ -831,8 +816,8 @@ mod tests { | |||
| 831 | async fn receiver_closes_when_sender_dropped_async() { | 816 | async fn receiver_closes_when_sender_dropped_async() { |
| 832 | let executor = ThreadPool::new().unwrap(); | 817 | let executor = ThreadPool::new().unwrap(); |
| 833 | 818 | ||
| 834 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 3>> = | 819 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = |
| 835 | ChannelCell::new(Channel::with_critical_sections()); | 820 | Channel::with_critical_sections(); |
| 836 | let (s, mut r) = split(unsafe { &CHANNEL }); | 821 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 837 | assert!(executor | 822 | assert!(executor |
| 838 | .spawn(async move { | 823 | .spawn(async move { |
| @@ -846,8 +831,8 @@ mod tests { | |||
| 846 | async fn receiver_receives_given_try_send_async() { | 831 | async fn receiver_receives_given_try_send_async() { |
| 847 | let executor = ThreadPool::new().unwrap(); | 832 | let executor = ThreadPool::new().unwrap(); |
| 848 | 833 | ||
| 849 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 3>> = | 834 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = |
| 850 | ChannelCell::new(Channel::with_critical_sections()); | 835 | Channel::with_critical_sections(); |
| 851 | let (s, mut r) = split(unsafe { &CHANNEL }); | 836 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 852 | assert!(executor | 837 | assert!(executor |
| 853 | .spawn(async move { | 838 | .spawn(async move { |
| @@ -859,8 +844,8 @@ mod tests { | |||
| 859 | 844 | ||
| 860 | #[futures_test::test] | 845 | #[futures_test::test] |
| 861 | async fn sender_send_completes_if_capacity() { | 846 | async fn sender_send_completes_if_capacity() { |
| 862 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = | 847 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = |
| 863 | ChannelCell::new(Channel::with_critical_sections()); | 848 | Channel::with_critical_sections(); |
| 864 | let (s, mut r) = split(unsafe { &CHANNEL }); | 849 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 865 | assert!(s.send(1).await.is_ok()); | 850 | assert!(s.send(1).await.is_ok()); |
| 866 | assert_eq!(r.recv().await, Some(1)); | 851 | assert_eq!(r.recv().await, Some(1)); |
| @@ -868,8 +853,8 @@ mod tests { | |||
| 868 | 853 | ||
| 869 | #[futures_test::test] | 854 | #[futures_test::test] |
| 870 | async fn sender_send_completes_if_closed() { | 855 | async fn sender_send_completes_if_closed() { |
| 871 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = | 856 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = |
| 872 | ChannelCell::new(Channel::with_critical_sections()); | 857 | Channel::with_critical_sections(); |
| 873 | let (s, r) = split(unsafe { &CHANNEL }); | 858 | let (s, r) = split(unsafe { &CHANNEL }); |
| 874 | drop(r); | 859 | drop(r); |
| 875 | match s.send(1).await { | 860 | match s.send(1).await { |
| @@ -882,8 +867,8 @@ mod tests { | |||
| 882 | async fn senders_sends_wait_until_capacity() { | 867 | async fn senders_sends_wait_until_capacity() { |
| 883 | let executor = ThreadPool::new().unwrap(); | 868 | let executor = ThreadPool::new().unwrap(); |
| 884 | 869 | ||
| 885 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = | 870 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = |
| 886 | ChannelCell::new(Channel::with_critical_sections()); | 871 | Channel::with_critical_sections(); |
| 887 | let (s0, mut r) = split(unsafe { &CHANNEL }); | 872 | let (s0, mut r) = split(unsafe { &CHANNEL }); |
| 888 | assert!(s0.try_send(1).is_ok()); | 873 | assert!(s0.try_send(1).is_ok()); |
| 889 | let s1 = s0.clone(); | 874 | let s1 = s0.clone(); |
| @@ -902,8 +887,8 @@ mod tests { | |||
| 902 | 887 | ||
| 903 | #[futures_test::test] | 888 | #[futures_test::test] |
| 904 | async fn sender_close_completes_if_closing() { | 889 | async fn sender_close_completes_if_closing() { |
| 905 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = | 890 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = |
| 906 | ChannelCell::new(Channel::with_critical_sections()); | 891 | Channel::with_critical_sections(); |
| 907 | let (s, mut r) = split(unsafe { &CHANNEL }); | 892 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 908 | r.close(); | 893 | r.close(); |
| 909 | s.closed().await; | 894 | s.closed().await; |
| @@ -911,8 +896,8 @@ mod tests { | |||
| 911 | 896 | ||
| 912 | #[futures_test::test] | 897 | #[futures_test::test] |
| 913 | async fn sender_close_completes_if_closed() { | 898 | async fn sender_close_completes_if_closed() { |
| 914 | static mut CHANNEL: ChannelCell<Channel<WithCriticalSections, u32, 1>> = | 899 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = |
| 915 | ChannelCell::new(Channel::with_critical_sections()); | 900 | Channel::with_critical_sections(); |
| 916 | let (s, r) = split(unsafe { &CHANNEL }); | 901 | let (s, r) = split(unsafe { &CHANNEL }); |
| 917 | drop(r); | 902 | drop(r); |
| 918 | s.closed().await; | 903 | s.closed().await; |
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs index 682fcb39d..db3423cb3 100644 --- a/embassy/src/util/mutex.rs +++ b/embassy/src/util/mutex.rs | |||
| @@ -105,3 +105,30 @@ pub fn in_thread_mode() -> bool { | |||
| 105 | return cortex_m::peripheral::SCB::vect_active() | 105 | return cortex_m::peripheral::SCB::vect_active() |
| 106 | == cortex_m::peripheral::scb::VectActive::ThreadMode; | 106 | == cortex_m::peripheral::scb::VectActive::ThreadMode; |
| 107 | } | 107 | } |
| 108 | |||
| 109 | /// A "mutex" that does nothing and cannot be shared between threads. | ||
| 110 | pub struct NoopMutex<T> { | ||
| 111 | inner: UnsafeCell<T>, | ||
| 112 | } | ||
| 113 | |||
| 114 | impl<T> NoopMutex<T> { | ||
| 115 | pub const fn new(value: T) -> Self { | ||
| 116 | NoopMutex { | ||
| 117 | inner: UnsafeCell::new(value), | ||
| 118 | } | ||
| 119 | } | ||
| 120 | } | ||
| 121 | |||
| 122 | impl<T> NoopMutex<T> { | ||
| 123 | pub fn borrow(&self) -> &T { | ||
| 124 | unsafe { &*self.inner.get() } | ||
| 125 | } | ||
| 126 | } | ||
| 127 | |||
| 128 | impl<T> Mutex for NoopMutex<T> { | ||
| 129 | type Data = T; | ||
| 130 | |||
| 131 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | ||
| 132 | f(self.borrow()) | ||
| 133 | } | ||
| 134 | } | ||
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 6a0f8f471..c2cb107e1 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -16,17 +16,17 @@ use embassy::util::{mpsc, Forever}; | |||
| 16 | use embassy_nrf::gpio::{Level, Output, OutputDrive}; | 16 | use embassy_nrf::gpio::{Level, Output, OutputDrive}; |
| 17 | use embassy_nrf::Peripherals; | 17 | use embassy_nrf::Peripherals; |
| 18 | use embedded_hal::digital::v2::OutputPin; | 18 | use embedded_hal::digital::v2::OutputPin; |
| 19 | use mpsc::{Channel, Sender, WithThreadModeOnly}; | 19 | use mpsc::{Channel, Sender, WithNoThreads}; |
| 20 | 20 | ||
| 21 | enum LedState { | 21 | enum LedState { |
| 22 | On, | 22 | On, |
| 23 | Off, | 23 | Off, |
| 24 | } | 24 | } |
| 25 | 25 | ||
| 26 | static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new(); | 26 | static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); |
| 27 | 27 | ||
| 28 | #[embassy::task(pool_size = 1)] | 28 | #[embassy::task(pool_size = 1)] |
| 29 | async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { | 29 | async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { |
| 30 | loop { | 30 | loop { |
| 31 | let _ = sender.send(LedState::On).await; | 31 | let _ = sender.send(LedState::On).await; |
| 32 | Timer::after(Duration::from_secs(1)).await; | 32 | Timer::after(Duration::from_secs(1)).await; |
| @@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { | |||
| 39 | async fn main(spawner: Spawner, p: Peripherals) { | 39 | async fn main(spawner: Spawner, p: Peripherals) { |
| 40 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | 40 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); |
| 41 | 41 | ||
| 42 | let channel = CHANNEL.put(Channel::with_thread_mode_only()); | 42 | let channel = CHANNEL.put(Channel::with_no_threads()); |
| 43 | let (sender, mut receiver) = mpsc::split(channel); | 43 | let (sender, mut receiver) = mpsc::split(channel); |
| 44 | 44 | ||
| 45 | spawner.spawn(my_task(sender)).unwrap(); | 45 | spawner.spawn(my_task(sender)).unwrap(); |
