aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/mod.rs12
-rw-r--r--embassy-executor/src/raw/run_queue_drs_atomics.rs141
2 files changed, 149 insertions, 4 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 894a996ec..9b8a4ea8a 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
68#[cfg(feature = "drs-scheduler")] 68#[cfg(feature = "drs-scheduler")]
69use cordyceps::{stack, Linked}; 69use cordyceps::{stack, Linked};
70 70
71#[cfg(feature = "drs-scheduler")]
72pub use run_queue::Deadline;
73
71/// Raw task header for use in task pointers. 74/// Raw task header for use in task pointers.
72/// 75///
73/// A task can be in one of the following states: 76/// A task can be in one of the following states:
@@ -124,6 +127,9 @@ pub(crate) struct TaskHeader {
124 #[cfg(feature = "drs-scheduler")] 127 #[cfg(feature = "drs-scheduler")]
125 pub(crate) links: stack::Links<TaskHeader>, 128 pub(crate) links: stack::Links<TaskHeader>,
126 129
130 #[cfg(feature = "drs-scheduler")]
131 pub(crate) deadline: SyncUnsafeCell<u64>,
132
127 // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though 133 // TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though
128 // right now cordyceps doesn't work on non-atomic systems 134 // right now cordyceps doesn't work on non-atomic systems
129 #[cfg(not(feature = "drs-scheduler"))] 135 #[cfg(not(feature = "drs-scheduler"))]
@@ -255,6 +261,8 @@ impl<F: Future + 'static> TaskStorage<F> {
255 run_queue_item: RunQueueItem::new(), 261 run_queue_item: RunQueueItem::new(),
256 #[cfg(feature = "drs-scheduler")] 262 #[cfg(feature = "drs-scheduler")]
257 links: stack::Links::new(), 263 links: stack::Links::new(),
264 #[cfg(feature = "drs-scheduler")]
265 deadline: SyncUnsafeCell::new(0u64),
258 state: State::new(), 266 state: State::new(),
259 executor: AtomicPtr::new(core::ptr::null_mut()), 267 executor: AtomicPtr::new(core::ptr::null_mut()),
260 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 268 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -352,6 +360,10 @@ impl<F: Future + 'static> AvailableTask<F> {
352 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 360 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
353 self.task.future.write_in_place(future); 361 self.task.future.write_in_place(future);
354 362
363 // TODO(AJM): Some other way of setting this? Just a placeholder
364 #[cfg(feature = "drs-scheduler")]
365 self.task.raw.deadline.set(u64::MAX);
366
355 let task = TaskRef::new(self.task); 367 let task = TaskRef::new(self.task);
356 368
357 SpawnToken::new(task) 369 SpawnToken::new(task)
diff --git a/embassy-executor/src/raw/run_queue_drs_atomics.rs b/embassy-executor/src/raw/run_queue_drs_atomics.rs
index 53ada1b14..69b7b3bf0 100644
--- a/embassy-executor/src/raw/run_queue_drs_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_drs_atomics.rs
@@ -1,6 +1,7 @@
1use super::{TaskHeader, TaskRef}; 1use super::{TaskHeader, TaskRef};
2use cordyceps::TransferStack; 2use cordyceps::{SortedList, TransferStack};
3 3use core::future::{Future, poll_fn};
4use core::task::Poll;
4 5
5/// Atomic task queue using a very, very simple lock-free linked-list queue: 6/// Atomic task queue using a very, very simple lock-free linked-list queue:
6/// 7///
@@ -38,10 +39,142 @@ impl RunQueue {
38 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 39 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
39 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 40 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
40 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { 41 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
41 let taken = self.stack.take_all(); 42 let mut sorted = SortedList::<TaskHeader>::new(|lhs, rhs| unsafe {
42 for taskref in taken { 43 // TODO: Do we need any kind of access control here? Not if we say that
44 // tasks can only set their own priority, which they can't do if we're in
45 // the scheduler
46 lhs.deadline.get().cmp(&rhs.deadline.get())
47 });
48
49 loop {
50 // For each loop, grab any newly pended items
51 let taken = self.stack.take_all();
52
53 // Sort these into the list - this is potentially expensive! We do an
54 // insertion sort of new items, which iterates the linked list.
55 //
56 // Something on the order of `O(n * m)`, where `n` is the number
57 // of new tasks, and `m` is the number of already pending tasks.
58 sorted.extend(taken);
59
60 // Pop the task with the SOONEST deadline. If there are no tasks
61 // pending, then we are done.
62 let Some(taskref) = sorted.pop_front() else {
63 return;
64 };
65
66 // We got one task, mark it as dequeued, and process the task.
43 taskref.header().state.run_dequeue(); 67 taskref.header().state.run_dequeue();
44 on_task(taskref); 68 on_task(taskref);
45 } 69 }
46 } 70 }
47} 71}
72
73/// A type for interacting with the deadline of the current task
74pub struct Deadline {
75 /// Deadline value in ticks, same time base and ticks as `embassy-time`
76 pub instant_ticks: u64,
77}
78
79impl Deadline {
80 /// Set the current task's deadline at exactly `instant_ticks`
81 ///
82 /// This method is a future in order to access the currently executing task's
83 /// header which contains the deadline.
84 ///
85 /// Analogous to `Timer::at`.
86 ///
87 /// TODO: Should we check/panic if the deadline is in the past?
88 #[must_use = "Setting deadline must be polled to be effective"]
89 pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future<Output = ()> {
90 poll_fn(move |cx| {
91 let task = super::task_from_waker(cx.waker());
92 // SAFETY: A task can only modify its own deadline, while the task is being
93 // polled, meaning that there cannot be concurrent access to the deadline.
94 unsafe {
95 task.header().deadline.set(instant_ticks);
96 }
97 Poll::Ready(())
98 })
99 }
100
101 /// Set the current task's deadline `duration_ticks` in the future from when
102 /// this future is polled.
103 ///
104 /// This method is a future in order to access the currently executing task's
105 /// header which contains the deadline
106 ///
107 /// Analogous to `Timer::after`
108 ///
109 /// TODO: Do we want to return what the deadline is?
110 #[must_use = "Setting deadline must be polled to be effective"]
111 pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future<Output = ()> {
112 poll_fn(move |cx| {
113 let task = super::task_from_waker(cx.waker());
114 let now = embassy_time_driver::now();
115
116 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
117 // it for now, we can probably make this wrapping_add for performance
118 // reasons later.
119 let deadline = now.saturating_add(duration_ticks);
120
121 // SAFETY: A task can only modify its own deadline, while the task is being
122 // polled, meaning that there cannot be concurrent access to the deadline.
123 unsafe {
124 task.header().deadline.set(deadline);
125 }
126 Poll::Ready(())
127 })
128 }
129
130 /// Set the current task's deadline `increment_ticks` from the previous deadline.
131 ///
132 /// Note that by default (unless otherwise set), tasks start life with the deadline
133 /// u64::MAX, which means this method will have no effect.
134 ///
135 /// This method is a future in order to access the currently executing task's
136 /// header which contains the deadline
137 ///
138 /// Analogous to one increment of `Ticker::every().next()`.
139 ///
140 /// TODO: Do we want to return what the deadline is?
141 #[must_use = "Setting deadline must be polled to be effective"]
142 pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future<Output = ()> {
143 poll_fn(move |cx| {
144 let task = super::task_from_waker(cx.waker());
145
146 // SAFETY: A task can only modify its own deadline, while the task is being
147 // polled, meaning that there cannot be concurrent access to the deadline.
148 unsafe {
149 // Get the last value
150 let last = task.header().deadline.get();
151
152 // Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
153 // it for now, we can probably make this wrapping_add for performance
154 // reasons later.
155 let deadline = last.saturating_add(increment_ticks);
156
157 // Store the new value
158 task.header().deadline.set(deadline);
159 }
160 Poll::Ready(())
161 })
162 }
163
164 /// Get the current task's deadline as a tick value.
165 ///
166 /// This method is a future in order to access the currently executing task's
167 /// header which contains the deadline
168 pub fn get_current_task_deadline() -> impl Future<Output = Self> {
169 poll_fn(move |cx| {
170 let task = super::task_from_waker(cx.waker());
171
172 // SAFETY: A task can only modify its own deadline, while the task is being
173 // polled, meaning that there cannot be concurrent access to the deadline.
174 let deadline = unsafe {
175 task.header().deadline.get()
176 };
177 Poll::Ready(Self { instant_ticks: deadline })
178 })
179 }
180}