aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2024-12-17 18:03:07 +0000
committerGitHub <[email protected]>2024-12-17 18:03:07 +0000
commit0c245892c6812538f4f51b784ed8afa1ce47f25d (patch)
tree534911bb3fd3da96fb4ed1e598fdddbbd006a595
parentc1120c7138c10a4cbdadb75d47eb23be9b65c231 (diff)
parent76d8a896bbff612e3d2db27554891c71d28988af (diff)
Merge pull request #3622 from bugadani/next
Allow polling exited tasks
-rw-r--r--embassy-executor/src/raw/mod.rs67
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs1
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs7
-rw-r--r--embassy-executor/src/raw/state_atomics.rs5
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs4
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs8
6 files changed, 45 insertions, 47 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 6503b556f..e38a2af66 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -50,33 +50,32 @@ use super::SpawnToken;
50/// A task's complete life cycle is as follows: 50/// A task's complete life cycle is as follows:
51/// 51///
52/// ```text 52/// ```text
53/// ┌────────────┐ ┌────────────────────────┐ 53/// ┌────────────┐ ┌────────────────────────┐
54/// ��─►��Not spawned │◄─6┤Not spawned|Run enqueued│ 54/// │Not spawned │◄─5┤Not spawned|Run enqueued│
55/// │ 7─►│ │ 55/// │ ├6─►│ │
56/// │ └─────┬──────┘ └──────▲─────────────────┘ 56/// └─────┬──────┘ └──────▲─────────────────┘
57/// 1 │ 57/// 1 │
58/// │ ┌────────────┘ 58/// │ ┌────────────┘
59/// 5 59/// │ 4
60/// �� ��─────▼────┴─────────┐ 60/// ┌─────▼────┴─────────┐
61/// │Spawned|Run enqueued│ 61/// │Spawned|Run enqueued│
62/// │ 62/// │ │
63/// �� �─────┬▲─────────────┘ 63/// └─────┬▲─────────────┘
64/// 2│ 64/// 2│
65/// │3 65/// │3
66/// �� ��─────▼┴─────┐ 66/// ┌─────▼┴─────┐
67/// ��─4┤ Spawned │ 67/// � Spawned │
68/// │ │ 68/// │ │
69/// └────────────┘ 69/// └────────────┘
70/// ``` 70/// ```
71/// 71///
72/// Transitions: 72/// Transitions:
73/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` 73/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
74/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` 74/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
75/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` 75/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
76/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` 76/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
77/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` 77/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
78/// - 6: Task is dequeued and then ignored via `State::run_dequeue` 78/// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
79/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
80pub(crate) struct TaskHeader { 79pub(crate) struct TaskHeader {
81 pub(crate) state: State, 80 pub(crate) state: State,
82 pub(crate) run_queue_item: RunQueueItem, 81 pub(crate) run_queue_item: RunQueueItem,
@@ -162,6 +161,10 @@ pub struct TaskStorage<F: Future + 'static> {
162 future: UninitCell<F>, // Valid if STATE_SPAWNED 161 future: UninitCell<F>, // Valid if STATE_SPAWNED
163} 162}
164 163
164unsafe fn poll_exited(_p: TaskRef) {
165 // Nothing to do, the task is already !SPAWNED and dequeued.
166}
167
165impl<F: Future + 'static> TaskStorage<F> { 168impl<F: Future + 'static> TaskStorage<F> {
166 const NEW: Self = Self::new(); 169 const NEW: Self = Self::new();
167 170
@@ -203,14 +206,23 @@ impl<F: Future + 'static> TaskStorage<F> {
203 } 206 }
204 207
205 unsafe fn poll(p: TaskRef) { 208 unsafe fn poll(p: TaskRef) {
206 let this = &*(p.as_ptr() as *const TaskStorage<F>); 209 let this = &*p.as_ptr().cast::<TaskStorage<F>>();
207 210
208 let future = Pin::new_unchecked(this.future.as_mut()); 211 let future = Pin::new_unchecked(this.future.as_mut());
209 let waker = waker::from_task(p); 212 let waker = waker::from_task(p);
210 let mut cx = Context::from_waker(&waker); 213 let mut cx = Context::from_waker(&waker);
211 match future.poll(&mut cx) { 214 match future.poll(&mut cx) {
212 Poll::Ready(_) => { 215 Poll::Ready(_) => {
216 // As the future has finished and this function will not be called
217 // again, we can safely drop the future here.
213 this.future.drop_in_place(); 218 this.future.drop_in_place();
219
220 // We replace the poll_fn with a despawn function, so that the task is cleaned up
221 // when the executor polls it next.
222 this.raw.poll_fn.set(Some(poll_exited));
223
224 // Make sure we despawn last, so that other threads can only spawn the task
225 // after we're done with it.
214 this.raw.state.despawn(); 226 this.raw.state.despawn();
215 } 227 }
216 Poll::Pending => {} 228 Poll::Pending => {}
@@ -411,15 +423,6 @@ impl SyncExecutor {
411 self.run_queue.dequeue_all(|p| { 423 self.run_queue.dequeue_all(|p| {
412 let task = p.header(); 424 let task = p.header();
413 425
414 if !task.state.run_dequeue() {
415 // If task is not running, ignore it. This can happen in the following scenario:
416 // - Task gets dequeued, poll starts
417 // - While task is being polled, it gets woken. It gets placed in the queue.
418 // - Task poll finishes, returning done=true
419 // - RUNNING bit is cleared, but the task is already in the queue.
420 return;
421 }
422
423 #[cfg(feature = "trace")] 426 #[cfg(feature = "trace")]
424 trace::task_exec_begin(self, &p); 427 trace::task_exec_begin(self, &p);
425 428
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index efdafdff0..ce511d79a 100644
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -81,6 +81,7 @@ impl RunQueue {
81 // safety: there are no concurrent accesses to `next` 81 // safety: there are no concurrent accesses to `next`
82 next = unsafe { task.header().run_queue_item.next.get() }; 82 next = unsafe { task.header().run_queue_item.next.get() };
83 83
84 task.header().state.run_dequeue();
84 on_task(task); 85 on_task(task);
85 } 86 }
86 } 87 }
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
index 90f09e8c8..86c4085ed 100644
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -63,9 +63,10 @@ impl RunQueue {
63 // If the task re-enqueues itself, the `next` pointer will get overwritten. 63 // If the task re-enqueues itself, the `next` pointer will get overwritten.
64 // Therefore, first read the next pointer, and only then process the task. 64 // Therefore, first read the next pointer, and only then process the task.
65 65
66 // safety: we know if the task is enqueued, no one else will touch the `next` pointer. 66 critical_section::with(|cs| {
67 let cs = unsafe { CriticalSection::new() }; 67 next = task.header().run_queue_item.next.borrow(cs).get();
68 next = task.header().run_queue_item.next.borrow(cs).get(); 68 task.header().state.run_dequeue(cs);
69 });
69 70
70 on_task(task); 71 on_task(task);
71 } 72 }
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index bdd317b53..b6576bfc2 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -52,8 +52,7 @@ impl State {
52 52
53 /// Unmark the task as run-queued. Return whether the task is spawned. 53 /// Unmark the task as run-queued. Return whether the task is spawned.
54 #[inline(always)] 54 #[inline(always)]
55 pub fn run_dequeue(&self) -> bool { 55 pub fn run_dequeue(&self) {
56 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 56 self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
57 state & STATE_SPAWNED != 0
58 } 57 }
59} 58}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index cbda0d89d..b743dcc2c 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -75,11 +75,9 @@ impl State {
75 75
76 /// Unmark the task as run-queued. Return whether the task is spawned. 76 /// Unmark the task as run-queued. Return whether the task is spawned.
77 #[inline(always)] 77 #[inline(always)]
78 pub fn run_dequeue(&self) -> bool { 78 pub fn run_dequeue(&self) {
79 compiler_fence(Ordering::Release); 79 compiler_fence(Ordering::Release);
80 80
81 let r = self.spawned.load(Ordering::Relaxed);
82 self.run_queued.store(false, Ordering::Relaxed); 81 self.run_queued.store(false, Ordering::Relaxed);
83 r
84 } 82 }
85} 83}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index 4733af278..6b627ff79 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -67,11 +67,7 @@ impl State {
67 67
68 /// Unmark the task as run-queued. Return whether the task is spawned. 68 /// Unmark the task as run-queued. Return whether the task is spawned.
69 #[inline(always)] 69 #[inline(always)]
70 pub fn run_dequeue(&self) -> bool { 70 pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
71 self.update(|s| { 71 self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
72 let ok = *s & STATE_SPAWNED != 0;
73 *s &= !STATE_RUN_QUEUED;
74 ok
75 })
76 } 72 }
77} 73}