aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/queue_generic.rs
diff options
context:
space:
mode:
authorMathias <[email protected]>2022-10-27 07:12:34 +0200
committerMathias <[email protected]>2022-10-27 07:12:34 +0200
commitc871fe0848e50c8682b8a8d9fe8da31ca9185592 (patch)
treee24f9edc23385080968d4b427aed3eddb4bd2f21 /embassy-time/src/queue_generic.rs
parent3c6c382465131c6f76567f976198b77e327df4b2 (diff)
parent61560e740dea1b4c7ca036dafd66c834a1ff92e2 (diff)
Rebase on master
Diffstat (limited to 'embassy-time/src/queue_generic.rs')
-rw-r--r--embassy-time/src/queue_generic.rs449
1 files changed, 449 insertions, 0 deletions
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..20ae7e6cc
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,449 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
6use embassy_sync::blocking_mutex::Mutex;
7use heapless::Vec;
8
9use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
10use crate::queue::TimerQueue;
11use crate::Instant;
12
13#[cfg(feature = "generic-queue-8")]
14const QUEUE_SIZE: usize = 8;
15#[cfg(feature = "generic-queue-16")]
16const QUEUE_SIZE: usize = 16;
17#[cfg(feature = "generic-queue-32")]
18const QUEUE_SIZE: usize = 32;
19#[cfg(feature = "generic-queue-64")]
20const QUEUE_SIZE: usize = 32;
21#[cfg(feature = "generic-queue-128")]
22const QUEUE_SIZE: usize = 128;
23#[cfg(not(any(
24 feature = "generic-queue-8",
25 feature = "generic-queue-16",
26 feature = "generic-queue-32",
27 feature = "generic-queue-64",
28 feature = "generic-queue-128"
29)))]
30const QUEUE_SIZE: usize = 64;
31
32#[derive(Debug)]
33struct Timer {
34 at: Instant,
35 waker: Waker,
36}
37
38impl PartialEq for Timer {
39 fn eq(&self, other: &Self) -> bool {
40 self.at == other.at
41 }
42}
43
44impl Eq for Timer {}
45
46impl PartialOrd for Timer {
47 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48 self.at.partial_cmp(&other.at)
49 }
50}
51
52impl Ord for Timer {
53 fn cmp(&self, other: &Self) -> Ordering {
54 self.at.cmp(&other.at)
55 }
56}
57
58struct InnerQueue {
59 queue: Vec<Timer, QUEUE_SIZE>,
60 alarm: AlarmHandle,
61}
62
63impl InnerQueue {
64 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
65 self.queue
66 .iter_mut()
67 .find(|timer| timer.waker.will_wake(waker))
68 .map(|mut timer| {
69 timer.at = min(timer.at, at);
70 })
71 .unwrap_or_else(|| {
72 let mut timer = Timer {
73 waker: waker.clone(),
74 at,
75 };
76
77 loop {
78 match self.queue.push(timer) {
79 Ok(()) => break,
80 Err(e) => timer = e,
81 }
82
83 self.queue.pop().unwrap().waker.wake();
84 }
85 });
86
87 // Don't wait for the alarm callback to trigger and directly
88 // dispatch all timers that are already due
89 //
90 // Then update the alarm if necessary
91 self.dispatch();
92 }
93
94 fn dispatch(&mut self) {
95 loop {
96 let now = Instant::now();
97
98 let mut next_alarm = Instant::MAX;
99
100 let mut i = 0;
101 while i < self.queue.len() {
102 let timer = &self.queue[i];
103 if timer.at <= now {
104 let timer = self.queue.swap_remove(i);
105 timer.waker.wake();
106 } else {
107 next_alarm = min(next_alarm, timer.at);
108 i += 1;
109 }
110 }
111
112 if self.update_alarm(next_alarm) {
113 break;
114 }
115 }
116 }
117
118 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
119 if next_alarm == Instant::MAX {
120 true
121 } else {
122 set_alarm(self.alarm, next_alarm.as_ticks())
123 }
124 }
125
126 fn handle_alarm(&mut self) {
127 self.dispatch();
128 }
129}
130
131struct Queue {
132 inner: Mutex<CriticalSectionRawMutex, RefCell<Option<InnerQueue>>>,
133}
134
135impl Queue {
136 const fn new() -> Self {
137 Self {
138 inner: Mutex::new(RefCell::new(None)),
139 }
140 }
141
142 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
143 self.inner.lock(|inner| {
144 let mut inner = inner.borrow_mut();
145
146 if inner.is_none() {}
147
148 inner
149 .get_or_insert_with(|| {
150 let handle = unsafe { allocate_alarm() }.unwrap();
151 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
152 InnerQueue {
153 queue: Vec::new(),
154 alarm: handle,
155 }
156 })
157 .schedule_wake(at, waker)
158 });
159 }
160
161 fn handle_alarm(&self) {
162 self.inner
163 .lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm());
164 }
165
166 fn handle_alarm_callback(ctx: *mut ()) {
167 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
168 }
169}
170
171impl TimerQueue for Queue {
172 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
173 Queue::schedule_wake(self, at, waker);
174 }
175}
176
177crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
178
179#[cfg(test)]
180mod tests {
181 use core::cell::Cell;
182 use core::task::{RawWaker, RawWakerVTable, Waker};
183 use std::rc::Rc;
184 use std::sync::Mutex;
185
186 use serial_test::serial;
187
188 use super::InnerQueue;
189 use crate::driver::{AlarmHandle, Driver};
190 use crate::queue_generic::QUEUE;
191 use crate::Instant;
192
193 struct InnerTestDriver {
194 now: u64,
195 alarm: u64,
196 callback: fn(*mut ()),
197 ctx: *mut (),
198 }
199
200 impl InnerTestDriver {
201 const fn new() -> Self {
202 Self {
203 now: 0,
204 alarm: u64::MAX,
205 callback: Self::noop,
206 ctx: core::ptr::null_mut(),
207 }
208 }
209
210 fn noop(_ctx: *mut ()) {}
211 }
212
213 unsafe impl Send for InnerTestDriver {}
214
215 struct TestDriver(Mutex<InnerTestDriver>);
216
217 impl TestDriver {
218 const fn new() -> Self {
219 Self(Mutex::new(InnerTestDriver::new()))
220 }
221
222 fn reset(&self) {
223 *self.0.lock().unwrap() = InnerTestDriver::new();
224 }
225
226 fn set_now(&self, now: u64) {
227 let notify = {
228 let mut inner = self.0.lock().unwrap();
229
230 if inner.now < now {
231 inner.now = now;
232
233 if inner.alarm <= now {
234 inner.alarm = u64::MAX;
235
236 Some((inner.callback, inner.ctx))
237 } else {
238 None
239 }
240 } else {
241 panic!("Going back in time?");
242 }
243 };
244
245 if let Some((callback, ctx)) = notify {
246 (callback)(ctx);
247 }
248 }
249 }
250
251 impl Driver for TestDriver {
252 fn now(&self) -> u64 {
253 self.0.lock().unwrap().now
254 }
255
256 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
257 Some(AlarmHandle::new(0))
258 }
259
260 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
261 let mut inner = self.0.lock().unwrap();
262
263 inner.callback = callback;
264 inner.ctx = ctx;
265 }
266
267 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
268 let mut inner = self.0.lock().unwrap();
269
270 if timestamp <= inner.now {
271 false
272 } else {
273 inner.alarm = timestamp;
274 true
275 }
276 }
277 }
278
279 struct TestWaker {
280 pub awoken: Rc<Cell<bool>>,
281 pub waker: Waker,
282 }
283
284 impl TestWaker {
285 fn new() -> Self {
286 let flag = Rc::new(Cell::new(false));
287
288 const VTABLE: RawWakerVTable = RawWakerVTable::new(
289 |data: *const ()| {
290 unsafe {
291 Rc::increment_strong_count(data as *const Cell<bool>);
292 }
293
294 RawWaker::new(data as _, &VTABLE)
295 },
296 |data: *const ()| unsafe {
297 let data = data as *const Cell<bool>;
298 data.as_ref().unwrap().set(true);
299 Rc::decrement_strong_count(data);
300 },
301 |data: *const ()| unsafe {
302 (data as *const Cell<bool>).as_ref().unwrap().set(true);
303 },
304 |data: *const ()| unsafe {
305 Rc::decrement_strong_count(data);
306 },
307 );
308
309 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
310
311 Self {
312 awoken: flag.clone(),
313 waker: unsafe { Waker::from_raw(raw) },
314 }
315 }
316 }
317
318 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
319
320 fn setup() {
321 DRIVER.reset();
322
323 QUEUE.inner.lock(|inner| {
324 *inner.borrow_mut() = InnerQueue::new();
325 });
326 }
327
328 fn queue_len() -> usize {
329 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
330 }
331
332 #[test]
333 #[serial]
334 fn test_schedule() {
335 setup();
336
337 assert_eq!(queue_len(), 0);
338
339 let waker = TestWaker::new();
340
341 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
342
343 assert!(!waker.awoken.get());
344 assert_eq!(queue_len(), 1);
345 }
346
347 #[test]
348 #[serial]
349 fn test_schedule_same() {
350 setup();
351
352 let waker = TestWaker::new();
353
354 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
355
356 assert_eq!(queue_len(), 1);
357
358 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
359
360 assert_eq!(queue_len(), 1);
361
362 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
363
364 assert_eq!(queue_len(), 1);
365
366 let waker2 = TestWaker::new();
367
368 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
369
370 assert_eq!(queue_len(), 2);
371 }
372
373 #[test]
374 #[serial]
375 fn test_trigger() {
376 setup();
377
378 let waker = TestWaker::new();
379
380 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
381
382 assert!(!waker.awoken.get());
383
384 DRIVER.set_now(Instant::from_secs(99).as_ticks());
385
386 assert!(!waker.awoken.get());
387
388 assert_eq!(queue_len(), 1);
389
390 DRIVER.set_now(Instant::from_secs(100).as_ticks());
391
392 assert!(waker.awoken.get());
393
394 assert_eq!(queue_len(), 0);
395 }
396
397 #[test]
398 #[serial]
399 fn test_immediate_trigger() {
400 setup();
401
402 let waker = TestWaker::new();
403
404 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
405
406 DRIVER.set_now(Instant::from_secs(50).as_ticks());
407
408 let waker2 = TestWaker::new();
409
410 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
411
412 assert!(!waker.awoken.get());
413 assert!(waker2.awoken.get());
414 assert_eq!(queue_len(), 1);
415 }
416
417 #[test]
418 #[serial]
419 fn test_queue_overflow() {
420 setup();
421
422 for i in 1..super::QUEUE_SIZE {
423 let waker = TestWaker::new();
424
425 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
426
427 assert_eq!(queue_len(), i);
428 assert!(!waker.awoken.get());
429 }
430
431 let first_waker = TestWaker::new();
432
433 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
434
435 assert_eq!(queue_len(), super::QUEUE_SIZE);
436 assert!(!first_waker.awoken.get());
437
438 let second_waker = TestWaker::new();
439
440 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
441
442 assert_eq!(queue_len(), super::QUEUE_SIZE);
443 assert!(first_waker.awoken.get());
444
445 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
446 assert_eq!(queue_len(), super::QUEUE_SIZE);
447 assert!(second_waker.awoken.get());
448 }
449}