aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorivmarkov <[email protected]>2022-09-26 13:46:15 +0300
committerivmarkov <[email protected]>2022-10-24 08:21:35 +0300
commit53608a87ac4b6c8c60b5508551d12f5ba76ca2f6 (patch)
treed560bbbae7cff760a2ae00470eb5bfd3d1a1f6c9
parentba6e452cc5d6c33029f34d7cfb5cd5ea846979bd (diff)
Address feedback after code review
-rw-r--r--embassy-time/Cargo.toml16
-rw-r--r--embassy-time/src/lib.rs2
-rw-r--r--embassy-time/src/queue.rs613
-rw-r--r--embassy-time/src/queue_generic.rs474
4 files changed, 518 insertions, 587 deletions
diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml
index 4fbf97f0d..e1ad4b9dd 100644
--- a/embassy-time/Cargo.toml
+++ b/embassy-time/Cargo.toml
@@ -26,10 +26,22 @@ unstable-traits = ["embedded-hal-1"]
26# To use this you must have a time driver provided. 26# To use this you must have a time driver provided.
27defmt-timestamp-uptime = ["defmt"] 27defmt-timestamp-uptime = ["defmt"]
28 28
29# Create a global queue that can be used with any executor 29# Create a global, generic queue that can be used with any executor
30# To use this you must have a time driver provided. 30# To use this you must have a time driver provided.
31generic-queue = [] 31generic-queue = []
32 32
33# Set the number of timers for the generic queue.
34#
35# At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used.
36#
37# When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the
38# end user to pick.
39generic-queue-8 = ["generic-queue"]
40generic-queue-16 = ["generic-queue"]
41generic-queue-32 = ["generic-queue"]
42generic-queue-64 = ["generic-queue"]
43generic-queue-128 = ["generic-queue"]
44
33# Set the `embassy_time` tick rate. 45# Set the `embassy_time` tick rate.
34# 46#
35# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. 47# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
@@ -128,3 +140,5 @@ wasm-timer = { version = "0.2.5", optional = true }
128 140
129[dev-dependencies] 141[dev-dependencies]
130serial_test = "0.9" 142serial_test = "0.9"
143critical-section = { version = "1.1", features = ["std"] }
144
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 50f437baf..586aa28de 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -19,6 +19,8 @@ mod timer;
19mod driver_std; 19mod driver_std;
20#[cfg(feature = "wasm")] 20#[cfg(feature = "wasm")]
21mod driver_wasm; 21mod driver_wasm;
22#[cfg(feature = "generic-queue")]
23mod queue_generic;
22 24
23pub use delay::{block_for, Delay}; 25pub use delay::{block_for, Delay};
24pub use duration::Duration; 26pub use duration::Duration;
diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs
index 56ad5af8d..c6f8b440a 100644
--- a/embassy-time/src/queue.rs
+++ b/embassy-time/src/queue.rs
@@ -1,617 +1,58 @@
1//! Generic timer queue implementation 1//! Timer queue implementation
2//! 2//!
3//! This module provides a timer queue that works with any executor. 3//! This module defines the interface a timer queue needs to implement to power the `embassy_time` module.
4//! 4//!
5//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues, 5//! # Implementing a timer queue
6//! like the one provided with e.g. the `embassy-executor` crate.
7//! 6//!
8//! # Enabling the queue 7//! - Define a struct `MyTimerQueue`
9//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue. 8//! - Implement [`TimerQueue`] for it
9//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl).
10//! 10//!
11//! # Initializing the queue 11//! # Linkage details
12//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled.
13//! 12//!
14//! # Customizing the queue 13//! Check the documentation of the [`driver`](crate::driver) module for more information.
15//! - It is possible to customize two aspects of the queue:
16//! - Queue size:
17//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added,
18//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker.
19//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue:
20//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex)
21//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on
22//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the
23//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead
24//! configure the queue with a "disable-all-interrupts" style of mutex.
25//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue)
26//! macro, as per the example below.
27//! 14//!
15//! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail.
28//! 16//!
29//! # Example 17//! # Example
30//! 18//!
31//! ```ignore 19//! ```
32//! use embassy_time::queue::Queue; 20//! use core::task::Waker;
21//!
22//! use embassy_time::Instant;
23//! use embassy_time::queue::{TimerQueue};
33//! 24//!
34//! // You only need to invoke this macro in case you need to customize the queue. 25//! struct MyTimerQueue{}; // not public!
35//! // 26//! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
36//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled,
37//! // and the queue instantiation will be done for you behind the scenes.
38//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new());
39//! 27//!
40//! fn main() { 28//! impl TimerQueue for MyTimerQueue {
41//! unsafe { 29//! fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
42//! embassy_time::queue::initialize(); 30//! todo!()
43//! } 31//! }
44//! } 32//! }
45//! ``` 33//! ```
46use core::cell::{Cell, RefCell};
47use core::cmp::Ordering;
48use core::task::Waker; 34use core::task::Waker;
49 35
50use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
51use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
52use embassy_sync::blocking_mutex::Mutex;
53use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
54
55use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
56use crate::Instant; 36use crate::Instant;
57 37
58#[derive(Debug)] 38/// Timer queue
59struct Timer { 39pub trait TimerQueue {
60 at: Instant, 40 /// Schedules a waker in the queue to be awoken at moment `at`.
61 waker: Waker, 41 /// If this moment is in the past, the waker might be awoken immediately.
62} 42 fn schedule_wake(&'static self, at: Instant, waker: &Waker);
63
64impl PartialEq for Timer {
65 fn eq(&self, other: &Self) -> bool {
66 self.at == other.at
67 }
68}
69
70impl Eq for Timer {}
71
72impl PartialOrd for Timer {
73 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74 self.at.partial_cmp(&other.at)
75 }
76}
77
78impl Ord for Timer {
79 fn cmp(&self, other: &Self) -> Ordering {
80 self.at.cmp(&other.at)
81 }
82}
83
84struct InnerQueue<const N: usize> {
85 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>,
86 alarm_at: Instant,
87}
88
89impl<const N: usize> InnerQueue<N> {
90 const fn new() -> Self {
91 Self {
92 queue: SortedLinkedList::new_u8(),
93 alarm_at: Instant::MAX,
94 }
95 }
96
97 fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
98 self.queue
99 .find_mut(|timer| timer.waker.will_wake(waker))
100 .map(|mut timer| {
101 timer.waker = waker.clone();
102 timer.at = at;
103
104 timer.finish();
105 })
106 .unwrap_or_else(|| {
107 let mut timer = Timer {
108 waker: waker.clone(),
109 at,
110 };
111
112 loop {
113 match self.queue.push(timer) {
114 Ok(()) => break,
115 Err(e) => timer = e,
116 }
117
118 self.queue.pop().unwrap().waker.wake();
119 }
120 });
121
122 // Don't wait for the alarm callback to trigger and directly
123 // dispatch all timers that are already due
124 //
125 // Then update the alarm if necessary
126 self.dispatch(alarm_schedule);
127 }
128
129 fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
130 let now = Instant::now();
131
132 while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
133 self.queue.pop().unwrap().waker.wake();
134 }
135
136 self.update_alarm(alarm_schedule);
137 }
138
139 fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
140 if let Some(timer) = self.queue.peek() {
141 let new_at = timer.at;
142
143 if self.alarm_at != new_at {
144 self.alarm_at = new_at;
145 alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
146 }
147 } else {
148 self.alarm_at = Instant::MAX;
149 alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
150 }
151 }
152
153 fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
154 self.alarm_at = Instant::MAX;
155
156 self.dispatch(alarm_schedule);
157 }
158} 43}
159 44
160/// The generic queue implementation 45/// Set the TimerQueue implementation.
161pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> {
162 inner: Mutex<R, RefCell<InnerQueue<N>>>,
163 alarm: Cell<Option<AlarmHandle>>,
164 alarm_schedule: AtomicU64,
165}
166
167impl<const N: usize, R: RawMutex + 'static> Queue<N, R> {
168 /// Create a Queue
169 pub const fn new() -> Self {
170 Self {
171 inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())),
172 alarm: Cell::new(None),
173 alarm_schedule: AtomicU64::new(u64::MAX),
174 }
175 }
176
177 /// Initialize the queue
178 ///
179 /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly.
180 /// Call [`initialize`](crate::queue::initialize) instead.
181 ///
182 /// # Safety
183 /// It is UB call this function more than once, or to call it after any of your
184 /// futures that use `embassy-time` are polled already.
185 pub unsafe fn initialize(&'static self) {
186 if self.alarm.get().is_some() {
187 panic!("Queue is already initialized");
188 }
189
190 let handle = allocate_alarm().unwrap();
191 self.alarm.set(Some(handle));
192
193 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
194 }
195
196 /// Schedule a new waker to be awoken at moment `at`
197 ///
198 /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly.
199 pub fn schedule(&'static self, at: Instant, waker: &Waker) {
200 self.check_initialized();
201
202 self.inner
203 .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule));
204
205 self.update_alarm();
206 }
207
208 fn check_initialized(&self) {
209 if self.alarm.get().is_none() {
210 panic!("Queue is not initialized yet");
211 }
212 }
213
214 fn update_alarm(&self) {
215 // Need to set the alarm when we are *not* holding the mutex on the inner queue
216 // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
217 // call us back if the timestamp is in the past.
218 let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
219
220 if alarm_at < u64::MAX {
221 set_alarm(self.alarm.get().unwrap(), alarm_at);
222 }
223 }
224
225 fn handle_alarm(&self) {
226 self.check_initialized();
227 self.inner
228 .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
229
230 self.update_alarm();
231 }
232
233 fn handle_alarm_callback(ctx: *mut ()) {
234 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
235 }
236}
237
238unsafe impl<const N: usize, R: RawMutex + 'static> Send for Queue<N, R> {}
239unsafe impl<const N: usize, R: RawMutex + 'static> Sync for Queue<N, R> {}
240
241/// Initialize the queue
242///
243/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled.
244///
245/// # Safety
246/// It is UB call this function more than once, or to call it after any of your
247/// futures that use `embassy-time` are polled already.
248pub unsafe fn initialize() {
249 extern "Rust" {
250 fn _embassy_time_generic_queue_initialize();
251 }
252
253 _embassy_time_generic_queue_initialize();
254}
255
256/// Instantiates a global, generic (as in executor-agnostic) timer queue.
257///
258/// Unless you plan to customize the queue (size or mutex), prefer
259/// instantiating the queue via the `generic-queue` feature.
260/// 46///
261/// See the module documentation for an example. 47/// See the module documentation for an example.
262#[macro_export] 48#[macro_export]
263macro_rules! generic_queue { 49macro_rules! timer_queue_impl {
264 (static $name:ident: $t: ty = $val:expr) => { 50 (static $name:ident: $t: ty = $val:expr) => {
265 static $name: $t = $val; 51 static $name: $t = $val;
266 52
267 #[no_mangle] 53 #[no_mangle]
268 fn _embassy_time_generic_queue_initialize() {
269 unsafe {
270 $crate::queue::Queue::initialize(&$name);
271 }
272 }
273
274 #[no_mangle]
275 fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { 54 fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
276 $crate::queue::Queue::schedule(&$name, at, waker); 55 <$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker);
277 } 56 }
278 }; 57 };
279} 58}
280
281#[cfg(feature = "generic-queue")]
282generic_queue!(static QUEUE: Queue = Queue::new());
283
284#[cfg(test)]
285mod tests {
286 use core::cell::Cell;
287 use core::sync::atomic::Ordering;
288 use core::task::{RawWaker, RawWakerVTable, Waker};
289 use std::rc::Rc;
290 use std::sync::Mutex;
291
292 use embassy_sync::blocking_mutex::raw::RawMutex;
293 use serial_test::serial;
294
295 use super::InnerQueue;
296 use crate::driver::{AlarmHandle, Driver};
297 use crate::Instant;
298
299 struct InnerTestDriver {
300 now: u64,
301 alarm: u64,
302 callback: fn(*mut ()),
303 ctx: *mut (),
304 }
305
306 impl InnerTestDriver {
307 const fn new() -> Self {
308 Self {
309 now: 0,
310 alarm: u64::MAX,
311 callback: Self::noop,
312 ctx: core::ptr::null_mut(),
313 }
314 }
315
316 fn noop(_ctx: *mut ()) {}
317 }
318
319 unsafe impl Send for InnerTestDriver {}
320
321 struct TestDriver(Mutex<InnerTestDriver>);
322
323 impl TestDriver {
324 const fn new() -> Self {
325 Self(Mutex::new(InnerTestDriver::new()))
326 }
327
328 fn reset(&self) {
329 *self.0.lock().unwrap() = InnerTestDriver::new();
330 }
331
332 fn set_now(&self, now: u64) {
333 let notify = {
334 let mut inner = self.0.lock().unwrap();
335
336 if inner.now < now {
337 inner.now = now;
338
339 if inner.alarm <= now {
340 inner.alarm = u64::MAX;
341
342 Some((inner.callback, inner.ctx))
343 } else {
344 None
345 }
346 } else {
347 panic!("Going back in time?");
348 }
349 };
350
351 if let Some((callback, ctx)) = notify {
352 (callback)(ctx);
353 }
354 }
355 }
356
357 impl Driver for TestDriver {
358 fn now(&self) -> u64 {
359 self.0.lock().unwrap().now
360 }
361
362 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
363 Some(AlarmHandle::new(0))
364 }
365
366 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
367 let mut inner = self.0.lock().unwrap();
368
369 inner.callback = callback;
370 inner.ctx = ctx;
371 }
372
373 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
374 let notify = {
375 let mut inner = self.0.lock().unwrap();
376
377 if timestamp <= inner.now {
378 Some((inner.callback, inner.ctx))
379 } else {
380 inner.alarm = timestamp;
381 None
382 }
383 };
384
385 if let Some((callback, ctx)) = notify {
386 (callback)(ctx);
387 }
388 }
389 }
390
391 struct TestWaker {
392 pub awoken: Rc<Cell<bool>>,
393 pub waker: Waker,
394 }
395
396 impl TestWaker {
397 fn new() -> Self {
398 let flag = Rc::new(Cell::new(false));
399
400 const VTABLE: RawWakerVTable = RawWakerVTable::new(
401 |data: *const ()| {
402 unsafe {
403 Rc::increment_strong_count(data as *const Cell<bool>);
404 }
405
406 RawWaker::new(data as _, &VTABLE)
407 },
408 |data: *const ()| unsafe {
409 let data = data as *const Cell<bool>;
410 data.as_ref().unwrap().set(true);
411 Rc::decrement_strong_count(data);
412 },
413 |data: *const ()| unsafe {
414 (data as *const Cell<bool>).as_ref().unwrap().set(true);
415 },
416 |data: *const ()| unsafe {
417 Rc::decrement_strong_count(data);
418 },
419 );
420
421 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
422
423 Self {
424 awoken: flag.clone(),
425 waker: unsafe { Waker::from_raw(raw) },
426 }
427 }
428 }
429
430 // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate
431 pub struct StdRawMutex(std::sync::Mutex<()>);
432
433 unsafe impl RawMutex for StdRawMutex {
434 const INIT: Self = StdRawMutex(std::sync::Mutex::new(()));
435
436 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
437 let _guard = self.0.lock().unwrap();
438
439 f()
440 }
441 }
442
443 const QUEUE_MAX_LEN: usize = 8;
444
445 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
446 crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new());
447
448 fn setup() {
449 DRIVER.reset();
450
451 QUEUE.alarm.set(None);
452 QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
453 QUEUE.inner.lock(|inner| {
454 *inner.borrow_mut() = InnerQueue::new();
455 });
456
457 unsafe { super::initialize() };
458 }
459
460 fn queue_len() -> usize {
461 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
462 }
463
464 #[test]
465 #[serial]
466 #[should_panic(expected = "Queue is not initialized yet")]
467 fn test_not_initialized() {
468 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
469
470 let waker = TestWaker::new();
471
472 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
473 }
474
475 #[test]
476 #[serial]
477 fn test_initialized() {
478 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
479
480 assert!(QUEUE.alarm.get().is_none());
481
482 unsafe { QUEUE.initialize() };
483
484 assert!(QUEUE.alarm.get().is_some());
485 }
486
487 #[test]
488 #[serial]
489 #[should_panic(expected = "Queue is already initialized")]
490 fn test_already_initialized() {
491 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
492
493 unsafe { QUEUE.initialize() };
494
495 assert!(QUEUE.alarm.get().is_some());
496
497 unsafe { QUEUE.initialize() };
498 }
499
500 #[test]
501 #[serial]
502 fn test_schedule() {
503 setup();
504
505 assert_eq!(queue_len(), 0);
506
507 let waker = TestWaker::new();
508
509 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
510
511 assert!(!waker.awoken.get());
512 assert_eq!(queue_len(), 1);
513 }
514
515 #[test]
516 #[serial]
517 fn test_schedule_same() {
518 setup();
519
520 let waker = TestWaker::new();
521
522 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
523
524 assert_eq!(queue_len(), 1);
525
526 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
527
528 assert_eq!(queue_len(), 1);
529
530 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
531
532 assert_eq!(queue_len(), 1);
533
534 let waker2 = TestWaker::new();
535
536 QUEUE.schedule(Instant::from_secs(100), &waker2.waker);
537
538 assert_eq!(queue_len(), 2);
539 }
540
541 #[test]
542 #[serial]
543 fn test_trigger() {
544 setup();
545
546 let waker = TestWaker::new();
547
548 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
549
550 assert!(!waker.awoken.get());
551
552 DRIVER.set_now(Instant::from_secs(99).as_ticks());
553
554 assert!(!waker.awoken.get());
555
556 assert_eq!(queue_len(), 1);
557
558 DRIVER.set_now(Instant::from_secs(100).as_ticks());
559
560 assert!(waker.awoken.get());
561
562 assert_eq!(queue_len(), 0);
563 }
564
565 #[test]
566 #[serial]
567 fn test_immediate_trigger() {
568 setup();
569
570 let waker = TestWaker::new();
571
572 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
573
574 DRIVER.set_now(Instant::from_secs(50).as_ticks());
575
576 let waker2 = TestWaker::new();
577
578 QUEUE.schedule(Instant::from_secs(40), &waker2.waker);
579
580 assert!(!waker.awoken.get());
581 assert!(waker2.awoken.get());
582 assert_eq!(queue_len(), 1);
583 }
584
585 #[test]
586 #[serial]
587 fn test_queue_overflow() {
588 setup();
589
590 for i in 1..QUEUE_MAX_LEN {
591 let waker = TestWaker::new();
592
593 QUEUE.schedule(Instant::from_secs(310), &waker.waker);
594
595 assert_eq!(queue_len(), i);
596 assert!(!waker.awoken.get());
597 }
598
599 let first_waker = TestWaker::new();
600
601 QUEUE.schedule(Instant::from_secs(300), &first_waker.waker);
602
603 assert_eq!(queue_len(), QUEUE_MAX_LEN);
604 assert!(!first_waker.awoken.get());
605
606 let second_waker = TestWaker::new();
607
608 QUEUE.schedule(Instant::from_secs(305), &second_waker.waker);
609
610 assert_eq!(queue_len(), QUEUE_MAX_LEN);
611 assert!(first_waker.awoken.get());
612
613 QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker);
614 assert_eq!(queue_len(), QUEUE_MAX_LEN);
615 assert!(second_waker.awoken.get());
616 }
617}
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..1c4e5398b
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,474 @@
1use core::cell::RefCell;
2use core::cmp::Ordering;
3use core::task::Waker;
4
5use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
6use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
7use embassy_sync::blocking_mutex::Mutex;
8use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
9
10use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
11use crate::queue::TimerQueue;
12use crate::Instant;
13
14#[cfg(feature = "generic-queue-8")]
15const QUEUE_SIZE: usize = 8;
16#[cfg(feature = "generic-queue-16")]
17const QUEUE_SIZE: usize = 16;
18#[cfg(feature = "generic-queue-32")]
19const QUEUE_SIZE: usize = 32;
20#[cfg(feature = "generic-queue-64")]
21const QUEUE_SIZE: usize = 32;
22#[cfg(feature = "generic-queue-128")]
23const QUEUE_SIZE: usize = 128;
24#[cfg(not(any(
25 feature = "generic-queue-8",
26 feature = "generic-queue-16",
27 feature = "generic-queue-32",
28 feature = "generic-queue-64",
29 feature = "generic-queue-128"
30)))]
31const QUEUE_SIZE: usize = 64;
32
33#[derive(Debug)]
34struct Timer {
35 at: Instant,
36 waker: Waker,
37}
38
39impl PartialEq for Timer {
40 fn eq(&self, other: &Self) -> bool {
41 self.at == other.at
42 }
43}
44
45impl Eq for Timer {}
46
47impl PartialOrd for Timer {
48 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49 self.at.partial_cmp(&other.at)
50 }
51}
52
53impl Ord for Timer {
54 fn cmp(&self, other: &Self) -> Ordering {
55 self.at.cmp(&other.at)
56 }
57}
58
59struct InnerQueue {
60 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, { QUEUE_SIZE }>,
61 alarm: Option<AlarmHandle>,
62 alarm_at: Instant,
63}
64
65impl InnerQueue {
66 const fn new() -> Self {
67 Self {
68 queue: SortedLinkedList::new_u8(),
69 alarm: None,
70 alarm_at: Instant::MAX,
71 }
72 }
73
74 fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
75 self.queue
76 .find_mut(|timer| timer.waker.will_wake(waker))
77 .map(|mut timer| {
78 timer.at = at;
79 timer.finish();
80 })
81 .unwrap_or_else(|| {
82 let mut timer = Timer {
83 waker: waker.clone(),
84 at,
85 };
86
87 loop {
88 match self.queue.push(timer) {
89 Ok(()) => break,
90 Err(e) => timer = e,
91 }
92
93 self.queue.pop().unwrap().waker.wake();
94 }
95 });
96
97 // Don't wait for the alarm callback to trigger and directly
98 // dispatch all timers that are already due
99 //
100 // Then update the alarm if necessary
101 self.dispatch(alarm_schedule);
102 }
103
104 fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
105 let now = Instant::now();
106
107 while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
108 self.queue.pop().unwrap().waker.wake();
109 }
110
111 self.update_alarm(alarm_schedule);
112 }
113
114 fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
115 if let Some(timer) = self.queue.peek() {
116 let new_at = timer.at;
117
118 if self.alarm_at != new_at {
119 self.alarm_at = new_at;
120 alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
121 }
122 } else {
123 self.alarm_at = Instant::MAX;
124 alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
125 }
126 }
127
128 fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
129 self.alarm_at = Instant::MAX;
130
131 self.dispatch(alarm_schedule);
132 }
133}
134
135struct Queue {
136 inner: Mutex<CriticalSectionRawMutex, RefCell<InnerQueue>>,
137 alarm_schedule: AtomicU64,
138}
139
140impl Queue {
141 const fn new() -> Self {
142 Self {
143 inner: Mutex::new(RefCell::new(InnerQueue::new())),
144 alarm_schedule: AtomicU64::new(u64::MAX),
145 }
146 }
147
148 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
149 self.inner.lock(|inner| {
150 let mut inner = inner.borrow_mut();
151
152 if inner.alarm.is_none() {
153 let handle = unsafe { allocate_alarm() }.unwrap();
154 inner.alarm = Some(handle);
155
156 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
157 }
158
159 inner.schedule_wake(at, waker, &self.alarm_schedule)
160 });
161
162 self.update_alarm();
163 }
164
165 fn update_alarm(&self) {
166 // Need to set the alarm when we are *not* holding the mutex on the inner queue
167 // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
168 // call us back if the timestamp is in the past.
169 let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
170
171 if alarm_at < u64::MAX {
172 set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at);
173 }
174 }
175
176 fn handle_alarm(&self) {
177 self.inner
178 .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
179
180 self.update_alarm();
181 }
182
183 fn handle_alarm_callback(ctx: *mut ()) {
184 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
185 }
186}
187
188impl TimerQueue for Queue {
189 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
190 Queue::schedule_wake(self, at, waker);
191 }
192}
193
194crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
195
196#[cfg(test)]
197mod tests {
198 use core::cell::Cell;
199 use core::sync::atomic::Ordering;
200 use core::task::{RawWaker, RawWakerVTable, Waker};
201 use std::rc::Rc;
202 use std::sync::Mutex;
203
204 use serial_test::serial;
205
206 use super::InnerQueue;
207 use crate::driver::{AlarmHandle, Driver};
208 use crate::queue_generic::QUEUE;
209 use crate::Instant;
210
211 struct InnerTestDriver {
212 now: u64,
213 alarm: u64,
214 callback: fn(*mut ()),
215 ctx: *mut (),
216 }
217
218 impl InnerTestDriver {
219 const fn new() -> Self {
220 Self {
221 now: 0,
222 alarm: u64::MAX,
223 callback: Self::noop,
224 ctx: core::ptr::null_mut(),
225 }
226 }
227
228 fn noop(_ctx: *mut ()) {}
229 }
230
231 unsafe impl Send for InnerTestDriver {}
232
233 struct TestDriver(Mutex<InnerTestDriver>);
234
235 impl TestDriver {
236 const fn new() -> Self {
237 Self(Mutex::new(InnerTestDriver::new()))
238 }
239
240 fn reset(&self) {
241 *self.0.lock().unwrap() = InnerTestDriver::new();
242 }
243
244 fn set_now(&self, now: u64) {
245 let notify = {
246 let mut inner = self.0.lock().unwrap();
247
248 if inner.now < now {
249 inner.now = now;
250
251 if inner.alarm <= now {
252 inner.alarm = u64::MAX;
253
254 Some((inner.callback, inner.ctx))
255 } else {
256 None
257 }
258 } else {
259 panic!("Going back in time?");
260 }
261 };
262
263 if let Some((callback, ctx)) = notify {
264 (callback)(ctx);
265 }
266 }
267 }
268
269 impl Driver for TestDriver {
270 fn now(&self) -> u64 {
271 self.0.lock().unwrap().now
272 }
273
274 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
275 Some(AlarmHandle::new(0))
276 }
277
278 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
279 let mut inner = self.0.lock().unwrap();
280
281 inner.callback = callback;
282 inner.ctx = ctx;
283 }
284
285 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
286 let notify = {
287 let mut inner = self.0.lock().unwrap();
288
289 if timestamp <= inner.now {
290 Some((inner.callback, inner.ctx))
291 } else {
292 inner.alarm = timestamp;
293 None
294 }
295 };
296
297 if let Some((callback, ctx)) = notify {
298 (callback)(ctx);
299 }
300 }
301 }
302
303 struct TestWaker {
304 pub awoken: Rc<Cell<bool>>,
305 pub waker: Waker,
306 }
307
308 impl TestWaker {
309 fn new() -> Self {
310 let flag = Rc::new(Cell::new(false));
311
312 const VTABLE: RawWakerVTable = RawWakerVTable::new(
313 |data: *const ()| {
314 unsafe {
315 Rc::increment_strong_count(data as *const Cell<bool>);
316 }
317
318 RawWaker::new(data as _, &VTABLE)
319 },
320 |data: *const ()| unsafe {
321 let data = data as *const Cell<bool>;
322 data.as_ref().unwrap().set(true);
323 Rc::decrement_strong_count(data);
324 },
325 |data: *const ()| unsafe {
326 (data as *const Cell<bool>).as_ref().unwrap().set(true);
327 },
328 |data: *const ()| unsafe {
329 Rc::decrement_strong_count(data);
330 },
331 );
332
333 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
334
335 Self {
336 awoken: flag.clone(),
337 waker: unsafe { Waker::from_raw(raw) },
338 }
339 }
340 }
341
342 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
343
344 fn setup() {
345 DRIVER.reset();
346
347 QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
348 QUEUE.inner.lock(|inner| {
349 *inner.borrow_mut() = InnerQueue::new();
350 });
351 }
352
353 fn queue_len() -> usize {
354 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
355 }
356
357 #[test]
358 #[serial]
359 fn test_schedule() {
360 setup();
361
362 assert_eq!(queue_len(), 0);
363
364 let waker = TestWaker::new();
365
366 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
367
368 assert!(!waker.awoken.get());
369 assert_eq!(queue_len(), 1);
370 }
371
372 #[test]
373 #[serial]
374 fn test_schedule_same() {
375 setup();
376
377 let waker = TestWaker::new();
378
379 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
380
381 assert_eq!(queue_len(), 1);
382
383 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
384
385 assert_eq!(queue_len(), 1);
386
387 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
388
389 assert_eq!(queue_len(), 1);
390
391 let waker2 = TestWaker::new();
392
393 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
394
395 assert_eq!(queue_len(), 2);
396 }
397
398 #[test]
399 #[serial]
400 fn test_trigger() {
401 setup();
402
403 let waker = TestWaker::new();
404
405 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
406
407 assert!(!waker.awoken.get());
408
409 DRIVER.set_now(Instant::from_secs(99).as_ticks());
410
411 assert!(!waker.awoken.get());
412
413 assert_eq!(queue_len(), 1);
414
415 DRIVER.set_now(Instant::from_secs(100).as_ticks());
416
417 assert!(waker.awoken.get());
418
419 assert_eq!(queue_len(), 0);
420 }
421
422 #[test]
423 #[serial]
424 fn test_immediate_trigger() {
425 setup();
426
427 let waker = TestWaker::new();
428
429 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
430
431 DRIVER.set_now(Instant::from_secs(50).as_ticks());
432
433 let waker2 = TestWaker::new();
434
435 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
436
437 assert!(!waker.awoken.get());
438 assert!(waker2.awoken.get());
439 assert_eq!(queue_len(), 1);
440 }
441
442 #[test]
443 #[serial]
444 fn test_queue_overflow() {
445 setup();
446
447 for i in 1..super::QUEUE_SIZE {
448 let waker = TestWaker::new();
449
450 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
451
452 assert_eq!(queue_len(), i);
453 assert!(!waker.awoken.get());
454 }
455
456 let first_waker = TestWaker::new();
457
458 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
459
460 assert_eq!(queue_len(), super::QUEUE_SIZE);
461 assert!(!first_waker.awoken.get());
462
463 let second_waker = TestWaker::new();
464
465 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
466
467 assert_eq!(queue_len(), super::QUEUE_SIZE);
468 assert!(first_waker.awoken.get());
469
470 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
471 assert_eq!(queue_len(), super::QUEUE_SIZE);
472 assert!(second_waker.awoken.get());
473 }
474}