diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2022-10-26 19:14:12 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-10-26 19:14:12 +0000 |
| commit | e5097a8866c071c8b986757543a723b20b67fa03 (patch) | |
| tree | 91d2c06248d17d9dfc0f22fcdba7f08eb6fd6750 /embassy-time/src/queue_generic.rs | |
| parent | 9b86de770bccfe00ceaa6b88c51bcaba2a57eb03 (diff) | |
| parent | f9da6271cea7035b2c9f27cfe479aa81889168d1 (diff) | |
Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov
Hopefully relatively well documented.
Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.)
As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all.
I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue.
The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot.
Co-authored-by: ivmarkov <[email protected]>
Co-authored-by: Dario Nieuwenhuis <[email protected]>
Diffstat (limited to 'embassy-time/src/queue_generic.rs')
| -rw-r--r-- | embassy-time/src/queue_generic.rs | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs new file mode 100644 index 000000000..20ae7e6cc --- /dev/null +++ b/embassy-time/src/queue_generic.rs | |||
| @@ -0,0 +1,449 @@ | |||
| 1 | use core::cell::RefCell; | ||
| 2 | use core::cmp::{min, Ordering}; | ||
| 3 | use core::task::Waker; | ||
| 4 | |||
| 5 | use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 6 | use embassy_sync::blocking_mutex::Mutex; | ||
| 7 | use heapless::Vec; | ||
| 8 | |||
| 9 | use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; | ||
| 10 | use crate::queue::TimerQueue; | ||
| 11 | use crate::Instant; | ||
| 12 | |||
| 13 | #[cfg(feature = "generic-queue-8")] | ||
| 14 | const QUEUE_SIZE: usize = 8; | ||
| 15 | #[cfg(feature = "generic-queue-16")] | ||
| 16 | const QUEUE_SIZE: usize = 16; | ||
| 17 | #[cfg(feature = "generic-queue-32")] | ||
| 18 | const QUEUE_SIZE: usize = 32; | ||
| 19 | #[cfg(feature = "generic-queue-64")] | ||
| 20 | const QUEUE_SIZE: usize = 32; | ||
| 21 | #[cfg(feature = "generic-queue-128")] | ||
| 22 | const QUEUE_SIZE: usize = 128; | ||
| 23 | #[cfg(not(any( | ||
| 24 | feature = "generic-queue-8", | ||
| 25 | feature = "generic-queue-16", | ||
| 26 | feature = "generic-queue-32", | ||
| 27 | feature = "generic-queue-64", | ||
| 28 | feature = "generic-queue-128" | ||
| 29 | )))] | ||
| 30 | const QUEUE_SIZE: usize = 64; | ||
| 31 | |||
| 32 | #[derive(Debug)] | ||
| 33 | struct Timer { | ||
| 34 | at: Instant, | ||
| 35 | waker: Waker, | ||
| 36 | } | ||
| 37 | |||
| 38 | impl PartialEq for Timer { | ||
| 39 | fn eq(&self, other: &Self) -> bool { | ||
| 40 | self.at == other.at | ||
| 41 | } | ||
| 42 | } | ||
| 43 | |||
| 44 | impl Eq for Timer {} | ||
| 45 | |||
| 46 | impl PartialOrd for Timer { | ||
| 47 | fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||
| 48 | self.at.partial_cmp(&other.at) | ||
| 49 | } | ||
| 50 | } | ||
| 51 | |||
| 52 | impl Ord for Timer { | ||
| 53 | fn cmp(&self, other: &Self) -> Ordering { | ||
| 54 | self.at.cmp(&other.at) | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | struct InnerQueue { | ||
| 59 | queue: Vec<Timer, QUEUE_SIZE>, | ||
| 60 | alarm: AlarmHandle, | ||
| 61 | } | ||
| 62 | |||
| 63 | impl InnerQueue { | ||
| 64 | fn schedule_wake(&mut self, at: Instant, waker: &Waker) { | ||
| 65 | self.queue | ||
| 66 | .iter_mut() | ||
| 67 | .find(|timer| timer.waker.will_wake(waker)) | ||
| 68 | .map(|mut timer| { | ||
| 69 | timer.at = min(timer.at, at); | ||
| 70 | }) | ||
| 71 | .unwrap_or_else(|| { | ||
| 72 | let mut timer = Timer { | ||
| 73 | waker: waker.clone(), | ||
| 74 | at, | ||
| 75 | }; | ||
| 76 | |||
| 77 | loop { | ||
| 78 | match self.queue.push(timer) { | ||
| 79 | Ok(()) => break, | ||
| 80 | Err(e) => timer = e, | ||
| 81 | } | ||
| 82 | |||
| 83 | self.queue.pop().unwrap().waker.wake(); | ||
| 84 | } | ||
| 85 | }); | ||
| 86 | |||
| 87 | // Don't wait for the alarm callback to trigger and directly | ||
| 88 | // dispatch all timers that are already due | ||
| 89 | // | ||
| 90 | // Then update the alarm if necessary | ||
| 91 | self.dispatch(); | ||
| 92 | } | ||
| 93 | |||
| 94 | fn dispatch(&mut self) { | ||
| 95 | loop { | ||
| 96 | let now = Instant::now(); | ||
| 97 | |||
| 98 | let mut next_alarm = Instant::MAX; | ||
| 99 | |||
| 100 | let mut i = 0; | ||
| 101 | while i < self.queue.len() { | ||
| 102 | let timer = &self.queue[i]; | ||
| 103 | if timer.at <= now { | ||
| 104 | let timer = self.queue.swap_remove(i); | ||
| 105 | timer.waker.wake(); | ||
| 106 | } else { | ||
| 107 | next_alarm = min(next_alarm, timer.at); | ||
| 108 | i += 1; | ||
| 109 | } | ||
| 110 | } | ||
| 111 | |||
| 112 | if self.update_alarm(next_alarm) { | ||
| 113 | break; | ||
| 114 | } | ||
| 115 | } | ||
| 116 | } | ||
| 117 | |||
| 118 | fn update_alarm(&mut self, next_alarm: Instant) -> bool { | ||
| 119 | if next_alarm == Instant::MAX { | ||
| 120 | true | ||
| 121 | } else { | ||
| 122 | set_alarm(self.alarm, next_alarm.as_ticks()) | ||
| 123 | } | ||
| 124 | } | ||
| 125 | |||
| 126 | fn handle_alarm(&mut self) { | ||
| 127 | self.dispatch(); | ||
| 128 | } | ||
| 129 | } | ||
| 130 | |||
| 131 | struct Queue { | ||
| 132 | inner: Mutex<CriticalSectionRawMutex, RefCell<Option<InnerQueue>>>, | ||
| 133 | } | ||
| 134 | |||
| 135 | impl Queue { | ||
| 136 | const fn new() -> Self { | ||
| 137 | Self { | ||
| 138 | inner: Mutex::new(RefCell::new(None)), | ||
| 139 | } | ||
| 140 | } | ||
| 141 | |||
| 142 | fn schedule_wake(&'static self, at: Instant, waker: &Waker) { | ||
| 143 | self.inner.lock(|inner| { | ||
| 144 | let mut inner = inner.borrow_mut(); | ||
| 145 | |||
| 146 | if inner.is_none() {} | ||
| 147 | |||
| 148 | inner | ||
| 149 | .get_or_insert_with(|| { | ||
| 150 | let handle = unsafe { allocate_alarm() }.unwrap(); | ||
| 151 | set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); | ||
| 152 | InnerQueue { | ||
| 153 | queue: Vec::new(), | ||
| 154 | alarm: handle, | ||
| 155 | } | ||
| 156 | }) | ||
| 157 | .schedule_wake(at, waker) | ||
| 158 | }); | ||
| 159 | } | ||
| 160 | |||
| 161 | fn handle_alarm(&self) { | ||
| 162 | self.inner | ||
| 163 | .lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm()); | ||
| 164 | } | ||
| 165 | |||
| 166 | fn handle_alarm_callback(ctx: *mut ()) { | ||
| 167 | unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); | ||
| 168 | } | ||
| 169 | } | ||
| 170 | |||
| 171 | impl TimerQueue for Queue { | ||
| 172 | fn schedule_wake(&'static self, at: Instant, waker: &Waker) { | ||
| 173 | Queue::schedule_wake(self, at, waker); | ||
| 174 | } | ||
| 175 | } | ||
| 176 | |||
| 177 | crate::timer_queue_impl!(static QUEUE: Queue = Queue::new()); | ||
| 178 | |||
| 179 | #[cfg(test)] | ||
| 180 | mod tests { | ||
| 181 | use core::cell::Cell; | ||
| 182 | use core::task::{RawWaker, RawWakerVTable, Waker}; | ||
| 183 | use std::rc::Rc; | ||
| 184 | use std::sync::Mutex; | ||
| 185 | |||
| 186 | use serial_test::serial; | ||
| 187 | |||
| 188 | use super::InnerQueue; | ||
| 189 | use crate::driver::{AlarmHandle, Driver}; | ||
| 190 | use crate::queue_generic::QUEUE; | ||
| 191 | use crate::Instant; | ||
| 192 | |||
| 193 | struct InnerTestDriver { | ||
| 194 | now: u64, | ||
| 195 | alarm: u64, | ||
| 196 | callback: fn(*mut ()), | ||
| 197 | ctx: *mut (), | ||
| 198 | } | ||
| 199 | |||
| 200 | impl InnerTestDriver { | ||
| 201 | const fn new() -> Self { | ||
| 202 | Self { | ||
| 203 | now: 0, | ||
| 204 | alarm: u64::MAX, | ||
| 205 | callback: Self::noop, | ||
| 206 | ctx: core::ptr::null_mut(), | ||
| 207 | } | ||
| 208 | } | ||
| 209 | |||
| 210 | fn noop(_ctx: *mut ()) {} | ||
| 211 | } | ||
| 212 | |||
| 213 | unsafe impl Send for InnerTestDriver {} | ||
| 214 | |||
| 215 | struct TestDriver(Mutex<InnerTestDriver>); | ||
| 216 | |||
| 217 | impl TestDriver { | ||
| 218 | const fn new() -> Self { | ||
| 219 | Self(Mutex::new(InnerTestDriver::new())) | ||
| 220 | } | ||
| 221 | |||
| 222 | fn reset(&self) { | ||
| 223 | *self.0.lock().unwrap() = InnerTestDriver::new(); | ||
| 224 | } | ||
| 225 | |||
| 226 | fn set_now(&self, now: u64) { | ||
| 227 | let notify = { | ||
| 228 | let mut inner = self.0.lock().unwrap(); | ||
| 229 | |||
| 230 | if inner.now < now { | ||
| 231 | inner.now = now; | ||
| 232 | |||
| 233 | if inner.alarm <= now { | ||
| 234 | inner.alarm = u64::MAX; | ||
| 235 | |||
| 236 | Some((inner.callback, inner.ctx)) | ||
| 237 | } else { | ||
| 238 | None | ||
| 239 | } | ||
| 240 | } else { | ||
| 241 | panic!("Going back in time?"); | ||
| 242 | } | ||
| 243 | }; | ||
| 244 | |||
| 245 | if let Some((callback, ctx)) = notify { | ||
| 246 | (callback)(ctx); | ||
| 247 | } | ||
| 248 | } | ||
| 249 | } | ||
| 250 | |||
| 251 | impl Driver for TestDriver { | ||
| 252 | fn now(&self) -> u64 { | ||
| 253 | self.0.lock().unwrap().now | ||
| 254 | } | ||
| 255 | |||
| 256 | unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { | ||
| 257 | Some(AlarmHandle::new(0)) | ||
| 258 | } | ||
| 259 | |||
| 260 | fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { | ||
| 261 | let mut inner = self.0.lock().unwrap(); | ||
| 262 | |||
| 263 | inner.callback = callback; | ||
| 264 | inner.ctx = ctx; | ||
| 265 | } | ||
| 266 | |||
| 267 | fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool { | ||
| 268 | let mut inner = self.0.lock().unwrap(); | ||
| 269 | |||
| 270 | if timestamp <= inner.now { | ||
| 271 | false | ||
| 272 | } else { | ||
| 273 | inner.alarm = timestamp; | ||
| 274 | true | ||
| 275 | } | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | struct TestWaker { | ||
| 280 | pub awoken: Rc<Cell<bool>>, | ||
| 281 | pub waker: Waker, | ||
| 282 | } | ||
| 283 | |||
| 284 | impl TestWaker { | ||
| 285 | fn new() -> Self { | ||
| 286 | let flag = Rc::new(Cell::new(false)); | ||
| 287 | |||
| 288 | const VTABLE: RawWakerVTable = RawWakerVTable::new( | ||
| 289 | |data: *const ()| { | ||
| 290 | unsafe { | ||
| 291 | Rc::increment_strong_count(data as *const Cell<bool>); | ||
| 292 | } | ||
| 293 | |||
| 294 | RawWaker::new(data as _, &VTABLE) | ||
| 295 | }, | ||
| 296 | |data: *const ()| unsafe { | ||
| 297 | let data = data as *const Cell<bool>; | ||
| 298 | data.as_ref().unwrap().set(true); | ||
| 299 | Rc::decrement_strong_count(data); | ||
| 300 | }, | ||
| 301 | |data: *const ()| unsafe { | ||
| 302 | (data as *const Cell<bool>).as_ref().unwrap().set(true); | ||
| 303 | }, | ||
| 304 | |data: *const ()| unsafe { | ||
| 305 | Rc::decrement_strong_count(data); | ||
| 306 | }, | ||
| 307 | ); | ||
| 308 | |||
| 309 | let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); | ||
| 310 | |||
| 311 | Self { | ||
| 312 | awoken: flag.clone(), | ||
| 313 | waker: unsafe { Waker::from_raw(raw) }, | ||
| 314 | } | ||
| 315 | } | ||
| 316 | } | ||
| 317 | |||
| 318 | crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); | ||
| 319 | |||
| 320 | fn setup() { | ||
| 321 | DRIVER.reset(); | ||
| 322 | |||
| 323 | QUEUE.inner.lock(|inner| { | ||
| 324 | *inner.borrow_mut() = InnerQueue::new(); | ||
| 325 | }); | ||
| 326 | } | ||
| 327 | |||
| 328 | fn queue_len() -> usize { | ||
| 329 | QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) | ||
| 330 | } | ||
| 331 | |||
| 332 | #[test] | ||
| 333 | #[serial] | ||
| 334 | fn test_schedule() { | ||
| 335 | setup(); | ||
| 336 | |||
| 337 | assert_eq!(queue_len(), 0); | ||
| 338 | |||
| 339 | let waker = TestWaker::new(); | ||
| 340 | |||
| 341 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 342 | |||
| 343 | assert!(!waker.awoken.get()); | ||
| 344 | assert_eq!(queue_len(), 1); | ||
| 345 | } | ||
| 346 | |||
| 347 | #[test] | ||
| 348 | #[serial] | ||
| 349 | fn test_schedule_same() { | ||
| 350 | setup(); | ||
| 351 | |||
| 352 | let waker = TestWaker::new(); | ||
| 353 | |||
| 354 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 355 | |||
| 356 | assert_eq!(queue_len(), 1); | ||
| 357 | |||
| 358 | QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); | ||
| 359 | |||
| 360 | assert_eq!(queue_len(), 1); | ||
| 361 | |||
| 362 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 363 | |||
| 364 | assert_eq!(queue_len(), 1); | ||
| 365 | |||
| 366 | let waker2 = TestWaker::new(); | ||
| 367 | |||
| 368 | QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker); | ||
| 369 | |||
| 370 | assert_eq!(queue_len(), 2); | ||
| 371 | } | ||
| 372 | |||
| 373 | #[test] | ||
| 374 | #[serial] | ||
| 375 | fn test_trigger() { | ||
| 376 | setup(); | ||
| 377 | |||
| 378 | let waker = TestWaker::new(); | ||
| 379 | |||
| 380 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 381 | |||
| 382 | assert!(!waker.awoken.get()); | ||
| 383 | |||
| 384 | DRIVER.set_now(Instant::from_secs(99).as_ticks()); | ||
| 385 | |||
| 386 | assert!(!waker.awoken.get()); | ||
| 387 | |||
| 388 | assert_eq!(queue_len(), 1); | ||
| 389 | |||
| 390 | DRIVER.set_now(Instant::from_secs(100).as_ticks()); | ||
| 391 | |||
| 392 | assert!(waker.awoken.get()); | ||
| 393 | |||
| 394 | assert_eq!(queue_len(), 0); | ||
| 395 | } | ||
| 396 | |||
| 397 | #[test] | ||
| 398 | #[serial] | ||
| 399 | fn test_immediate_trigger() { | ||
| 400 | setup(); | ||
| 401 | |||
| 402 | let waker = TestWaker::new(); | ||
| 403 | |||
| 404 | QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); | ||
| 405 | |||
| 406 | DRIVER.set_now(Instant::from_secs(50).as_ticks()); | ||
| 407 | |||
| 408 | let waker2 = TestWaker::new(); | ||
| 409 | |||
| 410 | QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker); | ||
| 411 | |||
| 412 | assert!(!waker.awoken.get()); | ||
| 413 | assert!(waker2.awoken.get()); | ||
| 414 | assert_eq!(queue_len(), 1); | ||
| 415 | } | ||
| 416 | |||
| 417 | #[test] | ||
| 418 | #[serial] | ||
| 419 | fn test_queue_overflow() { | ||
| 420 | setup(); | ||
| 421 | |||
| 422 | for i in 1..super::QUEUE_SIZE { | ||
| 423 | let waker = TestWaker::new(); | ||
| 424 | |||
| 425 | QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker); | ||
| 426 | |||
| 427 | assert_eq!(queue_len(), i); | ||
| 428 | assert!(!waker.awoken.get()); | ||
| 429 | } | ||
| 430 | |||
| 431 | let first_waker = TestWaker::new(); | ||
| 432 | |||
| 433 | QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker); | ||
| 434 | |||
| 435 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 436 | assert!(!first_waker.awoken.get()); | ||
| 437 | |||
| 438 | let second_waker = TestWaker::new(); | ||
| 439 | |||
| 440 | QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker); | ||
| 441 | |||
| 442 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 443 | assert!(first_waker.awoken.get()); | ||
| 444 | |||
| 445 | QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker); | ||
| 446 | assert_eq!(queue_len(), super::QUEUE_SIZE); | ||
| 447 | assert!(second_waker.awoken.get()); | ||
| 448 | } | ||
| 449 | } | ||
