aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-10-26 19:14:12 +0000
committerGitHub <[email protected]>2022-10-26 19:14:12 +0000
commite5097a8866c071c8b986757543a723b20b67fa03 (patch)
tree91d2c06248d17d9dfc0f22fcdba7f08eb6fd6750
parent9b86de770bccfe00ceaa6b88c51bcaba2a57eb03 (diff)
parentf9da6271cea7035b2c9f27cfe479aa81889168d1 (diff)
Merge #959
959: Generic, executor-agnostic queue implementation r=ivmarkov a=ivmarkov Hopefully relatively well documented. Implementation relies on a fixed-size `SortedLinkedList` from `heapless`. (By default, for up to 128 timer schedules, but we can lower this number to - say - 64.) As discussed earlier, on queue overflow, the `WakerRegistration` approach is utilized, whereas the waker that is ordered first in the queue is awoken to make room for the incoming one (which might be the waker that would be awoken after all!). Wakers are compared with `Waker::will_wake`, so the queue should actually not fill up that easily, if at all. I've left provisions for the user to manually instantiate the queue using a dedicated macro - `generic_queue!` so that users willing to adjust the queue size, or users (like me) who have to use the queue in a complex "on-top-of-RTOS-but-the-timer-driver-calling-back-from-ISR" scenario can customize the mutex that protects the queue. The one thing I'm not completely happy with is the need to call `{ embassy_time::queue::initialize() }` early on before any futures using embassy-time are polled, which is currently on the shoulders of the user. I'm open to any ideas where we can get rid of this and do it on the first call to `_embassy_time_schedule_wake`, without introducing very complex combinations of critical sections, atomics and whatnot. Co-authored-by: ivmarkov <[email protected]> Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy-executor/src/raw/mod.rs91
-rw-r--r--embassy-nrf/src/time_driver.rs19
-rw-r--r--embassy-rp/src/timer.rs15
-rw-r--r--embassy-stm32/src/time_driver.rs14
-rw-r--r--embassy-time/Cargo.toml23
-rw-r--r--embassy-time/src/driver.rs13
-rw-r--r--embassy-time/src/driver_std.rs4
-rw-r--r--embassy-time/src/driver_wasm.rs14
-rw-r--r--embassy-time/src/lib.rs5
-rw-r--r--embassy-time/src/queue.rs58
-rw-r--r--embassy-time/src/queue_generic.rs449
11 files changed, 640 insertions, 65 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index e1258ebb5..181dabe8e 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -354,46 +354,54 @@ impl Executor {
354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
355 /// no `poll()` already running. 355 /// no `poll()` already running.
356 pub unsafe fn poll(&'static self) { 356 pub unsafe fn poll(&'static self) {
357 #[cfg(feature = "integrated-timers")] 357 loop {
358 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 358 #[cfg(feature = "integrated-timers")]
359 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
359 360
360 self.run_queue.dequeue_all(|p| { 361 self.run_queue.dequeue_all(|p| {
361 let task = p.as_ref(); 362 let task = p.as_ref();
362 363
363 #[cfg(feature = "integrated-timers")] 364 #[cfg(feature = "integrated-timers")]
364 task.expires_at.set(Instant::MAX); 365 task.expires_at.set(Instant::MAX);
365
366 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
367 if state & STATE_SPAWNED == 0 {
368 // If task is not running, ignore it. This can happen in the following scenario:
369 // - Task gets dequeued, poll starts
370 // - While task is being polled, it gets woken. It gets placed in the queue.
371 // - Task poll finishes, returning done=true
372 // - RUNNING bit is cleared, but the task is already in the queue.
373 return;
374 }
375 366
376 #[cfg(feature = "rtos-trace")] 367 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
377 trace::task_exec_begin(p.as_ptr() as u32); 368 if state & STATE_SPAWNED == 0 {
369 // If task is not running, ignore it. This can happen in the following scenario:
370 // - Task gets dequeued, poll starts
371 // - While task is being polled, it gets woken. It gets placed in the queue.
372 // - Task poll finishes, returning done=true
373 // - RUNNING bit is cleared, but the task is already in the queue.
374 return;
375 }
378 376
379 // Run the task 377 #[cfg(feature = "rtos-trace")]
380 task.poll_fn.read()(p as _); 378 trace::task_exec_begin(p.as_ptr() as u32);
381 379
382 #[cfg(feature = "rtos-trace")] 380 // Run the task
383 trace::task_exec_end(); 381 task.poll_fn.read()(p as _);
382
383 #[cfg(feature = "rtos-trace")]
384 trace::task_exec_end();
385
386 // Enqueue or update into timer_queue
387 #[cfg(feature = "integrated-timers")]
388 self.timer_queue.update(p);
389 });
384 390
385 // Enqueue or update into timer_queue
386 #[cfg(feature = "integrated-timers")] 391 #[cfg(feature = "integrated-timers")]
387 self.timer_queue.update(p); 392 {
388 }); 393 // If this is already in the past, set_alarm might return false
394 // In that case do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
397 break;
398 }
399 }
389 400
390 #[cfg(feature = "integrated-timers")] 401 #[cfg(not(feature = "integrated-timers"))]
391 { 402 {
392 // If this is already in the past, set_alarm will immediately trigger the alarm. 403 break;
393 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, 404 }
394 // so we immediately do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 driver::set_alarm(self.alarm, next_expiration.as_ticks());
397 } 405 }
398 406
399 #[cfg(feature = "rtos-trace")] 407 #[cfg(feature = "rtos-trace")]
@@ -436,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
436} 444}
437 445
438#[cfg(feature = "integrated-timers")] 446#[cfg(feature = "integrated-timers")]
439#[no_mangle] 447struct TimerQueue;
440unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { 448
441 let task = waker::task_from_waker(waker); 449#[cfg(feature = "integrated-timers")]
442 let task = task.as_ref(); 450impl embassy_time::queue::TimerQueue for TimerQueue {
443 let expires_at = task.expires_at.get(); 451 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
444 task.expires_at.set(expires_at.min(at)); 452 let task = waker::task_from_waker(waker);
453 let task = unsafe { task.as_ref() };
454 let expires_at = task.expires_at.get();
455 task.expires_at.set(expires_at.min(at));
456 }
445} 457}
446 458
459#[cfg(feature = "integrated-timers")]
460embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
461
447#[cfg(feature = "rtos-trace")] 462#[cfg(feature = "rtos-trace")]
448impl rtos_trace::RtosTraceOSCallbacks for Executor { 463impl rtos_trace::RtosTraceOSCallbacks for Executor {
449 fn task_list() { 464 fn task_list() {
diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs
index c32a44637..bc2c8a3c1 100644
--- a/embassy-nrf/src/time_driver.rs
+++ b/embassy-nrf/src/time_driver.rs
@@ -243,21 +243,24 @@ impl Driver for RtcDriver {
243 }) 243 })
244 } 244 }
245 245
246 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { 246 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
247 critical_section::with(|cs| { 247 critical_section::with(|cs| {
248 let n = alarm.id() as _; 248 let n = alarm.id() as _;
249 let alarm = self.get_alarm(cs, alarm); 249 let alarm = self.get_alarm(cs, alarm);
250 alarm.timestamp.set(timestamp); 250 alarm.timestamp.set(timestamp);
251 251
252 let t = self.now(); 252 let r = rtc();
253 253
254 // If alarm timestamp has passed, trigger it instantly. 254 let t = self.now();
255 if timestamp <= t { 255 if timestamp <= t {
256 self.trigger_alarm(n, cs); 256 // If alarm timestamp has passed the alarm will not fire.
257 return; 257 // Disarm the alarm and return `false` to indicate that.
258 } 258 r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) });
259 259
260 let r = rtc(); 260 alarm.timestamp.set(u64::MAX);
261
262 return false;
263 }
261 264
262 // If it hasn't triggered yet, setup it in the compare channel. 265 // If it hasn't triggered yet, setup it in the compare channel.
263 266
@@ -287,6 +290,8 @@ impl Driver for RtcDriver {
287 // It will be setup later by `next_period`. 290 // It will be setup later by `next_period`.
288 r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) }); 291 r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) });
289 } 292 }
293
294 true
290 }) 295 })
291 } 296 }
292} 297}
diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs
index 5215c0c0f..80efd779f 100644
--- a/embassy-rp/src/timer.rs
+++ b/embassy-rp/src/timer.rs
@@ -68,7 +68,7 @@ impl Driver for TimerDriver {
68 }) 68 })
69 } 69 }
70 70
71 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { 71 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
72 let n = alarm.id() as usize; 72 let n = alarm.id() as usize;
73 critical_section::with(|cs| { 73 critical_section::with(|cs| {
74 let alarm = &self.alarms.borrow(cs)[n]; 74 let alarm = &self.alarms.borrow(cs)[n];
@@ -81,11 +81,16 @@ impl Driver for TimerDriver {
81 unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) }; 81 unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) };
82 82
83 let now = self.now(); 83 let now = self.now();
84
85 // If alarm timestamp has passed, trigger it instantly.
86 // This disarms it.
87 if timestamp <= now { 84 if timestamp <= now {
88 self.trigger_alarm(n, cs); 85 // If alarm timestamp has passed the alarm will not fire.
86 // Disarm the alarm and return `false` to indicate that.
87 unsafe { pac::TIMER.armed().write(|w| w.set_armed(1 << n)) }
88
89 alarm.timestamp.set(u64::MAX);
90
91 false
92 } else {
93 true
89 } 94 }
90 }) 95 })
91 } 96 }
diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs
index ed3225c51..8e84570a4 100644
--- a/embassy-stm32/src/time_driver.rs
+++ b/embassy-stm32/src/time_driver.rs
@@ -292,19 +292,23 @@ impl Driver for RtcDriver {
292 }) 292 })
293 } 293 }
294 294
295 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { 295 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
296 critical_section::with(|cs| { 296 critical_section::with(|cs| {
297 let r = T::regs_gp16(); 297 let r = T::regs_gp16();
298 298
299 let n = alarm.id() as _; 299 let n = alarm.id() as usize;
300 let alarm = self.get_alarm(cs, alarm); 300 let alarm = self.get_alarm(cs, alarm);
301 alarm.timestamp.set(timestamp); 301 alarm.timestamp.set(timestamp);
302 302
303 let t = self.now(); 303 let t = self.now();
304 if timestamp <= t { 304 if timestamp <= t {
305 // If alarm timestamp has passed the alarm will not fire.
306 // Disarm the alarm and return `false` to indicate that.
305 unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) }; 307 unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) };
306 self.trigger_alarm(n, cs); 308
307 return; 309 alarm.timestamp.set(u64::MAX);
310
311 return false;
308 } 312 }
309 313
310 let safe_timestamp = timestamp.max(t + 3); 314 let safe_timestamp = timestamp.max(t + 3);
@@ -317,6 +321,8 @@ impl Driver for RtcDriver {
317 let diff = timestamp - t; 321 let diff = timestamp - t;
318 // NOTE(unsafe) We're in a critical section 322 // NOTE(unsafe) We're in a critical section
319 unsafe { r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)) }; 323 unsafe { r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)) };
324
325 true
320 }) 326 })
321 } 327 }
322} 328}
diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml
index 0fd4645a6..9ed4d57ea 100644
--- a/embassy-time/Cargo.toml
+++ b/embassy-time/Cargo.toml
@@ -26,6 +26,22 @@ unstable-traits = ["embedded-hal-1"]
26# To use this you must have a time driver provided. 26# To use this you must have a time driver provided.
27defmt-timestamp-uptime = ["defmt"] 27defmt-timestamp-uptime = ["defmt"]
28 28
29# Create a global, generic queue that can be used with any executor
30# To use this you must have a time driver provided.
31generic-queue = []
32
33# Set the number of timers for the generic queue.
34#
35# At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used.
36#
37# When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the
38# end user to pick.
39generic-queue-8 = ["generic-queue"]
40generic-queue-16 = ["generic-queue"]
41generic-queue-32 = ["generic-queue"]
42generic-queue-64 = ["generic-queue"]
43generic-queue-128 = ["generic-queue"]
44
29# Set the `embassy_time` tick rate. 45# Set the `embassy_time` tick rate.
30# 46#
31# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. 47# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
@@ -111,11 +127,18 @@ embedded-hal-async = { version = "=0.1.0-alpha.3", optional = true}
111 127
112futures-util = { version = "0.3.17", default-features = false } 128futures-util = { version = "0.3.17", default-features = false }
113embassy-macros = { version = "0.1.0", path = "../embassy-macros"} 129embassy-macros = { version = "0.1.0", path = "../embassy-macros"}
130embassy-sync = { version = "0.1", path = "../embassy-sync" }
114atomic-polyfill = "1.0.1" 131atomic-polyfill = "1.0.1"
115critical-section = "1.1" 132critical-section = "1.1"
116cfg-if = "1.0.0" 133cfg-if = "1.0.0"
134heapless = "0.7"
117 135
118# WASM dependencies 136# WASM dependencies
119wasm-bindgen = { version = "0.2.81", optional = true } 137wasm-bindgen = { version = "0.2.81", optional = true }
120js-sys = { version = "0.3", optional = true } 138js-sys = { version = "0.3", optional = true }
121wasm-timer = { version = "0.2.5", optional = true } 139wasm-timer = { version = "0.2.5", optional = true }
140
141[dev-dependencies]
142serial_test = "0.9"
143critical-section = { version = "1.1", features = ["std"] }
144
diff --git a/embassy-time/src/driver.rs b/embassy-time/src/driver.rs
index 79ae14b91..5c2ad3b23 100644
--- a/embassy-time/src/driver.rs
+++ b/embassy-time/src/driver.rs
@@ -105,20 +105,21 @@ pub trait Driver: Send + Sync + 'static {
105 /// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm 105 /// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm
106 /// timestamp, the provided callback function will be called. 106 /// timestamp, the provided callback function will be called.
107 /// 107 ///
108 /// If `timestamp` is already in the past, the alarm callback must be immediately fired. 108 /// The `Driver` implementation should guarantee that the alarm callback is never called synchronously from `set_alarm`.
109 /// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`. 109 /// Rather - if `timestamp` is already in the past - `false` should be returned and alarm should not be set,
110 /// or alternatively, the driver should return `true` and arrange to call the alarm callback as soon as possible, but not synchronously.
110 /// 111 ///
111 /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. 112 /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp.
112 /// 113 ///
113 /// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any. 114 /// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any.
114 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64); 115 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool;
115} 116}
116 117
117extern "Rust" { 118extern "Rust" {
118 fn _embassy_time_now() -> u64; 119 fn _embassy_time_now() -> u64;
119 fn _embassy_time_allocate_alarm() -> Option<AlarmHandle>; 120 fn _embassy_time_allocate_alarm() -> Option<AlarmHandle>;
120 fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()); 121 fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
121 fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64); 122 fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool;
122} 123}
123 124
124/// See [`Driver::now`] 125/// See [`Driver::now`]
@@ -139,7 +140,7 @@ pub fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut (
139} 140}
140 141
141/// See [`Driver::set_alarm`] 142/// See [`Driver::set_alarm`]
142pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) { 143pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool {
143 unsafe { _embassy_time_set_alarm(alarm, timestamp) } 144 unsafe { _embassy_time_set_alarm(alarm, timestamp) }
144} 145}
145 146
@@ -167,7 +168,7 @@ macro_rules! time_driver_impl {
167 } 168 }
168 169
169 #[no_mangle] 170 #[no_mangle]
170 fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) { 171 fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) -> bool {
171 <$t as $crate::driver::Driver>::set_alarm(&$name, alarm, timestamp) 172 <$t as $crate::driver::Driver>::set_alarm(&$name, alarm, timestamp)
172 } 173 }
173 }; 174 };
diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs
index 2ddb2e604..fc7fd1979 100644
--- a/embassy-time/src/driver_std.rs
+++ b/embassy-time/src/driver_std.rs
@@ -127,12 +127,14 @@ impl Driver for TimeDriver {
127 alarm.ctx = ctx; 127 alarm.ctx = ctx;
128 } 128 }
129 129
130 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { 130 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
131 self.init(); 131 self.init();
132 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); 132 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
133 let alarm = &mut alarms[alarm.id() as usize]; 133 let alarm = &mut alarms[alarm.id() as usize];
134 alarm.timestamp = timestamp; 134 alarm.timestamp = timestamp;
135 unsafe { self.signaler.as_ref() }.signal(); 135 unsafe { self.signaler.as_ref() }.signal();
136
137 true
136 } 138 }
137} 139}
138 140
diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs
index e4497e6a2..63d049897 100644
--- a/embassy-time/src/driver_wasm.rs
+++ b/embassy-time/src/driver_wasm.rs
@@ -90,15 +90,23 @@ impl Driver for TimeDriver {
90 })); 90 }));
91 } 91 }
92 92
93 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { 93 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool {
94 self.init(); 94 self.init();
95 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); 95 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
96 let alarm = &mut alarms[alarm.id() as usize]; 96 let alarm = &mut alarms[alarm.id() as usize];
97 let timeout = (timestamp - self.now()) as u32;
98 if let Some(token) = alarm.token { 97 if let Some(token) = alarm.token {
99 clearTimeout(token); 98 clearTimeout(token);
100 } 99 }
101 alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000)); 100
101 let now = self.now();
102 if timestamp <= now {
103 false
104 } else {
105 let timeout = (timestamp - now) as u32;
106 alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
107
108 true
109 }
102 } 110 }
103} 111}
104 112
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 4edc883fe..586aa28de 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -1,4 +1,4 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm", test)), no_std)]
2#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] 2#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
3#![doc = include_str!("../README.md")] 3#![doc = include_str!("../README.md")]
4#![allow(clippy::new_without_default)] 4#![allow(clippy::new_without_default)]
@@ -11,6 +11,7 @@ mod delay;
11pub mod driver; 11pub mod driver;
12mod duration; 12mod duration;
13mod instant; 13mod instant;
14pub mod queue;
14mod tick; 15mod tick;
15mod timer; 16mod timer;
16 17
@@ -18,6 +19,8 @@ mod timer;
18mod driver_std; 19mod driver_std;
19#[cfg(feature = "wasm")] 20#[cfg(feature = "wasm")]
20mod driver_wasm; 21mod driver_wasm;
22#[cfg(feature = "generic-queue")]
23mod queue_generic;
21 24
22pub use delay::{block_for, Delay}; 25pub use delay::{block_for, Delay};
23pub use duration::Duration; 26pub use duration::Duration;
diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs
new file mode 100644
index 000000000..c6f8b440a
--- /dev/null
+++ b/embassy-time/src/queue.rs
@@ -0,0 +1,58 @@
1//! Timer queue implementation
2//!
3//! This module defines the interface a timer queue needs to implement to power the `embassy_time` module.
4//!
5//! # Implementing a timer queue
6//!
7//! - Define a struct `MyTimerQueue`
8//! - Implement [`TimerQueue`] for it
9//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl).
10//!
11//! # Linkage details
12//!
13//! Check the documentation of the [`driver`](crate::driver) module for more information.
14//!
15//! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail.
16//!
17//! # Example
18//!
19//! ```
20//! use core::task::Waker;
21//!
22//! use embassy_time::Instant;
23//! use embassy_time::queue::{TimerQueue};
24//!
25//! struct MyTimerQueue{}; // not public!
26//! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
27//!
28//! impl TimerQueue for MyTimerQueue {
29//! fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
30//! todo!()
31//! }
32//! }
33//! ```
34use core::task::Waker;
35
36use crate::Instant;
37
38/// Timer queue
39pub trait TimerQueue {
40 /// Schedules a waker in the queue to be awoken at moment `at`.
41 /// If this moment is in the past, the waker might be awoken immediately.
42 fn schedule_wake(&'static self, at: Instant, waker: &Waker);
43}
44
45/// Set the TimerQueue implementation.
46///
47/// See the module documentation for an example.
48#[macro_export]
49macro_rules! timer_queue_impl {
50 (static $name:ident: $t: ty = $val:expr) => {
51 static $name: $t = $val;
52
53 #[no_mangle]
54 fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
55 <$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker);
56 }
57 };
58}
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
new file mode 100644
index 000000000..20ae7e6cc
--- /dev/null
+++ b/embassy-time/src/queue_generic.rs
@@ -0,0 +1,449 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
6use embassy_sync::blocking_mutex::Mutex;
7use heapless::Vec;
8
9use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
10use crate::queue::TimerQueue;
11use crate::Instant;
12
13#[cfg(feature = "generic-queue-8")]
14const QUEUE_SIZE: usize = 8;
15#[cfg(feature = "generic-queue-16")]
16const QUEUE_SIZE: usize = 16;
17#[cfg(feature = "generic-queue-32")]
18const QUEUE_SIZE: usize = 32;
19#[cfg(feature = "generic-queue-64")]
20const QUEUE_SIZE: usize = 32;
21#[cfg(feature = "generic-queue-128")]
22const QUEUE_SIZE: usize = 128;
23#[cfg(not(any(
24 feature = "generic-queue-8",
25 feature = "generic-queue-16",
26 feature = "generic-queue-32",
27 feature = "generic-queue-64",
28 feature = "generic-queue-128"
29)))]
30const QUEUE_SIZE: usize = 64;
31
32#[derive(Debug)]
33struct Timer {
34 at: Instant,
35 waker: Waker,
36}
37
38impl PartialEq for Timer {
39 fn eq(&self, other: &Self) -> bool {
40 self.at == other.at
41 }
42}
43
44impl Eq for Timer {}
45
46impl PartialOrd for Timer {
47 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48 self.at.partial_cmp(&other.at)
49 }
50}
51
52impl Ord for Timer {
53 fn cmp(&self, other: &Self) -> Ordering {
54 self.at.cmp(&other.at)
55 }
56}
57
58struct InnerQueue {
59 queue: Vec<Timer, QUEUE_SIZE>,
60 alarm: AlarmHandle,
61}
62
63impl InnerQueue {
64 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
65 self.queue
66 .iter_mut()
67 .find(|timer| timer.waker.will_wake(waker))
68 .map(|mut timer| {
69 timer.at = min(timer.at, at);
70 })
71 .unwrap_or_else(|| {
72 let mut timer = Timer {
73 waker: waker.clone(),
74 at,
75 };
76
77 loop {
78 match self.queue.push(timer) {
79 Ok(()) => break,
80 Err(e) => timer = e,
81 }
82
83 self.queue.pop().unwrap().waker.wake();
84 }
85 });
86
87 // Don't wait for the alarm callback to trigger and directly
88 // dispatch all timers that are already due
89 //
90 // Then update the alarm if necessary
91 self.dispatch();
92 }
93
94 fn dispatch(&mut self) {
95 loop {
96 let now = Instant::now();
97
98 let mut next_alarm = Instant::MAX;
99
100 let mut i = 0;
101 while i < self.queue.len() {
102 let timer = &self.queue[i];
103 if timer.at <= now {
104 let timer = self.queue.swap_remove(i);
105 timer.waker.wake();
106 } else {
107 next_alarm = min(next_alarm, timer.at);
108 i += 1;
109 }
110 }
111
112 if self.update_alarm(next_alarm) {
113 break;
114 }
115 }
116 }
117
118 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
119 if next_alarm == Instant::MAX {
120 true
121 } else {
122 set_alarm(self.alarm, next_alarm.as_ticks())
123 }
124 }
125
126 fn handle_alarm(&mut self) {
127 self.dispatch();
128 }
129}
130
131struct Queue {
132 inner: Mutex<CriticalSectionRawMutex, RefCell<Option<InnerQueue>>>,
133}
134
135impl Queue {
136 const fn new() -> Self {
137 Self {
138 inner: Mutex::new(RefCell::new(None)),
139 }
140 }
141
142 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
143 self.inner.lock(|inner| {
144 let mut inner = inner.borrow_mut();
145
146 if inner.is_none() {}
147
148 inner
149 .get_or_insert_with(|| {
150 let handle = unsafe { allocate_alarm() }.unwrap();
151 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
152 InnerQueue {
153 queue: Vec::new(),
154 alarm: handle,
155 }
156 })
157 .schedule_wake(at, waker)
158 });
159 }
160
161 fn handle_alarm(&self) {
162 self.inner
163 .lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm());
164 }
165
166 fn handle_alarm_callback(ctx: *mut ()) {
167 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
168 }
169}
170
171impl TimerQueue for Queue {
172 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
173 Queue::schedule_wake(self, at, waker);
174 }
175}
176
177crate::timer_queue_impl!(static QUEUE: Queue = Queue::new());
178
179#[cfg(test)]
180mod tests {
181 use core::cell::Cell;
182 use core::task::{RawWaker, RawWakerVTable, Waker};
183 use std::rc::Rc;
184 use std::sync::Mutex;
185
186 use serial_test::serial;
187
188 use super::InnerQueue;
189 use crate::driver::{AlarmHandle, Driver};
190 use crate::queue_generic::QUEUE;
191 use crate::Instant;
192
193 struct InnerTestDriver {
194 now: u64,
195 alarm: u64,
196 callback: fn(*mut ()),
197 ctx: *mut (),
198 }
199
200 impl InnerTestDriver {
201 const fn new() -> Self {
202 Self {
203 now: 0,
204 alarm: u64::MAX,
205 callback: Self::noop,
206 ctx: core::ptr::null_mut(),
207 }
208 }
209
210 fn noop(_ctx: *mut ()) {}
211 }
212
213 unsafe impl Send for InnerTestDriver {}
214
215 struct TestDriver(Mutex<InnerTestDriver>);
216
217 impl TestDriver {
218 const fn new() -> Self {
219 Self(Mutex::new(InnerTestDriver::new()))
220 }
221
222 fn reset(&self) {
223 *self.0.lock().unwrap() = InnerTestDriver::new();
224 }
225
226 fn set_now(&self, now: u64) {
227 let notify = {
228 let mut inner = self.0.lock().unwrap();
229
230 if inner.now < now {
231 inner.now = now;
232
233 if inner.alarm <= now {
234 inner.alarm = u64::MAX;
235
236 Some((inner.callback, inner.ctx))
237 } else {
238 None
239 }
240 } else {
241 panic!("Going back in time?");
242 }
243 };
244
245 if let Some((callback, ctx)) = notify {
246 (callback)(ctx);
247 }
248 }
249 }
250
251 impl Driver for TestDriver {
252 fn now(&self) -> u64 {
253 self.0.lock().unwrap().now
254 }
255
256 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
257 Some(AlarmHandle::new(0))
258 }
259
260 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
261 let mut inner = self.0.lock().unwrap();
262
263 inner.callback = callback;
264 inner.ctx = ctx;
265 }
266
267 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
268 let mut inner = self.0.lock().unwrap();
269
270 if timestamp <= inner.now {
271 false
272 } else {
273 inner.alarm = timestamp;
274 true
275 }
276 }
277 }
278
279 struct TestWaker {
280 pub awoken: Rc<Cell<bool>>,
281 pub waker: Waker,
282 }
283
284 impl TestWaker {
285 fn new() -> Self {
286 let flag = Rc::new(Cell::new(false));
287
288 const VTABLE: RawWakerVTable = RawWakerVTable::new(
289 |data: *const ()| {
290 unsafe {
291 Rc::increment_strong_count(data as *const Cell<bool>);
292 }
293
294 RawWaker::new(data as _, &VTABLE)
295 },
296 |data: *const ()| unsafe {
297 let data = data as *const Cell<bool>;
298 data.as_ref().unwrap().set(true);
299 Rc::decrement_strong_count(data);
300 },
301 |data: *const ()| unsafe {
302 (data as *const Cell<bool>).as_ref().unwrap().set(true);
303 },
304 |data: *const ()| unsafe {
305 Rc::decrement_strong_count(data);
306 },
307 );
308
309 let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE);
310
311 Self {
312 awoken: flag.clone(),
313 waker: unsafe { Waker::from_raw(raw) },
314 }
315 }
316 }
317
318 crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new());
319
320 fn setup() {
321 DRIVER.reset();
322
323 QUEUE.inner.lock(|inner| {
324 *inner.borrow_mut() = InnerQueue::new();
325 });
326 }
327
328 fn queue_len() -> usize {
329 QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count())
330 }
331
332 #[test]
333 #[serial]
334 fn test_schedule() {
335 setup();
336
337 assert_eq!(queue_len(), 0);
338
339 let waker = TestWaker::new();
340
341 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
342
343 assert!(!waker.awoken.get());
344 assert_eq!(queue_len(), 1);
345 }
346
347 #[test]
348 #[serial]
349 fn test_schedule_same() {
350 setup();
351
352 let waker = TestWaker::new();
353
354 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
355
356 assert_eq!(queue_len(), 1);
357
358 QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker);
359
360 assert_eq!(queue_len(), 1);
361
362 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
363
364 assert_eq!(queue_len(), 1);
365
366 let waker2 = TestWaker::new();
367
368 QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker);
369
370 assert_eq!(queue_len(), 2);
371 }
372
373 #[test]
374 #[serial]
375 fn test_trigger() {
376 setup();
377
378 let waker = TestWaker::new();
379
380 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
381
382 assert!(!waker.awoken.get());
383
384 DRIVER.set_now(Instant::from_secs(99).as_ticks());
385
386 assert!(!waker.awoken.get());
387
388 assert_eq!(queue_len(), 1);
389
390 DRIVER.set_now(Instant::from_secs(100).as_ticks());
391
392 assert!(waker.awoken.get());
393
394 assert_eq!(queue_len(), 0);
395 }
396
397 #[test]
398 #[serial]
399 fn test_immediate_trigger() {
400 setup();
401
402 let waker = TestWaker::new();
403
404 QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker);
405
406 DRIVER.set_now(Instant::from_secs(50).as_ticks());
407
408 let waker2 = TestWaker::new();
409
410 QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker);
411
412 assert!(!waker.awoken.get());
413 assert!(waker2.awoken.get());
414 assert_eq!(queue_len(), 1);
415 }
416
417 #[test]
418 #[serial]
419 fn test_queue_overflow() {
420 setup();
421
422 for i in 1..super::QUEUE_SIZE {
423 let waker = TestWaker::new();
424
425 QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker);
426
427 assert_eq!(queue_len(), i);
428 assert!(!waker.awoken.get());
429 }
430
431 let first_waker = TestWaker::new();
432
433 QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker);
434
435 assert_eq!(queue_len(), super::QUEUE_SIZE);
436 assert!(!first_waker.awoken.get());
437
438 let second_waker = TestWaker::new();
439
440 QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker);
441
442 assert_eq!(queue_len(), super::QUEUE_SIZE);
443 assert!(first_waker.awoken.get());
444
445 QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker);
446 assert_eq!(queue_len(), super::QUEUE_SIZE);
447 assert!(second_waker.awoken.get());
448 }
449}