1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
use core::ptr::{addr_of_mut, NonNull};
use cordyceps::sorted_list::Links;
#[cfg(feature = "edf-scheduler")]
use cordyceps::SortedList;
use cordyceps::{Linked, TransferStack};
use super::{TaskHeader, TaskRef};
/// Use `cordyceps::sorted_list::Links` as the intrusive singly-linked-list
/// links embedded in each `TaskHeader` for run-queue membership.
pub(crate) type RunQueueItem = Links<TaskHeader>;
/// Implements the `Linked` trait, allowing for singly linked list usage
/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
/// popped atomically from the `TransferStack`.
unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
    type Handle = TaskRef;

    /// Consume a `TaskRef`, yielding the raw header pointer it wraps.
    fn into_ptr(handle: TaskRef) -> NonNull<TaskHeader> {
        handle.ptr
    }

    /// Rebuild a `TaskRef` from a raw header pointer.
    unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
        TaskRef { ptr }
    }

    /// Given a pointer to a `TaskHeader`, obtain a pointer to its embedded
    /// `Links` structure, used to traverse to neighboring list nodes.
    unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
        let header: *mut TaskHeader = ptr.as_ptr();
        // Project to the `run_queue_item` field via `addr_of_mut!` so no
        // intermediate `&mut` reference to the header is ever created.
        let links = addr_of_mut!((*header).run_queue_item);
        // SAFETY: `links` is `ptr` (non-null) offset to a struct field, so it
        // cannot be null.
        NonNull::new_unchecked(links)
    }
}
/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
/// for our purposes: it can't create fairness problems since the next batch won't run until the
/// current batch is completely processed, so even a task that enqueues itself instantly (for
/// example by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
    // The lock-free intrusive stack holding all currently-pended tasks.
    stack: TransferStack<TaskHeader>,
}
impl RunQueue {
    /// Create a new, empty run queue.
    pub const fn new() -> Self {
        Self {
            stack: TransferStack::new(),
        }
    }

    /// Enqueues an item. Returns true if the queue was empty.
    ///
    /// # Safety
    ///
    /// `task` must NOT be already enqueued in any queue.
    #[inline(always)]
    pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
        self.stack.push_was_empty(task)
    }

    /// # Standard atomic runqueue
    ///
    /// Drain the queue, invoking `on_task` for every task found in it.
    ///
    /// NOTE: `on_task` is free to enqueue more tasks. Those stay in the queue
    /// and are handled by the *next* `dequeue_all` call, not this one.
    #[cfg(not(feature = "edf-scheduler"))]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        // Atomically take the whole current batch, then walk it locally.
        let batch = self.stack.take_all();
        for task in batch {
            // Mark the task as dequeued before running it, so it may be
            // legally re-enqueued while being processed.
            task.header().state.run_dequeue();
            on_task(task);
        }
    }

    /// # Earliest Deadline First Scheduler
    ///
    /// This algorithm will loop until all enqueued tasks are processed.
    ///
    /// Before polling a task, all currently enqueued tasks will be popped from the
    /// runqueue, and will be added to the working `sorted` list, a linked-list that
    /// sorts tasks by their deadline, with nearest deadline items in the front, and
    /// furthest deadline items in the back.
    ///
    /// After popping and sorting all pending tasks, the SOONEST task will be popped
    /// from the front of the queue, and polled by calling `on_task` on it.
    ///
    /// This process will repeat until the local `sorted` queue AND the global
    /// runqueue are both empty, at which point this function will return.
    #[cfg(feature = "edf-scheduler")]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        // SAFETY: `deadline` can only be set through the `Deadline` interface,
        // which only allows access to this value while the given task is being
        // polled. This acts as mutual exclusion for access.
        let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|a, b| unsafe {
            a.deadline.get().cmp(&b.deadline.get())
        });
        loop {
            // Grab anything newly pended since the previous iteration and
            // merge it into the deadline-ordered working list. This insertion
            // sort walks the linked list, so it is potentially expensive:
            // roughly O(n * m) for `n` new tasks and `m` already-sorted ones.
            sorted.extend(self.stack.take_all());

            // Run the task with the SOONEST deadline; once both the local
            // list and the global runqueue are empty, we are done.
            match sorted.pop_front() {
                Some(task) => {
                    // Mark as dequeued before processing, mirroring the
                    // standard scheduler path.
                    task.header().state.run_dequeue();
                    on_task(task);
                }
                None => return,
            }
        }
    }
}
|