aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/executor/arch/cortex_m.rs59
-rw-r--r--embassy-executor/src/executor/arch/riscv32.rs74
-rw-r--r--embassy-executor/src/executor/arch/std.rs84
-rw-r--r--embassy-executor/src/executor/arch/wasm.rs74
-rw-r--r--embassy-executor/src/executor/arch/xtensa.rs75
-rw-r--r--embassy-executor/src/executor/mod.rs44
-rw-r--r--embassy-executor/src/executor/raw/mod.rs433
-rw-r--r--embassy-executor/src/executor/raw/run_queue.rs74
-rw-r--r--embassy-executor/src/executor/raw/timer_queue.rs85
-rw-r--r--embassy-executor/src/executor/raw/util.rs33
-rw-r--r--embassy-executor/src/executor/raw/waker.rs53
-rw-r--r--embassy-executor/src/executor/spawner.rs202
-rw-r--r--embassy-executor/src/fmt.rs228
-rw-r--r--embassy-executor/src/lib.rs22
-rw-r--r--embassy-executor/src/time/delay.rs98
-rw-r--r--embassy-executor/src/time/driver.rs170
-rw-r--r--embassy-executor/src/time/driver_std.rs208
-rw-r--r--embassy-executor/src/time/driver_wasm.rs134
-rw-r--r--embassy-executor/src/time/duration.rs184
-rw-r--r--embassy-executor/src/time/instant.rs159
-rw-r--r--embassy-executor/src/time/mod.rs91
-rw-r--r--embassy-executor/src/time/timer.rs151
22 files changed, 2735 insertions, 0 deletions
diff --git a/embassy-executor/src/executor/arch/cortex_m.rs b/embassy-executor/src/executor/arch/cortex_m.rs
new file mode 100644
index 000000000..d6e758dfb
--- /dev/null
+++ b/embassy-executor/src/executor/arch/cortex_m.rs
@@ -0,0 +1,59 @@
1use core::arch::asm;
2use core::marker::PhantomData;
3use core::ptr;
4
5use super::{raw, Spawner};
6
7/// Thread mode executor, using WFE/SEV.
8///
9/// This is the simplest and most common kind of executor. It runs on
10/// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction
11/// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction
12/// is executed, to make the `WFE` exit from sleep and poll the task.
13///
14/// This executor allows for ultra low power consumption for chips where `WFE`
15/// triggers low-power sleep without extra steps. If your chip requires extra steps,
16/// you may use [`raw::Executor`] directly to program custom behavior.
17pub struct Executor {
18 inner: raw::Executor,
19 not_send: PhantomData<*mut ()>,
20}
21
22impl Executor {
23 /// Create a new Executor.
24 pub fn new() -> Self {
25 Self {
26 inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()),
27 not_send: PhantomData,
28 }
29 }
30
31 /// Run the executor.
32 ///
33 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
34 /// this executor. Use it to spawn the initial task(s). After `init` returns,
35 /// the executor starts running the tasks.
36 ///
37 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
38 /// for example by passing it as an argument to the initial tasks.
39 ///
40 /// This function requires `&'static mut self`. This means you have to store the
41 /// Executor instance in a place where it'll live forever and grants you mutable
42 /// access. There's a few ways to do this:
43 ///
44 /// - a [Forever](crate::util::Forever) (safe)
45 /// - a `static mut` (unsafe)
46 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
47 ///
48 /// This function never returns.
49 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
50 init(self.inner.spawner());
51
52 loop {
53 unsafe {
54 self.inner.poll();
55 asm!("wfe");
56 };
57 }
58 }
59}
diff --git a/embassy-executor/src/executor/arch/riscv32.rs b/embassy-executor/src/executor/arch/riscv32.rs
new file mode 100644
index 000000000..7a7d5698c
--- /dev/null
+++ b/embassy-executor/src/executor/arch/riscv32.rs
@@ -0,0 +1,74 @@
1use core::marker::PhantomData;
2use core::ptr;
3
4use atomic_polyfill::{AtomicBool, Ordering};
5
6use super::{raw, Spawner};
7
8/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
9///
10static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
11
12/// RISCV32 Executor
13pub struct Executor {
14 inner: raw::Executor,
15 not_send: PhantomData<*mut ()>,
16}
17
18impl Executor {
19 /// Create a new Executor.
20 pub fn new() -> Self {
21 Self {
22 // use Signal_Work_Thread_Mode as substitute for local interrupt register
23 inner: raw::Executor::new(
24 |_| {
25 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
26 },
27 ptr::null_mut(),
28 ),
29 not_send: PhantomData,
30 }
31 }
32
33 /// Run the executor.
34 ///
35 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
36 /// this executor. Use it to spawn the initial task(s). After `init` returns,
37 /// the executor starts running the tasks.
38 ///
39 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
40 /// for example by passing it as an argument to the initial tasks.
41 ///
42 /// This function requires `&'static mut self`. This means you have to store the
43 /// Executor instance in a place where it'll live forever and grants you mutable
44 /// access. There's a few ways to do this:
45 ///
46 /// - a [Forever](crate::util::Forever) (safe)
47 /// - a `static mut` (unsafe)
48 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
49 ///
50 /// This function never returns.
51 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
52 init(self.inner.spawner());
53
54 loop {
55 unsafe {
56 self.inner.poll();
57 // we do not care about race conditions between the load and store operations, interrupts
58 //will only set this value to true.
59 critical_section::with(|_| {
60 // if there is work to do, loop back to polling
61 // TODO can we relax this?
62 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
63 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
64 }
65 // if not, wait for interrupt
66 else {
67 core::arch::asm!("wfi");
68 }
69 });
70 // if an interrupt occurred while waiting, it will be serviced here
71 }
72 }
73 }
74}
diff --git a/embassy-executor/src/executor/arch/std.rs b/embassy-executor/src/executor/arch/std.rs
new file mode 100644
index 000000000..b93ab8a79
--- /dev/null
+++ b/embassy-executor/src/executor/arch/std.rs
@@ -0,0 +1,84 @@
1use std::marker::PhantomData;
2use std::sync::{Condvar, Mutex};
3
4use super::{raw, Spawner};
5
6/// Single-threaded std-based executor.
7pub struct Executor {
8 inner: raw::Executor,
9 not_send: PhantomData<*mut ()>,
10 signaler: &'static Signaler,
11}
12
13impl Executor {
14 /// Create a new Executor.
15 pub fn new() -> Self {
16 let signaler = &*Box::leak(Box::new(Signaler::new()));
17 Self {
18 inner: raw::Executor::new(
19 |p| unsafe {
20 let s = &*(p as *const () as *const Signaler);
21 s.signal()
22 },
23 signaler as *const _ as _,
24 ),
25 not_send: PhantomData,
26 signaler,
27 }
28 }
29
30 /// Run the executor.
31 ///
32 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
33 /// this executor. Use it to spawn the initial task(s). After `init` returns,
34 /// the executor starts running the tasks.
35 ///
36 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
37 /// for example by passing it as an argument to the initial tasks.
38 ///
39 /// This function requires `&'static mut self`. This means you have to store the
40 /// Executor instance in a place where it'll live forever and grants you mutable
41 /// access. There's a few ways to do this:
42 ///
43 /// - a [Forever](crate::util::Forever) (safe)
44 /// - a `static mut` (unsafe)
45 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
46 ///
47 /// This function never returns.
48 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
49 init(self.inner.spawner());
50
51 loop {
52 unsafe { self.inner.poll() };
53 self.signaler.wait()
54 }
55 }
56}
57
58struct Signaler {
59 mutex: Mutex<bool>,
60 condvar: Condvar,
61}
62
63impl Signaler {
64 fn new() -> Self {
65 Self {
66 mutex: Mutex::new(false),
67 condvar: Condvar::new(),
68 }
69 }
70
71 fn wait(&self) {
72 let mut signaled = self.mutex.lock().unwrap();
73 while !*signaled {
74 signaled = self.condvar.wait(signaled).unwrap();
75 }
76 *signaled = false;
77 }
78
79 fn signal(&self) {
80 let mut signaled = self.mutex.lock().unwrap();
81 *signaled = true;
82 self.condvar.notify_one();
83 }
84}
diff --git a/embassy-executor/src/executor/arch/wasm.rs b/embassy-executor/src/executor/arch/wasm.rs
new file mode 100644
index 000000000..9d5aa31ed
--- /dev/null
+++ b/embassy-executor/src/executor/arch/wasm.rs
@@ -0,0 +1,74 @@
1use core::marker::PhantomData;
2
3use js_sys::Promise;
4use wasm_bindgen::prelude::*;
5
6use super::raw::util::UninitCell;
7use super::raw::{self};
8use super::Spawner;
9
10/// WASM executor, wasm_bindgen to schedule tasks on the JS event loop.
11pub struct Executor {
12 inner: raw::Executor,
13 ctx: &'static WasmContext,
14 not_send: PhantomData<*mut ()>,
15}
16
17pub(crate) struct WasmContext {
18 promise: Promise,
19 closure: UninitCell<Closure<dyn FnMut(JsValue)>>,
20}
21
22impl WasmContext {
23 pub fn new() -> Self {
24 Self {
25 promise: Promise::resolve(&JsValue::undefined()),
26 closure: UninitCell::uninit(),
27 }
28 }
29}
30
31impl Executor {
32 /// Create a new Executor.
33 pub fn new() -> Self {
34 let ctx = &*Box::leak(Box::new(WasmContext::new()));
35 let inner = raw::Executor::new(
36 |p| unsafe {
37 let ctx = &*(p as *const () as *const WasmContext);
38 let _ = ctx.promise.then(ctx.closure.as_mut());
39 },
40 ctx as *const _ as _,
41 );
42 Self {
43 inner,
44 not_send: PhantomData,
45 ctx,
46 }
47 }
48
49 /// Run the executor.
50 ///
51 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
52 /// this executor. Use it to spawn the initial task(s). After `init` returns,
53 /// the executor starts running the tasks.
54 ///
55 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
56 /// for example by passing it as an argument to the initial tasks.
57 ///
58 /// This function requires `&'static mut self`. This means you have to store the
59 /// Executor instance in a place where it'll live forever and grants you mutable
60 /// access. There's a few ways to do this:
61 ///
62 /// - a [Forever](crate::util::Forever) (safe)
63 /// - a `static mut` (unsafe)
64 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
65 pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
66 unsafe {
67 let executor = &self.inner;
68 self.ctx.closure.write(Closure::new(move |_| {
69 executor.poll();
70 }));
71 init(self.inner.spawner());
72 }
73 }
74}
diff --git a/embassy-executor/src/executor/arch/xtensa.rs b/embassy-executor/src/executor/arch/xtensa.rs
new file mode 100644
index 000000000..20bd7b8a5
--- /dev/null
+++ b/embassy-executor/src/executor/arch/xtensa.rs
@@ -0,0 +1,75 @@
1use core::marker::PhantomData;
2use core::ptr;
3
4use atomic_polyfill::{AtomicBool, Ordering};
5
6use super::{raw, Spawner};
7
8/// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa
9///
10static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
11
12/// Xtensa Executor
13pub struct Executor {
14 inner: raw::Executor,
15 not_send: PhantomData<*mut ()>,
16}
17
18impl Executor {
19 /// Create a new Executor.
20 pub fn new() -> Self {
21 Self {
22 // use Signal_Work_Thread_Mode as substitute for local interrupt register
23 inner: raw::Executor::new(
24 |_| {
25 SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
26 },
27 ptr::null_mut(),
28 ),
29 not_send: PhantomData,
30 }
31 }
32
33 /// Run the executor.
34 ///
35 /// The `init` closure is called with a [`Spawner`] that spawns tasks on
36 /// this executor. Use it to spawn the initial task(s). After `init` returns,
37 /// the executor starts running the tasks.
38 ///
39 /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`),
40 /// for example by passing it as an argument to the initial tasks.
41 ///
42 /// This function requires `&'static mut self`. This means you have to store the
43 /// Executor instance in a place where it'll live forever and grants you mutable
44 /// access. There's a few ways to do this:
45 ///
46 /// - a [Forever](crate::util::Forever) (safe)
47 /// - a `static mut` (unsafe)
48 /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
49 ///
50 /// This function never returns.
51 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
52 init(self.inner.spawner());
53
54 loop {
55 unsafe {
56 self.inner.poll();
57 // we do not care about race conditions between the load and store operations, interrupts
58 // will only set this value to true.
59 // if there is work to do, loop back to polling
60 // TODO can we relax this?
61 critical_section::with(|_| {
62 if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
63 SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
64 } else {
65 // waiti sets the PS.INTLEVEL when slipping into sleep
66 // because critical sections in Xtensa are implemented via increasing
67 // PS.INTLEVEL the critical section ends here
68 // take care not add code after `waiti` if it needs to be inside the CS
69 core::arch::asm!("waiti 0"); // critical section ends here
70 }
71 });
72 }
73 }
74 }
75}
diff --git a/embassy-executor/src/executor/mod.rs b/embassy-executor/src/executor/mod.rs
new file mode 100644
index 000000000..45d00c568
--- /dev/null
+++ b/embassy-executor/src/executor/mod.rs
@@ -0,0 +1,44 @@
1//! Async task executor.
2//!
3//! This module provides an async/await executor designed for embedded usage.
4//!
5//! - No `alloc`, no heap needed. Task futures are statically allocated.
6//! - No "fixed capacity" data structures, executor works with 1 or 1000 tasks without needing config/tuning.
7//! - Integrated timer queue: sleeping is easy, just do `Timer::after(Duration::from_secs(1)).await;`.
8//! - No busy-loop polling: CPU sleeps when there's no work to do, using interrupts or `WFE/SEV`.
9//! - Efficient polling: a wake will only poll the woken task, not all of them.
10//! - Fair: a task can't monopolize CPU time even if it's constantly being woken. All other tasks get a chance to run before a given task gets polled for the second time.
11//! - Creating multiple executor instances is supported, to run tasks with multiple priority levels. This allows higher-priority tasks to preempt lower-priority tasks.
12
13cfg_if::cfg_if! {
14 if #[cfg(cortex_m)] {
15 #[path="arch/cortex_m.rs"]
16 mod arch;
17 pub use arch::*;
18 }
19 else if #[cfg(target_arch="riscv32")] {
20 #[path="arch/riscv32.rs"]
21 mod arch;
22 pub use arch::*;
23 }
24 else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] {
25 #[path="arch/xtensa.rs"]
26 mod arch;
27 pub use arch::*;
28 }
29 else if #[cfg(feature="wasm")] {
30 #[path="arch/wasm.rs"]
31 mod arch;
32 pub use arch::*;
33 }
34 else if #[cfg(feature="std")] {
35 #[path="arch/std.rs"]
36 mod arch;
37 pub use arch::*;
38 }
39}
40
41pub mod raw;
42
43mod spawner;
44pub use spawner::*;
diff --git a/embassy-executor/src/executor/raw/mod.rs b/embassy-executor/src/executor/raw/mod.rs
new file mode 100644
index 000000000..87317bc02
--- /dev/null
+++ b/embassy-executor/src/executor/raw/mod.rs
@@ -0,0 +1,433 @@
1//! Raw executor.
2//!
3//! This module exposes "raw" Executor and Task structs for more low level control.
4//!
5//! ## WARNING: here be dragons!
6//!
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
9
10mod run_queue;
11#[cfg(feature = "time")]
12mod timer_queue;
13pub(crate) mod util;
14mod waker;
15
16use core::cell::Cell;
17use core::future::Future;
18use core::pin::Pin;
19use core::ptr::NonNull;
20use core::task::{Context, Poll};
21use core::{mem, ptr};
22
23use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection;
25
26use self::run_queue::{RunQueue, RunQueueItem};
27use self::util::UninitCell;
28pub use self::waker::task_from_waker;
29use super::SpawnToken;
30#[cfg(feature = "time")]
31use crate::time::driver::{self, AlarmHandle};
32#[cfg(feature = "time")]
33use crate::time::Instant;
34
35/// Task is spawned (has a future)
36pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
37/// Task is in the executor run queue
38pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
39/// Task is in the executor timer queue
40#[cfg(feature = "time")]
41pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
42
43/// Raw task header for use in task pointers.
44///
45/// This is an opaque struct, used for raw pointers to tasks, for use
46/// with funtions like [`wake_task`] and [`task_from_waker`].
47pub struct TaskHeader {
48 pub(crate) state: AtomicU32,
49 pub(crate) run_queue_item: RunQueueItem,
50 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
51 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED
52
53 #[cfg(feature = "time")]
54 pub(crate) expires_at: Cell<Instant>,
55 #[cfg(feature = "time")]
56 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
57}
58
59impl TaskHeader {
60 pub(crate) const fn new() -> Self {
61 Self {
62 state: AtomicU32::new(0),
63 run_queue_item: RunQueueItem::new(),
64 executor: Cell::new(ptr::null()),
65 poll_fn: UninitCell::uninit(),
66
67 #[cfg(feature = "time")]
68 expires_at: Cell::new(Instant::from_ticks(0)),
69 #[cfg(feature = "time")]
70 timer_queue_item: timer_queue::TimerQueueItem::new(),
71 }
72 }
73
74 pub(crate) unsafe fn enqueue(&self) {
75 critical_section::with(|cs| {
76 let state = self.state.load(Ordering::Relaxed);
77
78 // If already scheduled, or if not started,
79 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
80 return;
81 }
82
83 // Mark it as scheduled
84 self.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
85
86 // We have just marked the task as scheduled, so enqueue it.
87 let executor = &*self.executor.get();
88 executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader);
89 })
90 }
91}
92
93/// Raw storage in which a task can be spawned.
94///
95/// This struct holds the necessary memory to spawn one task whose future is `F`.
96/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
97/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
98///
99/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
100/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
101///
102/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s
103/// in a `static`. The most common reason to use the raw `Task` is to have control of where
104/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
105
106// repr(C) is needed to guarantee that the Task is located at offset 0
107// This makes it safe to cast between TaskHeader and TaskStorage pointers.
108#[repr(C)]
109pub struct TaskStorage<F: Future + 'static> {
110 raw: TaskHeader,
111 future: UninitCell<F>, // Valid if STATE_SPAWNED
112}
113
114impl<F: Future + 'static> TaskStorage<F> {
115 const NEW: Self = Self::new();
116
117 /// Create a new TaskStorage, in not-spawned state.
118 pub const fn new() -> Self {
119 Self {
120 raw: TaskHeader::new(),
121 future: UninitCell::uninit(),
122 }
123 }
124
125 /// Try to spawn the task.
126 ///
127 /// The `future` closure constructs the future. It's only called if spawning is
128 /// actually possible. It is a closure instead of a simple `future: F` param to ensure
129 /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to
130 /// NRVO optimizations.
131 ///
132 /// This function will fail if the task is already spawned and has not finished running.
133 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
134 /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
135 ///
136 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
137 /// on a different executor.
138 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
139 if self.spawn_mark_used() {
140 return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) };
141 }
142
143 SpawnToken::<F>::new_failed()
144 }
145
146 fn spawn_mark_used(&'static self) -> bool {
147 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
148 self.raw
149 .state
150 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
151 .is_ok()
152 }
153
154 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
155 // Initialize the task
156 self.raw.poll_fn.write(Self::poll);
157 self.future.write(future());
158 NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
159 }
160
161 unsafe fn poll(p: NonNull<TaskHeader>) {
162 let this = &*(p.as_ptr() as *const TaskStorage<F>);
163
164 let future = Pin::new_unchecked(this.future.as_mut());
165 let waker = waker::from_task(p);
166 let mut cx = Context::from_waker(&waker);
167 match future.poll(&mut cx) {
168 Poll::Ready(_) => {
169 this.future.drop_in_place();
170 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
171 }
172 Poll::Pending => {}
173 }
174
175 // the compiler is emitting a virtual call for waker drop, but we know
176 // it's a noop for our waker.
177 mem::forget(waker);
178 }
179}
180
181unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
182
183/// Raw storage that can hold up to N tasks of the same type.
184///
185/// This is essentially a `[TaskStorage<F>; N]`.
186pub struct TaskPool<F: Future + 'static, const N: usize> {
187 pool: [TaskStorage<F>; N],
188}
189
190impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
191 /// Create a new TaskPool, with all tasks in non-spawned state.
192 pub const fn new() -> Self {
193 Self {
194 pool: [TaskStorage::NEW; N],
195 }
196 }
197
198 /// Try to spawn a task in the pool.
199 ///
200 /// See [`TaskStorage::spawn()`] for details.
201 ///
202 /// This will loop over the pool and spawn the task in the first storage that
203 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
204 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
205 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
206 for task in &self.pool {
207 if task.spawn_mark_used() {
208 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
209 }
210 }
211
212 SpawnToken::<F>::new_failed()
213 }
214
215 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
216 /// the future is !Send.
217 ///
218 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
219 /// by the Embassy macros ONLY.
220 ///
221 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
222 /// is an `async fn`, NOT a hand-written `Future`.
223 #[doc(hidden)]
224 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
225 where
226 FutFn: FnOnce() -> F,
227 {
228 // When send-spawning a task, we construct the future in this thread, and effectively
229 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
230 // send-spawning should require the future `F` to be `Send`.
231 //
232 // The problem is this is more restrictive than needed. Once the future is executing,
233 // it is never sent to another thread. It is only sent when spawning. It should be
234 // enough for the task's arguments to be Send. (and in practice it's super easy to
235 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
236 //
237 // We can do it by sending the task args and constructing the future in the executor thread
238 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
239 // of the args.
240 //
241 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
242 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
243 //
244 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
245 // but it's possible it'll be guaranteed in the future. See zulip thread:
246 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
247 //
248 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
249 // This is why we return `SpawnToken<FutFn>` below.
250 //
251 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
252 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
253
254 for task in &self.pool {
255 if task.spawn_mark_used() {
256 return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
257 }
258 }
259
260 SpawnToken::<FutFn>::new_failed()
261 }
262}
263
264/// Raw executor.
265///
266/// This is the core of the Embassy executor. It is low-level, requiring manual
267/// handling of wakeups and task polling. If you can, prefer using one of the
268/// higher level executors in [`crate::executor`].
269///
270/// The raw executor leaves it up to you to handle wakeups and scheduling:
271///
272/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
273/// that "want to run").
274/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
275/// to do. You must arrange for `poll()` to be called as soon as possible.
276///
277/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
278/// level, etc. It may be called synchronously from any `Executor` method call as well.
279/// You must deal with this correctly.
280///
281/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
282/// the requirement for `poll` to not be called reentrantly.
283pub struct Executor {
284 run_queue: RunQueue,
285 signal_fn: fn(*mut ()),
286 signal_ctx: *mut (),
287
288 #[cfg(feature = "time")]
289 pub(crate) timer_queue: timer_queue::TimerQueue,
290 #[cfg(feature = "time")]
291 alarm: AlarmHandle,
292}
293
294impl Executor {
295 /// Create a new executor.
296 ///
297 /// When the executor has work to do, it will call `signal_fn` with
298 /// `signal_ctx` as argument.
299 ///
300 /// See [`Executor`] docs for details on `signal_fn`.
301 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
302 #[cfg(feature = "time")]
303 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
304 #[cfg(feature = "time")]
305 driver::set_alarm_callback(alarm, signal_fn, signal_ctx);
306
307 Self {
308 run_queue: RunQueue::new(),
309 signal_fn,
310 signal_ctx,
311
312 #[cfg(feature = "time")]
313 timer_queue: timer_queue::TimerQueue::new(),
314 #[cfg(feature = "time")]
315 alarm,
316 }
317 }
318
319 /// Enqueue a task in the task queue
320 ///
321 /// # Safety
322 /// - `task` must be a valid pointer to a spawned task.
323 /// - `task` must be set up to run in this executor.
324 /// - `task` must NOT be already enqueued (in this executor or another one).
325 #[inline(always)]
326 unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) {
327 if self.run_queue.enqueue(cs, task) {
328 (self.signal_fn)(self.signal_ctx)
329 }
330 }
331
332 /// Spawn a task in this executor.
333 ///
334 /// # Safety
335 ///
336 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
337 ///
338 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
339 /// In this case, the task's Future must be Send. This is because this is effectively
340 /// sending the task to the executor thread.
341 pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
342 let task = task.as_ref();
343 task.executor.set(self);
344
345 critical_section::with(|cs| {
346 self.enqueue(cs, task as *const _ as _);
347 })
348 }
349
350 /// Poll all queued tasks in this executor.
351 ///
352 /// This loops over all tasks that are queued to be polled (i.e. they're
353 /// freshly spawned or they've been woken). Other tasks are not polled.
354 ///
355 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
356 /// to call `poll` even when not requested by `signal_fn`, but it wastes
357 /// energy.
358 ///
359 /// # Safety
360 ///
361 /// You must NOT call `poll` reentrantly on the same executor.
362 ///
363 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
364 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
365 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
366 /// no `poll()` already running.
367 pub unsafe fn poll(&'static self) {
368 #[cfg(feature = "time")]
369 self.timer_queue.dequeue_expired(Instant::now(), |p| {
370 p.as_ref().enqueue();
371 });
372
373 self.run_queue.dequeue_all(|p| {
374 let task = p.as_ref();
375
376 #[cfg(feature = "time")]
377 task.expires_at.set(Instant::MAX);
378
379 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
380 if state & STATE_SPAWNED == 0 {
381 // If task is not running, ignore it. This can happen in the following scenario:
382 // - Task gets dequeued, poll starts
383 // - While task is being polled, it gets woken. It gets placed in the queue.
384 // - Task poll finishes, returning done=true
385 // - RUNNING bit is cleared, but the task is already in the queue.
386 return;
387 }
388
389 // Run the task
390 task.poll_fn.read()(p as _);
391
392 // Enqueue or update into timer_queue
393 #[cfg(feature = "time")]
394 self.timer_queue.update(p);
395 });
396
397 #[cfg(feature = "time")]
398 {
399 // If this is already in the past, set_alarm will immediately trigger the alarm.
400 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
401 // so we immediately do another poll loop iteration.
402 let next_expiration = self.timer_queue.next_expiration();
403 driver::set_alarm(self.alarm, next_expiration.as_ticks());
404 }
405 }
406
407 /// Get a spawner that spawns tasks in this executor.
408 ///
409 /// It is OK to call this method multiple times to obtain multiple
410 /// `Spawner`s. You may also copy `Spawner`s.
411 pub fn spawner(&'static self) -> super::Spawner {
412 super::Spawner::new(self)
413 }
414}
415
416/// Wake a task by raw pointer.
417///
418/// You can obtain task pointers from `Waker`s using [`task_from_waker`].
419///
420/// # Safety
421///
422/// `task` must be a valid task pointer obtained from [`task_from_waker`].
423pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
424 task.as_ref().enqueue();
425}
426
427#[cfg(feature = "time")]
428pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) {
429 let task = waker::task_from_waker(waker);
430 let task = task.as_ref();
431 let expires_at = task.expires_at.get();
432 task.expires_at.set(expires_at.min(at));
433}
diff --git a/embassy-executor/src/executor/raw/run_queue.rs b/embassy-executor/src/executor/raw/run_queue.rs
new file mode 100644
index 000000000..31615da7e
--- /dev/null
+++ b/embassy-executor/src/executor/raw/run_queue.rs
@@ -0,0 +1,74 @@
1use core::ptr;
2use core::ptr::NonNull;
3
4use atomic_polyfill::{AtomicPtr, Ordering};
5use critical_section::CriticalSection;
6
7use super::TaskHeader;
8
9pub(crate) struct RunQueueItem {
10 next: AtomicPtr<TaskHeader>,
11}
12
13impl RunQueueItem {
14 pub const fn new() -> Self {
15 Self {
16 next: AtomicPtr::new(ptr::null_mut()),
17 }
18 }
19}
20
21/// Atomic task queue using a very, very simple lock-free linked-list queue:
22///
23/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
24///
25/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
26/// null. Then the batch is iterated following the next pointers until null is reached.
27///
28/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
29/// for our purposes: it can't create fairness problems since the next batch won't run until the
30/// current batch is completely processed, so even if a task enqueues itself instantly (for example
31/// by waking its own waker) can't prevent other tasks from running.
32pub(crate) struct RunQueue {
33 head: AtomicPtr<TaskHeader>,
34}
35
36impl RunQueue {
37 pub const fn new() -> Self {
38 Self {
39 head: AtomicPtr::new(ptr::null_mut()),
40 }
41 }
42
43 /// Enqueues an item. Returns true if the queue was empty.
44 ///
45 /// # Safety
46 ///
47 /// `item` must NOT be already enqueued in any queue.
48 #[inline(always)]
49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool {
50 let prev = self.head.load(Ordering::Relaxed);
51 (*task).run_queue_item.next.store(prev, Ordering::Relaxed);
52 self.head.store(task, Ordering::Relaxed);
53 prev.is_null()
54 }
55
56 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) {
60 // Atomically empty the queue.
61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
62
63 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = NonNull::new(ptr) {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task.
67 let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed);
68
69 on_task(task);
70
71 ptr = next
72 }
73 }
74}
diff --git a/embassy-executor/src/executor/raw/timer_queue.rs b/embassy-executor/src/executor/raw/timer_queue.rs
new file mode 100644
index 000000000..62fcfc531
--- /dev/null
+++ b/embassy-executor/src/executor/raw/timer_queue.rs
@@ -0,0 +1,85 @@
1use core::cell::Cell;
2use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
5
6use atomic_polyfill::Ordering;
7
8use super::{TaskHeader, STATE_TIMER_QUEUED};
9use crate::time::Instant;
10
11pub(crate) struct TimerQueueItem {
12 next: Cell<*mut TaskHeader>,
13}
14
15impl TimerQueueItem {
16 pub const fn new() -> Self {
17 Self {
18 next: Cell::new(ptr::null_mut()),
19 }
20 }
21}
22
23pub(crate) struct TimerQueue {
24 head: Cell<*mut TaskHeader>,
25}
26
27impl TimerQueue {
28 pub const fn new() -> Self {
29 Self {
30 head: Cell::new(ptr::null_mut()),
31 }
32 }
33
34 pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) {
35 let task = p.as_ref();
36 if task.expires_at.get() != Instant::MAX {
37 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
38 let is_new = old_state & STATE_TIMER_QUEUED == 0;
39
40 if is_new {
41 task.timer_queue_item.next.set(self.head.get());
42 self.head.set(p.as_ptr());
43 }
44 }
45 }
46
47 pub(crate) unsafe fn next_expiration(&self) -> Instant {
48 let mut res = Instant::MAX;
49 self.retain(|p| {
50 let task = p.as_ref();
51 let expires = task.expires_at.get();
52 res = min(res, expires);
53 expires != Instant::MAX
54 });
55 res
56 }
57
58 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) {
59 self.retain(|p| {
60 let task = p.as_ref();
61 if task.expires_at.get() <= now {
62 on_task(p);
63 false
64 } else {
65 true
66 }
67 });
68 }
69
70 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) {
71 let mut prev = &self.head;
72 while !prev.get().is_null() {
73 let p = NonNull::new_unchecked(prev.get());
74 let task = &*p.as_ptr();
75 if f(p) {
76 // Skip to next
77 prev = &task.timer_queue_item.next;
78 } else {
79 // Remove it
80 prev.set(task.timer_queue_item.next.get());
81 task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
82 }
83 }
84 }
85}
diff --git a/embassy-executor/src/executor/raw/util.rs b/embassy-executor/src/executor/raw/util.rs
new file mode 100644
index 000000000..ed5822188
--- /dev/null
+++ b/embassy-executor/src/executor/raw/util.rs
@@ -0,0 +1,33 @@
1use core::cell::UnsafeCell;
2use core::mem::MaybeUninit;
3use core::ptr;
4
5pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
6impl<T> UninitCell<T> {
7 pub const fn uninit() -> Self {
8 Self(MaybeUninit::uninit())
9 }
10
11 pub unsafe fn as_mut_ptr(&self) -> *mut T {
12 (*self.0.as_ptr()).get()
13 }
14
15 #[allow(clippy::mut_from_ref)]
16 pub unsafe fn as_mut(&self) -> &mut T {
17 &mut *self.as_mut_ptr()
18 }
19
20 pub unsafe fn write(&self, val: T) {
21 ptr::write(self.as_mut_ptr(), val)
22 }
23
24 pub unsafe fn drop_in_place(&self) {
25 ptr::drop_in_place(self.as_mut_ptr())
26 }
27}
28
29impl<T: Copy> UninitCell<T> {
30 pub unsafe fn read(&self) -> T {
31 ptr::read(self.as_mut_ptr())
32 }
33}
diff --git a/embassy-executor/src/executor/raw/waker.rs b/embassy-executor/src/executor/raw/waker.rs
new file mode 100644
index 000000000..f6ae332fa
--- /dev/null
+++ b/embassy-executor/src/executor/raw/waker.rs
@@ -0,0 +1,53 @@
1use core::mem;
2use core::ptr::NonNull;
3use core::task::{RawWaker, RawWakerVTable, Waker};
4
5use super::TaskHeader;
6
7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
8
9unsafe fn clone(p: *const ()) -> RawWaker {
10 RawWaker::new(p, &VTABLE)
11}
12
13unsafe fn wake(p: *const ()) {
14 (*(p as *mut TaskHeader)).enqueue()
15}
16
17unsafe fn drop(_: *const ()) {
18 // nop
19}
20
21pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23}
24
25/// Get a task pointer from a waker.
26///
27/// This can be used as an optimization in wait queues to store task pointers
28/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
29/// avoid dynamic dispatch.
30///
31/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
32///
33/// # Panics
34///
35/// Panics if the waker is not created by the Embassy executor.
36pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
37 // safety: OK because WakerHack has the same layout as Waker.
38 // This is not really guaranteed because the structs are `repr(Rust)`, it is
39 // indeed the case in the current implementation.
40 // TODO use waker_getters when stable. https://github.com/rust-lang/rust/issues/96992
41 let hack: &WakerHack = unsafe { mem::transmute(waker) };
42 if hack.vtable != &VTABLE {
43 panic!("Found waker not created by the Embassy executor. `embassy_executor::time::Timer` only works with the Embassy executor.")
44 }
45
46 // safety: we never create a waker with a null data pointer.
47 unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) }
48}
49
50struct WakerHack {
51 data: *const (),
52 vtable: &'static RawWakerVTable,
53}
diff --git a/embassy-executor/src/executor/spawner.rs b/embassy-executor/src/executor/spawner.rs
new file mode 100644
index 000000000..25a0d7dbb
--- /dev/null
+++ b/embassy-executor/src/executor/spawner.rs
@@ -0,0 +1,202 @@
1use core::marker::PhantomData;
2use core::mem;
3use core::ptr::NonNull;
4use core::task::Poll;
5
6use futures_util::future::poll_fn;
7
8use super::raw;
9
10/// Token to spawn a newly-created task in an executor.
11///
12/// When calling a task function (like `#[embassy_executor::task] async fn my_task() { ... }`), the returned
13/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
14/// then spawn it into an executor, typically with [`Spawner::spawn()`].
15///
16/// The generic parameter `S` determines whether the task can be spawned in executors
17/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
18/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
19///
20/// # Panics
21///
22/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
23/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
24#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
25pub struct SpawnToken<S> {
26 raw_task: Option<NonNull<raw::TaskHeader>>,
27 phantom: PhantomData<*mut S>,
28}
29
30impl<S> SpawnToken<S> {
31 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
32 Self {
33 raw_task: Some(raw_task),
34 phantom: PhantomData,
35 }
36 }
37
38 pub(crate) fn new_failed() -> Self {
39 Self {
40 raw_task: None,
41 phantom: PhantomData,
42 }
43 }
44}
45
46impl<S> Drop for SpawnToken<S> {
47 fn drop(&mut self) {
48 // TODO deallocate the task instead.
49 panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
50 }
51}
52
53/// Error returned when spawning a task.
54#[derive(Copy, Clone, Debug)]
55#[cfg_attr(feature = "defmt", derive(defmt::Format))]
56pub enum SpawnError {
57 /// Too many instances of this task are already running.
58 ///
59 /// By default, a task marked with `#[embassy_executor::task]` can only have one instance
60 /// running at a time. You may allow multiple instances to run in parallel with
61 /// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage.
62 Busy,
63}
64
65/// Handle to spawn tasks into an executor.
66///
67/// This Spawner can spawn any task (Send and non-Send ones), but it can
68/// only be used in the executor thread (it is not Send itself).
69///
70/// If you want to spawn tasks from another thread, use [SendSpawner].
71#[derive(Copy, Clone)]
72pub struct Spawner {
73 executor: &'static raw::Executor,
74 not_send: PhantomData<*mut ()>,
75}
76
77impl Spawner {
78 pub(crate) fn new(executor: &'static raw::Executor) -> Self {
79 Self {
80 executor,
81 not_send: PhantomData,
82 }
83 }
84
85 /// Get a Spawner for the current executor.
86 ///
87 /// This function is `async` just to get access to the current async
88 /// context. It returns instantly, it does not block/yield.
89 ///
90 /// # Panics
91 ///
92 /// Panics if the current executor is not an Embassy executor.
93 pub async fn for_current_executor() -> Self {
94 poll_fn(|cx| unsafe {
95 let task = raw::task_from_waker(cx.waker());
96 let executor = (*task.as_ptr()).executor.get();
97 Poll::Ready(Self::new(&*executor))
98 })
99 .await
100 }
101
102 /// Spawn a task into an executor.
103 ///
104 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
105 pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
106 let task = token.raw_task;
107 mem::forget(token);
108
109 match task {
110 Some(task) => {
111 unsafe { self.executor.spawn(task) };
112 Ok(())
113 }
114 None => Err(SpawnError::Busy),
115 }
116 }
117
118 // Used by the `embassy_macros::main!` macro to throw an error when spawn
119 // fails. This is here to allow conditional use of `defmt::unwrap!`
120 // without introducing a `defmt` feature in the `embassy_macros` package,
121 // which would require use of `-Z namespaced-features`.
122 /// Spawn a task into an executor, panicking on failure.
123 ///
124 /// # Panics
125 ///
126 /// Panics if the spawning fails.
127 pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
128 unwrap!(self.spawn(token));
129 }
130
131 /// Convert this Spawner to a SendSpawner. This allows you to send the
132 /// spawner to other threads, but the spawner loses the ability to spawn
133 /// non-Send tasks.
134 pub fn make_send(&self) -> SendSpawner {
135 SendSpawner {
136 executor: self.executor,
137 }
138 }
139}
140
141/// Handle to spawn tasks into an executor from any thread.
142///
143/// This Spawner can be used from any thread (it is Send), but it can
144/// only spawn Send tasks. The reason for this is spawning is effectively
145/// "sending" the tasks to the executor thread.
146///
147/// If you want to spawn non-Send tasks, use [Spawner].
148#[derive(Copy, Clone)]
149pub struct SendSpawner {
150 executor: &'static raw::Executor,
151}
152
153unsafe impl Send for SendSpawner {}
154unsafe impl Sync for SendSpawner {}
155
156impl SendSpawner {
157 pub(crate) fn new(executor: &'static raw::Executor) -> Self {
158 Self { executor }
159 }
160
161 /// Get a Spawner for the current executor.
162 ///
163 /// This function is `async` just to get access to the current async
164 /// context. It returns instantly, it does not block/yield.
165 ///
166 /// # Panics
167 ///
168 /// Panics if the current executor is not an Embassy executor.
169 pub async fn for_current_executor() -> Self {
170 poll_fn(|cx| unsafe {
171 let task = raw::task_from_waker(cx.waker());
172 let executor = (*task.as_ptr()).executor.get();
173 Poll::Ready(Self::new(&*executor))
174 })
175 .await
176 }
177
178 /// Spawn a task into an executor.
179 ///
180 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy_executor::task]`).
181 pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
182 let header = token.raw_task;
183 mem::forget(token);
184
185 match header {
186 Some(header) => {
187 unsafe { self.executor.spawn(header) };
188 Ok(())
189 }
190 None => Err(SpawnError::Busy),
191 }
192 }
193
194 /// Spawn a task into an executor, panicking on failure.
195 ///
196 /// # Panics
197 ///
198 /// Panics if the spawning fails.
199 pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
200 unwrap!(self.spawn(token));
201 }
202}
diff --git a/embassy-executor/src/fmt.rs b/embassy-executor/src/fmt.rs
new file mode 100644
index 000000000..f8bb0a035
--- /dev/null
+++ b/embassy-executor/src/fmt.rs
@@ -0,0 +1,228 @@
1#![macro_use]
2#![allow(unused_macros)]
3
4#[cfg(all(feature = "defmt", feature = "log"))]
5compile_error!("You may not enable both `defmt` and `log` features.");
6
7macro_rules! assert {
8 ($($x:tt)*) => {
9 {
10 #[cfg(not(feature = "defmt"))]
11 ::core::assert!($($x)*);
12 #[cfg(feature = "defmt")]
13 ::defmt::assert!($($x)*);
14 }
15 };
16}
17
18macro_rules! assert_eq {
19 ($($x:tt)*) => {
20 {
21 #[cfg(not(feature = "defmt"))]
22 ::core::assert_eq!($($x)*);
23 #[cfg(feature = "defmt")]
24 ::defmt::assert_eq!($($x)*);
25 }
26 };
27}
28
29macro_rules! assert_ne {
30 ($($x:tt)*) => {
31 {
32 #[cfg(not(feature = "defmt"))]
33 ::core::assert_ne!($($x)*);
34 #[cfg(feature = "defmt")]
35 ::defmt::assert_ne!($($x)*);
36 }
37 };
38}
39
40macro_rules! debug_assert {
41 ($($x:tt)*) => {
42 {
43 #[cfg(not(feature = "defmt"))]
44 ::core::debug_assert!($($x)*);
45 #[cfg(feature = "defmt")]
46 ::defmt::debug_assert!($($x)*);
47 }
48 };
49}
50
51macro_rules! debug_assert_eq {
52 ($($x:tt)*) => {
53 {
54 #[cfg(not(feature = "defmt"))]
55 ::core::debug_assert_eq!($($x)*);
56 #[cfg(feature = "defmt")]
57 ::defmt::debug_assert_eq!($($x)*);
58 }
59 };
60}
61
62macro_rules! debug_assert_ne {
63 ($($x:tt)*) => {
64 {
65 #[cfg(not(feature = "defmt"))]
66 ::core::debug_assert_ne!($($x)*);
67 #[cfg(feature = "defmt")]
68 ::defmt::debug_assert_ne!($($x)*);
69 }
70 };
71}
72
73macro_rules! todo {
74 ($($x:tt)*) => {
75 {
76 #[cfg(not(feature = "defmt"))]
77 ::core::todo!($($x)*);
78 #[cfg(feature = "defmt")]
79 ::defmt::todo!($($x)*);
80 }
81 };
82}
83
84macro_rules! unreachable {
85 ($($x:tt)*) => {
86 {
87 #[cfg(not(feature = "defmt"))]
88 ::core::unreachable!($($x)*);
89 #[cfg(feature = "defmt")]
90 ::defmt::unreachable!($($x)*);
91 }
92 };
93}
94
95macro_rules! panic {
96 ($($x:tt)*) => {
97 {
98 #[cfg(not(feature = "defmt"))]
99 ::core::panic!($($x)*);
100 #[cfg(feature = "defmt")]
101 ::defmt::panic!($($x)*);
102 }
103 };
104}
105
106macro_rules! trace {
107 ($s:literal $(, $x:expr)* $(,)?) => {
108 {
109 #[cfg(feature = "log")]
110 ::log::trace!($s $(, $x)*);
111 #[cfg(feature = "defmt")]
112 ::defmt::trace!($s $(, $x)*);
113 #[cfg(not(any(feature = "log", feature="defmt")))]
114 let _ = ($( & $x ),*);
115 }
116 };
117}
118
119macro_rules! debug {
120 ($s:literal $(, $x:expr)* $(,)?) => {
121 {
122 #[cfg(feature = "log")]
123 ::log::debug!($s $(, $x)*);
124 #[cfg(feature = "defmt")]
125 ::defmt::debug!($s $(, $x)*);
126 #[cfg(not(any(feature = "log", feature="defmt")))]
127 let _ = ($( & $x ),*);
128 }
129 };
130}
131
132macro_rules! info {
133 ($s:literal $(, $x:expr)* $(,)?) => {
134 {
135 #[cfg(feature = "log")]
136 ::log::info!($s $(, $x)*);
137 #[cfg(feature = "defmt")]
138 ::defmt::info!($s $(, $x)*);
139 #[cfg(not(any(feature = "log", feature="defmt")))]
140 let _ = ($( & $x ),*);
141 }
142 };
143}
144
145macro_rules! warn {
146 ($s:literal $(, $x:expr)* $(,)?) => {
147 {
148 #[cfg(feature = "log")]
149 ::log::warn!($s $(, $x)*);
150 #[cfg(feature = "defmt")]
151 ::defmt::warn!($s $(, $x)*);
152 #[cfg(not(any(feature = "log", feature="defmt")))]
153 let _ = ($( & $x ),*);
154 }
155 };
156}
157
158macro_rules! error {
159 ($s:literal $(, $x:expr)* $(,)?) => {
160 {
161 #[cfg(feature = "log")]
162 ::log::error!($s $(, $x)*);
163 #[cfg(feature = "defmt")]
164 ::defmt::error!($s $(, $x)*);
165 #[cfg(not(any(feature = "log", feature="defmt")))]
166 let _ = ($( & $x ),*);
167 }
168 };
169}
170
171#[cfg(feature = "defmt")]
172macro_rules! unwrap {
173 ($($x:tt)*) => {
174 ::defmt::unwrap!($($x)*)
175 };
176}
177
178#[cfg(not(feature = "defmt"))]
179macro_rules! unwrap {
180 ($arg:expr) => {
181 match $crate::fmt::Try::into_result($arg) {
182 ::core::result::Result::Ok(t) => t,
183 ::core::result::Result::Err(e) => {
184 ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
185 }
186 }
187 };
188 ($arg:expr, $($msg:expr),+ $(,)? ) => {
189 match $crate::fmt::Try::into_result($arg) {
190 ::core::result::Result::Ok(t) => t,
191 ::core::result::Result::Err(e) => {
192 ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
193 }
194 }
195 }
196}
197
198#[cfg(feature = "defmt-timestamp-uptime")]
199defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
200
201#[derive(Debug, Copy, Clone, Eq, PartialEq)]
202pub struct NoneError;
203
204pub trait Try {
205 type Ok;
206 type Error;
207 fn into_result(self) -> Result<Self::Ok, Self::Error>;
208}
209
210impl<T> Try for Option<T> {
211 type Ok = T;
212 type Error = NoneError;
213
214 #[inline]
215 fn into_result(self) -> Result<T, NoneError> {
216 self.ok_or(NoneError)
217 }
218}
219
220impl<T, E> Try for Result<T, E> {
221 type Ok = T;
222 type Error = E;
223
224 #[inline]
225 fn into_result(self) -> Self {
226 self
227 }
228}
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
new file mode 100644
index 000000000..69e4aeb4b
--- /dev/null
+++ b/embassy-executor/src/lib.rs
@@ -0,0 +1,22 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
3#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))]
4#![allow(clippy::new_without_default)]
5#![doc = include_str!("../../README.md")]
6#![warn(missing_docs)]
7
8// This mod MUST go first, so that the others see its macros.
9pub(crate) mod fmt;
10
11pub mod executor;
12#[cfg(feature = "time")]
13pub mod time;
14
15#[cfg(feature = "nightly")]
16pub use embassy_macros::{main, task};
17
18#[doc(hidden)]
19/// Implementation details for embassy macros. DO NOT USE.
20pub mod export {
21 pub use atomic_polyfill as atomic;
22}
diff --git a/embassy-executor/src/time/delay.rs b/embassy-executor/src/time/delay.rs
new file mode 100644
index 000000000..d76ed32eb
--- /dev/null
+++ b/embassy-executor/src/time/delay.rs
@@ -0,0 +1,98 @@
1use super::{Duration, Instant};
2
3/// Blocks for at least `duration`.
4pub fn block_for(duration: Duration) {
5 let expires_at = Instant::now() + duration;
6 while Instant::now() < expires_at {}
7}
8
9/// Type implementing async delays and blocking `embedded-hal` delays.
10///
11/// The delays are implemented in a "best-effort" way, meaning that the cpu will block for at least
12/// the amount provided, but accuracy can be affected by many factors, including interrupt usage.
13/// Make sure to use a suitable tick rate for your use case. The tick rate is defined by the currently
14/// active driver.
15pub struct Delay;
16
17#[cfg(feature = "unstable-traits")]
18mod eh1 {
19 use super::*;
20
21 impl embedded_hal_1::delay::blocking::DelayUs for Delay {
22 type Error = core::convert::Infallible;
23
24 fn delay_us(&mut self, us: u32) -> Result<(), Self::Error> {
25 Ok(block_for(Duration::from_micros(us as u64)))
26 }
27
28 fn delay_ms(&mut self, ms: u32) -> Result<(), Self::Error> {
29 Ok(block_for(Duration::from_millis(ms as u64)))
30 }
31 }
32}
33
34cfg_if::cfg_if! {
35 if #[cfg(all(feature = "unstable-traits", feature = "nightly"))] {
36 use crate::time::Timer;
37 use core::future::Future;
38 use futures_util::FutureExt;
39
40 impl embedded_hal_async::delay::DelayUs for Delay {
41 type Error = core::convert::Infallible;
42
43 type DelayUsFuture<'a> = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;
44
45 fn delay_us(&mut self, micros: u32) -> Self::DelayUsFuture<'_> {
46 Timer::after(Duration::from_micros(micros as _)).map(Ok)
47 }
48
49 type DelayMsFuture<'a> = impl Future<Output = Result<(), Self::Error>> + 'a where Self: 'a;
50
51 fn delay_ms(&mut self, millis: u32) -> Self::DelayMsFuture<'_> {
52 Timer::after(Duration::from_millis(millis as _)).map(Ok)
53 }
54 }
55 }
56}
57
58mod eh02 {
59 use embedded_hal_02::blocking::delay::{DelayMs, DelayUs};
60
61 use super::*;
62
63 impl DelayMs<u8> for Delay {
64 fn delay_ms(&mut self, ms: u8) {
65 block_for(Duration::from_millis(ms as u64))
66 }
67 }
68
69 impl DelayMs<u16> for Delay {
70 fn delay_ms(&mut self, ms: u16) {
71 block_for(Duration::from_millis(ms as u64))
72 }
73 }
74
75 impl DelayMs<u32> for Delay {
76 fn delay_ms(&mut self, ms: u32) {
77 block_for(Duration::from_millis(ms as u64))
78 }
79 }
80
81 impl DelayUs<u8> for Delay {
82 fn delay_us(&mut self, us: u8) {
83 block_for(Duration::from_micros(us as u64))
84 }
85 }
86
87 impl DelayUs<u16> for Delay {
88 fn delay_us(&mut self, us: u16) {
89 block_for(Duration::from_micros(us as u64))
90 }
91 }
92
93 impl DelayUs<u32> for Delay {
94 fn delay_us(&mut self, us: u32) {
95 block_for(Duration::from_micros(us as u64))
96 }
97 }
98}
diff --git a/embassy-executor/src/time/driver.rs b/embassy-executor/src/time/driver.rs
new file mode 100644
index 000000000..48e2f1c7d
--- /dev/null
+++ b/embassy-executor/src/time/driver.rs
@@ -0,0 +1,170 @@
1//! Time driver interface
2//!
3//! This module defines the interface a driver needs to implement to power the `embassy_executor::time` module.
4//!
5//! # Implementing a driver
6//!
7//! - Define a struct `MyDriver`
8//! - Implement [`Driver`] for it
9//! - Register it as the global driver with [`time_driver_impl`].
10//! - Enable the Cargo features `embassy-executor/time` and one of `embassy-executor/time-tick-*` corresponding to the
11//! tick rate of your driver.
12//!
13//! If you wish to make the tick rate configurable by the end user, you should do so by exposing your own
14//! Cargo features and having each enable the corresponding `embassy-executor/time-tick-*`.
15//!
16//! # Linkage details
17//!
18//! Instead of the usual "trait + generic params" approach, calls from embassy to the driver are done via `extern` functions.
19//!
20//! `embassy` internally defines the driver functions as `extern "Rust" { fn _embassy_time_now() -> u64; }` and calls them.
21//! The driver crate defines the functions as `#[no_mangle] fn _embassy_time_now() -> u64`. The linker will resolve the
22//! calls from the `embassy` crate to call into the driver crate.
23//!
24//! If there is none or multiple drivers in the crate tree, linking will fail.
25//!
26//! This method has a few key advantages for something as foundational as timekeeping:
27//!
28//! - The time driver is available everywhere easily, without having to thread the implementation
29//! through generic parameters. This is especially helpful for libraries.
30//! - It means comparing `Instant`s will always make sense: if there were multiple drivers
31//! active, one could compare an `Instant` from driver A to an `Instant` from driver B, which
32//! would yield incorrect results.
33//!
34//! # Example
35//!
36//! ```
37//! use embassy_executor::time::driver::{Driver, AlarmHandle};
38//!
39//! struct MyDriver{}; // not public!
40//! embassy_executor::time_driver_impl!(static DRIVER: MyDriver = MyDriver{});
41//!
42//! impl Driver for MyDriver {
43//! fn now(&self) -> u64 {
44//! todo!()
45//! }
46//! unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
47//! todo!()
48//! }
49//! fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
50//! todo!()
51//! }
52//! fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
53//! todo!()
54//! }
55//! }
56//! ```
57
58/// Alarm handle, assigned by the driver.
59#[derive(Clone, Copy)]
60pub struct AlarmHandle {
61 id: u8,
62}
63
64impl AlarmHandle {
65 /// Create an AlarmHandle
66 ///
67 /// Safety: May only be called by the current global Driver impl.
68 /// The impl is allowed to rely on the fact that all `AlarmHandle` instances
69 /// are created by itself in unsafe code (e.g. indexing operations)
70 pub unsafe fn new(id: u8) -> Self {
71 Self { id }
72 }
73
74 /// Get the ID of the AlarmHandle.
75 pub fn id(&self) -> u8 {
76 self.id
77 }
78}
79
80/// Time driver
81pub trait Driver: Send + Sync + 'static {
82 /// Return the current timestamp in ticks.
83 ///
84 /// Implementations MUST ensure that:
85 /// - This is guaranteed to be monotonic, i.e. a call to now() will always return
86 /// a greater or equal value than earler calls. Time can't "roll backwards".
87 /// - It "never" overflows. It must not overflow in a sufficiently long time frame, say
88 /// in 10_000 years (Human civilization is likely to already have self-destructed
89 /// 10_000 years from now.). This means if your hardware only has 16bit/32bit timers
90 /// you MUST extend them to 64-bit, for example by counting overflows in software,
91 /// or chaining multiple timers together.
92 fn now(&self) -> u64;
93
94 /// Try allocating an alarm handle. Returns None if no alarms left.
95 /// Initially the alarm has no callback set, and a null `ctx` pointer.
96 ///
97 /// # Safety
98 /// It is UB to make the alarm fire before setting a callback.
99 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle>;
100
101 /// Sets the callback function to be called when the alarm triggers.
102 /// The callback may be called from any context (interrupt or thread mode).
103 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
104
105 /// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm
106 /// timestamp, the provided callback function will be called.
107 ///
108 /// If `timestamp` is already in the past, the alarm callback must be immediately fired.
109 /// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`.
110 ///
111 /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp.
112 ///
113 /// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any.
114 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64);
115}
116
117extern "Rust" {
118 fn _embassy_time_now() -> u64;
119 fn _embassy_time_allocate_alarm() -> Option<AlarmHandle>;
120 fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ());
121 fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64);
122}
123
124pub(crate) fn now() -> u64 {
125 unsafe { _embassy_time_now() }
126}
127/// Safety: it is UB to make the alarm fire before setting a callback.
128pub(crate) unsafe fn allocate_alarm() -> Option<AlarmHandle> {
129 _embassy_time_allocate_alarm()
130}
131pub(crate) fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
132 unsafe { _embassy_time_set_alarm_callback(alarm, callback, ctx) }
133}
134pub(crate) fn set_alarm(alarm: AlarmHandle, timestamp: u64) {
135 unsafe { _embassy_time_set_alarm(alarm, timestamp) }
136}
137
138/// Set the time Driver implementation.
139///
140/// See the module documentation for an example.
141#[macro_export]
142macro_rules! time_driver_impl {
143 (static $name:ident: $t: ty = $val:expr) => {
144 static $name: $t = $val;
145
146 #[no_mangle]
147 fn _embassy_time_now() -> u64 {
148 <$t as $crate::time::driver::Driver>::now(&$name)
149 }
150
151 #[no_mangle]
152 unsafe fn _embassy_time_allocate_alarm() -> Option<$crate::time::driver::AlarmHandle> {
153 <$t as $crate::time::driver::Driver>::allocate_alarm(&$name)
154 }
155
156 #[no_mangle]
157 fn _embassy_time_set_alarm_callback(
158 alarm: $crate::time::driver::AlarmHandle,
159 callback: fn(*mut ()),
160 ctx: *mut (),
161 ) {
162 <$t as $crate::time::driver::Driver>::set_alarm_callback(&$name, alarm, callback, ctx)
163 }
164
165 #[no_mangle]
166 fn _embassy_time_set_alarm(alarm: $crate::time::driver::AlarmHandle, timestamp: u64) {
167 <$t as $crate::time::driver::Driver>::set_alarm(&$name, alarm, timestamp)
168 }
169 };
170}
diff --git a/embassy-executor/src/time/driver_std.rs b/embassy-executor/src/time/driver_std.rs
new file mode 100644
index 000000000..cb66f7c19
--- /dev/null
+++ b/embassy-executor/src/time/driver_std.rs
@@ -0,0 +1,208 @@
1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::{Condvar, Mutex, Once};
4use std::time::{Duration as StdDuration, Instant as StdInstant};
5use std::{mem, ptr, thread};
6
7use atomic_polyfill::{AtomicU8, Ordering};
8
9use crate::time::driver::{AlarmHandle, Driver};
10
11const ALARM_COUNT: usize = 4;
12
13struct AlarmState {
14 timestamp: u64,
15
16 // This is really a Option<(fn(*mut ()), *mut ())>
17 // but fn pointers aren't allowed in const yet
18 callback: *const (),
19 ctx: *mut (),
20}
21
22unsafe impl Send for AlarmState {}
23
24impl AlarmState {
25 const fn new() -> Self {
26 Self {
27 timestamp: u64::MAX,
28 callback: ptr::null(),
29 ctx: ptr::null_mut(),
30 }
31 }
32}
33
34struct TimeDriver {
35 alarm_count: AtomicU8,
36
37 once: Once,
38 alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
39 zero_instant: UninitCell<StdInstant>,
40 signaler: UninitCell<Signaler>,
41}
42
43const ALARM_NEW: AlarmState = AlarmState::new();
44crate::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
45 alarm_count: AtomicU8::new(0),
46
47 once: Once::new(),
48 alarms: UninitCell::uninit(),
49 zero_instant: UninitCell::uninit(),
50 signaler: UninitCell::uninit(),
51});
52
53impl TimeDriver {
54 fn init(&self) {
55 self.once.call_once(|| unsafe {
56 self.alarms.write(Mutex::new([ALARM_NEW; ALARM_COUNT]));
57 self.zero_instant.write(StdInstant::now());
58 self.signaler.write(Signaler::new());
59
60 thread::spawn(Self::alarm_thread);
61 });
62 }
63
64 fn alarm_thread() {
65 let zero = unsafe { DRIVER.zero_instant.read() };
66 loop {
67 let now = DRIVER.now();
68
69 let mut next_alarm = u64::MAX;
70 {
71 let alarms = &mut *unsafe { DRIVER.alarms.as_ref() }.lock().unwrap();
72 for alarm in alarms {
73 if alarm.timestamp <= now {
74 alarm.timestamp = u64::MAX;
75
76 // Call after clearing alarm, so the callback can set another alarm.
77
78 // safety:
79 // - we can ignore the possiblity of `f` being unset (null) because of the safety contract of `allocate_alarm`.
80 // - other than that we only store valid function pointers into alarm.callback
81 let f: fn(*mut ()) = unsafe { mem::transmute(alarm.callback) };
82 f(alarm.ctx);
83 } else {
84 next_alarm = next_alarm.min(alarm.timestamp);
85 }
86 }
87 }
88
89 // Ensure we don't overflow
90 let until = zero
91 .checked_add(StdDuration::from_micros(next_alarm))
92 .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
93
94 unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
95 }
96 }
97}
98
99impl Driver for TimeDriver {
100 fn now(&self) -> u64 {
101 self.init();
102
103 let zero = unsafe { self.zero_instant.read() };
104 StdInstant::now().duration_since(zero).as_micros() as u64
105 }
106
107 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
108 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
109 if x < ALARM_COUNT as u8 {
110 Some(x + 1)
111 } else {
112 None
113 }
114 });
115
116 match id {
117 Ok(id) => Some(AlarmHandle::new(id)),
118 Err(_) => None,
119 }
120 }
121
122 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
123 self.init();
124 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
125 let alarm = &mut alarms[alarm.id() as usize];
126 alarm.callback = callback as *const ();
127 alarm.ctx = ctx;
128 }
129
130 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
131 self.init();
132 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
133 let alarm = &mut alarms[alarm.id() as usize];
134 alarm.timestamp = timestamp;
135 unsafe { self.signaler.as_ref() }.signal();
136 }
137}
138
139struct Signaler {
140 mutex: Mutex<bool>,
141 condvar: Condvar,
142}
143
144impl Signaler {
145 fn new() -> Self {
146 Self {
147 mutex: Mutex::new(false),
148 condvar: Condvar::new(),
149 }
150 }
151
152 fn wait_until(&self, until: StdInstant) {
153 let mut signaled = self.mutex.lock().unwrap();
154 while !*signaled {
155 let now = StdInstant::now();
156
157 if now >= until {
158 break;
159 }
160
161 let dur = until - now;
162 let (signaled2, timeout) = self.condvar.wait_timeout(signaled, dur).unwrap();
163 signaled = signaled2;
164 if timeout.timed_out() {
165 break;
166 }
167 }
168 *signaled = false;
169 }
170
171 fn signal(&self) {
172 let mut signaled = self.mutex.lock().unwrap();
173 *signaled = true;
174 self.condvar.notify_one();
175 }
176}
177
178pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
179unsafe impl<T> Send for UninitCell<T> {}
180unsafe impl<T> Sync for UninitCell<T> {}
181
182impl<T> UninitCell<T> {
183 pub const fn uninit() -> Self {
184 Self(MaybeUninit::uninit())
185 }
186
187 pub unsafe fn as_ptr(&self) -> *const T {
188 (*self.0.as_ptr()).get()
189 }
190
191 pub unsafe fn as_mut_ptr(&self) -> *mut T {
192 (*self.0.as_ptr()).get()
193 }
194
195 pub unsafe fn as_ref(&self) -> &T {
196 &*self.as_ptr()
197 }
198
199 pub unsafe fn write(&self, val: T) {
200 ptr::write(self.as_mut_ptr(), val)
201 }
202}
203
204impl<T: Copy> UninitCell<T> {
205 pub unsafe fn read(&self) -> T {
206 ptr::read(self.as_mut_ptr())
207 }
208}
diff --git a/embassy-executor/src/time/driver_wasm.rs b/embassy-executor/src/time/driver_wasm.rs
new file mode 100644
index 000000000..5f585a19a
--- /dev/null
+++ b/embassy-executor/src/time/driver_wasm.rs
@@ -0,0 +1,134 @@
1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::{Mutex, Once};
5
6use atomic_polyfill::{AtomicU8, Ordering};
7use wasm_bindgen::prelude::*;
8use wasm_timer::Instant as StdInstant;
9
10use crate::time::driver::{AlarmHandle, Driver};
11
12const ALARM_COUNT: usize = 4;
13
14struct AlarmState {
15 token: Option<f64>,
16 closure: Option<Closure<dyn FnMut() + 'static>>,
17}
18
19unsafe impl Send for AlarmState {}
20
21impl AlarmState {
22 const fn new() -> Self {
23 Self {
24 token: None,
25 closure: None,
26 }
27 }
28}
29
30#[wasm_bindgen]
31extern "C" {
32 fn setTimeout(closure: &Closure<dyn FnMut()>, millis: u32) -> f64;
33 fn clearTimeout(token: f64);
34}
35
36struct TimeDriver {
37 alarm_count: AtomicU8,
38
39 once: Once,
40 alarms: UninitCell<Mutex<[AlarmState; ALARM_COUNT]>>,
41 zero_instant: UninitCell<StdInstant>,
42}
43
44const ALARM_NEW: AlarmState = AlarmState::new();
45crate::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
46 alarm_count: AtomicU8::new(0),
47 once: Once::new(),
48 alarms: UninitCell::uninit(),
49 zero_instant: UninitCell::uninit(),
50});
51
52impl TimeDriver {
53 fn init(&self) {
54 self.once.call_once(|| unsafe {
55 self.alarms.write(Mutex::new([ALARM_NEW; ALARM_COUNT]));
56 self.zero_instant.write(StdInstant::now());
57 });
58 }
59}
60
61impl Driver for TimeDriver {
62 fn now(&self) -> u64 {
63 self.init();
64
65 let zero = unsafe { self.zero_instant.read() };
66 StdInstant::now().duration_since(zero).as_micros() as u64
67 }
68
69 unsafe fn allocate_alarm(&self) -> Option<AlarmHandle> {
70 let id = self.alarm_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
71 if x < ALARM_COUNT as u8 {
72 Some(x + 1)
73 } else {
74 None
75 }
76 });
77
78 match id {
79 Ok(id) => Some(AlarmHandle::new(id)),
80 Err(_) => None,
81 }
82 }
83
84 fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) {
85 self.init();
86 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
87 let alarm = &mut alarms[alarm.id() as usize];
88 alarm.closure.replace(Closure::new(move || {
89 callback(ctx);
90 }));
91 }
92
93 fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) {
94 self.init();
95 let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap();
96 let alarm = &mut alarms[alarm.id() as usize];
97 let timeout = (timestamp - self.now()) as u32;
98 if let Some(token) = alarm.token {
99 clearTimeout(token);
100 }
101 alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000));
102 }
103}
104
105pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
106unsafe impl<T> Send for UninitCell<T> {}
107unsafe impl<T> Sync for UninitCell<T> {}
108
109impl<T> UninitCell<T> {
110 pub const fn uninit() -> Self {
111 Self(MaybeUninit::uninit())
112 }
113 unsafe fn as_ptr(&self) -> *const T {
114 (*self.0.as_ptr()).get()
115 }
116
117 pub unsafe fn as_mut_ptr(&self) -> *mut T {
118 (*self.0.as_ptr()).get()
119 }
120
121 pub unsafe fn as_ref(&self) -> &T {
122 &*self.as_ptr()
123 }
124
125 pub unsafe fn write(&self, val: T) {
126 ptr::write(self.as_mut_ptr(), val)
127 }
128}
129
130impl<T: Copy> UninitCell<T> {
131 pub unsafe fn read(&self) -> T {
132 ptr::read(self.as_mut_ptr())
133 }
134}
diff --git a/embassy-executor/src/time/duration.rs b/embassy-executor/src/time/duration.rs
new file mode 100644
index 000000000..dc4f16bd4
--- /dev/null
+++ b/embassy-executor/src/time/duration.rs
@@ -0,0 +1,184 @@
1use core::fmt;
2use core::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign};
3
4use super::{GCD_1K, GCD_1M, TICKS_PER_SECOND};
5
6#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
7#[cfg_attr(feature = "defmt", derive(defmt::Format))]
8/// Represents the difference between two [Instant](struct.Instant.html)s
9pub struct Duration {
10 pub(crate) ticks: u64,
11}
12
13impl Duration {
14 /// The smallest value that can be represented by the `Duration` type.
15 pub const MIN: Duration = Duration { ticks: u64::MIN };
16 /// The largest value that can be represented by the `Duration` type.
17 pub const MAX: Duration = Duration { ticks: u64::MAX };
18
19 /// Tick count of the `Duration`.
20 pub const fn as_ticks(&self) -> u64 {
21 self.ticks
22 }
23
24 /// Convert the `Duration` to seconds, rounding down.
25 pub const fn as_secs(&self) -> u64 {
26 self.ticks / TICKS_PER_SECOND
27 }
28
29 /// Convert the `Duration` to milliseconds, rounding down.
30 pub const fn as_millis(&self) -> u64 {
31 self.ticks * (1000 / GCD_1K) / (TICKS_PER_SECOND / GCD_1K)
32 }
33
34 /// Convert the `Duration` to microseconds, rounding down.
35 pub const fn as_micros(&self) -> u64 {
36 self.ticks * (1_000_000 / GCD_1M) / (TICKS_PER_SECOND / GCD_1M)
37 }
38
39 /// Creates a duration from the specified number of clock ticks
40 pub const fn from_ticks(ticks: u64) -> Duration {
41 Duration { ticks }
42 }
43
44 /// Creates a duration from the specified number of seconds, rounding up.
45 pub const fn from_secs(secs: u64) -> Duration {
46 Duration {
47 ticks: secs * TICKS_PER_SECOND,
48 }
49 }
50
51 /// Creates a duration from the specified number of milliseconds, rounding up.
52 pub const fn from_millis(millis: u64) -> Duration {
53 Duration {
54 ticks: div_ceil(millis * (TICKS_PER_SECOND / GCD_1K), 1000 / GCD_1K),
55 }
56 }
57
58 /// Creates a duration from the specified number of microseconds, rounding up.
59 /// NOTE: Delays this small may be inaccurate.
60 pub const fn from_micros(micros: u64) -> Duration {
61 Duration {
62 ticks: div_ceil(micros * (TICKS_PER_SECOND / GCD_1M), 1_000_000 / GCD_1M),
63 }
64 }
65
66 /// Creates a duration from the specified number of seconds, rounding down.
67 pub const fn from_secs_floor(secs: u64) -> Duration {
68 Duration {
69 ticks: secs * TICKS_PER_SECOND,
70 }
71 }
72
73 /// Creates a duration from the specified number of milliseconds, rounding down.
74 pub const fn from_millis_floor(millis: u64) -> Duration {
75 Duration {
76 ticks: millis * (TICKS_PER_SECOND / GCD_1K) / (1000 / GCD_1K),
77 }
78 }
79
80 /// Creates a duration from the specified number of microseconds, rounding down.
81 /// NOTE: Delays this small may be inaccurate.
82 pub const fn from_micros_floor(micros: u64) -> Duration {
83 Duration {
84 ticks: micros * (TICKS_PER_SECOND / GCD_1M) / (1_000_000 / GCD_1M),
85 }
86 }
87
88 /// Adds one Duration to another, returning a new Duration or None in the event of an overflow.
89 pub fn checked_add(self, rhs: Duration) -> Option<Duration> {
90 self.ticks.checked_add(rhs.ticks).map(|ticks| Duration { ticks })
91 }
92
93 /// Subtracts one Duration to another, returning a new Duration or None in the event of an overflow.
94 pub fn checked_sub(self, rhs: Duration) -> Option<Duration> {
95 self.ticks.checked_sub(rhs.ticks).map(|ticks| Duration { ticks })
96 }
97
98 /// Multiplies one Duration by a scalar u32, returning a new Duration or None in the event of an overflow.
99 pub fn checked_mul(self, rhs: u32) -> Option<Duration> {
100 self.ticks.checked_mul(rhs as _).map(|ticks| Duration { ticks })
101 }
102
103 /// Divides one Duration a scalar u32, returning a new Duration or None in the event of an overflow.
104 pub fn checked_div(self, rhs: u32) -> Option<Duration> {
105 self.ticks.checked_div(rhs as _).map(|ticks| Duration { ticks })
106 }
107}
108
109impl Add for Duration {
110 type Output = Duration;
111
112 fn add(self, rhs: Duration) -> Duration {
113 self.checked_add(rhs).expect("overflow when adding durations")
114 }
115}
116
117impl AddAssign for Duration {
118 fn add_assign(&mut self, rhs: Duration) {
119 *self = *self + rhs;
120 }
121}
122
123impl Sub for Duration {
124 type Output = Duration;
125
126 fn sub(self, rhs: Duration) -> Duration {
127 self.checked_sub(rhs).expect("overflow when subtracting durations")
128 }
129}
130
131impl SubAssign for Duration {
132 fn sub_assign(&mut self, rhs: Duration) {
133 *self = *self - rhs;
134 }
135}
136
137impl Mul<u32> for Duration {
138 type Output = Duration;
139
140 fn mul(self, rhs: u32) -> Duration {
141 self.checked_mul(rhs)
142 .expect("overflow when multiplying duration by scalar")
143 }
144}
145
146impl Mul<Duration> for u32 {
147 type Output = Duration;
148
149 fn mul(self, rhs: Duration) -> Duration {
150 rhs * self
151 }
152}
153
154impl MulAssign<u32> for Duration {
155 fn mul_assign(&mut self, rhs: u32) {
156 *self = *self * rhs;
157 }
158}
159
160impl Div<u32> for Duration {
161 type Output = Duration;
162
163 fn div(self, rhs: u32) -> Duration {
164 self.checked_div(rhs)
165 .expect("divide by zero error when dividing duration by scalar")
166 }
167}
168
169impl DivAssign<u32> for Duration {
170 fn div_assign(&mut self, rhs: u32) {
171 *self = *self / rhs;
172 }
173}
174
175impl<'a> fmt::Display for Duration {
176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 write!(f, "{} ticks", self.ticks)
178 }
179}
180
181#[inline]
182const fn div_ceil(num: u64, den: u64) -> u64 {
183 (num + den - 1) / den
184}
diff --git a/embassy-executor/src/time/instant.rs b/embassy-executor/src/time/instant.rs
new file mode 100644
index 000000000..6a4925f47
--- /dev/null
+++ b/embassy-executor/src/time/instant.rs
@@ -0,0 +1,159 @@
1use core::fmt;
2use core::ops::{Add, AddAssign, Sub, SubAssign};
3
4use super::{driver, Duration, GCD_1K, GCD_1M, TICKS_PER_SECOND};
5
6#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
7#[cfg_attr(feature = "defmt", derive(defmt::Format))]
8/// An Instant in time, based on the MCU's clock ticks since startup.
9pub struct Instant {
10 ticks: u64,
11}
12
13impl Instant {
14 /// The smallest (earliest) value that can be represented by the `Instant` type.
15 pub const MIN: Instant = Instant { ticks: u64::MIN };
16 /// The largest (latest) value that can be represented by the `Instant` type.
17 pub const MAX: Instant = Instant { ticks: u64::MAX };
18
19 /// Returns an Instant representing the current time.
20 pub fn now() -> Instant {
21 Instant { ticks: driver::now() }
22 }
23
24 /// Create an Instant from a tick count since system boot.
25 pub const fn from_ticks(ticks: u64) -> Self {
26 Self { ticks }
27 }
28
29 /// Create an Instant from a microsecond count since system boot.
30 pub const fn from_micros(micros: u64) -> Self {
31 Self {
32 ticks: micros * (TICKS_PER_SECOND / GCD_1M) / (1_000_000 / GCD_1M),
33 }
34 }
35
36 /// Create an Instant from a millisecond count since system boot.
37 pub const fn from_millis(millis: u64) -> Self {
38 Self {
39 ticks: millis * (TICKS_PER_SECOND / GCD_1K) / (1000 / GCD_1K),
40 }
41 }
42
43 /// Create an Instant from a second count since system boot.
44 pub const fn from_secs(seconds: u64) -> Self {
45 Self {
46 ticks: seconds * TICKS_PER_SECOND,
47 }
48 }
49
50 /// Tick count since system boot.
51 pub const fn as_ticks(&self) -> u64 {
52 self.ticks
53 }
54
55 /// Seconds since system boot.
56 pub const fn as_secs(&self) -> u64 {
57 self.ticks / TICKS_PER_SECOND
58 }
59
60 /// Milliseconds since system boot.
61 pub const fn as_millis(&self) -> u64 {
62 self.ticks * (1000 / GCD_1K) / (TICKS_PER_SECOND / GCD_1K)
63 }
64
65 /// Microseconds since system boot.
66 pub const fn as_micros(&self) -> u64 {
67 self.ticks * (1_000_000 / GCD_1M) / (TICKS_PER_SECOND / GCD_1M)
68 }
69
70 /// Duration between this Instant and another Instant
71 /// Panics on over/underflow.
72 pub fn duration_since(&self, earlier: Instant) -> Duration {
73 Duration {
74 ticks: self.ticks.checked_sub(earlier.ticks).unwrap(),
75 }
76 }
77
78 /// Duration between this Instant and another Instant
79 pub fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> {
80 if self.ticks < earlier.ticks {
81 None
82 } else {
83 Some(Duration {
84 ticks: self.ticks - earlier.ticks,
85 })
86 }
87 }
88
89 /// Returns the duration since the "earlier" Instant.
90 /// If the "earlier" instant is in the future, the duration is set to zero.
91 pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
92 Duration {
93 ticks: if self.ticks < earlier.ticks {
94 0
95 } else {
96 self.ticks - earlier.ticks
97 },
98 }
99 }
100
101 /// Duration elapsed since this Instant.
102 pub fn elapsed(&self) -> Duration {
103 Instant::now() - *self
104 }
105
106 /// Adds one Duration to self, returning a new `Instant` or None in the event of an overflow.
107 pub fn checked_add(&self, duration: Duration) -> Option<Instant> {
108 self.ticks.checked_add(duration.ticks).map(|ticks| Instant { ticks })
109 }
110
111 /// Subtracts one Duration to self, returning a new `Instant` or None in the event of an overflow.
112 pub fn checked_sub(&self, duration: Duration) -> Option<Instant> {
113 self.ticks.checked_sub(duration.ticks).map(|ticks| Instant { ticks })
114 }
115}
116
117impl Add<Duration> for Instant {
118 type Output = Instant;
119
120 fn add(self, other: Duration) -> Instant {
121 self.checked_add(other)
122 .expect("overflow when adding duration to instant")
123 }
124}
125
126impl AddAssign<Duration> for Instant {
127 fn add_assign(&mut self, other: Duration) {
128 *self = *self + other;
129 }
130}
131
132impl Sub<Duration> for Instant {
133 type Output = Instant;
134
135 fn sub(self, other: Duration) -> Instant {
136 self.checked_sub(other)
137 .expect("overflow when subtracting duration from instant")
138 }
139}
140
141impl SubAssign<Duration> for Instant {
142 fn sub_assign(&mut self, other: Duration) {
143 *self = *self - other;
144 }
145}
146
147impl Sub<Instant> for Instant {
148 type Output = Duration;
149
150 fn sub(self, other: Instant) -> Duration {
151 self.duration_since(other)
152 }
153}
154
155impl<'a> fmt::Display for Instant {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 write!(f, "{} ticks", self.ticks)
158 }
159}
diff --git a/embassy-executor/src/time/mod.rs b/embassy-executor/src/time/mod.rs
new file mode 100644
index 000000000..b787a5cf2
--- /dev/null
+++ b/embassy-executor/src/time/mod.rs
@@ -0,0 +1,91 @@
1//! Timekeeping, delays and timeouts.
2//!
3//! Timekeeping is done with elapsed time since system boot. Time is represented in
4//! ticks, where the tick rate is defined by the current driver, usually to match
5//! the tick rate of the hardware.
6//!
7//! Tick counts are 64 bits. At the highest supported tick rate of 1Mhz this supports
8//! representing time spans of up to ~584558 years, which is big enough for all practical
9//! purposes and allows not having to worry about overflows.
10//!
11//! [`Instant`] represents a given instant of time (relative to system boot), and [`Duration`]
12//! represents the duration of a span of time. They implement the math operations you'd expect,
13//! like addition and substraction.
14//!
15//! # Delays and timeouts
16//!
17//! [`Timer`] allows performing async delays. [`Ticker`] allows periodic delays without drifting over time.
18//!
19//! An implementation of the `embedded-hal` delay traits is provided by [`Delay`], for compatibility
20//! with libraries from the ecosystem.
21//!
22//! # Wall-clock time
23//!
24//! The `time` module deals exclusively with a monotonically increasing tick count.
25//! Therefore it has no direct support for wall-clock time ("real life" datetimes
26//! like `2021-08-24 13:33:21`).
27//!
28//! If persistence across reboots is not needed, support can be built on top of
29//! `embassy_executor::time` by storing the offset between "seconds elapsed since boot"
30//! and "seconds since unix epoch".
31//!
32//! # Time driver
33//!
34//! The `time` module is backed by a global "time driver" specified at build time.
35//! Only one driver can be active in a program.
36//!
37//! All methods and structs transparently call into the active driver. This makes it
38//! possible for libraries to use `embassy_executor::time` in a driver-agnostic way without
39//! requiring generic parameters.
40//!
41//! For more details, check the [`driver`] module.
42
43#![deny(missing_docs)]
44
45mod delay;
46pub mod driver;
47mod duration;
48mod instant;
49mod timer;
50
51#[cfg(feature = "std")]
52mod driver_std;
53
54#[cfg(feature = "wasm")]
55mod driver_wasm;
56
57pub use delay::{block_for, Delay};
58pub use duration::Duration;
59pub use instant::Instant;
60pub use timer::{with_timeout, Ticker, TimeoutError, Timer};
61
62#[cfg(feature = "time-tick-1000hz")]
63const TPS: u64 = 1_000;
64
65#[cfg(feature = "time-tick-32768hz")]
66const TPS: u64 = 32_768;
67
68#[cfg(feature = "time-tick-1mhz")]
69const TPS: u64 = 1_000_000;
70
71#[cfg(feature = "time-tick-16mhz")]
72const TPS: u64 = 16_000_000;
73
74/// Ticks per second of the global timebase.
75///
76/// This value is specified by the `time-tick-*` Cargo features, which
77/// should be set by the time driver. Some drivers support a fixed tick rate, others
78/// allow you to choose a tick rate with Cargo features of their own. You should not
79/// set the `time-tick-*` features for embassy yourself as an end user.
80pub const TICKS_PER_SECOND: u64 = TPS;
81
82const fn gcd(a: u64, b: u64) -> u64 {
83 if b == 0 {
84 a
85 } else {
86 gcd(b, a % b)
87 }
88}
89
90pub(crate) const GCD_1K: u64 = gcd(TICKS_PER_SECOND, 1_000);
91pub(crate) const GCD_1M: u64 = gcd(TICKS_PER_SECOND, 1_000_000);
diff --git a/embassy-executor/src/time/timer.rs b/embassy-executor/src/time/timer.rs
new file mode 100644
index 000000000..b9cdb1be5
--- /dev/null
+++ b/embassy-executor/src/time/timer.rs
@@ -0,0 +1,151 @@
1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_util::future::{select, Either};
6use futures_util::{pin_mut, Stream};
7
8use crate::executor::raw;
9use crate::time::{Duration, Instant};
10
11/// Error returned by [`with_timeout`] on timeout.
12#[derive(Debug, Clone, PartialEq, Eq)]
13#[cfg_attr(feature = "defmt", derive(defmt::Format))]
14pub struct TimeoutError;
15
16/// Runs a given future with a timeout.
17///
18/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
19/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
20pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> {
21 let timeout_fut = Timer::after(timeout);
22 pin_mut!(fut);
23 match select(fut, timeout_fut).await {
24 Either::Left((r, _)) => Ok(r),
25 Either::Right(_) => Err(TimeoutError),
26 }
27}
28
29/// A future that completes at a specified [Instant](struct.Instant.html).
30pub struct Timer {
31 expires_at: Instant,
32 yielded_once: bool,
33}
34
35impl Timer {
36 /// Expire at specified [Instant](struct.Instant.html)
37 pub fn at(expires_at: Instant) -> Self {
38 Self {
39 expires_at,
40 yielded_once: false,
41 }
42 }
43
44 /// Expire after specified [Duration](struct.Duration.html).
45 /// This can be used as a `sleep` abstraction.
46 ///
47 /// Example:
48 /// ``` no_run
49 /// # #![feature(type_alias_impl_trait)]
50 /// #
51 /// # fn foo() {}
52 /// use embassy_executor::time::{Duration, Timer};
53 ///
54 /// #[embassy_executor::task]
55 /// async fn demo_sleep_seconds() {
56 /// // suspend this task for one second.
57 /// Timer::after(Duration::from_secs(1)).await;
58 /// }
59 /// ```
60 pub fn after(duration: Duration) -> Self {
61 Self {
62 expires_at: Instant::now() + duration,
63 yielded_once: false,
64 }
65 }
66}
67
68impl Unpin for Timer {}
69
70impl Future for Timer {
71 type Output = ();
72 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73 if self.yielded_once && self.expires_at <= Instant::now() {
74 Poll::Ready(())
75 } else {
76 unsafe { raw::register_timer(self.expires_at, cx.waker()) };
77 self.yielded_once = true;
78 Poll::Pending
79 }
80 }
81}
82
83/// Asynchronous stream that yields every Duration, indefinitely.
84///
85/// This stream will tick at uniform intervals, even if blocking work is performed between ticks.
86///
87/// For instance, consider the following code fragment.
88/// ``` no_run
89/// # #![feature(type_alias_impl_trait)]
90/// #
91/// use embassy_executor::time::{Duration, Timer};
92/// # fn foo() {}
93///
94/// #[embassy_executor::task]
95/// async fn ticker_example_0() {
96/// loop {
97/// foo();
98/// Timer::after(Duration::from_secs(1)).await;
99/// }
100/// }
101/// ```
102///
103/// This fragment will not call `foo` every second.
104/// Instead, it will call it every second + the time it took to previously call `foo`.
105///
106/// Example using ticker, which will consistently call `foo` once a second.
107///
108/// ``` no_run
109/// # #![feature(type_alias_impl_trait)]
110/// #
111/// use embassy_executor::time::{Duration, Ticker};
112/// use futures::StreamExt;
113/// # fn foo(){}
114///
115/// #[embassy_executor::task]
116/// async fn ticker_example_1() {
117/// let mut ticker = Ticker::every(Duration::from_secs(1));
118/// loop {
119/// foo();
120/// ticker.next().await;
121/// }
122/// }
123/// ```
124pub struct Ticker {
125 expires_at: Instant,
126 duration: Duration,
127}
128
129impl Ticker {
130 /// Creates a new ticker that ticks at the specified duration interval.
131 pub fn every(duration: Duration) -> Self {
132 let expires_at = Instant::now() + duration;
133 Self { expires_at, duration }
134 }
135}
136
137impl Unpin for Ticker {}
138
139impl Stream for Ticker {
140 type Item = ();
141 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142 if self.expires_at <= Instant::now() {
143 let dur = self.duration;
144 self.expires_at += dur;
145 Poll::Ready(Some(()))
146 } else {
147 unsafe { raw::register_timer(self.expires_at, cx.waker()) };
148 Poll::Pending
149 }
150 }
151}