aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/queue_generic.rs
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-time/src/queue_generic.rs
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-time/src/queue_generic.rs')
-rw-r--r--embassy-time/src/queue_generic.rs450
1 files changed, 450 insertions, 0 deletions
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..77947ab29
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,450 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use critical_section::Mutex;
6use heapless::Vec;
7
8use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
9use crate::queue::TimerQueue;
10use crate::Instant;
11
12#[cfg(feature = "generic-queue-8")]
13const QUEUE_SIZE: usize = 8;
14#[cfg(feature = "generic-queue-16")]
15const QUEUE_SIZE: usize = 16;
16#[cfg(feature = "generic-queue-32")]
17const QUEUE_SIZE: usize = 32;
18#[cfg(feature = "generic-queue-64")]
19const QUEUE_SIZE: usize = 64;
20#[cfg(feature = "generic-queue-128")]
21const QUEUE_SIZE: usize = 128;
22#[cfg(not(any(
23 feature = "generic-queue-8",
24 feature = "generic-queue-16",
25 feature = "generic-queue-32",
26 feature = "generic-queue-64",
27 feature = "generic-queue-128"
28)))]
29const QUEUE_SIZE: usize = 64;
30
31#[derive(Debug)]
32struct Timer {
33 at: Instant,
34 waker: Waker,
35}
36
37impl PartialEq for Timer {
38 fn eq(&self, other: &Self) -> bool {
39 self.at == other.at
40 }
41}
42
43impl Eq for Timer {}
44
45impl PartialOrd for Timer {
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 self.at.partial_cmp(&other.at)
48 }
49}
50
51impl Ord for Timer {
52 fn cmp(&self, other: &Self) -> Ordering {
53 self.at.cmp(&other.at)
54 }
55}
56
57struct InnerQueue {
58 queue: Vec<Timer, QUEUE_SIZE>,
59 alarm: AlarmHandle,
60}
61
62impl InnerQueue {
63 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
64 self.queue
65 .iter_mut()
66 .find(|timer| timer.waker.will_wake(waker))
67 .map(|timer| {
68 timer.at = min(timer.at, at);
69 })
70 .unwrap_or_else(|| {
71 let mut timer = Timer {
72 waker: waker.clone(),
73 at,
74 };
75
76 loop {
77 match self.queue.push(timer) {
78 Ok(()) => break,
79 Err(e) => timer = e,
80 }
81
82 self.queue.pop().unwrap().waker.wake();
83 }
84 });
85
86 // Don't wait for the alarm callback to trigger and directly
87 // dispatch all timers that are already due
88 //
89 // Then update the alarm if necessary
90 self.dispatch();
91 }
92
93 fn dispatch(&mut self) {
94 loop {
95 let now = Instant::now();
96
97 let mut next_alarm = Instant::MAX;
98
99 let mut i = 0;
100 while i < self.queue.len() {
101 let timer = &self.queue[i];
102 if timer.at <= now {
103 let timer = self.queue.swap_remove(i);
104 timer.waker.wake();
105 } else {
106 next_alarm = min(next_alarm, timer.at);
107 i += 1;
108 }
109 }
110
111 if self.update_alarm(next_alarm) {
112 break;
113 }
114 }
115 }
116
117 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
118 if next_alarm == Instant::MAX {
119 true
120 } else {
121 set_alarm(self.alarm, next_alarm.as_ticks())
122 }
123 }
124
125 fn handle_alarm(&mut self) {
126 self.dispatch();
127 }
128}
129
130struct Queue {
131 inner: Mutex<RefCell<Option<InnerQueue>>>,
132}
133
134impl Queue {
135 const fn new() -> Self {
136 Self {
137 inner: Mutex::new(RefCell::new(None)),
138 }
139 }
140
141 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
142 critical_section::with(|cs| {
143 let mut inner = self.inner.borrow_ref_mut(cs);
144
145 if inner.is_none() {}
146
147 inner
148 .get_or_insert_with(|| {
149 let handle = unsafe { allocate_alarm() }.unwrap();
150 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
151 InnerQueue {
152 queue: Vec::new(),
153 alarm: handle,
154 }
155 })
156 .schedule_wake(at, waker)
157 });
158 }
159
160 fn handle_alarm(&self) {
161 critical_section::with(|cs| self.inner.borrow_ref_mut(cs).as_mut().unwrap().handle_alarm())
162 }
163
164 fn handle_alarm_callback(ctx: *mut ()) {
165 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
166 }
167}
168
169impl TimerQueue for Queue {
170 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
171 Queue::schedule_wake(self, at, waker);
172 }
173}
174
175crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
176
177#[cfg(test)]
178mod tests {
179 use core::cell::Cell;
180 use core::task::{RawWaker, RawWakerVTable, Waker};
181 use std::rc::Rc;
182 use std::sync::Mutex;
183
184 use serial_test::serial;
185
186 use crate::driver::{AlarmHandle, Driver};
187 use crate::queue_generic::QUEUE;
188 use crate::Instant;
189
190 struct InnerTestDriver {
191 now: u64,
192 alarm: u64,
193 callback: fn(*mut ()),
194 ctx: *mut (),
195 }
196
197 impl InnerTestDriver {
198 const fn new() -> Self {
199 Self {
200 now: 0,
201 alarm: u64::MAX,
202 callback: Self::noop,
203 ctx: core::ptr::null_mut(),
204 }
205 }
206
207 fn noop(_ctx: *mut ()) {}
208 }
209
210 unsafe impl Send for InnerTestDriver {}
211
212 struct TestDriver(Mutex<InnerTestDriver>);
213
214 impl TestDriver {
215 const fn new() -> Self {
216 Self(Mutex::new(InnerTestDriver::new()))
217 }
218
219 fn reset(&self) {
220 *self.0.lock().unwrap() = InnerTestDriver::new();
221 }
222
223 fn set_now(&self, now: u64) {
224 let notify = {
225 let mut inner = self.0.lock().unwrap();
226
227 if inner.now < now {
228 inner.now = now;
229
230 if inner.alarm <= now {
231 inner.alarm = u64::MAX;
232
233 Some((inner.callback, inner.ctx))
234 } else {
235 None
236 }
237 } else {
238 panic!("Going back in time?");
239 }
240 };
241
242 if let Some((callback, ctx)) = notify {
243 (callback)(ctx);
244 }
245 }
246 }
247
248 impl Driver for TestDriver {
249 fn now(&self) -> u64 {
250 self.0.lock().unwrap().now
251 }
252
253 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
254 Some(AlarmHandle::new(0))
255 }
256
257 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
258 let mut inner = self.0.lock().unwrap();
259
260 inner.callback = callback;
261 inner.ctx = ctx;
262 }
263
264 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
265 let mut inner = self.0.lock().unwrap();
266
267 if timestamp <= inner.now {
268 false
269 } else {
270 inner.alarm = timestamp;
271 true
272 }
273 }
274 }
275
276 struct TestWaker {
277 pub awoken: Rc<Cell<bool>>,
278 pub waker: Waker,
279 }
280
281 impl TestWaker {
282 fn new() -> Self {
283 let flag = Rc::new(Cell::new(false));
284
285 const VTABLE: RawWakerVTable = RawWakerVTable::new(
286 |data: *const ()| {
287 unsafe {
288 Rc::increment_strong_count(data as *const Cell<bool>);
289 }
290
291 RawWaker::new(data as _, &VTABLE)
292 },
293 |data: *const ()| unsafe {
294 let data = data as *const Cell<bool>;
295 data.as_ref().unwrap().set(true);
296 Rc::decrement_strong_count(data);
297 },
298 |data: *const ()| unsafe {
299 (data as *const Cell<bool>).as_ref().unwrap().set(true);
300 },
301 |data: *const ()| unsafe {
302 Rc::decrement_strong_count(data);
303 },
304 );
305
306 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
307
308 Self {
309 awoken: flag.clone(),
310 waker: unsafe { Waker::from_raw(raw) },
311 }
312 }
313 }
314
315 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
316
317 fn setup() {
318 DRIVER.reset();
319 critical_section::with(|cs| *QUEUE.inner.borrow_ref_mut(cs) = None);
320 }
321
322 fn queue_len() -> usize {
323 critical_section::with(|cs| {
324 QUEUE
325 .inner
326 .borrow_ref(cs)
327 .as_ref()
328 .map(|inner| inner.queue.iter().count())
329 .unwrap_or(0)
330 })
331 }
332
333 #[test]
334 #[serial]
335 fn test_schedule() {
336 setup();
337
338 assert_eq!(queue_len(), 0);
339
340 let waker = TestWaker::new();
341
342 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
343
344 assert!(!waker.awoken.get());
345 assert_eq!(queue_len(), 1);
346 }
347
348 #[test]
349 #[serial]
350 fn test_schedule_same() {
351 setup();
352
353 let waker = TestWaker::new();
354
355 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
356
357 assert_eq!(queue_len(), 1);
358
359 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
360
361 assert_eq!(queue_len(), 1);
362
363 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
364
365 assert_eq!(queue_len(), 1);
366
367 let waker2 = TestWaker::new();
368
369 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
370
371 assert_eq!(queue_len(), 2);
372 }
373
374 #[test]
375 #[serial]
376 fn test_trigger() {
377 setup();
378
379 let waker = TestWaker::new();
380
381 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
382
383 assert!(!waker.awoken.get());
384
385 DRIVER.set_now(Instant::from_secs(99).as_ticks());
386
387 assert!(!waker.awoken.get());
388
389 assert_eq!(queue_len(), 1);
390
391 DRIVER.set_now(Instant::from_secs(100).as_ticks());
392
393 assert!(waker.awoken.get());
394
395 assert_eq!(queue_len(), 0);
396 }
397
398 #[test]
399 #[serial]
400 fn test_immediate_trigger() {
401 setup();
402
403 let waker = TestWaker::new();
404
405 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
406
407 DRIVER.set_now(Instant::from_secs(50).as_ticks());
408
409 let waker2 = TestWaker::new();
410
411 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
412
413 assert!(!waker.awoken.get());
414 assert!(waker2.awoken.get());
415 assert_eq!(queue_len(), 1);
416 }
417
418 #[test]
419 #[serial]
420 fn test_queue_overflow() {
421 setup();
422
423 for i in 1..super::QUEUE_SIZE {
424 let waker = TestWaker::new();
425
426 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
427
428 assert_eq!(queue_len(), i);
429 assert!(!waker.awoken.get());
430 }
431
432 let first_waker = TestWaker::new();
433
434 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
435
436 assert_eq!(queue_len(), super::QUEUE_SIZE);
437 assert!(!first_waker.awoken.get());
438
439 let second_waker = TestWaker::new();
440
441 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
442
443 assert_eq!(queue_len(), super::QUEUE_SIZE);
444 assert!(first_waker.awoken.get());
445
446 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
447 assert_eq!(queue_len(), super::QUEUE_SIZE);
448 assert!(second_waker.awoken.get());
449 }
450}