aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2025-11-20 13:22:38 +0100
committerDion Dokter <[email protected]>2025-11-20 13:22:38 +0100
commit4f2c36e447455e8d33607d586859d3d075cabf1d (patch)
tree003cd822d688acd7c074dd229663b4648d100f71 /embassy-executor/src
parent663732d85abbae400f2dbab2c411802a5b60e9b1 (diff)
parent661874d11de7d93ed52e08e020a9d4c7ee11122d (diff)
Merge branch 'main' into u0-lcd
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/avr.rs4
-rw-r--r--embassy-executor/src/arch/cortex_ar.rs87
-rw-r--r--embassy-executor/src/arch/cortex_m.rs13
-rw-r--r--embassy-executor/src/arch/riscv32.rs4
-rw-r--r--embassy-executor/src/arch/spin.rs58
-rw-r--r--embassy-executor/src/arch/std.rs4
-rw-r--r--embassy-executor/src/arch/wasm.rs4
-rw-r--r--embassy-executor/src/fmt.rs31
-rw-r--r--embassy-executor/src/lib.rs254
-rw-r--r--embassy-executor/src/metadata.rs148
-rw-r--r--embassy-executor/src/raw/deadline.rs44
-rw-r--r--embassy-executor/src/raw/mod.rs343
-rw-r--r--embassy-executor/src/raw/run_queue.rs213
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs87
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs75
-rw-r--r--embassy-executor/src/raw/state_atomics.rs74
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs60
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs79
-rw-r--r--embassy-executor/src/raw/timer_queue.rs76
-rw-r--r--embassy-executor/src/raw/trace.rs380
-rw-r--r--embassy-executor/src/raw/waker.rs37
-rw-r--r--embassy-executor/src/raw/waker_turbo.rs4
-rw-r--r--embassy-executor/src/spawner.rs147
23 files changed, 1514 insertions, 712 deletions
diff --git a/embassy-executor/src/arch/avr.rs b/embassy-executor/src/arch/avr.rs
index 70085d04d..a841afe15 100644
--- a/embassy-executor/src/arch/avr.rs
+++ b/embassy-executor/src/arch/avr.rs
@@ -10,11 +10,11 @@ mod thread {
10 pub use embassy_executor_macros::main_avr as main; 10 pub use embassy_executor_macros::main_avr as main;
11 use portable_atomic::{AtomicBool, Ordering}; 11 use portable_atomic::{AtomicBool, Ordering};
12 12
13 use crate::{raw, Spawner}; 13 use crate::{Spawner, raw};
14 14
15 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); 15 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
16 16
17 #[export_name = "__pender"] 17 #[unsafe(export_name = "__pender")]
18 fn __pender(_context: *mut ()) { 18 fn __pender(_context: *mut ()) {
19 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); 19 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
20 } 20 }
diff --git a/embassy-executor/src/arch/cortex_ar.rs b/embassy-executor/src/arch/cortex_ar.rs
new file mode 100644
index 000000000..ce572738a
--- /dev/null
+++ b/embassy-executor/src/arch/cortex_ar.rs
@@ -0,0 +1,87 @@
1#[cfg(arm_profile = "legacy")]
2compile_error!("`arch-cortex-ar` does not support the legacy ARM profile, WFE/SEV are not available.");
3
4#[cfg(feature = "executor-interrupt")]
5compile_error!("`executor-interrupt` is not supported with `arch-cortex-ar`.");
6
7#[unsafe(export_name = "__pender")]
8#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))]
9fn __pender(context: *mut ()) {
10 // `context` is always `usize::MAX` created by `Executor::run`.
11 let context = context as usize;
12
13 #[cfg(feature = "executor-thread")]
14 // Try to make Rust optimize the branching away if we only use thread mode.
15 if !cfg!(feature = "executor-interrupt") || context == THREAD_PENDER {
16 aarch32_cpu::asm::sev();
17 return;
18 }
19}
20
21#[cfg(feature = "executor-thread")]
22pub use thread::*;
23#[cfg(feature = "executor-thread")]
24mod thread {
25 pub(super) const THREAD_PENDER: usize = usize::MAX;
26
27 use core::marker::PhantomData;
28
29 use aarch32_cpu::asm::wfe;
30 pub use embassy_executor_macros::main_cortex_ar as main;
31
32 use crate::{Spawner, raw};
33
34 /// Thread mode executor, using WFE/SEV.
35 ///
36 /// This is the simplest and most common kind of executor. It runs on
37 /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
38 /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
39 /// is executed, to make the `WFE` exit from sleep and poll the task.
40 ///
41 /// This executor allows for ultra low power consumption for chips where `WFE`
42 /// triggers low-power sleep without extra steps. If your chip requires extra steps,
43 /// you may use [`raw::Executor`] directly to program custom behavior.
44 pub struct Executor {
45 inner: raw::Executor,
46 not_send: PhantomData<*mut ()>,
47 }
48
49 impl Executor {
50 /// Create a new Executor.
51 pub fn new() -> Self {
52 Self {
53 inner: raw::Executor::new(THREAD_PENDER as *mut ()),
54 not_send: PhantomData,
55 }
56 }
57
58 /// Run the executor.
59 ///
60 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
61 /// this executor. Use it to spawn the initial task(s). After `init` returns,
62 /// the executor starts running the tasks.
63 ///
64 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
65 /// for example by passing it as an argument to the initial tasks.
66 ///
67 /// This function requires `&'static mut self`. This means you have to store the
68 /// Executor instance in a place where it'll live forever and grants you mutable
69 /// access. There's a few ways to do this:
70 ///
71 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
72 /// - a `static mut` (unsafe)
73 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
74 ///
75 /// This function never returns.
76 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
77 init(self.inner.spawner());
78
79 loop {
80 unsafe {
81 self.inner.poll();
82 }
83 wfe();
84 }
85 }
86 }
87}
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index 5c517e0a2..1ce96d1d5 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -1,4 +1,4 @@
1#[export_name = "__pender"] 1#[unsafe(export_name = "__pender")]
2#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))] 2#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))]
3fn __pender(context: *mut ()) { 3fn __pender(context: *mut ()) {
4 unsafe { 4 unsafe {
@@ -53,7 +53,7 @@ mod thread {
53 53
54 pub use embassy_executor_macros::main_cortex_m as main; 54 pub use embassy_executor_macros::main_cortex_m as main;
55 55
56 use crate::{raw, Spawner}; 56 use crate::{Spawner, raw};
57 57
58 /// Thread mode executor, using WFE/SEV. 58 /// Thread mode executor, using WFE/SEV.
59 /// 59 ///
@@ -143,7 +143,7 @@ mod interrupt {
143 /// If this is not the case, you may use an interrupt from any unused peripheral. 143 /// If this is not the case, you may use an interrupt from any unused peripheral.
144 /// 144 ///
145 /// It is somewhat more complex to use, it's recommended to use the thread-mode 145 /// It is somewhat more complex to use, it's recommended to use the thread-mode
146 /// [`Executor`] instead, if it works for your use case. 146 /// [`Executor`](crate::Executor) instead, if it works for your use case.
147 pub struct InterruptExecutor { 147 pub struct InterruptExecutor {
148 started: Mutex<Cell<bool>>, 148 started: Mutex<Cell<bool>>,
149 executor: UnsafeCell<MaybeUninit<raw::Executor>>, 149 executor: UnsafeCell<MaybeUninit<raw::Executor>>,
@@ -179,11 +179,11 @@ mod interrupt {
179 /// The executor keeps running in the background through the interrupt. 179 /// The executor keeps running in the background through the interrupt.
180 /// 180 ///
181 /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`] 181 /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`]
182 /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a 182 /// is returned instead of a [`Spawner`](crate::Spawner) because the executor effectively runs in a
183 /// different "thread" (the interrupt), so spawning tasks on it is effectively 183 /// different "thread" (the interrupt), so spawning tasks on it is effectively
184 /// sending them. 184 /// sending them.
185 /// 185 ///
186 /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from 186 /// To obtain a [`Spawner`](crate::Spawner) for this executor, use [`Spawner::for_current_executor()`](crate::Spawner::for_current_executor()) from
187 /// a task running in it. 187 /// a task running in it.
188 /// 188 ///
189 /// # Interrupt requirements 189 /// # Interrupt requirements
@@ -195,6 +195,7 @@ mod interrupt {
195 /// You must set the interrupt priority before calling this method. You MUST NOT 195 /// You must set the interrupt priority before calling this method. You MUST NOT
196 /// do it after. 196 /// do it after.
197 /// 197 ///
198 /// [`SendSpawner`]: crate::SendSpawner
198 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { 199 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
199 if critical_section::with(|cs| self.started.borrow(cs).replace(true)) { 200 if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
200 panic!("InterruptExecutor::start() called multiple times on the same executor."); 201 panic!("InterruptExecutor::start() called multiple times on the same executor.");
@@ -215,7 +216,7 @@ mod interrupt {
215 216
216 /// Get a SendSpawner for this executor 217 /// Get a SendSpawner for this executor
217 /// 218 ///
218 /// This returns a [`SendSpawner`] you can use to spawn tasks on this 219 /// This returns a [`SendSpawner`](crate::SendSpawner) you can use to spawn tasks on this
219 /// executor. 220 /// executor.
220 /// 221 ///
221 /// This MUST only be called on an executor that has already been started. 222 /// This MUST only be called on an executor that has already been started.
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index 01e63a9fd..c70c1344a 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -10,12 +10,12 @@ mod thread {
10 10
11 pub use embassy_executor_macros::main_riscv as main; 11 pub use embassy_executor_macros::main_riscv as main;
12 12
13 use crate::{raw, Spawner}; 13 use crate::{Spawner, raw};
14 14
15 /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV 15 /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
16 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); 16 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
17 17
18 #[export_name = "__pender"] 18 #[unsafe(export_name = "__pender")]
19 fn __pender(_context: *mut ()) { 19 fn __pender(_context: *mut ()) {
20 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); 20 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
21 } 21 }
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs
new file mode 100644
index 000000000..49f3356a6
--- /dev/null
+++ b/embassy-executor/src/arch/spin.rs
@@ -0,0 +1,58 @@
1#[cfg(feature = "executor-interrupt")]
2compile_error!("`executor-interrupt` is not supported with `arch-spin`.");
3
4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use core::marker::PhantomData;
9
10 pub use embassy_executor_macros::main_spin as main;
11
12 use crate::{Spawner, raw};
13
14 #[unsafe(export_name = "__pender")]
15 fn __pender(_context: *mut ()) {}
16
17 /// Spin Executor
18 pub struct Executor {
19 inner: raw::Executor,
20 not_send: PhantomData<*mut ()>,
21 }
22
23 impl Executor {
24 /// Create a new Executor.
25 pub fn new() -> Self {
26 Self {
27 inner: raw::Executor::new(core::ptr::null_mut()),
28 not_send: PhantomData,
29 }
30 }
31
32 /// Run the executor.
33 ///
34 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
35 /// this executor. Use it to spawn the initial task(s). After `init` returns,
36 /// the executor starts running the tasks.
37 ///
38 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
39 /// for example by passing it as an argument to the initial tasks.
40 ///
41 /// This function requires `&'static mut self`. This means you have to store the
42 /// Executor instance in a place where it'll live forever and grants you mutable
43 /// access. There's a few ways to do this:
44 ///
45 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
46 /// - a `static mut` (unsafe)
47 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
48 ///
49 /// This function never returns.
50 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
51 init(self.inner.spawner());
52
53 loop {
54 unsafe { self.inner.poll() };
55 }
56 }
57 }
58}
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs
index b02b15988..c62ab723b 100644
--- a/embassy-executor/src/arch/std.rs
+++ b/embassy-executor/src/arch/std.rs
@@ -10,9 +10,9 @@ mod thread {
10 10
11 pub use embassy_executor_macros::main_std as main; 11 pub use embassy_executor_macros::main_std as main;
12 12
13 use crate::{raw, Spawner}; 13 use crate::{Spawner, raw};
14 14
15 #[export_name = "__pender"] 15 #[unsafe(export_name = "__pender")]
16 fn __pender(context: *mut ()) { 16 fn __pender(context: *mut ()) {
17 let signaler: &'static Signaler = unsafe { std::mem::transmute(context) }; 17 let signaler: &'static Signaler = unsafe { std::mem::transmute(context) };
18 signaler.signal() 18 signaler.signal()
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs
index f9d0f935c..d2ff2fe51 100644
--- a/embassy-executor/src/arch/wasm.rs
+++ b/embassy-executor/src/arch/wasm.rs
@@ -13,9 +13,9 @@ mod thread {
13 use wasm_bindgen::prelude::*; 13 use wasm_bindgen::prelude::*;
14 14
15 use crate::raw::util::UninitCell; 15 use crate::raw::util::UninitCell;
16 use crate::{raw, Spawner}; 16 use crate::{Spawner, raw};
17 17
18 #[export_name = "__pender"] 18 #[unsafe(export_name = "__pender")]
19 fn __pender(context: *mut ()) { 19 fn __pender(context: *mut ()) {
20 let signaler: &'static WasmContext = unsafe { std::mem::transmute(context) }; 20 let signaler: &'static WasmContext = unsafe { std::mem::transmute(context) };
21 let _ = signaler.promise.then(unsafe { signaler.closure.as_mut() }); 21 let _ = signaler.promise.then(unsafe { signaler.closure.as_mut() });
diff --git a/embassy-executor/src/fmt.rs b/embassy-executor/src/fmt.rs
index 2ac42c557..8ca61bc39 100644
--- a/embassy-executor/src/fmt.rs
+++ b/embassy-executor/src/fmt.rs
@@ -6,6 +6,7 @@ use core::fmt::{Debug, Display, LowerHex};
6#[cfg(all(feature = "defmt", feature = "log"))] 6#[cfg(all(feature = "defmt", feature = "log"))]
7compile_error!("You may not enable both `defmt` and `log` features."); 7compile_error!("You may not enable both `defmt` and `log` features.");
8 8
9#[collapse_debuginfo(yes)]
9macro_rules! assert { 10macro_rules! assert {
10 ($($x:tt)*) => { 11 ($($x:tt)*) => {
11 { 12 {
@@ -17,6 +18,7 @@ macro_rules! assert {
17 }; 18 };
18} 19}
19 20
21#[collapse_debuginfo(yes)]
20macro_rules! assert_eq { 22macro_rules! assert_eq {
21 ($($x:tt)*) => { 23 ($($x:tt)*) => {
22 { 24 {
@@ -28,6 +30,7 @@ macro_rules! assert_eq {
28 }; 30 };
29} 31}
30 32
33#[collapse_debuginfo(yes)]
31macro_rules! assert_ne { 34macro_rules! assert_ne {
32 ($($x:tt)*) => { 35 ($($x:tt)*) => {
33 { 36 {
@@ -39,6 +42,7 @@ macro_rules! assert_ne {
39 }; 42 };
40} 43}
41 44
45#[collapse_debuginfo(yes)]
42macro_rules! debug_assert { 46macro_rules! debug_assert {
43 ($($x:tt)*) => { 47 ($($x:tt)*) => {
44 { 48 {
@@ -50,6 +54,7 @@ macro_rules! debug_assert {
50 }; 54 };
51} 55}
52 56
57#[collapse_debuginfo(yes)]
53macro_rules! debug_assert_eq { 58macro_rules! debug_assert_eq {
54 ($($x:tt)*) => { 59 ($($x:tt)*) => {
55 { 60 {
@@ -61,6 +66,7 @@ macro_rules! debug_assert_eq {
61 }; 66 };
62} 67}
63 68
69#[collapse_debuginfo(yes)]
64macro_rules! debug_assert_ne { 70macro_rules! debug_assert_ne {
65 ($($x:tt)*) => { 71 ($($x:tt)*) => {
66 { 72 {
@@ -72,6 +78,7 @@ macro_rules! debug_assert_ne {
72 }; 78 };
73} 79}
74 80
81#[collapse_debuginfo(yes)]
75macro_rules! todo { 82macro_rules! todo {
76 ($($x:tt)*) => { 83 ($($x:tt)*) => {
77 { 84 {
@@ -83,20 +90,19 @@ macro_rules! todo {
83 }; 90 };
84} 91}
85 92
86#[cfg(not(feature = "defmt"))] 93#[collapse_debuginfo(yes)]
87macro_rules! unreachable { 94macro_rules! unreachable {
88 ($($x:tt)*) => { 95 ($($x:tt)*) => {
89 ::core::unreachable!($($x)*) 96 {
90 }; 97 #[cfg(not(feature = "defmt"))]
91} 98 ::core::unreachable!($($x)*);
92 99 #[cfg(feature = "defmt")]
93#[cfg(feature = "defmt")] 100 ::defmt::unreachable!($($x)*);
94macro_rules! unreachable { 101 }
95 ($($x:tt)*) => {
96 ::defmt::unreachable!($($x)*)
97 }; 102 };
98} 103}
99 104
105#[collapse_debuginfo(yes)]
100macro_rules! panic { 106macro_rules! panic {
101 ($($x:tt)*) => { 107 ($($x:tt)*) => {
102 { 108 {
@@ -108,6 +114,7 @@ macro_rules! panic {
108 }; 114 };
109} 115}
110 116
117#[collapse_debuginfo(yes)]
111macro_rules! trace { 118macro_rules! trace {
112 ($s:literal $(, $x:expr)* $(,)?) => { 119 ($s:literal $(, $x:expr)* $(,)?) => {
113 { 120 {
@@ -121,6 +128,7 @@ macro_rules! trace {
121 }; 128 };
122} 129}
123 130
131#[collapse_debuginfo(yes)]
124macro_rules! debug { 132macro_rules! debug {
125 ($s:literal $(, $x:expr)* $(,)?) => { 133 ($s:literal $(, $x:expr)* $(,)?) => {
126 { 134 {
@@ -134,6 +142,7 @@ macro_rules! debug {
134 }; 142 };
135} 143}
136 144
145#[collapse_debuginfo(yes)]
137macro_rules! info { 146macro_rules! info {
138 ($s:literal $(, $x:expr)* $(,)?) => { 147 ($s:literal $(, $x:expr)* $(,)?) => {
139 { 148 {
@@ -147,6 +156,7 @@ macro_rules! info {
147 }; 156 };
148} 157}
149 158
159#[collapse_debuginfo(yes)]
150macro_rules! warn { 160macro_rules! warn {
151 ($s:literal $(, $x:expr)* $(,)?) => { 161 ($s:literal $(, $x:expr)* $(,)?) => {
152 { 162 {
@@ -160,6 +170,7 @@ macro_rules! warn {
160 }; 170 };
161} 171}
162 172
173#[collapse_debuginfo(yes)]
163macro_rules! error { 174macro_rules! error {
164 ($s:literal $(, $x:expr)* $(,)?) => { 175 ($s:literal $(, $x:expr)* $(,)?) => {
165 { 176 {
@@ -174,6 +185,7 @@ macro_rules! error {
174} 185}
175 186
176#[cfg(feature = "defmt")] 187#[cfg(feature = "defmt")]
188#[collapse_debuginfo(yes)]
177macro_rules! unwrap { 189macro_rules! unwrap {
178 ($($x:tt)*) => { 190 ($($x:tt)*) => {
179 ::defmt::unwrap!($($x)*) 191 ::defmt::unwrap!($($x)*)
@@ -181,6 +193,7 @@ macro_rules! unwrap {
181} 193}
182 194
183#[cfg(not(feature = "defmt"))] 195#[cfg(not(feature = "defmt"))]
196#[collapse_debuginfo(yes)]
184macro_rules! unwrap { 197macro_rules! unwrap {
185 ($arg:expr) => { 198 ($arg:expr) => {
186 match $crate::fmt::Try::into_result($arg) { 199 match $crate::fmt::Try::into_result($arg) {
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
index 553ed76d3..cffc76699 100644
--- a/embassy-executor/src/lib.rs
+++ b/embassy-executor/src/lib.rs
@@ -1,6 +1,6 @@
1#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)] 1#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(waker_getters))]
3#![allow(clippy::new_without_default)] 2#![allow(clippy::new_without_default)]
3#![allow(unsafe_op_in_unsafe_fn)]
4#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 5#![warn(missing_docs)]
6 6
@@ -24,120 +24,236 @@ macro_rules! check_at_most_one {
24 check_at_most_one!(@amo [$($f)*] [$($f)*] []); 24 check_at_most_one!(@amo [$($f)*] [$($f)*] []);
25 }; 25 };
26} 26}
27check_at_most_one!("arch-avr", "arch-cortex-m", "arch-riscv32", "arch-std", "arch-wasm",); 27check_at_most_one!(
28 "arch-avr",
29 "arch-cortex-m",
30 "arch-cortex-ar",
31 "arch-riscv32",
32 "arch-std",
33 "arch-wasm",
34 "arch-spin",
35);
28 36
29#[cfg(feature = "_arch")] 37#[cfg(feature = "_arch")]
30#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")] 38#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")]
31#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] 39#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")]
40#[cfg_attr(feature = "arch-cortex-ar", path = "arch/cortex_ar.rs")]
32#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] 41#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")]
33#[cfg_attr(feature = "arch-std", path = "arch/std.rs")] 42#[cfg_attr(feature = "arch-std", path = "arch/std.rs")]
34#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] 43#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")]
44#[cfg_attr(feature = "arch-spin", path = "arch/spin.rs")]
35mod arch; 45mod arch;
36 46
37#[cfg(feature = "_arch")] 47#[cfg(feature = "_arch")]
38#[allow(unused_imports)] // don't warn if the module is empty. 48#[allow(unused_imports)] // don't warn if the module is empty.
39pub use arch::*; 49pub use arch::*;
50#[cfg(not(feature = "_arch"))]
51pub use embassy_executor_macros::main_unspecified as main;
40 52
41pub mod raw; 53pub mod raw;
42 54
43mod spawner; 55mod spawner;
44pub use spawner::*; 56pub use spawner::*;
45 57
46mod config { 58mod metadata;
47 #![allow(unused)] 59pub use metadata::*;
48 include!(concat!(env!("OUT_DIR"), "/config.rs"));
49}
50 60
51/// Implementation details for embassy macros. 61/// Implementation details for embassy macros.
52/// Do not use. Used for macros and HALs only. Not covered by semver guarantees. 62/// Do not use. Used for macros and HALs only. Not covered by semver guarantees.
53#[doc(hidden)] 63#[doc(hidden)]
54#[cfg(not(feature = "nightly"))] 64#[cfg(not(feature = "nightly"))]
55pub mod _export { 65pub mod _export {
56 use core::alloc::Layout; 66 use core::cell::UnsafeCell;
57 use core::cell::{Cell, UnsafeCell};
58 use core::future::Future; 67 use core::future::Future;
59 use core::mem::MaybeUninit; 68 use core::mem::MaybeUninit;
60 use core::ptr::null_mut;
61
62 use critical_section::{CriticalSection, Mutex};
63 69
64 use crate::raw::TaskPool; 70 use crate::raw::TaskPool;
65 71
66 struct Arena<const N: usize> { 72 trait TaskReturnValue {}
67 buf: UnsafeCell<MaybeUninit<[u8; N]>>, 73 impl TaskReturnValue for () {}
68 ptr: Mutex<Cell<*mut u8>>, 74 impl TaskReturnValue for Never {}
69 }
70 75
71 unsafe impl<const N: usize> Sync for Arena<N> {} 76 #[diagnostic::on_unimplemented(
72 unsafe impl<const N: usize> Send for Arena<N> {} 77 message = "task futures must resolve to `()` or `!`",
78 note = "use `async fn` or change the return type to `impl Future<Output = ()>`"
79 )]
80 #[allow(private_bounds)]
81 pub trait TaskFn<Args>: Copy {
82 type Fut: Future<Output: TaskReturnValue> + 'static;
83 }
73 84
74 impl<const N: usize> Arena<N> { 85 macro_rules! task_fn_impl {
75 const fn new() -> Self { 86 ($($Tn:ident),*) => {
76 Self { 87 impl<F, Fut, $($Tn,)*> TaskFn<($($Tn,)*)> for F
77 buf: UnsafeCell::new(MaybeUninit::uninit()), 88 where
78 ptr: Mutex::new(Cell::new(null_mut())), 89 F: Copy + FnOnce($($Tn,)*) -> Fut,
90 Fut: Future<Output: TaskReturnValue> + 'static,
91 {
92 type Fut = Fut;
79 } 93 }
80 } 94 };
95 }
81 96
82 fn alloc<T>(&'static self, cs: CriticalSection) -> &'static mut MaybeUninit<T> { 97 task_fn_impl!();
83 let layout = Layout::new::<T>(); 98 task_fn_impl!(T0);
99 task_fn_impl!(T0, T1);
100 task_fn_impl!(T0, T1, T2);
101 task_fn_impl!(T0, T1, T2, T3);
102 task_fn_impl!(T0, T1, T2, T3, T4);
103 task_fn_impl!(T0, T1, T2, T3, T4, T5);
104 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6);
105 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7);
106 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8);
107 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9);
108 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
109 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
110 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
111 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
112 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
113 task_fn_impl!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
84 114
85 let start = self.buf.get().cast::<u8>(); 115 #[allow(private_bounds)]
86 let end = unsafe { start.add(N) }; 116 #[repr(C)]
117 pub struct TaskPoolHolder<const SIZE: usize, const ALIGN: usize>
118 where
119 Align<ALIGN>: Alignment,
120 {
121 data: UnsafeCell<[MaybeUninit<u8>; SIZE]>,
122 align: Align<ALIGN>,
123 }
87 124
88 let mut ptr = self.ptr.borrow(cs).get(); 125 unsafe impl<const SIZE: usize, const ALIGN: usize> Send for TaskPoolHolder<SIZE, ALIGN> where Align<ALIGN>: Alignment {}
89 if ptr.is_null() { 126 unsafe impl<const SIZE: usize, const ALIGN: usize> Sync for TaskPoolHolder<SIZE, ALIGN> where Align<ALIGN>: Alignment {}
90 ptr = self.buf.get().cast::<u8>();
91 }
92 127
93 let bytes_left = (end as usize) - (ptr as usize); 128 #[allow(private_bounds)]
94 let align_offset = (ptr as usize).next_multiple_of(layout.align()) - (ptr as usize); 129 impl<const SIZE: usize, const ALIGN: usize> TaskPoolHolder<SIZE, ALIGN>
130 where
131 Align<ALIGN>: Alignment,
132 {
133 pub const fn get(&self) -> *const u8 {
134 self.data.get().cast()
135 }
136 }
95 137
96 if align_offset + layout.size() > bytes_left { 138 pub const fn task_pool_size<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> usize
97 panic!("embassy-executor: task arena is full. You must increase the arena size, see the documentation for details: https://docs.embassy.dev/embassy-executor/"); 139 where
98 } 140 F: TaskFn<Args, Fut = Fut>,
141 Fut: Future + 'static,
142 {
143 size_of::<TaskPool<Fut, POOL_SIZE>>()
144 }
145
146 pub const fn task_pool_align<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> usize
147 where
148 F: TaskFn<Args, Fut = Fut>,
149 Fut: Future + 'static,
150 {
151 align_of::<TaskPool<Fut, POOL_SIZE>>()
152 }
99 153
100 let res = unsafe { ptr.add(align_offset) }; 154 pub const fn task_pool_new<F, Args, Fut, const POOL_SIZE: usize>(_: F) -> TaskPool<Fut, POOL_SIZE>
101 let ptr = unsafe { ptr.add(align_offset + layout.size()) }; 155 where
156 F: TaskFn<Args, Fut = Fut>,
157 Fut: Future + 'static,
158 {
159 TaskPool::new()
160 }
102 161
103 self.ptr.borrow(cs).set(ptr); 162 #[allow(private_bounds)]
163 #[repr(transparent)]
164 pub struct Align<const N: usize>([<Self as Alignment>::Archetype; 0])
165 where
166 Self: Alignment;
104 167
105 unsafe { &mut *(res as *mut MaybeUninit<T>) } 168 trait Alignment {
106 } 169 /// A zero-sized type of particular alignment.
170 type Archetype: Copy + Eq + PartialEq + Send + Sync + Unpin;
107 } 171 }
108 172
109 static ARENA: Arena<{ crate::config::TASK_ARENA_SIZE }> = Arena::new(); 173 macro_rules! aligns {
174 ($($AlignX:ident: $n:literal,)*) => {
175 $(
176 #[derive(Copy, Clone, Eq, PartialEq)]
177 #[repr(align($n))]
178 struct $AlignX {}
179 impl Alignment for Align<$n> {
180 type Archetype = $AlignX;
181 }
182 )*
183 };
184 }
185
186 aligns!(
187 Align1: 1,
188 Align2: 2,
189 Align4: 4,
190 Align8: 8,
191 Align16: 16,
192 Align32: 32,
193 Align64: 64,
194 Align128: 128,
195 Align256: 256,
196 Align512: 512,
197 Align1024: 1024,
198 Align2048: 2048,
199 Align4096: 4096,
200 Align8192: 8192,
201 Align16384: 16384,
202 );
203 #[cfg(any(target_pointer_width = "32", target_pointer_width = "64"))]
204 aligns!(
205 Align32768: 32768,
206 Align65536: 65536,
207 Align131072: 131072,
208 Align262144: 262144,
209 Align524288: 524288,
210 Align1048576: 1048576,
211 Align2097152: 2097152,
212 Align4194304: 4194304,
213 Align8388608: 8388608,
214 Align16777216: 16777216,
215 Align33554432: 33554432,
216 Align67108864: 67108864,
217 Align134217728: 134217728,
218 Align268435456: 268435456,
219 Align536870912: 536870912,
220 );
110 221
111 pub struct TaskPoolRef { 222 #[allow(dead_code)]
112 // type-erased `&'static mut TaskPool<F, N>` 223 pub trait HasOutput {
113 // Needed because statics can't have generics. 224 type Output;
114 ptr: Mutex<Cell<*mut ()>>,
115 } 225 }
116 unsafe impl Sync for TaskPoolRef {}
117 unsafe impl Send for TaskPoolRef {}
118 226
119 impl TaskPoolRef { 227 impl<O> HasOutput for fn() -> O {
120 pub const fn new() -> Self { 228 type Output = O;
121 Self { 229 }
122 ptr: Mutex::new(Cell::new(null_mut())),
123 }
124 }
125 230
126 /// Get the pool for this ref, allocating it from the arena the first time. 231 #[allow(dead_code)]
127 /// 232 pub type Never = <fn() -> ! as HasOutput>::Output;
128 /// safety: for a given TaskPoolRef instance, must always call with the exact 233}
129 /// same generic params.
130 pub unsafe fn get<F: Future, const N: usize>(&'static self) -> &'static TaskPool<F, N> {
131 critical_section::with(|cs| {
132 let ptr = self.ptr.borrow(cs);
133 if ptr.get().is_null() {
134 let pool = ARENA.alloc::<TaskPool<F, N>>(cs);
135 pool.write(TaskPool::new());
136 ptr.set(pool as *mut _ as _);
137 }
138 234
139 unsafe { &*(ptr.get() as *const _) } 235/// Implementation details for embassy macros.
140 }) 236/// Do not use. Used for macros and HALs only. Not covered by semver guarantees.
141 } 237#[doc(hidden)]
238#[cfg(feature = "nightly")]
239pub mod _export {
240 #[diagnostic::on_unimplemented(
241 message = "task futures must resolve to `()` or `!`",
242 note = "use `async fn` or change the return type to `impl Future<Output = ()>`"
243 )]
244 pub trait TaskReturnValue {}
245 impl TaskReturnValue for () {}
246 impl TaskReturnValue for Never {}
247
248 #[allow(dead_code)]
249 pub trait HasOutput {
250 type Output;
251 }
252
253 impl<O> HasOutput for fn() -> O {
254 type Output = O;
142 } 255 }
256
257 #[allow(dead_code)]
258 pub type Never = <fn() -> ! as HasOutput>::Output;
143} 259}
diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs
new file mode 100644
index 000000000..76504ab0b
--- /dev/null
+++ b/embassy-executor/src/metadata.rs
@@ -0,0 +1,148 @@
1#[cfg(feature = "metadata-name")]
2use core::cell::Cell;
3use core::future::{Future, poll_fn};
4#[cfg(feature = "scheduler-priority")]
5use core::sync::atomic::{AtomicU8, Ordering};
6use core::task::Poll;
7
8#[cfg(feature = "metadata-name")]
9use critical_section::Mutex;
10
11use crate::raw;
12#[cfg(feature = "scheduler-deadline")]
13use crate::raw::Deadline;
14
15/// Metadata associated with a task.
16pub struct Metadata {
17 #[cfg(feature = "metadata-name")]
18 name: Mutex<Cell<Option<&'static str>>>,
19 #[cfg(feature = "scheduler-priority")]
20 priority: AtomicU8,
21 #[cfg(feature = "scheduler-deadline")]
22 deadline: raw::Deadline,
23}
24
25impl Metadata {
26 pub(crate) const fn new() -> Self {
27 Self {
28 #[cfg(feature = "metadata-name")]
29 name: Mutex::new(Cell::new(None)),
30 #[cfg(feature = "scheduler-priority")]
31 priority: AtomicU8::new(0),
32 // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This
 33 // will be lazily initialized in `initialize_impl`
34 #[cfg(feature = "scheduler-deadline")]
35 deadline: raw::Deadline::new_unset(),
36 }
37 }
38
39 pub(crate) fn reset(&self) {
40 #[cfg(feature = "metadata-name")]
41 critical_section::with(|cs| self.name.borrow(cs).set(None));
42
43 #[cfg(feature = "scheduler-priority")]
44 self.set_priority(0);
45
46 // By default, deadlines are set to the maximum value, so that any task WITH
47 // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline
48 #[cfg(feature = "scheduler-deadline")]
49 self.unset_deadline();
50 }
51
52 /// Get the metadata for the current task.
53 ///
54 /// You can use this to read or modify the current task's metadata.
55 ///
56 /// This function is `async` just to get access to the current async
57 /// context. It returns instantly, it does not block/yield.
58 pub fn for_current_task() -> impl Future<Output = &'static Self> {
59 poll_fn(|cx| Poll::Ready(raw::task_from_waker(cx.waker()).metadata()))
60 }
61
62 /// Get this task's name
63 ///
64 /// NOTE: this takes a critical section.
65 #[cfg(feature = "metadata-name")]
66 pub fn name(&self) -> Option<&'static str> {
67 critical_section::with(|cs| self.name.borrow(cs).get())
68 }
69
70 /// Set this task's name
71 ///
72 /// NOTE: this takes a critical section.
73 #[cfg(feature = "metadata-name")]
74 pub fn set_name(&self, name: &'static str) {
75 critical_section::with(|cs| self.name.borrow(cs).set(Some(name)))
76 }
77
78 /// Get this task's priority.
79 #[cfg(feature = "scheduler-priority")]
80 pub fn priority(&self) -> u8 {
81 self.priority.load(Ordering::Relaxed)
82 }
83
84 /// Set this task's priority.
85 #[cfg(feature = "scheduler-priority")]
86 pub fn set_priority(&self, priority: u8) {
87 self.priority.store(priority, Ordering::Relaxed)
88 }
89
90 /// Get this task's deadline.
91 #[cfg(feature = "scheduler-deadline")]
92 pub fn deadline(&self) -> u64 {
93 self.deadline.instant_ticks()
94 }
95
96 /// Set this task's deadline.
97 ///
98 /// This method does NOT check whether the deadline has already passed.
99 #[cfg(feature = "scheduler-deadline")]
100 pub fn set_deadline(&self, instant_ticks: u64) {
101 self.deadline.set(instant_ticks);
102 }
103
104 /// Remove this task's deadline.
 105 /// This brings it back to the default where it's not scheduled ahead of other tasks.
106 #[cfg(feature = "scheduler-deadline")]
107 pub fn unset_deadline(&self) {
108 self.deadline.set(Deadline::UNSET_TICKS);
109 }
110
111 /// Set this task's deadline `duration_ticks` in the future from when
112 /// this future is polled. This deadline is saturated to the max tick value.
113 ///
114 /// Analogous to `Timer::after`.
115 #[cfg(all(feature = "scheduler-deadline", feature = "embassy-time-driver"))]
116 pub fn set_deadline_after(&self, duration_ticks: u64) {
117 let now = embassy_time_driver::now();
118
119 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
120 // it for now, we can probably make this wrapping_add for performance
121 // reasons later.
122 let deadline = now.saturating_add(duration_ticks);
123
124 self.set_deadline(deadline);
125 }
126
 127 /// Set this task's deadline `duration_ticks` ticks after the previous deadline.
128 ///
129 /// This deadline is saturated to the max tick value.
130 ///
131 /// Note that by default (unless otherwise set), tasks start life with the deadline
132 /// not set, which means this method will have no effect.
133 ///
134 /// Analogous to one increment of `Ticker::every().next()`.
135 ///
136 /// Returns the deadline that was set.
137 #[cfg(feature = "scheduler-deadline")]
138 pub fn increment_deadline(&self, duration_ticks: u64) {
139 let last = self.deadline();
140
141 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
142 // it for now, we can probably make this wrapping_add for performance
143 // reasons later.
144 let deadline = last.saturating_add(duration_ticks);
145
146 self.set_deadline(deadline);
147 }
148}
diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs
new file mode 100644
index 000000000..cc89fadb0
--- /dev/null
+++ b/embassy-executor/src/raw/deadline.rs
@@ -0,0 +1,44 @@
1use core::sync::atomic::{AtomicU32, Ordering};
2
3/// A type for interacting with the deadline of the current task
4///
5/// Requires the `scheduler-deadline` feature.
6///
7/// Note: Interacting with the deadline should be done locally in a task.
8/// In theory you could try to set or read the deadline from another task,
9/// but that will result in weird (though not unsound) behavior.
10pub(crate) struct Deadline {
11 instant_ticks_hi: AtomicU32,
12 instant_ticks_lo: AtomicU32,
13}
14
15impl Deadline {
16 pub(crate) const fn new(instant_ticks: u64) -> Self {
17 Self {
18 instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32),
19 instant_ticks_lo: AtomicU32::new(instant_ticks as u32),
20 }
21 }
22
23 pub(crate) const fn new_unset() -> Self {
24 Self::new(Self::UNSET_TICKS)
25 }
26
27 pub(crate) fn set(&self, instant_ticks: u64) {
28 self.instant_ticks_hi
29 .store((instant_ticks >> 32) as u32, Ordering::Relaxed);
30 self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed);
31 }
32
33 /// Deadline value in ticks, same time base and ticks as `embassy-time`
34 pub(crate) fn instant_ticks(&self) -> u64 {
35 let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64;
36 let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64;
37
38 (hi << 32) | lo
39 }
40
41 /// Sentinel value representing an "unset" deadline, which has lower priority
42 /// than any other set deadline value
43 pub(crate) const UNSET_TICKS: u64 = u64::MAX;
44}
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index d9ea5c005..ab845ed3b 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,54 +7,113 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 10mod run_queue;
13 11
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 12#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 13#[cfg_attr(
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 14 all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
15 path = "state_atomics.rs"
16)]
17#[cfg_attr(
18 not(any(target_has_atomic = "8", target_has_atomic = "32")),
19 path = "state_critical_section.rs"
20)]
17mod state; 21mod state;
18 22
19#[cfg(feature = "integrated-timers")] 23#[cfg(feature = "_any_trace")]
20mod timer_queue; 24pub mod trace;
21pub(crate) mod util; 25pub(crate) mod util;
22#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 26#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
23mod waker; 27mod waker;
24 28
29#[cfg(feature = "scheduler-deadline")]
30mod deadline;
31
25use core::future::Future; 32use core::future::Future;
26use core::marker::PhantomData; 33use core::marker::PhantomData;
27use core::mem; 34use core::mem;
28use core::pin::Pin; 35use core::pin::Pin;
29use core::ptr::NonNull; 36use core::ptr::NonNull;
30use core::task::{Context, Poll}; 37#[cfg(not(feature = "arch-avr"))]
38use core::sync::atomic::AtomicPtr;
39use core::sync::atomic::Ordering;
40use core::task::{Context, Poll, Waker};
31 41
32#[cfg(feature = "integrated-timers")] 42#[cfg(feature = "scheduler-deadline")]
33use embassy_time_driver::AlarmHandle; 43pub(crate) use deadline::Deadline;
34#[cfg(feature = "rtos-trace")] 44use embassy_executor_timer_queue::TimerQueueItem;
35use rtos_trace::trace; 45#[cfg(feature = "arch-avr")]
46use portable_atomic::AtomicPtr;
36 47
37use self::run_queue::{RunQueue, RunQueueItem}; 48use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 49use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 50use self::util::{SyncUnsafeCell, UninitCell};
40pub use self::waker::task_from_waker; 51pub use self::waker::task_from_waker;
41use super::SpawnToken; 52use super::SpawnToken;
53use crate::{Metadata, SpawnError};
54
55#[unsafe(no_mangle)]
56extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
57 unsafe { task_from_waker(waker).timer_queue_item() }
58}
42 59
43/// Raw task header for use in task pointers. 60/// Raw task header for use in task pointers.
61///
62/// A task can be in one of the following states:
63///
64/// - Not spawned: the task is ready to spawn.
65/// - `SPAWNED`: the task is currently spawned and may be running.
66/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
67/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
68/// polling the task's future.
69///
70/// A task's complete life cycle is as follows:
71///
72/// ```text
73/// ┌────────────┐ ┌────────────────────────┐
74/// │Not spawned │◄─5┤Not spawned|Run enqueued│
75/// │ ├6─►│ │
76/// └─────┬──────┘ └──────▲─────────────────┘
77/// 1 │
78/// │ ┌────────────┘
79/// │ 4
80/// ┌─────▼────┴─────────┐
81/// │Spawned|Run enqueued│
82/// │ │
83/// └─────┬▲─────────────┘
84/// 2│
85/// │3
86/// ┌─────▼┴─────┐
87/// │ Spawned │
88/// │ │
89/// └────────────┘
90/// ```
91///
92/// Transitions:
93/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
94/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
95/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
96/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
97/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
 98 /// - 6: A task is woken when it is not spawned - `wake_task -> State::run_enqueue`
44pub(crate) struct TaskHeader { 99pub(crate) struct TaskHeader {
45 pub(crate) state: State, 100 pub(crate) state: State,
46 pub(crate) run_queue_item: RunQueueItem, 101 pub(crate) run_queue_item: RunQueueItem,
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 102
103 pub(crate) executor: AtomicPtr<SyncExecutor>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 104 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 105
50 #[cfg(feature = "integrated-timers")] 106 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>, 107 pub(crate) timer_queue_item: TimerQueueItem,
52 #[cfg(feature = "integrated-timers")] 108
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 109 pub(crate) metadata: Metadata,
110
111 #[cfg(feature = "rtos-trace")]
112 all_tasks_next: AtomicPtr<TaskHeader>,
54} 113}
55 114
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 115/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 116#[derive(Debug, Clone, Copy, PartialEq)]
58pub struct TaskRef { 117pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 118 ptr: NonNull<TaskHeader>,
60} 119}
@@ -80,10 +139,35 @@ impl TaskRef {
80 unsafe { self.ptr.as_ref() } 139 unsafe { self.ptr.as_ref() }
81 } 140 }
82 141
142 pub(crate) fn metadata(self) -> &'static Metadata {
143 unsafe { &self.ptr.as_ref().metadata }
144 }
145
146 /// Returns a reference to the executor that the task is currently running on.
147 pub unsafe fn executor(self) -> Option<&'static Executor> {
148 let executor = self.header().executor.load(Ordering::Relaxed);
149 executor.as_ref().map(|e| Executor::wrap(e))
150 }
151
152 /// Returns a mutable reference to the timer queue item.
153 ///
154 /// Safety
155 ///
156 /// This function must only be called in the context of the integrated timer queue.
157 pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
158 unsafe { &mut self.ptr.as_mut().timer_queue_item }
159 }
160
83 /// The returned pointer is valid for the entire TaskStorage. 161 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 162 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 163 self.ptr.as_ptr()
86 } 164 }
165
166 /// Returns the task ID.
167 /// This can be used in combination with rtos-trace to match task names with IDs
168 pub fn id(&self) -> u32 {
169 self.as_ptr() as u32
170 }
87} 171}
88 172
89/// Raw storage in which a task can be spawned. 173/// Raw storage in which a task can be spawned.
@@ -107,6 +191,10 @@ pub struct TaskStorage<F: Future + 'static> {
107 future: UninitCell<F>, // Valid if STATE_SPAWNED 191 future: UninitCell<F>, // Valid if STATE_SPAWNED
108} 192}
109 193
194unsafe fn poll_exited(_p: TaskRef) {
195 // Nothing to do, the task is already !SPAWNED and dequeued.
196}
197
110impl<F: Future + 'static> TaskStorage<F> { 198impl<F: Future + 'static> TaskStorage<F> {
111 const NEW: Self = Self::new(); 199 const NEW: Self = Self::new();
112 200
@@ -116,14 +204,14 @@ impl<F: Future + 'static> TaskStorage<F> {
116 raw: TaskHeader { 204 raw: TaskHeader {
117 state: State::new(), 205 state: State::new(),
118 run_queue_item: RunQueueItem::new(), 206 run_queue_item: RunQueueItem::new(),
119 executor: SyncUnsafeCell::new(None), 207 executor: AtomicPtr::new(core::ptr::null_mut()),
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 208 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 209 poll_fn: SyncUnsafeCell::new(None),
122 210
123 #[cfg(feature = "integrated-timers")] 211 timer_queue_item: TimerQueueItem::new(),
124 expires_at: SyncUnsafeCell::new(0), 212 metadata: Metadata::new(),
125 #[cfg(feature = "integrated-timers")] 213 #[cfg(feature = "rtos-trace")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 214 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
127 }, 215 },
128 future: UninitCell::uninit(), 216 future: UninitCell::uninit(),
129 } 217 }
@@ -142,27 +230,39 @@ impl<F: Future + 'static> TaskStorage<F> {
142 /// 230 ///
143 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 231 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
144 /// on a different executor. 232 /// on a different executor.
145 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
146 let task = AvailableTask::claim(self); 234 let task = AvailableTask::claim(self);
147 match task { 235 match task {
148 Some(task) => task.initialize(future), 236 Some(task) => Ok(task.initialize(future)),
149 None => SpawnToken::new_failed(), 237 None => Err(SpawnError::Busy),
150 } 238 }
151 } 239 }
152 240
153 unsafe fn poll(p: TaskRef) { 241 unsafe fn poll(p: TaskRef) {
154 let this = &*(p.as_ptr() as *const TaskStorage<F>); 242 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
155 243
156 let future = Pin::new_unchecked(this.future.as_mut()); 244 let future = Pin::new_unchecked(this.future.as_mut());
157 let waker = waker::from_task(p); 245 let waker = waker::from_task(p);
158 let mut cx = Context::from_waker(&waker); 246 let mut cx = Context::from_waker(&waker);
159 match future.poll(&mut cx) { 247 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 248 Poll::Ready(_) => {
249 #[cfg(feature = "_any_trace")]
250 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
251
252 // As the future has finished and this function will not be called
253 // again, we can safely drop the future here.
161 this.future.drop_in_place(); 254 this.future.drop_in_place();
255
256 // We replace the poll_fn with a despawn function, so that the task is cleaned up
257 // when the executor polls it next.
258 this.raw.poll_fn.set(Some(poll_exited));
259
260 // Make sure we despawn last, so that other threads can only spawn the task
261 // after we're done with it.
162 this.raw.state.despawn(); 262 this.raw.state.despawn();
163 263
164 #[cfg(feature = "integrated-timers")] 264 #[cfg(feature = "_any_trace")]
165 this.raw.expires_at.set(u64::MAX); 265 trace::task_end(exec_ptr, &p);
166 } 266 }
167 Poll::Pending => {} 267 Poll::Pending => {}
168 } 268 }
@@ -196,6 +296,7 @@ impl<F: Future + 'static> AvailableTask<F> {
196 296
197 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { 297 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
198 unsafe { 298 unsafe {
299 self.task.raw.metadata.reset();
199 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 300 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
200 self.task.future.write_in_place(future); 301 self.task.future.write_in_place(future);
201 302
@@ -262,10 +363,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
262 } 363 }
263 } 364 }
264 365
265 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> { 366 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
266 match self.pool.iter().find_map(AvailableTask::claim) { 367 match self.pool.iter().find_map(AvailableTask::claim) {
267 Some(task) => task.initialize_impl::<T>(future), 368 Some(task) => Ok(task.initialize_impl::<T>(future)),
268 None => SpawnToken::new_failed(), 369 None => Err(SpawnError::Busy),
269 } 370 }
270 } 371 }
271 372
@@ -276,7 +377,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
276 /// This will loop over the pool and spawn the task in the first storage that 377 /// This will loop over the pool and spawn the task in the first storage that
277 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 378 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
278 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 379 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
279 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 380 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
280 self.spawn_impl::<F>(future) 381 self.spawn_impl::<F>(future)
281 } 382 }
282 383
@@ -289,7 +390,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
289 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 390 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
290 /// is an `async fn`, NOT a hand-written `Future`. 391 /// is an `async fn`, NOT a hand-written `Future`.
291 #[doc(hidden)] 392 #[doc(hidden)]
292 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 393 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
293 where 394 where
294 FutFn: FnOnce() -> F, 395 FutFn: FnOnce() -> F,
295 { 396 {
@@ -306,7 +407,7 @@ unsafe impl Sync for Pender {}
306 407
307impl Pender { 408impl Pender {
308 pub(crate) fn pend(self) { 409 pub(crate) fn pend(self) {
309 extern "Rust" { 410 unsafe extern "Rust" {
310 fn __pender(context: *mut ()); 411 fn __pender(context: *mut ());
311 } 412 }
312 unsafe { __pender(self.0) }; 413 unsafe { __pender(self.0) };
@@ -316,26 +417,13 @@ impl Pender {
316pub(crate) struct SyncExecutor { 417pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 418 run_queue: RunQueue,
318 pender: Pender, 419 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 420}
325 421
326impl SyncExecutor { 422impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 423 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 424 Self {
332 run_queue: RunQueue::new(), 425 run_queue: RunQueue::new(),
333 pender, 426 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 427 }
340 } 428 }
341 429
@@ -346,90 +434,50 @@ impl SyncExecutor {
346 /// - `task` must be set up to run in this executor. 434 /// - `task` must be set up to run in this executor.
347 /// - `task` must NOT be already enqueued (in this executor or another one). 435 /// - `task` must NOT be already enqueued (in this executor or another one).
348 #[inline(always)] 436 #[inline(always)]
349 unsafe fn enqueue(&self, task: TaskRef) { 437 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
350 #[cfg(feature = "rtos-trace")] 438 #[cfg(feature = "_any_trace")]
351 trace::task_ready_begin(task.as_ptr() as u32); 439 trace::task_ready_begin(self, &task);
352 440
353 if self.run_queue.enqueue(task) { 441 if self.run_queue.enqueue(task, l) {
354 self.pender.pend(); 442 self.pender.pend();
355 } 443 }
356 } 444 }
357 445
358 #[cfg(feature = "integrated-timers")]
359 fn alarm_callback(ctx: *mut ()) {
360 let this: &Self = unsafe { &*(ctx as *const Self) };
361 this.pender.pend();
362 }
363
364 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 446 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
365 task.header().executor.set(Some(self)); 447 task.header()
448 .executor
449 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
366 450
367 #[cfg(feature = "rtos-trace")] 451 #[cfg(feature = "_any_trace")]
368 trace::task_new(task.as_ptr() as u32); 452 trace::task_new(self, &task);
369 453
370 self.enqueue(task); 454 state::locked(|l| {
455 self.enqueue(task, l);
456 })
371 } 457 }
372 458
373 /// # Safety 459 /// # Safety
374 /// 460 ///
375 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 461 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
376 pub(crate) unsafe fn poll(&'static self) { 462 pub(crate) unsafe fn poll(&'static self) {
377 #[cfg(feature = "integrated-timers")] 463 #[cfg(feature = "_any_trace")]
378 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); 464 trace::poll_start(self);
379
380 #[allow(clippy::never_loop)]
381 loop {
382 #[cfg(feature = "integrated-timers")]
383 self.timer_queue
384 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
385
386 self.run_queue.dequeue_all(|p| {
387 let task = p.header();
388
389 #[cfg(feature = "integrated-timers")]
390 task.expires_at.set(u64::MAX);
391
392 if !task.state.run_dequeue() {
393 // If task is not running, ignore it. This can happen in the following scenario:
394 // - Task gets dequeued, poll starts
395 // - While task is being polled, it gets woken. It gets placed in the queue.
396 // - Task poll finishes, returning done=true
397 // - RUNNING bit is cleared, but the task is already in the queue.
398 return;
399 }
400 465
401 #[cfg(feature = "rtos-trace")] 466 self.run_queue.dequeue_all(|p| {
402 trace::task_exec_begin(p.as_ptr() as u32); 467 let task = p.header();
403 468
404 // Run the task 469 #[cfg(feature = "_any_trace")]
405 task.poll_fn.get().unwrap_unchecked()(p); 470 trace::task_exec_begin(self, &p);
406 471
407 #[cfg(feature = "rtos-trace")] 472 // Run the task
408 trace::task_exec_end(); 473 task.poll_fn.get().unwrap_unchecked()(p);
409
410 // Enqueue or update into timer_queue
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.update(p);
413 });
414
415 #[cfg(feature = "integrated-timers")]
416 {
417 // If this is already in the past, set_alarm might return false
418 // In that case do another poll loop iteration.
419 let next_expiration = self.timer_queue.next_expiration();
420 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
421 break;
422 }
423 }
424 474
425 #[cfg(not(feature = "integrated-timers"))] 475 #[cfg(feature = "_any_trace")]
426 { 476 trace::task_exec_end(self, &p);
427 break; 477 });
428 }
429 }
430 478
431 #[cfg(feature = "rtos-trace")] 479 #[cfg(feature = "_any_trace")]
432 trace::system_idle(); 480 trace::executor_idle(self)
433 } 481 }
434} 482}
435 483
@@ -459,7 +507,7 @@ impl SyncExecutor {
459/// The pender function must be exported with the name `__pender` and have the following signature: 507/// The pender function must be exported with the name `__pender` and have the following signature:
460/// 508///
461/// ```rust 509/// ```rust
462/// #[export_name = "__pender"] 510/// #[unsafe(export_name = "__pender")]
463/// fn pender(context: *mut ()) { 511/// fn pender(context: *mut ()) {
464/// // schedule `poll()` to be called 512/// // schedule `poll()` to be called
465/// } 513/// }
@@ -533,6 +581,11 @@ impl Executor {
533 pub fn spawner(&'static self) -> super::Spawner { 581 pub fn spawner(&'static self) -> super::Spawner {
534 super::Spawner::new(self) 582 super::Spawner::new(self)
535 } 583 }
584
585 /// Get a unique ID for this Executor.
586 pub fn id(&'static self) -> usize {
587 &self.inner as *const SyncExecutor as usize
588 }
536} 589}
537 590
538/// Wake a task by `TaskRef`. 591/// Wake a task by `TaskRef`.
@@ -540,13 +593,13 @@ impl Executor {
540/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 593/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
541pub fn wake_task(task: TaskRef) { 594pub fn wake_task(task: TaskRef) {
542 let header = task.header(); 595 let header = task.header();
543 if header.state.run_enqueue() { 596 header.state.run_enqueue(|l| {
544 // We have just marked the task as scheduled, so enqueue it. 597 // We have just marked the task as scheduled, so enqueue it.
545 unsafe { 598 unsafe {
546 let executor = header.executor.get().unwrap_unchecked(); 599 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
547 executor.enqueue(task); 600 executor.enqueue(task, l);
548 } 601 }
549 } 602 });
550} 603}
551 604
552/// Wake a task by `TaskRef` without calling pend. 605/// Wake a task by `TaskRef` without calling pend.
@@ -554,57 +607,11 @@ pub fn wake_task(task: TaskRef) {
554/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 607/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
555pub fn wake_task_no_pend(task: TaskRef) { 608pub fn wake_task_no_pend(task: TaskRef) {
556 let header = task.header(); 609 let header = task.header();
557 if header.state.run_enqueue() { 610 header.state.run_enqueue(|l| {
558 // We have just marked the task as scheduled, so enqueue it. 611 // We have just marked the task as scheduled, so enqueue it.
559 unsafe { 612 unsafe {
560 let executor = header.executor.get().unwrap_unchecked(); 613 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
561 executor.run_queue.enqueue(task); 614 executor.run_queue.enqueue(task, l);
562 } 615 }
563 } 616 });
564} 617}
565
566#[cfg(feature = "integrated-timers")]
567struct TimerQueue;
568
569#[cfg(feature = "integrated-timers")]
570impl embassy_time_queue_driver::TimerQueue for TimerQueue {
571 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
572 let task = waker::task_from_waker(waker);
573 let task = task.header();
574 unsafe {
575 let expires_at = task.expires_at.get();
576 task.expires_at.set(expires_at.min(at));
577 }
578 }
579}
580
581#[cfg(feature = "integrated-timers")]
582embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
583
584#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
585const fn gcd(a: u64, b: u64) -> u64 {
586 if b == 0 {
587 a
588 } else {
589 gcd(b, a % b)
590 }
591}
592
593#[cfg(feature = "rtos-trace")]
594impl rtos_trace::RtosTraceOSCallbacks for Executor {
595 fn task_list() {
596 // We don't know what tasks exist, so we can't send them.
597 }
598 #[cfg(feature = "integrated-timers")]
599 fn time() -> u64 {
600 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
601 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
602 }
603 #[cfg(not(feature = "integrated-timers"))]
604 fn time() -> u64 {
605 0
606 }
607}
608
609#[cfg(feature = "rtos-trace")]
610rtos_trace::global_os_callbacks! {Executor}
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..6f2abdbd0
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,213 @@
1use core::ptr::{NonNull, addr_of_mut};
2
3use cordyceps::Linked;
4#[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
5use cordyceps::SortedList;
6use cordyceps::sorted_list::Links;
7
8#[cfg(target_has_atomic = "ptr")]
9type TransferStack<T> = cordyceps::TransferStack<T>;
10
11#[cfg(not(target_has_atomic = "ptr"))]
12type TransferStack<T> = MutexTransferStack<T>;
13
14use super::{TaskHeader, TaskRef};
15
16/// Use `cordyceps::sorted_list::Links` as the singly linked list
17/// for RunQueueItems.
18pub(crate) type RunQueueItem = Links<TaskHeader>;
19
20/// Implements the `Linked` trait, allowing for singly linked list usage
21/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
22/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
23/// popped atomically from the `TransferStack`.
24unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
25 type Handle = TaskRef;
26
27 // Convert a TaskRef into a TaskHeader ptr
28 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
29 r.ptr
30 }
31
32 // Convert a TaskHeader into a TaskRef
33 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
34 TaskRef { ptr }
35 }
36
37 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
38 // which can be used to traverse to other TaskHeader nodes in the linked list
39 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
40 let ptr: *mut TaskHeader = ptr.as_ptr();
41 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
42 }
43}
44
45/// Atomic task queue using a very, very simple lock-free linked-list queue:
46///
47/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
48///
49/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
50/// null. Then the batch is iterated following the next pointers until null is reached.
51///
52/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
53/// for our purposes: it can't create fairness problems since the next batch won't run until the
54/// current batch is completely processed, so even if a task enqueues itself instantly (for example
55/// by waking its own waker) can't prevent other tasks from running.
56pub(crate) struct RunQueue {
57 stack: TransferStack<TaskHeader>,
58}
59
60impl RunQueue {
61 pub const fn new() -> Self {
62 Self {
63 stack: TransferStack::new(),
64 }
65 }
66
67 /// Enqueues an item. Returns true if the queue was empty.
68 ///
69 /// # Safety
70 ///
71 /// `item` must NOT be already enqueued in any queue.
72 #[inline(always)]
73 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
74 self.stack.push_was_empty(
75 task,
76 #[cfg(not(target_has_atomic = "ptr"))]
77 _tok,
78 )
79 }
80
81 /// # Standard atomic runqueue
82 ///
83 /// Empty the queue, then call `on_task` for each task that was in the queue.
84 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
85 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
86 #[cfg(not(any(feature = "scheduler-priority", feature = "scheduler-deadline")))]
87 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
88 let taken = self.stack.take_all();
89 for taskref in taken {
90 run_dequeue(&taskref);
91 on_task(taskref);
92 }
93 }
94
95 /// # Earliest Deadline First Scheduler
96 ///
97 /// This algorithm will loop until all enqueued tasks are processed.
98 ///
99 /// Before polling a task, all currently enqueued tasks will be popped from the
100 /// runqueue, and will be added to the working `sorted` list, a linked-list that
101 /// sorts tasks by their deadline, with nearest deadline items in the front, and
102 /// furthest deadline items in the back.
103 ///
104 /// After popping and sorting all pending tasks, the SOONEST task will be popped
105 /// from the front of the queue, and polled by calling `on_task` on it.
106 ///
107 /// This process will repeat until the local `sorted` queue AND the global
108 /// runqueue are both empty, at which point this function will return.
109 #[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
110 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
111 let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| {
112 // compare by priority first
113 #[cfg(feature = "scheduler-priority")]
114 {
115 let lp = lhs.metadata.priority();
116 let rp = rhs.metadata.priority();
117 if lp != rp {
118 return lp.cmp(&rp).reverse();
119 }
120 }
121 // compare deadlines in case of tie.
122 #[cfg(feature = "scheduler-deadline")]
123 {
124 let ld = lhs.metadata.deadline();
125 let rd = rhs.metadata.deadline();
126 if ld != rd {
127 return ld.cmp(&rd);
128 }
129 }
130 core::cmp::Ordering::Equal
131 });
132
133 loop {
134 // For each loop, grab any newly pended items
135 let taken = self.stack.take_all();
136
137 // Sort these into the list - this is potentially expensive! We do an
138 // insertion sort of new items, which iterates the linked list.
139 //
140 // Something on the order of `O(n * m)`, where `n` is the number
141 // of new tasks, and `m` is the number of already pending tasks.
142 sorted.extend(taken);
143
144 // Pop the task with the SOONEST deadline. If there are no tasks
145 // pending, then we are done.
146 let Some(taskref) = sorted.pop_front() else {
147 return;
148 };
149
150 // We got one task, mark it as dequeued, and process the task.
151 run_dequeue(&taskref);
152 on_task(taskref);
153 }
154 }
155}
156
157/// atomic state does not require a cs...
158#[cfg(target_has_atomic = "ptr")]
159#[inline(always)]
160fn run_dequeue(taskref: &TaskRef) {
161 taskref.header().state.run_dequeue();
162}
163
164/// ...while non-atomic state does
165#[cfg(not(target_has_atomic = "ptr"))]
166#[inline(always)]
167fn run_dequeue(taskref: &TaskRef) {
168 critical_section::with(|cs| {
169 taskref.header().state.run_dequeue(cs);
170 })
171}
172
173/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex
174#[cfg(not(target_has_atomic = "ptr"))]
175struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
176 inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
177}
178
179#[cfg(not(target_has_atomic = "ptr"))]
180impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
181 const fn new() -> Self {
182 Self {
183 inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
184 }
185 }
186
187 /// Push an item to the transfer stack, returning whether the stack was previously empty
188 fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
189 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
190 // for the lifetime of the token, but does NOT protect against re-entrant access.
191 // However, we never *return* the reference, nor do we recurse (or call another method
192 // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
193 // presence of the critical section is sufficient to guarantee exclusive access to
194 // the `inner` field for the purposes of this function.
195 let inner = unsafe { &mut *self.inner.borrow(token).get() };
196 let is_empty = inner.is_empty();
197 inner.push(item);
198 is_empty
199 }
200
201 fn take_all(&self) -> cordyceps::Stack<T> {
202 critical_section::with(|cs| {
203 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
204 // for the lifetime of the token, but does NOT protect against re-entrant access.
205 // However, we never *return* the reference, nor do we recurse (or call another method
206 // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
207 // presence of the critical section is sufficient to guarantee exclusive access to
208 // the `inner` field for the purposes of this function.
209 let inner = unsafe { &mut *self.inner.borrow(cs).get() };
210 inner.take_all()
211 })
212 }
213}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
deleted file mode 100644
index 90907cfda..000000000
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ /dev/null
@@ -1,87 +0,0 @@
1use core::ptr;
2use core::ptr::NonNull;
3use core::sync::atomic::{AtomicPtr, Ordering};
4
5use super::{TaskHeader, TaskRef};
6use crate::raw::util::SyncUnsafeCell;
7
8pub(crate) struct RunQueueItem {
9 next: SyncUnsafeCell<Option<TaskRef>>,
10}
11
12impl RunQueueItem {
13 pub const fn new() -> Self {
14 Self {
15 next: SyncUnsafeCell::new(None),
16 }
17 }
18}
19
20/// Atomic task queue using a very, very simple lock-free linked-list queue:
21///
22/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
23///
24/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
25/// null. Then the batch is iterated following the next pointers until null is reached.
26///
27/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
28/// for our purposes: it can't create fairness problems since the next batch won't run until the
29/// current batch is completely processed, so even if a task enqueues itself instantly (for example
30/// by waking its own waker) can't prevent other tasks from running.
31pub(crate) struct RunQueue {
32 head: AtomicPtr<TaskHeader>,
33}
34
35impl RunQueue {
36 pub const fn new() -> Self {
37 Self {
38 head: AtomicPtr::new(ptr::null_mut()),
39 }
40 }
41
42 /// Enqueues an item. Returns true if the queue was empty.
43 ///
44 /// # Safety
45 ///
46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
49 let mut was_empty = false;
50
51 self.head
52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
53 was_empty = prev.is_null();
54 unsafe {
55 // safety: the pointer is either null or valid
56 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
57 // safety: there are no concurrent accesses to `next`
58 task.header().run_queue_item.next.set(prev);
59 }
60 Some(task.as_ptr() as *mut _)
61 })
62 .ok();
63
64 was_empty
65 }
66
67 /// Empty the queue, then call `on_task` for each task that was in the queue.
68 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
69 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
70 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
71 // Atomically empty the queue.
72 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
73
74 // safety: the pointer is either null or valid
75 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
76
77 // Iterate the linked list of tasks that were previously in the queue.
78 while let Some(task) = next {
79 // If the task re-enqueues itself, the `next` pointer will get overwritten.
80 // Therefore, first read the next pointer, and only then process the task.
81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() };
83
84 on_task(task);
85 }
86 }
87}
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
deleted file mode 100644
index ba59c8f29..000000000
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ /dev/null
@@ -1,75 +0,0 @@
1use core::cell::Cell;
2
3use critical_section::{CriticalSection, Mutex};
4
5use super::TaskRef;
6
7pub(crate) struct RunQueueItem {
8 next: Mutex<Cell<Option<TaskRef>>>,
9}
10
11impl RunQueueItem {
12 pub const fn new() -> Self {
13 Self {
14 next: Mutex::new(Cell::new(None)),
15 }
16 }
17}
18
19/// Atomic task queue using a very, very simple lock-free linked-list queue:
20///
21/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
22///
23/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
24/// null. Then the batch is iterated following the next pointers until null is reached.
25///
26/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
27/// for our purposes: it can't create fairness problems since the next batch won't run until the
28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
29/// by waking its own waker) can't prevent other tasks from running.
30pub(crate) struct RunQueue {
31 head: Mutex<Cell<Option<TaskRef>>>,
32}
33
34impl RunQueue {
35 pub const fn new() -> Self {
36 Self {
37 head: Mutex::new(Cell::new(None)),
38 }
39 }
40
41 /// Enqueues an item. Returns true if the queue was empty.
42 ///
43 /// # Safety
44 ///
45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
48 critical_section::with(|cs| {
49 let prev = self.head.borrow(cs).replace(Some(task));
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51
52 prev.is_none()
53 })
54 }
55
56 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue.
61 let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
62
63 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task.
67
68 // safety: we know if the task is enqueued, no one else will touch the `next` pointer.
69 let cs = unsafe { CriticalSection::new() };
70 next = task.header().run_queue_item.next.borrow(cs).get();
71
72 on_task(task);
73 }
74 }
75}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e1279ac0b..6675875be 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,21 +1,39 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1// Prefer pointer-width atomic operations, as narrower ones may be slower.
2#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
3type AtomicState = core::sync::atomic::AtomicU32;
4#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
5type AtomicState = core::sync::atomic::AtomicU8;
6
7#[cfg(all(target_pointer_width = "32", target_has_atomic = "32"))]
8type StateBits = u32;
9#[cfg(not(all(target_pointer_width = "32", target_has_atomic = "32")))]
10type StateBits = u8;
11
12use core::sync::atomic::Ordering;
13
14#[derive(Clone, Copy)]
15pub(crate) struct Token(());
16
17/// Creates a token and passes it to the closure.
18///
19/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
20pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
21 f(Token(()))
22}
2 23
3/// Task is spawned (has a future) 24/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 25pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
5/// Task is in the executor run queue 26/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 27pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
7/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10 28
11pub(crate) struct State { 29pub(crate) struct State {
12 state: AtomicU32, 30 state: AtomicState,
13} 31}
14 32
15impl State { 33impl State {
16 pub const fn new() -> State { 34 pub const fn new() -> State {
17 Self { 35 Self {
18 state: AtomicU32::new(0), 36 state: AtomicState::new(0),
19 } 37 }
20 } 38 }
21 39
@@ -33,41 +51,19 @@ impl State {
33 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 51 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
34 } 52 }
35 53
36 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 54 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
55 /// function if the task was successfully marked.
37 #[inline(always)] 56 #[inline(always)]
38 pub fn run_enqueue(&self) -> bool { 57 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
39 self.state 58 let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel);
40 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { 59 if prev & STATE_RUN_QUEUED == 0 {
41 // If already scheduled, or if not started, 60 locked(f);
42 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 61 }
43 None
44 } else {
45 // Mark it as scheduled
46 Some(state | STATE_RUN_QUEUED)
47 }
48 })
49 .is_ok()
50 } 62 }
51 63
52 /// Unmark the task as run-queued. Return whether the task is spawned. 64 /// Unmark the task as run-queued. Return whether the task is spawned.
53 #[inline(always)] 65 #[inline(always)]
54 pub fn run_dequeue(&self) -> bool { 66 pub fn run_dequeue(&self) {
55 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 67 self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
56 state & STATE_SPAWNED != 0
57 }
58
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
64 old_state & STATE_TIMER_QUEUED == 0
65 }
66
67 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)]
70 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
72 } 68 }
73} 69}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index e4dfe5093..f68de955f 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,5 +1,14 @@
1use core::arch::asm; 1use core::sync::atomic::{AtomicBool, AtomicU32, Ordering, compiler_fence};
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
3 12
4// Must be kept in sync with the layout of `State`! 13// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
@@ -11,9 +20,8 @@ pub(crate) struct State {
11 spawned: AtomicBool, 20 spawned: AtomicBool,
12 /// Task is in the executor run queue 21 /// Task is in the executor run queue
13 run_queued: AtomicBool, 22 run_queued: AtomicBool,
14 /// Task is in the executor timer queue
15 timer_queued: AtomicBool,
16 pad: AtomicBool, 23 pad: AtomicBool,
24 pad2: AtomicBool,
17} 25}
18 26
19impl State { 27impl State {
@@ -21,8 +29,8 @@ impl State {
21 Self { 29 Self {
22 spawned: AtomicBool::new(false), 30 spawned: AtomicBool::new(false),
23 run_queued: AtomicBool::new(false), 31 run_queued: AtomicBool::new(false),
24 timer_queued: AtomicBool::new(false),
25 pad: AtomicBool::new(false), 32 pad: AtomicBool::new(false),
33 pad2: AtomicBool::new(false),
26 } 34 }
27 } 35 }
28 36
@@ -54,50 +62,22 @@ impl State {
54 self.spawned.store(false, Ordering::Relaxed); 62 self.spawned.store(false, Ordering::Relaxed);
55 } 63 }
56 64
57 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 65 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
66 /// function if the task was successfully marked.
58 #[inline(always)] 67 #[inline(always)]
59 pub fn run_enqueue(&self) -> bool { 68 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
60 unsafe { 69 let old = self.run_queued.swap(true, Ordering::AcqRel);
61 loop {
62 let state: u32;
63 asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));
64 70
65 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 71 if !old {
66 asm!("clrex", options(nomem, nostack)); 72 locked(f);
67 return false;
68 }
69
70 let outcome: usize;
71 let new_state = state | STATE_RUN_QUEUED;
72 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
73 if outcome == 0 {
74 return true;
75 }
76 }
77 } 73 }
78 } 74 }
79 75
80 /// Unmark the task as run-queued. Return whether the task is spawned. 76 /// Unmark the task as run-queued. Return whether the task is spawned.
81 #[inline(always)] 77 #[inline(always)]
82 pub fn run_dequeue(&self) -> bool { 78 pub fn run_dequeue(&self) {
83 compiler_fence(Ordering::Release); 79 compiler_fence(Ordering::Release);
84 80
85 let r = self.spawned.load(Ordering::Relaxed);
86 self.run_queued.store(false, Ordering::Relaxed); 81 self.run_queued.store(false, Ordering::Relaxed);
87 r
88 }
89
90 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
91 #[cfg(feature = "integrated-timers")]
92 #[inline(always)]
93 pub fn timer_enqueue(&self) -> bool {
94 !self.timer_queued.swap(true, Ordering::Relaxed)
95 }
96
97 /// Unmark the task as timer-queued.
98 #[cfg(feature = "integrated-timers")]
99 #[inline(always)]
100 pub fn timer_dequeue(&self) {
101 self.timer_queued.store(false, Ordering::Relaxed);
102 } 82 }
103} 83}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index c3cc1b0b7..8d7ef2892 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -1,17 +1,20 @@
1use core::cell::Cell; 1use core::cell::Cell;
2 2
3use critical_section::Mutex; 3use critical_section::{CriticalSection, Mutex};
4pub(crate) use critical_section::{CriticalSection as Token, with as locked};
5
6#[cfg(target_arch = "avr")]
7type StateBits = u8;
8#[cfg(not(target_arch = "avr"))]
9type StateBits = usize;
4 10
5/// Task is spawned (has a future) 11/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 12pub(crate) const STATE_SPAWNED: StateBits = 1 << 0;
7/// Task is in the executor run queue 13/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 14pub(crate) const STATE_RUN_QUEUED: StateBits = 1 << 1;
9/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12 15
13pub(crate) struct State { 16pub(crate) struct State {
14 state: Mutex<Cell<u32>>, 17 state: Mutex<Cell<StateBits>>,
15} 18}
16 19
17impl State { 20impl State {
@@ -21,14 +24,16 @@ impl State {
21 } 24 }
22 } 25 }
23 26
24 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 27 fn update<R>(&self, f: impl FnOnce(&mut StateBits) -> R) -> R {
25 critical_section::with(|cs| { 28 critical_section::with(|cs| self.update_with_cs(cs, f))
26 let s = self.state.borrow(cs); 29 }
27 let mut val = s.get(); 30
28 let r = f(&mut val); 31 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut StateBits) -> R) -> R {
29 s.set(val); 32 let s = self.state.borrow(cs);
30 r 33 let mut val = s.get();
31 }) 34 let r = f(&mut val);
35 s.set(val);
36 r
32 } 37 }
33 38
34 /// If task is idle, mark it as spawned + run_queued and return true. 39 /// If task is idle, mark it as spawned + run_queued and return true.
@@ -50,44 +55,24 @@ impl State {
50 self.update(|s| *s &= !STATE_SPAWNED); 55 self.update(|s| *s &= !STATE_SPAWNED);
51 } 56 }
52 57
53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 58 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
59 /// function if the task was successfully marked.
54 #[inline(always)] 60 #[inline(always)]
55 pub fn run_enqueue(&self) -> bool { 61 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
56 self.update(|s| { 62 critical_section::with(|cs| {
57 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { 63 if self.update_with_cs(cs, |s| {
58 false 64 let ok = *s & STATE_RUN_QUEUED == 0;
59 } else {
60 *s |= STATE_RUN_QUEUED; 65 *s |= STATE_RUN_QUEUED;
61 true 66 ok
67 }) {
68 f(cs);
62 } 69 }
63 }) 70 });
64 } 71 }
65 72
66 /// Unmark the task as run-queued. Return whether the task is spawned. 73 /// Unmark the task as run-queued. Return whether the task is spawned.
67 #[inline(always)] 74 #[inline(always)]
68 pub fn run_dequeue(&self) -> bool { 75 pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
69 self.update(|s| { 76 self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
70 let ok = *s & STATE_SPAWNED != 0;
71 *s &= !STATE_RUN_QUEUED;
72 ok
73 })
74 }
75
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool {
80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0;
82 *s |= STATE_TIMER_QUEUED;
83 ok
84 })
85 }
86
87 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)]
90 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED);
92 } 77 }
93} 78}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
deleted file mode 100644
index 94a5f340b..000000000
--- a/embassy-executor/src/raw/timer_queue.rs
+++ /dev/null
@@ -1,76 +0,0 @@
1use core::cmp::min;
2
3use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5
6pub(crate) struct TimerQueueItem {
7 next: SyncUnsafeCell<Option<TaskRef>>,
8}
9
10impl TimerQueueItem {
11 pub const fn new() -> Self {
12 Self {
13 next: SyncUnsafeCell::new(None),
14 }
15 }
16}
17
18pub(crate) struct TimerQueue {
19 head: SyncUnsafeCell<Option<TaskRef>>,
20}
21
22impl TimerQueue {
23 pub const fn new() -> Self {
24 Self {
25 head: SyncUnsafeCell::new(None),
26 }
27 }
28
29 pub(crate) unsafe fn update(&self, p: TaskRef) {
30 let task = p.header();
31 if task.expires_at.get() != u64::MAX {
32 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get());
34 self.head.set(Some(p));
35 }
36 }
37 }
38
39 pub(crate) unsafe fn next_expiration(&self) -> u64 {
40 let mut res = u64::MAX;
41 self.retain(|p| {
42 let task = p.header();
43 let expires = task.expires_at.get();
44 res = min(res, expires);
45 expires != u64::MAX
46 });
47 res
48 }
49
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
51 self.retain(|p| {
52 let task = p.header();
53 if task.expires_at.get() <= now {
54 on_task(p);
55 false
56 } else {
57 true
58 }
59 });
60 }
61
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
63 let mut prev = &self.head;
64 while let Some(p) = prev.get() {
65 let task = p.header();
66 if f(p) {
67 // Skip to next
68 prev = &task.timer_queue_item.next;
69 } else {
70 // Remove it
71 prev.set(task.timer_queue_item.next.get());
72 task.state.timer_dequeue();
73 }
74 }
75 }
76}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
new file mode 100644
index 000000000..830162039
--- /dev/null
+++ b/embassy-executor/src/raw/trace.rs
@@ -0,0 +1,380 @@
1//! # Tracing
2//!
3//! The `trace` feature enables a number of callbacks that can be used to track the
4//! lifecycle of tasks and/or executors.
5//!
6//! Callbacks will have one or both of the following IDs passed to them:
7//!
8//! 1. A `task_id`, a `u32` value unique to a task for the duration of the time it is valid
9//! 2. An `executor_id`, a `u32` value unique to an executor for the duration of the time it is
10//! valid
11//!
12//! Today, both `task_id` and `executor_id` are u32s containing the least significant 32 bits of
13//! the address of the task or executor, however this is NOT a stable guarantee, and MAY change
14//! at any time.
15//!
16//! IDs are only guaranteed to be unique for the duration of time the item is valid. If a task
17//! ends, and is re-spawned, it MAY or MAY NOT have the same ID. For tasks, this valid time is defined
18//! as the time between `_embassy_trace_task_new` and `_embassy_trace_task_end` for a given task.
19//! For executors, this time is not defined, but is often "forever" for practical embedded
20//! programs.
21//!
22//! Callbacks can be used by enabling the `trace` feature, and providing implementations of the
23//! `extern "Rust"` functions below. All callbacks must be implemented.
24//!
25//! ## Task Tracing lifecycle
26//!
27//! ```text
28//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
29//! │(1) │
30//! │ │
31//! ╔════▼════╗ (2) ┌─────────┐ (3) ┌─────────┐ │
32//! │ ║ SPAWNED ║────▶│ WAITING │────▶│ RUNNING │
33//! ╚═════════╝ └─────────┘ └─────────┘ │
34//! │ ▲ ▲ │ │ │
35//! │ (4) │ │(6) │
36//! │ │(7) └ ─ ─ ┘ │ │
37//! │ │ │ │
38//! │ ┌──────┐ (5) │ │ ┌─────┐
39//! │ IDLE │◀────────────────┘ └─▶│ END │ │
40//! │ └──────┘ └─────┘
41//! ┌──────────────────────┐ │
42//! └ ┤ Task Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
43//! └──────────────────────┘
44//! ```
45//!
46//! 1. A task is spawned, `_embassy_trace_task_new` is called
47//! 2. A task is enqueued for the first time, `_embassy_trace_task_ready_begin` is called
48//! 3. A task is polled, `_embassy_trace_task_exec_begin` is called
49//! 4. WHILE a task is polled, the task is re-awoken, and `_embassy_trace_task_ready_begin` is
50//! called. The task does not IMMEDIATELY move state, until polling is complete and the
51//!    RUNNING state is exited. `_embassy_trace_task_exec_end` is called when polling is
52//! complete, marking the transition to WAITING
53//! 5. Polling is complete, `_embassy_trace_task_exec_end` is called
54//! 6. The task has completed, and `_embassy_trace_task_end` is called
55//! 7. A task is awoken, `_embassy_trace_task_ready_begin` is called
56//!
57//! ## Executor Tracing lifecycle
58//!
59//! ```text
60//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
61//! │(1) │
62//! │ │
63//! ╔═══▼══╗ (2) ┌────────────┐ (3) ┌─────────┐ │
64//! │ ║ IDLE ║──────────▶│ SCHEDULING │──────▶│ POLLING │
65//! ╚══════╝ └────────────┘ └─────────┘ │
66//! │ ▲ │ ▲ │
67//! │ (5) │ │ (4) │ │
68//! │ └──────────────┘ └────────────┘
69//! ┌──────────────────────────┐ │
70//! └ ┤ Executor Trace Lifecycle │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71//! └──────────────────────────┘
72//! ```
73//!
74//! 1. The executor is started (no associated trace)
75//! 2. A task on this executor is awoken. `_embassy_trace_task_ready_begin` is called
76//! when this occurs, and `_embassy_trace_poll_start` is called when the executor
77//! actually begins running
78//! 3. The executor has decided a task to poll. `_embassy_trace_task_exec_begin` is called
79//! 4. The executor finishes polling the task. `_embassy_trace_task_exec_end` is called
80//! 5. The executor has finished polling tasks. `_embassy_trace_executor_idle` is called
81
82#![allow(unused)]
83
84use core::cell::UnsafeCell;
85use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
86
87#[cfg(feature = "rtos-trace")]
88use rtos_trace::TaskInfo;
89
90use crate::raw::{SyncExecutor, TaskHeader, TaskRef};
91use crate::spawner::{SpawnError, SpawnToken, Spawner};
92
93/// Global task tracker instance
94///
95/// This static provides access to the global task tracker which maintains
96/// a list of all tasks in the system. It's automatically updated by the
97/// task lifecycle hooks in the trace module.
98#[cfg(feature = "rtos-trace")]
99pub(crate) static TASK_TRACKER: TaskTracker = TaskTracker::new();
100
101/// A thread-safe tracker for all tasks in the system
102///
103/// This struct uses an intrusive linked list approach to track all tasks
104/// without additional memory allocations. It maintains a global list of
105/// tasks that can be traversed to find all currently existing tasks.
106#[cfg(feature = "rtos-trace")]
107pub(crate) struct TaskTracker {
108 head: AtomicPtr<TaskHeader>,
109}
110
111#[cfg(feature = "rtos-trace")]
112impl TaskTracker {
113 /// Creates a new empty task tracker
114 ///
115 /// Initializes a tracker with no tasks in its list.
116 pub const fn new() -> Self {
117 Self {
118 head: AtomicPtr::new(core::ptr::null_mut()),
119 }
120 }
121
122 /// Adds a task to the tracker
123 ///
124 /// This method inserts a task at the head of the intrusive linked list.
125 /// The operation is thread-safe and lock-free, using atomic operations
126 /// to ensure consistency even when called from different contexts.
127 ///
128 /// # Arguments
129 /// * `task` - The task reference to add to the tracker
130 pub fn add(&self, task: TaskRef) {
131 let task_ptr = task.as_ptr();
132
133 loop {
134 let current_head = self.head.load(Ordering::Acquire);
135 unsafe {
136 (*task_ptr).all_tasks_next.store(current_head, Ordering::Relaxed);
137 }
138
139 if self
140 .head
141 .compare_exchange(current_head, task_ptr.cast_mut(), Ordering::Release, Ordering::Relaxed)
142 .is_ok()
143 {
144 break;
145 }
146 }
147 }
148
149 /// Performs an operation on each task in the tracker
150 ///
151 /// This method traverses the entire list of tasks and calls the provided
152 /// function for each task. This allows inspecting or processing all tasks
153 /// in the system without modifying the tracker's structure.
154 ///
155 /// # Arguments
156 /// * `f` - A function to call for each task in the tracker
157 pub fn for_each<F>(&self, mut f: F)
158 where
159 F: FnMut(TaskRef),
160 {
161 let mut current = self.head.load(Ordering::Acquire);
162 while !current.is_null() {
163 let task = unsafe { TaskRef::from_ptr(current) };
164 f(task);
165
166 current = unsafe { (*current).all_tasks_next.load(Ordering::Acquire) };
167 }
168 }
169}
170
171#[cfg(feature = "trace")]
172unsafe extern "Rust" {
173 /// This callback is called when the executor begins polling. This will always
174 /// be paired with a later call to `_embassy_trace_executor_idle`.
175 ///
176 /// This marks the EXECUTOR state transition from IDLE -> SCHEDULING.
177 fn _embassy_trace_poll_start(executor_id: u32);
178
179 /// This callback is called AFTER a task is initialized/allocated, and BEFORE
180 /// it is enqueued to run for the first time. If the task ends (and does not
181 /// loop "forever"), there will be a matching call to `_embassy_trace_task_end`.
182 ///
183 /// Tasks start life in the SPAWNED state.
184 fn _embassy_trace_task_new(executor_id: u32, task_id: u32);
185
186 /// This callback is called AFTER a task is destructed/freed. This will always
187 /// have a prior matching call to `_embassy_trace_task_new`.
188 fn _embassy_trace_task_end(executor_id: u32, task_id: u32);
189
190 /// This callback is called AFTER a task has been dequeued from the runqueue,
191 /// and BEFORE the task is polled. There will always be a matching call to
192 /// `_embassy_trace_task_exec_end`.
193 ///
194 /// This marks the TASK state transition from WAITING -> RUNNING
195 /// This marks the EXECUTOR state transition from SCHEDULING -> POLLING
196 fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32);
197
198 /// This callback is called AFTER a task has completed polling. There will
199 /// always be a matching call to `_embassy_trace_task_exec_begin`.
200 ///
201 /// This marks the TASK state transition from either:
202 /// * RUNNING -> IDLE - if there were no `_embassy_trace_task_ready_begin` events
203 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
204 /// * RUNNING -> WAITING - if there WAS a `_embassy_trace_task_ready_begin` event
205 /// for this task since the last `_embassy_trace_task_exec_begin` for THIS task
206 ///
207 /// This marks the EXECUTOR state transition from POLLING -> SCHEDULING
208 fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32);
209
210 /// This callback is called AFTER the waker for a task is awoken, and BEFORE it
211 /// is added to the run queue.
212 ///
213 /// If the given task is currently RUNNING, this marks no state change, BUT the
214 /// RUNNING task will then move to the WAITING stage when polling is complete.
215 ///
216 /// If the given task is currently IDLE, this marks the TASK state transition
217 /// from IDLE -> WAITING.
218 ///
219 /// NOTE: This may be called from an interrupt, outside the context of the current
220 /// task or executor.
221 fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32);
222
223 /// This callback is called AFTER all dequeued tasks in a single call to poll
224    /// have been processed. This will always be paired with a prior call to
225    /// `_embassy_trace_poll_start`.
226 ///
227 /// This marks the EXECUTOR state transition from SCHEDULING -> IDLE
228 fn _embassy_trace_executor_idle(executor_id: u32);
229}
230
231#[inline]
232pub(crate) fn poll_start(executor: &SyncExecutor) {
233 #[cfg(feature = "trace")]
234 unsafe {
235 _embassy_trace_poll_start(executor as *const _ as u32)
236 }
237}
238
239#[inline]
240pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) {
241 #[cfg(feature = "trace")]
242 unsafe {
243 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32)
244 }
245
246 #[cfg(feature = "rtos-trace")]
247 {
248 rtos_trace::trace::task_new(task.as_ptr() as u32);
249 let name = task.metadata().name().unwrap_or("unnamed task\0");
250 let info = rtos_trace::TaskInfo {
251 name,
252 priority: 0,
253 stack_base: 0,
254 stack_size: 0,
255 };
256 rtos_trace::trace::task_send_info(task.id(), info);
257 }
258
259 #[cfg(feature = "rtos-trace")]
260 TASK_TRACKER.add(*task);
261}
262
263#[inline]
264pub(crate) fn task_end(executor: *const SyncExecutor, task: &TaskRef) {
265 #[cfg(feature = "trace")]
266 unsafe {
267 _embassy_trace_task_end(executor as u32, task.as_ptr() as u32)
268 }
269}
270
271#[inline]
272pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
273 #[cfg(feature = "trace")]
274 unsafe {
275 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32)
276 }
277 #[cfg(feature = "rtos-trace")]
278 rtos_trace::trace::task_ready_begin(task.as_ptr() as u32);
279}
280
281#[inline]
282pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
283 #[cfg(feature = "trace")]
284 unsafe {
285 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32)
286 }
287 #[cfg(feature = "rtos-trace")]
288 rtos_trace::trace::task_exec_begin(task.as_ptr() as u32);
289}
290
291#[inline]
292pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
293 #[cfg(feature = "trace")]
294 unsafe {
295 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32)
296 }
297 #[cfg(feature = "rtos-trace")]
298 rtos_trace::trace::task_exec_end();
299}
300
301#[inline]
302pub(crate) fn executor_idle(executor: &SyncExecutor) {
303 #[cfg(feature = "trace")]
304 unsafe {
305 _embassy_trace_executor_idle(executor as *const _ as u32)
306 }
307 #[cfg(feature = "rtos-trace")]
308 rtos_trace::trace::system_idle();
309}
310
311/// Returns an iterator over all active tasks in the system
312///
313/// This function provides a convenient way to iterate over all tasks
314/// that are currently tracked in the system. The returned iterator
315/// yields each task in the global task tracker.
316///
317/// # Returns
318/// An iterator that yields `TaskRef` items for each task
319#[cfg(feature = "rtos-trace")]
320fn get_all_active_tasks() -> impl Iterator<Item = TaskRef> + 'static {
321 struct TaskIterator<'a> {
322 tracker: &'a TaskTracker,
323 current: *mut TaskHeader,
324 }
325
326 impl<'a> Iterator for TaskIterator<'a> {
327 type Item = TaskRef;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 if self.current.is_null() {
331 return None;
332 }
333
334 let task = unsafe { TaskRef::from_ptr(self.current) };
335 self.current = unsafe { (*self.current).all_tasks_next.load(Ordering::Acquire) };
336
337 Some(task)
338 }
339 }
340
341 TaskIterator {
342 tracker: &TASK_TRACKER,
343 current: TASK_TRACKER.head.load(Ordering::Acquire),
344 }
345}
346
347/// Perform an action on each active task
348#[cfg(feature = "rtos-trace")]
349fn with_all_active_tasks<F>(f: F)
350where
351 F: FnMut(TaskRef),
352{
353 TASK_TRACKER.for_each(f);
354}
355
356#[cfg(feature = "rtos-trace")]
357impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
358 fn task_list() {
359 with_all_active_tasks(|task| {
360 let info = rtos_trace::TaskInfo {
361 name: task.metadata().name().unwrap_or("unnamed task\0"),
362 priority: 0,
363 stack_base: 0,
364 stack_size: 0,
365 };
366 rtos_trace::trace::task_send_info(task.id(), info);
367 });
368 }
369 fn time() -> u64 {
370 const fn gcd(a: u64, b: u64) -> u64 {
371 if b == 0 { a } else { gcd(b, a % b) }
372 }
373
374 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
375 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
376 }
377}
378
379#[cfg(feature = "rtos-trace")]
380rtos_trace::global_os_callbacks! {SyncExecutor}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 8d3910a25..2706f0fdf 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -1,6 +1,6 @@
1use core::task::{RawWaker, RawWakerVTable, Waker}; 1use core::task::{RawWaker, RawWakerVTable, Waker};
2 2
3use super::{wake_task, TaskHeader, TaskRef}; 3use super::{TaskHeader, TaskRef, wake_task};
4 4
5static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 5static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
6 6
@@ -26,38 +26,19 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps 26/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
27/// avoid dynamic dispatch. 27/// avoid dynamic dispatch.
28/// 28///
29/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). 29/// You can use the returned task pointer to wake the task with [`wake_task`].
30/// 30///
31/// # Panics 31/// # Panics
32/// 32///
33/// Panics if the waker is not created by the Embassy executor. 33/// Panics if the waker is not created by the Embassy executor.
34pub fn task_from_waker(waker: &Waker) -> TaskRef { 34pub fn task_from_waker(waker: &Waker) -> TaskRef {
35 let (vtable, data) = { 35 // make sure to compare vtable addresses. Doing `==` on the references
36 #[cfg(not(feature = "nightly"))] 36 // will compare the contents, which is slower.
37 { 37 if waker.vtable() as *const _ != &VTABLE as *const _ {
38 struct WakerHack { 38 panic!(
39 data: *const (), 39 "Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor."
40 vtable: &'static RawWakerVTable, 40 )
41 }
42
43 // safety: OK because WakerHack has the same layout as Waker.
44 // This is not really guaranteed because the structs are `repr(Rust)`, it is
45 // indeed the case in the current implementation.
46 // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
47 let hack: &WakerHack = unsafe { core::mem::transmute(waker) };
48 (hack.vtable, hack.data)
49 }
50
51 #[cfg(feature = "nightly")]
52 {
53 let raw_waker = waker.as_raw();
54 (raw_waker.vtable(), raw_waker.data())
55 }
56 };
57
58 if vtable != &VTABLE {
59 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
60 } 41 }
61 // safety: our wakers are always created with `TaskRef::as_ptr` 42 // safety: our wakers are always created with `TaskRef::as_ptr`
62 unsafe { TaskRef::from_ptr(data as *const TaskHeader) } 43 unsafe { TaskRef::from_ptr(waker.data() as *const TaskHeader) }
63} 44}
diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs
index 435a0ff7e..919bcc61a 100644
--- a/embassy-executor/src/raw/waker_turbo.rs
+++ b/embassy-executor/src/raw/waker_turbo.rs
@@ -1,7 +1,7 @@
1use core::ptr::NonNull; 1use core::ptr::NonNull;
2use core::task::Waker; 2use core::task::Waker;
3 3
4use super::{wake_task, TaskHeader, TaskRef}; 4use super::{TaskHeader, TaskRef, wake_task};
5 5
6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { 6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _)) 7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _))
@@ -26,7 +26,7 @@ pub fn task_from_waker(waker: &Waker) -> TaskRef {
26} 26}
27 27
28#[inline(never)] 28#[inline(never)]
29#[no_mangle] 29#[unsafe(no_mangle)]
30fn _turbo_wake(ptr: NonNull<()>) { 30fn _turbo_wake(ptr: NonNull<()>) {
31 // safety: our wakers are always created with `TaskRef::as_ptr` 31 // safety: our wakers are always created with `TaskRef::as_ptr`
32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) }; 32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) };
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 271606244..b73a1e7c6 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -1,9 +1,11 @@
1use core::future::poll_fn; 1use core::future::{Future, poll_fn};
2use core::marker::PhantomData; 2use core::marker::PhantomData;
3use core::mem; 3use core::mem;
4use core::sync::atomic::Ordering;
4use core::task::Poll; 5use core::task::Poll;
5 6
6use super::raw; 7use super::raw;
8use crate::Metadata;
7 9
8/// Token to spawn a newly-created task in an executor. 10/// Token to spawn a newly-created task in an executor.
9/// 11///
@@ -21,24 +23,28 @@ use super::raw;
21/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 23/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
22#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 24#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
23pub struct SpawnToken<S> { 25pub struct SpawnToken<S> {
24 raw_task: Option<raw::TaskRef>, 26 pub(crate) raw_task: raw::TaskRef,
25 phantom: PhantomData<*mut S>, 27 phantom: PhantomData<*mut S>,
26} 28}
27 29
28impl<S> SpawnToken<S> { 30impl<S> SpawnToken<S> {
29 pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self { 31 pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self {
30 Self { 32 Self {
31 raw_task: Some(raw_task), 33 raw_task,
32 phantom: PhantomData, 34 phantom: PhantomData,
33 } 35 }
34 } 36 }
35 37
36 /// Return a SpawnToken that represents a failed spawn. 38 /// Returns the task ID.
37 pub fn new_failed() -> Self { 39 /// This can be used in combination with rtos-trace to match task names with IDs
38 Self { 40 pub fn id(&self) -> u32 {
39 raw_task: None, 41 self.raw_task.id()
40 phantom: PhantomData, 42 }
41 } 43
44 /// Get the metadata for this task. You can use this to set metadata fields
45 /// prior to spawning it.
46 pub fn metadata(&self) -> &Metadata {
47 self.raw_task.metadata()
42 } 48 }
43} 49}
44 50
@@ -50,8 +56,7 @@ impl<S> Drop for SpawnToken<S> {
50} 56}
51 57
52/// Error returned when spawning a task. 58/// Error returned when spawning a task.
53#[derive(Copy, Clone, Debug)] 59#[derive(Copy, Clone)]
54#[cfg_attr(feature = "defmt", derive(defmt::Format))]
55pub enum SpawnError { 60pub enum SpawnError {
56 /// Too many instances of this task are already running. 61 /// Too many instances of this task are already running.
57 /// 62 ///
@@ -61,6 +66,37 @@ pub enum SpawnError {
61 Busy, 66 Busy,
62} 67}
63 68
69impl core::fmt::Debug for SpawnError {
70 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
71 core::fmt::Display::fmt(self, f)
72 }
73}
74
75impl core::fmt::Display for SpawnError {
76 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77 match self {
78 SpawnError::Busy => write!(
79 f,
80 "Busy - Too many instances of this task are already running. Check the `pool_size` attribute of the task."
81 ),
82 }
83 }
84}
85
86#[cfg(feature = "defmt")]
87impl defmt::Format for SpawnError {
88 fn format(&self, f: defmt::Formatter) {
89 match self {
90 SpawnError::Busy => defmt::write!(
91 f,
92 "Busy - Too many instances of this task are already running. Check the `pool_size` attribute of the task."
93 ),
94 }
95 }
96}
97
98impl core::error::Error for SpawnError {}
99
64/// Handle to spawn tasks into an executor. 100/// Handle to spawn tasks into an executor.
65/// 101///
66/// This Spawner can spawn any task (Send and non-Send ones), but it can 102/// This Spawner can spawn any task (Send and non-Send ones), but it can
@@ -69,7 +105,7 @@ pub enum SpawnError {
69/// If you want to spawn tasks from another thread, use [SendSpawner]. 105/// If you want to spawn tasks from another thread, use [SendSpawner].
70#[derive(Copy, Clone)] 106#[derive(Copy, Clone)]
71pub struct Spawner { 107pub struct Spawner {
72 executor: &'static raw::Executor, 108 pub(crate) executor: &'static raw::Executor,
73 not_send: PhantomData<*mut ()>, 109 not_send: PhantomData<*mut ()>,
74} 110}
75 111
@@ -86,46 +122,47 @@ impl Spawner {
86 /// This function is `async` just to get access to the current async 122 /// This function is `async` just to get access to the current async
87 /// context. It returns instantly, it does not block/yield. 123 /// context. It returns instantly, it does not block/yield.
88 /// 124 ///
125 /// Using this method is discouraged due to it being unsafe. Consider the following
126 /// alternatives instead:
127 ///
128 /// - Pass the initial `Spawner` as an argument to tasks. Note that it's `Copy`, so you can
129 /// make as many copies of it as you want.
130 /// - Use `SendSpawner::for_current_executor()` instead, which is safe but can only be used
131 /// if task arguments are `Send`.
132 ///
133 /// The only case where using this method is absolutely required is obtaining the `Spawner`
134 /// for an `InterruptExecutor`.
135 ///
136 /// # Safety
137 ///
138 /// You must only execute this with an async `Context` created by the Embassy executor.
139 /// You must not execute it with manually-created `Context`s.
140 ///
89 /// # Panics 141 /// # Panics
90 /// 142 ///
91 /// Panics if the current executor is not an Embassy executor. 143 /// Panics if the current executor is not an Embassy executor.
92 pub async fn for_current_executor() -> Self { 144 pub unsafe fn for_current_executor() -> impl Future<Output = Self> {
93 poll_fn(|cx| { 145 poll_fn(|cx| {
94 let task = raw::task_from_waker(cx.waker()); 146 let task = raw::task_from_waker(cx.waker());
95 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 147 let executor = unsafe {
148 task.header()
149 .executor
150 .load(Ordering::Relaxed)
151 .as_ref()
152 .unwrap_unchecked()
153 };
96 let executor = unsafe { raw::Executor::wrap(executor) }; 154 let executor = unsafe { raw::Executor::wrap(executor) };
97 Poll::Ready(Self::new(executor)) 155 Poll::Ready(Self::new(executor))
98 }) 156 })
99 .await
100 } 157 }
101 158
102 /// Spawn a task into an executor. 159 /// Spawn a task into an executor.
103 /// 160 ///
104 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). 161 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
105 pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { 162 pub fn spawn<S>(&self, token: SpawnToken<S>) {
106 let task = token.raw_task; 163 let task = token.raw_task;
107 mem::forget(token); 164 mem::forget(token);
108 165 unsafe { self.executor.spawn(task) }
109 match task {
110 Some(task) => {
111 unsafe { self.executor.spawn(task) };
112 Ok(())
113 }
114 None => Err(SpawnError::Busy),
115 }
116 }
117
118 // Used by the `embassy_executor_macros::main!` macro to throw an error when spawn
119 // fails. This is here to allow conditional use of `defmt::unwrap!`
120 // without introducing a `defmt` feature in the `embassy_executor_macros` package,
121 // which would require use of `-Z namespaced-features`.
122 /// Spawn a task into an executor, panicking on failure.
123 ///
124 /// # Panics
125 ///
126 /// Panics if the spawning fails.
127 pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
128 unwrap!(self.spawn(token));
129 } 166 }
130 167
131 /// Convert this Spawner to a SendSpawner. This allows you to send the 168 /// Convert this Spawner to a SendSpawner. This allows you to send the
@@ -134,6 +171,11 @@ impl Spawner {
134 pub fn make_send(&self) -> SendSpawner { 171 pub fn make_send(&self) -> SendSpawner {
135 SendSpawner::new(&self.executor.inner) 172 SendSpawner::new(&self.executor.inner)
136 } 173 }
174
175 /// Return the unique ID of this Spawner's Executor.
176 pub fn executor_id(&self) -> usize {
177 self.executor.id()
178 }
137} 179}
138 180
139/// Handle to spawn tasks into an executor from any thread. 181/// Handle to spawn tasks into an executor from any thread.
@@ -161,37 +203,26 @@ impl SendSpawner {
161 /// # Panics 203 /// # Panics
162 /// 204 ///
163 /// Panics if the current executor is not an Embassy executor. 205 /// Panics if the current executor is not an Embassy executor.
164 pub async fn for_current_executor() -> Self { 206 pub fn for_current_executor() -> impl Future<Output = Self> {
165 poll_fn(|cx| { 207 poll_fn(|cx| {
166 let task = raw::task_from_waker(cx.waker()); 208 let task = raw::task_from_waker(cx.waker());
167 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 209 let executor = unsafe {
210 task.header()
211 .executor
212 .load(Ordering::Relaxed)
213 .as_ref()
214 .unwrap_unchecked()
215 };
168 Poll::Ready(Self::new(executor)) 216 Poll::Ready(Self::new(executor))
169 }) 217 })
170 .await
171 } 218 }
172 219
173 /// Spawn a task into an executor. 220 /// Spawn a task into an executor.
174 /// 221 ///
175 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`). 222 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
176 pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { 223 pub fn spawn<S: Send>(&self, token: SpawnToken<S>) {
177 let header = token.raw_task; 224 let header = token.raw_task;
178 mem::forget(token); 225 mem::forget(token);
179 226 unsafe { self.executor.spawn(header) }
180 match header {
181 Some(header) => {
182 unsafe { self.executor.spawn(header) };
183 Ok(())
184 }
185 None => Err(SpawnError::Busy),
186 }
187 }
188
189 /// Spawn a task into an executor, panicking on failure.
190 ///
191 /// # Panics
192 ///
193 /// Panics if the spawning fails.
194 pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
195 unwrap!(self.spawn(token));
196 } 227 }
197} 228}