aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/run_queue_atomics.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw/run_queue_atomics.rs')
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs110
1 files changed, 68 insertions, 42 deletions
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index ce511d79a..bc5d38250 100644
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -1,19 +1,36 @@
1use core::ptr; 1use core::ptr::{addr_of_mut, NonNull};
2use core::ptr::NonNull; 2
3use core::sync::atomic::{AtomicPtr, Ordering}; 3use cordyceps::sorted_list::Links;
4use cordyceps::{Linked, SortedList, TransferStack};
4 5
5use super::{TaskHeader, TaskRef}; 6use super::{TaskHeader, TaskRef};
6use crate::raw::util::SyncUnsafeCell;
7 7
8pub(crate) struct RunQueueItem { 8/// Use `cordyceps::sorted_list::Links` as the singly linked list
9 next: SyncUnsafeCell<Option<TaskRef>>, 9/// for RunQueueItems.
10} 10pub(crate) type RunQueueItem = Links<TaskHeader>;
11 11
12impl RunQueueItem { 12/// Implements the `Linked` trait, allowing for singly linked list usage
13 pub const fn new() -> Self { 13/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
14 Self { 14/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
15 next: SyncUnsafeCell::new(None), 15/// popped atomically from the `TransferStack`.
16 } 16unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
17 type Handle = TaskRef;
18
19 // Convert a TaskRef into a TaskHeader ptr
20 fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
21 r.ptr
22 }
23
24 // Convert a TaskHeader into a TaskRef
25 unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
26 TaskRef { ptr }
27 }
28
29 // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
30 // which can be used to traverse to other TaskHeader nodes in the linked list
31 unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
32 let ptr: *mut TaskHeader = ptr.as_ptr();
33 NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
17 } 34 }
18} 35}
19 36
@@ -29,13 +46,13 @@ impl RunQueueItem {
29/// current batch is completely processed, so even if a task enqueues itself instantly (for example 46/// current batch is completely processed, so even if a task enqueues itself instantly (for example
30/// by waking its own waker) can't prevent other tasks from running. 47/// by waking its own waker) can't prevent other tasks from running.
31pub(crate) struct RunQueue { 48pub(crate) struct RunQueue {
32 head: AtomicPtr<TaskHeader>, 49 stack: TransferStack<TaskHeader>,
33} 50}
34 51
35impl RunQueue { 52impl RunQueue {
36 pub const fn new() -> Self { 53 pub const fn new() -> Self {
37 Self { 54 Self {
38 head: AtomicPtr::new(ptr::null_mut()), 55 stack: TransferStack::new(),
39 } 56 }
40 } 57 }
41 58
@@ -46,43 +63,52 @@ impl RunQueue {
46 /// `item` must NOT be already enqueued in any queue. 63 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)] 64 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { 65 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
49 let mut was_empty = false; 66 self.stack.push_was_empty(task)
50
51 self.head
52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
53 was_empty = prev.is_null();
54 unsafe {
55 // safety: the pointer is either null or valid
56 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
57 // safety: there are no concurrent accesses to `next`
58 task.header().run_queue_item.next.set(prev);
59 }
60 Some(task.as_ptr() as *mut _)
61 })
62 .ok();
63
64 was_empty
65 } 67 }
66 68
67 /// Empty the queue, then call `on_task` for each task that was in the queue. 69 /// Empty the queue, then call `on_task` for each task that was in the queue.
68 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 70 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
69 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 71 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
72 #[cfg(not(feature = "drs-scheduler"))]
70 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { 73 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
71 // Atomically empty the queue. 74 let taken = self.stack.take_all();
72 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 75 for taskref in taken {
76 taskref.header().state.run_dequeue();
77 on_task(taskref);
78 }
79 }
80
81 /// Empty the queue, then call `on_task` for each task that was in the queue.
82 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
83 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
84 #[cfg(feature = "drs-scheduler")]
85 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
86 // SAFETY: `deadline` can only be set through the `Deadline` interface, which
87 // only allows access to this value while the given task is being polled.
88 // This acts as mutual exclusion for access.
89 let mut sorted =
90 SortedList::<TaskHeader>::new_custom(|lhs, rhs| unsafe { lhs.deadline.get().cmp(&rhs.deadline.get()) });
91
92 loop {
93 // For each loop, grab any newly pended items
94 let taken = self.stack.take_all();
73 95
74 // safety: the pointer is either null or valid 96 // Sort these into the list - this is potentially expensive! We do an
75 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; 97 // insertion sort of new items, which iterates the linked list.
98 //
99 // Something on the order of `O(n * m)`, where `n` is the number
100 // of new tasks, and `m` is the number of already pending tasks.
101 sorted.extend(taken);
76 102
77 // Iterate the linked list of tasks that were previously in the queue. 103 // Pop the task with the SOONEST deadline. If there are no tasks
78 while let Some(task) = next { 104 // pending, then we are done.
79 // If the task re-enqueues itself, the `next` pointer will get overwritten. 105 let Some(taskref) = sorted.pop_front() else {
80 // Therefore, first read the next pointer, and only then process the task. 106 return;
81 // safety: there are no concurrent accesses to `next` 107 };
82 next = unsafe { task.header().run_queue_item.next.get() };
83 108
84 task.header().state.run_dequeue(); 109 // We got one task, mark it as dequeued, and process the task.
85 on_task(task); 110 taskref.header().state.run_dequeue();
111 on_task(taskref);
86 } 112 }
87 } 113 }
88} 114}