aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/run_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw/run_queue.rs')
-rw-r--r--embassy-executor/src/raw/run_queue.rs151
1 files changed, 151 insertions, 0 deletions
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..f630041e0
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,151 @@
1use core::ptr::{addr_of_mut, NonNull};
2
3use cordyceps::sorted_list::Links;
4use cordyceps::Linked;
5#[cfg(feature = "edf-scheduler")]
6use cordyceps::SortedList;
7
8#[cfg(target_has_atomic = "ptr")]
9type TransferStack<T> = cordyceps::TransferStack<T>;
10
11#[cfg(not(target_has_atomic = "ptr"))]
12type TransferStack<T> = cordyceps::TransferStack<mutex::raw_impls::cs::CriticalSectionRawMutex, T>;
13
14use super::{TaskHeader, TaskRef};
15
16/// Use `cordyceps::sorted_list::Links` as the singly linked list
17/// for RunQueueItems.
18pub(crate) type RunQueueItem = Links<TaskHeader>;
19
20/// Implements the `Linked` trait, allowing for singly linked list usage
21/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
22/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
23/// popped atomically from the `TransferStack`.
24unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
25 type Handle = TaskRef;
26
27 // Convert a TaskRef into a TaskHeader ptr
28 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
29 r.ptr
30 }
31
32 // Convert a TaskHeader into a TaskRef
33 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
34 TaskRef { ptr }
35 }
36
37 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
38 // which can be used to traverse to other TaskHeader nodes in the linked list
39 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
40 let ptr: *mut TaskHeader = ptr.as_ptr();
41 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
42 }
43}
44
45/// Atomic task queue using a very, very simple lock-free linked-list queue:
46///
47/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
48///
49/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
50/// null. Then the batch is iterated following the next pointers until null is reached.
51///
52/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
53/// for our purposes: it can't create fairness problems since the next batch won't run until the
54/// current batch is completely processed, so even if a task enqueues itself instantly (for example
55/// by waking its own waker) can't prevent other tasks from running.
56pub(crate) struct RunQueue {
57 stack: TransferStack<TaskHeader>,
58}
59
60impl RunQueue {
61 pub const fn new() -> Self {
62 Self {
63 stack: TransferStack::new(),
64 }
65 }
66
67 /// Enqueues an item. Returns true if the queue was empty.
68 ///
69 /// # Safety
70 ///
71 /// `item` must NOT be already enqueued in any queue.
72 #[inline(always)]
73 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
74 self.stack.push_was_empty(task)
75 }
76
77 /// # Standard atomic runqueue
78 ///
79 /// Empty the queue, then call `on_task` for each task that was in the queue.
80 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
81 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
82 #[cfg(not(feature = "edf-scheduler"))]
83 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
84 let taken = self.stack.take_all();
85 for taskref in taken {
86 run_dequeue(&taskref);
87 on_task(taskref);
88 }
89 }
90
91 /// # Earliest Deadline First Scheduler
92 ///
93 /// This algorithm will loop until all enqueued tasks are processed.
94 ///
95 /// Before polling a task, all currently enqueued tasks will be popped from the
96 /// runqueue, and will be added to the working `sorted` list, a linked-list that
97 /// sorts tasks by their deadline, with nearest deadline items in the front, and
98 /// furthest deadline items in the back.
99 ///
100 /// After popping and sorting all pending tasks, the SOONEST task will be popped
101 /// from the front of the queue, and polled by calling `on_task` on it.
102 ///
103 /// This process will repeat until the local `sorted` queue AND the global
104 /// runqueue are both empty, at which point this function will return.
105 #[cfg(feature = "edf-scheduler")]
106 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
107 // SAFETY: `deadline` can only be set through the `Deadline` interface, which
108 // only allows access to this value while the given task is being polled.
109 // This acts as mutual exclusion for access.
110 let mut sorted =
111 SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) });
112
113 loop {
114 // For each loop, grab any newly pended items
115 let taken = self.stack.take_all();
116
117 // Sort these into the list - this is potentially expensive! We do an
118 // insertion sort of new items, which iterates the linked list.
119 //
120 // Something on the order of `O(n * m)`, where `n` is the number
121 // of new tasks, and `m` is the number of already pending tasks.
122 sorted.extend(taken);
123
124 // Pop the task with the SOONEST deadline. If there are no tasks
125 // pending, then we are done.
126 let Some(taskref) = sorted.pop_front() else {
127 return;
128 };
129
130 // We got one task, mark it as dequeued, and process the task.
131 run_dequeue(&taskref);
132 on_task(taskref);
133 }
134 }
135}
136
137/// atomic state does not require a cs...
138#[cfg(target_has_atomic = "ptr")]
139#[inline(always)]
140fn run_dequeue(taskref: &TaskRef) {
141 taskref.header().state.run_dequeue();
142}
143
144/// ...while non-atomic state does
145#[cfg(not(target_has_atomic = "ptr"))]
146#[inline(always)]
147fn run_dequeue(taskref: &TaskRef) {
148 critical_section::with(|cs| {
149 taskref.header().state.run_dequeue(cs);
150 })
151}