aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/mod.rs135
-rw-r--r--embassy-executor/src/raw/timer_queue.rs89
-rw-r--r--embassy-executor/src/raw/util.rs5
3 files changed, 88 insertions, 141 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 3f93eae6f..80bd49bad 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -17,7 +17,7 @@ mod run_queue;
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19#[cfg(feature = "integrated-timers")]
20mod timer_queue; 20pub mod timer_queue;
21#[cfg(feature = "trace")] 21#[cfg(feature = "trace")]
22mod trace; 22mod trace;
23pub(crate) mod util; 23pub(crate) mod util;
@@ -31,9 +31,6 @@ use core::pin::Pin;
31use core::ptr::NonNull; 31use core::ptr::NonNull;
32use core::task::{Context, Poll}; 32use core::task::{Context, Poll};
33 33
34#[cfg(feature = "integrated-timers")]
35use embassy_time_driver::AlarmHandle;
36
37use self::run_queue::{RunQueue, RunQueueItem}; 34use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 35use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 36use self::util::{SyncUnsafeCell, UninitCell};
@@ -47,8 +44,7 @@ pub(crate) struct TaskHeader {
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 44 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 45 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 46
50 #[cfg(feature = "integrated-timers")] 47 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")] 48 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 49 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
54} 50}
@@ -80,6 +76,12 @@ impl TaskRef {
80 unsafe { self.ptr.as_ref() } 76 unsafe { self.ptr.as_ref() }
81 } 77 }
82 78
79 /// Returns a reference to the executor that the task is currently running on.
80 #[cfg(feature = "integrated-timers")]
81 pub unsafe fn executor(self) -> Option<&'static Executor> {
82 self.header().executor.get().map(|e| Executor::wrap(e))
83 }
84
83 /// The returned pointer is valid for the entire TaskStorage. 85 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 86 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 87 self.ptr.as_ptr()
@@ -121,8 +123,6 @@ impl<F: Future + 'static> TaskStorage<F> {
121 poll_fn: SyncUnsafeCell::new(None), 123 poll_fn: SyncUnsafeCell::new(None),
122 124
123 #[cfg(feature = "integrated-timers")] 125 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 126 timer_queue_item: timer_queue::TimerQueueItem::new(),
127 }, 127 },
128 future: UninitCell::uninit(), 128 future: UninitCell::uninit(),
@@ -160,9 +160,6 @@ impl<F: Future + 'static> TaskStorage<F> {
160 Poll::Ready(_) => { 160 Poll::Ready(_) => {
161 this.future.drop_in_place(); 161 this.future.drop_in_place();
162 this.raw.state.despawn(); 162 this.raw.state.despawn();
163
164 #[cfg(feature = "integrated-timers")]
165 this.raw.expires_at.set(u64::MAX);
166 } 163 }
167 Poll::Pending => {} 164 Poll::Pending => {}
168 } 165 }
@@ -316,34 +313,16 @@ impl Pender {
316pub(crate) struct SyncExecutor { 313pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 314 run_queue: RunQueue,
318 pender: Pender, 315 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 316}
325 317
326impl SyncExecutor { 318impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 319 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 320 Self {
332 run_queue: RunQueue::new(), 321 run_queue: RunQueue::new(),
333 pender, 322 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 323 }
340 } 324 }
341 325
342 pub(crate) unsafe fn initialize(&'static self) {
343 #[cfg(feature = "integrated-timers")]
344 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
345 }
346
347 /// Enqueue a task in the task queue 326 /// Enqueue a task in the task queue
348 /// 327 ///
349 /// # Safety 328 /// # Safety
@@ -360,12 +339,6 @@ impl SyncExecutor {
360 } 339 }
361 } 340 }
362 341
363 #[cfg(feature = "integrated-timers")]
364 fn alarm_callback(ctx: *mut ()) {
365 let this: &Self = unsafe { &*(ctx as *const Self) };
366 this.pender.pend();
367 }
368
369 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 342 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
370 task.header().executor.set(Some(self)); 343 task.header().executor.set(Some(self));
371 344
@@ -379,56 +352,27 @@ impl SyncExecutor {
379 /// 352 ///
380 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 353 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
381 pub(crate) unsafe fn poll(&'static self) { 354 pub(crate) unsafe fn poll(&'static self) {
382 #[allow(clippy::never_loop)] 355 self.run_queue.dequeue_all(|p| {
383 loop { 356 let task = p.header();
384 #[cfg(feature = "integrated-timers")] 357
385 self.timer_queue 358 if !task.state.run_dequeue() {
386 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); 359 // If task is not running, ignore it. This can happen in the following scenario:
387 360 // - Task gets dequeued, poll starts
388 self.run_queue.dequeue_all(|p| { 361 // - While task is being polled, it gets woken. It gets placed in the queue.
389 let task = p.header(); 362 // - Task poll finishes, returning done=true
390 363 // - RUNNING bit is cleared, but the task is already in the queue.
391 #[cfg(feature = "integrated-timers")] 364 return;
392 task.expires_at.set(u64::MAX); 365 }
393
394 if !task.state.run_dequeue() {
395 // If task is not running, ignore it. This can happen in the following scenario:
396 // - Task gets dequeued, poll starts
397 // - While task is being polled, it gets woken. It gets placed in the queue.
398 // - Task poll finishes, returning done=true
399 // - RUNNING bit is cleared, but the task is already in the queue.
400 return;
401 }
402
403 #[cfg(feature = "trace")]
404 trace::task_exec_begin(self, &p);
405 366
406 // Run the task 367 #[cfg(feature = "trace")]
407 task.poll_fn.get().unwrap_unchecked()(p); 368 trace::task_exec_begin(self, &p);
408 369
409 #[cfg(feature = "trace")] 370 // Run the task
410 trace::task_exec_end(self, &p); 371 task.poll_fn.get().unwrap_unchecked()(p);
411 372
412 // Enqueue or update into timer_queue 373 #[cfg(feature = "trace")]
413 #[cfg(feature = "integrated-timers")] 374 trace::task_exec_end(self, &p);
414 self.timer_queue.update(p); 375 });
415 });
416
417 #[cfg(feature = "integrated-timers")]
418 {
419 // If this is already in the past, set_alarm might return false
420 // In that case do another poll loop iteration.
421 let next_expiration = self.timer_queue.next_expiration();
422 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
423 break;
424 }
425 }
426
427 #[cfg(not(feature = "integrated-timers"))]
428 {
429 break;
430 }
431 }
432 376
433 #[cfg(feature = "trace")] 377 #[cfg(feature = "trace")]
434 trace::executor_idle(self) 378 trace::executor_idle(self)
@@ -494,15 +438,6 @@ impl Executor {
494 } 438 }
495 } 439 }
496 440
497 /// Initializes the executor.
498 ///
499 /// # Safety
500 ///
501 /// This function must be called once before any other method is called.
502 pub unsafe fn initialize(&'static self) {
503 self.inner.initialize();
504 }
505
506 /// Spawn a task in this executor. 441 /// Spawn a task in this executor.
507 /// 442 ///
508 /// # Safety 443 /// # Safety
@@ -575,21 +510,3 @@ pub fn wake_task_no_pend(task: TaskRef) {
575 } 510 }
576 } 511 }
577} 512}
578
579#[cfg(feature = "integrated-timers")]
580struct TimerQueue;
581
582#[cfg(feature = "integrated-timers")]
583impl embassy_time_queue_driver::TimerQueue for TimerQueue {
584 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
585 let task = waker::task_from_waker(waker);
586 let task = task.header();
587 unsafe {
588 let expires_at = task.expires_at.get();
589 task.expires_at.set(expires_at.min(at));
590 }
591 }
592}
593
594#[cfg(feature = "integrated-timers")]
595embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 94a5f340b..953bf014f 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,75 +1,100 @@
1//! Timer queue operations.
1use core::cmp::min; 2use core::cmp::min;
2 3
4use super::util::SyncUnsafeCell;
3use super::TaskRef; 5use super::TaskRef;
4use crate::raw::util::SyncUnsafeCell;
5 6
6pub(crate) struct TimerQueueItem { 7pub(crate) struct TimerQueueItem {
7 next: SyncUnsafeCell<Option<TaskRef>>, 8 next: SyncUnsafeCell<Option<TaskRef>>,
9 expires_at: SyncUnsafeCell<u64>,
8} 10}
9 11
10impl TimerQueueItem { 12impl TimerQueueItem {
11 pub const fn new() -> Self { 13 pub const fn new() -> Self {
12 Self { 14 Self {
13 next: SyncUnsafeCell::new(None), 15 next: SyncUnsafeCell::new(None),
16 expires_at: SyncUnsafeCell::new(0),
14 } 17 }
15 } 18 }
16} 19}
17 20
18pub(crate) struct TimerQueue { 21/// A timer queue, with items integrated into tasks.
22pub struct TimerQueue {
19 head: SyncUnsafeCell<Option<TaskRef>>, 23 head: SyncUnsafeCell<Option<TaskRef>>,
20} 24}
21 25
22impl TimerQueue { 26impl TimerQueue {
27 /// Creates a new timer queue.
23 pub const fn new() -> Self { 28 pub const fn new() -> Self {
24 Self { 29 Self {
25 head: SyncUnsafeCell::new(None), 30 head: SyncUnsafeCell::new(None),
26 } 31 }
27 } 32 }
28 33
29 pub(crate) unsafe fn update(&self, p: TaskRef) { 34 /// Schedules a task to run at a specific time.
30 let task = p.header(); 35 ///
31 if task.expires_at.get() != u64::MAX { 36 /// If this function returns `true`, the called should find the next expiration time and set
37 /// a new alarm for that time.
38 pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
39 unsafe {
40 let task = p.header();
41 let item = &task.timer_queue_item;
32 if task.state.timer_enqueue() { 42 if task.state.timer_enqueue() {
33 task.timer_queue_item.next.set(self.head.get()); 43 // If not in the queue, add it and update.
34 self.head.set(Some(p)); 44 let prev = self.head.replace(Some(p));
45 item.next.set(prev);
46 } else if at <= item.expires_at.get() {
47 // If expiration is sooner than previously set, update.
48 } else {
49 // Task does not need to be updated.
50 return false;
35 } 51 }
52
53 item.expires_at.set(at);
54 true
36 } 55 }
37 } 56 }
38 57
39 pub(crate) unsafe fn next_expiration(&self) -> u64 { 58 /// Dequeues expired timers and returns the next alarm time.
40 let mut res = u64::MAX; 59 ///
41 self.retain(|p| { 60 /// The provided callback will be called for each expired task. Tasks that never expire
42 let task = p.header(); 61 /// will be removed, but the callback will not be called.
43 let expires = task.expires_at.get(); 62 pub fn next_expiration(&mut self, now: u64) -> u64 {
44 res = min(res, expires); 63 let mut next_expiration = u64::MAX;
45 expires != u64::MAX
46 });
47 res
48 }
49 64
50 pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
51 self.retain(|p| { 65 self.retain(|p| {
52 let task = p.header(); 66 let task = p.header();
53 if task.expires_at.get() <= now { 67 let item = &task.timer_queue_item;
54 on_task(p); 68 let expires = unsafe { item.expires_at.get() };
69
70 if expires <= now {
71 // Timer expired, process task.
72 super::wake_task(p);
55 false 73 false
56 } else { 74 } else {
57 true 75 // Timer didn't yet expire, or never expires.
76 next_expiration = min(next_expiration, expires);
77 expires != u64::MAX
58 } 78 }
59 }); 79 });
80
81 next_expiration
60 } 82 }
61 83
62 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { 84 fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
63 let mut prev = &self.head; 85 unsafe {
64 while let Some(p) = prev.get() { 86 let mut prev = &self.head;
65 let task = p.header(); 87 while let Some(p) = prev.get() {
66 if f(p) { 88 let task = p.header();
67 // Skip to next 89 let item = &task.timer_queue_item;
68 prev = &task.timer_queue_item.next; 90 if f(p) {
69 } else { 91 // Skip to next
70 // Remove it 92 prev = &item.next;
71 prev.set(task.timer_queue_item.next.get()); 93 } else {
72 task.state.timer_dequeue(); 94 // Remove it
95 prev.set(item.next.get());
96 task.state.timer_dequeue();
97 }
73 } 98 }
74 } 99 }
75 } 100 }
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index c46085e45..e2633658a 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -54,4 +54,9 @@ impl<T> SyncUnsafeCell<T> {
54 { 54 {
55 *self.value.get() 55 *self.value.get()
56 } 56 }
57
58 #[cfg(feature = "integrated-timers")]
59 pub unsafe fn replace(&self, value: T) -> T {
60 core::mem::replace(&mut *self.value.get(), value)
61 }
57} 62}