From a0487380da42a71ab7532e2bc1befd1039c18a78 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 22 Sep 2022 16:42:49 +0200 Subject: Replace futures::future::poll_fn -> core::future::poll_fn. --- embassy-executor/src/spawner.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 25a0d7dbb..400d973ff 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -1,10 +1,9 @@ +use core::future::poll_fn; use core::marker::PhantomData; use core::mem; use core::ptr::NonNull; use core::task::Poll; -use futures_util::future::poll_fn; - use super::raw; /// Token to spawn a newly-created task in an executor. -- cgit From 4d5550070fe5e80ff2296a71239c568c774b9ceb Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 24 Oct 2022 09:17:43 +0300 Subject: Change time Driver contract to never fire the alarm synchronously --- embassy-executor/src/raw/mod.rs | 72 +++++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 32 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb5..5bcb1e6e7 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -354,46 +354,54 @@ impl Executor { /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. pub unsafe fn poll(&'static self) { - #[cfg(feature = "integrated-timers")] - self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); + loop { + #[cfg(feature = "integrated-timers")] + self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); - self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); - #[cfg(feature = "integrated-timers")] - task.expires_at.set(Instant::MAX); - - let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } + #[cfg(feature = "integrated-timers")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } - #[cfg(feature = "rtos-trace")] - trace::task_exec_begin(p.as_ptr() as u32); + #[cfg(feature = "rtos-trace")] + trace::task_exec_begin(p.as_ptr() as u32); - // Run the task - task.poll_fn.read()(p as _); + // Run the task + task.poll_fn.read()(p as _); - #[cfg(feature = "rtos-trace")] - trace::task_exec_end(); + #[cfg(feature = "rtos-trace")] + trace::task_exec_end(); + + // Enqueue or update into timer_queue + #[cfg(feature = "integrated-timers")] + self.timer_queue.update(p); + }); - // Enqueue or update into timer_queue #[cfg(feature = "integrated-timers")] - self.timer_queue.update(p); - }); + { + // If this is already in the past, set_alarm might return false + // In that case do another poll loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { + break; + } + } - #[cfg(feature = "integrated-timers")] - { - // If this is already in the past, set_alarm will immediately trigger the alarm. - // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, - // so we immediately do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - driver::set_alarm(self.alarm, next_expiration.as_ticks()); + #[cfg(not(feature = "integrated-timers"))] + { + break; + } } #[cfg(feature = "rtos-trace")] -- cgit From 560eecdb737642e6aba132408039195c7d6b6dd8 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Wed, 26 Oct 2022 18:05:27 +0300 Subject: Remove the _embassy_time_schedule_wake magic --- embassy-executor/src/raw/mod.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 5bcb1e6e7..181dabe8e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -444,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull) { } #[cfg(feature = "integrated-timers")] -#[no_mangle] -unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { - let task = waker::task_from_waker(waker); - let task = task.as_ref(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); +struct TimerQueue; + +#[cfg(feature = "integrated-timers")] +impl embassy_time::queue::TimerQueue for TimerQueue { + fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { + let task = waker::task_from_waker(waker); + let task = unsafe { task.as_ref() }; + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); + } } +#[cfg(feature = "integrated-timers")] +embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); + #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for Executor { fn task_list() { -- cgit From 6e1120e17e384a04cd3aef58a1467a4a0a862ba5 Mon Sep 17 00:00:00 2001 From: Sijmen Woutersen Date: Sun, 25 Sep 2022 20:10:11 +0200 Subject: riscv support --- embassy-executor/src/arch/riscv32.rs | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 2a4b006da..e095c0ee0 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -54,20 +54,7 @@ impl Executor { loop { unsafe { self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - //will only set this value to true. - critical_section::with(|_| { - // if there is work to do, loop back to polling - // TODO can we relax this? - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } - // if not, wait for interrupt - else { - core::arch::asm!("wfi"); - } - }); - // if an interrupt occurred while waiting, it will be serviced here + core::arch::asm!("wfi"); } } } -- cgit From e70ae71eccf370c42043fa323002c14f94a16679 Mon Sep 17 00:00:00 2001 From: Sijmen Woutersen Date: Sat, 12 Nov 2022 10:56:49 +0100 Subject: restore SIGNAL_WORK_THREAD_MODE --- embassy-executor/src/arch/riscv32.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index e095c0ee0..76eb8b114 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -54,7 +54,12 @@ impl Executor { loop { unsafe { self.inner.poll(); - core::arch::asm!("wfi"); + // we do not care about race conditions between the load and store operations, interrupts + // will only set this value to true. + // if there is work to do, loop back to polling + if !SIGNAL_WORK_THREAD_MODE.fetch_and(false, Ordering::SeqCst) { + core::arch::asm!("wfi"); + } } } } -- cgit From 50c5cc5db64f7ddf8566626f92c0694ac9ad984e Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 23 Nov 2022 13:17:05 +0100 Subject: fix: revert race condition introduced for riscv --- embassy-executor/src/arch/riscv32.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 76eb8b114..2a4b006da 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -55,11 +55,19 @@ impl Executor { unsafe { self.inner.poll(); // we do not care about race conditions between the load and store operations, interrupts - // will only set this value to true. - // if there is work to do, loop back to polling - if !SIGNAL_WORK_THREAD_MODE.fetch_and(false, Ordering::SeqCst) { - core::arch::asm!("wfi"); - } + //will only set this value to true. + critical_section::with(|_| { + // if there is work to do, loop back to polling + // TODO can we relax this? + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } + // if not, wait for interrupt + else { + core::arch::asm!("wfi"); + } + }); + // if an interrupt occurred while waiting, it will be serviced here } } } -- cgit From 04a7d976733e021395ff26e26dfa983e67b773a0 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 22 Nov 2022 22:04:42 +0100 Subject: refactor: autodetect macro variant Export all main macro per target architecture from embassy-macros, and select the appropriate macro in embassy-executor. --- embassy-executor/src/lib.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index e4cbd04b9..4c7e2f4cd 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -8,18 +8,22 @@ pub(crate) mod fmt; #[cfg(feature = "nightly")] -pub use embassy_macros::{main, task}; +pub use embassy_macros::task; cfg_if::cfg_if! { if #[cfg(cortex_m)] { #[path="arch/cortex_m.rs"] mod arch; pub use arch::*; + #[cfg(feature = "nightly")] + pub use embassy_macros::main_cortex_m as main; } else if #[cfg(target_arch="riscv32")] { #[path="arch/riscv32.rs"] mod arch; pub use arch::*; + #[cfg(feature = "nightly")] + pub use embassy_macros::main_riscv as main; } else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { #[path="arch/xtensa.rs"] @@ -30,11 +34,15 @@ cfg_if::cfg_if! { #[path="arch/wasm.rs"] mod arch; pub use arch::*; + #[cfg(feature = "nightly")] + pub use embassy_macros::main_wasm as main; } else if #[cfg(feature="std")] { #[path="arch/std.rs"] mod arch; pub use arch::*; + #[cfg(feature = "nightly")] + pub use embassy_macros::main_std as main; } } -- cgit From 10c9cc31b14a356e58833fd6c81456251ab3fce9 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 23 Dec 2022 20:46:49 +0100 Subject: Remove unnecessary use of atomic-polyfill. Only use it when CAS is actually needed. --- embassy-executor/src/arch/riscv32.rs | 3 +-- embassy-executor/src/arch/xtensa.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 2a4b006da..e97a56cda 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -1,7 +1,6 @@ use core::marker::PhantomData; use core::ptr; - -use atomic_polyfill::{AtomicBool, Ordering}; +use core::sync::atomic::{AtomicBool, Ordering}; use super::{raw, Spawner}; diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs index f908aaa70..4ee0d9f78 100644 --- a/embassy-executor/src/arch/xtensa.rs +++ b/embassy-executor/src/arch/xtensa.rs @@ -1,7 +1,6 @@ use core::marker::PhantomData; use core::ptr; - -use atomic_polyfill::{AtomicBool, Ordering}; +use core::sync::atomic::{AtomicBool, Ordering}; use super::{raw, Spawner}; -- cgit From 48e1aab762e902ee0a132602d3c2f9ec0551cd6b Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 29 Jan 2023 12:55:06 -0600 Subject: executor: Replace `NonNull` with `TaskRef` --- embassy-executor/src/raw/mod.rs | 63 +++++++++++++++++++++++---------- embassy-executor/src/raw/run_queue.rs | 13 +++---- embassy-executor/src/raw/timer_queue.rs | 35 ++++++++---------- embassy-executor/src/raw/waker.rs | 13 ++++--- embassy-executor/src/spawner.rs | 9 +++-- 5 files changed, 76 insertions(+), 57 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 181dabe8e..10a154a9f 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -43,14 +43,11 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; /// Raw task header for use in task pointers. -/// -/// This is an opaque struct, used for raw pointers to tasks, for use -/// with funtions like [`wake_task`] and [`task_from_waker`]. -pub struct TaskHeader { +pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 - pub(crate) poll_fn: UninitCell)>, // Valid if STATE_SPAWNED + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) poll_fn: UninitCell, // Valid if STATE_SPAWNED #[cfg(feature = "integrated-timers")] pub(crate) expires_at: Cell, @@ -59,7 +56,7 @@ pub struct TaskHeader { } impl TaskHeader { - pub(crate) const fn new() -> Self { + const fn new() -> Self { Self { state: AtomicU32::new(0), run_queue_item: RunQueueItem::new(), @@ -74,6 +71,36 @@ impl TaskHeader { } } +/// This is essentially a `&'static TaskStorage` where the type of the future has been erased. +#[derive(Clone, Copy)] +pub struct TaskRef { + ptr: NonNull, +} + +impl TaskRef { + fn new(task: &'static TaskStorage) -> Self { + Self { + ptr: NonNull::from(task).cast(), + } + } + + /// Safety: The pointer must have been obtained with `Task::as_ptr` + pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self { + Self { + ptr: NonNull::new_unchecked(ptr as *mut TaskHeader), + } + } + + pub(crate) fn header(self) -> &'static TaskHeader { + unsafe { self.ptr.as_ref() } + } + + /// The returned pointer is valid for the entire TaskStorage. + pub(crate) fn as_ptr(self) -> *const TaskHeader { + self.ptr.as_ptr() + } +} + /// Raw storage in which a task can be spawned. /// /// This struct holds the necessary memory to spawn one task whose future is `F`. @@ -135,14 +162,14 @@ impl TaskStorage { .is_ok() } - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull { + unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef { // Initialize the task self.raw.poll_fn.write(Self::poll); self.future.write(future()); - NonNull::new_unchecked(self as *const TaskStorage as *const TaskHeader as *mut TaskHeader) + TaskRef::new(self) } - unsafe fn poll(p: NonNull) { + unsafe fn poll(p: TaskRef) { let this = &*(p.as_ptr() as *const TaskStorage); let future = Pin::new_unchecked(this.future.as_mut()); @@ -307,7 +334,7 @@ impl Executor { /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] - unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull) { + unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { #[cfg(feature = "rtos-trace")] trace::task_ready_begin(task.as_ptr() as u32); @@ -325,8 +352,8 @@ impl Executor { /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. /// In this case, the task's Future must be Send. This is because this is effectively /// sending the task to the executor thread. - pub(super) unsafe fn spawn(&'static self, task: NonNull) { - task.as_ref().executor.set(self); + pub(super) unsafe fn spawn(&'static self, task: TaskRef) { + task.header().executor.set(self); #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); @@ -359,7 +386,7 @@ impl Executor { self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); + let task = p.header(); #[cfg(feature = "integrated-timers")] task.expires_at.set(Instant::MAX); @@ -378,7 +405,7 @@ impl Executor { trace::task_exec_begin(p.as_ptr() as u32); // Run the task - task.poll_fn.read()(p as _); + task.poll_fn.read()(p); #[cfg(feature = "rtos-trace")] trace::task_exec_end(); @@ -424,9 +451,9 @@ impl Executor { /// # Safety /// /// `task` must be a valid task pointer obtained from [`task_from_waker`]. -pub unsafe fn wake_task(task: NonNull) { +pub unsafe fn wake_task(task: TaskRef) { critical_section::with(|cs| { - let header = task.as_ref(); + let header = task.header(); let state = header.state.load(Ordering::Relaxed); // If already scheduled, or if not started, @@ -450,7 +477,7 @@ struct TimerQueue; impl embassy_time::queue::TimerQueue for TimerQueue { fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { let task = waker::task_from_waker(waker); - let task = unsafe { task.as_ref() }; + let task = task.header(); let expires_at = task.expires_at.get(); task.expires_at.set(expires_at.min(at)); } diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index ed8c82a5c..362157535 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -4,7 +4,7 @@ use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; use critical_section::CriticalSection; -use super::TaskHeader; +use super::{TaskHeader, TaskRef}; pub(crate) struct RunQueueItem { next: AtomicPtr, @@ -46,25 +46,26 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull) -> bool { + pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { let prev = self.head.load(Ordering::Relaxed); - task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr(), Ordering::Relaxed); + task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + self.head.store(task.as_ptr() as _, Ordering::Relaxed); prev.is_null() } /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull)) { + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // Atomically empty the queue. let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); // Iterate the linked list of tasks that were previously in the queue. while let Some(task) = NonNull::new(ptr) { + let task = unsafe { TaskRef::from_ptr(task.as_ptr()) }; // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); + let next = task.header().run_queue_item.next.load(Ordering::Relaxed); on_task(task); diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 24c31892a..57d6d3cda 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -1,45 +1,39 @@ use core::cell::Cell; use core::cmp::min; -use core::ptr; -use core::ptr::NonNull; use atomic_polyfill::Ordering; use embassy_time::Instant; -use super::{TaskHeader, STATE_TIMER_QUEUED}; +use super::{TaskRef, STATE_TIMER_QUEUED}; pub(crate) struct TimerQueueItem { - next: Cell<*mut TaskHeader>, + next: Cell>, } impl TimerQueueItem { pub const fn new() -> Self { - Self { - next: Cell::new(ptr::null_mut()), - } + Self { next: Cell::new(None) } } } pub(crate) struct TimerQueue { - head: Cell<*mut TaskHeader>, + head: Cell>, } impl TimerQueue { pub const fn new() -> Self { - Self { - head: Cell::new(ptr::null_mut()), - } + Self { head: Cell::new(None) } } - pub(crate) unsafe fn update(&self, p: NonNull) { - let task = p.as_ref(); + pub(crate) unsafe fn update(&self, p: TaskRef) { + let task = p.header(); if task.expires_at.get() != Instant::MAX { let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); let is_new = old_state & STATE_TIMER_QUEUED == 0; if is_new { task.timer_queue_item.next.set(self.head.get()); - self.head.set(p.as_ptr()); + self.head.set(Some(p)); } } } @@ -47,7 +41,7 @@ impl TimerQueue { pub(crate) unsafe fn next_expiration(&self) -> Instant { let mut res = Instant::MAX; self.retain(|p| { - let task = p.as_ref(); + let task = p.header(); let expires = task.expires_at.get(); res = min(res, expires); expires != Instant::MAX @@ -55,9 +49,9 @@ impl TimerQueue { res } - pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull)) { + pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) { self.retain(|p| { - let task = p.as_ref(); + let task = p.header(); if task.expires_at.get() <= now { on_task(p); false @@ -67,11 +61,10 @@ impl TimerQueue { }); } - pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull) -> bool) { + pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { let mut prev = &self.head; - while !prev.get().is_null() { - let p = NonNull::new_unchecked(prev.get()); - let task = &*p.as_ptr(); + while let Some(p) = prev.get() { + let task = p.header(); if f(p) { // Skip to next prev = &task.timer_queue_item.next; diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 5765259f2..400b37fa9 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -1,8 +1,7 @@ use core::mem; -use core::ptr::NonNull; use core::task::{RawWaker, RawWakerVTable, Waker}; -use super::{wake_task, TaskHeader}; +use super::{wake_task, TaskHeader, TaskRef}; const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); @@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker { } unsafe fn wake(p: *const ()) { - wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) + wake_task(TaskRef::from_ptr(p as *const TaskHeader)) } unsafe fn drop(_: *const ()) { // nop } -pub(crate) unsafe fn from_task(p: NonNull) -> Waker { +pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) } @@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull) -> Waker { /// # Panics /// /// Panics if the waker is not created by the Embassy executor. -pub fn task_from_waker(waker: &Waker) -> NonNull { +pub fn task_from_waker(waker: &Waker) -> TaskRef { // safety: OK because WakerHack has the same layout as Waker. // This is not really guaranteed because the structs are `repr(Rust)`, it is // indeed the case in the current implementation. @@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull { panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") } - // safety: we never create a waker with a null data pointer. - unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } + // safety: our wakers are always created with `TaskRef::as_ptr` + unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } } struct WakerHack { diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 400d973ff..650ea06cb 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -1,7 +1,6 @@ use core::future::poll_fn; use core::marker::PhantomData; use core::mem; -use core::ptr::NonNull; use core::task::Poll; use super::raw; @@ -22,12 +21,12 @@ use super::raw; /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] pub struct SpawnToken { - raw_task: Option>, + raw_task: Option, phantom: PhantomData<*mut S>, } impl SpawnToken { - pub(crate) unsafe fn new(raw_task: NonNull) -> Self { + pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self { Self { raw_task: Some(raw_task), phantom: PhantomData, @@ -92,7 +91,7 @@ impl Spawner { pub async fn for_current_executor() -> Self { poll_fn(|cx| unsafe { let task = raw::task_from_waker(cx.waker()); - let executor = (*task.as_ptr()).executor.get(); + let executor = task.header().executor.get(); Poll::Ready(Self::new(&*executor)) }) .await @@ -168,7 +167,7 @@ impl SendSpawner { pub async fn for_current_executor() -> Self { poll_fn(|cx| unsafe { let task = raw::task_from_waker(cx.waker()); - let executor = (*task.as_ptr()).executor.get(); + let executor = task.header().executor.get(); Poll::Ready(Self::new(&*executor)) }) .await -- cgit From b6ca6d699ad529f56901485fed86bee1ececad6d Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 29 Jan 2023 16:32:12 -0600 Subject: Make `wake_task` safe --- embassy-executor/src/raw/mod.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 10a154a9f..183c5e6a2 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -444,14 +444,10 @@ impl Executor { } } -/// Wake a task by raw pointer. +/// Wake a task by `TaskRef`. /// -/// You can obtain task pointers from `Waker`s using [`task_from_waker`]. -/// -/// # Safety -/// -/// `task` must be a valid task pointer obtained from [`task_from_waker`]. -pub unsafe fn wake_task(task: TaskRef) { +/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. +pub fn wake_task(task: TaskRef) { critical_section::with(|cs| { let header = task.header(); let state = header.state.load(Ordering::Relaxed); @@ -465,8 +461,10 @@ pub unsafe fn wake_task(task: TaskRef) { header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); // We have just marked the task as scheduled, so enqueue it. - let executor = &*header.executor.get(); - executor.enqueue(cs, task); + unsafe { + let executor = &*header.executor.get(); + executor.enqueue(cs, task); + } }) } -- cgit From a697f1517a9c54ba042bbf70e0b2ed762d300471 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Tue, 31 Jan 2023 17:29:34 -0600 Subject: Set `poll_fn` in `TaskStorage::new` --- embassy-executor/src/raw/mod.rs | 35 ++++++++++++++--------------------- embassy-executor/src/raw/util.rs | 6 ------ 2 files changed, 14 insertions(+), 27 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 183c5e6a2..8cdce92ec 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -46,8 +46,8 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 - pub(crate) poll_fn: UninitCell, // Valid if STATE_SPAWNED + pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + poll_fn: unsafe fn(TaskRef), #[cfg(feature = "integrated-timers")] pub(crate) expires_at: Cell, @@ -55,22 +55,6 @@ pub(crate) struct TaskHeader { pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } -impl TaskHeader { - const fn new() -> Self { - Self { - state: AtomicU32::new(0), - run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - - #[cfg(feature = "integrated-timers")] - expires_at: Cell::new(Instant::from_ticks(0)), - #[cfg(feature = "integrated-timers")] - timer_queue_item: timer_queue::TimerQueueItem::new(), - } - } -} - /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Clone, Copy)] pub struct TaskRef { @@ -128,7 +112,17 @@ impl TaskStorage { /// Create a new TaskStorage, in not-spawned state. pub const fn new() -> Self { Self { - raw: TaskHeader::new(), + raw: TaskHeader { + state: AtomicU32::new(0), + run_queue_item: RunQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: Self::poll, + + #[cfg(feature = "integrated-timers")] + expires_at: Cell::new(Instant::from_ticks(0)), + #[cfg(feature = "integrated-timers")] + timer_queue_item: timer_queue::TimerQueueItem::new(), + }, future: UninitCell::uninit(), } } @@ -164,7 +158,6 @@ impl TaskStorage { unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef { // Initialize the task - self.raw.poll_fn.write(Self::poll); self.future.write(future()); TaskRef::new(self) } @@ -405,7 +398,7 @@ impl Executor { trace::task_exec_begin(p.as_ptr() as u32); // Run the task - task.poll_fn.read()(p); + (task.poll_fn)(p); #[cfg(feature = "rtos-trace")] trace::task_exec_end(); diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index ed5822188..2b1f6b6f3 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -25,9 +25,3 @@ impl UninitCell { ptr::drop_in_place(self.as_mut_ptr()) } } - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} -- cgit From fb1946be7fa38eecb36711a1257f89dae3714b61 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Tue, 31 Jan 2023 17:49:18 -0600 Subject: Replace the pointer in `TaskHeader` with an `Option<&Executor>` --- embassy-executor/src/raw/mod.rs | 10 +++++----- embassy-executor/src/spawner.rs | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 8cdce92ec..6783c4853 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -15,10 +15,10 @@ mod waker; use core::cell::Cell; use core::future::Future; +use core::mem; use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll}; -use core::{mem, ptr}; use atomic_polyfill::{AtomicU32, Ordering}; use critical_section::CriticalSection; @@ -46,7 +46,7 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 + pub(crate) executor: Cell>, poll_fn: unsafe fn(TaskRef), #[cfg(feature = "integrated-timers")] @@ -115,7 +115,7 @@ impl TaskStorage { raw: TaskHeader { state: AtomicU32::new(0), run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), + executor: Cell::new(None), poll_fn: Self::poll, #[cfg(feature = "integrated-timers")] @@ -346,7 +346,7 @@ impl Executor { /// In this case, the task's Future must be Send. This is because this is effectively /// sending the task to the executor thread. pub(super) unsafe fn spawn(&'static self, task: TaskRef) { - task.header().executor.set(self); + task.header().executor.set(Some(self)); #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); @@ -455,7 +455,7 @@ pub fn wake_task(task: TaskRef) { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = &*header.executor.get(); + let executor = header.executor.get().unwrap_unchecked(); executor.enqueue(cs, task); } }) diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 650ea06cb..7c0a0183c 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -89,10 +89,10 @@ impl Spawner { /// /// Panics if the current executor is not an Embassy executor. pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { + poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); - let executor = task.header().executor.get(); - Poll::Ready(Self::new(&*executor)) + let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + Poll::Ready(Self::new(executor)) }) .await } @@ -165,10 +165,10 @@ impl SendSpawner { /// /// Panics if the current executor is not an Embassy executor. pub async fn for_current_executor() -> Self { - poll_fn(|cx| unsafe { + poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); - let executor = task.header().executor.get(); - Poll::Ready(Self::new(&*executor)) + let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + Poll::Ready(Self::new(executor)) }) .await } -- cgit From 4a8e9cf4d9f682bfe4942559da7e76315216c377 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Tue, 31 Jan 2023 18:49:32 -0600 Subject: Add internal `AvailableTask` type --- embassy-executor/src/raw/mod.rs | 68 ++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 28 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 6783c4853..e93e60362 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -141,25 +141,14 @@ impl TaskStorage { /// Once the task has finished running, you may spawn it again. It is allowed to spawn it /// on a different executor. pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - if self.spawn_mark_used() { - return unsafe { SpawnToken::::new(self.spawn_initialize(future)) }; + let task = AvailableTask::claim(self); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } + } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() - } - - fn spawn_mark_used(&'static self) -> bool { - let state = STATE_SPAWNED | STATE_RUN_QUEUED; - self.raw - .state - .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - } - - unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef { - // Initialize the task - self.future.write(future()); - TaskRef::new(self) } unsafe fn poll(p: TaskRef) { @@ -184,6 +173,27 @@ impl TaskStorage { unsafe impl Sync for TaskStorage {} +struct AvailableTask { + task: &'static TaskStorage, +} + +impl AvailableTask { + fn claim(task: &'static TaskStorage) -> Option { + task.raw + .state + .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) + .ok() + .map(|_| Self { task }) + } + + fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { + unsafe { + self.task.future.write(future()); + } + TaskRef::new(self.task) + } +} + /// Raw storage that can hold up to N tasks of the same type. /// /// This is essentially a `[TaskStorage; N]`. @@ -207,13 +217,14 @@ impl TaskPool { /// is currently free. If none is free, a "poisoned" SpawnToken is returned, /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { - for task in &self.pool { - if task.spawn_mark_used() { - return unsafe { SpawnToken::::new(task.spawn_initialize(future)) }; + let task = self.pool.iter().find_map(AvailableTask::claim); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() } /// Like spawn(), but allows the task to be send-spawned if the args are Send even if @@ -255,13 +266,14 @@ impl TaskPool { // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken`. - for task in &self.pool { - if task.spawn_mark_used() { - return SpawnToken::::new(task.spawn_initialize(future)); + let task = self.pool.iter().find_map(AvailableTask::claim); + match task { + Some(task) => { + let task = task.initialize(future); + unsafe { SpawnToken::::new(task) } } + None => SpawnToken::new_failed(), } - - SpawnToken::::new_failed() } } -- cgit From 791fbb3ca0caf81882f67caea9e71adf43496261 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Tue, 31 Jan 2023 21:42:45 -0600 Subject: Make `poll_fn` lazily initialized again --- embassy-executor/src/raw/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e93e60362..42bd82262 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -47,7 +47,7 @@ pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, pub(crate) executor: Cell>, - poll_fn: unsafe fn(TaskRef), + poll_fn: Cell>, #[cfg(feature = "integrated-timers")] pub(crate) expires_at: Cell, @@ -116,7 +116,8 @@ impl TaskStorage { state: AtomicU32::new(0), run_queue_item: RunQueueItem::new(), executor: Cell::new(None), - poll_fn: Self::poll, + // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` + poll_fn: Cell::new(None), #[cfg(feature = "integrated-timers")] expires_at: Cell::new(Instant::from_ticks(0)), @@ -188,6 +189,7 @@ impl AvailableTask { fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { unsafe { + self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write(future()); } TaskRef::new(self.task) @@ -410,7 +412,7 @@ impl Executor { trace::task_exec_begin(p.as_ptr() as u32); // Run the task - (task.poll_fn)(p); + task.poll_fn.get().unwrap_unchecked()(p); #[cfg(feature = "rtos-trace")] trace::task_exec_end(); -- cgit From 41d558a5f40bbea865f2ba0899b34baed9c1c0d1 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Mon, 20 Mar 2023 16:20:51 -0500 Subject: executor: Allow TaskStorage to auto-implement `Sync` --- embassy-executor/src/raw/mod.rs | 156 +++++++++++++++++++++++--------- embassy-executor/src/raw/timer_queue.rs | 14 ++- embassy-executor/src/raw/util.rs | 29 ++++++ embassy-executor/src/spawner.rs | 12 +-- 4 files changed, 154 insertions(+), 57 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 42bd82262..938492c21 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -13,8 +13,8 @@ mod timer_queue; pub(crate) mod util; mod waker; -use core::cell::Cell; use core::future::Future; +use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; @@ -30,7 +30,7 @@ use embassy_time::Instant; use rtos_trace::trace; use self::run_queue::{RunQueue, RunQueueItem}; -use self::util::UninitCell; +use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; use super::SpawnToken; @@ -46,11 +46,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { pub(crate) state: AtomicU32, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: Cell>, - poll_fn: Cell>, + pub(crate) executor: SyncUnsafeCell>, + poll_fn: SyncUnsafeCell>, #[cfg(feature = "integrated-timers")] - pub(crate) expires_at: Cell, + pub(crate) expires_at: SyncUnsafeCell, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } @@ -61,6 +61,9 @@ pub struct TaskRef { ptr: NonNull, } +unsafe impl Send for TaskRef where &'static TaskHeader: Send {} +unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {} + impl TaskRef { fn new(task: &'static TaskStorage) -> Self { Self { @@ -115,12 +118,12 @@ impl TaskStorage { raw: TaskHeader { state: AtomicU32::new(0), run_queue_item: RunQueueItem::new(), - executor: Cell::new(None), + executor: SyncUnsafeCell::new(None), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` - poll_fn: Cell::new(None), + poll_fn: SyncUnsafeCell::new(None), #[cfg(feature = "integrated-timers")] - expires_at: Cell::new(Instant::from_ticks(0)), + expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)), #[cfg(feature = "integrated-timers")] timer_queue_item: timer_queue::TimerQueueItem::new(), }, @@ -170,9 +173,15 @@ impl TaskStorage { // it's a noop for our waker. mem::forget(waker); } -} -unsafe impl Sync for TaskStorage {} + #[doc(hidden)] + #[allow(dead_code)] + fn _assert_sync(self) { + fn assert_sync(_: T) {} + + assert_sync(self) + } +} struct AvailableTask { task: &'static TaskStorage, @@ -279,29 +288,13 @@ impl TaskPool { } } -/// Raw executor. -/// -/// This is the core of the Embassy executor. It is low-level, requiring manual -/// handling of wakeups and task polling. If you can, prefer using one of the -/// [higher level executors](crate::Executor). -/// -/// The raw executor leaves it up to you to handle wakeups and scheduling: -/// -/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks -/// that "want to run"). -/// - You must supply a `signal_fn`. The executor will call it to notify you it has work -/// to do. You must arrange for `poll()` to be called as soon as possible. -/// -/// `signal_fn` can be called from *any* context: any thread, any interrupt priority -/// level, etc. It may be called synchronously from any `Executor` method call as well. -/// You must deal with this correctly. -/// -/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates -/// the requirement for `poll` to not be called reentrantly. -pub struct Executor { +struct SignalCtx(*mut ()); +unsafe impl Sync for SignalCtx {} + +pub(crate) struct SyncExecutor { run_queue: RunQueue, signal_fn: fn(*mut ()), - signal_ctx: *mut (), + signal_ctx: SignalCtx, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -309,14 +302,8 @@ pub struct Executor { alarm: AlarmHandle, } -impl Executor { - /// Create a new executor. - /// - /// When the executor has work to do, it will call `signal_fn` with - /// `signal_ctx` as argument. - /// - /// See [`Executor`] docs for details on `signal_fn`. - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { +impl SyncExecutor { + pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; #[cfg(feature = "integrated-timers")] @@ -325,7 +312,7 @@ impl Executor { Self { run_queue: RunQueue::new(), signal_fn, - signal_ctx, + signal_ctx: SignalCtx(signal_ctx), #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -346,7 +333,7 @@ impl Executor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx) + (self.signal_fn)(self.signal_ctx.0) } } @@ -387,7 +374,8 @@ impl Executor { /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. - pub unsafe fn poll(&'static self) { + pub(crate) unsafe fn poll(&'static self) { + #[allow(clippy::never_loop)] loop { #[cfg(feature = "integrated-timers")] self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); @@ -441,6 +429,84 @@ impl Executor { #[cfg(feature = "rtos-trace")] trace::system_idle(); } +} + +/// Raw executor. +/// +/// This is the core of the Embassy executor. It is low-level, requiring manual +/// handling of wakeups and task polling. If you can, prefer using one of the +/// [higher level executors](crate::Executor). +/// +/// The raw executor leaves it up to you to handle wakeups and scheduling: +/// +/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks +/// that "want to run"). +/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// to do. You must arrange for `poll()` to be called as soon as possible. +/// +/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// level, etc. It may be called synchronously from any `Executor` method call as well. +/// You must deal with this correctly. +/// +/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// the requirement for `poll` to not be called reentrantly. +#[repr(transparent)] +pub struct Executor { + pub(crate) inner: SyncExecutor, + + _not_sync: PhantomData<*mut ()>, +} + +impl Executor { + pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { + mem::transmute(inner) + } + /// Create a new executor. + /// + /// When the executor has work to do, it will call `signal_fn` with + /// `signal_ctx` as argument. + /// + /// See [`Executor`] docs for details on `signal_fn`. + pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + Self { + inner: SyncExecutor::new(signal_fn, signal_ctx), + _not_sync: PhantomData, + } + } + + /// Spawn a task in this executor. + /// + /// # Safety + /// + /// `task` must be a valid pointer to an initialized but not-already-spawned task. + /// + /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. + /// In this case, the task's Future must be Send. This is because this is effectively + /// sending the task to the executor thread. + pub(super) unsafe fn spawn(&'static self, task: TaskRef) { + self.inner.spawn(task) + } + + /// Poll all queued tasks in this executor. + /// + /// This loops over all tasks that are queued to be polled (i.e. they're + /// freshly spawned or they've been woken). Other tasks are not polled. + /// + /// You must call `poll` after receiving a call to `signal_fn`. It is OK + /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// energy. + /// + /// # Safety + /// + /// You must NOT call `poll` reentrantly on the same executor. + /// + /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you + /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// somehow schedule for `poll()` to be called later, at a time you know for sure there's + /// no `poll()` already running. + pub unsafe fn poll(&'static self) { + self.inner.poll() + } /// Get a spawner that spawns tasks in this executor. /// @@ -483,8 +549,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue { fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { let task = waker::task_from_waker(waker); let task = task.header(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); + unsafe { + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); + } } } diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 57d6d3cda..dc71c95b1 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -1,28 +1,32 @@ -use core::cell::Cell; use core::cmp::min; use atomic_polyfill::Ordering; use embassy_time::Instant; use super::{TaskRef, STATE_TIMER_QUEUED}; +use crate::raw::util::SyncUnsafeCell; pub(crate) struct TimerQueueItem { - next: Cell>, + next: SyncUnsafeCell>, } impl TimerQueueItem { pub const fn new() -> Self { - Self { next: Cell::new(None) } + Self { + next: SyncUnsafeCell::new(None), + } } } pub(crate) struct TimerQueue { - head: Cell>, + head: SyncUnsafeCell>, } impl TimerQueue { pub const fn new() -> Self { - Self { head: Cell::new(None) } + Self { + head: SyncUnsafeCell::new(None), + } } pub(crate) unsafe fn update(&self, p: TaskRef) { diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index 2b1f6b6f3..e2e8f4df8 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -25,3 +25,32 @@ impl UninitCell { ptr::drop_in_place(self.as_mut_ptr()) } } + +unsafe impl Sync for UninitCell {} + +#[repr(transparent)] +pub struct SyncUnsafeCell { + value: UnsafeCell, +} + +unsafe impl Sync for SyncUnsafeCell {} + +impl SyncUnsafeCell { + #[inline] + pub const fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + } + } + + pub unsafe fn set(&self, value: T) { + *self.value.get() = value; + } + + pub unsafe fn get(&self) -> T + where + T: Copy, + { + *self.value.get() + } +} diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 7c0a0183c..2b6224045 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -92,6 +92,7 @@ impl Spawner { poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; + let executor = unsafe { raw::Executor::wrap(executor) }; Poll::Ready(Self::new(executor)) }) .await @@ -130,9 +131,7 @@ impl Spawner { /// spawner to other threads, but the spawner loses the ability to spawn /// non-Send tasks. pub fn make_send(&self) -> SendSpawner { - SendSpawner { - executor: self.executor, - } + SendSpawner::new(&self.executor.inner) } } @@ -145,14 +144,11 @@ impl Spawner { /// If you want to spawn non-Send tasks, use [Spawner]. #[derive(Copy, Clone)] pub struct SendSpawner { - executor: &'static raw::Executor, + executor: &'static raw::SyncExecutor, } -unsafe impl Send for SendSpawner {} -unsafe impl Sync for SendSpawner {} - impl SendSpawner { - pub(crate) fn new(executor: &'static raw::Executor) -> Self { + pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self { Self { executor } } -- cgit From 805bca1f5aab8f95bf37007eb9be9016bc0dd8c1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 27 Mar 2023 00:20:24 +0200 Subject: executor: deduplicate doc comments. --- embassy-executor/src/raw/mod.rs | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 938492c21..0120334b6 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -337,15 +337,6 @@ impl SyncExecutor { } } - /// Spawn a task in this executor. - /// - /// # Safety - /// - /// `task` must be a valid pointer to an initialized but not-already-spawned task. - /// - /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. - /// In this case, the task's Future must be Send. This is because this is effectively - /// sending the task to the executor thread. pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); @@ -357,23 +348,9 @@ impl SyncExecutor { }) } - /// Poll all queued tasks in this executor. - /// - /// This loops over all tasks that are queued to be polled (i.e. they're - /// freshly spawned or they've been woken). Other tasks are not polled. - /// - /// You must call `poll` after receiving a call to `signal_fn`. It is OK - /// to call `poll` even when not requested by `signal_fn`, but it wastes - /// energy. - /// /// # Safety /// - /// You must NOT call `poll` reentrantly on the same executor. - /// - /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you - /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to - /// somehow schedule for `poll()` to be called later, at a time you know for sure there's - /// no `poll()` already running. + /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { #[allow(clippy::never_loop)] loop { -- cgit From 21400da073d7173e4c2445cbbcd2cd430f120ad1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 27 Mar 2023 00:22:00 +0200 Subject: executor: Use AtomicPtr for signal_ctx, removes 1 unsafe. --- embassy-executor/src/raw/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 0120334b6..15ff18fc8 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -18,6 +18,7 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; +use core::sync::atomic::AtomicPtr; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; @@ -288,13 +289,10 @@ impl TaskPool { } } -struct SignalCtx(*mut ()); -unsafe impl Sync for SignalCtx {} - pub(crate) struct SyncExecutor { run_queue: RunQueue, signal_fn: fn(*mut ()), - signal_ctx: SignalCtx, + signal_ctx: AtomicPtr<()>, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -312,7 +310,7 @@ impl SyncExecutor { Self { run_queue: RunQueue::new(), signal_fn, - signal_ctx: SignalCtx(signal_ctx), + signal_ctx: AtomicPtr::new(signal_ctx), #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -333,7 +331,7 @@ impl SyncExecutor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx.0) + (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) } } -- cgit From 80972f1e0e6b6d409cc4d86202608c22e5ee3e5a Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 30 Mar 2023 17:55:55 +0200 Subject: executor,sync: add support for turbo-wakers. This is a `core` patch to make wakers 1 word (the task pointer) instead of 2 (task pointer + vtable). It allows having the "waker optimization" we had a while back on `WakerRegistration/AtomicWaker`, but EVERYWHERE, without patching all crates. Advantages: - Less memory usage. - Faster. - `AtomicWaker` can actually use atomics to load/store the waker, No critical section needed. - No `dyn` call, which means `cargo-call-stack` can now see through wakes. Disadvantages: - You have to patch `core`... - Breaks all executors and other things that create wakers, unless they opt in to using the new `from_ptr` API. How to use: - Run this shell script to patch `core`. https://gist.github.com/Dirbaio/c67da7cf318515181539122c9d32b395 - Enable `build-std` - Enable `build-std-features = core/turbowakers` - Enable feature `turbowakers` in `embassy-executor`, `embassy-sync`. - Make sure you have no other crate creating wakers other than `embassy-executor`. These will panic at runtime. Note that the patched `core` is equivalent to the unpached one when the `turbowakers` feature is not enabled, so it should be fine to leave it there. --- embassy-executor/src/raw/mod.rs | 1 + embassy-executor/src/raw/waker_turbo.rs | 34 +++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 embassy-executor/src/raw/waker_turbo.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 15ff18fc8..72c367c33 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -11,6 +11,7 @@ mod run_queue; #[cfg(feature = "integrated-timers")] mod timer_queue; pub(crate) mod util; +#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; use core::future::Future; diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs new file mode 100644 index 000000000..435a0ff7e --- /dev/null +++ b/embassy-executor/src/raw/waker_turbo.rs @@ -0,0 +1,34 @@ +use core::ptr::NonNull; +use core::task::Waker; + +use super::{wake_task, TaskHeader, TaskRef}; + +pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { + Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _)) +} + +/// Get a task pointer from a waker. +/// +/// This can be used as an optimization in wait queues to store task pointers +/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps +/// avoid dynamic dispatch. +/// +/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). +/// +/// # Panics +/// +/// Panics if the waker is not created by the Embassy executor. +pub fn task_from_waker(waker: &Waker) -> TaskRef { + let ptr = waker.as_turbo_ptr().as_ptr(); + + // safety: our wakers are always created with `TaskRef::as_ptr` + unsafe { TaskRef::from_ptr(ptr as *const TaskHeader) } +} + +#[inline(never)] +#[no_mangle] +fn _turbo_wake(ptr: NonNull<()>) { + // safety: our wakers are always created with `TaskRef::as_ptr` + let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) }; + wake_task(task) +} -- cgit From b41ee47115509ba5ed302c684c0c36b6a3e3f76f Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 3 Apr 2023 01:11:42 +0200 Subject: executor: unify export mod. --- embassy-executor/src/lib.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 4c7e2f4cd..8707995b4 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -46,11 +46,13 @@ cfg_if::cfg_if! { } } +/// Implementation details for embassy macros. +/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. #[doc(hidden)] -/// Implementation details for embassy macros. DO NOT USE. -pub mod export { +pub mod _export { #[cfg(feature = "rtos-trace")] pub use rtos_trace::trace; + pub use static_cell::StaticCell; /// Expands the given block of code when `embassy-executor` is compiled with /// the `rtos-trace-interrupt` feature. @@ -75,9 +77,3 @@ pub mod raw; mod spawner; pub use spawner::*; - -/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. -#[doc(hidden)] -pub mod _export { - pub use static_cell::StaticCell; -} -- cgit From d3c4e4a20a05085eae8d568c7efdbe09bada9cf5 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 3 Apr 2023 01:18:27 +0200 Subject: executor: add Pender, rework Cargo features. This introduces a `Pender` struct with enum cases for thread-mode, interrupt-mode and custom callback executors. This avoids calls through function pointers when using only the thread or interrupt executors. Faster, and friendlier to `cargo-call-stack`. `embassy-executor` now has `arch-xxx` Cargo features to select the arch and to enable the builtin executors (thread and interrupt). --- embassy-executor/src/arch/cortex_m.rs | 238 +++++++++++++++++++++++++++------- embassy-executor/src/arch/riscv32.rs | 133 ++++++++++--------- embassy-executor/src/arch/std.rs | 150 +++++++++++---------- embassy-executor/src/arch/wasm.rs | 134 ++++++++++--------- embassy-executor/src/arch/xtensa.rs | 135 ++++++++++--------- embassy-executor/src/lib.rs | 71 +++++----- embassy-executor/src/raw/mod.rs | 97 +++++++++++--- 7 files changed, 602 insertions(+), 356 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index 4b27a264e..d6a55c4c7 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs @@ -1,59 +1,209 @@ -use core::arch::asm; -use core::marker::PhantomData; -use core::ptr; - -use super::{raw, Spawner}; - -/// Thread mode executor, using WFE/SEV. -/// -/// This is the simplest and most common kind of executor. It runs on -/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction -/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction -/// is executed, to make the `WFE` exit from sleep and poll the task. -/// -/// This executor allows for ultra low power consumption for chips where `WFE` -/// triggers low-power sleep without extra steps. If your chip requires extra steps, -/// you may use [`raw::Executor`] directly to program custom behavior. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::arch::asm; + use core::marker::PhantomData; + + #[cfg(feature = "nightly")] + pub use embassy_macros::main_cortex_m as main; + + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; + + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; + + impl ThreadPender { + pub(crate) fn pend(self) { + unsafe { core::arch::asm!("sev") } + } + } + + /// Thread mode executor, using WFE/SEV. + /// + /// This is the simplest and most common kind of executor. It runs on + /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction + /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction + /// is executed, to make the `WFE` exit from sleep and poll the task. + /// + /// This executor allows for ultra low power consumption for chips where `WFE` + /// triggers low-power sleep without extra steps. If your chip requires extra steps, + /// you may use [`raw::Executor`] directly to program custom behavior. + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + asm!("wfe"); + }; + } + } + } } -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), - not_send: PhantomData, +#[cfg(feature = "executor-interrupt")] +pub use interrupt::*; +#[cfg(feature = "executor-interrupt")] +mod interrupt { + use core::cell::UnsafeCell; + use core::mem::MaybeUninit; + + use atomic_polyfill::{AtomicBool, Ordering}; + use cortex_m::interrupt::InterruptNumber; + use cortex_m::peripheral::NVIC; + + use crate::raw::{self, Pender, PenderInner}; + + #[derive(Clone, Copy)] + pub(crate) struct InterruptPender(u16); + + impl InterruptPender { + pub(crate) fn pend(self) { + // STIR is faster, but is only available in v7 and higher. + #[cfg(not(armv6m))] + { + let mut nvic: cortex_m::peripheral::NVIC = unsafe { core::mem::transmute(()) }; + nvic.request(self); + } + + #[cfg(armv6m)] + cortex_m::peripheral::NVIC::pend(self); } } - /// Run the executor. + unsafe impl cortex_m::interrupt::InterruptNumber for InterruptPender { + fn number(self) -> u16 { + self.0 + } + } + + /// Interrupt mode executor. /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. + /// This executor runs tasks in interrupt mode. The interrupt handler is set up + /// to poll tasks, and when a task is woken the interrupt is pended from software. /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. + /// This allows running async tasks at a priority higher than thread mode. One + /// use case is to leave thread mode free for non-async tasks. Another use case is + /// to run multiple executors: one in thread mode for low priority tasks and another in + /// interrupt mode for higher priority tasks. Higher priority tasks will preempt lower + /// priority ones. /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: + /// It is even possible to run multiple interrupt mode executors at different priorities, + /// by assigning different priorities to the interrupts. For an example on how to do this, + /// See the 'multiprio' example for 'embassy-nrf'. /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// To use it, you have to pick an interrupt that won't be used by the hardware. + /// Some chips reserve some interrupts for this purpose, sometimes named "software interrupts" (SWI). + /// If this is not the case, you may use an interrupt from any unused peripheral. /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// It is somewhat more complex to use, it's recommended to use the thread-mode + /// [`Executor`] instead, if it works for your use case. + pub struct InterruptExecutor { + started: AtomicBool, + executor: UnsafeCell>, + } + + unsafe impl Send for InterruptExecutor {} + unsafe impl Sync for InterruptExecutor {} + + impl InterruptExecutor { + /// Create a new, not started `InterruptExecutor`. + #[inline] + pub const fn new() -> Self { + Self { + started: AtomicBool::new(false), + executor: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Executor interrupt callback. + /// + /// # Safety + /// + /// You MUST call this from the interrupt handler, and from nowhere else. + pub unsafe fn on_interrupt(&'static self) { + let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; + executor.poll(); + } + + /// Start the executor. + /// + /// This initializes the executor, enables the interrupt, and returns. + /// The executor keeps running in the background through the interrupt. + /// + /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`] + /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a + /// different "thread" (the interrupt), so spawning tasks on it is effectively + /// sending them. + /// + /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from + /// a task running in it. + /// + /// # Interrupt requirements + /// + /// You must write the interrupt handler yourself, and make it call [`on_interrupt()`](Self::on_interrupt). + /// + /// This method already enables (unmasks) the interrupt, you must NOT do it yourself. + /// + /// You must set the interrupt priority before calling this method. You MUST NOT + /// do it after. + /// + pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { + if self + .started + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + panic!("InterruptExecutor::start() called multiple times on the same executor."); + } - loop { unsafe { - self.inner.poll(); - asm!("wfe"); - }; + (&mut *self.executor.get()) + .as_mut_ptr() + .write(raw::Executor::new(Pender(PenderInner::Interrupt(InterruptPender( + irq.number(), + ))))) + } + + let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; + + unsafe { NVIC::unmask(irq) } + + executor.spawner().make_send() } } } diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index e97a56cda..f66daeae4 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -1,72 +1,83 @@ -use core::marker::PhantomData; -use core::ptr; -use core::sync::atomic::{AtomicBool, Ordering}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-riscv32`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::marker::PhantomData; + use core::sync::atomic::{AtomicBool, Ordering}; -/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -/// RISCV32 Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV + static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + + /// RISCV32 Executor + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - //will only set this value to true. - critical_section::with(|_| { - // if there is work to do, loop back to polling - // TODO can we relax this? - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } - // if not, wait for interrupt - else { - core::arch::asm!("wfi"); - } - }); - // if an interrupt occurred while waiting, it will be serviced here + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + //will only set this value to true. + critical_section::with(|_| { + // if there is work to do, loop back to polling + // TODO can we relax this? + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } + // if not, wait for interrupt + else { + core::arch::asm!("wfi"); + } + }); + // if an interrupt occurred while waiting, it will be serviced here + } } } } diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs index 701f0eb18..4e4a178f0 100644 --- a/embassy-executor/src/arch/std.rs +++ b/embassy-executor/src/arch/std.rs @@ -1,84 +1,100 @@ -use std::marker::PhantomData; -use std::sync::{Condvar, Mutex}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-std`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use std::marker::PhantomData; + use std::sync::{Condvar, Mutex}; -/// Single-threaded std-based executor. -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, - signaler: &'static Signaler, -} + #[cfg(feature = "nightly")] + pub use embassy_macros::main_std as main; + + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let signaler = &*Box::leak(Box::new(Signaler::new())); - Self { - inner: raw::Executor::new( - |p| unsafe { - let s = &*(p as *const () as *const Signaler); - s.signal() - }, - signaler as *const _ as _, - ), - not_send: PhantomData, - signaler, + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender(&'static Signaler); + + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + self.0.signal() } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// Single-threaded std-based executor. + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + signaler: &'static Signaler, + } - loop { - unsafe { self.inner.poll() }; - self.signaler.wait() + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let signaler = &*Box::leak(Box::new(Signaler::new())); + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(signaler)))), + not_send: PhantomData, + signaler, + } } - } -} -struct Signaler { - mutex: Mutex, - condvar: Condvar, -} + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); -impl Signaler { - fn new() -> Self { - Self { - mutex: Mutex::new(false), - condvar: Condvar::new(), + loop { + unsafe { self.inner.poll() }; + self.signaler.wait() + } } } - fn wait(&self) { - let mut signaled = self.mutex.lock().unwrap(); - while !*signaled { - signaled = self.condvar.wait(signaled).unwrap(); - } - *signaled = false; + struct Signaler { + mutex: Mutex, + condvar: Condvar, } - fn signal(&self) { - let mut signaled = self.mutex.lock().unwrap(); - *signaled = true; - self.condvar.notify_one(); + impl Signaler { + fn new() -> Self { + Self { + mutex: Mutex::new(false), + condvar: Condvar::new(), + } + } + + fn wait(&self) { + let mut signaled = self.mutex.lock().unwrap(); + while !*signaled { + signaled = self.condvar.wait(signaled).unwrap(); + } + *signaled = false; + } + + fn signal(&self) { + let mut signaled = self.mutex.lock().unwrap(); + *signaled = true; + self.condvar.notify_one(); + } } } diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs index 98091cfbb..08ab16b99 100644 --- a/embassy-executor/src/arch/wasm.rs +++ b/embassy-executor/src/arch/wasm.rs @@ -1,74 +1,88 @@ -use core::marker::PhantomData; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-wasm`."); -use js_sys::Promise; -use wasm_bindgen::prelude::*; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { -use super::raw::util::UninitCell; -use super::raw::{self}; -use super::Spawner; + use core::marker::PhantomData; -/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. -pub struct Executor { - inner: raw::Executor, - ctx: &'static WasmContext, - not_send: PhantomData<*mut ()>, -} + #[cfg(feature = "nightly")] + pub use embassy_macros::main_wasm as main; + use js_sys::Promise; + use wasm_bindgen::prelude::*; -pub(crate) struct WasmContext { - promise: Promise, - closure: UninitCell>, -} + use crate::raw::util::UninitCell; + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; + + /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. + pub struct Executor { + inner: raw::Executor, + ctx: &'static WasmContext, + not_send: PhantomData<*mut ()>, + } + + pub(crate) struct WasmContext { + promise: Promise, + closure: UninitCell>, + } + + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender(&'static WasmContext); -impl WasmContext { - pub fn new() -> Self { - Self { - promise: Promise::resolve(&JsValue::undefined()), - closure: UninitCell::uninit(), + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + let _ = self.0.promise.then(unsafe { self.0.closure.as_mut() }); } } -} -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - let ctx = &*Box::leak(Box::new(WasmContext::new())); - let inner = raw::Executor::new( - |p| unsafe { - let ctx = &*(p as *const () as *const WasmContext); - let _ = ctx.promise.then(ctx.closure.as_mut()); - }, - ctx as *const _ as _, - ); - Self { - inner, - not_send: PhantomData, - ctx, + impl WasmContext { + pub fn new() -> Self { + Self { + promise: Promise::resolve(&JsValue::undefined()), + closure: UninitCell::uninit(), + } } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { - unsafe { - let executor = &self.inner; - self.ctx.closure.write(Closure::new(move |_| { - executor.poll(); - })); - init(self.inner.spawner()); + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + let ctx = &*Box::leak(Box::new(WasmContext::new())); + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(ctx)))), + not_send: PhantomData, + ctx, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { + unsafe { + let executor = &self.inner; + self.ctx.closure.write(Closure::new(move |_| { + executor.poll(); + })); + init(self.inner.spawner()); + } } } } diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs index 4ee0d9f78..61ea92c16 100644 --- a/embassy-executor/src/arch/xtensa.rs +++ b/embassy-executor/src/arch/xtensa.rs @@ -1,73 +1,84 @@ -use core::marker::PhantomData; -use core::ptr; -use core::sync::atomic::{AtomicBool, Ordering}; +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-xtensa`."); -use super::{raw, Spawner}; +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + use core::marker::PhantomData; + use core::sync::atomic::{AtomicBool, Ordering}; -/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa -/// -static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + use crate::raw::{Pender, PenderInner}; + use crate::{raw, Spawner}; -/// Xtensa Executor -pub struct Executor { - inner: raw::Executor, - not_send: PhantomData<*mut ()>, -} + #[derive(Copy, Clone)] + pub(crate) struct ThreadPender; -impl Executor { - /// Create a new Executor. - pub fn new() -> Self { - Self { - // use Signal_Work_Thread_Mode as substitute for local interrupt register - inner: raw::Executor::new( - |_| { - SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); - }, - ptr::null_mut(), - ), - not_send: PhantomData, + impl ThreadPender { + #[allow(unused)] + pub(crate) fn pend(self) { + SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); } } - /// Run the executor. - /// - /// The `init` closure is called with a [`Spawner`] that spawns tasks on - /// this executor. Use it to spawn the initial task(s). After `init` returns, - /// the executor starts running the tasks. - /// - /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), - /// for example by passing it as an argument to the initial tasks. - /// - /// This function requires `&'static mut self`. This means you have to store the - /// Executor instance in a place where it'll live forever and grants you mutable - /// access. There's a few ways to do this: - /// - /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) - /// - a `static mut` (unsafe) - /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) - /// - /// This function never returns. - pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { - init(self.inner.spawner()); + /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa + static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); + + /// Xtensa Executor + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); - loop { - unsafe { - self.inner.poll(); - // we do not care about race conditions between the load and store operations, interrupts - // will only set this value to true. - // if there is work to do, loop back to polling - // TODO can we relax this? - critical_section::with(|_| { - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } else { - // waiti sets the PS.INTLEVEL when slipping into sleep - // because critical sections in Xtensa are implemented via increasing - // PS.INTLEVEL the critical section ends here - // take care not add code after `waiti` if it needs to be inside the CS - core::arch::asm!("waiti 0"); // critical section ends here - } - }); + loop { + unsafe { + self.inner.poll(); + // we do not care about race conditions between the load and store operations, interrupts + // will only set this value to true. + // if there is work to do, loop back to polling + // TODO can we relax this? + critical_section::with(|_| { + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + } else { + // waiti sets the PS.INTLEVEL when slipping into sleep + // because critical sections in Xtensa are implemented via increasing + // PS.INTLEVEL the critical section ends here + // take care not add code after `waiti` if it needs to be inside the CS + core::arch::asm!("waiti 0"); // critical section ends here + } + }); + } } } } diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 8707995b4..3ce687eb6 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -1,5 +1,5 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))] +#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)] +#![cfg_attr(all(feature = "nightly", feature = "arch-xtensa"), feature(asm_experimental_arch))] #![allow(clippy::new_without_default)] #![doc = include_str!("../README.md")] #![warn(missing_docs)] @@ -10,41 +10,35 @@ pub(crate) mod fmt; #[cfg(feature = "nightly")] pub use embassy_macros::task; -cfg_if::cfg_if! { - if #[cfg(cortex_m)] { - #[path="arch/cortex_m.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_cortex_m as main; - } - else if #[cfg(target_arch="riscv32")] { - #[path="arch/riscv32.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_riscv as main; - } - else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { - #[path="arch/xtensa.rs"] - mod arch; - pub use arch::*; - } - else if #[cfg(feature="wasm")] { - #[path="arch/wasm.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_wasm as main; - } - else if #[cfg(feature="std")] { - #[path="arch/std.rs"] - mod arch; - pub use arch::*; - #[cfg(feature = "nightly")] - pub use embassy_macros::main_std as main; - } +macro_rules! check_at_most_one { + (@amo [$($feats:literal)*] [] [$($res:tt)*]) => { + #[cfg(any($($res)*))] + compile_error!(concat!("At most one of these features can be enabled at the same time:", $(" `", $feats, "`",)*)); + }; + (@amo $feats:tt [$curr:literal $($rest:literal)*] [$($res:tt)*]) => { + check_at_most_one!(@amo $feats [$($rest)*] [$($res)* $(all(feature=$curr, feature=$rest),)*]); + }; + ($($f:literal),*$(,)?) => { + check_at_most_one!(@amo [$($f)*] [$($f)*] []); + }; } +check_at_most_one!("arch-cortex-m", "arch-riscv32", "arch-xtensa", "arch-std", "arch-wasm",); + +#[cfg(feature = "_arch")] +#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] +#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] +#[cfg_attr(feature = "arch-xtensa", path = "arch/xtensa.rs")] +#[cfg_attr(feature = "arch-std", path = "arch/std.rs")] +#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] +mod arch; + +#[cfg(feature = "_arch")] +pub use arch::*; + +pub mod raw; + +mod spawner; +pub use spawner::*; /// Implementation details for embassy macros. /// Do not use. Used for macros and HALs only. Not covered by semver guarantees. @@ -72,8 +66,3 @@ pub mod _export { ($($tt:tt)*) => {}; } } - -pub mod raw; - -mod spawner; -pub use spawner::*; diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 72c367c33..f6c66da5a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -19,7 +19,6 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::AtomicPtr; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; @@ -290,10 +289,60 @@ impl TaskPool { } } +#[derive(Clone, Copy)] +pub(crate) enum PenderInner { + #[cfg(feature = "executor-thread")] + Thread(crate::arch::ThreadPender), + #[cfg(feature = "executor-interrupt")] + Interrupt(crate::arch::InterruptPender), + #[cfg(feature = "pender-callback")] + Callback { func: fn(*mut ()), context: *mut () }, +} + +unsafe impl Send for PenderInner {} +unsafe impl Sync for PenderInner {} + +/// Platform/architecture-specific action executed when an executor has pending work. +/// +/// When a task within an executor is woken, the `Pender` is called. This does a +/// platform/architecture-specific action to signal there is pending work in the executor. +/// When this happens, you must arrange for [`Executor::poll`] to be called. +/// +/// You can think of it as a waker, but for the whole executor. +pub struct Pender(pub(crate) PenderInner); + +impl Pender { + /// Create a `Pender` that will call an arbitrary function pointer. + /// + /// # Arguments + /// + /// - `func`: The function pointer to call. + /// - `context`: Opaque context pointer, that will be passed to the function pointer. + #[cfg(feature = "pender-callback")] + pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self { + Self(PenderInner::Callback { + func, + context: context.into(), + }) + } +} + +impl Pender { + pub(crate) fn pend(&self) { + match self.0 { + #[cfg(feature = "executor-thread")] + PenderInner::Thread(x) => x.pend(), + #[cfg(feature = "executor-interrupt")] + PenderInner::Interrupt(x) => x.pend(), + #[cfg(feature = "pender-callback")] + PenderInner::Callback { func, context } => func(context), + } + } +} + pub(crate) struct SyncExecutor { run_queue: RunQueue, - signal_fn: fn(*mut ()), - signal_ctx: AtomicPtr<()>, + pender: Pender, #[cfg(feature = "integrated-timers")] pub(crate) timer_queue: timer_queue::TimerQueue, @@ -302,16 +351,13 @@ pub(crate) struct SyncExecutor { } impl SyncExecutor { - pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + pub(crate) fn new(pender: Pender) -> Self { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; - #[cfg(feature = "integrated-timers")] - driver::set_alarm_callback(alarm, signal_fn, signal_ctx); Self { run_queue: RunQueue::new(), - signal_fn, - signal_ctx: AtomicPtr::new(signal_ctx), + pender, #[cfg(feature = "integrated-timers")] timer_queue: timer_queue::TimerQueue::new(), @@ -332,10 +378,16 @@ impl SyncExecutor { trace::task_ready_begin(task.as_ptr() as u32); if self.run_queue.enqueue(cs, task) { - (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) + self.pender.pend(); } } + #[cfg(feature = "integrated-timers")] + fn alarm_callback(ctx: *mut ()) { + let this: &Self = unsafe { &*(ctx as *const Self) }; + this.pender.pend(); + } + pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); @@ -351,6 +403,9 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { + #[cfg(feature = "integrated-timers")] + driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); + #[allow(clippy::never_loop)] loop { #[cfg(feature = "integrated-timers")] @@ -417,14 +472,14 @@ impl SyncExecutor { /// /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks /// that "want to run"). -/// - You must supply a `signal_fn`. The executor will call it to notify you it has work +/// - You must supply a [`Pender`]. The executor will call it to notify you it has work /// to do. You must arrange for `poll()` to be called as soon as possible. /// -/// `signal_fn` can be called from *any* context: any thread, any interrupt priority +/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority /// level, etc. It may be called synchronously from any `Executor` method call as well. /// You must deal with this correctly. /// -/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates +/// In particular, you must NOT call `poll` directly from the pender callback, as this violates /// the requirement for `poll` to not be called reentrantly. #[repr(transparent)] pub struct Executor { @@ -437,15 +492,15 @@ impl Executor { pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { mem::transmute(inner) } + /// Create a new executor. /// - /// When the executor has work to do, it will call `signal_fn` with - /// `signal_ctx` as argument. + /// When the executor has work to do, it will call the [`Pender`]. /// - /// See [`Executor`] docs for details on `signal_fn`. - pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { + /// See [`Executor`] docs for details on `Pender`. + pub fn new(pender: Pender) -> Self { Self { - inner: SyncExecutor::new(signal_fn, signal_ctx), + inner: SyncExecutor::new(pender), _not_sync: PhantomData, } } @@ -468,16 +523,16 @@ impl Executor { /// This loops over all tasks that are queued to be polled (i.e. they're /// freshly spawned or they've been woken). Other tasks are not polled. /// - /// You must call `poll` after receiving a call to `signal_fn`. It is OK - /// to call `poll` even when not requested by `signal_fn`, but it wastes + /// You must call `poll` after receiving a call to the [`Pender`]. It is OK + /// to call `poll` even when not requested by the `Pender`, but it wastes /// energy. /// /// # Safety /// /// You must NOT call `poll` reentrantly on the same executor. /// - /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you - /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to + /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you + /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. pub unsafe fn poll(&'static self) { -- cgit From 8290236ed64435453a9c028c95246a86371bd4ce Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 2 Apr 2023 14:11:31 -0500 Subject: executor: Replace unsound critical sections with atomics --- embassy-executor/src/raw/mod.rs | 28 +++++++++++++--------------- embassy-executor/src/raw/run_queue.rs | 18 ++++++++++++------ 2 files changed, 25 insertions(+), 21 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f6c66da5a..bd0cff26b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -22,7 +22,6 @@ use core::ptr::NonNull; use core::task::{Context, Poll}; use atomic_polyfill::{AtomicU32, Ordering}; -use critical_section::CriticalSection; #[cfg(feature = "integrated-timers")] use embassy_time::driver::{self, AlarmHandle}; #[cfg(feature = "integrated-timers")] @@ -373,11 +372,11 @@ impl SyncExecutor { /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] - unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { + unsafe fn enqueue(&self, task: TaskRef) { #[cfg(feature = "rtos-trace")] trace::task_ready_begin(task.as_ptr() as u32); - if self.run_queue.enqueue(cs, task) { + if self.run_queue.enqueue(task) { self.pender.pend(); } } @@ -394,9 +393,7 @@ impl SyncExecutor { #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); - critical_section::with(|cs| { - self.enqueue(cs, task); - }) + self.enqueue(task); } /// # Safety @@ -552,24 +549,25 @@ impl Executor { /// /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. pub fn wake_task(task: TaskRef) { - critical_section::with(|cs| { - let header = task.header(); - let state = header.state.load(Ordering::Relaxed); + let header = task.header(); + let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { // If already scheduled, or if not started, if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { - return; + None + } else { + // Mark it as scheduled + Some(state | STATE_RUN_QUEUED) } + }); - // Mark it as scheduled - header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); - + if res.is_ok() { // We have just marked the task as scheduled, so enqueue it. unsafe { let executor = header.executor.get().unwrap_unchecked(); - executor.enqueue(cs, task); + executor.enqueue(task); } - }) + } } #[cfg(feature = "integrated-timers")] diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 362157535..a88174a0c 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -2,7 +2,6 @@ use core::ptr; use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; -use critical_section::CriticalSection; use super::{TaskHeader, TaskRef}; @@ -46,11 +45,18 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { - let prev = self.head.load(Ordering::Relaxed); - task.header().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr() as _, Ordering::Relaxed); - prev.is_null() + pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { + let mut was_empty = false; + + self.head + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { + was_empty = prev.is_null(); + task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + Some(task.as_ptr() as *mut _) + }) + .ok(); + + was_empty } /// Empty the queue, then call `on_task` for each task that was in the queue. -- cgit From 32836129f6d0aa25f51cd60b0039072530009711 Mon Sep 17 00:00:00 2001 From: Roy Buitenhuis Date: Tue, 11 Apr 2023 14:59:38 +0200 Subject: re-export main_riscv macro as main for riscv arch. --- embassy-executor/src/arch/riscv32.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index f66daeae4..ff7ec1575 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs @@ -8,6 +8,9 @@ mod thread { use core::marker::PhantomData; use core::sync::atomic::{AtomicBool, Ordering}; + #[cfg(feature = "nightly")] + pub use embassy_macros::main_riscv as main; + use crate::raw::{Pender, PenderInner}; use crate::{raw, Spawner}; -- cgit From 6a6c673c5fd1baa1d3ca3eebb55ba430a86b1438 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Thu, 13 Apr 2023 14:21:41 -0500 Subject: Executor: Replace unnecessary atomics in runqueue --- embassy-executor/src/raw/run_queue.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index a88174a0c..f1ec19ac1 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -4,15 +4,16 @@ use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; use super::{TaskHeader, TaskRef}; +use crate::raw::util::SyncUnsafeCell; pub(crate) struct RunQueueItem { - next: AtomicPtr, + next: SyncUnsafeCell>, } impl RunQueueItem { pub const fn new() -> Self { Self { - next: AtomicPtr::new(ptr::null_mut()), + next: SyncUnsafeCell::new(None), } } } @@ -51,7 +52,12 @@ impl RunQueue { self.head .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { was_empty = prev.is_null(); - task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + unsafe { + // safety: the pointer is either null or valid + let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); + // safety: there are no concurrent accesses to `next` + task.header().run_queue_item.next.set(prev); + } Some(task.as_ptr() as *mut _) }) .ok(); @@ -64,18 +70,19 @@ impl RunQueue { /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // Atomically empty the queue. - let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + // safety: the pointer is either null or valid + let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = NonNull::new(ptr) { - let task = unsafe { TaskRef::from_ptr(task.as_ptr()) }; + while let Some(task) = next { // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - let next = task.header().run_queue_item.next.load(Ordering::Relaxed); + // safety: there are no concurrent accesses to `next` + next = unsafe { task.header().run_queue_item.next.get() }; on_task(task); - - ptr = next } } } -- cgit From 5fe36b6bb05f0460e12c726ab5554c9b4bad1829 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sat, 13 May 2023 13:44:19 +0200 Subject: Work around xtensa deadlock, take 2 --- embassy-executor/src/arch/xtensa.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs index 61ea92c16..017b2c52b 100644 --- a/embassy-executor/src/arch/xtensa.rs +++ b/embassy-executor/src/arch/xtensa.rs @@ -63,21 +63,29 @@ mod thread { loop { unsafe { self.inner.poll(); + + // Manual critical section implementation that only masks interrupts handlers. + // We must not acquire the cross-core on dual-core systems because that would + // prevent the other core from doing useful work while this core is sleeping. + let token: critical_section::RawRestoreState; + core::arch::asm!("rsil {0}, 5", out(reg) token); + // we do not care about race conditions between the load and store operations, interrupts // will only set this value to true. // if there is work to do, loop back to polling - // TODO can we relax this? - critical_section::with(|_| { - if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { - SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); - } else { - // waiti sets the PS.INTLEVEL when slipping into sleep - // because critical sections in Xtensa are implemented via increasing - // PS.INTLEVEL the critical section ends here - // take care not add code after `waiti` if it needs to be inside the CS - core::arch::asm!("waiti 0"); // critical section ends here - } - }); + if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { + SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); + + core::arch::asm!( + "wsr.ps {0}", + "rsync", in(reg) token) + } else { + // waiti sets the PS.INTLEVEL when slipping into sleep + // because critical sections in Xtensa are implemented via increasing + // PS.INTLEVEL the critical section ends here + // take care not add code after `waiti` if it needs to be inside the CS + core::arch::asm!("waiti 0"); // critical section ends here + } } } } -- cgit From 54fc933932e9f8e510331401820b57ba32d8ade3 Mon Sep 17 00:00:00 2001 From: Kaspar Schleiser Date: Fri, 16 Jun 2023 12:59:23 +0200 Subject: embassy-executor: introduce `InterruptExecutor::spawner()` --- embassy-executor/src/arch/cortex_m.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index d6a55c4c7..94c8134d6 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs @@ -205,5 +205,20 @@ mod interrupt { executor.spawner().make_send() } + + /// Get a SendSpawner for this executor + /// + /// This returns a [`SendSpawner`] you can use to spawn tasks on this + /// executor. + /// + /// This MUST only be called on an executor that has already been spawned. + /// The function will panic otherwise. + pub fn spawner(&'static self) -> crate::SendSpawner { + if !self.started.load(Ordering::Acquire) { + panic!("InterruptExecutor::spawner() called on uninitialized executor."); + } + let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; + executor.spawner().make_send() + } } } -- cgit From a2501bd5c1004362368d962b206b6de8f4962837 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Wed, 12 Jul 2023 16:52:52 +0200 Subject: Allow clearing finished task from timer queue --- embassy-executor/src/raw/mod.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bd0cff26b..f3760f589 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -165,6 +165,9 @@ impl TaskStorage { Poll::Ready(_) => { this.future.drop_in_place(); this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); + + #[cfg(feature = "integrated-timers")] + this.raw.expires_at.set(Instant::MAX); } Poll::Pending => {} } -- cgit