diff options
| author | Dario Nieuwenhuis <[email protected]> | 2024-12-17 18:03:07 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-12-17 18:03:07 +0000 |
| commit | 0c245892c6812538f4f51b784ed8afa1ce47f25d (patch) | |
| tree | 534911bb3fd3da96fb4ed1e598fdddbbd006a595 | |
| parent | c1120c7138c10a4cbdadb75d47eb23be9b65c231 (diff) | |
| parent | 76d8a896bbff612e3d2db27554891c71d28988af (diff) | |
Merge pull request #3622 from bugadani/next
Allow polling exited tasks
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 67 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_atomics.rs | 1 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_critical_section.rs | 7 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 5 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 8 |
6 files changed, 45 insertions, 47 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 6503b556f..e38a2af66 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -50,33 +50,32 @@ use super::SpawnToken; | |||
| 50 | /// A task's complete life cycle is as follows: | 50 | /// A task's complete life cycle is as follows: |
| 51 | /// | 51 | /// |
| 52 | /// ```text | 52 | /// ```text |
| 53 | /// ┌────────────┐ ┌────────────────────────┐ | 53 | /// ┌────────────┐ ┌────────────────────────┐ |
| 54 | /// ��─►��Not spawned │◄─6┤Not spawned|Run enqueued│ | 54 | /// │Not spawned │◄─5┤Not spawned|Run enqueued│ |
| 55 | /// │ │ ├7─►│ │ | 55 | /// │ ├6─►│ │ |
| 56 | /// │ └─────┬──────┘ └──────▲─────────────────┘ | 56 | /// └─────┬──────┘ └──────▲─────────────────┘ |
| 57 | /// │ 1 │ | 57 | /// 1 │ |
| 58 | /// │ │ ┌────────────┘ | 58 | /// │ ┌────────────┘ |
| 59 | /// │ │ 5 | 59 | /// │ 4 |
| 60 | /// �� ��─────▼────┴─────────┐ | 60 | /// ┌─────▼────┴─────────┐ |
| 61 | /// │ │Spawned|Run enqueued│ | 61 | /// │Spawned|Run enqueued│ |
| 62 | /// │ │ │ | 62 | /// │ │ |
| 63 | /// �� ���─────┬▲─────────────┘ | 63 | /// └─────┬▲─────────────┘ |
| 64 | /// │ 2│ | 64 | /// 2│ |
| 65 | /// │ │3 | 65 | /// │3 |
| 66 | /// �� ��─────▼┴─────┐ | 66 | /// ┌─────▼┴─────┐ |
| 67 | /// ��─4┤ Spawned │ | 67 | /// �� Spawned │ |
| 68 | /// │ │ | 68 | /// │ │ |
| 69 | /// └────────────┘ | 69 | /// └────────────┘ |
| 70 | /// ``` | 70 | /// ``` |
| 71 | /// | 71 | /// |
| 72 | /// Transitions: | 72 | /// Transitions: |
| 73 | /// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` | 73 | /// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` |
| 74 | /// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` | 74 | /// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` |
| 75 | /// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` | 75 | /// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue` |
| 76 | /// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` | 76 | /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` |
| 77 | /// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` | 77 | /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. |
| 78 | /// - 6: Task is dequeued and then ignored via `State::run_dequeue` | 78 | /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` |
| 79 | /// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` | ||
| 80 | pub(crate) struct TaskHeader { | 79 | pub(crate) struct TaskHeader { |
| 81 | pub(crate) state: State, | 80 | pub(crate) state: State, |
| 82 | pub(crate) run_queue_item: RunQueueItem, | 81 | pub(crate) run_queue_item: RunQueueItem, |
| @@ -162,6 +161,10 @@ pub struct TaskStorage<F: Future + 'static> { | |||
| 162 | future: UninitCell<F>, // Valid if STATE_SPAWNED | 161 | future: UninitCell<F>, // Valid if STATE_SPAWNED |
| 163 | } | 162 | } |
| 164 | 163 | ||
| 164 | unsafe fn poll_exited(_p: TaskRef) { | ||
| 165 | // Nothing to do, the task is already !SPAWNED and dequeued. | ||
| 166 | } | ||
| 167 | |||
| 165 | impl<F: Future + 'static> TaskStorage<F> { | 168 | impl<F: Future + 'static> TaskStorage<F> { |
| 166 | const NEW: Self = Self::new(); | 169 | const NEW: Self = Self::new(); |
| 167 | 170 | ||
| @@ -203,14 +206,23 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 203 | } | 206 | } |
| 204 | 207 | ||
| 205 | unsafe fn poll(p: TaskRef) { | 208 | unsafe fn poll(p: TaskRef) { |
| 206 | let this = &*(p.as_ptr() as *const TaskStorage<F>); | 209 | let this = &*p.as_ptr().cast::<TaskStorage<F>>(); |
| 207 | 210 | ||
| 208 | let future = Pin::new_unchecked(this.future.as_mut()); | 211 | let future = Pin::new_unchecked(this.future.as_mut()); |
| 209 | let waker = waker::from_task(p); | 212 | let waker = waker::from_task(p); |
| 210 | let mut cx = Context::from_waker(&waker); | 213 | let mut cx = Context::from_waker(&waker); |
| 211 | match future.poll(&mut cx) { | 214 | match future.poll(&mut cx) { |
| 212 | Poll::Ready(_) => { | 215 | Poll::Ready(_) => { |
| 216 | // As the future has finished and this function will not be called | ||
| 217 | // again, we can safely drop the future here. | ||
| 213 | this.future.drop_in_place(); | 218 | this.future.drop_in_place(); |
| 219 | |||
| 220 | // We replace the poll_fn with a despawn function, so that the task is cleaned up | ||
| 221 | // when the executor polls it next. | ||
| 222 | this.raw.poll_fn.set(Some(poll_exited)); | ||
| 223 | |||
| 224 | // Make sure we despawn last, so that other threads can only spawn the task | ||
| 225 | // after we're done with it. | ||
| 214 | this.raw.state.despawn(); | 226 | this.raw.state.despawn(); |
| 215 | } | 227 | } |
| 216 | Poll::Pending => {} | 228 | Poll::Pending => {} |
| @@ -411,15 +423,6 @@ impl SyncExecutor { | |||
| 411 | self.run_queue.dequeue_all(|p| { | 423 | self.run_queue.dequeue_all(|p| { |
| 412 | let task = p.header(); | 424 | let task = p.header(); |
| 413 | 425 | ||
| 414 | if !task.state.run_dequeue() { | ||
| 415 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 416 | // - Task gets dequeued, poll starts | ||
| 417 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 418 | // - Task poll finishes, returning done=true | ||
| 419 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 420 | return; | ||
| 421 | } | ||
| 422 | |||
| 423 | #[cfg(feature = "trace")] | 426 | #[cfg(feature = "trace")] |
| 424 | trace::task_exec_begin(self, &p); | 427 | trace::task_exec_begin(self, &p); |
| 425 | 428 | ||
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index efdafdff0..ce511d79a 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs | |||
| @@ -81,6 +81,7 @@ impl RunQueue { | |||
| 81 | // safety: there are no concurrent accesses to `next` | 81 | // safety: there are no concurrent accesses to `next` |
| 82 | next = unsafe { task.header().run_queue_item.next.get() }; | 82 | next = unsafe { task.header().run_queue_item.next.get() }; |
| 83 | 83 | ||
| 84 | task.header().state.run_dequeue(); | ||
| 84 | on_task(task); | 85 | on_task(task); |
| 85 | } | 86 | } |
| 86 | } | 87 | } |
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index 90f09e8c8..86c4085ed 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs | |||
| @@ -63,9 +63,10 @@ impl RunQueue { | |||
| 63 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | 63 | // If the task re-enqueues itself, the `next` pointer will get overwritten. |
| 64 | // Therefore, first read the next pointer, and only then process the task. | 64 | // Therefore, first read the next pointer, and only then process the task. |
| 65 | 65 | ||
| 66 | // safety: we know if the task is enqueued, no one else will touch the `next` pointer. | 66 | critical_section::with(|cs| { |
| 67 | let cs = unsafe { CriticalSection::new() }; | 67 | next = task.header().run_queue_item.next.borrow(cs).get(); |
| 68 | next = task.header().run_queue_item.next.borrow(cs).get(); | 68 | task.header().state.run_dequeue(cs); |
| 69 | }); | ||
| 69 | 70 | ||
| 70 | on_task(task); | 71 | on_task(task); |
| 71 | } | 72 | } |
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index bdd317b53..b6576bfc2 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -52,8 +52,7 @@ impl State { | |||
| 52 | 52 | ||
| 53 | /// Unmark the task as run-queued. Return whether the task is spawned. | 53 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 54 | #[inline(always)] | 54 | #[inline(always)] |
| 55 | pub fn run_dequeue(&self) -> bool { | 55 | pub fn run_dequeue(&self) { |
| 56 | let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 56 | self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 57 | state & STATE_SPAWNED != 0 | ||
| 58 | } | 57 | } |
| 59 | } | 58 | } |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index cbda0d89d..b743dcc2c 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -75,11 +75,9 @@ impl State { | |||
| 75 | 75 | ||
| 76 | /// Unmark the task as run-queued. Return whether the task is spawned. | 76 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 77 | #[inline(always)] | 77 | #[inline(always)] |
| 78 | pub fn run_dequeue(&self) -> bool { | 78 | pub fn run_dequeue(&self) { |
| 79 | compiler_fence(Ordering::Release); | 79 | compiler_fence(Ordering::Release); |
| 80 | 80 | ||
| 81 | let r = self.spawned.load(Ordering::Relaxed); | ||
| 82 | self.run_queued.store(false, Ordering::Relaxed); | 81 | self.run_queued.store(false, Ordering::Relaxed); |
| 83 | r | ||
| 84 | } | 82 | } |
| 85 | } | 83 | } |
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 4733af278..6b627ff79 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -67,11 +67,7 @@ impl State { | |||
| 67 | 67 | ||
| 68 | /// Unmark the task as run-queued. Return whether the task is spawned. | 68 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 69 | #[inline(always)] | 69 | #[inline(always)] |
| 70 | pub fn run_dequeue(&self) -> bool { | 70 | pub fn run_dequeue(&self, cs: CriticalSection<'_>) { |
| 71 | self.update(|s| { | 71 | self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED) |
| 72 | let ok = *s & STATE_SPAWNED != 0; | ||
| 73 | *s &= !STATE_RUN_QUEUED; | ||
| 74 | ok | ||
| 75 | }) | ||
| 76 | } | 72 | } |
| 77 | } | 73 | } |
