aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/deadline.rs44
-rw-r--r--embassy-executor/src/raw/mod.rs343
-rw-r--r--embassy-executor/src/raw/run_queue.rs213
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs87
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs75
-rw-r--r--embassy-executor/src/raw/state_atomics.rs74
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs60
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs79
-rw-r--r--embassy-executor/src/raw/timer_queue.rs76
-rw-r--r--embassy-executor/src/raw/trace.rs380
-rw-r--r--embassy-executor/src/raw/waker.rs37
-rw-r--r--embassy-executor/src/raw/waker_turbo.rs4
12 files changed, 910 insertions, 562 deletions
diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs
new file mode 100644
index 000000000..cc89fadb0
--- /dev/null
+++ b/embassy-executor/src/raw/deadline.rs
@@ -0,0 +1,44 @@
1use core::sync::atomic::{AtomicU32, Ordering};
2
3/// A type for interacting with the deadline of the current task
4///
5/// Requires the `scheduler-deadline` feature.
6///
7/// Note: Interacting with the deadline should be done locally in a task.
8/// In theory you could try to set or read the deadline from another task,
9/// but that will result in weird (though not unsound) behavior.
10pub(crate) struct Deadline {
11 instant_ticks_hi: AtomicU32,
12 instant_ticks_lo: AtomicU32,
13}
14
15impl Deadline {
16 pub(crate) const fn new(instant_ticks: u64) -> Self {
17 Self {
18 instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32),
19 instant_ticks_lo: AtomicU32::new(instant_ticks as u32),
20 }
21 }
22
23 pub(crate) const fn new_unset() -> Self {
24 Self::new(Self::UNSET_TICKS)
25 }
26
27 pub(crate) fn set(&self, instant_ticks: u64) {
28 self.instant_ticks_hi
29 .store((instant_ticks >> 32) as u32, Ordering::Relaxed);
30 self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed);
31 }
32
33 /// Deadline value in ticks, same time base and ticks as `embassy-time`
34 pub(crate) fn instant_ticks(&self) -> u64 {
35 let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64;
36 let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64;
37
38 (hi << 32) | lo
39 }
40
41 /// Sentinel value representing an "unset" deadline, which has lower priority
42 /// than any other set deadline value
43 pub(crate) const UNSET_TICKS: u64 = u64::MAX;
44}
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index d9ea5c005..ab845ed3b 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,54 +7,113 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 10mod run_queue;
13 11
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 12#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 13#[cfg_attr(
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 14 all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
15 path = "state_atomics.rs"
16)]
17#[cfg_attr(
18 not(any(target_has_atomic = "8", target_has_atomic = "32")),
19 path = "state_critical_section.rs"
20)]
17mod state; 21mod state;
18 22
19#[cfg(feature = "integrated-timers")] 23#[cfg(feature = "_any_trace")]
20mod timer_queue; 24pub mod trace;
21pub(crate) mod util; 25pub(crate) mod util;
22#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 26#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
23mod waker; 27mod waker;
24 28
29#[cfg(feature = "scheduler-deadline")]
30mod deadline;
31
25use core::future::Future; 32use core::future::Future;
26use core::marker::PhantomData; 33use core::marker::PhantomData;
27use core::mem; 34use core::mem;
28use core::pin::Pin; 35use core::pin::Pin;
29use core::ptr::NonNull; 36use core::ptr::NonNull;
30use core::task::{Context, Poll}; 37#[cfg(not(feature = "arch-avr"))]
38use core::sync::atomic::AtomicPtr;
39use core::sync::atomic::Ordering;
40use core::task::{Context, Poll, Waker};
31 41
32#[cfg(feature = "integrated-timers")] 42#[cfg(feature = "scheduler-deadline")]
33use embassy_time_driver::AlarmHandle; 43pub(crate) use deadline::Deadline;
34#[cfg(feature = "rtos-trace")] 44use embassy_executor_timer_queue::TimerQueueItem;
35use rtos_trace::trace; 45#[cfg(feature = "arch-avr")]
46use portable_atomic::AtomicPtr;
36 47
37use self::run_queue::{RunQueue, RunQueueItem}; 48use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 49use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 50use self::util::{SyncUnsafeCell, UninitCell};
40pub use self::waker::task_from_waker; 51pub use self::waker::task_from_waker;
41use super::SpawnToken; 52use super::SpawnToken;
53use crate::{Metadata, SpawnError};
54
55#[unsafe(no_mangle)]
56extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
57 unsafe { task_from_waker(waker).timer_queue_item() }
58}
42 59
43/// Raw task header for use in task pointers. 60/// Raw task header for use in task pointers.
61///
62/// A task can be in one of the following states:
63///
64/// - Not spawned: the task is ready to spawn.
65/// - `SPAWNED`: the task is currently spawned and may be running.
66/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
67/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
68/// polling the task's future.
69///
70/// A task's complete life cycle is as follows:
71///
72/// ```text
73/// ┌────────────┐ ┌────────────────────────┐
74/// │Not spawned │◄─5┤Not spawned|Run enqueued│
75/// │ ├6─►│ │
76/// └─────┬──────┘ └──────▲─────────────────┘
77/// 1 │
78/// │ ┌────────────┘
79/// │ 4
80/// ┌─────▼────┴─────────┐
81/// │Spawned|Run enqueued│
82/// │ │
83/// └─────┬▲─────────────┘
84/// 2│
85/// │3
86/// ┌─────▼┴─────┐
87/// │ Spawned │
88/// │ │
89/// └────────────┘
90/// ```
91///
92/// Transitions:
93/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
94/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
95/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
96/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
97/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
98/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
44pub(crate) struct TaskHeader { 99pub(crate) struct TaskHeader {
45 pub(crate) state: State, 100 pub(crate) state: State,
46 pub(crate) run_queue_item: RunQueueItem, 101 pub(crate) run_queue_item: RunQueueItem,
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 102
103 pub(crate) executor: AtomicPtr<SyncExecutor>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 104 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 105
50 #[cfg(feature = "integrated-timers")] 106 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>, 107 pub(crate) timer_queue_item: TimerQueueItem,
52 #[cfg(feature = "integrated-timers")] 108
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 109 pub(crate) metadata: Metadata,
110
111 #[cfg(feature = "rtos-trace")]
112 all_tasks_next: AtomicPtr<TaskHeader>,
54} 113}
55 114
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 115/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 116#[derive(Debug, Clone, Copy, PartialEq)]
58pub struct TaskRef { 117pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 118 ptr: NonNull<TaskHeader>,
60} 119}
@@ -80,10 +139,35 @@ impl TaskRef {
80 unsafe { self.ptr.as_ref() } 139 unsafe { self.ptr.as_ref() }
81 } 140 }
82 141
142 pub(crate) fn metadata(self) -> &'static Metadata {
143 unsafe { &self.ptr.as_ref().metadata }
144 }
145
146 /// Returns a reference to the executor that the task is currently running on.
147 pub unsafe fn executor(self) -> Option<&'static Executor> {
148 let executor = self.header().executor.load(Ordering::Relaxed);
149 executor.as_ref().map(|e| Executor::wrap(e))
150 }
151
152 /// Returns a mutable reference to the timer queue item.
153 ///
154 /// Safety
155 ///
156 /// This function must only be called in the context of the integrated timer queue.
157 pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
158 unsafe { &mut self.ptr.as_mut().timer_queue_item }
159 }
160
83 /// The returned pointer is valid for the entire TaskStorage. 161 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 162 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 163 self.ptr.as_ptr()
86 } 164 }
165
166 /// Returns the task ID.
167 /// This can be used in combination with rtos-trace to match task names with IDs
168 pub fn id(&self) -> u32 {
169 self.as_ptr() as u32
170 }
87} 171}
88 172
89/// Raw storage in which a task can be spawned. 173/// Raw storage in which a task can be spawned.
@@ -107,6 +191,10 @@ pub struct TaskStorage<F: Future + 'static> {
107 future: UninitCell<F>, // Valid if STATE_SPAWNED 191 future: UninitCell<F>, // Valid if STATE_SPAWNED
108} 192}
109 193
194unsafe fn poll_exited(_p: TaskRef) {
195 // Nothing to do, the task is already !SPAWNED and dequeued.
196}
197
110impl<F: Future + 'static> TaskStorage<F> { 198impl<F: Future + 'static> TaskStorage<F> {
111 const NEW: Self = Self::new(); 199 const NEW: Self = Self::new();
112 200
@@ -116,14 +204,14 @@ impl<F: Future + 'static> TaskStorage<F> {
116 raw: TaskHeader { 204 raw: TaskHeader {
117 state: State::new(), 205 state: State::new(),
118 run_queue_item: RunQueueItem::new(), 206 run_queue_item: RunQueueItem::new(),
119 executor: SyncUnsafeCell::new(None), 207 executor: AtomicPtr::new(core::ptr::null_mut()),
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 208 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 209 poll_fn: SyncUnsafeCell::new(None),
122 210
123 #[cfg(feature = "integrated-timers")] 211 timer_queue_item: TimerQueueItem::new(),
124 expires_at: SyncUnsafeCell::new(0), 212 metadata: Metadata::new(),
125 #[cfg(feature = "integrated-timers")] 213 #[cfg(feature = "rtos-trace")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 214 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
127 }, 215 },
128 future: UninitCell::uninit(), 216 future: UninitCell::uninit(),
129 } 217 }
@@ -142,27 +230,39 @@ impl<F: Future + 'static> TaskStorage<F> {
142 /// 230 ///
143 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 231 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
144 /// on a different executor. 232 /// on a different executor.
145 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
146 let task = AvailableTask::claim(self); 234 let task = AvailableTask::claim(self);
147 match task { 235 match task {
148 Some(task) => task.initialize(future), 236 Some(task) => Ok(task.initialize(future)),
149 None => SpawnToken::new_failed(), 237 None => Err(SpawnError::Busy),
150 } 238 }
151 } 239 }
152 240
153 unsafe fn poll(p: TaskRef) { 241 unsafe fn poll(p: TaskRef) {
154 let this = &*(p.as_ptr() as *const TaskStorage<F>); 242 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
155 243
156 let future = Pin::new_unchecked(this.future.as_mut()); 244 let future = Pin::new_unchecked(this.future.as_mut());
157 let waker = waker::from_task(p); 245 let waker = waker::from_task(p);
158 let mut cx = Context::from_waker(&waker); 246 let mut cx = Context::from_waker(&waker);
159 match future.poll(&mut cx) { 247 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 248 Poll::Ready(_) => {
249 #[cfg(feature = "_any_trace")]
250 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
251
252 // As the future has finished and this function will not be called
253 // again, we can safely drop the future here.
161 this.future.drop_in_place(); 254 this.future.drop_in_place();
255
256 // We replace the poll_fn with a despawn function, so that the task is cleaned up
257 // when the executor polls it next.
258 this.raw.poll_fn.set(Some(poll_exited));
259
260 // Make sure we despawn last, so that other threads can only spawn the task
261 // after we're done with it.
162 this.raw.state.despawn(); 262 this.raw.state.despawn();
163 263
164 #[cfg(feature = "integrated-timers")] 264 #[cfg(feature = "_any_trace")]
165 this.raw.expires_at.set(u64::MAX); 265 trace::task_end(exec_ptr, &p);
166 } 266 }
167 Poll::Pending => {} 267 Poll::Pending => {}
168 } 268 }
@@ -196,6 +296,7 @@ impl<F: Future + 'static> AvailableTask<F> {
196 296
197 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { 297 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
198 unsafe { 298 unsafe {
299 self.task.raw.metadata.reset();
199 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 300 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
200 self.task.future.write_in_place(future); 301 self.task.future.write_in_place(future);
201 302
@@ -262,10 +363,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
262 } 363 }
263 } 364 }
264 365
265 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> { 366 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
266 match self.pool.iter().find_map(AvailableTask::claim) { 367 match self.pool.iter().find_map(AvailableTask::claim) {
267 Some(task) => task.initialize_impl::<T>(future), 368 Some(task) => Ok(task.initialize_impl::<T>(future)),
268 None => SpawnToken::new_failed(), 369 None => Err(SpawnError::Busy),
269 } 370 }
270 } 371 }
271 372
@@ -276,7 +377,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
276 /// This will loop over the pool and spawn the task in the first storage that 377 /// This will loop over the pool and spawn the task in the first storage that
277 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 378 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
278 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 379 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
279 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 380 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
280 self.spawn_impl::<F>(future) 381 self.spawn_impl::<F>(future)
281 } 382 }
282 383
@@ -289,7 +390,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
289 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 390 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
290 /// is an `async fn`, NOT a hand-written `Future`. 391 /// is an `async fn`, NOT a hand-written `Future`.
291 #[doc(hidden)] 392 #[doc(hidden)]
292 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 393 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
293 where 394 where
294 FutFn: FnOnce() -> F, 395 FutFn: FnOnce() -> F,
295 { 396 {
@@ -306,7 +407,7 @@ unsafe impl Sync for Pender {}
306 407
307impl Pender { 408impl Pender {
308 pub(crate) fn pend(self) { 409 pub(crate) fn pend(self) {
309 extern "Rust" { 410 unsafe extern "Rust" {
310 fn __pender(context: *mut ()); 411 fn __pender(context: *mut ());
311 } 412 }
312 unsafe { __pender(self.0) }; 413 unsafe { __pender(self.0) };
@@ -316,26 +417,13 @@ impl Pender {
316pub(crate) struct SyncExecutor { 417pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 418 run_queue: RunQueue,
318 pender: Pender, 419 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 420}
325 421
326impl SyncExecutor { 422impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 423 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 424 Self {
332 run_queue: RunQueue::new(), 425 run_queue: RunQueue::new(),
333 pender, 426 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 427 }
340 } 428 }
341 429
@@ -346,90 +434,50 @@ impl SyncExecutor {
346 /// - `task` must be set up to run in this executor. 434 /// - `task` must be set up to run in this executor.
347 /// - `task` must NOT be already enqueued (in this executor or another one). 435 /// - `task` must NOT be already enqueued (in this executor or another one).
348 #[inline(always)] 436 #[inline(always)]
349 unsafe fn enqueue(&self, task: TaskRef) { 437 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
350 #[cfg(feature = "rtos-trace")] 438 #[cfg(feature = "_any_trace")]
351 trace::task_ready_begin(task.as_ptr() as u32); 439 trace::task_ready_begin(self, &task);
352 440
353 if self.run_queue.enqueue(task) { 441 if self.run_queue.enqueue(task, l) {
354 self.pender.pend(); 442 self.pender.pend();
355 } 443 }
356 } 444 }
357 445
358 #[cfg(feature = "integrated-timers")]
359 fn alarm_callback(ctx: *mut ()) {
360 let this: &Self = unsafe { &*(ctx as *const Self) };
361 this.pender.pend();
362 }
363
364 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 446 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
365 task.header().executor.set(Some(self)); 447 task.header()
448 .executor
449 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
366 450
367 #[cfg(feature = "rtos-trace")] 451 #[cfg(feature = "_any_trace")]
368 trace::task_new(task.as_ptr() as u32); 452 trace::task_new(self, &task);
369 453
370 self.enqueue(task); 454 state::locked(|l| {
455 self.enqueue(task, l);
456 })
371 } 457 }
372 458
373 /// # Safety 459 /// # Safety
374 /// 460 ///
375 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 461 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
376 pub(crate) unsafe fn poll(&'static self) { 462 pub(crate) unsafe fn poll(&'static self) {
377 #[cfg(feature = "integrated-timers")] 463 #[cfg(feature = "_any_trace")]
378 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); 464 trace::poll_start(self);
379
380 #[allow(clippy::never_loop)]
381 loop {
382 #[cfg(feature = "integrated-timers")]
383 self.timer_queue
384 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
385
386 self.run_queue.dequeue_all(|p| {
387 let task = p.header();
388
389 #[cfg(feature = "integrated-timers")]
390 task.expires_at.set(u64::MAX);
391
392 if !task.state.run_dequeue() {
393 // If task is not running, ignore it. This can happen in the following scenario:
394 // - Task gets dequeued, poll starts
395 // - While task is being polled, it gets woken. It gets placed in the queue.
396 // - Task poll finishes, returning done=true
397 // - RUNNING bit is cleared, but the task is already in the queue.
398 return;
399 }
400 465
401 #[cfg(feature = "rtos-trace")] 466 self.run_queue.dequeue_all(|p| {
402 trace::task_exec_begin(p.as_ptr() as u32); 467 let task = p.header();
403 468
404 // Run the task 469 #[cfg(feature = "_any_trace")]
405 task.poll_fn.get().unwrap_unchecked()(p); 470 trace::task_exec_begin(self, &p);
406 471
407 #[cfg(feature = "rtos-trace")] 472 // Run the task
408 trace::task_exec_end(); 473 task.poll_fn.get().unwrap_unchecked()(p);
409
410 // Enqueue or update into timer_queue
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.update(p);
413 });
414
415 #[cfg(feature = "integrated-timers")]
416 {
417 // If this is already in the past, set_alarm might return false
418 // In that case do another poll loop iteration.
419 let next_expiration = self.timer_queue.next_expiration();
420 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
421 break;
422 }
423 }
424 474
425 #[cfg(not(feature = "integrated-timers"))] 475 #[cfg(feature = "_any_trace")]
426 { 476 trace::task_exec_end(self, &p);
427 break; 477 });
428 }
429 }
430 478
431 #[cfg(feature = "rtos-trace")] 479 #[cfg(feature = "_any_trace")]
432 trace::system_idle(); 480 trace::executor_idle(self)
433 } 481 }
434} 482}
435 483
@@ -459,7 +507,7 @@ impl SyncExecutor {
459/// The pender function must be exported with the name `__pender` and have the following signature: 507/// The pender function must be exported with the name `__pender` and have the following signature:
460/// 508///
461/// ```rust 509/// ```rust
462/// #[export_name = "__pender"] 510/// #[unsafe(export_name = "__pender")]
463/// fn pender(context: *mut ()) { 511/// fn pender(context: *mut ()) {
464/// // schedule `poll()` to be called 512/// // schedule `poll()` to be called
465/// } 513/// }
@@ -533,6 +581,11 @@ impl Executor {
533 pub fn spawner(&'static self) -> super::Spawner { 581 pub fn spawner(&'static self) -> super::Spawner {
534 super::Spawner::new(self) 582 super::Spawner::new(self)
535 } 583 }
584
585 /// Get a unique ID for this Executor.
586 pub fn id(&'static self) -> usize {
587 &self.inner as *const SyncExecutor as usize
588 }
536} 589}
537 590
538/// Wake a task by `TaskRef`. 591/// Wake a task by `TaskRef`.
@@ -540,13 +593,13 @@ impl Executor {
540/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 593/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
541pub fn wake_task(task: TaskRef) { 594pub fn wake_task(task: TaskRef) {
542 let header = task.header(); 595 let header = task.header();
543 if header.state.run_enqueue() { 596 header.state.run_enqueue(|l| {
544 // We have just marked the task as scheduled, so enqueue it. 597 // We have just marked the task as scheduled, so enqueue it.
545 unsafe { 598 unsafe {
546 let executor = header.executor.get().unwrap_unchecked(); 599 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
547 executor.enqueue(task); 600 executor.enqueue(task, l);
548 } 601 }
549 } 602 });
550} 603}
551 604
552/// Wake a task by `TaskRef` without calling pend. 605/// Wake a task by `TaskRef` without calling pend.
@@ -554,57 +607,11 @@ pub fn wake_task(task: TaskRef) {
554/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 607/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
555pub fn wake_task_no_pend(task: TaskRef) { 608pub fn wake_task_no_pend(task: TaskRef) {
556 let header = task.header(); 609 let header = task.header();
557 if header.state.run_enqueue() { 610 header.state.run_enqueue(|l| {
558 // We have just marked the task as scheduled, so enqueue it. 611 // We have just marked the task as scheduled, so enqueue it.
559 unsafe { 612 unsafe {
560 let executor = header.executor.get().unwrap_unchecked(); 613 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
561 executor.run_queue.enqueue(task); 614 executor.run_queue.enqueue(task, l);
562 } 615 }
563 } 616 });
564} 617}
565
566#[cfg(feature = "integrated-timers")]
567struct TimerQueue;
568
569#[cfg(feature = "integrated-timers")]
570impl embassy_time_queue_driver::TimerQueue for TimerQueue {
571 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
572 let task = waker::task_from_waker(waker);
573 let task = task.header();
574 unsafe {
575 let expires_at = task.expires_at.get();
576 task.expires_at.set(expires_at.min(at));
577 }
578 }
579}
580
581#[cfg(feature = "integrated-timers")]
582embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
583
584#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
585const fn gcd(a: u64, b: u64) -> u64 {
586 if b == 0 {
587 a
588 } else {
589 gcd(b, a % b)
590 }
591}
592
593#[cfg(feature = "rtos-trace")]
594impl rtos_trace::RtosTraceOSCallbacks for Executor {
595 fn task_list() {
596 // We don't know what tasks exist, so we can't send them.
597 }
598 #[cfg(feature = "integrated-timers")]
599 fn time() -> u64 {
600 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
601 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
602 }
603 #[cfg(not(feature = "integrated-timers"))]
604 fn time() -> u64 {
605 0
606 }
607}
608
609#[cfg(feature = "rtos-trace")]
610rtos_trace::global_os_callbacks! {Executor}
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..6f2abdbd0
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,213 @@
1use core::ptr::{NonNull, addr_of_mut};
2
3use cordyceps::Linked;
4#[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
5use cordyceps::SortedList;
6use cordyceps::sorted_list::Links;
7
8#[cfg(target_has_atomic = "ptr")]
9type TransferStack<T> = cordyceps::TransferStack<T>;
10
11#[cfg(not(target_has_atomic = "ptr"))]
12type TransferStack<T> = MutexTransferStack<T>;
13
14use super::{TaskHeader, TaskRef};
15
16/// Use `cordyceps::sorted_list::Links` as the singly linked list
17/// for RunQueueItems.
18pub(crate) type RunQueueItem = Links<TaskHeader>;
19
20/// Implements the `Linked` trait, allowing for singly linked list usage
21/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
22/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
23/// popped atomically from the `TransferStack`.
24unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
25 type Handle = TaskRef;
26
27 // Convert a TaskRef into a TaskHeader ptr
28 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
29 r.ptr
30 }
31
32 // Convert a TaskHeader into a TaskRef
33 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
34 TaskRef { ptr }
35 }
36
37 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
38 // which can be used to traverse to other TaskHeader nodes in the linked list
39 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
40 let ptr: *mut TaskHeader = ptr.as_ptr();
41 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
42 }
43}
44
45/// Atomic task queue using a very, very simple lock-free linked-list queue:
46///
47/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
48///
49/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
50/// null. Then the batch is iterated following the next pointers until null is reached.
51///
52/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
53/// for our purposes: it can't create fairness problems since the next batch won't run until the
54/// current batch is completely processed, so even if a task enqueues itself instantly (for example
55/// by waking its own waker) can't prevent other tasks from running.
56pub(crate) struct RunQueue {
57 stack: TransferStack<TaskHeader>,
58}
59
60impl RunQueue {
61 pub const fn new() -> Self {
62 Self {
63 stack: TransferStack::new(),
64 }
65 }
66
67 /// Enqueues an item. Returns true if the queue was empty.
68 ///
69 /// # Safety
70 ///
71 /// `item` must NOT be already enqueued in any queue.
72 #[inline(always)]
73 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
74 self.stack.push_was_empty(
75 task,
76 #[cfg(not(target_has_atomic = "ptr"))]
77 _tok,
78 )
79 }
80
81 /// # Standard atomic runqueue
82 ///
83 /// Empty the queue, then call `on_task` for each task that was in the queue.
84 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
85 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
86 #[cfg(not(any(feature = "scheduler-priority", feature = "scheduler-deadline")))]
87 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
88 let taken = self.stack.take_all();
89 for taskref in taken {
90 run_dequeue(&taskref);
91 on_task(taskref);
92 }
93 }
94
95 /// # Earliest Deadline First Scheduler
96 ///
97 /// This algorithm will loop until all enqueued tasks are processed.
98 ///
99 /// Before polling a task, all currently enqueued tasks will be popped from the
100 /// runqueue, and will be added to the working `sorted` list, a linked-list that
101 /// sorts tasks by their deadline, with nearest deadline items in the front, and
102 /// furthest deadline items in the back.
103 ///
104 /// After popping and sorting all pending tasks, the SOONEST task will be popped
105 /// from the front of the queue, and polled by calling `on_task` on it.
106 ///
107 /// This process will repeat until the local `sorted` queue AND the global
108 /// runqueue are both empty, at which point this function will return.
109 #[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
110 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
111 let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| {
112 // compare by priority first
113 #[cfg(feature = "scheduler-priority")]
114 {
115 let lp = lhs.metadata.priority();
116 let rp = rhs.metadata.priority();
117 if lp != rp {
118 return lp.cmp(&rp).reverse();
119 }
120 }
121 // compare deadlines in case of tie.
122 #[cfg(feature = "scheduler-deadline")]
123 {
124 let ld = lhs.metadata.deadline();
125 let rd = rhs.metadata.deadline();
126 if ld != rd {
127 return ld.cmp(&rd);
128 }
129 }
130 core::cmp::Ordering::Equal
131 });
132
133 loop {
134 // For each loop, grab any newly pended items
135 let taken = self.stack.take_all();
136
137 // Sort these into the list - this is potentially expensive! We do an
138 // insertion sort of new items, which iterates the linked list.
139 //
140 // Something on the order of `O(n * m)`, where `n` is the number
141 // of new tasks, and `m` is the number of already pending tasks.
142 sorted.extend(taken);
143
144 // Pop the task with the SOONEST deadline. If there are no tasks
145 // pending, then we are done.
146 let Some(taskref) = sorted.pop_front() else {
147 return;
148 };
149
150 // We got one task, mark it as dequeued, and process the task.
151 run_dequeue(&taskref);
152 on_task(taskref);
153 }
154 }
155}
156
157/// atomic state does not require a cs...
158#[cfg(target_has_atomic = "ptr")]
159#[inline(always)]
160fn run_dequeue(taskref: &TaskRef) {
161 taskref.header().state.run_dequeue();
162}
163
164/// ...while non-atomic state does
165#[cfg(not(target_has_atomic = "ptr"))]
166#[inline(always)]
167fn run_dequeue(taskref: &TaskRef) {
168 critical_section::with(|cs| {
169 taskref.header().state.run_dequeue(cs);
170 })
171}
172
173/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex
174#[cfg(not(target_has_atomic = "ptr"))]
175struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
176 inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
177}
178
179#[cfg(not(target_has_atomic = "ptr"))]
180impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
181 const fn new() -> Self {
182 Self {
183 inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
184 }
185 }
186
187 /// Push an item to the transfer stack, returning whether the stack was previously empty
188 fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
189 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
190 // for the lifetime of the token, but does NOT protect against re-entrant access.
191 // However, we never *return* the reference, nor do we recurse (or call another method
192 // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
193 // presence of the critical section is sufficient to guarantee exclusive access to
194 // the `inner` field for the purposes of this function.
195 let inner = unsafe { &mut *self.inner.borrow(token).get() };
196 let is_empty = inner.is_empty();
197 inner.push(item);
198 is_empty
199 }
200
201 fn take_all(&self) -> cordyceps::Stack<T> {
202 critical_section::with(|cs| {
203 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
204 // for the lifetime of the token, but does NOT protect against re-entrant access.
205 // However, we never *return* the reference, nor do we recurse (or call another method
206 // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
207 // presence of the critical section is sufficient to guarantee exclusive access to
208 // the `inner` field for the purposes of this function.
209 let inner = unsafe { &mut *self.inner.borrow(cs).get() };
210 inner.take_all()
211 })
212 }
213}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
deleted file mode 100644
index 90907cfda..000000000
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ /dev/null
@@ -1,87 +0,0 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4
5use super::{TaskHeader, TaskRef};
6use crate::raw::util::SyncUnsafeCell;
7
8pub(crate) struct RunQueueItem {
9 next: SyncUnsafeCell<Option<TaskRef>>,
10}
11
12impl RunQueueItem {
13 pub const fn new() -> Self {
14 Self {
15 next: SyncUnsafeCell::new(None),
16 }
17 }
18}
19
20/// Atomic task queue using a very, very simple lock-free linked-list queue:
21///
22/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
23///
24/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
25/// null. Then the batch is iterated following the next pointers until null is reached.
26///
27/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
28/// for our purposes: it can't create fairness problems since the next batch won't run until the
29/// current batch is completely processed, so even if a task enqueues itself instantly (for example
30/// by waking its own waker) can't prevent other tasks from running.
31pub(crate) struct RunQueue {
32 head: AtomicPtr<TaskHeader>,
33}
34
35impl RunQueue {
36 pub const fn new() -> Self {
37 Self {
38 head: AtomicPtr::new(ptr::null_mut()),
39 }
40 }
41
42 /// Enqueues an item. Returns true if the queue was empty.
43 ///
44 /// # Safety
45 ///
46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
49 let mut was_empty = false;
50
51 self.head
52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
53 was_empty = prev.is_null();
54 unsafe {
55 // safety: the pointer is either null or valid
56 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
57 // safety: there are no concurrent accesses to `next`
58 task.header().run_queue_item.next.set(prev);
59 }
60 Some(task.as_ptr() as *mut _)
61 })
62 .ok();
63
64 was_empty
65 }
66
67 /// Empty the queue, then call `on_task` for each task that was in the queue.
68 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
69 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
70 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
71 // Atomically empty the queue.
72 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
73
74 // safety: the pointer is either null or valid
75 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
76
77 // Iterate the linked list of tasks that were previously in the queue.
78 while let Some(task) = next {
79 // If the task re-enqueues itself, the `next` pointer will get overwritten.
80 // Therefore, first read the next pointer, and only then process the task.
81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() };
83
84 on_task(task);
85 }
86 }
87}
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
deleted file mode 100644
index ba59c8f29..000000000
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ /dev/null
@@ -1,75 +0,0 @@
1use core::cell::Cell;
2
3use critical_section::{CriticalSection, Mutex};
4
5use super::TaskRef;
6
7pub(crate) struct RunQueueItem {
8 next: Mutex<Cell<Option<TaskRef>>>,
9}
10
11impl RunQueueItem {
12 pub const fn new() -> Self {
13 Self {
14 next: Mutex::new(Cell::new(None)),
15 }
16 }
17}
18
19/// Atomic task queue using a very, very simple lock-free linked-list queue:
20///
21/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
22///
23/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
24/// null. Then the batch is iterated following the next pointers until null is reached.
25///
26/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
27/// for our purposes: it can't create fairness problems since the next batch won't run until the
28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
29/// by waking its own waker) can't prevent other tasks from running.
30pub(crate) struct RunQueue {
31 head: Mutex<Cell<Option<TaskRef>>>,
32}
33
34impl RunQueue {
35 pub const fn new() -> Self {
36 Self {
37 head: Mutex::new(Cell::new(None)),
38 }
39 }
40
41 /// Enqueues an item. Returns true if the queue was empty.
42 ///
43 /// # Safety
44 ///
45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
48 critical_section::with(|cs| {
49 let prev = self.head.borrow(cs).replace(Some(task));
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51
52 prev.is_none()
53 })
54 }
55
56 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue.
61 let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
62
63 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task.
67
68 // safety: we know if the task is enqueued, no one else will touch the `next` pointer.
69 let cs = unsafe { CriticalSection::new() };
70 next = task.header().run_queue_item.next.borrow(cs).get();
71
72 on_task(task);
73 }
74 }
75}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e1279ac0b..6675875be 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,21 +1,39 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1// Prefer pointer-width atomic operations, as narrower ones may be slower.
2#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
3type AtomicState = core::sync::atomic::AtomicU32;
4#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
5type AtomicState = core::sync::atomic::AtomicU8;
6
7#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
8type StateBits = u32;
9#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
10type StateBits = u8;
11
12use core::sync::atomic::Ordering;
13
14#[derive(Clone, Copy)]
15pub(crate) struct Token(());
16
17/// Creates a token and passes it to the closure.
18///
19/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
20pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
21 f(Token(()))
22}
2 23
3/// Task is spawned (has a future) 24/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 25pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
5/// Task is in the executor run queue 26/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 27pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
7/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10 28
11pub(crate) struct State { 29pub(crate) struct State {
12 state: AtomicU32, 30 state: AtomicState,
13} 31}
14 32
15impl State { 33impl State {
16 pub const fn new() -> State { 34 pub const fn new() -> State {
17 Self { 35 Self {
18 state: AtomicU32::new(0), 36 state: AtomicState::new(0),
19 } 37 }
20 } 38 }
21 39
@@ -33,41 +51,19 @@ impl State {
33 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 51 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
34 } 52 }
35 53
36 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 54 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
55 /// function if the task was successfully marked.
37 #[inline(always)] 56 #[inline(always)]
38 pub fn run_enqueue(&self) -> bool { 57 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
39 self.state 58 let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel);
40 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { 59 if prev & STATE_RUN_QUEUED == 0 {
41 // If already scheduled, or if not started, 60 locked(f);
42 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 61 }
43 None
44 } else {
45 // Mark it as scheduled
46 Some(state | STATE_RUN_QUEUED)
47 }
48 })
49 .is_ok()
50 } 62 }
51 63
52 /// Unmark the task as run-queued. Return whether the task is spawned. 64 /// Unmark the task as run-queued. Return whether the task is spawned.
53 #[inline(always)] 65 #[inline(always)]
54 pub fn run_dequeue(&self) -> bool { 66 pub fn run_dequeue(&self) {
55 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 67 self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
56 state & STATE_SPAWNED != 0
57 }
58
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
64 old_state & STATE_TIMER_QUEUED == 0
65 }
66
67 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)]
70 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
72 } 68 }
73} 69}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index e4dfe5093..f68de955f 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,5 +1,14 @@
1use core::arch::asm; 1use core::sync::atomic::{AtomicBool, AtomicU32, Ordering, compiler_fence};
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
3 12
4// Must be kept in sync with the layout of `State`! 13// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
@@ -11,9 +20,8 @@ pub(crate) struct State {
11 spawned: AtomicBool, 20 spawned: AtomicBool,
12 /// Task is in the executor run queue 21 /// Task is in the executor run queue
13 run_queued: AtomicBool, 22 run_queued: AtomicBool,
14 /// Task is in the executor timer queue
15 timer_queued: AtomicBool,
16 pad: AtomicBool, 23 pad: AtomicBool,
24 pad2: AtomicBool,
17} 25}
18 26
19impl State { 27impl State {
@@ -21,8 +29,8 @@ impl State {
21 Self { 29 Self {
22 spawned: AtomicBool::new(false), 30 spawned: AtomicBool::new(false),
23 run_queued: AtomicBool::new(false), 31 run_queued: AtomicBool::new(false),
24 timer_queued: AtomicBool::new(false),
25 pad: AtomicBool::new(false), 32 pad: AtomicBool::new(false),
33 pad2: AtomicBool::new(false),
26 } 34 }
27 } 35 }
28 36
@@ -54,50 +62,22 @@ impl State {
54 self.spawned.store(false, Ordering::Relaxed); 62 self.spawned.store(false, Ordering::Relaxed);
55 } 63 }
56 64
57 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 65 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
66 /// function if the task was successfully marked.
58 #[inline(always)] 67 #[inline(always)]
59 pub fn run_enqueue(&self) -> bool { 68 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
60 unsafe { 69 let old = self.run_queued.swap(true, Ordering::AcqRel);
61 loop {
62 let state: u32;
63 asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));
64 70
65 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 71 if !old {
66 asm!("clrex", options(nomem, nostack)); 72 locked(f);
67 return false;
68 }
69
70 let outcome: usize;
71 let new_state = state | STATE_RUN_QUEUED;
72 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
73 if outcome == 0 {
74 return true;
75 }
76 }
77 } 73 }
78 } 74 }
79 75
80 /// Unmark the task as run-queued. Return whether the task is spawned. 76 /// Unmark the task as run-queued. Return whether the task is spawned.
81 #[inline(always)] 77 #[inline(always)]
82 pub fn run_dequeue(&self) -> bool { 78 pub fn run_dequeue(&self) {
83 compiler_fence(Ordering::Release); 79 compiler_fence(Ordering::Release);
84 80
85 let r = self.spawned.load(Ordering::Relaxed);
86 self.run_queued.store(false, Ordering::Relaxed); 81 self.run_queued.store(false, Ordering::Relaxed);
87 r
88 }
89
90 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
91 #[cfg(feature = "integrated-timers")]
92 #[inline(always)]
93 pub fn timer_enqueue(&self) -> bool {
94 !self.timer_queued.swap(true, Ordering::Relaxed)
95 }
96
97 /// Unmark the task as timer-queued.
98 #[cfg(feature = "integrated-timers")]
99 #[inline(always)]
100 pub fn timer_dequeue(&self) {
101 self.timer_queued.store(false, Ordering::Relaxed);
102 } 82 }
103} 83}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index c3cc1b0b7..8d7ef2892 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -1,17 +1,20 @@
1use core::cell::Cell; 1use core::cell::Cell;
2 2
3use critical_section::Mutex; 3use critical_section::{CriticalSection, Mutex};
4pub(crate) use critical_section::{CriticalSection as Token, with as locked};
5
6#[cfg(target_arch = "avr")]
7type StateBits = u8;
8#[cfg(not(target_arch = "avr"))]
9type StateBits = usize;
4 10
5/// Task is spawned (has a future) 11/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 12pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
7/// Task is in the executor run queue 13/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 14pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
9/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12 15
13pub(crate) struct State { 16pub(crate) struct State {
14 state: Mutex<Cell<u32>>, 17 state: Mutex<Cell<StateBits>>,
15} 18}
16 19
17impl State { 20impl State {
@@ -21,14 +24,16 @@ impl State {
21 } 24 }
22 } 25 }
23 26
24 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 27 fn update<R>(&self, f: impl FnOnce(&mut StateBits) -> R) -> R {
25 critical_section::with(|cs| { 28 critical_section::with(|cs| self.update_with_cs(cs, f))
26 let s = self.state.borrow(cs); 29 }
27 let mut val = s.get(); 30
28 let r = f(&mut val); 31 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut StateBits) -> R) -> R {
29 s.set(val); 32 let s = self.state.borrow(cs);
30 r 33 let mut val = s.get();
31 }) 34 let r = f(&mut val);
35 s.set(val);
36 r
32 } 37 }
33 38
34 /// If task is idle, mark it as spawned + run_queued and return true. 39 /// If task is idle, mark it as spawned + run_queued and return true.
@@ -50,44 +55,24 @@ impl State {
50 self.update(|s| *s &= !STATE_SPAWNED); 55 self.update(|s| *s &= !STATE_SPAWNED);
51 } 56 }
52 57
53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 58 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
59 /// function if the task was successfully marked.
54 #[inline(always)] 60 #[inline(always)]
55 pub fn run_enqueue(&self) -> bool { 61 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
56 self.update(|s| { 62 critical_section::with(|cs| {
57 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { 63 if self.update_with_cs(cs, |s| {
58 false 64 let ok = *s & STATE_RUN_QUEUED == 0;
59 } else {
60 *s |= STATE_RUN_QUEUED; 65 *s |= STATE_RUN_QUEUED;
61 true 66 ok
67 }) {
68 f(cs);
62 } 69 }
63 }) 70 });
64 } 71 }
65 72
66 /// Unmark the task as run-queued. Return whether the task is spawned. 73 /// Unmark the task as run-queued. Return whether the task is spawned.
67 #[inline(always)] 74 #[inline(always)]
68 pub fn run_dequeue(&self) -> bool { 75 pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
69 self.update(|s| { 76 self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
70 let ok = *s & STATE_SPAWNED != 0;
71 *s &= !STATE_RUN_QUEUED;
72 ok
73 })
74 }
75
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool {
80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0;
82 *s |= STATE_TIMER_QUEUED;
83 ok
84 })
85 }
86
87 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)]
90 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED);
92 } 77 }
93} 78}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
deleted file mode 100644
index 94a5f340b..000000000
--- a/embassy-executor/src/raw/timer_queue.rs
+++ /dev/null
@@ -1,76 +0,0 @@
1use core::cmp::min;
2
3use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5
6pub(crate) struct TimerQueueItem {
7 next: SyncUnsafeCell<Option<TaskRef>>,
8}
9
10impl TimerQueueItem {
11 pub const fn new() -> Self {
12 Self {
13 next: SyncUnsafeCell::new(None),
14 }
15 }
16}
17
18pub(crate) struct TimerQueue {
19 head: SyncUnsafeCell<Option<TaskRef>>,
20}
21
22impl TimerQueue {
23 pub const fn new() -> Self {
24 Self {
25 head: SyncUnsafeCell::new(None),
26 }
27 }
28
29 pub(crate) unsafe fn update(&self, p: TaskRef) {
30 let task = p.header();
31 if task.expires_at.get() != u64::MAX {
32 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get());
34 self.head.set(Some(p));
35 }
36 }
37 }
38
39 pub(crate) unsafe fn next_expiration(&self) -> u64 {
40 let mut res = u64::MAX;
41 self.retain(|p| {
42 let task = p.header();
43 let expires = task.expires_at.get();
44 res = min(res, expires);
45 expires != u64::MAX
46 });
47 res
48 }
49
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
51 self.retain(|p| {
52 let task = p.header();
53 if task.expires_at.get() <= now {
54 on_task(p);
55 false
56 } else {
57 true
58 }
59 });
60 }
61
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
63 let mut prev = &self.head;
64 while let Some(p) = prev.get() {
65 let task = p.header();
66 if f(p) {
67 // Skip to next
68 prev = &task.timer_queue_item.next;
69 } else {
70 // Remove it
71 prev.set(task.timer_queue_item.next.get());
72 task.state.timer_dequeue();
73 }
74 }
75 }
76}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
new file mode 100644
index 000000000..830162039
--- /dev/null
+++ b/embassy-executor/src/raw/trace.rs
@@ -0,0 +1,380 @@
1//! # Tracing
2//!
3//! The `trace` feature enables a number of callbacks that can be used to track the
4//! lifecycle of tasks and/or executors.
5//!
6//! Callbacks will have one or both of the following IDs passed to them:
7//!
8//! 1. A `task_id`, a `u32` value unique to a task for the duration of the time it is valid
9//! 2. An `executor_id`, a `u32` value unique to an executor for the duration of the time it is
10//! valid
11//!
12//! Today, both `task_id` and `executor_id` are u32s containing the least significant 32 bits of
13//! the address of the task or executor, however this is NOT a stable guarantee, and MAY change
14//! at any time.
15//!
16//! IDs are only guaranteed to be unique for the duration of time the item is valid. If a task
17//! ends, and is re-spawned, it MAY or MAY NOT have the same ID. For tasks, this valid time is defined
18//! as the time between `_embassy_trace_task_new` and `_embassy_trace_task_end` for a given task.
19//! For executors, this time is not defined, but is often "forever" for practical embedded
20//! programs.
21//!
22//! Callbacks can be used by enabling the `trace` feature, and providing implementations of the
23//! `extern "Rust"` functions below. All callbacks must be implemented.
24//!
25//! ## Task Tracing lifecycle
26//!
27//! ```text
28//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
29//! │(1) │
30//! │ │
31//! ╔════▼════╗ (2) ┌─────────┐ (3) ┌─────────┐ │
32//! │ ║ SPAWNED ║────▶│ WAITING │────▶│ RUNNING │
33//! ╚═════════╝ └─────────┘ └─────────┘ │
34//! │ ▲ ▲ │ │ │
35//! │ (4) │ │(6) │
36//! │ │(7) └ ─ ─ ┘ │ │
37//! │ │ │ │
38//! │ ┌──────┐ (5) │ │ ┌─────┐
39//! │ IDLE │◀────────────────┘ └─▶│ END │ │
40//! │ └──────┘ └─────┘
41//! ┌──────────────────────┐ │
42//! └ ┤ Task Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
43//! └──────────────────────┘
44//! ```
45//!
46//! 1. A task is spawned, `_embassy_trace_task_new` is called
47//! 2. A task is enqueued for the first time, `_embassy_trace_task_ready_begin` is called
48//! 3. A task is polled, `_embassy_trace_task_exec_begin` is called
49//! 4. WHILE a task is polled, the task is re-awoken, and `_embassy_trace_task_ready_begin` is
50//! called. The task does not IMMEDIATELY move state, until polling is complete and the
51//! RUNNING state is existed. `_embassy_trace_task_exec_end` is called when polling is
52//! complete, marking the transition to WAITING
53//! 5. Polling is complete, `_embassy_trace_task_exec_end` is called
54//! 6. The task has completed, and `_embassy_trace_task_end` is called
55//! 7. A task is awoken, `_embassy_trace_task_ready_begin` is called
56//!
57//! ## Executor Tracing lifecycle
58//!
59//! ```text
60//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
61//! │(1) │
62//! │ │
63//! ╔═══▼══╗ (2) ┌────────────┐ (3) ┌─────────┐ │
64//! │ ║ IDLE ║──────────▶│ SCHEDULING │──────▶│ POLLING │
65//! ╚══════╝ └────────────┘ └─────────┘ │
66//! │ ▲ │ ▲ │
67//! │ (5) │ │ (4) │ │
68//! │ └──────────────┘ └────────────┘
69//! ┌──────────────────────────┐ │
70//! └ ┤ Executor Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71//! └──────────────────────────┘
72//! ```
73//!
74//! 1. The executor is started (no associated trace)
75//! 2. A task on this executor is awoken. `_embassy_trace_task_ready_begin` is called
76//! when this occurs, and `_embassy_trace_poll_start` is called when the executor
77//! actually begins running
78//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called
79//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called
80//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called
81
82#![allow(unused)]
83
84use core::cell::UnsafeCell;
85use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
86
87#[cfg(feature = "rtos-trace")]
88use rtos_trace::TaskInfo;
89
90use crate::raw::{SyncExecutor, TaskHeader, TaskRef};
91use crate::spawner::{SpawnError, SpawnToken, Spawner};
92
93/// Global task tracker instance
94///
95/// This static provides access to the global task tracker which maintains
96/// a list of all tasks in the system. It's automatically updated by the
97/// task lifecycle hooks in the trace module.
98#[cfg(feature = "rtos-trace")]
99pub(crate) static TASK_TRACKER: TaskTracker = TaskTracker::new();
100
101/// A thread-safe tracker for all tasks in the system
102///
103/// This struct uses an intrusive linked list approach to track all tasks
104/// without additional memory allocations. It maintains a global list of
105/// tasks that can be traversed to find all currently existing tasks.
106#[cfg(feature = "rtos-trace")]
107pub(crate) struct TaskTracker {
108 head: AtomicPtr<TaskHeader>,
109}
110
111#[cfg(feature = "rtos-trace")]
112impl TaskTracker {
113 /// Creates a new empty task tracker
114 ///
115 /// Initializes a tracker with no tasks in its list.
116 pub const fn new() -> Self {
117 Self {
118 head: AtomicPtr::new(core::ptr::null_mut()),
119 }
120 }
121
122 /// Adds a task to the tracker
123 ///
124 /// This method inserts a task at the head of the intrusive linked list.
125 /// The operation is thread-safe and lock-free, using atomic operations
126 /// to ensure consistency even when called from different contexts.
127 ///
128 /// # Arguments
129 /// * `task` - The task reference to add to the tracker
130 pub fn add(&self, task: TaskRef) {
131 let task_ptr = task.as_ptr();
132
133 loop {
134 let current_head = self.head.load(Ordering::Acquire);
135 unsafe {
136 (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed);
137 }
138
139 if self
140 .head
141 .compare_exchange(current_head, task_ptr.cast_mut(), Ordering::Release, Ordering::Relaxed)
142 .is_ok()
143 {
144 break;
145 }
146 }
147 }
148
149 /// Performs an operation on each task in the tracker
150 ///
151 /// This method traverses the entire list of tasks and calls the provided
152 /// function for each task. This allows inspecting or processing all tasks
153 /// in the system without modifying the tracker's structure.
154 ///
155 /// # Arguments
156 /// * `f` - A function to call for each task in the tracker
157 pub fn for_each<F>(&self, mut f: F)
158 where
159 F: FnMut(TaskRef),
160 {
161 let mut current = self.head.load(Ordering::Acquire);
162 while !current.is_null() {
163 let task = unsafe { TaskRef::from_ptr(current) };
164 f(task);
165
166 current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) };
167 }
168 }
169}
170
171#[cfg(feature = "trace")]
172unsafe extern "Rust" {
173 /// This callback is called when the executor begins polling. This will always
174 /// be paired with a later call to `_embassy_trace_executor_idle`.
175 ///
176 /// This marks the EXECUTOR state transition from IDLE -> SCHEDULING.
177 fn _embassy_trace_poll_start(executor_id: u32);
178
179 /// This callback is called AFTER a task is initialized/allocated, and BEFORE
180 /// it is enqueued to run for the first time. If the task ends (and does not
181 /// loop "forever"), there will be a matching call to `_embassy_trace_task_end`.
182 ///
183 /// Tasks start life in the SPAWNED state.
184 fn _embassy_trace_task_new(executor_id: u32, task_id: u32);
185
186 /// This callback is called AFTER a task is destructed/freed. This will always
187 /// have a prior matching call to `_embassy_trace_task_new`.
188 fn _embassy_trace_task_end(executor_id: u32, task_id: u32);
189
190 /// This callback is called AFTER a task has been dequeued from the runqueue,
191 /// and BEFORE the task is polled. There will always be a matching call to
192 /// `_embassy_trace_task_exec_end`.
193 ///
194 /// This marks the TASK state transition from WAITING -> RUNNING
195 /// This marks the EXECUTOR state transition from SCHEDULING -> POLLING
196 fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32);
197
198 /// This callback is called AFTER a task has completed polling. There will
199 /// always be a matching call to `_embassy_trace_task_exec_begin`.
200 ///
201 /// This marks the TASK state transition from either:
202 /// * RUNNING -> IDLE - if there were no `_embassy_trace_task_ready_begin` events
203 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
204 /// * RUNNING -> WAITING - if there WAS a `_embassy_trace_task_ready_begin` event
205 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
206 ///
207 /// This marks the EXECUTOR state transition from POLLING -> SCHEDULING
208 fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32);
209
210 /// This callback is called AFTER the waker for a task is awoken, and BEFORE it
211 /// is added to the run queue.
212 ///
213 /// If the given task is currently RUNNING, this marks no state change, BUT the
214 /// RUNNING task will then move to the WAITING stage when polling is complete.
215 ///
216 /// If the given task is currently IDLE, this marks the TASK state transition
217 /// from IDLE -> WAITING.
218 ///
219 /// NOTE: This may be called from an interrupt, outside the context of the current
220 /// task or executor.
221 fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32);
222
223 /// This callback is called AFTER all dequeued tasks in a single call to poll
224 /// have been processed. This will always be paired with a call to
225 /// `_embassy_trace_executor_idle`.
226 ///
227 /// This marks the EXECUTOR state transition from SCHEDULING -> IDLE
228 fn _embassy_trace_executor_idle(executor_id: u32);
229}
230
231#[inline]
232pub(crate) fn poll_start(executor: &SyncExecutor) {
233 #[cfg(feature = "trace")]
234 unsafe {
235 _embassy_trace_poll_start(executor as *const _ as u32)
236 }
237}
238
239#[inline]
240pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) {
241 #[cfg(feature = "trace")]
242 unsafe {
243 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32)
244 }
245
246 #[cfg(feature = "rtos-trace")]
247 {
248 rtos_trace::trace::task_new(task.as_ptr() as u32);
249 let name = task.metadata().name().unwrap_or("unnamed task\0");
250 let info = rtos_trace::TaskInfo {
251 name,
252 priority: 0,
253 stack_base: 0,
254 stack_size: 0,
255 };
256 rtos_trace::trace::task_send_info(task.id(), info);
257 }
258
259 #[cfg(feature = "rtos-trace")]
260 TASK_TRACKER.add(*task);
261}
262
263#[inline]
264pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) {
265 #[cfg(feature = "trace")]
266 unsafe {
267 _embassy_trace_task_end(executor as u32, task.as_ptr() as u32)
268 }
269}
270
271#[inline]
272pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
273 #[cfg(feature = "trace")]
274 unsafe {
275 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32)
276 }
277 #[cfg(feature = "rtos-trace")]
278 rtos_trace::trace::task_ready_begin(task.as_ptr() as u32);
279}
280
281#[inline]
282pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
283 #[cfg(feature = "trace")]
284 unsafe {
285 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32)
286 }
287 #[cfg(feature = "rtos-trace")]
288 rtos_trace::trace::task_exec_begin(task.as_ptr() as u32);
289}
290
291#[inline]
292pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
293 #[cfg(feature = "trace")]
294 unsafe {
295 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32)
296 }
297 #[cfg(feature = "rtos-trace")]
298 rtos_trace::trace::task_exec_end();
299}
300
301#[inline]
302pub(crate) fn executor_idle(executor: &SyncExecutor) {
303 #[cfg(feature = "trace")]
304 unsafe {
305 _embassy_trace_executor_idle(executor as *const _ as u32)
306 }
307 #[cfg(feature = "rtos-trace")]
308 rtos_trace::trace::system_idle();
309}
310
311/// Returns an iterator over all active tasks in the system
312///
313/// This function provides a convenient way to iterate over all tasks
314/// that are currently tracked in the system. The returned iterator
315/// yields each task in the global task tracker.
316///
317/// # Returns
318/// An iterator that yields `TaskRef` items for each task
319#[cfg(feature = "rtos-trace")]
320fn get_all_active_tasks() -> impl Iterator<Item = TaskRef> + 'static {
321 struct TaskIterator<'a> {
322 tracker: &'a TaskTracker,
323 current: *mut TaskHeader,
324 }
325
326 impl<'a> Iterator for TaskIterator<'a> {
327 type Item = TaskRef;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 if self.current.is_null() {
331 return None;
332 }
333
334 let task = unsafe { TaskRef::from_ptr(self.current) };
335 self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) };
336
337 Some(task)
338 }
339 }
340
341 TaskIterator {
342 tracker: &TASK_TRACKER,
343 current: TASK_TRACKER.head.load(Ordering::Acquire),
344 }
345}
346
347/// Perform an action on each active task
348#[cfg(feature = "rtos-trace")]
349fn with_all_active_tasks<F>(f: F)
350where
351 F: FnMut(TaskRef),
352{
353 TASK_TRACKER.for_each(f);
354}
355
356#[cfg(feature = "rtos-trace")]
357impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
358 fn task_list() {
359 with_all_active_tasks(|task| {
360 let info = rtos_trace::TaskInfo {
361 name: task.metadata().name().unwrap_or("unnamed task\0"),
362 priority: 0,
363 stack_base: 0,
364 stack_size: 0,
365 };
366 rtos_trace::trace::task_send_info(task.id(), info);
367 });
368 }
369 fn time() -> u64 {
370 const fn gcd(a: u64, b: u64) -> u64 {
371 if b == 0 { a } else { gcd(b, a % b) }
372 }
373
374 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
375 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
376 }
377}
378
379#[cfg(feature = "rtos-trace")]
380rtos_trace::global_os_callbacks! {SyncExecutor}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 8d3910a25..2706f0fdf 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -1,6 +1,6 @@
1use core::task::{RawWaker, RawWakerVTable, Waker}; 1use core::task::{RawWaker, RawWakerVTable, Waker};
2 2
3use super::{wake_task, TaskHeader, TaskRef}; 3use super::{TaskHeader, TaskRef, wake_task};
4 4
5static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 5static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
6 6
@@ -26,38 +26,19 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps 26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
27/// avoid dynamic dispatch. 27/// avoid dynamic dispatch.
28/// 28///
29/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). 29/// You can use the returned task pointer to wake the task with [`wake_task`].
30/// 30///
31/// # Panics 31/// # Panics
32/// 32///
33/// Panics if the waker is not created by the Embassy executor. 33/// Panics if the waker is not created by the Embassy executor.
34pub fn task_from_waker(waker: &Waker) -> TaskRef { 34pub fn task_from_waker(waker: &Waker) -> TaskRef {
35 let (vtable, data) = { 35 // make sure to compare vtable addresses. Doing `==` on the references
36 #[cfg(not(feature = "nightly"))] 36 // will compare the contents, which is slower.
37 { 37 if waker.vtable() as *const _ != &VTABLE as *const _ {
38 struct WakerHack { 38 panic!(
39 data: *const (), 39 "Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor."
40 vtable: &'static RawWakerVTable, 40 )
41 }
42
43 // safety: OK because WakerHack has the same layout as Waker.
44 // This is not really guaranteed because the structs are `repr(Rust)`, it is
45 // indeed the case in the current implementation.
46 // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
47 let hack: &WakerHack = unsafe { core::mem::transmute(waker) };
48 (hack.vtable, hack.data)
49 }
50
51 #[cfg(feature = "nightly")]
52 {
53 let raw_waker = waker.as_raw();
54 (raw_waker.vtable(), raw_waker.data())
55 }
56 };
57
58 if vtable != &VTABLE {
59 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
60 } 41 }
61 // safety: our wakers are always created with `TaskRef::as_ptr` 42 // safety: our wakers are always created with `TaskRef::as_ptr`
62 unsafe { TaskRef::from_ptr(data as *const TaskHeader) } 43 unsafe { TaskRef::from_ptr(waker.data() as *const TaskHeader) }
63} 44}
diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs
index 435a0ff7e..919bcc61a 100644
--- a/embassy-executor/src/raw/waker_turbo.rs
+++ b/embassy-executor/src/raw/waker_turbo.rs
@@ -1,7 +1,7 @@
1use core::ptr::NonNull; 1use core::ptr::NonNull;
2use core::task::Waker; 2use core::task::Waker;
3 3
4use super::{wake_task, TaskHeader, TaskRef}; 4use super::{TaskHeader, TaskRef, wake_task};
5 5
6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { 6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _)) 7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _))
@@ -26,7 +26,7 @@ pub fn task_from_waker(waker: &Waker) -> TaskRef {
26} 26}
27 27
28#[inline(never)] 28#[inline(never)]
29#[no_mangle] 29#[unsafe(no_mangle)]
30fn _turbo_wake(ptr: NonNull<()>) { 30fn _turbo_wake(ptr: NonNull<()>) {
31 // safety: our wakers are always created with `TaskRef::as_ptr` 31 // safety: our wakers are always created with `TaskRef::as_ptr`
32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) }; 32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) };