From 1b9d5e50710cefde4bd1e234695783d62e824c68 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 6 Jun 2021 18:36:16 +1000 Subject: Multi Producer Single Consumer channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An MPSC inspired by Tokio and Crossbeam. The MPSC is designed to support both single and multi core processors, with only single core implemented at this time. The allocation of the channel’s buffer is inspired by the const generic parameters that Heapless provides. --- examples/nrf/src/bin/mpsc.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 examples/nrf/src/bin/mpsc.rs (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs new file mode 100644 index 000000000..6a0f8f471 --- /dev/null +++ b/examples/nrf/src/bin/mpsc.rs @@ -0,0 +1,64 @@ +#![no_std] +#![no_main] +#![feature(min_type_alias_impl_trait)] +#![feature(impl_trait_in_bindings)] +#![feature(type_alias_impl_trait)] +#![allow(incomplete_features)] + +#[path = "../example_common.rs"] +mod example_common; + +use defmt::panic; +use embassy::executor::Spawner; +use embassy::time::{Duration, Timer}; +use embassy::util::mpsc::TryRecvError; +use embassy::util::{mpsc, Forever}; +use embassy_nrf::gpio::{Level, Output, OutputDrive}; +use embassy_nrf::Peripherals; +use embedded_hal::digital::v2::OutputPin; +use mpsc::{Channel, Sender, WithThreadModeOnly}; + +enum LedState { + On, + Off, +} + +static CHANNEL: Forever> = Forever::new(); + +#[embassy::task(pool_size = 1)] +async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { + loop { + let _ = sender.send(LedState::On).await; + Timer::after(Duration::from_secs(1)).await; + let _ = sender.send(LedState::Off).await; + Timer::after(Duration::from_secs(1)).await; + } +} + +#[embassy::main] +async fn main(spawner: Spawner, p: Peripherals) { + let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); + + let channel = CHANNEL.put(Channel::with_thread_mode_only()); + let (sender, mut receiver) = mpsc::split(channel); + + spawner.spawn(my_task(sender)).unwrap(); + + // We could just loop on `receiver.recv()` for simplicity. The code below + // is optimized to drain the queue as fast as possible in the spirit of + // handling events as fast as possible. This optimization is benign when in + // thread mode, but can be useful when interrupts are sending messages + // with the channel having been created via with_critical_sections. + loop { + let maybe_message = match receiver.try_recv() { + m @ Ok(..) => m.ok(), + Err(TryRecvError::Empty) => receiver.recv().await, + Err(TryRecvError::Closed) => break, + }; + match maybe_message { + Some(LedState::On) => led.set_high().unwrap(), + Some(LedState::Off) => led.set_low().unwrap(), + _ => (), + } + } +} -- cgit From 816b78c0d9733362d8653eb2032f126e6a710030 Mon Sep 17 00:00:00 2001 From: huntc Date: Tue, 6 Jul 2021 23:20:47 +1000 Subject: Reduces the types on sender and receiver In exchange for an UnsafeCell being passed into split --- examples/nrf/src/bin/mpsc.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 6a0f8f471..d692abee2 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -8,6 +8,8 @@ #[path = "../example_common.rs"] mod example_common; +use core::cell::UnsafeCell; + use defmt::panic; use embassy::executor::Spawner; use embassy::time::{Duration, Timer}; @@ -23,10 +25,10 @@ enum LedState { Off, } -static CHANNEL: Forever> = Forever::new(); +static CHANNEL: Forever>> = Forever::new(); #[embassy::task(pool_size = 1)] -async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { +async fn my_task(sender: Sender<'static, LedState>) { loop { let _ = sender.send(LedState::On).await; Timer::after(Duration::from_secs(1)).await; @@ -39,7 +41,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { async fn main(spawner: Spawner, p: Peripherals) { let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(Channel::with_thread_mode_only()); + let channel = CHANNEL.put(UnsafeCell::new(Channel::with_thread_mode_only())); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap(); -- cgit From ae62948d6c21bc1ac4af50c3e39888c52d696b24 Mon Sep 17 00:00:00 2001 From: huntc Date: Wed, 7 Jul 2021 18:08:36 +1000 Subject: Replace UnsafeCell Using a new ChannelCell so that there's no leaking of the abstraction --- examples/nrf/src/bin/mpsc.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index d692abee2..eafa29e60 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -8,12 +8,10 @@ #[path = "../example_common.rs"] mod example_common; -use core::cell::UnsafeCell; - use defmt::panic; use embassy::executor::Spawner; use embassy::time::{Duration, Timer}; -use embassy::util::mpsc::TryRecvError; +use embassy::util::mpsc::{ChannelCell, TryRecvError}; use embassy::util::{mpsc, Forever}; use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::Peripherals; @@ -25,7 +23,7 @@ enum LedState { Off, } -static CHANNEL: Forever>> = Forever::new(); +static CHANNEL: Forever>> = Forever::new(); #[embassy::task(pool_size = 1)] async fn my_task(sender: Sender<'static, LedState>) { @@ -41,7 +39,7 @@ async fn my_task(sender: Sender<'static, LedState>) { async fn main(spawner: Spawner, p: Peripherals) { let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(UnsafeCell::new(Channel::with_thread_mode_only())); + let channel = CHANNEL.put(ChannelCell::new(Channel::with_thread_mode_only())); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap(); -- cgit From 5f87c7808c9d896a2a2d5e064a58ed2ac23a4348 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 9 Jul 2021 12:04:22 +1000 Subject: Remove the cell and trait At the expense of exposing the channel types again. We do this as we want to avoid using dyn traits given their overhead for embedded environments. --- examples/nrf/src/bin/mpsc.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index eafa29e60..6a0f8f471 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -11,7 +11,7 @@ mod example_common; use defmt::panic; use embassy::executor::Spawner; use embassy::time::{Duration, Timer}; -use embassy::util::mpsc::{ChannelCell, TryRecvError}; +use embassy::util::mpsc::TryRecvError; use embassy::util::{mpsc, Forever}; use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::Peripherals; @@ -23,10 +23,10 @@ enum LedState { Off, } -static CHANNEL: Forever>> = Forever::new(); +static CHANNEL: Forever> = Forever::new(); #[embassy::task(pool_size = 1)] -async fn my_task(sender: Sender<'static, LedState>) { +async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { loop { let _ = sender.send(LedState::On).await; Timer::after(Duration::from_secs(1)).await; @@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, LedState>) { async fn main(spawner: Spawner, p: Peripherals) { let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(ChannelCell::new(Channel::with_thread_mode_only())); + let channel = CHANNEL.put(Channel::with_thread_mode_only()); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap(); -- cgit From f159beec1cbd1406f63ca7c3e84a1d598bbadaa1 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 9 Jul 2021 12:13:07 +1000 Subject: Use of a NoopMutex --- examples/nrf/src/bin/mpsc.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 6a0f8f471..c2cb107e1 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -16,17 +16,17 @@ use embassy::util::{mpsc, Forever}; use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::Peripherals; use embedded_hal::digital::v2::OutputPin; -use mpsc::{Channel, Sender, WithThreadModeOnly}; +use mpsc::{Channel, Sender, WithNoThreads}; enum LedState { On, Off, } -static CHANNEL: Forever> = Forever::new(); +static CHANNEL: Forever> = Forever::new(); #[embassy::task(pool_size = 1)] -async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { +async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { loop { let _ = sender.send(LedState::On).await; Timer::after(Duration::from_secs(1)).await; @@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { async fn main(spawner: Spawner, p: Peripherals) { let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(Channel::with_thread_mode_only()); + let channel = CHANNEL.put(Channel::with_no_threads()); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap(); -- cgit From 3778f55d80f70b336f6ca846f365cf619032a685 Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 15 Jul 2021 12:08:35 +1000 Subject: Provides a cleaner construction of the channel with the common "new" naming --- examples/nrf/src/bin/mpsc.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'examples') diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index c2cb107e1..443955239 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -37,9 +37,10 @@ async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { #[embassy::main] async fn main(spawner: Spawner, p: Peripherals) { + let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(Channel::with_no_threads()); + let channel = CHANNEL.put(Channel::new()); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap(); -- cgit