aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-time/Cargo.toml6
-rw-r--r--embassy-time/src/lib.rs2
-rw-r--r--embassy-time/src/queue.rs486
3 files changed, 459 insertions, 35 deletions
diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml
index 0e3391d1f..4fbf97f0d 100644
--- a/embassy-time/Cargo.toml
+++ b/embassy-time/Cargo.toml
@@ -26,7 +26,8 @@ unstable-traits = ["embedded-hal-1"]
26# To use this you must have a time driver provided. 26# To use this you must have a time driver provided.
27defmt-timestamp-uptime = ["defmt"] 27defmt-timestamp-uptime = ["defmt"]
28 28
29# TODO: Doc 29# Create a global queue that can be used with any executor
30# To use this you must have a time driver provided.
30generic-queue = [] 31generic-queue = []
31 32
32# Set the `embassy_time` tick rate. 33# Set the `embassy_time` tick rate.
@@ -124,3 +125,6 @@ heapless = "0.7"
124wasm-bindgen = { version = "0.2.81", optional = true } 125wasm-bindgen = { version = "0.2.81", optional = true }
125js-sys = { version = "0.3", optional = true } 126js-sys = { version = "0.3", optional = true }
126wasm-timer = { version = "0.2.5", optional = true } 127wasm-timer = { version = "0.2.5", optional = true }
128
129[dev-dependencies]
130serial_test = "0.9"
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 0457a6571..50f437baf 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -1,4 +1,4 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm", test)), no_std)]
2#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] 2#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
3#![doc = include_str!("../README.md")] 3#![doc = include_str!("../README.md")]
4#![allow(clippy::new_without_default)] 4#![allow(clippy::new_without_default)]
diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs
index 7e84090b1..56ad5af8d 100644
--- a/embassy-time/src/queue.rs
+++ b/embassy-time/src/queue.rs
@@ -1,9 +1,53 @@
1//! Generic timer queue implementation 1//! Generic timer queue implementation
2use core::cell::RefCell; 2//!
3//! This module provides a timer queue that works with any executor.
4//!
5//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues,
6//! like the one provided with e.g. the `embassy-executor` crate.
7//!
8//! # Enabling the queue
9//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue.
10//!
11//! # Initializing the queue
12//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled.
13//!
14//! # Customizing the queue
15//! - It is possible to customize two aspects of the queue:
16//! - Queue size:
17//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added,
18//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker.
19//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue:
20//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex)
21//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on
22//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the
23//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead
24//! configure the queue with a "disable-all-interrupts" style of mutex.
25//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue)
26//! macro, as per the example below.
27//!
28//!
29//! # Example
30//!
31//! ```ignore
32//! use embassy_time::queue::Queue;
33//!
34//! // You only need to invoke this macro in case you need to customize the queue.
35//! //
36//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled,
37//! // and the queue instantiation will be done for you behind the scenes.
38//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new());
39//!
40//! fn main() {
41//! unsafe {
42//! embassy_time::queue::initialize();
43//! }
44//! }
45//! ```
46use core::cell::{Cell, RefCell};
3use core::cmp::Ordering; 47use core::cmp::Ordering;
4use core::task::Waker; 48use core::task::Waker;
5 49
6use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering}; 50use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
7use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; 51use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
8use embassy_sync::blocking_mutex::Mutex; 52use embassy_sync::blocking_mutex::Mutex;
9use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; 53use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
@@ -40,7 +84,6 @@ impl Ord for Timer {
40struct InnerQueue<const N: usize> { 84struct InnerQueue<const N: usize> {
41 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>, 85 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>,
42 alarm_at: Instant, 86 alarm_at: Instant,
43 alarm: Option<AlarmHandle>,
44} 87}
45 88
46impl<const N: usize> InnerQueue<N> { 89impl<const N: usize> InnerQueue<N> {
@@ -48,11 +91,10 @@ impl<const N: usize> InnerQueue<N> {
48 Self { 91 Self {
49 queue: SortedLinkedList::new_u8(), 92 queue: SortedLinkedList::new_u8(),
50 alarm_at: Instant::MAX, 93 alarm_at: Instant::MAX,
51 alarm: None,
52 } 94 }
53 } 95 }
54 96
55 fn schedule(&mut self, at: Instant, waker: &Waker) { 97 fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
56 self.queue 98 self.queue
57 .find_mut(|timer| timer.waker.will_wake(waker)) 99 .find_mut(|timer| timer.waker.will_wake(waker))
58 .map(|mut timer| { 100 .map(|mut timer| {
@@ -81,90 +123,128 @@ impl<const N: usize> InnerQueue<N> {
81 // dispatch all timers that are already due 123 // dispatch all timers that are already due
82 // 124 //
83 // Then update the alarm if necessary 125 // Then update the alarm if necessary
84 self.dispatch(); 126 self.dispatch(alarm_schedule);
85 } 127 }
86 128
87 fn dispatch(&mut self) { 129 fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
88 let now = Instant::now(); 130 let now = Instant::now();
89 131
90 while self.queue.peek().filter(|timer| timer.at <= now).is_some() { 132 while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
91 self.queue.pop().unwrap().waker.wake(); 133 self.queue.pop().unwrap().waker.wake();
92 } 134 }
93 135
94 self.update_alarm(); 136 self.update_alarm(alarm_schedule);
95 } 137 }
96 138
97 fn update_alarm(&mut self) { 139 fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
98 if let Some(timer) = self.queue.peek() { 140 if let Some(timer) = self.queue.peek() {
99 let new_at = timer.at; 141 let new_at = timer.at;
100 142
101 if self.alarm_at != new_at { 143 if self.alarm_at != new_at {
102 self.alarm_at = new_at; 144 self.alarm_at = new_at;
103 set_alarm(self.alarm.unwrap(), new_at.as_ticks()); 145 alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
104 } 146 }
105 } else { 147 } else {
106 self.alarm_at = Instant::MAX; 148 self.alarm_at = Instant::MAX;
149 alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
107 } 150 }
108 } 151 }
109 152
110 fn handle_alarm(&mut self) { 153 fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
111 self.alarm_at = Instant::MAX; 154 self.alarm_at = Instant::MAX;
112 155
113 self.dispatch(); 156 self.dispatch(alarm_schedule);
114 } 157 }
115} 158}
116 159
117/// TODO: Doc 160/// The generic queue implementation
118pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> { 161pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> {
119 initialized: AtomicBool,
120 inner: Mutex<R, RefCell<InnerQueue<N>>>, 162 inner: Mutex<R, RefCell<InnerQueue<N>>>,
163 alarm: Cell<Option<AlarmHandle>>,
164 alarm_schedule: AtomicU64,
121} 165}
122 166
123impl<const N: usize, R: RawMutex + 'static> Queue<N, R> { 167impl<const N: usize, R: RawMutex + 'static> Queue<N, R> {
124 /// TODO: Doc 168 /// Create a Queue
125 pub const fn new() -> Self { 169 pub const fn new() -> Self {
126 Self { 170 Self {
127 initialized: AtomicBool::new(false),
128 inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())), 171 inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())),
172 alarm: Cell::new(None),
173 alarm_schedule: AtomicU64::new(u64::MAX),
129 } 174 }
130 } 175 }
131 176
132 /// TODO: Doc 177 /// Initialize the queue
178 ///
179 /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly.
180 /// Call [`initialize`](crate::queue::initialize) instead.
181 ///
182 /// # Safety
183 /// It is UB call this function more than once, or to call it after any of your
184 /// futures that use `embassy-time` are polled already.
133 pub unsafe fn initialize(&'static self) { 185 pub unsafe fn initialize(&'static self) {
134 if self.initialized.load(AtomicOrdering::SeqCst) { 186 if self.alarm.get().is_some() {
135 panic!("Queue already initialized"); 187 panic!("Queue is already initialized");
136 } 188 }
137 189
138 let handle = allocate_alarm().unwrap(); 190 let handle = allocate_alarm().unwrap();
139 self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle)); 191 self.alarm.set(Some(handle));
140
141 set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _);
142 192
143 self.initialized.store(true, AtomicOrdering::SeqCst); 193 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
144 } 194 }
145 195
146 /// TODO: Doc 196 /// Schedule a new waker to be awoken at moment `at`
197 ///
198 /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly.
147 pub fn schedule(&'static self, at: Instant, waker: &Waker) { 199 pub fn schedule(&'static self, at: Instant, waker: &Waker) {
148 self.check_initialized(); 200 self.check_initialized();
149 201
150 self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker)); 202 self.inner
203 .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule));
204
205 self.update_alarm();
151 } 206 }
152 207
153 fn check_initialized(&self) { 208 fn check_initialized(&self) {
154 if !self.initialized.load(AtomicOrdering::SeqCst) { 209 if self.alarm.get().is_none() {
155 panic!("Queue is not initialized"); 210 panic!("Queue is not initialized yet");
156 } 211 }
157 } 212 }
158 213
159 fn handle_alarm(ctx: *mut ()) { 214 fn update_alarm(&self) {
160 let this = unsafe { (ctx as *const Self).as_ref().unwrap() }; 215 // Need to set the alarm when we are *not* holding the mutex on the inner queue
216 // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
217 // call us back if the timestamp is in the past.
218 let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
219
220 if alarm_at < u64::MAX {
221 set_alarm(self.alarm.get().unwrap(), alarm_at);
222 }
223 }
224
225 fn handle_alarm(&self) {
226 self.check_initialized();
227 self.inner
228 .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
229
230 self.update_alarm();
231 }
161 232
162 this.check_initialized(); 233 fn handle_alarm_callback(ctx: *mut ()) {
163 this.inner.lock(|inner| inner.borrow_mut().handle_alarm()); 234 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
164 } 235 }
165} 236}
166 237
167/// TODO: Doc 238unsafe impl<const N: usize, R: RawMutex + 'static> Send for Queue<N, R> {}
239unsafe impl<const N: usize, R: RawMutex + 'static> Sync for Queue<N, R> {}
240
241/// Initialize the queue
242///
243/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled.
244///
245/// # Safety
246/// It is UB call this function more than once, or to call it after any of your
247/// futures that use `embassy-time` are polled already.
168pub unsafe fn initialize() { 248pub unsafe fn initialize() {
169 extern "Rust" { 249 extern "Rust" {
170 fn _embassy_time_generic_queue_initialize(); 250 fn _embassy_time_generic_queue_initialize();
@@ -173,7 +253,12 @@ pub unsafe fn initialize() {
173 _embassy_time_generic_queue_initialize(); 253 _embassy_time_generic_queue_initialize();
174} 254}
175 255
176/// TODO: Doc 256/// Instantiates a global, generic (as in executor-agnostic) timer queue.
257///
258/// Unless you plan to customize the queue (size or mutex), prefer
259/// instantiating the queue via the `generic-queue` feature.
260///
261/// See the module documentation for an example.
177#[macro_export] 262#[macro_export]
178macro_rules! generic_queue { 263macro_rules! generic_queue {
179 (static $name:ident: $t: ty = $val:expr) => { 264 (static $name:ident: $t: ty = $val:expr) => {
@@ -195,3 +280,338 @@ macro_rules! generic_queue {
195 280
196#[cfg(feature = "generic-queue")] 281#[cfg(feature = "generic-queue")]
197generic_queue!(static QUEUE: Queue = Queue::new()); 282generic_queue!(static QUEUE: Queue = Queue::new());
283
284#[cfg(test)]
285mod tests {
286 use core::cell::Cell;
287 use core::sync::atomic::Ordering;
288 use core::task::{RawWaker, RawWakerVTable, Waker};
289 use std::rc::Rc;
290 use std::sync::Mutex;
291
292 use embassy_sync::blocking_mutex::raw::RawMutex;
293 use serial_test::serial;
294
295 use super::InnerQueue;
296 use crate::driver::{AlarmHandle, Driver};
297 use crate::Instant;
298
299 struct InnerTestDriver {
300 now: u64,
301 alarm: u64,
302 callback: fn(*mut ()),
303 ctx: *mut (),
304 }
305
306 impl InnerTestDriver {
307 const fn new() -> Self {
308 Self {
309 now: 0,
310 alarm: u64::MAX,
311 callback: Self::noop,
312 ctx: core::ptr::null_mut(),
313 }
314 }
315
316 fn noop(_ctx: *mut ()) {}
317 }
318
319 unsafe impl Send for InnerTestDriver {}
320
321 struct TestDriver(Mutex<InnerTestDriver>);
322
323 impl TestDriver {
324 const fn new() -> Self {
325 Self(Mutex::new(InnerTestDriver::new()))
326 }
327
328 fn reset(&self) {
329 *self.0.lock().unwrap() = InnerTestDriver::new();
330 }
331
332 fn set_now(&self, now: u64) {
333 let notify = {
334 let mut inner = self.0.lock().unwrap();
335
336 if inner.now < now {
337 inner.now = now;
338
339 if inner.alarm <= now {
340 inner.alarm = u64::MAX;
341
342 Some((inner.callback, inner.ctx))
343 } else {
344 None
345 }
346 } else {
347 panic!("Going back in time?");
348 }
349 };
350
351 if let Some((callback, ctx)) = notify {
352 (callback)(ctx);
353 }
354 }
355 }
356
357 impl Driver for TestDriver {
358 fn now(&self) -> u64 {
359 self.0.lock().unwrap().now
360 }
361
362 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
363 Some(AlarmHandle::new(0))
364 }
365
366 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
367 let mut inner = self.0.lock().unwrap();
368
369 inner.callback = callback;
370 inner.ctx = ctx;
371 }
372
373 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
374 let notify = {
375 let mut inner = self.0.lock().unwrap();
376
377 if timestamp <= inner.now {
378 Some((inner.callback, inner.ctx))
379 } else {
380 inner.alarm = timestamp;
381 None
382 }
383 };
384
385 if let Some((callback, ctx)) = notify {
386 (callback)(ctx);
387 }
388 }
389 }
390
391 struct TestWaker {
392 pub awoken: Rc<Cell<bool>>,
393 pub waker: Waker,
394 }
395
396 impl TestWaker {
397 fn new() -> Self {
398 let flag = Rc::new(Cell::new(false));
399
400 const VTABLE: RawWakerVTable = RawWakerVTable::new(
401 |data: *const ()| {
402 unsafe {
403 Rc::increment_strong_count(data as *const Cell<bool>);
404 }
405
406 RawWaker::new(data as _, &VTABLE)
407 },
408 |data: *const ()| unsafe {
409 let data = data as *const Cell<bool>;
410 data.as_ref().unwrap().set(true);
411 Rc::decrement_strong_count(data);
412 },
413 |data: *const ()| unsafe {
414 (data as *const Cell<bool>).as_ref().unwrap().set(true);
415 },
416 |data: *const ()| unsafe {
417 Rc::decrement_strong_count(data);
418 },
419 );
420
421 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
422
423 Self {
424 awoken: flag.clone(),
425 waker: unsafe { Waker::from_raw(raw) },
426 }
427 }
428 }
429
430 // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate
431 pub struct StdRawMutex(std::sync::Mutex<()>);
432
433 unsafe impl RawMutex for StdRawMutex {
434 const INIT: Self = StdRawMutex(std::sync::Mutex::new(()));
435
436 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
437 let _guard = self.0.lock().unwrap();
438
439 f()
440 }
441 }
442
443 const QUEUE_MAX_LEN: usize = 8;
444
445 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
446 crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new());
447
448 fn setup() {
449 DRIVER.reset();
450
451 QUEUE.alarm.set(None);
452 QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
453 QUEUE.inner.lock(|inner| {
454 *inner.borrow_mut() = InnerQueue::new();
455 });
456
457 unsafe { super::initialize() };
458 }
459
460 fn queue_len() -> usize {
461 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
462 }
463
464 #[test]
465 #[serial]
466 #[should_panic(expected = "Queue is not initialized yet")]
467 fn test_not_initialized() {
468 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
469
470 let waker = TestWaker::new();
471
472 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
473 }
474
475 #[test]
476 #[serial]
477 fn test_initialized() {
478 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
479
480 assert!(QUEUE.alarm.get().is_none());
481
482 unsafe { QUEUE.initialize() };
483
484 assert!(QUEUE.alarm.get().is_some());
485 }
486
487 #[test]
488 #[serial]
489 #[should_panic(expected = "Queue is already initialized")]
490 fn test_already_initialized() {
491 static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new();
492
493 unsafe { QUEUE.initialize() };
494
495 assert!(QUEUE.alarm.get().is_some());
496
497 unsafe { QUEUE.initialize() };
498 }
499
500 #[test]
501 #[serial]
502 fn test_schedule() {
503 setup();
504
505 assert_eq!(queue_len(), 0);
506
507 let waker = TestWaker::new();
508
509 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
510
511 assert!(!waker.awoken.get());
512 assert_eq!(queue_len(), 1);
513 }
514
515 #[test]
516 #[serial]
517 fn test_schedule_same() {
518 setup();
519
520 let waker = TestWaker::new();
521
522 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
523
524 assert_eq!(queue_len(), 1);
525
526 QUEUE.schedule(Instant::from_secs(1), &waker.waker);
527
528 assert_eq!(queue_len(), 1);
529
530 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
531
532 assert_eq!(queue_len(), 1);
533
534 let waker2 = TestWaker::new();
535
536 QUEUE.schedule(Instant::from_secs(100), &waker2.waker);
537
538 assert_eq!(queue_len(), 2);
539 }
540
541 #[test]
542 #[serial]
543 fn test_trigger() {
544 setup();
545
546 let waker = TestWaker::new();
547
548 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
549
550 assert!(!waker.awoken.get());
551
552 DRIVER.set_now(Instant::from_secs(99).as_ticks());
553
554 assert!(!waker.awoken.get());
555
556 assert_eq!(queue_len(), 1);
557
558 DRIVER.set_now(Instant::from_secs(100).as_ticks());
559
560 assert!(waker.awoken.get());
561
562 assert_eq!(queue_len(), 0);
563 }
564
565 #[test]
566 #[serial]
567 fn test_immediate_trigger() {
568 setup();
569
570 let waker = TestWaker::new();
571
572 QUEUE.schedule(Instant::from_secs(100), &waker.waker);
573
574 DRIVER.set_now(Instant::from_secs(50).as_ticks());
575
576 let waker2 = TestWaker::new();
577
578 QUEUE.schedule(Instant::from_secs(40), &waker2.waker);
579
580 assert!(!waker.awoken.get());
581 assert!(waker2.awoken.get());
582 assert_eq!(queue_len(), 1);
583 }
584
585 #[test]
586 #[serial]
587 fn test_queue_overflow() {
588 setup();
589
590 for i in 1..QUEUE_MAX_LEN {
591 let waker = TestWaker::new();
592
593 QUEUE.schedule(Instant::from_secs(310), &waker.waker);
594
595 assert_eq!(queue_len(), i);
596 assert!(!waker.awoken.get());
597 }
598
599 let first_waker = TestWaker::new();
600
601 QUEUE.schedule(Instant::from_secs(300), &first_waker.waker);
602
603 assert_eq!(queue_len(), QUEUE_MAX_LEN);
604 assert!(!first_waker.awoken.get());
605
606 let second_waker = TestWaker::new();
607
608 QUEUE.schedule(Instant::from_secs(305), &second_waker.waker);
609
610 assert_eq!(queue_len(), QUEUE_MAX_LEN);
611 assert!(first_waker.awoken.get());
612
613 QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker);
614 assert_eq!(queue_len(), QUEUE_MAX_LEN);
615 assert!(second_waker.awoken.get());
616 }
617}