From 01d8508b6c9bf89e27a83b9587bcc13c358c1e51 Mon Sep 17 00:00:00 2001 From: Oleksandr Babak Date: Fri, 6 Sep 2024 11:16:44 +0200 Subject: fix: nightly api changed during the night --- embassy-executor/src/raw/waker.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 8d3910a25..8bb2cfd05 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -50,8 +50,7 @@ pub fn task_from_waker(waker: &Waker) -> TaskRef { #[cfg(feature = "nightly")] { - let raw_waker = waker.as_raw(); - (raw_waker.vtable(), raw_waker.data()) + (waker.vtable(), waker.data()) } }; -- cgit From 1e850ae79149e737c1ba39a383596eabcb0bb940 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Wed, 6 Nov 2024 10:44:01 +0100 Subject: Detect and allow older nightlies --- embassy-executor/src/raw/waker.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 8bb2cfd05..30b8cdd4c 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -50,7 +50,16 @@ pub fn task_from_waker(waker: &Waker) -> TaskRef { #[cfg(feature = "nightly")] { - (waker.vtable(), waker.data()) + #[cfg(not(at_least_2024_09_06))] + { + let raw_waker = waker.as_raw(); + (raw_waker.vtable(), raw_waker.data()) + } + + #[cfg(at_least_2024_09_06)] + { + (waker.vtable(), waker.data()) + } } }; -- cgit From baeb59b5b8d63ef9bb6ecada518ea8b911d2dc30 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 12 Nov 2024 16:28:26 +0100 Subject: executor: use WakerHack unconditionally even if `nightly` feature is enabled. (#3528) This ensures the executor compiles with all recent nightly versions, including the stable-but-with-nightly-features-enabled xtensa rustc. --- embassy-executor/src/raw/waker.rs | 42 ++++++++++----------------------------- 1 file changed, 11 insertions(+), 31 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 30b8cdd4c..d2256adfa 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -32,40 +32,20 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { /// /// Panics if the waker is not created by the Embassy executor. pub fn task_from_waker(waker: &Waker) -> TaskRef { - let (vtable, data) = { - #[cfg(not(feature = "nightly"))] - { - struct WakerHack { - data: *const (), - vtable: &'static RawWakerVTable, - } - - // safety: OK because WakerHack has the same layout as Waker. - // This is not really guaranteed because the structs are `repr(Rust)`, it is - // indeed the case in the current implementation. - // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 - let hack: &WakerHack = unsafe { core::mem::transmute(waker) }; - (hack.vtable, hack.data) - } - - #[cfg(feature = "nightly")] - { - #[cfg(not(at_least_2024_09_06))] - { - let raw_waker = waker.as_raw(); - (raw_waker.vtable(), raw_waker.data()) - } + struct WakerHack { + data: *const (), + vtable: &'static RawWakerVTable, + } - #[cfg(at_least_2024_09_06)] - { - (waker.vtable(), waker.data()) - } - } - }; + // safety: OK because WakerHack has the same layout as Waker. + // This is not really guaranteed because the structs are `repr(Rust)`, it is + // indeed the case in the current implementation. + // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 + let hack: &WakerHack = unsafe { core::mem::transmute(waker) }; - if vtable != &VTABLE { + if hack.vtable != &VTABLE { panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") } // safety: our wakers are always created with `TaskRef::as_ptr` - unsafe { TaskRef::from_ptr(data as *const TaskHeader) } + unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } } -- cgit From 853c5c567add8134b8419cf0a6a2b6c8cb0b0aa6 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 12 Nov 2024 16:30:46 +0100 Subject: executor: compare vtable addr instead of contents. Saves a whopping 44 bytes of text, yay. --- embassy-executor/src/raw/waker.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index d2256adfa..9c70f995a 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -43,7 +43,9 @@ pub fn task_from_waker(waker: &Waker) -> TaskRef { // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 let hack: &WakerHack = unsafe { core::mem::transmute(waker) }; - if hack.vtable != &VTABLE { + // make sure to compare vtable addresses. Doing `==` on the references + // will compare the contents, which is slower. + if hack.vtable as *const _ != &VTABLE as *const _ { panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") } // safety: our wakers are always created with `TaskRef::as_ptr` -- cgit From ff02ee1a221122ede6e30a94156c42e22b400578 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 19 Nov 2024 14:39:32 +0100 Subject: Only set callback once --- embassy-executor/src/raw/mod.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index d9ea5c005..e8a5b8970 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -328,7 +328,7 @@ impl SyncExecutor { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; - Self { + let this = Self { run_queue: RunQueue::new(), pender, @@ -336,7 +336,12 @@ impl SyncExecutor { timer_queue: timer_queue::TimerQueue::new(), #[cfg(feature = "integrated-timers")] alarm, - } + }; + + #[cfg(feature = "integrated-timers")] + embassy_time_driver::set_alarm_callback(this.alarm, Self::alarm_callback, &this as *const _ as *mut ()); + + this } /// Enqueue a task in the task queue @@ -374,9 +379,6 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { - #[cfg(feature = "integrated-timers")] - embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); - #[allow(clippy::never_loop)] loop { #[cfg(feature = "integrated-timers")] -- cgit From 8ebe059ecb311ee949f92dde33f2cb8d972b0f7b Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 19 Nov 2024 15:59:31 +0100 Subject: Add initialize --- embassy-executor/src/raw/mod.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e8a5b8970..ebabee1ba 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -328,7 +328,7 @@ impl SyncExecutor { #[cfg(feature = "integrated-timers")] let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; - let this = Self { + Self { run_queue: RunQueue::new(), pender, @@ -336,12 +336,12 @@ impl SyncExecutor { timer_queue: timer_queue::TimerQueue::new(), #[cfg(feature = "integrated-timers")] alarm, - }; + } + } + pub(crate) unsafe fn initialize(&'static self) { #[cfg(feature = "integrated-timers")] - embassy_time_driver::set_alarm_callback(this.alarm, Self::alarm_callback, &this as *const _ as *mut ()); - - this + embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); } /// Enqueue a task in the task queue @@ -494,6 +494,15 @@ impl Executor { } } + /// Initializes the executor. + /// + /// # Safety + /// + /// This function must be called once before any other method is called. + pub unsafe fn initialize(&'static self) { + self.inner.initialize(); + } + /// Spawn a task in this executor. /// /// # Safety @@ -518,6 +527,8 @@ impl Executor { /// /// # Safety /// + /// You must call `initialize` before calling this method. + /// /// You must NOT call `poll` reentrantly on the same executor. /// /// In particular, note that `poll` may call the pender synchronously. Therefore, you -- cgit From f0be2fdce4856888bd412fe9475ae55e05cf20a2 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 9 Dec 2024 15:16:03 +0100 Subject: Extend tracing api to support executor id and end task Allow applications to provide a trace implementation that only needs to implement APIs used by the embassy executor, and provide more context in the event of multiple executors being used. --- embassy-executor/src/raw/mod.rs | 52 ++++++---------------- embassy-executor/src/raw/trace.rs | 90 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 40 deletions(-) create mode 100644 embassy-executor/src/raw/trace.rs (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index ebabee1ba..3f93eae6f 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -18,6 +18,8 @@ mod state; #[cfg(feature = "integrated-timers")] mod timer_queue; +#[cfg(feature = "trace")] +mod trace; pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; @@ -31,8 +33,6 @@ use core::task::{Context, Poll}; #[cfg(feature = "integrated-timers")] use embassy_time_driver::AlarmHandle; -#[cfg(feature = "rtos-trace")] -use rtos_trace::trace; use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; @@ -352,8 +352,8 @@ impl SyncExecutor { /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] unsafe fn enqueue(&self, task: TaskRef) { - #[cfg(feature = "rtos-trace")] - trace::task_ready_begin(task.as_ptr() as u32); + #[cfg(feature = "trace")] + trace::task_ready_begin(self, &task); if self.run_queue.enqueue(task) { self.pender.pend(); @@ -369,8 +369,8 @@ impl SyncExecutor { pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); - #[cfg(feature = "rtos-trace")] - trace::task_new(task.as_ptr() as u32); + #[cfg(feature = "trace")] + trace::task_new(self, &task); self.enqueue(task); } @@ -400,14 +400,14 @@ impl SyncExecutor { return; } - #[cfg(feature = "rtos-trace")] - trace::task_exec_begin(p.as_ptr() as u32); + #[cfg(feature = "trace")] + trace::task_exec_begin(self, &p); // Run the task task.poll_fn.get().unwrap_unchecked()(p); - #[cfg(feature = "rtos-trace")] - trace::task_exec_end(); + #[cfg(feature = "trace")] + trace::task_exec_end(self, &p); // Enqueue or update into timer_queue #[cfg(feature = "integrated-timers")] @@ -430,8 +430,8 @@ impl SyncExecutor { } } - #[cfg(feature = "rtos-trace")] - trace::system_idle(); + #[cfg(feature = "trace")] + trace::executor_idle(self) } } @@ -593,31 +593,3 @@ impl embassy_time_queue_driver::TimerQueue for TimerQueue { #[cfg(feature = "integrated-timers")] embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); - -#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))] -const fn gcd(a: u64, b: u64) -> u64 { - if b == 0 { - a - } else { - gcd(b, a % b) - } -} - -#[cfg(feature = "rtos-trace")] -impl rtos_trace::RtosTraceOSCallbacks for Executor { - fn task_list() { - // We don't know what tasks exist, so we can't send them. - } - #[cfg(feature = "integrated-timers")] - fn time() -> u64 { - const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); - embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) - } - #[cfg(not(feature = "integrated-timers"))] - fn time() -> u64 { - 0 - } -} - -#[cfg(feature = "rtos-trace")] -rtos_trace::global_os_callbacks! {Executor} diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs new file mode 100644 index 000000000..c7bcf9c11 --- /dev/null +++ b/embassy-executor/src/raw/trace.rs @@ -0,0 +1,90 @@ +#![allow(unused)] +use crate::raw::{SyncExecutor, TaskRef}; + +#[cfg(not(feature = "rtos-trace"))] +extern "Rust" { + fn _embassy_trace_task_new(executor_id: u32, task_id: u32); + fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32); + fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32); + fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32); + fn _embassy_trace_executor_idle(executor_id: u32); +} + +#[inline] +pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) + } + + #[cfg(feature = "rtos-trace")] + rtos_trace::trace::task_new(task.as_ptr() as u32); +} + +#[inline] +pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32) + } + #[cfg(feature = "rtos-trace")] + rtos_trace::trace::task_ready_begin(task.as_ptr() as u32); +} + +#[inline] +pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32) + } + #[cfg(feature = "rtos-trace")] + rtos_trace::trace::task_exec_begin(task.as_ptr() as u32); +} + +#[inline] +pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32) + } + #[cfg(feature = "rtos-trace")] + rtos_trace::trace::task_exec_end(); +} + +#[inline] +pub(crate) fn executor_idle(executor: &SyncExecutor) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_executor_idle(executor as *const _ as u32) + } + #[cfg(feature = "rtos-trace")] + rtos_trace::trace::system_idle(); +} + +#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))] +const fn gcd(a: u64, b: u64) -> u64 { + if b == 0 { + a + } else { + gcd(b, a % b) + } +} + +#[cfg(feature = "rtos-trace")] +impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { + fn task_list() { + // We don't know what tasks exist, so we can't send them. + } + #[cfg(feature = "integrated-timers")] + fn time() -> u64 { + const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); + embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) + } + #[cfg(not(feature = "integrated-timers"))] + fn time() -> u64 { + 0 + } +} + +#[cfg(feature = "rtos-trace")] +rtos_trace::global_os_callbacks! {SyncExecutor} -- cgit From 5a5495aac43d75610735f2ca80fb6c8e8f31ed71 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 26 Nov 2024 23:54:21 +0100 Subject: Refactor integrated-timers --- embassy-executor/src/raw/mod.rs | 135 ++++++-------------------------- embassy-executor/src/raw/timer_queue.rs | 89 +++++++++++++-------- embassy-executor/src/raw/util.rs | 5 ++ 3 files changed, 88 insertions(+), 141 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f93eae6f..80bd49bad 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -17,7 +17,7 @@ mod run_queue; mod state; #[cfg(feature = "integrated-timers")] -mod timer_queue; +pub mod timer_queue; #[cfg(feature = "trace")] mod trace; pub(crate) mod util; @@ -31,9 +31,6 @@ use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll}; -#[cfg(feature = "integrated-timers")] -use embassy_time_driver::AlarmHandle; - use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; @@ -47,8 +44,7 @@ pub(crate) struct TaskHeader { pub(crate) executor: SyncUnsafeCell>, poll_fn: SyncUnsafeCell>, - #[cfg(feature = "integrated-timers")] - pub(crate) expires_at: SyncUnsafeCell, + /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. #[cfg(feature = "integrated-timers")] pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } @@ -80,6 +76,12 @@ impl TaskRef { unsafe { self.ptr.as_ref() } } + /// Returns a reference to the executor that the task is currently running on. + #[cfg(feature = "integrated-timers")] + pub unsafe fn executor(self) -> Option<&'static Executor> { + self.header().executor.get().map(|e| Executor::wrap(e)) + } + /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() @@ -120,8 +122,6 @@ impl TaskStorage { // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), - #[cfg(feature = "integrated-timers")] - expires_at: SyncUnsafeCell::new(0), #[cfg(feature = "integrated-timers")] timer_queue_item: timer_queue::TimerQueueItem::new(), }, @@ -160,9 +160,6 @@ impl TaskStorage { Poll::Ready(_) => { this.future.drop_in_place(); this.raw.state.despawn(); - - #[cfg(feature = "integrated-timers")] - this.raw.expires_at.set(u64::MAX); } Poll::Pending => {} } @@ -316,34 +313,16 @@ impl Pender { pub(crate) struct SyncExecutor { run_queue: RunQueue, pender: Pender, - - #[cfg(feature = "integrated-timers")] - pub(crate) timer_queue: timer_queue::TimerQueue, - #[cfg(feature = "integrated-timers")] - alarm: AlarmHandle, } impl SyncExecutor { pub(crate) fn new(pender: Pender) -> Self { - #[cfg(feature = "integrated-timers")] - let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; - Self { run_queue: RunQueue::new(), pender, - - #[cfg(feature = "integrated-timers")] - timer_queue: timer_queue::TimerQueue::new(), - #[cfg(feature = "integrated-timers")] - alarm, } } - pub(crate) unsafe fn initialize(&'static self) { - #[cfg(feature = "integrated-timers")] - embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); - } - /// Enqueue a task in the task queue /// /// # Safety @@ -360,12 +339,6 @@ impl SyncExecutor { } } - #[cfg(feature = "integrated-timers")] - fn alarm_callback(ctx: *mut ()) { - let this: &Self = unsafe { &*(ctx as *const Self) }; - this.pender.pend(); - } - pub(super) unsafe fn spawn(&'static self, task: TaskRef) { task.header().executor.set(Some(self)); @@ -379,56 +352,27 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { - #[allow(clippy::never_loop)] - loop { - #[cfg(feature = "integrated-timers")] - self.timer_queue - .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); - - self.run_queue.dequeue_all(|p| { - let task = p.header(); - - #[cfg(feature = "integrated-timers")] - task.expires_at.set(u64::MAX); - - if !task.state.run_dequeue() { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - - #[cfg(feature = "trace")] - trace::task_exec_begin(self, &p); + self.run_queue.dequeue_all(|p| { + let task = p.header(); + + if !task.state.run_dequeue() { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } - // Run the task - task.poll_fn.get().unwrap_unchecked()(p); + #[cfg(feature = "trace")] + trace::task_exec_begin(self, &p); - #[cfg(feature = "trace")] - trace::task_exec_end(self, &p); + // Run the task + task.poll_fn.get().unwrap_unchecked()(p); - // Enqueue or update into timer_queue - #[cfg(feature = "integrated-timers")] - self.timer_queue.update(p); - }); - - #[cfg(feature = "integrated-timers")] - { - // If this is already in the past, set_alarm might return false - // In that case do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - if embassy_time_driver::set_alarm(self.alarm, next_expiration) { - break; - } - } - - #[cfg(not(feature = "integrated-timers"))] - { - break; - } - } + #[cfg(feature = "trace")] + trace::task_exec_end(self, &p); + }); #[cfg(feature = "trace")] trace::executor_idle(self) @@ -494,15 +438,6 @@ impl Executor { } } - /// Initializes the executor. - /// - /// # Safety - /// - /// This function must be called once before any other method is called. - pub unsafe fn initialize(&'static self) { - self.inner.initialize(); - } - /// Spawn a task in this executor. /// /// # Safety @@ -575,21 +510,3 @@ pub fn wake_task_no_pend(task: TaskRef) { } } } - -#[cfg(feature = "integrated-timers")] -struct TimerQueue; - -#[cfg(feature = "integrated-timers")] -impl embassy_time_queue_driver::TimerQueue for TimerQueue { - fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) { - let task = waker::task_from_waker(waker); - let task = task.header(); - unsafe { - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); - } - } -} - -#[cfg(feature = "integrated-timers")] -embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 94a5f340b..953bf014f 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -1,75 +1,100 @@ +//! Timer queue operations. use core::cmp::min; +use super::util::SyncUnsafeCell; use super::TaskRef; -use crate::raw::util::SyncUnsafeCell; pub(crate) struct TimerQueueItem { next: SyncUnsafeCell>, + expires_at: SyncUnsafeCell, } impl TimerQueueItem { pub const fn new() -> Self { Self { next: SyncUnsafeCell::new(None), + expires_at: SyncUnsafeCell::new(0), } } } -pub(crate) struct TimerQueue { +/// A timer queue, with items integrated into tasks. +pub struct TimerQueue { head: SyncUnsafeCell>, } impl TimerQueue { + /// Creates a new timer queue. pub const fn new() -> Self { Self { head: SyncUnsafeCell::new(None), } } - pub(crate) unsafe fn update(&self, p: TaskRef) { - let task = p.header(); - if task.expires_at.get() != u64::MAX { + /// Schedules a task to run at a specific time. + /// + /// If this function returns `true`, the called should find the next expiration time and set + /// a new alarm for that time. + pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { + unsafe { + let task = p.header(); + let item = &task.timer_queue_item; if task.state.timer_enqueue() { - task.timer_queue_item.next.set(self.head.get()); - self.head.set(Some(p)); + // If not in the queue, add it and update. + let prev = self.head.replace(Some(p)); + item.next.set(prev); + } else if at <= item.expires_at.get() { + // If expiration is sooner than previously set, update. + } else { + // Task does not need to be updated. + return false; } + + item.expires_at.set(at); + true } } - pub(crate) unsafe fn next_expiration(&self) -> u64 { - let mut res = u64::MAX; - self.retain(|p| { - let task = p.header(); - let expires = task.expires_at.get(); - res = min(res, expires); - expires != u64::MAX - }); - res - } + /// Dequeues expired timers and returns the next alarm time. + /// + /// The provided callback will be called for each expired task. Tasks that never expire + /// will be removed, but the callback will not be called. + pub fn next_expiration(&mut self, now: u64) -> u64 { + let mut next_expiration = u64::MAX; - pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { self.retain(|p| { let task = p.header(); - if task.expires_at.get() <= now { - on_task(p); + let item = &task.timer_queue_item; + let expires = unsafe { item.expires_at.get() }; + + if expires <= now { + // Timer expired, process task. + super::wake_task(p); false } else { - true + // Timer didn't yet expire, or never expires. + next_expiration = min(next_expiration, expires); + expires != u64::MAX } }); + + next_expiration } - pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { - let mut prev = &self.head; - while let Some(p) = prev.get() { - let task = p.header(); - if f(p) { - // Skip to next - prev = &task.timer_queue_item.next; - } else { - // Remove it - prev.set(task.timer_queue_item.next.get()); - task.state.timer_dequeue(); + fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { + unsafe { + let mut prev = &self.head; + while let Some(p) = prev.get() { + let task = p.header(); + let item = &task.timer_queue_item; + if f(p) { + // Skip to next + prev = &item.next; + } else { + // Remove it + prev.set(item.next.get()); + task.state.timer_dequeue(); + } } } } diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index c46085e45..e2633658a 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -54,4 +54,9 @@ impl SyncUnsafeCell { { *self.value.get() } + + #[cfg(feature = "integrated-timers")] + pub unsafe fn replace(&self, value: T) -> T { + core::mem::replace(&mut *self.value.get(), value) + } } -- cgit From 12f58fbcfd3f10b43795936127a890c6a0f8f280 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 8 Dec 2024 23:04:43 +0100 Subject: Remove TIMER_QUEUED state --- embassy-executor/src/raw/state_atomics.rs | 18 ------------------ embassy-executor/src/raw/state_atomics_arm.rs | 19 ++----------------- embassy-executor/src/raw/state_critical_section.rs | 21 --------------------- embassy-executor/src/raw/timer_queue.rs | 4 ++-- 4 files changed, 4 insertions(+), 58 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e1279ac0b..e4127897e 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -4,9 +4,6 @@ use core::sync::atomic::{AtomicU32, Ordering}; pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -#[cfg(feature = "integrated-timers")] -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: AtomicU32, @@ -55,19 +52,4 @@ impl State { let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); state & STATE_SPAWNED != 0 } - - /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_enqueue(&self) -> bool { - let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); - old_state & STATE_TIMER_QUEUED == 0 - } - - /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_dequeue(&self) { - self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); - } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index e4dfe5093..b673c7359 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -11,9 +11,8 @@ pub(crate) struct State { spawned: AtomicBool, /// Task is in the executor run queue run_queued: AtomicBool, - /// Task is in the executor timer queue - timer_queued: AtomicBool, pad: AtomicBool, + pad2: AtomicBool, } impl State { @@ -21,8 +20,8 @@ impl State { Self { spawned: AtomicBool::new(false), run_queued: AtomicBool::new(false), - timer_queued: AtomicBool::new(false), pad: AtomicBool::new(false), + pad2: AtomicBool::new(false), } } @@ -86,18 +85,4 @@ impl State { self.run_queued.store(false, Ordering::Relaxed); r } - - /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_enqueue(&self) -> bool { - !self.timer_queued.swap(true, Ordering::Relaxed) - } - - /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_dequeue(&self) { - self.timer_queued.store(false, Ordering::Relaxed); - } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index c3cc1b0b7..b92eed006 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -6,9 +6,6 @@ use critical_section::Mutex; pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -#[cfg(feature = "integrated-timers")] -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: Mutex>, @@ -72,22 +69,4 @@ impl State { ok }) } - - /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_enqueue(&self) -> bool { - self.update(|s| { - let ok = *s & STATE_TIMER_QUEUED == 0; - *s |= STATE_TIMER_QUEUED; - ok - }) - } - - /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] - #[inline(always)] - pub fn timer_dequeue(&self) { - self.update(|s| *s &= !STATE_TIMER_QUEUED); - } } diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 953bf014f..513397090 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -39,7 +39,7 @@ impl TimerQueue { unsafe { let task = p.header(); let item = &task.timer_queue_item; - if task.state.timer_enqueue() { + if item.next.get().is_none() { // If not in the queue, add it and update. let prev = self.head.replace(Some(p)); item.next.set(prev); @@ -93,7 +93,7 @@ impl TimerQueue { } else { // Remove it prev.set(item.next.get()); - task.state.timer_dequeue(); + item.next.set(None); } } } -- cgit From dc18ee29a0f93ce34892731ee0580a3e9e3f2298 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 8 Dec 2024 23:07:35 +0100 Subject: Do not access task header --- embassy-executor/src/raw/mod.rs | 6 ++++++ embassy-executor/src/raw/timer_queue.rs | 14 ++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 80bd49bad..f9c6509f1 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -82,6 +82,12 @@ impl TaskRef { self.header().executor.get().map(|e| Executor::wrap(e)) } + /// Returns a reference to the timer queue item. + #[cfg(feature = "integrated-timers")] + pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { + &self.header().timer_queue_item + } + /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 513397090..e0a22f4d4 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -4,13 +4,14 @@ use core::cmp::min; use super::util::SyncUnsafeCell; use super::TaskRef; -pub(crate) struct TimerQueueItem { +/// An item in the timer queue. +pub struct TimerQueueItem { next: SyncUnsafeCell>, expires_at: SyncUnsafeCell, } impl TimerQueueItem { - pub const fn new() -> Self { + pub(crate) const fn new() -> Self { Self { next: SyncUnsafeCell::new(None), expires_at: SyncUnsafeCell::new(0), @@ -37,8 +38,7 @@ impl TimerQueue { /// a new alarm for that time. pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { unsafe { - let task = p.header(); - let item = &task.timer_queue_item; + let item = p.timer_queue_item(); if item.next.get().is_none() { // If not in the queue, add it and update. let prev = self.head.replace(Some(p)); @@ -63,8 +63,7 @@ impl TimerQueue { let mut next_expiration = u64::MAX; self.retain(|p| { - let task = p.header(); - let item = &task.timer_queue_item; + let item = p.timer_queue_item(); let expires = unsafe { item.expires_at.get() }; if expires <= now { @@ -85,8 +84,7 @@ impl TimerQueue { unsafe { let mut prev = &self.head; while let Some(p) = prev.get() { - let task = p.header(); - let item = &task.timer_queue_item; + let item = p.timer_queue_item(); if f(p) { // Skip to next prev = &item.next; -- cgit From d45ea43892198484b5f6dcea4c351dc11d226cc4 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 8 Dec 2024 23:21:53 +0100 Subject: Move integrated timer queue into time-queue-driver --- embassy-executor/src/raw/timer_queue.rs | 96 ++++----------------------------- embassy-executor/src/raw/util.rs | 5 -- 2 files changed, 11 insertions(+), 90 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index e0a22f4d4..46e346c1b 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -1,99 +1,25 @@ //! Timer queue operations. -use core::cmp::min; -use super::util::SyncUnsafeCell; +use core::cell::Cell; + use super::TaskRef; /// An item in the timer queue. pub struct TimerQueueItem { - next: SyncUnsafeCell>, - expires_at: SyncUnsafeCell, -} + /// The next item in the queue. + pub next: Cell>, -impl TimerQueueItem { - pub(crate) const fn new() -> Self { - Self { - next: SyncUnsafeCell::new(None), - expires_at: SyncUnsafeCell::new(0), - } - } + /// The time at which this item expires. + pub expires_at: Cell, } -/// A timer queue, with items integrated into tasks. -pub struct TimerQueue { - head: SyncUnsafeCell>, -} +unsafe impl Sync for TimerQueueItem {} -impl TimerQueue { - /// Creates a new timer queue. - pub const fn new() -> Self { +impl TimerQueueItem { + pub(crate) const fn new() -> Self { Self { - head: SyncUnsafeCell::new(None), - } - } - - /// Schedules a task to run at a specific time. - /// - /// If this function returns `true`, the called should find the next expiration time and set - /// a new alarm for that time. - pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { - unsafe { - let item = p.timer_queue_item(); - if item.next.get().is_none() { - // If not in the queue, add it and update. - let prev = self.head.replace(Some(p)); - item.next.set(prev); - } else if at <= item.expires_at.get() { - // If expiration is sooner than previously set, update. - } else { - // Task does not need to be updated. - return false; - } - - item.expires_at.set(at); - true - } - } - - /// Dequeues expired timers and returns the next alarm time. - /// - /// The provided callback will be called for each expired task. Tasks that never expire - /// will be removed, but the callback will not be called. - pub fn next_expiration(&mut self, now: u64) -> u64 { - let mut next_expiration = u64::MAX; - - self.retain(|p| { - let item = p.timer_queue_item(); - let expires = unsafe { item.expires_at.get() }; - - if expires <= now { - // Timer expired, process task. - super::wake_task(p); - false - } else { - // Timer didn't yet expire, or never expires. - next_expiration = min(next_expiration, expires); - expires != u64::MAX - } - }); - - next_expiration - } - - fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { - unsafe { - let mut prev = &self.head; - while let Some(p) = prev.get() { - let item = p.timer_queue_item(); - if f(p) { - // Skip to next - prev = &item.next; - } else { - // Remove it - prev.set(item.next.get()); - item.next.set(None); - } - } + next: Cell::new(None), + expires_at: Cell::new(0), } } } diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index e2633658a..c46085e45 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs @@ -54,9 +54,4 @@ impl SyncUnsafeCell { { *self.value.get() } - - #[cfg(feature = "integrated-timers")] - pub unsafe fn replace(&self, value: T) -> T { - core::mem::replace(&mut *self.value.get(), value) - } } -- cgit From ec96395d084d5edc8be25ddaea8547e2ebd447a6 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 9 Dec 2024 08:43:57 +0100 Subject: Prevent task from respawning while in the timer queue --- embassy-executor/src/raw/mod.rs | 36 ++++++++++++++++++- embassy-executor/src/raw/state_atomics.rs | 36 +++++++++++++++++++ embassy-executor/src/raw/state_atomics_arm.rs | 40 ++++++++++++++++++++-- embassy-executor/src/raw/state_critical_section.rs | 29 ++++++++++++++++ embassy-executor/src/raw/timer_queue.rs | 15 +++++++- 5 files changed, 152 insertions(+), 4 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f9c6509f1..14d689900 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -50,7 +50,7 @@ pub(crate) struct TaskHeader { } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq)] pub struct TaskRef { ptr: NonNull, } @@ -72,6 +72,16 @@ impl TaskRef { } } + /// # Safety + /// + /// The result of this function must only be compared + /// for equality, or stored, but not used. + pub const unsafe fn dangling() -> Self { + Self { + ptr: NonNull::dangling(), + } + } + pub(crate) fn header(self) -> &'static TaskHeader { unsafe { self.ptr.as_ref() } } @@ -88,6 +98,30 @@ impl TaskRef { &self.header().timer_queue_item } + /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + /// + /// Entering this state prevents the task from being respawned while in a timer queue. + /// + /// Safety: + /// + /// This functions should only be called by the timer queue implementation, before + /// enqueueing the timer item. + #[cfg(feature = "integrated-timers")] + pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { + self.header().state.timer_enqueue() + } + + /// Unmark the task as timer-queued. + /// + /// Safety: + /// + /// This functions should only be called by the timer queue implementation, after the task has + /// been removed from the timer queue. + #[cfg(feature = "integrated-timers")] + pub unsafe fn timer_dequeue(&self) { + self.header().state.timer_dequeue() + } + /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e4127897e..d03c61ade 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,9 +1,15 @@ use core::sync::atomic::{AtomicU32, Ordering}; +#[cfg(feature = "integrated-timers")] +use super::timer_queue::TimerEnqueueOperation; + /// Task is spawned (has a future) pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: AtomicU32, @@ -52,4 +58,34 @@ impl State { let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); state & STATE_SPAWNED != 0 } + + /// Mark the task as timer-queued. Return whether it can be enqueued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> TimerEnqueueOperation { + if self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // If not started, ignore it + if state & STATE_SPAWNED == 0 { + None + } else { + // Mark it as enqueued + Some(state | STATE_TIMER_QUEUED) + } + }) + .is_ok() + { + TimerEnqueueOperation::Enqueue + } else { + TimerEnqueueOperation::Ignore + } + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); + } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index b673c7359..f6f2e8f08 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -1,9 +1,14 @@ use core::arch::asm; use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; +#[cfg(feature = "integrated-timers")] +use super::timer_queue::TimerEnqueueOperation; + // Must be kept in sync with the layout of `State`! pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16; #[repr(C, align(4))] pub(crate) struct State { @@ -11,8 +16,9 @@ pub(crate) struct State { spawned: AtomicBool, /// Task is in the executor run queue run_queued: AtomicBool, + /// Task is in the executor timer queue + timer_queued: AtomicBool, pad: AtomicBool, - pad2: AtomicBool, } impl State { @@ -20,8 +26,8 @@ impl State { Self { spawned: AtomicBool::new(false), run_queued: AtomicBool::new(false), + timer_queued: AtomicBool::new(false), pad: AtomicBool::new(false), - pad2: AtomicBool::new(false), } } @@ -85,4 +91,34 @@ impl State { self.run_queued.store(false, Ordering::Relaxed); r } + + /// Mark the task as timer-queued. Return whether it can be enqueued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> TimerEnqueueOperation { + if self + .as_u32() + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // If not started, ignore it + if state & STATE_SPAWNED == 0 { + None + } else { + // Mark it as enqueued + Some(state | STATE_TIMER_QUEUED) + } + }) + .is_ok() + { + TimerEnqueueOperation::Enqueue + } else { + TimerEnqueueOperation::Ignore + } + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.timer_queued.store(false, Ordering::Relaxed); + } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index b92eed006..c0ec2f530 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -2,10 +2,16 @@ use core::cell::Cell; use critical_section::Mutex; +#[cfg(feature = "integrated-timers")] +use super::timer_queue::TimerEnqueueOperation; + /// Task is spawned (has a future) pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: Mutex>, @@ -69,4 +75,27 @@ impl State { ok }) } + + /// Mark the task as timer-queued. Return whether it can be enqueued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> TimerEnqueueOperation { + self.update(|s| { + // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is + // just being spawned, because its executor pointer may still be changing. + if *s & STATE_SPAWNED == STATE_SPAWNED { + *s |= STATE_TIMER_QUEUED; + TimerEnqueueOperation::Enqueue + } else { + TimerEnqueueOperation::Ignore + } + }) + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.update(|s| *s &= !STATE_TIMER_QUEUED); + } } diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 46e346c1b..c36708401 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -7,6 +7,9 @@ use super::TaskRef; /// An item in the timer queue. pub struct TimerQueueItem { /// The next item in the queue. + /// + /// If this field contains `Some`, the item is in the queue. The last item in the queue has a + /// value of `Some(dangling_pointer)` pub next: Cell>, /// The time at which this item expires. @@ -19,7 +22,17 @@ impl TimerQueueItem { pub(crate) const fn new() -> Self { Self { next: Cell::new(None), - expires_at: Cell::new(0), + expires_at: Cell::new(u64::MAX), } } } + +/// The operation to perform after `timer_enqueue` is called. +#[derive(Debug, Copy, Clone, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TimerEnqueueOperation { + /// Enqueue the task. + Enqueue, + /// Update the task's expiration time. + Ignore, +} -- cgit From 2f2e2c6031a1abaecdac5ed2febe109e647fe6fd Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 9 Dec 2024 00:28:14 +0100 Subject: Make `integrated-timers` the default, remove Cargo feature. --- embassy-executor/src/raw/mod.rs | 7 ------- embassy-executor/src/raw/state_atomics.rs | 4 ---- embassy-executor/src/raw/state_atomics_arm.rs | 4 ---- embassy-executor/src/raw/state_critical_section.rs | 4 ---- embassy-executor/src/raw/trace.rs | 22 ++++++++-------------- 5 files changed, 8 insertions(+), 33 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 14d689900..2feaab155 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -16,7 +16,6 @@ mod run_queue; #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] mod state; -#[cfg(feature = "integrated-timers")] pub mod timer_queue; #[cfg(feature = "trace")] mod trace; @@ -45,7 +44,6 @@ pub(crate) struct TaskHeader { poll_fn: SyncUnsafeCell>, /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. - #[cfg(feature = "integrated-timers")] pub(crate) timer_queue_item: timer_queue::TimerQueueItem, } @@ -87,13 +85,11 @@ impl TaskRef { } /// Returns a reference to the executor that the task is currently running on. - #[cfg(feature = "integrated-timers")] pub unsafe fn executor(self) -> Option<&'static Executor> { self.header().executor.get().map(|e| Executor::wrap(e)) } /// Returns a reference to the timer queue item. - #[cfg(feature = "integrated-timers")] pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { &self.header().timer_queue_item } @@ -106,7 +102,6 @@ impl TaskRef { /// /// This functions should only be called by the timer queue implementation, before /// enqueueing the timer item. - #[cfg(feature = "integrated-timers")] pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { self.header().state.timer_enqueue() } @@ -117,7 +112,6 @@ impl TaskRef { /// /// This functions should only be called by the timer queue implementation, after the task has /// been removed from the timer queue. - #[cfg(feature = "integrated-timers")] pub unsafe fn timer_dequeue(&self) { self.header().state.timer_dequeue() } @@ -162,7 +156,6 @@ impl TaskStorage { // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), - #[cfg(feature = "integrated-timers")] timer_queue_item: timer_queue::TimerQueueItem::new(), }, future: UninitCell::uninit(), diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index d03c61ade..15eb9a368 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,6 +1,5 @@ use core::sync::atomic::{AtomicU32, Ordering}; -#[cfg(feature = "integrated-timers")] use super::timer_queue::TimerEnqueueOperation; /// Task is spawned (has a future) @@ -8,7 +7,6 @@ pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; /// Task is in the executor timer queue -#[cfg(feature = "integrated-timers")] pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { @@ -60,7 +58,6 @@ impl State { } /// Mark the task as timer-queued. Return whether it can be enqueued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_enqueue(&self) -> TimerEnqueueOperation { if self @@ -83,7 +80,6 @@ impl State { } /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_dequeue(&self) { self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index f6f2e8f08..7a152e8c0 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -1,13 +1,11 @@ use core::arch::asm; use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; -#[cfg(feature = "integrated-timers")] use super::timer_queue::TimerEnqueueOperation; // Must be kept in sync with the layout of `State`! pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; -#[cfg(feature = "integrated-timers")] pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16; #[repr(C, align(4))] @@ -93,7 +91,6 @@ impl State { } /// Mark the task as timer-queued. Return whether it can be enqueued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_enqueue(&self) -> TimerEnqueueOperation { if self @@ -116,7 +113,6 @@ impl State { } /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_dequeue(&self) { self.timer_queued.store(false, Ordering::Relaxed); diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index c0ec2f530..367162ba2 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -2,7 +2,6 @@ use core::cell::Cell; use critical_section::Mutex; -#[cfg(feature = "integrated-timers")] use super::timer_queue::TimerEnqueueOperation; /// Task is spawned (has a future) @@ -10,7 +9,6 @@ pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; /// Task is in the executor timer queue -#[cfg(feature = "integrated-timers")] pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { @@ -77,7 +75,6 @@ impl State { } /// Mark the task as timer-queued. Return whether it can be enqueued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_enqueue(&self) -> TimerEnqueueOperation { self.update(|s| { @@ -93,7 +90,6 @@ impl State { } /// Unmark the task as timer-queued. - #[cfg(feature = "integrated-timers")] #[inline(always)] pub fn timer_dequeue(&self) { self.update(|s| *s &= !STATE_TIMER_QUEUED); diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index c7bcf9c11..b34387b58 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -61,29 +61,23 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { rtos_trace::trace::system_idle(); } -#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))] -const fn gcd(a: u64, b: u64) -> u64 { - if b == 0 { - a - } else { - gcd(b, a % b) - } -} - #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { // We don't know what tasks exist, so we can't send them. } - #[cfg(feature = "integrated-timers")] fn time() -> u64 { + const fn gcd(a: u64, b: u64) -> u64 { + if b == 0 { + a + } else { + gcd(b, a % b) + } + } + const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) } - #[cfg(not(feature = "integrated-timers"))] - fn time() -> u64 { - 0 - } } #[cfg(feature = "rtos-trace")] -- cgit From 5c4983236c2e68b6ba2ce325ed77ec39466fc3b6 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Fri, 13 Dec 2024 21:45:52 +0100 Subject: Make sure an exited task does not get stuck in a timer queue --- embassy-executor/src/raw/mod.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 2feaab155..b825fa6c2 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -192,7 +192,17 @@ impl TaskStorage { match future.poll(&mut cx) { Poll::Ready(_) => { this.future.drop_in_place(); + + // Mark this task to be timer queued, to prevent re-queueing it. + this.raw.state.timer_enqueue(); + + // Now mark the task as not spawned, so that + // - it can be spawned again once it has been removed from the timer queue + // - it can not be timer-queued again this.raw.state.despawn(); + + // Schedule the task by hand in the past, so it runs immediately. + unsafe { _embassy_time_schedule_wake(0, &waker) } } Poll::Pending => {} } @@ -211,6 +221,10 @@ impl TaskStorage { } } +extern "Rust" { + fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker); +} + /// An uninitialized [`TaskStorage`]. pub struct AvailableTask { task: &'static TaskStorage, -- cgit From e861344b179b3e955ac47f1985b7f97fdfb93892 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 15 Dec 2024 17:44:42 +0100 Subject: Fix comments and tweak task exit --- embassy-executor/src/raw/mod.rs | 21 +++++++++++++++------ embassy-executor/src/raw/timer_queue.rs | 5 +++-- 2 files changed, 18 insertions(+), 8 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index b825fa6c2..7da14468d 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -94,13 +94,14 @@ impl TaskRef { &self.header().timer_queue_item } - /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + /// Mark the task as timer-queued. Return whether it should be actually enqueued + /// using `_embassy_time_schedule_wake`. /// /// Entering this state prevents the task from being respawned while in a timer queue. /// /// Safety: /// - /// This functions should only be called by the timer queue implementation, before + /// This functions should only be called by the timer queue driver, before /// enqueueing the timer item. pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { self.header().state.timer_enqueue() @@ -193,16 +194,24 @@ impl TaskStorage { Poll::Ready(_) => { this.future.drop_in_place(); - // Mark this task to be timer queued, to prevent re-queueing it. - this.raw.state.timer_enqueue(); + // Mark this task to be timer queued. + // We're splitting the enqueue in two parts, so that we can change task state + // to something that prevent re-queueing. + let op = this.raw.state.timer_enqueue(); // Now mark the task as not spawned, so that // - it can be spawned again once it has been removed from the timer queue // - it can not be timer-queued again + // We must do this before scheduling the wake, to prevent the task from being + // dequeued by the time driver while it's still SPAWNED. this.raw.state.despawn(); - // Schedule the task by hand in the past, so it runs immediately. - unsafe { _embassy_time_schedule_wake(0, &waker) } + // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's + // better to be safe. + if op == timer_queue::TimerEnqueueOperation::Enqueue { + // Schedule the task in the past, so it gets dequeued ASAP. + unsafe { _embassy_time_schedule_wake(0, &waker) } + } } Poll::Pending => {} } diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index c36708401..cd9a73822 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -30,9 +30,10 @@ impl TimerQueueItem { /// The operation to perform after `timer_enqueue` is called. #[derive(Debug, Copy, Clone, PartialEq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[must_use] pub enum TimerEnqueueOperation { - /// Enqueue the task. + /// Enqueue the task (or update its expiration time). Enqueue, - /// Update the task's expiration time. + /// The task must not be enqueued in the timer queue. Ignore, } -- cgit From a10290b28e41922b0f53aafbcc82c49ee3f4e22f Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 09:15:15 +0100 Subject: Zero-inizialize expires_at --- embassy-executor/src/raw/timer_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index cd9a73822..2ba0e00a9 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -22,7 +22,7 @@ impl TimerQueueItem { pub(crate) const fn new() -> Self { Self { next: Cell::new(None), - expires_at: Cell::new(u64::MAX), + expires_at: Cell::new(0), } } } -- cgit From f389ba37219d842d7db0ab94cd421c69645a5757 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Fri, 13 Dec 2024 20:35:40 +0100 Subject: Only lock once to wake a task --- embassy-executor/src/raw/mod.rs | 20 ++++++----- embassy-executor/src/raw/run_queue_atomics.rs | 2 +- .../src/raw/run_queue_critical_section.rs | 10 +++--- embassy-executor/src/raw/state_atomics.rs | 20 +++++++++-- embassy-executor/src/raw/state_atomics_arm.rs | 19 +++++++--- embassy-executor/src/raw/state_critical_section.rs | 42 +++++++++++++--------- 6 files changed, 73 insertions(+), 40 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 7da14468d..bcbd214a9 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -386,11 +386,11 @@ impl SyncExecutor { /// - `task` must be set up to run in this executor. /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] - unsafe fn enqueue(&self, task: TaskRef) { + unsafe fn enqueue(&self, task: TaskRef, l: state::Token) { #[cfg(feature = "trace")] trace::task_ready_begin(self, &task); - if self.run_queue.enqueue(task) { + if self.run_queue.enqueue(task, l) { self.pender.pend(); } } @@ -401,7 +401,9 @@ impl SyncExecutor { #[cfg(feature = "trace")] trace::task_new(self, &task); - self.enqueue(task); + state::locked(|l| { + self.enqueue(task, l); + }) } /// # Safety @@ -544,13 +546,13 @@ impl Executor { /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. pub fn wake_task(task: TaskRef) { let header = task.header(); - if header.state.run_enqueue() { + header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { let executor = header.executor.get().unwrap_unchecked(); - executor.enqueue(task); + executor.enqueue(task, l); } - } + }); } /// Wake a task by `TaskRef` without calling pend. @@ -558,11 +560,11 @@ pub fn wake_task(task: TaskRef) { /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. pub fn wake_task_no_pend(task: TaskRef) { let header = task.header(); - if header.state.run_enqueue() { + header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { let executor = header.executor.get().unwrap_unchecked(); - executor.run_queue.enqueue(task); + executor.run_queue.enqueue(task, l); } - } + }); } diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index 90907cfda..efdafdff0 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -45,7 +45,7 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { + pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { let mut was_empty = false; self.head diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index ba59c8f29..90f09e8c8 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs @@ -44,13 +44,11 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { - critical_section::with(|cs| { - let prev = self.head.borrow(cs).replace(Some(task)); - task.header().run_queue_item.next.borrow(cs).set(prev); + pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool { + let prev = self.head.borrow(cs).replace(Some(task)); + task.header().run_queue_item.next.borrow(cs).set(prev); - prev.is_none() - }) + prev.is_none() } /// Empty the queue, then call `on_task` for each task that was in the queue. diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index 15eb9a368..abfe94486 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering}; use super::timer_queue::TimerEnqueueOperation; +pub(crate) struct Token(()); + +/// Creates a token and passes it to the closure. +/// +/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. +pub(crate) fn locked(f: impl FnOnce(Token)) { + f(Token(())); +} + /// Task is spawned (has a future) pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue @@ -34,10 +43,12 @@ impl State { self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); } - /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. + /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given + /// function if the task was successfully marked. #[inline(always)] - pub fn run_enqueue(&self) -> bool { - self.state + pub fn run_enqueue(&self, f: impl FnOnce(Token)) { + if self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { // If already scheduled, or if not started, if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { @@ -48,6 +59,9 @@ impl State { } }) .is_ok() + { + locked(f); + } } /// Unmark the task as run-queued. Return whether the task is spawned. diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index 7a152e8c0..f0f014652 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; use super::timer_queue::TimerEnqueueOperation; +pub(crate) struct Token(()); + +/// Creates a token and passes it to the closure. +/// +/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. +pub(crate) fn locked(f: impl FnOnce(Token)) { + f(Token(())); +} + // Must be kept in sync with the layout of `State`! pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; @@ -57,9 +66,10 @@ impl State { self.spawned.store(false, Ordering::Relaxed); } - /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. + /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given + /// function if the task was successfully marked. #[inline(always)] - pub fn run_enqueue(&self) -> bool { + pub fn run_enqueue(&self, f: impl FnOnce(Token)) { unsafe { loop { let state: u32; @@ -67,14 +77,15 @@ impl State { if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { asm!("clrex", options(nomem, nostack)); - return false; + return; } let outcome: usize; let new_state = state | STATE_RUN_QUEUED; asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); if outcome == 0 { - return true; + locked(f); + return; } } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 367162ba2..8e570b33c 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -1,6 +1,7 @@ use core::cell::Cell; -use critical_section::Mutex; +pub(crate) use critical_section::{with as locked, CriticalSection as Token}; +use critical_section::{CriticalSection, Mutex}; use super::timer_queue::TimerEnqueueOperation; @@ -23,13 +24,15 @@ impl State { } fn update(&self, f: impl FnOnce(&mut u32) -> R) -> R { - critical_section::with(|cs| { - let s = self.state.borrow(cs); - let mut val = s.get(); - let r = f(&mut val); - s.set(val); - r - }) + critical_section::with(|cs| self.update_with_cs(cs, f)) + } + + fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R { + let s = self.state.borrow(cs); + let mut val = s.get(); + let r = f(&mut val); + s.set(val); + r } /// If task is idle, mark it as spawned + run_queued and return true. @@ -51,17 +54,22 @@ impl State { self.update(|s| *s &= !STATE_SPAWNED); } - /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. + /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given + /// function if the task was successfully marked. #[inline(always)] - pub fn run_enqueue(&self) -> bool { - self.update(|s| { - if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { - false - } else { - *s |= STATE_RUN_QUEUED; - true + pub fn run_enqueue(&self, f: impl FnOnce(Token)) { + critical_section::with(|cs| { + if self.update_with_cs(cs, |s| { + if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { + false + } else { + *s |= STATE_RUN_QUEUED; + true + } + }) { + f(cs); } - }) + }); } /// Unmark the task as run-queued. Return whether the task is spawned. -- cgit From b44ef5ccb40d6b778e623e6e68a234c2e0615d25 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 15:56:55 +0100 Subject: Fix racy access of TaskHeader::executor --- embassy-executor/src/raw/mod.rs | 70 ++++++++++++++++++++++++--- embassy-executor/src/raw/state_atomics.rs | 5 +- embassy-executor/src/raw/state_atomics_arm.rs | 5 +- 3 files changed, 69 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bcbd214a9..808a78389 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -14,7 +14,58 @@ mod run_queue; #[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] #[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] -mod state; +pub(crate) mod state; + +#[cfg(target_has_atomic = "ptr")] +mod owner { + use core::sync::atomic::{AtomicPtr, Ordering}; + + use super::{state::Token, SyncExecutor}; + + pub(crate) struct ExecutorRef(AtomicPtr); + + impl ExecutorRef { + pub const fn new() -> Self { + Self(AtomicPtr::new(core::ptr::null_mut())) + } + + pub fn set(&self, executor: Option<&'static SyncExecutor>, _: Token) { + let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null()); + self.0.store(ptr.cast_mut(), Ordering::Release); + } + + pub fn get(&self, _: Token) -> *const SyncExecutor { + self.0.load(Ordering::Acquire).cast_const() + } + } +} +#[cfg(not(target_has_atomic = "ptr"))] +mod owner { + use super::{state::Token, SyncExecutor}; + use core::cell::Cell; + + use critical_section::Mutex; + + pub(crate) struct ExecutorRef(Mutex>); + + unsafe impl Send for ExecutorRef {} + unsafe impl Sync for ExecutorRef {} + + impl ExecutorRef { + pub const fn new() -> Self { + Self(Mutex::new(Cell::new(core::ptr::null()))) + } + + pub fn set(&self, executor: Option<&'static SyncExecutor>, cs: Token) { + let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null()); + self.0.borrow(cs).set(ptr); + } + + pub fn get(&self, cs: Token) -> *const SyncExecutor { + self.0.borrow(cs).get() + } + } +} pub mod timer_queue; #[cfg(feature = "trace")] @@ -30,6 +81,8 @@ use core::pin::Pin; use core::ptr::NonNull; use core::task::{Context, Poll}; +use crate::raw::owner::ExecutorRef; + use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; @@ -40,7 +93,7 @@ use super::SpawnToken; pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: SyncUnsafeCell>, + pub(crate) executor: ExecutorRef, poll_fn: SyncUnsafeCell>, /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. @@ -86,7 +139,8 @@ impl TaskRef { /// Returns a reference to the executor that the task is currently running on. pub unsafe fn executor(self) -> Option<&'static Executor> { - self.header().executor.get().map(|e| Executor::wrap(e)) + let executor = state::locked(|token| self.header().executor.get(token)); + executor.as_ref().map(|e| Executor::wrap(e)) } /// Returns a reference to the timer queue item. @@ -153,7 +207,7 @@ impl TaskStorage { raw: TaskHeader { state: State::new(), run_queue_item: RunQueueItem::new(), - executor: SyncUnsafeCell::new(None), + executor: ExecutorRef::new(), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), @@ -396,7 +450,9 @@ impl SyncExecutor { } pub(super) unsafe fn spawn(&'static self, task: TaskRef) { - task.header().executor.set(Some(self)); + state::locked(|l| { + task.header().executor.set(Some(self), l); + }); #[cfg(feature = "trace")] trace::task_new(self, &task); @@ -549,7 +605,7 @@ pub fn wake_task(task: TaskRef) { header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = header.executor.get().unwrap_unchecked(); + let executor = header.executor.get(l).as_ref().unwrap_unchecked(); executor.enqueue(task, l); } }); @@ -563,7 +619,7 @@ pub fn wake_task_no_pend(task: TaskRef) { header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = header.executor.get().unwrap_unchecked(); + let executor = header.executor.get(l).as_ref().unwrap_unchecked(); executor.run_queue.enqueue(task, l); } }); diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index abfe94486..d7350464f 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -2,13 +2,14 @@ use core::sync::atomic::{AtomicU32, Ordering}; use super::timer_queue::TimerEnqueueOperation; +#[derive(Clone, Copy)] pub(crate) struct Token(()); /// Creates a token and passes it to the closure. /// /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. -pub(crate) fn locked(f: impl FnOnce(Token)) { - f(Token(())); +pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { + f(Token(())) } /// Task is spawned (has a future) diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index f0f014652..c1e8f69ab 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -3,13 +3,14 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; use super::timer_queue::TimerEnqueueOperation; +#[derive(Clone, Copy)] pub(crate) struct Token(()); /// Creates a token and passes it to the closure. /// /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. -pub(crate) fn locked(f: impl FnOnce(Token)) { - f(Token(())); +pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { + f(Token(())) } // Must be kept in sync with the layout of `State`! -- cgit From b47a631abf0c200c3b29b8e4ec199421835a0525 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 17:24:17 +0100 Subject: Rely on atomic load-store on all targets --- embassy-executor/src/raw/mod.rs | 72 ++++++----------------------------------- 1 file changed, 10 insertions(+), 62 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 808a78389..5a476213b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -14,58 +14,7 @@ mod run_queue; #[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] #[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] -pub(crate) mod state; - -#[cfg(target_has_atomic = "ptr")] -mod owner { - use core::sync::atomic::{AtomicPtr, Ordering}; - - use super::{state::Token, SyncExecutor}; - - pub(crate) struct ExecutorRef(AtomicPtr); - - impl ExecutorRef { - pub const fn new() -> Self { - Self(AtomicPtr::new(core::ptr::null_mut())) - } - - pub fn set(&self, executor: Option<&'static SyncExecutor>, _: Token) { - let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null()); - self.0.store(ptr.cast_mut(), Ordering::Release); - } - - pub fn get(&self, _: Token) -> *const SyncExecutor { - self.0.load(Ordering::Acquire).cast_const() - } - } -} -#[cfg(not(target_has_atomic = "ptr"))] -mod owner { - use super::{state::Token, SyncExecutor}; - use core::cell::Cell; - - use critical_section::Mutex; - - pub(crate) struct ExecutorRef(Mutex>); - - unsafe impl Send for ExecutorRef {} - unsafe impl Sync for ExecutorRef {} - - impl ExecutorRef { - pub const fn new() -> Self { - Self(Mutex::new(Cell::new(core::ptr::null()))) - } - - pub fn set(&self, executor: Option<&'static SyncExecutor>, cs: Token) { - let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null()); - self.0.borrow(cs).set(ptr); - } - - pub fn get(&self, cs: Token) -> *const SyncExecutor { - self.0.borrow(cs).get() - } - } -} +mod state; pub mod timer_queue; #[cfg(feature = "trace")] @@ -79,10 +28,9 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; +use core::sync::atomic::{AtomicPtr, Ordering}; use core::task::{Context, Poll}; -use crate::raw::owner::ExecutorRef; - use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; @@ -93,7 +41,7 @@ use super::SpawnToken; pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, - pub(crate) executor: ExecutorRef, + pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. @@ -139,7 +87,7 @@ impl TaskRef { /// Returns a reference to the executor that the task is currently running on. pub unsafe fn executor(self) -> Option<&'static Executor> { - let executor = state::locked(|token| self.header().executor.get(token)); + let executor = self.header().executor.load(Ordering::Relaxed); executor.as_ref().map(|e| Executor::wrap(e)) } @@ -207,7 +155,7 @@ impl TaskStorage { raw: TaskHeader { state: State::new(), run_queue_item: RunQueueItem::new(), - executor: ExecutorRef::new(), + executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), @@ -450,9 +398,9 @@ impl SyncExecutor { } pub(super) unsafe fn spawn(&'static self, task: TaskRef) { - state::locked(|l| { - task.header().executor.set(Some(self), l); - }); + task.header() + .executor + .store((self as *const Self).cast_mut(), Ordering::Relaxed); #[cfg(feature = "trace")] trace::task_new(self, &task); @@ -605,7 +553,7 @@ pub fn wake_task(task: TaskRef) { header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = header.executor.get(l).as_ref().unwrap_unchecked(); + let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked(); executor.enqueue(task, l); } }); @@ -619,7 +567,7 @@ pub fn wake_task_no_pend(task: TaskRef) { header.state.run_enqueue(|l| { // We have just marked the task as scheduled, so enqueue it. unsafe { - let executor = header.executor.get(l).as_ref().unwrap_unchecked(); + let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked(); executor.run_queue.enqueue(task, l); } }); -- cgit From 3c121e5425e0a1901c459d27e3e5929f86d0a206 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 18:08:19 +0100 Subject: Remove special handling of integrated timer queue --- embassy-executor/src/raw/mod.rs | 22 ---------------------- 1 file changed, 22 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 5a476213b..997db6756 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -195,25 +195,7 @@ impl TaskStorage { match future.poll(&mut cx) { Poll::Ready(_) => { this.future.drop_in_place(); - - // Mark this task to be timer queued. - // We're splitting the enqueue in two parts, so that we can change task state - // to something that prevent re-queueing. - let op = this.raw.state.timer_enqueue(); - - // Now mark the task as not spawned, so that - // - it can be spawned again once it has been removed from the timer queue - // - it can not be timer-queued again - // We must do this before scheduling the wake, to prevent the task from being - // dequeued by the time driver while it's still SPAWNED. this.raw.state.despawn(); - - // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's - // better to be safe. - if op == timer_queue::TimerEnqueueOperation::Enqueue { - // Schedule the task in the past, so it gets dequeued ASAP. - unsafe { _embassy_time_schedule_wake(0, &waker) } - } } Poll::Pending => {} } @@ -232,10 +214,6 @@ impl TaskStorage { } } -extern "Rust" { - fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker); -} - /// An uninitialized [`TaskStorage`]. pub struct AvailableTask { task: &'static TaskStorage, -- cgit From c9f32b7e3667f29c4ab15d4dbab37acdb471d0ed Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 18:51:09 +0100 Subject: Attach payload to TimerQueueItem --- embassy-executor/src/raw/timer_queue.rs | 45 +++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 2ba0e00a9..c4dba18ff 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -4,6 +4,45 @@ use core::cell::Cell; use super::TaskRef; +#[cfg(feature = "_timer-item-payload")] +macro_rules! define_opaque { + ($size:tt) => { + /// An opaque data type. + #[repr(align($size))] + pub struct OpaqueData { + data: [u8; $size], + } + + impl OpaqueData { + const fn new() -> Self { + Self { data: [0; $size] } + } + + /// Access the data as a reference to a type `T`. + /// + /// Safety: + /// + /// The caller must ensure that the size of the type `T` is less than, or equal to + /// the size of the payload, and must ensure that the alignment of the type `T` is + /// less than, or equal to the alignment of the payload. + /// + /// The type must be valid when zero-initialized. + pub unsafe fn as_ref(&self) -> &T { + &*(self.data.as_ptr() as *const T) + } + } + }; +} + +#[cfg(feature = "timer-item-payload-size-1")] +define_opaque!(1); +#[cfg(feature = "timer-item-payload-size-2")] +define_opaque!(2); +#[cfg(feature = "timer-item-payload-size-4")] +define_opaque!(4); +#[cfg(feature = "timer-item-payload-size-8")] +define_opaque!(8); + /// An item in the timer queue. pub struct TimerQueueItem { /// The next item in the queue. @@ -14,6 +53,10 @@ pub struct TimerQueueItem { /// The time at which this item expires. pub expires_at: Cell, + + /// Some implementation-defined, zero-initialized piece of data. + #[cfg(feature = "_timer-item-payload")] + pub payload: OpaqueData, } unsafe impl Sync for TimerQueueItem {} @@ -23,6 +66,8 @@ impl TimerQueueItem { Self { next: Cell::new(None), expires_at: Cell::new(0), + #[cfg(feature = "_timer-item-payload")] + payload: OpaqueData::new(), } } } -- cgit From fbd0fe06bde7949d3374d10e540291493e088314 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 18:56:42 +0100 Subject: Remove special handling of integrated timer items --- embassy-executor/src/raw/mod.rs | 23 ----------------------- embassy-executor/src/raw/timer_queue.rs | 11 ----------- 2 files changed, 34 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 997db6756..bdd5ff5ae 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -96,29 +96,6 @@ impl TaskRef { &self.header().timer_queue_item } - /// Mark the task as timer-queued. Return whether it should be actually enqueued - /// using `_embassy_time_schedule_wake`. - /// - /// Entering this state prevents the task from being respawned while in a timer queue. - /// - /// Safety: - /// - /// This functions should only be called by the timer queue driver, before - /// enqueueing the timer item. - pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { - self.header().state.timer_enqueue() - } - - /// Unmark the task as timer-queued. - /// - /// Safety: - /// - /// This functions should only be called by the timer queue implementation, after the task has - /// been removed from the timer queue. - pub unsafe fn timer_dequeue(&self) { - self.header().state.timer_dequeue() - } - /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index c4dba18ff..e52453be4 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -71,14 +71,3 @@ impl TimerQueueItem { } } } - -/// The operation to perform after `timer_enqueue` is called. -#[derive(Debug, Copy, Clone, PartialEq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -#[must_use] -pub enum TimerEnqueueOperation { - /// Enqueue the task (or update its expiration time). - Enqueue, - /// The task must not be enqueued in the timer queue. - Ignore, -} -- cgit From c90d048ecb611908f5696b4f57d689bdb254aee6 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 16 Dec 2024 19:03:00 +0100 Subject: Remove TIMER_QUEUED --- embassy-executor/src/raw/state_atomics.rs | 32 ------------------- embassy-executor/src/raw/state_atomics_arm.rs | 36 ++-------------------- embassy-executor/src/raw/state_critical_section.rs | 25 --------------- 3 files changed, 2 insertions(+), 91 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index d7350464f..6f5266bda 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,7 +1,5 @@ use core::sync::atomic::{AtomicU32, Ordering}; -use super::timer_queue::TimerEnqueueOperation; - #[derive(Clone, Copy)] pub(crate) struct Token(()); @@ -16,8 +14,6 @@ pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: AtomicU32, @@ -71,32 +67,4 @@ impl State { let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); state & STATE_SPAWNED != 0 } - - /// Mark the task as timer-queued. Return whether it can be enqueued. - #[inline(always)] - pub fn timer_enqueue(&self) -> TimerEnqueueOperation { - if self - .state - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { - // If not started, ignore it - if state & STATE_SPAWNED == 0 { - None - } else { - // Mark it as enqueued - Some(state | STATE_TIMER_QUEUED) - } - }) - .is_ok() - { - TimerEnqueueOperation::Enqueue - } else { - TimerEnqueueOperation::Ignore - } - } - - /// Unmark the task as timer-queued. - #[inline(always)] - pub fn timer_dequeue(&self) { - self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); - } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index c1e8f69ab..4896b33c5 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -1,8 +1,6 @@ use core::arch::asm; use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; -use super::timer_queue::TimerEnqueueOperation; - #[derive(Clone, Copy)] pub(crate) struct Token(()); @@ -16,7 +14,6 @@ pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { // Must be kept in sync with the layout of `State`! pub(crate) const STATE_SPAWNED: u32 = 1 << 0; pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16; #[repr(C, align(4))] pub(crate) struct State { @@ -24,9 +21,8 @@ pub(crate) struct State { spawned: AtomicBool, /// Task is in the executor run queue run_queued: AtomicBool, - /// Task is in the executor timer queue - timer_queued: AtomicBool, pad: AtomicBool, + pad2: AtomicBool, } impl State { @@ -34,8 +30,8 @@ impl State { Self { spawned: AtomicBool::new(false), run_queued: AtomicBool::new(false), - timer_queued: AtomicBool::new(false), pad: AtomicBool::new(false), + pad2: AtomicBool::new(false), } } @@ -101,32 +97,4 @@ impl State { self.run_queued.store(false, Ordering::Relaxed); r } - - /// Mark the task as timer-queued. Return whether it can be enqueued. - #[inline(always)] - pub fn timer_enqueue(&self) -> TimerEnqueueOperation { - if self - .as_u32() - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { - // If not started, ignore it - if state & STATE_SPAWNED == 0 { - None - } else { - // Mark it as enqueued - Some(state | STATE_TIMER_QUEUED) - } - }) - .is_ok() - { - TimerEnqueueOperation::Enqueue - } else { - TimerEnqueueOperation::Ignore - } - } - - /// Unmark the task as timer-queued. - #[inline(always)] - pub fn timer_dequeue(&self) { - self.timer_queued.store(false, Ordering::Relaxed); - } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 8e570b33c..29b10f6e3 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -3,14 +3,10 @@ use core::cell::Cell; pub(crate) use critical_section::{with as locked, CriticalSection as Token}; use critical_section::{CriticalSection, Mutex}; -use super::timer_queue::TimerEnqueueOperation; - /// Task is spawned (has a future) pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; -/// Task is in the executor timer queue -pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: Mutex>, @@ -81,25 +77,4 @@ impl State { ok }) } - - /// Mark the task as timer-queued. Return whether it can be enqueued. - #[inline(always)] - pub fn timer_enqueue(&self) -> TimerEnqueueOperation { - self.update(|s| { - // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is - // just being spawned, because its executor pointer may still be changing. - if *s & STATE_SPAWNED == STATE_SPAWNED { - *s |= STATE_TIMER_QUEUED; - TimerEnqueueOperation::Enqueue - } else { - TimerEnqueueOperation::Ignore - } - }) - } - - /// Unmark the task as timer-queued. - #[inline(always)] - pub fn timer_dequeue(&self) { - self.update(|s| *s &= !STATE_TIMER_QUEUED); - } } -- cgit From fc25fca00b48630073575d14bcc713912d0b0104 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 17 Dec 2024 13:06:31 +0100 Subject: Remove WakerHack for good. Now that 1.83 xtensa is out, we can remove it unconditionally. --- embassy-executor/src/raw/waker.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 9c70f995a..b7d57c314 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -32,22 +32,11 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { /// /// Panics if the waker is not created by the Embassy executor. pub fn task_from_waker(waker: &Waker) -> TaskRef { - struct WakerHack { - data: *const (), - vtable: &'static RawWakerVTable, - } - - // safety: OK because WakerHack has the same layout as Waker. - // This is not really guaranteed because the structs are `repr(Rust)`, it is - // indeed the case in the current implementation. - // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992 - let hack: &WakerHack = unsafe { core::mem::transmute(waker) }; - // make sure to compare vtable addresses. Doing `==` on the references // will compare the contents, which is slower. - if hack.vtable as *const _ != &VTABLE as *const _ { + if waker.vtable() as *const _ != &VTABLE as *const _ { panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") } // safety: our wakers are always created with `TaskRef::as_ptr` - unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } + unsafe { TaskRef::from_ptr(waker.data() as *const TaskHeader) } } -- cgit From 7eac184af0b6bf88c43158b9a791d7c169d5bb3c Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 9 Dec 2024 17:11:47 +0100 Subject: Document task states and state transitions --- embassy-executor/src/raw/mod.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bdd5ff5ae..0ac569946 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -38,6 +38,44 @@ pub use self::waker::task_from_waker; use super::SpawnToken; /// Raw task header for use in task pointers. +/// +/// A task can be in one of the following states: +/// +/// - Not spawned: the task is ready to spawn. +/// - `SPAWNED`: the task is currently spawned and may be running. +/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`. +/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without +/// polling the task's future. +/// +/// A task's complete life cycle is as follows: +/// +/// ```text +/// ┌────────────┐ ┌────────────────────────┐ +/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│ +/// │ │ │ │ │ +/// │ └─────┬──────┘ └──────▲─────────────────┘ +/// │ 1 │ +/// │ │ ┌────────────┘ +/// │ │ 5 +/// │ ┌─────▼────┴─────────┐ +/// │ │Spawned|Run enqueued│ +/// │ │ │ +/// │ └─────┬▲─────────────┘ +/// │ 2│ +/// │ │3 +/// │ ┌─────▼┴─────┐ +/// └─4┤ Spawned │ +/// │ │ +/// └────────────┘ +/// ``` +/// +/// Transitions: +/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` +/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` +/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` +/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` +/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` +/// - 6: Task is dequeued and then ignored via `State::run_dequeue` pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, -- cgit From c6ca46c82529e014aaceb218ad88978c50f0db07 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 26 Nov 2024 00:20:34 +0100 Subject: Set RUN_QUEUED unconditionally --- embassy-executor/src/raw/mod.rs | 3 ++- embassy-executor/src/raw/state_atomics.rs | 15 ++------------- embassy-executor/src/raw/state_atomics_arm.rs | 2 +- embassy-executor/src/raw/state_critical_section.rs | 9 +++------ 4 files changed, 8 insertions(+), 21 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 0ac569946..6503b556f 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -52,7 +52,7 @@ use super::SpawnToken; /// ```text /// ┌────────────┐ ┌────────────────────────┐ /// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│ -/// │ │ │ │ │ +/// │ │ ├7─►│ │ /// │ └─────┬──────┘ └──────▲─────────────────┘ /// │ 1 │ /// │ │ ┌────────────┘ @@ -76,6 +76,7 @@ use super::SpawnToken; /// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 6: Task is dequeued and then ignored via `State::run_dequeue` +/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index 6f5266bda..bdd317b53 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -44,19 +44,8 @@ impl State { /// function if the task was successfully marked. #[inline(always)] pub fn run_enqueue(&self, f: impl FnOnce(Token)) { - if self - .state - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { - // If already scheduled, or if not started, - if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { - None - } else { - // Mark it as scheduled - Some(state | STATE_RUN_QUEUED) - } - }) - .is_ok() - { + let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel); + if prev & STATE_RUN_QUEUED == 0 { locked(f); } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index 4896b33c5..06bf24343 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -72,7 +72,7 @@ impl State { let state: u32; asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack)); - if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { + if state & STATE_RUN_QUEUED != 0 { asm!("clrex", options(nomem, nostack)); return; } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 29b10f6e3..4733af278 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -56,12 +56,9 @@ impl State { pub fn run_enqueue(&self, f: impl FnOnce(Token)) { critical_section::with(|cs| { if self.update_with_cs(cs, |s| { - if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { - false - } else { - *s |= STATE_RUN_QUEUED; - true - } + let ok = *s & STATE_RUN_QUEUED == 0; + *s |= STATE_RUN_QUEUED; + ok }) { f(cs); } -- cgit From 889b419fc40f252726dbdc8a67bc4d27aa5b81f3 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 17:14:59 +0100 Subject: Simplify ARM run_enqueue --- embassy-executor/src/raw/state_atomics_arm.rs | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index 06bf24343..cbda0d89d 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -1,4 +1,3 @@ -use core::arch::asm; use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; #[derive(Clone, Copy)] @@ -67,24 +66,10 @@ impl State { /// function if the task was successfully marked. #[inline(always)] pub fn run_enqueue(&self, f: impl FnOnce(Token)) { - unsafe { - loop { - let state: u32; - asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack)); + let old = self.run_queued.swap(true, Ordering::AcqRel); - if state & STATE_RUN_QUEUED != 0 { - asm!("clrex", options(nomem, nostack)); - return; - } - - let outcome: usize; - let new_state = state | STATE_RUN_QUEUED; - asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); - if outcome == 0 { - locked(f); - return; - } - } + if !old { + locked(f); } } -- cgit From edb8f21a741358f7c80b744f008f1e5acc77b429 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sat, 7 Dec 2024 14:45:16 +0100 Subject: Take critical section instead of unsafe --- embassy-executor/src/raw/mod.rs | 9 --------- embassy-executor/src/raw/run_queue_atomics.rs | 11 ++++++++++- embassy-executor/src/raw/run_queue_critical_section.rs | 16 ++++++++++++---- 3 files changed, 22 insertions(+), 14 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 6503b556f..c79fdae60 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -411,15 +411,6 @@ impl SyncExecutor { self.run_queue.dequeue_all(|p| { let task = p.header(); - if !task.state.run_dequeue() { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; - } - #[cfg(feature = "trace")] trace::task_exec_begin(self, &p); diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index efdafdff0..aad90d767 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -81,7 +81,16 @@ impl RunQueue { // safety: there are no concurrent accesses to `next` next = unsafe { task.header().run_queue_item.next.get() }; - on_task(task); + let run_task = task.header().state.run_dequeue(); + + if run_task { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + on_task(task); + } } } } diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index 90f09e8c8..4f1b2855a 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs @@ -63,11 +63,19 @@ impl RunQueue { // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - // safety: we know if the task is enqueued, no one else will touch the `next` pointer. - let cs = unsafe { CriticalSection::new() }; - next = task.header().run_queue_item.next.borrow(cs).get(); + let run_task = critical_section::with(|cs| { + next = task.header().run_queue_item.next.borrow(cs).get(); + task.header().state.run_dequeue(cs) + }); - on_task(task); + if run_task { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + on_task(task); + } } } } -- cgit From 8fd08b1e97533c7526bb4937770060d18bb37410 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 18:05:48 +0100 Subject: Swap poll_fn to allow polling exited tasks --- embassy-executor/src/raw/mod.rs | 17 +++++++++++++++-- embassy-executor/src/raw/run_queue_atomics.rs | 12 ++---------- embassy-executor/src/raw/run_queue_critical_section.rs | 13 +++---------- embassy-executor/src/raw/state_atomics.rs | 5 ++--- embassy-executor/src/raw/state_atomics_arm.rs | 4 +--- embassy-executor/src/raw/state_critical_section.rs | 8 ++------ 6 files changed, 25 insertions(+), 34 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index c79fdae60..242e9c365 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -202,16 +202,29 @@ impl TaskStorage { } } + unsafe fn poll_to_despawn(p: TaskRef) { + // The task's future has already been dropped, we just mark it as `!SPAWNED`. + let this = &*p.as_ptr().cast::>(); + this.raw.state.despawn(); + } + unsafe fn poll(p: TaskRef) { - let this = &*(p.as_ptr() as *const TaskStorage); + let this = &*p.as_ptr().cast::>(); let future = Pin::new_unchecked(this.future.as_mut()); let waker = waker::from_task(p); let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { + waker.wake_by_ref(); + + // As the future has finished and this function will not be called + // again, we can safely drop the future here. this.future.drop_in_place(); - this.raw.state.despawn(); + + // We replace the poll_fn with a despawn function, so that the task is cleaned up + // when the executor polls it next. + this.raw.poll_fn.set(Some(Self::poll_to_despawn)); } Poll::Pending => {} } diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index aad90d767..ce511d79a 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -81,16 +81,8 @@ impl RunQueue { // safety: there are no concurrent accesses to `next` next = unsafe { task.header().run_queue_item.next.get() }; - let run_task = task.header().state.run_dequeue(); - - if run_task { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - on_task(task); - } + task.header().state.run_dequeue(); + on_task(task); } } } diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index 4f1b2855a..86c4085ed 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs @@ -63,19 +63,12 @@ impl RunQueue { // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - let run_task = critical_section::with(|cs| { + critical_section::with(|cs| { next = task.header().run_queue_item.next.borrow(cs).get(); - task.header().state.run_dequeue(cs) + task.header().state.run_dequeue(cs); }); - if run_task { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - on_task(task); - } + on_task(task); } } } diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index bdd317b53..b6576bfc2 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -52,8 +52,7 @@ impl State { /// Unmark the task as run-queued. Return whether the task is spawned. #[inline(always)] - pub fn run_dequeue(&self) -> bool { - let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - state & STATE_SPAWNED != 0 + pub fn run_dequeue(&self) { + self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index cbda0d89d..b743dcc2c 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -75,11 +75,9 @@ impl State { /// Unmark the task as run-queued. Return whether the task is spawned. #[inline(always)] - pub fn run_dequeue(&self) -> bool { + pub fn run_dequeue(&self) { compiler_fence(Ordering::Release); - let r = self.spawned.load(Ordering::Relaxed); self.run_queued.store(false, Ordering::Relaxed); - r } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 4733af278..6b627ff79 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -67,11 +67,7 @@ impl State { /// Unmark the task as run-queued. Return whether the task is spawned. #[inline(always)] - pub fn run_dequeue(&self) -> bool { - self.update(|s| { - let ok = *s & STATE_SPAWNED != 0; - *s &= !STATE_RUN_QUEUED; - ok - }) + pub fn run_dequeue(&self, cs: CriticalSection<'_>) { + self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED) } } -- cgit From 7d5fbe26c955bae4bd394d1092702bd81f849c9b Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 18:17:36 +0100 Subject: Update state diagram --- embassy-executor/src/raw/mod.rs | 43 ++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 22 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 242e9c365..5df5ca9e1 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -50,33 +50,32 @@ use super::SpawnToken; /// A task's complete life cycle is as follows: /// /// ```text -/// ┌────────────┐ ┌────────────────────────┐ -/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│ -/// │ │ ├7─►│ │ -/// │ └─────┬──────┘ └──────▲─────────────────┘ -/// │ 1 │ -/// │ │ ┌────────────┘ -/// │ │ 5 -/// │ ┌─────▼────┴─────────┐ -/// │ │Spawned|Run enqueued│ -/// │ │ │ -/// │ └─────┬▲─────────────┘ -/// │ 2│ -/// │ │3 -/// │ ┌─────▼┴─────┐ -/// └─4┤ Spawned │ -/// │ │ -/// └────────────┘ +/// ┌────────────┐ ┌────────────────────────┐ +/// │Not spawned │◄─5┤Not spawned|Run enqueued│ +/// │ ├6─►│ │ +/// └─────┬──────┘ └──────▲─────────────────┘ +/// 1 │ +/// │ ┌────────────┘ +/// │ 4 +/// ┌─────▼────┴─────────┐ +/// │Spawned|Run enqueued│ +/// │ │ +/// └─────┬▲─────────────┘ +/// 2│ +/// │3 +/// ┌─────▼┴─────┐ +/// │ Spawned │ +/// │ │ +/// └────────────┘ /// ``` /// /// Transitions: /// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` /// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` -/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` -/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` -/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` -/// - 6: Task is dequeued and then ignored via `State::run_dequeue` -/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` +/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue` +/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` +/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. +/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, -- cgit From a011f487690465f8ae64fd74f4c51a8be3979890 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 18:37:17 +0100 Subject: Make poll_to_despawn non-generic --- embassy-executor/src/raw/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 5df5ca9e1..39d2d73ab 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -161,6 +161,12 @@ pub struct TaskStorage { future: UninitCell, // Valid if STATE_SPAWNED } +unsafe fn poll_to_despawn(p: TaskRef) { + // The task's future has already been dropped, we just mark it as `!SPAWNED`. + let this = p.header(); + this.state.despawn(); +} + impl TaskStorage { const NEW: Self = Self::new(); @@ -201,12 +207,6 @@ impl TaskStorage { } } - unsafe fn poll_to_despawn(p: TaskRef) { - // The task's future has already been dropped, we just mark it as `!SPAWNED`. - let this = &*p.as_ptr().cast::>(); - this.raw.state.despawn(); - } - unsafe fn poll(p: TaskRef) { let this = &*p.as_ptr().cast::>(); @@ -223,7 +223,7 @@ impl TaskStorage { // We replace the poll_fn with a despawn function, so that the task is cleaned up // when the executor polls it next. - this.raw.poll_fn.set(Some(Self::poll_to_despawn)); + this.raw.poll_fn.set(Some(poll_to_despawn)); } Poll::Pending => {} } -- cgit From 2ca374fc9c0d0abe579716d1a7c2dc0724321ee7 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 18:46:32 +0100 Subject: Don't force a wake to despawn --- embassy-executor/src/raw/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 39d2d73ab..4a4ecf603 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -215,8 +215,6 @@ impl TaskStorage { let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { - waker.wake_by_ref(); - // As the future has finished and this function will not be called // again, we can safely drop the future here. this.future.drop_in_place(); @@ -224,6 +222,10 @@ impl TaskStorage { // We replace the poll_fn with a despawn function, so that the task is cleaned up // when the executor polls it next. this.raw.poll_fn.set(Some(poll_to_despawn)); + + // Make sure we despawn last, so that other threads can only spawn the task + // after we're done with it. + this.raw.state.despawn(); } Poll::Pending => {} } -- cgit From 76d8a896bbff612e3d2db27554891c71d28988af Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 17 Dec 2024 18:51:22 +0100 Subject: Make poll_to_despawn a no_op --- embassy-executor/src/raw/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 4a4ecf603..e38a2af66 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -161,10 +161,8 @@ pub struct TaskStorage { future: UninitCell, // Valid if STATE_SPAWNED } -unsafe fn poll_to_despawn(p: TaskRef) { - // The task's future has already been dropped, we just mark it as `!SPAWNED`. - let this = p.header(); - this.state.despawn(); +unsafe fn poll_exited(_p: TaskRef) { + // Nothing to do, the task is already !SPAWNED and dequeued. } impl TaskStorage { @@ -221,7 +219,7 @@ impl TaskStorage { // We replace the poll_fn with a despawn function, so that the task is cleaned up // when the executor polls it next. - this.raw.poll_fn.set(Some(poll_to_despawn)); + this.raw.poll_fn.set(Some(poll_exited)); // Make sure we despawn last, so that other threads can only spawn the task // after we're done with it. -- cgit From 882e2180a4ec7f448ea4ddaccf2a65e6757654c7 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:06:04 +0200 Subject: Add docs, add `task_end` trace point --- embassy-executor/src/raw/mod.rs | 12 +++ embassy-executor/src/raw/trace.rs | 149 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e38a2af66..73f4f00ee 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -138,6 +138,12 @@ impl TaskRef { pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } + + /// Get the ID for a task + #[cfg(feature = "trace")] + pub fn as_id(self) -> u32 { + self.ptr.as_ptr() as u32 + } } /// Raw storage in which a task can be spawned. @@ -224,6 +230,9 @@ impl TaskStorage { // Make sure we despawn last, so that other threads can only spawn the task // after we're done with it. this.raw.state.despawn(); + + #[cfg(feature = "trace")] + trace::task_end(self, &task); } Poll::Pending => {} } @@ -420,6 +429,9 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { + #[cfg(feature = "trace")] + trace::poll_start(self); + self.run_queue.dequeue_all(|p| { let task = p.header(); diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index b34387b58..57222d60b 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -1,15 +1,156 @@ #![allow(unused)] use crate::raw::{SyncExecutor, TaskRef}; +//! # Tracing +//! +//! The `trace` feature enables a number of callbacks that can be used to track the +//! lifecycle of tasks and/or executors. +//! +//! Callbacks will have one or both of the following IDs passed to them: +//! +//! 1. A `task_id`, a `u32` value unique to a task for the duration of the time it is valid +//! 2. An `executor_id`, a `u32` value unique to an executor for the duration of the time it is +//! valid +//! +//! Today, both `task_id` and `executor_id` are u32s containing the least significant 32 bits of +//! the address of the task or executor, however this is NOT a stable guarantee, and MAY change +//! at any time. +//! +//! IDs are only guaranteed to be unique for the duration of time the item is valid. If a task +//! ends, and is respond, it MAY or MAY NOT have the same ID. For tasks, this time is defined +//! as the time between `_embassy_trace_task_new` and `_embassy_trace_task_end` for a given task. +//! For executors, this time is not defined, but is often "forever" for practical embedded +//! programs. +//! +//! Callbacks can be used by enabling the `trace` feature, and providing implementations of the +//! `extern "Rust"` functions below. All callbacks must be implemented. +//! +//! ## Stability +//! +//! The `trace` interface is considered unstable. Callbacks may change, be added, or be removed +//! in any minor or trivial release. +//! +//! ## Task Tracing lifecycle +//! +//! ```text +//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! │(1) │ +//! │ │ +//! ╔════▼════╗ (2) ┌─────────┐ (3) ┌─────────┐ │ +//! │ ║ SPAWNED ║────▶│ WAITING │────▶│ RUNNING │ +//! ╚═════════╝ └─────────┘ └─────────┘ │ +//! │ ▲ ▲ │ │ │ +//! │ (4) │ │(6) │ +//! │ │ └ ─ ─ ┘ │ │ +//! │ │ │ │ +//! │ ┌──────┐ (5) │ │ ┌─────┐ +//! │ IDLE │◀────────────────┘ └─▶│ END │ │ +//! │ └──────┘ └─────┘ +//! ┌──────────────────────┐ │ +//! └ ┤ Task Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! └──────────────────────┘ +//! ``` +//! +//! 1. A task is spawned. `_embassy_trace_task_new` is called +//! 2. A task is enqueued for the first time, `_embassy_trace_task_ready_begin` is called +//! 3. A task is polled, `_embassy_trace_task_exec_begin` is called +//! 4. WHILE a task is polled, the task is re-awoken, and `_embassy_trace_task_ready_begin` is +//! called. The task does not IMMEDIATELY move state, until polling is complete and the +//! RUNNING state is existed. `_embassy_trace_task_exec_end` is called when polling is +//! complete, marking the transition to WAITING +//! 5. Polling is complete, `_embassy_trace_task_exec_end` is called +//! 6. The task has completed, and `_embassy_trace_task_end` is called. +//! +//! ## Executor Tracing lifecycle +//! +//! ```text +//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! │(1) │ +//! │ │ +//! ╔═══▼══╗ (2) ┌────────────┐ (3) ┌─────────┐ │ +//! │ ║ IDLE ║──────────▶│ SCHEDULING │──────▶│ POLLING │ +//! ╚══════╝ └────────────┘ └─────────┘ │ +//! │ ▲ │ ▲ │ +//! │ (5) │ │ (4) │ │ +//! │ └──────────────┘ └────────────┘ +//! ┌──────────────────────────┐ │ +//! └ ┤ Executor Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! └──────────────────────────┘ +//! ``` +//! +//! 1. The executor is started (no associated trace) +//! 2. A task on this executor is awoken. `_embassy_trace_task_ready_begin` is called +//! when this occurs, and `_embassy_trace_poll_start` is called when the executor +//! actually begins running. +//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called. +//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called. +//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called. + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { + /// This callback is called when the executor begins polling. This will always + /// be paired with a later call to `_embassy_trace_executor_idle`. + /// + /// This marks the EXECUTOR state transition from IDLE -> SCHEDULING. + fn _embassy_trace_poll_start(executor_id: u32); + + /// This callback is called AFTER a task is initialized/allocated, and BEFORE + /// it is enqueued to run for the first time. If the task ends (and does not + /// loop "forever"), there will be a matching call to `_embassy_trace_task_end`. + /// + /// Tasks start life in the SPAWNED state. fn _embassy_trace_task_new(executor_id: u32, task_id: u32); + + /// This callback is called AFTER a task is destructed/freed. This will always + /// have a prior matching call to `_embassy_trace_task_new`. + fn _embassy_trace_task_end(executor_id: u32, task_id: u32); + + /// This callback is called AFTER a task has been dequeued from the runqueue, + /// and BEFORE the task is polled. There will always be a matching call to + /// `_embassy_trace_task_exec_end`. + /// + /// This marks the TASK state transition from WAITING -> RUNNING + /// This marks the EXECUTOR state transition from SCHEDULING -> POLLING fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32); + + /// This callback is called AFTER a task has completed polling. There will + /// always be a matching call to `_embassy_trace_task_exec_begin`. + /// + /// This marks the TASK state transition from either: + /// * RUNNING -> IDLE - if there were no `_embassy_trace_task_ready_begin` events + /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task + /// * RUNNING -> WAITING - if there WAS a `_embassy_trace_task_ready_begin` event + /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task + /// + /// This marks the EXECUTOR state transition from POLLING -> SCHEDULING fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32); + + /// This callback is called AFTER the waker for a task is awoken, and BEFORE it + /// is added to the run queue. + /// + /// If the given task is currently RUNNING, this marks no state change, BUT the + /// RUNNING task will then move to the WAITING stage when polling is complete. + /// + /// If the given task is currently IDLE, this marks the TASK state transition + /// from IDLE -> WAITING. fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32); + + /// This callback is called AFTER all dequeued tasks in a single call to poll + /// have been processed. This will always be paired with a call to + /// `_embassy_trace_executor_idle`. + /// + /// This marks the EXECUTOR state transition from fn _embassy_trace_executor_idle(executor_id: u32); } +#[inline] +pub(crate) fn poll_start(executor: &SyncExecutor) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_poll_start(executor as *const _ as u32) + } +} + #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[cfg(not(feature = "rtos-trace"))] @@ -21,6 +162,14 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { rtos_trace::trace::task_new(task.as_ptr() as u32); } +#[inline] +pub(crate) fn task_end(executor: &SyncExecutor, task: &TaskRef) { + #[cfg(not(feature = "rtos-trace"))] + unsafe { + _embassy_trace_task_end(executor as *const _ as u32, task.as_ptr() as u32) + } +} + #[inline] pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) { #[cfg(not(feature = "rtos-trace"))] -- cgit From 84cd416bed6a80d0fab4433c26ada9b16495b4c1 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:11:42 +0200 Subject: Finish trailing sentence --- embassy-executor/src/raw/trace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 57222d60b..efb74b3d2 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -139,7 +139,7 @@ extern "Rust" { /// have been processed. This will always be paired with a call to /// `_embassy_trace_executor_idle`. /// - /// This marks the EXECUTOR state transition from + /// This marks the EXECUTOR state transition from SCHEDULING -> IDLE fn _embassy_trace_executor_idle(executor_id: u32); } -- cgit From 8a8e4500531342001977cdc99d6d107dd84be8e1 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:12:58 +0200 Subject: Reorder doc comments for format reasons --- embassy-executor/src/raw/trace.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index efb74b3d2..0952a9bc0 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -1,6 +1,3 @@ -#![allow(unused)] -use crate::raw::{SyncExecutor, TaskRef}; - //! # Tracing //! //! The `trace` feature enables a number of callbacks that can be used to track the @@ -86,6 +83,9 @@ use crate::raw::{SyncExecutor, TaskRef}; //! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called. //! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called. +#![allow(unused)] +use crate::raw::{SyncExecutor, TaskRef}; + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always -- cgit From c3efb85b85eece9f706d232888e0fb70ee1270e4 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:23:39 +0200 Subject: Fix task_end callback --- embassy-executor/src/raw/mod.rs | 5 ++++- embassy-executor/src/raw/trace.rs | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 73f4f00ee..5b1f33a0e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -219,6 +219,9 @@ impl TaskStorage { let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { + #[cfg(feature = "trace")] + let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed); + // As the future has finished and this function will not be called // again, we can safely drop the future here. this.future.drop_in_place(); @@ -232,7 +235,7 @@ impl TaskStorage { this.raw.state.despawn(); #[cfg(feature = "trace")] - trace::task_end(self, &task); + trace::task_end(exec_ptr, &p); } Poll::Pending => {} } diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 0952a9bc0..56724b0bb 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -84,6 +84,7 @@ //! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called. #![allow(unused)] + use crate::raw::{SyncExecutor, TaskRef}; #[cfg(not(feature = "rtos-trace"))] @@ -163,10 +164,10 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { } #[inline] -pub(crate) fn task_end(executor: &SyncExecutor, task: &TaskRef) { +pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { #[cfg(not(feature = "rtos-trace"))] unsafe { - _embassy_trace_task_end(executor as *const _ as u32, task.as_ptr() as u32) + _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } } -- cgit From 3e25a7be8670f06d577ebf011dc41e3c4752db95 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:27:32 +0200 Subject: Small grammar fixes --- embassy-executor/src/raw/trace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 56724b0bb..5480e77b9 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -14,7 +14,7 @@ //! at any time. //! //! IDs are only guaranteed to be unique for the duration of time the item is valid. If a task -//! ends, and is respond, it MAY or MAY NOT have the same ID. For tasks, this time is defined +//! ends, and is re-spawned, it MAY or MAY NOT have the same ID. For tasks, this valid time is defined //! as the time between `_embassy_trace_task_new` and `_embassy_trace_task_end` for a given task. //! For executors, this time is not defined, but is often "forever" for practical embedded //! programs. -- cgit From 2e474b7df2466364964156e119b37bd8445169d7 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:30:36 +0200 Subject: Remove notes about stability --- embassy-executor/src/raw/trace.rs | 5 ----- 1 file changed, 5 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 5480e77b9..8fcf84658 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -22,11 +22,6 @@ //! Callbacks can be used by enabling the `trace` feature, and providing implementations of the //! `extern "Rust"` functions below. All callbacks must be implemented. //! -//! ## Stability -//! -//! The `trace` interface is considered unstable. Callbacks may change, be added, or be removed -//! in any minor or trivial release. -//! //! ## Task Tracing lifecycle //! //! ```text -- cgit From ef3c1b87d16d17e312e869546fae00f895f68989 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 14:35:21 +0200 Subject: Minor docs improvements --- embassy-executor/src/raw/trace.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 8fcf84658..aba519c8f 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -33,7 +33,7 @@ //! ╚═════════╝ └─────────┘ └─────────┘ │ //! │ ▲ ▲ │ │ │ //! │ (4) │ │(6) │ -//! │ │ └ ─ ─ ┘ │ │ +//! │ │(7) └ ─ ─ ┘ │ │ //! │ │ │ │ //! │ ┌──────┐ (5) │ │ ┌─────┐ //! │ IDLE │◀────────────────┘ └─▶│ END │ │ @@ -43,7 +43,7 @@ //! └──────────────────────┘ //! ``` //! -//! 1. A task is spawned. `_embassy_trace_task_new` is called +//! 1. A task is spawned, `_embassy_trace_task_new` is called //! 2. A task is enqueued for the first time, `_embassy_trace_task_ready_begin` is called //! 3. A task is polled, `_embassy_trace_task_exec_begin` is called //! 4. WHILE a task is polled, the task is re-awoken, and `_embassy_trace_task_ready_begin` is @@ -51,7 +51,8 @@ //! RUNNING state is existed. `_embassy_trace_task_exec_end` is called when polling is //! complete, marking the transition to WAITING //! 5. Polling is complete, `_embassy_trace_task_exec_end` is called -//! 6. The task has completed, and `_embassy_trace_task_end` is called. +//! 6. The task has completed, and `_embassy_trace_task_end` is called +//! 7. A task is awoken, `_embassy_trace_task_ready_begin` is called //! //! ## Executor Tracing lifecycle //! @@ -73,10 +74,10 @@ //! 1. The executor is started (no associated trace) //! 2. A task on this executor is awoken. `_embassy_trace_task_ready_begin` is called //! when this occurs, and `_embassy_trace_poll_start` is called when the executor -//! actually begins running. -//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called. -//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called. -//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called. +//! actually begins running +//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called +//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called +//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called #![allow(unused)] @@ -129,6 +130,9 @@ extern "Rust" { /// /// If the given task is currently IDLE, this marks the TASK state transition /// from IDLE -> WAITING. + /// + /// NOTE: This may be called from an interrupt, outside the context of the current + /// task or executor. fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32); /// This callback is called AFTER all dequeued tasks in a single call to poll -- cgit From b7e1b1ca941b3b92dede55f2a5b65fcad788ce42 Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 3 Apr 2025 10:35:00 +0200 Subject: Fix some intra-doc links --- embassy-executor/src/raw/waker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index b7d57c314..d0d7b003d 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs @@ -26,7 +26,7 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { /// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps /// avoid dynamic dispatch. /// -/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). +/// You can use the returned task pointer to wake the task with [`wake_task`]. /// /// # Panics /// -- cgit From 3f87ce6f50865b04a56b50476aa6cc7a0f92f0e9 Mon Sep 17 00:00:00 2001 From: Kaspar Schleiser Date: Fri, 19 Jan 2024 11:03:00 +0100 Subject: embassy-executor: introduce `Executor::id()` --- embassy-executor/src/raw/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 5b1f33a0e..56faa911d 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -555,6 +555,11 @@ impl Executor { pub fn spawner(&'static self) -> super::Spawner { super::Spawner::new(self) } + + /// Get a unique ID for this Executor. + pub fn id(&'static self) -> usize { + &self.inner as *const SyncExecutor as usize + } } /// Wake a task by `TaskRef`. -- cgit From 66a02a4f8dceaa7c7e4f30df64b1011943f68423 Mon Sep 17 00:00:00 2001 From: Phil-hacker Date: Tue, 8 Apr 2025 22:10:17 +0200 Subject: fix the avr executor --- embassy-executor/src/raw/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 56faa911d..d14fe9281 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -28,9 +28,11 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::{AtomicPtr, Ordering}; +use core::sync::atomic::Ordering; use core::task::{Context, Poll}; +use portable_atomic::AtomicPtr; + use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; -- cgit From ce40a39a874bbf34d0438afce189bf2fca5bb1e9 Mon Sep 17 00:00:00 2001 From: Phil-hacker Date: Tue, 8 Apr 2025 22:15:05 +0200 Subject: fix compilation on anything not using the feature `arch-avr` --- embassy-executor/src/raw/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index d14fe9281..41dade75e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -29,8 +29,10 @@ use core::mem; use core::pin::Pin; use core::ptr::NonNull; use core::sync::atomic::Ordering; +#[cfg(not(feature = "arch-avr"))] +use core::sync::atomic::AtomicPtr; use core::task::{Context, Poll}; - +#[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; use self::run_queue::{RunQueue, RunQueueItem}; -- cgit From 3a85ecebef61017069da9f1e1d48d20c3889b76b Mon Sep 17 00:00:00 2001 From: Phil-hacker Date: Tue, 8 Apr 2025 22:15:53 +0200 Subject: fix rustfmt --- embassy-executor/src/raw/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 41dade75e..88d839e07 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -28,10 +28,11 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::Ordering; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; +use core::sync::atomic::Ordering; use core::task::{Context, Poll}; + #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -- cgit From f8f9c38b2e2527c6e3b8396e06fbb18fc1ce2a1c Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 29 Apr 2025 08:49:19 -0400 Subject: add a task registry to tracing infrastructure --- embassy-executor/src/raw/mod.rs | 2 +- embassy-executor/src/raw/trace.rs | 139 +++++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 88d839e07..35c82557c 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -18,7 +18,7 @@ mod state; pub mod timer_queue; #[cfg(feature = "trace")] -mod trace; +pub mod trace; pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index aba519c8f..bdd3e4706 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -83,6 +83,129 @@ use crate::raw::{SyncExecutor, TaskRef}; +use core::cell::UnsafeCell; +use core::sync::atomic::{AtomicUsize, Ordering}; +use rtos_trace::TaskInfo; + +const MAX_TASKS: usize = 1000; + +/// Represents a task being tracked in the task registry. +/// +/// Contains the task's unique identifier and optional name. +#[derive(Clone)] +pub struct TrackedTask { + task_id: u32, + name: Option<&'static str>, +} + +/// A thread-safe registry for tracking tasks in the system. +/// +/// This registry maintains a list of active tasks with their IDs and optional names. +/// It supports registering, unregistering, and querying information about tasks. +/// The registry has a fixed capacity of `MAX_TASKS`. +pub struct TaskRegistry { + tasks: [UnsafeCell>; MAX_TASKS], + count: AtomicUsize, +} + +impl TaskRegistry { + /// Creates a new empty task registry. + /// + /// This initializes a registry that can track up to `MAX_TASKS` tasks. + pub const fn new() -> Self { + const EMPTY: UnsafeCell> = UnsafeCell::new(None); + Self { + tasks: [EMPTY; MAX_TASKS], + count: AtomicUsize::new(0), + } + } + + /// Registers a new task in the registry. + /// + /// # Arguments + /// * `task_id` - Unique identifier for the task + /// * `name` - Optional name for the task + /// + /// # Note + /// If the registry is full, the task will not be registered. + pub fn register(&self, task_id: u32, name: Option<&'static str>) { + let count = self.count.load(Ordering::Relaxed); + if count < MAX_TASKS { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &mut *slot.get(); + if slot_ref.is_none() { + *slot_ref = Some(TrackedTask { task_id, name }); + self.count.fetch_add(1, Ordering::Relaxed); + break; + } + } + } + } + } + + /// Removes a task from the registry. + /// + /// # Arguments + /// * `task_id` - Unique identifier of the task to remove + pub fn unregister(&self, task_id: u32) { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &mut *slot.get(); + if let Some(task) = slot_ref { + if task.task_id == task_id { + *slot_ref = None; + self.count.fetch_sub(1, Ordering::Relaxed); + break; + } + } + } + } + } + + /// Returns an iterator over all registered tasks. + /// + /// This allows accessing information about all tasks currently in the registry. + pub fn get_all_tasks(&self) -> impl Iterator + '_ { + (0..MAX_TASKS).filter_map(move |i| unsafe { + let slot = &self.tasks[i]; + (*slot.get()).clone() + }) + } + + /// Retrieves the name of a task with the given ID. + /// + /// # Arguments + /// * `task_id` - Unique identifier of the task + /// + /// # Returns + /// The name of the task if found and named, or `None` otherwise + pub fn get_task_name(&self, task_id: u32) -> Option<&'static str> { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &*slot.get(); + if let Some(task) = slot_ref { + if task.task_id == task_id { + return task.name; + } + } + } + } + None + } +} + +unsafe impl Sync for TaskRegistry {} +unsafe impl Send for TaskRegistry {} + +/// Global task registry instance used for tracking all tasks in the system. +/// +/// This provides a centralized registry accessible from anywhere in the application. +pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always @@ -153,6 +276,8 @@ pub(crate) fn poll_start(executor: &SyncExecutor) { #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { + let task_id = task.as_ptr() as u32; + #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) @@ -164,10 +289,14 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { + let task_id = task.as_ptr() as u32; + #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } + + TASK_REGISTRY.unregister(task_id); } #[inline] @@ -213,7 +342,15 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { - // We don't know what tasks exist, so we can't send them. + for task in TASK_REGISTRY.get_all_tasks() { + let info = rtos_trace::TaskInfo { + name: TASK_REGISTRY.get_task_name(task.task_id).unwrap(), + priority: 0, + stack_base: 0, + stack_size: 0, + }; + rtos_trace::trace::task_send_info(task.task_id, info); + } } fn time() -> u64 { const fn gcd(a: u64, b: u64) -> u64 { -- cgit From 05d52decb2a98ad5111962b71e667c692e68c23e Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:04:21 -0400 Subject: add name to TaskHeader --- embassy-executor/src/raw/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 35c82557c..2928848b8 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -89,6 +89,8 @@ pub(crate) struct TaskHeader { /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. pub(crate) timer_queue_item: timer_queue::TimerQueueItem, + #[cfg(feature = "trace")] + pub(crate) name: Option<&'static str>, } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -190,6 +192,8 @@ impl TaskStorage { poll_fn: SyncUnsafeCell::new(None), timer_queue_item: timer_queue::TimerQueueItem::new(), + #[cfg(feature = "trace")] + name: None, }, future: UninitCell::uninit(), } -- cgit From 61f0f889a0dc89410218be725a43dcd967e53003 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:23:39 -0400 Subject: add get/set for task name --- embassy-executor/src/raw/mod.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 2928848b8..3f4e06350 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -151,6 +151,21 @@ impl TaskRef { pub fn as_id(self) -> u32 { self.ptr.as_ptr() as u32 } + + /// Get the name for a task + #[cfg(feature = "trace")] + pub fn name(&self) -> Option<&'static str> { + self.header().name + } + + /// Set the name for a task + #[cfg(feature = "trace")] + pub fn set_name(&self, name: Option<&'static str>) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).name = name; + } + } } /// Raw storage in which a task can be spawned. -- cgit From 54b3fb6e7a12598e0f6299c18a333060d6a3f9c7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:27:19 -0400 Subject: remove name from TaskRegistry and retrieve from task header instead --- embassy-executor/src/raw/trace.rs | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index bdd3e4706..28be79cee 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,7 +81,7 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskRef}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -95,7 +95,6 @@ const MAX_TASKS: usize = 1000; #[derive(Clone)] pub struct TrackedTask { task_id: u32, - name: Option<&'static str>, } /// A thread-safe registry for tracking tasks in the system. @@ -128,7 +127,7 @@ impl TaskRegistry { /// /// # Note /// If the registry is full, the task will not be registered. - pub fn register(&self, task_id: u32, name: Option<&'static str>) { + pub fn register(&self, task_id: u32) { let count = self.count.load(Ordering::Relaxed); if count < MAX_TASKS { for i in 0..MAX_TASKS { @@ -136,7 +135,7 @@ impl TaskRegistry { let slot = &self.tasks[i]; let slot_ref = &mut *slot.get(); if slot_ref.is_none() { - *slot_ref = Some(TrackedTask { task_id, name }); + *slot_ref = Some(TrackedTask { task_id }); self.count.fetch_add(1, Ordering::Relaxed); break; } @@ -174,28 +173,6 @@ impl TaskRegistry { (*slot.get()).clone() }) } - - /// Retrieves the name of a task with the given ID. - /// - /// # Arguments - /// * `task_id` - Unique identifier of the task - /// - /// # Returns - /// The name of the task if found and named, or `None` otherwise - pub fn get_task_name(&self, task_id: u32) -> Option<&'static str> { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &*slot.get(); - if let Some(task) = slot_ref { - if task.task_id == task_id { - return task.name; - } - } - } - } - None - } } unsafe impl Sync for TaskRegistry {} @@ -343,8 +320,10 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { for task in TASK_REGISTRY.get_all_tasks() { + let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; + let name = task_ref.name().unwrap_or("unnamed\0"); let info = rtos_trace::TaskInfo { - name: TASK_REGISTRY.get_task_name(task.task_id).unwrap(), + name, priority: 0, stack_base: 0, stack_size: 0, -- cgit From f4e0cbb7cc476b171acd0b21448e9bbc848a616d Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:59:27 -0400 Subject: add ID field to TaskHeader --- embassy-executor/src/raw/mod.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f4e06350..075d8a254 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -91,6 +91,8 @@ pub(crate) struct TaskHeader { pub(crate) timer_queue_item: timer_queue::TimerQueueItem, #[cfg(feature = "trace")] pub(crate) name: Option<&'static str>, + #[cfg(feature = "trace")] + pub(crate) id: u32, } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -166,6 +168,21 @@ impl TaskRef { (*header_ptr).name = name; } } + + /// Get the ID for a task + #[cfg(feature = "trace")] + pub fn id(&self) -> u32 { + self.header().id + } + + /// Set the ID for a task + #[cfg(feature = "trace")] + pub fn set_id(&self, id: u32) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).id = id; + } + } } /// Raw storage in which a task can be spawned. @@ -209,6 +226,8 @@ impl TaskStorage { timer_queue_item: timer_queue::TimerQueueItem::new(), #[cfg(feature = "trace")] name: None, + #[cfg(feature = "trace")] + id: 0, }, future: UninitCell::uninit(), } -- cgit From 6085916714b79a888e117a2d7223e78c9a5de9d3 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 11:47:04 -0400 Subject: use an intrusive linked list in TaskHeader to track tasks --- embassy-executor/src/raw/mod.rs | 76 ++++++++++++++- embassy-executor/src/raw/trace.rs | 190 ++++++++++++++++++-------------------- 2 files changed, 163 insertions(+), 103 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 075d8a254..b4adfe01b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -93,6 +93,78 @@ pub(crate) struct TaskHeader { pub(crate) name: Option<&'static str>, #[cfg(feature = "trace")] pub(crate) id: u32, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr, +} + +/// A thread-safe tracker for all tasks in the system +/// +/// This struct uses an intrusive linked list approach to track all tasks +/// without additional memory allocations. It maintains a global list of +/// tasks that can be traversed to find all currently existing tasks. +#[cfg(feature = "trace")] +pub struct TaskTracker { + head: AtomicPtr, +} + +#[cfg(feature = "trace")] +impl TaskTracker { + /// Creates a new empty task tracker + /// + /// Initializes a tracker with no tasks in its list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } + + /// Adds a task to the tracker + /// + /// This method inserts a task at the head of the intrusive linked list. + /// The operation is thread-safe and lock-free, using atomic operations + /// to ensure consistency even when called from different contexts. + /// + /// # Arguments + /// * `task` - The task reference to add to the tracker + pub fn add(&self, task: TaskRef) { + let task_ptr = task.as_ptr() as *mut TaskHeader; + + loop { + let current_head = self.head.load(Ordering::Acquire); + unsafe { + (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); + } + + if self + .head + .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + /// Performs an operation on each task in the tracker + /// + /// This method traverses the entire list of tasks and calls the provided + /// function for each task. This allows inspecting or processing all tasks + /// in the system without modifying the tracker's structure. + /// + /// # Arguments + /// * `f` - A function to call for each task in the tracker + pub fn for_each(&self, mut f: F) + where + F: FnMut(TaskRef), + { + let mut current = self.head.load(Ordering::Acquire); + while !current.is_null() { + let task = unsafe { TaskRef::from_ptr(current) }; + f(task); + + current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; + } + } } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -173,7 +245,7 @@ impl TaskRef { #[cfg(feature = "trace")] pub fn id(&self) -> u32 { self.header().id - } + } /// Set the ID for a task #[cfg(feature = "trace")] @@ -228,6 +300,8 @@ impl TaskStorage { name: None, #[cfg(feature = "trace")] id: 0, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, future: UninitCell::uninit(), } diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 28be79cee..81c8a0024 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,107 +81,19 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; use rtos_trace::TaskInfo; -const MAX_TASKS: usize = 1000; - -/// Represents a task being tracked in the task registry. -/// -/// Contains the task's unique identifier and optional name. -#[derive(Clone)] -pub struct TrackedTask { - task_id: u32, -} - -/// A thread-safe registry for tracking tasks in the system. -/// -/// This registry maintains a list of active tasks with their IDs and optional names. -/// It supports registering, unregistering, and querying information about tasks. -/// The registry has a fixed capacity of `MAX_TASKS`. -pub struct TaskRegistry { - tasks: [UnsafeCell>; MAX_TASKS], - count: AtomicUsize, -} - -impl TaskRegistry { - /// Creates a new empty task registry. - /// - /// This initializes a registry that can track up to `MAX_TASKS` tasks. - pub const fn new() -> Self { - const EMPTY: UnsafeCell> = UnsafeCell::new(None); - Self { - tasks: [EMPTY; MAX_TASKS], - count: AtomicUsize::new(0), - } - } - - /// Registers a new task in the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier for the task - /// * `name` - Optional name for the task - /// - /// # Note - /// If the registry is full, the task will not be registered. - pub fn register(&self, task_id: u32) { - let count = self.count.load(Ordering::Relaxed); - if count < MAX_TASKS { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if slot_ref.is_none() { - *slot_ref = Some(TrackedTask { task_id }); - self.count.fetch_add(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Removes a task from the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier of the task to remove - pub fn unregister(&self, task_id: u32) { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if let Some(task) = slot_ref { - if task.task_id == task_id { - *slot_ref = None; - self.count.fetch_sub(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Returns an iterator over all registered tasks. - /// - /// This allows accessing information about all tasks currently in the registry. - pub fn get_all_tasks(&self) -> impl Iterator + '_ { - (0..MAX_TASKS).filter_map(move |i| unsafe { - let slot = &self.tasks[i]; - (*slot.get()).clone() - }) - } -} - -unsafe impl Sync for TaskRegistry {} -unsafe impl Send for TaskRegistry {} - -/// Global task registry instance used for tracking all tasks in the system. +/// Global task tracker instance /// -/// This provides a centralized registry accessible from anywhere in the application. -pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); +/// This static provides access to the global task tracker which maintains +/// a list of all tasks in the system. It's automatically updated by the +/// task lifecycle hooks in the trace module. +#[cfg(feature = "trace")] +pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); #[cfg(not(feature = "rtos-trace"))] extern "Rust" { @@ -262,6 +174,9 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[cfg(feature = "rtos-trace")] rtos_trace::trace::task_new(task.as_ptr() as u32); + + #[cfg(feature = "rtos-trace")] + TASK_TRACKER.add(*task); } #[inline] @@ -272,8 +187,6 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } - - TASK_REGISTRY.unregister(task_id); } #[inline] @@ -316,20 +229,93 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { rtos_trace::trace::system_idle(); } +/// Returns an iterator over all active tasks in the system +/// +/// This function provides a convenient way to iterate over all tasks +/// that are currently tracked in the system. The returned iterator +/// yields each task in the global task tracker. +/// +/// # Returns +/// An iterator that yields `TaskRef` items for each task +#[cfg(feature = "trace")] +pub fn get_all_active_tasks() -> impl Iterator + 'static { + struct TaskIterator<'a> { + tracker: &'a TaskTracker, + current: *mut TaskHeader, + } + + impl<'a> Iterator for TaskIterator<'a> { + type Item = TaskRef; + + fn next(&mut self) -> Option { + if self.current.is_null() { + return None; + } + + let task = unsafe { TaskRef::from_ptr(self.current) }; + self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) }; + + Some(task) + } + } + + TaskIterator { + tracker: &TASK_TRACKER, + current: TASK_TRACKER.head.load(Ordering::Acquire), + } +} + +/// Get all active tasks, filtered by a predicate function +#[cfg(feature = "trace")] +pub fn filter_active_tasks(predicate: F) -> impl Iterator + 'static +where + F: Fn(&TaskRef) -> bool + 'static, +{ + get_all_active_tasks().filter(move |task| predicate(task)) +} + +/// Count the number of active tasks +#[cfg(feature = "trace")] +pub fn count_active_tasks() -> usize { + let mut count = 0; + TASK_TRACKER.for_each(|_| count += 1); + count +} + +/// Perform an action on each active task +#[cfg(feature = "trace")] +pub fn with_all_active_tasks(f: F) +where + F: FnMut(TaskRef), +{ + TASK_TRACKER.for_each(f); +} + +/// Get tasks by name +#[cfg(feature = "trace")] +pub fn get_tasks_by_name(name: &'static str) -> impl Iterator + 'static { + filter_active_tasks(move |task| task.name() == Some(name)) +} + +/// Get tasks by ID +#[cfg(feature = "trace")] +pub fn get_task_by_id(id: u32) -> Option { + filter_active_tasks(move |task| task.id() == id).next() +} + #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { - for task in TASK_REGISTRY.get_all_tasks() { - let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; - let name = task_ref.name().unwrap_or("unnamed\0"); + with_all_active_tasks(|task| { + let name = task.name().unwrap_or("unnamed task\0"); let info = rtos_trace::TaskInfo { name, priority: 0, stack_base: 0, stack_size: 0, }; - rtos_trace::trace::task_send_info(task.task_id, info); - } + rtos_trace::trace::task_send_info(task.as_id(), info); + }); } fn time() -> u64 { const fn gcd(a: u64, b: u64) -> u64 { -- cgit From f2429c212e77969bacfe726cd293bf0ab5903664 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 11:54:45 -0400 Subject: fix whitespace in the imports in trace.rs --- embassy-executor/src/raw/trace.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 81c8a0024..c0599d2c7 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,12 +81,13 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; - use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; + use rtos_trace::TaskInfo; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; + /// Global task tracker instance /// /// This static provides access to the global task tracker which maintains -- cgit From b3e13cc6de744a241521cff20725706a1e40ef25 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 10:58:07 -0400 Subject: make tracing API functions internal --- embassy-executor/src/raw/trace.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index c0599d2c7..503d806bd 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -239,7 +239,7 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { /// # Returns /// An iterator that yields `TaskRef` items for each task #[cfg(feature = "trace")] -pub fn get_all_active_tasks() -> impl Iterator + 'static { +fn get_all_active_tasks() -> impl Iterator + 'static { struct TaskIterator<'a> { tracker: &'a TaskTracker, current: *mut TaskHeader, @@ -285,7 +285,7 @@ pub fn count_active_tasks() -> usize { /// Perform an action on each active task #[cfg(feature = "trace")] -pub fn with_all_active_tasks(f: F) +fn with_all_active_tasks(f: F) where F: FnMut(TaskRef), { -- cgit From 8f18810ec61ce235a4c895413936e0216ed22c4f Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 10:58:27 -0400 Subject: remove unused tracing API --- embassy-executor/src/raw/trace.rs | 29 ----------------------------- 1 file changed, 29 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 503d806bd..fec3a4834 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -266,23 +266,6 @@ fn get_all_active_tasks() -> impl Iterator + 'static { } } -/// Get all active tasks, filtered by a predicate function -#[cfg(feature = "trace")] -pub fn filter_active_tasks(predicate: F) -> impl Iterator + 'static -where - F: Fn(&TaskRef) -> bool + 'static, -{ - get_all_active_tasks().filter(move |task| predicate(task)) -} - -/// Count the number of active tasks -#[cfg(feature = "trace")] -pub fn count_active_tasks() -> usize { - let mut count = 0; - TASK_TRACKER.for_each(|_| count += 1); - count -} - /// Perform an action on each active task #[cfg(feature = "trace")] fn with_all_active_tasks(f: F) @@ -292,18 +275,6 @@ where TASK_TRACKER.for_each(f); } -/// Get tasks by name -#[cfg(feature = "trace")] -pub fn get_tasks_by_name(name: &'static str) -> impl Iterator + 'static { - filter_active_tasks(move |task| task.name() == Some(name)) -} - -/// Get tasks by ID -#[cfg(feature = "trace")] -pub fn get_task_by_id(id: u32) -> Option { - filter_active_tasks(move |task| task.id() == id).next() -} - #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { -- cgit From 8a8deb704fdd58cecf463f033cd3c3d1cc3534c7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 11:20:22 -0400 Subject: move spawn_named into trace.rs through TraceExt trait --- embassy-executor/src/raw/trace.rs | 43 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index fec3a4834..eb960f721 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -87,6 +87,49 @@ use core::sync::atomic::{AtomicUsize, Ordering}; use rtos_trace::TaskInfo; use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; +use crate::spawner::{SpawnError, SpawnToken, Spawner}; + +/// Extension trait adding tracing capabilities to the Spawner +pub trait TraceExt { + /// Spawns a new task with a specified name. + /// + /// # Arguments + /// * `name` - Static string name to associate with the task + /// * `token` - Token representing the task to spawn + /// + /// # Returns + /// Result indicating whether the spawn was successful + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; +} + +#[cfg(feature = "trace")] +impl TraceExt for Spawner { + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + core::mem::forget(token); + + match task { + Some(task) => { + task.set_name(Some(name)); + let task_id = task.as_ptr() as u32; + task.set_id(task_id); + + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } +} + +/// When trace is disabled, spawn_named falls back to regular spawn. +/// This maintains API compatibility while optimizing out the name parameter. +#[cfg(not(feature = "trace"))] +impl TraceExt for Spawner { + fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + self.spawn(token) + } +} /// Global task tracker instance /// -- cgit From 462d04c6d5a0fc6072cf9bdb0faa60da74ff46d2 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:34:32 -0400 Subject: move TaskTracker to trace --- embassy-executor/src/raw/mod.rs | 70 ------------------------------------ embassy-executor/src/raw/trace.rs | 74 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 72 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index b4adfe01b..882e4605b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -97,76 +97,6 @@ pub(crate) struct TaskHeader { all_tasks_next: AtomicPtr, } -/// A thread-safe tracker for all tasks in the system -/// -/// This struct uses an intrusive linked list approach to track all tasks -/// without additional memory allocations. It maintains a global list of -/// tasks that can be traversed to find all currently existing tasks. -#[cfg(feature = "trace")] -pub struct TaskTracker { - head: AtomicPtr, -} - -#[cfg(feature = "trace")] -impl TaskTracker { - /// Creates a new empty task tracker - /// - /// Initializes a tracker with no tasks in its list. - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(core::ptr::null_mut()), - } - } - - /// Adds a task to the tracker - /// - /// This method inserts a task at the head of the intrusive linked list. - /// The operation is thread-safe and lock-free, using atomic operations - /// to ensure consistency even when called from different contexts. - /// - /// # Arguments - /// * `task` - The task reference to add to the tracker - pub fn add(&self, task: TaskRef) { - let task_ptr = task.as_ptr() as *mut TaskHeader; - - loop { - let current_head = self.head.load(Ordering::Acquire); - unsafe { - (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); - } - - if self - .head - .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - break; - } - } - } - - /// Performs an operation on each task in the tracker - /// - /// This method traverses the entire list of tasks and calls the provided - /// function for each task. This allows inspecting or processing all tasks - /// in the system without modifying the tracker's structure. - /// - /// # Arguments - /// * `f` - A function to call for each task in the tracker - pub fn for_each(&self, mut f: F) - where - F: FnMut(TaskRef), - { - let mut current = self.head.load(Ordering::Acquire); - while !current.is_null() { - let task = unsafe { TaskRef::from_ptr(current) }; - f(task); - - current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; - } - } -} - /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Clone, Copy, PartialEq)] pub struct TaskRef { diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index eb960f721..b59da0526 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -82,11 +82,11 @@ #![allow(unused)] use core::cell::UnsafeCell; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use rtos_trace::TaskInfo; -use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// Extension trait adding tracing capabilities to the Spawner @@ -139,6 +139,76 @@ impl TraceExt for Spawner { #[cfg(feature = "trace")] pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); +/// A thread-safe tracker for all tasks in the system +/// +/// This struct uses an intrusive linked list approach to track all tasks +/// without additional memory allocations. It maintains a global list of +/// tasks that can be traversed to find all currently existing tasks. +#[cfg(feature = "trace")] +pub struct TaskTracker { + head: AtomicPtr, +} + +#[cfg(feature = "trace")] +impl TaskTracker { + /// Creates a new empty task tracker + /// + /// Initializes a tracker with no tasks in its list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } + + /// Adds a task to the tracker + /// + /// This method inserts a task at the head of the intrusive linked list. + /// The operation is thread-safe and lock-free, using atomic operations + /// to ensure consistency even when called from different contexts. + /// + /// # Arguments + /// * `task` - The task reference to add to the tracker + pub fn add(&self, task: TaskRef) { + let task_ptr = task.as_ptr() as *mut TaskHeader; + + loop { + let current_head = self.head.load(Ordering::Acquire); + unsafe { + (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); + } + + if self + .head + .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + /// Performs an operation on each task in the tracker + /// + /// This method traverses the entire list of tasks and calls the provided + /// function for each task. This allows inspecting or processing all tasks + /// in the system without modifying the tracker's structure. + /// + /// # Arguments + /// * `f` - A function to call for each task in the tracker + pub fn for_each(&self, mut f: F) + where + F: FnMut(TaskRef), + { + let mut current = self.head.load(Ordering::Acquire); + while !current.is_null() { + let task = unsafe { TaskRef::from_ptr(current) }; + f(task); + + current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; + } + } +} + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always -- cgit From 3b873bb6bb51b9bdac9272b5ec629a6ac54a89f7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:40:32 -0400 Subject: implement TaskRefTrace for tracing-only fields in TaskRef --- embassy-executor/src/raw/mod.rs | 36 --------------------------- embassy-executor/src/raw/trace.rs | 52 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 36 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 882e4605b..e7a27035a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -149,42 +149,6 @@ impl TaskRef { pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } - - /// Get the ID for a task - #[cfg(feature = "trace")] - pub fn as_id(self) -> u32 { - self.ptr.as_ptr() as u32 - } - - /// Get the name for a task - #[cfg(feature = "trace")] - pub fn name(&self) -> Option<&'static str> { - self.header().name - } - - /// Set the name for a task - #[cfg(feature = "trace")] - pub fn set_name(&self, name: Option<&'static str>) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).name = name; - } - } - - /// Get the ID for a task - #[cfg(feature = "trace")] - pub fn id(&self) -> u32 { - self.header().id - } - - /// Set the ID for a task - #[cfg(feature = "trace")] - pub fn set_id(&self, id: u32) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).id = id; - } - } } /// Raw storage in which a task can be spawned. diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index b59da0526..b30f23468 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -209,6 +209,58 @@ impl TaskTracker { } } +/// Extension trait for `TaskRef` that provides tracing functionality. +/// +/// This trait is only available when the `trace` feature is enabled. +/// It extends `TaskRef` with methods for accessing and modifying task identifiers +/// and names, which are useful for debugging, logging, and performance analysis. +#[cfg(feature = "trace")] +pub trait TaskRefTrace { + /// Get the ID for a task + fn as_id(self) -> u32; + + /// Get the name for a task + fn name(&self) -> Option<&'static str>; + + /// Set the name for a task + fn set_name(&self, name: Option<&'static str>); + + /// Get the ID for a task + fn id(&self) -> u32; + + /// Set the ID for a task + fn set_id(&self, id: u32); +} + +#[cfg(feature = "trace")] +impl TaskRefTrace for TaskRef { + fn as_id(self) -> u32 { + self.ptr.as_ptr() as u32 + } + + fn name(&self) -> Option<&'static str> { + self.header().name + } + + fn set_name(&self, name: Option<&'static str>) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).name = name; + } + } + + fn id(&self) -> u32 { + self.header().id + } + + fn set_id(&self, id: u32) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).id = id; + } + } +} + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always -- cgit From 194a3044acb5cd9691ced78596b9fd81e6884667 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:41:23 -0400 Subject: remove unused task_id --- embassy-executor/src/raw/trace.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index b30f23468..04e9d234f 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -216,9 +216,6 @@ impl TaskTracker { /// and names, which are useful for debugging, logging, and performance analysis. #[cfg(feature = "trace")] pub trait TaskRefTrace { - /// Get the ID for a task - fn as_id(self) -> u32; - /// Get the name for a task fn name(&self) -> Option<&'static str>; @@ -234,10 +231,6 @@ pub trait TaskRefTrace { #[cfg(feature = "trace")] impl TaskRefTrace for TaskRef { - fn as_id(self) -> u32 { - self.ptr.as_ptr() as u32 - } - fn name(&self) -> Option<&'static str> { self.header().name } @@ -331,8 +324,6 @@ pub(crate) fn poll_start(executor: &SyncExecutor) { #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { - let task_id = task.as_ptr() as u32; - #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) @@ -347,8 +338,6 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { - let task_id = task.as_ptr() as u32; - #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) @@ -451,7 +440,7 @@ impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { stack_base: 0, stack_size: 0, }; - rtos_trace::trace::task_send_info(task.as_id(), info); + rtos_trace::trace::task_send_info(task.id(), info); }); } fn time() -> u64 { -- cgit From e968c4763694d676cca6f1bd30949619dd12e962 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 14:03:03 -0400 Subject: update TraceExt trait name for Spawner --- embassy-executor/src/raw/trace.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 04e9d234f..f55033530 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -90,7 +90,7 @@ use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// Extension trait adding tracing capabilities to the Spawner -pub trait TraceExt { +pub trait SpawnerTraceExt { /// Spawns a new task with a specified name. /// /// # Arguments @@ -103,7 +103,7 @@ pub trait TraceExt { } #[cfg(feature = "trace")] -impl TraceExt for Spawner { +impl SpawnerTraceExt for Spawner { fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { let task = token.raw_task; core::mem::forget(token); @@ -125,7 +125,7 @@ impl TraceExt for Spawner { /// When trace is disabled, spawn_named falls back to regular spawn. /// This maintains API compatibility while optimizing out the name parameter. #[cfg(not(feature = "trace"))] -impl TraceExt for Spawner { +impl SpawnerTraceExt for Spawner { fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { self.spawn(token) } -- cgit From dfaab013ebaaa4a19c06f2eb00821712ff13cf7a Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 14:35:43 -0400 Subject: move SpawnerTraceExt back into Spawner --- embassy-executor/src/raw/trace.rs | 42 --------------------------------------- 1 file changed, 42 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index f55033530..593f7b0ba 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -89,48 +89,6 @@ use rtos_trace::TaskInfo; use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; -/// Extension trait adding tracing capabilities to the Spawner -pub trait SpawnerTraceExt { - /// Spawns a new task with a specified name. - /// - /// # Arguments - /// * `name` - Static string name to associate with the task - /// * `token` - Token representing the task to spawn - /// - /// # Returns - /// Result indicating whether the spawn was successful - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; -} - -#[cfg(feature = "trace")] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - core::mem::forget(token); - - match task { - Some(task) => { - task.set_name(Some(name)); - let task_id = task.as_ptr() as u32; - task.set_id(task_id); - - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } -} - -/// When trace is disabled, spawn_named falls back to regular spawn. -/// This maintains API compatibility while optimizing out the name parameter. -#[cfg(not(feature = "trace"))] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - self.spawn(token) - } -} - /// Global task tracker instance /// /// This static provides access to the global task tracker which maintains -- cgit From 3ffa2e4f3f9ecbca8637ae1603194a63d55b4396 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 16:30:06 -0400 Subject: remove unnecessary trace flags --- embassy-executor/src/raw/trace.rs | 7 ------- 1 file changed, 7 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 593f7b0ba..6c9cfda25 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -94,7 +94,6 @@ use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// This static provides access to the global task tracker which maintains /// a list of all tasks in the system. It's automatically updated by the /// task lifecycle hooks in the trace module. -#[cfg(feature = "trace")] pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); /// A thread-safe tracker for all tasks in the system @@ -102,12 +101,10 @@ pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); /// This struct uses an intrusive linked list approach to track all tasks /// without additional memory allocations. It maintains a global list of /// tasks that can be traversed to find all currently existing tasks. -#[cfg(feature = "trace")] pub struct TaskTracker { head: AtomicPtr, } -#[cfg(feature = "trace")] impl TaskTracker { /// Creates a new empty task tracker /// @@ -172,7 +169,6 @@ impl TaskTracker { /// This trait is only available when the `trace` feature is enabled. /// It extends `TaskRef` with methods for accessing and modifying task identifiers /// and names, which are useful for debugging, logging, and performance analysis. -#[cfg(feature = "trace")] pub trait TaskRefTrace { /// Get the name for a task fn name(&self) -> Option<&'static str>; @@ -187,7 +183,6 @@ pub trait TaskRefTrace { fn set_id(&self, id: u32); } -#[cfg(feature = "trace")] impl TaskRefTrace for TaskRef { fn name(&self) -> Option<&'static str> { self.header().name @@ -350,7 +345,6 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { /// /// # Returns /// An iterator that yields `TaskRef` items for each task -#[cfg(feature = "trace")] fn get_all_active_tasks() -> impl Iterator + 'static { struct TaskIterator<'a> { tracker: &'a TaskTracker, @@ -379,7 +373,6 @@ fn get_all_active_tasks() -> impl Iterator + 'static { } /// Perform an action on each active task -#[cfg(feature = "trace")] fn with_all_active_tasks(f: F) where F: FnMut(TaskRef), -- cgit From 0f9a7a057fb7dfb2358acec9068fece82c7c7a89 Mon Sep 17 00:00:00 2001 From: Johan Anderholm Date: Sat, 30 Dec 2023 11:54:16 +0100 Subject: executor: Make state implementations and their conditions match Use u8 for state_atomics and state_critical_section since that is all that is needed. Change arm condition to "32" since that is what is used and required. --- embassy-executor/src/raw/mod.rs | 2 +- embassy-executor/src/raw/state_atomics.rs | 10 +++++----- embassy-executor/src/raw/state_critical_section.rs | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src/raw') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e7a27035a..913da2e25 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -11,7 +11,7 @@ #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; -#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] +#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")] #[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] mod state; diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index b6576bfc2..e813548ae 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,4 +1,4 @@ -use core::sync::atomic::{AtomicU32, Ordering}; +use core::sync::atomic::{AtomicU8, Ordering}; #[derive(Clone, Copy)] pub(crate) struct Token(()); @@ -11,18 +11,18 @@ pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { } /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +pub(crate) const STATE_SPAWNED: u8 = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; pub(crate) struct State { - state: AtomicU32, + state: AtomicU8, } impl State { pub const fn new() -> State { Self { - state: AtomicU32::new(0), + state: AtomicU8::new(0), } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 6b627ff79..ec08f2f58 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -4,12 +4,12 @@ pub(crate) use critical_section::{with as locked, CriticalSection as Token}; use critical_section::{CriticalSection, Mutex}; /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +pub(crate) const STATE_SPAWNED: u8 = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; pub(crate) struct State { - state: Mutex>, + state: Mutex>, } impl State { @@ -19,11 +19,11 @@ impl State { } } - fn update(&self, f: impl FnOnce(&mut u32) -> R) -> R { + fn update(&self, f: impl FnOnce(&mut u8) -> R) -> R { critical_section::with(|cs| self.update_with_cs(cs, f)) } - fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R { + fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u8) -> R) -> R { let s = self.state.borrow(cs); let mut val = s.get(); let r = f(&mut val); -- cgit