diff options
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 197 |
1 files changed, 85 insertions, 112 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 3f93eae6f..7da14468d 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -16,8 +16,7 @@ mod run_queue; | |||
| 16 | #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] | 16 | #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] |
| 17 | mod state; | 17 | mod state; |
| 18 | 18 | ||
| 19 | #[cfg(feature = "integrated-timers")] | 19 | pub mod timer_queue; |
| 20 | mod timer_queue; | ||
| 21 | #[cfg(feature = "trace")] | 20 | #[cfg(feature = "trace")] |
| 22 | mod trace; | 21 | mod trace; |
| 23 | pub(crate) mod util; | 22 | pub(crate) mod util; |
| @@ -31,9 +30,6 @@ use core::pin::Pin; | |||
| 31 | use core::ptr::NonNull; | 30 | use core::ptr::NonNull; |
| 32 | use core::task::{Context, Poll}; | 31 | use core::task::{Context, Poll}; |
| 33 | 32 | ||
| 34 | #[cfg(feature = "integrated-timers")] | ||
| 35 | use embassy_time_driver::AlarmHandle; | ||
| 36 | |||
| 37 | use self::run_queue::{RunQueue, RunQueueItem}; | 33 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 38 | use self::state::State; | 34 | use self::state::State; |
| 39 | use self::util::{SyncUnsafeCell, UninitCell}; | 35 | use self::util::{SyncUnsafeCell, UninitCell}; |
| @@ -47,14 +43,12 @@ pub(crate) struct TaskHeader { | |||
| 47 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, | 43 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 48 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 44 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 49 | 45 | ||
| 50 | #[cfg(feature = "integrated-timers")] | 46 | /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. |
| 51 | pub(crate) expires_at: SyncUnsafeCell<u64>, | ||
| 52 | #[cfg(feature = "integrated-timers")] | ||
| 53 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 47 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 54 | } | 48 | } |
| 55 | 49 | ||
| 56 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. | 50 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 57 | #[derive(Clone, Copy)] | 51 | #[derive(Clone, Copy, PartialEq)] |
| 58 | pub struct TaskRef { | 52 | pub struct TaskRef { |
| 59 | ptr: NonNull<TaskHeader>, | 53 | ptr: NonNull<TaskHeader>, |
| 60 | } | 54 | } |
| @@ -76,10 +70,53 @@ impl TaskRef { | |||
| 76 | } | 70 | } |
| 77 | } | 71 | } |
| 78 | 72 | ||
| 73 | /// # Safety | ||
| 74 | /// | ||
| 75 | /// The result of this function must only be compared | ||
| 76 | /// for equality, or stored, but not used. | ||
| 77 | pub const unsafe fn dangling() -> Self { | ||
| 78 | Self { | ||
| 79 | ptr: NonNull::dangling(), | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 79 | pub(crate) fn header(self) -> &'static TaskHeader { | 83 | pub(crate) fn header(self) -> &'static TaskHeader { |
| 80 | unsafe { self.ptr.as_ref() } | 84 | unsafe { self.ptr.as_ref() } |
| 81 | } | 85 | } |
| 82 | 86 | ||
| 87 | /// Returns a reference to the executor that the task is currently running on. | ||
| 88 | pub unsafe fn executor(self) -> Option<&'static Executor> { | ||
| 89 | self.header().executor.get().map(|e| Executor::wrap(e)) | ||
| 90 | } | ||
| 91 | |||
| 92 | /// Returns a reference to the timer queue item. | ||
| 93 | pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { | ||
| 94 | &self.header().timer_queue_item | ||
| 95 | } | ||
| 96 | |||
| 97 | /// Mark the task as timer-queued. Return whether it should be actually enqueued | ||
| 98 | /// using `_embassy_time_schedule_wake`. | ||
| 99 | /// | ||
| 100 | /// Entering this state prevents the task from being respawned while in a timer queue. | ||
| 101 | /// | ||
| 102 | /// Safety: | ||
| 103 | /// | ||
| 104 | /// This functions should only be called by the timer queue driver, before | ||
| 105 | /// enqueueing the timer item. | ||
| 106 | pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { | ||
| 107 | self.header().state.timer_enqueue() | ||
| 108 | } | ||
| 109 | |||
| 110 | /// Unmark the task as timer-queued. | ||
| 111 | /// | ||
| 112 | /// Safety: | ||
| 113 | /// | ||
| 114 | /// This functions should only be called by the timer queue implementation, after the task has | ||
| 115 | /// been removed from the timer queue. | ||
| 116 | pub unsafe fn timer_dequeue(&self) { | ||
| 117 | self.header().state.timer_dequeue() | ||
| 118 | } | ||
| 119 | |||
| 83 | /// The returned pointer is valid for the entire TaskStorage. | 120 | /// The returned pointer is valid for the entire TaskStorage. |
| 84 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | 121 | pub(crate) fn as_ptr(self) -> *const TaskHeader { |
| 85 | self.ptr.as_ptr() | 122 | self.ptr.as_ptr() |
| @@ -120,9 +157,6 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 120 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 157 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| 121 | poll_fn: SyncUnsafeCell::new(None), | 158 | poll_fn: SyncUnsafeCell::new(None), |
| 122 | 159 | ||
| 123 | #[cfg(feature = "integrated-timers")] | ||
| 124 | expires_at: SyncUnsafeCell::new(0), | ||
| 125 | #[cfg(feature = "integrated-timers")] | ||
| 126 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 160 | timer_queue_item: timer_queue::TimerQueueItem::new(), |
| 127 | }, | 161 | }, |
| 128 | future: UninitCell::uninit(), | 162 | future: UninitCell::uninit(), |
| @@ -159,10 +193,25 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 159 | match future.poll(&mut cx) { | 193 | match future.poll(&mut cx) { |
| 160 | Poll::Ready(_) => { | 194 | Poll::Ready(_) => { |
| 161 | this.future.drop_in_place(); | 195 | this.future.drop_in_place(); |
| 196 | |||
| 197 | // Mark this task to be timer queued. | ||
| 198 | // We're splitting the enqueue in two parts, so that we can change task state | ||
| 199 | // to something that prevent re-queueing. | ||
| 200 | let op = this.raw.state.timer_enqueue(); | ||
| 201 | |||
| 202 | // Now mark the task as not spawned, so that | ||
| 203 | // - it can be spawned again once it has been removed from the timer queue | ||
| 204 | // - it can not be timer-queued again | ||
| 205 | // We must do this before scheduling the wake, to prevent the task from being | ||
| 206 | // dequeued by the time driver while it's still SPAWNED. | ||
| 162 | this.raw.state.despawn(); | 207 | this.raw.state.despawn(); |
| 163 | 208 | ||
| 164 | #[cfg(feature = "integrated-timers")] | 209 | // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's |
| 165 | this.raw.expires_at.set(u64::MAX); | 210 | // better to be safe. |
| 211 | if op == timer_queue::TimerEnqueueOperation::Enqueue { | ||
| 212 | // Schedule the task in the past, so it gets dequeued ASAP. | ||
| 213 | unsafe { _embassy_time_schedule_wake(0, &waker) } | ||
| 214 | } | ||
| 166 | } | 215 | } |
| 167 | Poll::Pending => {} | 216 | Poll::Pending => {} |
| 168 | } | 217 | } |
| @@ -181,6 +230,10 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 181 | } | 230 | } |
| 182 | } | 231 | } |
| 183 | 232 | ||
| 233 | extern "Rust" { | ||
| 234 | fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker); | ||
| 235 | } | ||
| 236 | |||
| 184 | /// An uninitialized [`TaskStorage`]. | 237 | /// An uninitialized [`TaskStorage`]. |
| 185 | pub struct AvailableTask<F: Future + 'static> { | 238 | pub struct AvailableTask<F: Future + 'static> { |
| 186 | task: &'static TaskStorage<F>, | 239 | task: &'static TaskStorage<F>, |
| @@ -316,34 +369,16 @@ impl Pender { | |||
| 316 | pub(crate) struct SyncExecutor { | 369 | pub(crate) struct SyncExecutor { |
| 317 | run_queue: RunQueue, | 370 | run_queue: RunQueue, |
| 318 | pender: Pender, | 371 | pender: Pender, |
| 319 | |||
| 320 | #[cfg(feature = "integrated-timers")] | ||
| 321 | pub(crate) timer_queue: timer_queue::TimerQueue, | ||
| 322 | #[cfg(feature = "integrated-timers")] | ||
| 323 | alarm: AlarmHandle, | ||
| 324 | } | 372 | } |
| 325 | 373 | ||
| 326 | impl SyncExecutor { | 374 | impl SyncExecutor { |
| 327 | pub(crate) fn new(pender: Pender) -> Self { | 375 | pub(crate) fn new(pender: Pender) -> Self { |
| 328 | #[cfg(feature = "integrated-timers")] | ||
| 329 | let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) }; | ||
| 330 | |||
| 331 | Self { | 376 | Self { |
| 332 | run_queue: RunQueue::new(), | 377 | run_queue: RunQueue::new(), |
| 333 | pender, | 378 | pender, |
| 334 | |||
| 335 | #[cfg(feature = "integrated-timers")] | ||
| 336 | timer_queue: timer_queue::TimerQueue::new(), | ||
| 337 | #[cfg(feature = "integrated-timers")] | ||
| 338 | alarm, | ||
| 339 | } | 379 | } |
| 340 | } | 380 | } |
| 341 | 381 | ||
| 342 | pub(crate) unsafe fn initialize(&'static self) { | ||
| 343 | #[cfg(feature = "integrated-timers")] | ||
| 344 | embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); | ||
| 345 | } | ||
| 346 | |||
| 347 | /// Enqueue a task in the task queue | 382 | /// Enqueue a task in the task queue |
| 348 | /// | 383 | /// |
| 349 | /// # Safety | 384 | /// # Safety |
| @@ -360,12 +395,6 @@ impl SyncExecutor { | |||
| 360 | } | 395 | } |
| 361 | } | 396 | } |
| 362 | 397 | ||
| 363 | #[cfg(feature = "integrated-timers")] | ||
| 364 | fn alarm_callback(ctx: *mut ()) { | ||
| 365 | let this: &Self = unsafe { &*(ctx as *const Self) }; | ||
| 366 | this.pender.pend(); | ||
| 367 | } | ||
| 368 | |||
| 369 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 398 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 370 | task.header().executor.set(Some(self)); | 399 | task.header().executor.set(Some(self)); |
| 371 | 400 | ||
| @@ -379,56 +408,27 @@ impl SyncExecutor { | |||
| 379 | /// | 408 | /// |
| 380 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. | 409 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. |
| 381 | pub(crate) unsafe fn poll(&'static self) { | 410 | pub(crate) unsafe fn poll(&'static self) { |
| 382 | #[allow(clippy::never_loop)] | 411 | self.run_queue.dequeue_all(|p| { |
| 383 | loop { | 412 | let task = p.header(); |
| 384 | #[cfg(feature = "integrated-timers")] | 413 | |
| 385 | self.timer_queue | 414 | if !task.state.run_dequeue() { |
| 386 | .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); | 415 | // If task is not running, ignore it. This can happen in the following scenario: |
| 387 | 416 | // - Task gets dequeued, poll starts | |
| 388 | self.run_queue.dequeue_all(|p| { | 417 | // - While task is being polled, it gets woken. It gets placed in the queue. |
| 389 | let task = p.header(); | 418 | // - Task poll finishes, returning done=true |
| 390 | 419 | // - RUNNING bit is cleared, but the task is already in the queue. | |
| 391 | #[cfg(feature = "integrated-timers")] | 420 | return; |
| 392 | task.expires_at.set(u64::MAX); | 421 | } |
| 393 | |||
| 394 | if !task.state.run_dequeue() { | ||
| 395 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 396 | // - Task gets dequeued, poll starts | ||
| 397 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 398 | // - Task poll finishes, returning done=true | ||
| 399 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 400 | return; | ||
| 401 | } | ||
| 402 | |||
| 403 | #[cfg(feature = "trace")] | ||
| 404 | trace::task_exec_begin(self, &p); | ||
| 405 | 422 | ||
| 406 | // Run the task | 423 | #[cfg(feature = "trace")] |
| 407 | task.poll_fn.get().unwrap_unchecked()(p); | 424 | trace::task_exec_begin(self, &p); |
| 408 | 425 | ||
| 409 | #[cfg(feature = "trace")] | 426 | // Run the task |
| 410 | trace::task_exec_end(self, &p); | 427 | task.poll_fn.get().unwrap_unchecked()(p); |
| 411 | 428 | ||
| 412 | // Enqueue or update into timer_queue | 429 | #[cfg(feature = "trace")] |
| 413 | #[cfg(feature = "integrated-timers")] | 430 | trace::task_exec_end(self, &p); |
| 414 | self.timer_queue.update(p); | 431 | }); |
| 415 | }); | ||
| 416 | |||
| 417 | #[cfg(feature = "integrated-timers")] | ||
| 418 | { | ||
| 419 | // If this is already in the past, set_alarm might return false | ||
| 420 | // In that case do another poll loop iteration. | ||
| 421 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 422 | if embassy_time_driver::set_alarm(self.alarm, next_expiration) { | ||
| 423 | break; | ||
| 424 | } | ||
| 425 | } | ||
| 426 | |||
| 427 | #[cfg(not(feature = "integrated-timers"))] | ||
| 428 | { | ||
| 429 | break; | ||
| 430 | } | ||
| 431 | } | ||
| 432 | 432 | ||
| 433 | #[cfg(feature = "trace")] | 433 | #[cfg(feature = "trace")] |
| 434 | trace::executor_idle(self) | 434 | trace::executor_idle(self) |
| @@ -494,15 +494,6 @@ impl Executor { | |||
| 494 | } | 494 | } |
| 495 | } | 495 | } |
| 496 | 496 | ||
| 497 | /// Initializes the executor. | ||
| 498 | /// | ||
| 499 | /// # Safety | ||
| 500 | /// | ||
| 501 | /// This function must be called once before any other method is called. | ||
| 502 | pub unsafe fn initialize(&'static self) { | ||
| 503 | self.inner.initialize(); | ||
| 504 | } | ||
| 505 | |||
| 506 | /// Spawn a task in this executor. | 497 | /// Spawn a task in this executor. |
| 507 | /// | 498 | /// |
| 508 | /// # Safety | 499 | /// # Safety |
| @@ -575,21 +566,3 @@ pub fn wake_task_no_pend(task: TaskRef) { | |||
| 575 | } | 566 | } |
| 576 | } | 567 | } |
| 577 | } | 568 | } |
| 578 | |||
| 579 | #[cfg(feature = "integrated-timers")] | ||
| 580 | struct TimerQueue; | ||
| 581 | |||
| 582 | #[cfg(feature = "integrated-timers")] | ||
| 583 | impl embassy_time_queue_driver::TimerQueue for TimerQueue { | ||
| 584 | fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) { | ||
| 585 | let task = waker::task_from_waker(waker); | ||
| 586 | let task = task.header(); | ||
| 587 | unsafe { | ||
| 588 | let expires_at = task.expires_at.get(); | ||
| 589 | task.expires_at.set(expires_at.min(at)); | ||
| 590 | } | ||
| 591 | } | ||
| 592 | } | ||
| 593 | |||
| 594 | #[cfg(feature = "integrated-timers")] | ||
| 595 | embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
