From bef9b7a8539c3dddb1cf6ab46db161f1ca56b1a1 Mon Sep 17 00:00:00 2001
From: Dario Nieuwenhuis
Date: Tue, 14 Nov 2023 22:32:48 +0100
Subject: executor: remove atomic-polyfill.

---
 embassy-executor/src/raw/mod.rs                    | 58 ++++----------
 embassy-executor/src/raw/run_queue.rs              | 88 --------------------
 embassy-executor/src/raw/run_queue_atomics.rs      | 87 ++++++++++++++++++++
 .../src/raw/run_queue_critical_section.rs          | 75 +++++++++++++++++
 embassy-executor/src/raw/state_atomics.rs          | 73 +++++++++++++++++
 embassy-executor/src/raw/state_critical_section.rs | 93 ++++++++++++++++++++++
 embassy-executor/src/raw/timer_queue.rs            | 10 +--
 7 files changed, 346 insertions(+), 138 deletions(-)
 delete mode 100644 embassy-executor/src/raw/run_queue.rs
 create mode 100644 embassy-executor/src/raw/run_queue_atomics.rs
 create mode 100644 embassy-executor/src/raw/run_queue_critical_section.rs
 create mode 100644 embassy-executor/src/raw/state_atomics.rs
 create mode 100644 embassy-executor/src/raw/state_critical_section.rs

diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 6d2c1c18a..ed0bedd25 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,7 +7,14 @@
 //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
 //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
 
+#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
 mod run_queue;
+
+#[cfg_attr(target_has_atomic = "8", path = "state_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
+mod state;
+
 #[cfg(feature = "integrated-timers")]
 mod timer_queue;
 pub(crate) mod util;
@@ -21,7 +28,6 @@ use core::pin::Pin;
 use core::ptr::NonNull;
 use core::task::{Context, Poll};
 
-use atomic_polyfill::{AtomicU32, Ordering};
 #[cfg(feature = "integrated-timers")]
 use embassy_time::driver::{self, AlarmHandle};
 #[cfg(feature = "integrated-timers")]
@@ -30,21 +36,14 @@ use embassy_time::Instant;
 use rtos_trace::trace;
 
 use self::run_queue::{RunQueue, RunQueueItem};
+use self::state::State;
 use self::util::{SyncUnsafeCell, UninitCell};
 pub use self::waker::task_from_waker;
 use super::SpawnToken;
 
-/// Task is spawned (has a future)
-pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
-/// Task is in the executor run queue
-pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
-/// Task is in the executor timer queue
-#[cfg(feature = "integrated-timers")]
-pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
-
 /// Raw task header for use in task pointers.
 pub(crate) struct TaskHeader {
-    pub(crate) state: AtomicU32,
+    pub(crate) state: State,
     pub(crate) run_queue_item: RunQueueItem,
     pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
     poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
@@ -116,7 +115,7 @@ impl<F: Future + 'static> TaskStorage<F> {
     pub const fn new() -> Self {
         Self {
             raw: TaskHeader {
-                state: AtomicU32::new(0),
+                state: State::new(),
                 run_queue_item: RunQueueItem::new(),
                 executor: SyncUnsafeCell::new(None),
                 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -161,7 +160,7 @@ impl<F: Future + 'static> TaskStorage<F> {
         match future.poll(&mut cx) {
             Poll::Ready(_) => {
                 this.future.drop_in_place();
-                this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+                this.raw.state.despawn();
 
                 #[cfg(feature = "integrated-timers")]
                 this.raw.expires_at.set(Instant::MAX);
@@ -193,11 +192,7 @@ impl<F: Future + 'static> AvailableTask<F> {
     ///
     /// This function returns `None` if a task has already been spawned and has not finished running.
     pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
-        task.raw
-            .state
-            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
-            .ok()
-            .map(|_| Self { task })
+        task.raw.state.spawn().then(|| Self { task })
     }
 
     fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
@@ -394,8 +389,7 @@ impl SyncExecutor {
             #[cfg(feature = "integrated-timers")]
             task.expires_at.set(Instant::MAX);
 
-            let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
-            if state & STATE_SPAWNED == 0 {
+            if !task.state.run_dequeue() {
                 // If task is not running, ignore it. This can happen in the following scenario:
                 //   - Task gets dequeued, poll starts
                 //   - While task is being polled, it gets woken. It gets placed in the queue.
@@ -546,18 +540,7 @@ impl Executor {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task(task: TaskRef) {
     let header = task.header();
-
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();
@@ -571,18 +554,7 @@ pub fn wake_task(task: TaskRef) {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task_no_pend(task: TaskRef) {
     let header = task.header();
-
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
deleted file mode 100644
index f1ec19ac1..000000000
--- a/embassy-executor/src/raw/run_queue.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use core::ptr;
-use core::ptr::NonNull;
-
-use atomic_polyfill::{AtomicPtr, Ordering};
-
-use super::{TaskHeader, TaskRef};
-use crate::raw::util::SyncUnsafeCell;
-
-pub(crate) struct RunQueueItem {
-    next: SyncUnsafeCell<Option<TaskRef>>,
-}
-
-impl RunQueueItem {
-    pub const fn new() -> Self {
-        Self {
-            next: SyncUnsafeCell::new(None),
-        }
-    }
-}
-
-/// Atomic task queue using a very, very simple lock-free linked-list queue:
-///
-/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
-///
-/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
-/// null. Then the batch is iterated following the next pointers until null is reached.
-///
-/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
-/// for our purposes: it can't create fairness problems since the next batch won't run until the
-/// current batch is completely processed, so even if a task enqueues itself instantly (for example
-/// by waking its own waker) can't prevent other tasks from running.
-pub(crate) struct RunQueue {
-    head: AtomicPtr<TaskHeader>,
-}
-
-impl RunQueue {
-    pub const fn new() -> Self {
-        Self {
-            head: AtomicPtr::new(ptr::null_mut()),
-        }
-    }
-
-    /// Enqueues an item. Returns true if the queue was empty.
-    ///
-    /// # Safety
-    ///
-    /// `item` must NOT be already enqueued in any queue.
-    #[inline(always)]
-    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
-        let mut was_empty = false;
-
-        self.head
-            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
-                was_empty = prev.is_null();
-                unsafe {
-                    // safety: the pointer is either null or valid
-                    let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
-                    // safety: there are no concurrent accesses to `next`
-                    task.header().run_queue_item.next.set(prev);
-                }
-                Some(task.as_ptr() as *mut _)
-            })
-            .ok();
-
-        was_empty
-    }
-
-    /// Empty the queue, then call `on_task` for each task that was in the queue.
-    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
-    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
-    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
-        // Atomically empty the queue.
-        let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
-
-        // safety: the pointer is either null or valid
-        let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
-
-        // Iterate the linked list of tasks that were previously in the queue.
-        while let Some(task) = next {
-            // If the task re-enqueues itself, the `next` pointer will get overwritten.
-            // Therefore, first read the next pointer, and only then process the task.
-
-            // safety: there are no concurrent accesses to `next`
-            next = unsafe { task.header().run_queue_item.next.get() };
-
-            on_task(task);
-        }
-    }
-}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
new file mode 100644
index 000000000..90907cfda
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -0,0 +1,87 @@
+use core::ptr;
+use core::ptr::NonNull;
+use core::sync::atomic::{AtomicPtr, Ordering};
+
+use super::{TaskHeader, TaskRef};
+use crate::raw::util::SyncUnsafeCell;
+
+pub(crate) struct RunQueueItem {
+    next: SyncUnsafeCell<Option<TaskRef>>,
+}
+
+impl RunQueueItem {
+    pub const fn new() -> Self {
+        Self {
+            next: SyncUnsafeCell::new(None),
+        }
+    }
+}
+
+/// Atomic task queue using a very, very simple lock-free linked-list queue:
+///
+/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
+///
+/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
+/// null. Then the batch is iterated following the next pointers until null is reached.
+///
+/// Note that batches will be iterated in the reverse of the order in which they were enqueued.
+/// This is OK for our purposes: it can't create fairness problems, since the next batch won't
+/// run until the current batch is completely processed, so even a task that enqueues itself
+/// instantly (for example by waking its own waker) can't prevent other tasks from running.
+pub(crate) struct RunQueue {
+    head: AtomicPtr<TaskHeader>,
+}
+
+impl RunQueue {
+    pub const fn new() -> Self {
+        Self {
+            head: AtomicPtr::new(ptr::null_mut()),
+        }
+    }
+
+    /// Enqueues an item. Returns true if the queue was empty.
+    ///
+    /// # Safety
+    ///
+    /// `item` must NOT be already enqueued in any queue.
+    #[inline(always)]
+    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
+        let mut was_empty = false;
+
+        self.head
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
+                was_empty = prev.is_null();
+                unsafe {
+                    // safety: the pointer is either null or valid
+                    let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
+                    // safety: there are no concurrent accesses to `next`
+                    task.header().run_queue_item.next.set(prev);
+                }
+                Some(task.as_ptr() as *mut _)
+            })
+            .ok();
+
+        was_empty
+    }
+
+    /// Empty the queue, then call `on_task` for each task that was in the queue.
+    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
+    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        // Atomically empty the queue.
+        let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
+
+        // safety: the pointer is either null or valid
+        let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
+
+        // Iterate the linked list of tasks that were previously in the queue.
+        while let Some(task) = next {
+            // If the task re-enqueues itself, the `next` pointer will get overwritten.
+            // Therefore, first read the next pointer, and only then process the task.
+
+            // safety: there are no concurrent accesses to `next`
+            next = unsafe { task.header().run_queue_item.next.get() };
+
+            on_task(task);
+        }
+    }
+}
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
new file mode 100644
index 000000000..ba59c8f29
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -0,0 +1,75 @@
+use core::cell::Cell;
+
+use critical_section::{CriticalSection, Mutex};
+
+use super::TaskRef;
+
+pub(crate) struct RunQueueItem {
+    next: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueueItem {
+    pub const fn new() -> Self {
+        Self {
+            next: Mutex::new(Cell::new(None)),
+        }
+    }
+}
+
+/// Task queue implemented as a very, very simple linked-list queue, protected
+/// by a critical section:
+///
+/// To enqueue a task, task.next is set to the old head, and head is set to task,
+/// all within one critical section.
+///
+/// Dequeuing is done in batches: the queue is emptied by replacing head with
+/// `None`. Then the batch is iterated following the next pointers until `None` is reached.
+///
+/// Note that batches will be iterated in the reverse of the order in which they were enqueued.
+/// This is OK for our purposes: it can't create fairness problems, since the next batch won't
+/// run until the current batch is completely processed, so even a task that enqueues itself
+/// instantly (for example by waking its own waker) can't prevent other tasks from running.
+pub(crate) struct RunQueue {
+    head: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueue {
+    pub const fn new() -> Self {
+        Self {
+            head: Mutex::new(Cell::new(None)),
+        }
+    }
+
+    /// Enqueues an item. Returns true if the queue was empty.
+    ///
+    /// # Safety
+    ///
+    /// `item` must NOT be already enqueued in any queue.
+    #[inline(always)]
+    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
+        critical_section::with(|cs| {
+            let prev = self.head.borrow(cs).replace(Some(task));
+            task.header().run_queue_item.next.borrow(cs).set(prev);
+
+            prev.is_none()
+        })
+    }
+
+    /// Empty the queue, then call `on_task` for each task that was in the queue.
+    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
+    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        // Atomically empty the queue.
+        let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
+
+        // Iterate the linked list of tasks that were previously in the queue.
+        while let Some(task) = next {
+            // If the task re-enqueues itself, the `next` pointer will get overwritten.
+            // Therefore, first read the next pointer, and only then process the task.
+
+            // safety: we know if the task is enqueued, no one else will touch the `next` pointer.
+            let cs = unsafe { CriticalSection::new() };
+            next = task.header().run_queue_item.next.borrow(cs).get();
+
+            on_task(task);
+        }
+    }
+}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
new file mode 100644
index 000000000..e1279ac0b
--- /dev/null
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -0,0 +1,73 @@
+use core::sync::atomic::{AtomicU32, Ordering};
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: AtomicU32,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: AtomicU32::new(0),
+        }
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.state
+            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
+                // If already scheduled, or if not started,
+                if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
+                    None
+                } else {
+                    // Mark it as scheduled
+                    Some(state | STATE_RUN_QUEUED)
+                }
+            })
+            .is_ok()
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
+        state & STATE_SPAWNED != 0
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
+        old_state & STATE_TIMER_QUEUED == 0
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+    }
+}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
new file mode 100644
index 000000000..c3cc1b0b7
--- /dev/null
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -0,0 +1,93 @@
+use core::cell::Cell;
+
+use critical_section::Mutex;
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: Mutex<Cell<u32>>,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: Mutex::new(Cell::new(0)),
+        }
+    }
+
+    fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
+        critical_section::with(|cs| {
+            let s = self.state.borrow(cs);
+            let mut val = s.get();
+            let r = f(&mut val);
+            s.set(val);
+            r
+        })
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.update(|s| {
+            if *s == 0 {
+                *s = STATE_SPAWNED | STATE_RUN_QUEUED;
+                true
+            } else {
+                false
+            }
+        })
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.update(|s| *s &= !STATE_SPAWNED);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.update(|s| {
+            if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
+                false
+            } else {
+                *s |= STATE_RUN_QUEUED;
+                true
+            }
+        })
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_SPAWNED != 0;
+            *s &= !STATE_RUN_QUEUED;
+            ok
+        })
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_TIMER_QUEUED == 0;
+            *s |= STATE_TIMER_QUEUED;
+            ok
+        })
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.update(|s| *s &= !STATE_TIMER_QUEUED);
+    }
+}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index dc71c95b1..59a3b43f5 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,9 +1,8 @@
 use core::cmp::min;
 
-use atomic_polyfill::Ordering;
 use embassy_time::Instant;
 
-use super::{TaskRef, STATE_TIMER_QUEUED};
+use super::TaskRef;
 use crate::raw::util::SyncUnsafeCell;
 
 pub(crate) struct TimerQueueItem {
@@ -32,10 +31,7 @@ impl TimerQueue {
     pub(crate) unsafe fn update(&self, p: TaskRef) {
         let task = p.header();
         if task.expires_at.get() != Instant::MAX {
-            let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
-            let is_new = old_state & STATE_TIMER_QUEUED == 0;
-
-            if is_new {
+            if task.state.timer_enqueue() {
                 task.timer_queue_item.next.set(self.head.get());
                 self.head.set(Some(p));
            }
@@ -75,7 +71,7 @@ impl TimerQueue {
             } else {
                 // Remove it
                 prev.set(task.timer_queue_item.next.get());
-                task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+                task.state.timer_dequeue();
             }
         }
     }
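
For reviewers who want to poke at the new `State` abstraction in isolation: the sketch below reproduces the atomics backend's task-lifecycle state machine as a standalone, host-runnable program. The `State` methods mirror the ones this patch adds in state_atomics.rs; the `main` harness, its assertions, and the comments narrating the executor's perspective are illustrative additions, not part of the patch.

// Standalone sketch of the task state machine from state_atomics.rs.
// Build on a host with: rustc state_sketch.rs && ./state_sketch
use core::sync::atomic::{AtomicU32, Ordering};

/// Task is spawned (has a future).
const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue.
const STATE_RUN_QUEUED: u32 = 1 << 1;

struct State {
    state: AtomicU32,
}

impl State {
    const fn new() -> Self {
        Self { state: AtomicU32::new(0) }
    }

    /// Claim an idle task: 0 -> SPAWNED | RUN_QUEUED in one compare-exchange.
    fn spawn(&self) -> bool {
        self.state
            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Mark as run-queued; fails if not spawned or already queued, so a task
    /// sits in the run queue at most once no matter how often it is woken.
    fn run_enqueue(&self) -> bool {
        self.state
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |s| {
                if (s & STATE_RUN_QUEUED != 0) || (s & STATE_SPAWNED == 0) {
                    None
                } else {
                    Some(s | STATE_RUN_QUEUED)
                }
            })
            .is_ok()
    }

    /// Clear RUN_QUEUED before polling; returns whether the task is still spawned.
    fn run_dequeue(&self) -> bool {
        self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel) & STATE_SPAWNED != 0
    }

    /// The future completed: clear SPAWNED so the storage can be reused.
    fn despawn(&self) {
        self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
    }
}

fn main() {
    let s = State::new();
    assert!(s.spawn());        // idle storage -> spawned + queued
    assert!(!s.spawn());       // storage is busy, a second claim fails
    assert!(!s.run_enqueue()); // already queued: duplicate wakes are no-ops
    assert!(s.run_dequeue());  // executor pops it; still spawned, so poll it
    assert!(s.run_enqueue());  // a wake after dequeue re-queues it
    s.run_dequeue();
    s.despawn();               // future returned Ready
    assert!(s.spawn());        // storage is reusable again
}

This is also why two backends exist at all: `compare_exchange` and `fetch_update` need atomic read-modify-write support, which the new `cfg_attr(target_has_atomic = ...)` attributes probe at compile time. Targets without it (thumbv6m, for example) get the critical-section backends instead, which perform the same transitions on a `Mutex<Cell<u32>>` under a global lock; that is what lets the crate drop the `atomic-polyfill` dependency.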