From 1f50e4d496458dbc7fccd9d028217ebfa7735471 Mon Sep 17 00:00:00 2001 From: James Munns Date: Thu, 20 Mar 2025 14:32:14 +0100 Subject: Implement Deadline Ranked Scheduling This implements a minimal version of Deadline Rank Scheduling, as well as ways to access and set Deadlines. This still needs some UX improvements, but is likely Enough for testing. --- embassy-executor/Cargo.toml | 3 +- embassy-executor/src/raw/mod.rs | 12 ++ embassy-executor/src/raw/run_queue_drs_atomics.rs | 141 +++++++++++++++++++++- 3 files changed, 150 insertions(+), 6 deletions(-) diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml index 1cd732dfa..db664a819 100644 --- a/embassy-executor/Cargo.toml +++ b/embassy-executor/Cargo.toml @@ -89,7 +89,6 @@ embassy-sync = { path = "../embassy-sync" } rustversion = "1.0.21" [features] - ## Enable nightly-only features nightly = ["embassy-executor-macros/nightly"] @@ -133,4 +132,4 @@ rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time _any_trace = [] ## Enable "Deadline Rank Scheduler" -drs-scheduler = ["dep:cordyceps"] +drs-scheduler = ["dep:cordyceps", "dep:embassy-time-driver"] diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 894a996ec..9b8a4ea8a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static #[cfg(feature = "drs-scheduler")] use cordyceps::{stack, Linked}; +#[cfg(feature = "drs-scheduler")] +pub use run_queue::Deadline; + /// Raw task header for use in task pointers. /// /// A task can be in one of the following states: @@ -124,6 +127,9 @@ pub(crate) struct TaskHeader { #[cfg(feature = "drs-scheduler")] pub(crate) links: stack::Links, + #[cfg(feature = "drs-scheduler")] + pub(crate) deadline: SyncUnsafeCell, + // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though // right now cordyceps doesn't work on non-atomic systems #[cfg(not(feature = "drs-scheduler"))] @@ -255,6 +261,8 @@ impl TaskStorage { run_queue_item: RunQueueItem::new(), #[cfg(feature = "drs-scheduler")] links: stack::Links::new(), + #[cfg(feature = "drs-scheduler")] + deadline: SyncUnsafeCell::new(0u64), state: State::new(), executor: AtomicPtr::new(core::ptr::null_mut()), // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` @@ -352,6 +360,10 @@ impl AvailableTask { self.task.raw.poll_fn.set(Some(TaskStorage::::poll)); self.task.future.write_in_place(future); + // TODO(AJM): Some other way of setting this? Just a placeholder + #[cfg(feature = "drs-scheduler")] + self.task.raw.deadline.set(u64::MAX); + let task = TaskRef::new(self.task); SpawnToken::new(task) diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs index 53ada1b14..69b7b3bf0 100644 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs @@ -1,6 +1,7 @@ use super::{TaskHeader, TaskRef}; -use cordyceps::TransferStack; - +use cordyceps::{SortedList, TransferStack}; +use core::future::{Future, poll_fn}; +use core::task::Poll; /// Atomic task queue using a very, very simple lock-free linked-list queue: /// @@ -38,10 +39,142 @@ impl RunQueue { /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { - let taken = self.stack.take_all(); - for taskref in taken { + let mut sorted = SortedList::::new(|lhs, rhs| unsafe { + // TODO: Do we need any kind of access control here? Not if we say that + // tasks can only set their own priority, which they can't do if we're in + // the scheduler + lhs.deadline.get().cmp(&rhs.deadline.get()) + }); + + loop { + // For each loop, grab any newly pended items + let taken = self.stack.take_all(); + + // Sort these into the list - this is potentially expensive! We do an + // insertion sort of new items, which iterates the linked list. + // + // Something on the order of `O(n * m)`, where `n` is the number + // of new tasks, and `m` is the number of already pending tasks. + sorted.extend(taken); + + // Pop the task with the SOONEST deadline. If there are no tasks + // pending, then we are done. + let Some(taskref) = sorted.pop_front() else { + return; + }; + + // We got one task, mark it as dequeued, and process the task. taskref.header().state.run_dequeue(); on_task(taskref); } } } + +/// A type for interacting with the deadline of the current task +pub struct Deadline { + /// Deadline value in ticks, same time base and ticks as `embassy-time` + pub instant_ticks: u64, +} + +impl Deadline { + /// Set the current task's deadline at exactly `instant_ticks` + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline. + /// + /// Analogous to `Timer::at`. + /// + /// TODO: Should we check/panic if the deadline is in the past? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(instant_ticks); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `duration_ticks` in the future from when + /// this future is polled. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to `Timer::after` + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + let now = embassy_time_driver::now(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = now.saturating_add(duration_ticks); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Set the current task's deadline `increment_ticks` from the previous deadline. + /// + /// Note that by default (unless otherwise set), tasks start life with the deadline + /// u64::MAX, which means this method will have no effect. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + /// + /// Analogous to one increment of `Ticker::every().next()`. + /// + /// TODO: Do we want to return what the deadline is? + #[must_use = "Setting deadline must be polled to be effective"] + pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + unsafe { + // Get the last value + let last = task.header().deadline.get(); + + // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave + // it for now, we can probably make this wrapping_add for performance + // reasons later. + let deadline = last.saturating_add(increment_ticks); + + // Store the new value + task.header().deadline.set(deadline); + } + Poll::Ready(()) + }) + } + + /// Get the current task's deadline as a tick value. + /// + /// This method is a future in order to access the currently executing task's + /// header which contains the deadline + pub fn get_current_task_deadline() -> impl Future { + poll_fn(move |cx| { + let task = super::task_from_waker(cx.waker()); + + // SAFETY: A task can only modify its own deadline, while the task is being + // polled, meaning that there cannot be concurrent access to the deadline. + let deadline = unsafe { + task.header().deadline.get() + }; + Poll::Ready(Self { instant_ticks: deadline }) + }) + } +} -- cgit