aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2023-02-03 06:33:22 +0000
committerGitHub <[email protected]>2023-02-03 06:33:22 +0000
commit7d8e6649b7d3364d363cacf9696bd9f40f6881a8 (patch)
tree91652614a35a512f518ecbd88c05061a07d1ccd6 /embassy-executor/src
parent662a02a557457f09c4f9b1f5320c657991a65fc6 (diff)
parent791fbb3ca0caf81882f67caea9e71adf43496261 (diff)
Merge #1187
1187: executor: Minor refactoring r=Dirbaio a=GrantM11235 The third commit may be slightly more controversial than the first two. Personally, I think it makes the code more readable and easier to reason about, but I can drop it if you disagree. Co-authored-by: Grant Miller <[email protected]>
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/raw/mod.rs111
-rw-r--r--embassy-executor/src/raw/util.rs6
-rw-r--r--embassy-executor/src/spawner.rs12
3 files changed, 65 insertions, 64 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 183c5e6a2..42bd82262 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -15,10 +15,10 @@ mod waker;
15 15
16use core::cell::Cell; 16use core::cell::Cell;
17use core::future::Future; 17use core::future::Future;
18use core::mem;
18use core::pin::Pin; 19use core::pin::Pin;
19use core::ptr::NonNull; 20use core::ptr::NonNull;
20use core::task::{Context, Poll}; 21use core::task::{Context, Poll};
21use core::{mem, ptr};
22 22
23use atomic_polyfill::{AtomicU32, Ordering}; 23use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection; 24use critical_section::CriticalSection;
@@ -46,8 +46,8 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
46pub(crate) struct TaskHeader { 46pub(crate) struct TaskHeader {
47 pub(crate) state: AtomicU32, 47 pub(crate) state: AtomicU32,
48 pub(crate) run_queue_item: RunQueueItem, 48 pub(crate) run_queue_item: RunQueueItem,
49 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 49 pub(crate) executor: Cell<Option<&'static Executor>>,
50 pub(crate) poll_fn: UninitCell<unsafe fn(TaskRef)>, // Valid if STATE_SPAWNED 50 poll_fn: Cell<Option<unsafe fn(TaskRef)>>,
51 51
52 #[cfg(feature = "integrated-timers")] 52 #[cfg(feature = "integrated-timers")]
53 pub(crate) expires_at: Cell<Instant>, 53 pub(crate) expires_at: Cell<Instant>,
@@ -55,22 +55,6 @@ pub(crate) struct TaskHeader {
55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
56} 56}
57 57
58impl TaskHeader {
59 const fn new() -> Self {
60 Self {
61 state: AtomicU32::new(0),
62 run_queue_item: RunQueueItem::new(),
63 executor: Cell::new(ptr::null()),
64 poll_fn: UninitCell::uninit(),
65
66 #[cfg(feature = "integrated-timers")]
67 expires_at: Cell::new(Instant::from_ticks(0)),
68 #[cfg(feature = "integrated-timers")]
69 timer_queue_item: timer_queue::TimerQueueItem::new(),
70 }
71 }
72}
73
74/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 58/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
75#[derive(Clone, Copy)] 59#[derive(Clone, Copy)]
76pub struct TaskRef { 60pub struct TaskRef {
@@ -128,7 +112,18 @@ impl<F: Future + 'static> TaskStorage<F> {
128 /// Create a new TaskStorage, in not-spawned state. 112 /// Create a new TaskStorage, in not-spawned state.
129 pub const fn new() -> Self { 113 pub const fn new() -> Self {
130 Self { 114 Self {
131 raw: TaskHeader::new(), 115 raw: TaskHeader {
116 state: AtomicU32::new(0),
117 run_queue_item: RunQueueItem::new(),
118 executor: Cell::new(None),
119 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
120 poll_fn: Cell::new(None),
121
122 #[cfg(feature = "integrated-timers")]
123 expires_at: Cell::new(Instant::from_ticks(0)),
124 #[cfg(feature = "integrated-timers")]
125 timer_queue_item: timer_queue::TimerQueueItem::new(),
126 },
132 future: UninitCell::uninit(), 127 future: UninitCell::uninit(),
133 } 128 }
134 } 129 }
@@ -147,26 +142,14 @@ impl<F: Future + 'static> TaskStorage<F> {
147 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 142 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
148 /// on a different executor. 143 /// on a different executor.
149 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 144 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
150 if self.spawn_mark_used() { 145 let task = AvailableTask::claim(self);
151 return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; 146 match task {
147 Some(task) => {
148 let task = task.initialize(future);
149 unsafe { SpawnToken::<F>::new(task) }
150 }
151 None => SpawnToken::new_failed(),
152 } 152 }
153
154 SpawnToken::<F>::new_failed()
155 }
156
157 fn spawn_mark_used(&'static self) -> bool {
158 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
159 self.raw
160 .state
161 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
162 .is_ok()
163 }
164
165 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef {
166 // Initialize the task
167 self.raw.poll_fn.write(Self::poll);
168 self.future.write(future());
169 TaskRef::new(self)
170 } 153 }
171 154
172 unsafe fn poll(p: TaskRef) { 155 unsafe fn poll(p: TaskRef) {
@@ -191,6 +174,28 @@ impl<F: Future + 'static> TaskStorage<F> {
191 174
192unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 175unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
193 176
177struct AvailableTask<F: Future + 'static> {
178 task: &'static TaskStorage<F>,
179}
180
181impl<F: Future + 'static> AvailableTask<F> {
182 fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
183 task.raw
184 .state
185 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
186 .ok()
187 .map(|_| Self { task })
188 }
189
190 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef {
191 unsafe {
192 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
193 self.task.future.write(future());
194 }
195 TaskRef::new(self.task)
196 }
197}
198
194/// Raw storage that can hold up to N tasks of the same type. 199/// Raw storage that can hold up to N tasks of the same type.
195/// 200///
196/// This is essentially a `[TaskStorage<F>; N]`. 201/// This is essentially a `[TaskStorage<F>; N]`.
@@ -214,13 +219,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
214 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 219 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
215 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 220 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
216 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 221 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
217 for task in &self.pool { 222 let task = self.pool.iter().find_map(AvailableTask::claim);
218 if task.spawn_mark_used() { 223 match task {
219 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; 224 Some(task) => {
225 let task = task.initialize(future);
226 unsafe { SpawnToken::<F>::new(task) }
220 } 227 }
228 None => SpawnToken::new_failed(),
221 } 229 }
222
223 SpawnToken::<F>::new_failed()
224 } 230 }
225 231
226 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 232 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
@@ -262,13 +268,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
262 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 268 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
263 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 269 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
264 270
265 for task in &self.pool { 271 let task = self.pool.iter().find_map(AvailableTask::claim);
266 if task.spawn_mark_used() { 272 match task {
267 return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); 273 Some(task) => {
274 let task = task.initialize(future);
275 unsafe { SpawnToken::<FutFn>::new(task) }
268 } 276 }
277 None => SpawnToken::new_failed(),
269 } 278 }
270
271 SpawnToken::<FutFn>::new_failed()
272 } 279 }
273} 280}
274 281
@@ -353,7 +360,7 @@ impl Executor {
353 /// In this case, the task's Future must be Send. This is because this is effectively 360 /// In this case, the task's Future must be Send. This is because this is effectively
354 /// sending the task to the executor thread. 361 /// sending the task to the executor thread.
355 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 362 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
356 task.header().executor.set(self); 363 task.header().executor.set(Some(self));
357 364
358 #[cfg(feature = "rtos-trace")] 365 #[cfg(feature = "rtos-trace")]
359 trace::task_new(task.as_ptr() as u32); 366 trace::task_new(task.as_ptr() as u32);
@@ -405,7 +412,7 @@ impl Executor {
405 trace::task_exec_begin(p.as_ptr() as u32); 412 trace::task_exec_begin(p.as_ptr() as u32);
406 413
407 // Run the task 414 // Run the task
408 task.poll_fn.read()(p); 415 task.poll_fn.get().unwrap_unchecked()(p);
409 416
410 #[cfg(feature = "rtos-trace")] 417 #[cfg(feature = "rtos-trace")]
411 trace::task_exec_end(); 418 trace::task_exec_end();
@@ -462,7 +469,7 @@ pub fn wake_task(task: TaskRef) {
462 469
463 // We have just marked the task as scheduled, so enqueue it. 470 // We have just marked the task as scheduled, so enqueue it.
464 unsafe { 471 unsafe {
465 let executor = &*header.executor.get(); 472 let executor = header.executor.get().unwrap_unchecked();
466 executor.enqueue(cs, task); 473 executor.enqueue(cs, task);
467 } 474 }
468 }) 475 })
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index ed5822188..2b1f6b6f3 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -25,9 +25,3 @@ impl<T> UninitCell<T> {
25 ptr::drop_in_place(self.as_mut_ptr()) 25 ptr::drop_in_place(self.as_mut_ptr())
26 } 26 }
27} 27}
28
29impl<T: Copy> UninitCell<T> {
30 pub unsafe fn read(&self) -> T {
31 ptr::read(self.as_mut_ptr())
32 }
33}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 650ea06cb..7c0a0183c 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -89,10 +89,10 @@ impl Spawner {
89 /// 89 ///
90 /// Panics if the current executor is not an Embassy executor. 90 /// Panics if the current executor is not an Embassy executor.
91 pub async fn for_current_executor() -> Self { 91 pub async fn for_current_executor() -> Self {
92 poll_fn(|cx| unsafe { 92 poll_fn(|cx| {
93 let task = raw::task_from_waker(cx.waker()); 93 let task = raw::task_from_waker(cx.waker());
94 let executor = task.header().executor.get(); 94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
95 Poll::Ready(Self::new(&*executor)) 95 Poll::Ready(Self::new(executor))
96 }) 96 })
97 .await 97 .await
98 } 98 }
@@ -165,10 +165,10 @@ impl SendSpawner {
165 /// 165 ///
166 /// Panics if the current executor is not an Embassy executor. 166 /// Panics if the current executor is not an Embassy executor.
167 pub async fn for_current_executor() -> Self { 167 pub async fn for_current_executor() -> Self {
168 poll_fn(|cx| unsafe { 168 poll_fn(|cx| {
169 let task = raw::task_from_waker(cx.waker()); 169 let task = raw::task_from_waker(cx.waker());
170 let executor = task.header().executor.get(); 170 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
171 Poll::Ready(Self::new(&*executor)) 171 Poll::Ready(Self::new(executor))
172 }) 172 })
173 .await 173 .await
174 } 174 }