diff options
| author | Dániel Buga <[email protected]> | 2024-11-26 23:54:21 +0100 |
|---|---|---|
| committer | Dániel Buga <[email protected]> | 2024-12-10 21:31:42 +0100 |
| commit | 5a5495aac43d75610735f2ca80fb6c8e8f31ed71 (patch) | |
| tree | 7a4336917894730692589359e9d1a285ec5a0a05 /embassy-executor | |
| parent | 406d377b7564d16e12b7fae4f42c0c709bf4f243 (diff) | |
Refactor integrated-timers
Diffstat (limited to 'embassy-executor')
| -rw-r--r-- | embassy-executor/Cargo.toml | 2 | ||||
| -rw-r--r-- | embassy-executor/src/arch/avr.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/cortex_m.rs | 6 | ||||
| -rw-r--r-- | embassy-executor/src/arch/riscv32.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/spin.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/std.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/arch/wasm.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 135 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 89 | ||||
| -rw-r--r-- | embassy-executor/src/raw/util.rs | 5 | ||||
| -rw-r--r-- | embassy-executor/tests/test.rs | 3 |
11 files changed, 89 insertions, 171 deletions
diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml index 0a5360e5d..862d25b59 100644 --- a/embassy-executor/Cargo.toml +++ b/embassy-executor/Cargo.toml | |||
| @@ -68,7 +68,7 @@ nightly = ["embassy-executor-macros/nightly"] | |||
| 68 | turbowakers = [] | 68 | turbowakers = [] |
| 69 | 69 | ||
| 70 | ## Use the executor-integrated `embassy-time` timer queue. | 70 | ## Use the executor-integrated `embassy-time` timer queue. |
| 71 | integrated-timers = ["dep:embassy-time-driver", "dep:embassy-time-queue-driver"] | 71 | integrated-timers = ["dep:embassy-time-driver"] |
| 72 | 72 | ||
| 73 | #! ### Architecture | 73 | #! ### Architecture |
| 74 | _arch = [] # some arch was picked | 74 | _arch = [] # some arch was picked |
diff --git a/embassy-executor/src/arch/avr.rs b/embassy-executor/src/arch/avr.rs index 7f9ed4421..70085d04d 100644 --- a/embassy-executor/src/arch/avr.rs +++ b/embassy-executor/src/arch/avr.rs | |||
| @@ -53,10 +53,6 @@ mod thread { | |||
| 53 | /// | 53 | /// |
| 54 | /// This function never returns. | 54 | /// This function never returns. |
| 55 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 55 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 56 | unsafe { | ||
| 57 | self.inner.initialize(); | ||
| 58 | } | ||
| 59 | |||
| 60 | init(self.inner.spawner()); | 56 | init(self.inner.spawner()); |
| 61 | 57 | ||
| 62 | loop { | 58 | loop { |
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index 0c2af88a6..5c517e0a2 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs | |||
| @@ -98,9 +98,6 @@ mod thread { | |||
| 98 | /// | 98 | /// |
| 99 | /// This function never returns. | 99 | /// This function never returns. |
| 100 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 100 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 101 | unsafe { | ||
| 102 | self.inner.initialize(); | ||
| 103 | } | ||
| 104 | init(self.inner.spawner()); | 101 | init(self.inner.spawner()); |
| 105 | 102 | ||
| 106 | loop { | 103 | loop { |
| @@ -210,9 +207,6 @@ mod interrupt { | |||
| 210 | } | 207 | } |
| 211 | 208 | ||
| 212 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | 209 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; |
| 213 | unsafe { | ||
| 214 | executor.initialize(); | ||
| 215 | } | ||
| 216 | 210 | ||
| 217 | unsafe { NVIC::unmask(irq) } | 211 | unsafe { NVIC::unmask(irq) } |
| 218 | 212 | ||
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 715e5f3cf..01e63a9fd 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs | |||
| @@ -54,10 +54,6 @@ mod thread { | |||
| 54 | /// | 54 | /// |
| 55 | /// This function never returns. | 55 | /// This function never returns. |
| 56 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 56 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 57 | unsafe { | ||
| 58 | self.inner.initialize(); | ||
| 59 | } | ||
| 60 | |||
| 61 | init(self.inner.spawner()); | 57 | init(self.inner.spawner()); |
| 62 | 58 | ||
| 63 | loop { | 59 | loop { |
diff --git a/embassy-executor/src/arch/spin.rs b/embassy-executor/src/arch/spin.rs index 54c7458b3..340023620 100644 --- a/embassy-executor/src/arch/spin.rs +++ b/embassy-executor/src/arch/spin.rs | |||
| @@ -48,10 +48,6 @@ mod thread { | |||
| 48 | /// | 48 | /// |
| 49 | /// This function never returns. | 49 | /// This function never returns. |
| 50 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 50 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 51 | unsafe { | ||
| 52 | self.inner.initialize(); | ||
| 53 | } | ||
| 54 | |||
| 55 | init(self.inner.spawner()); | 51 | init(self.inner.spawner()); |
| 56 | 52 | ||
| 57 | loop { | 53 | loop { |
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs index 948c7711b..b02b15988 100644 --- a/embassy-executor/src/arch/std.rs +++ b/embassy-executor/src/arch/std.rs | |||
| @@ -55,10 +55,6 @@ mod thread { | |||
| 55 | /// | 55 | /// |
| 56 | /// This function never returns. | 56 | /// This function never returns. |
| 57 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 57 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { |
| 58 | unsafe { | ||
| 59 | self.inner.initialize(); | ||
| 60 | } | ||
| 61 | |||
| 62 | init(self.inner.spawner()); | 58 | init(self.inner.spawner()); |
| 63 | 59 | ||
| 64 | loop { | 60 | loop { |
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs index 35025f11f..f9d0f935c 100644 --- a/embassy-executor/src/arch/wasm.rs +++ b/embassy-executor/src/arch/wasm.rs | |||
| @@ -71,10 +71,6 @@ mod thread { | |||
| 71 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | 71 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) |
| 72 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { | 72 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { |
| 73 | unsafe { | 73 | unsafe { |
| 74 | self.inner.initialize(); | ||
| 75 | } | ||
| 76 | |||
| 77 | unsafe { | ||
| 78 | let executor = &self.inner; | 74 | let executor = &self.inner; |
| 79 | let future = Closure::new(move |_| { | 75 | let future = Closure::new(move |_| { |
| 80 | executor.poll(); | 76 | executor.poll(); |
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f93eae6f..80bd49bad 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -17,7 +17,7 @@ mod run_queue; | |||
| 17 | mod state; | 17 | mod state; |
| 18 | 18 | ||
| 19 | #[cfg(feature = "integrated-timers")] | 19 | #[cfg(feature = "integrated-timers")] |
| 20 | mod timer_queue; | 20 | pub mod timer_queue; |
| 21 | #[cfg(feature = "trace")] | 21 | #[cfg(feature = "trace")] |
| 22 | mod trace; | 22 | mod trace; |
| 23 | pub(crate) mod util; | 23 | pub(crate) mod util; |
| @@ -31,9 +31,6 @@ use core::pin::Pin; | |||
| 31 | use core::ptr::NonNull; | 31 | use core::ptr::NonNull; |
| 32 | use core::task::{Context, Poll}; | 32 | use core::task::{Context, Poll}; |
| 33 | 33 | ||
| 34 | #[cfg(feature = "integrated-timers")] | ||
| 35 | use embassy_time_driver::AlarmHandle; | ||
| 36 | |||
| 37 | use self::run_queue::{RunQueue, RunQueueItem}; | 34 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 38 | use self::state::State; | 35 | use self::state::State; |
| 39 | use self::util::{SyncUnsafeCell, UninitCell}; | 36 | use self::util::{SyncUnsafeCell, UninitCell}; |
| @@ -47,8 +44,7 @@ pub(crate) struct TaskHeader { | |||
| 47 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, | 44 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 48 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 45 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 49 | 46 | ||
| 50 | #[cfg(feature = "integrated-timers")] | 47 | /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. |
| 51 | pub(crate) expires_at: SyncUnsafeCell<u64>, | ||
| 52 | #[cfg(feature = "integrated-timers")] | 48 | #[cfg(feature = "integrated-timers")] |
| 53 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 49 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 54 | } | 50 | } |
| @@ -80,6 +76,12 @@ impl TaskRef { | |||
| 80 | unsafe { self.ptr.as_ref() } | 76 | unsafe { self.ptr.as_ref() } |
| 81 | } | 77 | } |
| 82 | 78 | ||
| 79 | /// Returns a reference to the executor that the task is currently running on. | ||
| 80 | #[cfg(feature = "integrated-timers")] | ||
| 81 | pub unsafe fn executor(self) -> Option<&'static Executor> { | ||
| 82 | self.header().executor.get().map(|e| Executor::wrap(e)) | ||
| 83 | } | ||
| 84 | |||
| 83 | /// The returned pointer is valid for the entire TaskStorage. | 85 | /// The returned pointer is valid for the entire TaskStorage. |
| 84 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | 86 | pub(crate) fn as_ptr(self) -> *const TaskHeader { |
| 85 | self.ptr.as_ptr() | 87 | self.ptr.as_ptr() |
| @@ -121,8 +123,6 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 121 | poll_fn: SyncUnsafeCell::new(None), | 123 | poll_fn: SyncUnsafeCell::new(None), |
| 122 | 124 | ||
| 123 | #[cfg(feature = "integrated-timers")] | 125 | #[cfg(feature = "integrated-timers")] |
| 124 | expires_at: SyncUnsafeCell::new(0), | ||
| 125 | #[cfg(feature = "integrated-timers")] | ||
| 126 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 126 | timer_queue_item: timer_queue::TimerQueueItem::new(), |
| 127 | }, | 127 | }, |
| 128 | future: UninitCell::uninit(), | 128 | future: UninitCell::uninit(), |
| @@ -160,9 +160,6 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 160 | Poll::Ready(_) => { | 160 | Poll::Ready(_) => { |
| 161 | this.future.drop_in_place(); | 161 | this.future.drop_in_place(); |
| 162 | this.raw.state.despawn(); | 162 | this.raw.state.despawn(); |
| 163 | |||
| 164 | #[cfg(feature = "integrated-timers")] | ||
| 165 | this.raw.expires_at.set(u64::MAX); | ||
| 166 | } | 163 | } |
| 167 | Poll::Pending => {} | 164 | Poll::Pending => {} |
| 168 | } | 165 | } |
| @@ -316,34 +313,16 @@ impl Pender { | |||
| 316 | pub(crate) struct SyncExecutor { | 313 | pub(crate) struct SyncExecutor { |
| 317 | run_queue: RunQueue, | 314 | run_queue: RunQueue, |
| 318 | pender: Pender, | 315 | pender: Pender, |
| 319 | |||
| 320 | #[cfg(feature = "integrated-timers")] | ||
| 321 | pub(crate) timer_queue: timer_queue::TimerQueue, | ||
| 322 | #[cfg(feature = "integrated-timers")] | ||
| 323 | alarm: AlarmHandle, | ||
| 324 | } | 316 | } |
| 325 | 317 | ||
| 326 | impl SyncExecutor { | 318 | impl SyncExecutor { |
| 327 | pub(crate) fn new(pender: Pender) -> Self { | 319 | pub(crate) fn new(pender: Pender) -> Self { |
| 328 | #[cfg(feature = "integrated-timers")] | ||
| 329 | let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; | ||
| 330 | |||
| 331 | Self { | 320 | Self { |
| 332 | run_queue: RunQueue::new(), | 321 | run_queue: RunQueue::new(), |
| 333 | pender, | 322 | pender, |
| 334 | |||
| 335 | #[cfg(feature = "integrated-timers")] | ||
| 336 | timer_queue: timer_queue::TimerQueue::new(), | ||
| 337 | #[cfg(feature = "integrated-timers")] | ||
| 338 | alarm, | ||
| 339 | } | 323 | } |
| 340 | } | 324 | } |
| 341 | 325 | ||
| 342 | pub(crate) unsafe fn initialize(&'static self) { | ||
| 343 | #[cfg(feature = "integrated-timers")] | ||
| 344 | embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); | ||
| 345 | } | ||
| 346 | |||
| 347 | /// Enqueue a task in the task queue | 326 | /// Enqueue a task in the task queue |
| 348 | /// | 327 | /// |
| 349 | /// # Safety | 328 | /// # Safety |
| @@ -360,12 +339,6 @@ impl SyncExecutor { | |||
| 360 | } | 339 | } |
| 361 | } | 340 | } |
| 362 | 341 | ||
| 363 | #[cfg(feature = "integrated-timers")] | ||
| 364 | fn alarm_callback(ctx: *mut ()) { | ||
| 365 | let this: &Self = unsafe { &*(ctx as *const Self) }; | ||
| 366 | this.pender.pend(); | ||
| 367 | } | ||
| 368 | |||
| 369 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 342 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 370 | task.header().executor.set(Some(self)); | 343 | task.header().executor.set(Some(self)); |
| 371 | 344 | ||
| @@ -379,56 +352,27 @@ impl SyncExecutor { | |||
| 379 | /// | 352 | /// |
| 380 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. | 353 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. |
| 381 | pub(crate) unsafe fn poll(&'static self) { | 354 | pub(crate) unsafe fn poll(&'static self) { |
| 382 | #[allow(clippy::never_loop)] | 355 | self.run_queue.dequeue_all(|p| { |
| 383 | loop { | 356 | let task = p.header(); |
| 384 | #[cfg(feature = "integrated-timers")] | 357 | |
| 385 | self.timer_queue | 358 | if !task.state.run_dequeue() { |
| 386 | .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); | 359 | // If task is not running, ignore it. This can happen in the following scenario: |
| 387 | 360 | // - Task gets dequeued, poll starts | |
| 388 | self.run_queue.dequeue_all(|p| { | 361 | // - While task is being polled, it gets woken. It gets placed in the queue. |
| 389 | let task = p.header(); | 362 | // - Task poll finishes, returning done=true |
| 390 | 363 | // - RUNNING bit is cleared, but the task is already in the queue. | |
| 391 | #[cfg(feature = "integrated-timers")] | 364 | return; |
| 392 | task.expires_at.set(u64::MAX); | 365 | } |
| 393 | |||
| 394 | if !task.state.run_dequeue() { | ||
| 395 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 396 | // - Task gets dequeued, poll starts | ||
| 397 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 398 | // - Task poll finishes, returning done=true | ||
| 399 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 400 | return; | ||
| 401 | } | ||
| 402 | |||
| 403 | #[cfg(feature = "trace")] | ||
| 404 | trace::task_exec_begin(self, &p); | ||
| 405 | 366 | ||
| 406 | // Run the task | 367 | #[cfg(feature = "trace")] |
| 407 | task.poll_fn.get().unwrap_unchecked()(p); | 368 | trace::task_exec_begin(self, &p); |
| 408 | 369 | ||
| 409 | #[cfg(feature = "trace")] | 370 | // Run the task |
| 410 | trace::task_exec_end(self, &p); | 371 | task.poll_fn.get().unwrap_unchecked()(p); |
| 411 | 372 | ||
| 412 | // Enqueue or update into timer_queue | 373 | #[cfg(feature = "trace")] |
| 413 | #[cfg(feature = "integrated-timers")] | 374 | trace::task_exec_end(self, &p); |
| 414 | self.timer_queue.update(p); | 375 | }); |
| 415 | }); | ||
| 416 | |||
| 417 | #[cfg(feature = "integrated-timers")] | ||
| 418 | { | ||
| 419 | // If this is already in the past, set_alarm might return false | ||
| 420 | // In that case do another poll loop iteration. | ||
| 421 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 422 | if embassy_time_driver::set_alarm(self.alarm, next_expiration) { | ||
| 423 | break; | ||
| 424 | } | ||
| 425 | } | ||
| 426 | |||
| 427 | #[cfg(not(feature = "integrated-timers"))] | ||
| 428 | { | ||
| 429 | break; | ||
| 430 | } | ||
| 431 | } | ||
| 432 | 376 | ||
| 433 | #[cfg(feature = "trace")] | 377 | #[cfg(feature = "trace")] |
| 434 | trace::executor_idle(self) | 378 | trace::executor_idle(self) |
| @@ -494,15 +438,6 @@ impl Executor { | |||
| 494 | } | 438 | } |
| 495 | } | 439 | } |
| 496 | 440 | ||
| 497 | /// Initializes the executor. | ||
| 498 | /// | ||
| 499 | /// # Safety | ||
| 500 | /// | ||
| 501 | /// This function must be called once before any other method is called. | ||
| 502 | pub unsafe fn initialize(&'static self) { | ||
| 503 | self.inner.initialize(); | ||
| 504 | } | ||
| 505 | |||
| 506 | /// Spawn a task in this executor. | 441 | /// Spawn a task in this executor. |
| 507 | /// | 442 | /// |
| 508 | /// # Safety | 443 | /// # Safety |
| @@ -575,21 +510,3 @@ pub fn wake_task_no_pend(task: TaskRef) { | |||
| 575 | } | 510 | } |
| 576 | } | 511 | } |
| 577 | } | 512 | } |
| 578 | |||
| 579 | #[cfg(feature = "integrated-timers")] | ||
| 580 | struct TimerQueue; | ||
| 581 | |||
| 582 | #[cfg(feature = "integrated-timers")] | ||
| 583 | impl embassy_time_queue_driver::TimerQueue for TimerQueue { | ||
| 584 | fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) { | ||
| 585 | let task = waker::task_from_waker(waker); | ||
| 586 | let task = task.header(); | ||
| 587 | unsafe { | ||
| 588 | let expires_at = task.expires_at.get(); | ||
| 589 | task.expires_at.set(expires_at.min(at)); | ||
| 590 | } | ||
| 591 | } | ||
| 592 | } | ||
| 593 | |||
| 594 | #[cfg(feature = "integrated-timers")] | ||
| 595 | embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 94a5f340b..953bf014f 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -1,75 +1,100 @@ | |||
| 1 | //! Timer queue operations. | ||
| 1 | use core::cmp::min; | 2 | use core::cmp::min; |
| 2 | 3 | ||
| 4 | use super::util::SyncUnsafeCell; | ||
| 3 | use super::TaskRef; | 5 | use super::TaskRef; |
| 4 | use crate::raw::util::SyncUnsafeCell; | ||
| 5 | 6 | ||
| 6 | pub(crate) struct TimerQueueItem { | 7 | pub(crate) struct TimerQueueItem { |
| 7 | next: SyncUnsafeCell<Option<TaskRef>>, | 8 | next: SyncUnsafeCell<Option<TaskRef>>, |
| 9 | expires_at: SyncUnsafeCell<u64>, | ||
| 8 | } | 10 | } |
| 9 | 11 | ||
| 10 | impl TimerQueueItem { | 12 | impl TimerQueueItem { |
| 11 | pub const fn new() -> Self { | 13 | pub const fn new() -> Self { |
| 12 | Self { | 14 | Self { |
| 13 | next: SyncUnsafeCell::new(None), | 15 | next: SyncUnsafeCell::new(None), |
| 16 | expires_at: SyncUnsafeCell::new(0), | ||
| 14 | } | 17 | } |
| 15 | } | 18 | } |
| 16 | } | 19 | } |
| 17 | 20 | ||
| 18 | pub(crate) struct TimerQueue { | 21 | /// A timer queue, with items integrated into tasks. |
| 22 | pub struct TimerQueue { | ||
| 19 | head: SyncUnsafeCell<Option<TaskRef>>, | 23 | head: SyncUnsafeCell<Option<TaskRef>>, |
| 20 | } | 24 | } |
| 21 | 25 | ||
| 22 | impl TimerQueue { | 26 | impl TimerQueue { |
| 27 | /// Creates a new timer queue. | ||
| 23 | pub const fn new() -> Self { | 28 | pub const fn new() -> Self { |
| 24 | Self { | 29 | Self { |
| 25 | head: SyncUnsafeCell::new(None), | 30 | head: SyncUnsafeCell::new(None), |
| 26 | } | 31 | } |
| 27 | } | 32 | } |
| 28 | 33 | ||
| 29 | pub(crate) unsafe fn update(&self, p: TaskRef) { | 34 | /// Schedules a task to run at a specific time. |
| 30 | let task = p.header(); | 35 | /// |
| 31 | if task.expires_at.get() != u64::MAX { | 36 | /// If this function returns `true`, the called should find the next expiration time and set |
| 37 | /// a new alarm for that time. | ||
| 38 | pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { | ||
| 39 | unsafe { | ||
| 40 | let task = p.header(); | ||
| 41 | let item = &task.timer_queue_item; | ||
| 32 | if task.state.timer_enqueue() { | 42 | if task.state.timer_enqueue() { |
| 33 | task.timer_queue_item.next.set(self.head.get()); | 43 | // If not in the queue, add it and update. |
| 34 | self.head.set(Some(p)); | 44 | let prev = self.head.replace(Some(p)); |
| 45 | item.next.set(prev); | ||
| 46 | } else if at <= item.expires_at.get() { | ||
| 47 | // If expiration is sooner than previously set, update. | ||
| 48 | } else { | ||
| 49 | // Task does not need to be updated. | ||
| 50 | return false; | ||
| 35 | } | 51 | } |
| 52 | |||
| 53 | item.expires_at.set(at); | ||
| 54 | true | ||
| 36 | } | 55 | } |
| 37 | } | 56 | } |
| 38 | 57 | ||
| 39 | pub(crate) unsafe fn next_expiration(&self) -> u64 { | 58 | /// Dequeues expired timers and returns the next alarm time. |
| 40 | let mut res = u64::MAX; | 59 | /// |
| 41 | self.retain(|p| { | 60 | /// The provided callback will be called for each expired task. Tasks that never expire |
| 42 | let task = p.header(); | 61 | /// will be removed, but the callback will not be called. |
| 43 | let expires = task.expires_at.get(); | 62 | pub fn next_expiration(&mut self, now: u64) -> u64 { |
| 44 | res = min(res, expires); | 63 | let mut next_expiration = u64::MAX; |
| 45 | expires != u64::MAX | ||
| 46 | }); | ||
| 47 | res | ||
| 48 | } | ||
| 49 | 64 | ||
| 50 | pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { | ||
| 51 | self.retain(|p| { | 65 | self.retain(|p| { |
| 52 | let task = p.header(); | 66 | let task = p.header(); |
| 53 | if task.expires_at.get() <= now { | 67 | let item = &task.timer_queue_item; |
| 54 | on_task(p); | 68 | let expires = unsafe { item.expires_at.get() }; |
| 69 | |||
| 70 | if expires <= now { | ||
| 71 | // Timer expired, process task. | ||
| 72 | super::wake_task(p); | ||
| 55 | false | 73 | false |
| 56 | } else { | 74 | } else { |
| 57 | true | 75 | // Timer didn't yet expire, or never expires. |
| 76 | next_expiration = min(next_expiration, expires); | ||
| 77 | expires != u64::MAX | ||
| 58 | } | 78 | } |
| 59 | }); | 79 | }); |
| 80 | |||
| 81 | next_expiration | ||
| 60 | } | 82 | } |
| 61 | 83 | ||
| 62 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { | 84 | fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { |
| 63 | let mut prev = &self.head; | 85 | unsafe { |
| 64 | while let Some(p) = prev.get() { | 86 | let mut prev = &self.head; |
| 65 | let task = p.header(); | 87 | while let Some(p) = prev.get() { |
| 66 | if f(p) { | 88 | let task = p.header(); |
| 67 | // Skip to next | 89 | let item = &task.timer_queue_item; |
| 68 | prev = &task.timer_queue_item.next; | 90 | if f(p) { |
| 69 | } else { | 91 | // Skip to next |
| 70 | // Remove it | 92 | prev = &item.next; |
| 71 | prev.set(task.timer_queue_item.next.get()); | 93 | } else { |
| 72 | task.state.timer_dequeue(); | 94 | // Remove it |
| 95 | prev.set(item.next.get()); | ||
| 96 | task.state.timer_dequeue(); | ||
| 97 | } | ||
| 73 | } | 98 | } |
| 74 | } | 99 | } |
| 75 | } | 100 | } |
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index c46085e45..e2633658a 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs | |||
| @@ -54,4 +54,9 @@ impl<T> SyncUnsafeCell<T> { | |||
| 54 | { | 54 | { |
| 55 | *self.value.get() | 55 | *self.value.get() |
| 56 | } | 56 | } |
| 57 | |||
| 58 | #[cfg(feature = "integrated-timers")] | ||
| 59 | pub unsafe fn replace(&self, value: T) -> T { | ||
| 60 | core::mem::replace(&mut *self.value.get(), value) | ||
| 61 | } | ||
| 57 | } | 62 | } |
diff --git a/embassy-executor/tests/test.rs b/embassy-executor/tests/test.rs index 8054bf7eb..0ce1f1891 100644 --- a/embassy-executor/tests/test.rs +++ b/embassy-executor/tests/test.rs | |||
| @@ -40,9 +40,6 @@ fn setup() -> (&'static Executor, Trace) { | |||
| 40 | let trace = Trace::new(); | 40 | let trace = Trace::new(); |
| 41 | let context = Box::leak(Box::new(trace.clone())) as *mut _ as *mut (); | 41 | let context = Box::leak(Box::new(trace.clone())) as *mut _ as *mut (); |
| 42 | let executor = &*Box::leak(Box::new(Executor::new(context))); | 42 | let executor = &*Box::leak(Box::new(Executor::new(context))); |
| 43 | unsafe { | ||
| 44 | executor.initialize(); | ||
| 45 | } | ||
| 46 | 43 | ||
| 47 | (executor, trace) | 44 | (executor, trace) |
| 48 | } | 45 | } |
