aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r--embassy-executor/src/raw/mod.rs197
1 files changed, 85 insertions, 112 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 3f93eae6f..7da14468d 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -16,8 +16,7 @@ mod run_queue;
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19pub mod timer_queue;
20mod timer_queue;
21#[cfg(feature = "trace")] 20#[cfg(feature = "trace")]
22mod trace; 21mod trace;
23pub(crate) mod util; 22pub(crate) mod util;
@@ -31,9 +30,6 @@ use core::pin::Pin;
31use core::ptr::NonNull; 30use core::ptr::NonNull;
32use core::task::{Context, Poll}; 31use core::task::{Context, Poll};
33 32
34#[cfg(feature = "integrated-timers")]
35use embassy_time_driver::AlarmHandle;
36
37use self::run_queue::{RunQueue, RunQueueItem}; 33use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 34use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 35use self::util::{SyncUnsafeCell, UninitCell};
@@ -47,14 +43,12 @@ pub(crate) struct TaskHeader {
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 43 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 44 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 45
50 #[cfg(feature = "integrated-timers")] 46 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 47 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
54} 48}
55 49
56/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 50/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
57#[derive(Clone, Copy)] 51#[derive(Clone, Copy, PartialEq)]
58pub struct TaskRef { 52pub struct TaskRef {
59 ptr: NonNull<TaskHeader>, 53 ptr: NonNull<TaskHeader>,
60} 54}
@@ -76,10 +70,53 @@ impl TaskRef {
76 } 70 }
77 } 71 }
78 72
73 /// # Safety
74 ///
75 /// The result of this function must only be compared
76 /// for equality, or stored, but not used.
77 pub const unsafe fn dangling() -> Self {
78 Self {
79 ptr: NonNull::dangling(),
80 }
81 }
82
79 pub(crate) fn header(self) -> &'static TaskHeader { 83 pub(crate) fn header(self) -> &'static TaskHeader {
80 unsafe { self.ptr.as_ref() } 84 unsafe { self.ptr.as_ref() }
81 } 85 }
82 86
87 /// Returns a reference to the executor that the task is currently running on.
88 pub unsafe fn executor(self) -> Option<&'static Executor> {
89 self.header().executor.get().map(|e| Executor::wrap(e))
90 }
91
92 /// Returns a reference to the timer queue item.
93 pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
94 &self.header().timer_queue_item
95 }
96
97 /// Mark the task as timer-queued. Return whether it should be actually enqueued
98 /// using `_embassy_time_schedule_wake`.
99 ///
100 /// Entering this state prevents the task from being respawned while in a timer queue.
101 ///
102 /// Safety:
103 ///
104 /// This functions should only be called by the timer queue driver, before
105 /// enqueueing the timer item.
106 pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
107 self.header().state.timer_enqueue()
108 }
109
110 /// Unmark the task as timer-queued.
111 ///
112 /// Safety:
113 ///
114 /// This functions should only be called by the timer queue implementation, after the task has
115 /// been removed from the timer queue.
116 pub unsafe fn timer_dequeue(&self) {
117 self.header().state.timer_dequeue()
118 }
119
83 /// The returned pointer is valid for the entire TaskStorage. 120 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 121 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 122 self.ptr.as_ptr()
@@ -120,9 +157,6 @@ impl<F: Future + 'static> TaskStorage<F> {
120 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 157 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
121 poll_fn: SyncUnsafeCell::new(None), 158 poll_fn: SyncUnsafeCell::new(None),
122 159
123 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 160 timer_queue_item: timer_queue::TimerQueueItem::new(),
127 }, 161 },
128 future: UninitCell::uninit(), 162 future: UninitCell::uninit(),
@@ -159,10 +193,25 @@ impl<F: Future + 'static> TaskStorage<F> {
159 match future.poll(&mut cx) { 193 match future.poll(&mut cx) {
160 Poll::Ready(_) => { 194 Poll::Ready(_) => {
161 this.future.drop_in_place(); 195 this.future.drop_in_place();
196
197 // Mark this task to be timer queued.
198 // We're splitting the enqueue in two parts, so that we can change task state
199 // to something that prevent re-queueing.
200 let op = this.raw.state.timer_enqueue();
201
202 // Now mark the task as not spawned, so that
203 // - it can be spawned again once it has been removed from the timer queue
204 // - it can not be timer-queued again
205 // We must do this before scheduling the wake, to prevent the task from being
206 // dequeued by the time driver while it's still SPAWNED.
162 this.raw.state.despawn(); 207 this.raw.state.despawn();
163 208
164 #[cfg(feature = "integrated-timers")] 209 // Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's
165 this.raw.expires_at.set(u64::MAX); 210 // better to be safe.
211 if op == timer_queue::TimerEnqueueOperation::Enqueue {
212 // Schedule the task in the past, so it gets dequeued ASAP.
213 unsafe { _embassy_time_schedule_wake(0, &waker) }
214 }
166 } 215 }
167 Poll::Pending => {} 216 Poll::Pending => {}
168 } 217 }
@@ -181,6 +230,10 @@ impl<F: Future + 'static> TaskStorage<F> {
181 } 230 }
182} 231}
183 232
233extern "Rust" {
234 fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker);
235}
236
184/// An uninitialized [`TaskStorage`]. 237/// An uninitialized [`TaskStorage`].
185pub struct AvailableTask<F: Future + 'static> { 238pub struct AvailableTask<F: Future + 'static> {
186 task: &'static TaskStorage<F>, 239 task: &'static TaskStorage<F>,
@@ -316,34 +369,16 @@ impl Pender {
316pub(crate) struct SyncExecutor { 369pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 370 run_queue: RunQueue,
318 pender: Pender, 371 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 372}
325 373
326impl SyncExecutor { 374impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 375 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 376 Self {
332 run_queue: RunQueue::new(), 377 run_queue: RunQueue::new(),
333 pender, 378 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 379 }
340 } 380 }
341 381
342 pub(crate) unsafe fn initialize(&'static self) {
343 #[cfg(feature = "integrated-timers")]
344 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
345 }
346
347 /// Enqueue a task in the task queue 382 /// Enqueue a task in the task queue
348 /// 383 ///
349 /// # Safety 384 /// # Safety
@@ -360,12 +395,6 @@ impl SyncExecutor {
360 } 395 }
361 } 396 }
362 397
363 #[cfg(feature = "integrated-timers")]
364 fn alarm_callback(ctx: *mut ()) {
365 let this: &Self = unsafe { &*(ctx as *const Self) };
366 this.pender.pend();
367 }
368
369 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 398 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
370 task.header().executor.set(Some(self)); 399 task.header().executor.set(Some(self));
371 400
@@ -379,56 +408,27 @@ impl SyncExecutor {
379 /// 408 ///
380 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 409 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
381 pub(crate) unsafe fn poll(&'static self) { 410 pub(crate) unsafe fn poll(&'static self) {
382 #[allow(clippy::never_loop)] 411 self.run_queue.dequeue_all(|p| {
383 loop { 412 let task = p.header();
384 #[cfg(feature = "integrated-timers")] 413
385 self.timer_queue 414 if !task.state.run_dequeue() {
386 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); 415 // If task is not running, ignore it. This can happen in the following scenario:
387 416 // - Task gets dequeued, poll starts
388 self.run_queue.dequeue_all(|p| { 417 // - While task is being polled, it gets woken. It gets placed in the queue.
389 let task = p.header(); 418 // - Task poll finishes, returning done=true
390 419 // - RUNNING bit is cleared, but the task is already in the queue.
391 #[cfg(feature = "integrated-timers")] 420 return;
392 task.expires_at.set(u64::MAX); 421 }
393
394 if !task.state.run_dequeue() {
395 // If task is not running, ignore it. This can happen in the following scenario:
396 // - Task gets dequeued, poll starts
397 // - While task is being polled, it gets woken. It gets placed in the queue.
398 // - Task poll finishes, returning done=true
399 // - RUNNING bit is cleared, but the task is already in the queue.
400 return;
401 }
402
403 #[cfg(feature = "trace")]
404 trace::task_exec_begin(self, &p);
405 422
406 // Run the task 423 #[cfg(feature = "trace")]
407 task.poll_fn.get().unwrap_unchecked()(p); 424 trace::task_exec_begin(self, &p);
408 425
409 #[cfg(feature = "trace")] 426 // Run the task
410 trace::task_exec_end(self, &p); 427 task.poll_fn.get().unwrap_unchecked()(p);
411 428
412 // Enqueue or update into timer_queue 429 #[cfg(feature = "trace")]
413 #[cfg(feature = "integrated-timers")] 430 trace::task_exec_end(self, &p);
414 self.timer_queue.update(p); 431 });
415 });
416
417 #[cfg(feature = "integrated-timers")]
418 {
419 // If this is already in the past, set_alarm might return false
420 // In that case do another poll loop iteration.
421 let next_expiration = self.timer_queue.next_expiration();
422 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
423 break;
424 }
425 }
426
427 #[cfg(not(feature = "integrated-timers"))]
428 {
429 break;
430 }
431 }
432 432
433 #[cfg(feature = "trace")] 433 #[cfg(feature = "trace")]
434 trace::executor_idle(self) 434 trace::executor_idle(self)
@@ -494,15 +494,6 @@ impl Executor {
494 } 494 }
495 } 495 }
496 496
497 /// Initializes the executor.
498 ///
499 /// # Safety
500 ///
501 /// This function must be called once before any other method is called.
502 pub unsafe fn initialize(&'static self) {
503 self.inner.initialize();
504 }
505
506 /// Spawn a task in this executor. 497 /// Spawn a task in this executor.
507 /// 498 ///
508 /// # Safety 499 /// # Safety
@@ -575,21 +566,3 @@ pub fn wake_task_no_pend(task: TaskRef) {
575 } 566 }
576 } 567 }
577} 568}
578
579#[cfg(feature = "integrated-timers")]
580struct TimerQueue;
581
582#[cfg(feature = "integrated-timers")]
583impl embassy_time_queue_driver::TimerQueue for TimerQueue {
584 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
585 let task = waker::task_from_waker(waker);
586 let task = task.header();
587 unsafe {
588 let expires_at = task.expires_at.get();
589 task.expires_at.set(expires_at.min(at));
590 }
591 }
592}
593
594#[cfg(feature = "integrated-timers")]
595embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);