aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/executor/raw/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/executor/raw/mod.rs')
-rw-r--r--embassy-executor/src/executor/raw/mod.rs462
1 files changed, 0 insertions, 462 deletions
diff --git a/embassy-executor/src/executor/raw/mod.rs b/embassy-executor/src/executor/raw/mod.rs
deleted file mode 100644
index 56220d10e..000000000
--- a/embassy-executor/src/executor/raw/mod.rs
+++ /dev/null
@@ -1,462 +0,0 @@
1//! Raw executor.
2//!
3//! This module exposes "raw" Executor and Task structs for more low level control.
4//!
5//! ## WARNING: here be dragons!
6//!
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! executor wrappers in [`executor`](crate::executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
9
10mod run_queue;
11#[cfg(feature = "time")]
12mod timer_queue;
13pub(crate) mod util;
14mod waker;
15
16use core::cell::Cell;
17use core::future::Future;
18use core::pin::Pin;
19use core::ptr::NonNull;
20use core::task::{Context, Poll};
21use core::{mem, ptr};
22
23use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection;
25#[cfg(feature = "rtos-trace")]
26use rtos_trace::trace;
27
28use self::run_queue::{RunQueue, RunQueueItem};
29use self::util::UninitCell;
30pub use self::waker::task_from_waker;
31use super::SpawnToken;
32#[cfg(feature = "time")]
33use crate::time::driver::{self, AlarmHandle};
34#[cfg(feature = "time")]
35use crate::time::Instant;
36
// Bitflags stored in `TaskHeader::state`. A state of 0 means the storage is
// free (not spawned).

/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
#[cfg(feature = "time")]
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
44
/// Raw task header for use in task pointers.
///
/// This is an opaque struct, used for raw pointers to tasks, for use
/// with functions like [`wake_task`] and [`task_from_waker`].
pub struct TaskHeader {
    // Bitfield of STATE_* flags. Atomic because it is updated both from the
    // executor and from wakers (potentially other contexts).
    pub(crate) state: AtomicU32,
    // Intrusive link used by the executor's run queue.
    pub(crate) run_queue_item: RunQueueItem,
    pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
    pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED

    // Deadline at which the task wants to be woken. Set via `register_timer`,
    // reset to `Instant::MAX` by the executor before each poll.
    #[cfg(feature = "time")]
    pub(crate) expires_at: Cell<Instant>,
    // Intrusive link used by the executor's timer queue.
    #[cfg(feature = "time")]
    pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
60
impl TaskHeader {
    /// Create a header in the not-spawned state: state == 0, no executor bound,
    /// `poll_fn` uninitialized. `const` so it can be used in `static` storage.
    pub(crate) const fn new() -> Self {
        Self {
            state: AtomicU32::new(0),
            run_queue_item: RunQueueItem::new(),
            executor: Cell::new(ptr::null()),
            poll_fn: UninitCell::uninit(),

            #[cfg(feature = "time")]
            expires_at: Cell::new(Instant::from_ticks(0)),
            #[cfg(feature = "time")]
            timer_queue_item: timer_queue::TimerQueueItem::new(),
        }
    }
}
76
/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
///
/// Internally, the [embassy_executor::task](embassy_macros::task) macro allocates an array of `TaskStorage`s
/// in a `static`. The most common reason to use the raw `Task` is to have control of where
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.

// repr(C) is needed to guarantee that the Task is located at offset 0
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}
97
impl<F: Future + 'static> TaskStorage<F> {
    // Const "template" value, usable to initialize arrays (see `TaskPool::new`).
    const NEW: Self = Self::new();

    /// Create a new TaskStorage, in not-spawned state.
    pub const fn new() -> Self {
        Self {
            raw: TaskHeader::new(),
            future: UninitCell::uninit(),
        }
    }

    /// Try to spawn the task.
    ///
    /// The `future` closure constructs the future. It's only called if spawning is
    /// actually possible. It is a closure instead of a simple `future: F` param to ensure
    /// the future is constructed in-place, avoiding a temporary copy in the stack thanks to
    /// NRVO optimizations.
    ///
    /// This function will fail if the task is already spawned and has not finished running.
    /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
    /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
    ///
    /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
    /// on a different executor.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
        if self.spawn_mark_used() {
            // SAFETY: the compare-exchange above succeeded, so we have
            // exclusive access to initialize this storage.
            return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) };
        }

        SpawnToken::<F>::new_failed()
    }

    // Atomically claim the storage: transition state from 0 (free) to
    // SPAWNED|RUN_QUEUED. Returns false if the storage is still in use.
    fn spawn_mark_used(&'static self) -> bool {
        let state = STATE_SPAWNED | STATE_RUN_QUEUED;
        self.raw
            .state
            .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    // Initialize a freshly-claimed storage: set the poll function and construct
    // the future in place. Caller must have won `spawn_mark_used` first.
    unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
        // Initialize the task
        self.raw.poll_fn.write(Self::poll);
        self.future.write(future());
        // repr(C) puts `raw` at offset 0, so casting the storage pointer to a
        // header pointer is valid.
        NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
    }

    // Poll entry point stored in `TaskHeader::poll_fn`. `p` must point at the
    // header of a spawned `TaskStorage<F>`.
    unsafe fn poll(p: NonNull<TaskHeader>) {
        let this = &*(p.as_ptr() as *const TaskStorage<F>);

        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                // Task finished: drop the future and clear SPAWNED so the
                // storage becomes reusable.
                this.future.drop_in_place();
                this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
            }
            Poll::Pending => {}
        }

        // the compiler is emitting a virtual call for waker drop, but we know
        // it's a noop for our waker.
        mem::forget(waker);
    }
}
164
165unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
166
/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
pub struct TaskPool<F: Future + 'static, const N: usize> {
    // Fixed-size array of task slots; each slot tracks its own spawned state.
    pool: [TaskStorage<F>; N],
}
173
174impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
175 /// Create a new TaskPool, with all tasks in non-spawned state.
176 pub const fn new() -> Self {
177 Self {
178 pool: [TaskStorage::NEW; N],
179 }
180 }
181
182 /// Try to spawn a task in the pool.
183 ///
184 /// See [`TaskStorage::spawn()`] for details.
185 ///
186 /// This will loop over the pool and spawn the task in the first storage that
187 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
188 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
189 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
190 for task in &self.pool {
191 if task.spawn_mark_used() {
192 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
193 }
194 }
195
196 SpawnToken::<F>::new_failed()
197 }
198
199 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
200 /// the future is !Send.
201 ///
202 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
203 /// by the Embassy macros ONLY.
204 ///
205 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
206 /// is an `async fn`, NOT a hand-written `Future`.
207 #[doc(hidden)]
208 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
209 where
210 FutFn: FnOnce() -> F,
211 {
212 // When send-spawning a task, we construct the future in this thread, and effectively
213 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
214 // send-spawning should require the future `F` to be `Send`.
215 //
216 // The problem is this is more restrictive than needed. Once the future is executing,
217 // it is never sent to another thread. It is only sent when spawning. It should be
218 // enough for the task's arguments to be Send. (and in practice it's super easy to
219 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
220 //
221 // We can do it by sending the task args and constructing the future in the executor thread
222 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
223 // of the args.
224 //
225 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
226 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
227 //
228 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
229 // but it's possible it'll be guaranteed in the future. See zulip thread:
230 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
231 //
232 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
233 // This is why we return `SpawnToken<FutFn>` below.
234 //
235 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
236 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
237
238 for task in &self.pool {
239 if task.spawn_mark_used() {
240 return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
241 }
242 }
243
244 SpawnToken::<FutFn>::new_failed()
245 }
246}
247
/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// higher level executors in [`crate::executor`].
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
///   that "want to run").
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
///   to do. You must arrange for `poll()` to be called as soon as possible.
///
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
/// the requirement for `poll` to not be called reentrantly.
pub struct Executor {
    // Queue of tasks ready to be polled.
    run_queue: RunQueue,
    // User-supplied callback (plus its context pointer) invoked when there is
    // new work to do.
    signal_fn: fn(*mut ()),
    signal_ctx: *mut (),

