From 80972f1e0e6b6d409cc4d86202608c22e5ee3e5a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 30 Mar 2023 17:55:55 +0200 Subject: executor,sync: add support for turbo-wakers. This is a `core` patch to make wakers 1 word (the task pointer) instead of 2 (task pointer + vtable). It allows having the "waker optimization" we had a while back on `WakerRegistration/AtomicWaker`, but EVERYWHERE, without patching all crates. Advantages: - Less memory usage. - Faster. - `AtomicWaker` can actually use atomics to load/store the waker, No critical section needed. - No `dyn` call, which means `cargo-call-stack` can now see through wakes. Disadvantages: - You have to patch `core`... - Breaks all executors and other things that create wakers, unless they opt in to using the new `from_ptr` API. How to use: - Run this shell script to patch `core`. https://gist.github.com/Dirbaio/c67da7cf318515181539122c9d32b395 - Enable `build-std` - Enable `build-std-features = core/turbowakers` - Enable feature `turbowakers` in `embassy-executor`, `embassy-sync`. - Make sure you have no other crate creating wakers other than `embassy-executor`. These will panic at runtime. Note that the patched `core` is equivalent to the unpached one when the `turbowakers` feature is not enabled, so it should be fine to leave it there. --- embassy-executor/src/raw/mod.rs | 1 + embassy-executor/src/raw/waker_turbo.rs | 34 +++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 embassy-executor/src/raw/waker_turbo.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 15ff18fc8..72c367c33 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -11,6 +11,7 @@ mod run_queue; #[cfg(feature = "integrated-timers")] mod timer_queue; pub(crate) mod util; +#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; use core::future::Future; diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs new file mode 100644 index 000000000..435a0ff7e --- /dev/null +++ b/embassy-executor/src/raw/waker_turbo.rs @@ -0,0 +1,34 @@ +use core::ptr::NonNull; +use core::task::Waker; + +use super::{wake_task, TaskHeader, TaskRef}; + +pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { + Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _)) +} + +/// Get a task pointer from a waker. +/// +/// This can be used as an optimization in wait queues to store task pointers +/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps +/// avoid dynamic dispatch. +/// +/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). +/// +/// # Panics +/// +/// Panics if the waker is not created by the Embassy executor. +pub fn task_from_waker(waker: &Waker) -> TaskRef { + let ptr = waker.as_turbo_ptr().as_ptr(); + + // safety: our wakers are always created with `TaskRef::as_ptr` + unsafe { TaskRef::from_ptr(ptr as *const TaskHeader) } +} + +#[inline(never)] +#[no_mangle] +fn _turbo_wake(ptr: NonNull<()>) { + // safety: our wakers are always created with `TaskRef::as_ptr` + let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) }; + wake_task(task) +} -- cgit From b41ee47115509ba5ed302c684c0c36b6a3e3f76f Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 3 Apr 2023 01:11:42 +0200 Subject: executor: unify export mod. --- embassy-executor/src/lib.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 4c7e2f4cd..8707995b4 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -46,11 +46,13 @@ cfg_if::cfg_if! { } } +/// Implementation details for embassy macros. +/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. #[doc(hidden)] -/// Implementation details for embassy macros. DO NOT USE. -pub mod export { +pub mod _export { #[cfg(feature = "rtos-trace")] pub use rtos_trace::trace; + pub use static_cell::StaticCell; /// Expands the given block of code when `embassy-executor` is compiled with /// the `rtos-trace-interrupt` feature. @@ -75,9 +77,3 @@ pub mod raw; mod spawner; pub use spawner::*; - -/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. -#[doc(hidden)] -pub mod _export { - pub use static_cell::StaticCell; -} -- cgit From d3c4e4a20a05085eae8d568c7efdbe09bada9cf5 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 3 Apr 2023 01:18:27 +0200 Subject: executor: add Pender, rework Cargo features. This introduces a `Pender` struct with enum cases for thread-mode, interrupt-mode and custom callback executors. This avoids calls through function pointers when using only the thread or interrupt executors. Faster, and friendlier to `cargo-call-stack`. `embassy-executor` now has `arch-xxx` Cargo features to select the arch and to enable the builtin executors (thread and interrupt). --- embassy-executor/src/arch/cortex_m.rs | 238 +++++++++++++++++++++++++++------- embassy-executor/src/arch/riscv32.rs | 133 ++++++++++--------- embassy-executor/src/arch/std.rs | 150 +++++++++++---------- embassy-executor/src/arch/wasm.rs | 134 ++++++++++--------- embassy-executor/src/arch/xtensa.rs | 135 ++++++++++--------- embassy-executor/src/lib.rs | 71 +++++----- embassy-executor/src/raw/mod.rs | 97 +++++++++++--- 7 files changed, 602 insertions(+), 356 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index 4b27a264e..d6a55c4c7 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs @@ -1,59 +1,209 @@ -use core::arch::asm; -use core::marker::PhantomData; -use core::ptr; - -use super::{raw, Spawner}; - -/// Thread mode executor, using WFE/SEV. -/// -/// This is the simplest and most common kind of executor. It runs on -/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction -/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction -/// is executed, to make the `WFE` exit from sleep and poll the task. -/// -/// This executor allows for ultra low power consumption for chips where `WFE` -/// triggers low-power sleep without extra steps. If your chip requires extra steps, -/// you may use [`raw::Executor`] directly to program custom behavior. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::arch::asm; + use core::marker::PhantomData; + + #[cfg(feature = "nightly")] + pub use embassy_macros::main_cortex_m as main; + + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; + + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; + + impl ThreadPender { + pub(crate) fn pend(self) { + unsafe { core::arch::asm!("sev") } + } + } + + /// Thread mode executor, using WFE/SEV. + /// + /// This is the simplest and most common kind of executor. It runs on + /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction + /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction + /// is executed, to make the `WFE` exit from sleep and poll the task. + /// + /// This executor allows for ultra low power consumption for chips where `WFE` + /// triggers low-power sleep without extra steps. If your chip requires extra steps, + /// you may use [`raw::Executor`] directly to program custom behavior. + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + asm!("wfe"); + }; + } + } + } } -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), - not_send: PhantomData, +#[cfg(feature = "executor-interrupt")] +pub use interrupt::*; +#[cfg(feature = "executor-interrupt")] +mod interrupt { + use core::cell::UnsafeCell; + use core::mem::MaybeUninit; + + use atomic_polyfill::{AtomicBool, Ordering}; + use cortex_m::interrupt::InterruptNumber; + use cortex_m::peripheral::NVIC; + + use crate::raw::{self, Pender, PenderInner}; + + #[derive(Clone, Copy)] + pub(crate) struct InterruptPender(u16); + + impl InterruptPender { + pub(crate) fn pend(self) { + // STIR is faster, but is only available in v7 and higher. + #[cfg(not(armv6m))] + { + let mut nvic: cortex_m::peripheral::NVIC = unsafe { core::mem::transmute(()) }; + nvic.request(self); + } + + #[cfg(armv6m)] + cortex_m::peripheral::NVIC::pend(self); } } - /// Run the executor. + unsafe impl cortex_m::interrupt::InterruptNumber for InterruptPender { + fn number(self) -> u16 { + self.0 + } + } + + /// Interrupt mode executor. /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. + /// This executor runs tasks in interrupt mode. The interrupt handler is set up + /// to poll tasks, and when a task is woken the interrupt is pended from software. /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. + /// This allows running async tasks at a priority higher than thread mode. One + /// use case is to leave thread mode free for non-async tasks. Another use case is + /// to run multiple executors: one in thread mode for low priority tasks and another in + /// interrupt mode for higher priority tasks. Higher priority tasks will preempt lower + /// priority ones. /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: + /// It is even possible to run multiple interrupt mode executors at different priorities, + /// by assigning different priorities to the interrupts. For an example on how to do this, + /// See the 'multiprio' example for 'embassy-nrf'. /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// To use it, you have to pick an interrupt that won't be used by the hardware. + /// Some chips reserve some interrupts for this purpose, sometimes named "software interrupts" (SWI). + /// If this is not the case, you may use an interrupt from any unused peripheral. /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// It is somewhat more complex to use, it's recommended to use the thread-mode + /// [`Executor`] instead, if it works for your use case. + pub struct InterruptExecutor { + started: AtomicBool, + executor: UnsafeCell>, + } + + unsafe impl Send for InterruptExecutor {} + unsafe impl Sync for InterruptExecutor {} + + impl InterruptExecutor { + /// Create a new, not started `InterruptExecutor`. + #[inline] + pub const fn new() -> Self { + Self { + started: AtomicBool::new(false), + executor: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Executor interrupt callback. + /// + /// # Safety + /// + /// You MUST call this from the interrupt handler, and from nowhere else. + pub unsafe fn on_interrupt(&'static self) { + let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; + executor.poll(); + } + + /// Start the executor. + /// + /// This initializes the executor, enables the interrupt, and returns. + /// The executor keeps running in the background through the interrupt. + /// + /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`] + /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a + /// different "thread" (the interrupt), so spawning tasks on it is effectively + /// sending them. + /// + /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from + /// a task running in it. + /// + /// # Interrupt requirements + /// + /// You must write the interrupt handler yourself, and make it call [`on_interrupt()`](Self::on_interrupt). + /// + /// This method already enables (unmasks) the interrupt, you must NOT do it yourself. + /// + /// You must set the interrupt priority before calling this method. You MUST NOT + /// do it after. + /// + pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { + if self + .started + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + panic!("InterruptExecutor::start() called multiple times on the same executor."); + } - loop { unsafe { - self.inner.poll(); - asm!("wfe"); - }; + (&mut *self.executor.get()) + .as_mut_ptr() + .write(raw::Executor::new(Pender(PenderInner::Interrupt(InterruptPender( + irq.number(), + ))))) + } + + let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; + + unsafe { NVIC::unmask(irq) } + + executor.spawner().make_send() } } } diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index e97a56cda..f66daeae4 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -1,72 +1,83 @@ -use core::marker::PhantomData; -use core::ptr; -use core::sync::atomic::{AtomicBool, Ordering}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-riscv32`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::marker::PhantomData; + use core::sync::atomic::{AtomicBool, Ordering}; -/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -/// RISCV32 Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV + static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + + /// RISCV32 Executor + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - //will only set this value to true. - critical_section::with(|_| { - // if there is work to do, loop back to polling - // TODO can we relax this? - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } - // if not, wait for interrupt - else { - core::arch::asm!("wfi"); - } - }); - // if an interrupt occurred while waiting, it will be serviced here + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + //will only set this value to true. + critical_section::with(|_| { + // if there is work to do, loop back to polling + // TODO can we relax this? + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } + // if not, wait for interrupt + else { + core::arch::asm!("wfi"); + } + }); + // if an interrupt occurred while waiting, it will be serviced here + } } } } diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs index 701f0eb18..4e4a178f0 100644 --- a/embassy-executor/src/arch/std.rs +++ b/embassy-executor/src/arch/std.rs @@ -1,84 +1,100 @@ -use std::marker::PhantomData; -use std::sync::{Condvar, Mutex}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-std`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use std::marker::PhantomData; + use std::sync::{Condvar, Mutex}; -/// Single-threaded std-based executor. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, - signaler: &'static Signaler, -} + #[cfg(feature = "nightly")] + pub use embassy_macros::main_std as main; + + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let signaler = &*Box::leak(Box::new(Signaler::new())); - Self { - inner: raw::Executor::new( - |p| unsafe { - let s = &*(p as *const () as *const Signaler); - s.signal() - }, - signaler as *const _ as _, - ), - not_send: PhantomData, - signaler, + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender(&'static Signaler); + + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + self.0.signal() } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// Single-threaded std-based executor. + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + signaler: &'static Signaler, + } - loop { - unsafe { self.inner.poll() }; - self.signaler.wait() + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let signaler = &*Box::leak(Box::new(Signaler::new())); + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(signaler)))), + not_send: PhantomData, + signaler, + } } - } -} -struct Signaler { - mutex: Mutex, - condvar: Condvar, -} + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); -impl Signaler { - fn new() -> Self { - Self { - mutex: Mutex::new(false), - condvar: Condvar::new(), + loop { + unsafe { self.inner.poll() }; + self.signaler.wait() + } } } - fn wait(&self) { - let mut signaled = self.mutex.lock().unwrap(); - while !*signaled { - signaled = self.condvar.wait(signaled).unwrap(); - } - *signaled = false; + struct Signaler { + mutex: Mutex, + condvar: Condvar, } - fn signal(&self) { - let mut signaled = self.mutex.lock().unwrap(); - *signaled = true; - self.condvar.notify_one(); + impl Signaler { + fn new() -> Self { + Self { + mutex: Mutex::new(false), + condvar: Condvar::new(), + } + } + + fn wait(&self) { + let mut signaled = self.mutex.lock().unwrap(); + while !*signaled { + signaled = self.condvar.wait(signaled).unwrap(); + } + *signaled = false; + } + + fn signal(&self) { + let mut signaled = self.mutex.lock().unwrap(); + *signaled = true; + self.condvar.notify_one(); + } } } diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs index 98091cfbb..08ab16b99 100644 --- a/embassy-executor/src/arch/wasm.rs +++ b/embassy-executor/src/arch/wasm.rs @@ -1,74 +1,88 @@ -use core::marker::PhantomData; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-wasm`."); -use js_sys::Promise; -use wasm_bindgen::prelude::*; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { -use super::raw::util::UninitCell; -use super::raw::{self}; -use super::Spawner; + use core::marker::PhantomData; -/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. -pub struct Executor { - inner: raw::Executor, - ctx: &'static WasmContext, - not_send: PhantomData<*mut ()>, -} + #[cfg(feature = "nightly")] + pub use embassy_macros::main_wasm as main; + use js_sys::Promise; + use wasm_bindgen::prelude::*; -pub(crate) struct WasmContext { - promise: Promise, - closure: UninitCell>, -} + use crate::raw::util::UninitCell; + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; + + /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. + pub struct Executor { + inner: raw::Executor, + ctx: &'static WasmContext, + not_send: PhantomData<*mut ()>, + } + + pub(crate) struct WasmContext { + promise: Promise, + closure: UninitCell>, + } + + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender(&'static WasmContext); -impl WasmContext { - pub fn new() -> Self { - Self { - promise: Promise::resolve(&JsValue::undefined()), - closure: UninitCell::uninit(), + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + let _ = self.0.promise.then(unsafe { self.0.closure.as_mut() }); } } -} -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let ctx = &*Box::leak(Box::new(WasmContext::new())); - let inner = raw::Executor::new( - |p| unsafe { - let ctx = &*(p as *const () as *const WasmContext); - let _ = ctx.promise.then(ctx.closure.as_mut()); - }, - ctx as *const _ as _, - ); - Self { - inner, - not_send: PhantomData, - ctx, + impl WasmContext { + pub fn new() -> Self { + Self { + promise: Promise::resolve(&JsValue::undefined()), + closure: UninitCell::uninit(), + } } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { - unsafe { - let executor = &self.inner; - self.ctx.closure.write(Closure::new(move |_| { - executor.poll(); - })); - init(self.inner.spawner()); + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let ctx = &*Box::leak(Box::new(WasmContext::new())); + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(ctx)))), + not_send: PhantomData, + ctx, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { + unsafe { + let executor = &self.inner; + self.ctx.closure.write(Closure::new(move |_| { + executor.poll(); + })); + init(self.inner.spawner()); + } } } } diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs index 4ee0d9f78..61ea92c16 100644 --- a/embassy-executor/src/arch/xtensa.rs +++ b/embassy-executor/src/arch/xtensa.rs @@ -1,73 +1,84 @@ -use core::marker::PhantomData; -use core::ptr; -use core::sync::atomic::{AtomicBool, Ordering}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-xtensa`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::marker::PhantomData; + use core::sync::atomic::{AtomicBool, Ordering}; -/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -/// Xtensa Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa + static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + + /// Xtensa Executor + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - // will only set this value to true. - // if there is work to do, loop back to polling - // TODO can we relax this? - critical_section::with(|_| { - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } else { - // waiti sets the PS.INTLEVEL when slipping into sleep - // because critical sections in Xtensa are implemented via increasing - // PS.INTLEVEL the critical section ends here - // take care not add code after `waiti` if it needs to be inside the CS - core::arch::asm!("waiti 0"); // critical section ends here - } - }); + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + // will only set this value to true. + // if there is work to do, loop back to polling + // TODO can we relax this? + critical_section::with(|_| { + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } else { + // waiti sets the PS.INTLEVEL when slipping into sleep + // because critical sections in Xtensa are implemented via increasing + // PS.INTLEVEL the critical section ends here + // take care not add code after `waiti` if it needs to be inside the CS + core::arch::asm!("waiti 0"); // critical section ends here + } + }); + } } } } diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 8707995b4..3ce687eb6 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -1,5 +1,5 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))] +#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)] +#![cfg_attr(all(feature = "nightly", feature = "arch-xtensa"), feature(asm_experimental_arch))] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] #![warn(missing_docs)] @@ -10,41 +10,35 @@ pub(crate) mod fmt; #[cfg(feature = "nightly")] pub use embassy_macros::task; -cfg_if::cfg_if! { - if #[cfg(cortex_m)] { - #[path="arch/cortex_m.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_cortex_m as main; - } - else if #[cfg(target_arch="riscv32")] { - #[path="arch/riscv32.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_riscv as main; - } - else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { - #[path="arch/xtensa.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(feature="wasm")] { - #[path="arch/wasm.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_wasm as main; - } - else if #[cfg(feature="std")] { - #[path="arch/std.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_std as main; - } +macro_rules! check_at_most_one { + (@amo [$($feats:literal)*] [] [$($res:tt)*]) => { + #[cfg(any($($res)*))] + compile_error!(concat!("At most one of these features can be enabled at the same time:", $(" `", $feats, "`",)*)); + }; + (@amo $feats:tt [$curr:literal $($rest:literal)*] [$($res:tt)*]) => { + check_at_most_one!(@amo $feats [$($rest)*] [$($res)* $(all(feature=$curr, feature=$rest),)*]); + }; + ($($f:literal),*$(,)?) => { + check_at_most_one!(@amo [$($f)*] [$($f)*] []); + }; } +check_at_most_one!("arch-cortex-m", "arch-riscv32", "arch-xtensa", "arch-std", "arch-wasm",); + +#[cfg(feature = "_arch")] +#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] +#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] +#[cfg_attr(feature = "arch-xtensa", path = "arch/xtensa.rs")] +#[cfg_attr(feature = "arch-std", path = "arch/std.rs")] +#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] +mod arch; + +#[cfg(feature = "_arch")] +pub use arch::*; + +pub mod raw; + +mod spawner; +pub use spawner::*; /// Implementation details for embassy macros. /// Do not use. Used for macros and HALs only. Not covered by semver guarantees. @@ -72,8 +66,3 @@ pub mod _export { ($($tt:tt)*) => {}; } } - -pub mod raw; - -mod spawner; -pub use spawner::*; diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 72c367c33..f6c66da5a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -19,7 +19,6 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::AtomicPtr; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; @@ -290,10 +289,60 @@ impl TaskPool { } } +#[derive(Clone, Copy)] +pub(crate) enum PenderInner { + #[cfg(feature = "executor-thread")] + Thread(crate::arch::ThreadPender), + #[cfg(feature = "executor-interrupt")] + Interrupt(crate::arch::InterruptPender), + #[cfg(feature = "pender-callback")] + Callback { func: fn(*mut ()), context: *mut () }, +} + +unsafe impl Send for PenderInner {} +unsafe impl Sync for PenderInner {} + +/// Platform/architecture-specific action executed when an executor has pending work. +/// +/// When a task within an executor is woken, the `Pender` is called. This does a +/// platform/architecture-specific action to signal there is pending work in the executor. +/// When this happens, you must arrange for [`Executor::poll`] to be called. +/// +/// You can think of it as a waker, but for the whole executor. +pub struct Pender(pub(crate) PenderInner); + +impl Pender { + /// Create a `Pender` that will call an arbitrary function pointer. + /// + /// # Arguments + /// + /// - `func`: The function pointer to call. + /// - `context`: Opaque context pointer, that will be passed to the function pointer. + #[cfg(feature = "pender-callback")] + pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self { + Self(PenderInner::Callback { + func, + context: context.into(), + }) + } +} + +impl Pender { + pub(crate) fn pend(&self) { + match self.0 { + #[cfg(feature = "executor-thread")] + PenderInner::Thread(x) => x.pend(), + #[cfg(feature = "executor-interrupt")] + PenderInner::Interrupt(x) => x.pend(), + #[cfg(feature = "pender-callback")] + PenderInner::Callback { func, context } => func(context), + } + } +} + pub(crate) struct SyncExecutor { run_queue: RunQueue, - signal_fn: fn(*mut ()), - signal_ctx: AtomicPtr<()>, + pender: Pender, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -302,16 +351,13 @@ pub(crate) struct SyncExecutor { } impl SyncExecutor { - pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + pub(crate) fn new(pender: Pender) -> Self { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; - #[cfg(feature = "integrated-timers")] - driver::set_alarm_callback(alarm, signal_fn, signal_ctx); Self { run_queue: RunQueue::new(), - signal_fn, - signal_ctx: AtomicPtr::new(signal_ctx), + pender, #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -332,10 +378,16 @@ impl SyncExecutor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) + self.pender.pend(); } } + #[cfg(feature = "integrated-timers")] + fn alarm_callback(ctx: *mut ()) { + let this: &Self = unsafe { &*(ctx as *const Self) }; + this.pender.pend(); + } + pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); @@ -351,6 +403,9 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { + #[cfg(feature = "integrated-timers")] + driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); + #[allow(clippy::never_loop)] loop { #[cfg(feature = "integrated-timers")] @@ -417,14 +472,14 @@ impl SyncExecutor { /// /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks /// that "want to run"). -/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// - You must supply a [`Pender`]. The executor will call it to notify you it has work /// to do. You must arrange for `poll()` to be called as soon as possible. /// -/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority /// level, etc. It may be called synchronously from any `Executor` method call as well. /// You must deal with this correctly. /// -/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// In particular, you must NOT call `poll` directly from the pender callback, as this violates /// the requirement for `poll` to not be called reentrantly. #[repr(transparent)] pub struct Executor { @@ -437,15 +492,15 @@ impl Executor { pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { mem::transmute(inner) } + /// Create a new executor. /// - /// When the executor has work to do, it will call `signal_fn` with - /// `signal_ctx` as argument. + /// When the executor has work to do, it will call the [`Pender`]. /// - /// See [`Executor`] docs for details on `signal_fn`. - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + /// See [`Executor`] docs for details on `Pender`. + pub fn new(pender: Pender) -> Self { Self { - inner: SyncExecutor::new(signal_fn, signal_ctx), + inner: SyncExecutor::new(pender), _not_sync: PhantomData, } } @@ -468,16 +523,16 @@ impl Executor { /// This loops over all tasks that are queued to be polled (i.e. they're /// freshly spawned or they've been woken). Other tasks are not polled. /// - /// You must call `poll` after receiving a call to `signal_fn`. It is OK - /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// You must call `poll` after receiving a call to the [`Pender`]. It is OK + /// to call `poll` even when not requested by the `Pender`, but it wastes /// energy. /// /// # Safety /// /// You must NOT call `poll` reentrantly on the same executor. /// - /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you - /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you + /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. pub unsafe fn poll(&'static self) { -- cgit From 8290236ed64435453a9c028c95246a86371bd4ce Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 2 Apr 2023 14:11:31 -0500 Subject: executor: Replace unsound critical sections with atomics --- embassy-executor/src/raw/mod.rs | 28 +++++++++++++--------------- embassy-executor/src/raw/run_queue.rs | 18 ++++++++++++------ 2 files changed, 25 insertions(+), 21 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f6c66da5a..bd0cff26b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -22,7 +22,6 @@ use core::ptr::NonNull; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; -use critical_section::CriticalSection; #[cfg(feature = "integrated-timers")] use embassy_time::driver::{self, AlarmHandle}; #[cfg(feature = "integrated-timers")] @@ -373,11 +372,11 @@ impl SyncExecutor { /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] - unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { + unsafe fn enqueue(&self, task: TaskRef) { #[cfg(feature = "rtos-trace")] trace::task_ready_begin(task.as_ptr() as u32); - if self.run_queue.enqueue(cs, task) { + if self.run_queue.enqueue(task) { self.pender.pend(); } } @@ -394,9 +393,7 @@ impl SyncExecutor { #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); - critical_section::with(|cs| { - self.enqueue(cs, task); - }) + self.enqueue(task); } /// # Safety @@ -552,24 +549,25 @@ impl Executor { /// /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. pub fn wake_task(task: TaskRef) { - critical_section::with(|cs| { - let header = task.header(); - let state = header.state.load(Ordering::Relaxed); + let header = task.header(); + let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { // If already scheduled, or if not started, if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { - return; + None + } else { + // Mark it as scheduled + Some(state | STATE_RUN_QUEUED) } + }); - // Mark it as scheduled - header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); - + if res.is_ok() { // We have just marked the task as scheduled, so enqueue it. unsafe { let executor = header.executor.get().unwrap_unchecked(); - executor.enqueue(cs, task); + executor.enqueue(task); } - }) + } } #[cfg(feature = "integrated-timers")] diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 362157535..a88174a0c 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -2,7 +2,6 @@ use core::ptr; use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; -use critical_section::CriticalSection; use super::{TaskHeader, TaskRef}; @@ -46,11 +45,18 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { - let prev = self.head.load(Ordering::Relaxed); - task.header().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr() as _, Ordering::Relaxed); - prev.is_null() + pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { + let mut was_empty = false; + + self.head + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { + was_empty = prev.is_null(); + task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + Some(task.as_ptr() as *mut _) + }) + .ok(); + + was_empty } /// Empty the queue, then call `on_task` for each task that was in the queue. -- cgit