aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/cortex_ar.rs84
-rw-r--r--embassy-executor/src/arch/cortex_m.rs9
-rw-r--r--embassy-executor/src/arch/spin.rs58
-rw-r--r--embassy-executor/src/lib.rs211
-rw-r--r--embassy-executor/src/raw/mod.rs300
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs3
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs17
-rw-r--r--embassy-executor/src/raw/state_atomics.rs63
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs58
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs74
-rw-r--r--embassy-executor/src/raw/timer_queue.rs119
-rw-r--r--embassy-executor/src/raw/trace.rs412
-rw-r--r--embassy-executor/src/raw/waker.rs31
-rw-r--r--embassy-executor/src/spawner.rs120
14 files changed, 1095 insertions, 464 deletions
diff --git a/embassy-executor/src/arch/cortex_ar.rs b/embassy-executor/src/arch/cortex_ar.rs
new file mode 100644
index 000000000..f9e2f3f7c
--- /dev/null
+++ b/embassy-executor/src/arch/cortex_ar.rs
@@ -0,0 +1,84 @@
1#[cfg(feature = "executor-interrupt")]
2compile_error!("`executor-interrupt` is not supported with `arch-cortex-ar`.");
3
4#[export_name = "__pender"]
5#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))]
6fn __pender(context: *mut ()) {
7 // `context` is always `usize::MAX` created by `Executor::run`.
8 let context = context as usize;
9
10 #[cfg(feature = "executor-thread")]
11 // Try to make Rust optimize the branching away if we only use thread mode.
12 if !cfg!(feature = "executor-interrupt") || context == THREAD_PENDER {
13 cortex_ar::asm::sev();
14 return;
15 }
16}
17
18#[cfg(feature = "executor-thread")]
19pub use thread::*;
20#[cfg(feature = "executor-thread")]
21mod thread {
22 pub(super) const THREAD_PENDER: usize = usize::MAX;
23
24 use core::marker::PhantomData;
25
26 use cortex_ar::asm::wfe;
27 pub use embassy_executor_macros::main_cortex_ar as main;
28
29 use crate::{raw, Spawner};
30
31 /// Thread mode executor, using WFE/SEV.
32 ///
33 /// This is the simplest and most common kind of executor. It runs on
34 /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
35 /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
36 /// is executed, to make the `WFE` exit from sleep and poll the task.
37 ///
38 /// This executor allows for ultra low power consumption for chips where `WFE`
39 /// triggers low-power sleep without extra steps. If your chip requires extra steps,
40 /// you may use [`raw::Executor`] directly to program custom behavior.
41 pub struct Executor {
42 inner: raw::Executor,
43 not_send: PhantomData<*mut ()>,
44 }
45
46 impl Executor {
47 /// Create a new Executor.
48 pub fn new() -> Self {
49 Self {
50 inner: raw::Executor::new(THREAD_PENDER as *mut ()),
51 not_send: PhantomData,
52 }
53 }
54
55 /// Run the executor.
56 ///
57 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
58 /// this executor. Use it to spawn the initial task(s). After `init` returns,
59 /// the executor starts running the tasks.
60 ///
61 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
62 /// for example by passing it as an argument to the initial tasks.
63 ///
64 /// This function requires `&'static mut self`. This means you have to store the
65 /// Executor instance in a place where it'll live forever and grants you mutable
66 /// access. There's a few ways to do this:
67 ///
68 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
69 /// - a `static mut` (unsafe)
70 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
71 ///
72 /// This function never returns.
73 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
74 init(self.inner.spawner());
75
76 loop {
77 unsafe {
78 self.inner.poll();
79 }
80 wfe();
81 }
82 }
83 }
84}
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index 5c517e0a2..1c9ddd8a0 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -143,7 +143,7 @@ mod interrupt {
143 /// If this is not the case, you may use an interrupt from any unused peripheral. 143 /// If this is not the case, you may use an interrupt from any unused peripheral.
144 /// 144 ///
145 /// It is somewhat more complex to use, it's recommended to use the thread-mode 145 /// It is somewhat more complex to use, it's recommended to use the thread-mode
146 /// [`Executor`] instead, if it works for your use case. 146 /// [`Executor`](crate::Executor) instead, if it works for your use case.
147 pub struct InterruptExecutor { 147 pub struct InterruptExecutor {
148 started: Mutex<Cell<bool>>, 148 started: Mutex<Cell<bool>>,
149 executor: UnsafeCell<MaybeUninit<raw::Executor>>, 149 executor: UnsafeCell<MaybeUninit<raw::Executor>>,
@@ -179,11 +179,11 @@ mod interrupt {
179 /// The executor keeps running in the background through the interrupt. 179 /// The executor keeps running in the background through the interrupt.
180 /// 180 ///
181 /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`] 181 /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`]
182 /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a 182 /// is returned instead of a [`Spawner`](crate::Spawner) because the executor effectively runs in a
183 /// different "thread" (the interrupt), so spawning tasks on it is effectively 183 /// different "thread" (the interrupt), so spawning tasks on it is effectively
184 /// sending them. 184 /// sending them.
185 /// 185 ///
186 /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from 186 /// To obtain a [`Spawner`](crate::Spawner) for this executor, use [`Spawner::for_current_executor()`](crate::Spawner::for_current_executor()) from
187 /// a task running in it. 187 /// a task running in it.
188 /// 188 ///
189 /// # Interrupt requirements 189 /// # Interrupt requirements
@@ -195,6 +195,7 @@ mod interrupt {
195 /// You must set the interrupt priority before calling this method. You MUST NOT 195 /// You must set the interrupt priority before calling this method. You MUST NOT
196 /// do it after. 196 /// do it after.
197 /// 197 ///
198 /// [`SendSpawner`]: crate::SendSpawner
198 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { 199 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
199 if critical_section::with(|cs| self.started.borrow(cs).replace(true)) { 200 if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
200 panic!("InterruptExecutor::start() called multiple times on the same executor."); 201 panic!("InterruptExecutor::start() called multiple times on the same executor.");
@@ -215,7 +216,7 @@ mod interrupt {
215 216
216 /// Get a SendSpawner for this executor 217 /// Get a SendSpawner for this executor
217 /// 218 ///
218 /// This returns a [`SendSpawner`] you can use to spawn tasks on this 219 /// This returns a [`SendSpawner`](crate::SendSpawner) you can use to spawn tasks on this
219 /// executor. 220 /// executor.
220 /// 221 ///
221 /// This MUST only be called on an executor that has already been started. 222 /// This MUST only be called on an executor that has already been started.
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs
new file mode 100644
index 000000000..340023620
--- /dev/null
+++ b/embassy-executor/src/arch/spin.rs
@@ -0,0 +1,58 @@
1#[cfg(feature = "executor-interrupt")]
2compile_error!("`executor-interrupt` is not supported with `arch-spin`.");
3
4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use core::marker::PhantomData;
9
10 pub use embassy_executor_macros::main_spin as main;
11
12 use crate::{raw, Spawner};
13
14 #[export_name = "__pender"]
15 fn __pender(_context: *mut ()) {}
16
17 /// Spin Executor
18 pub struct Executor {
19 inner: raw::Executor,
20 not_send: PhantomData<*mut ()>,
21 }
22
23 impl Executor {
24 /// Create a new Executor.
25 pub fn new() -> Self {
26 Self {
27 inner: raw::Executor::new(core::ptr::null_mut()),
28 not_send: PhantomData,
29 }
30 }
31
32 /// Run the executor.
33 ///
34 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
35 /// this executor. Use it to spawn the initial task(s). After `init` returns,
36 /// the executor starts running the tasks.
37 ///
38 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
39 /// for example by passing it as an argument to the initial tasks.
40 ///
41 /// This function requires `&'static mut self`. This means you have to store the
42 /// Executor instance in a place where it'll live forever and grants you mutable
43 /// access. There's a few ways to do this:
44 ///
45 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
46 /// - a `static mut` (unsafe)
47 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
48 ///
49 /// This function never returns.
50 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
51 init(self.inner.spawner());
52
53 loop {
54 unsafe { self.inner.poll() };
55 }
56 }
57 }
58}
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
index 553ed76d3..dfe420bab 100644
--- a/embassy-executor/src/lib.rs
+++ b/embassy-executor/src/lib.rs
@@ -1,5 +1,4 @@
1#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)] 1#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(waker_getters))]
3#![allow(clippy::new_without_default)] 2#![allow(clippy::new_without_default)]
4#![doc = include_str!("../README.md")] 3#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 4#![warn(missing_docs)]
@@ -24,120 +23,186 @@ macro_rules! check_at_most_one {
24 check_at_most_one!(@amo [$($f)*] [$($f)*] []); 23 check_at_most_one!(@amo [$($f)*] [$($f)*] []);
25 }; 24 };
26} 25}
27check_at_most_one!("arch-avr", "arch-cortex-m", "arch-riscv32", "arch-std", "arch-wasm",); 26check_at_most_one!(
27 "arch-avr",
28 "arch-cortex-m",
29 "arch-cortex-ar",
30 "arch-riscv32",
31 "arch-std",
32 "arch-wasm",
33 "arch-spin",
34);
28 35
29#[cfg(feature = "_arch")] 36#[cfg(feature = "_arch")]
30#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")] 37#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")]
31#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] 38#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")]
39#[cfg_attr(feature = "arch-cortex-ar", path = "arch/cortex_ar.rs")]
32#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] 40#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")]
33#[cfg_attr(feature = "arch-std", path = "arch/std.rs")] 41#[cfg_attr(feature = "arch-std", path = "arch/std.rs")]
34#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] 42#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")]
43#[cfg_attr(feature = "arch-spin", path = "arch/spin.rs")]
35mod arch; 44mod arch;
36 45
37#[cfg(feature = "_arch")] 46#[cfg(feature = "_arch")]
38#[allow(unused_imports)] // don't warn if the module is empty. 47#[allow(unused_imports)] // don't warn if the module is empty.
39pub use arch::*; 48pub use arch::*;
49#[cfg(not(feature = "_arch"))]
50pub use embassy_executor_macros::main_unspecified as main;
40 51
41pub mod raw; 52pub mod raw;
42 53
43mod spawner; 54mod spawner;
44pub use spawner::*; 55pub use spawner::*;
45 56
46mod config {
47 #![allow(unused)]
48 include!(concat!(env!("OUT_DIR"), "/config.rs"));
49}
50
51/// Implementation details for embassy macros. 57/// Implementation details for embassy macros.
52/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. 58/// Do not use. Used for macros and HALs only. Not covered by semver guarantees.
53#[doc(hidden)] 59#[doc(hidden)]
54#[cfg(not(feature = "nightly"))] 60#[cfg(not(feature = "nightly"))]
55pub mod _export { 61pub mod _export {
56 use core::alloc::Layout; 62 use core::cell::UnsafeCell;
57 use core::cell::{Cell, UnsafeCell};
58 use core::future::Future; 63 use core::future::Future;
59 use core::mem::MaybeUninit; 64 use core::mem::MaybeUninit;
60 use core::ptr::null_mut;
61
62 use critical_section::{CriticalSection, Mutex};
63 65
64 use crate::raw::TaskPool; 66 use crate::raw::TaskPool;
65 67
66 struct Arena<const N: usize> { 68 pub trait TaskFn<Args>: Copy {
67 buf: UnsafeCell<MaybeUninit<[u8; N]>>, 69 type Fut: Future + 'static;
68 ptr: Mutex<Cell<*mut u8>>,
69 } 70 }
70 71
71 unsafe impl<const N: usize> Sync for Arena<N> {} 72 macro_rules! task_fn_impl {
72 unsafe impl<const N: usize> Send for Arena<N> {} 73 ($($Tn:ident),*) => {
73 74 impl<F, Fut, $($Tn,)*> TaskFn<($($Tn,)*)> for F
74 impl<const N: usize> Arena<N> { 75 where
75 const fn new() -> Self { 76 F: Copy + FnOnce($($Tn,)*) -> Fut,
76 Self { 77 Fut: Future + 'static,
77 buf: UnsafeCell::new(MaybeUninit::uninit()), 78 {
78 ptr: Mutex::new(Cell::new(null_mut())), 79 type Fut = Fut;
79 } 80 }
80 } 81 };
81 82 }
82 fn alloc<T>(&'static self, cs: CriticalSection) -> &'static mut MaybeUninit<T> {
83 let layout = Layout::new::<T>();
84
85 let start = self.buf.get().cast::<u8>();
86 let end = unsafe { start.add(N) };
87 83
88 let mut ptr = self.ptr.borrow(cs).get(); 84 task_fn_impl!();
89 if ptr.is_null() { 85 task_fn_impl!(T0);
90 ptr = self.buf.get().cast::<u8>(); 86 task_fn_impl!(T0, T1);
91 } 87 task_fn_impl!(T0, T1, T2);
88 task_fn_impl!(T0, T1, T2, T3);
89 task_fn_impl!(T0, T1, T2, T3, T4);
90 task_fn_impl!(T0, T1, T2, T3, T4, T5);
91 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6);
92 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7);
93 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8);
94 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9);
95 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
96 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
97 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
98 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
99 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
100 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
101
102 #[allow(private_bounds)]
103 #[repr(C)]
104 pub struct TaskPoolHolder<const SIZE: usize, const ALIGN: usize>
105 where
106 Align<ALIGN>: Alignment,
107 {
108 data: UnsafeCell<[MaybeUninit<u8>; SIZE]>,
109 align: Align<ALIGN>,
110 }
92 111
93 let bytes_left = (end as usize) - (ptr as usize); 112 unsafe impl<const SIZE: usize, const ALIGN: usize> Send for TaskPoolHolder<SIZE, ALIGN> where Align<ALIGN>: Alignment {}
94 let align_offset = (ptr as usize).next_multiple_of(layout.align()) - (ptr as usize); 113 unsafe impl<const SIZE: usize, const ALIGN: usize> Sync for TaskPoolHolder<SIZE, ALIGN> where Align<ALIGN>: Alignment {}
95 114
96 if align_offset + layout.size() > bytes_left { 115 #[allow(private_bounds)]
97 panic!("embassy-executor: task arena is full. You must increase the arena size, see the documentation for details: https://docs.embassy.dev/embassy-executor/"); 116 impl<const SIZE: usize, const ALIGN: usize> TaskPoolHolder<SIZE, ALIGN>
98 } 117 where
118 Align<ALIGN>: Alignment,
119 {
120 pub const fn get(&self) -> *const u8 {
121 self.data.get().cast()
122 }
123 }
99 124
100 let res = unsafe { ptr.add(align_offset) }; 125 pub const fn task_pool_size<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> usize
101 let ptr = unsafe { ptr.add(align_offset + layout.size()) }; 126 where
127 F: TaskFn<Args, Fut = Fut>,
128 Fut: Future + 'static,
129 {
130 size_of::<TaskPool<Fut, POOL_SIZE>>()
131 }
102 132
103 self.ptr.borrow(cs).set(ptr); 133 pub const fn task_pool_align<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> usize
134 where
135 F: TaskFn<Args, Fut = Fut>,
136 Fut: Future + 'static,
137 {
138 align_of::<TaskPool<Fut, POOL_SIZE>>()
139 }
104 140
105 unsafe { &mut *(res as *mut MaybeUninit<T>) } 141 pub const fn task_pool_new<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> TaskPool<Fut, POOL_SIZE>
106 } 142 where
143 F: TaskFn<Args, Fut = Fut>,
144 Fut: Future + 'static,
145 {
146 TaskPool::new()
107 } 147 }
108 148
109 static ARENA: Arena<{ crate::config::TASK_ARENA_SIZE }> = Arena::new(); 149 #[allow(private_bounds)]
150 #[repr(transparent)]
151 pub struct Align<const N: usize>([<Self as Alignment>::Archetype; 0])
152 where
153 Self: Alignment;
110 154
111 pub struct TaskPoolRef { 155 trait Alignment {
112 // type-erased `&'static mut TaskPool<F, N>` 156 /// A zero-sized type of particular alignment.
113 // Needed because statics can't have generics. 157 type Archetype: Copy + Eq + PartialEq + Send + Sync + Unpin;
114 ptr: Mutex<Cell<*mut ()>>,
115 } 158 }
116 unsafe impl Sync for TaskPoolRef {}
117 unsafe impl Send for TaskPoolRef {}
118 159
119 impl TaskPoolRef { 160 macro_rules! aligns {
120 pub const fn new() -> Self { 161 ($($AlignX:ident: $n:literal,)*) => {
121 Self { 162 $(
122 ptr: Mutex::new(Cell::new(null_mut())), 163 #[derive(Copy, Clone, Eq, PartialEq)]
123 } 164 #[repr(align($n))]
124 } 165 struct $AlignX {}
125 166 impl Alignment for Align<$n> {
126 /// Get the pool for this ref, allocating it from the arena the first time. 167 type Archetype = $AlignX;
127 ///
128 /// safety: for a given TaskPoolRef instance, must always call with the exact
129 /// same generic params.
130 pub unsafe fn get<F: Future, const N: usize>(&'static self) -> &'static TaskPool<F, N> {
131 critical_section::with(|cs| {
132 let ptr = self.ptr.borrow(cs);
133 if ptr.get().is_null() {
134 let pool = ARENA.alloc::<TaskPool<F, N>>(cs);
135 pool.write(TaskPool::new());
136 ptr.set(pool as *mut _ as _);
137 } 168 }
138 169 )*
139 unsafe { &*(ptr.get() as *const _) } 170 };
140 })
141 }
142 } 171 }
172
173 aligns!(
174 Align1: 1,
175 Align2: 2,
176 Align4: 4,
177 Align8: 8,
178 Align16: 16,
179 Align32: 32,
180 Align64: 64,
181 Align128: 128,
182 Align256: 256,
183 Align512: 512,
184 Align1024: 1024,
185 Align2048: 2048,
186 Align4096: 4096,
187 Align8192: 8192,
188 Align16384: 16384,
189 );
190 #[cfg(any(target_pointer_width = "32", target_pointer_width = "64"))]
191 aligns!(
192 Align32768: 32768,
193 Align65536: 65536,
194 Align131072: 131072,
195 Align262144: 262144,
196 Align524288: 524288,
197 Align1048576: 1048576,
198 Align2097152: 2097152,
199 Align4194304: 4194304,
200 Align8388608: 8388608,
201 Align16777216: 16777216,
202 Align33554432: 33554432,
203 Align67108864: 67108864,
204 Align134217728: 134217728,
205 Align268435456: 268435456,
206 Align536870912: 536870912,
207 );
143} 208}
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index d9ea5c005..913da2e25 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -11,13 +11,14 @@
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] 11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 12mod run_queue;
13 13
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 14#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")]
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19pub mod timer_queue;
20mod timer_queue; 20#[cfg(feature = "trace")]
21pub mod trace;
21pub(crate) mod util; 22pub(crate) mod util;
22#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 23#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
23mod waker; 24mod waker;
@@ -27,12 +28,13 @@ use core::marker::PhantomData;
27use core::mem; 28use core::mem;
28use core::pin::Pin; 29use core::pin::Pin;
29use core::ptr::NonNull; 30use core::ptr::NonNull;
31#[cfg(not(feature = "arch-avr"))]
32use core::sync::atomic::AtomicPtr;
33use core::sync::atomic::Ordering;
30use core::task::{Context, Poll}; 34use core::task::{Context, Poll};
31 35
32#[cfg(feature = "integrated-timers")] 36#[cfg(feature = "arch-avr")]
33use embassy_time_driver::AlarmHandle; 37use portable_atomic::AtomicPtr;
34#[cfg(feature = "rtos-trace")]
35use rtos_trace::trace;
36 38
37use self::run_queue::{RunQueue, RunQueueItem}; 39use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 40use self::state::State;
@@ -41,20 +43,62 @@ pub use self::waker::task_from_waker;
41use super::SpawnToken; 43use super::SpawnToken;
42 44
43/// Raw task header for use in task pointers. 45/// Raw task header for use in task pointers.
46///
47/// A task can be in one of the following states:
48///
49/// - Not spawned: the task is ready to spawn.
50/// - `SPAWNED`: the task is currently spawned and may be running.
51/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
52/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
53/// polling the task's future.
54///
55/// A task's complete life cycle is as follows:
56///
57/// ```text
58/// ┌────────────┐ ┌────────────────────────┐
59/// │Not spawned │◄─5┤Not spawned|Run enqueued│
60/// │ ├6─►│ │
61/// └─────┬──────┘ └──────▲─────────────────┘
62/// 1 │
63/// │ ┌────────────┘
64/// │ 4
65/// ┌─────▼────┴─────────┐
66/// │Spawned|Run enqueued│
67/// │ │
68/// └─────┬▲─────────────┘
69/// 2│
70/// │3
71/// ┌─────▼┴─────┐
72/// │ Spawned │
73/// │ │
74/// └────────────┘
75/// ```
76///
77/// Transitions:
78/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
79/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
80/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
81/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
82/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
83/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
44pub(crate) struct TaskHeader { 84pub(crate) struct TaskHeader {
45 pub(crate) state: State, 85 pub(crate) state: State,
46 pub(crate) run_queue_item: RunQueueItem, 86 pub(crate) run_queue_item: RunQueueItem,
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 87 pub(crate) executor: AtomicPtr<SyncExecutor>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 88 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 89
50 #[cfg(feature = "integrated-timers")] 90 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 91 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
92 #[cfg(feature = "trace")]
93 pub(crate) name: Option<&'static str>,
94 #[cfg(feature = "trace")]
95 pub(crate) id: u32,
96 #[cfg(feature = "trace")]
97 all_tasks_next: AtomicPtr<TaskHeader>,
54} 98}
55 99
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 100/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 101#[derive(Clone, Copy, PartialEq)]
58pub struct TaskRef { 102pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 103 ptr: NonNull<TaskHeader>,
60} 104}
@@ -76,10 +120,31 @@ impl TaskRef {
76 } 120 }
77 } 121 }
78 122
123 /// # Safety
124 ///
125 /// The result of this function must only be compared
126 /// for equality, or stored, but not used.
127 pub const unsafe fn dangling() -> Self {
128 Self {
129 ptr: NonNull::dangling(),
130 }
131 }
132
79 pub(crate) fn header(self) -> &'static TaskHeader { 133 pub(crate) fn header(self) -> &'static TaskHeader {
80 unsafe { self.ptr.as_ref() } 134 unsafe { self.ptr.as_ref() }
81 } 135 }
82 136
137 /// Returns a reference to the executor that the task is currently running on.
138 pub unsafe fn executor(self) -> Option<&'static Executor> {
139 let executor = self.header().executor.load(Ordering::Relaxed);
140 executor.as_ref().map(|e| Executor::wrap(e))
141 }
142
143 /// Returns a reference to the timer queue item.
144 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
145 &self.header().timer_queue_item
146 }
147
83 /// The returned pointer is valid for the entire TaskStorage. 148 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 149 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 150 self.ptr.as_ptr()
@@ -107,6 +172,10 @@ pub struct TaskStorage<F: Future + 'static> {
107 future: UninitCell<F>, // Valid if STATE_SPAWNED 172 future: UninitCell<F>, // Valid if STATE_SPAWNED
108} 173}
109 174
175unsafe fn poll_exited(_p: TaskRef) {
176 // Nothing to do, the task is already !SPAWNED and dequeued.
177}
178
110impl<F: Future + 'static> TaskStorage<F> { 179impl<F: Future + 'static> TaskStorage<F> {
111 const NEW: Self = Self::new(); 180 const NEW: Self = Self::new();
112 181
@@ -116,14 +185,17 @@ impl<F: Future + 'static> TaskStorage<F> {
116 raw: TaskHeader { 185 raw: TaskHeader {
117 state: State::new(), 186 state: State::new(),
118 run_queue_item: RunQueueItem::new(), 187 run_queue_item: RunQueueItem::new(),
119 executor: SyncUnsafeCell::new(None), 188 executor: AtomicPtr::new(core::ptr::null_mut()),
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 189 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 190 poll_fn: SyncUnsafeCell::new(None),
122 191
123 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 192 timer_queue_item: timer_queue::TimerQueueItem::new(),
193 #[cfg(feature = "trace")]
194 name: None,
195 #[cfg(feature = "trace")]
196 id: 0,
197 #[cfg(feature = "trace")]
198 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
127 }, 199 },
128 future: UninitCell::uninit(), 200 future: UninitCell::uninit(),
129 } 201 }
@@ -151,18 +223,30 @@ impl<F: Future + 'static> TaskStorage<F> {
151 } 223 }
152 224
153 unsafe fn poll(p: TaskRef) { 225 unsafe fn poll(p: TaskRef) {
154 let this = &*(p.as_ptr() as *const TaskStorage<F>); 226 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
155 227
156 let future = Pin::new_unchecked(this.future.as_mut()); 228 let future = Pin::new_unchecked(this.future.as_mut());
157 let waker = waker::from_task(p); 229 let waker = waker::from_task(p);
158 let mut cx = Context::from_waker(&waker); 230 let mut cx = Context::from_waker(&waker);
159 match future.poll(&mut cx) { 231 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 232 Poll::Ready(_) => {
233 #[cfg(feature = "trace")]
234 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
235
236 // As the future has finished and this function will not be called
237 // again, we can safely drop the future here.
161 this.future.drop_in_place(); 238 this.future.drop_in_place();
239
240 // We replace the poll_fn with a despawn function, so that the task is cleaned up
241 // when the executor polls it next.
242 this.raw.poll_fn.set(Some(poll_exited));
243
244 // Make sure we despawn last, so that other threads can only spawn the task
245 // after we're done with it.
162 this.raw.state.despawn(); 246 this.raw.state.despawn();
163 247
164 #[cfg(feature = "integrated-timers")] 248 #[cfg(feature = "trace")]
165 this.raw.expires_at.set(u64::MAX); 249 trace::task_end(exec_ptr, &p);
166 } 250 }
167 Poll::Pending => {} 251 Poll::Pending => {}
168 } 252 }
@@ -316,26 +400,13 @@ impl Pender {
316pub(crate) struct SyncExecutor { 400pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 401 run_queue: RunQueue,
318 pender: Pender, 402 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 403}
325 404
326impl SyncExecutor { 405impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 406 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 407 Self {
332 run_queue: RunQueue::new(), 408 run_queue: RunQueue::new(),
333 pender, 409 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 410 }
340 } 411 }
341 412
@@ -346,90 +417,50 @@ impl SyncExecutor {
346 /// - `task` must be set up to run in this executor. 417 /// - `task` must be set up to run in this executor.
347 /// - `task` must NOT be already enqueued (in this executor or another one). 418 /// - `task` must NOT be already enqueued (in this executor or another one).
348 #[inline(always)] 419 #[inline(always)]
349 unsafe fn enqueue(&self, task: TaskRef) { 420 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
350 #[cfg(feature = "rtos-trace")] 421 #[cfg(feature = "trace")]
351 trace::task_ready_begin(task.as_ptr() as u32); 422 trace::task_ready_begin(self, &task);
352 423
353 if self.run_queue.enqueue(task) { 424 if self.run_queue.enqueue(task, l) {
354 self.pender.pend(); 425 self.pender.pend();
355 } 426 }
356 } 427 }
357 428
358 #[cfg(feature = "integrated-timers")]
359 fn alarm_callback(ctx: *mut ()) {
360 let this: &Self = unsafe { &*(ctx as *const Self) };
361 this.pender.pend();
362 }
363
364 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 429 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
365 task.header().executor.set(Some(self)); 430 task.header()
431 .executor
432 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
366 433
367 #[cfg(feature = "rtos-trace")] 434 #[cfg(feature = "trace")]
368 trace::task_new(task.as_ptr() as u32); 435 trace::task_new(self, &task);
369 436
370 self.enqueue(task); 437 state::locked(|l| {
438 self.enqueue(task, l);
439 })
371 } 440 }
372 441
373 /// # Safety 442 /// # Safety
374 /// 443 ///
375 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 444 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
376 pub(crate) unsafe fn poll(&'static self) { 445 pub(crate) unsafe fn poll(&'static self) {
377 #[cfg(feature = "integrated-timers")] 446 #[cfg(feature = "trace")]
378 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); 447 trace::poll_start(self);
379
380 #[allow(clippy::never_loop)]
381 loop {
382 #[cfg(feature = "integrated-timers")]
383 self.timer_queue
384 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
385
386 self.run_queue.dequeue_all(|p| {
387 let task = p.header();
388
389 #[cfg(feature = "integrated-timers")]
390 task.expires_at.set(u64::MAX);
391
392 if !task.state.run_dequeue() {
393 // If task is not running, ignore it. This can happen in the following scenario:
394 // - Task gets dequeued, poll starts
395 // - While task is being polled, it gets woken. It gets placed in the queue.
396 // - Task poll finishes, returning done=true
397 // - RUNNING bit is cleared, but the task is already in the queue.
398 return;
399 }
400
401 #[cfg(feature = "rtos-trace")]
402 trace::task_exec_begin(p.as_ptr() as u32);
403
404 // Run the task
405 task.poll_fn.get().unwrap_unchecked()(p);
406
407 #[cfg(feature = "rtos-trace")]
408 trace::task_exec_end();
409
410 // Enqueue or update into timer_queue
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.update(p);
413 });
414
415 #[cfg(feature = "integrated-timers")]
416 {
417 // If this is already in the past, set_alarm might return false
418 // In that case do another poll loop iteration.
419 let next_expiration = self.timer_queue.next_expiration();
420 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
421 break;
422 }
423 }
424 448
425 #[cfg(not(feature = "integrated-timers"))] 449 self.run_queue.dequeue_all(|p| {
426 { 450 let task = p.header();
427 break; 451
428 } 452 #[cfg(feature = "trace")]
429 } 453 trace::task_exec_begin(self, &p);
430 454
431 #[cfg(feature = "rtos-trace")] 455 // Run the task
432 trace::system_idle(); 456 task.poll_fn.get().unwrap_unchecked()(p);
457
458 #[cfg(feature = "trace")]
459 trace::task_exec_end(self, &p);
460 });
461
462 #[cfg(feature = "trace")]
463 trace::executor_idle(self)
433 } 464 }
434} 465}
435 466
@@ -516,6 +547,8 @@ impl Executor {
516 /// 547 ///
517 /// # Safety 548 /// # Safety
518 /// 549 ///
550 /// You must call `initialize` before calling this method.
551 ///
519 /// You must NOT call `poll` reentrantly on the same executor. 552 /// You must NOT call `poll` reentrantly on the same executor.
520 /// 553 ///
521 /// In particular, note that `poll` may call the pender synchronously. Therefore, you 554 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
@@ -533,6 +566,11 @@ impl Executor {
533 pub fn spawner(&'static self) -> super::Spawner { 566 pub fn spawner(&'static self) -> super::Spawner {
534 super::Spawner::new(self) 567 super::Spawner::new(self)
535 } 568 }
569
570 /// Get a unique ID for this Executor.
571 pub fn id(&'static self) -> usize {
572 &self.inner as *const SyncExecutor as usize
573 }
536} 574}
537 575
538/// Wake a task by `TaskRef`. 576/// Wake a task by `TaskRef`.
@@ -540,13 +578,13 @@ impl Executor {
540/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 578/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
541pub fn wake_task(task: TaskRef) { 579pub fn wake_task(task: TaskRef) {
542 let header = task.header(); 580 let header = task.header();
543 if header.state.run_enqueue() { 581 header.state.run_enqueue(|l| {
544 // We have just marked the task as scheduled, so enqueue it. 582 // We have just marked the task as scheduled, so enqueue it.
545 unsafe { 583 unsafe {
546 let executor = header.executor.get().unwrap_unchecked(); 584 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
547 executor.enqueue(task); 585 executor.enqueue(task, l);
548 } 586 }
549 } 587 });
550} 588}
551 589
552/// Wake a task by `TaskRef` without calling pend. 590/// Wake a task by `TaskRef` without calling pend.
@@ -554,57 +592,11 @@ pub fn wake_task(task: TaskRef) {
554/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 592/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
555pub fn wake_task_no_pend(task: TaskRef) { 593pub fn wake_task_no_pend(task: TaskRef) {
556 let header = task.header(); 594 let header = task.header();
557 if header.state.run_enqueue() { 595 header.state.run_enqueue(|l| {
558 // We have just marked the task as scheduled, so enqueue it. 596 // We have just marked the task as scheduled, so enqueue it.
559 unsafe { 597 unsafe {
560 let executor = header.executor.get().unwrap_unchecked(); 598 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
561 executor.run_queue.enqueue(task); 599 executor.run_queue.enqueue(task, l);
562 } 600 }
563 } 601 });
564} 602}
565
566#[cfg(feature = "integrated-timers")]
567struct TimerQueue;
568
569#[cfg(feature = "integrated-timers")]
570impl embassy_time_queue_driver::TimerQueue for TimerQueue {
571 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
572 let task = waker::task_from_waker(waker);
573 let task = task.header();
574 unsafe {
575 let expires_at = task.expires_at.get();
576 task.expires_at.set(expires_at.min(at));
577 }
578 }
579}
580
581#[cfg(feature = "integrated-timers")]
582embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
583
584#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
585const fn gcd(a: u64, b: u64) -> u64 {
586 if b == 0 {
587 a
588 } else {
589 gcd(b, a % b)
590 }
591}
592
593#[cfg(feature = "rtos-trace")]
594impl rtos_trace::RtosTraceOSCallbacks for Executor {
595 fn task_list() {
596 // We don't know what tasks exist, so we can't send them.
597 }
598 #[cfg(feature = "integrated-timers")]
599 fn time() -> u64 {
600 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
601 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
602 }
603 #[cfg(not(feature = "integrated-timers"))]
604 fn time() -> u64 {
605 0
606 }
607}
608
609#[cfg(feature = "rtos-trace")]
610rtos_trace::global_os_callbacks! {Executor}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index 90907cfda..ce511d79a 100644
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -45,7 +45,7 @@ impl RunQueue {
45 /// 45 ///
46 /// `item` must NOT be already enqueued in any queue. 46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)] 47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 48 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
49 let mut was_empty = false; 49 let mut was_empty = false;
50 50
51 self.head 51 self.head
@@ -81,6 +81,7 @@ impl RunQueue {
81 // safety: there are no concurrent accesses to `next` 81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() }; 82 next = unsafe { task.header().run_queue_item.next.get() };
83 83
84 task.header().state.run_dequeue();
84 on_task(task); 85 on_task(task);
85 } 86 }
86 } 87 }
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
index ba59c8f29..86c4085ed 100644
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -44,13 +44,11 @@ impl RunQueue {
44 /// 44 ///
45 /// `item` must NOT be already enqueued in any queue. 45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)] 46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 47 pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
48 critical_section::with(|cs| { 48 let prev = self.head.borrow(cs).replace(Some(task));
49 let prev = self.head.borrow(cs).replace(Some(task)); 49 task.header().run_queue_item.next.borrow(cs).set(prev);
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51 50
52 prev.is_none() 51 prev.is_none()
53 })
54 } 52 }
55 53
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 54 /// Empty the queue, then call `on_task` for each task that was in the queue.
@@ -65,9 +63,10 @@ impl RunQueue {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 63 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 64 // Therefore, first read the next pointer, and only then process the task.
67 65
68 // safety: we know if the task is enqueued, no one else will touch the `next` pointer. 66 critical_section::with(|cs| {
69 let cs = unsafe { CriticalSection::new() }; 67 next = task.header().run_queue_item.next.borrow(cs).get();
70 next = task.header().run_queue_item.next.borrow(cs).get(); 68 task.header().state.run_dequeue(cs);
69 });
71 70
72 on_task(task); 71 on_task(task);
73 } 72 }
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e1279ac0b..e813548ae 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,21 +1,28 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1use core::sync::atomic::{AtomicU8, Ordering};
2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
2 12
3/// Task is spawned (has a future) 13/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u8 = 1 << 0;
5/// Task is in the executor run queue 15/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 16pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1;
7/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10 17
11pub(crate) struct State { 18pub(crate) struct State {
12 state: AtomicU32, 19 state: AtomicU8,
13} 20}
14 21
15impl State { 22impl State {
16 pub const fn new() -> State { 23 pub const fn new() -> State {
17 Self { 24 Self {
18 state: AtomicU32::new(0), 25 state: AtomicU8::new(0),
19 } 26 }
20 } 27 }
21 28
@@ -33,41 +40,19 @@ impl State {
33 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 40 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
34 } 41 }
35 42
36 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 43 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
44 /// function if the task was successfully marked.
37 #[inline(always)] 45 #[inline(always)]
38 pub fn run_enqueue(&self) -> bool { 46 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
39 self.state 47 let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel);
40 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { 48 if prev & STATE_RUN_QUEUED == 0 {
41 // If already scheduled, or if not started, 49 locked(f);
42 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 50 }
43 None
44 } else {
45 // Mark it as scheduled
46 Some(state | STATE_RUN_QUEUED)
47 }
48 })
49 .is_ok()
50 } 51 }
51 52
52 /// Unmark the task as run-queued. Return whether the task is spawned. 53 /// Unmark the task as run-queued. Return whether the task is spawned.
53 #[inline(always)] 54 #[inline(always)]
54 pub fn run_dequeue(&self) -> bool { 55 pub fn run_dequeue(&self) {
55 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 56 self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
56 state & STATE_SPAWNED != 0
57 }
58
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
64 old_state & STATE_TIMER_QUEUED == 0
65 }
66
67 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)]
70 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
72 } 57 }
73} 58}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index e4dfe5093..b743dcc2c 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,6 +1,15 @@
1use core::arch::asm;
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 1use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
12
4// Must be kept in sync with the layout of `State`! 13// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 15pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
@@ -11,9 +20,8 @@ pub(crate) struct State {
11 spawned: AtomicBool, 20 spawned: AtomicBool,
12 /// Task is in the executor run queue 21 /// Task is in the executor run queue
13 run_queued: AtomicBool, 22 run_queued: AtomicBool,
14 /// Task is in the executor timer queue
15 timer_queued: AtomicBool,
16 pad: AtomicBool, 23 pad: AtomicBool,
24 pad2: AtomicBool,
17} 25}
18 26
19impl State { 27impl State {
@@ -21,8 +29,8 @@ impl State {
21 Self { 29 Self {
22 spawned: AtomicBool::new(false), 30 spawned: AtomicBool::new(false),
23 run_queued: AtomicBool::new(false), 31 run_queued: AtomicBool::new(false),
24 timer_queued: AtomicBool::new(false),
25 pad: AtomicBool::new(false), 32 pad: AtomicBool::new(false),
33 pad2: AtomicBool::new(false),
26 } 34 }
27 } 35 }
28 36
@@ -54,50 +62,22 @@ impl State {
54 self.spawned.store(false, Ordering::Relaxed); 62 self.spawned.store(false, Ordering::Relaxed);
55 } 63 }
56 64
57 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 65 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
66 /// function if the task was successfully marked.
58 #[inline(always)] 67 #[inline(always)]
59 pub fn run_enqueue(&self) -> bool { 68 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
60 unsafe { 69 let old = self.run_queued.swap(true, Ordering::AcqRel);
61 loop {
62 let state: u32;
63 asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));
64 70
65 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 71 if !old {
66 asm!("clrex", options(nomem, nostack)); 72 locked(f);
67 return false;
68 }
69
70 let outcome: usize;
71 let new_state = state | STATE_RUN_QUEUED;
72 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
73 if outcome == 0 {
74 return true;
75 }
76 }
77 } 73 }
78 } 74 }
79 75
80 /// Unmark the task as run-queued. Return whether the task is spawned. 76 /// Unmark the task as run-queued. Return whether the task is spawned.
81 #[inline(always)] 77 #[inline(always)]
82 pub fn run_dequeue(&self) -> bool { 78 pub fn run_dequeue(&self) {
83 compiler_fence(Ordering::Release); 79 compiler_fence(Ordering::Release);
84 80
85 let r = self.spawned.load(Ordering::Relaxed);
86 self.run_queued.store(false, Ordering::Relaxed); 81 self.run_queued.store(false, Ordering::Relaxed);
87 r
88 }
89
90 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
91 #[cfg(feature = "integrated-timers")]
92 #[inline(always)]
93 pub fn timer_enqueue(&self) -> bool {
94 !self.timer_queued.swap(true, Ordering::Relaxed)
95 }
96
97 /// Unmark the task as timer-queued.
98 #[cfg(feature = "integrated-timers")]
99 #[inline(always)]
100 pub fn timer_dequeue(&self) {
101 self.timer_queued.store(false, Ordering::Relaxed);
102 } 82 }
103} 83}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index c3cc1b0b7..ec08f2f58 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -1,17 +1,15 @@
1use core::cell::Cell; 1use core::cell::Cell;
2 2
3use critical_section::Mutex; 3pub(crate) use critical_section::{with as locked, CriticalSection as Token};
4use critical_section::{CriticalSection, Mutex};
4 5
5/// Task is spawned (has a future) 6/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 7pub(crate) const STATE_SPAWNED: u8 = 1 << 0;
7/// Task is in the executor run queue 8/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 9pub(crate) const STATE_RUN_QUEUED: u8 = 1 << 1;
9/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12 10
13pub(crate) struct State { 11pub(crate) struct State {
14 state: Mutex<Cell<u32>>, 12 state: Mutex<Cell<u8>>,
15} 13}
16 14
17impl State { 15impl State {
@@ -21,14 +19,16 @@ impl State {
21 } 19 }
22 } 20 }
23 21
24 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 22 fn update<R>(&self, f: impl FnOnce(&mut u8) -> R) -> R {
25 critical_section::with(|cs| { 23 critical_section::with(|cs| self.update_with_cs(cs, f))
26 let s = self.state.borrow(cs); 24 }
27 let mut val = s.get(); 25
28 let r = f(&mut val); 26 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u8) -> R) -> R {
29 s.set(val); 27 let s = self.state.borrow(cs);
30 r 28 let mut val = s.get();
31 }) 29 let r = f(&mut val);
30 s.set(val);
31 r
32 } 32 }
33 33
34 /// If task is idle, mark it as spawned + run_queued and return true. 34 /// If task is idle, mark it as spawned + run_queued and return true.
@@ -50,44 +50,24 @@ impl State {
50 self.update(|s| *s &= !STATE_SPAWNED); 50 self.update(|s| *s &= !STATE_SPAWNED);
51 } 51 }
52 52
53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
54 /// function if the task was successfully marked.
54 #[inline(always)] 55 #[inline(always)]
55 pub fn run_enqueue(&self) -> bool { 56 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
56 self.update(|s| { 57 critical_section::with(|cs| {
57 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { 58 if self.update_with_cs(cs, |s| {
58 false 59 let ok = *s & STATE_RUN_QUEUED == 0;
59 } else {
60 *s |= STATE_RUN_QUEUED; 60 *s |= STATE_RUN_QUEUED;
61 true 61 ok
62 }) {
63 f(cs);
62 } 64 }
63 }) 65 });
64 } 66 }
65 67
66 /// Unmark the task as run-queued. Return whether the task is spawned. 68 /// Unmark the task as run-queued. Return whether the task is spawned.
67 #[inline(always)] 69 #[inline(always)]
68 pub fn run_dequeue(&self) -> bool { 70 pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
69 self.update(|s| { 71 self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
70 let ok = *s & STATE_SPAWNED != 0;
71 *s &= !STATE_RUN_QUEUED;
72 ok
73 })
74 }
75
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool {
80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0;
82 *s |= STATE_TIMER_QUEUED;
83 ok
84 })
85 }
86
87 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)]
90 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED);
92 } 72 }
93} 73}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 94a5f340b..e52453be4 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,76 +1,73 @@
1use core::cmp::min; 1//! Timer queue operations.
2
3use core::cell::Cell;
2 4
3use super::TaskRef; 5use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5 6
6pub(crate) struct TimerQueueItem { 7#[cfg(feature = "_timer-item-payload")]
7 next: SyncUnsafeCell<Option<TaskRef>>, 8macro_rules! define_opaque {
8} 9 ($size:tt) => {
10 /// An opaque data type.
11 #[repr(align($size))]
12 pub struct OpaqueData {
13 data: [u8; $size],
14 }
9 15
10impl TimerQueueItem { 16 impl OpaqueData {
11 pub const fn new() -> Self { 17 const fn new() -> Self {
12 Self { 18 Self { data: [0; $size] }
13 next: SyncUnsafeCell::new(None), 19 }
20
21 /// Access the data as a reference to a type `T`.
22 ///
23 /// Safety:
24 ///
25 /// The caller must ensure that the size of the type `T` is less than, or equal to
26 /// the size of the payload, and must ensure that the alignment of the type `T` is
27 /// less than, or equal to the alignment of the payload.
28 ///
29 /// The type must be valid when zero-initialized.
30 pub unsafe fn as_ref<T>(&self) -> &T {
31 &*(self.data.as_ptr() as *const T)
32 }
14 } 33 }
15 } 34 };
16} 35}
17 36
18pub(crate) struct TimerQueue { 37#[cfg(feature = "timer-item-payload-size-1")]
19 head: SyncUnsafeCell<Option<TaskRef>>, 38define_opaque!(1);
20} 39#[cfg(feature = "timer-item-payload-size-2")]
40define_opaque!(2);
41#[cfg(feature = "timer-item-payload-size-4")]
42define_opaque!(4);
43#[cfg(feature = "timer-item-payload-size-8")]
44define_opaque!(8);
21 45
22impl TimerQueue { 46/// An item in the timer queue.
23 pub const fn new() -> Self { 47pub struct TimerQueueItem {
24 Self { 48 /// The next item in the queue.
25 head: SyncUnsafeCell::new(None), 49 ///
26 } 50 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
27 } 51 /// value of `Some(dangling_pointer)`
52 pub next: Cell<Option<TaskRef>>,
28 53
29 pub(crate) unsafe fn update(&self, p: TaskRef) { 54 /// The time at which this item expires.
30 let task = p.header(); 55 pub expires_at: Cell<u64>,
31 if task.expires_at.get() != u64::MAX {
32 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get());
34 self.head.set(Some(p));
35 }
36 }
37 }
38 56
39 pub(crate) unsafe fn next_expiration(&self) -> u64 { 57 /// Some implementation-defined, zero-initialized piece of data.
40 let mut res = u64::MAX; 58 #[cfg(feature = "_timer-item-payload")]
41 self.retain(|p| { 59 pub payload: OpaqueData,
42 let task = p.header(); 60}
43 let expires = task.expires_at.get();
44 res = min(res, expires);
45 expires != u64::MAX
46 });
47 res
48 }
49 61
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { 62unsafe impl Sync for TimerQueueItem {}
51 self.retain(|p| {
52 let task = p.header();
53 if task.expires_at.get() <= now {
54 on_task(p);
55 false
56 } else {
57 true
58 }
59 });
60 }
61 63
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { 64impl TimerQueueItem {
63 let mut prev = &self.head; 65 pub(crate) const fn new() -> Self {
64 while let Some(p) = prev.get() { 66 Self {
65 let task = p.header(); 67 next: Cell::new(None),
66 if f(p) { 68 expires_at: Cell::new(0),
67 // Skip to next 69 #[cfg(feature = "_timer-item-payload")]
68 prev = &task.timer_queue_item.next; 70 payload: OpaqueData::new(),
69 } else {
70 // Remove it
71 prev.set(task.timer_queue_item.next.get());
72 task.state.timer_dequeue();
73 }
74 } 71 }
75 } 72 }
76} 73}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
new file mode 100644
index 000000000..6c9cfda25
--- /dev/null
+++ b/embassy-executor/src/raw/trace.rs
@@ -0,0 +1,412 @@
1//! # Tracing
2//!
3//! The `trace` feature enables a number of callbacks that can be used to track the
4//! lifecycle of tasks and/or executors.
5//!
6//! Callbacks will have one or both of the following IDs passed to them:
7//!
8//! 1. A `task_id`, a `u32` value unique to a task for the duration of the time it is valid
9//! 2. An `executor_id`, a `u32` value unique to an executor for the duration of the time it is
10//! valid
11//!
12//! Today, both `task_id` and `executor_id` are u32s containing the least significant 32 bits of
13//! the address of the task or executor, however this is NOT a stable guarantee, and MAY change
14//! at any time.
15//!
16//! IDs are only guaranteed to be unique for the duration of time the item is valid. If a task
17//! ends, and is re-spawned, it MAY or MAY NOT have the same ID. For tasks, this valid time is defined
18//! as the time between `_embassy_trace_task_new` and `_embassy_trace_task_end` for a given task.
19//! For executors, this time is not defined, but is often "forever" for practical embedded
20//! programs.
21//!
22//! Callbacks can be used by enabling the `trace` feature, and providing implementations of the
23//! `extern "Rust"` functions below. All callbacks must be implemented.
24//!
25//! ## Task Tracing lifecycle
26//!
27//! ```text
28//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
29//! │(1) │
30//! │ │
31//! ╔════▼════╗ (2) ┌─────────┐ (3) ┌─────────┐ │
32//! │ ║ SPAWNED ║────▶│ WAITING │────▶│ RUNNING │
33//! ╚═════════╝ └─────────┘ └─────────┘ │
34//! │ ▲ ▲ │ │ │
35//! │ (4) │ │(6) │
36//! │ │(7) └ ─ ─ ┘ │ │
37//! │ │ │ │
38//! │ ┌──────┐ (5) │ │ ┌─────┐
39//! │ IDLE │◀────────────────┘ └─▶│ END │ │
40//! │ └──────┘ └─────┘
41//! ┌──────────────────────┐ │
42//! └ ┤ Task Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
43//! └──────────────────────┘
44//! ```
45//!
46//! 1. A task is spawned, `_embassy_trace_task_new` is called
47//! 2. A task is enqueued for the first time, `_embassy_trace_task_ready_begin` is called
48//! 3. A task is polled, `_embassy_trace_task_exec_begin` is called
49//! 4. WHILE a task is polled, the task is re-awoken, and `_embassy_trace_task_ready_begin` is
50//! called. The task does not IMMEDIATELY move state, until polling is complete and the
51//! RUNNING state is existed. `_embassy_trace_task_exec_end` is called when polling is
52//! complete, marking the transition to WAITING
53//! 5. Polling is complete, `_embassy_trace_task_exec_end` is called
54//! 6. The task has completed, and `_embassy_trace_task_end` is called
55//! 7. A task is awoken, `_embassy_trace_task_ready_begin` is called
56//!
57//! ## Executor Tracing lifecycle
58//!
59//! ```text
60//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
61//! │(1) │
62//! │ │
63//! ╔═══▼══╗ (2) ┌────────────┐ (3) ┌─────────┐ │
64//! │ ║ IDLE ║──────────▶│ SCHEDULING │──────▶│ POLLING │
65//! ╚══════╝ └────────────┘ └─────────┘ │
66//! │ ▲ │ ▲ │
67//! │ (5) │ │ (4) │ │
68//! │ └──────────────┘ └────────────┘
69//! ┌──────────────────────────┐ │
70//! └ ┤ Executor Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71//! └──────────────────────────┘
72//! ```
73//!
74//! 1. The executor is started (no associated trace)
75//! 2. A task on this executor is awoken. `_embassy_trace_task_ready_begin` is called
76//! when this occurs, and `_embassy_trace_poll_start` is called when the executor
77//! actually begins running
78//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called
79//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called
80//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called
81
82#![allow(unused)]
83
84use core::cell::UnsafeCell;
85use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
86
87use rtos_trace::TaskInfo;
88
89use crate::raw::{SyncExecutor, TaskHeader, TaskRef};
90use crate::spawner::{SpawnError, SpawnToken, Spawner};
91
92/// Global task tracker instance
93///
94/// This static provides access to the global task tracker which maintains
95/// a list of all tasks in the system. It's automatically updated by the
96/// task lifecycle hooks in the trace module.
97pub static TASK_TRACKER: TaskTracker = TaskTracker::new();
98
99/// A thread-safe tracker for all tasks in the system
100///
101/// This struct uses an intrusive linked list approach to track all tasks
102/// without additional memory allocations. It maintains a global list of
103/// tasks that can be traversed to find all currently existing tasks.
104pub struct TaskTracker {
105 head: AtomicPtr<TaskHeader>,
106}
107
108impl TaskTracker {
109 /// Creates a new empty task tracker
110 ///
111 /// Initializes a tracker with no tasks in its list.
112 pub const fn new() -> Self {
113 Self {
114 head: AtomicPtr::new(core::ptr::null_mut()),
115 }
116 }
117
118 /// Adds a task to the tracker
119 ///
120 /// This method inserts a task at the head of the intrusive linked list.
121 /// The operation is thread-safe and lock-free, using atomic operations
122 /// to ensure consistency even when called from different contexts.
123 ///
124 /// # Arguments
125 /// * `task` - The task reference to add to the tracker
126 pub fn add(&self, task: TaskRef) {
127 let task_ptr = task.as_ptr() as *mut TaskHeader;
128
129 loop {
130 let current_head = self.head.load(Ordering::Acquire);
131 unsafe {
132 (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed);
133 }
134
135 if self
136 .head
137 .compare_exchange(current_head, task_ptr, Ordering::Release, Ordering::Relaxed)
138 .is_ok()
139 {
140 break;
141 }
142 }
143 }
144
145 /// Performs an operation on each task in the tracker
146 ///
147 /// This method traverses the entire list of tasks and calls the provided
148 /// function for each task. This allows inspecting or processing all tasks
149 /// in the system without modifying the tracker's structure.
150 ///
151 /// # Arguments
152 /// * `f` - A function to call for each task in the tracker
153 pub fn for_each<F>(&self, mut f: F)
154 where
155 F: FnMut(TaskRef),
156 {
157 let mut current = self.head.load(Ordering::Acquire);
158 while !current.is_null() {
159 let task = unsafe { TaskRef::from_ptr(current) };
160 f(task);
161
162 current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) };
163 }
164 }
165}
166
167/// Extension trait for `TaskRef` that provides tracing functionality.
168///
169/// This trait is only available when the `trace` feature is enabled.
170/// It extends `TaskRef` with methods for accessing and modifying task identifiers
171/// and names, which are useful for debugging, logging, and performance analysis.
172pub trait TaskRefTrace {
173 /// Get the name for a task
174 fn name(&self) -> Option<&'static str>;
175
176 /// Set the name for a task
177 fn set_name(&self, name: Option<&'static str>);
178
179 /// Get the ID for a task
180 fn id(&self) -> u32;
181
182 /// Set the ID for a task
183 fn set_id(&self, id: u32);
184}
185
186impl TaskRefTrace for TaskRef {
187 fn name(&self) -> Option<&'static str> {
188 self.header().name
189 }
190
191 fn set_name(&self, name: Option<&'static str>) {
192 unsafe {
193 let header_ptr = self.ptr.as_ptr() as *mut TaskHeader;
194 (*header_ptr).name = name;
195 }
196 }
197
198 fn id(&self) -> u32 {
199 self.header().id
200 }
201
202 fn set_id(&self, id: u32) {
203 unsafe {
204 let header_ptr = self.ptr.as_ptr() as *mut TaskHeader;
205 (*header_ptr).id = id;
206 }
207 }
208}
209
210#[cfg(not(feature = "rtos-trace"))]
211extern "Rust" {
212 /// This callback is called when the executor begins polling. This will always
213 /// be paired with a later call to `_embassy_trace_executor_idle`.
214 ///
215 /// This marks the EXECUTOR state transition from IDLE -> SCHEDULING.
216 fn _embassy_trace_poll_start(executor_id: u32);
217
218 /// This callback is called AFTER a task is initialized/allocated, and BEFORE
219 /// it is enqueued to run for the first time. If the task ends (and does not
220 /// loop "forever"), there will be a matching call to `_embassy_trace_task_end`.
221 ///
222 /// Tasks start life in the SPAWNED state.
223 fn _embassy_trace_task_new(executor_id: u32, task_id: u32);
224
225 /// This callback is called AFTER a task is destructed/freed. This will always
226 /// have a prior matching call to `_embassy_trace_task_new`.
227 fn _embassy_trace_task_end(executor_id: u32, task_id: u32);
228
229 /// This callback is called AFTER a task has been dequeued from the runqueue,
230 /// and BEFORE the task is polled. There will always be a matching call to
231 /// `_embassy_trace_task_exec_end`.
232 ///
233 /// This marks the TASK state transition from WAITING -> RUNNING
234 /// This marks the EXECUTOR state transition from SCHEDULING -> POLLING
235 fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32);
236
237 /// This callback is called AFTER a task has completed polling. There will
238 /// always be a matching call to `_embassy_trace_task_exec_begin`.
239 ///
240 /// This marks the TASK state transition from either:
241 /// * RUNNING -> IDLE - if there were no `_embassy_trace_task_ready_begin` events
242 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
243 /// * RUNNING -> WAITING - if there WAS a `_embassy_trace_task_ready_begin` event
244 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
245 ///
246 /// This marks the EXECUTOR state transition from POLLING -> SCHEDULING
247 fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32);
248
249 /// This callback is called AFTER the waker for a task is awoken, and BEFORE it
250 /// is added to the run queue.
251 ///
252 /// If the given task is currently RUNNING, this marks no state change, BUT the
253 /// RUNNING task will then move to the WAITING stage when polling is complete.
254 ///
255 /// If the given task is currently IDLE, this marks the TASK state transition
256 /// from IDLE -> WAITING.
257 ///
258 /// NOTE: This may be called from an interrupt, outside the context of the current
259 /// task or executor.
260 fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32);
261
262 /// This callback is called AFTER all dequeued tasks in a single call to poll
263 /// have been processed. This will always be paired with a call to
264 /// `_embassy_trace_executor_idle`.
265 ///
266 /// This marks the EXECUTOR state transition from SCHEDULING -> IDLE
267 fn _embassy_trace_executor_idle(executor_id: u32);
268}
269
270#[inline]
271pub(crate) fn poll_start(executor: &SyncExecutor) {
272 #[cfg(not(feature = "rtos-trace"))]
273 unsafe {
274 _embassy_trace_poll_start(executor as *const _ as u32)
275 }
276}
277
278#[inline]
279pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) {
280 #[cfg(not(feature = "rtos-trace"))]
281 unsafe {
282 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32)
283 }
284
285 #[cfg(feature = "rtos-trace")]
286 rtos_trace::trace::task_new(task.as_ptr() as u32);
287
288 #[cfg(feature = "rtos-trace")]
289 TASK_TRACKER.add(*task);
290}
291
292#[inline]
293pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) {
294 #[cfg(not(feature = "rtos-trace"))]
295 unsafe {
296 _embassy_trace_task_end(executor as u32, task.as_ptr() as u32)
297 }
298}
299
300#[inline]
301pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
302 #[cfg(not(feature = "rtos-trace"))]
303 unsafe {
304 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32)
305 }
306 #[cfg(feature = "rtos-trace")]
307 rtos_trace::trace::task_ready_begin(task.as_ptr() as u32);
308}
309
310#[inline]
311pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
312 #[cfg(not(feature = "rtos-trace"))]
313 unsafe {
314 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32)
315 }
316 #[cfg(feature = "rtos-trace")]
317 rtos_trace::trace::task_exec_begin(task.as_ptr() as u32);
318}
319
320#[inline]
321pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
322 #[cfg(not(feature = "rtos-trace"))]
323 unsafe {
324 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32)
325 }
326 #[cfg(feature = "rtos-trace")]
327 rtos_trace::trace::task_exec_end();
328}
329
330#[inline]
331pub(crate) fn executor_idle(executor: &SyncExecutor) {
332 #[cfg(not(feature = "rtos-trace"))]
333 unsafe {
334 _embassy_trace_executor_idle(executor as *const _ as u32)
335 }
336 #[cfg(feature = "rtos-trace")]
337 rtos_trace::trace::system_idle();
338}
339
340/// Returns an iterator over all active tasks in the system
341///
342/// This function provides a convenient way to iterate over all tasks
343/// that are currently tracked in the system. The returned iterator
344/// yields each task in the global task tracker.
345///
346/// # Returns
347/// An iterator that yields `TaskRef` items for each task
348fn get_all_active_tasks() -> impl Iterator<Item = TaskRef> + 'static {
349 struct TaskIterator<'a> {
350 tracker: &'a TaskTracker,
351 current: *mut TaskHeader,
352 }
353
354 impl<'a> Iterator for TaskIterator<'a> {
355 type Item = TaskRef;
356
357 fn next(&mut self) -> Option<Self::Item> {
358 if self.current.is_null() {
359 return None;
360 }
361
362 let task = unsafe { TaskRef::from_ptr(self.current) };
363 self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) };
364
365 Some(task)
366 }
367 }
368
369 TaskIterator {
370 tracker: &TASK_TRACKER,
371 current: TASK_TRACKER.head.load(Ordering::Acquire),
372 }
373}
374
375/// Perform an action on each active task
376fn with_all_active_tasks<F>(f: F)
377where
378 F: FnMut(TaskRef),
379{
380 TASK_TRACKER.for_each(f);
381}
382
// OS-level callbacks required by the `rtos-trace` backend: enumerating tasks
// and supplying a monotonic timestamp.
#[cfg(feature = "rtos-trace")]
impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
    fn task_list() {
        // Report every tracked task to the trace host. This executor does not
        // track priority or stack bounds, so zeros are sent as placeholders.
        with_all_active_tasks(|task| {
            // NOTE(review): the trailing NUL suggests the backend expects a
            // C-style string — confirm against rtos-trace's requirements.
            let name = task.name().unwrap_or("unnamed task\0");
            let info = rtos_trace::TaskInfo {
                name,
                priority: 0,
                stack_base: 0,
                stack_size: 0,
            };
            rtos_trace::trace::task_send_info(task.id(), info);
        });
    }
    // Current time in microseconds, converted from timer ticks.
    fn time() -> u64 {
        // Compile-time Euclidean GCD, used to reduce the conversion factors.
        const fn gcd(a: u64, b: u64) -> u64 {
            if b == 0 {
                a
            } else {
                gcd(b, a % b)
            }
        }

        // Reduce 1_000_000 / TICK_HZ by their GCD so the multiplication
        // `now() * (1_000_000 / GCD_1M)` uses the smallest possible factor,
        // lowering the risk of u64 overflow before the final division.
        const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
        embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
    }
}
410
// Register `SyncExecutor` as the global provider of the OS callbacks
// (`task_list`, `time`) consumed by the `rtos-trace` backend.
#[cfg(feature = "rtos-trace")]
rtos_trace::global_os_callbacks! {SyncExecutor}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 8d3910a25..d0d7b003d 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -26,38 +26,17 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps 26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
27/// avoid dynamic dispatch. 27/// avoid dynamic dispatch.
28/// 28///
29/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). 29/// You can use the returned task pointer to wake the task with [`wake_task`].
30/// 30///
31/// # Panics 31/// # Panics
32/// 32///
33/// Panics if the waker is not created by the Embassy executor. 33/// Panics if the waker is not created by the Embassy executor.
34pub fn task_from_waker(waker: &Waker) -> TaskRef { 34pub fn task_from_waker(waker: &Waker) -> TaskRef {
35 let (vtable, data) = { 35 // make sure to compare vtable addresses. Doing `==` on the references
36 #[cfg(not(feature = "nightly"))] 36 // will compare the contents, which is slower.
37 { 37 if waker.vtable() as *const _ != &VTABLE as *const _ {
38 struct WakerHack {
39 data: *const (),
40 vtable: &'static RawWakerVTable,
41 }
42
43 // safety: OK because WakerHack has the same layout as Waker.
44 // This is not really guaranteed because the structs are `repr(Rust)`, it is
45 // indeed the case in the current implementation.
46 // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
47 let hack: &WakerHack = unsafe { core::mem::transmute(waker) };
48 (hack.vtable, hack.data)
49 }
50
51 #[cfg(feature = "nightly")]
52 {
53 let raw_waker = waker.as_raw();
54 (raw_waker.vtable(), raw_waker.data())
55 }
56 };
57
58 if vtable != &VTABLE {
59 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") 38 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
60 } 39 }
61 // safety: our wakers are always created with `TaskRef::as_ptr` 40 // safety: our wakers are always created with `TaskRef::as_ptr`
62 unsafe { TaskRef::from_ptr(data as *const TaskHeader) } 41 unsafe { TaskRef::from_ptr(waker.data() as *const TaskHeader) }
63} 42}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 271606244..522d97db3 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -1,9 +1,12 @@
1use core::future::poll_fn; 1use core::future::{poll_fn, Future};
2use core::marker::PhantomData; 2use core::marker::PhantomData;
3use core::mem; 3use core::mem;
4use core::sync::atomic::Ordering;
4use core::task::Poll; 5use core::task::Poll;
5 6
6use super::raw; 7use super::raw;
8#[cfg(feature = "trace")]
9use crate::raw::trace::TaskRefTrace;
7 10
8/// Token to spawn a newly-created task in an executor. 11/// Token to spawn a newly-created task in an executor.
9/// 12///
@@ -21,7 +24,7 @@ use super::raw;
21/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 24/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
22#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 25#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
23pub struct SpawnToken<S> { 26pub struct SpawnToken<S> {
24 raw_task: Option<raw::TaskRef>, 27 pub(crate) raw_task: Option<raw::TaskRef>,
25 phantom: PhantomData<*mut S>, 28 phantom: PhantomData<*mut S>,
26} 29}
27 30
@@ -33,6 +36,15 @@ impl<S> SpawnToken<S> {
33 } 36 }
34 } 37 }
35 38
39 /// Returns the task id if available, otherwise 0
40 /// This can be used in combination with rtos-trace to match task names with id's
41 pub fn id(&self) -> u32 {
42 match self.raw_task {
43 None => 0,
44 Some(t) => t.as_ptr() as u32,
45 }
46 }
47
36 /// Return a SpawnToken that represents a failed spawn. 48 /// Return a SpawnToken that represents a failed spawn.
37 pub fn new_failed() -> Self { 49 pub fn new_failed() -> Self {
38 Self { 50 Self {
@@ -50,8 +62,7 @@ impl<S> Drop for SpawnToken<S> {
50} 62}
51 63
52/// Error returned when spawning a task. 64/// Error returned when spawning a task.
53#[derive(Copy, Clone, Debug)] 65#[derive(Copy, Clone)]
54#[cfg_attr(feature = "defmt", derive(defmt::Format))]
55pub enum SpawnError { 66pub enum SpawnError {
56 /// Too many instances of this task are already running. 67 /// Too many instances of this task are already running.
57 /// 68 ///
@@ -61,6 +72,31 @@ pub enum SpawnError {
61 Busy, 72 Busy,
62} 73}
63 74
75impl core::fmt::Debug for SpawnError {
76 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77 core::fmt::Display::fmt(self, f)
78 }
79}
80
81impl core::fmt::Display for SpawnError {
82 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
83 match self {
84 SpawnError::Busy => write!(f, "Busy - Too many instances of this task are already running. Check the `pool_size` attribute of the task."),
85 }
86 }
87}
88
89#[cfg(feature = "defmt")]
90impl defmt::Format for SpawnError {
91 fn format(&self, f: defmt::Formatter) {
92 match self {
93 SpawnError::Busy => defmt::write!(f, "Busy - Too many instances of this task are already running. Check the `pool_size` attribute of the task."),
94 }
95 }
96}
97
98impl core::error::Error for SpawnError {}
99
64/// Handle to spawn tasks into an executor. 100/// Handle to spawn tasks into an executor.
65/// 101///
66/// This Spawner can spawn any task (Send and non-Send ones), but it can 102/// This Spawner can spawn any task (Send and non-Send ones), but it can
@@ -69,7 +105,7 @@ pub enum SpawnError {
69/// If you want to spawn tasks from another thread, use [SendSpawner]. 105/// If you want to spawn tasks from another thread, use [SendSpawner].
70#[derive(Copy, Clone)] 106#[derive(Copy, Clone)]
71pub struct Spawner { 107pub struct Spawner {
72 executor: &'static raw::Executor, 108 pub(crate) executor: &'static raw::Executor,
73 not_send: PhantomData<*mut ()>, 109 not_send: PhantomData<*mut ()>,
74} 110}
75 111
@@ -89,14 +125,19 @@ impl Spawner {
89 /// # Panics 125 /// # Panics
90 /// 126 ///
91 /// Panics if the current executor is not an Embassy executor. 127 /// Panics if the current executor is not an Embassy executor.
92 pub async fn for_current_executor() -> Self { 128 pub fn for_current_executor() -> impl Future<Output = Self> {
93 poll_fn(|cx| { 129 poll_fn(|cx| {
94 let task = raw::task_from_waker(cx.waker()); 130 let task = raw::task_from_waker(cx.waker());
95 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 131 let executor = unsafe {
132 task.header()
133 .executor
134 .load(Ordering::Relaxed)
135 .as_ref()
136 .unwrap_unchecked()
137 };
96 let executor = unsafe { raw::Executor::wrap(executor) }; 138 let executor = unsafe { raw::Executor::wrap(executor) };
97 Poll::Ready(Self::new(executor)) 139 Poll::Ready(Self::new(executor))
98 }) 140 })
99 .await
100 } 141 }
101 142
102 /// Spawn a task into an executor. 143 /// Spawn a task into an executor.
@@ -134,6 +175,58 @@ impl Spawner {
134 pub fn make_send(&self) -> SendSpawner { 175 pub fn make_send(&self) -> SendSpawner {
135 SendSpawner::new(&self.executor.inner) 176 SendSpawner::new(&self.executor.inner)
136 } 177 }
178
179 /// Return the unique ID of this Spawner's Executor.
180 pub fn executor_id(&self) -> usize {
181 self.executor.id()
182 }
183}
184
185/// Extension trait adding tracing capabilities to the Spawner
186///
187/// This trait provides an additional method to spawn tasks with an associated name,
188/// which can be useful for debugging and tracing purposes.
189pub trait SpawnerTraceExt {
190 /// Spawns a new task with a specified name.
191 ///
192 /// # Arguments
193 /// * `name` - Static string name to associate with the task
194 /// * `token` - Token representing the task to spawn
195 ///
196 /// # Returns
197 /// Result indicating whether the spawn was successful
198 fn spawn_named<S>(&self, name: &'static str, token: SpawnToken<S>) -> Result<(), SpawnError>;
199}
200
201/// Implementation of the SpawnerTraceExt trait for Spawner when trace is enabled
202#[cfg(feature = "trace")]
203impl SpawnerTraceExt for Spawner {
204 fn spawn_named<S>(&self, name: &'static str, token: SpawnToken<S>) -> Result<(), SpawnError> {
205 let task = token.raw_task;
206 core::mem::forget(token);
207
208 match task {
209 Some(task) => {
210 // Set the name and ID when trace is enabled
211 task.set_name(Some(name));
212 let task_id = task.as_ptr() as u32;
213 task.set_id(task_id);
214
215 unsafe { self.executor.spawn(task) };
216 Ok(())
217 }
218 None => Err(SpawnError::Busy),
219 }
220 }
221}
222
223/// Implementation of the SpawnerTraceExt trait for Spawner when trace is disabled
224#[cfg(not(feature = "trace"))]
225impl SpawnerTraceExt for Spawner {
226 fn spawn_named<S>(&self, _name: &'static str, token: SpawnToken<S>) -> Result<(), SpawnError> {
227 // When trace is disabled, just forward to regular spawn and ignore the name
228 self.spawn(token)
229 }
137} 230}
138 231
139/// Handle to spawn tasks into an executor from any thread. 232/// Handle to spawn tasks into an executor from any thread.
@@ -161,13 +254,18 @@ impl SendSpawner {
161 /// # Panics 254 /// # Panics
162 /// 255 ///
163 /// Panics if the current executor is not an Embassy executor. 256 /// Panics if the current executor is not an Embassy executor.
164 pub async fn for_current_executor() -> Self { 257 pub fn for_current_executor() -> impl Future<Output = Self> {
165 poll_fn(|cx| { 258 poll_fn(|cx| {
166 let task = raw::task_from_waker(cx.waker()); 259 let task = raw::task_from_waker(cx.waker());
167 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 260 let executor = unsafe {
261 task.header()
262 .executor
263 .load(Ordering::Relaxed)
264 .as_ref()
265 .unwrap_unchecked()
266 };
168 Poll::Ready(Self::new(executor)) 267 Poll::Ready(Self::new(executor))
169 }) 268 })
170 .await
171 } 269 }
172 270
173 /// Spawn a task into an executor. 271 /// Spawn a task into an executor.