From ae179d49af2c2758d5e6d9aab9ffd7ecd5ab70ae Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 25 Aug 2021 00:20:29 +0200 Subject: executor: improve module structure --- embassy/src/executor/arch/arm.rs | 76 +++++++++ embassy/src/executor/mod.rs | 186 +--------------------- embassy/src/executor/raw.rs | 271 -------------------------------- embassy/src/executor/raw/mod.rs | 260 ++++++++++++++++++++++++++++++ embassy/src/executor/raw/run_queue.rs | 71 +++++++++ embassy/src/executor/raw/timer_queue.rs | 88 +++++++++++ embassy/src/executor/raw/util.rs | 32 ++++ embassy/src/executor/raw/waker.rs | 36 +++++ embassy/src/executor/run_queue.rs | 71 --------- embassy/src/executor/spawner.rs | 128 +++++++++++++++ embassy/src/executor/timer_queue.rs | 88 ----------- embassy/src/executor/util.rs | 32 ---- embassy/src/executor/waker.rs | 36 ----- 13 files changed, 696 insertions(+), 679 deletions(-) create mode 100644 embassy/src/executor/arch/arm.rs delete mode 100644 embassy/src/executor/raw.rs create mode 100644 embassy/src/executor/raw/mod.rs create mode 100644 embassy/src/executor/raw/run_queue.rs create mode 100644 embassy/src/executor/raw/timer_queue.rs create mode 100644 embassy/src/executor/raw/util.rs create mode 100644 embassy/src/executor/raw/waker.rs delete mode 100644 embassy/src/executor/run_queue.rs create mode 100644 embassy/src/executor/spawner.rs delete mode 100644 embassy/src/executor/timer_queue.rs delete mode 100644 embassy/src/executor/util.rs delete mode 100644 embassy/src/executor/waker.rs diff --git a/embassy/src/executor/arch/arm.rs b/embassy/src/executor/arch/arm.rs new file mode 100644 index 000000000..4fd734cd7 --- /dev/null +++ b/embassy/src/executor/arch/arm.rs @@ -0,0 +1,76 @@ +use core::marker::PhantomData; +use core::ptr; + +use super::{raw, Spawner}; +use crate::interrupt::{Interrupt, InterruptExt}; + +pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Executor { + pub fn new() -> Self { + Self { + inner: raw::Executor::new(|_| cortex_m::asm::sev(), ptr::null_mut()), + not_send: PhantomData, + } + } + + /// Runs the executor. + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(unsafe { self.inner.spawner() }); + + loop { + unsafe { self.inner.run_queued() }; + cortex_m::asm::wfe(); + } + } +} + +fn pend_by_number(n: u16) { + #[derive(Clone, Copy)] + struct N(u16); + unsafe impl cortex_m::interrupt::InterruptNumber for N { + fn number(self) -> u16 { + self.0 + } + } + cortex_m::peripheral::NVIC::pend(N(n)) +} + +pub struct InterruptExecutor { + irq: I, + inner: raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl InterruptExecutor { + pub fn new(irq: I) -> Self { + let ctx = irq.number() as *mut (); + Self { + irq, + inner: raw::Executor::new(|ctx| pend_by_number(ctx as u16), ctx), + not_send: PhantomData, + } + } + + /// Start the executor. + /// + /// `init` is called in the interrupt context, then the interrupt is + /// configured to run the executor. + pub fn start(&'static mut self, init: impl FnOnce(Spawner) + Send) { + self.irq.disable(); + + init(unsafe { self.inner.spawner() }); + + self.irq.set_handler(|ctx| unsafe { + let executor = &*(ctx as *const raw::Executor); + executor.run_queued(); + }); + self.irq.set_handler_context(&self.inner as *const _ as _); + self.irq.enable(); + } +} diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index f3c877290..e0ac566f1 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs @@ -1,183 +1,7 @@ -use core::marker::PhantomData; -use core::ptr::NonNull; -use core::{mem, ptr}; - +#[path = "arch/arm.rs"] +mod arch; pub mod raw; -mod run_queue; -#[cfg(feature = "time")] -mod timer_queue; -mod util; -mod waker; - -use crate::interrupt::{Interrupt, InterruptExt}; - -#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] -pub struct SpawnToken { - raw_task: Option>, - phantom: PhantomData<*mut F>, -} - -impl Drop for SpawnToken { - fn drop(&mut self) { - // TODO deallocate the task instead. - panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") - } -} - -#[derive(Copy, Clone, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum SpawnError { - Busy, -} - -/// Handle to spawn tasks into an executor. -/// -/// This Spawner can spawn any task (Send and non-Send ones), but it can -/// only be used in the executor thread (it is not Send itself). -/// -/// If you want to spawn tasks from another thread, use [SendSpawner]. -#[derive(Copy, Clone)] -pub struct Spawner { - executor: &'static raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Spawner { - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - mem::forget(token); - - match task { - Some(task) => { - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - /// Used by the `embassy_macros::main!` macro to throw an error when spawn - /// fails. This is here to allow conditional use of `defmt::unwrap!` - /// without introducing a `defmt` feature in the `embassy_macros` package, - /// which would require use of `-Z namespaced-features`. - pub fn must_spawn(&self, token: SpawnToken) -> () { - unwrap!(self.spawn(token)); - } - - /// Convert this Spawner to a SendSpawner. This allows you to send the - /// spawner to other threads, but the spawner loses the ability to spawn - /// non-Send tasks. - pub fn make_send(&self) -> SendSpawner { - SendSpawner { - executor: self.executor, - not_send: PhantomData, - } - } -} - -/// Handle to spawn tasks into an executor from any thread. -/// -/// This Spawner can be used from any thread (it implements Send and Sync, so after any task (Send and non-Send ones), but it can -/// only be used in the executor thread (it is not Send itself). -/// -/// If you want to spawn tasks from another thread, use [SendSpawner]. -#[derive(Copy, Clone)] -pub struct SendSpawner { - executor: &'static raw::Executor, - not_send: PhantomData<*mut ()>, -} - -unsafe impl Send for SendSpawner {} -unsafe impl Sync for SendSpawner {} - -/// Handle to spawn tasks to an executor. -/// -/// This Spawner can spawn any task (Send and non-Send ones), but it can -/// only be used in the executor thread (it is not Send itself). -/// -/// If you want to spawn tasks from another thread, use [SendSpawner]. -impl SendSpawner { - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { - let header = token.raw_task; - mem::forget(token); - - match header { - Some(header) => { - unsafe { self.executor.spawn(header) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } -} - -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl Executor { - pub fn new() -> Self { - Self { - inner: raw::Executor::new(|_| cortex_m::asm::sev(), ptr::null_mut()), - not_send: PhantomData, - } - } - - /// Runs the executor. - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(unsafe { self.inner.spawner() }); - - loop { - unsafe { self.inner.run_queued() }; - cortex_m::asm::wfe(); - } - } -} - -fn pend_by_number(n: u16) { - #[derive(Clone, Copy)] - struct N(u16); - unsafe impl cortex_m::interrupt::InterruptNumber for N { - fn number(self) -> u16 { - self.0 - } - } - cortex_m::peripheral::NVIC::pend(N(n)) -} - -pub struct InterruptExecutor { - irq: I, - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} - -impl InterruptExecutor { - pub fn new(irq: I) -> Self { - let ctx = irq.number() as *mut (); - Self { - irq, - inner: raw::Executor::new(|ctx| pend_by_number(ctx as u16), ctx), - not_send: PhantomData, - } - } - - /// Start the executor. - /// - /// `init` is called in the interrupt context, then the interrupt is - /// configured to run the executor. - pub fn start(&'static mut self, init: impl FnOnce(Spawner) + Send) { - self.irq.disable(); - - init(unsafe { self.inner.spawner() }); +mod spawner; - self.irq.set_handler(|ctx| unsafe { - let executor = &*(ctx as *const raw::Executor); - executor.run_queued(); - }); - self.irq.set_handler_context(&self.inner as *const _ as _); - self.irq.enable(); - } -} +pub use arch::*; +pub use spawner::*; diff --git a/embassy/src/executor/raw.rs b/embassy/src/executor/raw.rs deleted file mode 100644 index fac46d1f4..000000000 --- a/embassy/src/executor/raw.rs +++ /dev/null @@ -1,271 +0,0 @@ -use atomic_polyfill::{AtomicU32, Ordering}; -use core::cell::Cell; -use core::future::Future; -use core::marker::PhantomData; -use core::pin::Pin; -use core::ptr::NonNull; -use core::task::{Context, Poll}; -use core::{mem, ptr}; - -use super::run_queue::{RunQueue, RunQueueItem}; -use super::util::UninitCell; -use super::waker; -use super::SpawnToken; - -#[cfg(feature = "time")] -use super::timer_queue::{TimerQueue, TimerQueueItem}; -#[cfg(feature = "time")] -use crate::time::driver::{self, AlarmHandle}; -#[cfg(feature = "time")] -use crate::time::Instant; - -/// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; -/// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -#[cfg(feature = "time")] -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; - -pub struct TaskHeader { - pub(crate) state: AtomicU32, - pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 - pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED - - #[cfg(feature = "time")] - pub(crate) expires_at: Cell, - #[cfg(feature = "time")] - pub(crate) timer_queue_item: TimerQueueItem, -} - -impl TaskHeader { - pub(crate) const fn new() -> Self { - Self { - state: AtomicU32::new(0), - run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - - #[cfg(feature = "time")] - expires_at: Cell::new(Instant::from_ticks(0)), - #[cfg(feature = "time")] - timer_queue_item: TimerQueueItem::new(), - } - } - - pub(crate) unsafe fn enqueue(&self) { - let mut current = self.state.load(Ordering::Acquire); - loop { - // If already scheduled, or if not started, - if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { - return; - } - - // Mark it as scheduled - let new = current | STATE_RUN_QUEUED; - - match self.state.compare_exchange_weak( - current, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(next_current) => current = next_current, - } - } - - // We have just marked the task as scheduled, so enqueue it. - let executor = &*self.executor.get(); - executor.enqueue(self as *const TaskHeader as *mut TaskHeader); - } -} - -// repr(C) is needed to guarantee that the Task is located at offset 0 -// This makes it safe to cast between Task and Task pointers. -#[repr(C)] -pub struct Task { - raw: TaskHeader, - future: UninitCell, // Valid if STATE_SPAWNED -} - -impl Task { - pub const fn new() -> Self { - Self { - raw: TaskHeader::new(), - future: UninitCell::uninit(), - } - } - - pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { - for task in pool { - if task.spawn_allocate() { - return unsafe { task.spawn_initialize(future) }; - } - } - - SpawnToken { - raw_task: None, - phantom: PhantomData, - } - } - - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - if self.spawn_allocate() { - unsafe { self.spawn_initialize(future) } - } else { - SpawnToken { - raw_task: None, - phantom: PhantomData, - } - } - } - - fn spawn_allocate(&'static self) -> bool { - let state = STATE_SPAWNED | STATE_RUN_QUEUED; - self.raw - .state - .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - } - - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - // Initialize the task - self.raw.poll_fn.write(Self::poll); - self.future.write(future()); - - return SpawnToken { - raw_task: Some(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)), - phantom: PhantomData, - }; - } - - unsafe fn poll(p: NonNull) { - let this = &*(p.as_ptr() as *const Task); - - let future = Pin::new_unchecked(this.future.as_mut()); - let waker = waker::from_task(p); - let mut cx = Context::from_waker(&waker); - match future.poll(&mut cx) { - Poll::Ready(_) => { - this.future.drop_in_place(); - this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); - } - Poll::Pending => {} - } - - // the compiler is emitting a virtual call for waker drop, but we know - // it's a noop for our waker. - mem::forget(waker); - } -} - -unsafe impl Sync for Task {} - -pub struct Executor { - run_queue: RunQueue, - signal_fn: fn(*mut ()), - signal_ctx: *mut (), - - #[cfg(feature = "time")] - timer_queue: TimerQueue, - #[cfg(feature = "time")] - alarm: AlarmHandle, -} - -impl Executor { - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { - #[cfg(feature = "time")] - let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; - #[cfg(feature = "time")] - driver::set_alarm_callback(alarm, signal_fn, signal_ctx); - - Self { - run_queue: RunQueue::new(), - signal_fn, - signal_ctx, - - #[cfg(feature = "time")] - timer_queue: TimerQueue::new(), - #[cfg(feature = "time")] - alarm, - } - } - - pub fn set_signal_ctx(&mut self, signal_ctx: *mut ()) { - self.signal_ctx = signal_ctx; - } - - unsafe fn enqueue(&self, item: *mut TaskHeader) { - if self.run_queue.enqueue(item) { - (self.signal_fn)(self.signal_ctx) - } - } - - pub unsafe fn spawn(&'static self, task: NonNull) { - let task = task.as_ref(); - task.executor.set(self); - self.enqueue(task as *const _ as _); - } - - pub unsafe fn run_queued(&'static self) { - #[cfg(feature = "time")] - self.timer_queue.dequeue_expired(Instant::now(), |p| { - p.as_ref().enqueue(); - }); - - self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); - - #[cfg(feature = "time")] - task.expires_at.set(Instant::MAX); - - let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - - // Run the task - task.poll_fn.read()(p as _); - - // Enqueue or update into timer_queue - #[cfg(feature = "time")] - self.timer_queue.update(p); - }); - - #[cfg(feature = "time")] - { - // If this is in the past, set_alarm will immediately trigger the alarm, - // which will make the wfe immediately return so we do another loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - driver::set_alarm(self.alarm, next_expiration.as_ticks()); - } - } - - pub unsafe fn spawner(&'static self) -> super::Spawner { - super::Spawner { - executor: self, - not_send: PhantomData, - } - } -} - -pub use super::waker::task_from_waker; - -pub unsafe fn wake_task(task: NonNull) { - task.as_ref().enqueue(); -} - -#[cfg(feature = "time")] -pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) { - let task = waker::task_from_waker(waker); - let task = task.as_ref(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); -} diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs new file mode 100644 index 000000000..235a09198 --- /dev/null +++ b/embassy/src/executor/raw/mod.rs @@ -0,0 +1,260 @@ +mod run_queue; +#[cfg(feature = "time")] +mod timer_queue; +mod util; +mod waker; + +use atomic_polyfill::{AtomicU32, Ordering}; +use core::cell::Cell; +use core::future::Future; +use core::pin::Pin; +use core::ptr::NonNull; +use core::task::{Context, Poll}; +use core::{mem, ptr}; + +use self::run_queue::{RunQueue, RunQueueItem}; +use self::util::UninitCell; +use super::SpawnToken; +#[cfg(feature = "time")] +use crate::time::driver::{self, AlarmHandle}; +#[cfg(feature = "time")] +use crate::time::Instant; + +pub use self::waker::task_from_waker; + +/// Task is spawned (has a future) +pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +/// Task is in the executor run queue +pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "time")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; + +pub struct TaskHeader { + pub(crate) state: AtomicU32, + pub(crate) run_queue_item: RunQueueItem, + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED + + #[cfg(feature = "time")] + pub(crate) expires_at: Cell, + #[cfg(feature = "time")] + pub(crate) timer_queue_item: timer_queue::TimerQueueItem, +} + +impl TaskHeader { + pub(crate) const fn new() -> Self { + Self { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + + #[cfg(feature = "time")] + expires_at: Cell::new(Instant::from_ticks(0)), + #[cfg(feature = "time")] + timer_queue_item: timer_queue::TimerQueueItem::new(), + } + } + + pub(crate) unsafe fn enqueue(&self) { + let mut current = self.state.load(Ordering::Acquire); + loop { + // If already scheduled, or if not started, + if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { + return; + } + + // Mark it as scheduled + let new = current | STATE_RUN_QUEUED; + + match self.state.compare_exchange_weak( + current, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(next_current) => current = next_current, + } + } + + // We have just marked the task as scheduled, so enqueue it. + let executor = &*self.executor.get(); + executor.enqueue(self as *const TaskHeader as *mut TaskHeader); + } +} + +// repr(C) is needed to guarantee that the Task is located at offset 0 +// This makes it safe to cast between Task and Task pointers. +#[repr(C)] +pub struct Task { + raw: TaskHeader, + future: UninitCell, // Valid if STATE_SPAWNED +} + +impl Task { + pub const fn new() -> Self { + Self { + raw: TaskHeader::new(), + future: UninitCell::uninit(), + } + } + + pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { + for task in pool { + if task.spawn_allocate() { + return unsafe { task.spawn_initialize(future) }; + } + } + + SpawnToken::new_failed() + } + + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + if self.spawn_allocate() { + unsafe { self.spawn_initialize(future) } + } else { + SpawnToken::new_failed() + } + } + + fn spawn_allocate(&'static self) -> bool { + let state = STATE_SPAWNED | STATE_RUN_QUEUED; + self.raw + .state + .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + } + + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + // Initialize the task + self.raw.poll_fn.write(Self::poll); + self.future.write(future()); + + SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) + } + + unsafe fn poll(p: NonNull) { + let this = &*(p.as_ptr() as *const Task); + + let future = Pin::new_unchecked(this.future.as_mut()); + let waker = waker::from_task(p); + let mut cx = Context::from_waker(&waker); + match future.poll(&mut cx) { + Poll::Ready(_) => { + this.future.drop_in_place(); + this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); + } + Poll::Pending => {} + } + + // the compiler is emitting a virtual call for waker drop, but we know + // it's a noop for our waker. + mem::forget(waker); + } +} + +unsafe impl Sync for Task {} + +pub struct Executor { + run_queue: RunQueue, + signal_fn: fn(*mut ()), + signal_ctx: *mut (), + + #[cfg(feature = "time")] + pub(crate) timer_queue: timer_queue::TimerQueue, + #[cfg(feature = "time")] + alarm: AlarmHandle, +} + +impl Executor { + pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + #[cfg(feature = "time")] + let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; + #[cfg(feature = "time")] + driver::set_alarm_callback(alarm, signal_fn, signal_ctx); + + Self { + run_queue: RunQueue::new(), + signal_fn, + signal_ctx, + + #[cfg(feature = "time")] + timer_queue: timer_queue::TimerQueue::new(), + #[cfg(feature = "time")] + alarm, + } + } + + pub fn set_signal_ctx(&mut self, signal_ctx: *mut ()) { + self.signal_ctx = signal_ctx; + } + + unsafe fn enqueue(&self, item: *mut TaskHeader) { + if self.run_queue.enqueue(item) { + (self.signal_fn)(self.signal_ctx) + } + } + + pub unsafe fn spawn(&'static self, task: NonNull) { + let task = task.as_ref(); + task.executor.set(self); + self.enqueue(task as *const _ as _); + } + + pub unsafe fn run_queued(&'static self) { + #[cfg(feature = "time")] + self.timer_queue.dequeue_expired(Instant::now(), |p| { + p.as_ref().enqueue(); + }); + + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + + #[cfg(feature = "time")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + // Run the task + task.poll_fn.read()(p as _); + + // Enqueue or update into timer_queue + #[cfg(feature = "time")] + self.timer_queue.update(p); + }); + + #[cfg(feature = "time")] + { + // If this is in the past, set_alarm will immediately trigger the alarm, + // which will make the wfe immediately return so we do another loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + driver::set_alarm(self.alarm, next_expiration.as_ticks()); + } + } + + pub unsafe fn spawner(&'static self) -> super::Spawner { + super::Spawner::new(self) + } +} + +pub unsafe fn wake_task(task: NonNull) { + task.as_ref().enqueue(); +} + +#[cfg(feature = "time")] +pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) { + let task = waker::task_from_waker(waker); + let task = task.as_ref(); + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); +} diff --git a/embassy/src/executor/raw/run_queue.rs b/embassy/src/executor/raw/run_queue.rs new file mode 100644 index 000000000..8e8bc8ff3 --- /dev/null +++ b/embassy/src/executor/raw/run_queue.rs @@ -0,0 +1,71 @@ +use atomic_polyfill::{AtomicPtr, Ordering}; +use core::ptr; +use core::ptr::NonNull; + +use super::TaskHeader; + +pub(crate) struct RunQueueItem { + next: AtomicPtr, +} + +impl RunQueueItem { + pub const fn new() -> Self { + Self { + next: AtomicPtr::new(ptr::null_mut()), + } + } +} + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't crate fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + head: AtomicPtr, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { + let mut prev = self.head.load(Ordering::Acquire); + loop { + (*item).run_queue_item.next.store(prev, Ordering::Relaxed); + match self + .head + .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(next_prev) => prev = next_prev, + } + } + + prev.is_null() + } + + pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull)) { + let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + while !task.is_null() { + // If the task re-enqueues itself, the `next` pointer will get overwritten. + // Therefore, first read the next pointer, and only then process the task. + let next = (*task).run_queue_item.next.load(Ordering::Relaxed); + + on_task(NonNull::new_unchecked(task)); + + task = next + } + } +} diff --git a/embassy/src/executor/raw/timer_queue.rs b/embassy/src/executor/raw/timer_queue.rs new file mode 100644 index 000000000..e96910bb0 --- /dev/null +++ b/embassy/src/executor/raw/timer_queue.rs @@ -0,0 +1,88 @@ +use atomic_polyfill::Ordering; +use core::cell::Cell; +use core::cmp::min; +use core::ptr; +use core::ptr::NonNull; + +use super::{TaskHeader, STATE_TIMER_QUEUED}; +use crate::time::Instant; + +pub(crate) struct TimerQueueItem { + next: Cell<*mut TaskHeader>, +} + +impl TimerQueueItem { + pub const fn new() -> Self { + Self { + next: Cell::new(ptr::null_mut()), + } + } +} + +pub(crate) struct TimerQueue { + head: Cell<*mut TaskHeader>, +} + +impl TimerQueue { + pub const fn new() -> Self { + Self { + head: Cell::new(ptr::null_mut()), + } + } + + pub(crate) unsafe fn update(&self, p: NonNull) { + let task = p.as_ref(); + if task.expires_at.get() != Instant::MAX { + let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); + let is_new = old_state & STATE_TIMER_QUEUED == 0; + + if is_new { + task.timer_queue_item.next.set(self.head.get()); + self.head.set(p.as_ptr()); + } + } + } + + pub(crate) unsafe fn next_expiration(&self) -> Instant { + let mut res = Instant::MAX; + self.retain(|p| { + let task = p.as_ref(); + let expires = task.expires_at.get(); + res = min(res, expires); + expires != Instant::MAX + }); + res + } + + pub(crate) unsafe fn dequeue_expired( + &self, + now: Instant, + on_task: impl Fn(NonNull), + ) { + self.retain(|p| { + let task = p.as_ref(); + if task.expires_at.get() <= now { + on_task(p); + false + } else { + true + } + }); + } + + pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { + let mut prev = &self.head; + while !prev.get().is_null() { + let p = NonNull::new_unchecked(prev.get()); + let task = &*p.as_ptr(); + if f(p) { + // Skip to next + prev = &task.timer_queue_item.next; + } else { + // Remove it + prev.set(task.timer_queue_item.next.get()); + task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); + } + } + } +} diff --git a/embassy/src/executor/raw/util.rs b/embassy/src/executor/raw/util.rs new file mode 100644 index 000000000..ca15b6955 --- /dev/null +++ b/embassy/src/executor/raw/util.rs @@ -0,0 +1,32 @@ +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::ptr; + +pub(crate) struct UninitCell(MaybeUninit>); +impl UninitCell { + pub const fn uninit() -> Self { + Self(MaybeUninit::uninit()) + } + + pub unsafe fn as_mut_ptr(&self) -> *mut T { + (*self.0.as_ptr()).get() + } + + pub unsafe fn as_mut(&self) -> &mut T { + &mut *self.as_mut_ptr() + } + + pub unsafe fn write(&self, val: T) { + ptr::write(self.as_mut_ptr(), val) + } + + pub unsafe fn drop_in_place(&self) { + ptr::drop_in_place(self.as_mut_ptr()) + } +} + +impl UninitCell { + pub unsafe fn read(&self) -> T { + ptr::read(self.as_mut_ptr()) + } +} diff --git a/embassy/src/executor/raw/waker.rs b/embassy/src/executor/raw/waker.rs new file mode 100644 index 000000000..e53190f17 --- /dev/null +++ b/embassy/src/executor/raw/waker.rs @@ -0,0 +1,36 @@ +use core::mem; +use core::ptr::NonNull; +use core::task::{RawWaker, RawWakerVTable, Waker}; + +use super::TaskHeader; + +const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); + +unsafe fn clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &VTABLE) +} + +unsafe fn wake(p: *const ()) { + (*(p as *mut TaskHeader)).enqueue() +} + +unsafe fn drop(_: *const ()) { + // nop +} + +pub(crate) unsafe fn from_task(p: NonNull) -> Waker { + Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) +} + +pub unsafe fn task_from_waker(waker: &Waker) -> NonNull { + let hack: &WakerHack = mem::transmute(waker); + if hack.vtable != &VTABLE { + panic!("Found waker not created by the embassy executor. Consider enabling the `executor-agnostic` feature on the `embassy` crate.") + } + NonNull::new_unchecked(hack.data as *mut TaskHeader) +} + +struct WakerHack { + data: *const (), + vtable: &'static RawWakerVTable, +} diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs deleted file mode 100644 index 083916139..000000000 --- a/embassy/src/executor/run_queue.rs +++ /dev/null @@ -1,71 +0,0 @@ -use atomic_polyfill::{AtomicPtr, Ordering}; -use core::ptr; -use core::ptr::NonNull; - -use super::raw::TaskHeader; - -pub(crate) struct RunQueueItem { - next: AtomicPtr, -} - -impl RunQueueItem { - pub const fn new() -> Self { - Self { - next: AtomicPtr::new(ptr::null_mut()), - } - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't crate fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - head: AtomicPtr, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { - let mut prev = self.head.load(Ordering::Acquire); - loop { - (*item).run_queue_item.next.store(prev, Ordering::Relaxed); - match self - .head - .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => break, - Err(next_prev) => prev = next_prev, - } - } - - prev.is_null() - } - - pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull)) { - let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); - - while !task.is_null() { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - let next = (*task).run_queue_item.next.load(Ordering::Relaxed); - - on_task(NonNull::new_unchecked(task)); - - task = next - } - } -} diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs new file mode 100644 index 000000000..36100aecb --- /dev/null +++ b/embassy/src/executor/spawner.rs @@ -0,0 +1,128 @@ +use core::marker::PhantomData; +use core::mem; +use core::ptr::NonNull; + +use super::raw; + +#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] +pub struct SpawnToken { + raw_task: Option>, + phantom: PhantomData<*mut F>, +} + +impl SpawnToken { + pub(crate) unsafe fn new(raw_task: NonNull) -> Self { + Self { + raw_task: Some(raw_task), + phantom: PhantomData, + } + } + + pub(crate) fn new_failed() -> Self { + Self { + raw_task: None, + phantom: PhantomData, + } + } +} + +impl Drop for SpawnToken { + fn drop(&mut self) { + // TODO deallocate the task instead. + panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") + } +} + +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum SpawnError { + Busy, +} + +/// Handle to spawn tasks into an executor. +/// +/// This Spawner can spawn any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +#[derive(Copy, Clone)] +pub struct Spawner { + executor: &'static raw::Executor, + not_send: PhantomData<*mut ()>, +} + +impl Spawner { + pub(crate) unsafe fn new(executor: &'static raw::Executor) -> Self { + Self { + executor, + not_send: PhantomData, + } + } + + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + mem::forget(token); + + match task { + Some(task) => { + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } + + /// Used by the `embassy_macros::main!` macro to throw an error when spawn + /// fails. This is here to allow conditional use of `defmt::unwrap!` + /// without introducing a `defmt` feature in the `embassy_macros` package, + /// which would require use of `-Z namespaced-features`. + pub fn must_spawn(&self, token: SpawnToken) -> () { + unwrap!(self.spawn(token)); + } + + /// Convert this Spawner to a SendSpawner. This allows you to send the + /// spawner to other threads, but the spawner loses the ability to spawn + /// non-Send tasks. + pub fn make_send(&self) -> SendSpawner { + SendSpawner { + executor: self.executor, + not_send: PhantomData, + } + } +} + +/// Handle to spawn tasks into an executor from any thread. +/// +/// This Spawner can be used from any thread (it implements Send and Sync, so after any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +#[derive(Copy, Clone)] +pub struct SendSpawner { + executor: &'static raw::Executor, + not_send: PhantomData<*mut ()>, +} + +unsafe impl Send for SendSpawner {} +unsafe impl Sync for SendSpawner {} + +/// Handle to spawn tasks to an executor. +/// +/// This Spawner can spawn any task (Send and non-Send ones), but it can +/// only be used in the executor thread (it is not Send itself). +/// +/// If you want to spawn tasks from another thread, use [SendSpawner]. +impl SendSpawner { + pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + let header = token.raw_task; + mem::forget(token); + + match header { + Some(header) => { + unsafe { self.executor.spawn(header) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } +} diff --git a/embassy/src/executor/timer_queue.rs b/embassy/src/executor/timer_queue.rs deleted file mode 100644 index 76bc27ad4..000000000 --- a/embassy/src/executor/timer_queue.rs +++ /dev/null @@ -1,88 +0,0 @@ -use atomic_polyfill::Ordering; -use core::cell::Cell; -use core::cmp::min; -use core::ptr; -use core::ptr::NonNull; - -use super::raw::{TaskHeader, STATE_TIMER_QUEUED}; -use crate::time::Instant; - -pub(crate) struct TimerQueueItem { - next: Cell<*mut TaskHeader>, -} - -impl TimerQueueItem { - pub const fn new() -> Self { - Self { - next: Cell::new(ptr::null_mut()), - } - } -} - -pub(crate) struct TimerQueue { - head: Cell<*mut TaskHeader>, -} - -impl TimerQueue { - pub const fn new() -> Self { - Self { - head: Cell::new(ptr::null_mut()), - } - } - - pub(crate) unsafe fn update(&self, p: NonNull) { - let task = p.as_ref(); - if task.expires_at.get() != Instant::MAX { - let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); - let is_new = old_state & STATE_TIMER_QUEUED == 0; - - if is_new { - task.timer_queue_item.next.set(self.head.get()); - self.head.set(p.as_ptr()); - } - } - } - - pub(crate) unsafe fn next_expiration(&self) -> Instant { - let mut res = Instant::MAX; - self.retain(|p| { - let task = p.as_ref(); - let expires = task.expires_at.get(); - res = min(res, expires); - expires != Instant::MAX - }); - res - } - - pub(crate) unsafe fn dequeue_expired( - &self, - now: Instant, - on_task: impl Fn(NonNull), - ) { - self.retain(|p| { - let task = p.as_ref(); - if task.expires_at.get() <= now { - on_task(p); - false - } else { - true - } - }); - } - - pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { - let mut prev = &self.head; - while !prev.get().is_null() { - let p = NonNull::new_unchecked(prev.get()); - let task = &*p.as_ptr(); - if f(p) { - // Skip to next - prev = &task.timer_queue_item.next; - } else { - // Remove it - prev.set(task.timer_queue_item.next.get()); - task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); - } - } - } -} diff --git a/embassy/src/executor/util.rs b/embassy/src/executor/util.rs deleted file mode 100644 index ca15b6955..000000000 --- a/embassy/src/executor/util.rs +++ /dev/null @@ -1,32 +0,0 @@ -use core::cell::UnsafeCell; -use core::mem::MaybeUninit; -use core::ptr; - -pub(crate) struct UninitCell(MaybeUninit>); -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_mut(&self) -> &mut T { - &mut *self.as_mut_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) - } - - pub unsafe fn drop_in_place(&self) { - ptr::drop_in_place(self.as_mut_ptr()) - } -} - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs deleted file mode 100644 index ea5b501f1..000000000 --- a/embassy/src/executor/waker.rs +++ /dev/null @@ -1,36 +0,0 @@ -use core::mem; -use core::ptr::NonNull; -use core::task::{RawWaker, RawWakerVTable, Waker}; - -use super::raw::TaskHeader; - -const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); - -unsafe fn clone(p: *const ()) -> RawWaker { - RawWaker::new(p, &VTABLE) -} - -unsafe fn wake(p: *const ()) { - (*(p as *mut TaskHeader)).enqueue() -} - -unsafe fn drop(_: *const ()) { - // nop -} - -pub(crate) unsafe fn from_task(p: NonNull) -> Waker { - Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) -} - -pub unsafe fn task_from_waker(waker: &Waker) -> NonNull { - let hack: &WakerHack = mem::transmute(waker); - if hack.vtable != &VTABLE { - panic!("Found waker not created by the embassy executor. Consider enabling the `executor-agnostic` feature on the `embassy` crate.") - } - NonNull::new_unchecked(hack.data as *mut TaskHeader) -} - -struct WakerHack { - data: *const (), - vtable: &'static RawWakerVTable, -} -- cgit