aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorxoviat <[email protected]>2023-08-22 16:58:43 -0500
committerxoviat <[email protected]>2023-08-22 16:58:43 -0500
commit7d6edd7b15d2209ac0b96ff8814ecefce2964e36 (patch)
tree7988a9b46855ac187a92cbfc5f38cbbbff695e8d /embassy-executor/src
parent9e3266b74554ea397bdd963ff12a26aa51e77b63 (diff)
parent7bff2ebab3b36cc922505e9db961840109c509ed (diff)
Merge branch 'main' of https://github.com/embassy-rs/embassy into rtc-lp
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/cortex_m.rs88
-rw-r--r--embassy-executor/src/arch/riscv32.rs18
-rw-r--r--embassy-executor/src/arch/std.rs17
-rw-r--r--embassy-executor/src/arch/wasm.rs33
-rw-r--r--embassy-executor/src/arch/xtensa.rs22
-rw-r--r--embassy-executor/src/raw/mod.rs228
-rw-r--r--embassy-executor/src/spawner.rs3
7 files changed, 209 insertions, 200 deletions
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index 94c8134d6..0806a22ab 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -1,3 +1,49 @@
1const THREAD_PENDER: usize = usize::MAX;
2
3#[export_name = "__pender"]
4#[cfg(any(feature = "executor-thread", feature = "executor-interrupt"))]
5fn __pender(context: *mut ()) {
6 unsafe {
7 // Safety: `context` is either `usize::MAX` created by `Executor::run`, or a valid interrupt
8 // request number given to `InterruptExecutor::start`.
9
10 let context = context as usize;
11
12 #[cfg(feature = "executor-thread")]
13 // Try to make Rust optimize the branching away if we only use thread mode.
14 if !cfg!(feature = "executor-interrupt") || context == THREAD_PENDER {
15 core::arch::asm!("sev");
16 return;
17 }
18
19 #[cfg(feature = "executor-interrupt")]
20 {
21 use cortex_m::interrupt::InterruptNumber;
22 use cortex_m::peripheral::NVIC;
23
24 #[derive(Clone, Copy)]
25 struct Irq(u16);
26 unsafe impl InterruptNumber for Irq {
27 fn number(self) -> u16 {
28 self.0
29 }
30 }
31
32 let irq = Irq(context as u16);
33
34 // STIR is faster, but is only available in v7 and higher.
35 #[cfg(not(armv6m))]
36 {
37 let mut nvic: NVIC = core::mem::transmute(());
38 nvic.request(irq);
39 }
40
41 #[cfg(armv6m)]
42 NVIC::pend(irq);
43 }
44 }
45}
46
1#[cfg(feature = "executor-thread")] 47#[cfg(feature = "executor-thread")]
2pub use thread::*; 48pub use thread::*;
3#[cfg(feature = "executor-thread")] 49#[cfg(feature = "executor-thread")]
@@ -8,18 +54,9 @@ mod thread {
8 #[cfg(feature = "nightly")] 54 #[cfg(feature = "nightly")]
9 pub use embassy_macros::main_cortex_m as main; 55 pub use embassy_macros::main_cortex_m as main;
10 56
11 use crate::raw::{Pender, PenderInner}; 57 use crate::arch::THREAD_PENDER;
12 use crate::{raw, Spawner}; 58 use crate::{raw, Spawner};
13 59
14 #[derive(Copy, Clone)]
15 pub(crate) struct ThreadPender;
16
17 impl ThreadPender {
18 pub(crate) fn pend(self) {
19 unsafe { core::arch::asm!("sev") }
20 }
21 }
22
23 /// Thread mode executor, using WFE/SEV. 60 /// Thread mode executor, using WFE/SEV.
24 /// 61 ///
25 /// This is the simplest and most common kind of executor. It runs on 62 /// This is the simplest and most common kind of executor. It runs on
@@ -39,7 +76,7 @@ mod thread {
39 /// Create a new Executor. 76 /// Create a new Executor.
40 pub fn new() -> Self { 77 pub fn new() -> Self {
41 Self { 78 Self {
42 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), 79 inner: raw::Executor::new(THREAD_PENDER as *mut ()),
43 not_send: PhantomData, 80 not_send: PhantomData,
44 } 81 }
45 } 82 }
@@ -86,30 +123,7 @@ mod interrupt {
86 use cortex_m::interrupt::InterruptNumber; 123 use cortex_m::interrupt::InterruptNumber;
87 use cortex_m::peripheral::NVIC; 124 use cortex_m::peripheral::NVIC;
88 125
89 use crate::raw::{self, Pender, PenderInner}; 126 use crate::raw;
90
91 #[derive(Clone, Copy)]
92 pub(crate) struct InterruptPender(u16);
93
94 impl InterruptPender {
95 pub(crate) fn pend(self) {
96 // STIR is faster, but is only available in v7 and higher.
97 #[cfg(not(armv6m))]
98 {
99 let mut nvic: cortex_m::peripheral::NVIC = unsafe { core::mem::transmute(()) };
100 nvic.request(self);
101 }
102
103 #[cfg(armv6m)]
104 cortex_m::peripheral::NVIC::pend(self);
105 }
106 }
107
108 unsafe impl cortex_m::interrupt::InterruptNumber for InterruptPender {
109 fn number(self) -> u16 {
110 self.0
111 }
112 }
113 127
114 /// Interrupt mode executor. 128 /// Interrupt mode executor.
115 /// 129 ///
@@ -194,9 +208,7 @@ mod interrupt {
194 unsafe { 208 unsafe {
195 (&mut *self.executor.get()) 209 (&mut *self.executor.get())
196 .as_mut_ptr() 210 .as_mut_ptr()
197 .write(raw::Executor::new(Pender(PenderInner::Interrupt(InterruptPender( 211 .write(raw::Executor::new(irq.number() as *mut ()))
198 irq.number(),
199 )))))
200 } 212 }
201 213
202 let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; 214 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index ff7ec1575..40c6877e2 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -11,22 +11,16 @@ mod thread {
11 #[cfg(feature = "nightly")] 11 #[cfg(feature = "nightly")]
12 pub use embassy_macros::main_riscv as main; 12 pub use embassy_macros::main_riscv as main;
13 13
14 use crate::raw::{Pender, PenderInner};
15 use crate::{raw, Spawner}; 14 use crate::{raw, Spawner};
16 15
17 #[derive(Copy, Clone)]
18 pub(crate) struct ThreadPender;
19
20 impl ThreadPender {
21 #[allow(unused)]
22 pub(crate) fn pend(self) {
23 SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst);
24 }
25 }
26
27 /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV 16 /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
28 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); 17 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
29 18
19 #[export_name = "__pender"]
20 fn __pender(_context: *mut ()) {
21 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
22 }
23
30 /// RISCV32 Executor 24 /// RISCV32 Executor
31 pub struct Executor { 25 pub struct Executor {
32 inner: raw::Executor, 26 inner: raw::Executor,
@@ -37,7 +31,7 @@ mod thread {
37 /// Create a new Executor. 31 /// Create a new Executor.
38 pub fn new() -> Self { 32 pub fn new() -> Self {
39 Self { 33 Self {
40 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), 34 inner: raw::Executor::new(core::ptr::null_mut()),
41 not_send: PhantomData, 35 not_send: PhantomData,
42 } 36 }
43 } 37 }
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs
index 4e4a178f0..5b2f7e2e4 100644
--- a/embassy-executor/src/arch/std.rs
+++ b/embassy-executor/src/arch/std.rs
@@ -11,17 +11,12 @@ mod thread {
11 #[cfg(feature = "nightly")] 11 #[cfg(feature = "nightly")]
12 pub use embassy_macros::main_std as main; 12 pub use embassy_macros::main_std as main;
13 13
14 use crate::raw::{Pender, PenderInner};
15 use crate::{raw, Spawner}; 14 use crate::{raw, Spawner};
16 15
17 #[derive(Copy, Clone)] 16 #[export_name = "__pender"]
18 pub(crate) struct ThreadPender(&'static Signaler); 17 fn __pender(context: *mut ()) {
19 18 let signaler: &'static Signaler = unsafe { std::mem::transmute(context) };
20 impl ThreadPender { 19 signaler.signal()
21 #[allow(unused)]
22 pub(crate) fn pend(self) {
23 self.0.signal()
24 }
25 } 20 }
26 21
27 /// Single-threaded std-based executor. 22 /// Single-threaded std-based executor.
@@ -34,9 +29,9 @@ mod thread {
34 impl Executor { 29 impl Executor {
35 /// Create a new Executor. 30 /// Create a new Executor.
36 pub fn new() -> Self { 31 pub fn new() -> Self {
37 let signaler = &*Box::leak(Box::new(Signaler::new())); 32 let signaler = Box::leak(Box::new(Signaler::new()));
38 Self { 33 Self {
39 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(signaler)))), 34 inner: raw::Executor::new(signaler as *mut Signaler as *mut ()),
40 not_send: PhantomData, 35 not_send: PhantomData,
41 signaler, 36 signaler,
42 } 37 }
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs
index 08ab16b99..934fd69e5 100644
--- a/embassy-executor/src/arch/wasm.rs
+++ b/embassy-executor/src/arch/wasm.rs
@@ -14,14 +14,12 @@ mod thread {
14 use wasm_bindgen::prelude::*; 14 use wasm_bindgen::prelude::*;
15 15
16 use crate::raw::util::UninitCell; 16 use crate::raw::util::UninitCell;
17 use crate::raw::{Pender, PenderInner};
18 use crate::{raw, Spawner}; 17 use crate::{raw, Spawner};
19 18
20 /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. 19 #[export_name = "__pender"]
21 pub struct Executor { 20 fn __pender(context: *mut ()) {
22 inner: raw::Executor, 21 let signaler: &'static WasmContext = unsafe { std::mem::transmute(context) };
23 ctx: &'static WasmContext, 22 let _ = signaler.promise.then(unsafe { signaler.closure.as_mut() });
24 not_send: PhantomData<*mut ()>,
25 } 23 }
26 24
27 pub(crate) struct WasmContext { 25 pub(crate) struct WasmContext {
@@ -29,16 +27,6 @@ mod thread {
29 closure: UninitCell<Closure<dyn FnMut(JsValue)>>, 27 closure: UninitCell<Closure<dyn FnMut(JsValue)>>,
30 } 28 }
31 29
32 #[derive(Copy, Clone)]
33 pub(crate) struct ThreadPender(&'static WasmContext);
34
35 impl ThreadPender {
36 #[allow(unused)]
37 pub(crate) fn pend(self) {
38 let _ = self.0.promise.then(unsafe { self.0.closure.as_mut() });
39 }
40 }
41
42 impl WasmContext { 30 impl WasmContext {
43 pub fn new() -> Self { 31 pub fn new() -> Self {
44 Self { 32 Self {
@@ -48,14 +36,21 @@ mod thread {
48 } 36 }
49 } 37 }
50 38
39 /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop.
40 pub struct Executor {
41 inner: raw::Executor,
42 ctx: &'static WasmContext,
43 not_send: PhantomData<*mut ()>,
44 }
45
51 impl Executor { 46 impl Executor {
52 /// Create a new Executor. 47 /// Create a new Executor.
53 pub fn new() -> Self { 48 pub fn new() -> Self {
54 let ctx = &*Box::leak(Box::new(WasmContext::new())); 49 let ctx = Box::leak(Box::new(WasmContext::new()));
55 Self { 50 Self {
56 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(ctx)))), 51 inner: raw::Executor::new(ctx as *mut WasmContext as *mut ()),
57 not_send: PhantomData,
58 ctx, 52 ctx,
53 not_send: PhantomData,
59 } 54 }
60 } 55 }
61 56
diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs
index 017b2c52b..601d85002 100644
--- a/embassy-executor/src/arch/xtensa.rs
+++ b/embassy-executor/src/arch/xtensa.rs
@@ -8,22 +8,16 @@ mod thread {
8 use core::marker::PhantomData; 8 use core::marker::PhantomData;
9 use core::sync::atomic::{AtomicBool, Ordering}; 9 use core::sync::atomic::{AtomicBool, Ordering};
10 10
11 use crate::raw::{Pender, PenderInner};
12 use crate::{raw, Spawner}; 11 use crate::{raw, Spawner};
13 12
14 #[derive(Copy, Clone)]
15 pub(crate) struct ThreadPender;
16
17 impl ThreadPender {
18 #[allow(unused)]
19 pub(crate) fn pend(self) {
20 SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst);
21 }
22 }
23
24 /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa 13 /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa
25 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); 14 static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
26 15
16 #[export_name = "__pender"]
17 fn __pender(_context: *mut ()) {
18 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
19 }
20
27 /// Xtensa Executor 21 /// Xtensa Executor
28 pub struct Executor { 22 pub struct Executor {
29 inner: raw::Executor, 23 inner: raw::Executor,
@@ -34,7 +28,7 @@ mod thread {
34 /// Create a new Executor. 28 /// Create a new Executor.
35 pub fn new() -> Self { 29 pub fn new() -> Self {
36 Self { 30 Self {
37 inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), 31 inner: raw::Executor::new(core::ptr::null_mut()),
38 not_send: PhantomData, 32 not_send: PhantomData,
39 } 33 }
40 } 34 }
@@ -77,8 +71,8 @@ mod thread {
77 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); 71 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
78 72
79 core::arch::asm!( 73 core::arch::asm!(
80 "wsr.ps {0}", 74 "wsr.ps {0}",
81 "rsync", in(reg) token) 75 "rsync", in(reg) token)
82 } else { 76 } else {
83 // waiti sets the PS.INTLEVEL when slipping into sleep 77 // waiti sets the PS.INTLEVEL when slipping into sleep
84 // because critical sections in Xtensa are implemented via increasing 78 // because critical sections in Xtensa are implemented via increasing
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index f3760f589..c1d82e18a 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -147,10 +147,7 @@ impl<F: Future + 'static> TaskStorage<F> {
147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
148 let task = AvailableTask::claim(self); 148 let task = AvailableTask::claim(self);
149 match task { 149 match task {
150 Some(task) => { 150 Some(task) => task.initialize(future),
151 let task = task.initialize(future);
152 unsafe { SpawnToken::<F>::new(task) }
153 }
154 None => SpawnToken::new_failed(), 151 None => SpawnToken::new_failed(),
155 } 152 }
156 } 153 }
@@ -186,12 +183,16 @@ impl<F: Future + 'static> TaskStorage<F> {
186 } 183 }
187} 184}
188 185
189struct AvailableTask<F: Future + 'static> { 186/// An uninitialized [`TaskStorage`].
187pub struct AvailableTask<F: Future + 'static> {
190 task: &'static TaskStorage<F>, 188 task: &'static TaskStorage<F>,
191} 189}
192 190
193impl<F: Future + 'static> AvailableTask<F> { 191impl<F: Future + 'static> AvailableTask<F> {
194 fn claim(task: &'static TaskStorage<F>) -> Option<Self> { 192 /// Try to claim a [`TaskStorage`].
193 ///
194 /// This function returns `None` if a task has already been spawned and has not finished running.
195 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
195 task.raw 196 task.raw
196 .state 197 .state
197 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) 198 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
@@ -199,61 +200,30 @@ impl<F: Future + 'static> AvailableTask<F> {
199 .map(|_| Self { task }) 200 .map(|_| Self { task })
200 } 201 }
201 202
202 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { 203 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
203 unsafe { 204 unsafe {
204 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 205 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
205 self.task.future.write(future()); 206 self.task.future.write(future());
206 }
207 TaskRef::new(self.task)
208 }
209}
210 207
211/// Raw storage that can hold up to N tasks of the same type. 208 let task = TaskRef::new(self.task);
212///
213/// This is essentially a `[TaskStorage<F>; N]`.
214pub struct TaskPool<F: Future + 'static, const N: usize> {
215 pool: [TaskStorage<F>; N],
216}
217 209
218impl<F: Future + 'static, const N: usize> TaskPool<F, N> { 210 SpawnToken::new(task)
219 /// Create a new TaskPool, with all tasks in non-spawned state.
220 pub const fn new() -> Self {
221 Self {
222 pool: [TaskStorage::NEW; N],
223 } 211 }
224 } 212 }
225 213
226 /// Try to spawn a task in the pool. 214 /// Initialize the [`TaskStorage`] to run the given future.
227 /// 215 pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
228 /// See [`TaskStorage::spawn()`] for details. 216 self.initialize_impl::<F>(future)
229 ///
230 /// This will loop over the pool and spawn the task in the first storage that
231 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
232 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
234 let task = self.pool.iter().find_map(AvailableTask::claim);
235 match task {
236 Some(task) => {
237 let task = task.initialize(future);
238 unsafe { SpawnToken::<F>::new(task) }
239 }
240 None => SpawnToken::new_failed(),
241 }
242 } 217 }
243 218
244 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 219 /// Initialize the [`TaskStorage`] to run the given future.
245 /// the future is !Send.
246 /// 220 ///
247 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used 221 /// # Safety
248 /// by the Embassy macros ONLY.
249 /// 222 ///
250 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 223 /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
251 /// is an `async fn`, NOT a hand-written `Future`. 224 /// is an `async fn`, NOT a hand-written `Future`.
252 #[doc(hidden)] 225 #[doc(hidden)]
253 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 226 pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
254 where
255 FutFn: FnOnce() -> F,
256 {
257 // When send-spawning a task, we construct the future in this thread, and effectively 227 // When send-spawning a task, we construct the future in this thread, and effectively
258 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, 228 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
259 // send-spawning should require the future `F` to be `Send`. 229 // send-spawning should require the future `F` to be `Send`.
@@ -279,66 +249,73 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
279 // 249 //
280 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 250 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
281 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 251 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
282 252 self.initialize_impl::<FutFn>(future)
283 let task = self.pool.iter().find_map(AvailableTask::claim);
284 match task {
285 Some(task) => {
286 let task = task.initialize(future);
287 unsafe { SpawnToken::<FutFn>::new(task) }
288 }
289 None => SpawnToken::new_failed(),
290 }
291 } 253 }
292} 254}
293 255
294#[derive(Clone, Copy)] 256/// Raw storage that can hold up to N tasks of the same type.
295pub(crate) enum PenderInner { 257///
296 #[cfg(feature = "executor-thread")] 258/// This is essentially a `[TaskStorage<F>; N]`.
297 Thread(crate::arch::ThreadPender), 259pub struct TaskPool<F: Future + 'static, const N: usize> {
298 #[cfg(feature = "executor-interrupt")] 260 pool: [TaskStorage<F>; N],
299 Interrupt(crate::arch::InterruptPender),
300 #[cfg(feature = "pender-callback")]
301 Callback { func: fn(*mut ()), context: *mut () },
302} 261}
303 262
304unsafe impl Send for PenderInner {} 263impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
305unsafe impl Sync for PenderInner {} 264 /// Create a new TaskPool, with all tasks in non-spawned state.
265 pub const fn new() -> Self {
266 Self {
267 pool: [TaskStorage::NEW; N],
268 }
269 }
306 270
307/// Platform/architecture-specific action executed when an executor has pending work. 271 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
308/// 272 match self.pool.iter().find_map(AvailableTask::claim) {
309/// When a task within an executor is woken, the `Pender` is called. This does a 273 Some(task) => task.initialize_impl::<T>(future),
310/// platform/architecture-specific action to signal there is pending work in the executor. 274 None => SpawnToken::new_failed(),
311/// When this happens, you must arrange for [`Executor::poll`] to be called. 275 }
312/// 276 }
313/// You can think of it as a waker, but for the whole executor.
314pub struct Pender(pub(crate) PenderInner);
315 277
316impl Pender { 278 /// Try to spawn a task in the pool.
317 /// Create a `Pender` that will call an arbitrary function pointer. 279 ///
280 /// See [`TaskStorage::spawn()`] for details.
281 ///
282 /// This will loop over the pool and spawn the task in the first storage that
283 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
284 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
285 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
286 self.spawn_impl::<F>(future)
287 }
288
289 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
290 /// the future is !Send.
318 /// 291 ///
319 /// # Arguments 292 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
293 /// by the Embassy macros ONLY.
320 /// 294 ///
321 /// - `func`: The function pointer to call. 295 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
322 /// - `context`: Opaque context pointer, that will be passed to the function pointer. 296 /// is an `async fn`, NOT a hand-written `Future`.
323 #[cfg(feature = "pender-callback")] 297 #[doc(hidden)]
324 pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self { 298 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
325 Self(PenderInner::Callback { 299 where
326 func, 300 FutFn: FnOnce() -> F,
327 context: context.into(), 301 {
328 }) 302 // See the comment in AvailableTask::__initialize_async_fn for explanation.
303 self.spawn_impl::<FutFn>(future)
329 } 304 }
330} 305}
331 306
307#[derive(Clone, Copy)]
308pub(crate) struct Pender(*mut ());
309
310unsafe impl Send for Pender {}
311unsafe impl Sync for Pender {}
312
332impl Pender { 313impl Pender {
333 pub(crate) fn pend(&self) { 314 pub(crate) fn pend(self) {
334 match self.0 { 315 extern "Rust" {
335 #[cfg(feature = "executor-thread")] 316 fn __pender(context: *mut ());
336 PenderInner::Thread(x) => x.pend(),
337 #[cfg(feature = "executor-interrupt")]
338 PenderInner::Interrupt(x) => x.pend(),
339 #[cfg(feature = "pender-callback")]
340 PenderInner::Callback { func, context } => func(context),
341 } 317 }
318 unsafe { __pender(self.0) };
342 } 319 }
343} 320}
344 321
@@ -409,7 +386,7 @@ impl SyncExecutor {
409 #[allow(clippy::never_loop)] 386 #[allow(clippy::never_loop)]
410 loop { 387 loop {
411 #[cfg(feature = "integrated-timers")] 388 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 389 self.timer_queue.dequeue_expired(Instant::now(), wake_task_no_pend);
413 390
414 self.run_queue.dequeue_all(|p| { 391 self.run_queue.dequeue_all(|p| {
415 let task = p.header(); 392 let task = p.header();
@@ -472,15 +449,31 @@ impl SyncExecutor {
472/// 449///
473/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks 450/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
474/// that "want to run"). 451/// that "want to run").
475/// - You must supply a [`Pender`]. The executor will call it to notify you it has work 452/// - You must supply a pender function, as shown below. The executor will call it to notify you
476/// to do. You must arrange for `poll()` to be called as soon as possible. 453/// it has work to do. You must arrange for `poll()` to be called as soon as possible.
454/// - Enabling `arch-xx` features will define a pender function for you. This means that you
455/// are limited to using the executors provided to you by the architecture/platform
456/// implementation. If you need a different executor, you must not enable `arch-xx` features.
477/// 457///
478/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority 458/// The pender can be called from *any* context: any thread, any interrupt priority
479/// level, etc. It may be called synchronously from any `Executor` method call as well. 459/// level, etc. It may be called synchronously from any `Executor` method call as well.
480/// You must deal with this correctly. 460/// You must deal with this correctly.
481/// 461///
482/// In particular, you must NOT call `poll` directly from the pender callback, as this violates 462/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
483/// the requirement for `poll` to not be called reentrantly. 463/// the requirement for `poll` to not be called reentrantly.
464///
465/// The pender function must be exported with the name `__pender` and have the following signature:
466///
467/// ```rust
468/// #[export_name = "__pender"]
469/// fn pender(context: *mut ()) {
470/// // schedule `poll()` to be called
471/// }
472/// ```
473///
474/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
475/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
476/// differentiate between executors, or to pass a pointer to a callback that should be called.
484#[repr(transparent)] 477#[repr(transparent)]
485pub struct Executor { 478pub struct Executor {
486 pub(crate) inner: SyncExecutor, 479 pub(crate) inner: SyncExecutor,
@@ -495,12 +488,12 @@ impl Executor {
495 488
496 /// Create a new executor. 489 /// Create a new executor.
497 /// 490 ///
498 /// When the executor has work to do, it will call the [`Pender`]. 491 /// When the executor has work to do, it will call the pender function and pass `context` to it.
499 /// 492 ///
500 /// See [`Executor`] docs for details on `Pender`. 493 /// See [`Executor`] docs for details on the pender.
501 pub fn new(pender: Pender) -> Self { 494 pub fn new(context: *mut ()) -> Self {
502 Self { 495 Self {
503 inner: SyncExecutor::new(pender), 496 inner: SyncExecutor::new(Pender(context)),
504 _not_sync: PhantomData, 497 _not_sync: PhantomData,
505 } 498 }
506 } 499 }
@@ -523,16 +516,16 @@ impl Executor {
523 /// This loops over all tasks that are queued to be polled (i.e. they're 516 /// This loops over all tasks that are queued to be polled (i.e. they're
524 /// freshly spawned or they've been woken). Other tasks are not polled. 517 /// freshly spawned or they've been woken). Other tasks are not polled.
525 /// 518 ///
526 /// You must call `poll` after receiving a call to the [`Pender`]. It is OK 519 /// You must call `poll` after receiving a call to the pender. It is OK
527 /// to call `poll` even when not requested by the `Pender`, but it wastes 520 /// to call `poll` even when not requested by the pender, but it wastes
528 /// energy. 521 /// energy.
529 /// 522 ///
530 /// # Safety 523 /// # Safety
531 /// 524 ///
532 /// You must NOT call `poll` reentrantly on the same executor. 525 /// You must NOT call `poll` reentrantly on the same executor.
533 /// 526 ///
534 /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you 527 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
535 /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to 528 /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
536 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 529 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
537 /// no `poll()` already running. 530 /// no `poll()` already running.
538 pub unsafe fn poll(&'static self) { 531 pub unsafe fn poll(&'static self) {
@@ -573,6 +566,31 @@ pub fn wake_task(task: TaskRef) {
573 } 566 }
574} 567}
575 568
569/// Wake a task by `TaskRef` without calling pend.
570///
571/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
572pub fn wake_task_no_pend(task: TaskRef) {
573 let header = task.header();
574
575 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
576 // If already scheduled, or if not started,
577 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
578 None
579 } else {
580 // Mark it as scheduled
581 Some(state | STATE_RUN_QUEUED)
582 }
583 });
584
585 if res.is_ok() {
586 // We have just marked the task as scheduled, so enqueue it.
587 unsafe {
588 let executor = header.executor.get().unwrap_unchecked();
589 executor.run_queue.enqueue(task);
590 }
591 }
592}
593
576#[cfg(feature = "integrated-timers")] 594#[cfg(feature = "integrated-timers")]
577struct TimerQueue; 595struct TimerQueue;
578 596
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 2b6224045..5a3a0dee1 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -33,7 +33,8 @@ impl<S> SpawnToken<S> {
33 } 33 }
34 } 34 }
35 35
36 pub(crate) fn new_failed() -> Self { 36 /// Return a SpawnToken that represents a failed spawn.
37 pub fn new_failed() -> Self {
37 Self { 38 Self {
38 raw_task: None, 39 raw_task: None,
39 phantom: PhantomData, 40 phantom: PhantomData,