diff options
| author | Dario Nieuwenhuis <[email protected]> | 2024-12-16 12:30:30 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-12-16 12:30:30 +0000 |
| commit | 2c3bc75da6008afa7cacc1045954cef7e3d8740f (patch) | |
| tree | 47661322d49d3e38717e2fc3f38e920c222138f7 /embassy-executor/src | |
| parent | 99ad61cecf4fe098feeced5524d3e60625137457 (diff) | |
| parent | e1c00613288024623f7fde61f65c4c40c9a5381a (diff) | |
Merge pull request #3593 from bugadani/refactor
Rework time-driver contract.
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/arch/avr.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/cortex_m.rs | 6 | ||||
| -rw-r--r-- | embassy-executor/src/arch/riscv32.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/spin.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/std.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/wasm.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 197 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 30 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 27 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 20 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 91 | ||||
| -rw-r--r-- | embassy-executor/src/raw/trace.rs | 22 |
12 files changed, 176 insertions, 237 deletions
diff --git a/embassy-executor/src/arch/avr.rs b/embassy-executor/src/arch/avr.rs index 7f9ed4421..70085d04d 100644 --- a/embassy-executor/src/arch/avr.rs +++ b/embassy-executor/src/arch/avr.rs | |||
| @@ -53,10 +53,6 @@ mod thread { | |||
| 53 | /// | 53 | /// |
| 54 | /// This function never returns. | 54 | /// This function never returns. |
| 55 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 55 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 56 | unsafe { | ||
| 57 | self.inner.initialize(); | ||
| 58 | } | ||
| 59 | |||
| 60 | init(self.inner.spawner()); | 56 | init(self.inner.spawner()); |
| 61 | 57 | ||
| 62 | loop { | 58 | loop { |
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index 0c2af88a6..5c517e0a2 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs | |||
| @@ -98,9 +98,6 @@ mod thread { | |||
| 98 | /// | 98 | /// |
| 99 | /// This function never returns. | 99 | /// This function never returns. |
| 100 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 100 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 101 | unsafe { | ||
| 102 | self.inner.initialize(); | ||
| 103 | } | ||
| 104 | init(self.inner.spawner()); | 101 | init(self.inner.spawner()); |
| 105 | 102 | ||
| 106 | loop { | 103 | loop { |
| @@ -210,9 +207,6 @@ mod interrupt { | |||
| 210 | } | 207 | } |
| 211 | 208 | ||
| 212 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | 209 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; |
| 213 | unsafe { | ||
| 214 | executor.initialize(); | ||
| 215 | } | ||
| 216 | 210 | ||
| 217 | unsafe { NVIC::unmask(irq) } | 211 | unsafe { NVIC::unmask(irq) } |
| 218 | 212 | ||
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 715e5f3cf..01e63a9fd 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs | |||
| @@ -54,10 +54,6 @@ mod thread { | |||
| 54 | /// | 54 | /// |
| 55 | /// This function never returns. | 55 | /// This function never returns. |
| 56 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 56 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 57 | unsafe { | ||
| 58 | self.inner.initialize(); | ||
| 59 | } | ||
| 60 | |||
| 61 | init(self.inner.spawner()); | 57 | init(self.inner.spawner()); |
| 62 | 58 | ||
| 63 | loop { | 59 | loop { |
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs index 54c7458b3..340023620 100644 --- a/embassy-executor/src/arch/spin.rs +++ b/embassy-executor/src/arch/spin.rs | |||
| @@ -48,10 +48,6 @@ mod thread { | |||
| 48 | /// | 48 | /// |
| 49 | /// This function never returns. | 49 | /// This function never returns. |
| 50 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 50 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 51 | unsafe { | ||
| 52 | self.inner.initialize(); | ||
| 53 | } | ||
| 54 | |||
| 55 | init(self.inner.spawner()); | 51 | init(self.inner.spawner()); |
| 56 | 52 | ||
| 57 | loop { | 53 | loop { |
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs index 948c7711b..b02b15988 100644 --- a/embassy-executor/src/arch/std.rs +++ b/embassy-executor/src/arch/std.rs | |||
| @@ -55,10 +55,6 @@ mod thread { | |||
| 55 | /// | 55 | /// |
| 56 | /// This function never returns. | 56 | /// This function never returns. |
| 57 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 57 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 58 | unsafe { | ||
| 59 | self.inner.initialize(); | ||
| 60 | } | ||
| 61 | |||
| 62 | init(self.inner.spawner()); | 58 | init(self.inner.spawner()); |
| 63 | 59 | ||
| 64 | loop { | 60 | loop { |
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs index 35025f11f..f9d0f935c 100644 --- a/embassy-executor/src/arch/wasm.rs +++ b/embassy-executor/src/arch/wasm.rs | |||
| @@ -71,10 +71,6 @@ mod thread { | |||
| 71 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | 71 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) |
| 72 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { | 72 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { |
| 73 | unsafe { | 73 | unsafe { |
| 74 | self.inner.initialize(); | ||
| 75 | } | ||
| 76 | |||
| 77 | unsafe { | ||
| 78 | let executor = &self.inner; | 74 | let executor = &self.inner; |
| 79 | let future = Closure::new(move |_| { | 75 | let future = Closure::new(move |_| { |
| 80 | executor.poll(); | 76 | executor.poll(); |
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f93eae6f..7da14468d 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -16,8 +16,7 @@ mod run_queue; | |||
| 16 | #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] | 16 | #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] |
| 17 | mod state; | 17 | mod state; |
| 18 | 18 | ||
| 19 | #[cfg(feature = "integrated-timers")] | 19 | pub mod timer_queue; |
| 20 | mod timer_queue; | ||
| 21 | #[cfg(feature = "trace")] | 20 | #[cfg(feature = "trace")] |
| 22 | mod trace; | 21 | mod trace; |
| 23 | pub(crate) mod util; | 22 | pub(crate) mod util; |
| @@ -31,9 +30,6 @@ use core::pin::Pin; | |||
| 31 | use core::ptr::NonNull; | 30 | use core::ptr::NonNull; |
| 32 | use core::task::{Context, Poll}; | 31 | use core::task::{Context, Poll}; |
| 33 | 32 | ||
| 34 | #[cfg(feature = "integrated-timers")] | ||
| 35 | use embassy_time_driver::AlarmHandle; | ||
| 36 | |||
| 37 | use self::run_queue::{RunQueue, RunQueueItem}; | 33 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 38 | use self::state::State; | 34 | use self::state::State; |
| 39 | use self::util::{SyncUnsafeCell, UninitCell}; | 35 | use self::util::{SyncUnsafeCell, UninitCell}; |
| @@ -47,14 +43,12 @@ pub(crate) struct TaskHeader { | |||
| 47 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, | 43 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 48 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 44 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 49 | 45 | ||
| 50 | #[cfg(feature = "integrated-timers")] | 46 | /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. |
| 51 | pub(crate) expires_at: SyncUnsafeCell<u64>, | ||
| 52 | #[cfg(feature = "integrated-timers")] | ||
| 53 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 47 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 54 | } | 48 | } |
| 55 | 49 | ||
| 56 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. | 50 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 57 | #[derive(Clone, Copy)] | 51 | #[derive(Clone, Copy, PartialEq)] |
| 58 | pub struct TaskRef { | 52 | pub struct TaskRef { |
| 59 | ptr: NonNull<TaskHeader>, | 53 | ptr: NonNull<TaskHeader>, |
| 60 | } | 54 | } |
| @@ -76,10 +70,53 @@ impl TaskRef { | |||
| 76 | } | 70 | } |
| 77 | } | 71 | } |
| 78 | 72 | ||
| 73 | /// # Safety | ||
| 74 | /// | ||
| 75 | /// The result of this function must only be compared | ||
| 76 | /// for equality, or stored, but not used. | ||
| 77 | pub const unsafe fn dangling() -> Self { | ||
| 78 | Self { | ||
| 79 | ptr: NonNull::dangling(), | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 79 | pub(crate) fn header(self) -> &'static TaskHeader { | 83 | pub(crate) fn header(self) -> &'static TaskHeader { |
| 80 | unsafe { self.ptr.as_ref() } | 84 | unsafe { self.ptr.as_ref() } |
| 81 | } | 85 | } |
| 82 | 86 | ||
| 87 | /// Returns a reference to the executor that the task is currently running on. | ||
| 88 | pub unsafe fn executor(self) -> Option<&'static Executor> { | ||
| 89 | self.header().executor.get().map(|e| Executor::wrap(e)) | ||
| 90 | } | ||
| 91 | |||
| 92 | /// Returns a reference to the timer queue item. | ||
| 93 | pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { | ||
| 94 | &self.header().timer_queue_item | ||
| 95 | } | ||
| 96 | |||
| 97 | /// Mark the task as timer-queued. Return whether it should be actually enqueued | ||
| 98 | /// using `_embassy_time_schedule_wake`. | ||
| 99 | /// | ||
| 100 | /// Entering this state prevents the task from being respawned while in a timer queue. | ||
| 101 | /// | ||
| 102 | /// Safety: | ||
| 103 | /// | ||
| 104 | /// This functions should only be called by the timer queue driver, before | ||
| 105 | /// enqueueing the timer item. | ||
| 106 | pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { | ||
| 107 | self.header().state.timer_enqueue() | ||
| 108 | } | ||
| 109 | |||
| 110 | /// Unmark the task as timer-queued. | ||
| 111 | /// | ||
| 112 | /// Safety: | ||
| 113 | /// | ||
| 114 | /// This functions should only be called by the timer queue implementation, after the task has | ||
| 115 | /// been removed from the timer queue. | ||
| 116 | pub unsafe fn timer_dequeue(&self) { | ||
| 117 | self.header().state.timer_dequeue() | ||
| 118 | } | ||
| 119 | |||
| 83 | /// The returned pointer is valid for the entire TaskStorage. | 120 | /// The returned pointer is valid for the entire TaskStorage. |
| 84 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | 121 | pub(crate) fn as_ptr(self) -> *const TaskHeader { |
| 85 | self.ptr.as_ptr() | 122 | self.ptr.as_ptr() |
| @@ -120,9 +157,6 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 120 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 157 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| 121 | poll_fn: SyncUnsafeCell::new(None), | 158 | poll_fn: SyncUnsafeCell::new(None), |
| 122 | 159 | ||
| 123 | #[cfg(feature = "integrated-timers")] | ||
| 124 | expires_at: SyncUnsafeCell::new(0), | ||
| 125 | #[cfg(feature = "integrated-timers")] | ||
| 126 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 160 | timer_queue_item: timer_queue::TimerQueueItem::new(), |
| 127 | }, | 161 | }, |
| 128 | future: UninitCell::uninit(), | 162 | future: UninitCell::uninit(), |
| @@ -159,10 +193,25 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 159 | match future.poll(&mut cx) { | 193 | match future.poll(&mut cx) { |
| 160 | Poll::Ready(_) => { | 194 | Poll::Ready(_) => { |
| 161 | this.future.drop_in_place(); | 195 | this.future.drop_in_place(); |
| 196 | |||
| 197 | // Mark this task to be timer queued. | ||
| 198 | // We're splitting the enqueue in two parts, so that we can change task state | ||
| 199 | // to something that prevent re-queueing. | ||
| 200 | let op = this.raw.state.timer_enqueue(); | ||
| 201 | |||
| 202 | // Now mark the task as not spawned, so that | ||
| 203 | // - it can be spawned again once it has been removed from the timer queue | ||
| 204 | // - it can not be timer-queued again | ||
| 205 | // We must do this before scheduling the wake, to prevent the task from being | ||
| 206 | // dequeued by the time driver while it's still SPAWNED. | ||
| 162 | this.raw.state.despawn(); | 207 | this.raw.state.despawn(); |
| 163 | 208 | ||
| 164 | #[cfg(feature = "integrated-timers")] | 209 | // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's |
| 165 | this.raw.expires_at.set(u64::MAX); | 210 | // better to be safe. |
| 211 | if op == timer_queue::TimerEnqueueOperation::Enqueue { | ||
| 212 | // Schedule the task in the past, so it gets dequeued ASAP. | ||
| 213 | unsafe { _embassy_time_schedule_wake(0, &waker) } | ||
| 214 | } | ||
| 166 | } | 215 | } |
| 167 | Poll::Pending => {} | 216 | Poll::Pending => {} |
| 168 | } | 217 | } |
| @@ -181,6 +230,10 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 181 | } | 230 | } |
| 182 | } | 231 | } |
| 183 | 232 | ||
| 233 | extern "Rust" { | ||
| 234 | fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker); | ||
| 235 | } | ||
| 236 | |||
| 184 | /// An uninitialized [`TaskStorage`]. | 237 | /// An uninitialized [`TaskStorage`]. |
| 185 | pub struct AvailableTask<F: Future + 'static> { | 238 | pub struct AvailableTask<F: Future + 'static> { |
| 186 | task: &'static TaskStorage<F>, | 239 | task: &'static TaskStorage<F>, |
| @@ -316,34 +369,16 @@ impl Pender { | |||
| 316 | pub(crate) struct SyncExecutor { | 369 | pub(crate) struct SyncExecutor { |
| 317 | run_queue: RunQueue, | 370 | run_queue: RunQueue, |
| 318 | pender: Pender, | 371 | pender: Pender, |
| 319 | |||
| 320 | #[cfg(feature = "integrated-timers")] | ||
| 321 | pub(crate) timer_queue: timer_queue::TimerQueue, | ||
| 322 | #[cfg(feature = "integrated-timers")] | ||
| 323 | alarm: AlarmHandle, | ||
| 324 | } | 372 | } |
| 325 | 373 | ||
| 326 | impl SyncExecutor { | 374 | impl SyncExecutor { |
| 327 | pub(crate) fn new(pender: Pender) -> Self { | 375 | pub(crate) fn new(pender: Pender) -> Self { |
| 328 | #[cfg(feature = "integrated-timers")] | ||
| 329 | let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; | ||
| 330 | |||
| 331 | Self { | 376 | Self { |
| 332 | run_queue: RunQueue::new(), | 377 | run_queue: RunQueue::new(), |
| 333 | pender, | 378 | pender, |
| 334 | |||
| 335 | #[cfg(feature = "integrated-timers")] | ||
| 336 | timer_queue: timer_queue::TimerQueue::new(), | ||
| 337 | #[cfg(feature = "integrated-timers")] | ||
| 338 | alarm, | ||
| 339 | } | 379 | } |
| 340 | } | 380 | } |
| 341 | 381 | ||
| 342 | pub(crate) unsafe fn initialize(&'static self) { | ||
| 343 | #[cfg(feature = "integrated-timers")] | ||
| 344 | embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); | ||
| 345 | } | ||
| 346 | |||
| 347 | /// Enqueue a task in the task queue | 382 | /// Enqueue a task in the task queue |
| 348 | /// | 383 | /// |
| 349 | /// # Safety | 384 | /// # Safety |
| @@ -360,12 +395,6 @@ impl SyncExecutor { | |||
| 360 | } | 395 | } |
| 361 | } | 396 | } |
| 362 | 397 | ||
| 363 | #[cfg(feature = "integrated-timers")] | ||
| 364 | fn alarm_callback(ctx: *mut ()) { | ||
| 365 | let this: &Self = unsafe { &*(ctx as *const Self) }; | ||
| 366 | this.pender.pend(); | ||
| 367 | } | ||
| 368 | |||
| 369 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 398 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 370 | task.header().executor.set(Some(self)); | 399 | task.header().executor.set(Some(self)); |
| 371 | 400 | ||
| @@ -379,56 +408,27 @@ impl SyncExecutor { | |||
| 379 | /// | 408 | /// |
| 380 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. | 409 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. |
| 381 | pub(crate) unsafe fn poll(&'static self) { | 410 | pub(crate) unsafe fn poll(&'static self) { |
| 382 | #[allow(clippy::never_loop)] | 411 | self.run_queue.dequeue_all(|p| { |
| 383 | loop { | 412 | let task = p.header(); |
| 384 | #[cfg(feature = "integrated-timers")] | 413 | |
| 385 | self.timer_queue | 414 | if !task.state.run_dequeue() { |
| 386 | .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); | 415 | // If task is not running, ignore it. This can happen in the following scenario: |
| 387 | 416 | // - Task gets dequeued, poll starts | |
| 388 | self.run_queue.dequeue_all(|p| { | 417 | // - While task is being polled, it gets woken. It gets placed in the queue. |
| 389 | let task = p.header(); | 418 | // - Task poll finishes, returning done=true |
| 390 | 419 | // - RUNNING bit is cleared, but the task is already in the queue. | |
| 391 | #[cfg(feature = "integrated-timers")] | 420 | return; |
| 392 | task.expires_at.set(u64::MAX); | 421 | } |
| 393 | |||
| 394 | if !task.state.run_dequeue() { | ||
| 395 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 396 | // - Task gets dequeued, poll starts | ||
| 397 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 398 | // - Task poll finishes, returning done=true | ||
| 399 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 400 | return; | ||
| 401 | } | ||
| 402 | |||
| 403 | #[cfg(feature = "trace")] | ||
| 404 | trace::task_exec_begin(self, &p); | ||
| 405 | 422 | ||
| 406 | // Run the task | 423 | #[cfg(feature = "trace")] |
| 407 | task.poll_fn.get().unwrap_unchecked()(p); | 424 | trace::task_exec_begin(self, &p); |
| 408 | 425 | ||
| 409 | #[cfg(feature = "trace")] | 426 | // Run the task |
| 410 | trace::task_exec_end(self, &p); | 427 | task.poll_fn.get().unwrap_unchecked()(p); |
| 411 | 428 | ||
| 412 | // Enqueue or update into timer_queue | 429 | #[cfg(feature = "trace")] |
| 413 | #[cfg(feature = "integrated-timers")] | 430 | trace::task_exec_end(self, &p); |
| 414 | self.timer_queue.update(p); | 431 | }); |
| 415 | }); | ||
| 416 | |||
| 417 | #[cfg(feature = "integrated-timers")] | ||
| 418 | { | ||
| 419 | // If this is already in the past, set_alarm might return false | ||
| 420 | // In that case do another poll loop iteration. | ||
| 421 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 422 | if embassy_time_driver::set_alarm(self.alarm, next_expiration) { | ||
| 423 | break; | ||
| 424 | } | ||
| 425 | } | ||
| 426 | |||
| 427 | #[cfg(not(feature = "integrated-timers"))] | ||
| 428 | { | ||
| 429 | break; | ||
| 430 | } | ||
| 431 | } | ||
| 432 | 432 | ||
| 433 | #[cfg(feature = "trace")] | 433 | #[cfg(feature = "trace")] |
| 434 | trace::executor_idle(self) | 434 | trace::executor_idle(self) |
| @@ -494,15 +494,6 @@ impl Executor { | |||
| 494 | } | 494 | } |
| 495 | } | 495 | } |
| 496 | 496 | ||
| 497 | /// Initializes the executor. | ||
| 498 | /// | ||
| 499 | /// # Safety | ||
| 500 | /// | ||
| 501 | /// This function must be called once before any other method is called. | ||
| 502 | pub unsafe fn initialize(&'static self) { | ||
| 503 | self.inner.initialize(); | ||
| 504 | } | ||
| 505 | |||
| 506 | /// Spawn a task in this executor. | 497 | /// Spawn a task in this executor. |
| 507 | /// | 498 | /// |
| 508 | /// # Safety | 499 | /// # Safety |
| @@ -575,21 +566,3 @@ pub fn wake_task_no_pend(task: TaskRef) { | |||
| 575 | } | 566 | } |
| 576 | } | 567 | } |
| 577 | } | 568 | } |
| 578 | |||
| 579 | #[cfg(feature = "integrated-timers")] | ||
| 580 | struct TimerQueue; | ||
| 581 | |||
| 582 | #[cfg(feature = "integrated-timers")] | ||
| 583 | impl embassy_time_queue_driver::TimerQueue for TimerQueue { | ||
| 584 | fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) { | ||
| 585 | let task = waker::task_from_waker(waker); | ||
| 586 | let task = task.header(); | ||
| 587 | unsafe { | ||
| 588 | let expires_at = task.expires_at.get(); | ||
| 589 | task.expires_at.set(expires_at.min(at)); | ||
| 590 | } | ||
| 591 | } | ||
| 592 | } | ||
| 593 | |||
| 594 | #[cfg(feature = "integrated-timers")] | ||
| 595 | embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e1279ac0b..15eb9a368 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -1,11 +1,12 @@ | |||
| 1 | use core::sync::atomic::{AtomicU32, Ordering}; | 1 | use core::sync::atomic::{AtomicU32, Ordering}; |
| 2 | 2 | ||
| 3 | use super::timer_queue::TimerEnqueueOperation; | ||
| 4 | |||
| 3 | /// Task is spawned (has a future) | 5 | /// Task is spawned (has a future) |
| 4 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 6 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 5 | /// Task is in the executor run queue | 7 | /// Task is in the executor run queue |
| 6 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; |
| 7 | /// Task is in the executor timer queue | 9 | /// Task is in the executor timer queue |
| 8 | #[cfg(feature = "integrated-timers")] | ||
| 9 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | 10 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; |
| 10 | 11 | ||
| 11 | pub(crate) struct State { | 12 | pub(crate) struct State { |
| @@ -56,18 +57,31 @@ impl State { | |||
| 56 | state & STATE_SPAWNED != 0 | 57 | state & STATE_SPAWNED != 0 |
| 57 | } | 58 | } |
| 58 | 59 | ||
| 59 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | 60 | /// Mark the task as timer-queued. Return whether it can be enqueued. |
| 60 | #[cfg(feature = "integrated-timers")] | ||
| 61 | #[inline(always)] | 61 | #[inline(always)] |
| 62 | pub fn timer_enqueue(&self) -> bool { | 62 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { |
| 63 | let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); | 63 | if self |
| 64 | old_state & STATE_TIMER_QUEUED == 0 | 64 | .state |
| 65 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 66 | // If not started, ignore it | ||
| 67 | if state & STATE_SPAWNED == 0 { | ||
| 68 | None | ||
| 69 | } else { | ||
| 70 | // Mark it as enqueued | ||
| 71 | Some(state | STATE_TIMER_QUEUED) | ||
| 72 | } | ||
| 73 | }) | ||
| 74 | .is_ok() | ||
| 75 | { | ||
| 76 | TimerEnqueueOperation::Enqueue | ||
| 77 | } else { | ||
| 78 | TimerEnqueueOperation::Ignore | ||
| 79 | } | ||
| 65 | } | 80 | } |
| 66 | 81 | ||
| 67 | /// Unmark the task as timer-queued. | 82 | /// Unmark the task as timer-queued. |
| 68 | #[cfg(feature = "integrated-timers")] | ||
| 69 | #[inline(always)] | 83 | #[inline(always)] |
| 70 | pub fn timer_dequeue(&self) { | 84 | pub fn timer_dequeue(&self) { |
| 71 | self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); | 85 | self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); |
| 72 | } | 86 | } |
| 73 | } | 87 | } |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index e4dfe5093..7a152e8c0 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -1,9 +1,12 @@ | |||
| 1 | use core::arch::asm; | 1 | use core::arch::asm; |
| 2 | use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; | 2 | use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; |
| 3 | 3 | ||
| 4 | use super::timer_queue::TimerEnqueueOperation; | ||
| 5 | |||
| 4 | // Must be kept in sync with the layout of `State`! | 6 | // Must be kept in sync with the layout of `State`! |
| 5 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 7 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 6 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; | 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; |
| 9 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16; | ||
| 7 | 10 | ||
| 8 | #[repr(C, align(4))] | 11 | #[repr(C, align(4))] |
| 9 | pub(crate) struct State { | 12 | pub(crate) struct State { |
| @@ -87,15 +90,29 @@ impl State { | |||
| 87 | r | 90 | r |
| 88 | } | 91 | } |
| 89 | 92 | ||
| 90 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | 93 | /// Mark the task as timer-queued. Return whether it can be enqueued. |
| 91 | #[cfg(feature = "integrated-timers")] | ||
| 92 | #[inline(always)] | 94 | #[inline(always)] |
| 93 | pub fn timer_enqueue(&self) -> bool { | 95 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { |
| 94 | !self.timer_queued.swap(true, Ordering::Relaxed) | 96 | if self |
| 97 | .as_u32() | ||
| 98 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 99 | // If not started, ignore it | ||
| 100 | if state & STATE_SPAWNED == 0 { | ||
| 101 | None | ||
| 102 | } else { | ||
| 103 | // Mark it as enqueued | ||
| 104 | Some(state | STATE_TIMER_QUEUED) | ||
| 105 | } | ||
| 106 | }) | ||
| 107 | .is_ok() | ||
| 108 | { | ||
| 109 | TimerEnqueueOperation::Enqueue | ||
| 110 | } else { | ||
| 111 | TimerEnqueueOperation::Ignore | ||
| 112 | } | ||
| 95 | } | 113 | } |
| 96 | 114 | ||
| 97 | /// Unmark the task as timer-queued. | 115 | /// Unmark the task as timer-queued. |
| 98 | #[cfg(feature = "integrated-timers")] | ||
| 99 | #[inline(always)] | 116 | #[inline(always)] |
| 100 | pub fn timer_dequeue(&self) { | 117 | pub fn timer_dequeue(&self) { |
| 101 | self.timer_queued.store(false, Ordering::Relaxed); | 118 | self.timer_queued.store(false, Ordering::Relaxed); |
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index c3cc1b0b7..367162ba2 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -2,12 +2,13 @@ use core::cell::Cell; | |||
| 2 | 2 | ||
| 3 | use critical_section::Mutex; | 3 | use critical_section::Mutex; |
| 4 | 4 | ||
| 5 | use super::timer_queue::TimerEnqueueOperation; | ||
| 6 | |||
| 5 | /// Task is spawned (has a future) | 7 | /// Task is spawned (has a future) |
| 6 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 8 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 7 | /// Task is in the executor run queue | 9 | /// Task is in the executor run queue |
| 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | 10 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; |
| 9 | /// Task is in the executor timer queue | 11 | /// Task is in the executor timer queue |
| 10 | #[cfg(feature = "integrated-timers")] | ||
| 11 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | 12 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; |
| 12 | 13 | ||
| 13 | pub(crate) struct State { | 14 | pub(crate) struct State { |
| @@ -73,19 +74,22 @@ impl State { | |||
| 73 | }) | 74 | }) |
| 74 | } | 75 | } |
| 75 | 76 | ||
| 76 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | 77 | /// Mark the task as timer-queued. Return whether it can be enqueued. |
| 77 | #[cfg(feature = "integrated-timers")] | ||
| 78 | #[inline(always)] | 78 | #[inline(always)] |
| 79 | pub fn timer_enqueue(&self) -> bool { | 79 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { |
| 80 | self.update(|s| { | 80 | self.update(|s| { |
| 81 | let ok = *s & STATE_TIMER_QUEUED == 0; | 81 | // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is |
| 82 | *s |= STATE_TIMER_QUEUED; | 82 | // just being spawned, because its executor pointer may still be changing. |
| 83 | ok | 83 | if *s & STATE_SPAWNED == STATE_SPAWNED { |
| 84 | *s |= STATE_TIMER_QUEUED; | ||
| 85 | TimerEnqueueOperation::Enqueue | ||
| 86 | } else { | ||
| 87 | TimerEnqueueOperation::Ignore | ||
| 88 | } | ||
| 84 | }) | 89 | }) |
| 85 | } | 90 | } |
| 86 | 91 | ||
| 87 | /// Unmark the task as timer-queued. | 92 | /// Unmark the task as timer-queued. |
| 88 | #[cfg(feature = "integrated-timers")] | ||
| 89 | #[inline(always)] | 93 | #[inline(always)] |
| 90 | pub fn timer_dequeue(&self) { | 94 | pub fn timer_dequeue(&self) { |
| 91 | self.update(|s| *s &= !STATE_TIMER_QUEUED); | 95 | self.update(|s| *s &= !STATE_TIMER_QUEUED); |
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 94a5f340b..2ba0e00a9 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -1,76 +1,39 @@ | |||
| 1 | use core::cmp::min; | 1 | //! Timer queue operations. |
| 2 | |||
| 3 | use core::cell::Cell; | ||
| 2 | 4 | ||
| 3 | use super::TaskRef; | 5 | use super::TaskRef; |
| 4 | use crate::raw::util::SyncUnsafeCell; | ||
| 5 | 6 | ||
| 6 | pub(crate) struct TimerQueueItem { | 7 | /// An item in the timer queue. |
| 7 | next: SyncUnsafeCell<Option<TaskRef>>, | 8 | pub struct TimerQueueItem { |
| 8 | } | 9 | /// The next item in the queue. |
| 10 | /// | ||
| 11 | /// If this field contains `Some`, the item is in the queue. The last item in the queue has a | ||
| 12 | /// value of `Some(dangling_pointer)` | ||
| 13 | pub next: Cell<Option<TaskRef>>, | ||
| 9 | 14 | ||
| 10 | impl TimerQueueItem { | 15 | /// The time at which this item expires. |
| 11 | pub const fn new() -> Self { | 16 | pub expires_at: Cell<u64>, |
| 12 | Self { | ||
| 13 | next: SyncUnsafeCell::new(None), | ||
| 14 | } | ||
| 15 | } | ||
| 16 | } | 17 | } |
| 17 | 18 | ||
| 18 | pub(crate) struct TimerQueue { | 19 | unsafe impl Sync for TimerQueueItem {} |
| 19 | head: SyncUnsafeCell<Option<TaskRef>>, | ||
| 20 | } | ||
| 21 | 20 | ||
| 22 | impl TimerQueue { | 21 | impl TimerQueueItem { |
| 23 | pub const fn new() -> Self { | 22 | pub(crate) const fn new() -> Self { |
| 24 | Self { | 23 | Self { |
| 25 | head: SyncUnsafeCell::new(None), | 24 | next: Cell::new(None), |
| 26 | } | 25 | expires_at: Cell::new(0), |
| 27 | } | ||
| 28 | |||
| 29 | pub(crate) unsafe fn update(&self, p: TaskRef) { | ||
| 30 | let task = p.header(); | ||
| 31 | if task.expires_at.get() != u64::MAX { | ||
| 32 | if task.state.timer_enqueue() { | ||
| 33 | task.timer_queue_item.next.set(self.head.get()); | ||
| 34 | self.head.set(Some(p)); | ||
| 35 | } | ||
| 36 | } | 26 | } |
| 37 | } | 27 | } |
| 28 | } | ||
| 38 | 29 | ||
| 39 | pub(crate) unsafe fn next_expiration(&self) -> u64 { | 30 | /// The operation to perform after `timer_enqueue` is called. |
| 40 | let mut res = u64::MAX; | 31 | #[derive(Debug, Copy, Clone, PartialEq)] |
| 41 | self.retain(|p| { | 32 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| 42 | let task = p.header(); | 33 | #[must_use] |
| 43 | let expires = task.expires_at.get(); | 34 | pub enum TimerEnqueueOperation { |
| 44 | res = min(res, expires); | 35 | /// Enqueue the task (or update its expiration time). |
| 45 | expires != u64::MAX | 36 | Enqueue, |
| 46 | }); | 37 | /// The task must not be enqueued in the timer queue. |
| 47 | res | 38 | Ignore, |
| 48 | } | ||
| 49 | |||
| 50 | pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { | ||
| 51 | self.retain(|p| { | ||
| 52 | let task = p.header(); | ||
| 53 | if task.expires_at.get() <= now { | ||
| 54 | on_task(p); | ||
| 55 | false | ||
| 56 | } else { | ||
| 57 | true | ||
| 58 | } | ||
| 59 | }); | ||
| 60 | } | ||
| 61 | |||
| 62 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { | ||
| 63 | let mut prev = &self.head; | ||
| 64 | while let Some(p) = prev.get() { | ||
| 65 | let task = p.header(); | ||
| 66 | if f(p) { | ||
| 67 | // Skip to next | ||
| 68 | prev = &task.timer_queue_item.next; | ||
| 69 | } else { | ||
| 70 | // Remove it | ||
| 71 | prev.set(task.timer_queue_item.next.get()); | ||
| 72 | task.state.timer_dequeue(); | ||
| 73 | } | ||
| 74 | } | ||
| 75 | } | ||
| 76 | } | 39 | } |
diff --git a/embassy-executor/src/raw/trace.rs b/embassy-executor/src/raw/trace.rs index c7bcf9c11..b34387b58 100644 --- a/embassy-executor/src/raw/trace.rs +++ b/embassy-executor/src/raw/trace.rs | |||
| @@ -61,29 +61,23 @@ pub(crate) fn executor_idle(executor: &SyncExecutor) { | |||
| 61 | rtos_trace::trace::system_idle(); | 61 | rtos_trace::trace::system_idle(); |
| 62 | } | 62 | } |
| 63 | 63 | ||
| 64 | #[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))] | ||
| 65 | const fn gcd(a: u64, b: u64) -> u64 { | ||
| 66 | if b == 0 { | ||
| 67 | a | ||
| 68 | } else { | ||
| 69 | gcd(b, a % b) | ||
| 70 | } | ||
| 71 | } | ||
| 72 | |||
| 73 | #[cfg(feature = "rtos-trace")] | 64 | #[cfg(feature = "rtos-trace")] |
| 74 | impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { | 65 | impl rtos_trace::RtosTraceOSCallbacks for crate::raw::SyncExecutor { |
| 75 | fn task_list() { | 66 | fn task_list() { |
| 76 | // We don't know what tasks exist, so we can't send them. | 67 | // We don't know what tasks exist, so we can't send them. |
| 77 | } | 68 | } |
| 78 | #[cfg(feature = "integrated-timers")] | ||
| 79 | fn time() -> u64 { | 69 | fn time() -> u64 { |
| 70 | const fn gcd(a: u64, b: u64) -> u64 { | ||
| 71 | if b == 0 { | ||
| 72 | a | ||
| 73 | } else { | ||
| 74 | gcd(b, a % b) | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 80 | const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); | 78 | const GCD_1M: u64 = gcd(embassy_time_driver::TICK_HZ, 1_000_000); |
| 81 | embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) | 79 | embassy_time_driver::now() * (1_000_000 / GCD_1M) / (embassy_time_driver::TICK_HZ / GCD_1M) |
| 82 | } | 80 | } |
| 83 | #[cfg(not(feature = "integrated-timers"))] | ||
| 84 | fn time() -> u64 { | ||
| 85 | 0 | ||
| 86 | } | ||
| 87 | } | 81 | } |
| 88 | 82 | ||
| 89 | #[cfg(feature = "rtos-trace")] | 83 | #[cfg(feature = "rtos-trace")] |
