aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/mod.rs
diff options
context:
space:
mode:
authorDániel Buga <[email protected]>2024-11-26 23:54:21 +0100
committerDániel Buga <[email protected]>2024-12-10 21:31:42 +0100
commit5a5495aac43d75610735f2ca80fb6c8e8f31ed71 (patch)
tree7a4336917894730692589359e9d1a285ec5a0a05 /embassy-executor/src/raw/mod.rs
parent406d377b7564d16e12b7fae4f42c0c709bf4f243 (diff)
Refactor integrated-timers
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r--embassy-executor/src/raw/mod.rs135
1 files changed, 26 insertions, 109 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 3f93eae6f..80bd49bad 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -17,7 +17,7 @@ mod run_queue;
17mod state; 17mod state;
18 18
19#[cfg(feature = "integrated-timers")] 19#[cfg(feature = "integrated-timers")]
20mod timer_queue; 20pub mod timer_queue;
21#[cfg(feature = "trace")] 21#[cfg(feature = "trace")]
22mod trace; 22mod trace;
23pub(crate) mod util; 23pub(crate) mod util;
@@ -31,9 +31,6 @@ use core::pin::Pin;
31use core::ptr::NonNull; 31use core::ptr::NonNull;
32use core::task::{Context, Poll}; 32use core::task::{Context, Poll};
33 33
34#[cfg(feature = "integrated-timers")]
35use embassy_time_driver::AlarmHandle;
36
37use self::run_queue::{RunQueue, RunQueueItem}; 34use self::run_queue::{RunQueue, RunQueueItem};
38use self::state::State; 35use self::state::State;
39use self::util::{SyncUnsafeCell, UninitCell}; 36use self::util::{SyncUnsafeCell, UninitCell};
@@ -47,8 +44,7 @@ pub(crate) struct TaskHeader {
47 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 44 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
48 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 45 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
49 46
50 #[cfg(feature = "integrated-timers")] 47 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
51 pub(crate) expires_at: SyncUnsafeCell<u64>,
52 #[cfg(feature = "integrated-timers")] 48 #[cfg(feature = "integrated-timers")]
53 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 49 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
54} 50}
@@ -80,6 +76,12 @@ impl TaskRef {
80 unsafe { self.ptr.as_ref() } 76 unsafe { self.ptr.as_ref() }
81 } 77 }
82 78
79 /// Returns a reference to the executor that the task is currently running on.
80 #[cfg(feature = "integrated-timers")]
81 pub unsafe fn executor(self) -> Option<&'static Executor> {
82 self.header().executor.get().map(|e| Executor::wrap(e))
83 }
84
83 /// The returned pointer is valid for the entire TaskStorage. 85 /// The returned pointer is valid for the entire TaskStorage.
84 pub(crate) fn as_ptr(self) -> *const TaskHeader { 86 pub(crate) fn as_ptr(self) -> *const TaskHeader {
85 self.ptr.as_ptr() 87 self.ptr.as_ptr()
@@ -121,8 +123,6 @@ impl<F: Future + 'static> TaskStorage<F> {
121 poll_fn: SyncUnsafeCell::new(None), 123 poll_fn: SyncUnsafeCell::new(None),
122 124
123 #[cfg(feature = "integrated-timers")] 125 #[cfg(feature = "integrated-timers")]
124 expires_at: SyncUnsafeCell::new(0),
125 #[cfg(feature = "integrated-timers")]
126 timer_queue_item: timer_queue::TimerQueueItem::new(), 126 timer_queue_item: timer_queue::TimerQueueItem::new(),
127 }, 127 },
128 future: UninitCell::uninit(), 128 future: UninitCell::uninit(),
@@ -160,9 +160,6 @@ impl<F: Future + 'static> TaskStorage<F> {
160 Poll::Ready(_) => { 160 Poll::Ready(_) => {
161 this.future.drop_in_place(); 161 this.future.drop_in_place();
162 this.raw.state.despawn(); 162 this.raw.state.despawn();
163
164 #[cfg(feature = "integrated-timers")]
165 this.raw.expires_at.set(u64::MAX);
166 } 163 }
167 Poll::Pending => {} 164 Poll::Pending => {}
168 } 165 }
@@ -316,34 +313,16 @@ impl Pender {
316pub(crate) struct SyncExecutor { 313pub(crate) struct SyncExecutor {
317 run_queue: RunQueue, 314 run_queue: RunQueue,
318 pender: Pender, 315 pender: Pender,
319
320 #[cfg(feature = "integrated-timers")]
321 pub(crate) timer_queue: timer_queue::TimerQueue,
322 #[cfg(feature = "integrated-timers")]
323 alarm: AlarmHandle,
324} 316}
325 317
326impl SyncExecutor { 318impl SyncExecutor {
327 pub(crate) fn new(pender: Pender) -> Self { 319 pub(crate) fn new(pender: Pender) -> Self {
328 #[cfg(feature = "integrated-timers")]
329 let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };
330
331 Self { 320 Self {
332 run_queue: RunQueue::new(), 321 run_queue: RunQueue::new(),
333 pender, 322 pender,
334
335 #[cfg(feature = "integrated-timers")]
336 timer_queue: timer_queue::TimerQueue::new(),
337 #[cfg(feature = "integrated-timers")]
338 alarm,
339 } 323 }
340 } 324 }
341 325
342 pub(crate) unsafe fn initialize(&'static self) {
343 #[cfg(feature = "integrated-timers")]
344 embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
345 }
346
347 /// Enqueue a task in the task queue 326 /// Enqueue a task in the task queue
348 /// 327 ///
349 /// # Safety 328 /// # Safety
@@ -360,12 +339,6 @@ impl SyncExecutor {
360 } 339 }
361 } 340 }
362 341
363 #[cfg(feature = "integrated-timers")]
364 fn alarm_callback(ctx: *mut ()) {
365 let this: &Self = unsafe { &*(ctx as *const Self) };
366 this.pender.pend();
367 }
368
369 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 342 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
370 task.header().executor.set(Some(self)); 343 task.header().executor.set(Some(self));
371 344
@@ -379,56 +352,27 @@ impl SyncExecutor {
379 /// 352 ///
380 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 353 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
381 pub(crate) unsafe fn poll(&'static self) { 354 pub(crate) unsafe fn poll(&'static self) {
382 #[allow(clippy::never_loop)] 355 self.run_queue.dequeue_all(|p| {
383 loop { 356 let task = p.header();
384 #[cfg(feature = "integrated-timers")] 357
385 self.timer_queue 358 if !task.state.run_dequeue() {
386 .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); 359 // If task is not running, ignore it. This can happen in the following scenario:
387 360 // - Task gets dequeued, poll starts
388 self.run_queue.dequeue_all(|p| { 361 // - While task is being polled, it gets woken. It gets placed in the queue.
389 let task = p.header(); 362 // - Task poll finishes, returning done=true
390 363 // - RUNNING bit is cleared, but the task is already in the queue.
391 #[cfg(feature = "integrated-timers")] 364 return;
392 task.expires_at.set(u64::MAX); 365 }
393
394 if !task.state.run_dequeue() {
395 // If task is not running, ignore it. This can happen in the following scenario:
396 // - Task gets dequeued, poll starts
397 // - While task is being polled, it gets woken. It gets placed in the queue.
398 // - Task poll finishes, returning done=true
399 // - RUNNING bit is cleared, but the task is already in the queue.
400 return;
401 }
402
403 #[cfg(feature = "trace")]
404 trace::task_exec_begin(self, &p);
405 366
406 // Run the task 367 #[cfg(feature = "trace")]
407 task.poll_fn.get().unwrap_unchecked()(p); 368 trace::task_exec_begin(self, &p);
408 369
409 #[cfg(feature = "trace")] 370 // Run the task
410 trace::task_exec_end(self, &p); 371 task.poll_fn.get().unwrap_unchecked()(p);
411 372
412 // Enqueue or update into timer_queue 373 #[cfg(feature = "trace")]
413 #[cfg(feature = "integrated-timers")] 374 trace::task_exec_end(self, &p);
414 self.timer_queue.update(p); 375 });
415 });
416
417 #[cfg(feature = "integrated-timers")]
418 {
419 // If this is already in the past, set_alarm might return false
420 // In that case do another poll loop iteration.
421 let next_expiration = self.timer_queue.next_expiration();
422 if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
423 break;
424 }
425 }
426
427 #[cfg(not(feature = "integrated-timers"))]
428 {
429 break;
430 }
431 }
432 376
433 #[cfg(feature = "trace")] 377 #[cfg(feature = "trace")]
434 trace::executor_idle(self) 378 trace::executor_idle(self)
@@ -494,15 +438,6 @@ impl Executor {
494 } 438 }
495 } 439 }
496 440
497 /// Initializes the executor.
498 ///
499 /// # Safety
500 ///
501 /// This function must be called once before any other method is called.
502 pub unsafe fn initialize(&'static self) {
503 self.inner.initialize();
504 }
505
506 /// Spawn a task in this executor. 441 /// Spawn a task in this executor.
507 /// 442 ///
508 /// # Safety 443 /// # Safety
@@ -575,21 +510,3 @@ pub fn wake_task_no_pend(task: TaskRef) {
575 } 510 }
576 } 511 }
577} 512}
578
579#[cfg(feature = "integrated-timers")]
580struct TimerQueue;
581
582#[cfg(feature = "integrated-timers")]
583impl embassy_time_queue_driver::TimerQueue for TimerQueue {
584 fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
585 let task = waker::task_from_waker(waker);
586 let task = task.header();
587 unsafe {
588 let expires_at = task.expires_at.get();
589 task.expires_at.set(expires_at.min(at));
590 }
591 }
592}
593
594#[cfg(feature = "integrated-timers")]
595embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);