From 535c80e61f17e4ee4605e00623aabeda2181352d Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 20 Mar 2025 09:47:56 +0100 Subject: Add initial DRS scheduler placeholder * Start hacking in cordyceps This adds a third kind of runqueue, for now it should work the same as the current "atomics" runqueue, but uses a cordyceps TransferStack instead of the existing home-rolled linked list. * Clean up, use new cordyceps feature * A bit more cleanup * Update docs to be more clear --- embassy-executor/src/raw/mod.rs | 64 +++++++++++++++++++++-- embassy-executor/src/raw/run_queue_drs_atomics.rs | 47 +++++++++++++++++ 2 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 embassy-executor/src/raw/run_queue_drs_atomics.rs (limited to 'embassy-executor/src') diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 4280c5750..894a996ec 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -7,7 +7,14 @@ //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. -#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] +#[cfg_attr( + all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"), + path = "run_queue_atomics.rs", +)] +#[cfg_attr( + all(feature = "drs-scheduler", target_has_atomic = "ptr"), + path = "run_queue_drs_atomics.rs", +)] #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] mod run_queue; @@ -33,6 +40,8 @@ use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; +#[cfg(feature = "drs-scheduler")] +use core::ptr::addr_of_mut; #[cfg(not(feature = "arch-avr"))] use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering; @@ -42,7 +51,9 @@ use embassy_executor_timer_queue::TimerQueueItem; #[cfg(feature = "arch-avr")] use portable_atomic::AtomicPtr; -use self::run_queue::{RunQueue, RunQueueItem}; +use self::run_queue::RunQueue; +#[cfg(not(feature = "drs-scheduler"))] +use self::run_queue::RunQueueItem; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; @@ -54,6 +65,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static unsafe { task_from_waker(waker).timer_queue_item() } } +#[cfg(feature = "drs-scheduler")] +use cordyceps::{stack, Linked}; + /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: @@ -93,9 +107,29 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` +#[cfg_attr(feature = "drs-scheduler", repr(C))] pub(crate) struct TaskHeader { - pub(crate) state: State, + // TODO(AJM): Make a decision whether we want to support the spicier "pointer recast"/"type punning" + // method of implementing the `cordyceps::Linked` trait or not. + // + // Currently, I do the safer version with `addr_of_mut!`, which doesn't REQUIRE that the first + // element is the `links` field, at the potential cost of a little extra pointer math. + // + // The optimizer *might* (total guess) notice that we are always doing an offset of zero in the + // call to `addr_of_mut` in the `impl Linked for TaskHeader` below, and get the best of both worlds, + // but right now this is maybe a little over cautious. + // + // See https://docs.rs/cordyceps/latest/cordyceps/trait.Linked.html#implementing-linkedlinks for + // more context on the choices here. + #[cfg(feature = "drs-scheduler")] + pub(crate) links: stack::Links, + + // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though + // right now cordyceps doesn't work on non-atomic systems + #[cfg(not(feature = "drs-scheduler"))] pub(crate) run_queue_item: RunQueueItem, + + pub(crate) state: State, pub(crate) executor: AtomicPtr, poll_fn: SyncUnsafeCell>, @@ -108,6 +142,25 @@ pub(crate) struct TaskHeader { all_tasks_next: AtomicPtr, } +#[cfg(feature = "drs-scheduler")] +unsafe impl Linked> for TaskHeader { + type Handle = TaskRef; + + fn into_ptr(r: Self::Handle) -> NonNull { + r.ptr.cast() + } + + unsafe fn from_ptr(ptr: NonNull) -> Self::Handle { + let ptr: NonNull = ptr; + TaskRef { ptr } + } + + unsafe fn links(ptr: NonNull) -> NonNull> { + let ptr: *mut TaskHeader = ptr.as_ptr(); + NonNull::new_unchecked(addr_of_mut!((*ptr).links)) + } +} + /// This is essentially a `&'static TaskStorage` where the type of the future has been erased. #[derive(Debug, Clone, Copy, PartialEq)] pub struct TaskRef { @@ -198,8 +251,11 @@ impl TaskStorage { pub const fn new() -> Self { Self { raw: TaskHeader { - state: State::new(), + #[cfg(not(feature = "drs-scheduler"))] run_queue_item: RunQueueItem::new(), + #[cfg(feature = "drs-scheduler")] + links: stack::Links::new(), + state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` poll_fn: SyncUnsafeCell::new(None), diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs new file mode 100644 index 000000000..53ada1b14 --- /dev/null +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs @@ -0,0 +1,47 @@ +use super::{TaskHeader, TaskRef}; +use cordyceps::TransferStack; + + +/// Atomic task queue using a very, very simple lock-free linked-list queue: +/// +/// To enqueue a task, task.next is set to the old head, and head is atomically set to task. +/// +/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with +/// null. Then the batch is iterated following the next pointers until null is reached. +/// +/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK +/// for our purposes: it can't create fairness problems since the next batch won't run until the +/// current batch is completely processed, so even if a task enqueues itself instantly (for example +/// by waking its own waker) can't prevent other tasks from running. +pub(crate) struct RunQueue { + stack: TransferStack, +} + +impl RunQueue { + pub const fn new() -> Self { + Self { + stack: TransferStack::new(), + } + } + + /// Enqueues an item. Returns true if the queue was empty. + /// + /// # Safety + /// + /// `item` must NOT be already enqueued in any queue. + #[inline(always)] + pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { + self.stack.push_was_empty(task) + } + + /// Empty the queue, then call `on_task` for each task that was in the queue. + /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue + /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { + let taken = self.stack.take_all(); + for taskref in taken { + taskref.header().state.run_dequeue(); + on_task(taskref); + } + } +} -- cgit