aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorOlof <[email protected]>2024-12-18 01:48:25 +0100
committerGitHub <[email protected]>2024-12-18 01:48:25 +0100
commit7cf96e4730964d085015320648c870a05fbaf431 (patch)
tree04072529b62082cb66443377b589fe08169f83be /embassy-executor/src
parent8678911028a591d72fd1d8418407b5885ed4c417 (diff)
parent341036a8b865609767fbf9015b482ea70ed4f23f (diff)
Merge branch 'embassy-rs:main' into u5_adc
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/spin.rs58
-rw-r--r--embassy-executor/src/lib.rs10
-rw-r--r--embassy-executor/src/raw/mod.rs271
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs3
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs17
-rw-r--r--embassy-executor/src/raw/state_atomics.rs53
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs58
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs66
-rw-r--r--embassy-executor/src/raw/timer_queue.rs119
-rw-r--r--embassy-executor/src/raw/trace.rs84
-rw-r--r--embassy-executor/src/raw/waker.rs28
-rw-r--r--embassy-executor/src/spawner.rs17
12 files changed, 415 insertions, 369 deletions
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs
new file mode 100644
index 000000000..340023620
--- /dev/null
+++ b/embassy-executor/src/arch/spin.rs
@@ -0,0 +1,58 @@
1#[cfg(feature = "executor-interrupt")]
2compile_error!("`executor-interrupt` is not supported with `arch-spin`.");
3
4#[cfg(feature = "executor-thread")]
5pub use thread::*;
6#[cfg(feature = "executor-thread")]
7mod thread {
8 use core::marker::PhantomData;
9
10 pub use embassy_executor_macros::main_spin as main;
11
12 use crate::{raw, Spawner};
13
14 #[export_name = "__pender"]
15 fn __pender(_context: *mut ()) {}
16
17 /// Spin Executor
18 pub struct Executor {
19 inner: raw::Executor,
20 not_send: PhantomData<*mut ()>,
21 }
22
23 impl Executor {
24 /// Create a new Executor.
25 pub fn new() -> Self {
26 Self {
27 inner: raw::Executor::new(core::ptr::null_mut()),
28 not_send: PhantomData,
29 }
30 }
31
32 /// Run the executor.
33 ///
34 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
35 /// this executor. Use it to spawn the initial task(s). After `init` returns,
36 /// the executor starts running the tasks.
37 ///
38 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
39 /// for example by passing it as an argument to the initial tasks.
40 ///
41 /// This function requires `&'static mut self`. This means you have to store the
42 /// Executor instance in a place where it'll live forever and grants you mutable
43 /// access. There's a few ways to do this:
44 ///
45 /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
46 /// - a `static mut` (unsafe)
47 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
48 ///
49 /// This function never returns.
50 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
51 init(self.inner.spawner());
52
53 loop {
54 unsafe { self.inner.poll() };
55 }
56 }
57 }
58}
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
index 6a2e493a2..d816539ac 100644
--- a/embassy-executor/src/lib.rs
+++ b/embassy-executor/src/lib.rs
@@ -23,7 +23,14 @@ macro_rules! check_at_most_one {
23 check_at_most_one!(@amo [$($f)*] [$($f)*] []); 23 check_at_most_one!(@amo [$($f)*] [$($f)*] []);
24 }; 24 };
25} 25}
26check_at_most_one!("arch-avr", "arch-cortex-m", "arch-riscv32", "arch-std", "arch-wasm",); 26check_at_most_one!(
27 "arch-avr",
28 "arch-cortex-m",
29 "arch-riscv32",
30 "arch-std",
31 "arch-wasm",
32 "arch-spin",
33);
27 34
28#[cfg(feature = "_arch")] 35#[cfg(feature = "_arch")]
29#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")] 36#[cfg_attr(feature = "arch-avr", path = "arch/avr.rs")]
@@ -31,6 +38,7 @@ check_at_most_one!("arch-avr", "arch-cortex-m", "arch-riscv32", "arch-std", "arc
31#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] 38#[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")]
32#[cfg_attr(feature = "arch-std", path = "arch/std.rs")] 39#[cfg_attr(feature = "arch-std", path = "arch/std.rs")]
33#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] 40#[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")]
41#[cfg_attr(feature = "arch-spin", path = "arch/spin.rs")]
34mod arch; 42mod arch;
35 43
36#[cfg(feature = "_arch")] 44#[cfg(feature = "_arch")]
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index d9ea5c005..e38a2af66 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -16,8 +16,9 @@ mod run_queue;
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19pub mod timer_queue;
20mod timer_queue; 20#[cfg(feature = "trace")]
21mod trace;
21pub(crate) mod util; 22pub(crate) mod util;
22#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 23#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
23mod waker; 24mod waker;
@@ -27,13 +28,9 @@ use core::marker::PhantomData;
27use core::mem; 28use core::mem;
28use core::pin::Pin; 29use core::pin::Pin;
29use core::ptr::NonNull; 30use core::ptr::NonNull;
31use core::sync::atomic::{AtomicPtr, Ordering};
30use core::task::{Context, Poll}; 32use core::task::{Context, Poll};
31 33
32#[cfg(feature = "integrated-timers")]
33use embassy_time_driver::AlarmHandle;
34#[cfg(feature = "rtos-trace")]
35use rtos_trace::trace;
36
37use self::run_queue::{RunQueue, RunQueueItem}; 34use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 35use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 36use self::util::{SyncUnsafeCell, UninitCell};
@@ -41,20 +38,56 @@ pub use self::waker::task_from_waker;
41use super::SpawnToken; 38use super::SpawnToken;
42 39
43/// Raw task header for use in task pointers. 40/// Raw task header for use in task pointers.
41///
42/// A task can be in one of the following states:
43///
44/// - Not spawned: the task is ready to spawn.
45/// - `SPAWNED`: the task is currently spawned and may be running.
46/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
47/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
48/// polling the task's future.
49///
50/// A task's complete life cycle is as follows:
51///
52/// ```text
53/// ┌────────────┐ ┌────────────────────────┐
54/// │Not spawned │◄─5┤Not spawned|Run enqueued│
55/// │ ├6─►│ │
56/// └─────┬──────┘ └──────▲─────────────────┘
57/// 1 │
58/// │ ┌────────────┘
59/// │ 4
60/// ┌─────▼────┴─────────┐
61/// │Spawned|Run enqueued│
62/// │ │
63/// └─────┬▲─────────────┘
64/// 2│
65/// │3
66/// ┌─────▼┴─────┐
67/// │ Spawned │
68/// │ │
69/// └────────────┘
70/// ```
71///
72/// Transitions:
73/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
74/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
75/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
76/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
77/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
78/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
44pub(crate) struct TaskHeader { 79pub(crate) struct TaskHeader {
45 pub(crate) state: State, 80 pub(crate) state: State,
46 pub(crate) run_queue_item: RunQueueItem, 81 pub(crate) run_queue_item: RunQueueItem,
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 82 pub(crate) executor: AtomicPtr<SyncExecutor>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 83 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 84
50 #[cfg(feature = "integrated-timers")] 85 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 86 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
54} 87}
55 88
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 89/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 90#[derive(Clone, Copy, PartialEq)]
58pub struct TaskRef { 91pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 92 ptr: NonNull<TaskHeader>,
60} 93}
@@ -76,10 +109,31 @@ impl TaskRef {
76 } 109 }
77 } 110 }
78 111
112 /// # Safety
113 ///
114 /// The result of this function must only be compared
115 /// for equality, or stored, but not used.
116 pub const unsafe fn dangling() -> Self {
117 Self {
118 ptr: NonNull::dangling(),
119 }
120 }
121
79 pub(crate) fn header(self) -> &'static TaskHeader { 122 pub(crate) fn header(self) -> &'static TaskHeader {
80 unsafe { self.ptr.as_ref() } 123 unsafe { self.ptr.as_ref() }
81 } 124 }
82 125
126 /// Returns a reference to the executor that the task is currently running on.
127 pub unsafe fn executor(self) -> Option<&'static Executor> {
128 let executor = self.header().executor.load(Ordering::Relaxed);
129 executor.as_ref().map(|e| Executor::wrap(e))
130 }
131
132 /// Returns a reference to the timer queue item.
133 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
134 &self.header().timer_queue_item
135 }
136
83 /// The returned pointer is valid for the entire TaskStorage. 137 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 138 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 139 self.ptr.as_ptr()
@@ -107,6 +161,10 @@ pub struct TaskStorage<F: Future + 'static> {
107 future: UninitCell<F>, // Valid if STATE_SPAWNED 161 future: UninitCell<F>, // Valid if STATE_SPAWNED
108} 162}
109 163
164unsafe fn poll_exited(_p: TaskRef) {
165 // Nothing to do, the task is already !SPAWNED and dequeued.
166}
167
110impl<F: Future + 'static> TaskStorage<F> { 168impl<F: Future + 'static> TaskStorage<F> {
111 const NEW: Self = Self::new(); 169 const NEW: Self = Self::new();
112 170
@@ -116,13 +174,10 @@ impl<F: Future + 'static> TaskStorage<F> {
116 raw: TaskHeader { 174 raw: TaskHeader {
117 state: State::new(), 175 state: State::new(),
118 run_queue_item: RunQueueItem::new(), 176 run_queue_item: RunQueueItem::new(),
119 executor: SyncUnsafeCell::new(None), 177 executor: AtomicPtr::new(core::ptr::null_mut()),
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 178 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 179 poll_fn: SyncUnsafeCell::new(None),
122 180
123 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 181 timer_queue_item: timer_queue::TimerQueueItem::new(),
127 }, 182 },
128 future: UninitCell::uninit(), 183 future: UninitCell::uninit(),
@@ -151,18 +206,24 @@ impl<F: Future + 'static> TaskStorage<F> {
151 } 206 }
152 207
153 unsafe fn poll(p: TaskRef) { 208 unsafe fn poll(p: TaskRef) {
154 let this = &*(p.as_ptr() as *const TaskStorage<F>); 209 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
155 210
156 let future = Pin::new_unchecked(this.future.as_mut()); 211 let future = Pin::new_unchecked(this.future.as_mut());
157 let waker = waker::from_task(p); 212 let waker = waker::from_task(p);
158 let mut cx = Context::from_waker(&waker); 213 let mut cx = Context::from_waker(&waker);
159 match future.poll(&mut cx) { 214 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 215 Poll::Ready(_) => {
216 // As the future has finished and this function will not be called
217 // again, we can safely drop the future here.
161 this.future.drop_in_place(); 218 this.future.drop_in_place();
162 this.raw.state.despawn();
163 219
164 #[cfg(feature = "integrated-timers")] 220 // We replace the poll_fn with a despawn function, so that the task is cleaned up
165 this.raw.expires_at.set(u64::MAX); 221 // when the executor polls it next.
222 this.raw.poll_fn.set(Some(poll_exited));
223
224 // Make sure we despawn last, so that other threads can only spawn the task
225 // after we're done with it.
226 this.raw.state.despawn();
166 } 227 }
167 Poll::Pending => {} 228 Poll::Pending => {}
168 } 229 }
@@ -316,26 +377,13 @@ impl Pender {
316pub(crate) struct SyncExecutor { 377pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 378 run_queue: RunQueue,
318 pender: Pender, 379 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 380}
325 381
326impl SyncExecutor { 382impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 383 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 384 Self {
332 run_queue: RunQueue::new(), 385 run_queue: RunQueue::new(),
333 pender, 386 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 387 }
340 } 388 }
341 389
@@ -346,90 +394,47 @@ impl SyncExecutor {
346 /// - `task` must be set up to run in this executor. 394 /// - `task` must be set up to run in this executor.
347 /// - `task` must NOT be already enqueued (in this executor or another one). 395 /// - `task` must NOT be already enqueued (in this executor or another one).
348 #[inline(always)] 396 #[inline(always)]
349 unsafe fn enqueue(&self, task: TaskRef) { 397 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
350 #[cfg(feature = "rtos-trace")] 398 #[cfg(feature = "trace")]
351 trace::task_ready_begin(task.as_ptr() as u32); 399 trace::task_ready_begin(self, &task);
352 400
353 if self.run_queue.enqueue(task) { 401 if self.run_queue.enqueue(task, l) {
354 self.pender.pend(); 402 self.pender.pend();
355 } 403 }
356 } 404 }
357 405
358 #[cfg(feature = "integrated-timers")]
359 fn alarm_callback(ctx: *mut ()) {
360 let this: &Self = unsafe { &*(ctx as *const Self) };
361 this.pender.pend();
362 }
363
364 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 406 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
365 task.header().executor.set(Some(self)); 407 task.header()
408 .executor
409 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
366 410
367 #[cfg(feature = "rtos-trace")] 411 #[cfg(feature = "trace")]
368 trace::task_new(task.as_ptr() as u32); 412 trace::task_new(self, &task);
369 413
370 self.enqueue(task); 414 state::locked(|l| {
415 self.enqueue(task, l);
416 })
371 } 417 }
372 418
373 /// # Safety 419 /// # Safety
374 /// 420 ///
375 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 421 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
376 pub(crate) unsafe fn poll(&'static self) { 422 pub(crate) unsafe fn poll(&'static self) {
377 #[cfg(feature = "integrated-timers")] 423 self.run_queue.dequeue_all(|p| {
378 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); 424 let task = p.header();
379
380 #[allow(clippy::never_loop)]
381 loop {
382 #[cfg(feature = "integrated-timers")]
383 self.timer_queue
384 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
385
386 self.run_queue.dequeue_all(|p| {
387 let task = p.header();
388
389 #[cfg(feature = "integrated-timers")]
390 task.expires_at.set(u64::MAX);
391
392 if !task.state.run_dequeue() {
393 // If task is not running, ignore it. This can happen in the following scenario:
394 // - Task gets dequeued, poll starts
395 // - While task is being polled, it gets woken. It gets placed in the queue.
396 // - Task poll finishes, returning done=true
397 // - RUNNING bit is cleared, but the task is already in the queue.
398 return;
399 }
400
401 #[cfg(feature = "rtos-trace")]
402 trace::task_exec_begin(p.as_ptr() as u32);
403
404 // Run the task
405 task.poll_fn.get().unwrap_unchecked()(p);
406
407 #[cfg(feature = "rtos-trace")]
408 trace::task_exec_end();
409
410 // Enqueue or update into timer_queue
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.update(p);
413 });
414
415 #[cfg(feature = "integrated-timers")]
416 {
417 // If this is already in the past, set_alarm might return false
418 // In that case do another poll loop iteration.
419 let next_expiration = self.timer_queue.next_expiration();
420 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
421 break;
422 }
423 }
424 425
425 #[cfg(not(feature = "integrated-timers"))] 426 #[cfg(feature = "trace")]
426 { 427 trace::task_exec_begin(self, &p);
427 break; 428
428 } 429 // Run the task
429 } 430 task.poll_fn.get().unwrap_unchecked()(p);
431
432 #[cfg(feature = "trace")]
433 trace::task_exec_end(self, &p);
434 });
430 435
431 #[cfg(feature = "rtos-trace")] 436 #[cfg(feature = "trace")]
432 trace::system_idle(); 437 trace::executor_idle(self)
433 } 438 }
434} 439}
435 440
@@ -516,6 +521,8 @@ impl Executor {
516 /// 521 ///
517 /// # Safety 522 /// # Safety
518 /// 523 ///
524 /// You must call `initialize` before calling this method.
525 ///
519 /// You must NOT call `poll` reentrantly on the same executor. 526 /// You must NOT call `poll` reentrantly on the same executor.
520 /// 527 ///
521 /// In particular, note that `poll` may call the pender synchronously. Therefore, you 528 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
@@ -540,13 +547,13 @@ impl Executor {
540/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 547/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
541pub fn wake_task(task: TaskRef) { 548pub fn wake_task(task: TaskRef) {
542 let header = task.header(); 549 let header = task.header();
543 if header.state.run_enqueue() { 550 header.state.run_enqueue(|l| {
544 // We have just marked the task as scheduled, so enqueue it. 551 // We have just marked the task as scheduled, so enqueue it.
545 unsafe { 552 unsafe {
546 let executor = header.executor.get().unwrap_unchecked(); 553 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
547 executor.enqueue(task); 554 executor.enqueue(task, l);
548 } 555 }
549 } 556 });
550} 557}
551 558
552/// Wake a task by `TaskRef` without calling pend. 559/// Wake a task by `TaskRef` without calling pend.
@@ -554,57 +561,11 @@ pub fn wake_task(task: TaskRef) {
554/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 561/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
555pub fn wake_task_no_pend(task: TaskRef) { 562pub fn wake_task_no_pend(task: TaskRef) {
556 let header = task.header(); 563 let header = task.header();
557 if header.state.run_enqueue() { 564 header.state.run_enqueue(|l| {
558 // We have just marked the task as scheduled, so enqueue it. 565 // We have just marked the task as scheduled, so enqueue it.
559 unsafe { 566 unsafe {
560 let executor = header.executor.get().unwrap_unchecked(); 567 let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
561 executor.run_queue.enqueue(task); 568 executor.run_queue.enqueue(task, l);
562 }
563 }
564}
565
566#[cfg(feature = "integrated-timers")]
567struct TimerQueue;
568
569#[cfg(feature = "integrated-timers")]
570impl embassy_time_queue_driver::TimerQueue for TimerQueue {
571 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
572 let task = waker::task_from_waker(waker);
573 let task = task.header();
574 unsafe {
575 let expires_at = task.expires_at.get();
576 task.expires_at.set(expires_at.min(at));
577 } 569 }
578 } 570 });
579}
580
581#[cfg(feature = "integrated-timers")]
582embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
583
584#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
585const fn gcd(a: u64, b: u64) -> u64 {
586 if b == 0 {
587 a
588 } else {
589 gcd(b, a % b)
590 }
591} 571}
592
593#[cfg(feature = "rtos-trace")]
594impl rtos_trace::RtosTraceOSCallbacks for Executor {
595 fn task_list() {
596 // We don't know what tasks exist, so we can't send them.
597 }
598 #[cfg(feature = "integrated-timers")]
599 fn time() -> u64 {
600 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
601 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
602 }
603 #[cfg(not(feature = "integrated-timers"))]
604 fn time() -> u64 {
605 0
606 }
607}
608
609#[cfg(feature = "rtos-trace")]
610rtos_trace::global_os_callbacks! {Executor}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index 90907cfda..ce511d79a 100644
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -45,7 +45,7 @@ impl RunQueue {
45 /// 45 ///
46 /// `item` must NOT be already enqueued in any queue. 46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)] 47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 48 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
49 let mut was_empty = false; 49 let mut was_empty = false;
50 50
51 self.head 51 self.head
@@ -81,6 +81,7 @@ impl RunQueue {
81 // safety: there are no concurrent accesses to `next` 81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() }; 82 next = unsafe { task.header().run_queue_item.next.get() };
83 83
84 task.header().state.run_dequeue();
84 on_task(task); 85 on_task(task);
85 } 86 }
86 } 87 }
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
index ba59c8f29..86c4085ed 100644
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -44,13 +44,11 @@ impl RunQueue {
44 /// 44 ///
45 /// `item` must NOT be already enqueued in any queue. 45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)] 46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 47 pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
48 critical_section::with(|cs| { 48 let prev = self.head.borrow(cs).replace(Some(task));
49 let prev = self.head.borrow(cs).replace(Some(task)); 49 task.header().run_queue_item.next.borrow(cs).set(prev);
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51 50
52 prev.is_none() 51 prev.is_none()
53 })
54 } 52 }
55 53
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 54 /// Empty the queue, then call `on_task` for each task that was in the queue.
@@ -65,9 +63,10 @@ impl RunQueue {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 63 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 64 // Therefore, first read the next pointer, and only then process the task.
67 65
68 // safety: we know if the task is enqueued, no one else will touch the `next` pointer. 66 critical_section::with(|cs| {
69 let cs = unsafe { CriticalSection::new() }; 67 next = task.header().run_queue_item.next.borrow(cs).get();
70 next = task.header().run_queue_item.next.borrow(cs).get(); 68 task.header().state.run_dequeue(cs);
69 });
71 70
72 on_task(task); 71 on_task(task);
73 } 72 }
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e1279ac0b..b6576bfc2 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,12 +1,19 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1use core::sync::atomic::{AtomicU32, Ordering};
2 2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
12
3/// Task is spawned (has a future) 13/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
5/// Task is in the executor run queue 15/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 16pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
7/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10 17
11pub(crate) struct State { 18pub(crate) struct State {
12 state: AtomicU32, 19 state: AtomicU32,
@@ -33,41 +40,19 @@ impl State {
33 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 40 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
34 } 41 }
35 42
36 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 43 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
44 /// function if the task was successfully marked.
37 #[inline(always)] 45 #[inline(always)]
38 pub fn run_enqueue(&self) -> bool { 46 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
39 self.state 47 let prev = self.state.fetch_or(STATE_RUN_QUEUED, Ordering::AcqRel);
40 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { 48 if prev & STATE_RUN_QUEUED == 0 {
41 // If already scheduled, or if not started, 49 locked(f);
42 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 50 }
43 None
44 } else {
45 // Mark it as scheduled
46 Some(state | STATE_RUN_QUEUED)
47 }
48 })
49 .is_ok()
50 } 51 }
51 52
52 /// Unmark the task as run-queued. Return whether the task is spawned. 53 /// Unmark the task as run-queued. Return whether the task is spawned.
53 #[inline(always)] 54 #[inline(always)]
54 pub fn run_dequeue(&self) -> bool { 55 pub fn run_dequeue(&self) {
55 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 56 self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
56 state & STATE_SPAWNED != 0
57 }
58
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
64 old_state & STATE_TIMER_QUEUED == 0
65 }
66
67 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)]
70 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
72 } 57 }
73} 58}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index e4dfe5093..b743dcc2c 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,6 +1,15 @@
1use core::arch::asm;
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 1use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 2
3#[derive(Clone, Copy)]
4pub(crate) struct Token(());
5
6/// Creates a token and passes it to the closure.
7///
8/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
9pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
10 f(Token(()))
11}
12
4// Must be kept in sync with the layout of `State`! 13// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 14pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 15pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
@@ -11,9 +20,8 @@ pub(crate) struct State {
11 spawned: AtomicBool, 20 spawned: AtomicBool,
12 /// Task is in the executor run queue 21 /// Task is in the executor run queue
13 run_queued: AtomicBool, 22 run_queued: AtomicBool,
14 /// Task is in the executor timer queue
15 timer_queued: AtomicBool,
16 pad: AtomicBool, 23 pad: AtomicBool,
24 pad2: AtomicBool,
17} 25}
18 26
19impl State { 27impl State {
@@ -21,8 +29,8 @@ impl State {
21 Self { 29 Self {
22 spawned: AtomicBool::new(false), 30 spawned: AtomicBool::new(false),
23 run_queued: AtomicBool::new(false), 31 run_queued: AtomicBool::new(false),
24 timer_queued: AtomicBool::new(false),
25 pad: AtomicBool::new(false), 32 pad: AtomicBool::new(false),
33 pad2: AtomicBool::new(false),
26 } 34 }
27 } 35 }
28 36
@@ -54,50 +62,22 @@ impl State {
54 self.spawned.store(false, Ordering::Relaxed); 62 self.spawned.store(false, Ordering::Relaxed);
55 } 63 }
56 64
57 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 65 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
66 /// function if the task was successfully marked.
58 #[inline(always)] 67 #[inline(always)]
59 pub fn run_enqueue(&self) -> bool { 68 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
60 unsafe { 69 let old = self.run_queued.swap(true, Ordering::AcqRel);
61 loop {
62 let state: u32;
63 asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));
64 70
65 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 71 if !old {
66 asm!("clrex", options(nomem, nostack)); 72 locked(f);
67 return false;
68 }
69
70 let outcome: usize;
71 let new_state = state | STATE_RUN_QUEUED;
72 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
73 if outcome == 0 {
74 return true;
75 }
76 }
77 } 73 }
78 } 74 }
79 75
80 /// Unmark the task as run-queued. Return whether the task is spawned. 76 /// Unmark the task as run-queued. Return whether the task is spawned.
81 #[inline(always)] 77 #[inline(always)]
82 pub fn run_dequeue(&self) -> bool { 78 pub fn run_dequeue(&self) {
83 compiler_fence(Ordering::Release); 79 compiler_fence(Ordering::Release);
84 80
85 let r = self.spawned.load(Ordering::Relaxed);
86 self.run_queued.store(false, Ordering::Relaxed); 81 self.run_queued.store(false, Ordering::Relaxed);
87 r
88 }
89
90 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
91 #[cfg(feature = "integrated-timers")]
92 #[inline(always)]
93 pub fn timer_enqueue(&self) -> bool {
94 !self.timer_queued.swap(true, Ordering::Relaxed)
95 }
96
97 /// Unmark the task as timer-queued.
98 #[cfg(feature = "integrated-timers")]
99 #[inline(always)]
100 pub fn timer_dequeue(&self) {
101 self.timer_queued.store(false, Ordering::Relaxed);
102 } 82 }
103} 83}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index c3cc1b0b7..6b627ff79 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -1,14 +1,12 @@
1use core::cell::Cell; 1use core::cell::Cell;
2 2
3use critical_section::Mutex; 3pub(crate) use critical_section::{with as locked, CriticalSection as Token};
4use critical_section::{CriticalSection, Mutex};
4 5
5/// Task is spawned (has a future) 6/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 7pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue 8/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
9/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12 10
13pub(crate) struct State { 11pub(crate) struct State {
14 state: Mutex<Cell<u32>>, 12 state: Mutex<Cell<u32>>,
@@ -22,13 +20,15 @@ impl State {
22 } 20 }
23 21
24 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 22 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
25 critical_section::with(|cs| { 23 critical_section::with(|cs| self.update_with_cs(cs, f))
26 let s = self.state.borrow(cs); 24 }
27 let mut val = s.get(); 25
28 let r = f(&mut val); 26 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
29 s.set(val); 27 let s = self.state.borrow(cs);
30 r 28 let mut val = s.get();
31 }) 29 let r = f(&mut val);
30 s.set(val);
31 r
32 } 32 }
33 33
34 /// If task is idle, mark it as spawned + run_queued and return true. 34 /// If task is idle, mark it as spawned + run_queued and return true.
@@ -50,44 +50,24 @@ impl State {
50 self.update(|s| *s &= !STATE_SPAWNED); 50 self.update(|s| *s &= !STATE_SPAWNED);
51 } 51 }
52 52
53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
54 /// function if the task was successfully marked.
54 #[inline(always)] 55 #[inline(always)]
55 pub fn run_enqueue(&self) -> bool { 56 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
56 self.update(|s| { 57 critical_section::with(|cs| {
57 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { 58 if self.update_with_cs(cs, |s| {
58 false 59 let ok = *s & STATE_RUN_QUEUED == 0;
59 } else {
60 *s |= STATE_RUN_QUEUED; 60 *s |= STATE_RUN_QUEUED;
61 true 61 ok
62 }) {
63 f(cs);
62 } 64 }
63 }) 65 });
64 } 66 }
65 67
66 /// Unmark the task as run-queued. Return whether the task is spawned. 68 /// Unmark the task as run-queued. Return whether the task is spawned.
67 #[inline(always)] 69 #[inline(always)]
68 pub fn run_dequeue(&self) -> bool { 70 pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
69 self.update(|s| { 71 self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
70 let ok = *s & STATE_SPAWNED != 0;
71 *s &= !STATE_RUN_QUEUED;
72 ok
73 })
74 }
75
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool {
80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0;
82 *s |= STATE_TIMER_QUEUED;
83 ok
84 })
85 }
86
87 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)]
90 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED);
92 } 72 }
93} 73}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 94a5f340b..e52453be4 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,76 +1,73 @@
1use core::cmp::min; 1//! Timer queue operations.
2
3use core::cell::Cell;
2 4
3use super::TaskRef; 5use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5 6
6pub(crate) struct TimerQueueItem { 7#[cfg(feature = "_timer-item-payload")]
7 next: SyncUnsafeCell<Option<TaskRef>>, 8macro_rules! define_opaque {
8} 9 ($size:tt) => {
10 /// An opaque data type.
11 #[repr(align($size))]
12 pub struct OpaqueData {
13 data: [u8; $size],
14 }
9 15
10impl TimerQueueItem { 16 impl OpaqueData {
11 pub const fn new() -> Self { 17 const fn new() -> Self {
12 Self { 18 Self { data: [0; $size] }
13 next: SyncUnsafeCell::new(None), 19 }
20
21 /// Access the data as a reference to a type `T`.
22 ///
23 /// Safety:
24 ///
25 /// The caller must ensure that the size of the type `T` is less than, or equal to
26 /// the size of the payload, and must ensure that the alignment of the type `T` is
27 /// less than, or equal to the alignment of the payload.
28 ///
29 /// The type must be valid when zero-initialized.
30 pub unsafe fn as_ref<T>(&self) -> &T {
31 &*(self.data.as_ptr() as *const T)
32 }
14 } 33 }
15 } 34 };
16} 35}
17 36
18pub(crate) struct TimerQueue { 37#[cfg(feature = "timer-item-payload-size-1")]
19 head: SyncUnsafeCell<Option<TaskRef>>, 38define_opaque!(1);
20} 39#[cfg(feature = "timer-item-payload-size-2")]
40define_opaque!(2);
41#[cfg(feature = "timer-item-payload-size-4")]
42define_opaque!(4);
43#[cfg(feature = "timer-item-payload-size-8")]
44define_opaque!(8);
21 45
22impl TimerQueue { 46/// An item in the timer queue.
23 pub const fn new() -> Self { 47pub struct TimerQueueItem {
24 Self { 48 /// The next item in the queue.
25 head: SyncUnsafeCell::new(None), 49 ///
26 } 50 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
27 } 51 /// value of `Some(dangling_pointer)`
52 pub next: Cell<Option<TaskRef>>,
28 53
29 pub(crate) unsafe fn update(&self, p: TaskRef) { 54 /// The time at which this item expires.
30 let task = p.header(); 55 pub expires_at: Cell<u64>,
31 if task.expires_at.get() != u64::MAX {
32 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get());
34 self.head.set(Some(p));
35 }
36 }
37 }
38 56
39 pub(crate) unsafe fn next_expiration(&self) -> u64 { 57 /// Some implementation-defined, zero-initialized piece of data.
40 let mut res = u64::MAX; 58 #[cfg(feature = "_timer-item-payload")]
41 self.retain(|p| { 59 pub payload: OpaqueData,
42 let task = p.header(); 60}
43 let expires = task.expires_at.get();
44 res = min(res, expires);
45 expires != u64::MAX
46 });
47 res
48 }
49 61
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { 62unsafe impl Sync for TimerQueueItem {}
51 self.retain(|p| {
52 let task = p.header();
53 if task.expires_at.get() <= now {
54 on_task(p);
55 false
56 } else {
57 true
58 }
59 });
60 }
61 63
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { 64impl TimerQueueItem {
63 let mut prev = &self.head; 65 pub(crate) const fn new() -> Self {
64 while let Some(p) = prev.get() { 66 Self {
65 let task = p.header(); 67 next: Cell::new(None),
66 if f(p) { 68 expires_at: Cell::new(0),
67 // Skip to next 69 #[cfg(feature = "_timer-item-payload")]
68 prev = &task.timer_queue_item.next; 70 payload: OpaqueData::new(),
69 } else {
70 // Remove it
71 prev.set(task.timer_queue_item.next.get());
72 task.state.timer_dequeue();
73 }
74 } 71 }
75 } 72 }
76} 73}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
new file mode 100644
index 000000000..b34387b58
--- /dev/null
+++ b/embassy-executor/src/raw/trace.rs
@@ -0,0 +1,84 @@
1#![allow(unused)]
2use crate::raw::{SyncExecutor, TaskRef};
3
4#[cfg(not(feature = "rtos-trace"))]
5extern "Rust" {
6 fn _embassy_trace_task_new(executor_id: u32, task_id: u32);
7 fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32);
8 fn _embassy_trace_task_exec_end(excutor_id: u32, task_id: u32);
9 fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32);
10 fn _embassy_trace_executor_idle(executor_id: u32);
11}
12
13#[inline]
14pub(crate) fn task_new(executor: &SyncExecutor, task: &TaskRef) {
15 #[cfg(not(feature = "rtos-trace"))]
16 unsafe {
17 _embassy_trace_task_new(executor as *const _ as u32, task.as_ptr() as u32)
18 }
19
20 #[cfg(feature = "rtos-trace")]
21 rtos_trace::trace::task_new(task.as_ptr() as u32);
22}
23
24#[inline]
25pub(crate) fn task_ready_begin(executor: &SyncExecutor, task: &TaskRef) {
26 #[cfg(not(feature = "rtos-trace"))]
27 unsafe {
28 _embassy_trace_task_ready_begin(executor as *const _ as u32, task.as_ptr() as u32)
29 }
30 #[cfg(feature = "rtos-trace")]
31 rtos_trace::trace::task_ready_begin(task.as_ptr() as u32);
32}
33
34#[inline]
35pub(crate) fn task_exec_begin(executor: &SyncExecutor, task: &TaskRef) {
36 #[cfg(not(feature = "rtos-trace"))]
37 unsafe {
38 _embassy_trace_task_exec_begin(executor as *const _ as u32, task.as_ptr() as u32)
39 }
40 #[cfg(feature = "rtos-trace")]
41 rtos_trace::trace::task_exec_begin(task.as_ptr() as u32);
42}
43
44#[inline]
45pub(crate) fn task_exec_end(executor: &SyncExecutor, task: &TaskRef) {
46 #[cfg(not(feature = "rtos-trace"))]
47 unsafe {
48 _embassy_trace_task_exec_end(executor as *const _ as u32, task.as_ptr() as u32)
49 }
50 #[cfg(feature = "rtos-trace")]
51 rtos_trace::trace::task_exec_end();
52}
53
54#[inline]
55pub(crate) fn executor_idle(executor: &SyncExecutor) {
56 #[cfg(not(feature = "rtos-trace"))]
57 unsafe {
58 _embassy_trace_executor_idle(executor as *const _ as u32)
59 }
60 #[cfg(feature = "rtos-trace")]
61 rtos_trace::trace::system_idle();
62}
63
64#[cfg(feature = "rtos-trace")]
65impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
66 fn task_list() {
67 // We don't know what tasks exist, so we can't send them.
68 }
69 fn time() -> u64 {
70 const fn gcd(a: u64, b: u64) -> u64 {
71 if b == 0 {
72 a
73 } else {
74 gcd(b, a % b)
75 }
76 }
77
78 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
79 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
80 }
81}
82
83#[cfg(feature = "rtos-trace")]
84rtos_trace::global_os_callbacks! {SyncExecutor}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 8bb2cfd05..b7d57c314 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -32,31 +32,11 @@ pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
32/// 32///
33/// Panics if the waker is not created by the Embassy executor. 33/// Panics if the waker is not created by the Embassy executor.
34pub fn task_from_waker(waker: &Waker) -> TaskRef { 34pub fn task_from_waker(waker: &Waker) -> TaskRef {
35 let (vtable, data) = { 35 // make sure to compare vtable addresses. Doing `==` on the references
36 #[cfg(not(feature = "nightly"))] 36 // will compare the contents, which is slower.
37 { 37 if waker.vtable() as *const _ != &VTABLE as *const _ {
38 struct WakerHack {
39 data: *const (),
40 vtable: &'static RawWakerVTable,
41 }
42
43 // safety: OK because WakerHack has the same layout as Waker.
44 // This is not really guaranteed because the structs are `repr(Rust)`, it is
45 // indeed the case in the current implementation.
46 // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
47 let hack: &WakerHack = unsafe { core::mem::transmute(waker) };
48 (hack.vtable, hack.data)
49 }
50
51 #[cfg(feature = "nightly")]
52 {
53 (waker.vtable(), waker.data())
54 }
55 };
56
57 if vtable != &VTABLE {
58 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") 38 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
59 } 39 }
60 // safety: our wakers are always created with `TaskRef::as_ptr` 40 // safety: our wakers are always created with `TaskRef::as_ptr`
61 unsafe { TaskRef::from_ptr(data as *const TaskHeader) } 41 unsafe { TaskRef::from_ptr(waker.data() as *const TaskHeader) }
62} 42}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 271606244..16347ad71 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -1,6 +1,7 @@
1use core::future::poll_fn; 1use core::future::poll_fn;
2use core::marker::PhantomData; 2use core::marker::PhantomData;
3use core::mem; 3use core::mem;
4use core::sync::atomic::Ordering;
4use core::task::Poll; 5use core::task::Poll;
5 6
6use super::raw; 7use super::raw;
@@ -92,7 +93,13 @@ impl Spawner {
92 pub async fn for_current_executor() -> Self { 93 pub async fn for_current_executor() -> Self {
93 poll_fn(|cx| { 94 poll_fn(|cx| {
94 let task = raw::task_from_waker(cx.waker()); 95 let task = raw::task_from_waker(cx.waker());
95 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 96 let executor = unsafe {
97 task.header()
98 .executor
99 .load(Ordering::Relaxed)
100 .as_ref()
101 .unwrap_unchecked()
102 };
96 let executor = unsafe { raw::Executor::wrap(executor) }; 103 let executor = unsafe { raw::Executor::wrap(executor) };
97 Poll::Ready(Self::new(executor)) 104 Poll::Ready(Self::new(executor))
98 }) 105 })
@@ -164,7 +171,13 @@ impl SendSpawner {
164 pub async fn for_current_executor() -> Self { 171 pub async fn for_current_executor() -> Self {
165 poll_fn(|cx| { 172 poll_fn(|cx| {
166 let task = raw::task_from_waker(cx.waker()); 173 let task = raw::task_from_waker(cx.waker());
167 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 174 let executor = unsafe {
175 task.header()
176 .executor
177 .load(Ordering::Relaxed)
178 .as_ref()
179 .unwrap_unchecked()
180 };
168 Poll::Ready(Self::new(executor)) 181 Poll::Ready(Self::new(executor))
169 }) 182 })
170 .await 183 .await