From ba0426f767bb602750bed4fae87a156b661c0e92 Mon Sep 17 00:00:00 2001 From: James Munns Date: Tue, 1 Apr 2025 18:50:12 +0200 Subject: Combine DRS and non-DRS atomic scheduler, using cordyceps --- embassy-executor/src/raw/deadline.rs | 111 ++++++++++++ embassy-executor/src/raw/mod.rs | 18 +- embassy-executor/src/raw/run_queue_atomics.rs | 110 +++++++----- embassy-executor/src/raw/run_queue_drs_atomics.rs | 203 ---------------------- 4 files changed, 186 insertions(+), 256 deletions(-) create mode 100644 embassy-executor/src/raw/deadline.rs delete mode 100644 embassy-executor/src/raw/run_queue_drs_atomics.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs new file mode 100644 index 000000000..3f60936cc --- /dev/null +++ b/embassy-executor/src/raw/deadline.rs @@ -0,0 +1,111 @@ +use core::future::{poll_fn, Future}; +use core::task::Poll; + +/// A type for interacting with the deadline of the current task +pub struct Deadline { + /// Deadline value in ticks, same time base and ticks as `embassy-time` + pub instant_ticks: u64, +} + +impl Deadline { + /// Set the current task's deadline at exactly `instant_ticks` + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline. + /// + /// Analogous to `Timer::at`. + /// + /// TODO: Should we check/panic if the deadline is in the past? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(instant_ticks); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `duration_ticks` in the future from when + /// this future is polled. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to `Timer::after` + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + let now = embassy_time_driver::now(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = now.saturating_add(duration_ticks); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `increment_ticks` from the previous deadline. + /// + /// Note that by default (unless otherwise set), tasks start life with the deadline + /// u64::MAX, which means this method will have no effect. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to one increment of `Ticker::every().next()`. + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + // Get the last value + let last = task.header().deadline.get(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(increment_ticks); + + // Store the new value + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Get the current task's deadline as a tick value. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + pub fn get_current_task_deadline() -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + let deadline = unsafe { task.header().deadline.get() }; + Poll::Ready(Self { + instant_ticks: deadline, + }) + }) + } +} diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 2e5941ef7..0dd247d30 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -7,14 +7,7 @@ //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. -#[cfg_attr( - all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"), - path = "run_queue_atomics.rs", -)] -#[cfg_attr( - all(feature = "drs-scheduler", target_has_atomic = "ptr"), - path = "run_queue_drs_atomics.rs", -)] +#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; @@ -35,6 +28,9 @@ pub(crate) mod util; #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] mod waker; +#[cfg(feature = "drs-scheduler")] +mod deadline; + use core::future::Future; use core::marker::PhantomData; use core::mem; @@ -50,6 +46,9 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; +#[cfg(feature = "drs-scheduler")] +pub use deadline::Deadline; + use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; @@ -62,9 +61,6 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static unsafe { task_from_waker(waker).timer_queue_item() } } -#[cfg(feature = "drs-scheduler")] -pub use run_queue::Deadline; - /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index ce511d79a..bc5d38250 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs @@ -1,19 +1,36 @@ -use core::ptr; -use core::ptr::NonNull; -use core::sync::atomic::{AtomicPtr, Ordering}; +use core::ptr::{addr_of_mut, NonNull}; + +use cordyceps::sorted_list::Links; +use cordyceps::{Linked, SortedList, TransferStack}; use super::{TaskHeader, TaskRef}; -use crate::raw::util::SyncUnsafeCell; -pub(crate) struct RunQueueItem { - next: SyncUnsafeCell>, -} +/// Use `cordyceps::sorted_list::Links` as the singly linked list +/// for RunQueueItems. +pub(crate) type RunQueueItem = Links; -impl RunQueueItem { - pub const fn new() -> Self { - Self { - next: SyncUnsafeCell::new(None), - } +/// Implements the `Linked` trait, allowing for singly linked list usage +/// of any of cordyceps' `TransferStack` (used for the atomic runqueue), +/// `SortedList` (used with the DRS scheduler), or `Stack`, which is +/// popped atomically from the `TransferStack`. +unsafe impl Linked> for TaskHeader { + type Handle = TaskRef; + + // Convert a TaskRef into a TaskHeader ptr + fn into_ptr(r: TaskRef) -> NonNull { + r.ptr + } + + // Convert a TaskHeader into a TaskRef + unsafe fn from_ptr(ptr: NonNull) -> TaskRef { + TaskRef { ptr } + } + + // Given a pointer to a TaskHeader, obtain a pointer to the Links structure, + // which can be used to traverse to other TaskHeader nodes in the linked list + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) } } @@ -29,13 +46,13 @@ impl RunQueueItem { /// current batch is completely processed, so even if a task enqueues itself instantly (for example /// by waking its own waker) can't prevent other tasks from running. pub(crate) struct RunQueue { - head: AtomicPtr, + stack: TransferStack, } impl RunQueue { pub const fn new() -> Self { Self { - head: AtomicPtr::new(ptr::null_mut()), + stack: TransferStack::new(), } } @@ -46,43 +63,52 @@ impl RunQueue { /// `item` must NOT be already enqueued in any queue. #[inline(always)] pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - let mut was_empty = false; - - self.head - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { - was_empty = prev.is_null(); - unsafe { - // safety: the pointer is either null or valid - let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); - // safety: there are no concurrent accesses to `next` - task.header().run_queue_item.next.set(prev); - } - Some(task.as_ptr() as *mut _) - }) - .ok(); - - was_empty + self.stack.push_was_empty(task) } /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + #[cfg(not(feature = "drs-scheduler"))] pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // Atomically empty the queue. - let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + let taken = self.stack.take_all(); + for taskref in taken { + taskref.header().state.run_dequeue(); + on_task(taskref); + } + } + + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + #[cfg(feature = "drs-scheduler")] + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + // SAFETY: `deadline` can only be set through the `Deadline` interface, which + // only allows access to this value while the given task is being polled. + // This acts as mutual exclusion for access. + let mut sorted = + SortedList::::new_custom(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) }); + + loop { + // For each loop, grab any newly pended items + let taken = self.stack.take_all(); - // safety: the pointer is either null or valid - let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; + // Sort these into the list - this is potentially expensive! We do an + // insertion sort of new items, which iterates the linked list. + // + // Something on the order of `O(n * m)`, where `n` is the number + // of new tasks, and `m` is the number of already pending tasks. + sorted.extend(taken); - // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = next { - // If the task re-enqueues itself, the `next` pointer will get overwritten. - // Therefore, first read the next pointer, and only then process the task. - // safety: there are no concurrent accesses to `next` - next = unsafe { task.header().run_queue_item.next.get() }; + // Pop the task with the SOONEST deadline. If there are no tasks + // pending, then we are done. + let Some(taskref) = sorted.pop_front() else { + return; + }; - task.header().state.run_dequeue(); - on_task(task); + // We got one task, mark it as dequeued, and process the task. + taskref.header().state.run_dequeue(); + on_task(taskref); } } } diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs deleted file mode 100644 index 047265954..000000000 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ /dev/null @@ -1,203 +0,0 @@ -use super::{TaskHeader, TaskRef}; -use cordyceps::{SortedList, TransferStack}; -use core::future::{Future, poll_fn}; -use core::task::Poll; -use core::ptr::{addr_of_mut, NonNull}; -use cordyceps::sorted_list::Links; -use cordyceps::Linked; - -pub(crate) type RunQueueItem = Links; - -unsafe impl Linked> for super::TaskHeader { - type Handle = TaskRef; - - fn into_ptr(r: Self::Handle) -> NonNull { - r.ptr.cast() - } - - unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { - let ptr: NonNull = ptr; - TaskRef { ptr } - } - - unsafe fn links(ptr: NonNull) -> NonNull> { - let ptr: *mut TaskHeader = ptr.as_ptr(); - NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item)) - } -} - -/// Atomic task queue using a very, very simple lock-free linked-list queue: -/// -/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. -/// -/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with -/// null. Then the batch is iterated following the next pointers until null is reached. -/// -/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK -/// for our purposes: it can't create fairness problems since the next batch won't run until the -/// current batch is completely processed, so even if a task enqueues itself instantly (for example -/// by waking its own waker) can't prevent other tasks from running. -pub(crate) struct RunQueue { - stack: TransferStack, -} - -impl RunQueue { - pub const fn new() -> Self { - Self { - stack: TransferStack::new(), - } - } - - /// Enqueues an item. Returns true if the queue was empty. - /// - /// # Safety - /// - /// `item` must NOT be already enqueued in any queue. - #[inline(always)] - pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { - self.stack.push_was_empty(task) - } - - /// Empty the queue, then call `on_task` for each task that was in the queue. - /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue - /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - // SAFETY: `deadline` can only be set through the `Deadline` interface, which - // only allows access to this value while the given task is being polled. - // This acts as mutual exclusion for access. - let mut sorted = SortedList::::new_custom(|lhs, rhs| unsafe { - lhs.deadline.get().cmp(&rhs.deadline.get()) - }); - - loop { - // For each loop, grab any newly pended items - let taken = self.stack.take_all(); - - // Sort these into the list - this is potentially expensive! We do an - // insertion sort of new items, which iterates the linked list. - // - // Something on the order of `O(n * m)`, where `n` is the number - // of new tasks, and `m` is the number of already pending tasks. - sorted.extend(taken); - - // Pop the task with the SOONEST deadline. If there are no tasks - // pending, then we are done. - let Some(taskref) = sorted.pop_front() else { - return; - }; - - // We got one task, mark it as dequeued, and process the task. - taskref.header().state.run_dequeue(); - on_task(taskref); - } - } -} - -/// A type for interacting with the deadline of the current task -pub struct Deadline { - /// Deadline value in ticks, same time base and ticks as `embassy-time` - pub instant_ticks: u64, -} - -impl Deadline { - /// Set the current task's deadline at exactly `instant_ticks` - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline. - /// - /// Analogous to `Timer::at`. - /// - /// TODO: Should we check/panic if the deadline is in the past? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(instant_ticks); - } - Poll::Ready(()) - }) - } - - /// Set the current task's deadline `duration_ticks` in the future from when - /// this future is polled. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - /// - /// Analogous to `Timer::after` - /// - /// TODO: Do we want to return what the deadline is? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - let now = embassy_time_driver::now(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = now.saturating_add(duration_ticks); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - task.header().deadline.set(deadline); - } - Poll::Ready(()) - }) - } - - /// Set the current task's deadline `increment_ticks` from the previous deadline. - /// - /// Note that by default (unless otherwise set), tasks start life with the deadline - /// u64::MAX, which means this method will have no effect. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - /// - /// Analogous to one increment of `Ticker::every().next()`. - /// - /// TODO: Do we want to return what the deadline is? - #[must_use = "Setting deadline must be polled to be effective"] - pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - unsafe { - // Get the last value - let last = task.header().deadline.get(); - - // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave - // it for now, we can probably make this wrapping_add for performance - // reasons later. - let deadline = last.saturating_add(increment_ticks); - - // Store the new value - task.header().deadline.set(deadline); - } - Poll::Ready(()) - }) - } - - /// Get the current task's deadline as a tick value. - /// - /// This method is a future in order to access the currently executing task's - /// header which contains the deadline - pub fn get_current_task_deadline() -> impl Future { - poll_fn(move |cx| { - let task = super::task_from_waker(cx.waker()); - - // SAFETY: A task can only modify its own deadline, while the task is being - // polled, meaning that there cannot be concurrent access to the deadline. - let deadline = unsafe { - task.header().deadline.get() - }; - Poll::Ready(Self { instant_ticks: deadline }) - }) - } -} -- cgit