From ead987245d083b7e6be7158ea3fb63c8a47bf304 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sat, 11 Sep 2021 01:53:53 +0200 Subject: embassy: Refactor module structure to remove kitchen-sink `util`. --- .vscode/settings.json | 3 +- embassy-hal-common/src/drop.rs | 51 ++ embassy-hal-common/src/lib.rs | 1 + embassy-hal-common/src/usb/usb_serial.rs | 2 +- embassy-net/src/stack.rs | 4 +- embassy-nrf/src/buffered_uarte.rs | 3 +- embassy-nrf/src/gpiote.rs | 2 +- embassy-nrf/src/qspi.rs | 5 +- embassy-nrf/src/rng.rs | 4 +- embassy-nrf/src/saadc.rs | 3 +- embassy-nrf/src/spim.rs | 4 +- embassy-nrf/src/time_driver.rs | 2 +- embassy-nrf/src/timer.rs | 8 +- embassy-nrf/src/twim.rs | 3 +- embassy-nrf/src/uarte.rs | 5 +- embassy-rp/src/timer.rs | 2 +- embassy-stm32/src/dma/bdma.rs | 3 +- embassy-stm32/src/dma/dma.rs | 3 +- embassy-stm32/src/eth/v2/mod.rs | 3 +- embassy-stm32/src/exti.rs | 3 +- embassy-stm32/src/i2c/v2.rs | 4 +- embassy-stm32/src/rng.rs | 3 +- embassy-stm32/src/sdmmc/v2.rs | 8 +- embassy-stm32/src/usart/v2.rs | 3 +- embassy/src/blocking_mutex/mod.rs | 161 ++++++ embassy/src/channel/mod.rs | 4 + embassy/src/channel/mpsc.rs | 894 ++++++++++++++++++++++++++++++ embassy/src/channel/signal.rs | 73 +++ embassy/src/lib.rs | 4 + embassy/src/util/drop_bomb.rs | 28 - embassy/src/util/mod.rs | 16 +- embassy/src/util/mpsc.rs | 897 ------------------------------- embassy/src/util/mutex.rs | 159 ------ embassy/src/util/on_drop.rs | 24 - embassy/src/util/signal.rs | 171 ------ embassy/src/util/waker.rs | 81 --- embassy/src/util/waker_agnostic.rs | 84 --- embassy/src/waitqueue/mod.rs | 5 + embassy/src/waitqueue/waker.rs | 80 +++ embassy/src/waitqueue/waker_agnostic.rs | 84 +++ examples/nrf/src/bin/mpsc.rs | 5 +- examples/stm32wl55/src/bin/subghz.rs | 32 +- 42 files changed, 1431 insertions(+), 1503 deletions(-) create mode 100644 embassy-hal-common/src/drop.rs create mode 100644 embassy/src/blocking_mutex/mod.rs create mode 100644 embassy/src/channel/mod.rs create mode 100644 embassy/src/channel/mpsc.rs create mode 100644 embassy/src/channel/signal.rs delete mode 100644 embassy/src/util/drop_bomb.rs delete mode 100644 embassy/src/util/mpsc.rs delete mode 100644 embassy/src/util/mutex.rs delete mode 100644 embassy/src/util/on_drop.rs delete mode 100644 embassy/src/util/signal.rs delete mode 100644 embassy/src/util/waker.rs delete mode 100644 embassy/src/util/waker_agnostic.rs create mode 100644 embassy/src/waitqueue/mod.rs create mode 100644 embassy/src/waitqueue/waker.rs create mode 100644 embassy/src/waitqueue/waker_agnostic.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index ed01f7557..a5a656637 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,11 +6,12 @@ "rust-analyzer.checkOnSave.allTargets": false, "rust-analyzer.cargo.noDefaultFeatures": true, "rust-analyzer.checkOnSave.noDefaultFeatures": true, - //"rust-analyzer.cargo.target": "thumbv7em-none-eabi", + "rust-analyzer.cargo.target": "thumbv7em-none-eabi", "rust-analyzer.cargo.features": [ // These are needed to prevent embassy-net from failing to build "embassy-net/medium-ethernet", "embassy-net/tcp", + "embassy-net/pool-16", ], "rust-analyzer.procMacro.enable": true, "rust-analyzer.cargo.runBuildScripts": true, diff --git a/embassy-hal-common/src/drop.rs b/embassy-hal-common/src/drop.rs new file mode 100644 index 000000000..74a484de7 --- /dev/null +++ b/embassy-hal-common/src/drop.rs @@ -0,0 +1,51 @@ +use core::mem; +use core::mem::MaybeUninit; + +pub struct OnDrop { + f: MaybeUninit, +} + +impl OnDrop { + pub fn new(f: F) -> Self { + Self { + f: MaybeUninit::new(f), + } + } + + pub fn defuse(self) { + mem::forget(self) + } +} + +impl Drop for OnDrop { + fn drop(&mut self) { + unsafe { self.f.as_ptr().read()() } + } +} + +/// An explosive ordinance that panics if it is improperly disposed of. +/// +/// This is to forbid dropping futures, when there is absolutely no other choice. +/// +/// To correctly dispose of this device, call the [defuse](struct.DropBomb.html#method.defuse) +/// method before this object is dropped. +pub struct DropBomb { + _private: (), +} + +impl DropBomb { + pub fn new() -> Self { + Self { _private: () } + } + + /// Diffuses the bomb, rendering it safe to drop. + pub fn defuse(self) { + mem::forget(self) + } +} + +impl Drop for DropBomb { + fn drop(&mut self) { + panic!("boom") + } +} diff --git a/embassy-hal-common/src/lib.rs b/embassy-hal-common/src/lib.rs index ea20747eb..01e2d3aee 100644 --- a/embassy-hal-common/src/lib.rs +++ b/embassy-hal-common/src/lib.rs @@ -3,6 +3,7 @@ // This mod MUST go first, so that the others see its macros. pub(crate) mod fmt; +pub mod drop; pub mod interrupt; mod macros; pub mod peripheral; diff --git a/embassy-hal-common/src/usb/usb_serial.rs b/embassy-hal-common/src/usb/usb_serial.rs index 8b27152b5..ca43a4d73 100644 --- a/embassy-hal-common/src/usb/usb_serial.rs +++ b/embassy-hal-common/src/usb/usb_serial.rs @@ -4,7 +4,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use embassy::io::{self, AsyncBufRead, AsyncWrite}; -use embassy::util::WakerRegistration; +use embassy::waitqueue::WakerRegistration; use usb_device::bus::UsbBus; use usb_device::class_prelude::*; use usb_device::UsbError; diff --git a/embassy-net/src/stack.rs b/embassy-net/src/stack.rs index 5f871bd1f..f26808cd0 100644 --- a/embassy-net/src/stack.rs +++ b/embassy-net/src/stack.rs @@ -2,9 +2,9 @@ use core::cell::RefCell; use core::future::Future; use core::task::Context; use core::task::Poll; +use embassy::blocking_mutex::ThreadModeMutex; use embassy::time::{Instant, Timer}; -use embassy::util::ThreadModeMutex; -use embassy::util::WakerRegistration; +use embassy::waitqueue::WakerRegistration; use futures::pin_mut; use smoltcp::iface::InterfaceBuilder; #[cfg(feature = "medium-ethernet")] diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 90ce49582..cd08875cd 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -6,7 +6,8 @@ use core::sync::atomic::{compiler_fence, Ordering}; use core::task::{Context, Poll}; use embassy::interrupt::InterruptExt; use embassy::io::{AsyncBufRead, AsyncWrite, Result}; -use embassy::util::{Unborrow, WakerRegistration}; +use embassy::util::Unborrow; +use embassy::waitqueue::WakerRegistration; use embassy_hal_common::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::ring_buffer::RingBuffer; use embassy_hal_common::{low_power_wait_until, unborrow}; diff --git a/embassy-nrf/src/gpiote.rs b/embassy-nrf/src/gpiote.rs index 847b2fbf3..001ee7fb8 100644 --- a/embassy-nrf/src/gpiote.rs +++ b/embassy-nrf/src/gpiote.rs @@ -4,7 +4,7 @@ use core::marker::PhantomData; use core::task::{Context, Poll}; use embassy::interrupt::{Interrupt, InterruptExt}; use embassy::traits::gpio::{WaitForAnyEdge, WaitForHigh, WaitForLow}; -use embassy::util::AtomicWaker; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::unsafe_impl_unborrow; use embedded_hal::digital::v2::{InputPin, StatefulOutputPin}; use futures::future::poll_fn; diff --git a/embassy-nrf/src/qspi.rs b/embassy-nrf/src/qspi.rs index 28becfd56..e87094250 100644 --- a/embassy-nrf/src/qspi.rs +++ b/embassy-nrf/src/qspi.rs @@ -6,7 +6,8 @@ use core::ptr; use core::task::Poll; use embassy::interrupt::{Interrupt, InterruptExt}; use embassy::traits::flash::{Error, Flash}; -use embassy::util::{AtomicWaker, DropBomb, Unborrow}; +use embassy::util::Unborrow; +use embassy_hal_common::drop::DropBomb; use embassy_hal_common::unborrow; use futures::future::poll_fn; @@ -397,6 +398,8 @@ impl<'d, T: Instance> Flash for Qspi<'d, T> { } pub(crate) mod sealed { + use embassy::waitqueue::AtomicWaker; + use super::*; pub struct State { diff --git a/embassy-nrf/src/rng.rs b/embassy-nrf/src/rng.rs index 6cdcccf3b..20d033a12 100644 --- a/embassy-nrf/src/rng.rs +++ b/embassy-nrf/src/rng.rs @@ -8,9 +8,9 @@ use core::task::Poll; use embassy::interrupt::InterruptExt; use embassy::traits; -use embassy::util::AtomicWaker; -use embassy::util::OnDrop; use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use embassy_hal_common::unborrow; use futures::future::poll_fn; use rand_core::RngCore; diff --git a/embassy-nrf/src/saadc.rs b/embassy-nrf/src/saadc.rs index bc7f34716..e909e7d5a 100644 --- a/embassy-nrf/src/saadc.rs +++ b/embassy-nrf/src/saadc.rs @@ -3,7 +3,8 @@ use core::marker::PhantomData; use core::sync::atomic::{compiler_fence, Ordering}; use core::task::Poll; use embassy::interrupt::InterruptExt; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::unborrow; use futures::future::poll_fn; diff --git a/embassy-nrf/src/spim.rs b/embassy-nrf/src/spim.rs index 9a7fb4f67..e88fb460c 100644 --- a/embassy-nrf/src/spim.rs +++ b/embassy-nrf/src/spim.rs @@ -6,7 +6,7 @@ use core::sync::atomic::{compiler_fence, Ordering}; use core::task::Poll; use embassy::interrupt::InterruptExt; use embassy::traits; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; use embassy_hal_common::unborrow; use futures::future::poll_fn; use traits::spi::{FullDuplex, Read, Spi, Write}; @@ -359,6 +359,8 @@ impl<'d, T: Instance> embedded_hal::blocking::spi::Write for Spim<'d, T> { } pub(crate) mod sealed { + use embassy::waitqueue::AtomicWaker; + use super::*; pub struct State { diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index f93ebb54a..19356c2d2 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -2,9 +2,9 @@ use core::cell::Cell; use core::sync::atomic::{compiler_fence, AtomicU32, AtomicU8, Ordering}; use core::{mem, ptr}; use critical_section::CriticalSection; +use embassy::blocking_mutex::CriticalSectionMutex as Mutex; use embassy::interrupt::{Interrupt, InterruptExt}; use embassy::time::driver::{AlarmHandle, Driver}; -use embassy::util::CriticalSectionMutex as Mutex; use crate::interrupt; use crate::pac; diff --git a/embassy-nrf/src/timer.rs b/embassy-nrf/src/timer.rs index 638fd8229..5690ff0d8 100644 --- a/embassy-nrf/src/timer.rs +++ b/embassy-nrf/src/timer.rs @@ -5,8 +5,9 @@ use core::task::Poll; use embassy::interrupt::Interrupt; use embassy::interrupt::InterruptExt; -use embassy::util::OnDrop; use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use embassy_hal_common::unborrow; use futures::future::poll_fn; @@ -15,7 +16,6 @@ use crate::ppi::Event; use crate::ppi::Task; pub(crate) mod sealed { - use embassy::util::AtomicWaker; use super::*; @@ -43,8 +43,8 @@ macro_rules! impl_timer { fn regs() -> &'static pac::timer0::RegisterBlock { unsafe { &*(pac::$pac_type::ptr() as *const pac::timer0::RegisterBlock) } } - fn waker(n: usize) -> &'static ::embassy::util::AtomicWaker { - use ::embassy::util::AtomicWaker; + fn waker(n: usize) -> &'static ::embassy::waitqueue::AtomicWaker { + use ::embassy::waitqueue::AtomicWaker; const NEW_AW: AtomicWaker = AtomicWaker::new(); static WAKERS: [AtomicWaker; $ccs] = [NEW_AW; $ccs]; &WAKERS[n] diff --git a/embassy-nrf/src/twim.rs b/embassy-nrf/src/twim.rs index ac263bad7..8173f66b0 100644 --- a/embassy-nrf/src/twim.rs +++ b/embassy-nrf/src/twim.rs @@ -12,7 +12,8 @@ use core::sync::atomic::{compiler_fence, Ordering::SeqCst}; use core::task::Poll; use embassy::interrupt::{Interrupt, InterruptExt}; use embassy::traits; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::unborrow; use futures::future::poll_fn; use traits::i2c::I2c; diff --git a/embassy-nrf/src/uarte.rs b/embassy-nrf/src/uarte.rs index a6909be68..320426060 100644 --- a/embassy-nrf/src/uarte.rs +++ b/embassy-nrf/src/uarte.rs @@ -8,7 +8,8 @@ use core::sync::atomic::{compiler_fence, Ordering}; use core::task::Poll; use embassy::interrupt::InterruptExt; use embassy::traits::uart::{Error, Read, ReadUntilIdle, Write}; -use embassy::util::{AtomicWaker, OnDrop, Unborrow}; +use embassy::util::Unborrow; +use embassy_hal_common::drop::OnDrop; use embassy_hal_common::unborrow; use futures::future::poll_fn; @@ -439,6 +440,8 @@ impl<'d, U: Instance, T: TimerInstance> Write for UarteWithIdle<'d, U, T> { } pub(crate) mod sealed { + use embassy::waitqueue::AtomicWaker; + use super::*; pub struct State { diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs index ed265c47f..b3c047ca4 100644 --- a/embassy-rp/src/timer.rs +++ b/embassy-rp/src/timer.rs @@ -1,9 +1,9 @@ use atomic_polyfill::{AtomicU8, Ordering}; use core::cell::Cell; use critical_section::CriticalSection; +use embassy::blocking_mutex::CriticalSectionMutex as Mutex; use embassy::interrupt::{Interrupt, InterruptExt}; use embassy::time::driver::{AlarmHandle, Driver}; -use embassy::util::CriticalSectionMutex as Mutex; use crate::{interrupt, pac}; diff --git a/embassy-stm32/src/dma/bdma.rs b/embassy-stm32/src/dma/bdma.rs index fbd753a71..35c0b3ee7 100644 --- a/embassy-stm32/src/dma/bdma.rs +++ b/embassy-stm32/src/dma/bdma.rs @@ -5,7 +5,8 @@ use core::sync::atomic::{fence, Ordering}; use core::task::Poll; use embassy::interrupt::{Interrupt, InterruptExt}; -use embassy::util::{AtomicWaker, OnDrop}; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use futures::future::poll_fn; use crate::dma::{Channel, Request}; diff --git a/embassy-stm32/src/dma/dma.rs b/embassy-stm32/src/dma/dma.rs index bce9656d1..ec5ac98a0 100644 --- a/embassy-stm32/src/dma/dma.rs +++ b/embassy-stm32/src/dma/dma.rs @@ -3,7 +3,8 @@ use core::sync::atomic::{fence, Ordering}; use core::task::Poll; use embassy::interrupt::{Interrupt, InterruptExt}; -use embassy::util::{AtomicWaker, OnDrop}; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use futures::future::poll_fn; use crate::interrupt; diff --git a/embassy-stm32/src/eth/v2/mod.rs b/embassy-stm32/src/eth/v2/mod.rs index 42eb0680c..ff734f78c 100644 --- a/embassy-stm32/src/eth/v2/mod.rs +++ b/embassy-stm32/src/eth/v2/mod.rs @@ -2,7 +2,8 @@ use core::marker::PhantomData; use core::sync::atomic::{fence, Ordering}; use core::task::Waker; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::unborrow; use embassy_net::{Device, DeviceCapabilities, LinkState, PacketBuf, MTU}; diff --git a/embassy-stm32/src/exti.rs b/embassy-stm32/src/exti.rs index 8e4989a3e..6d3de3a15 100644 --- a/embassy-stm32/src/exti.rs +++ b/embassy-stm32/src/exti.rs @@ -4,7 +4,8 @@ use core::marker::PhantomData; use core::pin::Pin; use core::task::{Context, Poll}; use embassy::traits::gpio::{WaitForAnyEdge, WaitForFallingEdge, WaitForRisingEdge}; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::unsafe_impl_unborrow; use embedded_hal::digital::v2::InputPin; diff --git a/embassy-stm32/src/i2c/v2.rs b/embassy-stm32/src/i2c/v2.rs index fc4f52cf3..5e9e24392 100644 --- a/embassy-stm32/src/i2c/v2.rs +++ b/embassy-stm32/src/i2c/v2.rs @@ -4,7 +4,9 @@ use core::task::Poll; use atomic_polyfill::{AtomicUsize, Ordering}; use embassy::interrupt::InterruptExt; -use embassy::util::{AtomicWaker, OnDrop, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use embassy_hal_common::unborrow; use embedded_hal::blocking::i2c::Read; use embedded_hal::blocking::i2c::Write; diff --git a/embassy-stm32/src/rng.rs b/embassy-stm32/src/rng.rs index 0afba3ba7..5655ed967 100644 --- a/embassy-stm32/src/rng.rs +++ b/embassy-stm32/src/rng.rs @@ -3,7 +3,8 @@ use core::future::Future; use core::task::Poll; use embassy::traits; -use embassy::util::{AtomicWaker, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; use embassy_hal_common::unborrow; use futures::future::poll_fn; use rand_core::{CryptoRng, RngCore}; diff --git a/embassy-stm32/src/sdmmc/v2.rs b/embassy-stm32/src/sdmmc/v2.rs index aa1d68ae7..6032c2bb1 100644 --- a/embassy-stm32/src/sdmmc/v2.rs +++ b/embassy-stm32/src/sdmmc/v2.rs @@ -5,7 +5,9 @@ use core::marker::PhantomData; use core::task::Poll; use embassy::interrupt::InterruptExt; -use embassy::util::{AtomicWaker, OnDrop, Unborrow}; +use embassy::util::Unborrow; +use embassy::waitqueue::AtomicWaker; +use embassy_hal_common::drop::OnDrop; use embassy_hal_common::unborrow; use futures::future::poll_fn; use sdio_host::{BusWidth, CardCapacity, CardStatus, CurrentState, SDStatus, CID, CSD, OCR, SCR}; @@ -1479,8 +1481,8 @@ crate::pac::peripherals!( INNER } - fn state() -> &'static ::embassy::util::AtomicWaker { - static WAKER: ::embassy::util::AtomicWaker = ::embassy::util::AtomicWaker::new(); + fn state() -> &'static ::embassy::waitqueue::AtomicWaker { + static WAKER: ::embassy::waitqueue::AtomicWaker = ::embassy::waitqueue::AtomicWaker::new(); &WAKER } } diff --git a/embassy-stm32/src/usart/v2.rs b/embassy-stm32/src/usart/v2.rs index 92c0cbc2e..fc3036404 100644 --- a/embassy-stm32/src/usart/v2.rs +++ b/embassy-stm32/src/usart/v2.rs @@ -4,7 +4,8 @@ use core::marker::PhantomData; use core::pin::Pin; use core::task::Context; use core::task::Poll; -use embassy::util::{Unborrow, WakerRegistration}; +use embassy::util::Unborrow; +use embassy::waitqueue::WakerRegistration; use embassy_hal_common::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; use embassy_hal_common::ring_buffer::RingBuffer; use embassy_hal_common::unborrow; diff --git a/embassy/src/blocking_mutex/mod.rs b/embassy/src/blocking_mutex/mod.rs new file mode 100644 index 000000000..d112d2ede --- /dev/null +++ b/embassy/src/blocking_mutex/mod.rs @@ -0,0 +1,161 @@ +//! Blocking mutex (not async) + +use core::cell::UnsafeCell; +use critical_section::CriticalSection; + +/// Any object implementing this trait guarantees exclusive access to the data contained +/// within the mutex for the duration of the lock. +/// Adapted from https://github.com/rust-embedded/mutex-trait. +pub trait Mutex { + /// Data protected by the mutex. + type Data; + + fn new(data: Self::Data) -> Self; + + /// Creates a critical section and grants temporary access to the protected data. + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; +} + +/// A "mutex" based on critical sections +/// +/// # Safety +/// +/// **This Mutex is only safe on single-core systems.** +/// +/// On multi-core systems, a `CriticalSection` **is not sufficient** to ensure exclusive access. +pub struct CriticalSectionMutex { + inner: UnsafeCell, +} + +// NOTE: A `CriticalSectionMutex` can be used as a channel so the protected data must be `Send` +// to prevent sending non-Sendable stuff (e.g. access tokens) across different +// execution contexts (e.g. interrupts) +unsafe impl Sync for CriticalSectionMutex where T: Send {} + +impl CriticalSectionMutex { + /// Creates a new mutex + pub const fn new(value: T) -> Self { + CriticalSectionMutex { + inner: UnsafeCell::new(value), + } + } +} + +impl CriticalSectionMutex { + /// Borrows the data for the duration of the critical section + pub fn borrow<'cs>(&'cs self, _cs: CriticalSection<'cs>) -> &'cs T { + unsafe { &*self.inner.get() } + } +} + +impl Mutex for CriticalSectionMutex { + type Data = T; + + fn new(data: T) -> Self { + Self::new(data) + } + + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { + critical_section::with(|cs| f(self.borrow(cs))) + } +} + +/// A "mutex" that only allows borrowing from thread mode. +/// +/// # Safety +/// +/// **This Mutex is only safe on single-core systems.** +/// +/// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. +pub struct ThreadModeMutex { + inner: UnsafeCell, +} + +// NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. +// Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can +// be Send+Sync even if T is not Send (unlike CriticalSectionMutex) +unsafe impl Sync for ThreadModeMutex {} +unsafe impl Send for ThreadModeMutex {} + +impl ThreadModeMutex { + /// Creates a new mutex + pub const fn new(value: T) -> Self { + ThreadModeMutex { + inner: UnsafeCell::new(value), + } + } + + /// Borrows the data + pub fn borrow(&self) -> &T { + assert!( + in_thread_mode(), + "ThreadModeMutex can only be borrowed from thread mode." + ); + unsafe { &*self.inner.get() } + } +} + +impl Mutex for ThreadModeMutex { + type Data = T; + + fn new(data: T) -> Self { + Self::new(data) + } + + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { + f(self.borrow()) + } +} + +impl Drop for ThreadModeMutex { + fn drop(&mut self) { + // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so + // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if + // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, + // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. + assert!( + in_thread_mode(), + "ThreadModeMutex can only be dropped from thread mode." + ); + + // Drop of the inner `T` happens after this. + } +} + +pub fn in_thread_mode() -> bool { + #[cfg(feature = "std")] + return Some("main") == std::thread::current().name(); + + #[cfg(not(feature = "std"))] + return cortex_m::peripheral::SCB::vect_active() + == cortex_m::peripheral::scb::VectActive::ThreadMode; +} + +/// A "mutex" that does nothing and cannot be shared between threads. +pub struct NoopMutex { + inner: T, +} + +impl NoopMutex { + pub const fn new(value: T) -> Self { + NoopMutex { inner: value } + } +} + +impl NoopMutex { + pub fn borrow(&self) -> &T { + &self.inner + } +} + +impl Mutex for NoopMutex { + type Data = T; + + fn new(data: T) -> Self { + Self::new(data) + } + + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { + f(self.borrow()) + } +} diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs new file mode 100644 index 000000000..9e8c67ee9 --- /dev/null +++ b/embassy/src/channel/mod.rs @@ -0,0 +1,4 @@ +//! Async channels + +pub mod mpsc; +pub mod signal; diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs new file mode 100644 index 000000000..b20d48a95 --- /dev/null +++ b/embassy/src/channel/mpsc.rs @@ -0,0 +1,894 @@ +//! A multi-producer, single-consumer queue for sending values between +//! asynchronous tasks. This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. +//! +//! Similar to the `mpsc` channels provided by `std`, the channel constructor +//! functions provide separate send and receive handles, [`Sender`] and +//! [`Receiver`]. If there is no message to read, the current task will be +//! notified when a new value is sent. [`Sender`] allows sending values into +//! the channel. If the bounded channel is at capacity, the send is rejected. +//! +//! # Disconnection +//! +//! When all [`Sender`] handles have been dropped, it is no longer +//! possible to send values into the channel. This is considered the termination +//! event of the stream. +//! +//! If the [`Receiver`] handle is dropped, then messages can no longer +//! be read out of the channel. In this case, all further attempts to send will +//! result in an error. +//! +//! # Clean Shutdown +//! +//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to +//! remain in the channel. Instead, it is usually desirable to perform a "clean" +//! shutdown. To do this, the receiver first calls `close`, which will prevent +//! any further messages to be sent into the channel. Then, the receiver +//! consumes the channel to completion, at which point the receiver can be +//! dropped. +//! +//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html + +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::PhantomData; +use core::mem::MaybeUninit; +use core::pin::Pin; +use core::ptr; +use core::task::Context; +use core::task::Poll; +use core::task::Waker; + +use futures::Future; + +use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; +use crate::waitqueue::WakerRegistration; + +/// Send values to the associated `Receiver`. +/// +/// Instances are created by the [`split`](split) function. +pub struct Sender<'ch, M, T, const N: usize> +where + M: Mutex, +{ + channel_cell: &'ch UnsafeCell>, +} + +// Safe to pass the sender around +unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex + Sync +{} +unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex + Sync +{} + +/// Receive values from the associated `Sender`. +/// +/// Instances are created by the [`split`](split) function. +pub struct Receiver<'ch, M, T, const N: usize> +where + M: Mutex, +{ + channel_cell: &'ch UnsafeCell>, + _receiver_consumed: &'ch mut PhantomData<()>, +} + +// Safe to pass the receiver around +unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where + M: Mutex + Sync +{ +} +unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where + M: Mutex + Sync +{ +} + +/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. +/// +/// All data sent on `Sender` will become available on `Receiver` in the same +/// order as it was sent. +/// +/// The `Sender` can be cloned to `send` to the same channel from multiple code +/// locations. Only one `Receiver` is valid. +/// +/// If the `Receiver` is disconnected while trying to `send`, the `send` method +/// will return a `SendError`. Similarly, if `Sender` is disconnected while +/// trying to `recv`, the `recv` method will return a `RecvError`. +/// +/// Note that when splitting the channel, the sender and receiver cannot outlive +/// their channel. The following will therefore fail compilation: +//// +/// ```compile_fail +/// use embassy::channel::mpsc; +/// use embassy::channel::mpsc::{Channel, WithThreadModeOnly}; +/// +/// let (sender, receiver) = { +/// let mut channel = Channel::::with_thread_mode_only(); +/// mpsc::split(&mut channel) +/// }; +/// ``` +pub fn split( + channel: &mut Channel, +) -> (Sender, Receiver) +where + M: Mutex, +{ + let sender = Sender { + channel_cell: &channel.channel_cell, + }; + let receiver = Receiver { + channel_cell: &channel.channel_cell, + _receiver_consumed: &mut channel.receiver_consumed, + }; + Channel::lock(&channel.channel_cell, |c| { + c.register_receiver(); + c.register_sender(); + }); + (sender, receiver) +} + +impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> +where + M: Mutex, +{ + /// Receives the next value for this receiver. + /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. This indicates that no + /// further values can ever be received from this `Receiver`. The channel is + /// closed when all senders have been dropped, or when [`close`] is called. + /// + /// If there are no messages in the channel's buffer, but the channel has + /// not yet been closed, this method will sleep until a message is sent or + /// the channel is closed. + /// + /// Note that if [`close`] is called, but there are still outstanding + /// messages from before it was closed, the channel is not considered + /// closed by `recv` until they are all consumed. + /// + /// [`close`]: Self::close + pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { + RecvFuture { + channel_cell: self.channel_cell, + } + } + + /// Attempts to immediately receive a message on this `Receiver` + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + pub fn try_recv(&self) -> Result { + Channel::lock(self.channel_cell, |c| c.try_recv()) + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + /// + /// To guarantee that no messages are dropped, after calling `close()`, + /// `recv()` must be called until `None` is returned. If there are + /// outstanding messages, the `recv` method will not return `None` + /// until those are released. + /// + pub fn close(&mut self) { + Channel::lock(self.channel_cell, |c| c.close()) + } +} + +impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> +where + M: Mutex, +{ + fn drop(&mut self) { + Channel::lock(self.channel_cell, |c| c.deregister_receiver()) + } +} + +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: Mutex, +{ + channel_cell: &'ch UnsafeCell>, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: Mutex, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Channel::lock(self.channel_cell, |c| { + match c.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(Some(v)), + Err(TryRecvError::Closed) => Poll::Ready(None), + Err(TryRecvError::Empty) => Poll::Pending, + } + }) + } +} + +// Safe to pass the receive future around since it locks channel whenever polled +unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where + M: Mutex + Sync +{ +} +unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where + M: Mutex + Sync +{ +} + +impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> +where + M: Mutex, +{ + /// Sends a value, waiting until there is capacity. + /// + /// A successful send occurs when it is determined that the other end of the + /// channel has not hung up already. An unsuccessful send would be one where + /// the corresponding receiver has already been closed. Note that a return + /// value of `Err` means that the data will never be received, but a return + /// value of `Ok` does not mean that the data will be received. It is + /// possible for the corresponding receiver to hang up immediately after + /// this function returns `Ok`. + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] handle dropping, the function returns + /// an error. The error includes the value passed to `send`. + /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { + SendFuture { + sender: self.clone(), + message: Some(message), + } + } + + /// Attempts to immediately send a message on this `Sender` + /// + /// This method differs from [`send`] by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. Compared + /// with [`send`], this function has two failure cases instead of one (one for + /// disconnection, one for a full buffer). + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`channel`], then an + /// error is returned. + /// + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] handle dropping, the function returns + /// an error. The error includes the value passed to `send`. + /// + /// [`send`]: Sender::send + /// [`channel`]: channel + /// [`close`]: Receiver::close + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + Channel::lock(self.channel_cell, |c| c.try_send(message)) + } + + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + pub async fn closed(&self) { + CloseFuture { + sender: self.clone(), + } + .await + } + + /// Checks if the channel has been closed. This happens when the + /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is + /// called. + /// + /// [`Receiver`]: crate::sync::mpsc::Receiver + /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close + pub fn is_closed(&self) -> bool { + Channel::lock(self.channel_cell, |c| c.is_closed()) + } +} + +pub struct SendFuture<'ch, M, T, const N: usize> +where + M: Mutex, +{ + sender: Sender<'ch, M, T, N>, + message: Option, +} + +impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> +where + M: Mutex, +{ + type Output = Result<(), SendError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match Channel::lock(self.sender.channel_cell, |c| { + c.try_send_with_context(m, Some(cx)) + }) { + Ok(..) => Poll::Ready(Ok(())), + Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex {} + +struct CloseFuture<'ch, M, T, const N: usize> +where + M: Mutex, +{ + sender: Sender<'ch, M, T, N>, +} + +impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> +where + M: Mutex, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if Channel::lock(self.sender.channel_cell, |c| { + c.is_closed_with_context(Some(cx)) + }) { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> +where + M: Mutex, +{ + fn drop(&mut self) { + Channel::lock(self.channel_cell, |c| c.deregister_sender()) + } +} + +impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> +where + M: Mutex, +{ + #[allow(clippy::clone_double_ref)] + fn clone(&self) -> Self { + Channel::lock(self.channel_cell, |c| c.register_sender()); + Sender { + channel_cell: self.channel_cell.clone(), + } + } +} + +/// An error returned from the [`try_recv`] method. +/// +/// [`try_recv`]: super::Receiver::try_recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryRecvError { + /// A message could not be received because the channel is empty. + Empty, + + /// The message could not be received because the channel is empty and closed. + Closed, +} + +/// Error returned by the `Sender`. +#[derive(Debug)] +pub struct SendError(pub T); + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +#[cfg(feature = "defmt")] +impl defmt::Format for SendError { + fn format(&self, fmt: defmt::Formatter<'_>) { + defmt::write!(fmt, "channel closed") + } +} + +/// This enumeration is the list of the possible error outcomes for the +/// [try_send](super::Sender::try_send) method. +#[derive(Debug)] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), +} + +impl fmt::Display for TrySendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TrySendError::Full(..) => "no available capacity", + TrySendError::Closed(..) => "channel closed", + } + ) + } +} + +#[cfg(feature = "defmt")] +impl defmt::Format for TrySendError { + fn format(&self, fmt: defmt::Formatter<'_>) { + match self { + TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"), + TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"), + } + } +} + +struct ChannelState { + buf: [MaybeUninit>; N], + read_pos: usize, + write_pos: usize, + full: bool, + closed: bool, + receiver_registered: bool, + senders_registered: u32, + receiver_waker: WakerRegistration, + senders_waker: WakerRegistration, +} + +impl ChannelState { + const INIT: MaybeUninit> = MaybeUninit::uninit(); + + const fn new() -> Self { + ChannelState { + buf: [Self::INIT; N], + read_pos: 0, + write_pos: 0, + full: false, + closed: false, + receiver_registered: false, + senders_registered: 0, + receiver_waker: WakerRegistration::new(), + senders_waker: WakerRegistration::new(), + } + } + + fn try_recv(&mut self) -> Result { + self.try_recv_with_context(None) + } + + fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + if self.read_pos != self.write_pos || self.full { + if self.full { + self.full = false; + self.senders_waker.wake(); + } + let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() }; + self.read_pos = (self.read_pos + 1) % self.buf.len(); + Ok(message) + } else if !self.closed { + cx.into_iter() + .for_each(|cx| self.set_receiver_waker(&cx.waker())); + Err(TryRecvError::Empty) + } else { + Err(TryRecvError::Closed) + } + } + + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.try_send_with_context(message, None) + } + + fn try_send_with_context( + &mut self, + message: T, + cx: Option<&mut Context<'_>>, + ) -> Result<(), TrySendError> { + if !self.closed { + if !self.full { + self.buf[self.write_pos] = MaybeUninit::new(message.into()); + self.write_pos = (self.write_pos + 1) % self.buf.len(); + if self.write_pos == self.read_pos { + self.full = true; + } + self.receiver_waker.wake(); + Ok(()) + } else { + cx.into_iter() + .for_each(|cx| self.set_senders_waker(&cx.waker())); + Err(TrySendError::Full(message)) + } + } else { + Err(TrySendError::Closed(message)) + } + } + + fn close(&mut self) { + self.receiver_waker.wake(); + self.closed = true; + } + + fn is_closed(&mut self) -> bool { + self.is_closed_with_context(None) + } + + fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { + if self.closed { + cx.into_iter() + .for_each(|cx| self.set_senders_waker(&cx.waker())); + true + } else { + false + } + } + + fn register_receiver(&mut self) { + assert!(!self.receiver_registered); + self.receiver_registered = true; + } + + fn deregister_receiver(&mut self) { + if self.receiver_registered { + self.closed = true; + self.senders_waker.wake(); + } + self.receiver_registered = false; + } + + fn register_sender(&mut self) { + self.senders_registered += 1; + } + + fn deregister_sender(&mut self) { + assert!(self.senders_registered > 0); + self.senders_registered -= 1; + if self.senders_registered == 0 { + self.receiver_waker.wake(); + self.closed = true; + } + } + + fn set_receiver_waker(&mut self, receiver_waker: &Waker) { + self.receiver_waker.register(receiver_waker); + } + + fn set_senders_waker(&mut self, senders_waker: &Waker) { + // Dispose of any existing sender causing them to be polled again. + // This could cause a spin given multiple concurrent senders, however given that + // most sends only block waiting for the receiver to become active, this should + // be a short-lived activity. The upside is a greatly simplified implementation + // that avoids the need for intrusive linked-lists and unsafe operations on pinned + // pointers. + self.senders_waker.wake(); + self.senders_waker.register(senders_waker); + } +} + +impl Drop for ChannelState { + fn drop(&mut self) { + while self.read_pos != self.write_pos || self.full { + self.full = false; + unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) }; + self.read_pos = (self.read_pos + 1) % N; + } + } +} + +/// A a bounded mpsc channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +pub struct Channel +where + M: Mutex, +{ + channel_cell: UnsafeCell>, + receiver_consumed: PhantomData<()>, +} + +struct ChannelCell +where + M: Mutex, +{ + mutex: M, + state: ChannelState, +} + +pub type WithCriticalSections = CriticalSectionMutex<()>; + +pub type WithThreadModeOnly = ThreadModeMutex<()>; + +pub type WithNoThreads = NoopMutex<()>; + +impl Channel +where + M: Mutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy::channel::mpsc; + /// use embassy::channel::mpsc::{Channel, WithNoThreads}; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::new(); + /// // once we have a channel, obtain its sender and receiver + /// let (sender, receiver) = mpsc::split(&mut channel); + /// ``` + pub fn new() -> Self { + let mutex = M::new(()); + let state = ChannelState::new(); + let channel_cell = ChannelCell { mutex, state }; + Channel { + channel_cell: UnsafeCell::new(channel_cell), + receiver_consumed: PhantomData, + } + } + + fn lock( + channel_cell: &UnsafeCell>, + f: impl FnOnce(&mut ChannelState) -> R, + ) -> R { + unsafe { + let channel_cell = &mut *(channel_cell.get()); + let mutex = &mut channel_cell.mutex; + let mut state = &mut channel_cell.state; + mutex.lock(|_| f(&mut state)) + } + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use futures::task::SpawnExt; + use futures_executor::ThreadPool; + use futures_timer::Delay; + + use crate::util::Forever; + + use super::*; + + fn capacity(c: &ChannelState) -> usize { + if !c.full { + if c.write_pos > c.read_pos { + (c.buf.len() - c.write_pos) + c.read_pos + } else { + (c.buf.len() - c.read_pos) + c.write_pos + } + } else { + 0 + } + } + + #[test] + fn sending_once() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn sending_when_full() { + let mut c = ChannelState::::new(); + let _ = c.try_send(1); + let _ = c.try_send(1); + let _ = c.try_send(1); + match c.try_send(2) { + Err(TrySendError::Full(2)) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 0); + } + + #[test] + fn sending_when_closed() { + let mut c = ChannelState::::new(); + c.closed = true; + match c.try_send(2) { + Err(TrySendError::Closed(2)) => assert!(true), + _ => assert!(false), + } + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = ChannelState::::new(); + match c.try_recv() { + Err(TryRecvError::Empty) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_closed() { + let mut c = ChannelState::::new(); + c.closed = true; + match c.try_recv() { + Err(TryRecvError::Closed) => assert!(true), + _ => assert!(false), + } + } + + #[test] + fn simple_send_and_receive() { + let mut c = Channel::::new(); + let (s, r) = split(&mut c); + assert!(s.clone().try_send(1).is_ok()); + assert_eq!(r.try_recv().unwrap(), 1); + } + + #[test] + fn should_close_without_sender() { + let mut c = Channel::::new(); + let (s, r) = split(&mut c); + drop(s); + match r.try_recv() { + Err(TryRecvError::Closed) => assert!(true), + _ => assert!(false), + } + } + + #[test] + fn should_close_once_drained() { + let mut c = Channel::::new(); + let (s, r) = split(&mut c); + assert!(s.try_send(1).is_ok()); + drop(s); + assert_eq!(r.try_recv().unwrap(), 1); + match r.try_recv() { + Err(TryRecvError::Closed) => assert!(true), + _ => assert!(false), + } + } + + #[test] + fn should_reject_send_when_receiver_dropped() { + let mut c = Channel::::new(); + let (s, r) = split(&mut c); + drop(r); + match s.try_send(1) { + Err(TrySendError::Closed(1)) => assert!(true), + _ => assert!(false), + } + } + + #[test] + fn should_reject_send_when_channel_closed() { + let mut c = Channel::::new(); + let (s, mut r) = split(&mut c); + assert!(s.try_send(1).is_ok()); + r.close(); + assert_eq!(r.try_recv().unwrap(), 1); + match r.try_recv() { + Err(TryRecvError::Closed) => assert!(true), + _ => assert!(false), + } + assert!(s.is_closed()); + } + + #[futures_test::test] + async fn receiver_closes_when_sender_dropped_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); + assert!(executor + .spawn(async move { + drop(s); + }) + .is_ok()); + assert_eq!(r.recv().await, None); + } + + #[futures_test::test] + async fn receiver_receives_given_try_send_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); + assert!(executor + .spawn(async move { + assert!(s.try_send(1).is_ok()); + }) + .is_ok()); + assert_eq!(r.recv().await, Some(1)); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let mut c = Channel::::new(); + let (s, mut r) = split(&mut c); + assert!(s.send(1).await.is_ok()); + assert_eq!(r.recv().await, Some(1)); + } + + #[futures_test::test] + async fn sender_send_completes_if_closed() { + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, r) = split(c); + drop(r); + match s.send(1).await { + Err(SendError(1)) => assert!(true), + _ => assert!(false), + } + } + + #[futures_test::test] + async fn senders_sends_wait_until_capacity() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s0, mut r) = split(c); + assert!(s0.try_send(1).is_ok()); + let s1 = s0.clone(); + let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); + let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await }); + // Wish I could think of a means of determining that the async send is waiting instead. + // However, I've used the debugger to observe that the send does indeed wait. + Delay::new(Duration::from_millis(500)).await; + assert_eq!(r.recv().await, Some(1)); + assert!(executor + .spawn(async move { while let Some(_) = r.recv().await {} }) + .is_ok()); + assert!(send_task_1.unwrap().await.is_ok()); + assert!(send_task_2.unwrap().await.is_ok()); + } + + #[futures_test::test] + async fn sender_close_completes_if_closing() { + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); + r.close(); + s.closed().await; + } + + #[futures_test::test] + async fn sender_close_completes_if_closed() { + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, r) = split(c); + drop(r); + s.closed().await; + } +} diff --git a/embassy/src/channel/signal.rs b/embassy/src/channel/signal.rs new file mode 100644 index 000000000..d5698732c --- /dev/null +++ b/embassy/src/channel/signal.rs @@ -0,0 +1,73 @@ +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem; +use core::task::{Context, Poll, Waker}; + +/// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. +/// +/// For more advanced use cases, please consider [futures-intrusive](https://crates.io/crates/futures-intrusive) channels or mutexes. +pub struct Signal { + state: UnsafeCell>, +} + +enum State { + None, + Waiting(Waker), + Signaled(T), +} + +unsafe impl Send for Signal {} +unsafe impl Sync for Signal {} + +impl Signal { + pub const fn new() -> Self { + Self { + state: UnsafeCell::new(State::None), + } + } + + /// Mark this Signal as completed. + pub fn signal(&self, val: T) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { + waker.wake(); + } + }) + } + + pub fn reset(&self) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + *state = State::None + }) + } + + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + match state { + State::None => { + *state = State::Waiting(cx.waker().clone()); + Poll::Pending + } + State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, + State::Waiting(_) => panic!("waker overflow"), + State::Signaled(_) => match mem::replace(state, State::None) { + State::Signaled(res) => Poll::Ready(res), + _ => unreachable!(), + }, + } + }) + } + + /// Future that completes when this Signal has been signaled. + pub fn wait(&self) -> impl Future + '_ { + futures::future::poll_fn(move |cx| self.poll_wait(cx)) + } + + /// non-blocking method to check whether this signal has been signaled. + pub fn signaled(&self) -> bool { + critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) + } +} diff --git a/embassy/src/lib.rs b/embassy/src/lib.rs index 9e050a57a..847142285 100644 --- a/embassy/src/lib.rs +++ b/embassy/src/lib.rs @@ -7,6 +7,10 @@ // This mod MUST go first, so that the others see its macros. pub(crate) mod fmt; +pub mod blocking_mutex; +pub mod channel; +pub mod waitqueue; + pub mod executor; pub mod interrupt; pub mod io; diff --git a/embassy/src/util/drop_bomb.rs b/embassy/src/util/drop_bomb.rs deleted file mode 100644 index efb36a97d..000000000 --- a/embassy/src/util/drop_bomb.rs +++ /dev/null @@ -1,28 +0,0 @@ -use core::mem; - -/// An explosive ordinance that panics if it is improperly disposed of. -/// -/// This is to forbid dropping futures, when there is absolutely no other choice. -/// -/// To correctly dispose of this device, call the [defuse](struct.DropBomb.html#method.defuse) -/// method before this object is dropped. -pub struct DropBomb { - _private: (), -} - -impl DropBomb { - pub fn new() -> Self { - Self { _private: () } - } - - /// Diffuses the bomb, rendering it safe to drop. - pub fn defuse(self) { - mem::forget(self) - } -} - -impl Drop for DropBomb { - fn drop(&mut self) { - panic!("boom") - } -} diff --git a/embassy/src/util/mod.rs b/embassy/src/util/mod.rs index e66576b33..f832fa2f6 100644 --- a/embassy/src/util/mod.rs +++ b/embassy/src/util/mod.rs @@ -1,21 +1,7 @@ -//! Async utilities -mod drop_bomb; +//! Misc utilities mod forever; -mod mutex; -mod on_drop; -mod signal; -#[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] -mod waker; - -pub use drop_bomb::*; pub use forever::*; -pub mod mpsc; -pub use mutex::*; -pub use on_drop::*; -pub use signal::*; -pub use waker::*; - /// Unsafely unborrow an owned singleton out of a `&mut`. /// /// It is intended to be implemented for owned peripheral singletons, such as `USART3` or `AnyPin`. diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs deleted file mode 100644 index a0f884ec6..000000000 --- a/embassy/src/util/mpsc.rs +++ /dev/null @@ -1,897 +0,0 @@ -//! A multi-producer, single-consumer queue for sending values between -//! asynchronous tasks. This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. -//! -//! Similar to the `mpsc` channels provided by `std`, the channel constructor -//! functions provide separate send and receive handles, [`Sender`] and -//! [`Receiver`]. If there is no message to read, the current task will be -//! notified when a new value is sent. [`Sender`] allows sending values into -//! the channel. If the bounded channel is at capacity, the send is rejected. -//! -//! # Disconnection -//! -//! When all [`Sender`] handles have been dropped, it is no longer -//! possible to send values into the channel. This is considered the termination -//! event of the stream. -//! -//! If the [`Receiver`] handle is dropped, then messages can no longer -//! be read out of the channel. In this case, all further attempts to send will -//! result in an error. -//! -//! # Clean Shutdown -//! -//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to -//! remain in the channel. Instead, it is usually desirable to perform a "clean" -//! shutdown. To do this, the receiver first calls `close`, which will prevent -//! any further messages to be sent into the channel. Then, the receiver -//! consumes the channel to completion, at which point the receiver can be -//! dropped. -//! -//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html - -use core::cell::UnsafeCell; -use core::fmt; -use core::marker::PhantomData; -use core::mem::MaybeUninit; -use core::pin::Pin; -use core::ptr; -use core::task::Context; -use core::task::Poll; -use core::task::Waker; - -use futures::Future; - -use super::CriticalSectionMutex; -use super::Mutex; -use super::NoopMutex; -use super::ThreadModeMutex; -use super::WakerRegistration; - -/// Send values to the associated `Receiver`. -/// -/// Instances are created by the [`split`](split) function. -pub struct Sender<'ch, M, T, const N: usize> -where - M: Mutex, -{ - channel_cell: &'ch UnsafeCell>, -} - -// Safe to pass the sender around -unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex + Sync -{} -unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex + Sync -{} - -/// Receive values from the associated `Sender`. -/// -/// Instances are created by the [`split`](split) function. -pub struct Receiver<'ch, M, T, const N: usize> -where - M: Mutex, -{ - channel_cell: &'ch UnsafeCell>, - _receiver_consumed: &'ch mut PhantomData<()>, -} - -// Safe to pass the receiver around -unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where - M: Mutex + Sync -{ -} -unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where - M: Mutex + Sync -{ -} - -/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. -/// -/// All data sent on `Sender` will become available on `Receiver` in the same -/// order as it was sent. -/// -/// The `Sender` can be cloned to `send` to the same channel from multiple code -/// locations. Only one `Receiver` is valid. -/// -/// If the `Receiver` is disconnected while trying to `send`, the `send` method -/// will return a `SendError`. Similarly, if `Sender` is disconnected while -/// trying to `recv`, the `recv` method will return a `RecvError`. -/// -/// Note that when splitting the channel, the sender and receiver cannot outlive -/// their channel. The following will therefore fail compilation: -//// -/// ```compile_fail -/// use embassy::util::mpsc; -/// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; -/// -/// let (sender, receiver) = { -/// let mut channel = Channel::::with_thread_mode_only(); -/// mpsc::split(&mut channel) -/// }; -/// ``` -pub fn split( - channel: &mut Channel, -) -> (Sender, Receiver) -where - M: Mutex, -{ - let sender = Sender { - channel_cell: &channel.channel_cell, - }; - let receiver = Receiver { - channel_cell: &channel.channel_cell, - _receiver_consumed: &mut channel.receiver_consumed, - }; - Channel::lock(&channel.channel_cell, |c| { - c.register_receiver(); - c.register_sender(); - }); - (sender, receiver) -} - -impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> -where - M: Mutex, -{ - /// Receives the next value for this receiver. - /// - /// This method returns `None` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. This indicates that no - /// further values can ever be received from this `Receiver`. The channel is - /// closed when all senders have been dropped, or when [`close`] is called. - /// - /// If there are no messages in the channel's buffer, but the channel has - /// not yet been closed, this method will sleep until a message is sent or - /// the channel is closed. - /// - /// Note that if [`close`] is called, but there are still outstanding - /// messages from before it was closed, the channel is not considered - /// closed by `recv` until they are all consumed. - /// - /// [`close`]: Self::close - pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { - RecvFuture { - channel_cell: self.channel_cell, - } - } - - /// Attempts to immediately receive a message on this `Receiver` - /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. - pub fn try_recv(&self) -> Result { - Channel::lock(self.channel_cell, |c| c.try_recv()) - } - - /// Closes the receiving half of a channel without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - /// - /// To guarantee that no messages are dropped, after calling `close()`, - /// `recv()` must be called until `None` is returned. If there are - /// outstanding messages, the `recv` method will not return `None` - /// until those are released. - /// - pub fn close(&mut self) { - Channel::lock(self.channel_cell, |c| c.close()) - } -} - -impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> -where - M: Mutex, -{ - fn drop(&mut self) { - Channel::lock(self.channel_cell, |c| c.deregister_receiver()) - } -} - -pub struct RecvFuture<'ch, M, T, const N: usize> -where - M: Mutex, -{ - channel_cell: &'ch UnsafeCell>, -} - -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> -where - M: Mutex, -{ - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Channel::lock(self.channel_cell, |c| { - match c.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(Some(v)), - Err(TryRecvError::Closed) => Poll::Ready(None), - Err(TryRecvError::Empty) => Poll::Pending, - } - }) - } -} - -// Safe to pass the receive future around since it locks channel whenever polled -unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where - M: Mutex + Sync -{ -} -unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where - M: Mutex + Sync -{ -} - -impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> -where - M: Mutex, -{ - /// Sends a value, waiting until there is capacity. - /// - /// A successful send occurs when it is determined that the other end of the - /// channel has not hung up already. An unsuccessful send would be one where - /// the corresponding receiver has already been closed. Note that a return - /// value of `Err` means that the data will never be received, but a return - /// value of `Ok` does not mean that the data will be received. It is - /// possible for the corresponding receiver to hang up immediately after - /// this function returns `Ok`. - /// - /// # Errors - /// - /// If the receive half of the channel is closed, either due to [`close`] - /// being called or the [`Receiver`] handle dropping, the function returns - /// an error. The error includes the value passed to `send`. - /// - /// [`close`]: Receiver::close - /// [`Receiver`]: Receiver - pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { - SendFuture { - sender: self.clone(), - message: Some(message), - } - } - - /// Attempts to immediately send a message on this `Sender` - /// - /// This method differs from [`send`] by returning immediately if the channel's - /// buffer is full or no receiver is waiting to acquire some data. Compared - /// with [`send`], this function has two failure cases instead of one (one for - /// disconnection, one for a full buffer). - /// - /// # Errors - /// - /// If the channel capacity has been reached, i.e., the channel has `n` - /// buffered values where `n` is the argument passed to [`channel`], then an - /// error is returned. - /// - /// If the receive half of the channel is closed, either due to [`close`] - /// being called or the [`Receiver`] handle dropping, the function returns - /// an error. The error includes the value passed to `send`. - /// - /// [`send`]: Sender::send - /// [`channel`]: channel - /// [`close`]: Receiver::close - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - Channel::lock(self.channel_cell, |c| c.try_send(message)) - } - - /// Completes when the receiver has dropped. - /// - /// This allows the producers to get notified when interest in the produced - /// values is canceled and immediately stop doing work. - pub async fn closed(&self) { - CloseFuture { - sender: self.clone(), - } - .await - } - - /// Checks if the channel has been closed. This happens when the - /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is - /// called. - /// - /// [`Receiver`]: crate::sync::mpsc::Receiver - /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close - pub fn is_closed(&self) -> bool { - Channel::lock(self.channel_cell, |c| c.is_closed()) - } -} - -pub struct SendFuture<'ch, M, T, const N: usize> -where - M: Mutex, -{ - sender: Sender<'ch, M, T, N>, - message: Option, -} - -impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> -where - M: Mutex, -{ - type Output = Result<(), SendError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match Channel::lock(self.sender.channel_cell, |c| { - c.try_send_with_context(m, Some(cx)) - }) { - Ok(..) => Poll::Ready(Ok(())), - Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex {} - -struct CloseFuture<'ch, M, T, const N: usize> -where - M: Mutex, -{ - sender: Sender<'ch, M, T, N>, -} - -impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> -where - M: Mutex, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if Channel::lock(self.sender.channel_cell, |c| { - c.is_closed_with_context(Some(cx)) - }) { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> -where - M: Mutex, -{ - fn drop(&mut self) { - Channel::lock(self.channel_cell, |c| c.deregister_sender()) - } -} - -impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> -where - M: Mutex, -{ - #[allow(clippy::clone_double_ref)] - fn clone(&self) -> Self { - Channel::lock(self.channel_cell, |c| c.register_sender()); - Sender { - channel_cell: self.channel_cell.clone(), - } - } -} - -/// An error returned from the [`try_recv`] method. -/// -/// [`try_recv`]: super::Receiver::try_recv -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { - /// A message could not be received because the channel is empty. - Empty, - - /// The message could not be received because the channel is empty and closed. - Closed, -} - -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(pub T); - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -#[cfg(feature = "defmt")] -impl defmt::Format for SendError { - fn format(&self, fmt: defmt::Formatter<'_>) { - defmt::write!(fmt, "channel closed") - } -} - -/// This enumeration is the list of the possible error outcomes for the -/// [try_send](super::Sender::try_send) method. -#[derive(Debug)] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), - - /// The receive half of the channel was explicitly closed or has been - /// dropped. - Closed(T), -} - -impl fmt::Display for TrySendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "{}", - match self { - TrySendError::Full(..) => "no available capacity", - TrySendError::Closed(..) => "channel closed", - } - ) - } -} - -#[cfg(feature = "defmt")] -impl defmt::Format for TrySendError { - fn format(&self, fmt: defmt::Formatter<'_>) { - match self { - TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"), - TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"), - } - } -} - -struct ChannelState { - buf: [MaybeUninit>; N], - read_pos: usize, - write_pos: usize, - full: bool, - closed: bool, - receiver_registered: bool, - senders_registered: u32, - receiver_waker: WakerRegistration, - senders_waker: WakerRegistration, -} - -impl ChannelState { - const INIT: MaybeUninit> = MaybeUninit::uninit(); - - const fn new() -> Self { - ChannelState { - buf: [Self::INIT; N], - read_pos: 0, - write_pos: 0, - full: false, - closed: false, - receiver_registered: false, - senders_registered: 0, - receiver_waker: WakerRegistration::new(), - senders_waker: WakerRegistration::new(), - } - } - - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) - } - - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { - if self.read_pos != self.write_pos || self.full { - if self.full { - self.full = false; - self.senders_waker.wake(); - } - let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() }; - self.read_pos = (self.read_pos + 1) % self.buf.len(); - Ok(message) - } else if !self.closed { - cx.into_iter() - .for_each(|cx| self.set_receiver_waker(&cx.waker())); - Err(TryRecvError::Empty) - } else { - Err(TryRecvError::Closed) - } - } - - fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.try_send_with_context(message, None) - } - - fn try_send_with_context( - &mut self, - message: T, - cx: Option<&mut Context<'_>>, - ) -> Result<(), TrySendError> { - if !self.closed { - if !self.full { - self.buf[self.write_pos] = MaybeUninit::new(message.into()); - self.write_pos = (self.write_pos + 1) % self.buf.len(); - if self.write_pos == self.read_pos { - self.full = true; - } - self.receiver_waker.wake(); - Ok(()) - } else { - cx.into_iter() - .for_each(|cx| self.set_senders_waker(&cx.waker())); - Err(TrySendError::Full(message)) - } - } else { - Err(TrySendError::Closed(message)) - } - } - - fn close(&mut self) { - self.receiver_waker.wake(); - self.closed = true; - } - - fn is_closed(&mut self) -> bool { - self.is_closed_with_context(None) - } - - fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { - if self.closed { - cx.into_iter() - .for_each(|cx| self.set_senders_waker(&cx.waker())); - true - } else { - false - } - } - - fn register_receiver(&mut self) { - assert!(!self.receiver_registered); - self.receiver_registered = true; - } - - fn deregister_receiver(&mut self) { - if self.receiver_registered { - self.closed = true; - self.senders_waker.wake(); - } - self.receiver_registered = false; - } - - fn register_sender(&mut self) { - self.senders_registered += 1; - } - - fn deregister_sender(&mut self) { - assert!(self.senders_registered > 0); - self.senders_registered -= 1; - if self.senders_registered == 0 { - self.receiver_waker.wake(); - self.closed = true; - } - } - - fn set_receiver_waker(&mut self, receiver_waker: &Waker) { - self.receiver_waker.register(receiver_waker); - } - - fn set_senders_waker(&mut self, senders_waker: &Waker) { - // Dispose of any existing sender causing them to be polled again. - // This could cause a spin given multiple concurrent senders, however given that - // most sends only block waiting for the receiver to become active, this should - // be a short-lived activity. The upside is a greatly simplified implementation - // that avoids the need for intrusive linked-lists and unsafe operations on pinned - // pointers. - self.senders_waker.wake(); - self.senders_waker.register(senders_waker); - } -} - -impl Drop for ChannelState { - fn drop(&mut self) { - while self.read_pos != self.write_pos || self.full { - self.full = false; - unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) }; - self.read_pos = (self.read_pos + 1) % N; - } - } -} - -/// A a bounded mpsc channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -pub struct Channel -where - M: Mutex, -{ - channel_cell: UnsafeCell>, - receiver_consumed: PhantomData<()>, -} - -struct ChannelCell -where - M: Mutex, -{ - mutex: M, - state: ChannelState, -} - -pub type WithCriticalSections = CriticalSectionMutex<()>; - -pub type WithThreadModeOnly = ThreadModeMutex<()>; - -pub type WithNoThreads = NoopMutex<()>; - -impl Channel -where - M: Mutex, -{ - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy::util::mpsc; - /// use embassy::util::mpsc::{Channel, WithNoThreads}; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// // once we have a channel, obtain its sender and receiver - /// let (sender, receiver) = mpsc::split(&mut channel); - /// ``` - pub fn new() -> Self { - let mutex = M::new(()); - let state = ChannelState::new(); - let channel_cell = ChannelCell { mutex, state }; - Channel { - channel_cell: UnsafeCell::new(channel_cell), - receiver_consumed: PhantomData, - } - } - - fn lock( - channel_cell: &UnsafeCell>, - f: impl FnOnce(&mut ChannelState) -> R, - ) -> R { - unsafe { - let channel_cell = &mut *(channel_cell.get()); - let mutex = &mut channel_cell.mutex; - let mut state = &mut channel_cell.state; - mutex.lock(|_| f(&mut state)) - } - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use futures::task::SpawnExt; - use futures_executor::ThreadPool; - use futures_timer::Delay; - - use crate::util::Forever; - - use super::*; - - fn capacity(c: &ChannelState) -> usize { - if !c.full { - if c.write_pos > c.read_pos { - (c.buf.len() - c.write_pos) + c.read_pos - } else { - (c.buf.len() - c.read_pos) + c.write_pos - } - } else { - 0 - } - } - - #[test] - fn sending_once() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn sending_when_full() { - let mut c = ChannelState::::new(); - let _ = c.try_send(1); - let _ = c.try_send(1); - let _ = c.try_send(1); - match c.try_send(2) { - Err(TrySendError::Full(2)) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 0); - } - - #[test] - fn sending_when_closed() { - let mut c = ChannelState::::new(); - c.closed = true; - match c.try_send(2) { - Err(TrySendError::Closed(2)) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_closed() { - let mut c = ChannelState::::new(); - c.closed = true; - match c.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn simple_send_and_receive() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - assert!(s.clone().try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); - } - - #[test] - fn should_close_without_sender() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - drop(s); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_close_once_drained() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - assert!(s.try_send(1).is_ok()); - drop(s); - assert_eq!(r.try_recv().unwrap(), 1); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_reject_send_when_receiver_dropped() { - let mut c = Channel::::new(); - let (s, r) = split(&mut c); - drop(r); - match s.try_send(1) { - Err(TrySendError::Closed(1)) => assert!(true), - _ => assert!(false), - } - } - - #[test] - fn should_reject_send_when_channel_closed() { - let mut c = Channel::::new(); - let (s, mut r) = split(&mut c); - assert!(s.try_send(1).is_ok()); - r.close(); - assert_eq!(r.try_recv().unwrap(), 1); - match r.try_recv() { - Err(TryRecvError::Closed) => assert!(true), - _ => assert!(false), - } - assert!(s.is_closed()); - } - - #[futures_test::test] - async fn receiver_closes_when_sender_dropped_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - assert!(executor - .spawn(async move { - drop(s); - }) - .is_ok()); - assert_eq!(r.recv().await, None); - } - - #[futures_test::test] - async fn receiver_receives_given_try_send_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - assert!(executor - .spawn(async move { - assert!(s.try_send(1).is_ok()); - }) - .is_ok()); - assert_eq!(r.recv().await, Some(1)); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let mut c = Channel::::new(); - let (s, mut r) = split(&mut c); - assert!(s.send(1).await.is_ok()); - assert_eq!(r.recv().await, Some(1)); - } - - #[futures_test::test] - async fn sender_send_completes_if_closed() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, r) = split(c); - drop(r); - match s.send(1).await { - Err(SendError(1)) => assert!(true), - _ => assert!(false), - } - } - - #[futures_test::test] - async fn senders_sends_wait_until_capacity() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s0, mut r) = split(c); - assert!(s0.try_send(1).is_ok()); - let s1 = s0.clone(); - let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); - let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await }); - // Wish I could think of a means of determining that the async send is waiting instead. - // However, I've used the debugger to observe that the send does indeed wait. - Delay::new(Duration::from_millis(500)).await; - assert_eq!(r.recv().await, Some(1)); - assert!(executor - .spawn(async move { while let Some(_) = r.recv().await {} }) - .is_ok()); - assert!(send_task_1.unwrap().await.is_ok()); - assert!(send_task_2.unwrap().await.is_ok()); - } - - #[futures_test::test] - async fn sender_close_completes_if_closing() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, mut r) = split(c); - r.close(); - s.closed().await; - } - - #[futures_test::test] - async fn sender_close_completes_if_closed() { - static CHANNEL: Forever> = Forever::new(); - let c = CHANNEL.put(Channel::new()); - let (s, r) = split(c); - drop(r); - s.closed().await; - } -} diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs deleted file mode 100644 index 9a00a409e..000000000 --- a/embassy/src/util/mutex.rs +++ /dev/null @@ -1,159 +0,0 @@ -use core::cell::UnsafeCell; -use critical_section::CriticalSection; - -/// Any object implementing this trait guarantees exclusive access to the data contained -/// within the mutex for the duration of the lock. -/// Adapted from https://github.com/rust-embedded/mutex-trait. -pub trait Mutex { - /// Data protected by the mutex. - type Data; - - fn new(data: Self::Data) -> Self; - - /// Creates a critical section and grants temporary access to the protected data. - fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; -} - -/// A "mutex" based on critical sections -/// -/// # Safety -/// -/// **This Mutex is only safe on single-core systems.** -/// -/// On multi-core systems, a `CriticalSection` **is not sufficient** to ensure exclusive access. -pub struct CriticalSectionMutex { - inner: UnsafeCell, -} - -// NOTE: A `CriticalSectionMutex` can be used as a channel so the protected data must be `Send` -// to prevent sending non-Sendable stuff (e.g. access tokens) across different -// execution contexts (e.g. interrupts) -unsafe impl Sync for CriticalSectionMutex where T: Send {} - -impl CriticalSectionMutex { - /// Creates a new mutex - pub const fn new(value: T) -> Self { - CriticalSectionMutex { - inner: UnsafeCell::new(value), - } - } -} - -impl CriticalSectionMutex { - /// Borrows the data for the duration of the critical section - pub fn borrow<'cs>(&'cs self, _cs: CriticalSection<'cs>) -> &'cs T { - unsafe { &*self.inner.get() } - } -} - -impl Mutex for CriticalSectionMutex { - type Data = T; - - fn new(data: T) -> Self { - Self::new(data) - } - - fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { - critical_section::with(|cs| f(self.borrow(cs))) - } -} - -/// A "mutex" that only allows borrowing from thread mode. -/// -/// # Safety -/// -/// **This Mutex is only safe on single-core systems.** -/// -/// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. -pub struct ThreadModeMutex { - inner: UnsafeCell, -} - -// NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. -// Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can -// be Send+Sync even if T is not Send (unlike CriticalSectionMutex) -unsafe impl Sync for ThreadModeMutex {} -unsafe impl Send for ThreadModeMutex {} - -impl ThreadModeMutex { - /// Creates a new mutex - pub const fn new(value: T) -> Self { - ThreadModeMutex { - inner: UnsafeCell::new(value), - } - } - - /// Borrows the data - pub fn borrow(&self) -> &T { - assert!( - in_thread_mode(), - "ThreadModeMutex can only be borrowed from thread mode." - ); - unsafe { &*self.inner.get() } - } -} - -impl Mutex for ThreadModeMutex { - type Data = T; - - fn new(data: T) -> Self { - Self::new(data) - } - - fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { - f(self.borrow()) - } -} - -impl Drop for ThreadModeMutex { - fn drop(&mut self) { - // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so - // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if - // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, - // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. - assert!( - in_thread_mode(), - "ThreadModeMutex can only be dropped from thread mode." - ); - - // Drop of the inner `T` happens after this. - } -} - -pub fn in_thread_mode() -> bool { - #[cfg(feature = "std")] - return Some("main") == std::thread::current().name(); - - #[cfg(not(feature = "std"))] - return cortex_m::peripheral::SCB::vect_active() - == cortex_m::peripheral::scb::VectActive::ThreadMode; -} - -/// A "mutex" that does nothing and cannot be shared between threads. -pub struct NoopMutex { - inner: T, -} - -impl NoopMutex { - pub const fn new(value: T) -> Self { - NoopMutex { inner: value } - } -} - -impl NoopMutex { - pub fn borrow(&self) -> &T { - &self.inner - } -} - -impl Mutex for NoopMutex { - type Data = T; - - fn new(data: T) -> Self { - Self::new(data) - } - - fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { - f(self.borrow()) - } -} diff --git a/embassy/src/util/on_drop.rs b/embassy/src/util/on_drop.rs deleted file mode 100644 index 10f3407f4..000000000 --- a/embassy/src/util/on_drop.rs +++ /dev/null @@ -1,24 +0,0 @@ -use core::mem; -use core::mem::MaybeUninit; - -pub struct OnDrop { - f: MaybeUninit, -} - -impl OnDrop { - pub fn new(f: F) -> Self { - Self { - f: MaybeUninit::new(f), - } - } - - pub fn defuse(self) { - mem::forget(self) - } -} - -impl Drop for OnDrop { - fn drop(&mut self) { - unsafe { self.f.as_ptr().read()() } - } -} diff --git a/embassy/src/util/signal.rs b/embassy/src/util/signal.rs deleted file mode 100644 index bb832533c..000000000 --- a/embassy/src/util/signal.rs +++ /dev/null @@ -1,171 +0,0 @@ -use core::cell::UnsafeCell; -use core::future::Future; -use core::mem; -use core::ptr; -use core::task::{Context, Poll, Waker}; -use cortex_m::peripheral::NVIC; -use cortex_m::peripheral::{scb, SCB}; -use executor::raw::TaskHeader; -use ptr::NonNull; - -use crate::executor; -use crate::interrupt::{Interrupt, InterruptExt}; - -/// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. -/// -/// For more advanced use cases, please consider [futures-intrusive](https://crates.io/crates/futures-intrusive) channels or mutexes. -pub struct Signal { - state: UnsafeCell>, -} - -enum State { - None, - Waiting(Waker), - Signaled(T), -} - -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { - pub const fn new() -> Self { - Self { - state: UnsafeCell::new(State::None), - } - } - - /// Mark this Signal as completed. - pub fn signal(&self, val: T) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { - waker.wake(); - } - }) - } - - pub fn reset(&self) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - *state = State::None - }) - } - - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - match state { - State::None => { - *state = State::Waiting(cx.waker().clone()); - Poll::Pending - } - State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, - State::Waiting(_) => panic!("waker overflow"), - State::Signaled(_) => match mem::replace(state, State::None) { - State::Signaled(res) => Poll::Ready(res), - _ => unreachable!(), - }, - } - }) - } - - /// Future that completes when this Signal has been signaled. - pub fn wait(&self) -> impl Future + '_ { - futures::future::poll_fn(move |cx| self.poll_wait(cx)) - } - - /// non-blocking method to check whether this signal has been signaled. - pub fn signaled(&self) -> bool { - critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) - } -} - -// ========== - -pub fn wake_on_interrupt(interrupt: &mut impl Interrupt, waker: &Waker) { - interrupt.disable(); - interrupt.set_handler(irq_wake_handler); - interrupt.set_handler_context(unsafe { executor::raw::task_from_waker(waker) }.as_ptr() as _); - interrupt.enable(); -} - -unsafe fn irq_wake_handler(ctx: *mut ()) { - if let Some(task) = NonNull::new(ctx as *mut TaskHeader) { - executor::raw::wake_task(task); - } - - let irq = match SCB::vect_active() { - scb::VectActive::Interrupt { irqn } => irqn, - _ => unreachable!(), - }; - - NVIC::mask(crate::interrupt::NrWrap(irq as u16)); -} - -// ========== - -struct NrWrap(u8); -unsafe impl cortex_m::interrupt::Nr for NrWrap { - fn nr(&self) -> u8 { - self.0 - } -} - -/// Creates a future that completes when the specified Interrupt is triggered. -/// -/// The input handler is unregistered when this Future is dropped. -/// -/// Example: -/// ``` no_compile -/// use embassy::traits::*; -/// use embassy::util::InterruptFuture; -/// use embassy_stm32::interrupt; // Adjust this to your MCU's embassy HAL. -/// #[embassy::task] -/// async fn demo_interrupt_future() { -/// // Using STM32f446 interrupt names, adjust this to your application as necessary. -/// // Wait for TIM2 to tick. -/// let mut tim2_interrupt = interrupt::take!(TIM2); -/// InterruptFuture::new(&mut tim2_interrupt).await; -/// // TIM2 interrupt went off, do something... -/// } -/// ``` -pub struct InterruptFuture<'a, I: Interrupt> { - interrupt: &'a mut I, -} - -impl<'a, I: Interrupt> Drop for InterruptFuture<'a, I> { - fn drop(&mut self) { - self.interrupt.disable(); - self.interrupt.remove_handler(); - } -} - -impl<'a, I: Interrupt> InterruptFuture<'a, I> { - pub fn new(interrupt: &'a mut I) -> Self { - interrupt.disable(); - interrupt.set_handler(irq_wake_handler); - interrupt.set_handler_context(ptr::null_mut()); - interrupt.unpend(); - interrupt.enable(); - - Self { interrupt } - } -} - -impl<'a, I: Interrupt> Unpin for InterruptFuture<'a, I> {} - -impl<'a, I: Interrupt> Future for InterruptFuture<'a, I> { - type Output = (); - - fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let s = unsafe { self.get_unchecked_mut() }; - s.interrupt.set_handler_context(unsafe { - executor::raw::task_from_waker(&cx.waker()).cast().as_ptr() - }); - if s.interrupt.is_enabled() { - Poll::Pending - } else { - Poll::Ready(()) - } - } -} diff --git a/embassy/src/util/waker.rs b/embassy/src/util/waker.rs deleted file mode 100644 index 1ac6054f9..000000000 --- a/embassy/src/util/waker.rs +++ /dev/null @@ -1,81 +0,0 @@ -use core::ptr::{self, NonNull}; -use core::task::Waker; - -use atomic_polyfill::{compiler_fence, AtomicPtr, Ordering}; - -use crate::executor::raw::{task_from_waker, wake_task, TaskHeader}; - -/// Utility struct to register and wake a waker. -#[derive(Debug)] -pub struct WakerRegistration { - waker: Option>, -} - -impl WakerRegistration { - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - let w = unsafe { task_from_waker(w) }; - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, do nothing. - Some(w2) if w == w2 => {} - Some(w2) => { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - - unsafe { wake_task(w2) } - self.waker = Some(w); - } - None => self.waker = Some(w), - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - unsafe { wake_task(w) } - } - } -} - -// SAFETY: `WakerRegistration` effectively contains an `Option`, -// which is `Send` and `Sync`. -unsafe impl Send for WakerRegistration {} -unsafe impl Sync for WakerRegistration {} - -pub struct AtomicWaker { - waker: AtomicPtr, -} - -impl AtomicWaker { - pub const fn new() -> Self { - Self { - waker: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - let w = unsafe { task_from_waker(w) }; - self.waker.store(w.as_ptr(), Ordering::Relaxed); - compiler_fence(Ordering::SeqCst); - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - let w2 = self.waker.load(Ordering::Relaxed); - if let Some(w2) = NonNull::new(w2) { - unsafe { wake_task(w2) }; - } - } -} diff --git a/embassy/src/util/waker_agnostic.rs b/embassy/src/util/waker_agnostic.rs deleted file mode 100644 index 1675c53a0..000000000 --- a/embassy/src/util/waker_agnostic.rs +++ /dev/null @@ -1,84 +0,0 @@ -use core::cell::Cell; -use core::mem; -use core::task::Waker; - -use crate::util::CriticalSectionMutex as Mutex; - -/// Utility struct to register and wake a waker. -#[derive(Debug)] -pub struct WakerRegistration { - waker: Option, -} - -impl WakerRegistration { - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake() - } - } -} - -/// Utility struct to register and wake a waker. -pub struct AtomicWaker { - waker: Mutex>>, -} - -impl AtomicWaker { - pub const fn new() -> Self { - Self { - waker: Mutex::new(Cell::new(None)), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - cell.set(match cell.replace(None) { - Some(w2) if (w2.will_wake(w)) => Some(w2), - _ => Some(w.clone()), - }) - }) - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - if let Some(w) = cell.replace(None) { - w.wake_by_ref(); - cell.set(Some(w)); - } - }) - } -} diff --git a/embassy/src/waitqueue/mod.rs b/embassy/src/waitqueue/mod.rs new file mode 100644 index 000000000..a2bafad99 --- /dev/null +++ b/embassy/src/waitqueue/mod.rs @@ -0,0 +1,5 @@ +//! Async low-level wait queues + +#[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] +mod waker; +pub use waker::*; diff --git a/embassy/src/waitqueue/waker.rs b/embassy/src/waitqueue/waker.rs new file mode 100644 index 000000000..9eddbdaa1 --- /dev/null +++ b/embassy/src/waitqueue/waker.rs @@ -0,0 +1,80 @@ +use atomic_polyfill::{compiler_fence, AtomicPtr, Ordering}; +use core::ptr::{self, NonNull}; +use core::task::Waker; + +use crate::executor::raw::{task_from_waker, wake_task, TaskHeader}; + +/// Utility struct to register and wake a waker. +#[derive(Debug)] +pub struct WakerRegistration { + waker: Option>, +} + +impl WakerRegistration { + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + let w = unsafe { task_from_waker(w) }; + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, do nothing. + Some(w2) if w == w2 => {} + Some(w2) => { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + + unsafe { wake_task(w2) } + self.waker = Some(w); + } + None => self.waker = Some(w), + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + unsafe { wake_task(w) } + } + } +} + +// SAFETY: `WakerRegistration` effectively contains an `Option`, +// which is `Send` and `Sync`. +unsafe impl Send for WakerRegistration {} +unsafe impl Sync for WakerRegistration {} + +pub struct AtomicWaker { + waker: AtomicPtr, +} + +impl AtomicWaker { + pub const fn new() -> Self { + Self { + waker: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + let w = unsafe { task_from_waker(w) }; + self.waker.store(w.as_ptr(), Ordering::Relaxed); + compiler_fence(Ordering::SeqCst); + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + let w2 = self.waker.load(Ordering::Relaxed); + if let Some(w2) = NonNull::new(w2) { + unsafe { wake_task(w2) }; + } + } +} diff --git a/embassy/src/waitqueue/waker_agnostic.rs b/embassy/src/waitqueue/waker_agnostic.rs new file mode 100644 index 000000000..f583fa6f4 --- /dev/null +++ b/embassy/src/waitqueue/waker_agnostic.rs @@ -0,0 +1,84 @@ +use core::cell::Cell; +use core::mem; +use core::task::Waker; + +use crate::blocking_mutex::CriticalSectionMutex as Mutex; + +/// Utility struct to register and wake a waker. +#[derive(Debug)] +pub struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake() + } + } +} + +/// Utility struct to register and wake a waker. +pub struct AtomicWaker { + waker: Mutex>>, +} + +impl AtomicWaker { + pub const fn new() -> Self { + Self { + waker: Mutex::new(Cell::new(None)), + } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&self, w: &Waker) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + cell.set(match cell.replace(None) { + Some(w2) if (w2.will_wake(w)) => Some(w2), + _ => Some(w.clone()), + }) + }) + } + + /// Wake the registered waker, if any. + pub fn wake(&self) { + critical_section::with(|cs| { + let cell = self.waker.borrow(cs); + if let Some(w) = cell.replace(None) { + w.wake_by_ref(); + cell.set(Some(w)); + } + }) + } +} diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index c8cc67d77..79fa3dfb9 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -6,14 +6,13 @@ mod example_common; use defmt::unwrap; +use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads}; use embassy::executor::Spawner; use embassy::time::{Duration, Timer}; -use embassy::util::mpsc::TryRecvError; -use embassy::util::{mpsc, Forever}; +use embassy::util::Forever; use embassy_nrf::gpio::{Level, Output, OutputDrive}; use embassy_nrf::Peripherals; use embedded_hal::digital::v2::OutputPin; -use mpsc::{Channel, Sender, WithNoThreads}; enum LedState { On, diff --git a/examples/stm32wl55/src/bin/subghz.rs b/examples/stm32wl55/src/bin/subghz.rs index 1e406886a..89549c766 100644 --- a/examples/stm32wl55/src/bin/subghz.rs +++ b/examples/stm32wl55/src/bin/subghz.rs @@ -8,16 +8,16 @@ #[path = "../example_common.rs"] mod example_common; -use embassy::{traits::gpio::WaitForRisingEdge, util::InterruptFuture}; -use embassy_stm32::{ - dbgmcu::Dbgmcu, - dma::NoDma, - exti::ExtiInput, - gpio::{Input, Level, Output, Pull, Speed}, - interrupt, - subghz::*, - Peripherals, -}; +use embassy::channel::signal::Signal; +use embassy::interrupt::{Interrupt, InterruptExt}; +use embassy::traits::gpio::WaitForRisingEdge; +use embassy_stm32::dbgmcu::Dbgmcu; +use embassy_stm32::dma::NoDma; +use embassy_stm32::exti::ExtiInput; +use embassy_stm32::gpio::{Input, Level, Output, Pull, Speed}; +use embassy_stm32::interrupt; +use embassy_stm32::subghz::*; +use embassy_stm32::Peripherals; use embedded_hal::digital::v2::OutputPin; use example_common::unwrap; @@ -83,7 +83,13 @@ async fn main(_spawner: embassy::executor::Spawner, p: Peripherals) { let button = Input::new(p.PA0, Pull::Up); let mut pin = ExtiInput::new(button, p.EXTI0); - let mut radio_irq = interrupt::take!(SUBGHZ_RADIO); + static IRQ_SIGNAL: Signal<()> = Signal::new(); + let radio_irq = interrupt::take!(SUBGHZ_RADIO); + radio_irq.set_handler(|_| { + IRQ_SIGNAL.signal(()); + unsafe { interrupt::SUBGHZ_RADIO::steal() }.disable(); + }); + let mut radio = SubGhz::new(p.SUBGHZSPI, p.PA5, p.PA7, p.PA6, NoDma, NoDma); defmt::info!("Radio ready for use"); @@ -118,7 +124,9 @@ async fn main(_spawner: embassy::executor::Spawner, p: Peripherals) { unwrap!(radio.write_buffer(TX_BUF_OFFSET, PING_DATA_BYTES)); unwrap!(radio.set_tx(Timeout::DISABLED)); - InterruptFuture::new(&mut radio_irq).await; + radio_irq.enable(); + IRQ_SIGNAL.wait().await; + let (_, irq_status) = unwrap!(radio.irq_status()); if irq_status & Irq::TxDone.mask() != 0 { defmt::info!("TX done"); -- cgit