diff options
| author | huntc <[email protected]> | 2021-07-15 12:08:35 +1000 |
|---|---|---|
| committer | huntc <[email protected]> | 2021-07-15 12:31:53 +1000 |
| commit | 3778f55d80f70b336f6ca846f365cf619032a685 (patch) | |
| tree | c8208386107aac157217758b838da4789f85186a | |
| parent | 6f78527aeb7a0bacb02ca3264edd04d37550ea02 (diff) | |
Provides a cleaner construction of the channel with the common "new" naming
| -rw-r--r-- | embassy/src/util/mpsc.rs | 122 | ||||
| -rw-r--r-- | embassy/src/util/mutex.rs | 14 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 3 |
3 files changed, 51 insertions, 88 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 246bd27e4..cc9e2a5dd 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -581,75 +581,27 @@ where | |||
| 581 | 581 | ||
| 582 | pub type WithCriticalSections = CriticalSectionMutex<()>; | 582 | pub type WithCriticalSections = CriticalSectionMutex<()>; |
| 583 | 583 | ||
| 584 | impl<T, const N: usize> Channel<WithCriticalSections, T, N> { | ||
| 585 | /// Establish a new bounded channel using critical sections. Critical sections | ||
| 586 | /// should be used only single core targets where communication is required | ||
| 587 | /// from exception mode e.g. interrupt handlers. To create one: | ||
| 588 | /// | ||
| 589 | /// ``` | ||
| 590 | /// use embassy::util::mpsc; | ||
| 591 | /// use embassy::util::mpsc::{Channel, WithCriticalSections}; | ||
| 592 | /// | ||
| 593 | /// // Declare a bounded channel of 3 u32s. | ||
| 594 | /// let mut channel = Channel::<WithCriticalSections, u32, 3>::with_critical_sections(); | ||
| 595 | /// // once we have a channel, obtain its sender and receiver | ||
| 596 | /// let (sender, receiver) = mpsc::split(&mut channel); | ||
| 597 | /// ``` | ||
| 598 | pub const fn with_critical_sections() -> Self { | ||
| 599 | let mutex = CriticalSectionMutex::new(()); | ||
| 600 | let state = ChannelState::new(); | ||
| 601 | let channel_cell = ChannelCell { mutex, state }; | ||
| 602 | Channel { | ||
| 603 | channel_cell: UnsafeCell::new(channel_cell), | ||
| 604 | receiver_consumed: PhantomData, | ||
| 605 | } | ||
| 606 | } | ||
| 607 | } | ||
| 608 | |||
| 609 | pub type WithThreadModeOnly = ThreadModeMutex<()>; | 584 | pub type WithThreadModeOnly = ThreadModeMutex<()>; |
| 610 | 585 | ||
| 611 | impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> { | ||
| 612 | /// Establish a new bounded channel for use in Cortex-M thread mode. Thread | ||
| 613 | /// mode is intended for application threads on a single core, not interrupts. | ||
| 614 | /// As such, only one task at a time can acquire a resource and so this | ||
| 615 | /// channel avoids all locks. To create one: | ||
| 616 | /// | ||
| 617 | /// ``` no_run | ||
| 618 | /// use embassy::util::mpsc; | ||
| 619 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; | ||
| 620 | /// | ||
| 621 | /// // Declare a bounded channel of 3 u32s. | ||
| 622 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); | ||
| 623 | /// // once we have a channel, obtain its sender and receiver | ||
| 624 | /// let (sender, receiver) = mpsc::split(&mut channel); | ||
| 625 | /// ``` | ||
| 626 | pub const fn with_thread_mode_only() -> Self { | ||
| 627 | let mutex = ThreadModeMutex::new(()); | ||
| 628 | let state = ChannelState::new(); | ||
| 629 | let channel_cell = ChannelCell { mutex, state }; | ||
| 630 | Channel { | ||
| 631 | channel_cell: UnsafeCell::new(channel_cell), | ||
| 632 | receiver_consumed: PhantomData, | ||
| 633 | } | ||
| 634 | } | ||
| 635 | } | ||
| 636 | |||
| 637 | pub type WithNoThreads = NoopMutex<()>; | 586 | pub type WithNoThreads = NoopMutex<()>; |
| 638 | 587 | ||
| 639 | impl<T, const N: usize> Channel<WithNoThreads, T, N> { | 588 | impl<M, T, const N: usize> Channel<M, T, N> |
| 640 | /// Establish a new bounded channel for within a single thread. To create one: | 589 | where |
| 590 | M: Mutex<Data = ()>, | ||
| 591 | { | ||
| 592 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 641 | /// | 593 | /// |
| 642 | /// ``` | 594 | /// ``` |
| 643 | /// use embassy::util::mpsc; | 595 | /// use embassy::util::mpsc; |
| 644 | /// use embassy::util::mpsc::{Channel, WithNoThreads}; | 596 | /// use embassy::util::mpsc::{Channel, WithNoThreads}; |
| 645 | /// | 597 | /// |
| 646 | /// // Declare a bounded channel of 3 u32s. | 598 | /// // Declare a bounded channel of 3 u32s. |
| 647 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 599 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); |
| 648 | /// // once we have a channel, obtain its sender and receiver | 600 | /// // once we have a channel, obtain its sender and receiver |
| 649 | /// let (sender, receiver) = mpsc::split(&mut channel); | 601 | /// let (sender, receiver) = mpsc::split(&mut channel); |
| 650 | /// ``` | 602 | /// ``` |
| 651 | pub const fn with_no_threads() -> Self { | 603 | pub fn new() -> Self { |
| 652 | let mutex = NoopMutex::new(()); | 604 | let mutex = M::new(()); |
| 653 | let state = ChannelState::new(); | 605 | let state = ChannelState::new(); |
| 654 | let channel_cell = ChannelCell { mutex, state }; | 606 | let channel_cell = ChannelCell { mutex, state }; |
| 655 | Channel { | 607 | Channel { |
| @@ -657,12 +609,7 @@ impl<T, const N: usize> Channel<WithNoThreads, T, N> { | |||
| 657 | receiver_consumed: PhantomData, | 609 | receiver_consumed: PhantomData, |
| 658 | } | 610 | } |
| 659 | } | 611 | } |
| 660 | } | ||
| 661 | 612 | ||
| 662 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 663 | where | ||
| 664 | M: Mutex<Data = ()>, | ||
| 665 | { | ||
| 666 | fn lock<R>( | 613 | fn lock<R>( |
| 667 | channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, | 614 | channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, |
| 668 | f: impl FnOnce(&mut ChannelState<T, N>) -> R, | 615 | f: impl FnOnce(&mut ChannelState<T, N>) -> R, |
| @@ -684,6 +631,8 @@ mod tests { | |||
| 684 | use futures_executor::ThreadPool; | 631 | use futures_executor::ThreadPool; |
| 685 | use futures_timer::Delay; | 632 | use futures_timer::Delay; |
| 686 | 633 | ||
| 634 | use crate::util::Forever; | ||
| 635 | |||
| 687 | use super::*; | 636 | use super::*; |
| 688 | 637 | ||
| 689 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | 638 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { |
| @@ -758,7 +707,7 @@ mod tests { | |||
| 758 | 707 | ||
| 759 | #[test] | 708 | #[test] |
| 760 | fn simple_send_and_receive() { | 709 | fn simple_send_and_receive() { |
| 761 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 710 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); |
| 762 | let (s, r) = split(&mut c); | 711 | let (s, r) = split(&mut c); |
| 763 | assert!(s.clone().try_send(1).is_ok()); | 712 | assert!(s.clone().try_send(1).is_ok()); |
| 764 | assert_eq!(r.try_recv().unwrap(), 1); | 713 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -766,7 +715,7 @@ mod tests { | |||
| 766 | 715 | ||
| 767 | #[test] | 716 | #[test] |
| 768 | fn should_close_without_sender() { | 717 | fn should_close_without_sender() { |
| 769 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 718 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); |
| 770 | let (s, r) = split(&mut c); | 719 | let (s, r) = split(&mut c); |
| 771 | drop(s); | 720 | drop(s); |
| 772 | match r.try_recv() { | 721 | match r.try_recv() { |
| @@ -777,7 +726,7 @@ mod tests { | |||
| 777 | 726 | ||
| 778 | #[test] | 727 | #[test] |
| 779 | fn should_close_once_drained() { | 728 | fn should_close_once_drained() { |
| 780 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 729 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); |
| 781 | let (s, r) = split(&mut c); | 730 | let (s, r) = split(&mut c); |
| 782 | assert!(s.try_send(1).is_ok()); | 731 | assert!(s.try_send(1).is_ok()); |
| 783 | drop(s); | 732 | drop(s); |
| @@ -790,7 +739,7 @@ mod tests { | |||
| 790 | 739 | ||
| 791 | #[test] | 740 | #[test] |
| 792 | fn should_reject_send_when_receiver_dropped() { | 741 | fn should_reject_send_when_receiver_dropped() { |
| 793 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 742 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); |
| 794 | let (s, r) = split(&mut c); | 743 | let (s, r) = split(&mut c); |
| 795 | drop(r); | 744 | drop(r); |
| 796 | match s.try_send(1) { | 745 | match s.try_send(1) { |
| @@ -801,7 +750,7 @@ mod tests { | |||
| 801 | 750 | ||
| 802 | #[test] | 751 | #[test] |
| 803 | fn should_reject_send_when_channel_closed() { | 752 | fn should_reject_send_when_channel_closed() { |
| 804 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 753 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); |
| 805 | let (s, mut r) = split(&mut c); | 754 | let (s, mut r) = split(&mut c); |
| 806 | assert!(s.try_send(1).is_ok()); | 755 | assert!(s.try_send(1).is_ok()); |
| 807 | r.close(); | 756 | r.close(); |
| @@ -817,9 +766,9 @@ mod tests { | |||
| 817 | async fn receiver_closes_when_sender_dropped_async() { | 766 | async fn receiver_closes_when_sender_dropped_async() { |
| 818 | let executor = ThreadPool::new().unwrap(); | 767 | let executor = ThreadPool::new().unwrap(); |
| 819 | 768 | ||
| 820 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = | 769 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); |
| 821 | Channel::with_critical_sections(); | 770 | let c = CHANNEL.put(Channel::new()); |
| 822 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 771 | let (s, mut r) = split(c); |
| 823 | assert!(executor | 772 | assert!(executor |
| 824 | .spawn(async move { | 773 | .spawn(async move { |
| 825 | drop(s); | 774 | drop(s); |
| @@ -832,9 +781,9 @@ mod tests { | |||
| 832 | async fn receiver_receives_given_try_send_async() { | 781 | async fn receiver_receives_given_try_send_async() { |
| 833 | let executor = ThreadPool::new().unwrap(); | 782 | let executor = ThreadPool::new().unwrap(); |
| 834 | 783 | ||
| 835 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = | 784 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); |
| 836 | Channel::with_critical_sections(); | 785 | let c = CHANNEL.put(Channel::new()); |
| 837 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 786 | let (s, mut r) = split(c); |
| 838 | assert!(executor | 787 | assert!(executor |
| 839 | .spawn(async move { | 788 | .spawn(async move { |
| 840 | assert!(s.try_send(1).is_ok()); | 789 | assert!(s.try_send(1).is_ok()); |
| @@ -845,18 +794,17 @@ mod tests { | |||
| 845 | 794 | ||
| 846 | #[futures_test::test] | 795 | #[futures_test::test] |
| 847 | async fn sender_send_completes_if_capacity() { | 796 | async fn sender_send_completes_if_capacity() { |
| 848 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 797 | let mut c = Channel::<WithCriticalSections, u32, 1>::new(); |
| 849 | Channel::with_critical_sections(); | 798 | let (s, mut r) = split(&mut c); |
| 850 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | ||
| 851 | assert!(s.send(1).await.is_ok()); | 799 | assert!(s.send(1).await.is_ok()); |
| 852 | assert_eq!(r.recv().await, Some(1)); | 800 | assert_eq!(r.recv().await, Some(1)); |
| 853 | } | 801 | } |
| 854 | 802 | ||
| 855 | #[futures_test::test] | 803 | #[futures_test::test] |
| 856 | async fn sender_send_completes_if_closed() { | 804 | async fn sender_send_completes_if_closed() { |
| 857 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 805 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); |
| 858 | Channel::with_critical_sections(); | 806 | let c = CHANNEL.put(Channel::new()); |
| 859 | let (s, r) = split(unsafe { &mut CHANNEL }); | 807 | let (s, r) = split(c); |
| 860 | drop(r); | 808 | drop(r); |
| 861 | match s.send(1).await { | 809 | match s.send(1).await { |
| 862 | Err(SendError(1)) => assert!(true), | 810 | Err(SendError(1)) => assert!(true), |
| @@ -868,9 +816,9 @@ mod tests { | |||
| 868 | async fn senders_sends_wait_until_capacity() { | 816 | async fn senders_sends_wait_until_capacity() { |
| 869 | let executor = ThreadPool::new().unwrap(); | 817 | let executor = ThreadPool::new().unwrap(); |
| 870 | 818 | ||
| 871 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 819 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); |
| 872 | Channel::with_critical_sections(); | 820 | let c = CHANNEL.put(Channel::new()); |
| 873 | let (s0, mut r) = split(unsafe { &mut CHANNEL }); | 821 | let (s0, mut r) = split(c); |
| 874 | assert!(s0.try_send(1).is_ok()); | 822 | assert!(s0.try_send(1).is_ok()); |
| 875 | let s1 = s0.clone(); | 823 | let s1 = s0.clone(); |
| 876 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); | 824 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); |
| @@ -888,18 +836,18 @@ mod tests { | |||
| 888 | 836 | ||
| 889 | #[futures_test::test] | 837 | #[futures_test::test] |
| 890 | async fn sender_close_completes_if_closing() { | 838 | async fn sender_close_completes_if_closing() { |
| 891 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 839 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); |
| 892 | Channel::with_critical_sections(); | 840 | let c = CHANNEL.put(Channel::new()); |
| 893 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 841 | let (s, mut r) = split(c); |
| 894 | r.close(); | 842 | r.close(); |
| 895 | s.closed().await; | 843 | s.closed().await; |
| 896 | } | 844 | } |
| 897 | 845 | ||
| 898 | #[futures_test::test] | 846 | #[futures_test::test] |
| 899 | async fn sender_close_completes_if_closed() { | 847 | async fn sender_close_completes_if_closed() { |
| 900 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 848 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); |
| 901 | Channel::with_critical_sections(); | 849 | let c = CHANNEL.put(Channel::new()); |
| 902 | let (s, r) = split(unsafe { &mut CHANNEL }); | 850 | let (s, r) = split(c); |
| 903 | drop(r); | 851 | drop(r); |
| 904 | s.closed().await; | 852 | s.closed().await; |
| 905 | } | 853 | } |
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs index c8fe84026..0506ffe6f 100644 --- a/embassy/src/util/mutex.rs +++ b/embassy/src/util/mutex.rs | |||
| @@ -8,6 +8,8 @@ pub trait Mutex { | |||
| 8 | /// Data protected by the mutex. | 8 | /// Data protected by the mutex. |
| 9 | type Data; | 9 | type Data; |
| 10 | 10 | ||
| 11 | fn new(data: Self::Data) -> Self; | ||
| 12 | |||
| 11 | /// Creates a critical section and grants temporary access to the protected data. | 13 | /// Creates a critical section and grants temporary access to the protected data. |
| 12 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; | 14 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; |
| 13 | } | 15 | } |
| @@ -47,6 +49,10 @@ impl<T> CriticalSectionMutex<T> { | |||
| 47 | impl<T> Mutex for CriticalSectionMutex<T> { | 49 | impl<T> Mutex for CriticalSectionMutex<T> { |
| 48 | type Data = T; | 50 | type Data = T; |
| 49 | 51 | ||
| 52 | fn new(data: T) -> Self { | ||
| 53 | Self::new(data) | ||
| 54 | } | ||
| 55 | |||
| 50 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 56 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 51 | critical_section::with(|cs| f(self.borrow(cs))) | 57 | critical_section::with(|cs| f(self.borrow(cs))) |
| 52 | } | 58 | } |
| @@ -92,6 +98,10 @@ impl<T> ThreadModeMutex<T> { | |||
| 92 | impl<T> Mutex for ThreadModeMutex<T> { | 98 | impl<T> Mutex for ThreadModeMutex<T> { |
| 93 | type Data = T; | 99 | type Data = T; |
| 94 | 100 | ||
| 101 | fn new(data: T) -> Self { | ||
| 102 | Self::new(data) | ||
| 103 | } | ||
| 104 | |||
| 95 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 105 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 96 | f(self.borrow()) | 106 | f(self.borrow()) |
| 97 | } | 107 | } |
| @@ -126,6 +136,10 @@ impl<T> NoopMutex<T> { | |||
| 126 | impl<T> Mutex for NoopMutex<T> { | 136 | impl<T> Mutex for NoopMutex<T> { |
| 127 | type Data = T; | 137 | type Data = T; |
| 128 | 138 | ||
| 139 | fn new(data: T) -> Self { | ||
| 140 | Self::new(data) | ||
| 141 | } | ||
| 142 | |||
| 129 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 143 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 130 | f(self.borrow()) | 144 | f(self.borrow()) |
| 131 | } | 145 | } |
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index c2cb107e1..443955239 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -37,9 +37,10 @@ async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { | |||
| 37 | 37 | ||
| 38 | #[embassy::main] | 38 | #[embassy::main] |
| 39 | async fn main(spawner: Spawner, p: Peripherals) { | 39 | async fn main(spawner: Spawner, p: Peripherals) { |
| 40 | |||
| 40 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | 41 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); |
| 41 | 42 | ||
| 42 | let channel = CHANNEL.put(Channel::with_no_threads()); | 43 | let channel = CHANNEL.put(Channel::new()); |
| 43 | let (sender, mut receiver) = mpsc::split(channel); | 44 | let (sender, mut receiver) = mpsc::split(channel); |
| 44 | 45 | ||
| 45 | spawner.spawn(my_task(sender)).unwrap(); | 46 | spawner.spawn(my_task(sender)).unwrap(); |
