use core::ptr::{addr_of_mut, NonNull};

use cordyceps::sorted_list::Links;
use cordyceps::Linked;
#[cfg(feature = "edf-scheduler")]
use cordyceps::SortedList;

#[cfg(target_has_atomic = "ptr")]
type TransferStack = cordyceps::TransferStack<TaskHeader>;
#[cfg(not(target_has_atomic = "ptr"))]
type TransferStack = MutexTransferStack<TaskHeader>;

use super::{TaskHeader, TaskRef};

/// Use `cordyceps::sorted_list::Links` as the singly linked list
/// for RunQueueItems.
pub(crate) type RunQueueItem = Links<TaskHeader>;

/// Implements the `Linked` trait, allowing for singly linked list usage
/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
/// `SortedList` (used with the EDF scheduler), or `Stack`, which is
/// popped atomically from the `TransferStack`.
unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
    type Handle = TaskRef;

    // Convert a TaskRef into a TaskHeader ptr
    fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
        r.ptr
    }

    // Convert a TaskHeader ptr into a TaskRef
    unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
        TaskRef { ptr }
    }

    // Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
    // which can be used to traverse to other TaskHeader nodes in the linked list
    unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
        let ptr: *mut TaskHeader = ptr.as_ptr();
        NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
    }
}

/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse of the order in which they were enqueued.
/// This is OK for our purposes: it can't create fairness problems, since the next batch won't
/// run until the current batch is completely processed, so even if a task enqueues itself
/// instantly (for example, by waking its own waker), it can't prevent other tasks from running.
pub(crate) struct RunQueue {
    stack: TransferStack,
}

impl RunQueue {
    pub const fn new() -> Self {
        Self {
            stack: TransferStack::new(),
        }
    }

    /// Enqueues an item. Returns true if the queue was empty.
    ///
    /// # Safety
    ///
    /// `task` must NOT be already enqueued in any queue.
    #[inline(always)]
    pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
        self.stack.push_was_empty(
            task,
            #[cfg(not(target_has_atomic = "ptr"))]
            _tok,
        )
    }

    /// # Standard atomic runqueue
    ///
    /// Empty the queue, then call `on_task` for each task that was in the queue.
    ///
    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
    #[cfg(not(feature = "edf-scheduler"))]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        let taken = self.stack.take_all();
        for taskref in taken {
            run_dequeue(&taskref);
            on_task(taskref);
        }
    }

    /// # Earliest Deadline First Scheduler
    ///
    /// This algorithm will loop until all enqueued tasks are processed.
    ///
    /// Before polling a task, all currently enqueued tasks will be popped from the
    /// runqueue, and will be added to the working `sorted` list, a linked list that
    /// sorts tasks by their deadline, with nearest-deadline items in the front, and
    /// furthest-deadline items in the back.
    ///
    /// After popping and sorting all pending tasks, the SOONEST task will be popped
    /// from the front of the queue, and polled by calling `on_task` on it.
    ///
    /// This process will repeat until the local `sorted` queue AND the global
    /// runqueue are both empty, at which point this function will return.
    #[cfg(feature = "edf-scheduler")]
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        // SAFETY: `deadline` can only be set through the `Deadline` interface, which
        // only allows access to this value while the given task is being polled.
        // This acts as mutual exclusion for access.
        let mut sorted = SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| unsafe {
            lhs.deadline.get().cmp(&rhs.deadline.get())
        });

        loop {
            // On each pass of the loop, grab any newly pended items
            let taken = self.stack.take_all();

            // Sort these into the list - this is potentially expensive! We do an
            // insertion sort of new items, which iterates the linked list.
            //
            // Something on the order of `O(n * m)`, where `n` is the number
            // of new tasks, and `m` is the number of already pending tasks.
            sorted.extend(taken);

            // Pop the task with the SOONEST deadline. If there are no tasks
            // pending, then we are done.
            let Some(taskref) = sorted.pop_front() else {
                return;
            };

            // We got one task; mark it as dequeued, and process the task.
            run_dequeue(&taskref);
            on_task(taskref);
        }
    }
}
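// A minimal, self-contained sketch of the push/batch-drain pattern described
// in the `RunQueue` docs above, using a standalone `DemoNode` in place of
// `TaskHeader`. `DemoNode`, its owned `Box` handle, and `Links::new()` are
// assumptions for illustration only; the executor itself uses non-owning
// `TaskRef` handles into intrusive task storage.
#[cfg(all(test, target_has_atomic = "ptr"))]
mod transfer_stack_sketch {
    extern crate std;

    use std::{boxed::Box, vec::Vec};

    use super::*;

    struct DemoNode {
        links: Links<DemoNode>,
        value: u32,
    }

    // Mirrors the `Linked` impl for `TaskHeader` above, but with an owned handle.
    unsafe impl Linked<Links<DemoNode>> for DemoNode {
        type Handle = Box<DemoNode>;

        fn into_ptr(h: Box<DemoNode>) -> NonNull<DemoNode> {
            NonNull::from(Box::leak(h))
        }

        unsafe fn from_ptr(ptr: NonNull<DemoNode>) -> Box<DemoNode> {
            Box::from_raw(ptr.as_ptr())
        }

        unsafe fn links(ptr: NonNull<DemoNode>) -> NonNull<Links<DemoNode>> {
            let ptr: *mut DemoNode = ptr.as_ptr();
            NonNull::new_unchecked(addr_of_mut!((*ptr).links))
        }
    }

    #[test]
    fn batches_drain_in_reverse_order() {
        let stack = cordyceps::TransferStack::<DemoNode>::new();

        for value in 1..=3u32 {
            let node = Box::new(DemoNode {
                links: Links::new(),
                value,
            });
            // Only the first push finds the stack empty.
            assert_eq!(stack.push_was_empty(node), value == 1);
        }

        // Draining a batch yields items in reverse enqueue order: 3, 2, 1.
        let drained: Vec<u32> = stack.take_all().into_iter().map(|n| n.value).collect();
        assert_eq!(drained, [3, 2, 1]);
    }
}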
/// Atomic state does not require a critical section...
#[cfg(target_has_atomic = "ptr")]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
    taskref.header().state.run_dequeue();
}

/// ...while non-atomic state does
#[cfg(not(target_has_atomic = "ptr"))]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
    critical_section::with(|cs| {
        taskref.header().state.run_dequeue(cs);
    })
}

/// A wrapper type that acts like `TransferStack` by wrapping a normal `Stack`
/// in a critical-section mutex
#[cfg(not(target_has_atomic = "ptr"))]
struct MutexTransferStack<T: Linked<Links<T>>> {
    inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
}

#[cfg(not(target_has_atomic = "ptr"))]
impl<T: Linked<Links<T>>> MutexTransferStack<T> {
    const fn new() -> Self {
        Self {
            inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
        }
    }

    /// Push an item to the transfer stack, returning whether the stack was previously empty
    fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
        // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
        // for the lifetime of the token, but does NOT protect against re-entrant access.
        // However, we never *return* the reference, nor do we recurse (or call another method
        // like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
        // presence of the critical section is sufficient to guarantee exclusive access to
        // the `inner` field for the purposes of this function.
        let guard = unsafe { &mut *self.inner.borrow(token).get() };
        let was_empty = guard.is_empty();
        guard.push(item);
        was_empty
    }

    fn take_all(&self) -> cordyceps::Stack<T> {
        critical_section::with(|cs| {
            // SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
            // for the lifetime of the token, but does NOT protect against re-entrant access.
            // However, we never *return* the reference, nor do we recurse (or call another method
            // like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
            // presence of the critical section is sufficient to guarantee exclusive access to
            // the `inner` field for the purposes of this function.
            let guard = unsafe { &mut *self.inner.borrow(cs).get() };
            guard.take_all()
        })
    }
}
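// A minimal sketch of the sort-then-pop behavior used by the EDF `dequeue_all`
// above: items pushed to a `TransferStack` in arbitrary order come back out of
// a `SortedList` nearest-deadline first. `Job`, its owned `Box` handle, and
// `Links::new()` are assumptions for illustration only; the real list compares
// `TaskHeader::deadline` under the safety argument documented in `dequeue_all`.
#[cfg(all(test, feature = "edf-scheduler", target_has_atomic = "ptr"))]
mod sorted_list_sketch {
    extern crate std;

    use std::{boxed::Box, vec::Vec};

    use super::*;

    struct Job {
        links: Links<Job>,
        deadline: u64,
    }

    // Mirrors the `Linked` impl for `TaskHeader` above, but with an owned handle.
    unsafe impl Linked<Links<Job>> for Job {
        type Handle = Box<Job>;

        fn into_ptr(h: Box<Job>) -> NonNull<Job> {
            NonNull::from(Box::leak(h))
        }

        unsafe fn from_ptr(ptr: NonNull<Job>) -> Box<Job> {
            Box::from_raw(ptr.as_ptr())
        }

        unsafe fn links(ptr: NonNull<Job>) -> NonNull<Links<Job>> {
            let ptr: *mut Job = ptr.as_ptr();
            NonNull::new_unchecked(addr_of_mut!((*ptr).links))
        }
    }

    #[test]
    fn soonest_deadline_pops_first() {
        // Enqueue jobs with out-of-order deadlines.
        let stack = cordyceps::TransferStack::<Job>::new();
        for deadline in [30u64, 10, 20] {
            let _ = stack.push_was_empty(Box::new(Job {
                links: Links::new(),
                deadline,
            }));
        }

        // Insertion-sort the drained batch by deadline, soonest first, just as
        // the EDF `dequeue_all` does for `TaskHeader`s.
        let mut sorted =
            SortedList::<Job>::new_with_cmp(|lhs, rhs| lhs.deadline.cmp(&rhs.deadline));
        sorted.extend(stack.take_all());

        let mut order = Vec::new();
        while let Some(job) = sorted.pop_front() {
            order.push(job.deadline);
        }
        assert_eq!(order, [10, 20, 30]);
    }
}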