aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-executor/src
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/cortex_m.rs253
-rw-r--r--embassy-executor/src/arch/riscv32.rs135
-rw-r--r--embassy-executor/src/arch/std.rs150
-rw-r--r--embassy-executor/src/arch/wasm.rs134
-rw-r--r--embassy-executor/src/arch/xtensa.rs126
-rw-r--r--embassy-executor/src/lib.rs77
-rw-r--r--embassy-executor/src/raw/mod.rs483
-rw-r--r--embassy-executor/src/raw/run_queue.rs44
-rw-r--r--embassy-executor/src/raw/timer_queue.rs33
-rw-r--r--embassy-executor/src/raw/util.rs29
-rw-r--r--embassy-executor/src/raw/waker.rs13
-rw-r--r--embassy-executor/src/raw/waker_turbo.rs34
-rw-r--r--embassy-executor/src/spawner.rs32
13 files changed, 985 insertions, 558 deletions
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index 4b27a264e..94c8134d6 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -1,59 +1,224 @@
1use core::arch::asm; 1#[cfg(feature = "executor-thread")]
2use core::marker::PhantomData; 2pub use thread::*;
3use core::ptr; 3#[cfg(feature = "executor-thread")]
4 4mod thread {
5use super::{raw, Spawner}; 5 use core::arch::asm;
6 6 use core::marker::PhantomData;
7/// Thread mode executor, using WFE/SEV. 7
8/// 8 #[cfg(feature = "nightly")]
9/// This is the simplest and most common kind of executor. It runs on 9 pub use embassy_macros::main_cortex_m as main;
10/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction 10
11/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction 11 use crate::raw::{Pender, PenderInner};
12/// is executed, to make the `WFE` exit from sleep and poll the task. 12 use crate::{raw, Spawner};
13/// 13
14/// This executor allows for ultra low power consumption for chips where `WFE` 14 #[derive(Copy, Clone)]
15/// triggers low-power sleep without extra steps. If your chip requires extra steps, 15 pub(crate) struct ThreadPender;
16/// you may use [`raw::Executor`] directly to program custom behavior. 16
17pub struct Executor { 17 impl ThreadPender {
18 inner: raw::Executor, 18 pub(crate) fn pend(self) {
19 not_send: PhantomData<*mut ()>, 19 unsafe { core::arch::asm!("sev") }
20 }
21 }
22
23 /// Thread mode executor, using WFE/SEV.
24 ///
25 /// This is the simplest and most common kind of executor. It runs on
26 /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
27 /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
28 /// is executed, to make the `WFE` exit from sleep and poll the task.
29 ///
30 /// This executor allows for ultra low power consumption for chips where `WFE`
31 /// triggers low-power sleep without extra steps. If your chip requires extra steps,
32 /// you may use [`raw::Executor`] directly to program custom behavior.
33 pub struct Executor {
34 inner: raw::Executor,
35 not_send: PhantomData<*mut ()>,
36 }
37
38 impl Executor {
39 /// Create a new Executor.
40 pub fn new() -> Self {
41 Self {
42 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))),
43 not_send: PhantomData,
44 }
45 }
46
47 /// Run the executor.
48 ///
49 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
50 /// this executor. Use it to spawn the initial task(s). After `init` returns,
51 /// the executor starts running the tasks.
52 ///
53 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
54 /// for example by passing it as an argument to the initial tasks.
55 ///
56 /// This function requires `&'static mut self`. This means you have to store the
57 /// Executor instance in a place where it'll live forever and grants you mutable
58 /// access. There's a few ways to do this:
59 ///
60 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
61 /// - a `static mut` (unsafe)
62 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
63 ///
64 /// This function never returns.
65 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
66 init(self.inner.spawner());
67
68 loop {
69 unsafe {
70 self.inner.poll();
71 asm!("wfe");
72 };
73 }
74 }
75 }
20} 76}
21 77
22impl Executor { 78#[cfg(feature = "executor-interrupt")]
23 /// Create a new Executor. 79pub use interrupt::*;
24 pub fn new() -> Self { 80#[cfg(feature = "executor-interrupt")]
25 Self { 81mod interrupt {
26 inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), 82 use core::cell::UnsafeCell;
27 not_send: PhantomData, 83 use core::mem::MaybeUninit;
84
85 use atomic_polyfill::{AtomicBool, Ordering};
86 use cortex_m::interrupt::InterruptNumber;
87 use cortex_m::peripheral::NVIC;
88
89 use crate::raw::{self, Pender, PenderInner};
90
91 #[derive(Clone, Copy)]
92 pub(crate) struct InterruptPender(u16);
93
94 impl InterruptPender {
95 pub(crate) fn pend(self) {
96 // STIR is faster, but is only available in v7 and higher.
97 #[cfg(not(armv6m))]
98 {
99 let mut nvic: cortex_m::peripheral::NVIC = unsafe { core::mem::transmute(()) };
100 nvic.request(self);
101 }
102
103 #[cfg(armv6m)]
104 cortex_m::peripheral::NVIC::pend(self);
28 } 105 }
29 } 106 }
30 107
31 /// Run the executor. 108 unsafe impl cortex_m::interrupt::InterruptNumber for InterruptPender {
109 fn number(self) -> u16 {
110 self.0
111 }
112 }
113
114 /// Interrupt mode executor.
32 /// 115 ///
33 /// The `init` closure is called with a [`Spawner`] that spawns tasks on 116 /// This executor runs tasks in interrupt mode. The interrupt handler is set up
34 /// this executor. Use it to spawn the initial task(s). After `init` returns, 117 /// to poll tasks, and when a task is woken the interrupt is pended from software.
35 /// the executor starts running the tasks.
36 /// 118 ///
37 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), 119 /// This allows running async tasks at a priority higher than thread mode. One
38 /// for example by passing it as an argument to the initial tasks. 120 /// use case is to leave thread mode free for non-async tasks. Another use case is
121 /// to run multiple executors: one in thread mode for low priority tasks and another in
122 /// interrupt mode for higher priority tasks. Higher priority tasks will preempt lower
123 /// priority ones.
39 /// 124 ///
40 /// This function requires `&'static mut self`. This means you have to store the 125 /// It is even possible to run multiple interrupt mode executors at different priorities,
41 /// Executor instance in a place where it'll live forever and grants you mutable 126 /// by assigning different priorities to the interrupts. For an example on how to do this,
42 /// access. There's a few ways to do this: 127 /// See the 'multiprio' example for 'embassy-nrf'.
43 /// 128 ///
44 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) 129 /// To use it, you have to pick an interrupt that won't be used by the hardware.
45 /// - a `static mut` (unsafe) 130 /// Some chips reserve some interrupts for this purpose, sometimes named "software interrupts" (SWI).
46 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) 131 /// If this is not the case, you may use an interrupt from any unused peripheral.
47 /// 132 ///
48 /// This function never returns. 133 /// It is somewhat more complex to use, it's recommended to use the thread-mode
49 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 134 /// [`Executor`] instead, if it works for your use case.
50 init(self.inner.spawner()); 135 pub struct InterruptExecutor {
136 started: AtomicBool,
137 executor: UnsafeCell<MaybeUninit<raw::Executor>>,
138 }
139
140 unsafe impl Send for InterruptExecutor {}
141 unsafe impl Sync for InterruptExecutor {}
142
143 impl InterruptExecutor {
144 /// Create a new, not started `InterruptExecutor`.
145 #[inline]
146 pub const fn new() -> Self {
147 Self {
148 started: AtomicBool::new(false),
149 executor: UnsafeCell::new(MaybeUninit::uninit()),
150 }
151 }
152
153 /// Executor interrupt callback.
154 ///
155 /// # Safety
156 ///
157 /// You MUST call this from the interrupt handler, and from nowhere else.
158 pub unsafe fn on_interrupt(&'static self) {
159 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
160 executor.poll();
161 }
162
163 /// Start the executor.
164 ///
165 /// This initializes the executor, enables the interrupt, and returns.
166 /// The executor keeps running in the background through the interrupt.
167 ///
168 /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`]
169 /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a
170 /// different "thread" (the interrupt), so spawning tasks on it is effectively
171 /// sending them.
172 ///
173 /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from
174 /// a task running in it.
175 ///
176 /// # Interrupt requirements
177 ///
178 /// You must write the interrupt handler yourself, and make it call [`on_interrupt()`](Self::on_interrupt).
179 ///
180 /// This method already enables (unmasks) the interrupt, you must NOT do it yourself.
181 ///
182 /// You must set the interrupt priority before calling this method. You MUST NOT
183 /// do it after.
184 ///
185 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
186 if self
187 .started
188 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
189 .is_err()
190 {
191 panic!("InterruptExecutor::start() called multiple times on the same executor.");
192 }
51 193
52 loop {
53 unsafe { 194 unsafe {
54 self.inner.poll(); 195 (&mut *self.executor.get())
55 asm!("wfe"); 196 .as_mut_ptr()
56 }; 197 .write(raw::Executor::new(Pender(PenderInner::Interrupt(InterruptPender(
198 irq.number(),
199 )))))
200 }
201
202 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
203
204 unsafe { NVIC::unmask(irq) }
205
206 executor.spawner().make_send()
207 }
208
209 /// Get a SendSpawner for this executor
210 ///
211 /// This returns a [`SendSpawner`] you can use to spawn tasks on this
212 /// executor.
213 ///
214 /// This MUST only be called on an executor that has already been spawned.
215 /// The function will panic otherwise.
216 pub fn spawner(&'static self) -> crate::SendSpawner {
217 if !self.started.load(Ordering::Acquire) {
218 panic!("InterruptExecutor::spawner() called on uninitialized executor.");
219 }
220 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
221 executor.spawner().make_send()
57 } 222 }
58 } 223 }
59} 224}
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index 2a4b006da..ff7ec1575 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -1,73 +1,86 @@
1use core::marker::PhantomData; 1#[cfg(feature = "executor-interrupt")]
2use core::ptr; 2compile_error!("`executor-interrupt` is not supported with `arch-riscv32`.");
3 3
4use atomic_polyfill::{AtomicBool, Ordering}; 4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use core::marker::PhantomData;
9 use core::sync::atomic::{AtomicBool, Ordering};
5 10
6use super::{raw, Spawner}; 11 #[cfg(feature = "nightly")]
12 pub use embassy_macros::main_riscv as main;
7 13
8/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV 14 use crate::raw::{Pender, PenderInner};
9/// 15 use crate::{raw, Spawner};
10static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
11 16
12/// RISCV32 Executor 17 #[derive(Copy, Clone)]
13pub struct Executor { 18 pub(crate) struct ThreadPender;
14 inner: raw::Executor,
15 not_send: PhantomData<*mut ()>,
16}
17 19
18impl Executor { 20 impl ThreadPender {
19 /// Create a new Executor. 21 #[allow(unused)]
20 pub fn new() -> Self { 22 pub(crate) fn pend(self) {
21 Self { 23 SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst);
22 // use Signal_Work_Thread_Mode as substitute for local interrupt register
23 inner: raw::Executor::new(
24 |_| {
25 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
26 },
27 ptr::null_mut(),
28 ),
29 not_send: PhantomData,
30 } 24 }
31 } 25 }
32 26
33 /// Run the executor. 27 /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
34 /// 28 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
35 /// The `init` closure is called with a [`Spawner`] that spawns tasks on 29
36 /// this executor. Use it to spawn the initial task(s). After `init` returns, 30 /// RISCV32 Executor
37 /// the executor starts running the tasks. 31 pub struct Executor {
38 /// 32 inner: raw::Executor,
39 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), 33 not_send: PhantomData<*mut ()>,
40 /// for example by passing it as an argument to the initial tasks. 34 }
41 /// 35
42 /// This function requires `&'static mut self`. This means you have to store the 36 impl Executor {
43 /// Executor instance in a place where it'll live forever and grants you mutable 37 /// Create a new Executor.
44 /// access. There's a few ways to do this: 38 pub fn new() -> Self {
45 /// 39 Self {
46 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) 40 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))),
47 /// - a `static mut` (unsafe) 41 not_send: PhantomData,
48 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) 42 }
49 /// 43 }
50 /// This function never returns. 44
51 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 45 /// Run the executor.
52 init(self.inner.spawner()); 46 ///
47 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
48 /// this executor. Use it to spawn the initial task(s). After `init` returns,
49 /// the executor starts running the tasks.
50 ///
51 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
52 /// for example by passing it as an argument to the initial tasks.
53 ///
54 /// This function requires `&'static mut self`. This means you have to store the
55 /// Executor instance in a place where it'll live forever and grants you mutable
56 /// access. There's a few ways to do this:
57 ///
58 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
59 /// - a `static mut` (unsafe)
60 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
61 ///
62 /// This function never returns.
63 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
64 init(self.inner.spawner());
53 65
54 loop { 66 loop {
55 unsafe { 67 unsafe {
56 self.inner.poll(); 68 self.inner.poll();
57 // we do not care about race conditions between the load and store operations, interrupts 69 // we do not care about race conditions between the load and store operations, interrupts
58 //will only set this value to true. 70 //will only set this value to true.
59 critical_section::with(|_| { 71 critical_section::with(|_| {
60 // if there is work to do, loop back to polling 72 // if there is work to do, loop back to polling
61 // TODO can we relax this? 73 // TODO can we relax this?
62 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { 74 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
63 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); 75 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
64 } 76 }
65 // if not, wait for interrupt 77 // if not, wait for interrupt
66 else { 78 else {
67 core::arch::asm!("wfi"); 79 core::arch::asm!("wfi");
68 } 80 }
69 }); 81 });
70 // if an interrupt occurred while waiting, it will be serviced here 82 // if an interrupt occurred while waiting, it will be serviced here
83 }
71 } 84 }
72 } 85 }
73 } 86 }
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs
index 701f0eb18..4e4a178f0 100644
--- a/embassy-executor/src/arch/std.rs
+++ b/embassy-executor/src/arch/std.rs
@@ -1,84 +1,100 @@
1use std::marker::PhantomData; 1#[cfg(feature = "executor-interrupt")]
2use std::sync::{Condvar, Mutex}; 2compile_error!("`executor-interrupt` is not supported with `arch-std`.");
3 3
4use super::{raw, Spawner}; 4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use std::marker::PhantomData;
9 use std::sync::{Condvar, Mutex};
5 10
6/// Single-threaded std-based executor. 11 #[cfg(feature = "nightly")]
7pub struct Executor { 12 pub use embassy_macros::main_std as main;
8 inner: raw::Executor, 13
9 not_send: PhantomData<*mut ()>, 14 use crate::raw::{Pender, PenderInner};
10 signaler: &'static Signaler, 15 use crate::{raw, Spawner};
11}
12 16
13impl Executor { 17 #[derive(Copy, Clone)]
14 /// Create a new Executor. 18 pub(crate) struct ThreadPender(&'static Signaler);
15 pub fn new() -> Self { 19
16 let signaler = &*Box::leak(Box::new(Signaler::new())); 20 impl ThreadPender {
17 Self { 21 #[allow(unused)]
18 inner: raw::Executor::new( 22 pub(crate) fn pend(self) {
19 |p| unsafe { 23 self.0.signal()
20 let s = &*(p as *const () as *const Signaler);
21 s.signal()
22 },
23 signaler as *const _ as _,
24 ),
25 not_send: PhantomData,
26 signaler,
27 } 24 }
28 } 25 }
29 26
30 /// Run the executor. 27 /// Single-threaded std-based executor.
31 /// 28 pub struct Executor {
32 /// The `init` closure is called with a [`Spawner`] that spawns tasks on 29 inner: raw::Executor,
33 /// this executor. Use it to spawn the initial task(s). After `init` returns, 30 not_send: PhantomData<*mut ()>,
34 /// the executor starts running the tasks. 31 signaler: &'static Signaler,
35 /// 32 }
36 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
37 /// for example by passing it as an argument to the initial tasks.
38 ///
39 /// This function requires `&'static mut self`. This means you have to store the
40 /// Executor instance in a place where it'll live forever and grants you mutable
41 /// access. There's a few ways to do this:
42 ///
43 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
44 /// - a `static mut` (unsafe)
45 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
46 ///
47 /// This function never returns.
48 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
49 init(self.inner.spawner());
50 33
51 loop { 34 impl Executor {
52 unsafe { self.inner.poll() }; 35 /// Create a new Executor.
53 self.signaler.wait() 36 pub fn new() -> Self {
37 let signaler = &*Box::leak(Box::new(Signaler::new()));
38 Self {
39 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(signaler)))),
40 not_send: PhantomData,
41 signaler,
42 }
54 } 43 }
55 }
56}
57 44
58struct Signaler { 45 /// Run the executor.
59 mutex: Mutex<bool>, 46 ///
60 condvar: Condvar, 47 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
61} 48 /// this executor. Use it to spawn the initial task(s). After `init` returns,
49 /// the executor starts running the tasks.
50 ///
51 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
52 /// for example by passing it as an argument to the initial tasks.
53 ///
54 /// This function requires `&'static mut self`. This means you have to store the
55 /// Executor instance in a place where it'll live forever and grants you mutable
56 /// access. There's a few ways to do this:
57 ///
58 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
59 /// - a `static mut` (unsafe)
60 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
61 ///
62 /// This function never returns.
63 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
64 init(self.inner.spawner());
62 65
63impl Signaler { 66 loop {
64 fn new() -> Self { 67 unsafe { self.inner.poll() };
65 Self { 68 self.signaler.wait()
66 mutex: Mutex::new(false), 69 }
67 condvar: Condvar::new(),
68 } 70 }
69 } 71 }
70 72
71 fn wait(&self) { 73 struct Signaler {
72 let mut signaled = self.mutex.lock().unwrap(); 74 mutex: Mutex<bool>,
73 while !*signaled { 75 condvar: Condvar,
74 signaled = self.condvar.wait(signaled).unwrap();
75 }
76 *signaled = false;
77 } 76 }
78 77
79 fn signal(&self) { 78 impl Signaler {
80 let mut signaled = self.mutex.lock().unwrap(); 79 fn new() -> Self {
81 *signaled = true; 80 Self {
82 self.condvar.notify_one(); 81 mutex: Mutex::new(false),
82 condvar: Condvar::new(),
83 }
84 }
85
86 fn wait(&self) {
87 let mut signaled = self.mutex.lock().unwrap();
88 while !*signaled {
89 signaled = self.condvar.wait(signaled).unwrap();
90 }
91 *signaled = false;
92 }
93
94 fn signal(&self) {
95 let mut signaled = self.mutex.lock().unwrap();
96 *signaled = true;
97 self.condvar.notify_one();
98 }
83 } 99 }
84} 100}
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs
index 98091cfbb..08ab16b99 100644
--- a/embassy-executor/src/arch/wasm.rs
+++ b/embassy-executor/src/arch/wasm.rs
@@ -1,74 +1,88 @@
1use core::marker::PhantomData; 1#[cfg(feature = "executor-interrupt")]
2compile_error!("`executor-interrupt` is not supported with `arch-wasm`.");
2 3
3use js_sys::Promise; 4#[cfg(feature = "executor-thread")]
4use wasm_bindgen::prelude::*; 5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
5 8
6use super::raw::util::UninitCell; 9 use core::marker::PhantomData;
7use super::raw::{self};
8use super::Spawner;
9 10
10/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. 11 #[cfg(feature = "nightly")]
11pub struct Executor { 12 pub use embassy_macros::main_wasm as main;
12 inner: raw::Executor, 13 use js_sys::Promise;
13 ctx: &'static WasmContext, 14 use wasm_bindgen::prelude::*;
14 not_send: PhantomData<*mut ()>,
15}
16 15
17pub(crate) struct WasmContext { 16 use crate::raw::util::UninitCell;
18 promise: Promise, 17 use crate::raw::{Pender, PenderInner};
19 closure: UninitCell<Closure<dyn FnMut(JsValue)>>, 18 use crate::{raw, Spawner};
20} 19
20 /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop.
21 pub struct Executor {
22 inner: raw::Executor,
23 ctx: &'static WasmContext,
24 not_send: PhantomData<*mut ()>,
25 }
26
27 pub(crate) struct WasmContext {
28 promise: Promise,
29 closure: UninitCell<Closure<dyn FnMut(JsValue)>>,
30 }
31
32 #[derive(Copy, Clone)]
33 pub(crate) struct ThreadPender(&'static WasmContext);
21 34
22impl WasmContext { 35 impl ThreadPender {
23 pub fn new() -> Self { 36 #[allow(unused)]
24 Self { 37 pub(crate) fn pend(self) {
25 promise: Promise::resolve(&JsValue::undefined()), 38 let _ = self.0.promise.then(unsafe { self.0.closure.as_mut() });
26 closure: UninitCell::uninit(),
27 } 39 }
28 } 40 }
29}
30 41
31impl Executor { 42 impl WasmContext {
32 /// Create a new Executor. 43 pub fn new() -> Self {
33 pub fn new() -> Self { 44 Self {
34 let ctx = &*Box::leak(Box::new(WasmContext::new())); 45 promise: Promise::resolve(&JsValue::undefined()),
35 let inner = raw::Executor::new( 46 closure: UninitCell::uninit(),
36 |p| unsafe { 47 }
37 let ctx = &*(p as *const () as *const WasmContext);
38 let _ = ctx.promise.then(ctx.closure.as_mut());
39 },
40 ctx as *const _ as _,
41 );
42 Self {
43 inner,
44 not_send: PhantomData,
45 ctx,
46 } 48 }
47 } 49 }
48 50
49 /// Run the executor. 51 impl Executor {
50 /// 52 /// Create a new Executor.
51 /// The `init` closure is called with a [`Spawner`] that spawns tasks on 53 pub fn new() -> Self {
52 /// this executor. Use it to spawn the initial task(s). After `init` returns, 54 let ctx = &*Box::leak(Box::new(WasmContext::new()));
53 /// the executor starts running the tasks. 55 Self {
54 /// 56 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(ctx)))),
55 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), 57 not_send: PhantomData,
56 /// for example by passing it as an argument to the initial tasks. 58 ctx,
57 /// 59 }
58 /// This function requires `&'static mut self`. This means you have to store the 60 }
59 /// Executor instance in a place where it'll live forever and grants you mutable 61
60 /// access. There's a few ways to do this: 62 /// Run the executor.
61 /// 63 ///
62 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) 64 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
63 /// - a `static mut` (unsafe) 65 /// this executor. Use it to spawn the initial task(s). After `init` returns,
64 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) 66 /// the executor starts running the tasks.
65 pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { 67 ///
66 unsafe { 68 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
67 let executor = &self.inner; 69 /// for example by passing it as an argument to the initial tasks.
68 self.ctx.closure.write(Closure::new(move |_| { 70 ///
69 executor.poll(); 71 /// This function requires `&'static mut self`. This means you have to store the
70 })); 72 /// Executor instance in a place where it'll live forever and grants you mutable
71 init(self.inner.spawner()); 73 /// access. There's a few ways to do this:
74 ///
75 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
76 /// - a `static mut` (unsafe)
77 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
78 pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
79 unsafe {
80 let executor = &self.inner;
81 self.ctx.closure.write(Closure::new(move |_| {
82 executor.poll();
83 }));
84 init(self.inner.spawner());
85 }
72 } 86 }
73 } 87 }
74} 88}
diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs
index f908aaa70..017b2c52b 100644
--- a/embassy-executor/src/arch/xtensa.rs
+++ b/embassy-executor/src/arch/xtensa.rs
@@ -1,66 +1,84 @@
1use core::marker::PhantomData; 1#[cfg(feature = "executor-interrupt")]
2use core::ptr; 2compile_error!("`executor-interrupt` is not supported with `arch-xtensa`.");
3 3
4use atomic_polyfill::{AtomicBool, Ordering}; 4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use core::marker::PhantomData;
9 use core::sync::atomic::{AtomicBool, Ordering};
5 10
6use super::{raw, Spawner}; 11 use crate::raw::{Pender, PenderInner};
12 use crate::{raw, Spawner};
7 13
8/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa 14 #[derive(Copy, Clone)]
9/// 15 pub(crate) struct ThreadPender;
10static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
11 16
12/// Xtensa Executor 17 impl ThreadPender {
13pub struct Executor { 18 #[allow(unused)]
14 inner: raw::Executor, 19 pub(crate) fn pend(self) {
15 not_send: PhantomData<*mut ()>, 20 SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst);
16}
17
18impl Executor {
19 /// Create a new Executor.
20 pub fn new() -> Self {
21 Self {
22 // use Signal_Work_Thread_Mode as substitute for local interrupt register
23 inner: raw::Executor::new(
24 |_| {
25 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
26 },
27 ptr::null_mut(),
28 ),
29 not_send: PhantomData,
30 } 21 }
31 } 22 }
32 23
33 /// Run the executor. 24 /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa
34 /// 25 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
35 /// The `init` closure is called with a [`Spawner`] that spawns tasks on 26
36 /// this executor. Use it to spawn the initial task(s). After `init` returns, 27 /// Xtensa Executor
37 /// the executor starts running the tasks. 28 pub struct Executor {
38 /// 29 inner: raw::Executor,
39 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), 30 not_send: PhantomData<*mut ()>,
40 /// for example by passing it as an argument to the initial tasks. 31 }
41 ///
42 /// This function requires `&'static mut self`. This means you have to store the
43 /// Executor instance in a place where it'll live forever and grants you mutable
44 /// access. There's a few ways to do this:
45 ///
46 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
47 /// - a `static mut` (unsafe)
48 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
49 ///
50 /// This function never returns.
51 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
52 init(self.inner.spawner());
53 32
54 loop { 33 impl Executor {
55 unsafe { 34 /// Create a new Executor.
56 self.inner.poll(); 35 pub fn new() -> Self {
57 // we do not care about race conditions between the load and store operations, interrupts 36 Self {
58 // will only set this value to true. 37 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))),
59 // if there is work to do, loop back to polling 38 not_send: PhantomData,
60 // TODO can we relax this? 39 }
61 critical_section::with(|_| { 40 }
41
42 /// Run the executor.
43 ///
44 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
45 /// this executor. Use it to spawn the initial task(s). After `init` returns,
46 /// the executor starts running the tasks.
47 ///
48 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
49 /// for example by passing it as an argument to the initial tasks.
50 ///
51 /// This function requires `&'static mut self`. This means you have to store the
52 /// Executor instance in a place where it'll live forever and grants you mutable
53 /// access. There's a few ways to do this:
54 ///
55 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
56 /// - a `static mut` (unsafe)
57 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
58 ///
59 /// This function never returns.
60 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
61 init(self.inner.spawner());
62
63 loop {
64 unsafe {
65 self.inner.poll();
66
67 // Manual critical section implementation that only masks interrupts handlers.
68 // We must not acquire the cross-core on dual-core systems because that would
69 // prevent the other core from doing useful work while this core is sleeping.
70 let token: critical_section::RawRestoreState;
71 core::arch::asm!("rsil {0}, 5", out(reg) token);
72
73 // we do not care about race conditions between the load and store operations, interrupts
74 // will only set this value to true.
75 // if there is work to do, loop back to polling
62 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { 76 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
63 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); 77 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
78
79 core::arch::asm!(
80 "wsr.ps {0}",
81 "rsync", in(reg) token)
64 } else { 82 } else {
65 // waiti sets the PS.INTLEVEL when slipping into sleep 83 // waiti sets the PS.INTLEVEL when slipping into sleep
66 // because critical sections in Xtensa are implemented via increasing 84 // because critical sections in Xtensa are implemented via increasing
@@ -68,7 +86,7 @@ impl Executor {
68 // take care not add code after `waiti` if it needs to be inside the CS 86 // take care not add code after `waiti` if it needs to be inside the CS
69 core::arch::asm!("waiti 0"); // critical section ends here 87 core::arch::asm!("waiti 0"); // critical section ends here
70 } 88 }
71 }); 89 }
72 } 90 }
73 } 91 }
74 } 92 }
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
index e4cbd04b9..3ce687eb6 100644
--- a/embassy-executor/src/lib.rs
+++ b/embassy-executor/src/lib.rs
@@ -1,5 +1,5 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)]
2#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))] 2#![cfg_attr(all(feature = "nightly", feature = "arch-xtensa"), feature(asm_experimental_arch))]
3#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
4#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
5#![warn(missing_docs)] 5#![warn(missing_docs)]
@@ -8,41 +8,45 @@
8pub(crate) mod fmt; 8pub(crate) mod fmt;
9 9
10#[cfg(feature = "nightly")] 10#[cfg(feature = "nightly")]
11pub use embassy_macros::{main, task}; 11pub use embassy_macros::task;
12 12
13cfg_if::cfg_if! { 13macro_rules! check_at_most_one {
14 if #[cfg(cortex_m)] { 14 (@amo [$($feats:literal)*] [] [$($res:tt)*]) => {
15 #[path="arch/cortex_m.rs"] 15 #[cfg(any($($res)*))]
16 mod arch; 16 compile_error!(concat!("At most one of these features can be enabled at the same time:", $(" `", $feats, "`",)*));
17 pub use arch::*; 17 };
18 } 18 (@amo $feats:tt [$curr:literal $($rest:literal)*] [$($res:tt)*]) => {
19 else if #[cfg(target_arch="riscv32")] { 19 check_at_most_one!(@amo $feats [$($rest)*] [$($res)* $(all(feature=$curr, feature=$rest),)*]);
20 #[path="arch/riscv32.rs"] 20 };
21 mod arch; 21 ($($f:literal),*$(,)?) => {
22 pub use arch::*; 22 check_at_most_one!(@amo [$($f)*] [$($f)*] []);
23 } 23 };
24 else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] {
25 #[path="arch/xtensa.rs"]
26 mod arch;
27 pub use arch::*;
28 }
29 else if #[cfg(feature="wasm")] {
30 #[path="arch/wasm.rs"]
31 mod arch;
32 pub use arch::*;
33 }
34 else if #[cfg(feature="std")] {
35 #[path="arch/std.rs"]
36 mod arch;
37 pub use arch::*;
38 }
39} 24}
25check_at_most_one!("arch-cortex-m", "arch-riscv32", "arch-xtensa", "arch-std", "arch-wasm",);
26
27#[cfg(feature = "_arch")]
28#[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")]
29#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")]
30#[cfg_attr(feature = "arch-xtensa", path = "arch/xtensa.rs")]
31#[cfg_attr(feature = "arch-std", path = "arch/std.rs")]
32#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")]
33mod arch;
34
35#[cfg(feature = "_arch")]
36pub use arch::*;
40 37
38pub mod raw;
39
40mod spawner;
41pub use spawner::*;
42
43/// Implementation details for embassy macros.
44/// Do not use. Used for macros and HALs only. Not covered by semver guarantees.
41#[doc(hidden)] 45#[doc(hidden)]
42/// Implementation details for embassy macros. DO NOT USE. 46pub mod _export {
43pub mod export {
44 #[cfg(feature = "rtos-trace")] 47 #[cfg(feature = "rtos-trace")]
45 pub use rtos_trace::trace; 48 pub use rtos_trace::trace;
49 pub use static_cell::StaticCell;
46 50
47 /// Expands the given block of code when `embassy-executor` is compiled with 51 /// Expands the given block of code when `embassy-executor` is compiled with
48 /// the `rtos-trace-interrupt` feature. 52 /// the `rtos-trace-interrupt` feature.
@@ -62,14 +66,3 @@ pub mod export {
62 ($($tt:tt)*) => {}; 66 ($($tt:tt)*) => {};
63 } 67 }
64} 68}
65
66pub mod raw;
67
68mod spawner;
69pub use spawner::*;
70
71/// Do not use. Used for macros and HALs only. Not covered by semver guarantees.
72#[doc(hidden)]
73pub mod _export {
74 pub use static_cell::StaticCell;
75}
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index e1258ebb5..f3760f589 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -11,17 +11,17 @@ mod run_queue;
11#[cfg(feature = "integrated-timers")] 11#[cfg(feature = "integrated-timers")]
12mod timer_queue; 12mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
14mod waker; 15mod waker;
15 16
16use core::cell::Cell;
17use core::future::Future; 17use core::future::Future;
18use core::marker::PhantomData;
19use core::mem;
18use core::pin::Pin; 20use core::pin::Pin;
19use core::ptr::NonNull; 21use core::ptr::NonNull;
20use core::task::{Context, Poll}; 22use core::task::{Context, Poll};
21use core::{mem, ptr};
22 23
23use atomic_polyfill::{AtomicU32, Ordering}; 24use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection;
25#[cfg(feature = "integrated-timers")] 25#[cfg(feature = "integrated-timers")]
26use embassy_time::driver::{self, AlarmHandle}; 26use embassy_time::driver::{self, AlarmHandle};
27#[cfg(feature = "integrated-timers")] 27#[cfg(feature = "integrated-timers")]
@@ -30,7 +30,7 @@ use embassy_time::Instant;
30use rtos_trace::trace; 30use rtos_trace::trace;
31 31
32use self::run_queue::{RunQueue, RunQueueItem}; 32use self::run_queue::{RunQueue, RunQueueItem};
33use self::util::UninitCell; 33use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 34pub use self::waker::task_from_waker;
35use super::SpawnToken; 35use super::SpawnToken;
36 36
@@ -43,35 +43,49 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
44 44
45/// Raw task header for use in task pointers. 45/// Raw task header for use in task pointers.
46/// 46pub(crate) struct TaskHeader {
47/// This is an opaque struct, used for raw pointers to tasks, for use
48/// with funtions like [`wake_task`] and [`task_from_waker`].
49pub struct TaskHeader {
50 pub(crate) state: AtomicU32, 47 pub(crate) state: AtomicU32,
51 pub(crate) run_queue_item: RunQueueItem, 48 pub(crate) run_queue_item: RunQueueItem,
52 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 49 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
53 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED 50 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
54 51
55 #[cfg(feature = "integrated-timers")] 52 #[cfg(feature = "integrated-timers")]
56 pub(crate) expires_at: Cell<Instant>, 53 pub(crate) expires_at: SyncUnsafeCell<Instant>,
57 #[cfg(feature = "integrated-timers")] 54 #[cfg(feature = "integrated-timers")]
58 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
59} 56}
60 57
61impl TaskHeader { 58/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
62 pub(crate) const fn new() -> Self { 59#[derive(Clone, Copy)]
60pub struct TaskRef {
61 ptr: NonNull<TaskHeader>,
62}
63
64unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
65unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
66
67impl TaskRef {
68 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
63 Self { 69 Self {
64 state: AtomicU32::new(0), 70 ptr: NonNull::from(task).cast(),
65 run_queue_item: RunQueueItem::new(), 71 }
66 executor: Cell::new(ptr::null()), 72 }
67 poll_fn: UninitCell::uninit(),
68 73
69 #[cfg(feature = "integrated-timers")] 74 /// Safety: The pointer must have been obtained with `Task::as_ptr`
70 expires_at: Cell::new(Instant::from_ticks(0)), 75 pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
71 #[cfg(feature = "integrated-timers")] 76 Self {
72 timer_queue_item: timer_queue::TimerQueueItem::new(), 77 ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
73 } 78 }
74 } 79 }
80
81 pub(crate) fn header(self) -> &'static TaskHeader {
82 unsafe { self.ptr.as_ref() }
83 }
84
85 /// The returned pointer is valid for the entire TaskStorage.
86 pub(crate) fn as_ptr(self) -> *const TaskHeader {
87 self.ptr.as_ptr()
88 }
75} 89}
76 90
77/// Raw storage in which a task can be spawned. 91/// Raw storage in which a task can be spawned.
@@ -101,7 +115,18 @@ impl<F: Future + 'static> TaskStorage<F> {
101 /// Create a new TaskStorage, in not-spawned state. 115 /// Create a new TaskStorage, in not-spawned state.
102 pub const fn new() -> Self { 116 pub const fn new() -> Self {
103 Self { 117 Self {
104 raw: TaskHeader::new(), 118 raw: TaskHeader {
119 state: AtomicU32::new(0),
120 run_queue_item: RunQueueItem::new(),
121 executor: SyncUnsafeCell::new(None),
122 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
123 poll_fn: SyncUnsafeCell::new(None),
124
125 #[cfg(feature = "integrated-timers")]
126 expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
127 #[cfg(feature = "integrated-timers")]
128 timer_queue_item: timer_queue::TimerQueueItem::new(),
129 },
105 future: UninitCell::uninit(), 130 future: UninitCell::uninit(),
106 } 131 }
107 } 132 }
@@ -120,29 +145,17 @@ impl<F: Future + 'static> TaskStorage<F> {
120 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 145 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
121 /// on a different executor. 146 /// on a different executor.
122 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
123 if self.spawn_mark_used() { 148 let task = AvailableTask::claim(self);
124 return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; 149 match task {
150 Some(task) => {
151 let task = task.initialize(future);
152 unsafe { SpawnToken::<F>::new(task) }
153 }
154 None => SpawnToken::new_failed(),
125 } 155 }
126
127 SpawnToken::<F>::new_failed()
128 }
129
130 fn spawn_mark_used(&'static self) -> bool {
131 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
132 self.raw
133 .state
134 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
135 .is_ok()
136 } 156 }
137 157
138 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { 158 unsafe fn poll(p: TaskRef) {
139 // Initialize the task
140 self.raw.poll_fn.write(Self::poll);
141 self.future.write(future());
142 NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
143 }
144
145 unsafe fn poll(p: NonNull<TaskHeader>) {
146 let this = &*(p.as_ptr() as *const TaskStorage<F>); 159 let this = &*(p.as_ptr() as *const TaskStorage<F>);
147 160
148 let future = Pin::new_unchecked(this.future.as_mut()); 161 let future = Pin::new_unchecked(this.future.as_mut());
@@ -152,6 +165,9 @@ impl<F: Future + 'static> TaskStorage<F> {
152 Poll::Ready(_) => { 165 Poll::Ready(_) => {
153 this.future.drop_in_place(); 166 this.future.drop_in_place();
154 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 167 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
168
169 #[cfg(feature = "integrated-timers")]
170 this.raw.expires_at.set(Instant::MAX);
155 } 171 }
156 Poll::Pending => {} 172 Poll::Pending => {}
157 } 173 }
@@ -160,9 +176,37 @@ impl<F: Future + 'static> TaskStorage<F> {
160 // it's a noop for our waker. 176 // it's a noop for our waker.
161 mem::forget(waker); 177 mem::forget(waker);
162 } 178 }
179
180 #[doc(hidden)]
181 #[allow(dead_code)]
182 fn _assert_sync(self) {
183 fn assert_sync<T: Sync>(_: T) {}
184
185 assert_sync(self)
186 }
187}
188
189struct AvailableTask<F: Future + 'static> {
190 task: &'static TaskStorage<F>,
163} 191}
164 192
165unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 193impl<F: Future + 'static> AvailableTask<F> {
194 fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
195 task.raw
196 .state
197 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
198 .ok()
199 .map(|_| Self { task })
200 }
201
202 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef {
203 unsafe {
204 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
205 self.task.future.write(future());
206 }
207 TaskRef::new(self.task)
208 }
209}
166 210
167/// Raw storage that can hold up to N tasks of the same type. 211/// Raw storage that can hold up to N tasks of the same type.
168/// 212///
@@ -187,13 +231,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
187 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 231 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
188 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 232 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
189 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
190 for task in &self.pool { 234 let task = self.pool.iter().find_map(AvailableTask::claim);
191 if task.spawn_mark_used() { 235 match task {
192 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; 236 Some(task) => {
237 let task = task.initialize(future);
238 unsafe { SpawnToken::<F>::new(task) }
193 } 239 }
240 None => SpawnToken::new_failed(),
194 } 241 }
195
196 SpawnToken::<F>::new_failed()
197 } 242 }
198 243
199 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 244 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
@@ -235,39 +280,71 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
235 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 280 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
236 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 281 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
237 282
238 for task in &self.pool { 283 let task = self.pool.iter().find_map(AvailableTask::claim);
239 if task.spawn_mark_used() { 284 match task {
240 return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); 285 Some(task) => {
286 let task = task.initialize(future);
287 unsafe { SpawnToken::<FutFn>::new(task) }
241 } 288 }
289 None => SpawnToken::new_failed(),
242 } 290 }
243
244 SpawnToken::<FutFn>::new_failed()
245 } 291 }
246} 292}
247 293
248/// Raw executor. 294#[derive(Clone, Copy)]
249/// 295pub(crate) enum PenderInner {
250/// This is the core of the Embassy executor. It is low-level, requiring manual 296 #[cfg(feature = "executor-thread")]
251/// handling of wakeups and task polling. If you can, prefer using one of the 297 Thread(crate::arch::ThreadPender),
252/// [higher level executors](crate::Executor). 298 #[cfg(feature = "executor-interrupt")]
253/// 299 Interrupt(crate::arch::InterruptPender),
254/// The raw executor leaves it up to you to handle wakeups and scheduling: 300 #[cfg(feature = "pender-callback")]
255/// 301 Callback { func: fn(*mut ()), context: *mut () },
256/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks 302}
257/// that "want to run"). 303
258/// - You must supply a `signal_fn`. The executor will call it to notify you it has work 304unsafe impl Send for PenderInner {}
259/// to do. You must arrange for `poll()` to be called as soon as possible. 305unsafe impl Sync for PenderInner {}
306
307/// Platform/architecture-specific action executed when an executor has pending work.
260/// 308///
261/// `signal_fn` can be called from *any* context: any thread, any interrupt priority 309/// When a task within an executor is woken, the `Pender` is called. This does a
262/// level, etc. It may be called synchronously from any `Executor` method call as well. 310/// platform/architecture-specific action to signal there is pending work in the executor.
263/// You must deal with this correctly. 311/// When this happens, you must arrange for [`Executor::poll`] to be called.
264/// 312///
265/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates 313/// You can think of it as a waker, but for the whole executor.
266/// the requirement for `poll` to not be called reentrantly. 314pub struct Pender(pub(crate) PenderInner);
267pub struct Executor { 315
316impl Pender {
317 /// Create a `Pender` that will call an arbitrary function pointer.
318 ///
319 /// # Arguments
320 ///
321 /// - `func`: The function pointer to call.
322 /// - `context`: Opaque context pointer, that will be passed to the function pointer.
323 #[cfg(feature = "pender-callback")]
324 pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self {
325 Self(PenderInner::Callback {
326 func,
327 context: context.into(),
328 })
329 }
330}
331
332impl Pender {
333 pub(crate) fn pend(&self) {
334 match self.0 {
335 #[cfg(feature = "executor-thread")]
336 PenderInner::Thread(x) => x.pend(),
337 #[cfg(feature = "executor-interrupt")]
338 PenderInner::Interrupt(x) => x.pend(),
339 #[cfg(feature = "pender-callback")]
340 PenderInner::Callback { func, context } => func(context),
341 }
342 }
343}
344
345pub(crate) struct SyncExecutor {
268 run_queue: RunQueue, 346 run_queue: RunQueue,
269 signal_fn: fn(*mut ()), 347 pender: Pender,
270 signal_ctx: *mut (),
271 348
272 #[cfg(feature = "integrated-timers")] 349 #[cfg(feature = "integrated-timers")]
273 pub(crate) timer_queue: timer_queue::TimerQueue, 350 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -275,23 +352,14 @@ pub struct Executor {
275 alarm: AlarmHandle, 352 alarm: AlarmHandle,
276} 353}
277 354
278impl Executor { 355impl SyncExecutor {
279 /// Create a new executor. 356 pub(crate) fn new(pender: Pender) -> Self {
280 ///
281 /// When the executor has work to do, it will call `signal_fn` with
282 /// `signal_ctx` as argument.
283 ///
284 /// See [`Executor`] docs for details on `signal_fn`.
285 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
286 #[cfg(feature = "integrated-timers")] 357 #[cfg(feature = "integrated-timers")]
287 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 358 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
288 #[cfg(feature = "integrated-timers")]
289 driver::set_alarm_callback(alarm, signal_fn, signal_ctx);
290 359
291 Self { 360 Self {
292 run_queue: RunQueue::new(), 361 run_queue: RunQueue::new(),
293 signal_fn, 362 pender,
294 signal_ctx,
295 363
296 #[cfg(feature = "integrated-timers")] 364 #[cfg(feature = "integrated-timers")]
297 timer_queue: timer_queue::TimerQueue::new(), 365 timer_queue: timer_queue::TimerQueue::new(),
@@ -307,12 +375,133 @@ impl Executor {
307 /// - `task` must be set up to run in this executor. 375 /// - `task` must be set up to run in this executor.
308 /// - `task` must NOT be already enqueued (in this executor or another one). 376 /// - `task` must NOT be already enqueued (in this executor or another one).
309 #[inline(always)] 377 #[inline(always)]
310 unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { 378 unsafe fn enqueue(&self, task: TaskRef) {
311 #[cfg(feature = "rtos-trace")] 379 #[cfg(feature = "rtos-trace")]
312 trace::task_ready_begin(task.as_ptr() as u32); 380 trace::task_ready_begin(task.as_ptr() as u32);
313 381
314 if self.run_queue.enqueue(cs, task) { 382 if self.run_queue.enqueue(task) {
315 (self.signal_fn)(self.signal_ctx) 383 self.pender.pend();
384 }
385 }
386
387 #[cfg(feature = "integrated-timers")]
388 fn alarm_callback(ctx: *mut ()) {
389 let this: &Self = unsafe { &*(ctx as *const Self) };
390 this.pender.pend();
391 }
392
393 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
394 task.header().executor.set(Some(self));
395
396 #[cfg(feature = "rtos-trace")]
397 trace::task_new(task.as_ptr() as u32);
398
399 self.enqueue(task);
400 }
401
402 /// # Safety
403 ///
404 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
405 pub(crate) unsafe fn poll(&'static self) {
406 #[cfg(feature = "integrated-timers")]
407 driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
408
409 #[allow(clippy::never_loop)]
410 loop {
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
413
414 self.run_queue.dequeue_all(|p| {
415 let task = p.header();
416
417 #[cfg(feature = "integrated-timers")]
418 task.expires_at.set(Instant::MAX);
419
420 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
421 if state & STATE_SPAWNED == 0 {
422 // If task is not running, ignore it. This can happen in the following scenario:
423 // - Task gets dequeued, poll starts
424 // - While task is being polled, it gets woken. It gets placed in the queue.
425 // - Task poll finishes, returning done=true
426 // - RUNNING bit is cleared, but the task is already in the queue.
427 return;
428 }
429
430 #[cfg(feature = "rtos-trace")]
431 trace::task_exec_begin(p.as_ptr() as u32);
432
433 // Run the task
434 task.poll_fn.get().unwrap_unchecked()(p);
435
436 #[cfg(feature = "rtos-trace")]
437 trace::task_exec_end();
438
439 // Enqueue or update into timer_queue
440 #[cfg(feature = "integrated-timers")]
441 self.timer_queue.update(p);
442 });
443
444 #[cfg(feature = "integrated-timers")]
445 {
446 // If this is already in the past, set_alarm might return false
447 // In that case do another poll loop iteration.
448 let next_expiration = self.timer_queue.next_expiration();
449 if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
450 break;
451 }
452 }
453
454 #[cfg(not(feature = "integrated-timers"))]
455 {
456 break;
457 }
458 }
459
460 #[cfg(feature = "rtos-trace")]
461 trace::system_idle();
462 }
463}
464
465/// Raw executor.
466///
467/// This is the core of the Embassy executor. It is low-level, requiring manual
468/// handling of wakeups and task polling. If you can, prefer using one of the
469/// [higher level executors](crate::Executor).
470///
471/// The raw executor leaves it up to you to handle wakeups and scheduling:
472///
473/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
474/// that "want to run").
475/// - You must supply a [`Pender`]. The executor will call it to notify you it has work
476/// to do. You must arrange for `poll()` to be called as soon as possible.
477///
478/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority
479/// level, etc. It may be called synchronously from any `Executor` method call as well.
480/// You must deal with this correctly.
481///
482/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
483/// the requirement for `poll` to not be called reentrantly.
484#[repr(transparent)]
485pub struct Executor {
486 pub(crate) inner: SyncExecutor,
487
488 _not_sync: PhantomData<*mut ()>,
489}
490
491impl Executor {
492 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
493 mem::transmute(inner)
494 }
495
496 /// Create a new executor.
497 ///
498 /// When the executor has work to do, it will call the [`Pender`].
499 ///
500 /// See [`Executor`] docs for details on `Pender`.
501 pub fn new(pender: Pender) -> Self {
502 Self {
503 inner: SyncExecutor::new(pender),
504 _not_sync: PhantomData,
316 } 505 }
317 } 506 }
318 507
@@ -325,15 +514,8 @@ impl Executor {
325 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. 514 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
326 /// In this case, the task's Future must be Send. This is because this is effectively 515 /// In this case, the task's Future must be Send. This is because this is effectively
327 /// sending the task to the executor thread. 516 /// sending the task to the executor thread.
328 pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { 517 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
329 task.as_ref().executor.set(self); 518 self.inner.spawn(task)
330
331 #[cfg(feature = "rtos-trace")]
332 trace::task_new(task.as_ptr() as u32);
333
334 critical_section::with(|cs| {
335 self.enqueue(cs, task);
336 })
337 } 519 }
338 520
339 /// Poll all queued tasks in this executor. 521 /// Poll all queued tasks in this executor.
@@ -341,63 +523,20 @@ impl Executor {
341 /// This loops over all tasks that are queued to be polled (i.e. they're 523 /// This loops over all tasks that are queued to be polled (i.e. they're
342 /// freshly spawned or they've been woken). Other tasks are not polled. 524 /// freshly spawned or they've been woken). Other tasks are not polled.
343 /// 525 ///
344 /// You must call `poll` after receiving a call to `signal_fn`. It is OK 526 /// You must call `poll` after receiving a call to the [`Pender`]. It is OK
345 /// to call `poll` even when not requested by `signal_fn`, but it wastes 527 /// to call `poll` even when not requested by the `Pender`, but it wastes
346 /// energy. 528 /// energy.
347 /// 529 ///
348 /// # Safety 530 /// # Safety
349 /// 531 ///
350 /// You must NOT call `poll` reentrantly on the same executor. 532 /// You must NOT call `poll` reentrantly on the same executor.
351 /// 533 ///
352 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you 534 /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you
353 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to 535 /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to
354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 536 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
355 /// no `poll()` already running. 537 /// no `poll()` already running.
356 pub unsafe fn poll(&'static self) { 538 pub unsafe fn poll(&'static self) {
357 #[cfg(feature = "integrated-timers")] 539 self.inner.poll()
358 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
359
360 self.run_queue.dequeue_all(|p| {
361 let task = p.as_ref();
362
363 #[cfg(feature = "integrated-timers")]
364 task.expires_at.set(Instant::MAX);
365
366 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
367 if state & STATE_SPAWNED == 0 {
368 // If task is not running, ignore it. This can happen in the following scenario:
369 // - Task gets dequeued, poll starts
370 // - While task is being polled, it gets woken. It gets placed in the queue.
371 // - Task poll finishes, returning done=true
372 // - RUNNING bit is cleared, but the task is already in the queue.
373 return;
374 }
375
376 #[cfg(feature = "rtos-trace")]
377 trace::task_exec_begin(p.as_ptr() as u32);
378
379 // Run the task
380 task.poll_fn.read()(p as _);
381
382 #[cfg(feature = "rtos-trace")]
383 trace::task_exec_end();
384
385 // Enqueue or update into timer_queue
386 #[cfg(feature = "integrated-timers")]
387 self.timer_queue.update(p);
388 });
389
390 #[cfg(feature = "integrated-timers")]
391 {
392 // If this is already in the past, set_alarm will immediately trigger the alarm.
393 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
394 // so we immediately do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 driver::set_alarm(self.alarm, next_expiration.as_ticks());
397 }
398
399 #[cfg(feature = "rtos-trace")]
400 trace::system_idle();
401 } 540 }
402 541
403 /// Get a spawner that spawns tasks in this executor. 542 /// Get a spawner that spawns tasks in this executor.
@@ -409,41 +548,49 @@ impl Executor {
409 } 548 }
410} 549}
411 550
412/// Wake a task by raw pointer. 551/// Wake a task by `TaskRef`.
413///
414/// You can obtain task pointers from `Waker`s using [`task_from_waker`].
415/// 552///
416/// # Safety 553/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
417/// 554pub fn wake_task(task: TaskRef) {
418/// `task` must be a valid task pointer obtained from [`task_from_waker`]. 555 let header = task.header();
419pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
420 critical_section::with(|cs| {
421 let header = task.as_ref();
422 let state = header.state.load(Ordering::Relaxed);
423 556
557 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
424 // If already scheduled, or if not started, 558 // If already scheduled, or if not started,
425 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 559 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
426 return; 560 None
561 } else {
562 // Mark it as scheduled
563 Some(state | STATE_RUN_QUEUED)
427 } 564 }
565 });
428 566
429 // Mark it as scheduled 567 if res.is_ok() {
430 header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
431
432 // We have just marked the task as scheduled, so enqueue it. 568 // We have just marked the task as scheduled, so enqueue it.
433 let executor = &*header.executor.get(); 569 unsafe {
434 executor.enqueue(cs, task); 570 let executor = header.executor.get().unwrap_unchecked();
435 }) 571 executor.enqueue(task);
572 }
573 }
436} 574}
437 575
438#[cfg(feature = "integrated-timers")] 576#[cfg(feature = "integrated-timers")]
439#[no_mangle] 577struct TimerQueue;
440unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { 578
441 let task = waker::task_from_waker(waker); 579#[cfg(feature = "integrated-timers")]
442 let task = task.as_ref(); 580impl embassy_time::queue::TimerQueue for TimerQueue {
443 let expires_at = task.expires_at.get(); 581 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
444 task.expires_at.set(expires_at.min(at)); 582 let task = waker::task_from_waker(waker);
583 let task = task.header();
584 unsafe {
585 let expires_at = task.expires_at.get();
586 task.expires_at.set(expires_at.min(at));
587 }
588 }
445} 589}
446 590
591#[cfg(feature = "integrated-timers")]
592embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
593
447#[cfg(feature = "rtos-trace")] 594#[cfg(feature = "rtos-trace")]
448impl rtos_trace::RtosTraceOSCallbacks for Executor { 595impl rtos_trace::RtosTraceOSCallbacks for Executor {
449 fn task_list() { 596 fn task_list() {
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
index ed8c82a5c..f1ec19ac1 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -2,18 +2,18 @@ use core::ptr;
2use core::ptr::NonNull; 2use core::ptr::NonNull;
3 3
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5use critical_section::CriticalSection;
6 5
7use super::TaskHeader; 6use super::{TaskHeader, TaskRef};
7use crate::raw::util::SyncUnsafeCell;
8 8
9pub(crate) struct RunQueueItem { 9pub(crate) struct RunQueueItem {
10 next: AtomicPtr<TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
11} 11}
12 12
13impl RunQueueItem { 13impl RunQueueItem {
14 pub const fn new() -> Self { 14 pub const fn new() -> Self {
15 Self { 15 Self {
16 next: AtomicPtr::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
17 } 17 }
18 } 18 }
19} 19}
@@ -46,29 +46,43 @@ impl RunQueue {
46 /// 46 ///
47 /// `item` must NOT be already enqueued in any queue. 47 /// `item` must NOT be already enqueued in any queue.
48 #[inline(always)] 48 #[inline(always)]
49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { 49 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
50 let prev = self.head.load(Ordering::Relaxed); 50 let mut was_empty = false;
51 task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); 51
52 self.head.store(task.as_ptr(), Ordering::Relaxed); 52 self.head
53 prev.is_null() 53 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
54 was_empty = prev.is_null();
55 unsafe {
56 // safety: the pointer is either null or valid
57 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
58 // safety: there are no concurrent accesses to `next`
59 task.header().run_queue_item.next.set(prev);
60 }
61 Some(task.as_ptr() as *mut _)
62 })
63 .ok();
64
65 was_empty
54 } 66 }
55 67
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 68 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 69 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 70 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { 71 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue. 72 // Atomically empty the queue.
61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 73 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
74
75 // safety: the pointer is either null or valid
76 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
62 77
63 // Iterate the linked list of tasks that were previously in the queue. 78 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = NonNull::new(ptr) { 79 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 80 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 81 // Therefore, first read the next pointer, and only then process the task.
67 let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); 82 // safety: there are no concurrent accesses to `next`
83 next = unsafe { task.header().run_queue_item.next.get() };
68 84
69 on_task(task); 85 on_task(task);
70
71 ptr = next
72 } 86 }
73 } 87 }
74} 88}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 24c31892a..dc71c95b1 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,45 +1,43 @@
1use core::cell::Cell;
2use core::cmp::min; 1use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
5 2
6use atomic_polyfill::Ordering; 3use atomic_polyfill::Ordering;
7use embassy_time::Instant; 4use embassy_time::Instant;
8 5
9use super::{TaskHeader, STATE_TIMER_QUEUED}; 6use super::{TaskRef, STATE_TIMER_QUEUED};
7use crate::raw::util::SyncUnsafeCell;
10 8
11pub(crate) struct TimerQueueItem { 9pub(crate) struct TimerQueueItem {
12 next: Cell<*mut TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
13} 11}
14 12
15impl TimerQueueItem { 13impl TimerQueueItem {
16 pub const fn new() -> Self { 14 pub const fn new() -> Self {
17 Self { 15 Self {
18 next: Cell::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
19 } 17 }
20 } 18 }
21} 19}
22 20
23pub(crate) struct TimerQueue { 21pub(crate) struct TimerQueue {
24 head: Cell<*mut TaskHeader>, 22 head: SyncUnsafeCell<Option<TaskRef>>,
25} 23}
26 24
27impl TimerQueue { 25impl TimerQueue {
28 pub const fn new() -> Self { 26 pub const fn new() -> Self {
29 Self { 27 Self {
30 head: Cell::new(ptr::null_mut()), 28 head: SyncUnsafeCell::new(None),
31 } 29 }
32 } 30 }
33 31
34 pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) { 32 pub(crate) unsafe fn update(&self, p: TaskRef) {
35 let task = p.as_ref(); 33 let task = p.header();
36 if task.expires_at.get() != Instant::MAX { 34 if task.expires_at.get() != Instant::MAX {
37 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 35 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
38 let is_new = old_state & STATE_TIMER_QUEUED == 0; 36 let is_new = old_state & STATE_TIMER_QUEUED == 0;
39 37
40 if is_new { 38 if is_new {
41 task.timer_queue_item.next.set(self.head.get()); 39 task.timer_queue_item.next.set(self.head.get());
42 self.head.set(p.as_ptr()); 40 self.head.set(Some(p));
43 } 41 }
44 } 42 }
45 } 43 }
@@ -47,7 +45,7 @@ impl TimerQueue {
47 pub(crate) unsafe fn next_expiration(&self) -> Instant { 45 pub(crate) unsafe fn next_expiration(&self) -> Instant {
48 let mut res = Instant::MAX; 46 let mut res = Instant::MAX;
49 self.retain(|p| { 47 self.retain(|p| {
50 let task = p.as_ref(); 48 let task = p.header();
51 let expires = task.expires_at.get(); 49 let expires = task.expires_at.get();
52 res = min(res, expires); 50 res = min(res, expires);
53 expires != Instant::MAX 51 expires != Instant::MAX
@@ -55,9 +53,9 @@ impl TimerQueue {
55 res 53 res
56 } 54 }
57 55
58 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) { 56 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) {
59 self.retain(|p| { 57 self.retain(|p| {
60 let task = p.as_ref(); 58 let task = p.header();
61 if task.expires_at.get() <= now { 59 if task.expires_at.get() <= now {
62 on_task(p); 60 on_task(p);
63 false 61 false
@@ -67,11 +65,10 @@ impl TimerQueue {
67 }); 65 });
68 } 66 }
69 67
70 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) { 68 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
71 let mut prev = &self.head; 69 let mut prev = &self.head;
72 while !prev.get().is_null() { 70 while let Some(p) = prev.get() {
73 let p = NonNull::new_unchecked(prev.get()); 71 let task = p.header();
74 let task = &*p.as_ptr();
75 if f(p) { 72 if f(p) {
76 // Skip to next 73 // Skip to next
77 prev = &task.timer_queue_item.next; 74 prev = &task.timer_queue_item.next;
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index ed5822188..e2e8f4df8 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -26,8 +26,31 @@ impl<T> UninitCell<T> {
26 } 26 }
27} 27}
28 28
29impl<T: Copy> UninitCell<T> { 29unsafe impl<T> Sync for UninitCell<T> {}
30 pub unsafe fn read(&self) -> T { 30
31 ptr::read(self.as_mut_ptr()) 31#[repr(transparent)]
32pub struct SyncUnsafeCell<T> {
33 value: UnsafeCell<T>,
34}
35
36unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
37
38impl<T> SyncUnsafeCell<T> {
39 #[inline]
40 pub const fn new(value: T) -> Self {
41 Self {
42 value: UnsafeCell::new(value),
43 }
44 }
45
46 pub unsafe fn set(&self, value: T) {
47 *self.value.get() = value;
48 }
49
50 pub unsafe fn get(&self) -> T
51 where
52 T: Copy,
53 {
54 *self.value.get()
32 } 55 }
33} 56}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 5765259f2..400b37fa9 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -1,8 +1,7 @@
1use core::mem; 1use core::mem;
2use core::ptr::NonNull;
3use core::task::{RawWaker, RawWakerVTable, Waker}; 2use core::task::{RawWaker, RawWakerVTable, Waker};
4 3
5use super::{wake_task, TaskHeader}; 4use super::{wake_task, TaskHeader, TaskRef};
6 5
7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 6const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
8 7
@@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker {
11} 10}
12 11
13unsafe fn wake(p: *const ()) { 12unsafe fn wake(p: *const ()) {
14 wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) 13 wake_task(TaskRef::from_ptr(p as *const TaskHeader))
15} 14}
16 15
17unsafe fn drop(_: *const ()) { 16unsafe fn drop(_: *const ()) {
18 // nop 17 // nop
19} 18}
20 19
21pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { 20pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) 21 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23} 22}
24 23
@@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
33/// # Panics 32/// # Panics
34/// 33///
35/// Panics if the waker is not created by the Embassy executor. 34/// Panics if the waker is not created by the Embassy executor.
36pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { 35pub fn task_from_waker(waker: &Waker) -> TaskRef {
37 // safety: OK because WakerHack has the same layout as Waker. 36 // safety: OK because WakerHack has the same layout as Waker.
38 // This is not really guaranteed because the structs are `repr(Rust)`, it is 37 // This is not really guaranteed because the structs are `repr(Rust)`, it is
39 // indeed the case in the current implementation. 38 // indeed the case in the current implementation.
@@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
43 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") 42 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
44 } 43 }
45 44
46 // safety: we never create a waker with a null data pointer. 45 // safety: our wakers are always created with `TaskRef::as_ptr`
47 unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } 46 unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) }
48} 47}
49 48
50struct WakerHack { 49struct WakerHack {
diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs
new file mode 100644
index 000000000..435a0ff7e
--- /dev/null
+++ b/embassy-executor/src/raw/waker_turbo.rs
@@ -0,0 +1,34 @@
1use core::ptr::NonNull;
2use core::task::Waker;
3
4use super::{wake_task, TaskHeader, TaskRef};
5
6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _))
8}
9
10/// Get a task pointer from a waker.
11///
12/// This can be used as an optimization in wait queues to store task pointers
13/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
14/// avoid dynamic dispatch.
15///
16/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
17///
18/// # Panics
19///
20/// Panics if the waker is not created by the Embassy executor.
21pub fn task_from_waker(waker: &Waker) -> TaskRef {
22 let ptr = waker.as_turbo_ptr().as_ptr();
23
24 // safety: our wakers are always created with `TaskRef::as_ptr`
25 unsafe { TaskRef::from_ptr(ptr as *const TaskHeader) }
26}
27
28#[inline(never)]
29#[no_mangle]
30fn _turbo_wake(ptr: NonNull<()>) {
31 // safety: our wakers are always created with `TaskRef::as_ptr`
32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) };
33 wake_task(task)
34}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 25a0d7dbb..2b6224045 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -1,10 +1,8 @@
1use core::future::poll_fn;
1use core::marker::PhantomData; 2use core::marker::PhantomData;
2use core::mem; 3use core::mem;
3use core::ptr::NonNull;
4use core::task::Poll; 4use core::task::Poll;
5 5
6use futures_util::future::poll_fn;
7
8use super::raw; 6use super::raw;
9 7
10/// Token to spawn a newly-created task in an executor. 8/// Token to spawn a newly-created task in an executor.
@@ -23,12 +21,12 @@ use super::raw;
23/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 21/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
24#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 22#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
25pub struct SpawnToken<S> { 23pub struct SpawnToken<S> {
26 raw_task: Option<NonNull<raw::TaskHeader>>, 24 raw_task: Option<raw::TaskRef>,
27 phantom: PhantomData<*mut S>, 25 phantom: PhantomData<*mut S>,
28} 26}
29 27
30impl<S> SpawnToken<S> { 28impl<S> SpawnToken<S> {
31 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { 29 pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self {
32 Self { 30 Self {
33 raw_task: Some(raw_task), 31 raw_task: Some(raw_task),
34 phantom: PhantomData, 32 phantom: PhantomData,
@@ -91,10 +89,11 @@ impl Spawner {
91 /// 89 ///
92 /// Panics if the current executor is not an Embassy executor. 90 /// Panics if the current executor is not an Embassy executor.
93 pub async fn for_current_executor() -> Self { 91 pub async fn for_current_executor() -> Self {
94 poll_fn(|cx| unsafe { 92 poll_fn(|cx| {
95 let task = raw::task_from_waker(cx.waker()); 93 let task = raw::task_from_waker(cx.waker());
96 let executor = (*task.as_ptr()).executor.get(); 94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
97 Poll::Ready(Self::new(&*executor)) 95 let executor = unsafe { raw::Executor::wrap(executor) };
96 Poll::Ready(Self::new(executor))
98 }) 97 })
99 .await 98 .await
100 } 99 }
@@ -132,9 +131,7 @@ impl Spawner {
132 /// spawner to other threads, but the spawner loses the ability to spawn 131 /// spawner to other threads, but the spawner loses the ability to spawn
133 /// non-Send tasks. 132 /// non-Send tasks.
134 pub fn make_send(&self) -> SendSpawner { 133 pub fn make_send(&self) -> SendSpawner {
135 SendSpawner { 134 SendSpawner::new(&self.executor.inner)
136 executor: self.executor,
137 }
138 } 135 }
139} 136}
140 137
@@ -147,14 +144,11 @@ impl Spawner {
147/// If you want to spawn non-Send tasks, use [Spawner]. 144/// If you want to spawn non-Send tasks, use [Spawner].
148#[derive(Copy, Clone)] 145#[derive(Copy, Clone)]
149pub struct SendSpawner { 146pub struct SendSpawner {
150 executor: &'static raw::Executor, 147 executor: &'static raw::SyncExecutor,
151} 148}
152 149
153unsafe impl Send for SendSpawner {}
154unsafe impl Sync for SendSpawner {}
155
156impl SendSpawner { 150impl SendSpawner {
157 pub(crate) fn new(executor: &'static raw::Executor) -> Self { 151 pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self {
158 Self { executor } 152 Self { executor }
159 } 153 }
160 154
@@ -167,10 +161,10 @@ impl SendSpawner {
167 /// 161 ///
168 /// Panics if the current executor is not an Embassy executor. 162 /// Panics if the current executor is not an Embassy executor.
169 pub async fn for_current_executor() -> Self { 163 pub async fn for_current_executor() -> Self {
170 poll_fn(|cx| unsafe { 164 poll_fn(|cx| {
171 let task = raw::task_from_waker(cx.waker()); 165 let task = raw::task_from_waker(cx.waker());
172 let executor = (*task.as_ptr()).executor.get(); 166 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
173 Poll::Ready(Self::new(&*executor)) 167 Poll::Ready(Self::new(executor))
174 }) 168 })
175 .await 169 .await
176 } 170 }