From a0f1b0ee01d461607660d2d56b5b1bdc57e0d3fb Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 29 Jul 2022 21:58:35 +0200 Subject: Split embassy crate into embassy-executor, embassy-util. --- embassy-executor/src/executor/arch/cortex_m.rs | 59 +++ embassy-executor/src/executor/arch/riscv32.rs | 74 ++++ embassy-executor/src/executor/arch/std.rs | 84 +++++ embassy-executor/src/executor/arch/wasm.rs | 74 ++++ embassy-executor/src/executor/arch/xtensa.rs | 75 ++++ embassy-executor/src/executor/mod.rs | 44 +++ embassy-executor/src/executor/raw/mod.rs | 433 +++++++++++++++++++++++ embassy-executor/src/executor/raw/run_queue.rs | 74 ++++ embassy-executor/src/executor/raw/timer_queue.rs | 85 +++++ embassy-executor/src/executor/raw/util.rs | 33 ++ embassy-executor/src/executor/raw/waker.rs | 53 +++ embassy-executor/src/executor/spawner.rs | 202 +++++++++++ 12 files changed, 1290 insertions(+) create mode 100644 embassy-executor/src/executor/arch/cortex_m.rs create mode 100644 embassy-executor/src/executor/arch/riscv32.rs create mode 100644 embassy-executor/src/executor/arch/std.rs create mode 100644 embassy-executor/src/executor/arch/wasm.rs create mode 100644 embassy-executor/src/executor/arch/xtensa.rs create mode 100644 embassy-executor/src/executor/mod.rs create mode 100644 embassy-executor/src/executor/raw/mod.rs create mode 100644 embassy-executor/src/executor/raw/run_queue.rs create mode 100644 embassy-executor/src/executor/raw/timer_queue.rs create mode 100644 embassy-executor/src/executor/raw/util.rs create mode 100644 embassy-executor/src/executor/raw/waker.rs create mode 100644 embassy-executor/src/executor/spawner.rs (limited to 'embassy-executor/src/executor') diff --git a/embassy-executor/src/executor/arch/cortex_m.rs b/embassy-executor/src/executor/arch/cortex_m.rs new file mode 100644 index 000000000..d6e758dfb --- /dev/null +++ b/embassy-executor/src/executor/arch/cortex_m.rs @@ -0,0 +1,59 @@ +use core::arch::asm; +use core::marker::PhantomData; +use core::ptr; + +use super::{raw, Spawner}; + +/// Thread mode executor, using WFE/SEV. +/// +/// This is the simplest and most common kind of executor. It runs on +/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction +/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction +/// is executed, to make the `WFE` exit from sleep and poll the task. +/// +/// This executor allows for ultra low power consumption for chips where `WFE` +/// triggers low-power sleep without extra steps. If your chip requires extra steps, +/// you may use [`raw::Executor`] directly to program custom behavior. +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [Forever](crate::util::Forever) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + asm!("wfe"); + }; + } + } +} diff --git a/embassy-executor/src/executor/arch/riscv32.rs b/embassy-executor/src/executor/arch/riscv32.rs new file mode 100644 index 000000000..7a7d5698c --- /dev/null +++ b/embassy-executor/src/executor/arch/riscv32.rs @@ -0,0 +1,74 @@ +use core::marker::PhantomData; +use core::ptr; + +use atomic_polyfill::{AtomicBool, Ordering}; + +use super::{raw, Spawner}; + +/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV +/// +static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + +/// RISCV32 Executor +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + // use Signal_Work_Thread_Mode as substitute for local interrupt register + inner: raw::Executor::new( + |_| { + SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); + }, + ptr::null_mut(), + ), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [Forever](crate::util::Forever) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + //will only set this value to true. + critical_section::with(|_| { + // if there is work to do, loop back to polling + // TODO can we relax this? + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } + // if not, wait for interrupt + else { + core::arch::asm!("wfi"); + } + }); + // if an interrupt occurred while waiting, it will be serviced here + } + } + } +} diff --git a/embassy-executor/src/executor/arch/std.rs b/embassy-executor/src/executor/arch/std.rs new file mode 100644 index 000000000..b93ab8a79 --- /dev/null +++ b/embassy-executor/src/executor/arch/std.rs @@ -0,0 +1,84 @@ +use std::marker::PhantomData; +use std::sync::{Condvar, Mutex}; + +use super::{raw, Spawner}; + +/// Single-threaded std-based executor. +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + signaler: &'static Signaler, +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let signaler = &*Box::leak(Box::new(Signaler::new())); + Self { + inner: raw::Executor::new( + |p| unsafe { + let s = &*(p as *const () as *const Signaler); + s.signal() + }, + signaler as *const _ as _, + ), + not_send: PhantomData, + signaler, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [Forever](crate::util::Forever) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { self.inner.poll() }; + self.signaler.wait() + } + } +} + +struct Signaler { + mutex: Mutex, + condvar: Condvar, +} + +impl Signaler { + fn new() -> Self { + Self { + mutex: Mutex::new(false), + condvar: Condvar::new(), + } + } + + fn wait(&self) { + let mut signaled = self.mutex.lock().unwrap(); + while !*signaled { + signaled = self.condvar.wait(signaled).unwrap(); + } + *signaled = false; + } + + fn signal(&self) { + let mut signaled = self.mutex.lock().unwrap(); + *signaled = true; + self.condvar.notify_one(); + } +} diff --git a/embassy-executor/src/executor/arch/wasm.rs b/embassy-executor/src/executor/arch/wasm.rs new file mode 100644 index 000000000..9d5aa31ed --- /dev/null +++ b/embassy-executor/src/executor/arch/wasm.rs @@ -0,0 +1,74 @@ +use core::marker::PhantomData; + +use js_sys::Promise; +use wasm_bindgen::prelude::*; + +use super::raw::util::UninitCell; +use super::raw::{self}; +use super::Spawner; + +/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. +pub struct Executor { + inner: raw::Executor, + ctx: &'static WasmContext, + not_send: PhantomData<*mut ()>, +} + +pub(crate) struct WasmContext { + promise: Promise, + closure: UninitCell>, +} + +impl WasmContext { + pub fn new() -> Self { + Self { + promise: Promise::resolve(&JsValue::undefined()), + closure: UninitCell::uninit(), + } + } +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let ctx = &*Box::leak(Box::new(WasmContext::new())); + let inner = raw::Executor::new( + |p| unsafe { + let ctx = &*(p as *const () as *const WasmContext); + let _ = ctx.promise.then(ctx.closure.as_mut()); + }, + ctx as *const _ as _, + ); + Self { + inner, + not_send: PhantomData, + ctx, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [Forever](crate::util::Forever) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { + unsafe { + let executor = &self.inner; + self.ctx.closure.write(Closure::new(move |_| { + executor.poll(); + })); + init(self.inner.spawner()); + } + } +} diff --git a/embassy-executor/src/executor/arch/xtensa.rs b/embassy-executor/src/executor/arch/xtensa.rs new file mode 100644 index 000000000..20bd7b8a5 --- /dev/null +++ b/embassy-executor/src/executor/arch/xtensa.rs @@ -0,0 +1,75 @@ +use core::marker::PhantomData; +use core::ptr; + +use atomic_polyfill::{AtomicBool, Ordering}; + +use super::{raw, Spawner}; + +/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa +/// +static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + +/// Xtensa Executor +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + // use Signal_Work_Thread_Mode as substitute for local interrupt register + inner: raw::Executor::new( + |_| { + SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); + }, + ptr::null_mut(), + ), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [Forever](crate::util::Forever) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + // will only set this value to true. + // if there is work to do, loop back to polling + // TODO can we relax this? + critical_section::with(|_| { + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } else { + // waiti sets the PS.INTLEVEL when slipping into sleep + // because critical sections in Xtensa are implemented via increasing + // PS.INTLEVEL the critical section ends here + // take care not add code after `waiti` if it needs to be inside the CS + core::arch::asm!("waiti 0"); // critical section ends here + } + }); + } + } + } +} diff --git a/embassy-executor/src/executor/mod.rs b/embassy-executor/src/executor/mod.rs new file mode 100644 index 000000000..45d00c568 --- /dev/null +++ b/embassy-executor/src/executor/mod.rs @@ -0,0 +1,44 @@ +//! Async task executor. +//! +//! This module provides an async/await executor designed for embedded usage. +//! +//! - No `alloc`, no heap needed. Task futures are statically allocated. +//! - No "fixed capacity" data structures, executor works with 1 or 1000 tasks without needing config/tuning. +//! - Integrated timer queue: sleeping is easy, just do `Timer::after(Duration::from_secs(1)).await;`. +//! - No busy-loop polling: CPU sleeps when there's no work to do, using interrupts or `WFE/SEV`. +//! - Efficient polling: a wake will only poll the woken task, not all of them. +//! - Fair: a task can't monopolize CPU time even if it's constantly being woken. All other tasks get a chance to run before a given task gets polled for the second time. +//! - Creating multiple executor instances is supported, to run tasks with multiple priority levels. This allows higher-priority tasks to preempt lower-priority tasks. + +cfg_if::cfg_if! { + if #[cfg(cortex_m)] { + #[path="arch/cortex_m.rs"] + mod arch; + pub use arch::*; + } + else if #[cfg(target_arch="riscv32")] { + #[path="arch/riscv32.rs"] + mod arch; + pub use arch::*; + } + else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { + #[path="arch/xtensa.rs"] + mod arch; + pub use arch::*; + } + else if #[cfg(feature="wasm")] { + #[path="arch/wasm.rs"] + mod arch; + pub use arch::*; + } + else if #[cfg(feature="std")] { + #[path="arch/std.rs"] + mod arch; + pub use arch::*; + } +} + +pub mod raw; + +mod spawner; +pub use spawner::*; diff --git a/embassy-executor/src/executor/raw/mod.rs b/embassy-executor/src/executor/raw/mod.rs new file mode 100644 index 000000000..87317bc02 --- /dev/null +++ b/embassy-executor/src/executor/raw/mod.rs @@ -0,0 +1,433 @@ +//! Raw executor. +//! +//! This module exposes "raw" Executor and Task structs for more low level control. +//! +//! ## WARNING: here be dragons! +//! +//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe +//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. + +mod run_queue; +#[cfg(feature = "time")] +mod timer_queue; +pub(crate) mod util; +mod waker; + +use core::cell::Cell; +use core::future::Future; +use core::pin::Pin; +use core::ptr::NonNull; +use core::task::{Context, Poll}; +use core::{mem, ptr}; + +use atomic_polyfill::{AtomicU32, Ordering}; +use critical_section::CriticalSection; + +use self::run_queue::{RunQueue, RunQueueItem}; +use self::util::UninitCell; +pub use self::waker::task_from_waker; +use super::SpawnToken; +#[cfg(feature = "time")] +use crate::time::driver::{self, AlarmHandle}; +#[cfg(feature = "time")] +use crate::time::Instant; + +/// Task is spawned (has a future) +pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +/// Task is in the executor run queue +pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "time")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; + +/// Raw task header for use in task pointers. +/// +/// This is an opaque struct, used for raw pointers to tasks, for use +/// with funtions like [`wake_task`] and [`task_from_waker`]. +pub struct TaskHeader { + pub(crate) state: AtomicU32, + pub(crate) run_queue_item: RunQueueItem, + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED + + #[cfg(feature = "time")] + pub(crate) expires_at: Cell, + #[cfg(feature = "time")] + pub(crate) timer_queue_item: timer_queue::TimerQueueItem, +} + +impl TaskHeader { + pub(crate) const fn new() -> Self { + Self { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + + #[cfg(feature = "time")] + expires_at: Cell::new(Instant::from_ticks(0)), + #[cfg(feature = "time")] + timer_queue_item: timer_queue::TimerQueueItem::new(), + } + } + + pub(crate) unsafe fn enqueue(&self) { + critical_section::with(|cs| { + let state = self.state.load(Ordering::Relaxed); + + // If already scheduled, or if not started, + if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { + return; + } + + // Mark it as scheduled + self.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*self.executor.get(); + executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader); + }) + } +} + +/// Raw storage in which a task can be spawned. +/// +/// This struct holds the necessary memory to spawn one task whose future is `F`. +/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You +/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. +/// +/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished +/// running. Hence the relevant methods require `&'static self`. It may be reused, however. +/// +/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s +/// in a `static`. The most common reason to use the raw `Task` is to have control of where +/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. + +// repr(C) is needed to guarantee that the Task is located at offset 0 +// This makes it safe to cast between TaskHeader and TaskStorage pointers. +#[repr(C)] +pub struct TaskStorage { + raw: TaskHeader, + future: UninitCell, // Valid if STATE_SPAWNED +} + +impl TaskStorage { + const NEW: Self = Self::new(); + + /// Create a new TaskStorage, in not-spawned state. + pub const fn new() -> Self { + Self { + raw: TaskHeader::new(), + future: UninitCell::uninit(), + } + } + + /// Try to spawn the task. + /// + /// The `future` closure constructs the future. It's only called if spawning is + /// actually possible. It is a closure instead of a simple `future: F` param to ensure + /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to + /// NRVO optimizations. + /// + /// This function will fail if the task is already spawned and has not finished running. + /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will + /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. + /// + /// Once the task has finished running, you may spawn it again. It is allowed to spawn it + /// on a different executor. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + if self.spawn_mark_used() { + return unsafe { SpawnToken::::new(self.spawn_initialize(future)) }; + } + + SpawnToken::::new_failed() + } + + fn spawn_mark_used(&'static self) -> bool { + let state = STATE_SPAWNED | STATE_RUN_QUEUED; + self.raw + .state + .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + } + + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull { + // Initialize the task + self.raw.poll_fn.write(Self::poll); + self.future.write(future()); + NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader) + } + + unsafe fn poll(p: NonNull) { + let this = &*(p.as_ptr() as *const TaskStorage); + + let future = Pin::new_unchecked(this.future.as_mut()); + let waker = waker::from_task(p); + let mut cx = Context::from_waker(&waker); + match future.poll(&mut cx) { + Poll::Ready(_) => { + this.future.drop_in_place(); + this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); + } + Poll::Pending => {} + } + + // the compiler is emitting a virtual call for waker drop, but we know + // it's a noop for our waker. + mem::forget(waker); + } +} + +unsafe impl Sync for TaskStorage {} + +/// Raw storage that can hold up to N tasks of the same type. +/// +/// This is essentially a `[TaskStorage; N]`. +pub struct TaskPool { + pool: [TaskStorage; N], +} + +impl TaskPool { + /// Create a new TaskPool, with all tasks in non-spawned state. + pub const fn new() -> Self { + Self { + pool: [TaskStorage::NEW; N], + } + } + + /// Try to spawn a task in the pool. + /// + /// See [`TaskStorage::spawn()`] for details. + /// + /// This will loop over the pool and spawn the task in the first storage that + /// is currently free. If none is free, a "poisoned" SpawnToken is returned, + /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + for task in &self.pool { + if task.spawn_mark_used() { + return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; + } + } + + SpawnToken::::new_failed() + } + + /// Like spawn(), but allows the task to be send-spawned if the args are Send even if + /// the future is !Send. + /// + /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used + /// by the Embassy macros ONLY. + /// + /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` + /// is an `async fn`, NOT a hand-written `Future`. + #[doc(hidden)] + pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> SpawnToken + where + FutFn: FnOnce() -> F, + { + // When send-spawning a task, we construct the future in this thread, and effectively + // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, + // send-spawning should require the future `F` to be `Send`. + // + // The problem is this is more restrictive than needed. Once the future is executing, + // it is never sent to another thread. It is only sent when spawning. It should be + // enough for the task's arguments to be Send. (and in practice it's super easy to + // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) + // + // We can do it by sending the task args and constructing the future in the executor thread + // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy + // of the args. + // + // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the + // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. + // + // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, + // but it's possible it'll be guaranteed in the future. See zulip thread: + // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) + // + // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. + // This is why we return `SpawnToken` below. + // + // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly + // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. + + for task in &self.pool { + if task.spawn_mark_used() { + return SpawnToken::::new(task.spawn_initialize(future)); + } + } + + SpawnToken::::new_failed() + } +} + +/// Raw executor. +/// +/// This is the core of the Embassy executor. It is low-level, requiring manual +/// handling of wakeups and task polling. If you can, prefer using one of the +/// higher level executors in [`crate::executor`]. +/// +/// The raw executor leaves it up to you to handle wakeups and scheduling: +/// +/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks +/// that "want to run"). +/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// to do. You must arrange for `poll()` to be called as soon as possible. +/// +/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// level, etc. It may be called synchronously from any `Executor` method call as well. +/// You must deal with this correctly. +/// +/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// the requirement for `poll` to not be called reentrantly. +pub struct Executor { + run_queue: RunQueue, + signal_fn: fn(*mut ()), + signal_ctx: *mut (), + + #[cfg(feature = "time")] + pub(crate) timer_queue: timer_queue::TimerQueue, + #[cfg(feature = "time")] + alarm: AlarmHandle, +} + +impl Executor { + /// Create a new executor. + /// + /// When the executor has work to do, it will call `signal_fn` with + /// `signal_ctx` as argument. + /// + /// See [`Executor`] docs for details on `signal_fn`. + pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + #[cfg(feature = "time")] + let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; + #[cfg(feature = "time")] + driver::set_alarm_callback(alarm, signal_fn, signal_ctx); + + Self { + run_queue: RunQueue::new(), + signal_fn, + signal_ctx, + + #[cfg(feature = "time")] + timer_queue: timer_queue::TimerQueue::new(), + #[cfg(feature = "time")] + alarm, + } + } + + /// Enqueue a task in the task queue + /// + /// # Safety + /// - `task` must be a valid pointer to a spawned task. + /// - `task` must be set up to run in this executor. + /// - `task` must NOT be already enqueued (in this executor or another one). + #[inline(always)] + unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) { + if self.run_queue.enqueue(cs, task) { + (self.signal_fn)(self.signal_ctx) + } + } + + /// Spawn a task in this executor. + /// + /// # Safety + /// + /// `task` must be a valid pointer to an initialized but not-already-spawned task. + /// + /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. + /// In this case, the task's Future must be Send. This is because this is effectively + /// sending the task to the executor thread. + pub(super) unsafe fn spawn(&'static self, task: NonNull) { + let task = task.as_ref(); + task.executor.set(self); + + critical_section::with(|cs| { + self.enqueue(cs, task as *const _ as _); + }) + } + + /// Poll all queued tasks in this executor. + /// + /// This loops over all tasks that are queued to be polled (i.e. they're + /// freshly spawned or they've been woken). Other tasks are not polled. + /// + /// You must call `poll` after receiving a call to `signal_fn`. It is OK + /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// energy. + /// + /// # Safety + /// + /// You must NOT call `poll` reentrantly on the same executor. + /// + /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you + /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// somehow schedule for `poll()` to be called later, at a time you know for sure there's + /// no `poll()` already running. + pub unsafe fn poll(&'static self) { + #[cfg(feature = "time")] + self.timer_queue.dequeue_expired(Instant::now(), |p| { + p.as_ref().enqueue(); + }); + + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + + #[cfg(feature = "time")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + task.poll_fn.read()(p as _); + + // Enqueue or update into timer_queue + #[cfg(feature = "time")] + self.timer_queue.update(p); + }); + + #[cfg(feature = "time")] + { + // If this is already in the past, set_alarm will immediately trigger the alarm. + // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, + // so we immediately do another poll loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + driver::set_alarm(self.alarm, next_expiration.as_ticks()); + } + } + + /// Get a spawner that spawns tasks in this executor. + /// + /// It is OK to call this method multiple times to obtain multiple + /// `Spawner`s. You may also copy `Spawner`s. + pub fn spawner(&'static self) -> super::Spawner { + super::Spawner::new(self) + } +} + +/// Wake a task by raw pointer. +/// +/// You can obtain task pointers from `Waker`s using [`task_from_waker`]. +/// +/// # Safety +/// +/// `task` must be a valid task pointer obtained from [`task_from_waker`]. +pub unsafe fn wake_task(task: NonNull) { + task.as_ref().enqueue(); +} + +#[cfg(feature = "time")] +pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) { + let task = waker::task_from_waker(waker); + let task = task.as_ref(); + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); +} diff --git a/embassy-executor/src/executor/raw/run_queue.rs b/embassy-executor/src/executor/raw/run_queue.rs new file mode 100644 index 000000000..31615da7e --- /dev/null +++ b/embassy-executor/src/executor/raw/run_queue.rs @@ -0,0 +1,74 @@ +use core::ptr; +use core::ptr::NonNull; + +use atomic_polyfill::{AtomicPtr, Ordering}; +use critical_section::CriticalSection; + +use super::TaskHeader; + +pub(crate) struct RunQueueItem { + next: AtomicPtr, +} + +impl RunQueueItem { + pub const fn new() -> Self { + Self { + next: AtomicPtr::new(ptr::null_mut()), + } + } +} + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't create fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + head: AtomicPtr, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + /// + /// # Safety + /// + /// `item` must NOT be already enqueued in any queue. + #[inline(always)] + pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool { + let prev = self.head.load(Ordering::Relaxed); + (*task).run_queue_item.next.store(prev, Ordering::Relaxed); + self.head.store(task, Ordering::Relaxed); + prev.is_null() + } + + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull)) { + // Atomically empty the queue. + let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + // Iterate the linked list of tasks that were previously in the queue. + while let Some(task) = NonNull::new(ptr) { + // If the task re-enqueues itself, the `next` pointer will get overwritten. + // Therefore, first read the next pointer, and only then process the task. + let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); + + on_task(task); + + ptr = next + } + } +} diff --git a/embassy-executor/src/executor/raw/timer_queue.rs b/embassy-executor/src/executor/raw/timer_queue.rs new file mode 100644 index 000000000..62fcfc531 --- /dev/null +++ b/embassy-executor/src/executor/raw/timer_queue.rs @@ -0,0 +1,85 @@ +use core::cell::Cell; +use core::cmp::min; +use core::ptr; +use core::ptr::NonNull; + +use atomic_polyfill::Ordering; + +use super::{TaskHeader, STATE_TIMER_QUEUED}; +use crate::time::Instant; + +pub(crate) struct TimerQueueItem { + next: Cell<*mut TaskHeader>, +} + +impl TimerQueueItem { + pub const fn new() -> Self { + Self { + next: Cell::new(ptr::null_mut()), + } + } +} + +pub(crate) struct TimerQueue { + head: Cell<*mut TaskHeader>, +} + +impl TimerQueue { + pub const fn new() -> Self { + Self { + head: Cell::new(ptr::null_mut()), + } + } + + pub(crate) unsafe fn update(&self, p: NonNull) { + let task = p.as_ref(); + if task.expires_at.get() != Instant::MAX { + let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); + let is_new = old_state & STATE_TIMER_QUEUED == 0; + + if is_new { + task.timer_queue_item.next.set(self.head.get()); + self.head.set(p.as_ptr()); + } + } + } + + pub(crate) unsafe fn next_expiration(&self) -> Instant { + let mut res = Instant::MAX; + self.retain(|p| { + let task = p.as_ref(); + let expires = task.expires_at.get(); + res = min(res, expires); + expires != Instant::MAX + }); + res + } + + pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull)) { + self.retain(|p| { + let task = p.as_ref(); + if task.expires_at.get() <= now { + on_task(p); + false + } else { + true + } + }); + } + + pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { + let mut prev = &self.head; + while !prev.get().is_null() { + let p = NonNull::new_unchecked(prev.get()); + let task = &*p.as_ptr(); + if f(p) { + // Skip to next + prev = &task.timer_queue_item.next; + } else { + // Remove it + prev.set(task.timer_queue_item.next.get()); + task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); + } + } + } +} diff --git a/embassy-executor/src/executor/raw/util.rs b/embassy-executor/src/executor/raw/util.rs new file mode 100644 index 000000000..ed5822188 --- /dev/null +++ b/embassy-executor/src/executor/raw/util.rs @@ -0,0 +1,33 @@ +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::ptr; + +pub(crate) struct UninitCell(MaybeUninit>); +impl UninitCell { + pub const fn uninit() -> Self { + Self(MaybeUninit::uninit()) + } + + pub unsafe fn as_mut_ptr(&self) -> *mut T { + (*self.0.as_ptr()).get() + } + + #[allow(clippy::mut_from_ref)] + pub unsafe fn as_mut(&self) -> &mut T { + &mut *self.as_mut_ptr() + } + + pub unsafe fn write(&self, val: T) { + ptr::write(self.as_mut_ptr(), val) + } + + pub unsafe fn drop_in_place(&self) { + ptr::drop_in_place(self.as_mut_ptr()) + } +} + +impl UninitCell { + pub unsafe fn read(&self) -> T { + ptr::read(self.as_mut_ptr()) + } +} diff --git a/embassy-executor/src/executor/raw/waker.rs b/embassy-executor/src/executor/raw/waker.rs new file mode 100644 index 000000000..f6ae332fa --- /dev/null +++ b/embassy-executor/src/executor/raw/waker.rs @@ -0,0 +1,53 @@ +use core::mem; +use core::ptr::NonNull; +use core::task::{RawWaker, RawWakerVTable, Waker}; + +use super::TaskHeader; + +const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); + +unsafe fn clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &VTABLE) +} + +unsafe fn wake(p: *const ()) { + (*(p as *mut TaskHeader)).enqueue() +} + +unsafe fn drop(_: *const ()) { + // nop +} + +pub(crate) unsafe fn from_task(p: NonNull) -> Waker { + Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) +} + +/// Get a task pointer from a waker. +/// +/// This can be used as an optimization in wait queues to store task pointers +/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps +/// avoid dynamic dispatch. +/// +/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). +/// +/// # Panics +/// +/// Panics if the waker is not created by the Embassy executor. +pub fn task_from_waker(waker: &Waker) -> NonNull { + // safety: OK because WakerHack has the same layout as Waker. + // This is not really guaranteed because the structs are `repr(Rust)`, it is + // indeed the case in the current implementation. + // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 + let hack: &WakerHack = unsafe { mem::transmute(waker) }; + if hack.vtable != &VTABLE { + panic!("Found waker not created by the Embassy executor. `embassy_executor::time::Timer` only works with the Embassy executor.") + } + + // safety: we never create a waker with a null data pointer. + unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } +} + +struct WakerHack { + data: *const (), + vtable: &'static RawWakerVTable, +} diff --git a/embassy-executor/src/executor/spawner.rs b/embassy-executor/src/executor/spawner.rs new file mode 100644 index 000000000..25a0d7dbb --- /dev/null +++ b/embassy-executor/src/executor/spawner.rs @@ -0,0 +1,202 @@ +use core::marker::PhantomData; +use core::mem; +use core::ptr::NonNull; +use core::task::Poll; + +use futures_util::future::poll_fn; + +use super::raw; + +/// Token to spawn a newly-created task in an executor. +/// +/// When calling a task function (like `#[embassy_executor::task] async fn my_task() { ... }`), the returned +/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must +/// then spawn it into an executor, typically with [`Spawner::spawn()`]. +/// +/// The generic parameter `S` determines whether the task can be spawned in executors +/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. +/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. +/// +/// # Panics +/// +/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. +/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. +#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] +pub struct SpawnToken { + raw_task: Option>, + phantom: PhantomData<*mut S>, +} + +impl SpawnToken { + pub(crate) unsafe fn new(raw_task: NonNull) -> Self { + Self { + raw_task: Some(raw_task), + phantom: PhantomData, + } + } + + pub(crate) fn new_failed() -> Self { + Self { + raw_task: None, + phantom: PhantomData, + } + } +} + +impl Drop for SpawnToken { + fn drop(&mut self) { + // TODO deallocate the task instead. + panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") + } +} + +/// Error returned when spawning a task. +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum SpawnError { + /// Too many instances of this task are already running. + /// + /// By default, a task marked with `#[embassy_executor::task]` can only have one instance + /// running at a time. You may allow multiple instances to run in parallel with + /// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage. + Busy, +} + +/// Handle to spawn tasks into an executor. +/// +/// This Spawner can spawn any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +#[derive(Copy, Clone)] +pub struct Spawner { + executor: &'static raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Spawner { + pub(crate) fn new(executor: &'static raw::Executor) -> Self { + Self { + executor, + not_send: PhantomData, + } + } + + /// Get a Spawner for the current executor. + /// + /// This function is `async` just to get access to the current async + /// context. It returns instantly, it does not block/yield. + /// + /// # Panics + /// + /// Panics if the current executor is not an Embassy executor. + pub async fn for_current_executor() -> Self { + poll_fn(|cx| unsafe { + let task = raw::task_from_waker(cx.waker()); + let executor = (*task.as_ptr()).executor.get(); + Poll::Ready(Self::new(&*executor)) + }) + .await + } + + /// Spawn a task into an executor. + /// + /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + mem::forget(token); + + match task { + Some(task) => { + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } + + // Used by the `embassy_macros::main!` macro to throw an error when spawn + // fails. This is here to allow conditional use of `defmt::unwrap!` + // without introducing a `defmt` feature in the `embassy_macros` package, + // which would require use of `-Z namespaced-features`. + /// Spawn a task into an executor, panicking on failure. + /// + /// # Panics + /// + /// Panics if the spawning fails. + pub fn must_spawn(&self, token: SpawnToken) { + unwrap!(self.spawn(token)); + } + + /// Convert this Spawner to a SendSpawner. This allows you to send the + /// spawner to other threads, but the spawner loses the ability to spawn + /// non-Send tasks. + pub fn make_send(&self) -> SendSpawner { + SendSpawner { + executor: self.executor, + } + } +} + +/// Handle to spawn tasks into an executor from any thread. +/// +/// This Spawner can be used from any thread (it is Send), but it can +/// only spawn Send tasks. The reason for this is spawning is effectively +/// "sending" the tasks to the executor thread. +/// +/// If you want to spawn non-Send tasks, use [Spawner]. +#[derive(Copy, Clone)] +pub struct SendSpawner { + executor: &'static raw::Executor, +} + +unsafe impl Send for SendSpawner {} +unsafe impl Sync for SendSpawner {} + +impl SendSpawner { + pub(crate) fn new(executor: &'static raw::Executor) -> Self { + Self { executor } + } + + /// Get a Spawner for the current executor. + /// + /// This function is `async` just to get access to the current async + /// context. It returns instantly, it does not block/yield. + /// + /// # Panics + /// + /// Panics if the current executor is not an Embassy executor. + pub async fn for_current_executor() -> Self { + poll_fn(|cx| unsafe { + let task = raw::task_from_waker(cx.waker()); + let executor = (*task.as_ptr()).executor.get(); + Poll::Ready(Self::new(&*executor)) + }) + .await + } + + /// Spawn a task into an executor. + /// + /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let header = token.raw_task; + mem::forget(token); + + match header { + Some(header) => { + unsafe { self.executor.spawn(header) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } + + /// Spawn a task into an executor, panicking on failure. + /// + /// # Panics + /// + /// Panics if the spawning fails. + pub fn must_spawn(&self, token: SpawnToken) { + unwrap!(self.spawn(token)); + } +} -- cgit