diff options
| author | Dániel Buga <[email protected]> | 2024-12-17 18:05:48 +0100 |
|---|---|---|
| committer | Dániel Buga <[email protected]> | 2024-12-17 18:07:06 +0100 |
| commit | 8fd08b1e97533c7526bb4937770060d18bb37410 (patch) | |
| tree | 3e30a41e0d630f6b472bd85a4407f600ca07410a | |
| parent | edb8f21a741358f7c80b744f008f1e5acc77b429 (diff) | |
Swap poll_fn to allow polling exited tasks
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 17 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_atomics.rs | 12 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_critical_section.rs | 13 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 5 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 4 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 8 |
6 files changed, 25 insertions, 34 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index c79fdae60..242e9c365 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -202,16 +202,29 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 202 | } | 202 | } |
| 203 | } | 203 | } |
| 204 | 204 | ||
| 205 | unsafe fn poll_to_despawn(p: TaskRef) { | ||
| 206 | // The task's future has already been dropped, we just mark it as `!SPAWNED`. | ||
| 207 | let this = &*p.as_ptr().cast::<TaskStorage<F>>(); | ||
| 208 | this.raw.state.despawn(); | ||
| 209 | } | ||
| 210 | |||
| 205 | unsafe fn poll(p: TaskRef) { | 211 | unsafe fn poll(p: TaskRef) { |
| 206 | let this = &*(p.as_ptr() as *const TaskStorage<F>); | 212 | let this = &*p.as_ptr().cast::<TaskStorage<F>>(); |
| 207 | 213 | ||
| 208 | let future = Pin::new_unchecked(this.future.as_mut()); | 214 | let future = Pin::new_unchecked(this.future.as_mut()); |
| 209 | let waker = waker::from_task(p); | 215 | let waker = waker::from_task(p); |
| 210 | let mut cx = Context::from_waker(&waker); | 216 | let mut cx = Context::from_waker(&waker); |
| 211 | match future.poll(&mut cx) { | 217 | match future.poll(&mut cx) { |
| 212 | Poll::Ready(_) => { | 218 | Poll::Ready(_) => { |
| 219 | waker.wake_by_ref(); | ||
| 220 | |||
| 221 | // As the future has finished and this function will not be called | ||
| 222 | // again, we can safely drop the future here. | ||
| 213 | this.future.drop_in_place(); | 223 | this.future.drop_in_place(); |
| 214 | this.raw.state.despawn(); | 224 | |
| 225 | // We replace the poll_fn with a despawn function, so that the task is cleaned up | ||
| 226 | // when the executor polls it next. | ||
| 227 | this.raw.poll_fn.set(Some(Self::poll_to_despawn)); | ||
| 215 | } | 228 | } |
| 216 | Poll::Pending => {} | 229 | Poll::Pending => {} |
| 217 | } | 230 | } |
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index aad90d767..ce511d79a 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs | |||
| @@ -81,16 +81,8 @@ impl RunQueue { | |||
| 81 | // safety: there are no concurrent accesses to `next` | 81 | // safety: there are no concurrent accesses to `next` |
| 82 | next = unsafe { task.header().run_queue_item.next.get() }; | 82 | next = unsafe { task.header().run_queue_item.next.get() }; |
| 83 | 83 | ||
| 84 | let run_task = task.header().state.run_dequeue(); | 84 | task.header().state.run_dequeue(); |
| 85 | 85 | on_task(task); | |
| 86 | if run_task { | ||
| 87 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 88 | // - Task gets dequeued, poll starts | ||
| 89 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 90 | // - Task poll finishes, returning done=true | ||
| 91 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 92 | on_task(task); | ||
| 93 | } | ||
| 94 | } | 86 | } |
| 95 | } | 87 | } |
| 96 | } | 88 | } |
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index 4f1b2855a..86c4085ed 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs | |||
| @@ -63,19 +63,12 @@ impl RunQueue { | |||
| 63 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | 63 | // If the task re-enqueues itself, the `next` pointer will get overwritten. |
| 64 | // Therefore, first read the next pointer, and only then process the task. | 64 | // Therefore, first read the next pointer, and only then process the task. |
| 65 | 65 | ||
| 66 | let run_task = critical_section::with(|cs| { | 66 | critical_section::with(|cs| { |
| 67 | next = task.header().run_queue_item.next.borrow(cs).get(); | 67 | next = task.header().run_queue_item.next.borrow(cs).get(); |
| 68 | task.header().state.run_dequeue(cs) | 68 | task.header().state.run_dequeue(cs); |
| 69 | }); | 69 | }); |
| 70 | 70 | ||
| 71 | if run_task { | 71 | on_task(task); |
| 72 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 73 | // - Task gets dequeued, poll starts | ||
| 74 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 75 | // - Task poll finishes, returning done=true | ||
| 76 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 77 | on_task(task); | ||
| 78 | } | ||
| 79 | } | 72 | } |
| 80 | } | 73 | } |
| 81 | } | 74 | } |
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index bdd317b53..b6576bfc2 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -52,8 +52,7 @@ impl State { | |||
| 52 | 52 | ||
| 53 | /// Unmark the task as run-queued. Return whether the task is spawned. | 53 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 54 | #[inline(always)] | 54 | #[inline(always)] |
| 55 | pub fn run_dequeue(&self) -> bool { | 55 | pub fn run_dequeue(&self) { |
| 56 | let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 56 | self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 57 | state & STATE_SPAWNED != 0 | ||
| 58 | } | 57 | } |
| 59 | } | 58 | } |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index cbda0d89d..b743dcc2c 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -75,11 +75,9 @@ impl State { | |||
| 75 | 75 | ||
| 76 | /// Unmark the task as run-queued. Return whether the task is spawned. | 76 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 77 | #[inline(always)] | 77 | #[inline(always)] |
| 78 | pub fn run_dequeue(&self) -> bool { | 78 | pub fn run_dequeue(&self) { |
| 79 | compiler_fence(Ordering::Release); | 79 | compiler_fence(Ordering::Release); |
| 80 | 80 | ||
| 81 | let r = self.spawned.load(Ordering::Relaxed); | ||
| 82 | self.run_queued.store(false, Ordering::Relaxed); | 81 | self.run_queued.store(false, Ordering::Relaxed); |
| 83 | r | ||
| 84 | } | 82 | } |
| 85 | } | 83 | } |
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 4733af278..6b627ff79 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -67,11 +67,7 @@ impl State { | |||
| 67 | 67 | ||
| 68 | /// Unmark the task as run-queued. Return whether the task is spawned. | 68 | /// Unmark the task as run-queued. Return whether the task is spawned. |
| 69 | #[inline(always)] | 69 | #[inline(always)] |
| 70 | pub fn run_dequeue(&self) -> bool { | 70 | pub fn run_dequeue(&self, cs: CriticalSection<'_>) { |
| 71 | self.update(|s| { | 71 | self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED) |
| 72 | let ok = *s & STATE_SPAWNED != 0; | ||
| 73 | *s &= !STATE_RUN_QUEUED; | ||
| 74 | ok | ||
| 75 | }) | ||
| 76 | } | 72 | } |
| 77 | } | 73 | } |
