aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor
diff options
context:
space:
mode:
authorJames Munns <[email protected]>2025-03-20 14:32:14 +0100
committerDario Nieuwenhuis <[email protected]>2025-09-11 14:45:06 +0200
commit1f50e4d496458dbc7fccd9d028217ebfa7735471 (patch)
tree5e8824fa0dc9e39287dc7a3f28df8161ddee1aa2 /embassy-executor
parent535c80e61f17e4ee4605e00623aabeda2181352d (diff)
Implement Deadline Ranked Scheduling
This implements a minimal version of Deadline Rank Scheduling, as well as ways to access and set Deadlines. This still needs some UX improvements, but is likely Enough for testing.
Diffstat (limited to 'embassy-executor')
-rw-r--r--embassy-executor/Cargo.toml3
-rw-r--r--embassy-executor/src/raw/mod.rs12
-rw-r--r--embassy-executor/src/raw/run_queue_drs_atomics.rs141
3 files changed, 150 insertions, 6 deletions
diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml
index 1cd732dfa..db664a819 100644
--- a/embassy-executor/Cargo.toml
+++ b/embassy-executor/Cargo.toml
@@ -89,7 +89,6 @@ embassy-sync = { path = "../embassy-sync" }
89rustversion = "1.0.21" 89rustversion = "1.0.21"
90 90
91[features] 91[features]
92
93## Enable nightly-only features 92## Enable nightly-only features
94nightly = ["embassy-executor-macros/nightly"] 93nightly = ["embassy-executor-macros/nightly"]
95 94
@@ -133,4 +132,4 @@ rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time
133_any_trace = [] 132_any_trace = []
134 133
135## Enable "Deadline Rank Scheduler" 134## Enable "Deadline Rank Scheduler"
136drs-scheduler = ["dep:cordyceps"] 135drs-scheduler = ["dep:cordyceps", "dep:embassy-time-driver"]
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 894a996ec..9b8a4ea8a 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
68#[cfg(feature = "drs-scheduler")] 68#[cfg(feature = "drs-scheduler")]
69use cordyceps::{stack, Linked}; 69use cordyceps::{stack, Linked};
70 70
71#[cfg(feature = "drs-scheduler")]
72pub use run_queue::Deadline;
73
71/// Raw task header for use in task pointers. 74/// Raw task header for use in task pointers.
72/// 75///
73/// A task can be in one of the following states: 76/// A task can be in one of the following states:
@@ -124,6 +127,9 @@ pub(crate) struct TaskHeader {
124 #[cfg(feature = "drs-scheduler")] 127 #[cfg(feature = "drs-scheduler")]
125 pub(crate) links: stack::Links<TaskHeader>, 128 pub(crate) links: stack::Links<TaskHeader>,
126 129
130 #[cfg(feature = "drs-scheduler")]
131 pub(crate) deadline: SyncUnsafeCell<u64>,
132
127 // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though 133 // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though
128 // right now cordyceps doesn't work on non-atomic systems 134 // right now cordyceps doesn't work on non-atomic systems
129 #[cfg(not(feature = "drs-scheduler"))] 135 #[cfg(not(feature = "drs-scheduler"))]
@@ -255,6 +261,8 @@ impl<F: Future + 'static> TaskStorage<F> {
255 run_queue_item: RunQueueItem::new(), 261 run_queue_item: RunQueueItem::new(),
256 #[cfg(feature = "drs-scheduler")] 262 #[cfg(feature = "drs-scheduler")]
257 links: stack::Links::new(), 263 links: stack::Links::new(),
264 #[cfg(feature = "drs-scheduler")]
265 deadline: SyncUnsafeCell::new(0u64),
258 state: State::new(), 266 state: State::new(),
259 executor: AtomicPtr::new(core::ptr::null_mut()), 267 executor: AtomicPtr::new(core::ptr::null_mut()),
260 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 268 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -352,6 +360,10 @@ impl<F: Future + 'static> AvailableTask<F> {
352 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 360 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
353 self.task.future.write_in_place(future); 361 self.task.future.write_in_place(future);
354 362
363 // TODO(AJM): Some other way of setting this? Just a placeholder
364 #[cfg(feature = "drs-scheduler")]
365 self.task.raw.deadline.set(u64::MAX);
366
355 let task = TaskRef::new(self.task); 367 let task = TaskRef::new(self.task);
356 368
357 SpawnToken::new(task) 369 SpawnToken::new(task)
diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs
index 53ada1b14..69b7b3bf0 100644
--- a/embassy-executor/src/raw/run_queue_drs_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs
@@ -1,6 +1,7 @@
1use super::{TaskHeader, TaskRef}; 1use super::{TaskHeader, TaskRef};
2use cordyceps::TransferStack; 2use cordyceps::{SortedList, TransferStack};
3 3use core::future::{Future, poll_fn};
4use core::task::Poll;
4 5
5/// Atomic task queue using a very, very simple lock-free linked-list queue: 6/// Atomic task queue using a very, very simple lock-free linked-list queue:
6/// 7///
@@ -38,10 +39,142 @@ impl RunQueue {
38 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 39 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
39 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 40 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
40 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { 41 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
41 let taken = self.stack.take_all(); 42 let mut sorted = SortedList::<TaskHeader>::new(|lhs, rhs| unsafe {
42 for taskref in taken { 43 // TODO: Do we need any kind of access control here? Not if we say that
44 // tasks can only set their own priority, which they can't do if we're in
45 // the scheduler
46 lhs.deadline.get().cmp(&rhs.deadline.get())
47 });
48
49 loop {
50 // For each loop, grab any newly pended items
51 let taken = self.stack.take_all();
52
53 // Sort these into the list - this is potentially expensive! We do an
54 // insertion sort of new items, which iterates the linked list.
55 //
56 // Something on the order of `O(n * m)`, where `n` is the number
57 // of new tasks, and `m` is the number of already pending tasks.
58 sorted.extend(taken);
59
60 // Pop the task with the SOONEST deadline. If there are no tasks
61 // pending, then we are done.
62 let Some(taskref) = sorted.pop_front() else {
63 return;
64 };
65
66 // We got one task, mark it as dequeued, and process the task.
43 taskref.header().state.run_dequeue(); 67 taskref.header().state.run_dequeue();
44 on_task(taskref); 68 on_task(taskref);
45 } 69 }
46 } 70 }
47} 71}
72
73/// A type for interacting with the deadline of the current task
74pub struct Deadline {
75 /// Deadline value in ticks, same time base and ticks as `embassy-time`
76 pub instant_ticks: u64,
77}
78
79impl Deadline {
80 /// Set the current task's deadline at exactly `instant_ticks`
81 ///
82 /// This method is a future in order to access the currently executing task's
83 /// header which contains the deadline.
84 ///
85 /// Analogous to `Timer::at`.
86 ///
87 /// TODO: Should we check/panic if the deadline is in the past?
88 #[must_use = "Setting deadline must be polled to be effective"]
89 pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future<Output = ()> {
90 poll_fn(move |cx| {
91 let task = super::task_from_waker(cx.waker());
92 // SAFETY: A task can only modify its own deadline, while the task is being
93 // polled, meaning that there cannot be concurrent access to the deadline.
94 unsafe {
95 task.header().deadline.set(instant_ticks);
96 }
97 Poll::Ready(())
98 })
99 }
100
101 /// Set the current task's deadline `duration_ticks` in the future from when
102 /// this future is polled.
103 ///
104 /// This method is a future in order to access the currently executing task's
105 /// header which contains the deadline
106 ///
107 /// Analogous to `Timer::after`
108 ///
109 /// TODO: Do we want to return what the deadline is?
110 #[must_use = "Setting deadline must be polled to be effective"]
111 pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future<Output = ()> {
112 poll_fn(move |cx| {
113 let task = super::task_from_waker(cx.waker());
114 let now = embassy_time_driver::now();
115
116 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
117 // it for now, we can probably make this wrapping_add for performance
118 // reasons later.
119 let deadline = now.saturating_add(duration_ticks);
120
121 // SAFETY: A task can only modify its own deadline, while the task is being
122 // polled, meaning that there cannot be concurrent access to the deadline.
123 unsafe {
124 task.header().deadline.set(deadline);
125 }
126 Poll::Ready(())
127 })
128 }
129
130 /// Set the current task's deadline `increment_ticks` from the previous deadline.
131 ///
132 /// Note that by default (unless otherwise set), tasks start life with the deadline
133 /// u64::MAX, which means this method will have no effect.
134 ///
135 /// This method is a future in order to access the currently executing task's
136 /// header which contains the deadline
137 ///
138 /// Analogous to one increment of `Ticker::every().next()`.
139 ///
140 /// TODO: Do we want to return what the deadline is?
141 #[must_use = "Setting deadline must be polled to be effective"]
142 pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future<Output = ()> {
143 poll_fn(move |cx| {
144 let task = super::task_from_waker(cx.waker());
145
146 // SAFETY: A task can only modify its own deadline, while the task is being
147 // polled, meaning that there cannot be concurrent access to the deadline.
148 unsafe {
149 // Get the last value
150 let last = task.header().deadline.get();
151
152 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
153 // it for now, we can probably make this wrapping_add for performance
154 // reasons later.
155 let deadline = last.saturating_add(increment_ticks);
156
157 // Store the new value
158 task.header().deadline.set(deadline);
159 }
160 Poll::Ready(())
161 })
162 }
163
164 /// Get the current task's deadline as a tick value.
165 ///
166 /// This method is a future in order to access the currently executing task's
167 /// header which contains the deadline
168 pub fn get_current_task_deadline() -> impl Future<Output = Self> {
169 poll_fn(move |cx| {
170 let task = super::task_from_waker(cx.waker());
171
172 // SAFETY: A task can only modify its own deadline, while the task is being
173 // polled, meaning that there cannot be concurrent access to the deadline.
174 let deadline = unsafe {
175 task.header().deadline.get()
176 };
177 Poll::Ready(Self { instant_ticks: deadline })
178 })
179 }
180}