aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-time/src')
-rw-r--r--embassy-time/src/driver_mock.rs146
-rw-r--r--embassy-time/src/driver_std.rs208
-rw-r--r--embassy-time/src/driver_wasm.rs142
-rw-r--r--embassy-time/src/lib.rs2
-rw-r--r--embassy-time/src/queue_generic.rs346
5 files changed, 128 insertions, 716 deletions
diff --git a/embassy-time/src/driver_mock.rs b/embassy-time/src/driver_mock.rs
index 8587f9172..138d60499 100644
--- a/embassy-time/src/driver_mock.rs
+++ b/embassy-time/src/driver_mock.rs
@@ -1,7 +1,9 @@
1use core::cell::RefCell; 1use core::cell::RefCell;
2use core::task::Waker;
2 3
3use critical_section::Mutex as CsMutex; 4use critical_section::Mutex as CsMutex;
4use embassy_time_driver::{AlarmHandle, Driver}; 5use embassy_time_driver::Driver;
6use embassy_time_queue_driver::Queue;
5 7
6use crate::{Duration, Instant}; 8use crate::{Duration, Instant};
7 9
@@ -52,29 +54,13 @@ impl MockDriver {
52 /// Advances the time by the specified [`Duration`]. 54 /// Advances the time by the specified [`Duration`].
53 /// Calling any alarm callbacks that are due. 55 /// Calling any alarm callbacks that are due.
54 pub fn advance(&self, duration: Duration) { 56 pub fn advance(&self, duration: Duration) {
55 let notify = { 57 critical_section::with(|cs| {
56 critical_section::with(|cs| { 58 let inner = &mut *self.0.borrow_ref_mut(cs);
57 let mut inner = self.0.borrow_ref_mut(cs);
58
59 inner.now += duration;
60
61 let now = inner.now.as_ticks();
62
63 inner
64 .alarm
65 .as_mut()
66 .filter(|alarm| alarm.timestamp <= now)
67 .map(|alarm| {
68 alarm.timestamp = u64::MAX;
69
70 (alarm.callback, alarm.ctx)
71 })
72 })
73 };
74 59
75 if let Some((callback, ctx)) = notify { 60 inner.now += duration;
76 (callback)(ctx); 61 // wake expired tasks.
77 } 62 inner.queue.next_expiration(inner.now.as_ticks());
63 })
78 } 64 }
79} 65}
80 66
@@ -83,87 +69,37 @@ impl Driver for MockDriver {
83 critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks() 69 critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks()
84 } 70 }
85 71
86 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 72 fn schedule_wake(&self, at: u64, waker: &Waker) {
87 critical_section::with(|cs| { 73 critical_section::with(|cs| {
88 let mut inner = self.0.borrow_ref_mut(cs); 74 let inner = &mut *self.0.borrow_ref_mut(cs);
89 75 // enqueue it
90 if inner.alarm.is_some() { 76 inner.queue.schedule_wake(at, waker);
91 None 77 // wake it if it's in the past.
92 } else { 78 inner.queue.next_expiration(inner.now.as_ticks());
93 inner.alarm.replace(AlarmState::new());
94
95 Some(AlarmHandle::new(0))
96 }
97 })
98 }
99
100 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
101 critical_section::with(|cs| {
102 let mut inner = self.0.borrow_ref_mut(cs);
103
104 let Some(alarm) = inner.alarm.as_mut() else {
105 panic!("Alarm not allocated");
106 };
107
108 alarm.callback = callback;
109 alarm.ctx = ctx;
110 });
111 }
112
113 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
114 critical_section::with(|cs| {
115 let mut inner = self.0.borrow_ref_mut(cs);
116
117 if timestamp <= inner.now.as_ticks() {
118 false
119 } else {
120 let Some(alarm) = inner.alarm.as_mut() else {
121 panic!("Alarm not allocated");
122 };
123
124 alarm.timestamp = timestamp;
125 true
126 }
127 }) 79 })
128 } 80 }
129} 81}
130 82
131struct InnerMockDriver { 83struct InnerMockDriver {
132 now: Instant, 84 now: Instant,
133 alarm: Option<AlarmState>, 85 queue: Queue,
134} 86}
135 87
136impl InnerMockDriver { 88impl InnerMockDriver {
137 const fn new() -> Self { 89 const fn new() -> Self {
138 Self { 90 Self {
139 now: Instant::from_ticks(0), 91 now: Instant::from_ticks(0),
140 alarm: None, 92 queue: Queue::new(),
141 }
142 }
143}
144
145struct AlarmState {
146 timestamp: u64,
147 callback: fn(*mut ()),
148 ctx: *mut (),
149}
150
151impl AlarmState {
152 const fn new() -> Self {
153 Self {
154 timestamp: u64::MAX,
155 callback: Self::noop,
156 ctx: core::ptr::null_mut(),
157 } 93 }
158 } 94 }
159
160 fn noop(_ctx: *mut ()) {}
161} 95}
162 96
163unsafe impl Send for AlarmState {}
164
165#[cfg(test)] 97#[cfg(test)]
166mod tests { 98mod tests {
99 use core::sync::atomic::{AtomicBool, Ordering};
100 use std::sync::Arc;
101 use std::task::Wake;
102
167 use serial_test::serial; 103 use serial_test::serial;
168 104
169 use super::*; 105 use super::*;
@@ -185,37 +121,25 @@ mod tests {
185 121
186 #[test] 122 #[test]
187 #[serial] 123 #[serial]
188 fn test_set_alarm_not_in_future() { 124 fn test_schedule_wake() {
189 setup(); 125 setup();
190 126
191 let driver = MockDriver::get(); 127 static CALLBACK_CALLED: AtomicBool = AtomicBool::new(false);
192 let alarm = unsafe { AlarmHandle::new(0) };
193 assert_eq!(false, driver.set_alarm(alarm, driver.now()));
194 }
195 128
196 #[test] 129 struct MockWaker;
197 #[serial]
198 fn test_alarm() {
199 setup();
200
201 let driver = MockDriver::get();
202 let alarm = unsafe { driver.allocate_alarm() }.expect("No alarms available");
203 static mut CALLBACK_CALLED: bool = false;
204 let ctx = &mut () as *mut ();
205 driver.set_alarm_callback(alarm, |_| unsafe { CALLBACK_CALLED = true }, ctx);
206 driver.set_alarm(alarm, driver.now() + 1);
207 assert_eq!(false, unsafe { CALLBACK_CALLED });
208 driver.advance(Duration::from_secs(1));
209 assert_eq!(true, unsafe { CALLBACK_CALLED });
210 }
211 130
212 #[test] 131 impl Wake for MockWaker {
213 #[serial] 132 fn wake(self: Arc<Self>) {
214 fn test_allocate_alarm() { 133 CALLBACK_CALLED.store(true, Ordering::Relaxed);
215 setup(); 134 }
135 }
136 let waker = Arc::new(MockWaker).into();
216 137
217 let driver = MockDriver::get(); 138 let driver = MockDriver::get();
218 assert!(unsafe { driver.allocate_alarm() }.is_some()); 139
219 assert!(unsafe { driver.allocate_alarm() }.is_none()); 140 driver.schedule_wake(driver.now() + 1, &waker);
141 assert_eq!(false, CALLBACK_CALLED.load(Ordering::Relaxed));
142 driver.advance(Duration::from_secs(1));
143 assert_eq!(true, CALLBACK_CALLED.load(Ordering::Relaxed));
220 } 144 }
221} 145}
diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs
index cbef7aae1..35888fddd 100644
--- a/embassy-time/src/driver_std.rs
+++ b/embassy-time/src/driver_std.rs
@@ -1,160 +1,66 @@
1use core::sync::atomic::{AtomicU8, Ordering}; 1use std::sync::{Condvar, Mutex};
2use std::cell::{RefCell, UnsafeCell}; 2use std::thread;
3use std::mem::MaybeUninit;
4use std::sync::{Condvar, Mutex, Once};
5use std::time::{Duration as StdDuration, Instant as StdInstant}; 3use std::time::{Duration as StdDuration, Instant as StdInstant};
6use std::{mem, ptr, thread};
7 4
8use critical_section::Mutex as CsMutex; 5use embassy_time_driver::Driver;
9use embassy_time_driver::{AlarmHandle, Driver}; 6use embassy_time_queue_driver::Queue;
10 7
11const ALARM_COUNT: usize = 4; 8struct TimeDriver {
12 9 signaler: Signaler,
13struct AlarmState { 10 inner: Mutex<Inner>,
14 timestamp: u64,
15
16 // This is really a Option<(fn(*mut ()), *mut ())>
17 // but fn pointers aren't allowed in const yet
18 callback: *const (),
19 ctx: *mut (),
20} 11}
21 12
22unsafe impl Send for AlarmState {} 13struct Inner {
23 14 zero_instant: Option<StdInstant>,
24impl AlarmState { 15 queue: Queue,
25 const fn new() -> Self {
26 Self {
27 timestamp: u64::MAX,
28 callback: ptr::null(),
29 ctx: ptr::null_mut(),
30 }
31 }
32}
33
34struct TimeDriver {
35 alarm_count: AtomicU8,
36
37 once: Once,
38 // The STD Driver implementation requires the alarms' mutex to be reentrant, which the STD Mutex isn't
39 // Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections
40 // themselves are reentrant
41 alarms: UninitCell<CsMutex<RefCell<[AlarmState; ALARM_COUNT]>>>,
42 zero_instant: UninitCell<StdInstant>,
43 signaler: UninitCell<Signaler>,
44} 16}
45 17
46embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { 18embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
47 alarm_count: AtomicU8::new(0), 19 inner: Mutex::new(Inner{
48 20 zero_instant: None,
49 once: Once::new(), 21 queue: Queue::new(),
50 alarms: UninitCell::uninit(), 22 }),
51 zero_instant: UninitCell::uninit(), 23 signaler: Signaler::new(),
52 signaler: UninitCell::uninit(),
53}); 24});
54 25
55impl TimeDriver { 26impl Inner {
56 fn init(&self) { 27 fn init(&mut self) -> StdInstant {
57 self.once.call_once(|| unsafe { 28 *self.zero_instant.get_or_insert_with(|| {
58 self.alarms 29 thread::spawn(alarm_thread);
59 .write(CsMutex::new(RefCell::new([const { AlarmState::new() }; ALARM_COUNT]))); 30 StdInstant::now()
60 self.zero_instant.write(StdInstant::now()); 31 })
61 self.signaler.write(Signaler::new());
62
63 thread::spawn(Self::alarm_thread);
64 });
65 }
66
67 fn alarm_thread() {
68 let zero = unsafe { DRIVER.zero_instant.read() };
69 loop {
70 let now = DRIVER.now();
71
72 let next_alarm = critical_section::with(|cs| {
73 let alarms = unsafe { DRIVER.alarms.as_ref() }.borrow(cs);
74 loop {
75 let pending = alarms
76 .borrow_mut()
77 .iter_mut()
78 .find(|alarm| alarm.timestamp <= now)
79 .map(|alarm| {
80 alarm.timestamp = u64::MAX;
81
82 (alarm.callback, alarm.ctx)
83 });
84
85 if let Some((callback, ctx)) = pending {
86 // safety:
87 // - we can ignore the possiblity of `f` being unset (null) because of the safety contract of `allocate_alarm`.
88 // - other than that we only store valid function pointers into alarm.callback
89 let f: fn(*mut ()) = unsafe { mem::transmute(callback) };
90 f(ctx);
91 } else {
92 // No alarm due
93 break;
94 }
95 }
96
97 alarms
98 .borrow()
99 .iter()
100 .map(|alarm| alarm.timestamp)
101 .min()
102 .unwrap_or(u64::MAX)
103 });
104
105 // Ensure we don't overflow
106 let until = zero
107 .checked_add(StdDuration::from_micros(next_alarm))
108 .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
109
110 unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
111 }
112 } 32 }
113} 33}
114 34
115impl Driver for TimeDriver { 35impl Driver for TimeDriver {
116 fn now(&self) -> u64 { 36 fn now(&self) -> u64 {
117 self.init(); 37 let mut inner = self.inner.lock().unwrap();
118 38 let zero = inner.init();
119 let zero = unsafe { self.zero_instant.read() };
120 StdInstant::now().duration_since(zero).as_micros() as u64 39 StdInstant::now().duration_since(zero).as_micros() as u64
121 } 40 }
122 41
123 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 42 fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
124 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| { 43 let mut inner = self.inner.lock().unwrap();
125 if x < ALARM_COUNT as u8 { 44 inner.init();
126 Some(x + 1) 45 if inner.queue.schedule_wake(at, waker) {
127 } else { 46 self.signaler.signal();
128 None
129 }
130 });
131
132 match id {
133 Ok(id) => Some(AlarmHandle::new(id)),
134 Err(_) => None,
135 } 47 }
136 } 48 }
49}
137 50
138 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { 51fn alarm_thread() {
139 self.init(); 52 let zero = DRIVER.inner.lock().unwrap().zero_instant.unwrap();
140 critical_section::with(|cs| { 53 loop {
141 let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs); 54 let now = DRIVER.now();
142 let alarm = &mut alarms[alarm.id() as usize]; 55
143 alarm.callback = callback as *const (); 56 let next_alarm = DRIVER.inner.lock().unwrap().queue.next_expiration(now);
144 alarm.ctx = ctx;
145 });
146 }
147 57
148 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { 58 // Ensure we don't overflow
149 self.init(); 59 let until = zero
150 critical_section::with(|cs| { 60 .checked_add(StdDuration::from_micros(next_alarm))
151 let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs); 61 .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
152 let alarm = &mut alarms[alarm.id() as usize];
153 alarm.timestamp = timestamp;
154 unsafe { self.signaler.as_ref() }.signal();
155 });
156 62
157 true 63 DRIVER.signaler.wait_until(until);
158 } 64 }
159} 65}
160 66
@@ -164,7 +70,7 @@ struct Signaler {
164} 70}
165 71
166impl Signaler { 72impl Signaler {
167 fn new() -> Self { 73 const fn new() -> Self {
168 Self { 74 Self {
169 mutex: Mutex::new(false), 75 mutex: Mutex::new(false),
170 condvar: Condvar::new(), 76 condvar: Condvar::new(),
@@ -196,35 +102,3 @@ impl Signaler {
196 self.condvar.notify_one(); 102 self.condvar.notify_one();
197 } 103 }
198} 104}
199
200pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
201unsafe impl<T> Send for UninitCell<T> {}
202unsafe impl<T> Sync for UninitCell<T> {}
203
204impl<T> UninitCell<T> {
205 pub const fn uninit() -> Self {
206 Self(MaybeUninit::uninit())
207 }
208
209 pub unsafe fn as_ptr(&self) -> *const T {
210 (*self.0.as_ptr()).get()
211 }
212
213 pub unsafe fn as_mut_ptr(&self) -> *mut T {
214 (*self.0.as_ptr()).get()
215 }
216
217 pub unsafe fn as_ref(&self) -> &T {
218 &*self.as_ptr()
219 }
220
221 pub unsafe fn write(&self, val: T) {
222 ptr::write(self.as_mut_ptr(), val)
223 }
224}
225
226impl<T: Copy> UninitCell<T> {
227 pub unsafe fn read(&self) -> T {
228 ptr::read(self.as_mut_ptr())
229 }
230}
diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs
index d65629e49..bcdd1670b 100644
--- a/embassy-time/src/driver_wasm.rs
+++ b/embassy-time/src/driver_wasm.rs
@@ -1,28 +1,17 @@
1use core::sync::atomic::{AtomicU8, Ordering}; 1use std::sync::Mutex;
2use std::cell::UnsafeCell;
3use std::mem::MaybeUninit;
4use std::ptr;
5use std::sync::{Mutex, Once};
6 2
7use embassy_time_driver::{AlarmHandle, Driver}; 3use embassy_time_driver::Driver;
4use embassy_time_queue_driver::Queue;
8use wasm_bindgen::prelude::*; 5use wasm_bindgen::prelude::*;
9use wasm_timer::Instant as StdInstant; 6use wasm_timer::Instant as StdInstant;
10 7
11const ALARM_COUNT: usize = 4;
12
13struct AlarmState { 8struct AlarmState {
14 token: Option<f64>, 9 token: Option<f64>,
15 closure: Option<Closure<dyn FnMut() + 'static>>,
16} 10}
17 11
18unsafe impl Send for AlarmState {}
19
20impl AlarmState { 12impl AlarmState {
21 const fn new() -> Self { 13 const fn new() -> Self {
22 Self { 14 Self { token: None }
23 token: None,
24 closure: None,
25 }
26 } 15 }
27} 16}
28 17
@@ -33,67 +22,38 @@ extern "C" {
33} 22}
34 23
35struct TimeDriver { 24struct TimeDriver {
36 alarm_count: AtomicU8, 25 inner: Mutex<Inner>,
37
38 once: Once,
39 alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
40 zero_instant: UninitCell<StdInstant>,
41} 26}
42 27
43embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { 28struct Inner {
44 alarm_count: AtomicU8::new(0), 29 alarm: AlarmState,
45 once: Once::new(), 30 zero_instant: Option<StdInstant>,
46 alarms: UninitCell::uninit(), 31 queue: Queue,
47 zero_instant: UninitCell::uninit(), 32 closure: Option<Closure<dyn FnMut()>>,
48});
49
50impl TimeDriver {
51 fn init(&self) {
52 self.once.call_once(|| unsafe {
53 self.alarms
54 .write(Mutex::new([const { AlarmState::new() }; ALARM_COUNT]));
55 self.zero_instant.write(StdInstant::now());
56 });
57 }
58} 33}
59 34
60impl Driver for TimeDriver { 35unsafe impl Send for Inner {}
61 fn now(&self) -> u64 {
62 self.init();
63 36
64 let zero = unsafe { self.zero_instant.read() }; 37embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
65 StdInstant::now().duration_since(zero).as_micros() as u64 38 inner: Mutex::new(Inner{
66 } 39 zero_instant: None,
67 40 queue: Queue::new(),
68 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 41 alarm: AlarmState::new(),
69 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| { 42 closure: None,
70 if x < ALARM_COUNT as u8 { 43 }),
71 Some(x + 1) 44});
72 } else {
73 None
74 }
75 });
76 45
77 match id { 46impl Inner {
78 Ok(id) => Some(AlarmHandle::new(id)), 47 fn init(&mut self) -> StdInstant {
79 Err(_) => None, 48 *self.zero_instant.get_or_insert_with(StdInstant::now)
80 }
81 } 49 }
82 50
83 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { 51 fn now(&mut self) -> u64 {
84 self.init(); 52 StdInstant::now().duration_since(self.zero_instant.unwrap()).as_micros() as u64
85 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
86 let alarm = &mut alarms[alarm.id() as usize];
87 alarm.closure.replace(Closure::new(move || {
88 callback(ctx);
89 }));
90 } 53 }
91 54
92 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { 55 fn set_alarm(&mut self, timestamp: u64) -> bool {
93 self.init(); 56 if let Some(token) = self.alarm.token {
94 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
95 let alarm = &mut alarms[alarm.id() as usize];
96 if let Some(token) = alarm.token {
97 clearTimeout(token); 57 clearTimeout(token);
98 } 58 }
99 59
@@ -102,40 +62,42 @@ impl Driver for TimeDriver {
102 false 62 false
103 } else { 63 } else {
104 let timeout = (timestamp - now) as u32; 64 let timeout = (timestamp - now) as u32;
105 alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000)); 65 let closure = self.closure.get_or_insert_with(|| Closure::new(dispatch));
66 self.alarm.token = Some(setTimeout(closure, timeout / 1000));
106 67
107 true 68 true
108 } 69 }
109 } 70 }
110} 71}
111 72
112pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>); 73impl Driver for TimeDriver {
113unsafe impl<T> Send for UninitCell<T> {} 74 fn now(&self) -> u64 {
114unsafe impl<T> Sync for UninitCell<T> {} 75 let mut inner = self.inner.lock().unwrap();
115 76 let zero = inner.init();
116impl<T> UninitCell<T> { 77 StdInstant::now().duration_since(zero).as_micros() as u64
117 pub const fn uninit() -> Self {
118 Self(MaybeUninit::uninit())
119 }
120 unsafe fn as_ptr(&self) -> *const T {
121 (*self.0.as_ptr()).get()
122 }
123
124 pub unsafe fn as_mut_ptr(&self) -> *mut T {
125 (*self.0.as_ptr()).get()
126 }
127
128 pub unsafe fn as_ref(&self) -> &T {
129 &*self.as_ptr()
130 } 78 }
131 79
132 pub unsafe fn write(&self, val: T) { 80 fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
133 ptr::write(self.as_mut_ptr(), val) 81 let mut inner = self.inner.lock().unwrap();
82 inner.init();
83 if inner.queue.schedule_wake(at, waker) {
84 let now = inner.now();
85 let mut next = inner.queue.next_expiration(now);
86 while !inner.set_alarm(next) {
87 let now = inner.now();
88 next = inner.queue.next_expiration(now);
89 }
90 }
134 } 91 }
135} 92}
136 93
137impl<T: Copy> UninitCell<T> { 94fn dispatch() {
138 pub unsafe fn read(&self) -> T { 95 let inner = &mut *DRIVER.inner.lock().unwrap();
139 ptr::read(self.as_mut_ptr()) 96
97 let now = inner.now();
98 let mut next = inner.queue.next_expiration(now);
99 while !inner.set_alarm(next) {
100 let now = inner.now();
101 next = inner.queue.next_expiration(now);
140 } 102 }
141} 103}
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 8d0648ce5..80a359413 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -25,8 +25,6 @@ pub use driver_mock::MockDriver;
25mod driver_std; 25mod driver_std;
26#[cfg(feature = "wasm")] 26#[cfg(feature = "wasm")]
27mod driver_wasm; 27mod driver_wasm;
28#[cfg(feature = "generic-queue")]
29mod queue_generic;
30 28
31pub use delay::{block_for, Delay}; 29pub use delay::{block_for, Delay};
32pub use duration::Duration; 30pub use duration::Duration;
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
deleted file mode 100644
index 0068edae8..000000000
--- a/embassy-time/src/queue_generic.rs
+++ /dev/null
@@ -1,346 +0,0 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use critical_section::Mutex;
6use embassy_time_driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
7use embassy_time_queue_driver::TimerQueue;
8use heapless::Vec;
9
10use crate::Instant;
11
12#[cfg(feature = "generic-queue-8")]
13const QUEUE_SIZE: usize = 8;
14#[cfg(feature = "generic-queue-16")]
15const QUEUE_SIZE: usize = 16;
16#[cfg(feature = "generic-queue-32")]
17const QUEUE_SIZE: usize = 32;
18#[cfg(feature = "generic-queue-64")]
19const QUEUE_SIZE: usize = 64;
20#[cfg(feature = "generic-queue-128")]
21const QUEUE_SIZE: usize = 128;
22#[cfg(not(any(
23 feature = "generic-queue-8",
24 feature = "generic-queue-16",
25 feature = "generic-queue-32",
26 feature = "generic-queue-64",
27 feature = "generic-queue-128"
28)))]
29const QUEUE_SIZE: usize = 64;
30
31#[derive(Debug)]
32struct Timer {
33 at: Instant,
34 waker: Waker,
35}
36
37impl PartialEq for Timer {
38 fn eq(&self, other: &Self) -> bool {
39 self.at == other.at
40 }
41}
42
43impl Eq for Timer {}
44
45impl PartialOrd for Timer {
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 self.at.partial_cmp(&other.at)
48 }
49}
50
51impl Ord for Timer {
52 fn cmp(&self, other: &Self) -> Ordering {
53 self.at.cmp(&other.at)
54 }
55}
56
57struct InnerQueue {
58 queue: Vec<Timer, QUEUE_SIZE>,
59 alarm: AlarmHandle,
60}
61
62impl InnerQueue {
63 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
64 self.queue
65 .iter_mut()
66 .find(|timer| timer.waker.will_wake(waker))
67 .map(|timer| {
68 timer.at = min(timer.at, at);
69 })
70 .unwrap_or_else(|| {
71 let mut timer = Timer {
72 waker: waker.clone(),
73 at,
74 };
75
76 loop {
77 match self.queue.push(timer) {
78 Ok(()) => break,
79 Err(e) => timer = e,
80 }
81
82 self.queue.pop().unwrap().waker.wake();
83 }
84 });
85
86 // Don't wait for the alarm callback to trigger and directly
87 // dispatch all timers that are already due
88 //
89 // Then update the alarm if necessary
90 self.dispatch();
91 }
92
93 fn dispatch(&mut self) {
94 loop {
95 let now = Instant::now();
96
97 let mut next_alarm = Instant::MAX;
98
99 let mut i = 0;
100 while i < self.queue.len() {
101 let timer = &self.queue[i];
102 if timer.at <= now {
103 let timer = self.queue.swap_remove(i);
104 timer.waker.wake();
105 } else {
106 next_alarm = min(next_alarm, timer.at);
107 i += 1;
108 }
109 }
110
111 if self.update_alarm(next_alarm) {
112 break;
113 }
114 }
115 }
116
117 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
118 if next_alarm == Instant::MAX {
119 true
120 } else {
121 set_alarm(self.alarm, next_alarm.as_ticks())
122 }
123 }
124
125 fn handle_alarm(&mut self) {
126 self.dispatch();
127 }
128}
129
130struct Queue {
131 inner: Mutex<RefCell<Option<InnerQueue>>>,
132}
133
134impl Queue {
135 const fn new() -> Self {
136 Self {
137 inner: Mutex::new(RefCell::new(None)),
138 }
139 }
140
141 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
142 critical_section::with(|cs| {
143 let mut inner = self.inner.borrow_ref_mut(cs);
144
145 inner
146 .get_or_insert_with(|| {
147 let handle = unsafe { allocate_alarm() }.unwrap();
148 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
149 InnerQueue {
150 queue: Vec::new(),
151 alarm: handle,
152 }
153 })
154 .schedule_wake(at, waker)
155 });
156 }
157
158 fn handle_alarm(&self) {
159 critical_section::with(|cs| self.inner.borrow_ref_mut(cs).as_mut().unwrap().handle_alarm())
160 }
161
162 fn handle_alarm_callback(ctx: *mut ()) {
163 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
164 }
165}
166
167impl TimerQueue for Queue {
168 fn schedule_wake(&'static self, at: u64, waker: &Waker) {
169 Queue::schedule_wake(self, Instant::from_ticks(at), waker);
170 }
171}
172
173embassy_time_queue_driver::timer_queue_impl!(static QUEUE: Queue = Queue::new());
174
175#[cfg(test)]
176#[cfg(feature = "mock-driver")]
177mod tests {
178 use core::sync::atomic::{AtomicBool, Ordering};
179 use core::task::Waker;
180 use std::sync::Arc;
181 use std::task::Wake;
182
183 use serial_test::serial;
184
185 use crate::driver_mock::MockDriver;
186 use crate::queue_generic::QUEUE;
187 use crate::{Duration, Instant};
188
189 struct TestWaker {
190 pub awoken: AtomicBool,
191 }
192
193 impl Wake for TestWaker {
194 fn wake(self: Arc<Self>) {
195 self.awoken.store(true, Ordering::Relaxed);
196 }
197
198 fn wake_by_ref(self: &Arc<Self>) {
199 self.awoken.store(true, Ordering::Relaxed);
200 }
201 }
202
203 fn test_waker() -> (Arc<TestWaker>, Waker) {
204 let arc = Arc::new(TestWaker {
205 awoken: AtomicBool::new(false),
206 });
207 let waker = Waker::from(arc.clone());
208
209 (arc, waker)
210 }
211
212 fn setup() {
213 MockDriver::get().reset();
214 critical_section::with(|cs| *QUEUE.inner.borrow_ref_mut(cs) = None);
215 }
216
217 fn queue_len() -> usize {
218 critical_section::with(|cs| {
219 QUEUE
220 .inner
221 .borrow_ref(cs)
222 .as_ref()
223 .map(|inner| inner.queue.iter().count())
224 .unwrap_or(0)
225 })
226 }
227
228 #[test]
229 #[serial]
230 fn test_schedule() {
231 setup();
232
233 assert_eq!(queue_len(), 0);
234
235 let (flag, waker) = test_waker();
236
237 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
238
239 assert!(!flag.awoken.load(Ordering::Relaxed));
240 assert_eq!(queue_len(), 1);
241 }
242
243 #[test]
244 #[serial]
245 fn test_schedule_same() {
246 setup();
247
248 let (_flag, waker) = test_waker();
249
250 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
251
252 assert_eq!(queue_len(), 1);
253
254 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
255
256 assert_eq!(queue_len(), 1);
257
258 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
259
260 assert_eq!(queue_len(), 1);
261
262 let (_flag2, waker2) = test_waker();
263
264 QUEUE.schedule_wake(Instant::from_secs(100), &waker2);
265
266 assert_eq!(queue_len(), 2);
267 }
268
269 #[test]
270 #[serial]
271 fn test_trigger() {
272 setup();
273
274 let (flag, waker) = test_waker();
275
276 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
277
278 assert!(!flag.awoken.load(Ordering::Relaxed));
279
280 MockDriver::get().advance(Duration::from_secs(99));
281
282 assert!(!flag.awoken.load(Ordering::Relaxed));
283
284 assert_eq!(queue_len(), 1);
285
286 MockDriver::get().advance(Duration::from_secs(1));
287
288 assert!(flag.awoken.load(Ordering::Relaxed));
289
290 assert_eq!(queue_len(), 0);
291 }
292
293 #[test]
294 #[serial]
295 fn test_immediate_trigger() {
296 setup();
297
298 let (flag, waker) = test_waker();
299
300 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
301
302 MockDriver::get().advance(Duration::from_secs(50));
303
304 let (flag2, waker2) = test_waker();
305
306 QUEUE.schedule_wake(Instant::from_secs(40), &waker2);
307
308 assert!(!flag.awoken.load(Ordering::Relaxed));
309 assert!(flag2.awoken.load(Ordering::Relaxed));
310 assert_eq!(queue_len(), 1);
311 }
312
313 #[test]
314 #[serial]
315 fn test_queue_overflow() {
316 setup();
317
318 for i in 1..super::QUEUE_SIZE {
319 let (flag, waker) = test_waker();
320
321 QUEUE.schedule_wake(Instant::from_secs(310), &waker);
322
323 assert_eq!(queue_len(), i);
324 assert!(!flag.awoken.load(Ordering::Relaxed));
325 }
326
327 let (flag, waker) = test_waker();
328
329 QUEUE.schedule_wake(Instant::from_secs(300), &waker);
330
331 assert_eq!(queue_len(), super::QUEUE_SIZE);
332 assert!(!flag.awoken.load(Ordering::Relaxed));
333
334 let (flag2, waker2) = test_waker();
335
336 QUEUE.schedule_wake(Instant::from_secs(305), &waker2);
337
338 assert_eq!(queue_len(), super::QUEUE_SIZE);
339 assert!(flag.awoken.load(Ordering::Relaxed));
340
341 let (_flag3, waker3) = test_waker();
342 QUEUE.schedule_wake(Instant::from_secs(320), &waker3);
343 assert_eq!(queue_len(), super::QUEUE_SIZE);
344 assert!(flag2.awoken.load(Ordering::Relaxed));
345 }
346}