diff options
| author | James Munns <[email protected]> | 2025-03-20 09:47:56 +0100 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2025-09-11 14:45:06 +0200 |
| commit | 535c80e61f17e4ee4605e00623aabeda2181352d (patch) | |
| tree | e51920fe675fa5737fb40c885e0199b0ec711abf /embassy-executor/src | |
| parent | d860530009c1bf96a20edeff22f10f738bab1503 (diff) | |
Add initial DRS scheduler placeholder
* Start hacking in cordyceps
This adds a third kind of runqueue, for now it should work the same as the
current "atomics" runqueue, but uses a cordyceps TransferStack instead of
the existing home-rolled linked list.
* Clean up, use new cordyceps feature
* A bit more cleanup
* Update docs to be more clear
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 64 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_drs_atomics.rs | 47 |
2 files changed, 107 insertions, 4 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 4280c5750..894a996ec 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -7,7 +7,14 @@ | |||
| 7 | //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe | 7 | //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe |
| 8 | //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. | 8 | //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. |
| 9 | 9 | ||
| 10 | #[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")] | 10 | #[cfg_attr( |
| 11 | all(not(feature = "drs-scheduler"), target_has_atomic = "ptr"), | ||
| 12 | path = "run_queue_atomics.rs", | ||
| 13 | )] | ||
| 14 | #[cfg_attr( | ||
| 15 | all(feature = "drs-scheduler", target_has_atomic = "ptr"), | ||
| 16 | path = "run_queue_drs_atomics.rs", | ||
| 17 | )] | ||
| 11 | #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] | 18 | #[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")] |
| 12 | mod run_queue; | 19 | mod run_queue; |
| 13 | 20 | ||
| @@ -33,6 +40,8 @@ use core::marker::PhantomData; | |||
| 33 | use core::mem; | 40 | use core::mem; |
| 34 | use core::pin::Pin; | 41 | use core::pin::Pin; |
| 35 | use core::ptr::NonNull; | 42 | use core::ptr::NonNull; |
| 43 | #[cfg(feature = "drs-scheduler")] | ||
| 44 | use core::ptr::addr_of_mut; | ||
| 36 | #[cfg(not(feature = "arch-avr"))] | 45 | #[cfg(not(feature = "arch-avr"))] |
| 37 | use core::sync::atomic::AtomicPtr; | 46 | use core::sync::atomic::AtomicPtr; |
| 38 | use core::sync::atomic::Ordering; | 47 | use core::sync::atomic::Ordering; |
| @@ -42,7 +51,9 @@ use embassy_executor_timer_queue::TimerQueueItem; | |||
| 42 | #[cfg(feature = "arch-avr")] | 51 | #[cfg(feature = "arch-avr")] |
| 43 | use portable_atomic::AtomicPtr; | 52 | use portable_atomic::AtomicPtr; |
| 44 | 53 | ||
| 45 | use self::run_queue::{RunQueue, RunQueueItem}; | 54 | use self::run_queue::RunQueue; |
| 55 | #[cfg(not(feature = "drs-scheduler"))] | ||
| 56 | use self::run_queue::RunQueueItem; | ||
| 46 | use self::state::State; | 57 | use self::state::State; |
| 47 | use self::util::{SyncUnsafeCell, UninitCell}; | 58 | use self::util::{SyncUnsafeCell, UninitCell}; |
| 48 | pub use self::waker::task_from_waker; | 59 | pub use self::waker::task_from_waker; |
| @@ -54,6 +65,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static | |||
| 54 | unsafe { task_from_waker(waker).timer_queue_item() } | 65 | unsafe { task_from_waker(waker).timer_queue_item() } |
| 55 | } | 66 | } |
| 56 | 67 | ||
| 68 | #[cfg(feature = "drs-scheduler")] | ||
| 69 | use cordyceps::{stack, Linked}; | ||
| 70 | |||
| 57 | /// Raw task header for use in task pointers. | 71 | /// Raw task header for use in task pointers. |
| 58 | /// | 72 | /// |
| 59 | /// A task can be in one of the following states: | 73 | /// A task can be in one of the following states: |
| @@ -93,9 +107,29 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static | |||
| 93 | /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` | 107 | /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` |
| 94 | /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. | 108 | /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`. |
| 95 | /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` | 109 | /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue` |
| 110 | #[cfg_attr(feature = "drs-scheduler", repr(C))] | ||
| 96 | pub(crate) struct TaskHeader { | 111 | pub(crate) struct TaskHeader { |
| 97 | pub(crate) state: State, | 112 | // TODO(AJM): Make a decision whether we want to support the spicier "pointer recast"/"type punning" |
| 113 | // method of implementing the `cordyceps::Linked` trait or not. | ||
| 114 | // | ||
| 115 | // Currently, I do the safer version with `addr_of_mut!`, which doesn't REQUIRE that the first | ||
| 116 | // element is the `links` field, at the potential cost of a little extra pointer math. | ||
| 117 | // | ||
| 118 | // The optimizer *might* (total guess) notice that we are always doing an offset of zero in the | ||
| 119 | // call to `addr_of_mut` in the `impl Linked for TaskHeader` below, and get the best of both worlds, | ||
| 120 | // but right now this is maybe a little over cautious. | ||
| 121 | // | ||
| 122 | // See https://docs.rs/cordyceps/latest/cordyceps/trait.Linked.html#implementing-linkedlinks for | ||
| 123 | // more context on the choices here. | ||
| 124 | #[cfg(feature = "drs-scheduler")] | ||
| 125 | pub(crate) links: stack::Links<TaskHeader>, | ||
| 126 | |||
| 127 | // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though | ||
| 128 | // right now cordyceps doesn't work on non-atomic systems | ||
| 129 | #[cfg(not(feature = "drs-scheduler"))] | ||
| 98 | pub(crate) run_queue_item: RunQueueItem, | 130 | pub(crate) run_queue_item: RunQueueItem, |
| 131 | |||
| 132 | pub(crate) state: State, | ||
| 99 | pub(crate) executor: AtomicPtr<SyncExecutor>, | 133 | pub(crate) executor: AtomicPtr<SyncExecutor>, |
| 100 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 134 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 101 | 135 | ||
| @@ -108,6 +142,25 @@ pub(crate) struct TaskHeader { | |||
| 108 | all_tasks_next: AtomicPtr<TaskHeader>, | 142 | all_tasks_next: AtomicPtr<TaskHeader>, |
| 109 | } | 143 | } |
| 110 | 144 | ||
| 145 | #[cfg(feature = "drs-scheduler")] | ||
| 146 | unsafe impl Linked<stack::Links<TaskHeader>> for TaskHeader { | ||
| 147 | type Handle = TaskRef; | ||
| 148 | |||
| 149 | fn into_ptr(r: Self::Handle) -> NonNull<Self> { | ||
| 150 | r.ptr.cast() | ||
| 151 | } | ||
| 152 | |||
| 153 | unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle { | ||
| 154 | let ptr: NonNull<TaskHeader> = ptr; | ||
| 155 | TaskRef { ptr } | ||
| 156 | } | ||
| 157 | |||
| 158 | unsafe fn links(ptr: NonNull<Self>) -> NonNull<stack::Links<TaskHeader>> { | ||
| 159 | let ptr: *mut TaskHeader = ptr.as_ptr(); | ||
| 160 | NonNull::new_unchecked(addr_of_mut!((*ptr).links)) | ||
| 161 | } | ||
| 162 | } | ||
| 163 | |||
| 111 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. | 164 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 112 | #[derive(Debug, Clone, Copy, PartialEq)] | 165 | #[derive(Debug, Clone, Copy, PartialEq)] |
| 113 | pub struct TaskRef { | 166 | pub struct TaskRef { |
| @@ -198,8 +251,11 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 198 | pub const fn new() -> Self { | 251 | pub const fn new() -> Self { |
| 199 | Self { | 252 | Self { |
| 200 | raw: TaskHeader { | 253 | raw: TaskHeader { |
| 201 | state: State::new(), | 254 | #[cfg(not(feature = "drs-scheduler"))] |
| 202 | run_queue_item: RunQueueItem::new(), | 255 | run_queue_item: RunQueueItem::new(), |
| 256 | #[cfg(feature = "drs-scheduler")] | ||
| 257 | links: stack::Links::new(), | ||
| 258 | state: State::new(), | ||
| 203 | executor: AtomicPtr::new(core::ptr::null_mut()), | 259 | executor: AtomicPtr::new(core::ptr::null_mut()), |
| 204 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 260 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| 205 | poll_fn: SyncUnsafeCell::new(None), | 261 | poll_fn: SyncUnsafeCell::new(None), |
diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs new file mode 100644 index 000000000..53ada1b14 --- /dev/null +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs | |||
| @@ -0,0 +1,47 @@ | |||
| 1 | use super::{TaskHeader, TaskRef}; | ||
| 2 | use cordyceps::TransferStack; | ||
| 3 | |||
| 4 | |||
| 5 | /// Atomic task queue using a very, very simple lock-free linked-list queue: | ||
| 6 | /// | ||
| 7 | /// To enqueue a task, task.next is set to the old head, and head is atomically set to task. | ||
| 8 | /// | ||
| 9 | /// Dequeuing is done in batches: the queue is emptied by atomically replacing head with | ||
| 10 | /// null. Then the batch is iterated following the next pointers until null is reached. | ||
| 11 | /// | ||
| 12 | /// Note that batches will be iterated in the reverse order as they were enqueued. This is OK | ||
| 13 | /// for our purposes: it can't create fairness problems since the next batch won't run until the | ||
| 14 | /// current batch is completely processed, so even if a task enqueues itself instantly (for example | ||
| 15 | /// by waking its own waker) can't prevent other tasks from running. | ||
| 16 | pub(crate) struct RunQueue { | ||
| 17 | stack: TransferStack<TaskHeader>, | ||
| 18 | } | ||
| 19 | |||
| 20 | impl RunQueue { | ||
| 21 | pub const fn new() -> Self { | ||
| 22 | Self { | ||
| 23 | stack: TransferStack::new(), | ||
| 24 | } | ||
| 25 | } | ||
| 26 | |||
| 27 | /// Enqueues an item. Returns true if the queue was empty. | ||
| 28 | /// | ||
| 29 | /// # Safety | ||
| 30 | /// | ||
| 31 | /// `item` must NOT be already enqueued in any queue. | ||
| 32 | #[inline(always)] | ||
| 33 | pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { | ||
| 34 | self.stack.push_was_empty(task) | ||
| 35 | } | ||
| 36 | |||
| 37 | /// Empty the queue, then call `on_task` for each task that was in the queue. | ||
| 38 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | ||
| 39 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | ||
| 40 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { | ||
| 41 | let taken = self.stack.take_all(); | ||
| 42 | for taskref in taken { | ||
| 43 | taskref.header().state.run_dequeue(); | ||
| 44 | on_task(taskref); | ||
| 45 | } | ||
| 46 | } | ||
| 47 | } | ||
