aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/run_queue.rs
diff options
context:
space:
mode:
author1-rafael-1 <[email protected]>2025-09-15 20:07:18 +0200
committer1-rafael-1 <[email protected]>2025-09-15 20:07:18 +0200
commit6bb3d2c0720fa082f27d3cdb70f516058497ec87 (patch)
tree5a1e255cff999b00800f203b91a759c720c973e5 /embassy-executor/src/raw/run_queue.rs
parenteb685574601d98c44faed9a3534d056199b46e20 (diff)
parent92a6fd2946f2cbb15359290f68aa360953da2ff7 (diff)
Merge branch 'main' into rp2040-rtc-alarm
Diffstat (limited to 'embassy-executor/src/raw/run_queue.rs')
-rw-r--r--embassy-executor/src/raw/run_queue.rs213
1 files changed, 213 insertions, 0 deletions
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..b8b052310
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,213 @@
1use core::ptr::{addr_of_mut, NonNull};
2
3use cordyceps::sorted_list::Links;
4use cordyceps::Linked;
5#[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
6use cordyceps::SortedList;
7
8#[cfg(target_has_atomic = "ptr")]
9type TransferStack<T> = cordyceps::TransferStack<T>;
10
11#[cfg(not(target_has_atomic = "ptr"))]
12type TransferStack<T> = MutexTransferStack<T>;
13
14use super::{TaskHeader, TaskRef};
15
16/// Use `cordyceps::sorted_list::Links` as the singly linked list
17/// for RunQueueItems.
18pub(crate) type RunQueueItem = Links<TaskHeader>;
19
20/// Implements the `Linked` trait, allowing for singly linked list usage
21/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
22/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
23/// popped atomically from the `TransferStack`.
24unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
25 type Handle = TaskRef;
26
27 // Convert a TaskRef into a TaskHeader ptr
28 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
29 r.ptr
30 }
31
32 // Convert a TaskHeader into a TaskRef
33 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
34 TaskRef { ptr }
35 }
36
37 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
38 // which can be used to traverse to other TaskHeader nodes in the linked list
39 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
40 let ptr: *mut TaskHeader = ptr.as_ptr();
41 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
42 }
43}
44
45/// Atomic task queue using a very, very simple lock-free linked-list queue:
46///
47/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
48///
49/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
50/// null. Then the batch is iterated following the next pointers until null is reached.
51///
52/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
53/// for our purposes: it can't create fairness problems since the next batch won't run until the
54/// current batch is completely processed, so even if a task enqueues itself instantly (for example
55/// by waking its own waker) can't prevent other tasks from running.
56pub(crate) struct RunQueue {
57 stack: TransferStack<TaskHeader>,
58}
59
60impl RunQueue {
61 pub const fn new() -> Self {
62 Self {
63 stack: TransferStack::new(),
64 }
65 }
66
67 /// Enqueues an item. Returns true if the queue was empty.
68 ///
69 /// # Safety
70 ///
71 /// `item` must NOT be already enqueued in any queue.
72 #[inline(always)]
73 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
74 self.stack.push_was_empty(
75 task,
76 #[cfg(not(target_has_atomic = "ptr"))]
77 _tok,
78 )
79 }
80
81 /// # Standard atomic runqueue
82 ///
83 /// Empty the queue, then call `on_task` for each task that was in the queue.
84 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
85 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
86 #[cfg(not(any(feature = "scheduler-priority", feature = "scheduler-deadline")))]
87 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
88 let taken = self.stack.take_all();
89 for taskref in taken {
90 run_dequeue(&taskref);
91 on_task(taskref);
92 }
93 }
94
95 /// # Earliest Deadline First Scheduler
96 ///
97 /// This algorithm will loop until all enqueued tasks are processed.
98 ///
99 /// Before polling a task, all currently enqueued tasks will be popped from the
100 /// runqueue, and will be added to the working `sorted` list, a linked-list that
101 /// sorts tasks by their deadline, with nearest deadline items in the front, and
102 /// furthest deadline items in the back.
103 ///
104 /// After popping and sorting all pending tasks, the SOONEST task will be popped
105 /// from the front of the queue, and polled by calling `on_task` on it.
106 ///
107 /// This process will repeat until the local `sorted` queue AND the global
108 /// runqueue are both empty, at which point this function will return.
109 #[cfg(any(feature = "scheduler-priority", feature = "scheduler-deadline"))]
110 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
111 let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| {
112 // compare by priority first
113 #[cfg(feature = "scheduler-priority")]
114 {
115 let lp = lhs.metadata.priority();
116 let rp = rhs.metadata.priority();
117 if lp != rp {
118 return lp.cmp(&rp).reverse();
119 }
120 }
121 // compare deadlines in case of tie.
122 #[cfg(feature = "scheduler-deadline")]
123 {
124 let ld = lhs.metadata.deadline();
125 let rd = rhs.metadata.deadline();
126 if ld != rd {
127 return ld.cmp(&rd);
128 }
129 }
130 core::cmp::Ordering::Equal
131 });
132
133 loop {
134 // For each loop, grab any newly pended items
135 let taken = self.stack.take_all();
136
137 // Sort these into the list - this is potentially expensive! We do an
138 // insertion sort of new items, which iterates the linked list.
139 //
140 // Something on the order of `O(n * m)`, where `n` is the number
141 // of new tasks, and `m` is the number of already pending tasks.
142 sorted.extend(taken);
143
144 // Pop the task with the SOONEST deadline. If there are no tasks
145 // pending, then we are done.
146 let Some(taskref) = sorted.pop_front() else {
147 return;
148 };
149
150 // We got one task, mark it as dequeued, and process the task.
151 run_dequeue(&taskref);
152 on_task(taskref);
153 }
154 }
155}
156
157/// atomic state does not require a cs...
158#[cfg(target_has_atomic = "ptr")]
159#[inline(always)]
160fn run_dequeue(taskref: &TaskRef) {
161 taskref.header().state.run_dequeue();
162}
163
164/// ...while non-atomic state does
165#[cfg(not(target_has_atomic = "ptr"))]
166#[inline(always)]
167fn run_dequeue(taskref: &TaskRef) {
168 critical_section::with(|cs| {
169 taskref.header().state.run_dequeue(cs);
170 })
171}
172
173/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex
174#[cfg(not(target_has_atomic = "ptr"))]
175struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
176 inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
177}
178
179#[cfg(not(target_has_atomic = "ptr"))]
180impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
181 const fn new() -> Self {
182 Self {
183 inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
184 }
185 }
186
187 /// Push an item to the transfer stack, returning whether the stack was previously empty
188 fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
189 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
190 // for the lifetime of the token, but does NOT protect against re-entrant access.
191 // However, we never *return* the reference, nor do we recurse (or call another method
192 // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
193 // presence of the critical section is sufficient to guarantee exclusive access to
194 // the `inner` field for the purposes of this function.
195 let inner = unsafe { &mut *self.inner.borrow(token).get() };
196 let is_empty = inner.is_empty();
197 inner.push(item);
198 is_empty
199 }
200
201 fn take_all(&self) -> cordyceps::Stack<T> {
202 critical_section::with(|cs| {
203 // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
204 // for the lifetime of the token, but does NOT protect against re-entrant access.
205 // However, we never *return* the reference, nor do we recurse (or call another method
206 // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
207 // presence of the critical section is sufficient to guarantee exclusive access to
208 // the `inner` field for the purposes of this function.
209 let inner = unsafe { &mut *self.inner.borrow(cs).get() };
210 inner.take_all()
211 })
212 }
213}