aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/deadline.rs44
-rw-r--r--embassy-executor/src/raw/mod.rs113
-rw-r--r--embassy-executor/src/raw/run_queue.rs213
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs88
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs74
-rw-r--r--embassy-executor/src/raw/state_atomics.rs21
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs15
-rw-r--r--embassy-executor/src/raw/timer_queue.rs73
-rw-r--r--embassy-executor/src/raw/trace.rs173
9 files changed, 513 insertions, 301 deletions
diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs
new file mode 100644
index 000000000..cc89fadb0
--- /dev/null
+++ b/embassy-executor/src/raw/deadline.rs
@@ -0,0 +1,44 @@
1use core::sync::atomic::{AtomicU32, Ordering};
2
3/// A type for interacting with the deadline of the current task
4///
5/// Requires the `scheduler-deadline` feature.
6///
7/// Note: Interacting with the deadline should be done locally in a task.
8/// In theory you could try to set or read the deadline from another task,
9/// but that will result in weird (though not unsound) behavior.
10pub(crate) struct Deadline {
11 instant_ticks_hi: AtomicU32,
12 instant_ticks_lo: AtomicU32,
13}
14
15impl Deadline {
16 pub(crate) const fn new(instant_ticks: u64) -> Self {
17 Self {
18 instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32),
19 instant_ticks_lo: AtomicU32::new(instant_ticks as u32),
20 }
21 }
22
23 pub(crate) const fn new_unset() -> Self {
24 Self::new(Self::UNSET_TICKS)
25 }
26
27 pub(crate) fn set(&self, instant_ticks: u64) {
28 self.instant_ticks_hi
29 .store((instant_ticks >> 32) as u32, Ordering::Relaxed);
30 self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed);
31 }
32
33 /// Deadline value in ticks, same time base and ticks as `embassy-time`
34 pub(crate) fn instant_ticks(&self) -> u64 {
35 let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64;
36 let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64;
37
38 (hi << 32) | lo
39 }
40
41 /// Sentinel value representing an "unset" deadline, which has lower priority
42 /// than any other set deadline value
43 pub(crate) const UNSET_TICKS: u64 = u64::MAX;
44}
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 88d839e07..9f36c60bc 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,22 +7,28 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 10mod run_queue;
13 11
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 12#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 13#[cfg_attr(
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 14 all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
15 path = "state_atomics.rs"
16)]
17#[cfg_attr(
18 not(any(target_has_atomic = "8", target_has_atomic = "32")),
19 path = "state_critical_section.rs"
20)]
17mod state; 21mod state;
18 22
19pub mod timer_queue; 23#[cfg(feature = "_any_trace")]
20#[cfg(feature = "trace")] 24pub mod trace;
21mod trace;
22pub(crate) mod util; 25pub(crate) mod util;
23#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 26#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
24mod waker; 27mod waker;
25 28
29#[cfg(feature = "scheduler-deadline")]
30mod deadline;
31
26use core::future::Future; 32use core::future::Future;
27use core::marker::PhantomData; 33use core::marker::PhantomData;
28use core::mem; 34use core::mem;
@@ -31,8 +37,11 @@ use core::ptr::NonNull;
31#[cfg(not(feature = "arch-avr"))] 37#[cfg(not(feature = "arch-avr"))]
32use core::sync::atomic::AtomicPtr; 38use core::sync::atomic::AtomicPtr;
33use core::sync::atomic::Ordering; 39use core::sync::atomic::Ordering;
34use core::task::{Context, Poll}; 40use core::task::{Context, Poll, Waker};
35 41
42#[cfg(feature = "scheduler-deadline")]
43pub(crate) use deadline::Deadline;
44use embassy_executor_timer_queue::TimerQueueItem;
36#[cfg(feature = "arch-avr")] 45#[cfg(feature = "arch-avr")]
37use portable_atomic::AtomicPtr; 46use portable_atomic::AtomicPtr;
38 47
@@ -41,6 +50,12 @@ use self::state::State;
41use self::util::{SyncUnsafeCell, UninitCell}; 50use self::util::{SyncUnsafeCell, UninitCell};
42pub use self::waker::task_from_waker; 51pub use self::waker::task_from_waker;
43use super::SpawnToken; 52use super::SpawnToken;
53use crate::{Metadata, SpawnError};
54
55#[no_mangle]
56extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
57 unsafe { task_from_waker(waker).timer_queue_item() }
58}
44 59
45/// Raw task header for use in task pointers. 60/// Raw task header for use in task pointers.
46/// 61///
@@ -84,15 +99,21 @@ use super::SpawnToken;
84pub(crate) struct TaskHeader { 99pub(crate) struct TaskHeader {
85 pub(crate) state: State, 100 pub(crate) state: State,
86 pub(crate) run_queue_item: RunQueueItem, 101 pub(crate) run_queue_item: RunQueueItem,
102
87 pub(crate) executor: AtomicPtr<SyncExecutor>, 103 pub(crate) executor: AtomicPtr<SyncExecutor>,
88 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 104 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
89 105
90 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. 106 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
91 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 107 pub(crate) timer_queue_item: TimerQueueItem,
108
109 pub(crate) metadata: Metadata,
110
111 #[cfg(feature = "rtos-trace")]
112 all_tasks_next: AtomicPtr<TaskHeader>,
92} 113}
93 114
94/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 115/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
95#[derive(Clone, Copy, PartialEq)] 116#[derive(Debug, Clone, Copy, PartialEq)]
96pub struct TaskRef { 117pub struct TaskRef {
97 ptr: NonNull<TaskHeader>, 118 ptr: NonNull<TaskHeader>,
98} 119}
@@ -114,29 +135,27 @@ impl TaskRef {
114 } 135 }
115 } 136 }
116 137
117 /// # Safety
118 ///
119 /// The result of this function must only be compared
120 /// for equality, or stored, but not used.
121 pub const unsafe fn dangling() -> Self {
122 Self {
123 ptr: NonNull::dangling(),
124 }
125 }
126
127 pub(crate) fn header(self) -> &'static TaskHeader { 138 pub(crate) fn header(self) -> &'static TaskHeader {
128 unsafe { self.ptr.as_ref() } 139 unsafe { self.ptr.as_ref() }
129 } 140 }
130 141
142 pub(crate) fn metadata(self) -> &'static Metadata {
143 unsafe { &self.ptr.as_ref().metadata }
144 }
145
131 /// Returns a reference to the executor that the task is currently running on. 146 /// Returns a reference to the executor that the task is currently running on.
132 pub unsafe fn executor(self) -> Option<&'static Executor> { 147 pub unsafe fn executor(self) -> Option<&'static Executor> {
133 let executor = self.header().executor.load(Ordering::Relaxed); 148 let executor = self.header().executor.load(Ordering::Relaxed);
134 executor.as_ref().map(|e| Executor::wrap(e)) 149 executor.as_ref().map(|e| Executor::wrap(e))
135 } 150 }
136 151
137 /// Returns a reference to the timer queue item. 152 /// Returns a mutable reference to the timer queue item.
138 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { 153 ///
139 &self.header().timer_queue_item 154 /// Safety
155 ///
156 /// This function must only be called in the context of the integrated timer queue.
157 pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
158 unsafe { &mut self.ptr.as_mut().timer_queue_item }
140 } 159 }
141 160
142 /// The returned pointer is valid for the entire TaskStorage. 161 /// The returned pointer is valid for the entire TaskStorage.
@@ -144,10 +163,10 @@ impl TaskRef {
144 self.ptr.as_ptr() 163 self.ptr.as_ptr()
145 } 164 }
146 165
147 /// Get the ID for a task 166 /// Returns the task ID.
148 #[cfg(feature = "trace")] 167 /// This can be used in combination with rtos-trace to match task names with IDs
149 pub fn as_id(self) -> u32 { 168 pub fn id(&self) -> u32 {
150 self.ptr.as_ptr() as u32 169 self.as_ptr() as u32
151 } 170 }
152} 171}
153 172
@@ -189,7 +208,10 @@ impl<F: Future + 'static> TaskStorage<F> {
189 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 208 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
190 poll_fn: SyncUnsafeCell::new(None), 209 poll_fn: SyncUnsafeCell::new(None),
191 210
192 timer_queue_item: timer_queue::TimerQueueItem::new(), 211 timer_queue_item: TimerQueueItem::new(),
212 metadata: Metadata::new(),
213 #[cfg(feature = "rtos-trace")]
214 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
193 }, 215 },
194 future: UninitCell::uninit(), 216 future: UninitCell::uninit(),
195 } 217 }
@@ -208,11 +230,11 @@ impl<F: Future + 'static> TaskStorage<F> {
208 /// 230 ///
209 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 231 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
210 /// on a different executor. 232 /// on a different executor.
211 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
212 let task = AvailableTask::claim(self); 234 let task = AvailableTask::claim(self);
213 match task { 235 match task {
214 Some(task) => task.initialize(future), 236 Some(task) => Ok(task.initialize(future)),
215 None => SpawnToken::new_failed(), 237 None => Err(SpawnError::Busy),
216 } 238 }
217 } 239 }
218 240
@@ -224,7 +246,7 @@ impl<F: Future + 'static> TaskStorage<F> {
224 let mut cx = Context::from_waker(&waker); 246 let mut cx = Context::from_waker(&waker);
225 match future.poll(&mut cx) { 247 match future.poll(&mut cx) {
226 Poll::Ready(_) => { 248 Poll::Ready(_) => {
227 #[cfg(feature = "trace")] 249 #[cfg(feature = "_any_trace")]
228 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed); 250 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
229 251
230 // As the future has finished and this function will not be called 252 // As the future has finished and this function will not be called
@@ -239,7 +261,7 @@ impl<F: Future + 'static> TaskStorage<F> {
239 // after we're done with it. 261 // after we're done with it.
240 this.raw.state.despawn(); 262 this.raw.state.despawn();
241 263
242 #[cfg(feature = "trace")] 264 #[cfg(feature = "_any_trace")]
243 trace::task_end(exec_ptr, &p); 265 trace::task_end(exec_ptr, &p);
244 } 266 }
245 Poll::Pending => {} 267 Poll::Pending => {}
@@ -274,6 +296,7 @@ impl<F: Future + 'static> AvailableTask<F> {
274 296
275 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { 297 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
276 unsafe { 298 unsafe {
299 self.task.raw.metadata.reset();
277 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 300 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
278 self.task.future.write_in_place(future); 301 self.task.future.write_in_place(future);
279 302
@@ -340,10 +363,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
340 } 363 }
341 } 364 }
342 365
343 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> { 366 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
344 match self.pool.iter().find_map(AvailableTask::claim) { 367 match self.pool.iter().find_map(AvailableTask::claim) {
345 Some(task) => task.initialize_impl::<T>(future), 368 Some(task) => Ok(task.initialize_impl::<T>(future)),
346 None => SpawnToken::new_failed(), 369 None => Err(SpawnError::Busy),
347 } 370 }
348 } 371 }
349 372
@@ -354,7 +377,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
354 /// This will loop over the pool and spawn the task in the first storage that 377 /// This will loop over the pool and spawn the task in the first storage that
355 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 378 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
356 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 379 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
357 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 380 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
358 self.spawn_impl::<F>(future) 381 self.spawn_impl::<F>(future)
359 } 382 }
360 383
@@ -367,7 +390,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
367 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 390 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
368 /// is an `async fn`, NOT a hand-written `Future`. 391 /// is an `async fn`, NOT a hand-written `Future`.
369 #[doc(hidden)] 392 #[doc(hidden)]
370 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 393 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
371 where 394 where
372 FutFn: FnOnce() -> F, 395 FutFn: FnOnce() -> F,
373 { 396 {
@@ -412,7 +435,7 @@ impl SyncExecutor {
412 /// - `task` must NOT be already enqueued (in this executor or another one). 435 /// - `task` must NOT be already enqueued (in this executor or another one).
413 #[inline(always)] 436 #[inline(always)]
414 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) { 437 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
415 #[cfg(feature = "trace")] 438 #[cfg(feature = "_any_trace")]
416 trace::task_ready_begin(self, &task); 439 trace::task_ready_begin(self, &task);
417 440
418 if self.run_queue.enqueue(task, l) { 441 if self.run_queue.enqueue(task, l) {
@@ -425,7 +448,7 @@ impl SyncExecutor {
425 .executor 448 .executor
426 .store((self as *const Self).cast_mut(), Ordering::Relaxed); 449 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
427 450
428 #[cfg(feature = "trace")] 451 #[cfg(feature = "_any_trace")]
429 trace::task_new(self, &task); 452 trace::task_new(self, &task);
430 453
431 state::locked(|l| { 454 state::locked(|l| {
@@ -437,23 +460,23 @@ impl SyncExecutor {
437 /// 460 ///
438 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 461 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
439 pub(crate) unsafe fn poll(&'static self) { 462 pub(crate) unsafe fn poll(&'static self) {
440 #[cfg(feature = "trace")] 463 #[cfg(feature = "_any_trace")]
441 trace::poll_start(self); 464 trace::poll_start(self);
442 465
443 self.run_queue.dequeue_all(|p| { 466 self.run_queue.dequeue_all(|p| {
444 let task = p.header(); 467 let task = p.header();
445 468
446 #[cfg(feature = "trace")] 469 #[cfg(feature = "_any_trace")]
447 trace::task_exec_begin(self, &p); 470 trace::task_exec_begin(self, &p);
448 471
449 // Run the task 472 // Run the task
450 task.poll_fn.get().unwrap_unchecked()(p); 473 task.poll_fn.get().unwrap_unchecked()(p);
451 474
452 #[cfg(feature = "trace")] 475 #[cfg(feature = "_any_trace")]
453 trace::task_exec_end(self, &p); 476 trace::task_exec_end(self, &p);
454 }); 477 });
455 478
456 #[cfg(feature = "trace")] 479 #[cfg(feature = "_any_trace")]
457 trace::executor_idle(self) 480 trace::executor_idle(self)
458 } 481 }
459} 482}
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..b8b052310
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,213 @@
1use core::ptr::{addr_of_mut, NonNull};
2
3use cordyceps::sorted_list::Links;
4use cordyceps::Linked;
5#[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
6use cordyceps::SortedList;
7
8#[cfg(target_has_atomic = "ptr")]
9type TransferStack<T> = cordyceps::TransferStack<T>;
10
11#[cfg(not(target_has_atomic = "ptr"))]
12type TransferStack<T> = MutexTransferStack<T>;
13
14use super::{TaskHeader, TaskRef};
15
16/// Use `cordyceps::sorted_list::Links` as the singly linked list
17/// for RunQueueItems.
18pub(crate) type RunQueueItem = Links<TaskHeader>;
19
20/// Implements the `Linked` trait, allowing for singly linked list usage
21/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
22/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
23/// popped atomically from the `TransferStack`.
24unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
25 type Handle = TaskRef;
26
27 // Convert a TaskRef into a TaskHeader ptr
28 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
29 r.ptr
30 }
31
32 // Convert a TaskHeader into a TaskRef
33 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
34 TaskRef { ptr }
35 }
36
37 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
38 // which can be used to traverse to other TaskHeader nodes in the linked list
39 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
40 let ptr: *mut TaskHeader = ptr.as_ptr();
41 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
42 }
43}
44
45/// Atomic task queue using a very, very simple lock-free linked-list queue:
46///
47/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
48///
49/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
50/// null. Then the batch is iterated following the next pointers until null is reached.
51///
52/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
53/// for our purposes: it can't create fairness problems since the next batch won't run until the
54/// current batch is completely processed, so even if a task enqueues itself instantly (for example
55/// by waking its own waker) can't prevent other tasks from running.
56pub(crate) struct RunQueue {
57 stack: TransferStack<TaskHeader>,
58}
59
60impl RunQueue {
61 pub const fn new() -> Self {
62 Self {
63 stack: TransferStack::new(),
64 }
65 }
66
67 /// Enqueues an item. Returns true if the queue was empty.
68 ///
69 /// # Safety
70 ///
71 /// `item` must NOT be already enqueued in any queue.
72 #[inline(always)]
73 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
74 self.stack.push_was_empty(
75 task,
76 #[cfg(not(target_has_atomic = "ptr"))]
77 _tok,
78 )
79 }
80
81 /// # Standard atomic runqueue
82 ///
83 /// Empty the queue, then call `on_task` for each task that was in the queue.
84 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
85 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
86 #[cfg(not(any(feature = "scheduler-priority", feature = "scheduler-deadline")))]
87 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
88 let taken = self.stack.take_all();
89 for taskref in taken {
90 run_dequeue(&taskref);
91 on_task(taskref);
92 }
93 }
94
95 /// # Earliest Deadline First Scheduler
96 ///
97 /// This algorithm will loop until all enqueued tasks are processed.
98 ///
99 /// Before polling a task, all currently enqueued tasks will be popped from the
100 /// runqueue, and will be added to the working `sorted` list, a linked-list that
101 /// sorts tasks by their deadline, with nearest deadline items in the front, and
102 /// furthest deadline items in the back.
103 ///
104 /// After popping and sorting all pending tasks, the SOONEST task will be popped
105 /// from the front of the queue, and polled by calling `on_task` on it.
106 ///
107 /// This process will repeat until the local `sorted` queue AND the global
108 /// runqueue are both empty, at which point this function will return.
109 #[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
110 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
111 let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| {
112 // compare by priority first
113 #[cfg(feature = "scheduler-priority")]
114 {
115 let lp = lhs.metadata.priority();
116 let rp = rhs.metadata.priority();
117 if lp != rp {
118 return lp.cmp(&rp).reverse();
119 }
120 }
121 // compare deadlines in case of tie.
122 #[cfg(feature = "scheduler-deadline")]
123 {
124 let ld = lhs.metadata.deadline();
125 let rd = rhs.metadata.deadline();
126 if ld != rd {
127 return ld.cmp(&rd);
128 }
129 }
130 core::cmp::Ordering::Equal
131 });
132
133 loop {
134 // For each loop, grab any newly pended items
135 let taken = self.stack.take_all();
136
137 // Sort these into the list - this is potentially expensive! We do an
138 // insertion sort of new items, which iterates the linked list.
139 //
140 // Something on the order of `O(n * m)`, where `n` is the number
141 // of new tasks, and `m` is the number of already pending tasks.
142 sorted.extend(taken);
143
144 // Pop the task with the SOONEST deadline. If there are no tasks
145 // pending, then we are done.
146 let Some(taskref) = sorted.pop_front() else {
147 return;
148 };
149
150 // We got one task, mark it as dequeued, and process the task.
151 run_dequeue(&taskref);
152 on_task(taskref);
153 }
154 }
155}
156
157/// atomic state does not require a cs...
158#[cfg(target_has_atomic = "ptr")]
159#[inline(always)]
160fn run_dequeue(taskref: &TaskRef) {
161 taskref.header().state.run_dequeue();
162}
163
164/// ...while non-atomic state does
165#[cfg(not(target_has_atomic = "ptr"))]
166#[inline(always)]
167fn run_dequeue(taskref: &TaskRef) {
168 critical_section::with(|cs| {
169 taskref.header().state.run_dequeue(cs);
170 })
171}
172
173/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex
174#[cfg(not(target_has_atomic = "ptr"))]
175struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
176 inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
177}
178
179#[cfg(not(target_has_atomic = "ptr"))]
180impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
181 const fn new() -> Self {
182 Self {
183 inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
184 }
185 }
186
187 /// Push an item to the transfer stack, returning whether the stack was previously empty
188 fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
189 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
190 // for the lifetime of the token, but does NOT protect against re-entrant access.
191 // However, we never *return* the reference, nor do we recurse (or call another method
192 // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
193 // presence of the critical section is sufficient to guarantee exclusive access to
194 // the `inner` field for the purposes of this function.
195 let inner = unsafe { &mut *self.inner.borrow(token).get() };
196 let is_empty = inner.is_empty();
197 inner.push(item);
198 is_empty
199 }
200
201 fn take_all(&self) -> cordyceps::Stack<T> {
202 critical_section::with(|cs| {
203 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
204 // for the lifetime of the token, but does NOT protect against re-entrant access.
205 // However, we never *return* the reference, nor do we recurse (or call another method
206 // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
207 // presence of the critical section is sufficient to guarantee exclusive access to
208 // the `inner` field for the purposes of this function.
209 let inner = unsafe { &mut *self.inner.borrow(cs).get() };
210 inner.take_all()
211 })
212 }
213}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
deleted file mode 100644
index ce511d79a..000000000
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ /dev/null
@@ -1,88 +0,0 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4
5use super::{TaskHeader, TaskRef};
6use crate::raw::util::SyncUnsafeCell;
7
8pub(crate) struct RunQueueItem {
9 next: SyncUnsafeCell<Option<TaskRef>>,
10}
11
12impl RunQueueItem {
13 pub const fn new() -> Self {
14 Self {
15 next: SyncUnsafeCell::new(None),
16 }
17 }
18}
19
20/// Atomic task queue using a very, very simple lock-free linked-list queue:
21///
22/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
23///
24/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
25/// null. Then the batch is iterated following the next pointers until null is reached.
26///
27/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
28/// for our purposes: it can't create fairness problems since the next batch won't run until the
29/// current batch is completely processed, so even if a task enqueues itself instantly (for example
30/// by waking its own waker) can't prevent other tasks from running.
31pub(crate) struct RunQueue {
32 head: AtomicPtr<TaskHeader>,
33}
34
35impl RunQueue {
36 pub const fn new() -> Self {
37 Self {
38 head: AtomicPtr::new(ptr::null_mut()),
39 }
40 }
41
42 /// Enqueues an item. Returns true if the queue was empty.
43 ///
44 /// # Safety
45 ///
46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
49 let mut was_empty = false;
50
51 self.head
52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
53 was_empty = prev.is_null();
54 unsafe {
55 // safety: the pointer is either null or valid
56 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
57 // safety: there are no concurrent accesses to `next`
58 task.header().run_queue_item.next.set(prev);
59 }
60 Some(task.as_ptr() as *mut _)
61 })
62 .ok();
63
64 was_empty
65 }
66
67 /// Empty the queue, then call `on_task` for each task that was in the queue.
68 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
69 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
70 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
71 // Atomically empty the queue.
72 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
73
74 // safety: the pointer is either null or valid
75 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
76
77 // Iterate the linked list of tasks that were previously in the queue.
78 while let Some(task) = next {
79 // If the task re-enqueues itself, the `next` pointer will get overwritten.
80 // Therefore, first read the next pointer, and only then process the task.
81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() };
83
84 task.header().state.run_dequeue();
85 on_task(task);
86 }
87 }
88}
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
deleted file mode 100644
index 86c4085ed..000000000
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ /dev/null
@@ -1,74 +0,0 @@
1use core::cell::Cell;
2
3use critical_section::{CriticalSection, Mutex};
4
5use super::TaskRef;
6
7pub(crate) struct RunQueueItem {
8 next: Mutex<Cell<Option<TaskRef>>>,
9}
10
11impl RunQueueItem {
12 pub const fn new() -> Self {
13 Self {
14 next: Mutex::new(Cell::new(None)),
15 }
16 }
17}
18
19/// Atomic task queue using a very, very simple lock-free linked-list queue:
20///
21/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
22///
23/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
24/// null. Then the batch is iterated following the next pointers until null is reached.
25///
26/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
27/// for our purposes: it can't create fairness problems since the next batch won't run until the
28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
29/// by waking its own waker) can't prevent other tasks from running.
30pub(crate) struct RunQueue {
31 head: Mutex<Cell<Option<TaskRef>>>,
32}
33
34impl RunQueue {
35 pub const fn new() -> Self {
36 Self {
37 head: Mutex::new(Cell::new(None)),
38 }
39 }
40
41 /// Enqueues an item. Returns true if the queue was empty.
42 ///
43 /// # Safety
44 ///
45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
48 let prev = self.head.borrow(cs).replace(Some(task));
49 task.header().run_queue_item.next.borrow(cs).set(prev);
50
51 prev.is_none()
52 }
53
54 /// Empty the queue, then call `on_task` for each task that was in the queue.
55 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
56 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
57 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
58 // Atomically empty the queue.
59 let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
60
61 // Iterate the linked list of tasks that were previously in the queue.
62 while let Some(task) = next {
63 // If the task re-enqueues itself, the `next` pointer will get overwritten.
64 // Therefore, first read the next pointer, and only then process the task.
65
66 critical_section::with(|cs| {
67 next = task.header().run_queue_item.next.borrow(cs).get();
68 task.header().state.run_dequeue(cs);
69 });
70
71 on_task(task);
72 }
73 }
74}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index b6576bfc2..6675875be 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,4 +1,15 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1// Prefer pointer-width atomic operations, as narrower ones may be slower.
2#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
3type AtomicState = core::sync::atomic::AtomicU32;
4#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
5type AtomicState = core::sync::atomic::AtomicU8;
6
7#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
8type StateBits = u32;
9#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
10type StateBits = u8;
11
12use core::sync::atomic::Ordering;
2 13
3#[derive(Clone, Copy)] 14#[derive(Clone, Copy)]
4pub(crate) struct Token(()); 15pub(crate) struct Token(());
@@ -11,18 +22,18 @@ pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
11} 22}
12 23
13/// Task is spawned (has a future) 24/// Task is spawned (has a future)
14pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 25pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
15/// Task is in the executor run queue 26/// Task is in the executor run queue
16pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 27pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
17 28
18pub(crate) struct State { 29pub(crate) struct State {
19 state: AtomicU32, 30 state: AtomicState,
20} 31}
21 32
22impl State { 33impl State {
23 pub const fn new() -> State { 34 pub const fn new() -> State {
24 Self { 35 Self {
25 state: AtomicU32::new(0), 36 state: AtomicState::new(0),
26 } 37 }
27 } 38 }
28 39
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index 6b627ff79..b69a6ac66 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -3,13 +3,18 @@ use core::cell::Cell;
3pub(crate) use critical_section::{with as locked, CriticalSection as Token}; 3pub(crate) use critical_section::{with as locked, CriticalSection as Token};
4use critical_section::{CriticalSection, Mutex}; 4use critical_section::{CriticalSection, Mutex};
5 5
6#[cfg(target_arch = "avr")]
7type StateBits = u8;
8#[cfg(not(target_arch = "avr"))]
9type StateBits = usize;
10
6/// Task is spawned (has a future) 11/// Task is spawned (has a future)
7pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 12pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
8/// Task is in the executor run queue 13/// Task is in the executor run queue
9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 14pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
10 15
11pub(crate) struct State { 16pub(crate) struct State {
12 state: Mutex<Cell<u32>>, 17 state: Mutex<Cell<StateBits>>,
13} 18}
14 19
15impl State { 20impl State {
@@ -19,11 +24,11 @@ impl State {
19 } 24 }
20 } 25 }
21 26
22 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 27 fn update<R>(&self, f: impl FnOnce(&mut StateBits) -> R) -> R {
23 critical_section::with(|cs| self.update_with_cs(cs, f)) 28 critical_section::with(|cs| self.update_with_cs(cs, f))
24 } 29 }
25 30
26 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R { 31 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut StateBits) -> R) -> R {
27 let s = self.state.borrow(cs); 32 let s = self.state.borrow(cs);
28 let mut val = s.get(); 33 let mut val = s.get();
29 let r = f(&mut val); 34 let r = f(&mut val);
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
deleted file mode 100644
index e52453be4..000000000
--- a/embassy-executor/src/raw/timer_queue.rs
+++ /dev/null
@@ -1,73 +0,0 @@
1//! Timer queue operations.
2
3use core::cell::Cell;
4
5use super::TaskRef;
6
7#[cfg(feature = "_timer-item-payload")]
8macro_rules! define_opaque {
9 ($size:tt) => {
10 /// An opaque data type.
11 #[repr(align($size))]
12 pub struct OpaqueData {
13 data: [u8; $size],
14 }
15
16 impl OpaqueData {
17 const fn new() -> Self {
18 Self { data: [0; $size] }
19 }
20
21 /// Access the data as a reference to a type `T`.
22 ///
23 /// Safety:
24 ///
25 /// The caller must ensure that the size of the type `T` is less than, or equal to
26 /// the size of the payload, and must ensure that the alignment of the type `T` is
27 /// less than, or equal to the alignment of the payload.
28 ///
29 /// The type must be valid when zero-initialized.
30 pub unsafe fn as_ref<T>(&self) -> &T {
31 &*(self.data.as_ptr() as *const T)
32 }
33 }
34 };
35}
36
37#[cfg(feature = "timer-item-payload-size-1")]
38define_opaque!(1);
39#[cfg(feature = "timer-item-payload-size-2")]
40define_opaque!(2);
41#[cfg(feature = "timer-item-payload-size-4")]
42define_opaque!(4);
43#[cfg(feature = "timer-item-payload-size-8")]
44define_opaque!(8);
45
46/// An item in the timer queue.
47pub struct TimerQueueItem {
48 /// The next item in the queue.
49 ///
50 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
51 /// value of `Some(dangling_pointer)`
52 pub next: Cell<Option<TaskRef>>,
53
54 /// The time at which this item expires.
55 pub expires_at: Cell<u64>,
56
57 /// Some implementation-defined, zero-initialized piece of data.
58 #[cfg(feature = "_timer-item-payload")]
59 pub payload: OpaqueData,
60}
61
62unsafe impl Sync for TimerQueueItem {}
63
64impl TimerQueueItem {
65 pub(crate) const fn new() -> Self {
66 Self {
67 next: Cell::new(None),
68 expires_at: Cell::new(0),
69 #[cfg(feature = "_timer-item-payload")]
70 payload: OpaqueData::new(),
71 }
72 }
73}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
index aba519c8f..b3086948c 100644
--- a/embassy-executor/src/raw/trace.rs
+++ b/embassy-executor/src/raw/trace.rs
@@ -81,9 +81,94 @@
81 81
82#![allow(unused)] 82#![allow(unused)]
83 83
84use crate::raw::{SyncExecutor, TaskRef}; 84use core::cell::UnsafeCell;
85use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
85 86
86#[cfg(not(feature = "rtos-trace"))] 87#[cfg(feature = "rtos-trace")]
88use rtos_trace::TaskInfo;
89
90use crate::raw::{SyncExecutor, TaskHeader, TaskRef};
91use crate::spawner::{SpawnError, SpawnToken, Spawner};
92
93/// Global task tracker instance
94///
95/// This static provides access to the global task tracker which maintains
96/// a list of all tasks in the system. It's automatically updated by the
97/// task lifecycle hooks in the trace module.
98#[cfg(feature = "rtos-trace")]
99pub(crate) static TASK_TRACKER: TaskTracker = TaskTracker::new();
100
101/// A thread-safe tracker for all tasks in the system
102///
103/// This struct uses an intrusive linked list approach to track all tasks
104/// without additional memory allocations. It maintains a global list of
105/// tasks that can be traversed to find all currently existing tasks.
106#[cfg(feature = "rtos-trace")]
107pub(crate) struct TaskTracker {
108 head: AtomicPtr<TaskHeader>,
109}
110
111#[cfg(feature = "rtos-trace")]
112impl TaskTracker {
113 /// Creates a new empty task tracker
114 ///
115 /// Initializes a tracker with no tasks in its list.
116 pub const fn new() -> Self {
117 Self {
118 head: AtomicPtr::new(core::ptr::null_mut()),
119 }
120 }
121
122 /// Adds a task to the tracker
123 ///
124 /// This method inserts a task at the head of the intrusive linked list.
125 /// The operation is thread-safe and lock-free, using atomic operations
126 /// to ensure consistency even when called from different contexts.
127 ///
128 /// # Arguments
129 /// * `task` - The task reference to add to the tracker
130 pub fn add(&self, task: TaskRef) {
131 let task_ptr = task.as_ptr();
132
133 loop {
134 let current_head = self.head.load(Ordering::Acquire);
135 unsafe {
136 (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed);
137 }
138
139 if self
140 .head
141 .compare_exchange(current_head, task_ptr.cast_mut(), Ordering::Release, Ordering::Relaxed)
142 .is_ok()
143 {
144 break;
145 }
146 }
147 }
148
149 /// Performs an operation on each task in the tracker
150 ///
151 /// This method traverses the entire list of tasks and calls the provided
152 /// function for each task. This allows inspecting or processing all tasks
153 /// in the system without modifying the tracker's structure.
154 ///
155 /// # Arguments
156 /// * `f` - A function to call for each task in the tracker
157 pub fn for_each<F>(&self, mut f: F)
158 where
159 F: FnMut(TaskRef),
160 {
161 let mut current = self.head.load(Ordering::Acquire);
162 while !current.is_null() {
163 let task = unsafe { TaskRef::from_ptr(current) };
164 f(task);
165
166 current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) };
167 }
168 }
169}
170
171#[cfg(feature = "trace")]
87extern "Rust" { 172extern "Rust" {
88 /// This callback is called when the executor begins polling. This will always 173 /// This callback is called when the executor begins polling. This will always
89 /// be paired with a later call to `_embassy_trace_executor_idle`. 174 /// be paired with a later call to `_embassy_trace_executor_idle`.
@@ -145,7 +230,7 @@ extern "Rust" {
145 230
146#[inline] 231#[inline]
147pub(crate) fn poll_start(executor: &SyncExecutor) { 232pub(crate) fn poll_start(executor: &SyncExecutor) {
148 #[cfg(not(feature = "rtos-trace"))] 233 #[cfg(feature = "trace")]
149 unsafe { 234 unsafe {
150 _embassy_trace_poll_start(executor as *const _ as u32) 235 _embassy_trace_poll_start(executor as *const _ as u32)
151 } 236 }
@@ -153,18 +238,31 @@ pub(crate) fn poll_start(executor: &SyncExecutor) {
153 238
154#[inline] 239#[inline]
155pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) { 240pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) {
156 #[cfg(not(feature = "rtos-trace"))] 241 #[cfg(feature = "trace")]
157 unsafe { 242 unsafe {
158 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32) 243 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32)
159 } 244 }
160 245
161 #[cfg(feature = "rtos-trace")] 246 #[cfg(feature = "rtos-trace")]
162 rtos_trace::trace::task_new(task.as_ptr() as u32); 247 {
248 rtos_trace::trace::task_new(task.as_ptr() as u32);
249 let name = task.metadata().name().unwrap_or("unnamed task\0");
250 let info = rtos_trace::TaskInfo {
251 name,
252 priority: 0,
253 stack_base: 0,
254 stack_size: 0,
255 };
256 rtos_trace::trace::task_send_info(task.id(), info);
257 }
258
259 #[cfg(feature = "rtos-trace")]
260 TASK_TRACKER.add(*task);
163} 261}
164 262
165#[inline] 263#[inline]
166pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) { 264pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) {
167 #[cfg(not(feature = "rtos-trace"))] 265 #[cfg(feature = "trace")]
168 unsafe { 266 unsafe {
169 _embassy_trace_task_end(executor as u32, task.as_ptr() as u32) 267 _embassy_trace_task_end(executor as u32, task.as_ptr() as u32)
170 } 268 }
@@ -172,7 +270,7 @@ pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) {
172 270
173#[inline] 271#[inline]
174pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) { 272pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
175 #[cfg(not(feature = "rtos-trace"))] 273 #[cfg(feature = "trace")]
176 unsafe { 274 unsafe {
177 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32) 275 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32)
178 } 276 }
@@ -182,7 +280,7 @@ pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
182 280
183#[inline] 281#[inline]
184pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) { 282pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
185 #[cfg(not(feature = "rtos-trace"))] 283 #[cfg(feature = "trace")]
186 unsafe { 284 unsafe {
187 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32) 285 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32)
188 } 286 }
@@ -192,7 +290,7 @@ pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
192 290
193#[inline] 291#[inline]
194pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) { 292pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
195 #[cfg(not(feature = "rtos-trace"))] 293 #[cfg(feature = "trace")]
196 unsafe { 294 unsafe {
197 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32) 295 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32)
198 } 296 }
@@ -202,7 +300,7 @@ pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
202 300
203#[inline] 301#[inline]
204pub(crate) fn executor_idle(executor: &SyncExecutor) { 302pub(crate) fn executor_idle(executor: &SyncExecutor) {
205 #[cfg(not(feature = "rtos-trace"))] 303 #[cfg(feature = "trace")]
206 unsafe { 304 unsafe {
207 _embassy_trace_executor_idle(executor as *const _ as u32) 305 _embassy_trace_executor_idle(executor as *const _ as u32)
208 } 306 }
@@ -210,10 +308,63 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) {
210 rtos_trace::trace::system_idle(); 308 rtos_trace::trace::system_idle();
211} 309}
212 310
311/// Returns an iterator over all active tasks in the system
312///
313/// This function provides a convenient way to iterate over all tasks
314/// that are currently tracked in the system. The returned iterator
315/// yields each task in the global task tracker.
316///
317/// # Returns
318/// An iterator that yields `TaskRef` items for each task
319#[cfg(feature = "rtos-trace")]
320fn get_all_active_tasks() -> impl Iterator<Item = TaskRef> + 'static {
321 struct TaskIterator<'a> {
322 tracker: &'a TaskTracker,
323 current: *mut TaskHeader,
324 }
325
326 impl<'a> Iterator for TaskIterator<'a> {
327 type Item = TaskRef;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 if self.current.is_null() {
331 return None;
332 }
333
334 let task = unsafe { TaskRef::from_ptr(self.current) };
335 self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) };
336
337 Some(task)
338 }
339 }
340
341 TaskIterator {
342 tracker: &TASK_TRACKER,
343 current: TASK_TRACKER.head.load(Ordering::Acquire),
344 }
345}
346
347/// Perform an action on each active task
348#[cfg(feature = "rtos-trace")]
349fn with_all_active_tasks<F>(f: F)
350where
351 F: FnMut(TaskRef),
352{
353 TASK_TRACKER.for_each(f);
354}
355
213#[cfg(feature = "rtos-trace")] 356#[cfg(feature = "rtos-trace")]
214impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { 357impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
215 fn task_list() { 358 fn task_list() {
216 // We don't know what tasks exist, so we can't send them. 359 with_all_active_tasks(|task| {
360 let info = rtos_trace::TaskInfo {
361 name: task.metadata().name().unwrap_or("unnamed task\0"),
362 priority: 0,
363 stack_base: 0,
364 stack_size: 0,
365 };
366 rtos_trace::trace::task_send_info(task.id(), info);
367 });
217 } 368 }
218 fn time() -> u64 { 369 fn time() -> u64 {
219 const fn gcd(a: u64, b: u64) -> u64 { 370 const fn gcd(a: u64, b: u64) -> u64 {