aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Munns <[email protected]>2025-03-20 09:47:56 +0100
committerDario Nieuwenhuis <[email protected]>2025-09-11 14:45:06 +0200
commit535c80e61f17e4ee4605e00623aabeda2181352d (patch)
treee51920fe675fa5737fb40c885e0199b0ec711abf
parentd860530009c1bf96a20edeff22f10f738bab1503 (diff)
Add initial DRS scheduler placeholder
* Start hacking in cordyceps. This adds a third kind of runqueue; for now it should work the same as the current "atomics" runqueue, but uses a cordyceps TransferStack instead of the existing home-rolled linked list. * Clean up, use new cordyceps feature * A bit more cleanup * Update docs to be more clear
-rw-r--r--embassy-executor/Cargo.toml9
-rw-r--r--embassy-executor/src/raw/mod.rs64
-rw-r--r--embassy-executor/src/raw/run_queue_drs_atomics.rs47
3 files changed, 116 insertions, 4 deletions
diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml
index 41636a26f..1cd732dfa 100644
--- a/embassy-executor/Cargo.toml
+++ b/embassy-executor/Cargo.toml
@@ -76,6 +76,12 @@ js-sys = { version = "0.3", optional = true }
76# arch-avr dependencies 76# arch-avr dependencies
77avr-device = { version = "0.7.0", optional = true } 77avr-device = { version = "0.7.0", optional = true }
78 78
79[dependencies.cordyceps]
80version = "0.3"
81git = "https://github.com/hawkw/mycelium"
82rev = "aaad19480d175bfc290f1d4dc2d435c6eb3d9fc5"
83optional = true
84
79[dev-dependencies] 85[dev-dependencies]
80critical-section = { version = "1.1", features = ["std"] } 86critical-section = { version = "1.1", features = ["std"] }
81trybuild = "1.0" 87trybuild = "1.0"
@@ -125,3 +131,6 @@ trace = ["_any_trace"]
125## Enable support for rtos-trace framework 131## Enable support for rtos-trace framework
126rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time-driver"] 132rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time-driver"]
127_any_trace = [] 133_any_trace = []
134
135## Enable "Deadline Rank Scheduler"
136drs-scheduler = ["dep:cordyceps"]
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 4280c5750..894a996ec 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,7 +7,14 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] 10#[cfg_attr(
11 all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"),
12 path = "run_queue_atomics.rs",
13)]
14#[cfg_attr(
15 all(feature = "drs-scheduler", target_has_atomic = "ptr"),
16 path = "run_queue_drs_atomics.rs",
17)]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] 18#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 19mod run_queue;
13 20
@@ -33,6 +40,8 @@ use core::marker::PhantomData;
33use core::mem; 40use core::mem;
34use core::pin::Pin; 41use core::pin::Pin;
35use core::ptr::NonNull; 42use core::ptr::NonNull;
43#[cfg(feature = "drs-scheduler")]
44use core::ptr::addr_of_mut;
36#[cfg(not(feature = "arch-avr"))] 45#[cfg(not(feature = "arch-avr"))]
37use core::sync::atomic::AtomicPtr; 46use core::sync::atomic::AtomicPtr;
38use core::sync::atomic::Ordering; 47use core::sync::atomic::Ordering;
@@ -42,7 +51,9 @@ use embassy_executor_timer_queue::TimerQueueItem;
42#[cfg(feature = "arch-avr")] 51#[cfg(feature = "arch-avr")]
43use portable_atomic::AtomicPtr; 52use portable_atomic::AtomicPtr;
44 53
45use self::run_queue::{RunQueue, RunQueueItem}; 54use self::run_queue::RunQueue;
55#[cfg(not(feature = "drs-scheduler"))]
56use self::run_queue::RunQueueItem;
46use self::state::State; 57use self::state::State;
47use self::util::{SyncUnsafeCell, UninitCell}; 58use self::util::{SyncUnsafeCell, UninitCell};
48pub use self::waker::task_from_waker; 59pub use self::waker::task_from_waker;
@@ -54,6 +65,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
54 unsafe { task_from_waker(waker).timer_queue_item() } 65 unsafe { task_from_waker(waker).timer_queue_item() }
55} 66}
56 67
68#[cfg(feature = "drs-scheduler")]
69use cordyceps::{stack, Linked};
70
57/// Raw task header for use in task pointers. 71/// Raw task header for use in task pointers.
58/// 72///
59/// A task can be in one of the following states: 73/// A task can be in one of the following states:
@@ -93,9 +107,29 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
93/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` 107/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
94/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. 108/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
95/// - 6: A task is woken when it is not spawned - `wake_task -> State::run_enqueue` 109/// - 6: A task is woken when it is not spawned - `wake_task -> State::run_enqueue`
110#[cfg_attr(feature = "drs-scheduler", repr(C))]
96pub(crate) struct TaskHeader { 111pub(crate) struct TaskHeader {
97 pub(crate) state: State, 112 // TODO(AJM): Make a decision whether we want to support the spicier "pointer recast"/"type punning"
113 // method of implementing the `cordyceps::Linked` trait or not.
114 //
115 // Currently, I do the safer version with `addr_of_mut!`, which doesn't REQUIRE that the first
116 // element is the `links` field, at the potential cost of a little extra pointer math.
117 //
118 // The optimizer *might* (total guess) notice that we are always doing an offset of zero in the
119 // call to `addr_of_mut` in the `impl Linked for TaskHeader` below, and get the best of both worlds,
120 // but right now this is maybe a little over cautious.
121 //
122 // See https://docs.rs/cordyceps/latest/cordyceps/trait.Linked.html#implementing-linkedlinks for
123 // more context on the choices here.
124 #[cfg(feature = "drs-scheduler")]
125 pub(crate) links: stack::Links<TaskHeader>,
126
127 // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though
128 // right now cordyceps doesn't work on non-atomic systems
129 #[cfg(not(feature = "drs-scheduler"))]
98 pub(crate) run_queue_item: RunQueueItem, 130 pub(crate) run_queue_item: RunQueueItem,
131
132 pub(crate) state: State,
99 pub(crate) executor: AtomicPtr<SyncExecutor>, 133 pub(crate) executor: AtomicPtr<SyncExecutor>,
100 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 134 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
101 135
@@ -108,6 +142,25 @@ pub(crate) struct TaskHeader {
108 all_tasks_next: AtomicPtr<TaskHeader>, 142 all_tasks_next: AtomicPtr<TaskHeader>,
109} 143}
110 144
145#[cfg(feature = "drs-scheduler")]
146unsafe impl Linked<stack::Links<TaskHeader>> for TaskHeader {
147 type Handle = TaskRef;
148
149 fn into_ptr(r: Self::Handle) -> NonNull<Self> {
150 r.ptr.cast()
151 }
152
153 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
154 let ptr: NonNull<TaskHeader> = ptr;
155 TaskRef { ptr }
156 }
157
158 unsafe fn links(ptr: NonNull<Self>) -> NonNull<stack::Links<TaskHeader>> {
159 let ptr: *mut TaskHeader = ptr.as_ptr();
160 NonNull::new_unchecked(addr_of_mut!((*ptr).links))
161 }
162}
163
111/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 164/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
112#[derive(Debug, Clone, Copy, PartialEq)] 165#[derive(Debug, Clone, Copy, PartialEq)]
113pub struct TaskRef { 166pub struct TaskRef {
@@ -198,8 +251,11 @@ impl<F: Future + 'static> TaskStorage<F> {
198 pub const fn new() -> Self { 251 pub const fn new() -> Self {
199 Self { 252 Self {
200 raw: TaskHeader { 253 raw: TaskHeader {
201 state: State::new(), 254 #[cfg(not(feature = "drs-scheduler"))]
202 run_queue_item: RunQueueItem::new(), 255 run_queue_item: RunQueueItem::new(),
256 #[cfg(feature = "drs-scheduler")]
257 links: stack::Links::new(),
258 state: State::new(),
203 executor: AtomicPtr::new(core::ptr::null_mut()), 259 executor: AtomicPtr::new(core::ptr::null_mut()),
204 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 260 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
205 poll_fn: SyncUnsafeCell::new(None), 261 poll_fn: SyncUnsafeCell::new(None),
diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs
new file mode 100644
index 000000000..53ada1b14
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs
@@ -0,0 +1,47 @@
1use super::{TaskHeader, TaskRef};
2use cordyceps::TransferStack;
3
4
5/// Atomic task queue implemented as a lock-free transfer stack (cordyceps' `TransferStack`):
6///
7/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
8///
9/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
10/// null. Then the batch is iterated following the next pointers until null is reached.
11///
12/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
13/// for our purposes: it can't create fairness problems, since the next batch won't run until the
14/// current batch is completely processed, so even if a task enqueues itself instantly (for example
15/// by waking its own waker) it can't prevent other tasks from running.
16pub(crate) struct RunQueue {
17 stack: TransferStack<TaskHeader>,
18}
19
20impl RunQueue {
21 pub const fn new() -> Self {
22 Self {
23 stack: TransferStack::new(),
24 }
25 }
26
27 /// Enqueues a task. Returns true if the queue was empty.
28 ///
29 /// # Safety
30 ///
31 /// `task` must NOT be already enqueued in any queue.
32 #[inline(always)]
33 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
34 self.stack.push_was_empty(task)
35 }
36
37 /// Empty the queue, then call `on_task` for each task that was in the queue.
38 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
39 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
40 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
41 let taken = self.stack.take_all();
42 for taskref in taken {
43 taskref.header().state.run_dequeue();
44 on_task(taskref);
45 }
46 }
47}