aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorivmarkov <[email protected]>2022-09-06 21:39:23 +0300
committerivmarkov <[email protected]>2022-10-24 08:20:29 +0300
commitc2404ee8ca6200d9037f096eee3f0eab98711778 (patch)
tree6ad7bd1336d94e9ae3e74a8438808d0db28fdce2
parentce1cba761c2942b7faa27f4098487c6468784729 (diff)
Initial generic timer queue impl
-rw-r--r--embassy-time/Cargo.toml5
-rw-r--r--embassy-time/src/lib.rs1
-rw-r--r--embassy-time/src/queue.rs197
3 files changed, 203 insertions, 0 deletions
diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml
index c51a71d01..0e3391d1f 100644
--- a/embassy-time/Cargo.toml
+++ b/embassy-time/Cargo.toml
@@ -26,6 +26,9 @@ unstable-traits = ["embedded-hal-1"]
26# To use this you must have a time driver provided. 26# To use this you must have a time driver provided.
27defmt-timestamp-uptime = ["defmt"] 27defmt-timestamp-uptime = ["defmt"]
28 28
29# TODO: Doc
30generic-queue = []
31
29# Set the `embassy_time` tick rate. 32# Set the `embassy_time` tick rate.
30# 33#
31# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. 34# At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used.
@@ -111,9 +114,11 @@ embedded-hal-async = { version = "=0.1.0-alpha.2", optional = true}
111 114
112futures-util = { version = "0.3.17", default-features = false } 115futures-util = { version = "0.3.17", default-features = false }
113embassy-macros = { version = "0.1.0", path = "../embassy-macros"} 116embassy-macros = { version = "0.1.0", path = "../embassy-macros"}
117embassy-sync = { version = "0.1", path = "../embassy-sync" }
114atomic-polyfill = "1.0.1" 118atomic-polyfill = "1.0.1"
115critical-section = "1.1" 119critical-section = "1.1"
116cfg-if = "1.0.0" 120cfg-if = "1.0.0"
121heapless = "0.7"
117 122
118# WASM dependencies 123# WASM dependencies
119wasm-bindgen = { version = "0.2.81", optional = true } 124wasm-bindgen = { version = "0.2.81", optional = true }
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 4edc883fe..0457a6571 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -11,6 +11,7 @@ mod delay;
11pub mod driver; 11pub mod driver;
12mod duration; 12mod duration;
13mod instant; 13mod instant;
14pub mod queue;
14mod tick; 15mod tick;
15mod timer; 16mod timer;
16 17
diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs
new file mode 100644
index 000000000..7e84090b1
--- /dev/null
+++ b/embassy-time/src/queue.rs
@@ -0,0 +1,197 @@
1//! Generic timer queue implementation
2use core::cell::RefCell;
3use core::cmp::Ordering;
4use core::task::Waker;
5
6use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering};
7use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
8use embassy_sync::blocking_mutex::Mutex;
9use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList};
10
11use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
12use crate::Instant;
13
14#[derive(Debug)]
15struct Timer {
16 at: Instant,
17 waker: Waker,
18}
19
20impl PartialEq for Timer {
21 fn eq(&self, other: &Self) -> bool {
22 self.at == other.at
23 }
24}
25
26impl Eq for Timer {}
27
28impl PartialOrd for Timer {
29 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30 self.at.partial_cmp(&other.at)
31 }
32}
33
34impl Ord for Timer {
35 fn cmp(&self, other: &Self) -> Ordering {
36 self.at.cmp(&other.at)
37 }
38}
39
40struct InnerQueue<const N: usize> {
41 queue: SortedLinkedList<Timer, LinkedIndexU8, Min, N>,
42 alarm_at: Instant,
43 alarm: Option<AlarmHandle>,
44}
45
46impl<const N: usize> InnerQueue<N> {
47 const fn new() -> Self {
48 Self {
49 queue: SortedLinkedList::new_u8(),
50 alarm_at: Instant::MAX,
51 alarm: None,
52 }
53 }
54
55 fn schedule(&mut self, at: Instant, waker: &Waker) {
56 self.queue
57 .find_mut(|timer| timer.waker.will_wake(waker))
58 .map(|mut timer| {
59 timer.waker = waker.clone();
60 timer.at = at;
61
62 timer.finish();
63 })
64 .unwrap_or_else(|| {
65 let mut timer = Timer {
66 waker: waker.clone(),
67 at,
68 };
69
70 loop {
71 match self.queue.push(timer) {
72 Ok(()) => break,
73 Err(e) => timer = e,
74 }
75
76 self.queue.pop().unwrap().waker.wake();
77 }
78 });
79
80 // Don't wait for the alarm callback to trigger and directly
81 // dispatch all timers that are already due
82 //
83 // Then update the alarm if necessary
84 self.dispatch();
85 }
86
87 fn dispatch(&mut self) {
88 let now = Instant::now();
89
90 while self.queue.peek().filter(|timer| timer.at <= now).is_some() {
91 self.queue.pop().unwrap().waker.wake();
92 }
93
94 self.update_alarm();
95 }
96
97 fn update_alarm(&mut self) {
98 if let Some(timer) = self.queue.peek() {
99 let new_at = timer.at;
100
101 if self.alarm_at != new_at {
102 self.alarm_at = new_at;
103 set_alarm(self.alarm.unwrap(), new_at.as_ticks());
104 }
105 } else {
106 self.alarm_at = Instant::MAX;
107 }
108 }
109
110 fn handle_alarm(&mut self) {
111 self.alarm_at = Instant::MAX;
112
113 self.dispatch();
114 }
115}
116
117/// TODO: Doc
118pub struct Queue<const N: usize = 128, R: RawMutex = CriticalSectionRawMutex> {
119 initialized: AtomicBool,
120 inner: Mutex<R, RefCell<InnerQueue<N>>>,
121}
122
123impl<const N: usize, R: RawMutex + 'static> Queue<N, R> {
124 /// TODO: Doc
125 pub const fn new() -> Self {
126 Self {
127 initialized: AtomicBool::new(false),
128 inner: Mutex::new(RefCell::new(InnerQueue::<N>::new())),
129 }
130 }
131
132 /// TODO: Doc
133 pub unsafe fn initialize(&'static self) {
134 if self.initialized.load(AtomicOrdering::SeqCst) {
135 panic!("Queue already initialized");
136 }
137
138 let handle = allocate_alarm().unwrap();
139 self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle));
140
141 set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _);
142
143 self.initialized.store(true, AtomicOrdering::SeqCst);
144 }
145
146 /// TODO: Doc
147 pub fn schedule(&'static self, at: Instant, waker: &Waker) {
148 self.check_initialized();
149
150 self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker));
151 }
152
153 fn check_initialized(&self) {
154 if !self.initialized.load(AtomicOrdering::SeqCst) {
155 panic!("Queue is not initialized");
156 }
157 }
158
159 fn handle_alarm(ctx: *mut ()) {
160 let this = unsafe { (ctx as *const Self).as_ref().unwrap() };
161
162 this.check_initialized();
163 this.inner.lock(|inner| inner.borrow_mut().handle_alarm());
164 }
165}
166
167/// TODO: Doc
168pub unsafe fn initialize() {
169 extern "Rust" {
170 fn _embassy_time_generic_queue_initialize();
171 }
172
173 _embassy_time_generic_queue_initialize();
174}
175
176/// TODO: Doc
177#[macro_export]
178macro_rules! generic_queue {
179 (static $name:ident: $t: ty = $val:expr) => {
180 static $name: $t = $val;
181
182 #[no_mangle]
183 fn _embassy_time_generic_queue_initialize() {
184 unsafe {
185 $crate::queue::Queue::initialize(&$name);
186 }
187 }
188
189 #[no_mangle]
190 fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) {
191 $crate::queue::Queue::schedule(&$name, at, waker);
192 }
193 };
194}
195
196#[cfg(feature = "generic-queue")]
197generic_queue!(static QUEUE: Queue = Queue::new());