author    Dario Nieuwenhuis <[email protected]>  2025-09-11 13:52:14 +0000
committer GitHub <[email protected]>            2025-09-11 13:52:14 +0000
commit    42c68622eeba3be05e8f8ccdc4072b7aa57f78d1 (patch)
tree      431aa79f8343a7ed8eb948ad52dec5ef13f5869a /embassy-executor/src
parent    d860530009c1bf96a20edeff22f10f738bab1503 (diff)
parent    e1209c5563576d18c4d033b015c9a5dd6145d581 (diff)
Merge pull request #4608 from diondokter/upstream-drs-2
[embassy-executor]: Upstream "Earliest Deadline First" Scheduler (version 2)
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--  embassy-executor/src/metadata.rs                        |  67
-rw-r--r--  embassy-executor/src/raw/deadline.rs                    |  44
-rw-r--r--  embassy-executor/src/raw/mod.rs                         |  13
-rw-r--r--  embassy-executor/src/raw/run_queue.rs                   | 194
-rw-r--r--  embassy-executor/src/raw/run_queue_atomics.rs           |  88
-rw-r--r--  embassy-executor/src/raw/run_queue_critical_section.rs  |  74
6 files changed, 316 insertions(+), 164 deletions(-)
diff --git a/embassy-executor/src/metadata.rs b/embassy-executor/src/metadata.rs
index f92c9b37c..4220048a6 100644
--- a/embassy-executor/src/metadata.rs
+++ b/embassy-executor/src/metadata.rs
@@ -7,11 +7,15 @@ use core::task::Poll;
 use critical_section::Mutex;
 
 use crate::raw;
+#[cfg(feature = "scheduler-deadline")]
+use crate::raw::Deadline;
 
 /// Metadata associated with a task.
 pub struct Metadata {
     #[cfg(feature = "metadata-name")]
     name: Mutex<Cell<Option<&'static str>>>,
+    #[cfg(feature = "scheduler-deadline")]
+    deadline: raw::Deadline,
 }
 
 impl Metadata {
@@ -19,6 +23,10 @@ impl Metadata {
         Self {
             #[cfg(feature = "metadata-name")]
             name: Mutex::new(Cell::new(None)),
+            // NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This
+            // will be lazily initialized in `initialize_impl`.
+            #[cfg(feature = "scheduler-deadline")]
+            deadline: raw::Deadline::new_unset(),
         }
     }
 
@@ -52,4 +60,63 @@ impl Metadata {
     pub fn set_name(&self, name: &'static str) {
         critical_section::with(|cs| self.name.borrow(cs).set(Some(name)))
     }
+
+    /// Get this task's deadline.
+    #[cfg(feature = "scheduler-deadline")]
+    pub fn deadline(&self) -> u64 {
+        self.deadline.instant_ticks()
+    }
+
+    /// Set this task's deadline.
+    ///
+    /// This method does NOT check whether the deadline has already passed.
+    #[cfg(feature = "scheduler-deadline")]
+    pub fn set_deadline(&self, instant_ticks: u64) {
+        self.deadline.set(instant_ticks);
+    }
+
+    /// Remove this task's deadline.
+    /// This brings it back to the default, where it is not scheduled ahead of other tasks.
+    #[cfg(feature = "scheduler-deadline")]
+    pub fn unset_deadline(&self) {
+        self.deadline.set(Deadline::UNSET_TICKS);
+    }
+
+    /// Set this task's deadline `duration_ticks` in the future from when
+    /// this method is called. The deadline saturates at the maximum tick value.
+    ///
+    /// Analogous to `Timer::after`.
+    #[cfg(all(feature = "scheduler-deadline", feature = "embassy-time-driver"))]
+    pub fn set_deadline_after(&self, duration_ticks: u64) {
+        let now = embassy_time_driver::now();
+
+        // Since ticks is a u64, saturating_add is PROBABLY overly cautious; leave
+        // it for now, we can probably make this wrapping_add for performance
+        // reasons later.
+        let deadline = now.saturating_add(duration_ticks);
+
+        self.set_deadline(deadline);
+    }
+
+    /// Set this task's deadline `duration_ticks` after the previous deadline.
+    ///
+    /// The deadline saturates at the maximum tick value.
+    ///
+    /// Note that by default (unless otherwise set), tasks start life with the deadline
+    /// unset, which means this method will have no effect.
+    ///
+    /// Analogous to one increment of `Ticker::every().next()`.
+    ///
+    /// The new deadline is the previous deadline plus `duration_ticks`.
+    #[cfg(feature = "scheduler-deadline")]
+    pub fn increment_deadline(&self, duration_ticks: u64) {
+        let last = self.deadline();
+
+        // Since ticks is a u64, saturating_add is PROBABLY overly cautious; leave
+        // it for now, we can probably make this wrapping_add for performance
+        // reasons later.
+        let deadline = last.saturating_add(duration_ticks);
+
+        self.set_deadline(deadline);
+    }
 }
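
As a side note on the arithmetic these new Metadata methods rely on: below is a minimal standalone sketch (plain Rust, not part of the patch; the `increment` helper is a hypothetical name) of the saturating deadline math shared by `set_deadline_after` and `increment_deadline`, including its interaction with the `UNSET_TICKS` sentinel.

// Standalone sketch of the saturating deadline arithmetic used above.
// `UNSET_TICKS` mirrors `raw::Deadline::UNSET_TICKS`.
const UNSET_TICKS: u64 = u64::MAX;

fn increment(last: u64, duration_ticks: u64) -> u64 {
    // A deadline never wraps past u64::MAX, and an UNSET deadline (u64::MAX)
    // stays UNSET, which is why `increment_deadline` has no effect on a task
    // whose deadline was never set.
    last.saturating_add(duration_ticks)
}

fn main() {
    assert_eq!(increment(1_000, 500), 1_500); // normal periodic step
    assert_eq!(increment(UNSET_TICKS, 500), UNSET_TICKS); // unset stays unset
    assert_eq!(increment(u64::MAX - 10, 500), u64::MAX); // saturates at the max tick
}
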
diff --git a/embassy-executor/src/raw/deadline.rs b/embassy-executor/src/raw/deadline.rs
new file mode 100644
index 000000000..cc89fadb0
--- /dev/null
+++ b/embassy-executor/src/raw/deadline.rs
@@ -0,0 +1,44 @@
+use core::sync::atomic::{AtomicU32, Ordering};
+
+/// A type for interacting with the deadline of the current task.
+///
+/// Requires the `scheduler-deadline` feature.
+///
+/// Note: Interacting with the deadline should be done locally in a task.
+/// In theory you could try to set or read the deadline from another task,
+/// but that will result in weird (though not unsound) behavior.
+pub(crate) struct Deadline {
+    instant_ticks_hi: AtomicU32,
+    instant_ticks_lo: AtomicU32,
+}
+
+impl Deadline {
+    pub(crate) const fn new(instant_ticks: u64) -> Self {
+        Self {
+            instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32),
+            instant_ticks_lo: AtomicU32::new(instant_ticks as u32),
+        }
+    }
+
+    pub(crate) const fn new_unset() -> Self {
+        Self::new(Self::UNSET_TICKS)
+    }
+
+    pub(crate) fn set(&self, instant_ticks: u64) {
+        self.instant_ticks_hi
+            .store((instant_ticks >> 32) as u32, Ordering::Relaxed);
+        self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed);
+    }
+
+    /// Deadline value in ticks, in the same time base and tick rate as `embassy-time`.
+    pub(crate) fn instant_ticks(&self) -> u64 {
+        let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64;
+        let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64;
+
+        (hi << 32) | lo
+    }
+
+    /// Sentinel value representing an "unset" deadline, which has lower priority
+    /// than any other set deadline value.
+    pub(crate) const UNSET_TICKS: u64 = u64::MAX;
+}
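
The storage scheme above splits a u64 across two relaxed AtomicU32s so that it works on targets without 64-bit atomics. The following standalone mirror (illustrative only; `SplitU64` is a hypothetical name, not in the patch) makes the trade-off concrete: a reader that interleaves with a writer can pair an old high word with a new low word, the torn-but-memory-safe read behind the "weird (though not unsound) behavior" caveat.

use core::sync::atomic::{AtomicU32, Ordering};

struct SplitU64 {
    hi: AtomicU32,
    lo: AtomicU32,
}

impl SplitU64 {
    const fn new(v: u64) -> Self {
        Self {
            hi: AtomicU32::new((v >> 32) as u32),
            lo: AtomicU32::new(v as u32),
        }
    }

    fn set(&self, v: u64) {
        // Two independent relaxed stores: not atomic as a pair.
        self.hi.store((v >> 32) as u32, Ordering::Relaxed);
        self.lo.store(v as u32, Ordering::Relaxed);
    }

    fn get(&self) -> u64 {
        // Two independent relaxed loads: racing a concurrent `set` may
        // combine halves of two different values (a torn read), but each
        // half is itself a valid atomic access, so this is never UB.
        let hi = self.hi.load(Ordering::Relaxed) as u64;
        let lo = self.lo.load(Ordering::Relaxed) as u64;
        (hi << 32) | lo
    }
}

fn main() {
    let d = SplitU64::new(u64::MAX);
    d.set(42);
    assert_eq!(d.get(), 42); // single-threaded use is always consistent
}
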
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 4280c5750..51a363385 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,8 +7,6 @@
 //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
 //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
 
-#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
-#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
 mod run_queue;
 
 #[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
@@ -28,6 +26,9 @@ pub(crate) mod util;
 #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
 mod waker;
 
+#[cfg(feature = "scheduler-deadline")]
+mod deadline;
+
 use core::future::Future;
 use core::marker::PhantomData;
 use core::mem;
@@ -38,6 +39,8 @@ use core::sync::atomic::AtomicPtr;
 use core::sync::atomic::Ordering;
 use core::task::{Context, Poll, Waker};
 
+#[cfg(feature = "scheduler-deadline")]
+pub(crate) use deadline::Deadline;
 use embassy_executor_timer_queue::TimerQueueItem;
 #[cfg(feature = "arch-avr")]
 use portable_atomic::AtomicPtr;
@@ -96,6 +99,7 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
 pub(crate) struct TaskHeader {
     pub(crate) state: State,
     pub(crate) run_queue_item: RunQueueItem,
+
     pub(crate) executor: AtomicPtr<SyncExecutor>,
     poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
 
@@ -296,6 +300,11 @@ impl<F: Future + 'static> AvailableTask<F> {
         self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
         self.task.future.write_in_place(future);
 
+        // By default, deadlines are set to the maximum value, so that any task WITH
+        // a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline
+        #[cfg(feature = "scheduler-deadline")]
+        self.task.raw.metadata.unset_deadline();
+
         let task = TaskRef::new(self.task);
 
         SpawnToken::new(task)
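
To see why `unset_deadline()` at spawn gives deadline-less tasks the lowest priority: the deadline queue orders tasks by ascending tick value, and every finite deadline compares less than the u64::MAX sentinel. A tiny standalone sketch (plain Rust, illustrative only):

fn main() {
    const UNSET_TICKS: u64 = u64::MAX; // mirrors `raw::Deadline::UNSET_TICKS`
    let mut deadlines = [UNSET_TICKS, 3_000, UNSET_TICKS, 1_000];
    deadlines.sort(); // ascending: soonest deadline first, unset tasks last
    assert_eq!(deadlines, [1_000, 3_000, UNSET_TICKS, UNSET_TICKS]);
}
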
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
new file mode 100644
index 000000000..d98c26f73
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -0,0 +1,194 @@
+use core::ptr::{addr_of_mut, NonNull};
+
+use cordyceps::sorted_list::Links;
+use cordyceps::Linked;
+#[cfg(feature = "scheduler-deadline")]
+use cordyceps::SortedList;
+
+#[cfg(target_has_atomic = "ptr")]
+type TransferStack<T> = cordyceps::TransferStack<T>;
+
+#[cfg(not(target_has_atomic = "ptr"))]
+type TransferStack<T> = MutexTransferStack<T>;
+
+use super::{TaskHeader, TaskRef};
+
+/// Use `cordyceps::sorted_list::Links` as the singly linked list
+/// for RunQueueItems.
+pub(crate) type RunQueueItem = Links<TaskHeader>;
+
+/// Implements the `Linked` trait, allowing `TaskHeader` to be used in any of
+/// cordyceps' singly linked containers: `TransferStack` (the atomic runqueue),
+/// `SortedList` (used by the deadline scheduler), or `Stack` (the batch
+/// atomically taken from the `TransferStack`).
+unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
+    type Handle = TaskRef;
+
+    // Convert a TaskRef into a TaskHeader ptr
+    fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
+        r.ptr
+    }
+
+    // Convert a TaskHeader ptr into a TaskRef
+    unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
+        TaskRef { ptr }
+    }
+
+    // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
+    // which can be used to traverse to other TaskHeader nodes in the linked list
+    unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
+        let ptr: *mut TaskHeader = ptr.as_ptr();
+        NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
+    }
+}
+
+/// Atomic task queue using a very, very simple lock-free linked-list queue:
+///
+/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
+///
+/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
+/// null. Then the batch is iterated following the next pointers until null is reached.
+///
+/// Note that batches will be iterated in the reverse of the order in which they were enqueued.
+/// This is OK for our purposes: it can't create fairness problems, since the next batch won't run
+/// until the current batch is completely processed, so even a task that enqueues itself instantly
+/// (for example by waking its own waker) can't prevent other tasks from running.
+pub(crate) struct RunQueue {
+    stack: TransferStack<TaskHeader>,
+}
+
+impl RunQueue {
+    pub const fn new() -> Self {
+        Self {
+            stack: TransferStack::new(),
+        }
+    }
+
+    /// Enqueues an item. Returns true if the queue was empty.
+    ///
+    /// # Safety
+    ///
+    /// `item` must NOT be already enqueued in any queue.
+    #[inline(always)]
+    pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
+        self.stack.push_was_empty(
+            task,
+            #[cfg(not(target_has_atomic = "ptr"))]
+            _tok,
+        )
+    }
+
+    /// # Standard atomic runqueue
+    ///
+    /// Empty the queue, then call `on_task` for each task that was in the queue.
+    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
+    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
+    #[cfg(not(feature = "scheduler-deadline"))]
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        let taken = self.stack.take_all();
+        for taskref in taken {
+            run_dequeue(&taskref);
+            on_task(taskref);
+        }
+    }
+
+    /// # Earliest Deadline First scheduler
+    ///
+    /// This algorithm will loop until all enqueued tasks are processed.
+    ///
+    /// Before polling a task, all currently enqueued tasks are popped from the
+    /// runqueue and added to the working `sorted` list, a linked list that
+    /// sorts tasks by their deadline, with the nearest deadlines at the front
+    /// and the furthest at the back.
+    ///
+    /// After popping and sorting all pending tasks, the task with the SOONEST
+    /// deadline is popped from the front of the list and polled via `on_task`.
+    ///
+    /// This process repeats until the local `sorted` list AND the global
+    /// runqueue are both empty, at which point this function returns.
+    #[cfg(feature = "scheduler-deadline")]
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        let mut sorted =
+            SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| lhs.metadata.deadline().cmp(&rhs.metadata.deadline()));
+
+        loop {
+            // On each iteration, grab any newly pended items.
+            let taken = self.stack.take_all();
+
+            // Sort these into the list - this is potentially expensive! We do an
+            // insertion sort of the new items, which iterates the linked list.
+            //
+            // Something on the order of `O(n * m)`, where `n` is the number
+            // of new tasks, and `m` is the number of already pending tasks.
+            sorted.extend(taken);
+
+            // Pop the task with the SOONEST deadline. If there are no tasks
+            // pending, then we are done.
+            let Some(taskref) = sorted.pop_front() else {
+                return;
+            };
+
+            // We got one task: mark it as dequeued and process it.
+            run_dequeue(&taskref);
+            on_task(taskref);
+        }
+    }
+}
+
+/// Atomic state does not require a critical section...
+#[cfg(target_has_atomic = "ptr")]
+#[inline(always)]
+fn run_dequeue(taskref: &TaskRef) {
+    taskref.header().state.run_dequeue();
+}
+
+/// ...while non-atomic state does.
+#[cfg(not(target_has_atomic = "ptr"))]
+#[inline(always)]
+fn run_dequeue(taskref: &TaskRef) {
+    critical_section::with(|cs| {
+        taskref.header().state.run_dequeue(cs);
+    })
+}
+
+/// A wrapper type that acts like `TransferStack` by wrapping a normal `Stack` in a critical-section mutex.
+#[cfg(not(target_has_atomic = "ptr"))]
+struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
+    inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
+}
+
+#[cfg(not(target_has_atomic = "ptr"))]
+impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
+    const fn new() -> Self {
+        Self {
+            inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
+        }
+    }
+
+    /// Push an item onto the transfer stack, returning whether the stack was previously empty.
+    fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
+        // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
+        // for the lifetime of the token, but does NOT protect against re-entrant access.
+        // However, we never *return* the reference, nor do we recurse (or call another method
+        // like `take_all`) in a way that could allow re-entrant aliasing. Therefore, the
+        // presence of the critical section is sufficient to guarantee exclusive access to
+        // the `inner` field for the purposes of this function.
+        let inner = unsafe { &mut *self.inner.borrow(token).get() };
+        let is_empty = inner.is_empty();
+        inner.push(item);
+        is_empty
+    }
+
+    fn take_all(&self) -> cordyceps::Stack<T> {
+        critical_section::with(|cs| {
+            // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
+            // for the lifetime of the token, but does NOT protect against re-entrant access.
+            // However, we never *return* the reference, nor do we recurse (or call another method
+            // like `push_was_empty`) in a way that could allow re-entrant aliasing. Therefore, the
+            // presence of the critical section is sufficient to guarantee exclusive access to
+            // the `inner` field for the purposes of this function.
+            let inner = unsafe { &mut *self.inner.borrow(cs).get() };
+            inner.take_all()
+        })
+    }
+}
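
The control flow of the deadline-enabled `dequeue_all` above can be modeled outside the executor. In this sketch (std Rust, illustrative only: a `BinaryHeap` stands in for cordyceps' intrusive `SortedList`, and bare u64 deadlines stand in for `TaskRef`s) each iteration drains whatever was newly enqueued, merges it in deadline order, and polls the soonest task first:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn dequeue_all_edf(
    mut take_all: impl FnMut() -> Vec<u64>, // drains the shared run queue
    mut on_task: impl FnMut(u64),           // "polls" the task with this deadline
) {
    // Min-heap by deadline; plays the role of the `sorted` list.
    let mut sorted = BinaryHeap::new();
    loop {
        // Grab any tasks pended since the last iteration, including tasks
        // that re-enqueued themselves from inside `on_task`.
        for deadline in take_all() {
            sorted.push(Reverse(deadline));
        }
        // Pop the soonest deadline; if nothing is pending anywhere, we're done.
        let Some(Reverse(deadline)) = sorted.pop() else { return };
        on_task(deadline);
    }
}

fn main() {
    // Batches arriving over time: tasks with deadlines 30/10/20, then 5.
    let mut batches = vec![vec![30_u64, 10, 20], vec![5]];
    dequeue_all_edf(
        move || if batches.is_empty() { Vec::new() } else { batches.remove(0) },
        |d| println!("polling task with deadline {d}"),
    );
    // Prints deadlines in the order 10, 5, 20, 30: the late-arriving 5 jumps
    // ahead of 20 and 30 because new batches are re-merged every iteration.
}
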
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
deleted file mode 100644
index ce511d79a..000000000
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use core::ptr;
-use core::ptr::NonNull;
-use core::sync::atomic::{AtomicPtr, Ordering};
-
-use super::{TaskHeader, TaskRef};
-use crate::raw::util::SyncUnsafeCell;
-
-pub(crate) struct RunQueueItem {
-    next: SyncUnsafeCell<Option<TaskRef>>,
-}
-
-impl RunQueueItem {
-    pub const fn new() -> Self {
-        Self {
-            next: SyncUnsafeCell::new(None),
-        }
-    }
-}
-
-/// Atomic task queue using a very, very simple lock-free linked-list queue:
-///
-/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
-///
-/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
-/// null. Then the batch is iterated following the next pointers until null is reached.
-///
-/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
-/// for our purposes: it can't create fairness problems since the next batch won't run until the
-/// current batch is completely processed, so even if a task enqueues itself instantly (for example
-/// by waking its own waker) can't prevent other tasks from running.
-pub(crate) struct RunQueue {
-    head: AtomicPtr<TaskHeader>,
-}
-
-impl RunQueue {
-    pub const fn new() -> Self {
-        Self {
-            head: AtomicPtr::new(ptr::null_mut()),
-        }
-    }
-
-    /// Enqueues an item. Returns true if the queue was empty.
-    ///
-    /// # Safety
-    ///
-    /// `item` must NOT be already enqueued in any queue.
-    #[inline(always)]
-    pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
-        let mut was_empty = false;
-
-        self.head
-            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
-                was_empty = prev.is_null();
-                unsafe {
-                    // safety: the pointer is either null or valid
-                    let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
-                    // safety: there are no concurrent accesses to `next`
-                    task.header().run_queue_item.next.set(prev);
-                }
-                Some(task.as_ptr() as *mut _)
-            })
-            .ok();
-
-        was_empty
-    }
-
-    /// Empty the queue, then call `on_task` for each task that was in the queue.
-    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
-    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
-    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
-        // Atomically empty the queue.
-        let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
-
-        // safety: the pointer is either null or valid
-        let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
-
-        // Iterate the linked list of tasks that were previously in the queue.
-        while let Some(task) = next {
-            // If the task re-enqueues itself, the `next` pointer will get overwritten.
-            // Therefore, first read the next pointer, and only then process the task.
-            // safety: there are no concurrent accesses to `next`
-            next = unsafe { task.header().run_queue_item.next.get() };
-
-            task.header().state.run_dequeue();
-            on_task(task);
-        }
-    }
-}
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
deleted file mode 100644
index 86c4085ed..000000000
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-use core::cell::Cell;
-
-use critical_section::{CriticalSection, Mutex};
-
-use super::TaskRef;
-
-pub(crate) struct RunQueueItem {
-    next: Mutex<Cell<Option<TaskRef>>>,
-}
-
-impl RunQueueItem {
-    pub const fn new() -> Self {
-        Self {
-            next: Mutex::new(Cell::new(None)),
-        }
-    }
-}
-
-/// Atomic task queue using a very, very simple lock-free linked-list queue:
-///
-/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
-///
-/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
-/// null. Then the batch is iterated following the next pointers until null is reached.
-///
-/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
-/// for our purposes: it can't create fairness problems since the next batch won't run until the
-/// current batch is completely processed, so even if a task enqueues itself instantly (for example
-/// by waking its own waker) can't prevent other tasks from running.
-pub(crate) struct RunQueue {
-    head: Mutex<Cell<Option<TaskRef>>>,
-}
-
-impl RunQueue {
-    pub const fn new() -> Self {
-        Self {
-            head: Mutex::new(Cell::new(None)),
-        }
-    }
-
-    /// Enqueues an item. Returns true if the queue was empty.
-    ///
-    /// # Safety
-    ///
-    /// `item` must NOT be already enqueued in any queue.
-    #[inline(always)]
-    pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
-        let prev = self.head.borrow(cs).replace(Some(task));
-        task.header().run_queue_item.next.borrow(cs).set(prev);
-
-        prev.is_none()
-    }
-
-    /// Empty the queue, then call `on_task` for each task that was in the queue.
-    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
-    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
-    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
-        // Atomically empty the queue.
-        let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
-
-        // Iterate the linked list of tasks that were previously in the queue.
-        while let Some(task) = next {
-            // If the task re-enqueues itself, the `next` pointer will get overwritten.
-            // Therefore, first read the next pointer, and only then process the task.
-
-            critical_section::with(|cs| {
-                next = task.header().run_queue_item.next.borrow(cs).get();
-                task.header().state.run_dequeue(cs);
-            });
-
-            on_task(task);
-        }
-    }
-}