diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2022-10-26 19:14:12 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-10-26 19:14:12 +0000 |
| commit | e5097a8866c071c8b986757543a723b20b67fa03 (patch) | |
| tree | 91d2c06248d17d9dfc0f22fcdba7f08eb6fd6750 /embassy-executor | |
| parent | 9b86de770bccfe00ceaa6b88c51bcaba2a57eb03 (diff) | |
| parent | f9da6271cea7035b2c9f27cfe479aa81889168d1 (diff) | |
Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov
Hopefully relatively well documented.
Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.)
As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all.
I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue.
The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot.
Co-authored-by: ivmarkov <[email protected]>
Co-authored-by: Dario Nieuwenhuis <[email protected]>
Diffstat (limited to 'embassy-executor')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 91 |
1 files changed, 53 insertions, 38 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb5..181dabe8e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -354,46 +354,54 @@ impl Executor { | |||
| 354 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's | 354 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's |
| 355 | /// no `poll()` already running. | 355 | /// no `poll()` already running. |
| 356 | pub unsafe fn poll(&'static self) { | 356 | pub unsafe fn poll(&'static self) { |
| 357 | #[cfg(feature = "integrated-timers")] | 357 | loop { |
| 358 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | 358 | #[cfg(feature = "integrated-timers")] |
| 359 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | ||
| 359 | 360 | ||
| 360 | self.run_queue.dequeue_all(|p| { | 361 | self.run_queue.dequeue_all(|p| { |
| 361 | let task = p.as_ref(); | 362 | let task = p.as_ref(); |
| 362 | 363 | ||
| 363 | #[cfg(feature = "integrated-timers")] | 364 | #[cfg(feature = "integrated-timers")] |
| 364 | task.expires_at.set(Instant::MAX); | 365 | task.expires_at.set(Instant::MAX); |
| 365 | |||
| 366 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||
| 367 | if state & STATE_SPAWNED == 0 { | ||
| 368 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 369 | // - Task gets dequeued, poll starts | ||
| 370 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 371 | // - Task poll finishes, returning done=true | ||
| 372 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 373 | return; | ||
| 374 | } | ||
| 375 | 366 | ||
| 376 | #[cfg(feature = "rtos-trace")] | 367 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 377 | trace::task_exec_begin(p.as_ptr() as u32); | 368 | if state & STATE_SPAWNED == 0 { |
| 369 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 370 | // - Task gets dequeued, poll starts | ||
| 371 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 372 | // - Task poll finishes, returning done=true | ||
| 373 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 374 | return; | ||
| 375 | } | ||
| 378 | 376 | ||
| 379 | // Run the task | 377 | #[cfg(feature = "rtos-trace")] |
| 380 | task.poll_fn.read()(p as _); | 378 | trace::task_exec_begin(p.as_ptr() as u32); |
| 381 | 379 | ||
| 382 | #[cfg(feature = "rtos-trace")] | 380 | // Run the task |
| 383 | trace::task_exec_end(); | 381 | task.poll_fn.read()(p as _); |
| 382 | |||
| 383 | #[cfg(feature = "rtos-trace")] | ||
| 384 | trace::task_exec_end(); | ||
| 385 | |||
| 386 | // Enqueue or update into timer_queue | ||
| 387 | #[cfg(feature = "integrated-timers")] | ||
| 388 | self.timer_queue.update(p); | ||
| 389 | }); | ||
| 384 | 390 | ||
| 385 | // Enqueue or update into timer_queue | ||
| 386 | #[cfg(feature = "integrated-timers")] | 391 | #[cfg(feature = "integrated-timers")] |
| 387 | self.timer_queue.update(p); | 392 | { |
| 388 | }); | 393 | // If this is already in the past, set_alarm might return false |
| 394 | // In that case do another poll loop iteration. | ||
| 395 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 396 | if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { | ||
| 397 | break; | ||
| 398 | } | ||
| 399 | } | ||
| 389 | 400 | ||
| 390 | #[cfg(feature = "integrated-timers")] | 401 | #[cfg(not(feature = "integrated-timers"))] |
| 391 | { | 402 | { |
| 392 | // If this is already in the past, set_alarm will immediately trigger the alarm. | 403 | break; |
| 393 | // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, | 404 | } |
| 394 | // so we immediately do another poll loop iteration. | ||
| 395 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 396 | driver::set_alarm(self.alarm, next_expiration.as_ticks()); | ||
| 397 | } | 405 | } |
| 398 | 406 | ||
| 399 | #[cfg(feature = "rtos-trace")] | 407 | #[cfg(feature = "rtos-trace")] |
| @@ -436,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) { | |||
| 436 | } | 444 | } |
| 437 | 445 | ||
| 438 | #[cfg(feature = "integrated-timers")] | 446 | #[cfg(feature = "integrated-timers")] |
| 439 | #[no_mangle] | 447 | struct TimerQueue; |
| 440 | unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { | 448 | |
| 441 | let task = waker::task_from_waker(waker); | 449 | #[cfg(feature = "integrated-timers")] |
| 442 | let task = task.as_ref(); | 450 | impl embassy_time::queue::TimerQueue for TimerQueue { |
| 443 | let expires_at = task.expires_at.get(); | 451 | fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { |
| 444 | task.expires_at.set(expires_at.min(at)); | 452 | let task = waker::task_from_waker(waker); |
| 453 | let task = unsafe { task.as_ref() }; | ||
| 454 | let expires_at = task.expires_at.get(); | ||
| 455 | task.expires_at.set(expires_at.min(at)); | ||
| 456 | } | ||
| 445 | } | 457 | } |
| 446 | 458 | ||
| 459 | #[cfg(feature = "integrated-timers")] | ||
| 460 | embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
| 461 | |||
| 447 | #[cfg(feature = "rtos-trace")] | 462 | #[cfg(feature = "rtos-trace")] |
| 448 | impl rtos_trace::RtosTraceOSCallbacks for Executor { | 463 | impl rtos_trace::RtosTraceOSCallbacks for Executor { |
| 449 | fn task_list() { | 464 | fn task_list() { |
