Diffstat (limited to 'embassy-executor/src/raw/run_queue.rs')
-rw-r--r--   embassy-executor/src/raw/run_queue.rs   194
1 file changed, 194 insertions(+), 0 deletions(-)
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..d98c26f73
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,194 @@
use core::ptr::{addr_of_mut, NonNull};

use cordyceps::sorted_list::Links;
use cordyceps::Linked;
#[cfg(feature = "scheduler-deadline")]
use cordyceps::SortedList;

#[cfg(target_has_atomic = "ptr")]
type TransferStack<T> = cordyceps::TransferStack<T>;

#[cfg(not(target_has_atomic = "ptr"))]
type TransferStack<T> = MutexTransferStack<T>;

use super::{TaskHeader, TaskRef};

/// Use `cordyceps::sorted_list::Links` as the singly linked list links
/// for `RunQueueItem`s.
pub(crate) type RunQueueItem = Links<TaskHeader>;

/// Implements the `Linked` trait, allowing `TaskHeader` to be used with any of
/// cordyceps' singly linked containers: `TransferStack` (used for the atomic
/// runqueue), `SortedList` (used with the DRS scheduler), or `Stack` (the batch
/// that is popped atomically from the `TransferStack`).
unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
    type Handle = TaskRef;

    // Convert a TaskRef into a TaskHeader pointer
    fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
        r.ptr
    }

    // Convert a TaskHeader pointer into a TaskRef
    unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
        TaskRef { ptr }
    }

    // Given a pointer to a TaskHeader, obtain a pointer to its Links structure,
    // which can be used to traverse to other TaskHeader nodes in the linked list
    unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
        let ptr: *mut TaskHeader = ptr.as_ptr();
        NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
    }
}
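
// Illustrative sketch (not part of the original change): the contract the impl above
// relies on is that `into_ptr` and `from_ptr` are inverses, so a `TaskRef` survives a
// round trip through the intrusive list unchanged. `linked_round_trip_sketch` is a
// hypothetical helper added here purely for illustration.
#[allow(dead_code)]
fn linked_round_trip_sketch(task: TaskRef) -> TaskRef {
    let ptr = <TaskHeader as Linked<Links<TaskHeader>>>::into_ptr(task);
    // SAFETY: `ptr` was produced by `into_ptr` just above, so it is a valid
    // `TaskHeader` pointer, as `from_ptr` requires.
    unsafe { <TaskHeader as Linked<Links<TaskHeader>>>::from_ptr(ptr) }
}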

/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, `task.next` is set to the old head, and head is atomically set to `task`.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse of the order in which they were enqueued.
/// This is OK for our purposes: it can't create fairness problems, since the next batch won't
/// run until the current batch is completely processed, so even if a task enqueues itself
/// instantly (for example by waking its own waker), it can't prevent other tasks from running.
pub(crate) struct RunQueue {
    stack: TransferStack<TaskHeader>,
}
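
// Illustrative sketch (not part of the original change): the push / batch-pop pattern
// described above, spelled out with a bare `AtomicPtr` and a hypothetical `SketchNode`
// type so the ordering properties are easy to see. The real runqueue uses
// `cordyceps::TransferStack`, which implements the same idea intrusively.
#[cfg(target_has_atomic = "ptr")]
#[allow(dead_code)]
mod transfer_stack_sketch {
    use core::ptr;
    use core::sync::atomic::{AtomicPtr, Ordering};

    pub struct SketchNode {
        next: *mut SketchNode,
    }

    pub struct SketchStack(AtomicPtr<SketchNode>);

    impl SketchStack {
        pub const fn new() -> Self {
            Self(AtomicPtr::new(ptr::null_mut()))
        }

        // Enqueue: point the new node at the old head, then CAS the head to the new node.
        pub fn push(&self, node: *mut SketchNode) -> bool {
            let mut old = self.0.load(Ordering::Relaxed);
            loop {
                // SAFETY: the caller keeps `node` valid while it is on the stack.
                unsafe { (*node).next = old };
                match self.0.compare_exchange_weak(old, node, Ordering::AcqRel, Ordering::Acquire) {
                    // Report whether the stack was empty before this push.
                    Ok(_) => return old.is_null(),
                    Err(p) => old = p,
                }
            }
        }

        // Dequeue a whole batch: swap the head with null, then walk the `next` pointers.
        // The batch comes out in the reverse of its enqueue order, as noted above.
        pub fn take_all(&self, mut on_node: impl FnMut(*mut SketchNode)) {
            let mut cur = self.0.swap(ptr::null_mut(), Ordering::AcqRel);
            while !cur.is_null() {
                // SAFETY: nodes taken off the stack are still valid; read `next` before
                // handing the node to the callback, which may reuse it.
                let next = unsafe { (*cur).next };
                on_node(cur);
                cur = next;
            }
        }
    }
}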

impl RunQueue {
    pub const fn new() -> Self {
        Self {
            stack: TransferStack::new(),
        }
    }

    /// Enqueues a task. Returns `true` if the queue was previously empty.
    ///
    /// # Safety
    ///
    /// `task` must NOT already be enqueued in any queue.
    #[inline(always)]
    pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
        self.stack.push_was_empty(
            task,
            #[cfg(not(target_has_atomic = "ptr"))]
            _tok,
        )
    }

    /// # Standard atomic runqueue
    ///
    /// Empty the queue, then call `on_task` for each task that was in the queue.
    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
    #[cfg(not(feature = "scheduler-deadline"))]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        let taken = self.stack.take_all();
        for taskref in taken {
            run_dequeue(&taskref);
            on_task(taskref);
        }
    }

    /// # Earliest Deadline First scheduler
    ///
    /// This algorithm loops until all enqueued tasks are processed.
    ///
    /// Before polling a task, all currently enqueued tasks are popped from the
    /// runqueue and added to the working `sorted` list, a linked list that
    /// sorts tasks by their deadline, with nearest-deadline items at the front
    /// and furthest-deadline items at the back.
    ///
    /// After popping and sorting all pending tasks, the SOONEST task is popped
    /// from the front of the `sorted` list and polled by calling `on_task` on it.
    ///
    /// This process repeats until the local `sorted` list AND the global
    /// runqueue are both empty, at which point this function returns.
    #[cfg(feature = "scheduler-deadline")]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        let mut sorted =
            SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| lhs.metadata.deadline().cmp(&rhs.metadata.deadline()));

        loop {
            // On each iteration, grab any newly pended items
            let taken = self.stack.take_all();

            // Sort these into the list - this is potentially expensive! We do an
            // insertion sort of new items, which iterates the linked list.
            //
            // Something on the order of `O(n * m)`, where `n` is the number
            // of new tasks, and `m` is the number of already pending tasks.
            sorted.extend(taken);

            // Pop the task with the SOONEST deadline. If there are no tasks
            // pending, then we are done.
            let Some(taskref) = sorted.pop_front() else {
                return;
            };

            // We got one task: mark it as dequeued, then process it.
            run_dequeue(&taskref);
            on_task(taskref);
        }
    }
}
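
// Illustrative sketch (hypothetical helper, not used by the executor): the comparator
// passed to `SortedList::new_with_cmp` above orders tasks by deadline, ascending, so
// draining the list from the front reproduces earliest-deadline-first poll order.
// Shown here with plain integers standing in for task deadlines.
#[cfg(feature = "scheduler-deadline")]
#[allow(dead_code)]
fn edf_poll_order<const N: usize>(mut deadlines: [u64; N]) -> [u64; N] {
    // Equivalent to inserting every deadline into the sorted list and then
    // repeatedly popping the front: soonest deadline first.
    // e.g. edf_poll_order([30, 10, 20]) == [10, 20, 30]
    deadlines.sort_unstable();
    deadlines
}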

/// Atomic state does not require a critical section...
#[cfg(target_has_atomic = "ptr")]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
    taskref.header().state.run_dequeue();
}

/// ...while non-atomic state does.
#[cfg(not(target_has_atomic = "ptr"))]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
    critical_section::with(|cs| {
        taskref.header().state.run_dequeue(cs);
    })
}

/// A wrapper type that acts like `TransferStack` by wrapping a normal `Stack` in a
/// critical-section mutex.
#[cfg(not(target_has_atomic = "ptr"))]
struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
    inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
}

#[cfg(not(target_has_atomic = "ptr"))]
impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
    const fn new() -> Self {
        Self {
            inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
        }
    }

    /// Push an item to the transfer stack, returning whether the stack was previously empty.
    fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
        // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
        // for the lifetime of the token, but does NOT protect against re-entrant access.
        // However, we never *return* the reference, nor do we recurse (or call another method
        // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
        // presence of the critical section is sufficient to guarantee exclusive access to
        // the `inner` field for the purposes of this function.
        let inner = unsafe { &mut *self.inner.borrow(token).get() };
        let is_empty = inner.is_empty();
        inner.push(item);
        is_empty
    }

    fn take_all(&self) -> cordyceps::Stack<T> {
        critical_section::with(|cs| {
            // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
            // for the lifetime of the token, but does NOT protect against re-entrant access.
            // However, we never *return* the reference, nor do we recurse (or call another method
            // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
            // presence of the critical section is sufficient to guarantee exclusive access to
            // the `inner` field for the purposes of this function.
            let inner = unsafe { &mut *self.inner.borrow(cs).get() };
            inner.take_all()
        })
    }
}
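
// Illustrative sketch (hypothetical function, not used by the executor): the same
// "critical-section mutex around an `UnsafeCell`" pattern as above, applied to a plain
// counter. The `&mut` borrow never escapes the closure and nothing re-entrant is called
// while it is held, so it is exclusive for its whole lifetime.
#[cfg(not(target_has_atomic = "ptr"))]
#[allow(dead_code)]
fn cs_cell_sketch(cell: &critical_section::Mutex<core::cell::UnsafeCell<u32>>) -> u32 {
    critical_section::with(|cs| {
        // SAFETY: access is exclusive for the duration of the critical section, and the
        // reference does not escape this closure.
        let val = unsafe { &mut *cell.borrow(cs).get() };
        *val += 1;
        *val
    })
}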