From f8f9c38b2e2527c6e3b8396e06fbb18fc1ce2a1c Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 29 Apr 2025 08:49:19 -0400 Subject: add a task registry to tracing infrastructure --- embassy-executor/src/raw/mod.rs | 2 +- embassy-executor/src/raw/trace.rs | 139 +++++++++++++++++++++++++++++++++++++- embassy-executor/src/spawner.rs | 27 ++++++++ 3 files changed, 166 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 88d839e07..35c82557c 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -18,7 +18,7 @@ mod state; pub mod timer_queue; #[cfg(feature = "trace")] -mod trace; +pub mod trace; pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index aba519c8f..bdd3e4706 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -83,6 +83,129 @@ use crate::raw::{SyncExecutor, TaskRef}; +use core::cell::UnsafeCell; +use core::sync::atomic::{AtomicUsize, Ordering}; +use rtos_trace::TaskInfo; + +const MAX_TASKS: usize = 1000; + +/// Represents a task being tracked in the task registry. +/// +/// Contains the task's unique identifier and optional name. +#[derive(Clone)] +pub struct TrackedTask { + task_id: u32, + name: Option<&'static str>, +} + +/// A thread-safe registry for tracking tasks in the system. +/// +/// This registry maintains a list of active tasks with their IDs and optional names. +/// It supports registering, unregistering, and querying information about tasks. +/// The registry has a fixed capacity of `MAX_TASKS`. +pub struct TaskRegistry { + tasks: [UnsafeCell>; MAX_TASKS], + count: AtomicUsize, +} + +impl TaskRegistry { + /// Creates a new empty task registry. + /// + /// This initializes a registry that can track up to `MAX_TASKS` tasks. + pub const fn new() -> Self { + const EMPTY: UnsafeCell> = UnsafeCell::new(None); + Self { + tasks: [EMPTY; MAX_TASKS], + count: AtomicUsize::new(0), + } + } + + /// Registers a new task in the registry. + /// + /// # Arguments + /// * `task_id` - Unique identifier for the task + /// * `name` - Optional name for the task + /// + /// # Note + /// If the registry is full, the task will not be registered. + pub fn register(&self, task_id: u32, name: Option<&'static str>) { + let count = self.count.load(Ordering::Relaxed); + if count < MAX_TASKS { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &mut *slot.get(); + if slot_ref.is_none() { + *slot_ref = Some(TrackedTask { task_id, name }); + self.count.fetch_add(1, Ordering::Relaxed); + break; + } + } + } + } + } + + /// Removes a task from the registry. + /// + /// # Arguments + /// * `task_id` - Unique identifier of the task to remove + pub fn unregister(&self, task_id: u32) { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &mut *slot.get(); + if let Some(task) = slot_ref { + if task.task_id == task_id { + *slot_ref = None; + self.count.fetch_sub(1, Ordering::Relaxed); + break; + } + } + } + } + } + + /// Returns an iterator over all registered tasks. + /// + /// This allows accessing information about all tasks currently in the registry. + pub fn get_all_tasks(&self) -> impl Iterator + '_ { + (0..MAX_TASKS).filter_map(move |i| unsafe { + let slot = &self.tasks[i]; + (*slot.get()).clone() + }) + } + + /// Retrieves the name of a task with the given ID. + /// + /// # Arguments + /// * `task_id` - Unique identifier of the task + /// + /// # Returns + /// The name of the task if found and named, or `None` otherwise + pub fn get_task_name(&self, task_id: u32) -> Option<&'static str> { + for i in 0..MAX_TASKS { + unsafe { + let slot = &self.tasks[i]; + let slot_ref = &*slot.get(); + if let Some(task) = slot_ref { + if task.task_id == task_id { + return task.name; + } + } + } + } + None + } +} + +unsafe impl Sync for TaskRegistry {} +unsafe impl Send for TaskRegistry {} + +/// Global task registry instance used for tracking all tasks in the system. +/// +/// This provides a centralized registry accessible from anywhere in the application. +pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always @@ -153,6 +276,8 @@ pub(crate) fn poll_start(executor: &SyncExecutor) { #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { + let task_id = task.as_ptr() as u32; + #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) @@ -164,10 +289,14 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { + let task_id = task.as_ptr() as u32; + #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } + + TASK_REGISTRY.unregister(task_id); } #[inline] @@ -213,7 +342,15 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { - // We don't know what tasks exist, so we can't send them. + for task in TASK_REGISTRY.get_all_tasks() { + let info = rtos_trace::TaskInfo { + name: TASK_REGISTRY.get_task_name(task.task_id).unwrap(), + priority: 0, + stack_base: 0, + stack_size: 0, + }; + rtos_trace::trace::task_send_info(task.task_id, info); + } } fn time() -> u64 { const fn gcd(a: u64, b: u64) -> u64 { diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index ff243081c..ea754341b 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -5,6 +5,8 @@ use core::sync::atomic::Ordering; use core::task::Poll; use super::raw; +#[cfg(feature = "rtos-trace")] +use super::raw::trace::TASK_REGISTRY; /// Token to spawn a newly-created task in an executor. /// @@ -154,6 +156,31 @@ impl Spawner { } } + /// Spawns a new task with a specified name. + /// + /// # Arguments + /// * `name` - Static string name to associate with the task + /// * `token` - Token representing the task to spawn + /// + /// # Returns + /// Result indicating whether the spawn was successful + #[cfg(feature = "rtos-trace")] + pub fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + mem::forget(token); + + match task { + Some(task) => { + let task_id = task.as_ptr() as u32; + TASK_REGISTRY.register(task_id, Some(name)); + + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } + // Used by the `embassy_executor_macros::main!` macro to throw an error when spawn // fails. This is here to allow conditional use of `defmt::unwrap!` // without introducing a `defmt` feature in the `embassy_executor_macros` package, -- cgit From 032898adf5848da237e4bf55b8c06c2ff73cae7c Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Mon, 5 May 2025 12:14:14 -0400 Subject: add a stub implementation for spawn_named When rtos-trace is not enabled, spawn_named will use spawn instead --- embassy-executor/src/spawner.rs | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index ea754341b..f87700be6 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -181,6 +181,13 @@ impl Spawner { } } + /// When rtos-trace is disabled, spawn_named falls back to regular spawn. +/// This maintains API compatibility while optimizing out the name parameter. + #[cfg(not(feature = "rtos-trace"))] + pub fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + self.spawn(token) + } + // Used by the `embassy_executor_macros::main!` macro to throw an error when spawn // fails. This is here to allow conditional use of `defmt::unwrap!` // without introducing a `defmt` feature in the `embassy_executor_macros` package, -- cgit From bbffd2b3f9f27dd9c3ae3f66ac88bcd1ee1dcb93 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Mon, 5 May 2025 12:17:03 -0400 Subject: whitespace in the documentation --- embassy-executor/src/spawner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index f87700be6..4fc4312b9 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -182,7 +182,7 @@ impl Spawner { } /// When rtos-trace is disabled, spawn_named falls back to regular spawn. -/// This maintains API compatibility while optimizing out the name parameter. + /// This maintains API compatibility while optimizing out the name parameter. #[cfg(not(feature = "rtos-trace"))] pub fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { self.spawn(token) -- cgit From 05d52decb2a98ad5111962b71e667c692e68c23e Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:04:21 -0400 Subject: add name to TaskHeader --- embassy-executor/src/raw/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 35c82557c..2928848b8 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -89,6 +89,8 @@ pub(crate) struct TaskHeader { /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. pub(crate) timer_queue_item: timer_queue::TimerQueueItem, + #[cfg(feature = "trace")] + pub(crate) name: Option<&'static str>, } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -190,6 +192,8 @@ impl TaskStorage { poll_fn: SyncUnsafeCell::new(None), timer_queue_item: timer_queue::TimerQueueItem::new(), + #[cfg(feature = "trace")] + name: None, }, future: UninitCell::uninit(), } -- cgit From 61f0f889a0dc89410218be725a43dcd967e53003 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:23:39 -0400 Subject: add get/set for task name --- embassy-executor/src/raw/mod.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 2928848b8..3f4e06350 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -151,6 +151,21 @@ impl TaskRef { pub fn as_id(self) -> u32 { self.ptr.as_ptr() as u32 } + + /// Get the name for a task + #[cfg(feature = "trace")] + pub fn name(&self) -> Option<&'static str> { + self.header().name + } + + /// Set the name for a task + #[cfg(feature = "trace")] + pub fn set_name(&self, name: Option<&'static str>) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).name = name; + } + } } /// Raw storage in which a task can be spawned. -- cgit From 54b3fb6e7a12598e0f6299c18a333060d6a3f9c7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:27:19 -0400 Subject: remove name from TaskRegistry and retrieve from task header instead --- embassy-executor/src/raw/trace.rs | 33 ++++++--------------------------- embassy-executor/src/spawner.rs | 3 ++- 2 files changed, 8 insertions(+), 28 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index bdd3e4706..28be79cee 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,7 +81,7 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskRef}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -95,7 +95,6 @@ const MAX_TASKS: usize = 1000; #[derive(Clone)] pub struct TrackedTask { task_id: u32, - name: Option<&'static str>, } /// A thread-safe registry for tracking tasks in the system. @@ -128,7 +127,7 @@ impl TaskRegistry { /// /// # Note /// If the registry is full, the task will not be registered. - pub fn register(&self, task_id: u32, name: Option<&'static str>) { + pub fn register(&self, task_id: u32) { let count = self.count.load(Ordering::Relaxed); if count < MAX_TASKS { for i in 0..MAX_TASKS { @@ -136,7 +135,7 @@ impl TaskRegistry { let slot = &self.tasks[i]; let slot_ref = &mut *slot.get(); if slot_ref.is_none() { - *slot_ref = Some(TrackedTask { task_id, name }); + *slot_ref = Some(TrackedTask { task_id }); self.count.fetch_add(1, Ordering::Relaxed); break; } @@ -174,28 +173,6 @@ impl TaskRegistry { (*slot.get()).clone() }) } - - /// Retrieves the name of a task with the given ID. - /// - /// # Arguments - /// * `task_id` - Unique identifier of the task - /// - /// # Returns - /// The name of the task if found and named, or `None` otherwise - pub fn get_task_name(&self, task_id: u32) -> Option<&'static str> { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &*slot.get(); - if let Some(task) = slot_ref { - if task.task_id == task_id { - return task.name; - } - } - } - } - None - } } unsafe impl Sync for TaskRegistry {} @@ -343,8 +320,10 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { for task in TASK_REGISTRY.get_all_tasks() { + let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; + let name = task_ref.name().unwrap_or("unnamed\0"); let info = rtos_trace::TaskInfo { - name: TASK_REGISTRY.get_task_name(task.task_id).unwrap(), + name, priority: 0, stack_base: 0, stack_size: 0, diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 4fc4312b9..40202299f 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -171,8 +171,9 @@ impl Spawner { match task { Some(task) => { + task.set_name(Some(name)); let task_id = task.as_ptr() as u32; - TASK_REGISTRY.register(task_id, Some(name)); + TASK_REGISTRY.register(task_id); unsafe { self.executor.spawn(task) }; Ok(()) -- cgit From f4e0cbb7cc476b171acd0b21448e9bbc848a616d Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 09:59:27 -0400 Subject: add ID field to TaskHeader --- embassy-executor/src/raw/mod.rs | 19 +++++++++++++++++++ embassy-executor/src/spawner.rs | 1 + 2 files changed, 20 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f4e06350..075d8a254 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -91,6 +91,8 @@ pub(crate) struct TaskHeader { pub(crate) timer_queue_item: timer_queue::TimerQueueItem, #[cfg(feature = "trace")] pub(crate) name: Option<&'static str>, + #[cfg(feature = "trace")] + pub(crate) id: u32, } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -166,6 +168,21 @@ impl TaskRef { (*header_ptr).name = name; } } + + /// Get the ID for a task + #[cfg(feature = "trace")] + pub fn id(&self) -> u32 { + self.header().id + } + + /// Set the ID for a task + #[cfg(feature = "trace")] + pub fn set_id(&self, id: u32) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).id = id; + } + } } /// Raw storage in which a task can be spawned. @@ -209,6 +226,8 @@ impl TaskStorage { timer_queue_item: timer_queue::TimerQueueItem::new(), #[cfg(feature = "trace")] name: None, + #[cfg(feature = "trace")] + id: 0, }, future: UninitCell::uninit(), } diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 40202299f..7f907346d 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -174,6 +174,7 @@ impl Spawner { task.set_name(Some(name)); let task_id = task.as_ptr() as u32; TASK_REGISTRY.register(task_id); + task.set_id(task_id); unsafe { self.executor.spawn(task) }; Ok(()) -- cgit From 6085916714b79a888e117a2d7223e78c9a5de9d3 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 11:47:04 -0400 Subject: use an intrusive linked list in TaskHeader to track tasks --- embassy-executor/src/raw/mod.rs | 76 ++++++++++++++- embassy-executor/src/raw/trace.rs | 190 ++++++++++++++++++-------------------- embassy-executor/src/spawner.rs | 3 - 3 files changed, 163 insertions(+), 106 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 075d8a254..b4adfe01b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -93,6 +93,78 @@ pub(crate) struct TaskHeader { pub(crate) name: Option<&'static str>, #[cfg(feature = "trace")] pub(crate) id: u32, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr, +} + +/// A thread-safe tracker for all tasks in the system +/// +/// This struct uses an intrusive linked list approach to track all tasks +/// without additional memory allocations. It maintains a global list of +/// tasks that can be traversed to find all currently existing tasks. +#[cfg(feature = "trace")] +pub struct TaskTracker { + head: AtomicPtr, +} + +#[cfg(feature = "trace")] +impl TaskTracker { + /// Creates a new empty task tracker + /// + /// Initializes a tracker with no tasks in its list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } + + /// Adds a task to the tracker + /// + /// This method inserts a task at the head of the intrusive linked list. + /// The operation is thread-safe and lock-free, using atomic operations + /// to ensure consistency even when called from different contexts. + /// + /// # Arguments + /// * `task` - The task reference to add to the tracker + pub fn add(&self, task: TaskRef) { + let task_ptr = task.as_ptr() as *mut TaskHeader; + + loop { + let current_head = self.head.load(Ordering::Acquire); + unsafe { + (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); + } + + if self + .head + .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + /// Performs an operation on each task in the tracker + /// + /// This method traverses the entire list of tasks and calls the provided + /// function for each task. This allows inspecting or processing all tasks + /// in the system without modifying the tracker's structure. + /// + /// # Arguments + /// * `f` - A function to call for each task in the tracker + pub fn for_each(&self, mut f: F) + where + F: FnMut(TaskRef), + { + let mut current = self.head.load(Ordering::Acquire); + while !current.is_null() { + let task = unsafe { TaskRef::from_ptr(current) }; + f(task); + + current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; + } + } } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. @@ -173,7 +245,7 @@ impl TaskRef { #[cfg(feature = "trace")] pub fn id(&self) -> u32 { self.header().id - } + } /// Set the ID for a task #[cfg(feature = "trace")] @@ -228,6 +300,8 @@ impl TaskStorage { name: None, #[cfg(feature = "trace")] id: 0, + #[cfg(feature = "trace")] + all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, future: UninitCell::uninit(), } diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 28be79cee..81c8a0024 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,107 +81,19 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; use rtos_trace::TaskInfo; -const MAX_TASKS: usize = 1000; - -/// Represents a task being tracked in the task registry. -/// -/// Contains the task's unique identifier and optional name. -#[derive(Clone)] -pub struct TrackedTask { - task_id: u32, -} - -/// A thread-safe registry for tracking tasks in the system. -/// -/// This registry maintains a list of active tasks with their IDs and optional names. -/// It supports registering, unregistering, and querying information about tasks. -/// The registry has a fixed capacity of `MAX_TASKS`. -pub struct TaskRegistry { - tasks: [UnsafeCell>; MAX_TASKS], - count: AtomicUsize, -} - -impl TaskRegistry { - /// Creates a new empty task registry. - /// - /// This initializes a registry that can track up to `MAX_TASKS` tasks. - pub const fn new() -> Self { - const EMPTY: UnsafeCell> = UnsafeCell::new(None); - Self { - tasks: [EMPTY; MAX_TASKS], - count: AtomicUsize::new(0), - } - } - - /// Registers a new task in the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier for the task - /// * `name` - Optional name for the task - /// - /// # Note - /// If the registry is full, the task will not be registered. - pub fn register(&self, task_id: u32) { - let count = self.count.load(Ordering::Relaxed); - if count < MAX_TASKS { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if slot_ref.is_none() { - *slot_ref = Some(TrackedTask { task_id }); - self.count.fetch_add(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Removes a task from the registry. - /// - /// # Arguments - /// * `task_id` - Unique identifier of the task to remove - pub fn unregister(&self, task_id: u32) { - for i in 0..MAX_TASKS { - unsafe { - let slot = &self.tasks[i]; - let slot_ref = &mut *slot.get(); - if let Some(task) = slot_ref { - if task.task_id == task_id { - *slot_ref = None; - self.count.fetch_sub(1, Ordering::Relaxed); - break; - } - } - } - } - } - - /// Returns an iterator over all registered tasks. - /// - /// This allows accessing information about all tasks currently in the registry. - pub fn get_all_tasks(&self) -> impl Iterator + '_ { - (0..MAX_TASKS).filter_map(move |i| unsafe { - let slot = &self.tasks[i]; - (*slot.get()).clone() - }) - } -} - -unsafe impl Sync for TaskRegistry {} -unsafe impl Send for TaskRegistry {} - -/// Global task registry instance used for tracking all tasks in the system. +/// Global task tracker instance /// -/// This provides a centralized registry accessible from anywhere in the application. -pub static TASK_REGISTRY: TaskRegistry = TaskRegistry::new(); +/// This static provides access to the global task tracker which maintains +/// a list of all tasks in the system. It's automatically updated by the +/// task lifecycle hooks in the trace module. +#[cfg(feature = "trace")] +pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); #[cfg(not(feature = "rtos-trace"))] extern "Rust" { @@ -262,6 +174,9 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[cfg(feature = "rtos-trace")] rtos_trace::trace::task_new(task.as_ptr() as u32); + + #[cfg(feature = "rtos-trace")] + TASK_TRACKER.add(*task); } #[inline] @@ -272,8 +187,6 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } - - TASK_REGISTRY.unregister(task_id); } #[inline] @@ -316,20 +229,93 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { rtos_trace::trace::system_idle(); } +/// Returns an iterator over all active tasks in the system +/// +/// This function provides a convenient way to iterate over all tasks +/// that are currently tracked in the system. The returned iterator +/// yields each task in the global task tracker. +/// +/// # Returns +/// An iterator that yields `TaskRef` items for each task +#[cfg(feature = "trace")] +pub fn get_all_active_tasks() -> impl Iterator + 'static { + struct TaskIterator<'a> { + tracker: &'a TaskTracker, + current: *mut TaskHeader, + } + + impl<'a> Iterator for TaskIterator<'a> { + type Item = TaskRef; + + fn next(&mut self) -> Option { + if self.current.is_null() { + return None; + } + + let task = unsafe { TaskRef::from_ptr(self.current) }; + self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) }; + + Some(task) + } + } + + TaskIterator { + tracker: &TASK_TRACKER, + current: TASK_TRACKER.head.load(Ordering::Acquire), + } +} + +/// Get all active tasks, filtered by a predicate function +#[cfg(feature = "trace")] +pub fn filter_active_tasks(predicate: F) -> impl Iterator + 'static +where + F: Fn(&TaskRef) -> bool + 'static, +{ + get_all_active_tasks().filter(move |task| predicate(task)) +} + +/// Count the number of active tasks +#[cfg(feature = "trace")] +pub fn count_active_tasks() -> usize { + let mut count = 0; + TASK_TRACKER.for_each(|_| count += 1); + count +} + +/// Perform an action on each active task +#[cfg(feature = "trace")] +pub fn with_all_active_tasks(f: F) +where + F: FnMut(TaskRef), +{ + TASK_TRACKER.for_each(f); +} + +/// Get tasks by name +#[cfg(feature = "trace")] +pub fn get_tasks_by_name(name: &'static str) -> impl Iterator + 'static { + filter_active_tasks(move |task| task.name() == Some(name)) +} + +/// Get tasks by ID +#[cfg(feature = "trace")] +pub fn get_task_by_id(id: u32) -> Option { + filter_active_tasks(move |task| task.id() == id).next() +} + #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { - for task in TASK_REGISTRY.get_all_tasks() { - let task_ref = unsafe { TaskRef::from_ptr(task.task_id as *const TaskHeader) }; - let name = task_ref.name().unwrap_or("unnamed\0"); + with_all_active_tasks(|task| { + let name = task.name().unwrap_or("unnamed task\0"); let info = rtos_trace::TaskInfo { name, priority: 0, stack_base: 0, stack_size: 0, }; - rtos_trace::trace::task_send_info(task.task_id, info); - } + rtos_trace::trace::task_send_info(task.as_id(), info); + }); } fn time() -> u64 { const fn gcd(a: u64, b: u64) -> u64 { diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 7f907346d..5e42f01bf 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -5,8 +5,6 @@ use core::sync::atomic::Ordering; use core::task::Poll; use super::raw; -#[cfg(feature = "rtos-trace")] -use super::raw::trace::TASK_REGISTRY; /// Token to spawn a newly-created task in an executor. /// @@ -173,7 +171,6 @@ impl Spawner { Some(task) => { task.set_name(Some(name)); let task_id = task.as_ptr() as u32; - TASK_REGISTRY.register(task_id); task.set_id(task_id); unsafe { self.executor.spawn(task) }; -- cgit From f2429c212e77969bacfe726cd293bf0ab5903664 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Tue, 6 May 2025 11:54:45 -0400 Subject: fix whitespace in the imports in trace.rs --- embassy-executor/src/raw/trace.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 81c8a0024..c0599d2c7 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -81,12 +81,13 @@ #![allow(unused)] -use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; - use core::cell::UnsafeCell; use core::sync::atomic::{AtomicUsize, Ordering}; + use rtos_trace::TaskInfo; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; + /// Global task tracker instance /// /// This static provides access to the global task tracker which maintains -- cgit From b3e13cc6de744a241521cff20725706a1e40ef25 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 10:58:07 -0400 Subject: make tracing API functions internal --- embassy-executor/src/raw/trace.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index c0599d2c7..503d806bd 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -239,7 +239,7 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { /// # Returns /// An iterator that yields `TaskRef` items for each task #[cfg(feature = "trace")] -pub fn get_all_active_tasks() -> impl Iterator + 'static { +fn get_all_active_tasks() -> impl Iterator + 'static { struct TaskIterator<'a> { tracker: &'a TaskTracker, current: *mut TaskHeader, @@ -285,7 +285,7 @@ pub fn count_active_tasks() -> usize { /// Perform an action on each active task #[cfg(feature = "trace")] -pub fn with_all_active_tasks(f: F) +fn with_all_active_tasks(f: F) where F: FnMut(TaskRef), { -- cgit From 8f18810ec61ce235a4c895413936e0216ed22c4f Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 10:58:27 -0400 Subject: remove unused tracing API --- embassy-executor/src/raw/trace.rs | 29 ----------------------------- 1 file changed, 29 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 503d806bd..fec3a4834 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -266,23 +266,6 @@ fn get_all_active_tasks() -> impl Iterator + 'static { } } -/// Get all active tasks, filtered by a predicate function -#[cfg(feature = "trace")] -pub fn filter_active_tasks(predicate: F) -> impl Iterator + 'static -where - F: Fn(&TaskRef) -> bool + 'static, -{ - get_all_active_tasks().filter(move |task| predicate(task)) -} - -/// Count the number of active tasks -#[cfg(feature = "trace")] -pub fn count_active_tasks() -> usize { - let mut count = 0; - TASK_TRACKER.for_each(|_| count += 1); - count -} - /// Perform an action on each active task #[cfg(feature = "trace")] fn with_all_active_tasks(f: F) @@ -292,18 +275,6 @@ where TASK_TRACKER.for_each(f); } -/// Get tasks by name -#[cfg(feature = "trace")] -pub fn get_tasks_by_name(name: &'static str) -> impl Iterator + 'static { - filter_active_tasks(move |task| task.name() == Some(name)) -} - -/// Get tasks by ID -#[cfg(feature = "trace")] -pub fn get_task_by_id(id: u32) -> Option { - filter_active_tasks(move |task| task.id() == id).next() -} - #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { -- cgit From 56b5e35c60743d65aacee753d1db391c3cbeae16 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 10:58:59 -0400 Subject: change rtos-trace feature flag on tracing API to trace feature flag --- embassy-executor/src/spawner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 5e42f01bf..a0d246616 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -162,7 +162,7 @@ impl Spawner { /// /// # Returns /// Result indicating whether the spawn was successful - #[cfg(feature = "rtos-trace")] + #[cfg(feature = "trace")] pub fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { let task = token.raw_task; mem::forget(token); @@ -182,7 +182,7 @@ impl Spawner { /// When rtos-trace is disabled, spawn_named falls back to regular spawn. /// This maintains API compatibility while optimizing out the name parameter. - #[cfg(not(feature = "rtos-trace"))] + #[cfg(not(feature = "trace"))] pub fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { self.spawn(token) } -- cgit From 8a8deb704fdd58cecf463f033cd3c3d1cc3534c7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 11:20:22 -0400 Subject: move spawn_named into trace.rs through TraceExt trait --- embassy-executor/src/raw/trace.rs | 43 +++++++++++++++++++++++++++++++++++++++ embassy-executor/src/spawner.rs | 37 ++------------------------------- 2 files changed, 45 insertions(+), 35 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index fec3a4834..eb960f721 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -87,6 +87,49 @@ use core::sync::atomic::{AtomicUsize, Ordering}; use rtos_trace::TaskInfo; use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; +use crate::spawner::{SpawnError, SpawnToken, Spawner}; + +/// Extension trait adding tracing capabilities to the Spawner +pub trait TraceExt { + /// Spawns a new task with a specified name. + /// + /// # Arguments + /// * `name` - Static string name to associate with the task + /// * `token` - Token representing the task to spawn + /// + /// # Returns + /// Result indicating whether the spawn was successful + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; +} + +#[cfg(feature = "trace")] +impl TraceExt for Spawner { + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + core::mem::forget(token); + + match task { + Some(task) => { + task.set_name(Some(name)); + let task_id = task.as_ptr() as u32; + task.set_id(task_id); + + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } +} + +/// When trace is disabled, spawn_named falls back to regular spawn. +/// This maintains API compatibility while optimizing out the name parameter. +#[cfg(not(feature = "trace"))] +impl TraceExt for Spawner { + fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + self.spawn(token) + } +} /// Global task tracker instance /// diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index a0d246616..6b8db4f8f 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -22,7 +22,7 @@ use super::raw; /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] pub struct SpawnToken { - raw_task: Option, + pub(crate) raw_task: Option, phantom: PhantomData<*mut S>, } @@ -103,7 +103,7 @@ impl core::error::Error for SpawnError {} /// If you want to spawn tasks from another thread, use [SendSpawner]. #[derive(Copy, Clone)] pub struct Spawner { - executor: &'static raw::Executor, + pub(crate) executor: &'static raw::Executor, not_send: PhantomData<*mut ()>, } @@ -154,39 +154,6 @@ impl Spawner { } } - /// Spawns a new task with a specified name. - /// - /// # Arguments - /// * `name` - Static string name to associate with the task - /// * `token` - Token representing the task to spawn - /// - /// # Returns - /// Result indicating whether the spawn was successful - #[cfg(feature = "trace")] - pub fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - mem::forget(token); - - match task { - Some(task) => { - task.set_name(Some(name)); - let task_id = task.as_ptr() as u32; - task.set_id(task_id); - - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - /// When rtos-trace is disabled, spawn_named falls back to regular spawn. - /// This maintains API compatibility while optimizing out the name parameter. - #[cfg(not(feature = "trace"))] - pub fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - self.spawn(token) - } - // Used by the `embassy_executor_macros::main!` macro to throw an error when spawn // fails. This is here to allow conditional use of `defmt::unwrap!` // without introducing a `defmt` feature in the `embassy_executor_macros` package, -- cgit From 462d04c6d5a0fc6072cf9bdb0faa60da74ff46d2 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:34:32 -0400 Subject: move TaskTracker to trace --- embassy-executor/src/raw/mod.rs | 70 ------------------------------------ embassy-executor/src/raw/trace.rs | 74 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 72 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index b4adfe01b..882e4605b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -97,76 +97,6 @@ pub(crate) struct TaskHeader { all_tasks_next: AtomicPtr, } -/// A thread-safe tracker for all tasks in the system -/// -/// This struct uses an intrusive linked list approach to track all tasks -/// without additional memory allocations. It maintains a global list of -/// tasks that can be traversed to find all currently existing tasks. -#[cfg(feature = "trace")] -pub struct TaskTracker { - head: AtomicPtr, -} - -#[cfg(feature = "trace")] -impl TaskTracker { - /// Creates a new empty task tracker - /// - /// Initializes a tracker with no tasks in its list. - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(core::ptr::null_mut()), - } - } - - /// Adds a task to the tracker - /// - /// This method inserts a task at the head of the intrusive linked list. - /// The operation is thread-safe and lock-free, using atomic operations - /// to ensure consistency even when called from different contexts. - /// - /// # Arguments - /// * `task` - The task reference to add to the tracker - pub fn add(&self, task: TaskRef) { - let task_ptr = task.as_ptr() as *mut TaskHeader; - - loop { - let current_head = self.head.load(Ordering::Acquire); - unsafe { - (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); - } - - if self - .head - .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - break; - } - } - } - - /// Performs an operation on each task in the tracker - /// - /// This method traverses the entire list of tasks and calls the provided - /// function for each task. This allows inspecting or processing all tasks - /// in the system without modifying the tracker's structure. - /// - /// # Arguments - /// * `f` - A function to call for each task in the tracker - pub fn for_each(&self, mut f: F) - where - F: FnMut(TaskRef), - { - let mut current = self.head.load(Ordering::Acquire); - while !current.is_null() { - let task = unsafe { TaskRef::from_ptr(current) }; - f(task); - - current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; - } - } -} - /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Clone, Copy, PartialEq)] pub struct TaskRef { diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index eb960f721..b59da0526 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -82,11 +82,11 @@ #![allow(unused)] use core::cell::UnsafeCell; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use rtos_trace::TaskInfo; -use crate::raw::{SyncExecutor, TaskHeader, TaskRef, TaskTracker}; +use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// Extension trait adding tracing capabilities to the Spawner @@ -139,6 +139,76 @@ impl TraceExt for Spawner { #[cfg(feature = "trace")] pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); +/// A thread-safe tracker for all tasks in the system +/// +/// This struct uses an intrusive linked list approach to track all tasks +/// without additional memory allocations. It maintains a global list of +/// tasks that can be traversed to find all currently existing tasks. +#[cfg(feature = "trace")] +pub struct TaskTracker { + head: AtomicPtr, +} + +#[cfg(feature = "trace")] +impl TaskTracker { + /// Creates a new empty task tracker + /// + /// Initializes a tracker with no tasks in its list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } + + /// Adds a task to the tracker + /// + /// This method inserts a task at the head of the intrusive linked list. + /// The operation is thread-safe and lock-free, using atomic operations + /// to ensure consistency even when called from different contexts. + /// + /// # Arguments + /// * `task` - The task reference to add to the tracker + pub fn add(&self, task: TaskRef) { + let task_ptr = task.as_ptr() as *mut TaskHeader; + + loop { + let current_head = self.head.load(Ordering::Acquire); + unsafe { + (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed); + } + + if self + .head + .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + /// Performs an operation on each task in the tracker + /// + /// This method traverses the entire list of tasks and calls the provided + /// function for each task. This allows inspecting or processing all tasks + /// in the system without modifying the tracker's structure. + /// + /// # Arguments + /// * `f` - A function to call for each task in the tracker + pub fn for_each(&self, mut f: F) + where + F: FnMut(TaskRef), + { + let mut current = self.head.load(Ordering::Acquire); + while !current.is_null() { + let task = unsafe { TaskRef::from_ptr(current) }; + f(task); + + current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) }; + } + } +} + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always -- cgit From 3b873bb6bb51b9bdac9272b5ec629a6ac54a89f7 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:40:32 -0400 Subject: implement TaskRefTrace for tracing-only fields in TaskRef --- embassy-executor/src/raw/mod.rs | 36 --------------------------- embassy-executor/src/raw/trace.rs | 52 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 36 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 882e4605b..e7a27035a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -149,42 +149,6 @@ impl TaskRef { pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } - - /// Get the ID for a task - #[cfg(feature = "trace")] - pub fn as_id(self) -> u32 { - self.ptr.as_ptr() as u32 - } - - /// Get the name for a task - #[cfg(feature = "trace")] - pub fn name(&self) -> Option<&'static str> { - self.header().name - } - - /// Set the name for a task - #[cfg(feature = "trace")] - pub fn set_name(&self, name: Option<&'static str>) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).name = name; - } - } - - /// Get the ID for a task - #[cfg(feature = "trace")] - pub fn id(&self) -> u32 { - self.header().id - } - - /// Set the ID for a task - #[cfg(feature = "trace")] - pub fn set_id(&self, id: u32) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).id = id; - } - } } /// Raw storage in which a task can be spawned. diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index b59da0526..b30f23468 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -209,6 +209,58 @@ impl TaskTracker { } } +/// Extension trait for `TaskRef` that provides tracing functionality. +/// +/// This trait is only available when the `trace` feature is enabled. +/// It extends `TaskRef` with methods for accessing and modifying task identifiers +/// and names, which are useful for debugging, logging, and performance analysis. +#[cfg(feature = "trace")] +pub trait TaskRefTrace { + /// Get the ID for a task + fn as_id(self) -> u32; + + /// Get the name for a task + fn name(&self) -> Option<&'static str>; + + /// Set the name for a task + fn set_name(&self, name: Option<&'static str>); + + /// Get the ID for a task + fn id(&self) -> u32; + + /// Set the ID for a task + fn set_id(&self, id: u32); +} + +#[cfg(feature = "trace")] +impl TaskRefTrace for TaskRef { + fn as_id(self) -> u32 { + self.ptr.as_ptr() as u32 + } + + fn name(&self) -> Option<&'static str> { + self.header().name + } + + fn set_name(&self, name: Option<&'static str>) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).name = name; + } + } + + fn id(&self) -> u32 { + self.header().id + } + + fn set_id(&self, id: u32) { + unsafe { + let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; + (*header_ptr).id = id; + } + } +} + #[cfg(not(feature = "rtos-trace"))] extern "Rust" { /// This callback is called when the executor begins polling. This will always -- cgit From 194a3044acb5cd9691ced78596b9fd81e6884667 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 13:41:23 -0400 Subject: remove unused task_id --- embassy-executor/src/raw/trace.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index b30f23468..04e9d234f 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -216,9 +216,6 @@ impl TaskTracker { /// and names, which are useful for debugging, logging, and performance analysis. #[cfg(feature = "trace")] pub trait TaskRefTrace { - /// Get the ID for a task - fn as_id(self) -> u32; - /// Get the name for a task fn name(&self) -> Option<&'static str>; @@ -234,10 +231,6 @@ pub trait TaskRefTrace { #[cfg(feature = "trace")] impl TaskRefTrace for TaskRef { - fn as_id(self) -> u32 { - self.ptr.as_ptr() as u32 - } - fn name(&self) -> Option<&'static str> { self.header().name } @@ -331,8 +324,6 @@ pub(crate) fn poll_start(executor: &SyncExecutor) { #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { - let task_id = task.as_ptr() as u32; - #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) @@ -347,8 +338,6 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { - let task_id = task.as_ptr() as u32; - #[cfg(not(feature = "rtos-trace"))] unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) @@ -451,7 +440,7 @@ impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { stack_base: 0, stack_size: 0, }; - rtos_trace::trace::task_send_info(task.as_id(), info); + rtos_trace::trace::task_send_info(task.id(), info); }); } fn time() -> u64 { -- cgit From e968c4763694d676cca6f1bd30949619dd12e962 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 14:03:03 -0400 Subject: update TraceExt trait name for Spawner --- embassy-executor/src/raw/trace.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 04e9d234f..f55033530 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -90,7 +90,7 @@ use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// Extension trait adding tracing capabilities to the Spawner -pub trait TraceExt { +pub trait SpawnerTraceExt { /// Spawns a new task with a specified name. /// /// # Arguments @@ -103,7 +103,7 @@ pub trait TraceExt { } #[cfg(feature = "trace")] -impl TraceExt for Spawner { +impl SpawnerTraceExt for Spawner { fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { let task = token.raw_task; core::mem::forget(token); @@ -125,7 +125,7 @@ impl TraceExt for Spawner { /// When trace is disabled, spawn_named falls back to regular spawn. /// This maintains API compatibility while optimizing out the name parameter. #[cfg(not(feature = "trace"))] -impl TraceExt for Spawner { +impl SpawnerTraceExt for Spawner { fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { self.spawn(token) } -- cgit From dfaab013ebaaa4a19c06f2eb00821712ff13cf7a Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 14:35:43 -0400 Subject: move SpawnerTraceExt back into Spawner --- embassy-executor/src/raw/trace.rs | 42 -------------------------------- embassy-executor/src/spawner.rs | 50 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 42 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index f55033530..593f7b0ba 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -89,48 +89,6 @@ use rtos_trace::TaskInfo; use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; use crate::spawner::{SpawnError, SpawnToken, Spawner}; -/// Extension trait adding tracing capabilities to the Spawner -pub trait SpawnerTraceExt { - /// Spawns a new task with a specified name. - /// - /// # Arguments - /// * `name` - Static string name to associate with the task - /// * `token` - Token representing the task to spawn - /// - /// # Returns - /// Result indicating whether the spawn was successful - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; -} - -#[cfg(feature = "trace")] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - core::mem::forget(token); - - match task { - Some(task) => { - task.set_name(Some(name)); - let task_id = task.as_ptr() as u32; - task.set_id(task_id); - - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } -} - -/// When trace is disabled, spawn_named falls back to regular spawn. -/// This maintains API compatibility while optimizing out the name parameter. -#[cfg(not(feature = "trace"))] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - self.spawn(token) - } -} - /// Global task tracker instance /// /// This static provides access to the global task tracker which maintains diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 6b8db4f8f..bfb32ebcc 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -6,6 +6,9 @@ use core::task::Poll; use super::raw; +#[cfg(feature = "trace")] +use crate::raw::trace::TaskRefTrace; + /// Token to spawn a newly-created task in an executor. /// /// When calling a task function (like `#[embassy_executor::task] async fn my_task() { ... }`), the returned @@ -180,6 +183,53 @@ impl Spawner { } } +/// Extension trait adding tracing capabilities to the Spawner +/// +/// This trait provides an additional method to spawn tasks with an associated name, +/// which can be useful for debugging and tracing purposes. +pub trait SpawnerTraceExt { + /// Spawns a new task with a specified name. + /// + /// # Arguments + /// * `name` - Static string name to associate with the task + /// * `token` - Token representing the task to spawn + /// + /// # Returns + /// Result indicating whether the spawn was successful + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; +} + +/// Implementation of the SpawnerTraceExt trait for Spawner when trace is enabled +#[cfg(feature = "trace")] +impl SpawnerTraceExt for Spawner { + fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + let task = token.raw_task; + core::mem::forget(token); + + match task { + Some(task) => { + // Set the name and ID when trace is enabled + task.set_name(Some(name)); + let task_id = task.as_ptr() as u32; + task.set_id(task_id); + + unsafe { self.executor.spawn(task) }; + Ok(()) + } + None => Err(SpawnError::Busy), + } + } +} + +/// Implementation of the SpawnerTraceExt trait for Spawner when trace is disabled +#[cfg(not(feature = "trace"))] +impl SpawnerTraceExt for Spawner { + fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { + // When trace is disabled, just forward to regular spawn and ignore the name + self.spawn(token) + } +} + /// Handle to spawn tasks into an executor from any thread. /// /// This Spawner can be used from any thread (it is Send), but it can -- cgit From 3ffa2e4f3f9ecbca8637ae1603194a63d55b4396 Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 16:30:06 -0400 Subject: remove unnecessary trace flags --- embassy-executor/src/raw/trace.rs | 7 ------- 1 file changed, 7 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 593f7b0ba..6c9cfda25 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -94,7 +94,6 @@ use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// This static provides access to the global task tracker which maintains /// a list of all tasks in the system. It's automatically updated by the /// task lifecycle hooks in the trace module. -#[cfg(feature = "trace")] pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); /// A thread-safe tracker for all tasks in the system @@ -102,12 +101,10 @@ pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); /// This struct uses an intrusive linked list approach to track all tasks /// without additional memory allocations. It maintains a global list of /// tasks that can be traversed to find all currently existing tasks. -#[cfg(feature = "trace")] pub struct TaskTracker { head: AtomicPtr, } -#[cfg(feature = "trace")] impl TaskTracker { /// Creates a new empty task tracker /// @@ -172,7 +169,6 @@ impl TaskTracker { /// This trait is only available when the `trace` feature is enabled. /// It extends `TaskRef` with methods for accessing and modifying task identifiers /// and names, which are useful for debugging, logging, and performance analysis. -#[cfg(feature = "trace")] pub trait TaskRefTrace { /// Get the name for a task fn name(&self) -> Option<&'static str>; @@ -187,7 +183,6 @@ pub trait TaskRefTrace { fn set_id(&self, id: u32); } -#[cfg(feature = "trace")] impl TaskRefTrace for TaskRef { fn name(&self) -> Option<&'static str> { self.header().name @@ -350,7 +345,6 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { /// /// # Returns /// An iterator that yields `TaskRef` items for each task -#[cfg(feature = "trace")] fn get_all_active_tasks() -> impl Iterator + 'static { struct TaskIterator<'a> { tracker: &'a TaskTracker, @@ -379,7 +373,6 @@ fn get_all_active_tasks() -> impl Iterator + 'static { } /// Perform an action on each active task -#[cfg(feature = "trace")] fn with_all_active_tasks(f: F) where F: FnMut(TaskRef), -- cgit From ebb6132f5f9c55ad4ced2602134f8e2c69135c1e Mon Sep 17 00:00:00 2001 From: Kat Perez Date: Thu, 8 May 2025 16:31:47 -0400 Subject: rustfmt --- embassy-executor/src/spawner.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index bfb32ebcc..522d97db3 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -5,7 +5,6 @@ use core::sync::atomic::Ordering; use core::task::Poll; use super::raw; - #[cfg(feature = "trace")] use crate::raw::trace::TaskRefTrace; -- cgit From 0f9a7a057fb7dfb2358acec9068fece82c7c7a89 Mon Sep 17 00:00:00 2001 From: Johan Anderholm Date: Sat, 30 Dec 2023 11:54:16 +0100 Subject: executor: Make state implementations and their conditions match Use u8 for state_atomics and state_critical_section since that is all that is needed. Change arm condition to "32" since that is what is used and required. --- embassy-executor/src/raw/mod.rs | 2 +- embassy-executor/src/raw/state_atomics.rs | 10 +++++----- embassy-executor/src/raw/state_critical_section.rs | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e7a27035a..913da2e25 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -11,7 +11,7 @@ #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; -#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] +#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")] #[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] mod state; diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index b6576bfc2..e813548ae 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,4 +1,4 @@ -use core::sync::atomic::{AtomicU32, Ordering}; +use core::sync::atomic::{AtomicU8, Ordering}; #[derive(Clone, Copy)] pub(crate) struct Token(()); @@ -11,18 +11,18 @@ pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { } /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +pub(crate) const STATE_SPAWNED: u8 = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; pub(crate) struct State { - state: AtomicU32, + state: AtomicU8, } impl State { pub const fn new() -> State { Self { - state: AtomicU32::new(0), + state: AtomicU8::new(0), } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 6b627ff79..ec08f2f58 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -4,12 +4,12 @@ pub(crate) use critical_section::{with as locked, CriticalSection as Token}; use critical_section::{CriticalSection, Mutex}; /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u32 = 1 << 0; +pub(crate) const STATE_SPAWNED: u8 = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; pub(crate) struct State { - state: Mutex>, + state: Mutex>, } impl State { @@ -19,11 +19,11 @@ impl State { } } - fn update(&self, f: impl FnOnce(&mut u32) -> R) -> R { + fn update(&self, f: impl FnOnce(&mut u8) -> R) -> R { critical_section::with(|cs| self.update_with_cs(cs, f)) } - fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R { + fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u8) -> R) -> R { let s = self.state.borrow(cs); let mut val = s.get(); let r = f(&mut val); -- cgit From 5a07ea5d851768223e2e41342e69d14c1afb2b2b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sun, 9 Mar 2025 20:55:11 +0100 Subject: Add support for Cortex-A/R --- embassy-executor/src/arch/cortex_ar.rs | 84 ++++++++++++++++++++++++++++++++++ embassy-executor/src/lib.rs | 2 + 2 files changed, 86 insertions(+) create mode 100644 embassy-executor/src/arch/cortex_ar.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/arch/cortex_ar.rs b/embassy-executor/src/arch/cortex_ar.rs new file mode 100644 index 000000000..f9e2f3f7c --- /dev/null +++ b/embassy-executor/src/arch/cortex_ar.rs @@ -0,0 +1,84 @@ +#[cfg(feature = "executor-interrupt")] +compile_error!("`executor-interrupt` is not supported with `arch-cortex-ar`."); + +#[export_name = "__pender"] +#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))] +fn __pender(context: *mut ()) { + // `context` is always `usize::MAX` created by `Executor::run`. + let context = context as usize; + + #[cfg(feature = "executor-thread")] + // Try to make Rust optimize the branching away if we only use thread mode. + if !cfg!(feature = "executor-interrupt") || context == THREAD_PENDER { + cortex_ar::asm::sev(); + return; + } +} + +#[cfg(feature = "executor-thread")] +pub use thread::*; +#[cfg(feature = "executor-thread")] +mod thread { + pub(super) const THREAD_PENDER: usize = usize::MAX; + + use core::marker::PhantomData; + + use cortex_ar::asm::wfe; + pub use embassy_executor_macros::main_cortex_ar as main; + + use crate::{raw, Spawner}; + + /// Thread mode executor, using WFE/SEV. + /// + /// This is the simplest and most common kind of executor. It runs on + /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction + /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction + /// is executed, to make the `WFE` exit from sleep and poll the task. + /// + /// This executor allows for ultra low power consumption for chips where `WFE` + /// triggers low-power sleep without extra steps. If your chip requires extra steps, + /// you may use [`raw::Executor`] directly to program custom behavior. + pub struct Executor { + inner: raw::Executor, + not_send: PhantomData<*mut ()>, + } + + impl Executor { + /// Create a new Executor. + pub fn new() -> Self { + Self { + inner: raw::Executor::new(THREAD_PENDER as *mut ()), + not_send: PhantomData, + } + } + + /// Run the executor. + /// + /// The `init` closure is called with a [`Spawner`] that spawns tasks on + /// this executor. Use it to spawn the initial task(s). After `init` returns, + /// the executor starts running the tasks. + /// + /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), + /// for example by passing it as an argument to the initial tasks. + /// + /// This function requires `&'static mut self`. This means you have to store the + /// Executor instance in a place where it'll live forever and grants you mutable + /// access. There's a few ways to do this: + /// + /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) + /// - a `static mut` (unsafe) + /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) + /// + /// This function never returns. + pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + init(self.inner.spawner()); + + loop { + unsafe { + self.inner.poll(); + } + wfe(); + } + } + } +} diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index d6fd3d651..dfe420bab 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -26,6 +26,7 @@ macro_rules! check_at_most_one { check_at_most_one!( "arch-avr", "arch-cortex-m", + "arch-cortex-ar", "arch-riscv32", "arch-std", "arch-wasm", @@ -35,6 +36,7 @@ check_at_most_one!( #[cfg(feature = "_arch")] #[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")] #[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] +#[cfg_attr(feature = "arch-cortex-ar", path = "arch/cortex_ar.rs")] #[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] #[cfg_attr(feature = "arch-std", path = "arch/std.rs")] #[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] -- cgit From a4d4f62a1e0e808ec3dd93e282f517a2f8ad9fa5 Mon Sep 17 00:00:00 2001 From: Matthew Tran <0e4ef622@gmail.com> Date: Wed, 28 May 2025 22:00:25 -0500 Subject: Allow `-> impl Future` in #[task] --- embassy-executor/src/lib.rs | 47 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index dfe420bab..70abfcc3a 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -65,8 +65,17 @@ pub mod _export { use crate::raw::TaskPool; + trait TaskReturnValue {} + impl TaskReturnValue for () {} + impl TaskReturnValue for Never {} + + #[diagnostic::on_unimplemented( + message = "task function futures must resolve to `()`", + note = "use `async fn` or change the return type to `impl Future`" + )] + #[allow(private_bounds)] pub trait TaskFn: Copy { - type Fut: Future + 'static; + type Fut: Future + 'static; } macro_rules! task_fn_impl { @@ -74,7 +83,7 @@ pub mod _export { impl TaskFn<($($Tn,)*)> for F where F: Copy + FnOnce($($Tn,)*) -> Fut, - Fut: Future + 'static, + Fut: Future + 'static, { type Fut = Fut; } @@ -205,4 +214,38 @@ pub mod _export { Align268435456: 268435456, Align536870912: 536870912, ); + + #[allow(dead_code)] + trait HasOutput { + type Output; + } + + impl HasOutput for fn() -> O { + type Output = O; + } + + #[allow(dead_code)] + type Never = ! as HasOutput>::Output; +} + +/// Implementation details for embassy macros. +/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. +#[doc(hidden)] +#[cfg(feature = "nightly")] +pub mod _export { + pub trait TaskReturnValue {} + impl TaskReturnValue for () {} + impl TaskReturnValue for Never {} + + #[allow(dead_code)] + trait HasOutput { + type Output; + } + + impl HasOutput for fn() -> O { + type Output = O; + } + + #[allow(dead_code)] + type Never = ! as HasOutput>::Output; } -- cgit From b06a708f81d208236763a121797807fd5b48aee6 Mon Sep 17 00:00:00 2001 From: Matthew Tran <0e4ef622@gmail.com> Date: Thu, 29 May 2025 05:54:25 -0500 Subject: Mention ! in diagnostic --- embassy-executor/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 70abfcc3a..e26e8ee7d 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -70,7 +70,7 @@ pub mod _export { impl TaskReturnValue for Never {} #[diagnostic::on_unimplemented( - message = "task function futures must resolve to `()`", + message = "task futures must resolve to `()` or `!`", note = "use `async fn` or change the return type to `impl Future`" )] #[allow(private_bounds)] -- cgit From 0d83fbbb57cf17186a1b8f40f57ef7a35b3e9627 Mon Sep 17 00:00:00 2001 From: Matthew Tran <0e4ef622@gmail.com> Date: Sun, 1 Jun 2025 10:32:24 -0500 Subject: Add diagnostic::on_unimplemented for nightly --- embassy-executor/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index e26e8ee7d..e174a0594 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -233,6 +233,10 @@ pub mod _export { #[doc(hidden)] #[cfg(feature = "nightly")] pub mod _export { + #[diagnostic::on_unimplemented( + message = "task futures must resolve to `()` or `!`", + note = "use `async fn` or change the return type to `impl Future`" + )] pub trait TaskReturnValue {} impl TaskReturnValue for () {} impl TaskReturnValue for Never {} -- cgit From b861dd172829c5b34e95644287544e090dd9f568 Mon Sep 17 00:00:00 2001 From: Florian Grandel Date: Sat, 5 Jul 2025 18:27:46 +0200 Subject: embassy-executor: rtos-trace: fix task naming for new tasks Tasks that are spawned after starting SystemViewer were not named. This change ensures that tasks spawned while SystemViewer is running will be properly named, too. Signed-off-by: Florian Grandel --- embassy-executor/src/raw/trace.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 6c9cfda25..aa27ab37e 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -283,7 +283,17 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { } #[cfg(feature = "rtos-trace")] - rtos_trace::trace::task_new(task.as_ptr() as u32); + { + rtos_trace::trace::task_new(task.as_ptr() as u32); + let name = task.name().unwrap_or("unnamed task\0"); + let info = rtos_trace::TaskInfo { + name, + priority: 0, + stack_base: 0, + stack_size: 0, + }; + rtos_trace::trace::task_send_info(task.id(), info); + } #[cfg(feature = "rtos-trace")] TASK_TRACKER.add(*task); -- cgit From 0c136c7b050ded4bf660ea7a50381698ab9d5f09 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 8 Jul 2025 22:39:53 +0200 Subject: executor: mark Spawner::for_current_executor() as unsafe. It's unsound with manually-created Contexts, see https://github.com/embassy-rs/embassy/issues/4379 --- embassy-executor/src/spawner.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 522d97db3..2909d19a0 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -122,10 +122,26 @@ impl Spawner { /// This function is `async` just to get access to the current async /// context. It returns instantly, it does not block/yield. /// + /// Using this method is discouraged due to it being unsafe. Consider the following + /// alternatives instead: + /// + /// - Pass the initial `Spawner` as an argument to tasks. Note that it's `Copy`, so you can + /// make as many copies of it as you want. + /// - Use `SendSpawner::for_current_executor()` instead, which is safe but can only be used + /// if task arguments are `Send`. + /// + /// The only case where using this method is absolutely required is obtaining the `Spawner` + /// for an `InterruptExecutor`. + /// + /// # Safety + /// + /// You must only execute this with an async `Context` created by the Embassy executor. + /// You must not execute it with manually-created `Context`s. + /// /// # Panics /// /// Panics if the current executor is not an Embassy executor. - pub fn for_current_executor() -> impl Future { + pub unsafe fn for_current_executor() -> impl Future { poll_fn(|cx| { let task = raw::task_from_waker(cx.waker()); let executor = unsafe { -- cgit From 539ff78ebbdedbb75d0faf940e3ee69f5e7f276a Mon Sep 17 00:00:00 2001 From: Brezak Date: Wed, 23 Jul 2025 19:51:31 +0200 Subject: embassy-executor: explicitly return impl Future in task inner task --- embassy-executor/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index e174a0594..0747db032 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -216,7 +216,7 @@ pub mod _export { ); #[allow(dead_code)] - trait HasOutput { + pub trait HasOutput { type Output; } @@ -225,7 +225,7 @@ pub mod _export { } #[allow(dead_code)] - type Never = ! as HasOutput>::Output; + pub type Never = ! as HasOutput>::Output; } /// Implementation details for embassy macros. @@ -242,7 +242,7 @@ pub mod _export { impl TaskReturnValue for Never {} #[allow(dead_code)] - trait HasOutput { + pub trait HasOutput { type Output; } @@ -251,5 +251,5 @@ pub mod _export { } #[allow(dead_code)] - type Never = ! as HasOutput>::Output; + pub type Never = ! as HasOutput>::Output; } -- cgit From 98595f659c309703aab411b6b3be7579b6e93c5d Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 28 Jul 2025 15:37:34 +0200 Subject: `embassy-time`: add missing `Debug` & `defmt::Format` derives `defmt::Format` is *not* implemented for `MockDriver` and `InnerMockDriver` because the former contains the latter and the latter is using `Queue` from `embassy-time-queue-utils` which so far does not have a `defmt` dependency. since this is just a mock driver it shouldn't be relevant if it has no `defmt::Format` impl. --- embassy-executor/src/raw/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 913da2e25..c8f1f46c2 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -98,7 +98,7 @@ pub(crate) struct TaskHeader { } /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. -#[derive(Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] pub struct TaskRef { ptr: NonNull, } -- cgit From b7964df8875384269ac4d2a52ae149e25abecbaf Mon Sep 17 00:00:00 2001 From: Bart Slinger Date: Thu, 14 Aug 2025 13:54:43 +0800 Subject: add missing feature gate for rtos-trace --- embassy-executor/src/raw/trace.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index aa27ab37e..f484abf58 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -84,6 +84,7 @@ use core::cell::UnsafeCell; use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +#[cfg(feature = "rtos-trace")] use rtos_trace::TaskInfo; use crate::raw::{SyncExecutor, TaskHeader, TaskRef}; -- cgit From 74037f04933f4ec9a678e0b47fd6819e7c0489a9 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 4 Aug 2025 00:05:25 +0200 Subject: Make TimerQueueItem opaque --- embassy-executor/src/raw/mod.rs | 33 ++++++++------- embassy-executor/src/raw/timer_queue.rs | 73 --------------------------------- 2 files changed, 16 insertions(+), 90 deletions(-) delete mode 100644 embassy-executor/src/raw/timer_queue.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index c8f1f46c2..8e783b2af 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -16,7 +16,6 @@ mod run_queue; #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] mod state; -pub mod timer_queue; #[cfg(feature = "trace")] pub mod trace; pub(crate) mod util; @@ -31,8 +30,9 @@ use core::ptr::NonNull; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; -use core::task::{Context, Poll}; +use core::task::{Context, Poll, Waker}; +use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; @@ -42,6 +42,11 @@ use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; use super::SpawnToken; +#[no_mangle] +extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem { + unsafe { task_from_waker(waker).timer_queue_item() } +} + /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: @@ -88,7 +93,7 @@ pub(crate) struct TaskHeader { poll_fn: SyncUnsafeCell>, /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. - pub(crate) timer_queue_item: timer_queue::TimerQueueItem, + pub(crate) timer_queue_item: TimerQueueItem, #[cfg(feature = "trace")] pub(crate) name: Option<&'static str>, #[cfg(feature = "trace")] @@ -120,16 +125,6 @@ impl TaskRef { } } - /// # Safety - /// - /// The result of this function must only be compared - /// for equality, or stored, but not used. - pub const unsafe fn dangling() -> Self { - Self { - ptr: NonNull::dangling(), - } - } - pub(crate) fn header(self) -> &'static TaskHeader { unsafe { self.ptr.as_ref() } } @@ -140,9 +135,13 @@ impl TaskRef { executor.as_ref().map(|e| Executor::wrap(e)) } - /// Returns a reference to the timer queue item. - pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { - &self.header().timer_queue_item + /// Returns a mutable reference to the timer queue item. + /// + /// Safety + /// + /// This function must only be called in the context of the integrated timer queue. + unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem { + unsafe { &mut self.ptr.as_mut().timer_queue_item } } /// The returned pointer is valid for the entire TaskStorage. @@ -189,7 +188,7 @@ impl TaskStorage { // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), - timer_queue_item: timer_queue::TimerQueueItem::new(), + timer_queue_item: TimerQueueItem::new(), #[cfg(feature = "trace")] name: None, #[cfg(feature = "trace")] diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs deleted file mode 100644 index e52453be4..000000000 --- a/embassy-executor/src/raw/timer_queue.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Timer queue operations. - -use core::cell::Cell; - -use super::TaskRef; - -#[cfg(feature = "_timer-item-payload")] -macro_rules! define_opaque { - ($size:tt) => { - /// An opaque data type. - #[repr(align($size))] - pub struct OpaqueData { - data: [u8; $size], - } - - impl OpaqueData { - const fn new() -> Self { - Self { data: [0; $size] } - } - - /// Access the data as a reference to a type `T`. - /// - /// Safety: - /// - /// The caller must ensure that the size of the type `T` is less than, or equal to - /// the size of the payload, and must ensure that the alignment of the type `T` is - /// less than, or equal to the alignment of the payload. - /// - /// The type must be valid when zero-initialized. - pub unsafe fn as_ref(&self) -> &T { - &*(self.data.as_ptr() as *const T) - } - } - }; -} - -#[cfg(feature = "timer-item-payload-size-1")] -define_opaque!(1); -#[cfg(feature = "timer-item-payload-size-2")] -define_opaque!(2); -#[cfg(feature = "timer-item-payload-size-4")] -define_opaque!(4); -#[cfg(feature = "timer-item-payload-size-8")] -define_opaque!(8); - -/// An item in the timer queue. -pub struct TimerQueueItem { - /// The next item in the queue. - /// - /// If this field contains `Some`, the item is in the queue. The last item in the queue has a - /// value of `Some(dangling_pointer)` - pub next: Cell>, - - /// The time at which this item expires. - pub expires_at: Cell, - - /// Some implementation-defined, zero-initialized piece of data. - #[cfg(feature = "_timer-item-payload")] - pub payload: OpaqueData, -} - -unsafe impl Sync for TimerQueueItem {} - -impl TimerQueueItem { - pub(crate) const fn new() -> Self { - Self { - next: Cell::new(None), - expires_at: Cell::new(0), - #[cfg(feature = "_timer-item-payload")] - payload: OpaqueData::new(), - } - } -} -- cgit From 1bf3a44e5d9a6709eb0ce1dc518de82a64a72a05 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 17 Aug 2025 11:45:53 +0200 Subject: Retain timer_queue_item --- embassy-executor/src/raw/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 8e783b2af..4b17d4982 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -140,7 +140,7 @@ impl TaskRef { /// Safety /// /// This function must only be called in the context of the integrated timer queue. - unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem { + pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem { unsafe { &mut self.ptr.as_mut().timer_queue_item } } -- cgit From 658a52fb99e47d3d2f08ebf66335774930ad35ac Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 8 Jul 2025 23:29:31 +0200 Subject: executor: do not store task IDs in RAM, we can get it from the pointer every time. --- embassy-executor/src/metadata.rs | 1 + embassy-executor/src/raw/mod.rs | 10 ++++++---- embassy-executor/src/raw/trace.rs | 17 ----------------- embassy-executor/src/spawner.rs | 10 ++++------ 4 files changed, 11 insertions(+), 27 deletions(-) create mode 100644 embassy-executor/src/metadata.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs new file mode 100644 index 000000000..957417f6b --- /dev/null +++ b/embassy-executor/src/metadata.rs @@ -0,0 +1 @@ +pub struct Metadata {} diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 4b17d4982..bcd4ee432 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -97,8 +97,6 @@ pub(crate) struct TaskHeader { #[cfg(feature = "trace")] pub(crate) name: Option<&'static str>, #[cfg(feature = "trace")] - pub(crate) id: u32, - #[cfg(feature = "trace")] all_tasks_next: AtomicPtr, } @@ -148,6 +146,12 @@ impl TaskRef { pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() } + + /// Returns the task ID. + /// This can be used in combination with rtos-trace to match task names with IDs + pub fn id(&self) -> u32 { + self.as_ptr() as u32 + } } /// Raw storage in which a task can be spawned. @@ -192,8 +196,6 @@ impl TaskStorage { #[cfg(feature = "trace")] name: None, #[cfg(feature = "trace")] - id: 0, - #[cfg(feature = "trace")] all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, future: UninitCell::uninit(), diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index f484abf58..e769d63da 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -176,12 +176,6 @@ pub trait TaskRefTrace { /// Set the name for a task fn set_name(&self, name: Option<&'static str>); - - /// Get the ID for a task - fn id(&self) -> u32; - - /// Set the ID for a task - fn set_id(&self, id: u32); } impl TaskRefTrace for TaskRef { @@ -195,17 +189,6 @@ impl TaskRefTrace for TaskRef { (*header_ptr).name = name; } } - - fn id(&self) -> u32 { - self.header().id - } - - fn set_id(&self, id: u32) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).id = id; - } - } } #[cfg(not(feature = "rtos-trace"))] diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 2909d19a0..7550e8ea4 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -36,12 +36,12 @@ impl SpawnToken { } } - /// Returns the task id if available, otherwise 0 - /// This can be used in combination with rtos-trace to match task names with id's + /// Returns the task ID if available, otherwise 0 + /// This can be used in combination with rtos-trace to match task names with IDs pub fn id(&self) -> u32 { match self.raw_task { None => 0, - Some(t) => t.as_ptr() as u32, + Some(t) => t.id(), } } @@ -223,10 +223,8 @@ impl SpawnerTraceExt for Spawner { match task { Some(task) => { - // Set the name and ID when trace is enabled + // Set the name when trace is enabled task.set_name(Some(name)); - let task_id = task.as_ptr() as u32; - task.set_id(task_id); unsafe { self.executor.spawn(task) }; Ok(()) -- cgit From 2ba34ce2178d576f339f0b0dac70ac125f81cc5b Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Tue, 8 Jul 2025 23:36:51 +0200 Subject: executor: allow trace and rtos-trace to coexist additively. Before, enabling `trace` would enable embassy-native tracing, and enabling *both* would *disable* embassy-native tracing. --- embassy-executor/src/raw/mod.rs | 26 +++++++++++++------------- embassy-executor/src/raw/trace.rs | 25 +++++++++++++++---------- 2 files changed, 28 insertions(+), 23 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bcd4ee432..87328df5a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -16,7 +16,7 @@ mod run_queue; #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] mod state; -#[cfg(feature = "trace")] +#[cfg(feature = "_any_trace")] pub mod trace; pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] @@ -94,9 +94,9 @@ pub(crate) struct TaskHeader { /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. pub(crate) timer_queue_item: TimerQueueItem, - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] pub(crate) name: Option<&'static str>, - #[cfg(feature = "trace")] + #[cfg(feature = "rtos-trace")] all_tasks_next: AtomicPtr, } @@ -193,9 +193,9 @@ impl TaskStorage { poll_fn: SyncUnsafeCell::new(None), timer_queue_item: TimerQueueItem::new(), - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] name: None, - #[cfg(feature = "trace")] + #[cfg(feature = "rtos-trace")] all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, future: UninitCell::uninit(), @@ -231,7 +231,7 @@ impl TaskStorage { let mut cx = Context::from_waker(&waker); match future.poll(&mut cx) { Poll::Ready(_) => { - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed); // As the future has finished and this function will not be called @@ -246,7 +246,7 @@ impl TaskStorage { // after we're done with it. this.raw.state.despawn(); - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::task_end(exec_ptr, &p); } Poll::Pending => {} @@ -419,7 +419,7 @@ impl SyncExecutor { /// - `task` must NOT be already enqueued (in this executor or another one). #[inline(always)] unsafe fn enqueue(&self, task: TaskRef, l: state::Token) { - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::task_ready_begin(self, &task); if self.run_queue.enqueue(task, l) { @@ -432,7 +432,7 @@ impl SyncExecutor { .executor .store((self as *const Self).cast_mut(), Ordering::Relaxed); - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::task_new(self, &task); state::locked(|l| { @@ -444,23 +444,23 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::poll_start(self); self.run_queue.dequeue_all(|p| { let task = p.header(); - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::task_exec_begin(self, &p); // Run the task task.poll_fn.get().unwrap_unchecked()(p); - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::task_exec_end(self, &p); }); - #[cfg(feature = "trace")] + #[cfg(feature = "_any_trace")] trace::executor_idle(self) } } diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index e769d63da..636608d02 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -95,17 +95,20 @@ use crate::spawner::{SpawnError, SpawnToken, Spawner}; /// This static provides access to the global task tracker which maintains /// a list of all tasks in the system. It's automatically updated by the /// task lifecycle hooks in the trace module. -pub static TASK_TRACKER: TaskTracker = TaskTracker::new(); +#[cfg(feature = "rtos-trace")] +pub(crate) static TASK_TRACKER: TaskTracker = TaskTracker::new(); /// A thread-safe tracker for all tasks in the system /// /// This struct uses an intrusive linked list approach to track all tasks /// without additional memory allocations. It maintains a global list of /// tasks that can be traversed to find all currently existing tasks. -pub struct TaskTracker { +#[cfg(feature = "rtos-trace")] +pub(crate) struct TaskTracker { head: AtomicPtr, } +#[cfg(feature = "rtos-trace")] impl TaskTracker { /// Creates a new empty task tracker /// @@ -191,7 +194,7 @@ impl TaskRefTrace for TaskRef { } } -#[cfg(not(feature = "rtos-trace"))] +#[cfg(feature = "trace")] extern "Rust" { /// This callback is called when the executor begins polling. This will always /// be paired with a later call to `_embassy_trace_executor_idle`. @@ -253,7 +256,7 @@ extern "Rust" { #[inline] pub(crate) fn poll_start(executor: &SyncExecutor) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_poll_start(executor as *const _ as u32) } @@ -261,7 +264,7 @@ pub(crate) fn poll_start(executor: &SyncExecutor) { #[inline] pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) } @@ -285,7 +288,7 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) } @@ -293,7 +296,7 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32) } @@ -303,7 +306,7 @@ pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32) } @@ -313,7 +316,7 @@ pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32) } @@ -323,7 +326,7 @@ pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) { #[inline] pub(crate) fn executor_idle(executor: &SyncExecutor) { - #[cfg(not(feature = "rtos-trace"))] + #[cfg(feature = "trace")] unsafe { _embassy_trace_executor_idle(executor as *const _ as u32) } @@ -339,6 +342,7 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { /// /// # Returns /// An iterator that yields `TaskRef` items for each task +#[cfg(feature = "rtos-trace")] fn get_all_active_tasks() -> impl Iterator + 'static { struct TaskIterator<'a> { tracker: &'a TaskTracker, @@ -367,6 +371,7 @@ fn get_all_active_tasks() -> impl Iterator + 'static { } /// Perform an action on each active task +#[cfg(feature = "rtos-trace")] fn with_all_active_tasks(f: F) where F: FnMut(TaskRef), -- cgit From da9cdf0c536ec4fa7bdfb649750c44f70ef1cd55 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 9 Jul 2025 01:18:04 +0200 Subject: executor: add "task metadata" concept, make name a task metadata. --- embassy-executor/src/lib.rs | 3 ++ embassy-executor/src/metadata.rs | 56 ++++++++++++++++++++++++++++++++- embassy-executor/src/raw/mod.rs | 14 ++++++--- embassy-executor/src/raw/trace.rs | 29 +---------------- embassy-executor/src/spawner.rs | 66 ++++++++------------------------------- 5 files changed, 82 insertions(+), 86 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index 0747db032..e47b8eb9f 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs @@ -54,6 +54,9 @@ pub mod raw; mod spawner; pub use spawner::*; +mod metadata; +pub use metadata::*; + /// Implementation details for embassy macros. /// Do not use. Used for macros and HALs only. Not covered by semver guarantees. #[doc(hidden)] diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs index 957417f6b..f92c9b37c 100644 --- a/embassy-executor/src/metadata.rs +++ b/embassy-executor/src/metadata.rs @@ -1 +1,55 @@ -pub struct Metadata {} +#[cfg(feature = "metadata-name")] +use core::cell::Cell; +use core::future::{poll_fn, Future}; +use core::task::Poll; + +#[cfg(feature = "metadata-name")] +use critical_section::Mutex; + +use crate::raw; + +/// Metadata associated with a task. +pub struct Metadata { + #[cfg(feature = "metadata-name")] + name: Mutex>>, +} + +impl Metadata { + pub(crate) const fn new() -> Self { + Self { + #[cfg(feature = "metadata-name")] + name: Mutex::new(Cell::new(None)), + } + } + + pub(crate) fn reset(&self) { + #[cfg(feature = "metadata-name")] + critical_section::with(|cs| self.name.borrow(cs).set(None)); + } + + /// Get the metadata for the current task. + /// + /// You can use this to read or modify the current task's metadata. + /// + /// This function is `async` just to get access to the current async + /// context. It returns instantly, it does not block/yield. + pub fn for_current_task() -> impl Future { + poll_fn(|cx| Poll::Ready(raw::task_from_waker(cx.waker()).metadata())) + } + + /// Get this task's name + /// + /// NOTE: this takes a critical section. + #[cfg(feature = "metadata-name")] + pub fn name(&self) -> Option<&'static str> { + critical_section::with(|cs| self.name.borrow(cs).get()) + } + + /// Set this task's name + /// + /// NOTE: this takes a critical section. + #[cfg(feature = "metadata-name")] + pub fn set_name(&self, name: &'static str) { + critical_section::with(|cs| self.name.borrow(cs).set(Some(name))) + } +} diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 87328df5a..a7e65360d 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -41,6 +41,7 @@ use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; use super::SpawnToken; +use crate::Metadata; #[no_mangle] extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem { @@ -94,8 +95,9 @@ pub(crate) struct TaskHeader { /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. pub(crate) timer_queue_item: TimerQueueItem, - #[cfg(feature = "_any_trace")] - pub(crate) name: Option<&'static str>, + + pub(crate) metadata: Metadata, + #[cfg(feature = "rtos-trace")] all_tasks_next: AtomicPtr, } @@ -127,6 +129,10 @@ impl TaskRef { unsafe { self.ptr.as_ref() } } + pub(crate) fn metadata(self) -> &'static Metadata { + unsafe { &self.ptr.as_ref().metadata } + } + /// Returns a reference to the executor that the task is currently running on. pub unsafe fn executor(self) -> Option<&'static Executor> { let executor = self.header().executor.load(Ordering::Relaxed); @@ -193,8 +199,7 @@ impl TaskStorage { poll_fn: SyncUnsafeCell::new(None), timer_queue_item: TimerQueueItem::new(), - #[cfg(feature = "_any_trace")] - name: None, + metadata: Metadata::new(), #[cfg(feature = "rtos-trace")] all_tasks_next: AtomicPtr::new(core::ptr::null_mut()), }, @@ -281,6 +286,7 @@ impl AvailableTask { fn initialize_impl(self, future: impl FnOnce() -> F) -> SpawnToken { unsafe { + self.task.raw.metadata.reset(); self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write_in_place(future); diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index 636608d02..ab0c1b8b6 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -168,32 +168,6 @@ impl TaskTracker { } } -/// Extension trait for `TaskRef` that provides tracing functionality. -/// -/// This trait is only available when the `trace` feature is enabled. -/// It extends `TaskRef` with methods for accessing and modifying task identifiers -/// and names, which are useful for debugging, logging, and performance analysis. -pub trait TaskRefTrace { - /// Get the name for a task - fn name(&self) -> Option<&'static str>; - - /// Set the name for a task - fn set_name(&self, name: Option<&'static str>); -} - -impl TaskRefTrace for TaskRef { - fn name(&self) -> Option<&'static str> { - self.header().name - } - - fn set_name(&self, name: Option<&'static str>) { - unsafe { - let header_ptr = self.ptr.as_ptr() as *mut TaskHeader; - (*header_ptr).name = name; - } - } -} - #[cfg(feature = "trace")] extern "Rust" { /// This callback is called when the executor begins polling. This will always @@ -383,9 +357,8 @@ where impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { fn task_list() { with_all_active_tasks(|task| { - let name = task.name().unwrap_or("unnamed task\0"); let info = rtos_trace::TaskInfo { - name, + name: task.metadata().name().unwrap_or("unnamed task\0"), priority: 0, stack_base: 0, stack_size: 0, diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 7550e8ea4..cd2113a28 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -5,8 +5,7 @@ use core::sync::atomic::Ordering; use core::task::Poll; use super::raw; -#[cfg(feature = "trace")] -use crate::raw::trace::TaskRefTrace; +use crate::Metadata; /// Token to spawn a newly-created task in an executor. /// @@ -36,6 +35,14 @@ impl SpawnToken { } } + /// Return a SpawnToken that represents a failed spawn. + pub fn new_failed() -> Self { + Self { + raw_task: None, + phantom: PhantomData, + } + } + /// Returns the task ID if available, otherwise 0 /// This can be used in combination with rtos-trace to match task names with IDs pub fn id(&self) -> u32 { @@ -45,12 +52,10 @@ impl SpawnToken { } } - /// Return a SpawnToken that represents a failed spawn. - pub fn new_failed() -> Self { - Self { - raw_task: None, - phantom: PhantomData, - } + /// Get the metadata for this task. You can use this to set metadata fields + /// prior to spawning it. + pub fn metadata(&self) -> &Metadata { + self.raw_task.unwrap().metadata() } } @@ -198,51 +203,6 @@ impl Spawner { } } -/// Extension trait adding tracing capabilities to the Spawner -/// -/// This trait provides an additional method to spawn tasks with an associated name, -/// which can be useful for debugging and tracing purposes. -pub trait SpawnerTraceExt { - /// Spawns a new task with a specified name. - /// - /// # Arguments - /// * `name` - Static string name to associate with the task - /// * `token` - Token representing the task to spawn - /// - /// # Returns - /// Result indicating whether the spawn was successful - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError>; -} - -/// Implementation of the SpawnerTraceExt trait for Spawner when trace is enabled -#[cfg(feature = "trace")] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - let task = token.raw_task; - core::mem::forget(token); - - match task { - Some(task) => { - // Set the name when trace is enabled - task.set_name(Some(name)); - - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } -} - -/// Implementation of the SpawnerTraceExt trait for Spawner when trace is disabled -#[cfg(not(feature = "trace"))] -impl SpawnerTraceExt for Spawner { - fn spawn_named(&self, _name: &'static str, token: SpawnToken) -> Result<(), SpawnError> { - // When trace is disabled, just forward to regular spawn and ignore the name - self.spawn(token) - } -} - /// Handle to spawn tasks into an executor from any thread. /// /// This Spawner can be used from any thread (it is Send), but it can -- cgit From 34ff67cdbf25e278ff99bd4a05b6b8c6a30fa5d1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 9 Jul 2025 01:18:47 +0200 Subject: executor: do not deref a mut ptr to the entire taskheader. --- embassy-executor/src/raw/trace.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index ab0c1b8b6..e52960dc7 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -128,7 +128,7 @@ impl TaskTracker { /// # Arguments /// * `task` - The task reference to add to the tracker pub fn add(&self, task: TaskRef) { - let task_ptr = task.as_ptr() as *mut TaskHeader; + let task_ptr = task.as_ptr(); loop { let current_head = self.head.load(Ordering::Acquire); @@ -138,7 +138,7 @@ impl TaskTracker { if self .head - .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed) + .compare_exchange(current_head, task_ptr.cast_mut(), Ordering::Release, Ordering::Relaxed) .is_ok() { break; -- cgit From 8aec341f28a00012e1771d5c35d2647e11830755 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 9 Jul 2025 01:49:31 +0200 Subject: executor: return error when creating the spawntoken, not when spawning. --- embassy-executor/src/raw/mod.rs | 18 ++++++------ embassy-executor/src/spawner.rs | 65 ++++++----------------------------------- 2 files changed, 18 insertions(+), 65 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index a7e65360d..bdaa32951 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -41,7 +41,7 @@ use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; use super::SpawnToken; -use crate::Metadata; +use crate::{Metadata, SpawnError}; #[no_mangle] extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem { @@ -220,11 +220,11 @@ impl TaskStorage { /// /// Once the task has finished running, you may spawn it again. It is allowed to spawn it /// on a different executor. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result, SpawnError> { let task = AvailableTask::claim(self); match task { - Some(task) => task.initialize(future), - None => SpawnToken::new_failed(), + Some(task) => Ok(task.initialize(future)), + None => Err(SpawnError::Busy), } } @@ -353,10 +353,10 @@ impl TaskPool { } } - fn spawn_impl(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + fn spawn_impl(&'static self, future: impl FnOnce() -> F) -> Result, SpawnError> { match self.pool.iter().find_map(AvailableTask::claim) { - Some(task) => task.initialize_impl::(future), - None => SpawnToken::new_failed(), + Some(task) => Ok(task.initialize_impl::(future)), + None => Err(SpawnError::Busy), } } @@ -367,7 +367,7 @@ impl TaskPool { /// This will loop over the pool and spawn the task in the first storage that /// is currently free. If none is free, a "poisoned" SpawnToken is returned, /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. - pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken { + pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result, SpawnError> { self.spawn_impl::(future) } @@ -380,7 +380,7 @@ impl TaskPool { /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` /// is an `async fn`, NOT a hand-written `Future`. #[doc(hidden)] - pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> SpawnToken + pub unsafe fn _spawn_async_fn(&'static self, future: FutFn) -> Result, SpawnError> where FutFn: FnOnce() -> F, { diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index cd2113a28..83d896b76 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -23,39 +23,28 @@ use crate::Metadata; /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] pub struct SpawnToken { - pub(crate) raw_task: Option, + pub(crate) raw_task: raw::TaskRef, phantom: PhantomData<*mut S>, } impl SpawnToken { pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self { Self { - raw_task: Some(raw_task), + raw_task, phantom: PhantomData, } } - /// Return a SpawnToken that represents a failed spawn. - pub fn new_failed() -> Self { - Self { - raw_task: None, - phantom: PhantomData, - } - } - - /// Returns the task ID if available, otherwise 0 + /// Returns the task ID. /// This can be used in combination with rtos-trace to match task names with IDs pub fn id(&self) -> u32 { - match self.raw_task { - None => 0, - Some(t) => t.id(), - } + self.raw_task.id() } /// Get the metadata for this task. You can use this to set metadata fields /// prior to spawning it. pub fn metadata(&self) -> &Metadata { - self.raw_task.unwrap().metadata() + self.raw_task.metadata() } } @@ -164,30 +153,10 @@ impl Spawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) { let task = token.raw_task; mem::forget(token); - - match task { - Some(task) => { - unsafe { self.executor.spawn(task) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - // Used by the `embassy_executor_macros::main!` macro to throw an error when spawn - // fails. This is here to allow conditional use of `defmt::unwrap!` - // without introducing a `defmt` feature in the `embassy_executor_macros` package, - // which would require use of `-Z namespaced-features`. - /// Spawn a task into an executor, panicking on failure. - /// - /// # Panics - /// - /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { - unwrap!(self.spawn(token)); + unsafe { self.executor.spawn(task) } } /// Convert this Spawner to a SendSpawner. This allows you to send the @@ -245,25 +214,9 @@ impl SendSpawner { /// Spawn a task into an executor. /// /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). - pub fn spawn(&self, token: SpawnToken) -> Result<(), SpawnError> { + pub fn spawn(&self, token: SpawnToken) { let header = token.raw_task; mem::forget(token); - - match header { - Some(header) => { - unsafe { self.executor.spawn(header) }; - Ok(()) - } - None => Err(SpawnError::Busy), - } - } - - /// Spawn a task into an executor, panicking on failure. - /// - /// # Panics - /// - /// Panics if the spawning fails. - pub fn must_spawn(&self, token: SpawnToken) { - unwrap!(self.spawn(token)); + unsafe { self.executor.spawn(header) } } } -- cgit From 916dce55ea9f8341422eb6d55c17d0a0fcfedce0 Mon Sep 17 00:00:00 2001 From: diondokter Date: Fri, 29 Aug 2025 13:30:11 +0200 Subject: Fix test & rtos-trace --- embassy-executor/src/raw/trace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index e52960dc7..b3086948c 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs @@ -246,7 +246,7 @@ pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { #[cfg(feature = "rtos-trace")] { rtos_trace::trace::task_new(task.as_ptr() as u32); - let name = task.name().unwrap_or("unnamed task\0"); + let name = task.metadata().name().unwrap_or("unnamed task\0"); let info = rtos_trace::TaskInfo { name, priority: 0, -- cgit From 3fb6a9191c3d132bca5984a1ad79ad211e533912 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 31 Aug 2025 10:20:03 +0200 Subject: Prefer pointer-sized atomic operations --- embassy-executor/src/raw/mod.rs | 10 ++++++++-- embassy-executor/src/raw/state_atomics.rs | 21 ++++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bdaa32951..4280c5750 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -12,8 +12,14 @@ mod run_queue; #[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")] -#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] -#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] +#[cfg_attr( + all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")), + path = "state_atomics.rs" +)] +#[cfg_attr( + not(any(target_has_atomic = "8", target_has_atomic = "32")), + path = "state_critical_section.rs" +)] mod state; #[cfg(feature = "_any_trace")] diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e813548ae..6675875be 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -1,4 +1,15 @@ -use core::sync::atomic::{AtomicU8, Ordering}; +// Prefer pointer-width atomic operations, as narrower ones may be slower. +#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))] +type AtomicState = core::sync::atomic::AtomicU32; +#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))] +type AtomicState = core::sync::atomic::AtomicU8; + +#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))] +type StateBits = u32; +#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))] +type StateBits = u8; + +use core::sync::atomic::Ordering; #[derive(Clone, Copy)] pub(crate) struct Token(()); @@ -11,18 +22,18 @@ pub(crate) fn locked(f: impl FnOnce(Token) -> R) -> R { } /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u8 = 1 << 0; +pub(crate) const STATE_SPAWNED: StateBits = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1; pub(crate) struct State { - state: AtomicU8, + state: AtomicState, } impl State { pub const fn new() -> State { Self { - state: AtomicU8::new(0), + state: AtomicState::new(0), } } -- cgit From fb531da007bad7129bfd247a901286b27de0c509 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sun, 31 Aug 2025 10:39:04 +0200 Subject: Prefer word-sized state in CS impl --- embassy-executor/src/raw/state_critical_section.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index ec08f2f58..b69a6ac66 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -3,13 +3,18 @@ use core::cell::Cell; pub(crate) use critical_section::{with as locked, CriticalSection as Token}; use critical_section::{CriticalSection, Mutex}; +#[cfg(target_arch = "avr")] +type StateBits = u8; +#[cfg(not(target_arch = "avr"))] +type StateBits = usize; + /// Task is spawned (has a future) -pub(crate) const STATE_SPAWNED: u8 = 1 << 0; +pub(crate) const STATE_SPAWNED: StateBits = 1 << 0; /// Task is in the executor run queue -pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1; pub(crate) struct State { - state: Mutex>, + state: Mutex>, } impl State { @@ -19,11 +24,11 @@ impl State { } } - fn update(&self, f: impl FnOnce(&mut u8) -> R) -> R { + fn update(&self, f: impl FnOnce(&mut StateBits) -> R) -> R { critical_section::with(|cs| self.update_with_cs(cs, f)) } - fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u8) -> R) -> R { + fn update_with_cs(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut StateBits) -> R) -> R { let s = self.state.borrow(cs); let mut val = s.get(); let r = f(&mut val); -- cgit From 535c80e61f17e4ee4605e00623aabeda2181352d Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 20 Mar 2025 09:47:56 +0100 Subject: Add initial DRS scheduler placeholder * Start hacking in cordyceps This adds a third kind of runqueue, for now it should work the same as the current "atomics" runqueue, but uses a cordyceps TransferStack instead of the existing home-rolled linked list. * Clean up, use new cordyceps feature * A bit more cleanup * Update docs to be more clear --- embassy-executor/src/raw/mod.rs | 64 +++++++++++++++++++++-- embassy-executor/src/raw/run_queue_drs_atomics.rs | 47 +++++++++++++++++ 2 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 embassy-executor/src/raw/run_queue_drs_atomics.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 4280c5750..894a996ec 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -7,7 +7,14 @@ //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. -#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] +#[cfg_attr( + all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"), + path = "run_queue_atomics.rs", +)] +#[cfg_attr( + all(feature = "drs-scheduler", target_has_atomic = "ptr"), + path = "run_queue_drs_atomics.rs", +)] #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; @@ -33,6 +40,8 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; +#[cfg(feature = "drs-scheduler")] +use core::ptr::addr_of_mut; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; @@ -42,7 +51,9 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -use self::run_queue::{RunQueue, RunQueueItem}; +use self::run_queue::RunQueue; +#[cfg(not(feature = "drs-scheduler"))] +use self::run_queue::RunQueueItem; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; @@ -54,6 +65,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static unsafe { task_from_waker(waker).timer_queue_item() } } +#[cfg(feature = "drs-scheduler")] +use cordyceps::{stack, Linked}; + /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: @@ -93,9 +107,29 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` +#[cfg_attr(feature = "drs-scheduler", repr(C))] pub(crate) struct TaskHeader { - pub(crate) state: State, + // TODO(AJM): Make a decision whether we want to support the spicier "pointer recast"/"type punning" + // method of implementing the `cordyceps::Linked` trait or not. + // + // Currently, I do the safer version with `addr_of_mut!`, which doesn't REQUIRE that the first + // element is the `links` field, at the potential cost of a little extra pointer math. + // + // The optimizer *might* (total guess) notice that we are always doing an offset of zero in the + // call to `addr_of_mut` in the `impl Linked for TaskHeader` below, and get the best of both worlds, + // but right now this is maybe a little over cautious. + // + // See https://docs.rs/cordyceps/latest/cordyceps/trait.Linked.html#implementing-linkedlinks for + // more context on the choices here. + #[cfg(feature = "drs-scheduler")] + pub(crate) links: stack::Links, + + // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though + // right now cordyceps doesn't work on non-atomic systems + #[cfg(not(feature = "drs-scheduler"))] pub(crate) run_queue_item: RunQueueItem, + + pub(crate) state: State, pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -108,6 +142,25 @@ pub(crate) struct TaskHeader { all_tasks_next: AtomicPtr, } +#[cfg(feature = "drs-scheduler")] +unsafe impl Linked> for TaskHeader { + type Handle = TaskRef; + + fn into_ptr(r: Self::Handle) -> NonNull { + r.ptr.cast() + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + let ptr: NonNull = ptr; + TaskRef { ptr } + } + + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).links)) + } +} + /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Debug, Clone, Copy, PartialEq)] pub struct TaskRef { @@ -198,8 +251,11 @@ impl TaskStorage { pub const fn new() -> Self { Self { raw: TaskHeader { - state: State::new(), + #[cfg(not(feature = "drs-scheduler"))] run_queue_item: RunQueueItem::new(), + #[cfg(feature = "drs-scheduler")] + links: stack::Links::new(), + state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs new file mode 100644 index 000000000..53ada1b14 --- /dev/null +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs @@ -0,0 +1,47 @@ +use super::{TaskHeader, TaskRef}; +use cordyceps::TransferStack; + + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't create fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + stack: TransferStack, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + stack: TransferStack::new(), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + /// + /// # Safety + /// + /// `item` must NOT be already enqueued in any queue. + #[inline(always)] + pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { + self.stack.push_was_empty(task) + } + + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + let taken = self.stack.take_all(); + for taskref in taken { + taskref.header().state.run_dequeue(); + on_task(taskref); + } + } +} -- cgit From 1f50e4d496458dbc7fccd9d028217ebfa7735471 Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 20 Mar 2025 14:32:14 +0100 Subject: Implement Deadline Ranked Scheduling This implements a minimal version of Deadline Rank Scheduling, as well as ways to access and set Deadlines. This still needs some UX improvements, but is likely Enough for testing. --- embassy-executor/src/raw/mod.rs | 12 ++ embassy-executor/src/raw/run_queue_drs_atomics.rs | 141 +++++++++++++++++++++- 2 files changed, 149 insertions(+), 4 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 894a996ec..9b8a4ea8a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static #[cfg(feature = "drs-scheduler")] use cordyceps::{stack, Linked}; +#[cfg(feature = "drs-scheduler")] +pub use run_queue::Deadline; + /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: @@ -124,6 +127,9 @@ pub(crate) struct TaskHeader { #[cfg(feature = "drs-scheduler")] pub(crate) links: stack::Links, + #[cfg(feature = "drs-scheduler")] + pub(crate) deadline: SyncUnsafeCell, + // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though // right now cordyceps doesn't work on non-atomic systems #[cfg(not(feature = "drs-scheduler"))] @@ -255,6 +261,8 @@ impl TaskStorage { run_queue_item: RunQueueItem::new(), #[cfg(feature = "drs-scheduler")] links: stack::Links::new(), + #[cfg(feature = "drs-scheduler")] + deadline: SyncUnsafeCell::new(0u64), state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` @@ -352,6 +360,10 @@ impl AvailableTask { self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write_in_place(future); + // TODO(AJM): Some other way of setting this? Just a placeholder + #[cfg(feature = "drs-scheduler")] + self.task.raw.deadline.set(u64::MAX); + let task = TaskRef::new(self.task); SpawnToken::new(task) diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs index 53ada1b14..69b7b3bf0 100644 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs @@ -1,6 +1,7 @@ use super::{TaskHeader, TaskRef}; -use cordyceps::TransferStack; - +use cordyceps::{SortedList, TransferStack}; +use core::future::{Future, poll_fn}; +use core::task::Poll; /// Atomic task queue using a very, very simple lock-free linked-list queue: /// @@ -38,10 +39,142 @@ impl RunQueue { /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let taken = self.stack.take_all(); - for taskref in taken { + let mut sorted = SortedList::::new(|lhs, rhs| unsafe { + // TODO: Do we need any kind of access control here? Not if we say that + // tasks can only set their own priority, which they can't do if we're in + // the scheduler + lhs.deadline.get().cmp(&rhs.deadline.get()) + }); + + loop { + // For each loop, grab any newly pended items + let taken = self.stack.take_all(); + + // Sort these into the list - this is potentially expensive! We do an + // insertion sort of new items, which iterates the linked list. + // + // Something on the order of `O(n * m)`, where `n` is the number + // of new tasks, and `m` is the number of already pending tasks. + sorted.extend(taken); + + // Pop the task with the SOONEST deadline. If there are no tasks + // pending, then we are done. + let Some(taskref) = sorted.pop_front() else { + return; + }; + + // We got one task, mark it as dequeued, and process the task. taskref.header().state.run_dequeue(); on_task(taskref); } } } + +/// A type for interacting with the deadline of the current task +pub struct Deadline { + /// Deadline value in ticks, same time base and ticks as `embassy-time` + pub instant_ticks: u64, +} + +impl Deadline { + /// Set the current task's deadline at exactly `instant_ticks` + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline. + /// + /// Analogous to `Timer::at`. + /// + /// TODO: Should we check/panic if the deadline is in the past? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(instant_ticks); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `duration_ticks` in the future from when + /// this future is polled. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to `Timer::after` + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + let now = embassy_time_driver::now(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = now.saturating_add(duration_ticks); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `increment_ticks` from the previous deadline. + /// + /// Note that by default (unless otherwise set), tasks start life with the deadline + /// u64::MAX, which means this method will have no effect. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to one increment of `Ticker::every().next()`. + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + // Get the last value + let last = task.header().deadline.get(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(increment_ticks); + + // Store the new value + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Get the current task's deadline as a tick value. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + pub fn get_current_task_deadline() -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + let deadline = unsafe { + task.header().deadline.get() + }; + Poll::Ready(Self { instant_ticks: deadline }) + }) + } +} -- cgit From 8c70aafd4be63ff7af895f116444fb81438ae6e0 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 18:35:41 +0200 Subject: Make some things more consistent --- embassy-executor/src/raw/mod.rs | 53 ++--------------------- embassy-executor/src/raw/run_queue_drs_atomics.rs | 31 +++++++++++-- 2 files changed, 31 insertions(+), 53 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 9b8a4ea8a..2e5941ef7 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -40,7 +40,6 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -#[cfg(feature = "drs-scheduler")] use core::ptr::addr_of_mut; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; @@ -51,9 +50,7 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -use self::run_queue::RunQueue; -#[cfg(not(feature = "drs-scheduler"))] -use self::run_queue::RunQueueItem; +use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; @@ -65,9 +62,6 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static unsafe { task_from_waker(waker).timer_queue_item() } } -#[cfg(feature = "drs-scheduler")] -use cordyceps::{stack, Linked}; - #[cfg(feature = "drs-scheduler")] pub use run_queue::Deadline; @@ -110,31 +104,14 @@ pub use run_queue::Deadline; /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` -#[cfg_attr(feature = "drs-scheduler", repr(C))] pub(crate) struct TaskHeader { - // TODO(AJM): Make a decision whether we want to support the spicier "pointer recast"/"type punning" - // method of implementing the `cordyceps::Linked` trait or not. - // - // Currently, I do the safer version with `addr_of_mut!`, which doesn't REQUIRE that the first - // element is the `links` field, at the potential cost of a little extra pointer math. - // - // The optimizer *might* (total guess) notice that we are always doing an offset of zero in the - // call to `addr_of_mut` in the `impl Linked for TaskHeader` below, and get the best of both worlds, - // but right now this is maybe a little over cautious. - // - // See https://docs.rs/cordyceps/latest/cordyceps/trait.Linked.html#implementing-linkedlinks for - // more context on the choices here. - #[cfg(feature = "drs-scheduler")] - pub(crate) links: stack::Links, + pub(crate) run_queue_item: RunQueueItem, #[cfg(feature = "drs-scheduler")] + /// Deadline Rank Scheduler Deadline. This field should not be accessed outside the context of + /// the task itself as it being polled by the executor. pub(crate) deadline: SyncUnsafeCell, - // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though - // right now cordyceps doesn't work on non-atomic systems - #[cfg(not(feature = "drs-scheduler"))] - pub(crate) run_queue_item: RunQueueItem, - pub(crate) state: State, pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -148,25 +125,6 @@ pub(crate) struct TaskHeader { all_tasks_next: AtomicPtr, } -#[cfg(feature = "drs-scheduler")] -unsafe impl Linked> for TaskHeader { - type Handle = TaskRef; - - fn into_ptr(r: Self::Handle) -> NonNull { - r.ptr.cast() - } - - unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { - let ptr: NonNull = ptr; - TaskRef { ptr } - } - - unsafe fn links(ptr: NonNull) -> NonNull> { - let ptr: *mut TaskHeader = ptr.as_ptr(); - NonNull::new_unchecked(addr_of_mut!((*ptr).links)) - } -} - /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Debug, Clone, Copy, PartialEq)] pub struct TaskRef { @@ -257,11 +215,8 @@ impl TaskStorage { pub const fn new() -> Self { Self { raw: TaskHeader { - #[cfg(not(feature = "drs-scheduler"))] run_queue_item: RunQueueItem::new(), #[cfg(feature = "drs-scheduler")] - links: stack::Links::new(), - #[cfg(feature = "drs-scheduler")] deadline: SyncUnsafeCell::new(0u64), state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs index 69b7b3bf0..047265954 100644 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs @@ -2,6 +2,29 @@ use super::{TaskHeader, TaskRef}; use cordyceps::{SortedList, TransferStack}; use core::future::{Future, poll_fn}; use core::task::Poll; +use core::ptr::{addr_of_mut, NonNull}; +use cordyceps::sorted_list::Links; +use cordyceps::Linked; + +pub(crate) type RunQueueItem = Links; + +unsafe impl Linked> for super::TaskHeader { + type Handle = TaskRef; + + fn into_ptr(r: Self::Handle) -> NonNull { + r.ptr.cast() + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + let ptr: NonNull = ptr; + TaskRef { ptr } + } + + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) + } +} /// Atomic task queue using a very, very simple lock-free linked-list queue: /// @@ -39,10 +62,10 @@ impl RunQueue { /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let mut sorted = SortedList::::new(|lhs, rhs| unsafe { - // TODO: Do we need any kind of access control here? Not if we say that - // tasks can only set their own priority, which they can't do if we're in - // the scheduler + // SAFETY: `deadline` can only be set through the `Deadline` interface, which + // only allows access to this value while the given task is being polled. + // This acts as mutual exclusion for access. + let mut sorted = SortedList::::new_custom(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); -- cgit From ba0426f767bb602750bed4fae87a156b661c0e92 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 18:50:12 +0200 Subject: Combine DRS and non-DRS atomic scheduler, using cordyceps --- embassy-executor/src/raw/deadline.rs | 111 ++++++++++++ embassy-executor/src/raw/mod.rs | 18 +- embassy-executor/src/raw/run_queue_atomics.rs | 110 +++++++----- embassy-executor/src/raw/run_queue_drs_atomics.rs | 203 ---------------------- 4 files changed, 186 insertions(+), 256 deletions(-) create mode 100644 embassy-executor/src/raw/deadline.rs delete mode 100644 embassy-executor/src/raw/run_queue_drs_atomics.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs new file mode 100644 index 000000000..3f60936cc --- /dev/null +++ b/embassy-executor/src/raw/deadline.rs @@ -0,0 +1,111 @@ +use core::future::{poll_fn, Future}; +use core::task::Poll; + +/// A type for interacting with the deadline of the current task +pub struct Deadline { + /// Deadline value in ticks, same time base and ticks as `embassy-time` + pub instant_ticks: u64, +} + +impl Deadline { + /// Set the current task's deadline at exactly `instant_ticks` + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline. + /// + /// Analogous to `Timer::at`. + /// + /// TODO: Should we check/panic if the deadline is in the past? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(instant_ticks); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `duration_ticks` in the future from when + /// this future is polled. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to `Timer::after` + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + let now = embassy_time_driver::now(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = now.saturating_add(duration_ticks); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `increment_ticks` from the previous deadline. + /// + /// Note that by default (unless otherwise set), tasks start life with the deadline + /// u64::MAX, which means this method will have no effect. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to one increment of `Ticker::every().next()`. + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + // Get the last value + let last = task.header().deadline.get(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(increment_ticks); + + // Store the new value + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Get the current task's deadline as a tick value. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + pub fn get_current_task_deadline() -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + let deadline = unsafe { task.header().deadline.get() }; + Poll::Ready(Self { + instant_ticks: deadline, + }) + }) + } +} diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 2e5941ef7..0dd247d30 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -7,14 +7,7 @@ //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. -#[cfg_attr( - all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"), - path = "run_queue_atomics.rs", -)] -#[cfg_attr( - all(feature = "drs-scheduler", target_has_atomic = "ptr"), - path = "run_queue_drs_atomics.rs", -)] +#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; @@ -35,6 +28,9 @@ pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; +#[cfg(feature = "drs-scheduler")] +mod deadline; + use core::future::Future; use core::marker::PhantomData; use core::mem; @@ -50,6 +46,9 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; +#[cfg(feature = "drs-scheduler")] +pub use deadline::Deadline; + use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; @@ -62,9 +61,6 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static unsafe { task_from_waker(waker).timer_queue_item() } } -#[cfg(feature = "drs-scheduler")] -pub use run_queue::Deadline; - /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index ce511d79a..bc5d38250 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -1,19 +1,36 @@ -use core::ptr; -use core::ptr::NonNull; -use core::sync::atomic::{AtomicPtr, Ordering}; +use core::ptr::{addr_of_mut, NonNull}; + +use cordyceps::sorted_list::Links; +use cordyceps::{Linked, SortedList, TransferStack}; use super::{TaskHeader, TaskRef}; -use crate::raw::util::SyncUnsafeCell; -pub(crate) struct RunQueueItem { - next: SyncUnsafeCell>, -} +/// Use `cordyceps::sorted_list::Links` as the singly linked list +/// for RunQueueItems. +pub(crate) type RunQueueItem = Links; -impl RunQueueItem { - pub const fn new() -> Self { - Self { - next: SyncUnsafeCell::new(None), - } +/// Implements the `Linked` trait, allowing for singly linked list usage +/// of any of cordyceps' `TransferStack` (used for the atomic runqueue), +/// `SortedList` (used with the DRS scheduler), or `Stack`, which is +/// popped atomically from the `TransferStack`. +unsafe impl Linked> for TaskHeader { + type Handle = TaskRef; + + // Convert a TaskRef into a TaskHeader ptr + fn into_ptr(r: TaskRef) -> NonNull { + r.ptr + } + + // Convert a TaskHeader into a TaskRef + unsafe fn from_ptr(ptr: NonNull) -> TaskRef { + TaskRef { ptr } + } + + // Given a pointer to a TaskHeader, obtain a pointer to the Links structure, + // which can be used to traverse to other TaskHeader nodes in the linked list + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) } } @@ -29,13 +46,13 @@ impl RunQueueItem { /// current batch is completely processed, so even if a task enqueues itself instantly (for example /// by waking its own waker) can't prevent other tasks from running. pub(crate) struct RunQueue { - head: AtomicPtr, + stack: TransferStack, } impl RunQueue { pub const fn new() -> Self { Self { - head: AtomicPtr::new(ptr::null_mut()), + stack: TransferStack::new(), } } @@ -46,43 +63,52 @@ impl RunQueue { /// `item` must NOT be already enqueued in any queue. #[inline(always)] pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - let mut was_empty = false; - - self.head - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { - was_empty = prev.is_null(); - unsafe { - // safety: the pointer is either null or valid - let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); - // safety: there are no concurrent accesses to `next` - task.header().run_queue_item.next.set(prev); - } - Some(task.as_ptr() as *mut _) - }) - .ok(); - - was_empty + self.stack.push_was_empty(task) } /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + #[cfg(not(feature = "drs-scheduler"))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // Atomically empty the queue. - let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + let taken = self.stack.take_all(); + for taskref in taken { + taskref.header().state.run_dequeue(); + on_task(taskref); + } + } + + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + #[cfg(feature = "drs-scheduler")] + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + // SAFETY: `deadline` can only be set through the `Deadline` interface, which + // only allows access to this value while the given task is being polled. + // This acts as mutual exclusion for access. + let mut sorted = + SortedList::::new_custom(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); + + loop { + // For each loop, grab any newly pended items + let taken = self.stack.take_all(); - // safety: the pointer is either null or valid - let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; + // Sort these into the list - this is potentially expensive! We do an + // insertion sort of new items, which iterates the linked list. + // + // Something on the order of `O(n * m)`, where `n` is the number + // of new tasks, and `m` is the number of already pending tasks. + sorted.extend(taken); - // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = next { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - // safety: there are no concurrent accesses to `next` - next = unsafe { task.header().run_queue_item.next.get() }; + // Pop the task with the SOONEST deadline. If there are no tasks + // pending, then we are done. + let Some(taskref) = sorted.pop_front() else { + return; + }; - task.header().state.run_dequeue(); - on_task(task); + // We got one task, mark it as dequeued, and process the task. + taskref.header().state.run_dequeue(); + on_task(taskref); } } } diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs deleted file mode 100644 index 047265954..000000000 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ /dev/null @@ -1,203 +0,0 @@ -use super::{TaskHeader, TaskRef}; -use cordyceps::{SortedList, TransferStack}; -use core::future::{Future, poll_fn}; -use core::task::Poll; -use core::ptr::{addr_of_mut, NonNull}; -use cordyceps::sorted_list::Links; -use cordyceps::Linked; - -pub(crate) type RunQueueItem = Links; - -unsafe impl Linked> for super::TaskHeader { - type Handle = TaskRef; - - fn into_ptr(r: Self::Handle) -> NonNull { - r.ptr.cast() - } - - unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { - let ptr: NonNull = ptr; - TaskRef { ptr } - } - - unsafe fn links(ptr: NonNull) -> NonNull> { - let ptr: *mut TaskHeader = ptr.as_ptr(); - NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't create fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - stack: TransferStack, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - stack: TransferStack::new(), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - /// - /// # Safety - /// - /// `item` must NOT be already enqueued in any queue. - #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - self.stack.push_was_empty(task) - } - - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // SAFETY: `deadline` can only be set through the `Deadline` interface, which - // only allows access to this value while the given task is being polled. - // This acts as mutual exclusion for access. - let mut sorted = SortedList::::new_custom(|lhs, rhs| unsafe { - lhs.deadline.get().cmp(&rhs.deadline.get()) - }); - - loop { - // For each loop, grab any newly pended items - let taken = self.stack.take_all(); - - // Sort these into the list - this is potentially expensive! We do an - // insertion sort of new items, which iterates the linked list. - // - // Something on the order of `O(n * m)`, where `n` is the number - // of new tasks, and `m` is the number of already pending tasks. - sorted.extend(taken); - - // Pop the task with the SOONEST deadline. If there are no tasks - // pending, then we are done. - let Some(taskref) = sorted.pop_front() else { - return; - }; - - // We got one task, mark it as dequeued, and process the task. - taskref.header().state.run_dequeue(); - on_task(taskref); - } - } -} - -/// A type for interacting with the deadline of the current task -pub struct Deadline { - /// Deadline value in ticks, same time base and ticks as `embassy-time` - pub instant_ticks: u64, -} - -impl Deadline { - /// Set the current task's deadline at exactly `instant_ticks` - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline. - /// - /// Analogous to `Timer::at`. - /// - /// TODO: Should we check/panic if the deadline is in the past? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(instant_ticks); - } - Poll::Ready(()) - }) - } - - /// Set the current task's deadline `duration_ticks` in the future from when - /// this future is polled. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - /// - /// Analogous to `Timer::after` - /// - /// TODO: Do we want to return what the deadline is? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - let now = embassy_time_driver::now(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = now.saturating_add(duration_ticks); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(deadline); - } - Poll::Ready(()) - }) - } - - /// Set the current task's deadline `increment_ticks` from the previous deadline. - /// - /// Note that by default (unless otherwise set), tasks start life with the deadline - /// u64::MAX, which means this method will have no effect. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - /// - /// Analogous to one increment of `Ticker::every().next()`. - /// - /// TODO: Do we want to return what the deadline is? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - // Get the last value - let last = task.header().deadline.get(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = last.saturating_add(increment_ticks); - - // Store the new value - task.header().deadline.set(deadline); - } - Poll::Ready(()) - }) - } - - /// Get the current task's deadline as a tick value. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - pub fn get_current_task_deadline() -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - let deadline = unsafe { - task.header().deadline.get() - }; - Poll::Ready(Self { instant_ticks: deadline }) - }) - } -} -- cgit From ed2e51bfa4f92b422233343a0c5b1af98fb36537 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 19:32:12 +0200 Subject: Dependency enablement trickery --- embassy-executor/src/raw/deadline.rs | 2 ++ embassy-executor/src/raw/mod.rs | 11 +++++++---- embassy-executor/src/raw/run_queue_atomics.rs | 19 ++++++++++++++++--- 3 files changed, 25 insertions(+), 7 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index 3f60936cc..c8cc94c52 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -2,6 +2,8 @@ use core::future::{poll_fn, Future}; use core::task::Poll; /// A type for interacting with the deadline of the current task +/// +/// Requires the `drs-scheduler` feature pub struct Deadline { /// Deadline value in ticks, same time base and ticks as `embassy-time` pub instant_ticks: u64, diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 0dd247d30..f4fbe1bfc 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -101,14 +101,14 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` pub(crate) struct TaskHeader { + pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, - #[cfg(feature = "drs-scheduler")] /// Deadline Rank Scheduler Deadline. This field should not be accessed outside the context of /// the task itself as it being polled by the executor. + #[cfg(feature = "drs-scheduler")] pub(crate) deadline: SyncUnsafeCell, - pub(crate) state: State, pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -211,10 +211,12 @@ impl TaskStorage { pub const fn new() -> Self { Self { raw: TaskHeader { + state: State::new(), run_queue_item: RunQueueItem::new(), + // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This + // will be lazily initalized in `initialize_impl` #[cfg(feature = "drs-scheduler")] deadline: SyncUnsafeCell::new(0u64), - state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), @@ -311,7 +313,8 @@ impl AvailableTask { self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write_in_place(future); - // TODO(AJM): Some other way of setting this? Just a placeholder + // By default, deadlines are set to the maximum value, so that any task WITH + // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline #[cfg(feature = "drs-scheduler")] self.task.raw.deadline.set(u64::MAX); diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index bc5d38250..3715fc658 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -66,6 +66,8 @@ impl RunQueue { self.stack.push_was_empty(task) } + /// # Standard atomic runqueue + /// /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. @@ -78,9 +80,20 @@ impl RunQueue { } } - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + /// # Deadline Ranked Sorted Scheduler + /// + /// This algorithm will loop until all enqueued tasks are processed. + /// + /// Before polling a task, all currently enqueued tasks will be popped from the + /// runqueue, and will be added to the working `sorted` list, a linked-list that + /// sorts tasks by their deadline, with nearest deadline items in the front, and + /// furthest deadline items in the back. + /// + /// After popping and sorting all pending tasks, the SOONEST task will be popped + /// from the front of the queue, and polled by calling `on_task` on it. + /// + /// This process will repeat until the local `sorted` queue AND the global + /// runqueue are both empty, at which point this function will return. #[cfg(feature = "drs-scheduler")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // SAFETY: `deadline` can only be set through the `Deadline` interface, which -- cgit From 2a068c528383b3ddc1213b9a5da5445498962bd2 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 19:41:19 +0200 Subject: Conditional import --- embassy-executor/src/raw/run_queue_atomics.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index 3715fc658..a63f0d116 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -1,7 +1,9 @@ use core::ptr::{addr_of_mut, NonNull}; use cordyceps::sorted_list::Links; -use cordyceps::{Linked, SortedList, TransferStack}; +#[cfg(feature = "drs-scheduler")] +use cordyceps::SortedList; +use cordyceps::{Linked, TransferStack}; use super::{TaskHeader, TaskRef}; -- cgit From 08a57b1cb0c3c4a40bd03e6e6ea1c97777300cf4 Mon Sep 17 00:00:00 2001 From: James Munns Date: Wed, 2 Apr 2025 23:30:40 +0200 Subject: Update with changes from the PR --- embassy-executor/src/raw/run_queue_atomics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index a63f0d116..08765e06b 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -102,7 +102,7 @@ impl RunQueue { // only allows access to this value while the given task is being polled. // This acts as mutual exclusion for access. let mut sorted = - SortedList::::new_custom(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); + SortedList::::new_with_cmp(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); loop { // For each loop, grab any newly pended items -- cgit From b65a3a301a29c737f336ca344f671d4e9793fda8 Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 3 Apr 2025 10:27:26 +0200 Subject: Clean up some TODOs --- embassy-executor/src/raw/deadline.rs | 62 ++++++++++++++++++++++++++++++------ embassy-executor/src/raw/mod.rs | 2 +- 2 files changed, 53 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index c8cc94c52..0b88ee2d6 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -10,6 +10,16 @@ pub struct Deadline { } impl Deadline { + /// Sentinel value representing an "unset" deadline, which has lower priority + /// than any other set deadline value + pub const UNSET_DEADLINE_TICKS: u64 = u64::MAX; + + /// Does the given Deadline represent an "unset" deadline? + #[inline] + pub fn is_unset(&self) -> bool { + self.instant_ticks == Self::UNSET_DEADLINE_TICKS + } + /// Set the current task's deadline at exactly `instant_ticks` /// /// This method is a future in order to access the currently executing task's @@ -17,7 +27,7 @@ impl Deadline { /// /// Analogous to `Timer::at`. /// - /// TODO: Should we check/panic if the deadline is in the past? + /// This method does NOT check whether the deadline has already passed. #[must_use = "Setting deadline must be polled to be effective"] pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { poll_fn(move |cx| { @@ -32,16 +42,16 @@ impl Deadline { } /// Set the current task's deadline `duration_ticks` in the future from when - /// this future is polled. + /// this future is polled. This deadline is saturated to the max tick value. /// /// This method is a future in order to access the currently executing task's - /// header which contains the deadline + /// header which contains the deadline. /// - /// Analogous to `Timer::after` + /// Analogous to `Timer::after`. /// - /// TODO: Do we want to return what the deadline is? + /// Returns the deadline that was set. #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { + pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); let now = embassy_time_driver::now(); @@ -56,12 +66,16 @@ impl Deadline { unsafe { task.header().deadline.set(deadline); } - Poll::Ready(()) + Poll::Ready(Deadline { + instant_ticks: deadline, + }) }) } /// Set the current task's deadline `increment_ticks` from the previous deadline. /// + /// This deadline is saturated to the max tick value. + /// /// Note that by default (unless otherwise set), tasks start life with the deadline /// u64::MAX, which means this method will have no effect. /// @@ -70,9 +84,9 @@ impl Deadline { /// /// Analogous to one increment of `Ticker::every().next()`. /// - /// TODO: Do we want to return what the deadline is? + /// Returns the deadline that was set. #[must_use = "Setting deadline must be polled to be effective"] - pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { + pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); @@ -89,8 +103,11 @@ impl Deadline { // Store the new value task.header().deadline.set(deadline); + + Poll::Ready(Deadline { + instant_ticks: deadline, + }) } - Poll::Ready(()) }) } @@ -110,4 +127,29 @@ impl Deadline { }) }) } + + /// Clear the current task's deadline, returning the previous value. + /// + /// This sets the deadline to the default value of `u64::MAX`, meaning all + /// tasks with set deadlines will be scheduled BEFORE this task. + pub fn clear_current_task_deadline() -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + let deadline = unsafe { + // get the old value + let d = task.header().deadline.get(); + // Store the default value + task.header().deadline.set(Self::UNSET_DEADLINE_TICKS); + // return the old value + d + }; + + Poll::Ready(Self { + instant_ticks: deadline, + }) + }) + } } diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f4fbe1bfc..a0890a864 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -316,7 +316,7 @@ impl AvailableTask { // By default, deadlines are set to the maximum value, so that any task WITH // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline #[cfg(feature = "drs-scheduler")] - self.task.raw.deadline.set(u64::MAX); + self.task.raw.deadline.set(deadline::Deadline::UNSET_DEADLINE_TICKS); let task = TaskRef::new(self.task); -- cgit From b1b2955b604f558d8bd2fcca07b8fd8da3e236fa Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 8 Apr 2025 10:05:55 +0200 Subject: Switch to released version of `cordyceps`, add error if used w/o atomics --- embassy-executor/src/raw/deadline.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index 0b88ee2d6..da07d1aac 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -1,6 +1,9 @@ use core::future::{poll_fn, Future}; use core::task::Poll; +#[cfg(not(target_has_atomic = "ptr"))] +compile_error!("The `drs-scheduler` feature is currently only supported on targets with atomics."); + /// A type for interacting with the deadline of the current task /// /// Requires the `drs-scheduler` feature -- cgit From 3929142f4c08028ea1982e79fd912e1a44900892 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 8 Apr 2025 10:11:07 +0200 Subject: One more must_use --- embassy-executor/src/raw/deadline.rs | 1 + embassy-executor/src/raw/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index da07d1aac..ae6394822 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -135,6 +135,7 @@ impl Deadline { /// /// This sets the deadline to the default value of `u64::MAX`, meaning all /// tasks with set deadlines will be scheduled BEFORE this task. + #[must_use = "Clearing deadline must be polled to be effective"] pub fn clear_current_task_deadline() -> impl Future { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index a0890a864..21dc67b7e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -36,7 +36,6 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; -use core::ptr::addr_of_mut; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; @@ -48,6 +47,8 @@ use portable_atomic::AtomicPtr; #[cfg(feature = "drs-scheduler")] pub use deadline::Deadline; +#[cfg(feature = "arch-avr")] +use portable_atomic::AtomicPtr; use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; -- cgit From 0e28ba1091257111f71b76a664d7038dbfcf9b5e Mon Sep 17 00:00:00 2001 From: James Munns Date: Wed, 16 Apr 2025 15:59:28 +0200 Subject: "Deadline Rank Sorted Scheduler" -> "Earliest Deadline First Scheduler" --- embassy-executor/src/raw/deadline.rs | 4 ++-- embassy-executor/src/raw/mod.rs | 14 +++++++------- embassy-executor/src/raw/run_queue_atomics.rs | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index ae6394822..006c7caf1 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -2,11 +2,11 @@ use core::future::{poll_fn, Future}; use core::task::Poll; #[cfg(not(target_has_atomic = "ptr"))] -compile_error!("The `drs-scheduler` feature is currently only supported on targets with atomics."); +compile_error!("The `edf-scheduler` feature is currently only supported on targets with atomics."); /// A type for interacting with the deadline of the current task /// -/// Requires the `drs-scheduler` feature +/// Requires the `edf-scheduler` feature pub struct Deadline { /// Deadline value in ticks, same time base and ticks as `embassy-time` pub instant_ticks: u64, diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 21dc67b7e..96e7fda74 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -28,7 +28,7 @@ pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; -#[cfg(feature = "drs-scheduler")] +#[cfg(feature = "edf-scheduler")] mod deadline; use core::future::Future; @@ -45,7 +45,7 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -#[cfg(feature = "drs-scheduler")] +#[cfg(feature = "edf-scheduler")] pub use deadline::Deadline; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; @@ -105,9 +105,9 @@ pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, - /// Deadline Rank Scheduler Deadline. This field should not be accessed outside the context of - /// the task itself as it being polled by the executor. - #[cfg(feature = "drs-scheduler")] + /// Earliest Deadline First scheduler Deadline. This field should not be accessed + /// outside the context of the task itself as it being polled by the executor. + #[cfg(feature = "edf-scheduler")] pub(crate) deadline: SyncUnsafeCell, pub(crate) executor: AtomicPtr, @@ -216,7 +216,7 @@ impl TaskStorage { run_queue_item: RunQueueItem::new(), // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This // will be lazily initalized in `initialize_impl` - #[cfg(feature = "drs-scheduler")] + #[cfg(feature = "edf-scheduler")] deadline: SyncUnsafeCell::new(0u64), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` @@ -316,7 +316,7 @@ impl AvailableTask { // By default, deadlines are set to the maximum value, so that any task WITH // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline - #[cfg(feature = "drs-scheduler")] + #[cfg(feature = "edf-scheduler")] self.task.raw.deadline.set(deadline::Deadline::UNSET_DEADLINE_TICKS); let task = TaskRef::new(self.task); diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index 08765e06b..65a9b7859 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -1,7 +1,7 @@ use core::ptr::{addr_of_mut, NonNull}; use cordyceps::sorted_list::Links; -#[cfg(feature = "drs-scheduler")] +#[cfg(feature = "edf-scheduler")] use cordyceps::SortedList; use cordyceps::{Linked, TransferStack}; @@ -73,7 +73,7 @@ impl RunQueue { /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - #[cfg(not(feature = "drs-scheduler"))] + #[cfg(not(feature = "edf-scheduler"))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { let taken = self.stack.take_all(); for taskref in taken { @@ -82,7 +82,7 @@ impl RunQueue { } } - /// # Deadline Ranked Sorted Scheduler + /// # Earliest Deadline First Scheduler /// /// This algorithm will loop until all enqueued tasks are processed. /// @@ -96,7 +96,7 @@ impl RunQueue { /// /// This process will repeat until the local `sorted` queue AND the global /// runqueue are both empty, at which point this function will return. - #[cfg(feature = "drs-scheduler")] + #[cfg(feature = "edf-scheduler")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // SAFETY: `deadline` can only be set through the `Deadline` interface, which // only allows access to this value while the given task is being polled. -- cgit From 7af8f35a50c631802615044e12cc9c74614f78bb Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 3 Jun 2025 16:34:12 +0200 Subject: There can be only one (run queue) --- embassy-executor/src/raw/deadline.rs | 3 - embassy-executor/src/raw/mod.rs | 2 - embassy-executor/src/raw/run_queue.rs | 151 +++++++++++++++++++++ embassy-executor/src/raw/run_queue_atomics.rs | 129 ------------------ .../src/raw/run_queue_critical_section.rs | 74 ---------- 5 files changed, 151 insertions(+), 208 deletions(-) create mode 100644 embassy-executor/src/raw/run_queue.rs delete mode 100644 embassy-executor/src/raw/run_queue_atomics.rs delete mode 100644 embassy-executor/src/raw/run_queue_critical_section.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index 006c7caf1..0fb22a7ce 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -1,9 +1,6 @@ use core::future::{poll_fn, Future}; use core::task::Poll; -#[cfg(not(target_has_atomic = "ptr"))] -compile_error!("The `edf-scheduler` feature is currently only supported on targets with atomics."); - /// A type for interacting with the deadline of the current task /// /// Requires the `edf-scheduler` feature diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 96e7fda74..cc43690cb 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -7,8 +7,6 @@ //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. -#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] -#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; #[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")] diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs new file mode 100644 index 000000000..f630041e0 --- /dev/null +++ b/embassy-executor/src/raw/run_queue.rs @@ -0,0 +1,151 @@ +use core::ptr::{addr_of_mut, NonNull}; + +use cordyceps::sorted_list::Links; +use cordyceps::Linked; +#[cfg(feature = "edf-scheduler")] +use cordyceps::SortedList; + +#[cfg(target_has_atomic = "ptr")] +type TransferStack = cordyceps::TransferStack; + +#[cfg(not(target_has_atomic = "ptr"))] +type TransferStack = cordyceps::TransferStack; + +use super::{TaskHeader, TaskRef}; + +/// Use `cordyceps::sorted_list::Links` as the singly linked list +/// for RunQueueItems. +pub(crate) type RunQueueItem = Links; + +/// Implements the `Linked` trait, allowing for singly linked list usage +/// of any of cordyceps' `TransferStack` (used for the atomic runqueue), +/// `SortedList` (used with the DRS scheduler), or `Stack`, which is +/// popped atomically from the `TransferStack`. +unsafe impl Linked> for TaskHeader { + type Handle = TaskRef; + + // Convert a TaskRef into a TaskHeader ptr + fn into_ptr(r: TaskRef) -> NonNull { + r.ptr + } + + // Convert a TaskHeader into a TaskRef + unsafe fn from_ptr(ptr: NonNull) -> TaskRef { + TaskRef { ptr } + } + + // Given a pointer to a TaskHeader, obtain a pointer to the Links structure, + // which can be used to traverse to other TaskHeader nodes in the linked list + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) + } +} + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't create fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + stack: TransferStack, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + stack: TransferStack::new(), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + /// + /// # Safety + /// + /// `item` must NOT be already enqueued in any queue. + #[inline(always)] + pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { + self.stack.push_was_empty(task) + } + + /// # Standard atomic runqueue + /// + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + #[cfg(not(feature = "edf-scheduler"))] + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + let taken = self.stack.take_all(); + for taskref in taken { + run_dequeue(&taskref); + on_task(taskref); + } + } + + /// # Earliest Deadline First Scheduler + /// + /// This algorithm will loop until all enqueued tasks are processed. + /// + /// Before polling a task, all currently enqueued tasks will be popped from the + /// runqueue, and will be added to the working `sorted` list, a linked-list that + /// sorts tasks by their deadline, with nearest deadline items in the front, and + /// furthest deadline items in the back. + /// + /// After popping and sorting all pending tasks, the SOONEST task will be popped + /// from the front of the queue, and polled by calling `on_task` on it. + /// + /// This process will repeat until the local `sorted` queue AND the global + /// runqueue are both empty, at which point this function will return. + #[cfg(feature = "edf-scheduler")] + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + // SAFETY: `deadline` can only be set through the `Deadline` interface, which + // only allows access to this value while the given task is being polled. + // This acts as mutual exclusion for access. + let mut sorted = + SortedList::::new_with_cmp(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); + + loop { + // For each loop, grab any newly pended items + let taken = self.stack.take_all(); + + // Sort these into the list - this is potentially expensive! We do an + // insertion sort of new items, which iterates the linked list. + // + // Something on the order of `O(n * m)`, where `n` is the number + // of new tasks, and `m` is the number of already pending tasks. + sorted.extend(taken); + + // Pop the task with the SOONEST deadline. If there are no tasks + // pending, then we are done. + let Some(taskref) = sorted.pop_front() else { + return; + }; + + // We got one task, mark it as dequeued, and process the task. + run_dequeue(&taskref); + on_task(taskref); + } + } +} + +/// atomic state does not require a cs... +#[cfg(target_has_atomic = "ptr")] +#[inline(always)] +fn run_dequeue(taskref: &TaskRef) { + taskref.header().state.run_dequeue(); +} + +/// ...while non-atomic state does +#[cfg(not(target_has_atomic = "ptr"))] +#[inline(always)] +fn run_dequeue(taskref: &TaskRef) { + critical_section::with(|cs| { + taskref.header().state.run_dequeue(cs); + }) +} diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs deleted file mode 100644 index 65a9b7859..000000000 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ /dev/null @@ -1,129 +0,0 @@ -use core::ptr::{addr_of_mut, NonNull}; - -use cordyceps::sorted_list::Links; -#[cfg(feature = "edf-scheduler")] -use cordyceps::SortedList; -use cordyceps::{Linked, TransferStack}; - -use super::{TaskHeader, TaskRef}; - -/// Use `cordyceps::sorted_list::Links` as the singly linked list -/// for RunQueueItems. -pub(crate) type RunQueueItem = Links; - -/// Implements the `Linked` trait, allowing for singly linked list usage -/// of any of cordyceps' `TransferStack` (used for the atomic runqueue), -/// `SortedList` (used with the DRS scheduler), or `Stack`, which is -/// popped atomically from the `TransferStack`. -unsafe impl Linked> for TaskHeader { - type Handle = TaskRef; - - // Convert a TaskRef into a TaskHeader ptr - fn into_ptr(r: TaskRef) -> NonNull { - r.ptr - } - - // Convert a TaskHeader into a TaskRef - unsafe fn from_ptr(ptr: NonNull) -> TaskRef { - TaskRef { ptr } - } - - // Given a pointer to a TaskHeader, obtain a pointer to the Links structure, - // which can be used to traverse to other TaskHeader nodes in the linked list - unsafe fn links(ptr: NonNull) -> NonNull> { - let ptr: *mut TaskHeader = ptr.as_ptr(); - NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't create fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - stack: TransferStack, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - stack: TransferStack::new(), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - /// - /// # Safety - /// - /// `item` must NOT be already enqueued in any queue. - #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - self.stack.push_was_empty(task) - } - - /// # Standard atomic runqueue - /// - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - #[cfg(not(feature = "edf-scheduler"))] - pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let taken = self.stack.take_all(); - for taskref in taken { - taskref.header().state.run_dequeue(); - on_task(taskref); - } - } - - /// # Earliest Deadline First Scheduler - /// - /// This algorithm will loop until all enqueued tasks are processed. - /// - /// Before polling a task, all currently enqueued tasks will be popped from the - /// runqueue, and will be added to the working `sorted` list, a linked-list that - /// sorts tasks by their deadline, with nearest deadline items in the front, and - /// furthest deadline items in the back. - /// - /// After popping and sorting all pending tasks, the SOONEST task will be popped - /// from the front of the queue, and polled by calling `on_task` on it. - /// - /// This process will repeat until the local `sorted` queue AND the global - /// runqueue are both empty, at which point this function will return. - #[cfg(feature = "edf-scheduler")] - pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // SAFETY: `deadline` can only be set through the `Deadline` interface, which - // only allows access to this value while the given task is being polled. - // This acts as mutual exclusion for access. - let mut sorted = - SortedList::::new_with_cmp(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); - - loop { - // For each loop, grab any newly pended items - let taken = self.stack.take_all(); - - // Sort these into the list - this is potentially expensive! We do an - // insertion sort of new items, which iterates the linked list. - // - // Something on the order of `O(n * m)`, where `n` is the number - // of new tasks, and `m` is the number of already pending tasks. - sorted.extend(taken); - - // Pop the task with the SOONEST deadline. If there are no tasks - // pending, then we are done. - let Some(taskref) = sorted.pop_front() else { - return; - }; - - // We got one task, mark it as dequeued, and process the task. - taskref.header().state.run_dequeue(); - on_task(taskref); - } - } -} diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs deleted file mode 100644 index 86c4085ed..000000000 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ /dev/null @@ -1,74 +0,0 @@ -use core::cell::Cell; - -use critical_section::{CriticalSection, Mutex}; - -use super::TaskRef; - -pub(crate) struct RunQueueItem { - next: Mutex>>, -} - -impl RunQueueItem { - pub const fn new() -> Self { - Self { - next: Mutex::new(Cell::new(None)), - } - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't create fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - head: Mutex>>, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - head: Mutex::new(Cell::new(None)), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - /// - /// # Safety - /// - /// `item` must NOT be already enqueued in any queue. - #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool { - let prev = self.head.borrow(cs).replace(Some(task)); - task.header().run_queue_item.next.borrow(cs).set(prev); - - prev.is_none() - } - - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // Atomically empty the queue. - let mut next = critical_section::with(|cs| self.head.borrow(cs).take()); - - // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = next { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - - critical_section::with(|cs| { - next = task.header().run_queue_item.next.borrow(cs).get(); - task.header().state.run_dequeue(cs); - }); - - on_task(task); - } - } -} -- cgit From d88ea8dd2adefba42489173d5119e888ffa73f07 Mon Sep 17 00:00:00 2001 From: James Munns Date: Wed, 4 Jun 2025 12:22:32 +0200 Subject: Update with cordyceps changes --- embassy-executor/src/raw/run_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index f630041e0..c6c7d7109 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -9,7 +9,7 @@ use cordyceps::SortedList; type TransferStack = cordyceps::TransferStack; #[cfg(not(target_has_atomic = "ptr"))] -type TransferStack = cordyceps::TransferStack; +type TransferStack = cordyceps::MutexTransferStack; use super::{TaskHeader, TaskRef}; -- cgit From db063945e76a9b62672377ed71e6e833a295a054 Mon Sep 17 00:00:00 2001 From: James Munns Date: Wed, 2 Jul 2025 13:48:32 +0200 Subject: Inline the "MutexTransferStack" impl as it is unclear whether it will be merged upstream --- embassy-executor/src/raw/run_queue.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index c6c7d7109..5fd703aad 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -9,7 +9,7 @@ use cordyceps::SortedList; type TransferStack = cordyceps::TransferStack; #[cfg(not(target_has_atomic = "ptr"))] -type TransferStack = cordyceps::MutexTransferStack; +type TransferStack = MutexTransferStack; use super::{TaskHeader, TaskRef}; @@ -149,3 +149,32 @@ fn run_dequeue(taskref: &TaskRef) { taskref.header().state.run_dequeue(cs); }) } + +/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex +#[cfg(not(target_has_atomic="ptr"))] +struct MutexTransferStack>> { + inner: mutex::BlockingMutex>, +} + +#[cfg(not(target_has_atomic="ptr"))] +impl>> MutexTransferStack { + const fn new() -> Self { + Self { + inner: mutex::BlockingMutex::new(cordyceps::Stack::new()), + } + } + + fn push_was_empty(&self, item: T::Handle) -> bool { + self.inner.with_lock(|stack| { + let is_empty = stack.is_empty(); + stack.push(item); + is_empty + }) + } + + fn take_all(&self) -> cordyceps::Stack { + self.inner.with_lock(|stack| { + stack.take_all() + }) + } +} -- cgit From cf171ad6d9c0a7487400beb9e4a436e5c1b64e19 Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 3 Jul 2025 09:11:32 +0200 Subject: fmt --- embassy-executor/src/raw/run_queue.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 5fd703aad..b4b22819f 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -151,12 +151,12 @@ fn run_dequeue(taskref: &TaskRef) { } /// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex -#[cfg(not(target_has_atomic="ptr"))] +#[cfg(not(target_has_atomic = "ptr"))] struct MutexTransferStack>> { inner: mutex::BlockingMutex>, } -#[cfg(not(target_has_atomic="ptr"))] +#[cfg(not(target_has_atomic = "ptr"))] impl>> MutexTransferStack { const fn new() -> Self { Self { @@ -173,8 +173,6 @@ impl>> MutexTransferStack { } fn take_all(&self) -> cordyceps::Stack { - self.inner.with_lock(|stack| { - stack.take_all() - }) + self.inner.with_lock(|stack| stack.take_all()) } } -- cgit From 20b56b0fe0570f0d1e8c61d23d067627a4dfc165 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 15 Jul 2025 13:33:51 +0200 Subject: Update to use critical-section::Mutex instead of mutex::BlockingMutex This allows the scheduler to better collaborate with existing critical sections --- embassy-executor/src/raw/run_queue.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index b4b22819f..9acb9dd28 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -70,8 +70,12 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - self.stack.push_was_empty(task) + pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool { + self.stack.push_was_empty( + task, + #[cfg(not(target_has_atomic = "ptr"))] + _tok, + ) } /// # Standard atomic runqueue @@ -153,26 +157,28 @@ fn run_dequeue(taskref: &TaskRef) { /// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex #[cfg(not(target_has_atomic = "ptr"))] struct MutexTransferStack>> { - inner: mutex::BlockingMutex>, + inner: critical_section::Mutex>>, } #[cfg(not(target_has_atomic = "ptr"))] impl>> MutexTransferStack { const fn new() -> Self { Self { - inner: mutex::BlockingMutex::new(cordyceps::Stack::new()), + inner: critical_section::Mutex::new(core::cell::RefCell::new(cordyceps::Stack::new())), } } - fn push_was_empty(&self, item: T::Handle) -> bool { - self.inner.with_lock(|stack| { - let is_empty = stack.is_empty(); - stack.push(item); - is_empty - }) + fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool { + let mut guard = self.inner.borrow_ref_mut(token); + let is_empty = guard.is_empty(); + guard.push(item); + is_empty } fn take_all(&self) -> cordyceps::Stack { - self.inner.with_lock(|stack| stack.take_all()) + critical_section::with(|cs| { + let mut guard = self.inner.borrow_ref_mut(cs); + guard.take_all() + }) } } -- cgit From 38e5e2e9ceb9a34badfdfc57477f0dba23c64ced Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 15 Jul 2025 14:51:08 +0200 Subject: Replace use of RefCell with UnsafeCell --- embassy-executor/src/raw/run_queue.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 9acb9dd28..97d26a18a 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -157,19 +157,26 @@ fn run_dequeue(taskref: &TaskRef) { /// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex #[cfg(not(target_has_atomic = "ptr"))] struct MutexTransferStack>> { - inner: critical_section::Mutex>>, + inner: critical_section::Mutex>>, } #[cfg(not(target_has_atomic = "ptr"))] impl>> MutexTransferStack { const fn new() -> Self { Self { - inner: critical_section::Mutex::new(core::cell::RefCell::new(cordyceps::Stack::new())), + inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())), } } + /// Push an item to the transfer stack, returning whether the stack was previously empty fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool { - let mut guard = self.inner.borrow_ref_mut(token); + /// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access + /// for the lifetime of the token, but does NOT protect against re-entrant access. + /// However, we never *return* the reference, nor do we recurse (or call another method + /// like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the + /// presence of the critical section is sufficient to guarantee exclusive access to + /// the `inner` field for the purposes of this function + let mut guard = unsafe { &mut *self.inner.borrow(token).get() }; let is_empty = guard.is_empty(); guard.push(item); is_empty @@ -177,7 +184,13 @@ impl>> MutexTransferStack { fn take_all(&self) -> cordyceps::Stack { critical_section::with(|cs| { - let mut guard = self.inner.borrow_ref_mut(cs); + /// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access + /// for the lifetime of the token, but does NOT protect against re-entrant access. + /// However, we never *return* the reference, nor do we recurse (or call another method + /// like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the + /// presence of the critical section is sufficient to guarantee exclusive access to + /// the `inner` field for the purposes of this function + let mut guard = unsafe { &mut *self.inner.borrow(cs).get() }; guard.take_all() }) } -- cgit From 4479f5bbfce9002b965f9e3e189cdd5c61096eff Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 15 Jul 2025 14:54:20 +0200 Subject: Regular comments not doc comments --- embassy-executor/src/raw/run_queue.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 97d26a18a..1eb5775d8 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -170,12 +170,12 @@ impl>> MutexTransferStack { /// Push an item to the transfer stack, returning whether the stack was previously empty fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool { - /// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access - /// for the lifetime of the token, but does NOT protect against re-entrant access. - /// However, we never *return* the reference, nor do we recurse (or call another method - /// like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the - /// presence of the critical section is sufficient to guarantee exclusive access to - /// the `inner` field for the purposes of this function + // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access + // for the lifetime of the token, but does NOT protect against re-entrant access. + // However, we never *return* the reference, nor do we recurse (or call another method + // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the + // presence of the critical section is sufficient to guarantee exclusive access to + // the `inner` field for the purposes of this function. let mut guard = unsafe { &mut *self.inner.borrow(token).get() }; let is_empty = guard.is_empty(); guard.push(item); @@ -184,12 +184,12 @@ impl>> MutexTransferStack { fn take_all(&self) -> cordyceps::Stack { critical_section::with(|cs| { - /// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access - /// for the lifetime of the token, but does NOT protect against re-entrant access. - /// However, we never *return* the reference, nor do we recurse (or call another method - /// like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the - /// presence of the critical section is sufficient to guarantee exclusive access to - /// the `inner` field for the purposes of this function + // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access + // for the lifetime of the token, but does NOT protect against re-entrant access. + // However, we never *return* the reference, nor do we recurse (or call another method + // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the + // presence of the critical section is sufficient to guarantee exclusive access to + // the `inner` field for the purposes of this function. let mut guard = unsafe { &mut *self.inner.borrow(cs).get() }; guard.take_all() }) -- cgit From b5c9e721009fd4331cdc1ce58a07698eb54f2959 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 15 Jul 2025 14:58:41 +0200 Subject: Rename, remove excess mut --- embassy-executor/src/raw/run_queue.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 1eb5775d8..97060f4b9 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -176,9 +176,9 @@ impl>> MutexTransferStack { // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the // presence of the critical section is sufficient to guarantee exclusive access to // the `inner` field for the purposes of this function. - let mut guard = unsafe { &mut *self.inner.borrow(token).get() }; - let is_empty = guard.is_empty(); - guard.push(item); + let inner = unsafe { &mut *self.inner.borrow(token).get() }; + let is_empty = inner.is_empty(); + inner.push(item); is_empty } @@ -190,8 +190,8 @@ impl>> MutexTransferStack { // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the // presence of the critical section is sufficient to guarantee exclusive access to // the `inner` field for the purposes of this function. - let mut guard = unsafe { &mut *self.inner.borrow(cs).get() }; - guard.take_all() + let inner = unsafe { &mut *self.inner.borrow(cs).get() }; + inner.take_all() }) } } -- cgit From 3f606b28f3b32e9e3b9a9f136eeef52828a78512 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Tue, 15 Jul 2025 13:40:30 +0200 Subject: Change deadline to use internal atomics --- embassy-executor/src/raw/deadline.rs | 101 +++++++++++++++++----------------- embassy-executor/src/raw/mod.rs | 4 +- embassy-executor/src/raw/run_queue.rs | 8 +-- 3 files changed, 55 insertions(+), 58 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index 0fb22a7ce..a61852612 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -1,15 +1,41 @@ use core::future::{poll_fn, Future}; +use core::sync::atomic::{AtomicU32, Ordering}; use core::task::Poll; /// A type for interacting with the deadline of the current task /// /// Requires the `edf-scheduler` feature pub struct Deadline { - /// Deadline value in ticks, same time base and ticks as `embassy-time` - pub instant_ticks: u64, + instant_ticks_hi: AtomicU32, + instant_ticks_lo: AtomicU32, } impl Deadline { + pub(crate) const fn new(instant_ticks: u64) -> Self { + Self { + instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32), + instant_ticks_lo: AtomicU32::new(instant_ticks as u32), + } + } + + pub(crate) const fn new_unset() -> Self { + Self::new(Self::UNSET_DEADLINE_TICKS) + } + + pub(crate) fn set(&self, instant_ticks: u64) { + self.instant_ticks_hi + .store((instant_ticks >> 32) as u32, Ordering::Relaxed); + self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed); + } + + /// Deadline value in ticks, same time base and ticks as `embassy-time` + pub fn instant_ticks(&self) -> u64 { + let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64; + let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64; + + (hi << 32) | lo + } + /// Sentinel value representing an "unset" deadline, which has lower priority /// than any other set deadline value pub const UNSET_DEADLINE_TICKS: u64 = u64::MAX; @@ -17,7 +43,7 @@ impl Deadline { /// Does the given Deadline represent an "unset" deadline? #[inline] pub fn is_unset(&self) -> bool { - self.instant_ticks == Self::UNSET_DEADLINE_TICKS + self.instant_ticks() == Self::UNSET_DEADLINE_TICKS } /// Set the current task's deadline at exactly `instant_ticks` @@ -32,11 +58,7 @@ impl Deadline { pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(instant_ticks); - } + task.header().deadline.set(instant_ticks); Poll::Ready(()) }) } @@ -61,14 +83,9 @@ impl Deadline { // reasons later. let deadline = now.saturating_add(duration_ticks); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(deadline); - } - Poll::Ready(Deadline { - instant_ticks: deadline, - }) + task.header().deadline.set(deadline); + + Poll::Ready(Deadline::new(deadline)) }) } @@ -90,24 +107,18 @@ impl Deadline { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - // Get the last value - let last = task.header().deadline.get(); + // Get the last value + let last = task.header().deadline.instant_ticks(); - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = last.saturating_add(increment_ticks); + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(increment_ticks); - // Store the new value - task.header().deadline.set(deadline); + // Store the new value + task.header().deadline.set(deadline); - Poll::Ready(Deadline { - instant_ticks: deadline, - }) - } + Poll::Ready(Deadline::new(deadline)) }) } @@ -119,12 +130,8 @@ impl Deadline { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - let deadline = unsafe { task.header().deadline.get() }; - Poll::Ready(Self { - instant_ticks: deadline, - }) + let deadline = task.header().deadline.instant_ticks(); + Poll::Ready(Self::new(deadline)) }) } @@ -137,20 +144,12 @@ impl Deadline { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - let deadline = unsafe { - // get the old value - let d = task.header().deadline.get(); - // Store the default value - task.header().deadline.set(Self::UNSET_DEADLINE_TICKS); - // return the old value - d - }; - - Poll::Ready(Self { - instant_ticks: deadline, - }) + // get the old value + let deadline = task.header().deadline.instant_ticks(); + // Store the default value + task.header().deadline.set(Self::UNSET_DEADLINE_TICKS); + + Poll::Ready(Self::new(deadline)) }) } } diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index cc43690cb..be2c5ee28 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -106,7 +106,7 @@ pub(crate) struct TaskHeader { /// Earliest Deadline First scheduler Deadline. This field should not be accessed /// outside the context of the task itself as it being polled by the executor. #[cfg(feature = "edf-scheduler")] - pub(crate) deadline: SyncUnsafeCell, + pub(crate) deadline: Deadline, pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -215,7 +215,7 @@ impl TaskStorage { // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This // will be lazily initalized in `initialize_impl` #[cfg(feature = "edf-scheduler")] - deadline: SyncUnsafeCell::new(0u64), + deadline: Deadline::new_unset(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 97060f4b9..e8a046a48 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -108,11 +108,9 @@ impl RunQueue { /// runqueue are both empty, at which point this function will return. #[cfg(feature = "edf-scheduler")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // SAFETY: `deadline` can only be set through the `Deadline` interface, which - // only allows access to this value while the given task is being polled. - // This acts as mutual exclusion for access. - let mut sorted = - SortedList::::new_with_cmp(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); + let mut sorted = SortedList::::new_with_cmp(|lhs, rhs| { + lhs.deadline.instant_ticks().cmp(&rhs.deadline.instant_ticks()) + }); loop { // For each loop, grab any newly pended items -- cgit From d6d4df1c768f8ae43ad1339b74d351f4cbad0386 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Tue, 15 Jul 2025 14:30:02 +0200 Subject: Add some docs --- embassy-executor/src/raw/deadline.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index a61852612..cbb379b82 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -4,7 +4,11 @@ use core::task::Poll; /// A type for interacting with the deadline of the current task /// -/// Requires the `edf-scheduler` feature +/// Requires the `edf-scheduler` feature. +/// +/// Note: Interacting with the deadline should be done locally in a task. +/// In theory you could try to set or read the deadline from another task, +/// but that will result in weird (though not unsound) behavior. pub struct Deadline { instant_ticks_hi: AtomicU32, instant_ticks_lo: AtomicU32, -- cgit From 52d178560501a464dba67da89a1570ae9a2cf66c Mon Sep 17 00:00:00 2001 From: diondokter Date: Fri, 29 Aug 2025 14:36:17 +0200 Subject: Introduce metadata-deadline and let the EDF scheduler use it --- embassy-executor/src/metadata.rs | 13 +++++++++++++ embassy-executor/src/raw/deadline.rs | 18 +++++++++--------- embassy-executor/src/raw/mod.rs | 19 +++++++------------ embassy-executor/src/raw/run_queue.rs | 5 ++++- 4 files changed, 33 insertions(+), 22 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs index f92c9b37c..fd8095629 100644 --- a/embassy-executor/src/metadata.rs +++ b/embassy-executor/src/metadata.rs @@ -12,6 +12,8 @@ use crate::raw; pub struct Metadata { #[cfg(feature = "metadata-name")] name: Mutex>>, + #[cfg(feature = "metadata-deadline")] + deadline: raw::Deadline, } impl Metadata { @@ -19,6 +21,10 @@ impl Metadata { Self { #[cfg(feature = "metadata-name")] name: Mutex::new(Cell::new(None)), + // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This + // will be lazily initalized in `initialize_impl` + #[cfg(feature = "metadata-deadline")] + deadline: raw::Deadline::new_unset(), } } @@ -52,4 +58,11 @@ impl Metadata { pub fn set_name(&self, name: &'static str) { critical_section::with(|cs| self.name.borrow(cs).set(Some(name))) } + + /// Earliest Deadline First scheduler Deadline. This field should not be accessed + /// outside the context of the task itself as it being polled by the executor. + #[cfg(feature = "metadata-deadline")] + pub fn deadline(&self) -> &raw::Deadline { + &self.deadline + } } diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index cbb379b82..5b585195d 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -62,7 +62,7 @@ impl Deadline { pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - task.header().deadline.set(instant_ticks); + task.header().metadata.deadline().set(instant_ticks); Poll::Ready(()) }) } @@ -87,7 +87,7 @@ impl Deadline { // reasons later. let deadline = now.saturating_add(duration_ticks); - task.header().deadline.set(deadline); + task.header().metadata.deadline().set(deadline); Poll::Ready(Deadline::new(deadline)) }) @@ -109,10 +109,10 @@ impl Deadline { #[must_use = "Setting deadline must be polled to be effective"] pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); + let task_header = super::task_from_waker(cx.waker()).header(); // Get the last value - let last = task.header().deadline.instant_ticks(); + let last = task_header.metadata.deadline().instant_ticks(); // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave // it for now, we can probably make this wrapping_add for performance @@ -120,7 +120,7 @@ impl Deadline { let deadline = last.saturating_add(increment_ticks); // Store the new value - task.header().deadline.set(deadline); + task_header.metadata.deadline().set(deadline); Poll::Ready(Deadline::new(deadline)) }) @@ -134,7 +134,7 @@ impl Deadline { poll_fn(move |cx| { let task = super::task_from_waker(cx.waker()); - let deadline = task.header().deadline.instant_ticks(); + let deadline = task.header().metadata.deadline().instant_ticks(); Poll::Ready(Self::new(deadline)) }) } @@ -146,12 +146,12 @@ impl Deadline { #[must_use = "Clearing deadline must be polled to be effective"] pub fn clear_current_task_deadline() -> impl Future { poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); + let task_header = super::task_from_waker(cx.waker()).header(); // get the old value - let deadline = task.header().deadline.instant_ticks(); + let deadline = task_header.metadata.deadline().instant_ticks(); // Store the default value - task.header().deadline.set(Self::UNSET_DEADLINE_TICKS); + task_header.metadata.deadline().set(Self::UNSET_DEADLINE_TICKS); Poll::Ready(Self::new(deadline)) }) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index be2c5ee28..f93bfdef9 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -26,7 +26,7 @@ pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; -#[cfg(feature = "edf-scheduler")] +#[cfg(feature = "metadata-deadline")] mod deadline; use core::future::Future; @@ -43,7 +43,7 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -#[cfg(feature = "edf-scheduler")] +#[cfg(feature = "metadata-deadline")] pub use deadline::Deadline; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; @@ -103,11 +103,6 @@ pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, - /// Earliest Deadline First scheduler Deadline. This field should not be accessed - /// outside the context of the task itself as it being polled by the executor. - #[cfg(feature = "edf-scheduler")] - pub(crate) deadline: Deadline, - pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -212,10 +207,6 @@ impl TaskStorage { raw: TaskHeader { state: State::new(), run_queue_item: RunQueueItem::new(), - // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This - // will be lazily initalized in `initialize_impl` - #[cfg(feature = "edf-scheduler")] - deadline: Deadline::new_unset(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), @@ -315,7 +306,11 @@ impl AvailableTask { // By default, deadlines are set to the maximum value, so that any task WITH // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline #[cfg(feature = "edf-scheduler")] - self.task.raw.deadline.set(deadline::Deadline::UNSET_DEADLINE_TICKS); + self.task + .raw + .metadata + .deadline() + .set(deadline::Deadline::UNSET_DEADLINE_TICKS); let task = TaskRef::new(self.task); diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index e8a046a48..978ca082a 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -109,7 +109,10 @@ impl RunQueue { #[cfg(feature = "edf-scheduler")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { let mut sorted = SortedList::::new_with_cmp(|lhs, rhs| { - lhs.deadline.instant_ticks().cmp(&rhs.deadline.instant_ticks()) + lhs.metadata + .deadline() + .instant_ticks() + .cmp(&rhs.metadata.deadline().instant_ticks()) }); loop { -- cgit From a853bbe2a4dbb64c2e691ddcb258b2530d2b5af5 Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Fri, 29 Aug 2025 15:16:31 +0200 Subject: Happy CI :) --- embassy-executor/src/raw/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f93bfdef9..86ee86842 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -39,12 +39,9 @@ use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; use core::task::{Context, Poll, Waker}; -use embassy_executor_timer_queue::TimerQueueItem; -#[cfg(feature = "arch-avr")] -use portable_atomic::AtomicPtr; - #[cfg(feature = "metadata-deadline")] pub use deadline::Deadline; +use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -- cgit From 401fac6ea95b6dd16492d784f99f07fb9a1b318b Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Mon, 8 Sep 2025 11:40:34 +0200 Subject: Make requested API changes --- embassy-executor/src/metadata.rs | 64 ++++++++++++++++++-- embassy-executor/src/raw/deadline.rs | 109 ---------------------------------- embassy-executor/src/raw/mod.rs | 6 +- embassy-executor/src/raw/run_queue.rs | 6 +- 4 files changed, 65 insertions(+), 120 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs index fd8095629..81c5afafb 100644 --- a/embassy-executor/src/metadata.rs +++ b/embassy-executor/src/metadata.rs @@ -7,12 +7,14 @@ use core::task::Poll; use critical_section::Mutex; use crate::raw; +#[cfg(feature = "scheduler-deadline")] +use crate::raw::Deadline; /// Metadata associated with a task. pub struct Metadata { #[cfg(feature = "metadata-name")] name: Mutex>>, - #[cfg(feature = "metadata-deadline")] + #[cfg(feature = "scheduler-deadline")] deadline: raw::Deadline, } @@ -23,7 +25,7 @@ impl Metadata { name: Mutex::new(Cell::new(None)), // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This // will be lazily initalized in `initialize_impl` - #[cfg(feature = "metadata-deadline")] + #[cfg(feature = "scheduler-deadline")] deadline: raw::Deadline::new_unset(), } } @@ -59,10 +61,62 @@ impl Metadata { critical_section::with(|cs| self.name.borrow(cs).set(Some(name))) } - /// Earliest Deadline First scheduler Deadline. This field should not be accessed - /// outside the context of the task itself as it being polled by the executor. - #[cfg(feature = "metadata-deadline")] + /// Get this task's deadline. + #[cfg(feature = "scheduler-deadline")] pub fn deadline(&self) -> &raw::Deadline { &self.deadline } + + /// Set this task's deadline. + /// + /// This method does NOT check whether the deadline has already passed. + #[cfg(feature = "scheduler-deadline")] + pub fn set_deadline(&self, instant_ticks: u64) { + self.deadline.set(instant_ticks); + } + + /// Remove this task's deadline. + /// This brings it back to the defaul where it's not scheduled ahead of other tasks. + #[cfg(feature = "scheduler-deadline")] + pub fn unset_deadline(&self) { + self.deadline.set(Deadline::UNSET_DEADLINE_TICKS); + } + + /// Set this task's deadline `duration_ticks` in the future from when + /// this future is polled. This deadline is saturated to the max tick value. + /// + /// Analogous to `Timer::after`. + #[cfg(all(feature = "scheduler-deadline", feature = "embassy-time-driver"))] + pub fn set_deadline_after(&self, duration_ticks: u64) { + let now = embassy_time_driver::now(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = now.saturating_add(duration_ticks); + + self.set_deadline(deadline); + } + + /// Set the this task's deadline `increment_ticks` from the previous deadline. + /// + /// This deadline is saturated to the max tick value. + /// + /// Note that by default (unless otherwise set), tasks start life with the deadline + /// not set, which means this method will have no effect. + /// + /// Analogous to one increment of `Ticker::every().next()`. + /// + /// Returns the deadline that was set. + #[cfg(feature = "scheduler-deadline")] + pub fn increment_deadline(&self, duration_ticks: u64) { + let last = self.deadline().instant_ticks(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(duration_ticks); + + self.set_deadline(deadline); + } } diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index 5b585195d..d08dd06ed 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -1,6 +1,4 @@ -use core::future::{poll_fn, Future}; use core::sync::atomic::{AtomicU32, Ordering}; -use core::task::Poll; /// A type for interacting with the deadline of the current task /// @@ -49,111 +47,4 @@ impl Deadline { pub fn is_unset(&self) -> bool { self.instant_ticks() == Self::UNSET_DEADLINE_TICKS } - - /// Set the current task's deadline at exactly `instant_ticks` - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline. - /// - /// Analogous to `Timer::at`. - /// - /// This method does NOT check whether the deadline has already passed. - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - task.header().metadata.deadline().set(instant_ticks); - Poll::Ready(()) - }) - } - - /// Set the current task's deadline `duration_ticks` in the future from when - /// this future is polled. This deadline is saturated to the max tick value. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline. - /// - /// Analogous to `Timer::after`. - /// - /// Returns the deadline that was set. - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - let now = embassy_time_driver::now(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = now.saturating_add(duration_ticks); - - task.header().metadata.deadline().set(deadline); - - Poll::Ready(Deadline::new(deadline)) - }) - } - - /// Set the current task's deadline `increment_ticks` from the previous deadline. - /// - /// This deadline is saturated to the max tick value. - /// - /// Note that by default (unless otherwise set), tasks start life with the deadline - /// u64::MAX, which means this method will have no effect. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - /// - /// Analogous to one increment of `Ticker::every().next()`. - /// - /// Returns the deadline that was set. - #[must_use = "Setting deadline must be polled to be effective"] - pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task_header = super::task_from_waker(cx.waker()).header(); - - // Get the last value - let last = task_header.metadata.deadline().instant_ticks(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = last.saturating_add(increment_ticks); - - // Store the new value - task_header.metadata.deadline().set(deadline); - - Poll::Ready(Deadline::new(deadline)) - }) - } - - /// Get the current task's deadline as a tick value. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - pub fn get_current_task_deadline() -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - - let deadline = task.header().metadata.deadline().instant_ticks(); - Poll::Ready(Self::new(deadline)) - }) - } - - /// Clear the current task's deadline, returning the previous value. - /// - /// This sets the deadline to the default value of `u64::MAX`, meaning all - /// tasks with set deadlines will be scheduled BEFORE this task. - #[must_use = "Clearing deadline must be polled to be effective"] - pub fn clear_current_task_deadline() -> impl Future { - poll_fn(move |cx| { - let task_header = super::task_from_waker(cx.waker()).header(); - - // get the old value - let deadline = task_header.metadata.deadline().instant_ticks(); - // Store the default value - task_header.metadata.deadline().set(Self::UNSET_DEADLINE_TICKS); - - Poll::Ready(Self::new(deadline)) - }) - } } diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 86ee86842..6a9dd9749 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -26,7 +26,7 @@ pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; -#[cfg(feature = "metadata-deadline")] +#[cfg(feature = "scheduler-deadline")] mod deadline; use core::future::Future; @@ -39,7 +39,7 @@ use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; use core::task::{Context, Poll, Waker}; -#[cfg(feature = "metadata-deadline")] +#[cfg(feature = "scheduler-deadline")] pub use deadline::Deadline; use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] @@ -302,7 +302,7 @@ impl AvailableTask { // By default, deadlines are set to the maximum value, so that any task WITH // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline - #[cfg(feature = "edf-scheduler")] + #[cfg(feature = "scheduler-deadline")] self.task .raw .metadata diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 978ca082a..29c977226 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -2,7 +2,7 @@ use core::ptr::{addr_of_mut, NonNull}; use cordyceps::sorted_list::Links; use cordyceps::Linked; -#[cfg(feature = "edf-scheduler")] +#[cfg(feature = "scheduler-deadline")] use cordyceps::SortedList; #[cfg(target_has_atomic = "ptr")] @@ -83,7 +83,7 @@ impl RunQueue { /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - #[cfg(not(feature = "edf-scheduler"))] + #[cfg(not(feature = "scheduler-deadline"))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { let taken = self.stack.take_all(); for taskref in taken { @@ -106,7 +106,7 @@ impl RunQueue { /// /// This process will repeat until the local `sorted` queue AND the global /// runqueue are both empty, at which point this function will return. - #[cfg(feature = "edf-scheduler")] + #[cfg(feature = "scheduler-deadline")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { let mut sorted = SortedList::::new_with_cmp(|lhs, rhs| { lhs.metadata -- cgit From 09701a339d9085d86a69bf271299d7b59eda9fdc Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Mon, 8 Sep 2025 12:33:04 +0200 Subject: Fix example --- embassy-executor/src/raw/deadline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index d08dd06ed..f6d016ae7 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -2,7 +2,7 @@ use core::sync::atomic::{AtomicU32, Ordering}; /// A type for interacting with the deadline of the current task /// -/// Requires the `edf-scheduler` feature. +/// Requires the `scheduler-deadline` feature. /// /// Note: Interacting with the deadline should be done locally in a task. /// In theory you could try to set or read the deadline from another task, -- cgit From e1209c5563576d18c4d033b015c9a5dd6145d581 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 11 Sep 2025 15:40:33 +0200 Subject: executor: make Deadline actually private. --- embassy-executor/src/metadata.rs | 8 ++++---- embassy-executor/src/raw/deadline.rs | 14 ++++---------- embassy-executor/src/raw/mod.rs | 8 ++------ embassy-executor/src/raw/run_queue.rs | 8 ++------ 4 files changed, 12 insertions(+), 26 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs index 81c5afafb..4220048a6 100644 --- a/embassy-executor/src/metadata.rs +++ b/embassy-executor/src/metadata.rs @@ -63,8 +63,8 @@ impl Metadata { /// Get this task's deadline. #[cfg(feature = "scheduler-deadline")] - pub fn deadline(&self) -> &raw::Deadline { - &self.deadline + pub fn deadline(&self) -> u64 { + self.deadline.instant_ticks() } /// Set this task's deadline. @@ -79,7 +79,7 @@ impl Metadata { /// This brings it back to the defaul where it's not scheduled ahead of other tasks. #[cfg(feature = "scheduler-deadline")] pub fn unset_deadline(&self) { - self.deadline.set(Deadline::UNSET_DEADLINE_TICKS); + self.deadline.set(Deadline::UNSET_TICKS); } /// Set this task's deadline `duration_ticks` in the future from when @@ -110,7 +110,7 @@ impl Metadata { /// Returns the deadline that was set. #[cfg(feature = "scheduler-deadline")] pub fn increment_deadline(&self, duration_ticks: u64) { - let last = self.deadline().instant_ticks(); + let last = self.deadline(); // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave // it for now, we can probably make this wrapping_add for performance diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs index f6d016ae7..cc89fadb0 100644 --- a/embassy-executor/src/raw/deadline.rs +++ b/embassy-executor/src/raw/deadline.rs @@ -7,7 +7,7 @@ use core::sync::atomic::{AtomicU32, Ordering}; /// Note: Interacting with the deadline should be done locally in a task. /// In theory you could try to set or read the deadline from another task, /// but that will result in weird (though not unsound) behavior. -pub struct Deadline { +pub(crate) struct Deadline { instant_ticks_hi: AtomicU32, instant_ticks_lo: AtomicU32, } @@ -21,7 +21,7 @@ impl Deadline { } pub(crate) const fn new_unset() -> Self { - Self::new(Self::UNSET_DEADLINE_TICKS) + Self::new(Self::UNSET_TICKS) } pub(crate) fn set(&self, instant_ticks: u64) { @@ -31,7 +31,7 @@ impl Deadline { } /// Deadline value in ticks, same time base and ticks as `embassy-time` - pub fn instant_ticks(&self) -> u64 { + pub(crate) fn instant_ticks(&self) -> u64 { let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64; let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64; @@ -40,11 +40,5 @@ impl Deadline { /// Sentinel value representing an "unset" deadline, which has lower priority /// than any other set deadline value - pub const UNSET_DEADLINE_TICKS: u64 = u64::MAX; - - /// Does the given Deadline represent an "unset" deadline? - #[inline] - pub fn is_unset(&self) -> bool { - self.instant_ticks() == Self::UNSET_DEADLINE_TICKS - } + pub(crate) const UNSET_TICKS: u64 = u64::MAX; } diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 6a9dd9749..51a363385 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -40,7 +40,7 @@ use core::sync::atomic::Ordering; use core::task::{Context, Poll, Waker}; #[cfg(feature = "scheduler-deadline")] -pub use deadline::Deadline; +pub(crate) use deadline::Deadline; use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; @@ -303,11 +303,7 @@ impl AvailableTask { // By default, deadlines are set to the maximum value, so that any task WITH // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline #[cfg(feature = "scheduler-deadline")] - self.task - .raw - .metadata - .deadline() - .set(deadline::Deadline::UNSET_DEADLINE_TICKS); + self.task.raw.metadata.unset_deadline(); let task = TaskRef::new(self.task); diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 29c977226..d98c26f73 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -108,12 +108,8 @@ impl RunQueue { /// runqueue are both empty, at which point this function will return. #[cfg(feature = "scheduler-deadline")] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let mut sorted = SortedList::::new_with_cmp(|lhs, rhs| { - lhs.metadata - .deadline() - .instant_ticks() - .cmp(&rhs.metadata.deadline().instant_ticks()) - }); + let mut sorted = + SortedList::::new_with_cmp(|lhs, rhs| lhs.metadata.deadline().cmp(&rhs.metadata.deadline())); loop { // For each loop, grab any newly pended items -- cgit From 6ec9bcb1c4dfbe5fc5365d93e75c516bb03bf9fc Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Thu, 11 Sep 2025 16:15:27 +0200 Subject: executor: add priority scheduler. --- embassy-executor/src/metadata.rs | 26 ++++++++++++++++++++++++++ embassy-executor/src/raw/mod.rs | 5 ----- embassy-executor/src/raw/run_queue.rs | 29 ++++++++++++++++++++++++----- 3 files changed, 50 insertions(+), 10 deletions(-) (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs index 4220048a6..bc0df0f83 100644 --- a/embassy-executor/src/metadata.rs +++ b/embassy-executor/src/metadata.rs @@ -1,6 +1,8 @@ #[cfg(feature = "metadata-name")] use core::cell::Cell; use core::future::{poll_fn, Future}; +#[cfg(feature = "scheduler-priority")] +use core::sync::atomic::{AtomicU8, Ordering}; use core::task::Poll; #[cfg(feature = "metadata-name")] @@ -14,6 +16,8 @@ use crate::raw::Deadline; pub struct Metadata { #[cfg(feature = "metadata-name")] name: Mutex>>, + #[cfg(feature = "scheduler-priority")] + priority: AtomicU8, #[cfg(feature = "scheduler-deadline")] deadline: raw::Deadline, } @@ -23,6 +27,8 @@ impl Metadata { Self { #[cfg(feature = "metadata-name")] name: Mutex::new(Cell::new(None)), + #[cfg(feature = "scheduler-priority")] + priority: AtomicU8::new(0), // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This // will be lazily initalized in `initialize_impl` #[cfg(feature = "scheduler-deadline")] @@ -33,6 +39,14 @@ impl Metadata { pub(crate) fn reset(&self) { #[cfg(feature = "metadata-name")] critical_section::with(|cs| self.name.borrow(cs).set(None)); + + #[cfg(feature = "scheduler-priority")] + self.set_priority(0); + + // By default, deadlines are set to the maximum value, so that any task WITH + // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline + #[cfg(feature = "scheduler-deadline")] + self.unset_deadline(); } /// Get the metadata for the current task. @@ -61,6 +75,18 @@ impl Metadata { critical_section::with(|cs| self.name.borrow(cs).set(Some(name))) } + /// Get this task's priority. + #[cfg(feature = "scheduler-priority")] + pub fn priority(&self) -> u8 { + self.priority.load(Ordering::Relaxed) + } + + /// Set this task's priority. + #[cfg(feature = "scheduler-priority")] + pub fn set_priority(&self, priority: u8) { + self.priority.store(priority, Ordering::Relaxed) + } + /// Get this task's deadline. #[cfg(feature = "scheduler-deadline")] pub fn deadline(&self) -> u64 { diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 51a363385..9f36c60bc 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -300,11 +300,6 @@ impl AvailableTask { self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write_in_place(future); - // By default, deadlines are set to the maximum value, so that any task WITH - // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline - #[cfg(feature = "scheduler-deadline")] - self.task.raw.metadata.unset_deadline(); - let task = TaskRef::new(self.task); SpawnToken::new(task) diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index d98c26f73..b8b052310 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -2,7 +2,7 @@ use core::ptr::{addr_of_mut, NonNull}; use cordyceps::sorted_list::Links; use cordyceps::Linked; -#[cfg(feature = "scheduler-deadline")] +#[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))] use cordyceps::SortedList; #[cfg(target_has_atomic = "ptr")] @@ -83,7 +83,7 @@ impl RunQueue { /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - #[cfg(not(feature = "scheduler-deadline"))] + #[cfg(not(any(feature = "scheduler-priority", feature = "scheduler-deadline")))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { let taken = self.stack.take_all(); for taskref in taken { @@ -106,10 +106,29 @@ impl RunQueue { /// /// This process will repeat until the local `sorted` queue AND the global /// runqueue are both empty, at which point this function will return. - #[cfg(feature = "scheduler-deadline")] + #[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let mut sorted = - SortedList::::new_with_cmp(|lhs, rhs| lhs.metadata.deadline().cmp(&rhs.metadata.deadline())); + let mut sorted = SortedList::::new_with_cmp(|lhs, rhs| { + // compare by priority first + #[cfg(feature = "scheduler-priority")] + { + let lp = lhs.metadata.priority(); + let rp = rhs.metadata.priority(); + if lp != rp { + return lp.cmp(&rp).reverse(); + } + } + // compare deadlines in case of tie. + #[cfg(feature = "scheduler-deadline")] + { + let ld = lhs.metadata.deadline(); + let rd = rhs.metadata.deadline(); + if ld != rd { + return ld.cmp(&rd); + } + } + core::cmp::Ordering::Equal + }); loop { // For each loop, grab any newly pended items -- cgit