aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-10-26 19:14:12 +0000
committerGitHub <[email protected]>2022-10-26 19:14:12 +0000
commite5097a8866c071c8b986757543a723b20b67fa03 (patch)
tree91d2c06248d17d9dfc0f22fcdba7f08eb6fd6750 /embassy-executor/src
parent9b86de770bccfe00ceaa6b88c51bcaba2a57eb03 (diff)
parentf9da6271cea7035b2c9f27cfe479aa81889168d1 (diff)
Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov Hopefully relatively well documented. Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.) As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all. I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue. The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot. Co-authored-by: ivmarkov <[email protected]> Co-authored-by: Dario Nieuwenhuis <[email protected]>
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/raw/mod.rs91
1 files changed, 53 insertions, 38 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index e1258ebb5..181dabe8e 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -354,46 +354,54 @@ impl Executor {
354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
355 /// no `poll()` already running. 355 /// no `poll()` already running.
356 pub unsafe fn poll(&'static self) { 356 pub unsafe fn poll(&'static self) {
357 #[cfg(feature = "integrated-timers")] 357 loop {
358 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 358 #[cfg(feature = "integrated-timers")]
359 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
359 360
360 self.run_queue.dequeue_all(|p| { 361 self.run_queue.dequeue_all(|p| {
361 let task = p.as_ref(); 362 let task = p.as_ref();
362 363
363 #[cfg(feature = "integrated-timers")] 364 #[cfg(feature = "integrated-timers")]
364 task.expires_at.set(Instant::MAX); 365 task.expires_at.set(Instant::MAX);
365
366 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
367 if state & STATE_SPAWNED == 0 {
368 // If task is not running, ignore it. This can happen in the following scenario:
369 // - Task gets dequeued, poll starts
370 // - While task is being polled, it gets woken. It gets placed in the queue.
371 // - Task poll finishes, returning done=true
372 // - RUNNING bit is cleared, but the task is already in the queue.
373 return;
374 }
375 366
376 #[cfg(feature = "rtos-trace")] 367 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
377 trace::task_exec_begin(p.as_ptr() as u32); 368 if state & STATE_SPAWNED == 0 {
369 // If task is not running, ignore it. This can happen in the following scenario:
370 // - Task gets dequeued, poll starts
371 // - While task is being polled, it gets woken. It gets placed in the queue.
372 // - Task poll finishes, returning done=true
373 // - RUNNING bit is cleared, but the task is already in the queue.
374 return;
375 }
378 376
379 // Run the task 377 #[cfg(feature = "rtos-trace")]
380 task.poll_fn.read()(p as _); 378 trace::task_exec_begin(p.as_ptr() as u32);
381 379
382 #[cfg(feature = "rtos-trace")] 380 // Run the task
383 trace::task_exec_end(); 381 task.poll_fn.read()(p as _);
382
383 #[cfg(feature = "rtos-trace")]
384 trace::task_exec_end();
385
386 // Enqueue or update into timer_queue
387 #[cfg(feature = "integrated-timers")]
388 self.timer_queue.update(p);
389 });
384 390
385 // Enqueue or update into timer_queue
386 #[cfg(feature = "integrated-timers")] 391 #[cfg(feature = "integrated-timers")]
387 self.timer_queue.update(p); 392 {
388 }); 393 // If this is already in the past, set_alarm might return false
394 // In that case do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
397 break;
398 }
399 }
389 400
390 #[cfg(feature = "integrated-timers")] 401 #[cfg(not(feature = "integrated-timers"))]
391 { 402 {
392 // If this is already in the past, set_alarm will immediately trigger the alarm. 403 break;
393 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, 404 }
394 // so we immediately do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 driver::set_alarm(self.alarm, next_expiration.as_ticks());
397 } 405 }
398 406
399 #[cfg(feature = "rtos-trace")] 407 #[cfg(feature = "rtos-trace")]
@@ -436,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
436} 444}
437 445
438#[cfg(feature = "integrated-timers")] 446#[cfg(feature = "integrated-timers")]
439#[no_mangle] 447struct TimerQueue;
440unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { 448
441 let task = waker::task_from_waker(waker); 449#[cfg(feature = "integrated-timers")]
442 let task = task.as_ref(); 450impl embassy_time::queue::TimerQueue for TimerQueue {
443 let expires_at = task.expires_at.get(); 451 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
444 task.expires_at.set(expires_at.min(at)); 452 let task = waker::task_from_waker(waker);
453 let task = unsafe { task.as_ref() };
454 let expires_at = task.expires_at.get();
455 task.expires_at.set(expires_at.min(at));
456 }
445} 457}
446 458
459#[cfg(feature = "integrated-timers")]
460embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
461
447#[cfg(feature = "rtos-trace")] 462#[cfg(feature = "rtos-trace")]
448impl rtos_trace::RtosTraceOSCallbacks for Executor { 463impl rtos_trace::RtosTraceOSCallbacks for Executor {
449 fn task_list() { 464 fn task_list() {