aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/queue_generic.rs
diff options
context:
space:
mode:
authorivmarkov <[email protected]>2022-09-26 13:46:15 +0300
committerivmarkov <[email protected]>2022-10-24 08:21:35 +0300
commit53608a87ac4b6c8c60b5508551d12f5ba76ca2f6 (patch)
treed560bbbae7cff760a2ae00470eb5bfd3d1a1f6c9 /embassy-time/src/queue_generic.rs
parentba6e452cc5d6c33029f34d7cfb5cd5ea846979bd (diff)
Address feedback after code review
Diffstat (limited to 'embassy-time/src/queue_generic.rs')
-rw-r--r--embassy-time/src/queue_generic.rs474
1 files changed, 474 insertions, 0 deletions
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..1c4e5398b
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,474 @@
1use core::cell::RefCell;
2use core::cmp::Ordering;
3use core::task::Waker;
4
5use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering};
6use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
7use embassy_sync::blocking_mutex::Mutex;
8use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
9
10use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
11use crate::queue::TimerQueue;
12use crate::Instant;
13
14#[cfg(feature = "generic-queue-8")]
15const QUEUE_SIZE: usize = 8;
16#[cfg(feature = "generic-queue-16")]
17const QUEUE_SIZE: usize = 16;
18#[cfg(feature = "generic-queue-32")]
19const QUEUE_SIZE: usize = 32;
20#[cfg(feature = "generic-queue-64")]
21const QUEUE_SIZE: usize = 32;
22#[cfg(feature = "generic-queue-128")]
23const QUEUE_SIZE: usize = 128;
24#[cfg(not(any(
25 feature = "generic-queue-8",
26 feature = "generic-queue-16",
27 feature = "generic-queue-32",
28 feature = "generic-queue-64",
29 feature = "generic-queue-128"
30)))]
31const QUEUE_SIZE: usize = 64;
32
33#[derive(Debug)]
34struct Timer {
35 at: Instant,
36 waker: Waker,
37}
38
39impl PartialEq for Timer {
40 fn eq(&self, other: &Self) -> bool {
41 self.at == other.at
42 }
43}
44
45impl Eq for Timer {}
46
47impl PartialOrd for Timer {
48 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49 self.at.partial_cmp(&other.at)
50 }
51}
52
53impl Ord for Timer {
54 fn cmp(&self, other: &Self) -> Ordering {
55 self.at.cmp(&other.at)
56 }
57}
58
59struct InnerQueue {
60 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, { QUEUE_SIZE }>,
61 alarm: Option<AlarmHandle>,
62 alarm_at: Instant,
63}
64
65impl InnerQueue {
66 const fn new() -> Self {
67 Self {
68 queue: SortedLinkedList::new_u8(),
69 alarm: None,
70 alarm_at: Instant::MAX,
71 }
72 }
73
74 fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) {
75 self.queue
76 .find_mut(|timer| timer.waker.will_wake(waker))
77 .map(|mut timer| {
78 timer.at = at;
79 timer.finish();
80 })
81 .unwrap_or_else(|| {
82 let mut timer = Timer {
83 waker: waker.clone(),
84 at,
85 };
86
87 loop {
88 match self.queue.push(timer) {
89 Ok(()) => break,
90 Err(e) => timer = e,
91 }
92
93 self.queue.pop().unwrap().waker.wake();
94 }
95 });
96
97 // Don't wait for the alarm callback to trigger and directly
98 // dispatch all timers that are already due
99 //
100 // Then update the alarm if necessary
101 self.dispatch(alarm_schedule);
102 }
103
104 fn dispatch(&mut self, alarm_schedule: &AtomicU64) {
105 let now = Instant::now();
106
107 while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
108 self.queue.pop().unwrap().waker.wake();
109 }
110
111 self.update_alarm(alarm_schedule);
112 }
113
114 fn update_alarm(&mut self, alarm_schedule: &AtomicU64) {
115 if let Some(timer) = self.queue.peek() {
116 let new_at = timer.at;
117
118 if self.alarm_at != new_at {
119 self.alarm_at = new_at;
120 alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst);
121 }
122 } else {
123 self.alarm_at = Instant::MAX;
124 alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst);
125 }
126 }
127
128 fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) {
129 self.alarm_at = Instant::MAX;
130
131 self.dispatch(alarm_schedule);
132 }
133}
134
135struct Queue {
136 inner: Mutex<CriticalSectionRawMutex, RefCell<InnerQueue>>,
137 alarm_schedule: AtomicU64,
138}
139
140impl Queue {
141 const fn new() -> Self {
142 Self {
143 inner: Mutex::new(RefCell::new(InnerQueue::new())),
144 alarm_schedule: AtomicU64::new(u64::MAX),
145 }
146 }
147
148 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
149 self.inner.lock(|inner| {
150 let mut inner = inner.borrow_mut();
151
152 if inner.alarm.is_none() {
153 let handle = unsafe { allocate_alarm() }.unwrap();
154 inner.alarm = Some(handle);
155
156 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
157 }
158
159 inner.schedule_wake(at, waker, &self.alarm_schedule)
160 });
161
162 self.update_alarm();
163 }
164
165 fn update_alarm(&self) {
166 // Need to set the alarm when we are *not* holding the mutex on the inner queue
167 // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately
168 // call us back if the timestamp is in the past.
169 let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst);
170
171 if alarm_at < u64::MAX {
172 set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at);
173 }
174 }
175
176 fn handle_alarm(&self) {
177 self.inner
178 .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule));
179
180 self.update_alarm();
181 }
182
183 fn handle_alarm_callback(ctx: *mut ()) {
184 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
185 }
186}
187
188impl TimerQueue for Queue {
189 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
190 Queue::schedule_wake(self, at, waker);
191 }
192}
193
194crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
195
196#[cfg(test)]
197mod tests {
198 use core::cell::Cell;
199 use core::sync::atomic::Ordering;
200 use core::task::{RawWaker, RawWakerVTable, Waker};
201 use std::rc::Rc;
202 use std::sync::Mutex;
203
204 use serial_test::serial;
205
206 use super::InnerQueue;
207 use crate::driver::{AlarmHandle, Driver};
208 use crate::queue_generic::QUEUE;
209 use crate::Instant;
210
211 struct InnerTestDriver {
212 now: u64,
213 alarm: u64,
214 callback: fn(*mut ()),
215 ctx: *mut (),
216 }
217
218 impl InnerTestDriver {
219 const fn new() -> Self {
220 Self {
221 now: 0,
222 alarm: u64::MAX,
223 callback: Self::noop,
224 ctx: core::ptr::null_mut(),
225 }
226 }
227
228 fn noop(_ctx: *mut ()) {}
229 }
230
231 unsafe impl Send for InnerTestDriver {}
232
233 struct TestDriver(Mutex<InnerTestDriver>);
234
235 impl TestDriver {
236 const fn new() -> Self {
237 Self(Mutex::new(InnerTestDriver::new()))
238 }
239
240 fn reset(&self) {
241 *self.0.lock().unwrap() = InnerTestDriver::new();
242 }
243
244 fn set_now(&self, now: u64) {
245 let notify = {
246 let mut inner = self.0.lock().unwrap();
247
248 if inner.now < now {
249 inner.now = now;
250
251 if inner.alarm <= now {
252 inner.alarm = u64::MAX;
253
254 Some((inner.callback, inner.ctx))
255 } else {
256 None
257 }
258 } else {
259 panic!("Going back in time?");
260 }
261 };
262
263 if let Some((callback, ctx)) = notify {
264 (callback)(ctx);
265 }
266 }
267 }
268
269 impl Driver for TestDriver {
270 fn now(&self) -> u64 {
271 self.0.lock().unwrap().now
272 }
273
274 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
275 Some(AlarmHandle::new(0))
276 }
277
278 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
279 let mut inner = self.0.lock().unwrap();
280
281 inner.callback = callback;
282 inner.ctx = ctx;
283 }
284
285 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) {
286 let notify = {
287 let mut inner = self.0.lock().unwrap();
288
289 if timestamp <= inner.now {
290 Some((inner.callback, inner.ctx))
291 } else {
292 inner.alarm = timestamp;
293 None
294 }
295 };
296
297 if let Some((callback, ctx)) = notify {
298 (callback)(ctx);
299 }
300 }
301 }
302
303 struct TestWaker {
304 pub awoken: Rc<Cell<bool>>,
305 pub waker: Waker,
306 }
307
308 impl TestWaker {
309 fn new() -> Self {
310 let flag = Rc::new(Cell::new(false));
311
312 const VTABLE: RawWakerVTable = RawWakerVTable::new(
313 |data: *const ()| {
314 unsafe {
315 Rc::increment_strong_count(data as *const Cell<bool>);
316 }
317
318 RawWaker::new(data as _, &VTABLE)
319 },
320 |data: *const ()| unsafe {
321 let data = data as *const Cell<bool>;
322 data.as_ref().unwrap().set(true);
323 Rc::decrement_strong_count(data);
324 },
325 |data: *const ()| unsafe {
326 (data as *const Cell<bool>).as_ref().unwrap().set(true);
327 },
328 |data: *const ()| unsafe {
329 Rc::decrement_strong_count(data);
330 },
331 );
332
333 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
334
335 Self {
336 awoken: flag.clone(),
337 waker: unsafe { Waker::from_raw(raw) },
338 }
339 }
340 }
341
342 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
343
344 fn setup() {
345 DRIVER.reset();
346
347 QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst);
348 QUEUE.inner.lock(|inner| {
349 *inner.borrow_mut() = InnerQueue::new();
350 });
351 }
352
353 fn queue_len() -> usize {
354 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
355 }
356
357 #[test]
358 #[serial]
359 fn test_schedule() {
360 setup();
361
362 assert_eq!(queue_len(), 0);
363
364 let waker = TestWaker::new();
365
366 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
367
368 assert!(!waker.awoken.get());
369 assert_eq!(queue_len(), 1);
370 }
371
372 #[test]
373 #[serial]
374 fn test_schedule_same() {
375 setup();
376
377 let waker = TestWaker::new();
378
379 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
380
381 assert_eq!(queue_len(), 1);
382
383 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
384
385 assert_eq!(queue_len(), 1);
386
387 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
388
389 assert_eq!(queue_len(), 1);
390
391 let waker2 = TestWaker::new();
392
393 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
394
395 assert_eq!(queue_len(), 2);
396 }
397
398 #[test]
399 #[serial]
400 fn test_trigger() {
401 setup();
402
403 let waker = TestWaker::new();
404
405 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
406
407 assert!(!waker.awoken.get());
408
409 DRIVER.set_now(Instant::from_secs(99).as_ticks());
410
411 assert!(!waker.awoken.get());
412
413 assert_eq!(queue_len(), 1);
414
415 DRIVER.set_now(Instant::from_secs(100).as_ticks());
416
417 assert!(waker.awoken.get());
418
419 assert_eq!(queue_len(), 0);
420 }
421
422 #[test]
423 #[serial]
424 fn test_immediate_trigger() {
425 setup();
426
427 let waker = TestWaker::new();
428
429 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
430
431 DRIVER.set_now(Instant::from_secs(50).as_ticks());
432
433 let waker2 = TestWaker::new();
434
435 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
436
437 assert!(!waker.awoken.get());
438 assert!(waker2.awoken.get());
439 assert_eq!(queue_len(), 1);
440 }
441
442 #[test]
443 #[serial]
444 fn test_queue_overflow() {
445 setup();
446
447 for i in 1..super::QUEUE_SIZE {
448 let waker = TestWaker::new();
449
450 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
451
452 assert_eq!(queue_len(), i);
453 assert!(!waker.awoken.get());
454 }
455
456 let first_waker = TestWaker::new();
457
458 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
459
460 assert_eq!(queue_len(), super::QUEUE_SIZE);
461 assert!(!first_waker.awoken.get());
462
463 let second_waker = TestWaker::new();
464
465 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
466
467 assert_eq!(queue_len(), super::QUEUE_SIZE);
468 assert!(first_waker.awoken.get());
469
470 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
471 assert_eq!(queue_len(), super::QUEUE_SIZE);
472 assert!(second_waker.awoken.get());
473 }
474}