    #[cfg(feature = "time")]
    pub(crate) timer_queue: timer_queue::TimerQueue,
    // Alarm from the time driver; its callback is wired to `signal_fn` in `new`.
    #[cfg(feature = "time")]
    alarm: AlarmHandle,
}
277
impl Executor {
    /// Create a new executor.
    ///
    /// When the executor has work to do, it will call `signal_fn` with
    /// `signal_ctx` as argument.
    ///
    /// See [`Executor`] docs for details on `signal_fn`.
    pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
        // With the `time` feature, allocate a driver alarm and route its
        // callback to `signal_fn`, so timer expirations signal the executor
        // the same way task wakeups do.
        #[cfg(feature = "time")]
        let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
        #[cfg(feature = "time")]
        driver::set_alarm_callback(alarm, signal_fn, signal_ctx);

        Self {
            run_queue: RunQueue::new(),
            signal_fn,
            signal_ctx,

            #[cfg(feature = "time")]
            timer_queue: timer_queue::TimerQueue::new(),
            #[cfg(feature = "time")]
            alarm,
        }
    }

    /// Enqueue a task in the task queue
    ///
    /// # Safety
    /// - `task` must be a valid pointer to a spawned task.
    /// - `task` must be set up to run in this executor.
    /// - `task` must NOT be already enqueued (in this executor or another one).
    #[inline(always)]
    unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) {
        #[cfg(feature = "rtos-trace")]
        trace::task_ready_begin(task.as_ptr() as u32);

        // Only signal when `enqueue` says so — presumably when the queue
        // transitions from empty to non-empty (confirm in run_queue module).
        if self.run_queue.enqueue(cs, task) {
            (self.signal_fn)(self.signal_ctx)
        }
    }

    /// Spawn a task in this executor.
    ///
    /// # Safety
    ///
    /// `task` must be a valid pointer to an initialized but not-already-spawned task.
    ///
    /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
    /// In this case, the task's Future must be Send. This is because this is effectively
    /// sending the task to the executor thread.
    pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
        // Bind the task to this executor so `wake_task` can find us later.
        task.as_ref().executor.set(self);

        #[cfg(feature = "rtos-trace")]
        trace::task_new(task.as_ptr() as u32);

        critical_section::with(|cs| {
            self.enqueue(cs, task);
        })
    }

    /// Poll all queued tasks in this executor.
    ///
    /// This loops over all tasks that are queued to be polled (i.e. they're
    /// freshly spawned or they've been woken). Other tasks are not polled.
    ///
    /// You must call `poll` after receiving a call to `signal_fn`. It is OK
    /// to call `poll` even when not requested by `signal_fn`, but it wastes
    /// energy.
    ///
    /// # Safety
    ///
    /// You must NOT call `poll` reentrantly on the same executor.
    ///
    /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
    /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
    /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
    /// no `poll()` already running.
    pub unsafe fn poll(&'static self) {
        // First, wake every task whose timer deadline has passed.
        #[cfg(feature = "time")]
        self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));

        self.run_queue.dequeue_all(|p| {
            let task = p.as_ref();

            // Reset the deadline before polling: if the task still wants a
            // timer, it re-registers via `register_timer`, which takes the min.
            #[cfg(feature = "time")]
            task.expires_at.set(Instant::MAX);

            let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
            if state & STATE_SPAWNED == 0 {
                // If task is not running, ignore it. This can happen in the following scenario:
                //   - Task gets dequeued, poll starts
                //   - While task is being polled, it gets woken. It gets placed in the queue.
                //   - Task poll finishes, returning done=true
                //   - RUNNING bit is cleared, but the task is already in the queue.
                return;
            }

            #[cfg(feature = "rtos-trace")]
            trace::task_exec_begin(p.as_ptr() as u32);

            // Run the task
            task.poll_fn.read()(p as _);

            #[cfg(feature = "rtos-trace")]
            trace::task_exec_end();

            // Enqueue or update into timer_queue
            #[cfg(feature = "time")]
            self.timer_queue.update(p);
        });

        #[cfg(feature = "time")]
        {
            // If this is already in the past, set_alarm will immediately trigger the alarm.
            // This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
            // so we immediately do another poll loop iteration.
            let next_expiration = self.timer_queue.next_expiration();
            driver::set_alarm(self.alarm, next_expiration.as_ticks());
        }

        #[cfg(feature = "rtos-trace")]
        trace::system_idle();
    }

    /// Get a spawner that spawns tasks in this executor.
    ///
    /// It is OK to call this method multiple times to obtain multiple
    /// `Spawner`s. You may also copy `Spawner`s.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }
}
411
/// Wake a task by raw pointer.
///
/// You can obtain task pointers from `Waker`s using [`task_from_waker`].
///
/// # Safety
///
/// `task` must be a valid task pointer obtained from [`task_from_waker`].
pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
    critical_section::with(|cs| {
        let header = task.as_ref();
        let state = header.state.load(Ordering::Relaxed);

        // If already scheduled, or if not started,
        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
            return;
        }

        // Mark it as scheduled.
        // Relaxed load+store is OK here: the surrounding critical section makes
        // this read-modify-write atomic with respect to other callers.
        header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);

        // We have just marked the task as scheduled, so enqueue it.
        let executor = &*header.executor.get();
        executor.enqueue(cs, task);
    })
}
437
/// Record a timer deadline for the task behind `waker`.
///
/// Keeps the earliest requested deadline: if the task already has an
/// `expires_at` earlier than `at`, it is left untouched.
#[cfg(feature = "time")]
pub(crate) unsafe fn register_timer(at: Instant, waker: &core::task::Waker) {
    let task = waker::task_from_waker(waker);
    let header = task.as_ref();
    let current = header.expires_at.get();
    if at < current {
        header.expires_at.set(at);
    }
}
445
// Hooks giving the rtos-trace crate access to OS-level information.
#[cfg(feature = "rtos-trace")]
impl rtos_trace::RtosTraceOSCallbacks for Executor {
    fn task_list() {
        // We don't know what tasks exist, so we can't send them.
    }
    // Trace timestamp in microseconds, taken from the time driver.
    #[cfg(feature = "time")]
    fn time() -> u64 {
        Instant::now().as_micros()
    }
    // Without the `time` feature there is no clock; report 0.
    #[cfg(not(feature = "time"))]
    fn time() -> u64 {
        0
    }
}
460
// Register `Executor` as the global OS-callback provider for rtos-trace.
#[cfg(feature = "rtos-trace")]
rtos_trace::global_os_callbacks! {Executor}