aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r--embassy-executor/src/raw/mod.rs113
1 files changed, 68 insertions, 45 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 88d839e07..9f36c60bc 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,22 +7,28 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
12mod run_queue; 10mod run_queue;
13 11
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 12#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 13#[cfg_attr(
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 14 all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
15 path = "state_atomics.rs"
16)]
17#[cfg_attr(
18 not(any(target_has_atomic = "8", target_has_atomic = "32")),
19 path = "state_critical_section.rs"
20)]
17mod state; 21mod state;
18 22
19pub mod timer_queue; 23#[cfg(feature = "_any_trace")]
20#[cfg(feature = "trace")] 24pub mod trace;
21mod trace;
22pub(crate) mod util; 25pub(crate) mod util;
23#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] 26#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
24mod waker; 27mod waker;
25 28
29#[cfg(feature = "scheduler-deadline")]
30mod deadline;
31
26use core::future::Future; 32use core::future::Future;
27use core::marker::PhantomData; 33use core::marker::PhantomData;
28use core::mem; 34use core::mem;
@@ -31,8 +37,11 @@ use core::ptr::NonNull;
31#[cfg(not(feature = "arch-avr"))] 37#[cfg(not(feature = "arch-avr"))]
32use core::sync::atomic::AtomicPtr; 38use core::sync::atomic::AtomicPtr;
33use core::sync::atomic::Ordering; 39use core::sync::atomic::Ordering;
34use core::task::{Context, Poll}; 40use core::task::{Context, Poll, Waker};
35 41
42#[cfg(feature = "scheduler-deadline")]
43pub(crate) use deadline::Deadline;
44use embassy_executor_timer_queue::TimerQueueItem;
36#[cfg(feature = "arch-avr")] 45#[cfg(feature = "arch-avr")]
37use portable_atomic::AtomicPtr; 46use portable_atomic::AtomicPtr;
38 47
@@ -41,6 +50,12 @@ use self::state::State;
41use self::util::{SyncUnsafeCell, UninitCell}; 50use self::util::{SyncUnsafeCell, UninitCell};
42pub use self::waker::task_from_waker; 51pub use self::waker::task_from_waker;
43use super::SpawnToken; 52use super::SpawnToken;
53use crate::{Metadata, SpawnError};
54
55#[no_mangle]
56extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
57 unsafe { task_from_waker(waker).timer_queue_item() }
58}
44 59
45/// Raw task header for use in task pointers. 60/// Raw task header for use in task pointers.
46/// 61///
@@ -84,15 +99,21 @@ use super::SpawnToken;
84pub(crate) struct TaskHeader { 99pub(crate) struct TaskHeader {
85 pub(crate) state: State, 100 pub(crate) state: State,
86 pub(crate) run_queue_item: RunQueueItem, 101 pub(crate) run_queue_item: RunQueueItem,
102
87 pub(crate) executor: AtomicPtr<SyncExecutor>, 103 pub(crate) executor: AtomicPtr<SyncExecutor>,
88 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 104 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
89 105
90 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. 106 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
91 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 107 pub(crate) timer_queue_item: TimerQueueItem,
108
109 pub(crate) metadata: Metadata,
110
111 #[cfg(feature = "rtos-trace")]
112 all_tasks_next: AtomicPtr<TaskHeader>,
92} 113}
93 114
94/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 115/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
95#[derive(Clone, Copy, PartialEq)] 116#[derive(Debug, Clone, Copy, PartialEq)]
96pub struct TaskRef { 117pub struct TaskRef {
97 ptr: NonNull<TaskHeader>, 118 ptr: NonNull<TaskHeader>,
98} 119}
@@ -114,29 +135,27 @@ impl TaskRef {
114 } 135 }
115 } 136 }
116 137
117 /// # Safety
118 ///
119 /// The result of this function must only be compared
120 /// for equality, or stored, but not used.
121 pub const unsafe fn dangling() -> Self {
122 Self {
123 ptr: NonNull::dangling(),
124 }
125 }
126
127 pub(crate) fn header(self) -> &'static TaskHeader { 138 pub(crate) fn header(self) -> &'static TaskHeader {
128 unsafe { self.ptr.as_ref() } 139 unsafe { self.ptr.as_ref() }
129 } 140 }
130 141
142 pub(crate) fn metadata(self) -> &'static Metadata {
143 unsafe { &self.ptr.as_ref().metadata }
144 }
145
131 /// Returns a reference to the executor that the task is currently running on. 146 /// Returns a reference to the executor that the task is currently running on.
132 pub unsafe fn executor(self) -> Option<&'static Executor> { 147 pub unsafe fn executor(self) -> Option<&'static Executor> {
133 let executor = self.header().executor.load(Ordering::Relaxed); 148 let executor = self.header().executor.load(Ordering::Relaxed);
134 executor.as_ref().map(|e| Executor::wrap(e)) 149 executor.as_ref().map(|e| Executor::wrap(e))
135 } 150 }
136 151
137 /// Returns a reference to the timer queue item. 152 /// Returns a mutable reference to the timer queue item.
138 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem { 153 ///
139 &self.header().timer_queue_item 154 /// Safety
155 ///
156 /// This function must only be called in the context of the integrated timer queue.
157 pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
158 unsafe { &mut self.ptr.as_mut().timer_queue_item }
140 } 159 }
141 160
142 /// The returned pointer is valid for the entire TaskStorage. 161 /// The returned pointer is valid for the entire TaskStorage.
@@ -144,10 +163,10 @@ impl TaskRef {
144 self.ptr.as_ptr() 163 self.ptr.as_ptr()
145 } 164 }
146 165
147 /// Get the ID for a task 166 /// Returns the task ID.
148 #[cfg(feature = "trace")] 167 /// This can be used in combination with rtos-trace to match task names with IDs
149 pub fn as_id(self) -> u32 { 168 pub fn id(&self) -> u32 {
150 self.ptr.as_ptr() as u32 169 self.as_ptr() as u32
151 } 170 }
152} 171}
153 172
@@ -189,7 +208,10 @@ impl<F: Future + 'static> TaskStorage<F> {
189 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 208 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
190 poll_fn: SyncUnsafeCell::new(None), 209 poll_fn: SyncUnsafeCell::new(None),
191 210
192 timer_queue_item: timer_queue::TimerQueueItem::new(), 211 timer_queue_item: TimerQueueItem::new(),
212 metadata: Metadata::new(),
213 #[cfg(feature = "rtos-trace")]
214 all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
193 }, 215 },
194 future: UninitCell::uninit(), 216 future: UninitCell::uninit(),
195 } 217 }
@@ -208,11 +230,11 @@ impl<F: Future + 'static> TaskStorage<F> {
208 /// 230 ///
209 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 231 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
210 /// on a different executor. 232 /// on a different executor.
211 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
212 let task = AvailableTask::claim(self); 234 let task = AvailableTask::claim(self);
213 match task { 235 match task {
214 Some(task) => task.initialize(future), 236 Some(task) => Ok(task.initialize(future)),
215 None => SpawnToken::new_failed(), 237 None => Err(SpawnError::Busy),
216 } 238 }
217 } 239 }
218 240
@@ -224,7 +246,7 @@ impl<F: Future + 'static> TaskStorage<F> {
224 let mut cx = Context::from_waker(&waker); 246 let mut cx = Context::from_waker(&waker);
225 match future.poll(&mut cx) { 247 match future.poll(&mut cx) {
226 Poll::Ready(_) => { 248 Poll::Ready(_) => {
227 #[cfg(feature = "trace")] 249 #[cfg(feature = "_any_trace")]
228 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed); 250 let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);
229 251
230 // As the future has finished and this function will not be called 252 // As the future has finished and this function will not be called
@@ -239,7 +261,7 @@ impl<F: Future + 'static> TaskStorage<F> {
239 // after we're done with it. 261 // after we're done with it.
240 this.raw.state.despawn(); 262 this.raw.state.despawn();
241 263
242 #[cfg(feature = "trace")] 264 #[cfg(feature = "_any_trace")]
243 trace::task_end(exec_ptr, &p); 265 trace::task_end(exec_ptr, &p);
244 } 266 }
245 Poll::Pending => {} 267 Poll::Pending => {}
@@ -274,6 +296,7 @@ impl<F: Future + 'static> AvailableTask<F> {
274 296
275 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { 297 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
276 unsafe { 298 unsafe {
299 self.task.raw.metadata.reset();
277 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 300 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
278 self.task.future.write_in_place(future); 301 self.task.future.write_in_place(future);
279 302
@@ -340,10 +363,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
340 } 363 }
341 } 364 }
342 365
343 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> { 366 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<T>, SpawnError> {
344 match self.pool.iter().find_map(AvailableTask::claim) { 367 match self.pool.iter().find_map(AvailableTask::claim) {
345 Some(task) => task.initialize_impl::<T>(future), 368 Some(task) => Ok(task.initialize_impl::<T>(future)),
346 None => SpawnToken::new_failed(), 369 None => Err(SpawnError::Busy),
347 } 370 }
348 } 371 }
349 372
@@ -354,7 +377,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
354 /// This will loop over the pool and spawn the task in the first storage that 377 /// This will loop over the pool and spawn the task in the first storage that
355 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 378 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
356 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 379 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
357 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 380 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> Result<SpawnToken<impl Sized>, SpawnError> {
358 self.spawn_impl::<F>(future) 381 self.spawn_impl::<F>(future)
359 } 382 }
360 383
@@ -367,7 +390,7 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
367 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 390 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
368 /// is an `async fn`, NOT a hand-written `Future`. 391 /// is an `async fn`, NOT a hand-written `Future`.
369 #[doc(hidden)] 392 #[doc(hidden)]
370 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 393 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> Result<SpawnToken<impl Sized>, SpawnError>
371 where 394 where
372 FutFn: FnOnce() -> F, 395 FutFn: FnOnce() -> F,
373 { 396 {
@@ -412,7 +435,7 @@ impl SyncExecutor {
412 /// - `task` must NOT be already enqueued (in this executor or another one). 435 /// - `task` must NOT be already enqueued (in this executor or another one).
413 #[inline(always)] 436 #[inline(always)]
414 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) { 437 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
415 #[cfg(feature = "trace")] 438 #[cfg(feature = "_any_trace")]
416 trace::task_ready_begin(self, &task); 439 trace::task_ready_begin(self, &task);
417 440
418 if self.run_queue.enqueue(task, l) { 441 if self.run_queue.enqueue(task, l) {
@@ -425,7 +448,7 @@ impl SyncExecutor {
425 .executor 448 .executor
426 .store((self as *const Self).cast_mut(), Ordering::Relaxed); 449 .store((self as *const Self).cast_mut(), Ordering::Relaxed);
427 450
428 #[cfg(feature = "trace")] 451 #[cfg(feature = "_any_trace")]
429 trace::task_new(self, &task); 452 trace::task_new(self, &task);
430 453
431 state::locked(|l| { 454 state::locked(|l| {
@@ -437,23 +460,23 @@ impl SyncExecutor {
437 /// 460 ///
438 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 461 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
439 pub(crate) unsafe fn poll(&'static self) { 462 pub(crate) unsafe fn poll(&'static self) {
440 #[cfg(feature = "trace")] 463 #[cfg(feature = "_any_trace")]
441 trace::poll_start(self); 464 trace::poll_start(self);
442 465
443 self.run_queue.dequeue_all(|p| { 466 self.run_queue.dequeue_all(|p| {
444 let task = p.header(); 467 let task = p.header();
445 468
446 #[cfg(feature = "trace")] 469 #[cfg(feature = "_any_trace")]
447 trace::task_exec_begin(self, &p); 470 trace::task_exec_begin(self, &p);
448 471
449 // Run the task 472 // Run the task
450 task.poll_fn.get().unwrap_unchecked()(p); 473 task.poll_fn.get().unwrap_unchecked()(p);
451 474
452 #[cfg(feature = "trace")] 475 #[cfg(feature = "_any_trace")]
453 trace::task_exec_end(self, &p); 476 trace::task_exec_end(self, &p);
454 }); 477 });
455 478
456 #[cfg(feature = "trace")] 479 #[cfg(feature = "_any_trace")]
457 trace::executor_idle(self) 480 trace::executor_idle(self)
458 } 481 }
459} 482}