From 5daa173ce4b153a532b4daa9e94c7a248231f25b Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 17 Aug 2022 23:40:16 +0200 Subject: Split embassy-time from embassy-executor. --- embassy-executor/src/executor/arch/cortex_m.rs | 59 ---- embassy-executor/src/executor/arch/riscv32.rs | 74 ---- embassy-executor/src/executor/arch/std.rs | 84 ----- embassy-executor/src/executor/arch/wasm.rs | 74 ---- embassy-executor/src/executor/arch/xtensa.rs | 75 ---- embassy-executor/src/executor/mod.rs | 44 --- embassy-executor/src/executor/raw/mod.rs | 427 ----------------------- embassy-executor/src/executor/raw/run_queue.rs | 74 ---- embassy-executor/src/executor/raw/timer_queue.rs | 85 ----- embassy-executor/src/executor/raw/util.rs | 33 -- embassy-executor/src/executor/raw/waker.rs | 53 --- embassy-executor/src/executor/spawner.rs | 202 ----------- 12 files changed, 1284 deletions(-) delete mode 100644 embassy-executor/src/executor/arch/cortex_m.rs delete mode 100644 embassy-executor/src/executor/arch/riscv32.rs delete mode 100644 embassy-executor/src/executor/arch/std.rs delete mode 100644 embassy-executor/src/executor/arch/wasm.rs delete mode 100644 embassy-executor/src/executor/arch/xtensa.rs delete mode 100644 embassy-executor/src/executor/mod.rs delete mode 100644 embassy-executor/src/executor/raw/mod.rs delete mode 100644 embassy-executor/src/executor/raw/run_queue.rs delete mode 100644 embassy-executor/src/executor/raw/timer_queue.rs delete mode 100644 embassy-executor/src/executor/raw/util.rs delete mode 100644 embassy-executor/src/executor/raw/waker.rs delete mode 100644 embassy-executor/src/executor/spawner.rs (limited to 'embassy-executor/src/executor') diff --git a/embassy-executor/src/executor/arch/cortex_m.rs b/embassy-executor/src/executor/arch/cortex_m.rs deleted file mode 100644 index d6e758dfb..000000000 --- a/embassy-executor/src/executor/arch/cortex_m.rs +++ /dev/null @@ -1,59 +0,0 @@ -use core::arch::asm; -use core::marker::PhantomData; -use core::ptr; - -use super::{raw, Spawner}; - -/// Thread mode executor, using WFE/SEV. -/// -/// This is the simplest and most common kind of executor. It runs on -/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction -/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction -/// is executed, to make the `WFE` exit from sleep and poll the task. -/// -/// This executor allows for ultra low power consumption for chips where `WFE` -/// triggers low-power sleep without extra steps. If your chip requires extra steps, -/// you may use [`raw::Executor`] directly to program custom behavior. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), - not_send: PhantomData, - } - } - - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [Forever](crate::util::Forever) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); - - loop { - unsafe { - self.inner.poll(); - asm!("wfe"); - }; - } - } -} diff --git a/embassy-executor/src/executor/arch/riscv32.rs b/embassy-executor/src/executor/arch/riscv32.rs deleted file mode 100644 index 7a7d5698c..000000000 --- a/embassy-executor/src/executor/arch/riscv32.rs +++ /dev/null @@ -1,74 +0,0 @@ -use core::marker::PhantomData; -use core::ptr; - -use atomic_polyfill::{AtomicBool, Ordering}; - -use super::{raw, Spawner}; - -/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); - -/// RISCV32 Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, - } - } - - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [Forever](crate::util::Forever) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); - - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - //will only set this value to true. - critical_section::with(|_| { - // if there is work to do, loop back to polling - // TODO can we relax this? - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } - // if not, wait for interrupt - else { - core::arch::asm!("wfi"); - } - }); - // if an interrupt occurred while waiting, it will be serviced here - } - } - } -} diff --git a/embassy-executor/src/executor/arch/std.rs b/embassy-executor/src/executor/arch/std.rs deleted file mode 100644 index b93ab8a79..000000000 --- a/embassy-executor/src/executor/arch/std.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::marker::PhantomData; -use std::sync::{Condvar, Mutex}; - -use super::{raw, Spawner}; - -/// Single-threaded std-based executor. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, - signaler: &'static Signaler, -} - -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let signaler = &*Box::leak(Box::new(Signaler::new())); - Self { - inner: raw::Executor::new( - |p| unsafe { - let s = &*(p as *const () as *const Signaler); - s.signal() - }, - signaler as *const _ as _, - ), - not_send: PhantomData, - signaler, - } - } - - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [Forever](crate::util::Forever) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); - - loop { - unsafe { self.inner.poll() }; - self.signaler.wait() - } - } -} - -struct Signaler { - mutex: Mutex, - condvar: Condvar, -} - -impl Signaler { - fn new() -> Self { - Self { - mutex: Mutex::new(false), - condvar: Condvar::new(), - } - } - - fn wait(&self) { - let mut signaled = self.mutex.lock().unwrap(); - while !*signaled { - signaled = self.condvar.wait(signaled).unwrap(); - } - *signaled = false; - } - - fn signal(&self) { - let mut signaled = self.mutex.lock().unwrap(); - *signaled = true; - self.condvar.notify_one(); - } -} diff --git a/embassy-executor/src/executor/arch/wasm.rs b/embassy-executor/src/executor/arch/wasm.rs deleted file mode 100644 index 9d5aa31ed..000000000 --- a/embassy-executor/src/executor/arch/wasm.rs +++ /dev/null @@ -1,74 +0,0 @@ -use core::marker::PhantomData; - -use js_sys::Promise; -use wasm_bindgen::prelude::*; - -use super::raw::util::UninitCell; -use super::raw::{self}; -use super::Spawner; - -/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. -pub struct Executor { - inner: raw::Executor, - ctx: &'static WasmContext, - not_send: PhantomData<*mut ()>, -} - -pub(crate) struct WasmContext { - promise: Promise, - closure: UninitCell>, -} - -impl WasmContext { - pub fn new() -> Self { - Self { - promise: Promise::resolve(&JsValue::undefined()), - closure: UninitCell::uninit(), - } - } -} - -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let ctx = &*Box::leak(Box::new(WasmContext::new())); - let inner = raw::Executor::new( - |p| unsafe { - let ctx = &*(p as *const () as *const WasmContext); - let _ = ctx.promise.then(ctx.closure.as_mut()); - }, - ctx as *const _ as _, - ); - Self { - inner, - not_send: PhantomData, - ctx, - } - } - - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [Forever](crate::util::Forever) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { - unsafe { - let executor = &self.inner; - self.ctx.closure.write(Closure::new(move |_| { - executor.poll(); - })); - init(self.inner.spawner()); - } - } -} diff --git a/embassy-executor/src/executor/arch/xtensa.rs b/embassy-executor/src/executor/arch/xtensa.rs deleted file mode 100644 index 20bd7b8a5..000000000 --- a/embassy-executor/src/executor/arch/xtensa.rs +++ /dev/null @@ -1,75 +0,0 @@ -use core::marker::PhantomData; -use core::ptr; - -use atomic_polyfill::{AtomicBool, Ordering}; - -use super::{raw, Spawner}; - -/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); - -/// Xtensa Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, - } - } - - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [Forever](crate::util::Forever) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); - - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - // will only set this value to true. - // if there is work to do, loop back to polling - // TODO can we relax this? - critical_section::with(|_| { - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } else { - // waiti sets the PS.INTLEVEL when slipping into sleep - // because critical sections in Xtensa are implemented via increasing - // PS.INTLEVEL the critical section ends here - // take care not add code after `waiti` if it needs to be inside the CS - core::arch::asm!("waiti 0"); // critical section ends here - } - }); - } - } - } -} diff --git a/embassy-executor/src/executor/mod.rs b/embassy-executor/src/executor/mod.rs deleted file mode 100644 index 45d00c568..000000000 --- a/embassy-executor/src/executor/mod.rs +++ /dev/null @@ -1,44 +0,0 @@ -//! Async task executor. -//! -//! This module provides an async/await executor designed for embedded usage. -//! -//! - No `alloc`, no heap needed. Task futures are statically allocated. -//! - No "fixed capacity" data structures, executor works with 1 or 1000 tasks without needing config/tuning. -//! - Integrated timer queue: sleeping is easy, just do `Timer::after(Duration::from_secs(1)).await;`. -//! - No busy-loop polling: CPU sleeps when there's no work to do, using interrupts or `WFE/SEV`. -//! - Efficient polling: a wake will only poll the woken task, not all of them. -//! - Fair: a task can't monopolize CPU time even if it's constantly being woken. All other tasks get a chance to run before a given task gets polled for the second time. -//! - Creating multiple executor instances is supported, to run tasks with multiple priority levels. This allows higher-priority tasks to preempt lower-priority tasks. - -cfg_if::cfg_if! { - if #[cfg(cortex_m)] { - #[path="arch/cortex_m.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(target_arch="riscv32")] { - #[path="arch/riscv32.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { - #[path="arch/xtensa.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(feature="wasm")] { - #[path="arch/wasm.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(feature="std")] { - #[path="arch/std.rs"] - mod arch; - pub use arch::*; - } -} - -pub mod raw; - -mod spawner; -pub use spawner::*; diff --git a/embassy-executor/src/executor/raw/mod.rs b/embassy-executor/src/executor/raw/mod.rs deleted file mode 100644 index fb4cc6288..000000000 --- a/embassy-executor/src/executor/raw/mod.rs +++ /dev/null @@ -1,427 +0,0 @@ -//! Raw executor. -//! -//! This module exposes "raw" Executor and Task structs for more low level control. -//! -//! ## WARNING: here be dragons! -//! -//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe -//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. - -mod run_queue; -#[cfg(feature = "time")] -mod timer_queue; -pub(crate) mod util; -mod waker; - -use core::cell::Cell; -use core::future::Future; -use core::pin::Pin; -use core::ptr::NonNull; -use core::task::{Context, Poll}; -use core::{mem, ptr}; - -use atomic_polyfill::{AtomicU32, Ordering}; -use critical_section::CriticalSection; - -use self::run_queue::{RunQueue, RunQueueItem}; -use self::util::UninitCell; -pub use self::waker::task_from_waker; -use super::SpawnToken; -#[cfg(feature = "time")] -use crate::time::driver::{self, AlarmHandle}; -#[cfg(feature = "time")] -use crate::time::Instant; - -/// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; -/// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -#[cfg(feature = "time")] -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; - -/// Raw task header for use in task pointers. -/// -/// This is an opaque struct, used for raw pointers to tasks, for use -/// with funtions like [`wake_task`] and [`task_from_waker`]. -pub struct TaskHeader { - pub(crate) state: AtomicU32, - pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 - pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED - - #[cfg(feature = "time")] - pub(crate) expires_at: Cell, - #[cfg(feature = "time")] - pub(crate) timer_queue_item: timer_queue::TimerQueueItem, -} - -impl TaskHeader { - pub(crate) const fn new() -> Self { - Self { - state: AtomicU32::new(0), - run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - - #[cfg(feature = "time")] - expires_at: Cell::new(Instant::from_ticks(0)), - #[cfg(feature = "time")] - timer_queue_item: timer_queue::TimerQueueItem::new(), - } - } -} - -/// Raw storage in which a task can be spawned. -/// -/// This struct holds the necessary memory to spawn one task whose future is `F`. -/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You -/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. -/// -/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished -/// running. Hence the relevant methods require `&'static self`. It may be reused, however. -/// -/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s -/// in a `static`. The most common reason to use the raw `Task` is to have control of where -/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. - -// repr(C) is needed to guarantee that the Task is located at offset 0 -// This makes it safe to cast between TaskHeader and TaskStorage pointers. -#[repr(C)] -pub struct TaskStorage { - raw: TaskHeader, - future: UninitCell, // Valid if STATE_SPAWNED -} - -impl TaskStorage { - const NEW: Self = Self::new(); - - /// Create a new TaskStorage, in not-spawned state. - pub const fn new() -> Self { - Self { - raw: TaskHeader::new(), - future: UninitCell::uninit(), - } - } - - /// Try to spawn the task. - /// - /// The `future` closure constructs the future. It's only called if spawning is - /// actually possible. It is a closure instead of a simple `future: F` param to ensure - /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to - /// NRVO optimizations. - /// - /// This function will fail if the task is already spawned and has not finished running. - /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will - /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. - /// - /// Once the task has finished running, you may spawn it again. It is allowed to spawn it - /// on a different executor. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - if self.spawn_mark_used() { - return unsafe { SpawnToken::::new(self.spawn_initialize(future)) }; - } - - SpawnToken::::new_failed() - } - - fn spawn_mark_used(&'static self) -> bool { - let state = STATE_SPAWNED | STATE_RUN_QUEUED; - self.raw - .state - .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - } - - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull { - // Initialize the task - self.raw.poll_fn.write(Self::poll); - self.future.write(future()); - NonNull::new_unchecked(self as *const TaskStorage as *const TaskHeader as *mut TaskHeader) - } - - unsafe fn poll(p: NonNull) { - let this = &*(p.as_ptr() as *const TaskStorage); - - let future = Pin::new_unchecked(this.future.as_mut()); - let waker = waker::from_task(p); - let mut cx = Context::from_waker(&waker); - match future.poll(&mut cx) { - Poll::Ready(_) => { - this.future.drop_in_place(); - this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); - } - Poll::Pending => {} - } - - // the compiler is emitting a virtual call for waker drop, but we know - // it's a noop for our waker. - mem::forget(waker); - } -} - -unsafe impl Sync for TaskStorage {} - -/// Raw storage that can hold up to N tasks of the same type. -/// -/// This is essentially a `[TaskStorage; N]`. -pub struct TaskPool { - pool: [TaskStorage; N], -} - -impl TaskPool { - /// Create a new TaskPool, with all tasks in non-spawned state. - pub const fn new() -> Self { - Self { - pool: [TaskStorage::NEW; N], - } - } - - /// Try to spawn a task in the pool. - /// - /// See [`TaskStorage::spawn()`] for details. - /// - /// This will loop over the pool and spawn the task in the first storage that - /// is currently free. If none is free, a "poisoned" SpawnToken is returned, - /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - for task in &self.pool { - if task.spawn_mark_used() { - return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; - } - } - - SpawnToken::::new_failed() - } - - /// Like spawn(), but allows the task to be send-spawned if the args are Send even if - /// the future is !Send. - /// - /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used - /// by the Embassy macros ONLY. - /// - /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` - /// is an `async fn`, NOT a hand-written `Future`. - #[doc(hidden)] - pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> SpawnToken - where - FutFn: FnOnce() -> F, - { - // When send-spawning a task, we construct the future in this thread, and effectively - // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, - // send-spawning should require the future `F` to be `Send`. - // - // The problem is this is more restrictive than needed. Once the future is executing, - // it is never sent to another thread. It is only sent when spawning. It should be - // enough for the task's arguments to be Send. (and in practice it's super easy to - // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) - // - // We can do it by sending the task args and constructing the future in the executor thread - // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy - // of the args. - // - // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the - // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. - // - // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, - // but it's possible it'll be guaranteed in the future. See zulip thread: - // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) - // - // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. - // This is why we return `SpawnToken` below. - // - // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly - // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. - - for task in &self.pool { - if task.spawn_mark_used() { - return SpawnToken::::new(task.spawn_initialize(future)); - } - } - - SpawnToken::::new_failed() - } -} - -/// Raw executor. -/// -/// This is the core of the Embassy executor. It is low-level, requiring manual -/// handling of wakeups and task polling. If you can, prefer using one of the -/// higher level executors in [`crate::executor`]. -/// -/// The raw executor leaves it up to you to handle wakeups and scheduling: -/// -/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks -/// that "want to run"). -/// - You must supply a `signal_fn`. The executor will call it to notify you it has work -/// to do. You must arrange for `poll()` to be called as soon as possible. -/// -/// `signal_fn` can be called from *any* context: any thread, any interrupt priority -/// level, etc. It may be called synchronously from any `Executor` method call as well. -/// You must deal with this correctly. -/// -/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates -/// the requirement for `poll` to not be called reentrantly. -pub struct Executor { - run_queue: RunQueue, - signal_fn: fn(*mut ()), - signal_ctx: *mut (), - - #[cfg(feature = "time")] - pub(crate) timer_queue: timer_queue::TimerQueue, - #[cfg(feature = "time")] - alarm: AlarmHandle, -} - -impl Executor { - /// Create a new executor. - /// - /// When the executor has work to do, it will call `signal_fn` with - /// `signal_ctx` as argument. - /// - /// See [`Executor`] docs for details on `signal_fn`. - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { - #[cfg(feature = "time")] - let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; - #[cfg(feature = "time")] - driver::set_alarm_callback(alarm, signal_fn, signal_ctx); - - Self { - run_queue: RunQueue::new(), - signal_fn, - signal_ctx, - - #[cfg(feature = "time")] - timer_queue: timer_queue::TimerQueue::new(), - #[cfg(feature = "time")] - alarm, - } - } - - /// Enqueue a task in the task queue - /// - /// # Safety - /// - `task` must be a valid pointer to a spawned task. - /// - `task` must be set up to run in this executor. - /// - `task` must NOT be already enqueued (in this executor or another one). - #[inline(always)] - unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull) { - if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx) - } - } - - /// Spawn a task in this executor. - /// - /// # Safety - /// - /// `task` must be a valid pointer to an initialized but not-already-spawned task. - /// - /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. - /// In this case, the task's Future must be Send. This is because this is effectively - /// sending the task to the executor thread. - pub(super) unsafe fn spawn(&'static self, task: NonNull) { - task.as_ref().executor.set(self); - - critical_section::with(|cs| { - self.enqueue(cs, task); - }) - } - - /// Poll all queued tasks in this executor. - /// - /// This loops over all tasks that are queued to be polled (i.e. they're - /// freshly spawned or they've been woken). Other tasks are not polled. - /// - /// You must call `poll` after receiving a call to `signal_fn`. It is OK - /// to call `poll` even when not requested by `signal_fn`, but it wastes - /// energy. - /// - /// # Safety - /// - /// You must NOT call `poll` reentrantly on the same executor. - /// - /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you - /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to - /// somehow schedule for `poll()` to be called later, at a time you know for sure there's - /// no `poll()` already running. - pub unsafe fn poll(&'static self) { - #[cfg(feature = "time")] - self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); - - self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); - - #[cfg(feature = "time")] - task.expires_at.set(Instant::MAX); - - let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - - // Run the task - task.poll_fn.read()(p as _); - - // Enqueue or update into timer_queue - #[cfg(feature = "time")] - self.timer_queue.update(p); - }); - - #[cfg(feature = "time")] - { - // If this is already in the past, set_alarm will immediately trigger the alarm. - // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, - // so we immediately do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - driver::set_alarm(self.alarm, next_expiration.as_ticks()); - } - } - - /// Get a spawner that spawns tasks in this executor. - /// - /// It is OK to call this method multiple times to obtain multiple - /// `Spawner`s. You may also copy `Spawner`s. - pub fn spawner(&'static self) -> super::Spawner { - super::Spawner::new(self) - } -} - -/// Wake a task by raw pointer. -/// -/// You can obtain task pointers from `Waker`s using [`task_from_waker`]. -/// -/// # Safety -/// -/// `task` must be a valid task pointer obtained from [`task_from_waker`]. -pub unsafe fn wake_task(task: NonNull) { - critical_section::with(|cs| { - let header = task.as_ref(); - let state = header.state.load(Ordering::Relaxed); - - // If already scheduled, or if not started, - if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { - return; - } - - // Mark it as scheduled - header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); - - // We have just marked the task as scheduled, so enqueue it. - let executor = &*header.executor.get(); - executor.enqueue(cs, task); - }) -} - -#[cfg(feature = "time")] -pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) { - let task = waker::task_from_waker(waker); - let task = task.as_ref(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); -} diff --git a/embassy-executor/src/executor/raw/run_queue.rs b/embassy-executor/src/executor/raw/run_queue.rs deleted file mode 100644 index ed8c82a5c..000000000 --- a/embassy-executor/src/executor/raw/run_queue.rs +++ /dev/null @@ -1,74 +0,0 @@ -use core::ptr; -use core::ptr::NonNull; - -use atomic_polyfill::{AtomicPtr, Ordering}; -use critical_section::CriticalSection; - -use super::TaskHeader; - -pub(crate) struct RunQueueItem { - next: AtomicPtr, -} - -impl RunQueueItem { - pub const fn new() -> Self { - Self { - next: AtomicPtr::new(ptr::null_mut()), - } - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't create fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - head: AtomicPtr, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - /// - /// # Safety - /// - /// `item` must NOT be already enqueued in any queue. - #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull) -> bool { - let prev = self.head.load(Ordering::Relaxed); - task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr(), Ordering::Relaxed); - prev.is_null() - } - - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull)) { - // Atomically empty the queue. - let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); - - // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = NonNull::new(ptr) { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); - - on_task(task); - - ptr = next - } - } -} diff --git a/embassy-executor/src/executor/raw/timer_queue.rs b/embassy-executor/src/executor/raw/timer_queue.rs deleted file mode 100644 index 62fcfc531..000000000 --- a/embassy-executor/src/executor/raw/timer_queue.rs +++ /dev/null @@ -1,85 +0,0 @@ -use core::cell::Cell; -use core::cmp::min; -use core::ptr; -use core::ptr::NonNull; - -use atomic_polyfill::Ordering; - -use super::{TaskHeader, STATE_TIMER_QUEUED}; -use crate::time::Instant; - -pub(crate) struct TimerQueueItem { - next: Cell<*mut TaskHeader>, -} - -impl TimerQueueItem { - pub const fn new() -> Self { - Self { - next: Cell::new(ptr::null_mut()), - } - } -} - -pub(crate) struct TimerQueue { - head: Cell<*mut TaskHeader>, -} - -impl TimerQueue { - pub const fn new() -> Self { - Self { - head: Cell::new(ptr::null_mut()), - } - } - - pub(crate) unsafe fn update(&self, p: NonNull) { - let task = p.as_ref(); - if task.expires_at.get() != Instant::MAX { - let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); - let is_new = old_state & STATE_TIMER_QUEUED == 0; - - if is_new { - task.timer_queue_item.next.set(self.head.get()); - self.head.set(p.as_ptr()); - } - } - } - - pub(crate) unsafe fn next_expiration(&self) -> Instant { - let mut res = Instant::MAX; - self.retain(|p| { - let task = p.as_ref(); - let expires = task.expires_at.get(); - res = min(res, expires); - expires != Instant::MAX - }); - res - } - - pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull)) { - self.retain(|p| { - let task = p.as_ref(); - if task.expires_at.get() <= now { - on_task(p); - false - } else { - true - } - }); - } - - pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { - let mut prev = &self.head; - while !prev.get().is_null() { - let p = NonNull::new_unchecked(prev.get()); - let task = &*p.as_ptr(); - if f(p) { - // Skip to next - prev = &task.timer_queue_item.next; - } else { - // Remove it - prev.set(task.timer_queue_item.next.get()); - task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); - } - } - } -} diff --git a/embassy-executor/src/executor/raw/util.rs b/embassy-executor/src/executor/raw/util.rs deleted file mode 100644 index ed5822188..000000000 --- a/embassy-executor/src/executor/raw/util.rs +++ /dev/null @@ -1,33 +0,0 @@ -use core::cell::UnsafeCell; -use core::mem::MaybeUninit; -use core::ptr; - -pub(crate) struct UninitCell(MaybeUninit>); -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - #[allow(clippy::mut_from_ref)] - pub unsafe fn as_mut(&self) -> &mut T { - &mut *self.as_mut_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) - } - - pub unsafe fn drop_in_place(&self) { - ptr::drop_in_place(self.as_mut_ptr()) - } -} - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} diff --git a/embassy-executor/src/executor/raw/waker.rs b/embassy-executor/src/executor/raw/waker.rs deleted file mode 100644 index 6b9c03a62..000000000 --- a/embassy-executor/src/executor/raw/waker.rs +++ /dev/null @@ -1,53 +0,0 @@ -use core::mem; -use core::ptr::NonNull; -use core::task::{RawWaker, RawWakerVTable, Waker}; - -use super::{wake_task, TaskHeader}; - -const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); - -unsafe fn clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &VTABLE) -} - -unsafe fn wake(p: *const ()) { - wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) -} - -unsafe fn drop(_: *const ()) { - // nop -} - -pub(crate) unsafe fn from_task(p: NonNull) -> Waker { - Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) -} - -/// Get a task pointer from a waker. -/// -/// This can be used as an optimization in wait queues to store task pointers -/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps -/// avoid dynamic dispatch. -/// -/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). -/// -/// # Panics -/// -/// Panics if the waker is not created by the Embassy executor. -pub fn task_from_waker(waker: &Waker) -> NonNull { - // safety: OK because WakerHack has the same layout as Waker. - // This is not really guaranteed because the structs are `repr(Rust)`, it is - // indeed the case in the current implementation. - // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 - let hack: &WakerHack = unsafe { mem::transmute(waker) }; - if hack.vtable != &VTABLE { - panic!("Found waker not created by the Embassy executor. `embassy_executor::time::Timer` only works with the Embassy executor.") - } - - // safety: we never create a waker with a null data pointer. - unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } -} - -struct WakerHack { - data: *const (), - vtable: &'static RawWakerVTable, -} diff --git a/embassy-executor/src/executor/spawner.rs b/embassy-executor/src/executor/spawner.rs deleted file mode 100644 index 25a0d7dbb..000000000 --- a/embassy-executor/src/executor/spawner.rs +++ /dev/null @@ -1,202 +0,0 @@ -use core::marker::PhantomData; -use core::mem; -use core::ptr::NonNull; -use core::task::Poll; - -use futures_util::future::poll_fn; - -use super::raw; - -/// Token to spawn a newly-created task in an executor. -/// -/// When calling a task function (like `#[embassy_executor::task] async fn my_task() { ... }`), the returned -/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must -/// then spawn it into an executor, typically with [`Spawner::spawn()`]. -/// -/// The generic parameter `S` determines whether the task can be spawned in executors -/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. -/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. -/// -/// # Panics -/// -/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. -/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. -#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] -pub struct SpawnToken { - raw_task: Option>, - phantom: PhantomData<*mut S>, -} - -impl SpawnToken { - pub(crate) unsafe fn new(raw_task: NonNull) -> Self { - Self { - raw_task: Some(raw_task), - phantom: PhantomData, - } - } - - pub(crate) fn new_failed() -> Self { - Self { - raw_task: None, - phantom: PhantomData, - } - } -} - -impl Drop for SpawnToken { - fn drop(&mut self) { - // TODO deallocate the task instead. - panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") - } -} - -/// Error returned when spawning a task. -#[derive(Copy, Clone, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum SpawnError { - /// Too many instances of this task are already running. - /// - /// By default, a task marked with `#[embassy_executor::task]` can only have one instance - /// running at a time. You may allow multiple instances to run in parallel with - /// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage. - Busy, -} - -/// Handle to spawn tasks into an executor. -/// -/// This Spawner can spawn any task (Send and non-Send ones), but it can -/// only be used in the executor thread (it is not Send itself). -/// -/// If you want to spawn tasks from another thread, use [SendSpawner]. -#[derive(Copy, Clone)] -pub struct Spawner { - executor: &'static raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Spawner { - pub(crate) fn new(executor: &'static raw::Executor) -> Self { - Self { - executor, - not_send: PhantomData, - } - } - - /// Get a Spawner for the current executor. - /// - /// This function is `async` just to get access to the current async - /// context. It returns instantly, it does not block/yield. - /// - /// # Panics - /// - /// Panics if the current executor is not an Embassy executor. - pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { - let task = raw::task_from_waker(cx.waker()); - let executor = (*task.as_ptr()).executor.get(); - Poll::Ready(Self::new(&*executor)) - }) - .await - } - - /// Spawn a task into an executor. - /// - /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - mem::forget(token); - - match task { - Some(task) => { - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - // Used by the `embassy_macros::main!` macro to throw an error when spawn - // fails. This is here to allow conditional use of `defmt::unwrap!` - // without introducing a `defmt` feature in the `embassy_macros` package, - // which would require use of `-Z namespaced-features`. - /// Spawn a task into an executor, panicking on failure. - /// - /// # Panics - /// - /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { - unwrap!(self.spawn(token)); - } - - /// Convert this Spawner to a SendSpawner. This allows you to send the - /// spawner to other threads, but the spawner loses the ability to spawn - /// non-Send tasks. - pub fn make_send(&self) -> SendSpawner { - SendSpawner { - executor: self.executor, - } - } -} - -/// Handle to spawn tasks into an executor from any thread. -/// -/// This Spawner can be used from any thread (it is Send), but it can -/// only spawn Send tasks. The reason for this is spawning is effectively -/// "sending" the tasks to the executor thread. -/// -/// If you want to spawn non-Send tasks, use [Spawner]. -#[derive(Copy, Clone)] -pub struct SendSpawner { - executor: &'static raw::Executor, -} - -unsafe impl Send for SendSpawner {} -unsafe impl Sync for SendSpawner {} - -impl SendSpawner { - pub(crate) fn new(executor: &'static raw::Executor) -> Self { - Self { executor } - } - - /// Get a Spawner for the current executor. - /// - /// This function is `async` just to get access to the current async - /// context. It returns instantly, it does not block/yield. - /// - /// # Panics - /// - /// Panics if the current executor is not an Embassy executor. - pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { - let task = raw::task_from_waker(cx.waker()); - let executor = (*task.as_ptr()).executor.get(); - Poll::Ready(Self::new(&*executor)) - }) - .await - } - - /// Spawn a task into an executor. - /// - /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { - let header = token.raw_task; - mem::forget(token); - - match header { - Some(header) => { - unsafe { self.executor.spawn(header) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - /// Spawn a task into an executor, panicking on failure. - /// - /// # Panics - /// - /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { - unwrap!(self.spawn(token)); - } -} -- cgit