diff options
| author | Dario Nieuwenhuis <[email protected]> | 2023-11-14 22:32:48 +0100 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2023-11-15 18:43:27 +0100 |
| commit | bef9b7a8539c3dddb1cf6ab46db161f1ca56b1a1 (patch) | |
| tree | 6d15736eec0029c13093bee120bd2189aa9537ac /embassy-executor/src | |
| parent | 50a983fd9b8f10fa5153757593e9f8cfccc902ac (diff) | |
executor: remove atomic-polyfill.
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/arch/cortex_m.rs | 21 | ||||
| -rw-r--r-- | embassy-executor/src/arch/riscv32.rs | 2 | ||||
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 58 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_atomics.rs (renamed from embassy-executor/src/raw/run_queue.rs) | 3 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_critical_section.rs | 75 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 73 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 93 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 10 |
8 files changed, 270 insertions, 65 deletions
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index fde862f3c..55299c94f 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs | |||
| @@ -115,12 +115,12 @@ mod thread { | |||
| 115 | pub use interrupt::*; | 115 | pub use interrupt::*; |
| 116 | #[cfg(feature = "executor-interrupt")] | 116 | #[cfg(feature = "executor-interrupt")] |
| 117 | mod interrupt { | 117 | mod interrupt { |
| 118 | use core::cell::UnsafeCell; | 118 | use core::cell::{Cell, UnsafeCell}; |
| 119 | use core::mem::MaybeUninit; | 119 | use core::mem::MaybeUninit; |
| 120 | 120 | ||
| 121 | use atomic_polyfill::{AtomicBool, Ordering}; | ||
| 122 | use cortex_m::interrupt::InterruptNumber; | 121 | use cortex_m::interrupt::InterruptNumber; |
| 123 | use cortex_m::peripheral::NVIC; | 122 | use cortex_m::peripheral::NVIC; |
| 123 | use critical_section::Mutex; | ||
| 124 | 124 | ||
| 125 | use crate::raw; | 125 | use crate::raw; |
| 126 | 126 | ||
| @@ -146,7 +146,7 @@ mod interrupt { | |||
| 146 | /// It is somewhat more complex to use, it's recommended to use the thread-mode | 146 | /// It is somewhat more complex to use, it's recommended to use the thread-mode |
| 147 | /// [`Executor`] instead, if it works for your use case. | 147 | /// [`Executor`] instead, if it works for your use case. |
| 148 | pub struct InterruptExecutor { | 148 | pub struct InterruptExecutor { |
| 149 | started: AtomicBool, | 149 | started: Mutex<Cell<bool>>, |
| 150 | executor: UnsafeCell<MaybeUninit<raw::Executor>>, | 150 | executor: UnsafeCell<MaybeUninit<raw::Executor>>, |
| 151 | } | 151 | } |
| 152 | 152 | ||
| @@ -158,7 +158,7 @@ mod interrupt { | |||
| 158 | #[inline] | 158 | #[inline] |
| 159 | pub const fn new() -> Self { | 159 | pub const fn new() -> Self { |
| 160 | Self { | 160 | Self { |
| 161 | started: AtomicBool::new(false), | 161 | started: Mutex::new(Cell::new(false)), |
| 162 | executor: UnsafeCell::new(MaybeUninit::uninit()), | 162 | executor: UnsafeCell::new(MaybeUninit::uninit()), |
| 163 | } | 163 | } |
| 164 | } | 164 | } |
| @@ -167,7 +167,8 @@ mod interrupt { | |||
| 167 | /// | 167 | /// |
| 168 | /// # Safety | 168 | /// # Safety |
| 169 | /// | 169 | /// |
| 170 | /// You MUST call this from the interrupt handler, and from nowhere else. | 170 | /// - You MUST call this from the interrupt handler, and from nowhere else. |
| 171 | /// - You must not call this before calling `start()`. | ||
| 171 | pub unsafe fn on_interrupt(&'static self) { | 172 | pub unsafe fn on_interrupt(&'static self) { |
| 172 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | 173 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; |
| 173 | executor.poll(); | 174 | executor.poll(); |
| @@ -196,11 +197,7 @@ mod interrupt { | |||
| 196 | /// do it after. | 197 | /// do it after. |
| 197 | /// | 198 | /// |
| 198 | pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { | 199 | pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { |
| 199 | if self | 200 | if critical_section::with(|cs| self.started.borrow(cs).replace(true)) { |
| 200 | .started | ||
| 201 | .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) | ||
| 202 | .is_err() | ||
| 203 | { | ||
| 204 | panic!("InterruptExecutor::start() called multiple times on the same executor."); | 201 | panic!("InterruptExecutor::start() called multiple times on the same executor."); |
| 205 | } | 202 | } |
| 206 | 203 | ||
| @@ -222,10 +219,10 @@ mod interrupt { | |||
| 222 | /// This returns a [`SendSpawner`] you can use to spawn tasks on this | 219 | /// This returns a [`SendSpawner`] you can use to spawn tasks on this |
| 223 | /// executor. | 220 | /// executor. |
| 224 | /// | 221 | /// |
| 225 | /// This MUST only be called on an executor that has already been spawned. | 222 | /// This MUST only be called on an executor that has already been started. |
| 226 | /// The function will panic otherwise. | 223 | /// The function will panic otherwise. |
| 227 | pub fn spawner(&'static self) -> crate::SendSpawner { | 224 | pub fn spawner(&'static self) -> crate::SendSpawner { |
| 228 | if !self.started.load(Ordering::Acquire) { | 225 | if !critical_section::with(|cs| self.started.borrow(cs).get()) { |
| 229 | panic!("InterruptExecutor::spawner() called on uninitialized executor."); | 226 | panic!("InterruptExecutor::spawner() called on uninitialized executor."); |
| 230 | } | 227 | } |
| 231 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | 228 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; |
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index e5c0ff2ec..6814e7844 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs | |||
| @@ -7,9 +7,9 @@ pub use thread::*; | |||
| 7 | mod thread { | 7 | mod thread { |
| 8 | use core::marker::PhantomData; | 8 | use core::marker::PhantomData; |
| 9 | 9 | ||
| 10 | use atomic_polyfill::{AtomicBool, Ordering}; | ||
| 11 | #[cfg(feature = "nightly")] | 10 | #[cfg(feature = "nightly")] |
| 12 | pub use embassy_macros::main_riscv as main; | 11 | pub use embassy_macros::main_riscv as main; |
| 12 | use portable_atomic::{AtomicBool, Ordering}; | ||
| 13 | 13 | ||
| 14 | use crate::{raw, Spawner}; | 14 | use crate::{raw, Spawner}; |
| 15 | 15 | ||
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 6d2c1c18a..ed0bedd25 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -7,7 +7,14 @@ | |||
| 7 | //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe | 7 | //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe |
| 8 | //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. | 8 | //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. |
| 9 | 9 | ||
| 10 | #[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] | ||
| 11 | #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] | ||
| 10 | mod run_queue; | 12 | mod run_queue; |
| 13 | |||
| 14 | #[cfg_attr(target_has_atomic = "8", path = "state_atomics.rs")] | ||
| 15 | #[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] | ||
| 16 | mod state; | ||
| 17 | |||
| 11 | #[cfg(feature = "integrated-timers")] | 18 | #[cfg(feature = "integrated-timers")] |
| 12 | mod timer_queue; | 19 | mod timer_queue; |
| 13 | pub(crate) mod util; | 20 | pub(crate) mod util; |
| @@ -21,7 +28,6 @@ use core::pin::Pin; | |||
| 21 | use core::ptr::NonNull; | 28 | use core::ptr::NonNull; |
| 22 | use core::task::{Context, Poll}; | 29 | use core::task::{Context, Poll}; |
| 23 | 30 | ||
| 24 | use atomic_polyfill::{AtomicU32, Ordering}; | ||
| 25 | #[cfg(feature = "integrated-timers")] | 31 | #[cfg(feature = "integrated-timers")] |
| 26 | use embassy_time::driver::{self, AlarmHandle}; | 32 | use embassy_time::driver::{self, AlarmHandle}; |
| 27 | #[cfg(feature = "integrated-timers")] | 33 | #[cfg(feature = "integrated-timers")] |
| @@ -30,21 +36,14 @@ use embassy_time::Instant; | |||
| 30 | use rtos_trace::trace; | 36 | use rtos_trace::trace; |
| 31 | 37 | ||
| 32 | use self::run_queue::{RunQueue, RunQueueItem}; | 38 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 39 | use self::state::State; | ||
| 33 | use self::util::{SyncUnsafeCell, UninitCell}; | 40 | use self::util::{SyncUnsafeCell, UninitCell}; |
| 34 | pub use self::waker::task_from_waker; | 41 | pub use self::waker::task_from_waker; |
| 35 | use super::SpawnToken; | 42 | use super::SpawnToken; |
| 36 | 43 | ||
| 37 | /// Task is spawned (has a future) | ||
| 38 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | ||
| 39 | /// Task is in the executor run queue | ||
| 40 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | ||
| 41 | /// Task is in the executor timer queue | ||
| 42 | #[cfg(feature = "integrated-timers")] | ||
| 43 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 44 | |||
| 45 | /// Raw task header for use in task pointers. | 44 | /// Raw task header for use in task pointers. |
| 46 | pub(crate) struct TaskHeader { | 45 | pub(crate) struct TaskHeader { |
| 47 | pub(crate) state: AtomicU32, | 46 | pub(crate) state: State, |
| 48 | pub(crate) run_queue_item: RunQueueItem, | 47 | pub(crate) run_queue_item: RunQueueItem, |
| 49 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, | 48 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 50 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 49 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| @@ -116,7 +115,7 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 116 | pub const fn new() -> Self { | 115 | pub const fn new() -> Self { |
| 117 | Self { | 116 | Self { |
| 118 | raw: TaskHeader { | 117 | raw: TaskHeader { |
| 119 | state: AtomicU32::new(0), | 118 | state: State::new(), |
| 120 | run_queue_item: RunQueueItem::new(), | 119 | run_queue_item: RunQueueItem::new(), |
| 121 | executor: SyncUnsafeCell::new(None), | 120 | executor: SyncUnsafeCell::new(None), |
| 122 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 121 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| @@ -161,7 +160,7 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 161 | match future.poll(&mut cx) { | 160 | match future.poll(&mut cx) { |
| 162 | Poll::Ready(_) => { | 161 | Poll::Ready(_) => { |
| 163 | this.future.drop_in_place(); | 162 | this.future.drop_in_place(); |
| 164 | this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); | 163 | this.raw.state.despawn(); |
| 165 | 164 | ||
| 166 | #[cfg(feature = "integrated-timers")] | 165 | #[cfg(feature = "integrated-timers")] |
| 167 | this.raw.expires_at.set(Instant::MAX); | 166 | this.raw.expires_at.set(Instant::MAX); |
| @@ -193,11 +192,7 @@ impl<F: Future + 'static> AvailableTask<F> { | |||
| 193 | /// | 192 | /// |
| 194 | /// This function returns `None` if a task has already been spawned and has not finished running. | 193 | /// This function returns `None` if a task has already been spawned and has not finished running. |
| 195 | pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> { | 194 | pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> { |
| 196 | task.raw | 195 | task.raw.state.spawn().then(|| Self { task }) |
| 197 | .state | ||
| 198 | .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) | ||
| 199 | .ok() | ||
| 200 | .map(|_| Self { task }) | ||
| 201 | } | 196 | } |
| 202 | 197 | ||
| 203 | fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { | 198 | fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { |
| @@ -394,8 +389,7 @@ impl SyncExecutor { | |||
| 394 | #[cfg(feature = "integrated-timers")] | 389 | #[cfg(feature = "integrated-timers")] |
| 395 | task.expires_at.set(Instant::MAX); | 390 | task.expires_at.set(Instant::MAX); |
| 396 | 391 | ||
| 397 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 392 | if !task.state.run_dequeue() { |
| 398 | if state & STATE_SPAWNED == 0 { | ||
| 399 | // If task is not running, ignore it. This can happen in the following scenario: | 393 | // If task is not running, ignore it. This can happen in the following scenario: |
| 400 | // - Task gets dequeued, poll starts | 394 | // - Task gets dequeued, poll starts |
| 401 | // - While task is being polled, it gets woken. It gets placed in the queue. | 395 | // - While task is being polled, it gets woken. It gets placed in the queue. |
| @@ -546,18 +540,7 @@ impl Executor { | |||
| 546 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. | 540 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 547 | pub fn wake_task(task: TaskRef) { | 541 | pub fn wake_task(task: TaskRef) { |
| 548 | let header = task.header(); | 542 | let header = task.header(); |
| 549 | 543 | if header.state.run_enqueue() { | |
| 550 | let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 551 | // If already scheduled, or if not started, | ||
| 552 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | ||
| 553 | None | ||
| 554 | } else { | ||
| 555 | // Mark it as scheduled | ||
| 556 | Some(state | STATE_RUN_QUEUED) | ||
| 557 | } | ||
| 558 | }); | ||
| 559 | |||
| 560 | if res.is_ok() { | ||
| 561 | // We have just marked the task as scheduled, so enqueue it. | 544 | // We have just marked the task as scheduled, so enqueue it. |
| 562 | unsafe { | 545 | unsafe { |
| 563 | let executor = header.executor.get().unwrap_unchecked(); | 546 | let executor = header.executor.get().unwrap_unchecked(); |
| @@ -571,18 +554,7 @@ pub fn wake_task(task: TaskRef) { | |||
| 571 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. | 554 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 572 | pub fn wake_task_no_pend(task: TaskRef) { | 555 | pub fn wake_task_no_pend(task: TaskRef) { |
| 573 | let header = task.header(); | 556 | let header = task.header(); |
| 574 | 557 | if header.state.run_enqueue() { | |
| 575 | let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 576 | // If already scheduled, or if not started, | ||
| 577 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | ||
| 578 | None | ||
| 579 | } else { | ||
| 580 | // Mark it as scheduled | ||
| 581 | Some(state | STATE_RUN_QUEUED) | ||
| 582 | } | ||
| 583 | }); | ||
| 584 | |||
| 585 | if res.is_ok() { | ||
| 586 | // We have just marked the task as scheduled, so enqueue it. | 558 | // We have just marked the task as scheduled, so enqueue it. |
| 587 | unsafe { | 559 | unsafe { |
| 588 | let executor = header.executor.get().unwrap_unchecked(); | 560 | let executor = header.executor.get().unwrap_unchecked(); |
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue_atomics.rs index f1ec19ac1..90907cfda 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs | |||
| @@ -1,7 +1,6 @@ | |||
| 1 | use core::ptr; | 1 | use core::ptr; |
| 2 | use core::ptr::NonNull; | 2 | use core::ptr::NonNull; |
| 3 | 3 | use core::sync::atomic::{AtomicPtr, Ordering}; | |
| 4 | use atomic_polyfill::{AtomicPtr, Ordering}; | ||
| 5 | 4 | ||
| 6 | use super::{TaskHeader, TaskRef}; | 5 | use super::{TaskHeader, TaskRef}; |
| 7 | use crate::raw::util::SyncUnsafeCell; | 6 | use crate::raw::util::SyncUnsafeCell; |
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs new file mode 100644 index 000000000..ba59c8f29 --- /dev/null +++ b/embassy-executor/src/raw/run_queue_critical_section.rs | |||
| @@ -0,0 +1,75 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | |||
| 3 | use critical_section::{CriticalSection, Mutex}; | ||
| 4 | |||
| 5 | use super::TaskRef; | ||
| 6 | |||
| 7 | pub(crate) struct RunQueueItem { | ||
| 8 | next: Mutex<Cell<Option<TaskRef>>>, | ||
| 9 | } | ||
| 10 | |||
| 11 | impl RunQueueItem { | ||
| 12 | pub const fn new() -> Self { | ||
| 13 | Self { | ||
| 14 | next: Mutex::new(Cell::new(None)), | ||
| 15 | } | ||
| 16 | } | ||
| 17 | } | ||
| 18 | |||
| 19 | /// Task queue protected by critical sections, using a very, very simple linked-list queue: | ||
| 20 | /// | ||
| 21 | /// To enqueue a task, task.next is set to the old head, and head is atomically set to task. | ||
| 22 | /// | ||
| 23 | /// Dequeuing is done in batches: the queue is emptied by atomically replacing head with | ||
| 24 | /// null. Then the batch is iterated following the next pointers until null is reached. | ||
| 25 | /// | ||
| 26 | /// Note that batches will be iterated in the reverse order as they were enqueued. This is OK | ||
| 27 | /// for our purposes: it can't create fairness problems since the next batch won't run until the | ||
| 28 | /// current batch is completely processed, so even if a task enqueues itself instantly (for example | ||
| 29 | /// by waking its own waker) it can't prevent other tasks from running. | ||
| 30 | pub(crate) struct RunQueue { | ||
| 31 | head: Mutex<Cell<Option<TaskRef>>>, | ||
| 32 | } | ||
| 33 | |||
| 34 | impl RunQueue { | ||
| 35 | pub const fn new() -> Self { | ||
| 36 | Self { | ||
| 37 | head: Mutex::new(Cell::new(None)), | ||
| 38 | } | ||
| 39 | } | ||
| 40 | |||
| 41 | /// Enqueues an item. Returns true if the queue was empty. | ||
| 42 | /// | ||
| 43 | /// # Safety | ||
| 44 | /// | ||
| 45 | /// `task` must NOT be already enqueued in any queue. | ||
| 46 | #[inline(always)] | ||
| 47 | pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { | ||
| 48 | critical_section::with(|cs| { | ||
| 49 | let prev = self.head.borrow(cs).replace(Some(task)); | ||
| 50 | task.header().run_queue_item.next.borrow(cs).set(prev); | ||
| 51 | |||
| 52 | prev.is_none() | ||
| 53 | }) | ||
| 54 | } | ||
| 55 | |||
| 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. | ||
| 57 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | ||
| 58 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | ||
| 59 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { | ||
| 60 | // Atomically empty the queue. | ||
| 61 | let mut next = critical_section::with(|cs| self.head.borrow(cs).take()); | ||
| 62 | |||
| 63 | // Iterate the linked list of tasks that were previously in the queue. | ||
| 64 | while let Some(task) = next { | ||
| 65 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | ||
| 66 | // Therefore, first read the next pointer, and only then process the task. | ||
| 67 | |||
| 68 | // safety: we know if the task is enqueued, no one else will touch the `next` pointer. | ||
| 69 | let cs = unsafe { CriticalSection::new() }; | ||
| 70 | next = task.header().run_queue_item.next.borrow(cs).get(); | ||
| 71 | |||
| 72 | on_task(task); | ||
| 73 | } | ||
| 74 | } | ||
| 75 | } | ||
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs new file mode 100644 index 000000000..e1279ac0b --- /dev/null +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -0,0 +1,73 @@ | |||
| 1 | use core::sync::atomic::{AtomicU32, Ordering}; | ||
| 2 | |||
| 3 | /// Task is spawned (has a future) | ||
| 4 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | ||
| 5 | /// Task is in the executor run queue | ||
| 6 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | ||
| 7 | /// Task is in the executor timer queue | ||
| 8 | #[cfg(feature = "integrated-timers")] | ||
| 9 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 10 | |||
| 11 | pub(crate) struct State { | ||
| 12 | state: AtomicU32, | ||
| 13 | } | ||
| 14 | |||
| 15 | impl State { | ||
| 16 | pub const fn new() -> State { | ||
| 17 | Self { | ||
| 18 | state: AtomicU32::new(0), | ||
| 19 | } | ||
| 20 | } | ||
| 21 | |||
| 22 | /// If task is idle, mark it as spawned + run_queued and return true. | ||
| 23 | #[inline(always)] | ||
| 24 | pub fn spawn(&self) -> bool { | ||
| 25 | self.state | ||
| 26 | .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) | ||
| 27 | .is_ok() | ||
| 28 | } | ||
| 29 | |||
| 30 | /// Unmark the task as spawned. | ||
| 31 | #[inline(always)] | ||
| 32 | pub fn despawn(&self) { | ||
| 33 | self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); | ||
| 34 | } | ||
| 35 | |||
| 36 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. | ||
| 37 | #[inline(always)] | ||
| 38 | pub fn run_enqueue(&self) -> bool { | ||
| 39 | self.state | ||
| 40 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 41 | // If already scheduled, or if not started, | ||
| 42 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | ||
| 43 | None | ||
| 44 | } else { | ||
| 45 | // Mark it as scheduled | ||
| 46 | Some(state | STATE_RUN_QUEUED) | ||
| 47 | } | ||
| 48 | }) | ||
| 49 | .is_ok() | ||
| 50 | } | ||
| 51 | |||
| 52 | /// Unmark the task as run-queued. Return whether the task is spawned. | ||
| 53 | #[inline(always)] | ||
| 54 | pub fn run_dequeue(&self) -> bool { | ||
| 55 | let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||
| 56 | state & STATE_SPAWNED != 0 | ||
| 57 | } | ||
| 58 | |||
| 59 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | ||
| 60 | #[cfg(feature = "integrated-timers")] | ||
| 61 | #[inline(always)] | ||
| 62 | pub fn timer_enqueue(&self) -> bool { | ||
| 63 | let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); | ||
| 64 | old_state & STATE_TIMER_QUEUED == 0 | ||
| 65 | } | ||
| 66 | |||
| 67 | /// Unmark the task as timer-queued. | ||
| 68 | #[cfg(feature = "integrated-timers")] | ||
| 69 | #[inline(always)] | ||
| 70 | pub fn timer_dequeue(&self) { | ||
| 71 | self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); | ||
| 72 | } | ||
| 73 | } | ||
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs new file mode 100644 index 000000000..c3cc1b0b7 --- /dev/null +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -0,0 +1,93 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | |||
| 3 | use critical_section::Mutex; | ||
| 4 | |||
| 5 | /// Task is spawned (has a future) | ||
| 6 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | ||
| 7 | /// Task is in the executor run queue | ||
| 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | ||
| 9 | /// Task is in the executor timer queue | ||
| 10 | #[cfg(feature = "integrated-timers")] | ||
| 11 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 12 | |||
| 13 | pub(crate) struct State { | ||
| 14 | state: Mutex<Cell<u32>>, | ||
| 15 | } | ||
| 16 | |||
| 17 | impl State { | ||
| 18 | pub const fn new() -> State { | ||
| 19 | Self { | ||
| 20 | state: Mutex::new(Cell::new(0)), | ||
| 21 | } | ||
| 22 | } | ||
| 23 | |||
| 24 | fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { | ||
| 25 | critical_section::with(|cs| { | ||
| 26 | let s = self.state.borrow(cs); | ||
| 27 | let mut val = s.get(); | ||
| 28 | let r = f(&mut val); | ||
| 29 | s.set(val); | ||
| 30 | r | ||
| 31 | }) | ||
| 32 | } | ||
| 33 | |||
| 34 | /// If task is idle, mark it as spawned + run_queued and return true. | ||
| 35 | #[inline(always)] | ||
| 36 | pub fn spawn(&self) -> bool { | ||
| 37 | self.update(|s| { | ||
| 38 | if *s == 0 { | ||
| 39 | *s = STATE_SPAWNED | STATE_RUN_QUEUED; | ||
| 40 | true | ||
| 41 | } else { | ||
| 42 | false | ||
| 43 | } | ||
| 44 | }) | ||
| 45 | } | ||
| 46 | |||
| 47 | /// Unmark the task as spawned. | ||
| 48 | #[inline(always)] | ||
| 49 | pub fn despawn(&self) { | ||
| 50 | self.update(|s| *s &= !STATE_SPAWNED); | ||
| 51 | } | ||
| 52 | |||
| 53 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. | ||
| 54 | #[inline(always)] | ||
| 55 | pub fn run_enqueue(&self) -> bool { | ||
| 56 | self.update(|s| { | ||
| 57 | if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { | ||
| 58 | false | ||
| 59 | } else { | ||
| 60 | *s |= STATE_RUN_QUEUED; | ||
| 61 | true | ||
| 62 | } | ||
| 63 | }) | ||
| 64 | } | ||
| 65 | |||
| 66 | /// Unmark the task as run-queued. Return whether the task is spawned. | ||
| 67 | #[inline(always)] | ||
| 68 | pub fn run_dequeue(&self) -> bool { | ||
| 69 | self.update(|s| { | ||
| 70 | let ok = *s & STATE_SPAWNED != 0; | ||
| 71 | *s &= !STATE_RUN_QUEUED; | ||
| 72 | ok | ||
| 73 | }) | ||
| 74 | } | ||
| 75 | |||
| 76 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | ||
| 77 | #[cfg(feature = "integrated-timers")] | ||
| 78 | #[inline(always)] | ||
| 79 | pub fn timer_enqueue(&self) -> bool { | ||
| 80 | self.update(|s| { | ||
| 81 | let ok = *s & STATE_TIMER_QUEUED == 0; | ||
| 82 | *s |= STATE_TIMER_QUEUED; | ||
| 83 | ok | ||
| 84 | }) | ||
| 85 | } | ||
| 86 | |||
| 87 | /// Unmark the task as timer-queued. | ||
| 88 | #[cfg(feature = "integrated-timers")] | ||
| 89 | #[inline(always)] | ||
| 90 | pub fn timer_dequeue(&self) { | ||
| 91 | self.update(|s| *s &= !STATE_TIMER_QUEUED); | ||
| 92 | } | ||
| 93 | } | ||
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index dc71c95b1..59a3b43f5 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -1,9 +1,8 @@ | |||
| 1 | use core::cmp::min; | 1 | use core::cmp::min; |
| 2 | 2 | ||
| 3 | use atomic_polyfill::Ordering; | ||
| 4 | use embassy_time::Instant; | 3 | use embassy_time::Instant; |
| 5 | 4 | ||
| 6 | use super::{TaskRef, STATE_TIMER_QUEUED}; | 5 | use super::TaskRef; |
| 7 | use crate::raw::util::SyncUnsafeCell; | 6 | use crate::raw::util::SyncUnsafeCell; |
| 8 | 7 | ||
| 9 | pub(crate) struct TimerQueueItem { | 8 | pub(crate) struct TimerQueueItem { |
| @@ -32,10 +31,7 @@ impl TimerQueue { | |||
| 32 | pub(crate) unsafe fn update(&self, p: TaskRef) { | 31 | pub(crate) unsafe fn update(&self, p: TaskRef) { |
| 33 | let task = p.header(); | 32 | let task = p.header(); |
| 34 | if task.expires_at.get() != Instant::MAX { | 33 | if task.expires_at.get() != Instant::MAX { |
| 35 | let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); | 34 | if task.state.timer_enqueue() { |
| 36 | let is_new = old_state & STATE_TIMER_QUEUED == 0; | ||
| 37 | |||
| 38 | if is_new { | ||
| 39 | task.timer_queue_item.next.set(self.head.get()); | 35 | task.timer_queue_item.next.set(self.head.get()); |
| 40 | self.head.set(Some(p)); | 36 | self.head.set(Some(p)); |
| 41 | } | 37 | } |
| @@ -75,7 +71,7 @@ impl TimerQueue { | |||
| 75 | } else { | 71 | } else { |
| 76 | // Remove it | 72 | // Remove it |
| 77 | prev.set(task.timer_queue_item.next.get()); | 73 | prev.set(task.timer_queue_item.next.get()); |
| 78 | task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); | 74 | task.state.timer_dequeue(); |
| 79 | } | 75 | } |
| 80 | } | 76 | } |
| 81 | } | 77 | } |
