diff options
| author | ivmarkov <[email protected]> | 2022-09-26 13:46:15 +0300 |
|---|---|---|
| committer | ivmarkov <[email protected]> | 2022-10-24 08:21:35 +0300 |
| commit | 53608a87ac4b6c8c60b5508551d12f5ba76ca2f6 (patch) | |
| tree | d560bbbae7cff760a2ae00470eb5bfd3d1a1f6c9 | |
| parent | ba6e452cc5d6c33029f34d7cfb5cd5ea846979bd (diff) | |
Address feedback after code review
| -rw-r--r-- | embassy-time/Cargo.toml | 16 | ||||
| -rw-r--r-- | embassy-time/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-time/src/queue.rs | 613 | ||||
| -rw-r--r-- | embassy-time/src/queue_generic.rs | 474 |
4 files changed, 518 insertions, 587 deletions
diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index 4fbf97f0d..e1ad4b9dd 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml | |||
| @@ -26,10 +26,22 @@ unstable-traits = ["embedded-hal-1"] | |||
| 26 | # To use this you must have a time driver provided. | 26 | # To use this you must have a time driver provided. |
| 27 | defmt-timestamp-uptime = ["defmt"] | 27 | defmt-timestamp-uptime = ["defmt"] |
| 28 | 28 | ||
| 29 | # Create a global queue that can be used with any executor | 29 | # Create a global, generic queue that can be used with any executor |
| 30 | # To use this you must have a time driver provided. | 30 | # To use this you must have a time driver provided. |
| 31 | generic-queue = [] | 31 | generic-queue = [] |
| 32 | 32 | ||
| 33 | # Set the number of timers for the generic queue. | ||
| 34 | # | ||
| 35 | # At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used. | ||
| 36 | # | ||
| 37 | # When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the | ||
| 38 | # end user to pick. | ||
| 39 | generic-queue-8 = ["generic-queue"] | ||
| 40 | generic-queue-16 = ["generic-queue"] | ||
| 41 | generic-queue-32 = ["generic-queue"] | ||
| 42 | generic-queue-64 = ["generic-queue"] | ||
| 43 | generic-queue-128 = ["generic-queue"] | ||
| 44 | |||
| 33 | # Set the `embassy_time` tick rate. | 45 | # Set the `embassy_time` tick rate. |
| 34 | # | 46 | # |
| 35 | # At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. | 47 | # At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. |
| @@ -128,3 +140,5 @@ wasm-timer = { version = "0.2.5", optional = true } | |||
| 128 | 140 | ||
| 129 | [dev-dependencies] | 141 | [dev-dependencies] |
| 130 | serial_test = "0.9" | 142 | serial_test = "0.9" |
| 143 | critical-section = { version = "1.1", features = ["std"] } | ||
| 144 | |||
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 50f437baf..586aa28de 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs | |||
| @@ -19,6 +19,8 @@ mod timer; | |||
| 19 | mod driver_std; | 19 | mod driver_std; |
| 20 | #[cfg(feature = "wasm")] | 20 | #[cfg(feature = "wasm")] |
| 21 | mod driver_wasm; | 21 | mod driver_wasm; |
| 22 | #[cfg(feature = "generic-queue")] | ||
| 23 | mod queue_generic; | ||
| 22 | 24 | ||
| 23 | pub use delay::{block_for, Delay}; | 25 | pub use delay::{block_for, Delay}; |
| 24 | pub use duration::Duration; | 26 | pub use duration::Duration; |
diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs index 56ad5af8d..c6f8b440a 100644 --- a/embassy-time/src/queue.rs +++ b/embassy-time/src/queue.rs | |||
| @@ -1,617 +1,58 @@ | |||
| 1 | //! Generic timer queue implementation | 1 | //! Timer queue implementation |
| 2 | //! | 2 | //! |
| 3 | //! This module provides a timer queue that works with any executor. | 3 | //! This module defines the interface a timer queue needs to implement to power the `embassy_time` module. |
| 4 | //! | 4 | //! |
| 5 | //! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues, | 5 | //! # Implementing a timer queue |
| 6 | //! like the one provided with e.g. the `embassy-executor` crate. | ||
| 7 | //! | 6 | //! |
| 8 | //! # Enabling the queue | 7 | //! - Define a struct `MyTimerQueue` |
| 9 | //! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue. | 8 | //! - Implement [`TimerQueue`] for it |
| 9 | //! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl). | ||
| 10 | //! | 10 | //! |
| 11 | //! # Initializing the queue | 11 | //! # Linkage details |
| 12 | //! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled. | ||
| 13 | //! | 12 | //! |
| 14 | //! # Customizing the queue | 13 | //! Check the documentation of the [`driver`](crate::driver) module for more information. |
| 15 | //! - It is possible to customize two aspects of the queue: | ||
| 16 | //! - Queue size: | ||
| 17 | //! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added, | ||
| 18 | //! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker. | ||
| 19 | //! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue: | ||
| 20 | //! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex) | ||
| 21 | //! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on | ||
| 22 | //! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the | ||
| 23 | //! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead | ||
| 24 | //! configure the queue with a "disable-all-interrupts" style of mutex. | ||
| 25 | //! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue) | ||
| 26 | //! macro, as per the example below. | ||
| 27 | //! | 14 | //! |
| 15 | //! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail. | ||
| 28 | //! | 16 | //! |
| 29 | //! # Example | 17 | //! # Example |
| 30 | //! | 18 | //! |
| 31 | //! ```ignore | 19 | //! ``` |
| 32 | //! use embassy_time::queue::Queue; | 20 | //! use core::task::Waker; |
| 21 | //! | ||
| 22 | //! use embassy_time::Instant; | ||
| 23 | //! use embassy_time::queue::{TimerQueue}; | ||
| 33 | //! | 24 | //! |
| 34 | //! // You only need to invoke this macro in case you need to customize the queue. | 25 | //! struct MyTimerQueue{}; // not public! |
| 35 | //! // | 26 | //! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{}); |
| 36 | //! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled, | ||
| 37 | //! // and the queue instantiation will be done for you behind the scenes. | ||
| 38 | //! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new()); | ||
| 39 | //! | 27 | //! |
| 40 | //! fn main() { | 28 | //! impl TimerQueue for MyTimerQueue { |
| 41 | //! unsafe { | 29 | //! fn schedule_wake(&'static self, at: Instant, waker: &Waker) { |
| 42 | //! embassy_time::queue::initialize(); | 30 | //! todo!() |
| 43 | //! } | 31 | //! } |
| 44 | //! } | 32 | //! } |
| 45 | //! ``` | 33 | //! ``` |
| 46 | use core::cell::{Cell, RefCell}; | ||
| 47 | use core::cmp::Ordering; | ||
| 48 | use core::task::Waker; | 34 | use core::task::Waker; |
| 49 | 35 | ||
| 50 | use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; | ||
| 51 | use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; | ||
| 52 | use embassy_sync::blocking_mutex::Mutex; | ||
| 53 | use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; | ||
| 54 | |||
| 55 | use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; | ||
| 56 | use crate::Instant; | 36 | use crate::Instant; |
| 57 | 37 | ||
| 58 | #[derive(Debug)] | 38 | /// Timer queue |
| 59 | struct Timer { | 39 | pub trait TimerQueue { |
| 60 | at: Instant, | 40 | /// Schedules a waker in the queue to be awoken at moment `at`. |
| 61 | waker: Waker, | 41 | /// If this moment is in the past, the waker might be awoken immediately. |
| 62 | } | 42 | fn schedule_wake(&'static self, at: Instant, waker: &Waker); |
| 63 | |||
| 64 | impl PartialEq for Timer { | ||
| 65 | fn eq(&self, other: &Self) -> bool { | ||
| 66 | self.at == other.at | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | impl Eq for Timer {} | ||
| 71 | |||
| 72 | impl PartialOrd for Timer { | ||
| 73 | fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||
| 74 | self.at.partial_cmp(&other.at) | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | impl Ord for Timer { | ||
| 79 | fn cmp(&self, other: &Self) -> Ordering { | ||
| 80 | self.at.cmp(&other.at) | ||
| 81 | } | ||
| 82 | } | ||
| 83 | |||
| 84 | struct InnerQueue<const N: usize> { | ||
| 85 | queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>, | ||
| 86 | alarm_at: Instant, | ||
| 87 | } | ||
| 88 | |||
| 89 | impl<const N: usize> InnerQueue<N> { | ||
| 90 | const fn new() -> Self { | ||
| 91 | Self { | ||
| 92 | queue: SortedLinkedList::new_u8(), | ||
| 93 | alarm_at: Instant::MAX, | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { | ||
| 98 | self.queue | ||
| 99 | .find_mut(|timer| timer.waker.will_wake(waker)) | ||
| 100 | .map(|mut timer| { | ||
| 101 | timer.waker = waker.clone(); | ||
| 102 | timer.at = at; | ||
| 103 | |||
| 104 | timer.finish(); | ||
| 105 | }) | ||
| 106 | .unwrap_or_else(|| { | ||
| 107 | let mut timer = Timer { | ||
| 108 | waker: waker.clone(), | ||
| 109 | at, | ||
| 110 | }; | ||
| 111 | |||
| 112 | loop { | ||
| 113 | match self.queue.push(timer) { | ||
| 114 | Ok(()) => break, | ||
| 115 | Err(e) => timer = e, | ||
| 116 | } | ||
| 117 | |||
| 118 | self.queue.pop().unwrap().waker.wake(); | ||
| 119 | } | ||
| 120 | }); | ||
| 121 | |||
| 122 | // Don't wait for the alarm callback to trigger and directly | ||
| 123 | // dispatch all timers that are already due | ||
| 124 | // | ||
| 125 | // Then update the alarm if necessary | ||
| 126 | self.dispatch(alarm_schedule); | ||
| 127 | } | ||
| 128 | |||
| 129 | fn dispatch(&mut self, alarm_schedule: &AtomicU64) { | ||
| 130 | let now = Instant::now(); | ||
| 131 | |||
| 132 | while self.queue.peek().filter(|timer| timer.at <= now).is_some() { | ||
| 133 | self.queue.pop().unwrap().waker.wake(); | ||
| 134 | } | ||
| 135 | |||
| 136 | self.update_alarm(alarm_schedule); | ||
| 137 | } | ||
| 138 | |||
| 139 | fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { | ||
| 140 | if let Some(timer) = self.queue.peek() { | ||
| 141 | let new_at = timer.at; | ||
| 142 | |||
| 143 | if self.alarm_at != new_at { | ||
| 144 | self.alarm_at = new_at; | ||
| 145 | alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); | ||
| 146 | } | ||
| 147 | } else { | ||
| 148 | self.alarm_at = Instant::MAX; | ||
| 149 | alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); | ||
| 150 | } | ||
| 151 | } | ||
| 152 | |||
| 153 | fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { | ||
| 154 | self.alarm_at = Instant::MAX; | ||
| 155 | |||
| 156 | self.dispatch(alarm_schedule); | ||
| 157 | } | ||
| 158 | } | 43 | } |
| 159 | 44 | ||
| 160 | /// The generic queue implementation | 45 | /// Set the TimerQueue implementation. |
| 161 | pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> { | ||
| 162 | inner: Mutex<R, RefCell<InnerQueue<N>>>, | ||
| 163 | alarm: Cell<Option<AlarmHandle>>, | ||
| 164 | alarm_schedule: AtomicU64, | ||
| 165 | } | ||
| 166 | |||
| 167 | impl<const N: usize, R: RawMutex + 'static> Queue<N, R> { | ||
| 168 | /// Create a Queue | ||
| 169 | pub const fn new() -> Self { | ||
| 170 | Self { | ||
| 171 | inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())), | ||
| 172 | alarm: Cell::new(None), | ||
| 173 | alarm_schedule: AtomicU64::new(u64::MAX), | ||
| 174 | } | ||
| 175 | } | ||
| 176 | |||
| 177 | /// Initialize the queue | ||
| 178 | /// | ||
| 179 | /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly. | ||
| 180 | /// Call [`initialize`](crate::queue::initialize) instead. | ||
| 181 | /// | ||
| 182 | /// # Safety | ||
| 183 | /// It is UB call this function more than once, or to call it after any of your | ||
| 184 | /// futures that use `embassy-time` are polled already. | ||
| 185 | pub unsafe fn initialize(&'static self) { | ||
| 186 | if self.alarm.get().is_some() { | ||
| 187 | panic!("Queue is already initialized"); | ||
| 188 | } | ||
| 189 | |||
| 190 | let handle = allocate_alarm().unwrap(); | ||
| 191 | self.alarm.set(Some(handle)); | ||
| 192 | |||
| 193 | set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); | ||
| 194 | } | ||
| 195 | |||
| 196 | /// Schedule a new waker to be awoken at moment `at` | ||
| 197 | /// | ||
| 198 | /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly. | ||
| 199 | pub fn schedule(&'static self, at: Instant, waker: &Waker) { | ||
| 200 | self.check_initialized(); | ||
| 201 | |||
| 202 | self.inner | ||
| 203 | .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule)); | ||
| 204 | |||
| 205 | self.update_alarm(); | ||
| 206 | } | ||
| 207 | |||
| 208 | fn check_initialized(&self) { | ||
| 209 | if self.alarm.get().is_none() { | ||
| 210 | panic!("Queue is not initialized yet"); | ||
| 211 | } | ||
| 212 | } | ||
| 213 | |||
| 214 | fn update_alarm(&self) { | ||
| 215 | // Need to set the alarm when we are *not* holding the mutex on the inner queue | ||
| 216 | // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately | ||
| 217 | // call us back if the timestamp is in the past. | ||
| 218 | let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); | ||
| 219 | |||
| 220 | if alarm_at < u64::MAX { | ||
| 221 | set_alarm(self.alarm.get().unwrap(), alarm_at); | ||
| 222 | } | ||
| 223 | } | ||
| 224 | |||
| 225 | fn handle_alarm(&self) { | ||
| 226 | self.check_initialized(); | ||
| 227 | self.inner | ||
| 228 | .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); | ||
| 229 | |||
| 230 | self.update_alarm(); | ||
| 231 | } | ||
| 232 | |||
| 233 | fn handle_alarm_callback(ctx: *mut ()) { | ||
| 234 | unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); | ||
| 235 | } | ||
| 236 | } | ||
| 237 | |||
| 238 | unsafe impl<const N: usize, R: RawMutex + 'static> Send for Queue<N, R> {} | ||
| 239 | unsafe impl<const N: usize, R: RawMutex + 'static> Sync for Queue<N, R> {} | ||
| 240 | |||
| 241 | /// Initialize the queue | ||
| 242 | /// | ||
| 243 | /// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled. | ||
| 244 | /// | ||
| 245 | /// # Safety | ||
| 246 | /// It is UB call this function more than once, or to call it after any of your | ||
| 247 | /// futures that use `embassy-time` are polled already. | ||
| 248 | pub unsafe fn initialize() { | ||
| 249 | extern "Rust" { | ||
| 250 | fn _embassy_time_generic_queue_initialize(); | ||
| 251 | } | ||
| 252 | |||
| 253 | _embassy_time_generic_queue_initialize(); | ||
| 254 | } | ||
| 255 | |||
| 256 | /// Instantiates a global, generic (as in executor-agnostic) timer queue. | ||
| 257 | /// | ||
| 258 | /// Unless you plan to customize the queue (size or mutex), prefer | ||
| 259 | /// instantiating the queue via the `generic-queue` feature. | ||
| 260 | /// | 46 | /// |
| 261 | /// See the module documentation for an example. | 47 | /// See the module documentation for an example. |
| 262 | #[macro_export] | 48 | #[macro_export] |
| 263 | macro_rules! generic_queue { | 49 | macro_rules! timer_queue_impl { |
| 264 | (static $name:ident: $t: ty = $val:expr) => { | 50 | (static $name:ident: $t: ty = $val:expr) => { |
| 265 | static $name: $t = $val; | 51 | static $name: $t = $val; |
| 266 | 52 | ||
| 267 | #[no_mangle] | 53 | #[no_mangle] |
| 268 | fn _embassy_time_generic_queue_initialize() { | ||
| 269 | unsafe { | ||
| 270 | $crate::queue::Queue::initialize(&$name); | ||
| 271 | } | ||
| 272 | } | ||
| 273 | |||
| 274 | #[no_mangle] | ||
| 275 | fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { | 54 | fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { |
| 276 | $crate::queue::Queue::schedule(&$name, at, waker); | 55 | <$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker); |
| 277 | } | 56 | } |
| 278 | }; | 57 | }; |
| 279 | } | 58 | } |
| 280 | |||
| 281 | #[cfg(feature = "generic-queue")] | ||
| 282 | generic_queue!(static QUEUE: Queue = Queue::new()); | ||
| 283 | |||
| 284 | #[cfg(test)] | ||
| 285 | mod tests { | ||
| 286 | use core::cell::Cell; | ||
| 287 | use core::sync::atomic::Ordering; | ||
| 288 | use core::task::{RawWaker, RawWakerVTable, Waker}; | ||
| 289 | use std::rc::Rc; | ||
| 290 | use std::sync::Mutex; | ||
| 291 | |||
| 292 | use embassy_sync::blocking_mutex::raw::RawMutex; | ||
| 293 | use serial_test::serial; | ||
| 294 | |||
| 295 | use super::InnerQueue; | ||
| 296 | use crate::driver::{AlarmHandle, Driver}; | ||
| 297 | use crate::Instant; | ||
| 298 | |||
| 299 | struct InnerTestDriver { | ||
| 300 | now: u64, | ||
| 301 | alarm: u64, | ||
| 302 | callback: fn(*mut ()), | ||
| 303 | ctx: *mut (), | ||
| 304 | } | ||
| 305 | |||
| 306 | impl InnerTestDriver { | ||
| 307 | const fn new() -> Self { | ||
| 308 | Self { | ||
| 309 | now: 0, | ||
| 310 | alarm: u64::MAX, | ||
| 311 | callback: Self::noop, | ||
| 312 | ctx: core::ptr::null_mut(), | ||
| 313 | } | ||
| 314 | } | ||
| 315 | |||
| 316 | fn noop(_ctx: *mut ()) {} | ||
| 317 | } | ||
| 318 | |||
| 319 | unsafe impl Send for InnerTestDriver {} | ||
| 320 | |||
| 321 | struct TestDriver(Mutex<InnerTestDriver>); | ||
| 322 | |||
| 323 | impl TestDriver { | ||
| 324 | const fn new() -> Self { | ||
| 325 | Self(Mutex::new(InnerTestDriver::new())) | ||
| 326 | } | ||
| 327 | |||
| 328 | fn reset(&self) { | ||
| 329 | *self.0.lock().unwrap() = InnerTestDriver::new(); | ||
| 330 | } | ||
| 331 | |||
| 332 | fn set_now(&self, now: u64) { | ||
| 333 | let notify = { | ||
| 334 | let mut inner = self.0.lock().unwrap(); | ||
| 335 | |||
| 336 | if inner.now < now { | ||
| 337 | inner.now = now; | ||
| 338 | |||
| 339 | if inner.alarm <= now { | ||
| 340 | inner.alarm = u64::MAX; | ||
| 341 | |||
| 342 | Some((inner.callback, inner.ctx)) | ||
| 343 | } else { | ||
| 344 | None | ||
| 345 | } | ||
| 346 | } else { | ||
| 347 | panic!("Going back in time?"); | ||
| 348 | } | ||
| 349 | }; | ||
| 350 | |||
| 351 | if let Some((callback, ctx)) = notify { | ||
| 352 | (callback)(ctx); | ||
| 353 | } | ||
| 354 | } | ||
| 355 | } | ||
| 356 | |||
| 357 | impl Driver for TestDriver { | ||
| 358 | fn now(&self) -> u64 { | ||
| 359 | self.0.lock().unwrap().now | ||
| 360 | } | ||
| 361 | |||
| 362 | unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { | ||
| 363 | Some(AlarmHandle::new(0)) | ||
| 364 | } | ||
| 365 | |||
| 366 | fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { | ||
| 367 | let mut inner = self.0.lock().unwrap(); | ||
| 368 | |||
| 369 | inner.callback = callback; | ||
| 370 | inner.ctx = ctx; | ||
| 371 | } | ||
| 372 | |||
| 373 | fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { | ||
| 374 | let notify = { | ||
| 375 | let mut inner = self.0.lock().unwrap(); | ||
| 376 | |||
| 377 | if timestamp <= inner.now { | ||
| 378 | Some((inner.callback, inner.ctx)) | ||
| 379 | } else { | ||
| 380 | inner.alarm = timestamp; | ||
| 381 | None | ||
| 382 | } | ||
| 383 | }; | ||
| 384 | |||
| 385 | if let Some((callback, ctx)) = notify { | ||
| 386 | (callback)(ctx); | ||
| 387 | } | ||
| 388 | } | ||
| 389 | } | ||
| 390 | |||
| 391 | struct TestWaker { | ||
| 392 | pub awoken: Rc<Cell<bool>>, | ||
| 393 | pub waker: Waker, | ||
| 394 | } | ||
| 395 | |||
| 396 | impl TestWaker { | ||
| 397 | fn new() -> Self { | ||
| 398 | let flag = Rc::new(Cell::new(false)); | ||
| 399 | |||
| 400 | const VTABLE: RawWakerVTable = RawWakerVTable::new( | ||
| 401 | |data: *const ()| { | ||
| 402 | unsafe { | ||
| 403 | Rc::increment_strong_count(data as *const Cell<bool>); | ||
| 404 | } | ||
| 405 | |||
| 406 | RawWaker::new(data as _, &VTABLE) | ||
| 407 | }, | ||
| 408 | |data: *const ()| unsafe { | ||
| 409 | let data = data as *const Cell<bool>; | ||
| 410 | data.as_ref().unwrap().set(true); | ||
| 411 | Rc::decrement_strong_count(data); | ||
| 412 | }, | ||
| 413 | |data: *const ()| unsafe { | ||
| 414 | (data as *const Cell<bool>).as_ref().unwrap().set(true); | ||
| 415 | }, | ||
| 416 | |data: *const ()| unsafe { | ||
| 417 | Rc::decrement_strong_count(data); | ||
| 418 | }, | ||
| 419 | ); | ||
| 420 | |||
| 421 | let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); | ||
| 422 | |||
| 423 | Self { | ||
| 424 | awoken: flag.clone(), | ||
| 425 | waker: unsafe { Waker::from_raw(raw) }, | ||
| 426 | } | ||
| 427 | } | ||
| 428 | } | ||
| 429 | |||
| 430 | // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate | ||
| 431 | pub struct StdRawMutex(std::sync::Mutex<()>); | ||
| 432 | |||
| 433 | unsafe impl RawMutex for StdRawMutex { | ||
| 434 | const INIT: Self = StdRawMutex(std::sync::Mutex::new(())); | ||
| 435 | |||
| 436 | fn lock<R>(&self, f: impl FnOnce() -> R) -> R { | ||
| 437 | let _guard = self.0.lock().unwrap(); | ||
| 438 | |||
| 439 | f() | ||
| 440 | } | ||
| 441 | } | ||
| 442 | |||
| 443 | const QUEUE_MAX_LEN: usize = 8; | ||
| 444 | |||
| 445 | crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); | ||
| 446 | crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new()); | ||
| 447 | |||
| 448 | fn setup() { | ||
| 449 | DRIVER.reset(); | ||
| 450 | |||
| 451 | QUEUE.alarm.set(None); | ||
| 452 | QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); | ||
| 453 | QUEUE.inner.lock(|inner| { | ||
| 454 | *inner.borrow_mut() = InnerQueue::new(); | ||
| 455 | }); | ||
| 456 | |||
| 457 | unsafe { super::initialize() }; | ||
| 458 | } | ||
| 459 | |||
| 460 | fn queue_len() -> usize { | ||
| 461 | QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) | ||
| 462 | } | ||
| 463 | |||
| 464 | #[test] | ||
| 465 | #[serial] | ||
| 466 | #[should_panic(expected = "Queue is not initialized yet")] | ||
| 467 | fn test_not_initialized() { | ||
| 468 | static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); | ||
| 469 | |||
| 470 | let waker = TestWaker::new(); | ||
| 471 | |||
| 472 | QUEUE.schedule(Instant::from_secs(1), &waker.waker); | ||
| 473 | } | ||
| 474 | |||
| 475 | #[test] | ||
| 476 | #[serial] | ||
| 477 | fn test_initialized() { | ||
| 478 | static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); | ||
| 479 | |||
| 480 | assert!(QUEUE.alarm.get().is_none()); | ||
| 481 | |||
| 482 | unsafe { QUEUE.initialize() }; | ||
| 483 | |||
| 484 | assert!(QUEUE.alarm.get().is_some()); | ||
| 485 | } | ||
| 486 | |||
| 487 | #[test] | ||
| 488 | #[serial] | ||
| 489 | #[should_panic(expected = "Queue is already initialized")] | ||
| 490 | fn test_already_initialized() { | ||
| 491 | static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); | ||
| 492 | |||
| 493 | unsafe { QUEUE.initialize() }; | ||
| 494 | |||
| 495 | assert!(QUEUE.alarm.get().is_some()); | ||
| 496 | |||
| 497 | unsafe { QUEUE.initialize() }; | ||
| 498 | } | ||
| 499 | |||
| 500 | #[test] | ||
| 501 | #[serial] | ||
| 502 | fn test_schedule() { | ||
| 503 | setup(); | ||
| 504 | |||
| 505 | assert_eq!(queue_len(), 0); | ||
| 506 | |||
| 507 | let waker = TestWaker::new(); | ||
| 508 | |||
| 509 | QUEUE.schedule(Instant::from_secs(1), &waker.waker); | ||
| 510 | |||
| 511 | assert!(!waker.awoken.get()); | ||
| 512 | assert_eq!(queue_len(), 1); | ||
| 513 | } | ||
| 514 | |||
| 515 | #[test] | ||
| 516 | #[serial] | ||
| 517 | fn test_schedule_same() { | ||
| 518 | setup(); | ||
| 519 | |||
| 520 | let waker = TestWaker::new(); | ||
| 521 | |||
| 522 | QUEUE.schedule(Instant::from_secs(1), &waker.waker); | ||
| 523 | |||
| 524 | assert_eq!(queue_len(), 1); | ||
| 525 | |||
| 526 | QUEUE.schedule(Instant::from_secs(1), &waker.waker); | ||
| 527 | |||
| 528 | assert_eq!(queue_len(), 1); | ||
| 529 | |||
| 530 | QUEUE.schedule(Instant::from_secs(100), &waker.waker); | ||
| 531 | |||
| 532 | assert_eq!(queue_len(), 1); | ||
| 533 | |||
| 534 | let waker2 = TestWaker::new(); | ||
| 535 | |||
| 536 | QUEUE.schedule(Instant::from_secs(100), &waker2.waker); | ||
| 537 | |||
| 538 | assert_eq!(queue_len(), 2); | ||
| 539 | } | ||
| 540 | |||
| 541 | #[test] | ||
| 542 | #[serial] | ||
| 543 | fn test_trigger() { | ||
| 544 | setup(); | ||
| 545 | |||
| 546 | let waker = TestWaker::new(); | ||
| 547 | |||
| 548 | QUEUE.schedule(Instant::from_secs(100), &waker.waker); | ||
| 549 | |||
| 550 | assert!(!waker.awoken.get()); | ||
| 551 | |||
| 552 | DRIVER.set_now(Instant::from_secs(99).as_ticks()); | ||
| 553 | |||
| 554 | assert!(!waker.awoken.get()); | ||
| 555 | |||
| 556 | assert_eq!(queue_len(), 1); | ||
| 557 | |||
| 558 | DRIVER.set_now(Instant::from_secs(100).as_ticks()); | ||
| 559 | |||
| 560 | assert!(waker.awoken.get()); | ||
| 561 | |||
| 562 | assert_eq!(queue_len(), 0); | ||
| 563 | } | ||
| 564 | |||
| 565 | #[test] | ||
| 566 | #[serial] | ||
| 567 | fn test_immediate_trigger() { | ||
| 568 | setup(); | ||
| 569 | |||
| 570 | let waker = TestWaker::new(); | ||
| 571 | |||
| 572 | QUEUE.schedule(Instant::from_secs(100), &waker.waker); | ||
| 573 | |||
| 574 | DRIVER.set_now(Instant::from_secs(50).as_ticks()); | ||
| 575 | |||
| 576 | let waker2 = TestWaker::new(); | ||
| 577 | |||
| 578 | QUEUE.schedule(Instant::from_secs(40), &waker2.waker); | ||
| 579 | |||
| 580 | assert!(!waker.awoken.get()); | ||
| 581 | assert!(waker2.awoken.get()); | ||
| 582 | assert_eq!(queue_len(), 1); | ||
| 583 | } | ||
| 584 | |||
| 585 | #[test] | ||
| 586 | #[serial] | ||
| 587 | fn test_queue_overflow() { | ||
| 588 | setup(); | ||
| 589 | |||
| 590 | for i in 1..QUEUE_MAX_LEN { | ||
| 591 | let waker = TestWaker::new(); | ||
| 592 | |||
| 593 | QUEUE.schedule(Instant::from_secs(310), &waker.waker); | ||
| 594 | |||
| 595 | assert_eq!(queue_len(), i); | ||
| 596 | assert!(!waker.awoken.get()); | ||
| 597 | } | ||
| 598 | |||
| 599 | let first_waker = TestWaker::new(); | ||
| 600 | |||
| 601 | QUEUE.schedule(Instant::from_secs(300), &first_waker.waker); | ||
| 602 | |||
| 603 | assert_eq!(queue_len(), QUEUE_MAX_LEN); | ||
| 604 | assert!(!first_waker.awoken.get()); | ||
| 605 | |||
| 606 | let second_waker = TestWaker::new(); | ||
| 607 | |||
| 608 | QUEUE.schedule(Instant::from_secs(305), &second_waker.waker); | ||
| 609 | |||
| 610 | assert_eq!(queue_len(), QUEUE_MAX_LEN); | ||
| 611 | assert!(first_waker.awoken.get()); | ||
| 612 | |||
| 613 | QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker); | ||
| 614 | assert_eq!(queue_len(), QUEUE_MAX_LEN); | ||
| 615 | assert!(second_waker.awoken.get()); | ||
| 616 | } | ||
| 617 | } | ||
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs new file mode 100644 index 000000000..1c4e5398b --- /dev/null +++ b/embassy-time/src/queue_generic.rs | |||
| @@ -0,0 +1,474 @@ | |||
| 1 | use core::cell::RefCell; | ||
| 2 | use core::cmp::Ordering; | ||
| 3 | use core::task::Waker; | ||
| 4 | |||
| 5 | use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; | ||
| 6 | use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 7 | use embassy_sync::blocking_mutex::Mutex; | ||
| 8 | use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; | ||
| 9 | |||
| 10 | use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; | ||
| 11 | use crate::queue::TimerQueue; | ||
| 12 | use crate::Instant; | ||
| 13 | |||
| 14 | #[cfg(feature = "generic-queue-8")] | ||
| 15 | const QUEUE_SIZE: usize = 8; | ||
| 16 | #[cfg(feature = "generic-queue-16")] | ||
| 17 | const QUEUE_SIZE: usize = 16; | ||
| 18 | #[cfg(feature = "generic-queue-32")] | ||
| 19 | const QUEUE_SIZE: usize = 32; | ||
| 20 | #[cfg(feature = "generic-queue-64")] | ||
| 21 | const QUEUE_SIZE: usize = 32; | ||
| 22 | #[cfg(feature = "generic-queue-128")] | ||
| 23 | const QUEUE_SIZE: usize = 128; | ||
| 24 | #[cfg(not(any( | ||
| 25 | feature = "generic-queue-8", | ||
| 26 | feature = "generic-queue-16", | ||
| 27 | feature = "generic-queue-32", | ||
| 28 | feature = "generic-queue-64", | ||
| 29 | feature = "generic-queue-128" | ||
| 30 | )))] | ||
| 31 | const QUEUE_SIZE: usize = 64; | ||
| 32 | |||
| 33 | #[derive(Debug)] | ||
| 34 | struct Timer { | ||
| 35 | at: Instant, | ||
| 36 | waker: Waker, | ||
| 37 | } | ||
| 38 | |||
| 39 | impl PartialEq for Timer { | ||
| 40 | fn eq(&self, other: &Self) -> bool { | ||
| 41 | self.at == other.at | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | impl Eq for Timer {} | ||
| 46 | |||
| 47 | impl PartialOrd for Timer { | ||
| 48 | fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||
| 49 | self.at.partial_cmp(&other.at) | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | impl Ord for Timer { | ||
| 54 | fn cmp(&self, other: &Self) -> Ordering { | ||
| 55 | self.at.cmp(&other.at) | ||
| 56 | } | ||
| 57 | } | ||
| 58 | |||
| 59 | struct InnerQueue { | ||
| 60 | queue: SortedLinkedList<Timer, LinkedIndexU8, Min, { QUEUE_SIZE }>, | ||
| 61 | alarm: Option<AlarmHandle>, | ||
| 62 | alarm_at: Instant, | ||
| 63 | } | ||
| 64 | |||
| 65 | impl InnerQueue { | ||
| 66 | const fn new() -> Self { | ||
| 67 | Self { | ||
| 68 | queue: SortedLinkedList::new_u8(), | ||
| 69 | alarm: None, | ||
| 70 | alarm_at: Instant::MAX, | ||
| 71 | } | ||
| 72 | } | ||
| 73 | |||
| 74 | fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { | ||
| 75 | self.queue | ||
| 76 | .find_mut(|timer| timer.waker.will_wake(waker)) | ||
| 77 | .map(|mut timer| { | ||
| 78 | timer.at = at; | ||
| 79 | timer.finish(); | ||
| 80 | }) | ||
| 81 | .unwrap_or_else(|| { | ||
| 82 | let mut timer = Timer { | ||
| 83 | waker: waker.clone(), | ||
| 84 | at, | ||
| 85 | }; | ||
| 86 | |||
| 87 | loop { | ||
| 88 | match self.queue.push(timer) { | ||
| 89 | Ok(()) => break, | ||
| 90 | Err(e) => timer = e, | ||
| 91 | } | ||
| 92 | |||
| 93 | self.queue.pop().unwrap().waker.wake(); | ||
| 94 | } | ||
| 95 | }); | ||
| 96 | |||
| 97 | // Don't wait for the alarm callback to trigger and directly | ||
| 98 | // dispatch all timers that are already due | ||
| 99 | // | ||
| 100 | // Then update the alarm if necessary | ||
| 101 | self.dispatch(alarm_schedule); | ||
| 102 | } | ||
| 103 | |||
| 104 | fn dispatch(&mut self, alarm_schedule: &AtomicU64) { | ||
| 105 | let now = Instant::now(); | ||
| 106 | |||
| 107 | while self.queue.peek().filter(|timer| timer.at <= now).is_some() { | ||
| 108 | self.queue.pop().unwrap().waker.wake(); | ||
| 109 | } | ||
| 110 | |||
| 111 | self.update_alarm(alarm_schedule); | ||
| 112 | } | ||
| 113 | |||
| 114 | fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { | ||
| 115 | if let Some(timer) = self.queue.peek() { | ||
| 116 | let new_at = timer.at; | ||
| 117 | |||
| 118 | if self.alarm_at != new_at { | ||
| 119 | self.alarm_at = new_at; | ||
| 120 | alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); | ||
| 121 | } | ||
| 122 | } else { | ||
| 123 | self.alarm_at = Instant::MAX; | ||
| 124 | alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); | ||
| 125 | } | ||
| 126 | } | ||
| 127 | |||
| 128 | fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { | ||
| 129 | self.alarm_at = Instant::MAX; | ||
| 130 | |||
| 131 | self.dispatch(alarm_schedule); | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | struct Queue { | ||
| 136 | inner: Mutex<CriticalSectionRawMutex, RefCell<InnerQueue>>, | ||
| 137 | alarm_schedule: AtomicU64, | ||
| 138 | } | ||
| 139 | |||
| 140 | impl Queue { | ||
| 141 | const fn new() -> Self { | ||
| 142 | Self { | ||
| 143 | inner: Mutex::new(RefCell::new(InnerQueue::new())), | ||
| 144 | alarm_schedule: AtomicU64::new(u64::MAX), | ||
| 145 | } | ||
| 146 | } | ||
| 147 | |||
| 148 | fn schedule_wake(&'static self, at: Instant, waker: &Waker) { | ||
| 149 | self.inner.lock(|inner| { | ||
| 150 | let mut inner = inner.borrow_mut(); | ||
| 151 | |||
| 152 | if inner.alarm.is_none() { | ||
| 153 | let handle = unsafe { allocate_alarm() }.unwrap(); | ||
| 154 | inner.alarm = Some(handle); | ||
| 155 | |||
| 156 | set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); | ||
| 157 | } | ||
| 158 | |||
| 159 | inner.schedule_wake(at, waker, &self.alarm_schedule) | ||
| 160 | }); | ||
| 161 | |||
| 162 | self.update_alarm(); | ||
| 163 | } | ||
| 164 | |||
| 165 | fn update_alarm(&self) { | ||
| 166 | // Need to set the alarm when we are *not* holding the mutex on the inner queue | ||
| 167 | // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately | ||
| 168 | // call us back if the timestamp is in the past. | ||
| 169 | let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); | ||
| 170 | |||
| 171 | if alarm_at < u64::MAX { | ||
| 172 | set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at); | ||
| 173 | } | ||
| 174 | } | ||
| 175 | |||
| 176 | fn handle_alarm(&self) { | ||
| 177 | self.inner | ||
| 178 | .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); | ||
| 179 | |||
| 180 | self.update_alarm(); | ||
| 181 | } | ||
| 182 | |||
| 183 | fn handle_alarm_callback(ctx: *mut ()) { | ||
| 184 | unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); | ||
| 185 | } | ||
| 186 | } | ||
| 187 | |||
| 188 | impl TimerQueue for Queue { | ||
| 189 | fn schedule_wake(&'static self, at: Instant, waker: &Waker) { | ||
| 190 | Queue::schedule_wake(self, at, waker); | ||
| 191 | } | ||
| 192 | } | ||
| 193 | |||
| 194 | crate::timer_queue_impl!(static QUEUE: Queue = Queue::new()); | ||
| 195 | |||
| 196 | #[cfg(test)] | ||
| 197 | mod tests { | ||
| 198 | use core::cell::Cell; | ||
| 199 | use core::sync::atomic::Ordering; | ||
| 200 | use core::task::{RawWaker, RawWakerVTable, Waker}; | ||
| 201 | use std::rc::Rc; | ||
| 202 | use std::sync::Mutex; | ||
| 203 | |||
| 204 | use serial_test::serial; | ||
| 205 | |||
| 206 | use super::InnerQueue; | ||
| 207 | use crate::driver::{AlarmHandle, Driver}; | ||
| 208 | use crate::queue_generic::QUEUE; | ||
| 209 | use crate::Instant; | ||
| 210 | |||
| 211 | struct InnerTestDriver { | ||
| 212 | now: u64, | ||
| 213 | alarm: u64, | ||
| 214 | callback: fn(*mut ()), | ||
| 215 | ctx: *mut (), | ||
| 216 | } | ||
| 217 | |||
| 218 | impl InnerTestDriver { | ||
| 219 | const fn new() -> Self { | ||
| 220 | Self { | ||
| 221 | now: 0, | ||
| 222 | alarm: u64::MAX, | ||
| 223 | callback: Self::noop, | ||
| 224 | ctx: core::ptr::null_mut(), | ||
| 225 | } | ||
| 226 | } | ||
| 227 | |||
| 228 | fn noop(_ctx: *mut ()) {} | ||
| 229 | } | ||
| 230 | |||
| 231 | unsafe impl Send for InnerTestDriver {} | ||
| 232 | |||
| 233 | struct TestDriver(Mutex<InnerTestDriver>); | ||
| 234 | |||
| 235 | impl TestDriver { | ||
| 236 | const fn new() -> Self { | ||
| 237 | Self(Mutex::new(InnerTestDriver::new())) | ||
| 238 | } | ||
| 239 | |||
| 240 | fn reset(&self) { | ||
| 241 | *self.0.lock().unwrap() = InnerTestDriver::new(); | ||
| 242 | } | ||
| 243 | |||
| 244 | fn set_now(&self, now: u64) { | ||
| 245 | let notify = { | ||
| 246 | let mut inner = self.0.lock().unwrap(); | ||
| 247 | |||
| 248 | if inner.now < now { | ||
| 249 | inner.now = now; | ||
| 250 | |||
| 251 | if inner.alarm <= now { | ||
| 252 | inner.alarm = u64::MAX; | ||
| 253 | |||
| 254 | Some((inner.callback, inner.ctx)) | ||
| 255 | } else { | ||
| 256 | None | ||
| 257 | } | ||
| 258 | } else { | ||
| 259 | panic!("Going back in time?"); | ||
| 260 | } | ||
| 261 | }; | ||
| 262 | |||
| 263 | if let Some((callback, ctx)) = notify { | ||
| 264 | (callback)(ctx); | ||
| 265 | } | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | impl Driver for TestDriver { | ||
| 270 | fn now(&self) -> u64 { | ||
| 271 | self.0.lock().unwrap().now | ||
| 272 | } | ||
| 273 | |||
| 274 | unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { | ||
| 275 | Some(AlarmHandle::new(0)) | ||
| 276 | } | ||
| 277 | |||
| 278 | fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { | ||
| 279 | let mut inner = self.0.lock().unwrap(); | ||
| 280 | |||
| 281 | inner.callback = callback; | ||
| 282 | inner.ctx = ctx; | ||
| 283 | } | ||
| 284 | |||
| 285 | fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { | ||
| 286 | let notify = { | ||
| 287 | let mut inner = self.0.lock().unwrap(); | ||
| 288 | |||
| 289 | if timestamp <= inner.now { | ||
| 290 | Some((inner.callback, inner.ctx)) | ||
| 291 | } else { | ||
| 292 | inner.alarm = timestamp; | ||
| 293 | None | ||
| 294 | } | ||
| 295 | }; | ||
| 296 | |||
| 297 | if let Some((callback, ctx)) = notify { | ||
| 298 | (callback)(ctx); | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | |||
| 303 | struct TestWaker { | ||
| 304 | pub awoken: Rc<Cell<bool>>, | ||
| 305 | pub waker: Waker, | ||
| 306 | } | ||
| 307 | |||
| 308 | impl TestWaker { | ||
| 309 | fn new() -> Self { | ||
| 310 | let flag = Rc::new(Cell::new(false)); | ||
| 311 | |||
| 312 | const VTABLE: RawWakerVTable = RawWakerVTable::new( | ||
| 313 | |data: *const ()| { | ||
| 314 | unsafe { | ||
| 315 | Rc::increment_strong_count(data as *const Cell<bool>); | ||
| 316 | } | ||
| 317 | |||
| 318 | RawWaker::new(data as _, &VTABLE) | ||
| 319 | }, | ||
| 320 | |data: *const ()| unsafe { | ||
| 321 | let data = data as *const Cell<bool>; | ||
| 322 | data.as_ref().unwrap().set(true); | ||
| 323 | Rc::decrement_strong_count(data); | ||
| 324 | }, | ||
| 325 | |data: *const ()| unsafe { | ||
| 326 | (data as *const Cell<bool>).as_ref().unwrap().set(true); | ||
| 327 | }, | ||
| 328 | |data: *const ()| unsafe { | ||
| 329 | Rc::decrement_strong_count(data); | ||
| 330 | }, | ||
| 331 | ); | ||
| 332 | |||
| 333 | let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); | ||
| 334 | |||
| 335 | Self { | ||
| 336 | awoken: flag.clone(), | ||
| 337 | waker: unsafe { Waker::from_raw(raw) }, | ||
| 338 | } | ||
| 339 | } | ||
| 340 | } | ||
| 341 | |||
| 342 | crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); | ||
| 343 | |||
| 344 | fn setup() { | ||
| 345 | DRIVER.reset(); | ||
| 346 | |||
| 347 | QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); | ||
| 348 | QUEUE.inner.lock(|inner| { | ||
| 349 | *inner.borrow_mut() = InnerQueue::new(); | ||
| 350 | }); | ||
| 351 | } | ||
| 352 | |||
| 353 | fn queue_len() -> usize { | ||
| 354 | QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) | ||
| 355 | } | ||
| 356 | |||
| 357 | #[test] | ||
| 358 | #[serial] | ||
| 359 | fn test_schedule() { | ||
| 360 | setup(); | ||
| 361 | |||
| 362 | assert_eq!(queue_len(), 0); | ||
| 363 | |||
| 364 | let waker = TestWaker::new(); | ||
| 365 | |||
| 366 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 367 | |||
| 368 | assert!(!waker.awoken.get()); | ||
| 369 | assert_eq!(queue_len(), 1); | ||
| 370 | } | ||
| 371 | |||
| 372 | #[test] | ||
| 373 | #[serial] | ||
| 374 | fn test_schedule_same() { | ||
| 375 | setup(); | ||
| 376 | |||
| 377 | let waker = TestWaker::new(); | ||
| 378 | |||
| 379 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 380 | |||
| 381 | assert_eq!(queue_len(), 1); | ||
| 382 | |||
| 383 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 384 | |||
| 385 | assert_eq!(queue_len(), 1); | ||
| 386 | |||
| 387 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 388 | |||
| 389 | assert_eq!(queue_len(), 1); | ||
| 390 | |||
| 391 | let waker2 = TestWaker::new(); | ||
| 392 | |||
| 393 | QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker); | ||
| 394 | |||
| 395 | assert_eq!(queue_len(), 2); | ||
| 396 | } | ||
| 397 | |||
| 398 | #[test] | ||
| 399 | #[serial] | ||
| 400 | fn test_trigger() { | ||
| 401 | setup(); | ||
| 402 | |||
| 403 | let waker = TestWaker::new(); | ||
| 404 | |||
| 405 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 406 | |||
| 407 | assert!(!waker.awoken.get()); | ||
| 408 | |||
| 409 | DRIVER.set_now(Instant::from_secs(99).as_ticks()); | ||
| 410 | |||
| 411 | assert!(!waker.awoken.get()); | ||
| 412 | |||
| 413 | assert_eq!(queue_len(), 1); | ||
| 414 | |||
| 415 | DRIVER.set_now(Instant::from_secs(100).as_ticks()); | ||
| 416 | |||
| 417 | assert!(waker.awoken.get()); | ||
| 418 | |||
| 419 | assert_eq!(queue_len(), 0); | ||
| 420 | } | ||
| 421 | |||
| 422 | #[test] | ||
| 423 | #[serial] | ||
| 424 | fn test_immediate_trigger() { | ||
| 425 | setup(); | ||
| 426 | |||
| 427 | let waker = TestWaker::new(); | ||
| 428 | |||
| 429 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 430 | |||
| 431 | DRIVER.set_now(Instant::from_secs(50).as_ticks()); | ||
| 432 | |||
| 433 | let waker2 = TestWaker::new(); | ||
| 434 | |||
| 435 | QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker); | ||
| 436 | |||
| 437 | assert!(!waker.awoken.get()); | ||
| 438 | assert!(waker2.awoken.get()); | ||
| 439 | assert_eq!(queue_len(), 1); | ||
| 440 | } | ||
| 441 | |||
| 442 | #[test] | ||
| 443 | #[serial] | ||
| 444 | fn test_queue_overflow() { | ||
| 445 | setup(); | ||
| 446 | |||
| 447 | for i in 1..super::QUEUE_SIZE { | ||
| 448 | let waker = TestWaker::new(); | ||
| 449 | |||
| 450 | QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker); | ||
| 451 | |||
| 452 | assert_eq!(queue_len(), i); | ||
| 453 | assert!(!waker.awoken.get()); | ||
| 454 | } | ||
| 455 | |||
| 456 | let first_waker = TestWaker::new(); | ||
| 457 | |||
| 458 | QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker); | ||
| 459 | |||
| 460 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 461 | assert!(!first_waker.awoken.get()); | ||
| 462 | |||
| 463 | let second_waker = TestWaker::new(); | ||
| 464 | |||
| 465 | QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker); | ||
| 466 | |||
| 467 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 468 | assert!(first_waker.awoken.get()); | ||
| 469 | |||
| 470 | QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker); | ||
| 471 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 472 | assert!(second_waker.awoken.get()); | ||
| 473 | } | ||
| 474 | } | ||
