aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/timer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-time/src/timer.rs')
-rw-r--r--embassy-time/src/timer.rs74
1 files changed, 52 insertions, 22 deletions
diff --git a/embassy-time/src/timer.rs b/embassy-time/src/timer.rs
index 4d7194b20..d1162eadd 100644
--- a/embassy-time/src/timer.rs
+++ b/embassy-time/src/timer.rs
@@ -1,8 +1,7 @@
1use core::future::{poll_fn, Future}; 1use core::future::{poll_fn, Future};
2use core::pin::{pin, Pin}; 2use core::pin::Pin;
3use core::task::{Context, Poll}; 3use core::task::{Context, Poll};
4 4
5use futures_util::future::{select, Either};
6use futures_util::stream::FusedStream; 5use futures_util::stream::FusedStream;
7use futures_util::Stream; 6use futures_util::Stream;
8 7
@@ -17,11 +16,10 @@ pub struct TimeoutError;
17/// 16///
18/// If the future completes before the timeout, its output is returned. Otherwise, on timeout, 17/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
19/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 18/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
20pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> { 19pub fn with_timeout<F: Future>(timeout: Duration, fut: F) -> TimeoutFuture<F> {
21 let timeout_fut = Timer::after(timeout); 20 TimeoutFuture {
22 match select(pin!(fut), timeout_fut).await { 21 timer: Timer::after(timeout),
23 Either::Left((r, _)) => Ok(r), 22 fut,
24 Either::Right(_) => Err(TimeoutError),
25 } 23 }
26} 24}
27 25
@@ -29,16 +27,15 @@ pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Out
29/// 27///
30/// If the future completes before the deadline, its output is returned. Otherwise, on timeout, 28/// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
31/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 29/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
32pub async fn with_deadline<F: Future>(at: Instant, fut: F) -> Result<F::Output, TimeoutError> { 30pub fn with_deadline<F: Future>(at: Instant, fut: F) -> TimeoutFuture<F> {
33 let timeout_fut = Timer::at(at); 31 TimeoutFuture {
34 match select(pin!(fut), timeout_fut).await { 32 timer: Timer::at(at),
35 Either::Left((r, _)) => Ok(r), 33 fut,
36 Either::Right(_) => Err(TimeoutError),
37 } 34 }
38} 35}
39 36
40/// Provides functions to run a given future with a timeout or a deadline. 37/// Provides functions to run a given future with a timeout or a deadline.
41pub trait WithTimeout { 38pub trait WithTimeout: Sized {
42 /// Output type of the future. 39 /// Output type of the future.
43 type Output; 40 type Output;
44 41
@@ -46,24 +43,50 @@ pub trait WithTimeout {
46 /// 43 ///
47 /// If the future completes before the timeout, its output is returned. Otherwise, on timeout, 44 /// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
48 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 45 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
49 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError>; 46 fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self>;
50 47
51 /// Runs a given future with a deadline time. 48 /// Runs a given future with a deadline time.
52 /// 49 ///
53 /// If the future completes before the deadline, its output is returned. Otherwise, on timeout, 50 /// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
54 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. 51 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
55 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError>; 52 fn with_deadline(self, at: Instant) -> TimeoutFuture<Self>;
56} 53}
57 54
58impl<F: Future> WithTimeout for F { 55impl<F: Future> WithTimeout for F {
59 type Output = F::Output; 56 type Output = F::Output;
60 57
61 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError> { 58 fn with_timeout(self, timeout: Duration) -> TimeoutFuture<Self> {
62 with_timeout(timeout, self).await 59 with_timeout(timeout, self)
63 } 60 }
64 61
65 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError> { 62 fn with_deadline(self, at: Instant) -> TimeoutFuture<Self> {
66 with_deadline(at, self).await 63 with_deadline(at, self)
64 }
65}
66
67/// Future for the [`with_timeout`] and [`with_deadline`] functions.
68#[must_use = "futures do nothing unless you `.await` or poll them"]
69pub struct TimeoutFuture<F> {
70 timer: Timer,
71 fut: F,
72}
73
74impl<F: Unpin> Unpin for TimeoutFuture<F> {}
75
76impl<F: Future> Future for TimeoutFuture<F> {
77 type Output = Result<F::Output, TimeoutError>;
78
79 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 let this = unsafe { self.get_unchecked_mut() };
81 let fut = unsafe { Pin::new_unchecked(&mut this.fut) };
82 let timer = unsafe { Pin::new_unchecked(&mut this.timer) };
83 if let Poll::Ready(x) = fut.poll(cx) {
84 return Poll::Ready(Ok(x));
85 }
86 if let Poll::Ready(_) = timer.poll(cx) {
87 return Poll::Ready(Err(TimeoutError));
88 }
89 Poll::Pending
67 } 90 }
68} 91}
69 92
@@ -157,7 +180,7 @@ impl Future for Timer {
157 if self.yielded_once && self.expires_at <= Instant::now() { 180 if self.yielded_once && self.expires_at <= Instant::now() {
158 Poll::Ready(()) 181 Poll::Ready(())
159 } else { 182 } else {
160 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 183 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
161 self.yielded_once = true; 184 self.yielded_once = true;
162 Poll::Pending 185 Poll::Pending
163 } 186 }
@@ -200,6 +223,10 @@ impl Future for Timer {
200/// } 223/// }
201/// } 224/// }
202/// ``` 225/// ```
226///
227/// ## Cancel safety
228/// It is safe to cancel waiting for the next tick,
229/// meaning no tick is lost if the Future is dropped.
203pub struct Ticker { 230pub struct Ticker {
204 expires_at: Instant, 231 expires_at: Instant,
205 duration: Duration, 232 duration: Duration,
@@ -231,6 +258,9 @@ impl Ticker {
231 } 258 }
232 259
233 /// Waits for the next tick. 260 /// Waits for the next tick.
261 ///
262 /// ## Cancel safety
263 /// The produced Future is cancel safe, meaning no tick is lost if the Future is dropped.
234 pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ { 264 pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ {
235 poll_fn(|cx| { 265 poll_fn(|cx| {
236 if self.expires_at <= Instant::now() { 266 if self.expires_at <= Instant::now() {
@@ -238,7 +268,7 @@ impl Ticker {
238 self.expires_at += dur; 268 self.expires_at += dur;
239 Poll::Ready(()) 269 Poll::Ready(())
240 } else { 270 } else {
241 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 271 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
242 Poll::Pending 272 Poll::Pending
243 } 273 }
244 }) 274 })
@@ -255,7 +285,7 @@ impl Stream for Ticker {
255 self.expires_at += dur; 285 self.expires_at += dur;
256 Poll::Ready(Some(())) 286 Poll::Ready(Some(()))
257 } else { 287 } else {
258 embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); 288 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
259 Poll::Pending 289 Poll::Pending
260 } 290 }
261 } 291 }