aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/queue_generic.rs
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-10-26 19:14:12 +0000
committerGitHub <[email protected]>2022-10-26 19:14:12 +0000
commite5097a8866c071c8b986757543a723b20b67fa03 (patch)
tree91d2c06248d17d9dfc0f22fcdba7f08eb6fd6750 /embassy-time/src/queue_generic.rs
parent9b86de770bccfe00ceaa6b88c51bcaba2a57eb03 (diff)
parentf9da6271cea7035b2c9f27cfe479aa81889168d1 (diff)
Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov Hopefully relatively well documented. Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.) As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all. I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue. The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot. Co-authored-by: ivmarkov <[email protected]> Co-authored-by: Dario Nieuwenhuis <[email protected]>
Diffstat (limited to 'embassy-time/src/queue_generic.rs')
-rw-r--r--embassy-time/src/queue_generic.rs449
1 files changed, 449 insertions, 0 deletions
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..20ae7e6cc
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,449 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
6use embassy_sync::blocking_mutex::Mutex;
7use heapless::Vec;
8
9use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
10use crate::queue::TimerQueue;
11use crate::Instant;
12
13#[cfg(feature = "generic-queue-8")]
14const QUEUE_SIZE: usize = 8;
15#[cfg(feature = "generic-queue-16")]
16const QUEUE_SIZE: usize = 16;
17#[cfg(feature = "generic-queue-32")]
18const QUEUE_SIZE: usize = 32;
19#[cfg(feature = "generic-queue-64")]
20const QUEUE_SIZE: usize = 32;
21#[cfg(feature = "generic-queue-128")]
22const QUEUE_SIZE: usize = 128;
23#[cfg(not(any(
24 feature = "generic-queue-8",
25 feature = "generic-queue-16",
26 feature = "generic-queue-32",
27 feature = "generic-queue-64",
28 feature = "generic-queue-128"
29)))]
30const QUEUE_SIZE: usize = 64;
31
32#[derive(Debug)]
33struct Timer {
34 at: Instant,
35 waker: Waker,
36}
37
38impl PartialEq for Timer {
39 fn eq(&self, other: &Self) -> bool {
40 self.at == other.at
41 }
42}
43
44impl Eq for Timer {}
45
46impl PartialOrd for Timer {
47 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48 self.at.partial_cmp(&other.at)
49 }
50}
51
52impl Ord for Timer {
53 fn cmp(&self, other: &Self) -> Ordering {
54 self.at.cmp(&other.at)
55 }
56}
57
58struct InnerQueue {
59 queue: Vec<Timer, QUEUE_SIZE>,
60 alarm: AlarmHandle,
61}
62
63impl InnerQueue {
64 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
65 self.queue
66 .iter_mut()
67 .find(|timer| timer.waker.will_wake(waker))
68 .map(|mut timer| {
69 timer.at = min(timer.at, at);
70 })
71 .unwrap_or_else(|| {
72 let mut timer = Timer {
73 waker: waker.clone(),
74 at,
75 };
76
77 loop {
78 match self.queue.push(timer) {
79 Ok(()) => break,
80 Err(e) => timer = e,
81 }
82
83 self.queue.pop().unwrap().waker.wake();
84 }
85 });
86
87 // Don't wait for the alarm callback to trigger and directly
88 // dispatch all timers that are already due
89 //
90 // Then update the alarm if necessary
91 self.dispatch();
92 }
93
94 fn dispatch(&mut self) {
95 loop {
96 let now = Instant::now();
97
98 let mut next_alarm = Instant::MAX;
99
100 let mut i = 0;
101 while i < self.queue.len() {
102 let timer = &self.queue[i];
103 if timer.at <= now {
104 let timer = self.queue.swap_remove(i);
105 timer.waker.wake();
106 } else {
107 next_alarm = min(next_alarm, timer.at);
108 i += 1;
109 }
110 }
111
112 if self.update_alarm(next_alarm) {
113 break;
114 }
115 }
116 }
117
118 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
119 if next_alarm == Instant::MAX {
120 true
121 } else {
122 set_alarm(self.alarm, next_alarm.as_ticks())
123 }
124 }
125
126 fn handle_alarm(&mut self) {
127 self.dispatch();
128 }
129}
130
131struct Queue {
132 inner: Mutex<CriticalSectionRawMutex, RefCell<Option<InnerQueue>>>,
133}
134
135impl Queue {
136 const fn new() -> Self {
137 Self {
138 inner: Mutex::new(RefCell::new(None)),
139 }
140 }
141
142 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
143 self.inner.lock(|inner| {
144 let mut inner = inner.borrow_mut();
145
146 if inner.is_none() {}
147
148 inner
149 .get_or_insert_with(|| {
150 let handle = unsafe { allocate_alarm() }.unwrap();
151 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
152 InnerQueue {
153 queue: Vec::new(),
154 alarm: handle,
155 }
156 })
157 .schedule_wake(at, waker)
158 });
159 }
160
161 fn handle_alarm(&self) {
162 self.inner
163 .lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm());
164 }
165
166 fn handle_alarm_callback(ctx: *mut ()) {
167 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
168 }
169}
170
171impl TimerQueue for Queue {
172 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
173 Queue::schedule_wake(self, at, waker);
174 }
175}
176
177crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
178
179#[cfg(test)]
180mod tests {
181 use core::cell::Cell;
182 use core::task::{RawWaker, RawWakerVTable, Waker};
183 use std::rc::Rc;
184 use std::sync::Mutex;
185
186 use serial_test::serial;
187
188 use super::InnerQueue;
189 use crate::driver::{AlarmHandle, Driver};
190 use crate::queue_generic::QUEUE;
191 use crate::Instant;
192
193 struct InnerTestDriver {
194 now: u64,
195 alarm: u64,
196 callback: fn(*mut ()),
197 ctx: *mut (),
198 }
199
200 impl InnerTestDriver {
201 const fn new() -> Self {
202 Self {
203 now: 0,
204 alarm: u64::MAX,
205 callback: Self::noop,
206 ctx: core::ptr::null_mut(),
207 }
208 }
209
210 fn noop(_ctx: *mut ()) {}
211 }
212
213 unsafe impl Send for InnerTestDriver {}
214
215 struct TestDriver(Mutex<InnerTestDriver>);
216
217 impl TestDriver {
218 const fn new() -> Self {
219 Self(Mutex::new(InnerTestDriver::new()))
220 }
221
222 fn reset(&self) {
223 *self.0.lock().unwrap() = InnerTestDriver::new();
224 }
225
226 fn set_now(&self, now: u64) {
227 let notify = {
228 let mut inner = self.0.lock().unwrap();
229
230 if inner.now < now {
231 inner.now = now;
232
233 if inner.alarm <= now {
234 inner.alarm = u64::MAX;
235
236 Some((inner.callback, inner.ctx))
237 } else {
238 None
239 }
240 } else {
241 panic!("Going back in time?");
242 }
243 };
244
245 if let Some((callback, ctx)) = notify {
246 (callback)(ctx);
247 }
248 }
249 }
250
251 impl Driver for TestDriver {
252 fn now(&self) -> u64 {
253 self.0.lock().unwrap().now
254 }
255
256 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
257 Some(AlarmHandle::new(0))
258 }
259
260 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
261 let mut inner = self.0.lock().unwrap();
262
263 inner.callback = callback;
264 inner.ctx = ctx;
265 }
266
267 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
268 let mut inner = self.0.lock().unwrap();
269
270 if timestamp <= inner.now {
271 false
272 } else {
273 inner.alarm = timestamp;
274 true
275 }
276 }
277 }
278
279 struct TestWaker {
280 pub awoken: Rc<Cell<bool>>,
281 pub waker: Waker,
282 }
283
284 impl TestWaker {
285 fn new() -> Self {
286 let flag = Rc::new(Cell::new(false));
287
288 const VTABLE: RawWakerVTable = RawWakerVTable::new(
289 |data: *const ()| {
290 unsafe {
291 Rc::increment_strong_count(data as *const Cell<bool>);
292 }
293
294 RawWaker::new(data as _, &VTABLE)
295 },
296 |data: *const ()| unsafe {
297 let data = data as *const Cell<bool>;
298 data.as_ref().unwrap().set(true);
299 Rc::decrement_strong_count(data);
300 },
301 |data: *const ()| unsafe {
302 (data as *const Cell<bool>).as_ref().unwrap().set(true);
303 },
304 |data: *const ()| unsafe {
305 Rc::decrement_strong_count(data);
306 },
307 );
308
309 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
310
311 Self {
312 awoken: flag.clone(),
313 waker: unsafe { Waker::from_raw(raw) },
314 }
315 }
316 }
317
318 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
319
320 fn setup() {
321 DRIVER.reset();
322
323 QUEUE.inner.lock(|inner| {
324 *inner.borrow_mut() = InnerQueue::new();
325 });
326 }
327
328 fn queue_len() -> usize {
329 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
330 }
331
332 #[test]
333 #[serial]
334 fn test_schedule() {
335 setup();
336
337 assert_eq!(queue_len(), 0);
338
339 let waker = TestWaker::new();
340
341 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
342
343 assert!(!waker.awoken.get());
344 assert_eq!(queue_len(), 1);
345 }
346
347 #[test]
348 #[serial]
349 fn test_schedule_same() {
350 setup();
351
352 let waker = TestWaker::new();
353
354 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
355
356 assert_eq!(queue_len(), 1);
357
358 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
359
360 assert_eq!(queue_len(), 1);
361
362 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
363
364 assert_eq!(queue_len(), 1);
365
366 let waker2 = TestWaker::new();
367
368 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
369
370 assert_eq!(queue_len(), 2);
371 }
372
373 #[test]
374 #[serial]
375 fn test_trigger() {
376 setup();
377
378 let waker = TestWaker::new();
379
380 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
381
382 assert!(!waker.awoken.get());
383
384 DRIVER.set_now(Instant::from_secs(99).as_ticks());
385
386 assert!(!waker.awoken.get());
387
388 assert_eq!(queue_len(), 1);
389
390 DRIVER.set_now(Instant::from_secs(100).as_ticks());
391
392 assert!(waker.awoken.get());
393
394 assert_eq!(queue_len(), 0);
395 }
396
397 #[test]
398 #[serial]
399 fn test_immediate_trigger() {
400 setup();
401
402 let waker = TestWaker::new();
403
404 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
405
406 DRIVER.set_now(Instant::from_secs(50).as_ticks());
407
408 let waker2 = TestWaker::new();
409
410 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
411
412 assert!(!waker.awoken.get());
413 assert!(waker2.awoken.get());
414 assert_eq!(queue_len(), 1);
415 }
416
417 #[test]
418 #[serial]
419 fn test_queue_overflow() {
420 setup();
421
422 for i in 1..super::QUEUE_SIZE {
423 let waker = TestWaker::new();
424
425 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
426
427 assert_eq!(queue_len(), i);
428 assert!(!waker.awoken.get());
429 }
430
431 let first_waker = TestWaker::new();
432
433 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
434
435 assert_eq!(queue_len(), super::QUEUE_SIZE);
436 assert!(!first_waker.awoken.get());
437
438 let second_waker = TestWaker::new();
439
440 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
441
442 assert_eq!(queue_len(), super::QUEUE_SIZE);
443 assert!(first_waker.awoken.get());
444
445 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
446 assert_eq!(queue_len(), super::QUEUE_SIZE);
447 assert!(second_waker.awoken.get());
448 }
449}