diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2023-02-03 06:33:22 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-03 06:33:22 +0000 |
| commit | 7d8e6649b7d3364d363cacf9696bd9f40f6881a8 (patch) | |
| tree | 91652614a35a512f518ecbd88c05061a07d1ccd6 /embassy-executor/src | |
| parent | 662a02a557457f09c4f9b1f5320c657991a65fc6 (diff) | |
| parent | 791fbb3ca0caf81882f67caea9e71adf43496261 (diff) | |
Merge #1187
1187: executor: Minor refactoring r=Dirbaio a=GrantM11235
The third commit may be slightly more controversial than the first two. Personally, I think it makes the code more readable and easier to reason about, but I can drop it if you disagree.
Co-authored-by: Grant Miller <[email protected]>
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 111 | ||||
| -rw-r--r-- | embassy-executor/src/raw/util.rs | 6 | ||||
| -rw-r--r-- | embassy-executor/src/spawner.rs | 12 |
3 files changed, 65 insertions, 64 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 183c5e6a2..42bd82262 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -15,10 +15,10 @@ mod waker; | |||
| 15 | 15 | ||
| 16 | use core::cell::Cell; | 16 | use core::cell::Cell; |
| 17 | use core::future::Future; | 17 | use core::future::Future; |
| 18 | use core::mem; | ||
| 18 | use core::pin::Pin; | 19 | use core::pin::Pin; |
| 19 | use core::ptr::NonNull; | 20 | use core::ptr::NonNull; |
| 20 | use core::task::{Context, Poll}; | 21 | use core::task::{Context, Poll}; |
| 21 | use core::{mem, ptr}; | ||
| 22 | 22 | ||
| 23 | use atomic_polyfill::{AtomicU32, Ordering}; | 23 | use atomic_polyfill::{AtomicU32, Ordering}; |
| 24 | use critical_section::CriticalSection; | 24 | use critical_section::CriticalSection; |
| @@ -46,8 +46,8 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | |||
| 46 | pub(crate) struct TaskHeader { | 46 | pub(crate) struct TaskHeader { |
| 47 | pub(crate) state: AtomicU32, | 47 | pub(crate) state: AtomicU32, |
| 48 | pub(crate) run_queue_item: RunQueueItem, | 48 | pub(crate) run_queue_item: RunQueueItem, |
| 49 | pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 | 49 | pub(crate) executor: Cell<Option<&'static Executor>>, |
| 50 | pub(crate) poll_fn: UninitCell<unsafe fn(TaskRef)>, // Valid if STATE_SPAWNED | 50 | poll_fn: Cell<Option<unsafe fn(TaskRef)>>, |
| 51 | 51 | ||
| 52 | #[cfg(feature = "integrated-timers")] | 52 | #[cfg(feature = "integrated-timers")] |
| 53 | pub(crate) expires_at: Cell<Instant>, | 53 | pub(crate) expires_at: Cell<Instant>, |
| @@ -55,22 +55,6 @@ pub(crate) struct TaskHeader { | |||
| 55 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 55 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 56 | } | 56 | } |
| 57 | 57 | ||
| 58 | impl TaskHeader { | ||
| 59 | const fn new() -> Self { | ||
| 60 | Self { | ||
| 61 | state: AtomicU32::new(0), | ||
| 62 | run_queue_item: RunQueueItem::new(), | ||
| 63 | executor: Cell::new(ptr::null()), | ||
| 64 | poll_fn: UninitCell::uninit(), | ||
| 65 | |||
| 66 | #[cfg(feature = "integrated-timers")] | ||
| 67 | expires_at: Cell::new(Instant::from_ticks(0)), | ||
| 68 | #[cfg(feature = "integrated-timers")] | ||
| 69 | timer_queue_item: timer_queue::TimerQueueItem::new(), | ||
| 70 | } | ||
| 71 | } | ||
| 72 | } | ||
| 73 | |||
| 74 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. | 58 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 75 | #[derive(Clone, Copy)] | 59 | #[derive(Clone, Copy)] |
| 76 | pub struct TaskRef { | 60 | pub struct TaskRef { |
| @@ -128,7 +112,18 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 128 | /// Create a new TaskStorage, in not-spawned state. | 112 | /// Create a new TaskStorage, in not-spawned state. |
| 129 | pub const fn new() -> Self { | 113 | pub const fn new() -> Self { |
| 130 | Self { | 114 | Self { |
| 131 | raw: TaskHeader::new(), | 115 | raw: TaskHeader { |
| 116 | state: AtomicU32::new(0), | ||
| 117 | run_queue_item: RunQueueItem::new(), | ||
| 118 | executor: Cell::new(None), | ||
| 119 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | ||
| 120 | poll_fn: Cell::new(None), | ||
| 121 | |||
| 122 | #[cfg(feature = "integrated-timers")] | ||
| 123 | expires_at: Cell::new(Instant::from_ticks(0)), | ||
| 124 | #[cfg(feature = "integrated-timers")] | ||
| 125 | timer_queue_item: timer_queue::TimerQueueItem::new(), | ||
| 126 | }, | ||
| 132 | future: UninitCell::uninit(), | 127 | future: UninitCell::uninit(), |
| 133 | } | 128 | } |
| 134 | } | 129 | } |
| @@ -147,26 +142,14 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 147 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it | 142 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it |
| 148 | /// on a different executor. | 143 | /// on a different executor. |
| 149 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 144 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 150 | if self.spawn_mark_used() { | 145 | let task = AvailableTask::claim(self); |
| 151 | return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; | 146 | match task { |
| 147 | Some(task) => { | ||
| 148 | let task = task.initialize(future); | ||
| 149 | unsafe { SpawnToken::<F>::new(task) } | ||
| 150 | } | ||
| 151 | None => SpawnToken::new_failed(), | ||
| 152 | } | 152 | } |
| 153 | |||
| 154 | SpawnToken::<F>::new_failed() | ||
| 155 | } | ||
| 156 | |||
| 157 | fn spawn_mark_used(&'static self) -> bool { | ||
| 158 | let state = STATE_SPAWNED | STATE_RUN_QUEUED; | ||
| 159 | self.raw | ||
| 160 | .state | ||
| 161 | .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) | ||
| 162 | .is_ok() | ||
| 163 | } | ||
| 164 | |||
| 165 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> TaskRef { | ||
| 166 | // Initialize the task | ||
| 167 | self.raw.poll_fn.write(Self::poll); | ||
| 168 | self.future.write(future()); | ||
| 169 | TaskRef::new(self) | ||
| 170 | } | 153 | } |
| 171 | 154 | ||
| 172 | unsafe fn poll(p: TaskRef) { | 155 | unsafe fn poll(p: TaskRef) { |
| @@ -191,6 +174,28 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 191 | 174 | ||
| 192 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} | 175 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} |
| 193 | 176 | ||
| 177 | struct AvailableTask<F: Future + 'static> { | ||
| 178 | task: &'static TaskStorage<F>, | ||
| 179 | } | ||
| 180 | |||
| 181 | impl<F: Future + 'static> AvailableTask<F> { | ||
| 182 | fn claim(task: &'static TaskStorage<F>) -> Option<Self> { | ||
| 183 | task.raw | ||
| 184 | .state | ||
| 185 | .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) | ||
| 186 | .ok() | ||
| 187 | .map(|_| Self { task }) | ||
| 188 | } | ||
| 189 | |||
| 190 | fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { | ||
| 191 | unsafe { | ||
| 192 | self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); | ||
| 193 | self.task.future.write(future()); | ||
| 194 | } | ||
| 195 | TaskRef::new(self.task) | ||
| 196 | } | ||
| 197 | } | ||
| 198 | |||
| 194 | /// Raw storage that can hold up to N tasks of the same type. | 199 | /// Raw storage that can hold up to N tasks of the same type. |
| 195 | /// | 200 | /// |
| 196 | /// This is essentially a `[TaskStorage<F>; N]`. | 201 | /// This is essentially a `[TaskStorage<F>; N]`. |
| @@ -214,13 +219,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 214 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, | 219 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, |
| 215 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. | 220 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. |
| 216 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 221 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 217 | for task in &self.pool { | 222 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 218 | if task.spawn_mark_used() { | 223 | match task { |
| 219 | return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; | 224 | Some(task) => { |
| 225 | let task = task.initialize(future); | ||
| 226 | unsafe { SpawnToken::<F>::new(task) } | ||
| 220 | } | 227 | } |
| 228 | None => SpawnToken::new_failed(), | ||
| 221 | } | 229 | } |
| 222 | |||
| 223 | SpawnToken::<F>::new_failed() | ||
| 224 | } | 230 | } |
| 225 | 231 | ||
| 226 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if | 232 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if |
| @@ -262,13 +268,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 262 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly | 268 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly |
| 263 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. | 269 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. |
| 264 | 270 | ||
| 265 | for task in &self.pool { | 271 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 266 | if task.spawn_mark_used() { | 272 | match task { |
| 267 | return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); | 273 | Some(task) => { |
| 274 | let task = task.initialize(future); | ||
| 275 | unsafe { SpawnToken::<FutFn>::new(task) } | ||
| 268 | } | 276 | } |
| 277 | None => SpawnToken::new_failed(), | ||
| 269 | } | 278 | } |
| 270 | |||
| 271 | SpawnToken::<FutFn>::new_failed() | ||
| 272 | } | 279 | } |
| 273 | } | 280 | } |
| 274 | 281 | ||
| @@ -353,7 +360,7 @@ impl Executor { | |||
| 353 | /// In this case, the task's Future must be Send. This is because this is effectively | 360 | /// In this case, the task's Future must be Send. This is because this is effectively |
| 354 | /// sending the task to the executor thread. | 361 | /// sending the task to the executor thread. |
| 355 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 362 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 356 | task.header().executor.set(self); | 363 | task.header().executor.set(Some(self)); |
| 357 | 364 | ||
| 358 | #[cfg(feature = "rtos-trace")] | 365 | #[cfg(feature = "rtos-trace")] |
| 359 | trace::task_new(task.as_ptr() as u32); | 366 | trace::task_new(task.as_ptr() as u32); |
| @@ -405,7 +412,7 @@ impl Executor { | |||
| 405 | trace::task_exec_begin(p.as_ptr() as u32); | 412 | trace::task_exec_begin(p.as_ptr() as u32); |
| 406 | 413 | ||
| 407 | // Run the task | 414 | // Run the task |
| 408 | task.poll_fn.read()(p); | 415 | task.poll_fn.get().unwrap_unchecked()(p); |
| 409 | 416 | ||
| 410 | #[cfg(feature = "rtos-trace")] | 417 | #[cfg(feature = "rtos-trace")] |
| 411 | trace::task_exec_end(); | 418 | trace::task_exec_end(); |
| @@ -462,7 +469,7 @@ pub fn wake_task(task: TaskRef) { | |||
| 462 | 469 | ||
| 463 | // We have just marked the task as scheduled, so enqueue it. | 470 | // We have just marked the task as scheduled, so enqueue it. |
| 464 | unsafe { | 471 | unsafe { |
| 465 | let executor = &*header.executor.get(); | 472 | let executor = header.executor.get().unwrap_unchecked(); |
| 466 | executor.enqueue(cs, task); | 473 | executor.enqueue(cs, task); |
| 467 | } | 474 | } |
| 468 | }) | 475 | }) |
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index ed5822188..2b1f6b6f3 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs | |||
| @@ -25,9 +25,3 @@ impl<T> UninitCell<T> { | |||
| 25 | ptr::drop_in_place(self.as_mut_ptr()) | 25 | ptr::drop_in_place(self.as_mut_ptr()) |
| 26 | } | 26 | } |
| 27 | } | 27 | } |
| 28 | |||
| 29 | impl<T: Copy> UninitCell<T> { | ||
| 30 | pub unsafe fn read(&self) -> T { | ||
| 31 | ptr::read(self.as_mut_ptr()) | ||
| 32 | } | ||
| 33 | } | ||
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 650ea06cb..7c0a0183c 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs | |||
| @@ -89,10 +89,10 @@ impl Spawner { | |||
| 89 | /// | 89 | /// |
| 90 | /// Panics if the current executor is not an Embassy executor. | 90 | /// Panics if the current executor is not an Embassy executor. |
| 91 | pub async fn for_current_executor() -> Self { | 91 | pub async fn for_current_executor() -> Self { |
| 92 | poll_fn(|cx| unsafe { | 92 | poll_fn(|cx| { |
| 93 | let task = raw::task_from_waker(cx.waker()); | 93 | let task = raw::task_from_waker(cx.waker()); |
| 94 | let executor = task.header().executor.get(); | 94 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; |
| 95 | Poll::Ready(Self::new(&*executor)) | 95 | Poll::Ready(Self::new(executor)) |
| 96 | }) | 96 | }) |
| 97 | .await | 97 | .await |
| 98 | } | 98 | } |
| @@ -165,10 +165,10 @@ impl SendSpawner { | |||
| 165 | /// | 165 | /// |
| 166 | /// Panics if the current executor is not an Embassy executor. | 166 | /// Panics if the current executor is not an Embassy executor. |
| 167 | pub async fn for_current_executor() -> Self { | 167 | pub async fn for_current_executor() -> Self { |
| 168 | poll_fn(|cx| unsafe { | 168 | poll_fn(|cx| { |
| 169 | let task = raw::task_from_waker(cx.waker()); | 169 | let task = raw::task_from_waker(cx.waker()); |
| 170 | let executor = task.header().executor.get(); | 170 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; |
| 171 | Poll::Ready(Self::new(&*executor)) | 171 | Poll::Ready(Self::new(executor)) |
| 172 | }) | 172 | }) |
| 173 | .await | 173 | .await |
| 174 | } | 174 | } |
