aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2024-12-16 12:30:30 +0000
committerGitHub <[email protected]>2024-12-16 12:30:30 +0000
commit2c3bc75da6008afa7cacc1045954cef7e3d8740f (patch)
tree47661322d49d3e38717e2fc3f38e920c222138f7 /embassy-executor/src
parent99ad61cecf4fe098feeced5524d3e60625137457 (diff)
parente1c00613288024623f7fde61f65c4c40c9a5381a (diff)
Merge pull request #3593 from bugadani/refactor
Rework time-driver contract.
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/avr.rs4
-rw-r--r--embassy-executor/src/arch/cortex_m.rs6
-rw-r--r--embassy-executor/src/arch/riscv32.rs4
-rw-r--r--embassy-executor/src/arch/spin.rs4
-rw-r--r--embassy-executor/src/arch/std.rs4
-rw-r--r--embassy-executor/src/arch/wasm.rs4
-rw-r--r--embassy-executor/src/raw/mod.rs197
-rw-r--r--embassy-executor/src/raw/state_atomics.rs30
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs27
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs20
-rw-r--r--embassy-executor/src/raw/timer_queue.rs91
-rw-r--r--embassy-executor/src/raw/trace.rs22
12 files changed, 176 insertions, 237 deletions
diff --git a/embassy-executor/src/arch/avr.rs b/embassy-executor/src/arch/avr.rs
index 7f9ed4421..70085d04d 100644
--- a/embassy-executor/src/arch/avr.rs
+++ b/embassy-executor/src/arch/avr.rs
@@ -53,10 +53,6 @@ mod thread {
53 /// 53 ///
54 /// This function never returns. 54 /// This function never returns.
55 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 55 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
56 unsafe {
57 self.inner.initialize();
58 }
59
60 init(self.inner.spawner()); 56 init(self.inner.spawner());
61 57
62 loop { 58 loop {
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index 0c2af88a6..5c517e0a2 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -98,9 +98,6 @@ mod thread {
98 /// 98 ///
99 /// This function never returns. 99 /// This function never returns.
100 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 100 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
101 unsafe {
102 self.inner.initialize();
103 }
104 init(self.inner.spawner()); 101 init(self.inner.spawner());
105 102
106 loop { 103 loop {
@@ -210,9 +207,6 @@ mod interrupt {
210 } 207 }
211 208
212 let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; 209 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
213 unsafe {
214 executor.initialize();
215 }
216 210
217 unsafe { NVIC::unmask(irq) } 211 unsafe { NVIC::unmask(irq) }
218 212
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index 715e5f3cf..01e63a9fd 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -54,10 +54,6 @@ mod thread {
54 /// 54 ///
55 /// This function never returns. 55 /// This function never returns.
56 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 56 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
57 unsafe {
58 self.inner.initialize();
59 }
60
61 init(self.inner.spawner()); 57 init(self.inner.spawner());
62 58
63 loop { 59 loop {
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs
index 54c7458b3..340023620 100644
--- a/embassy-executor/src/arch/spin.rs
+++ b/embassy-executor/src/arch/spin.rs
@@ -48,10 +48,6 @@ mod thread {
48 /// 48 ///
49 /// This function never returns. 49 /// This function never returns.
50 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 50 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
51 unsafe {
52 self.inner.initialize();
53 }
54
55 init(self.inner.spawner()); 51 init(self.inner.spawner());
56 52
57 loop { 53 loop {
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs
index 948c7711b..b02b15988 100644
--- a/embassy-executor/src/arch/std.rs
+++ b/embassy-executor/src/arch/std.rs
@@ -55,10 +55,6 @@ mod thread {
55 /// 55 ///
56 /// This function never returns. 56 /// This function never returns.
57 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { 57 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
58 unsafe {
59 self.inner.initialize();
60 }
61
62 init(self.inner.spawner()); 58 init(self.inner.spawner());
63 59
64 loop { 60 loop {
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs
index 35025f11f..f9d0f935c 100644
--- a/embassy-executor/src/arch/wasm.rs
+++ b/embassy-executor/src/arch/wasm.rs
@@ -71,10 +71,6 @@ mod thread {
71 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) 71 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
72 pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { 72 pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
73 unsafe { 73 unsafe {
74 self.inner.initialize();
75 }
76
77 unsafe {
78 let executor = &self.inner; 74 let executor = &self.inner;
79 let future = Closure::new(move |_| { 75 let future = Closure::new(move |_| {
80 executor.poll(); 76 executor.poll();
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 3f93eae6f..7da14468d 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -16,8 +16,7 @@ mod run_queue;
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19pub mod timer_queue;
20mod timer_queue;
21#[cfg(feature = "trace")] 20#[cfg(feature = "trace")]
22mod trace; 21mod trace;
23pub(crate) mod util; 22pub(crate) mod util;
@@ -31,9 +30,6 @@ use core::pin::Pin;
31use core::ptr::NonNull; 30use core::ptr::NonNull;
32use core::task::{Context, Poll}; 31use core::task::{Context, Poll};
33 32
34#[cfg(feature = "integrated-timers")]
35use embassy_time_driver::AlarmHandle;
36
37use self::run_queue::{RunQueue, RunQueueItem}; 33use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 34use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 35use self::util::{SyncUnsafeCell, UninitCell};
@@ -47,14 +43,12 @@ pub(crate) struct TaskHeader {
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 43 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 44 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 45
50 #[cfg(feature = "integrated-timers")] 46 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 47 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
54} 48}
55 49
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 50/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 51#[derive(Clone, Copy, PartialEq)]
58pub struct TaskRef { 52pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 53 ptr: NonNull<TaskHeader>,
60} 54}
@@ -76,10 +70,53 @@ impl TaskRef {
76 } 70 }
77 } 71 }
78 72
73 /// # Safety
74 ///
75 /// The result of this function must only be compared
76 /// for equality, or stored, but not used.
77 pub const unsafe fn dangling() -> Self {
78 Self {
79 ptr: NonNull::dangling(),
80 }
81 }
82
79 pub(crate) fn header(self) -> &'static TaskHeader { 83 pub(crate) fn header(self) -> &'static TaskHeader {
80 unsafe { self.ptr.as_ref() } 84 unsafe { self.ptr.as_ref() }
81 } 85 }
82 86
87 /// Returns a reference to the executor that the task is currently running on.
88 pub unsafe fn executor(self) -> Option<&'static Executor> {
89 self.header().executor.get().map(|e| Executor::wrap(e))
90 }
91
92 /// Returns a reference to the timer queue item.
93 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
94 &self.header().timer_queue_item
95 }
96
97 /// Mark the task as timer-queued. Return whether it should be actually enqueued
98 /// using `_embassy_time_schedule_wake`.
99 ///
100 /// Entering this state prevents the task from being respawned while in a timer queue.
101 ///
102 /// Safety:
103 ///
104 /// This functions should only be called by the timer queue driver, before
105 /// enqueueing the timer item.
106 pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
107 self.header().state.timer_enqueue()
108 }
109
110 /// Unmark the task as timer-queued.
111 ///
112 /// Safety:
113 ///
114 /// This functions should only be called by the timer queue implementation, after the task has
115 /// been removed from the timer queue.
116 pub unsafe fn timer_dequeue(&self) {
117 self.header().state.timer_dequeue()
118 }
119
83 /// The returned pointer is valid for the entire TaskStorage. 120 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 121 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 122 self.ptr.as_ptr()
@@ -120,9 +157,6 @@ impl<F: Future + 'static> TaskStorage<F> {
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 157 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 158 poll_fn: SyncUnsafeCell::new(None),
122 159
123 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 160 timer_queue_item: timer_queue::TimerQueueItem::new(),
127 }, 161 },
128 future: UninitCell::uninit(), 162 future: UninitCell::uninit(),
@@ -159,10 +193,25 @@ impl<F: Future + 'static> TaskStorage<F> {
159 match future.poll(&mut cx) { 193 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 194 Poll::Ready(_) => {
161 this.future.drop_in_place(); 195 this.future.drop_in_place();
196
197 // Mark this task to be timer queued.
198 // We're splitting the enqueue in two parts, so that we can change task state
199 // to something that prevent re-queueing.
200 let op = this.raw.state.timer_enqueue();
201
202 // Now mark the task as not spawned, so that
203 // - it can be spawned again once it has been removed from the timer queue
204 // - it can not be timer-queued again
205 // We must do this before scheduling the wake, to prevent the task from being
206 // dequeued by the time driver while it's still SPAWNED.
162 this.raw.state.despawn(); 207 this.raw.state.despawn();
163 208
164 #[cfg(feature = "integrated-timers")] 209 // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's
165 this.raw.expires_at.set(u64::MAX); 210 // better to be safe.
211 if op == timer_queue::TimerEnqueueOperation::Enqueue {
212 // Schedule the task in the past, so it gets dequeued ASAP.
213 unsafe { _embassy_time_schedule_wake(0, &waker) }
214 }
166 } 215 }
167 Poll::Pending => {} 216 Poll::Pending => {}
168 } 217 }
@@ -181,6 +230,10 @@ impl<F: Future + 'static> TaskStorage<F> {
181 } 230 }
182} 231}
183 232
233extern "Rust" {
234 fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker);
235}
236
184/// An uninitialized [`TaskStorage`]. 237/// An uninitialized [`TaskStorage`].
185pub struct AvailableTask<F: Future + 'static> { 238pub struct AvailableTask<F: Future + 'static> {
186 task: &'static TaskStorage<F>, 239 task: &'static TaskStorage<F>,
@@ -316,34 +369,16 @@ impl Pender {
316pub(crate) struct SyncExecutor { 369pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 370 run_queue: RunQueue,
318 pender: Pender, 371 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 372}
325 373
326impl SyncExecutor { 374impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 375 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 376 Self {
332 run_queue: RunQueue::new(), 377 run_queue: RunQueue::new(),
333 pender, 378 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 379 }
340 } 380 }
341 381
342 pub(crate) unsafe fn initialize(&'static self) {
343 #[cfg(feature = "integrated-timers")]
344 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
345 }
346
347 /// Enqueue a task in the task queue 382 /// Enqueue a task in the task queue
348 /// 383 ///
349 /// # Safety 384 /// # Safety
@@ -360,12 +395,6 @@ impl SyncExecutor {
360 } 395 }
361 } 396 }
362 397
363 #[cfg(feature = "integrated-timers")]
364 fn alarm_callback(ctx: *mut ()) {
365 let this: &Self = unsafe { &*(ctx as *const Self) };
366 this.pender.pend();
367 }
368
369 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 398 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
370 task.header().executor.set(Some(self)); 399 task.header().executor.set(Some(self));
371 400
@@ -379,56 +408,27 @@ impl SyncExecutor {
379 /// 408 ///
380 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 409 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
381 pub(crate) unsafe fn poll(&'static self) { 410 pub(crate) unsafe fn poll(&'static self) {
382 #[allow(clippy::never_loop)] 411 self.run_queue.dequeue_all(|p| {
383 loop { 412 let task = p.header();
384 #[cfg(feature = "integrated-timers")] 413
385 self.timer_queue 414 if !task.state.run_dequeue() {
386 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); 415 // If task is not running, ignore it. This can happen in the following scenario:
387 416 // - Task gets dequeued, poll starts
388 self.run_queue.dequeue_all(|p| { 417 // - While task is being polled, it gets woken. It gets placed in the queue.
389 let task = p.header(); 418 // - Task poll finishes, returning done=true
390 419 // - RUNNING bit is cleared, but the task is already in the queue.
391 #[cfg(feature = "integrated-timers")] 420 return;
392 task.expires_at.set(u64::MAX); 421 }
393
394 if !task.state.run_dequeue() {
395 // If task is not running, ignore it. This can happen in the following scenario:
396 // - Task gets dequeued, poll starts
397 // - While task is being polled, it gets woken. It gets placed in the queue.
398 // - Task poll finishes, returning done=true
399 // - RUNNING bit is cleared, but the task is already in the queue.
400 return;
401 }
402
403 #[cfg(feature = "trace")]
404 trace::task_exec_begin(self, &p);
405 422
406 // Run the task 423 #[cfg(feature = "trace")]
407 task.poll_fn.get().unwrap_unchecked()(p); 424 trace::task_exec_begin(self, &p);
408 425
409 #[cfg(feature = "trace")] 426 // Run the task
410 trace::task_exec_end(self, &p); 427 task.poll_fn.get().unwrap_unchecked()(p);
411 428
412 // Enqueue or update into timer_queue 429 #[cfg(feature = "trace")]
413 #[cfg(feature = "integrated-timers")] 430 trace::task_exec_end(self, &p);
414 self.timer_queue.update(p); 431 });
415 });
416
417 #[cfg(feature = "integrated-timers")]
418 {
419 // If this is already in the past, set_alarm might return false
420 // In that case do another poll loop iteration.
421 let next_expiration = self.timer_queue.next_expiration();
422 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
423 break;
424 }
425 }
426
427 #[cfg(not(feature = "integrated-timers"))]
428 {
429 break;
430 }
431 }
432 432
433 #[cfg(feature = "trace")] 433 #[cfg(feature = "trace")]
434 trace::executor_idle(self) 434 trace::executor_idle(self)
@@ -494,15 +494,6 @@ impl Executor {
494 } 494 }
495 } 495 }
496 496
497 /// Initializes the executor.
498 ///
499 /// # Safety
500 ///
501 /// This function must be called once before any other method is called.
502 pub unsafe fn initialize(&'static self) {
503 self.inner.initialize();
504 }
505
506 /// Spawn a task in this executor. 497 /// Spawn a task in this executor.
507 /// 498 ///
508 /// # Safety 499 /// # Safety
@@ -575,21 +566,3 @@ pub fn wake_task_no_pend(task: TaskRef) {
575 } 566 }
576 } 567 }
577} 568}
578
579#[cfg(feature = "integrated-timers")]
580struct TimerQueue;
581
582#[cfg(feature = "integrated-timers")]
583impl embassy_time_queue_driver::TimerQueue for TimerQueue {
584 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
585 let task = waker::task_from_waker(waker);
586 let task = task.header();
587 unsafe {
588 let expires_at = task.expires_at.get();
589 task.expires_at.set(expires_at.min(at));
590 }
591 }
592}
593
594#[cfg(feature = "integrated-timers")]
595embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e1279ac0b..15eb9a368 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,11 +1,12 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1use core::sync::atomic::{AtomicU32, Ordering};
2 2
3use super::timer_queue::TimerEnqueueOperation;
4
3/// Task is spawned (has a future) 5/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 6pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
5/// Task is in the executor run queue 7/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
7/// Task is in the executor timer queue 9/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 10pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10 11
11pub(crate) struct State { 12pub(crate) struct State {
@@ -56,18 +57,31 @@ impl State {
56 state & STATE_SPAWNED != 0 57 state & STATE_SPAWNED != 0
57 } 58 }
58 59
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) 60 /// Mark the task as timer-queued. Return whether it can be enqueued.
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)] 61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool { 62 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 63 if self
64 old_state & STATE_TIMER_QUEUED == 0 64 .state
65 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
66 // If not started, ignore it
67 if state & STATE_SPAWNED == 0 {
68 None
69 } else {
70 // Mark it as enqueued
71 Some(state | STATE_TIMER_QUEUED)
72 }
73 })
74 .is_ok()
75 {
76 TimerEnqueueOperation::Enqueue
77 } else {
78 TimerEnqueueOperation::Ignore
79 }
65 } 80 }
66 81
67 /// Unmark the task as timer-queued. 82 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)] 83 #[inline(always)]
70 pub fn timer_dequeue(&self) { 84 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); 85 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed);
72 } 86 }
73} 87}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index e4dfe5093..7a152e8c0 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,9 +1,12 @@
1use core::arch::asm; 1use core::arch::asm;
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 3
4use super::timer_queue::TimerEnqueueOperation;
5
4// Must be kept in sync with the layout of `State`! 6// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 7pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16;
7 10
8#[repr(C, align(4))] 11#[repr(C, align(4))]
9pub(crate) struct State { 12pub(crate) struct State {
@@ -87,15 +90,29 @@ impl State {
87 r 90 r
88 } 91 }
89 92
90 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) 93 /// Mark the task as timer-queued. Return whether it can be enqueued.
91 #[cfg(feature = "integrated-timers")]
92 #[inline(always)] 94 #[inline(always)]
93 pub fn timer_enqueue(&self) -> bool { 95 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
94 !self.timer_queued.swap(true, Ordering::Relaxed) 96 if self
97 .as_u32()
98 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
99 // If not started, ignore it
100 if state & STATE_SPAWNED == 0 {
101 None
102 } else {
103 // Mark it as enqueued
104 Some(state | STATE_TIMER_QUEUED)
105 }
106 })
107 .is_ok()
108 {
109 TimerEnqueueOperation::Enqueue
110 } else {
111 TimerEnqueueOperation::Ignore
112 }
95 } 113 }
96 114
97 /// Unmark the task as timer-queued. 115 /// Unmark the task as timer-queued.
98 #[cfg(feature = "integrated-timers")]
99 #[inline(always)] 116 #[inline(always)]
100 pub fn timer_dequeue(&self) { 117 pub fn timer_dequeue(&self) {
101 self.timer_queued.store(false, Ordering::Relaxed); 118 self.timer_queued.store(false, Ordering::Relaxed);
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index c3cc1b0b7..367162ba2 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -2,12 +2,13 @@ use core::cell::Cell;
2 2
3use critical_section::Mutex; 3use critical_section::Mutex;
4 4
5use super::timer_queue::TimerEnqueueOperation;
6
5/// Task is spawned (has a future) 7/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 8pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue 9/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 10pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
9/// Task is in the executor timer queue 11/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 12pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12 13
13pub(crate) struct State { 14pub(crate) struct State {
@@ -73,19 +74,22 @@ impl State {
73 }) 74 })
74 } 75 }
75 76
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) 77 /// Mark the task as timer-queued. Return whether it can be enqueued.
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)] 78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool { 79 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
80 self.update(|s| { 80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0; 81 // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is
82 *s |= STATE_TIMER_QUEUED; 82 // just being spawned, because its executor pointer may still be changing.
83 ok 83 if *s & STATE_SPAWNED == STATE_SPAWNED {
84 *s |= STATE_TIMER_QUEUED;
85 TimerEnqueueOperation::Enqueue
86 } else {
87 TimerEnqueueOperation::Ignore
88 }
84 }) 89 })
85 } 90 }
86 91
87 /// Unmark the task as timer-queued. 92 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)] 93 #[inline(always)]
90 pub fn timer_dequeue(&self) { 94 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED); 95 self.update(|s| *s &= !STATE_TIMER_QUEUED);
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 94a5f340b..2ba0e00a9 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,76 +1,39 @@
1use core::cmp::min; 1//! Timer queue operations.
2
3use core::cell::Cell;
2 4
3use super::TaskRef; 5use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5 6
6pub(crate) struct TimerQueueItem { 7/// An item in the timer queue.
7 next: SyncUnsafeCell<Option<TaskRef>>, 8pub struct TimerQueueItem {
8} 9 /// The next item in the queue.
10 ///
11 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
12 /// value of `Some(dangling_pointer)`
13 pub next: Cell<Option<TaskRef>>,
9 14
10impl TimerQueueItem { 15 /// The time at which this item expires.
11 pub const fn new() -> Self { 16 pub expires_at: Cell<u64>,
12 Self {
13 next: SyncUnsafeCell::new(None),
14 }
15 }
16} 17}
17 18
18pub(crate) struct TimerQueue { 19unsafe impl Sync for TimerQueueItem {}
19 head: SyncUnsafeCell<Option<TaskRef>>,
20}
21 20
22impl TimerQueue { 21impl TimerQueueItem {
23 pub const fn new() -> Self { 22 pub(crate) const fn new() -> Self {
24 Self { 23 Self {
25 head: SyncUnsafeCell::new(None), 24 next: Cell::new(None),
26 } 25 expires_at: Cell::new(0),
27 }
28
29 pub(crate) unsafe fn update(&self, p: TaskRef) {
30 let task = p.header();
31 if task.expires_at.get() != u64::MAX {
32 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get());
34 self.head.set(Some(p));
35 }
36 } 26 }
37 } 27 }
28}
38 29
39 pub(crate) unsafe fn next_expiration(&self) -> u64 { 30/// The operation to perform after `timer_enqueue` is called.
40 let mut res = u64::MAX; 31#[derive(Debug, Copy, Clone, PartialEq)]
41 self.retain(|p| { 32#[cfg_attr(feature = "defmt", derive(defmt::Format))]
42 let task = p.header(); 33#[must_use]
43 let expires = task.expires_at.get(); 34pub enum TimerEnqueueOperation {
44 res = min(res, expires); 35 /// Enqueue the task (or update its expiration time).
45 expires != u64::MAX 36 Enqueue,
46 }); 37 /// The task must not be enqueued in the timer queue.
47 res 38 Ignore,
48 }
49
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
51 self.retain(|p| {
52 let task = p.header();
53 if task.expires_at.get() <= now {
54 on_task(p);
55 false
56 } else {
57 true
58 }
59 });
60 }
61
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
63 let mut prev = &self.head;
64 while let Some(p) = prev.get() {
65 let task = p.header();
66 if f(p) {
67 // Skip to next
68 prev = &task.timer_queue_item.next;
69 } else {
70 // Remove it
71 prev.set(task.timer_queue_item.next.get());
72 task.state.timer_dequeue();
73 }
74 }
75 }
76} 39}
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs
index c7bcf9c11..b34387b58 100644
--- a/embassy-executor/src/raw/trace.rs
+++ b/embassy-executor/src/raw/trace.rs
@@ -61,29 +61,23 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) {
61 rtos_trace::trace::system_idle(); 61 rtos_trace::trace::system_idle();
62} 62}
63 63
64#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
65const fn gcd(a: u64, b: u64) -> u64 {
66 if b == 0 {
67 a
68 } else {
69 gcd(b, a % b)
70 }
71}
72
73#[cfg(feature = "rtos-trace")] 64#[cfg(feature = "rtos-trace")]
74impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { 65impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor {
75 fn task_list() { 66 fn task_list() {
76 // We don't know what tasks exist, so we can't send them. 67 // We don't know what tasks exist, so we can't send them.
77 } 68 }
78 #[cfg(feature = "integrated-timers")]
79 fn time() -> u64 { 69 fn time() -> u64 {
70 const fn gcd(a: u64, b: u64) -> u64 {
71 if b == 0 {
72 a
73 } else {
74 gcd(b, a % b)
75 }
76 }
77
80 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); 78 const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000);
81 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) 79 embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M)
82 } 80 }
83 #[cfg(not(feature = "integrated-timers"))]
84 fn time() -> u64 {
85 0
86 }
87} 81}
88 82
89#[cfg(feature = "rtos-trace")] 83#[cfg(feature = "rtos-trace")]