diff options
| -rw-r--r-- | embassy-executor/Cargo.toml | 3 | ||||
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 12 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_drs_atomics.rs | 141 |
3 files changed, 150 insertions, 6 deletions
diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml index 1cd732dfa..db664a819 100644 --- a/embassy-executor/Cargo.toml +++ b/embassy-executor/Cargo.toml | |||
| @@ -89,7 +89,6 @@ embassy-sync = { path = "../embassy-sync" } | |||
| 89 | rustversion = "1.0.21" | 89 | rustversion = "1.0.21" |
| 90 | 90 | ||
| 91 | [features] | 91 | [features] |
| 92 | |||
| 93 | ## Enable nightly-only features | 92 | ## Enable nightly-only features |
| 94 | nightly = ["embassy-executor-macros/nightly"] | 93 | nightly = ["embassy-executor-macros/nightly"] |
| 95 | 94 | ||
| @@ -133,4 +132,4 @@ rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time | |||
| 133 | _any_trace = [] | 132 | _any_trace = [] |
| 134 | 133 | ||
| 135 | ## Enable "Deadline Rank Scheduler" | 134 | ## Enable "Deadline Rank Scheduler" |
| 136 | drs-scheduler = ["dep:cordyceps"] | 135 | drs-scheduler = ["dep:cordyceps", "dep:embassy-time-driver"] |
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 894a996ec..9b8a4ea8a 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static | |||
| 68 | #[cfg(feature = "drs-scheduler")] | 68 | #[cfg(feature = "drs-scheduler")] |
| 69 | use cordyceps::{stack, Linked}; | 69 | use cordyceps::{stack, Linked}; |
| 70 | 70 | ||
| 71 | #[cfg(feature = "drs-scheduler")] | ||
| 72 | pub use run_queue::Deadline; | ||
| 73 | |||
| 71 | /// Raw task header for use in task pointers. | 74 | /// Raw task header for use in task pointers. |
| 72 | /// | 75 | /// |
| 73 | /// A task can be in one of the following states: | 76 | /// A task can be in one of the following states: |
| @@ -124,6 +127,9 @@ pub(crate) struct TaskHeader { | |||
| 124 | #[cfg(feature = "drs-scheduler")] | 127 | #[cfg(feature = "drs-scheduler")] |
| 125 | pub(crate) links: stack::Links<TaskHeader>, | 128 | pub(crate) links: stack::Links<TaskHeader>, |
| 126 | 129 | ||
| 130 | #[cfg(feature = "drs-scheduler")] | ||
| 131 | pub(crate) deadline: SyncUnsafeCell<u64>, | ||
| 132 | |||
| 127 | // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though | 133 | // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though |
| 128 | // right now cordyceps doesn't work on non-atomic systems | 134 | // right now cordyceps doesn't work on non-atomic systems |
| 129 | #[cfg(not(feature = "drs-scheduler"))] | 135 | #[cfg(not(feature = "drs-scheduler"))] |
| @@ -255,6 +261,8 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 255 | run_queue_item: RunQueueItem::new(), | 261 | run_queue_item: RunQueueItem::new(), |
| 256 | #[cfg(feature = "drs-scheduler")] | 262 | #[cfg(feature = "drs-scheduler")] |
| 257 | links: stack::Links::new(), | 263 | links: stack::Links::new(), |
| 264 | #[cfg(feature = "drs-scheduler")] | ||
| 265 | deadline: SyncUnsafeCell::new(0u64), | ||
| 258 | state: State::new(), | 266 | state: State::new(), |
| 259 | executor: AtomicPtr::new(core::ptr::null_mut()), | 267 | executor: AtomicPtr::new(core::ptr::null_mut()), |
| 260 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 268 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| @@ -352,6 +360,10 @@ impl<F: Future + 'static> AvailableTask<F> { | |||
| 352 | self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); | 360 | self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); |
| 353 | self.task.future.write_in_place(future); | 361 | self.task.future.write_in_place(future); |
| 354 | 362 | ||
| 363 | // TODO(AJM): Some other way of setting this? Just a placeholder | ||
| 364 | #[cfg(feature = "drs-scheduler")] | ||
| 365 | self.task.raw.deadline.set(u64::MAX); | ||
| 366 | |||
| 355 | let task = TaskRef::new(self.task); | 367 | let task = TaskRef::new(self.task); |
| 356 | 368 | ||
| 357 | SpawnToken::new(task) | 369 | SpawnToken::new(task) |
diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs index 53ada1b14..69b7b3bf0 100644 --- a/embassy-executor/src/raw/run_queue_drs_atomics.rs +++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs | |||
| @@ -1,6 +1,7 @@ | |||
| 1 | use super::{TaskHeader, TaskRef}; | 1 | use super::{TaskHeader, TaskRef}; |
| 2 | use cordyceps::TransferStack; | 2 | use cordyceps::{SortedList, TransferStack}; |
| 3 | 3 | use core::future::{Future, poll_fn}; | |
| 4 | use core::task::Poll; | ||
| 4 | 5 | ||
| 5 | /// Atomic task queue using a very, very simple lock-free linked-list queue: | 6 | /// Atomic task queue using a very, very simple lock-free linked-list queue: |
| 6 | /// | 7 | /// |
| @@ -38,10 +39,142 @@ impl RunQueue { | |||
| 38 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | 39 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue |
| 39 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | 40 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. |
| 40 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { | 41 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { |
| 41 | let taken = self.stack.take_all(); | 42 | let mut sorted = SortedList::<TaskHeader>::new(|lhs, rhs| unsafe { |
| 42 | for taskref in taken { | 43 | // TODO: Do we need any kind of access control here? Not if we say that |
| 44 | // tasks can only set their own priority, which they can't do if we're in | ||
| 45 | // the scheduler | ||
| 46 | lhs.deadline.get().cmp(&rhs.deadline.get()) | ||
| 47 | }); | ||
| 48 | |||
| 49 | loop { | ||
| 50 | // For each loop, grab any newly pended items | ||
| 51 | let taken = self.stack.take_all(); | ||
| 52 | |||
| 53 | // Sort these into the list - this is potentially expensive! We do an | ||
| 54 | // insertion sort of new items, which iterates the linked list. | ||
| 55 | // | ||
| 56 | // Something on the order of `O(n * m)`, where `n` is the number | ||
| 57 | // of new tasks, and `m` is the number of already pending tasks. | ||
| 58 | sorted.extend(taken); | ||
| 59 | |||
| 60 | // Pop the task with the SOONEST deadline. If there are no tasks | ||
| 61 | // pending, then we are done. | ||
| 62 | let Some(taskref) = sorted.pop_front() else { | ||
| 63 | return; | ||
| 64 | }; | ||
| 65 | |||
| 66 | // We got one task, mark it as dequeued, and process the task. | ||
| 43 | taskref.header().state.run_dequeue(); | 67 | taskref.header().state.run_dequeue(); |
| 44 | on_task(taskref); | 68 | on_task(taskref); |
| 45 | } | 69 | } |
| 46 | } | 70 | } |
| 47 | } | 71 | } |
| 72 | |||
| 73 | /// A type for interacting with the deadline of the current task | ||
| 74 | pub struct Deadline { | ||
| 75 | /// Deadline value in ticks, same time base and ticks as `embassy-time` | ||
| 76 | pub instant_ticks: u64, | ||
| 77 | } | ||
| 78 | |||
| 79 | impl Deadline { | ||
| 80 | /// Set the current task's deadline at exactly `instant_ticks` | ||
| 81 | /// | ||
| 82 | /// This method is a future in order to access the currently executing task's | ||
| 83 | /// header which contains the deadline. | ||
| 84 | /// | ||
| 85 | /// Analogous to `Timer::at`. | ||
| 86 | /// | ||
| 87 | /// TODO: Should we check/panic if the deadline is in the past? | ||
| 88 | #[must_use = "Setting deadline must be polled to be effective"] | ||
| 89 | pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future<Output = ()> { | ||
| 90 | poll_fn(move |cx| { | ||
| 91 | let task = super::task_from_waker(cx.waker()); | ||
| 92 | // SAFETY: A task can only modify its own deadline, while the task is being | ||
| 93 | // polled, meaning that there cannot be concurrent access to the deadline. | ||
| 94 | unsafe { | ||
| 95 | task.header().deadline.set(instant_ticks); | ||
| 96 | } | ||
| 97 | Poll::Ready(()) | ||
| 98 | }) | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Set the current task's deadline `duration_ticks` in the future from when | ||
| 102 | /// this future is polled. | ||
| 103 | /// | ||
| 104 | /// This method is a future in order to access the currently executing task's | ||
| 105 | /// header which contains the deadline | ||
| 106 | /// | ||
| 107 | /// Analogous to `Timer::after` | ||
| 108 | /// | ||
| 109 | /// TODO: Do we want to return what the deadline is? | ||
| 110 | #[must_use = "Setting deadline must be polled to be effective"] | ||
| 111 | pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future<Output = ()> { | ||
| 112 | poll_fn(move |cx| { | ||
| 113 | let task = super::task_from_waker(cx.waker()); | ||
| 114 | let now = embassy_time_driver::now(); | ||
| 115 | |||
| 116 | // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave | ||
| 117 | // it for now, we can probably make this wrapping_add for performance | ||
| 118 | // reasons later. | ||
| 119 | let deadline = now.saturating_add(duration_ticks); | ||
| 120 | |||
| 121 | // SAFETY: A task can only modify its own deadline, while the task is being | ||
| 122 | // polled, meaning that there cannot be concurrent access to the deadline. | ||
| 123 | unsafe { | ||
| 124 | task.header().deadline.set(deadline); | ||
| 125 | } | ||
| 126 | Poll::Ready(()) | ||
| 127 | }) | ||
| 128 | } | ||
| 129 | |||
| 130 | /// Set the current task's deadline `increment_ticks` from the previous deadline. | ||
| 131 | /// | ||
| 132 | /// Note that by default (unless otherwise set), tasks start life with the deadline | ||
| 133 | /// u64::MAX, which means this method will have no effect. | ||
| 134 | /// | ||
| 135 | /// This method is a future in order to access the currently executing task's | ||
| 136 | /// header which contains the deadline | ||
| 137 | /// | ||
| 138 | /// Analogous to one increment of `Ticker::every().next()`. | ||
| 139 | /// | ||
| 140 | /// TODO: Do we want to return what the deadline is? | ||
| 141 | #[must_use = "Setting deadline must be polled to be effective"] | ||
| 142 | pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future<Output = ()> { | ||
| 143 | poll_fn(move |cx| { | ||
| 144 | let task = super::task_from_waker(cx.waker()); | ||
| 145 | |||
| 146 | // SAFETY: A task can only modify its own deadline, while the task is being | ||
| 147 | // polled, meaning that there cannot be concurrent access to the deadline. | ||
| 148 | unsafe { | ||
| 149 | // Get the last value | ||
| 150 | let last = task.header().deadline.get(); | ||
| 151 | |||
| 152 | // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave | ||
| 153 | // it for now, we can probably make this wrapping_add for performance | ||
| 154 | // reasons later. | ||
| 155 | let deadline = last.saturating_add(increment_ticks); | ||
| 156 | |||
| 157 | // Store the new value | ||
| 158 | task.header().deadline.set(deadline); | ||
| 159 | } | ||
| 160 | Poll::Ready(()) | ||
| 161 | }) | ||
| 162 | } | ||
| 163 | |||
| 164 | /// Get the current task's deadline as a tick value. | ||
| 165 | /// | ||
| 166 | /// This method is a future in order to access the currently executing task's | ||
| 167 | /// header which contains the deadline | ||
| 168 | pub fn get_current_task_deadline() -> impl Future<Output = Self> { | ||
| 169 | poll_fn(move |cx| { | ||
| 170 | let task = super::task_from_waker(cx.waker()); | ||
| 171 | |||
| 172 | // SAFETY: A task can only modify its own deadline, while the task is being | ||
| 173 | // polled, meaning that there cannot be concurrent access to the deadline. | ||
| 174 | let deadline = unsafe { | ||
| 175 | task.header().deadline.get() | ||
| 176 | }; | ||
| 177 | Poll::Ready(Self { instant_ticks: deadline }) | ||
| 178 | }) | ||
| 179 | } | ||
| 180 | } | ||
