aboutsummaryrefslogtreecommitdiff
path: root/embassy-time/src/timer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-time/src/timer.rs')
-rw-r--r--embassy-time/src/timer.rs158
1 files changed, 158 insertions, 0 deletions
diff --git a/embassy-time/src/timer.rs b/embassy-time/src/timer.rs
new file mode 100644
index 000000000..bd791b817
--- /dev/null
+++ b/embassy-time/src/timer.rs
@@ -0,0 +1,158 @@
1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll, Waker};
4
5use futures_util::future::{select, Either};
6use futures_util::{pin_mut, Stream};
7
8use crate::{Duration, Instant};
9
10/// Error returned by [`with_timeout`] on timeout.
11#[derive(Debug, Clone, PartialEq, Eq)]
12#[cfg_attr(feature = "defmt", derive(defmt::Format))]
13pub struct TimeoutError;
14
15/// Runs a given future with a timeout.
16///
17/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
18/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
19pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> {
20 let timeout_fut = Timer::after(timeout);
21 pin_mut!(fut);
22 match select(fut, timeout_fut).await {
23 Either::Left((r, _)) => Ok(r),
24 Either::Right(_) => Err(TimeoutError),
25 }
26}
27
28/// A future that completes at a specified [Instant](struct.Instant.html).
29pub struct Timer {
30 expires_at: Instant,
31 yielded_once: bool,
32}
33
34impl Timer {
35 /// Expire at specified [Instant](struct.Instant.html)
36 pub fn at(expires_at: Instant) -> Self {
37 Self {
38 expires_at,
39 yielded_once: false,
40 }
41 }
42
43 /// Expire after specified [Duration](struct.Duration.html).
44 /// This can be used as a `sleep` abstraction.
45 ///
46 /// Example:
47 /// ``` no_run
48 /// # #![feature(type_alias_impl_trait)]
49 /// #
50 /// # fn foo() {}
51 /// use embassy_time::{Duration, Timer};
52 ///
53 /// #[embassy_executor::task]
54 /// async fn demo_sleep_seconds() {
55 /// // suspend this task for one second.
56 /// Timer::after(Duration::from_secs(1)).await;
57 /// }
58 /// ```
59 pub fn after(duration: Duration) -> Self {
60 Self {
61 expires_at: Instant::now() + duration,
62 yielded_once: false,
63 }
64 }
65}
66
67impl Unpin for Timer {}
68
69impl Future for Timer {
70 type Output = ();
71 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 if self.yielded_once && self.expires_at <= Instant::now() {
73 Poll::Ready(())
74 } else {
75 schedule_wake(self.expires_at, cx.waker());
76 self.yielded_once = true;
77 Poll::Pending
78 }
79 }
80}
81
82/// Asynchronous stream that yields every Duration, indefinitely.
83///
84/// This stream will tick at uniform intervals, even if blocking work is performed between ticks.
85///
86/// For instance, consider the following code fragment.
87/// ``` no_run
88/// # #![feature(type_alias_impl_trait)]
89/// #
90/// use embassy_time::{Duration, Timer};
91/// # fn foo() {}
92///
93/// #[embassy_executor::task]
94/// async fn ticker_example_0() {
95/// loop {
96/// foo();
97/// Timer::after(Duration::from_secs(1)).await;
98/// }
99/// }
100/// ```
101///
102/// This fragment will not call `foo` every second.
103/// Instead, it will call it every second + the time it took to previously call `foo`.
104///
105/// Example using ticker, which will consistently call `foo` once a second.
106///
107/// ``` no_run
108/// # #![feature(type_alias_impl_trait)]
109/// #
110/// use embassy_time::{Duration, Ticker};
111/// use futures::StreamExt;
112/// # fn foo(){}
113///
114/// #[embassy_executor::task]
115/// async fn ticker_example_1() {
116/// let mut ticker = Ticker::every(Duration::from_secs(1));
117/// loop {
118/// foo();
119/// ticker.next().await;
120/// }
121/// }
122/// ```
123pub struct Ticker {
124 expires_at: Instant,
125 duration: Duration,
126}
127
128impl Ticker {
129 /// Creates a new ticker that ticks at the specified duration interval.
130 pub fn every(duration: Duration) -> Self {
131 let expires_at = Instant::now() + duration;
132 Self { expires_at, duration }
133 }
134}
135
136impl Unpin for Ticker {}
137
138impl Stream for Ticker {
139 type Item = ();
140 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 if self.expires_at <= Instant::now() {
142 let dur = self.duration;
143 self.expires_at += dur;
144 Poll::Ready(Some(()))
145 } else {
146 schedule_wake(self.expires_at, cx.waker());
147 Poll::Pending
148 }
149 }
150}
151
152extern "Rust" {
153 fn _embassy_time_schedule_wake(at: Instant, waker: &Waker);
154}
155
156fn schedule_wake(at: Instant, waker: &Waker) {
157 unsafe { _embassy_time_schedule_wake(at, waker) }
158}