aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src
diff options
context:
space:
mode:
authorjrmoulton <[email protected]>2025-06-10 15:47:54 -0600
committerjrmoulton <[email protected]>2025-06-10 15:48:36 -0600
commitcfad9798ff99d4de0571a512d156b5fe1ef1d427 (patch)
treefc3bf670f82d139de19466cddad1e909db7f3d2e /embassy-time/src
parentfc342915e6155dec7bafa3e135da7f37a9a07f5c (diff)
parent6186d111a5c150946ee5b7e9e68d987a38c1a463 (diff)
merge new embassy changes
Diffstat (limited to 'embassy-time/src')
-rw-r--r--embassy-time/src/driver_mock.rs146
-rw-r--r--embassy-time/src/driver_std.rs208
-rw-r--r--embassy-time/src/driver_wasm.rs142
-rw-r--r--embassy-time/src/duration.rs80
-rw-r--r--embassy-time/src/instant.rs44
-rw-r--r--embassy-time/src/lib.rs2
-rw-r--r--embassy-time/src/queue_generic.rs348
-rw-r--r--embassy-time/src/timer.rs74
8 files changed, 302 insertions, 742 deletions
diff --git a/embassy-time/src/driver_mock.rs b/embassy-time/src/driver_mock.rs
index 8587f9172..bb1961bf2 100644
--- a/embassy-time/src/driver_mock.rs
+++ b/embassy-time/src/driver_mock.rs
@@ -1,7 +1,9 @@
1use core::cell::RefCell; 1use core::cell::RefCell;
2use core::task::Waker;
2 3
3use critical_section::Mutex as CsMutex; 4use critical_section::Mutex as CsMutex;
4use embassy_time_driver::{AlarmHandle, Driver}; 5use embassy_time_driver::Driver;
6use embassy_time_queue_utils::Queue;
5 7
6use crate::{Duration, Instant}; 8use crate::{Duration, Instant};
7 9
@@ -52,29 +54,13 @@ impl MockDriver {
52 /// Advances the time by the specified [`Duration`]. 54 /// Advances the time by the specified [`Duration`].
53 /// Calling any alarm callbacks that are due. 55 /// Calling any alarm callbacks that are due.
54 pub fn advance(&self, duration: Duration) { 56 pub fn advance(&self, duration: Duration) {
55 let notify = { 57 critical_section::with(|cs| {
56 critical_section::with(|cs| { 58 let inner = &mut *self.0.borrow_ref_mut(cs);
57 let mut inner = self.0.borrow_ref_mut(cs);
58
59 inner.now += duration;
60
61 let now = inner.now.as_ticks();
62
63 inner
64 .alarm
65 .as_mut()
66 .filter(|alarm| alarm.timestamp <= now)
67 .map(|alarm| {
68 alarm.timestamp = u64::MAX;
69
70 (alarm.callback, alarm.ctx)
71 })
72 })
73 };
74 59
75 if let Some((callback, ctx)) = notify { 60 inner.now += duration;
76 (callback)(ctx); 61 // wake expired tasks.
77 } 62 inner.queue.next_expiration(inner.now.as_ticks());
63 })
78 } 64 }
79} 65}
80 66
@@ -83,87 +69,37 @@ impl Driver for MockDriver {
83 critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks() 69 critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks()
84 } 70 }
85 71
86 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 72 fn schedule_wake(&self, at: u64, waker: &Waker) {
87 critical_section::with(|cs| { 73 critical_section::with(|cs| {
88 let mut inner = self.0.borrow_ref_mut(cs); 74 let inner = &mut *self.0.borrow_ref_mut(cs);
89 75 // enqueue it
90 if inner.alarm.is_some() { 76 inner.queue.schedule_wake(at, waker);
91 None 77 // wake it if it's in the past.
92 } else { 78 inner.queue.next_expiration(inner.now.as_ticks());
93 inner.alarm.replace(AlarmState::new());
94
95 Some(AlarmHandle::new(0))
96 }
97 })
98 }
99
100 fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
101 critical_section::with(|cs| {
102 let mut inner = self.0.borrow_ref_mut(cs);
103
104 let Some(alarm) = inner.alarm.as_mut() else {
105 panic!("Alarm not allocated");
106 };
107
108 alarm.callback = callback;
109 alarm.ctx = ctx;
110 });
111 }
112
113 fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool {
114 critical_section::with(|cs| {
115 let mut inner = self.0.borrow_ref_mut(cs);
116
117 if timestamp <= inner.now.as_ticks() {
118 false
119 } else {
120 let Some(alarm) = inner.alarm.as_mut() else {
121 panic!("Alarm not allocated");
122 };
123
124 alarm.timestamp = timestamp;
125 true
126 }
127 }) 79 })
128 } 80 }
129} 81}
130 82
131struct InnerMockDriver { 83struct InnerMockDriver {
132 now: Instant, 84 now: Instant,
133 alarm: Option<AlarmState>, 85 queue: Queue,
134} 86}
135 87
136impl InnerMockDriver { 88impl InnerMockDriver {
137 const fn new() -> Self { 89 const fn new() -> Self {
138 Self { 90 Self {
139 now: Instant::from_ticks(0), 91 now: Instant::from_ticks(0),
140 alarm: None, 92 queue: Queue::new(),
141 }
142 }
143}
144
145struct AlarmState {
146 timestamp: u64,
147 callback: fn(*mut ()),
148 ctx: *mut (),
149}
150
151impl AlarmState {
152 const fn new() -> Self {
153 Self {
154 timestamp: u64::MAX,
155 callback: Self::noop,
156 ctx: core::ptr::null_mut(),
157 } 93 }
158 } 94 }
159
160 fn noop(_ctx: *mut ()) {}
161} 95}
162 96
163unsafe impl Send for AlarmState {}
164
165#[cfg(test)] 97#[cfg(test)]
166mod tests { 98mod tests {
99 use core::sync::atomic::{AtomicBool, Ordering};
100 use std::sync::Arc;
101 use std::task::Wake;
102
167 use serial_test::serial; 103 use serial_test::serial;
168 104
169 use super::*; 105 use super::*;
@@ -185,37 +121,25 @@ mod tests {
185 121
186 #[test] 122 #[test]
187 #[serial] 123 #[serial]
188 fn test_set_alarm_not_in_future() { 124 fn test_schedule_wake() {
189 setup(); 125 setup();
190 126
191 let driver = MockDriver::get(); 127 static CALLBACK_CALLED: AtomicBool = AtomicBool::new(false);
192 let alarm = unsafe { AlarmHandle::new(0) };
193 assert_eq!(false, driver.set_alarm(alarm, driver.now()));
194 }
195 128
196 #[test] 129 struct MockWaker;
197 #[serial]
198 fn test_alarm() {
199 setup();
200
201 let driver = MockDriver::get();
202 let alarm = unsafe { driver.allocate_alarm() }.expect("No alarms available");
203 static mut CALLBACK_CALLED: bool = false;
204 let ctx = &mut () as *mut ();
205 driver.set_alarm_callback(alarm, |_| unsafe { CALLBACK_CALLED = true }, ctx);
206 driver.set_alarm(alarm, driver.now() + 1);
207 assert_eq!(false, unsafe { CALLBACK_CALLED });
208 driver.advance(Duration::from_secs(1));
209 assert_eq!(true, unsafe { CALLBACK_CALLED });
210 }
211 130
212 #[test] 131 impl Wake for MockWaker {
213 #[serial] 132 fn wake(self: Arc<Self>) {
214 fn test_allocate_alarm() { 133 CALLBACK_CALLED.store(true, Ordering::Relaxed);
215 setup(); 134 }
135 }
136 let waker = Arc::new(MockWaker).into();
216 137
217 let driver = MockDriver::get(); 138 let driver = MockDriver::get();
218 assert!(unsafe { driver.allocate_alarm() }.is_some()); 139
219 assert!(unsafe { driver.allocate_alarm() }.is_none()); 140 driver.schedule_wake(driver.now() + 1, &waker);
141 assert_eq!(false, CALLBACK_CALLED.load(Ordering::Relaxed));
142 driver.advance(Duration::from_secs(1));
143 assert_eq!(true, CALLBACK_CALLED.load(Ordering::Relaxed));
220 } 144 }
221} 145}
diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs
index d182f8331..87d7ef7eb 100644
--- a/embassy-time/src/driver_std.rs
+++ b/embassy-time/src/driver_std.rs
@@ -1,160 +1,66 @@
1use core::sync::atomic::{AtomicU8, Ordering}; 1use std::sync::{Condvar, Mutex};
2use std::cell::{RefCell, UnsafeCell}; 2use std::thread;
3use std::mem::MaybeUninit;
4use std::sync::{Condvar, Mutex, Once};
5use std::time::{Duration as StdDuration, Instant as StdInstant}; 3use std::time::{Duration as StdDuration, Instant as StdInstant};
6use std::{mem, ptr, thread};
7 4
8use critical_section::Mutex as CsMutex; 5use embassy_time_driver::Driver;
9use embassy_time_driver::{AlarmHandle, Driver}; 6use embassy_time_queue_utils::Queue;
10 7
11const ALARM_COUNT: usize = 4; 8struct TimeDriver {
12 9 signaler: Signaler,
13struct AlarmState { 10 inner: Mutex<Inner>,
14 timestamp: u64,
15
16 // This is really a Option<(fn(*mut ()), *mut ())>
17 // but fn pointers aren't allowed in const yet
18 callback: *const (),
19 ctx: *mut (),
20} 11}
21 12
22unsafe impl Send for AlarmState {} 13struct Inner {
23 14 zero_instant: Option<StdInstant>,
24impl AlarmState { 15 queue: Queue,
25 const fn new() -> Self {
26 Self {
27 timestamp: u64::MAX,
28 callback: ptr::null(),
29 ctx: ptr::null_mut(),
30 }
31 }
32} 16}
33 17
34struct TimeDriver {
35 alarm_count: AtomicU8,
36
37 once: Once,
38 // The STD Driver implementation requires the alarms' mutex to be reentrant, which the STD Mutex isn't
39 // Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections
40 // themselves are reentrant
41 alarms: UninitCell<CsMutex<RefCell<[AlarmState; ALARM_COUNT]>>>,
42 zero_instant: UninitCell<StdInstant>,
43 signaler: UninitCell<Signaler>,
44}
45
46const ALARM_NEW: AlarmState = AlarmState::new();
47embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { 18embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
48 alarm_count: AtomicU8::new(0), 19 inner: Mutex::new(Inner{
49 20 zero_instant: None,
50 once: Once::new(), 21 queue: Queue::new(),
51 alarms: UninitCell::uninit(), 22 }),
52 zero_instant: UninitCell::uninit(), 23 signaler: Signaler::new(),
53 signaler: UninitCell::uninit(),
54}); 24});
55 25
56impl TimeDriver { 26impl Inner {
57 fn init(&self) { 27 fn init(&mut self) -> StdInstant {
58 self.once.call_once(|| unsafe { 28 *self.zero_instant.get_or_insert_with(|| {
59 self.alarms.write(CsMutex::new(RefCell::new([ALARM_NEW; ALARM_COUNT]))); 29 thread::spawn(alarm_thread);
60 self.zero_instant.write(StdInstant::now()); 30 StdInstant::now()
61 self.signaler.write(Signaler::new()); 31 })
62
63 thread::spawn(Self::alarm_thread);
64 });
65 }
66
67 fn alarm_thread() {
68 let zero = unsafe { DRIVER.zero_instant.read() };
69 loop {
70 let now = DRIVER.now();
71
72 let next_alarm = critical_section::with(|cs| {
73 let alarms = unsafe { DRIVER.alarms.as_ref() }.borrow(cs);
74 loop {
75 let pending = alarms
76 .borrow_mut()
77 .iter_mut()
78 .find(|alarm| alarm.timestamp <= now)
79 .map(|alarm| {
80 alarm.timestamp = u64::MAX;
81
82 (alarm.callback, alarm.ctx)
83 });
84
85 if let Some((callback, ctx)) = pending {
86 // safety:
87 // - we can ignore the possiblity of `f` being unset (null) because of the safety contract of `allocate_alarm`.
88 // - other than that we only store valid function pointers into alarm.callback
89 let f: fn(*mut ()) = unsafe { mem::transmute(callback) };
90 f(ctx);
91 } else {
92 // No alarm due
93 break;
94 }
95 }
96
97 alarms
98 .borrow()
99 .iter()
100 .map(|alarm| alarm.timestamp)
101 .min()
102 .unwrap_or(u64::MAX)
103 });
104
105 // Ensure we don't overflow
106 let until = zero
107 .checked_add(StdDuration::from_micros(next_alarm))
108 .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
109
110 unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
111 }
112 } 32 }
113} 33}
114 34
115impl Driver for TimeDriver { 35impl Driver for TimeDriver {
116 fn now(&self) -> u64 { 36 fn now(&self) -> u64 {
117 self.init(); 37 let mut inner = self.inner.lock().unwrap();
118 38 let zero = inner.init();
119 let zero = unsafe { self.zero_instant.read() };
120 StdInstant::now().duration_since(zero).as_micros() as u64 39 StdInstant::now().duration_since(zero).as_micros() as u64
121 } 40 }
122 41
123 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 42 fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
124 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| { 43 let mut inner = self.inner.lock().unwrap();
125 if x < ALARM_COUNT as u8 { 44 inner.init();
126 Some(x + 1) 45 if inner.queue.schedule_wake(at, waker) {
127 } else { 46 self.signaler.signal();
128 None
129 }
130 });
131
132 match id {
133 Ok(id) => Some(AlarmHandle::new(id)),
134 Err(_) => None,
135 } 47 }
136 } 48 }
49}
137 50
138 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { 51fn alarm_thread() {
139 self.init(); 52 let zero = DRIVER.inner.lock().unwrap().zero_instant.unwrap();
140 critical_section::with(|cs| { 53 loop {
141 let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs); 54 let now = DRIVER.now();
142 let alarm = &mut alarms[alarm.id() as usize]; 55
143 alarm.callback = callback as *const (); 56 let next_alarm = DRIVER.inner.lock().unwrap().queue.next_expiration(now);
144 alarm.ctx = ctx;
145 });
146 }
147 57
148 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { 58 // Ensure we don't overflow
149 self.init(); 59 let until = zero
150 critical_section::with(|cs| { 60 .checked_add(StdDuration::from_micros(next_alarm))
151 let mut alarms = unsafe { self.alarms.as_ref() }.borrow_ref_mut(cs); 61 .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
152 let alarm = &mut alarms[alarm.id() as usize];
153 alarm.timestamp = timestamp;
154 unsafe { self.signaler.as_ref() }.signal();
155 });
156 62
157 true 63 DRIVER.signaler.wait_until(until);
158 } 64 }
159} 65}
160 66
@@ -164,7 +70,7 @@ struct Signaler {
164} 70}
165 71
166impl Signaler { 72impl Signaler {
167 fn new() -> Self { 73 const fn new() -> Self {
168 Self { 74 Self {
169 mutex: Mutex::new(false), 75 mutex: Mutex::new(false),
170 condvar: Condvar::new(), 76 condvar: Condvar::new(),
@@ -196,35 +102,3 @@ impl Signaler {
196 self.condvar.notify_one(); 102 self.condvar.notify_one();
197 } 103 }
198} 104}
199
200pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
201unsafe impl<T> Send for UninitCell<T> {}
202unsafe impl<T> Sync for UninitCell<T> {}
203
204impl<T> UninitCell<T> {
205 pub const fn uninit() -> Self {
206 Self(MaybeUninit::uninit())
207 }
208
209 pub unsafe fn as_ptr(&self) -> *const T {
210 (*self.0.as_ptr()).get()
211 }
212
213 pub unsafe fn as_mut_ptr(&self) -> *mut T {
214 (*self.0.as_ptr()).get()
215 }
216
217 pub unsafe fn as_ref(&self) -> &T {
218 &*self.as_ptr()
219 }
220
221 pub unsafe fn write(&self, val: T) {
222 ptr::write(self.as_mut_ptr(), val)
223 }
224}
225
226impl<T: Copy> UninitCell<T> {
227 pub unsafe fn read(&self) -> T {
228 ptr::read(self.as_mut_ptr())
229 }
230}
diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs
index ad884f060..e3207691a 100644
--- a/embassy-time/src/driver_wasm.rs
+++ b/embassy-time/src/driver_wasm.rs
@@ -1,28 +1,17 @@
1use core::sync::atomic::{AtomicU8, Ordering}; 1use std::sync::Mutex;
2use std::cell::UnsafeCell;
3use std::mem::MaybeUninit;
4use std::ptr;
5use std::sync::{Mutex, Once};
6 2
7use embassy_time_driver::{AlarmHandle, Driver}; 3use embassy_time_driver::Driver;
4use embassy_time_queue_utils::Queue;
8use wasm_bindgen::prelude::*; 5use wasm_bindgen::prelude::*;
9use wasm_timer::Instant as StdInstant; 6use wasm_timer::Instant as StdInstant;
10 7
11const ALARM_COUNT: usize = 4;
12
13struct AlarmState { 8struct AlarmState {
14 token: Option<f64>, 9 token: Option<f64>,
15 closure: Option<Closure<dyn FnMut() + 'static>>,
16} 10}
17 11
18unsafe impl Send for AlarmState {}
19
20impl AlarmState { 12impl AlarmState {
21 const fn new() -> Self { 13 const fn new() -> Self {
22 Self { 14 Self { token: None }
23 token: None,
24 closure: None,
25 }
26 } 15 }
27} 16}
28 17
@@ -33,67 +22,38 @@ extern "C" {
33} 22}
34 23
35struct TimeDriver { 24struct TimeDriver {
36 alarm_count: AtomicU8, 25 inner: Mutex<Inner>,
37
38 once: Once,
39 alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
40 zero_instant: UninitCell<StdInstant>,
41} 26}
42 27
43const ALARM_NEW: AlarmState = AlarmState::new(); 28struct Inner {
44embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { 29 alarm: AlarmState,
45 alarm_count: AtomicU8::new(0), 30 zero_instant: Option<StdInstant>,
46 once: Once::new(), 31 queue: Queue,
47 alarms: UninitCell::uninit(), 32 closure: Option<Closure<dyn FnMut()>>,
48 zero_instant: UninitCell::uninit(),
49});
50
51impl TimeDriver {
52 fn init(&self) {
53 self.once.call_once(|| unsafe {
54 self.alarms.write(Mutex::new([ALARM_NEW; ALARM_COUNT]));
55 self.zero_instant.write(StdInstant::now());
56 });
57 }
58} 33}
59 34
60impl Driver for TimeDriver { 35unsafe impl Send for Inner {}
61 fn now(&self) -> u64 {
62 self.init();
63 36
64 let zero = unsafe { self.zero_instant.read() }; 37embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
65 StdInstant::now().duration_since(zero).as_micros() as u64 38 inner: Mutex::new(Inner{
66 } 39 zero_instant: None,
67 40 queue: Queue::new(),
68 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> { 41 alarm: AlarmState::new(),
69 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| { 42 closure: None,
70 if x < ALARM_COUNT as u8 { 43 }),
71 Some(x + 1) 44});
72 } else {
73 None
74 }
75 });
76 45
77 match id { 46impl Inner {
78 Ok(id) => Some(AlarmHandle::new(id)), 47 fn init(&mut self) -> StdInstant {
79 Err(_) => None, 48 *self.zero_instant.get_or_insert_with(StdInstant::now)
80 }
81 } 49 }
82 50
83 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { 51 fn now(&mut self) -> u64 {
84 self.init(); 52 StdInstant::now().duration_since(self.zero_instant.unwrap()).as_micros() as u64
85 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
86 let alarm = &mut alarms[alarm.id() as usize];
87 alarm.closure.replace(Closure::new(move || {
88 callback(ctx);
89 }));
90 } 53 }
91 54
92 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { 55 fn set_alarm(&mut self, timestamp: u64) -> bool {
93 self.init(); 56 if let Some(token) = self.alarm.token {
94 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
95 let alarm = &mut alarms[alarm.id() as usize];
96 if let Some(token) = alarm.token {
97 clearTimeout(token); 57 clearTimeout(token);
98 } 58 }
99 59
@@ -102,40 +62,42 @@ impl Driver for TimeDriver {
102 false 62 false
103 } else { 63 } else {
104 let timeout = (timestamp - now) as u32; 64 let timeout = (timestamp - now) as u32;
105 alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000)); 65 let closure = self.closure.get_or_insert_with(|| Closure::new(dispatch));
66 self.alarm.token = Some(setTimeout(closure, timeout / 1000));
106 67
107 true 68 true
108 } 69 }
109 } 70 }
110} 71}
111 72
112pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>); 73impl Driver for TimeDriver {
113unsafe impl<T> Send for UninitCell<T> {} 74 fn now(&self) -> u64 {
114unsafe impl<T> Sync for UninitCell<T> {} 75 let mut inner = self.inner.lock().unwrap();
115 76 let zero = inner.init();
116impl<T> UninitCell<T> { 77 StdInstant::now().duration_since(zero).as_micros() as u64
117 pub const fn uninit() -> Self {
118 Self(MaybeUninit::uninit())
119 }
120 unsafe fn as_ptr(&self) -> *const T {
121 (*self.0.as_ptr()).get()
122 }
123
124 pub unsafe fn as_mut_ptr(&self) -> *mut T {
125 (*self.0.as_ptr()).get()
126 }
127
128 pub unsafe fn as_ref(&self) -> &T {
129 &*self.as_ptr()
130 } 78 }
131 79
132 pub unsafe fn write(&self, val: T) { 80 fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
133 ptr::write(self.as_mut_ptr(), val) 81 let mut inner = self.inner.lock().unwrap();
82 inner.init();
83 if inner.queue.schedule_wake(at, waker) {
84 let now = inner.now();
85 let mut next = inner.queue.next_expiration(now);
86 while !inner.set_alarm(next) {
87 let now = inner.now();
88 next = inner.queue.next_expiration(now);
89 }
90 }
134 } 91 }
135} 92}
136 93
137impl<T: Copy> UninitCell<T> { 94fn dispatch() {
138 pub unsafe fn read(&self) -> T { 95 let inner = &mut *DRIVER.inner.lock().unwrap();
139 ptr::read(self.as_mut_ptr()) 96
97 let now = inner.now();
98 let mut next = inner.queue.next_expiration(now);
99 while !inner.set_alarm(next) {
100 let now = inner.now();
101 next = inner.queue.next_expiration(now);
140 } 102 }
141} 103}
diff --git a/embassy-time/src/duration.rs b/embassy-time/src/duration.rs
index 647d208e3..dcda705d3 100644
--- a/embassy-time/src/duration.rs
+++ b/embassy-time/src/duration.rs
@@ -64,9 +64,9 @@ impl Duration {
64 64
65 /// Creates a duration from the specified number of nanoseconds, rounding up. 65 /// Creates a duration from the specified number of nanoseconds, rounding up.
66 /// NOTE: Delays this small may be inaccurate. 66 /// NOTE: Delays this small may be inaccurate.
67 pub const fn from_nanos(micros: u64) -> Duration { 67 pub const fn from_nanos(nanoseconds: u64) -> Duration {
68 Duration { 68 Duration {
69 ticks: div_ceil(micros * (TICK_HZ / GCD_1G), 1_000_000_000 / GCD_1G), 69 ticks: div_ceil(nanoseconds * (TICK_HZ / GCD_1G), 1_000_000_000 / GCD_1G),
70 } 70 }
71 } 71 }
72 72
@@ -90,6 +90,82 @@ impl Duration {
90 } 90 }
91 } 91 }
92 92
93 /// Try to create a duration from the specified number of seconds, rounding up.
94 /// Fails if the number of seconds is too large.
95 pub const fn try_from_secs(secs: u64) -> Option<Duration> {
96 let Some(ticks) = secs.checked_mul(TICK_HZ) else {
97 return None;
98 };
99 Some(Duration { ticks })
100 }
101
102 /// Try to create a duration from the specified number of milliseconds, rounding up.
103 /// Fails if the number of milliseconds is too large.
104 pub const fn try_from_millis(millis: u64) -> Option<Duration> {
105 let Some(value) = millis.checked_mul(TICK_HZ / GCD_1K) else {
106 return None;
107 };
108 Some(Duration {
109 ticks: div_ceil(value, 1000 / GCD_1K),
110 })
111 }
112
113 /// Try to create a duration from the specified number of microseconds, rounding up.
114 /// Fails if the number of microseconds is too large.
115 /// NOTE: Delays this small may be inaccurate.
116 pub const fn try_from_micros(micros: u64) -> Option<Duration> {
117 let Some(value) = micros.checked_mul(TICK_HZ / GCD_1M) else {
118 return None;
119 };
120 Some(Duration {
121 ticks: div_ceil(value, 1_000_000 / GCD_1M),
122 })
123 }
124
125 /// Try to create a duration from the specified number of nanoseconds, rounding up.
126 /// Fails if the number of nanoseconds is too large.
127 /// NOTE: Delays this small may be inaccurate.
128 pub const fn try_from_nanos(nanoseconds: u64) -> Option<Duration> {
129 let Some(value) = nanoseconds.checked_mul(TICK_HZ / GCD_1G) else {
130 return None;
131 };
132 Some(Duration {
133 ticks: div_ceil(value, 1_000_000_000 / GCD_1G),
134 })
135 }
136
137 /// Try to create a duration from the specified number of seconds, rounding down.
138 /// Fails if the number of seconds is too large.
139 pub const fn try_from_secs_floor(secs: u64) -> Option<Duration> {
140 let Some(ticks) = secs.checked_mul(TICK_HZ) else {
141 return None;
142 };
143 Some(Duration { ticks })
144 }
145
146 /// Try to create a duration from the specified number of milliseconds, rounding down.
147 /// Fails if the number of milliseconds is too large.
148 pub const fn try_from_millis_floor(millis: u64) -> Option<Duration> {
149 let Some(value) = millis.checked_mul(TICK_HZ / GCD_1K) else {
150 return None;
151 };
152 Some(Duration {
153 ticks: value / (1000 / GCD_1K),
154 })
155 }
156
157 /// Try to create a duration from the specified number of microseconds, rounding down.
158 /// Fails if the number of microseconds is too large.
159 /// NOTE: Delays this small may be inaccurate.
160 pub const fn try_from_micros_floor(micros: u64) -> Option<Duration> {
161 let Some(value) = micros.checked_mul(TICK_HZ / GCD_1M) else {
162 return None;
163 };
164 Some(Duration {
165 ticks: value / (1_000_000 / GCD_1M),
166 })
167 }
168
93 /// Creates a duration corresponding to the specified Hz. 169 /// Creates a duration corresponding to the specified Hz.
94 /// NOTE: Giving this function a hz >= the TICK_HZ of your platform will clamp the Duration to 1 170 /// NOTE: Giving this function a hz >= the TICK_HZ of your platform will clamp the Duration to 1
95 /// tick. Doing so will not deadlock, but will certainly not produce the desired output. 171 /// tick. Doing so will not deadlock, but will certainly not produce the desired output.
diff --git a/embassy-time/src/instant.rs b/embassy-time/src/instant.rs
index 909f1b173..6571bea62 100644
--- a/embassy-time/src/instant.rs
+++ b/embassy-time/src/instant.rs
@@ -17,6 +17,7 @@ impl Instant {
17 pub const MAX: Instant = Instant { ticks: u64::MAX }; 17 pub const MAX: Instant = Instant { ticks: u64::MAX };
18 18
19 /// Returns an Instant representing the current time. 19 /// Returns an Instant representing the current time.
20 #[inline]
20 pub fn now() -> Instant { 21 pub fn now() -> Instant {
21 Instant { 22 Instant {
22 ticks: embassy_time_driver::now(), 23 ticks: embassy_time_driver::now(),
@@ -49,6 +50,37 @@ impl Instant {
49 } 50 }
50 } 51 }
51 52
53 /// Try to create an Instant from a microsecond count since system boot.
54 /// Fails if the number of microseconds is too large.
55 pub const fn try_from_micros(micros: u64) -> Option<Self> {
56 let Some(value) = micros.checked_mul(TICK_HZ / GCD_1M) else {
57 return None;
58 };
59 Some(Self {
60 ticks: value / (1_000_000 / GCD_1M),
61 })
62 }
63
64 /// Try to create an Instant from a millisecond count since system boot.
65 /// Fails if the number of milliseconds is too large.
66 pub const fn try_from_millis(millis: u64) -> Option<Self> {
67 let Some(value) = millis.checked_mul(TICK_HZ / GCD_1K) else {
68 return None;
69 };
70 Some(Self {
71 ticks: value / (1000 / GCD_1K),
72 })
73 }
74
75 /// Try to create an Instant from a second count since system boot.
76 /// Fails if the number of seconds is too large.
77 pub const fn try_from_secs(seconds: u64) -> Option<Self> {
78 let Some(ticks) = seconds.checked_mul(TICK_HZ) else {
79 return None;
80 };
81 Some(Self { ticks })
82 }
83
52 /// Tick count since system boot. 84 /// Tick count since system boot.
53 pub const fn as_ticks(&self) -> u64 { 85 pub const fn as_ticks(&self) -> u64 {
54 self.ticks 86 self.ticks
@@ -114,6 +146,18 @@ impl Instant {
114 pub fn checked_sub(&self, duration: Duration) -> Option<Instant> { 146 pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
115 self.ticks.checked_sub(duration.ticks).map(|ticks| Instant { ticks }) 147 self.ticks.checked_sub(duration.ticks).map(|ticks| Instant { ticks })
116 } 148 }
149
150 /// Adds a Duration to self. In case of overflow, the maximum value is returned.
151 pub fn saturating_add(mut self, duration: Duration) -> Self {
152 self.ticks = self.ticks.saturating_add(duration.ticks);
153 self
154 }
155
156 /// Subtracts a Duration from self. In case of overflow, the minimum value is returned.
157 pub fn saturating_sub(mut self, duration: Duration) -> Self {
158 self.ticks = self.ticks.saturating_sub(duration.ticks);
159 self
160 }
117} 161}
118 162
119impl Add<Duration> for Instant { 163impl Add<Duration> for Instant {
diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs
index 8d0648ce5..80a359413 100644
--- a/embassy-time/src/lib.rs
+++ b/embassy-time/src/lib.rs
@@ -25,8 +25,6 @@ pub use driver_mock::MockDriver;
25mod driver_std; 25mod driver_std;
26#[cfg(feature = "wasm")] 26#[cfg(feature = "wasm")]
27mod driver_wasm; 27mod driver_wasm;
28#[cfg(feature = "generic-queue")]
29mod queue_generic;
30 28
31pub use delay::{block_for, Delay}; 29pub use delay::{block_for, Delay};
32pub use duration::Duration; 30pub use duration::Duration;
diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs
deleted file mode 100644
index 4882afd3e..000000000
--- a/embassy-time/src/queue_generic.rs
+++ /dev/null
@@ -1,348 +0,0 @@
1use core::cell::RefCell;
2use core::cmp::{min, Ordering};
3use core::task::Waker;
4
5use critical_section::Mutex;
6use embassy_time_driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle};
7use embassy_time_queue_driver::TimerQueue;
8use heapless::Vec;
9
10use crate::Instant;
11
12#[cfg(feature = "generic-queue-8")]
13const QUEUE_SIZE: usize = 8;
14#[cfg(feature = "generic-queue-16")]
15const QUEUE_SIZE: usize = 16;
16#[cfg(feature = "generic-queue-32")]
17const QUEUE_SIZE: usize = 32;
18#[cfg(feature = "generic-queue-64")]
19const QUEUE_SIZE: usize = 64;
20#[cfg(feature = "generic-queue-128")]
21const QUEUE_SIZE: usize = 128;
22#[cfg(not(any(
23 feature = "generic-queue-8",
24 feature = "generic-queue-16",
25 feature = "generic-queue-32",
26 feature = "generic-queue-64",
27 feature = "generic-queue-128"
28)))]
29const QUEUE_SIZE: usize = 64;
30
31#[derive(Debug)]
32struct Timer {
33 at: Instant,
34 waker: Waker,
35}
36
37impl PartialEq for Timer {
38 fn eq(&self, other: &Self) -> bool {
39 self.at == other.at
40 }
41}
42
43impl Eq for Timer {}
44
45impl PartialOrd for Timer {
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 self.at.partial_cmp(&other.at)
48 }
49}
50
51impl Ord for Timer {
52 fn cmp(&self, other: &Self) -> Ordering {
53 self.at.cmp(&other.at)
54 }
55}
56
57struct InnerQueue {
58 queue: Vec<Timer, QUEUE_SIZE>,
59 alarm: AlarmHandle,
60}
61
62impl InnerQueue {
63 fn schedule_wake(&mut self, at: Instant, waker: &Waker) {
64 self.queue
65 .iter_mut()
66 .find(|timer| timer.waker.will_wake(waker))
67 .map(|timer| {
68 timer.at = min(timer.at, at);
69 })
70 .unwrap_or_else(|| {
71 let mut timer = Timer {
72 waker: waker.clone(),
73 at,
74 };
75
76 loop {
77 match self.queue.push(timer) {
78 Ok(()) => break,
79 Err(e) => timer = e,
80 }
81
82 self.queue.pop().unwrap().waker.wake();
83 }
84 });
85
86 // Don't wait for the alarm callback to trigger and directly
87 // dispatch all timers that are already due
88 //
89 // Then update the alarm if necessary
90 self.dispatch();
91 }
92
93 fn dispatch(&mut self) {
94 loop {
95 let now = Instant::now();
96
97 let mut next_alarm = Instant::MAX;
98
99 let mut i = 0;
100 while i < self.queue.len() {
101 let timer = &self.queue[i];
102 if timer.at <= now {
103 let timer = self.queue.swap_remove(i);
104 timer.waker.wake();
105 } else {
106 next_alarm = min(next_alarm, timer.at);
107 i += 1;
108 }
109 }
110
111 if self.update_alarm(next_alarm) {
112 break;
113 }
114 }
115 }
116
117 fn update_alarm(&mut self, next_alarm: Instant) -> bool {
118 if next_alarm == Instant::MAX {
119 true
120 } else {
121 set_alarm(self.alarm, next_alarm.as_ticks())
122 }
123 }
124
125 fn handle_alarm(&mut self) {
126 self.dispatch();
127 }
128}
129
130struct Queue {
131 inner: Mutex<RefCell<Option<InnerQueue>>>,
132}
133
134impl Queue {
135 const fn new() -> Self {
136 Self {
137 inner: Mutex::new(RefCell::new(None)),
138 }
139 }
140
141 fn schedule_wake(&'static self, at: Instant, waker: &Waker) {
142 critical_section::with(|cs| {
143 let mut inner = self.inner.borrow_ref_mut(cs);
144
145 if inner.is_none() {}
146
147 inner
148 .get_or_insert_with(|| {
149 let handle = unsafe { allocate_alarm() }.unwrap();
150 set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _);
151 InnerQueue {
152 queue: Vec::new(),
153 alarm: handle,
154 }
155 })
156 .schedule_wake(at, waker)
157 });
158 }
159
160 fn handle_alarm(&self) {
161 critical_section::with(|cs| self.inner.borrow_ref_mut(cs).as_mut().unwrap().handle_alarm())
162 }
163
164 fn handle_alarm_callback(ctx: *mut ()) {
165 unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm();
166 }
167}
168
169impl TimerQueue for Queue {
170 fn schedule_wake(&'static self, at: u64, waker: &Waker) {
171 Queue::schedule_wake(self, Instant::from_ticks(at), waker);
172 }
173}
174
175embassy_time_queue_driver::timer_queue_impl!(static QUEUE: Queue = Queue::new());
176
177#[cfg(test)]
178#[cfg(feature = "mock-driver")]
179mod tests {
180 use core::sync::atomic::{AtomicBool, Ordering};
181 use core::task::Waker;
182 use std::sync::Arc;
183 use std::task::Wake;
184
185 use serial_test::serial;
186
187 use crate::driver_mock::MockDriver;
188 use crate::queue_generic::QUEUE;
189 use crate::{Duration, Instant};
190
191 struct TestWaker {
192 pub awoken: AtomicBool,
193 }
194
195 impl Wake for TestWaker {
196 fn wake(self: Arc<Self>) {
197 self.awoken.store(true, Ordering::Relaxed);
198 }
199
200 fn wake_by_ref(self: &Arc<Self>) {
201 self.awoken.store(true, Ordering::Relaxed);
202 }
203 }
204
205 fn test_waker() -> (Arc<TestWaker>, Waker) {
206 let arc = Arc::new(TestWaker {
207 awoken: AtomicBool::new(false),
208 });
209 let waker = Waker::from(arc.clone());
210
211 (arc, waker)
212 }
213
214 fn setup() {
215 MockDriver::get().reset();
216 critical_section::with(|cs| *QUEUE.inner.borrow_ref_mut(cs) = None);
217 }
218
219 fn queue_len() -> usize {
220 critical_section::with(|cs| {
221 QUEUE
222 .inner
223 .borrow_ref(cs)
224 .as_ref()
225 .map(|inner| inner.queue.iter().count())
226 .unwrap_or(0)
227 })
228 }
229
230 #[test]
231 #[serial]
232 fn test_schedule() {
233 setup();
234
235 assert_eq!(queue_len(), 0);
236
237 let (flag, waker) = test_waker();
238
239 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
240
241 assert!(!flag.awoken.load(Ordering::Relaxed));
242 assert_eq!(queue_len(), 1);
243 }
244
245 #[test]
246 #[serial]
247 fn test_schedule_same() {
248 setup();
249
250 let (_flag, waker) = test_waker();
251
252 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
253
254 assert_eq!(queue_len(), 1);
255
256 QUEUE.schedule_wake(Instant::from_secs(1), &waker);
257
258 assert_eq!(queue_len(), 1);
259
260 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
261
262 assert_eq!(queue_len(), 1);
263
264 let (_flag2, waker2) = test_waker();
265
266 QUEUE.schedule_wake(Instant::from_secs(100), &waker2);
267
268 assert_eq!(queue_len(), 2);
269 }
270
271 #[test]
272 #[serial]
273 fn test_trigger() {
274 setup();
275
276 let (flag, waker) = test_waker();
277
278 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
279
280 assert!(!flag.awoken.load(Ordering::Relaxed));
281
282 MockDriver::get().advance(Duration::from_secs(99));
283
284 assert!(!flag.awoken.load(Ordering::Relaxed));
285
286 assert_eq!(queue_len(), 1);
287
288 MockDriver::get().advance(Duration::from_secs(1));
289
290 assert!(flag.awoken.load(Ordering::Relaxed));
291
292 assert_eq!(queue_len(), 0);
293 }
294
295 #[test]
296 #[serial]
297 fn test_immediate_trigger() {
298 setup();
299
300 let (flag, waker) = test_waker();
301
302 QUEUE.schedule_wake(Instant::from_secs(100), &waker);
303
304 MockDriver::get().advance(Duration::from_secs(50));
305
306 let (flag2, waker2) = test_waker();
307
308 QUEUE.schedule_wake(Instant::from_secs(40), &waker2);
309
310 assert!(!flag.awoken.load(Ordering::Relaxed));
311 assert!(flag2.awoken.load(Ordering::Relaxed));
312 assert_eq!(queue_len(), 1);
313 }
314
315 #[test]
316 #[serial]
317 fn test_queue_overflow() {
318 setup();
319
320 for i in 1..super::QUEUE_SIZE {
321 let (flag, waker) = test_waker();
322
323 QUEUE.schedule_wake(Instant::from_secs(310), &waker);
324
325 assert_eq!(queue_len(), i);
326 assert!(!flag.awoken.load(Ordering::Relaxed));
327 }
328
329 let (flag, waker) = test_waker();
330
331 QUEUE.schedule_wake(Instant::from_secs(300), &waker);
332
333 assert_eq!(queue_len(), super::QUEUE_SIZE);
334 assert!(!flag.awoken.load(Ordering::Relaxed));
335
336 let (flag2, waker2) = test_waker();
337
338 QUEUE.schedule_wake(Instant::from_secs(305), &waker2);
339
340 assert_eq!(queue_len(), super::QUEUE_SIZE);
341 assert!(flag.awoken.load(Ordering::Relaxed));
342
343 let (_flag3, waker3) = test_waker();
344 QUEUE.schedule_wake(Instant::from_secs(320), &waker3);
345 assert_eq!(queue_len(), super::QUEUE_SIZE);
346 assert!(flag2.awoken.load(Ordering::Relaxed));
347 }
348}
diff --git a/embassy-time/src/timer.rs b/embassy-time/src/timer.rs
index 4d7194b20..d1162eadd 100644
--- a/embassy-time/src/timer.rs
+++ b/embassy-time/src/timer.rs
@@ -1,8 +1,7 @@
1use core::future::{poll_fn, Future}; 1use core::future::{poll_fn, Future};
2use core::pin::{pin, Pin}; 2use core::pin::Pin;
3use core::task::{Context, Poll}; 3use core::task::{Context, Poll};
4 4
5use futures_util::future::{select, Either};
6use futures_util::stream::FusedStream; 5use futures_util::stream::FusedStream;
7use futures_util::Stream; 6use futures_util::Stream;
8 7
@@ -17,11 +16,10 @@ pub struct TimeoutError;
17/// 16///
18/// If the future completes before the timeout, its output is returned. Otherwise, on timeout, 17/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
19/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 18/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
20pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> { 19pub fn with_timeout<F: Future>(timeout: Duration, fut: F) -> TimeoutFuture<F> {
21 let timeout_fut = Timer::after(timeout); 20 TimeoutFuture {
22 match select(pin!(fut), timeout_fut).await { 21 timer: Timer::after(timeout),
23 Either::Left((r, _)) => Ok(r), 22 fut,
24 Either::Right(_) => Err(TimeoutError),
25 } 23 }
26} 24}
27 25
@@ -29,16 +27,15 @@ pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Out
29/// 27///
30/// If the future completes before the deadline, its output is returned. Otherwise, on timeout, 28/// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
31/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 29/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
32pub async fn with_deadline<F: Future>(at: Instant, fut: F) -> Result<F::Output, TimeoutError> { 30pub fn with_deadline<F: Future>(at: Instant, fut: F) -> TimeoutFuture<F> {
33 let timeout_fut = Timer::at(at); 31 TimeoutFuture {
34 match select(pin!(fut), timeout_fut).await { 32 timer: Timer::at(at),
35 Either::Left((r, _)) => Ok(r), 33 fut,
36 Either::Right(_) => Err(TimeoutError),
37 } 34 }
38} 35}
39 36
40/// Provides functions to run a given future with a timeout or a deadline. 37/// Provides functions to run a given future with a timeout or a deadline.
41pub trait WithTimeout { 38pub trait WithTimeout: Sized {
42 /// Output type of the future. 39 /// Output type of the future.
43 type Output; 40 type Output;
44 41
@@ -46,24 +43,50 @@ pub trait WithTimeout {
46 /// 43 ///
47 /// If the future completes before the timeout, its output is returned. Otherwise, on timeout, 44 /// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
48 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 45 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
49 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError>; 46 fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self>;
50 47
51 /// Runs a given future with a deadline time. 48 /// Runs a given future with a deadline time.
52 /// 49 ///
53 /// If the future completes before the deadline, its output is returned. Otherwise, on timeout, 50 /// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
54 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 51 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
55 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError>; 52 fn with_deadline(self, at: Instant) -> TimeoutFuture<Self>;
56} 53}
57 54
58impl<F: Future> WithTimeout for F { 55impl<F: Future> WithTimeout for F {
59 type Output = F::Output; 56 type Output = F::Output;
60 57
61 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError> { 58 fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self> {
62 with_timeout(timeout, self).await 59 with_timeout(timeout, self)
63 } 60 }
64 61
65 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError> { 62 fn with_deadline(self, at: Instant) -> TimeoutFuture<Self> {
66 with_deadline(at, self).await 63 with_deadline(at, self)
64 }
65}
66
67/// Future for the [`with_timeout`] and [`with_deadline`] functions.
68#[must_use = "futures do nothing unless you `.await` or poll them"]
69pub struct TimeoutFuture<F> {
70 timer: Timer,
71 fut: F,
72}
73
74impl<F: Unpin> Unpin for TimeoutFuture<F> {}
75
76impl<F: Future> Future for TimeoutFuture<F> {
77 type Output = Result<F::Output, TimeoutError>;
78
79 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 let this = unsafe { self.get_unchecked_mut() };
81 let fut = unsafe { Pin::new_unchecked(&mut this.fut) };
82 let timer = unsafe { Pin::new_unchecked(&mut this.timer) };
83 if let Poll::Ready(x) = fut.poll(cx) {
84 return Poll::Ready(Ok(x));
85 }
86 if let Poll::Ready(_) = timer.poll(cx) {
87 return Poll::Ready(Err(TimeoutError));
88 }
89 Poll::Pending
67 } 90 }
68} 91}
69 92
@@ -157,7 +180,7 @@ impl Future for Timer {
157 if self.yielded_once && self.expires_at <= Instant::now() { 180 if self.yielded_once && self.expires_at <= Instant::now() {
158 Poll::Ready(()) 181 Poll::Ready(())
159 } else { 182 } else {
160 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 183 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
161 self.yielded_once = true; 184 self.yielded_once = true;
162 Poll::Pending 185 Poll::Pending
163 } 186 }
@@ -200,6 +223,10 @@ impl Future for Timer {
200/// } 223/// }
201/// } 224/// }
202/// ``` 225/// ```
226///
227/// ## Cancel safety
228/// It is safe to cancel waiting for the next tick,
229/// meaning no tick is lost if the Future is dropped.
203pub struct Ticker { 230pub struct Ticker {
204 expires_at: Instant, 231 expires_at: Instant,
205 duration: Duration, 232 duration: Duration,
@@ -231,6 +258,9 @@ impl Ticker {
231 } 258 }
232 259
233 /// Waits for the next tick. 260 /// Waits for the next tick.
261 ///
262 /// ## Cancel safety
263 /// The produced Future is cancel safe, meaning no tick is lost if the Future is dropped.
234 pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ { 264 pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ {
235 poll_fn(|cx| { 265 poll_fn(|cx| {
236 if self.expires_at <= Instant::now() { 266 if self.expires_at <= Instant::now() {
@@ -238,7 +268,7 @@ impl Ticker {
238 self.expires_at += dur; 268 self.expires_at += dur;
239 Poll::Ready(()) 269 Poll::Ready(())
240 } else { 270 } else {
241 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 271 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
242 Poll::Pending 272 Poll::Pending
243 } 273 }
244 }) 274 })
@@ -255,7 +285,7 @@ impl Stream for Ticker {
255 self.expires_at += dur; 285 self.expires_at += dur;
256 Poll::Ready(Some(())) 286 Poll::Ready(Some(()))
257 } else { 287 } else {
258 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 288 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
259 Poll::Pending 289 Poll::Pending
260 } 290 }
261 } 291 }