aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/raw/mod.rs156
-rw-r--r--embassy-executor/src/raw/timer_queue.rs14
-rw-r--r--embassy-executor/src/raw/util.rs29
-rw-r--r--embassy-executor/src/spawner.rs12
4 files changed, 154 insertions, 57 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 42bd82262..938492c21 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -13,8 +13,8 @@ mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14mod waker; 14mod waker;
15 15
16use core::cell::Cell;
17use core::future::Future; 16use core::future::Future;
17use core::marker::PhantomData;
18use core::mem; 18use core::mem;
19use core::pin::Pin; 19use core::pin::Pin;
20use core::ptr::NonNull; 20use core::ptr::NonNull;
@@ -30,7 +30,7 @@ use embassy_time::Instant;
30use rtos_trace::trace; 30use rtos_trace::trace;
31 31
32use self::run_queue::{RunQueue, RunQueueItem}; 32use self::run_queue::{RunQueue, RunQueueItem};
33use self::util::UninitCell; 33use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 34pub use self::waker::task_from_waker;
35use super::SpawnToken; 35use super::SpawnToken;
36 36
@@ -46,11 +46,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
46pub(crate) struct TaskHeader { 46pub(crate) struct TaskHeader {
47 pub(crate) state: AtomicU32, 47 pub(crate) state: AtomicU32,
48 pub(crate) run_queue_item: RunQueueItem, 48 pub(crate) run_queue_item: RunQueueItem,
49 pub(crate) executor: Cell<Option<&'static Executor>>, 49 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
50 poll_fn: Cell<Option<unsafe fn(TaskRef)>>, 50 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
51 51
52 #[cfg(feature = "integrated-timers")] 52 #[cfg(feature = "integrated-timers")]
53 pub(crate) expires_at: Cell<Instant>, 53 pub(crate) expires_at: SyncUnsafeCell<Instant>,
54 #[cfg(feature = "integrated-timers")] 54 #[cfg(feature = "integrated-timers")]
55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
56} 56}
@@ -61,6 +61,9 @@ pub struct TaskRef {
61 ptr: NonNull<TaskHeader>, 61 ptr: NonNull<TaskHeader>,
62} 62}
63 63
64unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
65unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
66
64impl TaskRef { 67impl TaskRef {
65 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { 68 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
66 Self { 69 Self {
@@ -115,12 +118,12 @@ impl<F: Future + 'static> TaskStorage<F> {
115 raw: TaskHeader { 118 raw: TaskHeader {
116 state: AtomicU32::new(0), 119 state: AtomicU32::new(0),
117 run_queue_item: RunQueueItem::new(), 120 run_queue_item: RunQueueItem::new(),
118 executor: Cell::new(None), 121 executor: SyncUnsafeCell::new(None),
119 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 122 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
120 poll_fn: Cell::new(None), 123 poll_fn: SyncUnsafeCell::new(None),
121 124
122 #[cfg(feature = "integrated-timers")] 125 #[cfg(feature = "integrated-timers")]
123 expires_at: Cell::new(Instant::from_ticks(0)), 126 expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
124 #[cfg(feature = "integrated-timers")] 127 #[cfg(feature = "integrated-timers")]
125 timer_queue_item: timer_queue::TimerQueueItem::new(), 128 timer_queue_item: timer_queue::TimerQueueItem::new(),
126 }, 129 },
@@ -170,9 +173,15 @@ impl<F: Future + 'static> TaskStorage<F> {
170 // it's a noop for our waker. 173 // it's a noop for our waker.
171 mem::forget(waker); 174 mem::forget(waker);
172 } 175 }
173}
174 176
175unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 177 #[doc(hidden)]
178 #[allow(dead_code)]
179 fn _assert_sync(self) {
180 fn assert_sync<T: Sync>(_: T) {}
181
182 assert_sync(self)
183 }
184}
176 185
177struct AvailableTask<F: Future + 'static> { 186struct AvailableTask<F: Future + 'static> {
178 task: &'static TaskStorage<F>, 187 task: &'static TaskStorage<F>,
@@ -279,29 +288,13 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
279 } 288 }
280} 289}
281 290
282/// Raw executor. 291struct SignalCtx(*mut ());
283/// 292unsafe impl Sync for SignalCtx {}
284/// This is the core of the Embassy executor. It is low-level, requiring manual 293
285/// handling of wakeups and task polling. If you can, prefer using one of the 294pub(crate) struct SyncExecutor {
286/// [higher level executors](crate::Executor).
287///
288/// The raw executor leaves it up to you to handle wakeups and scheduling:
289///
290/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
291/// that "want to run").
292/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
293/// to do. You must arrange for `poll()` to be called as soon as possible.
294///
295/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
296/// level, etc. It may be called synchronously from any `Executor` method call as well.
297/// You must deal with this correctly.
298///
299/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
300/// the requirement for `poll` to not be called reentrantly.
301pub struct Executor {
302 run_queue: RunQueue, 295 run_queue: RunQueue,
303 signal_fn: fn(*mut ()), 296 signal_fn: fn(*mut ()),
304 signal_ctx: *mut (), 297 signal_ctx: SignalCtx,
305 298
306 #[cfg(feature = "integrated-timers")] 299 #[cfg(feature = "integrated-timers")]
307 pub(crate) timer_queue: timer_queue::TimerQueue, 300 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -309,14 +302,8 @@ pub struct Executor {
309 alarm: AlarmHandle, 302 alarm: AlarmHandle,
310} 303}
311 304
312impl Executor { 305impl SyncExecutor {
313 /// Create a new executor. 306 pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
314 ///
315 /// When the executor has work to do, it will call `signal_fn` with
316 /// `signal_ctx` as argument.
317 ///
318 /// See [`Executor`] docs for details on `signal_fn`.
319 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
320 #[cfg(feature = "integrated-timers")] 307 #[cfg(feature = "integrated-timers")]
321 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 308 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
322 #[cfg(feature = "integrated-timers")] 309 #[cfg(feature = "integrated-timers")]
@@ -325,7 +312,7 @@ impl Executor {
325 Self { 312 Self {
326 run_queue: RunQueue::new(), 313 run_queue: RunQueue::new(),
327 signal_fn, 314 signal_fn,
328 signal_ctx, 315 signal_ctx: SignalCtx(signal_ctx),
329 316
330 #[cfg(feature = "integrated-timers")] 317 #[cfg(feature = "integrated-timers")]
331 timer_queue: timer_queue::TimerQueue::new(), 318 timer_queue: timer_queue::TimerQueue::new(),
@@ -346,7 +333,7 @@ impl Executor {
346 trace::task_ready_begin(task.as_ptr() as u32); 333 trace::task_ready_begin(task.as_ptr() as u32);
347 334
348 if self.run_queue.enqueue(cs, task) { 335 if self.run_queue.enqueue(cs, task) {
349 (self.signal_fn)(self.signal_ctx) 336 (self.signal_fn)(self.signal_ctx.0)
350 } 337 }
351 } 338 }
352 339
@@ -387,7 +374,8 @@ impl Executor {
387 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to 374 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
388 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 375 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
389 /// no `poll()` already running. 376 /// no `poll()` already running.
390 pub unsafe fn poll(&'static self) { 377 pub(crate) unsafe fn poll(&'static self) {
378 #[allow(clippy::never_loop)]
391 loop { 379 loop {
392 #[cfg(feature = "integrated-timers")] 380 #[cfg(feature = "integrated-timers")]
393 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 381 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
@@ -441,6 +429,84 @@ impl Executor {
441 #[cfg(feature = "rtos-trace")] 429 #[cfg(feature = "rtos-trace")]
442 trace::system_idle(); 430 trace::system_idle();
443 } 431 }
432}
433
434/// Raw executor.
435///
436/// This is the core of the Embassy executor. It is low-level, requiring manual
437/// handling of wakeups and task polling. If you can, prefer using one of the
438/// [higher level executors](crate::Executor).
439///
440/// The raw executor leaves it up to you to handle wakeups and scheduling:
441///
442/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
443/// that "want to run").
444/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
445/// to do. You must arrange for `poll()` to be called as soon as possible.
446///
447/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
448/// level, etc. It may be called synchronously from any `Executor` method call as well.
449/// You must deal with this correctly.
450///
451/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
452/// the requirement for `poll` to not be called reentrantly.
453#[repr(transparent)]
454pub struct Executor {
455 pub(crate) inner: SyncExecutor,
456
457 _not_sync: PhantomData<*mut ()>,
458}
459
460impl Executor {
461 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
462 mem::transmute(inner)
463 }
464 /// Create a new executor.
465 ///
466 /// When the executor has work to do, it will call `signal_fn` with
467 /// `signal_ctx` as argument.
468 ///
469 /// See [`Executor`] docs for details on `signal_fn`.
470 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
471 Self {
472 inner: SyncExecutor::new(signal_fn, signal_ctx),
473 _not_sync: PhantomData,
474 }
475 }
476
477 /// Spawn a task in this executor.
478 ///
479 /// # Safety
480 ///
481 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
482 ///
483 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
484 /// In this case, the task's Future must be Send. This is because this is effectively
485 /// sending the task to the executor thread.
486 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
487 self.inner.spawn(task)
488 }
489
490 /// Poll all queued tasks in this executor.
491 ///
492 /// This loops over all tasks that are queued to be polled (i.e. they're
493 /// freshly spawned or they've been woken). Other tasks are not polled.
494 ///
495 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
496 /// to call `poll` even when not requested by `signal_fn`, but it wastes
497 /// energy.
498 ///
499 /// # Safety
500 ///
501 /// You must NOT call `poll` reentrantly on the same executor.
502 ///
503 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
504 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
505 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
506 /// no `poll()` already running.
507 pub unsafe fn poll(&'static self) {
508 self.inner.poll()
509 }
444 510
445 /// Get a spawner that spawns tasks in this executor. 511 /// Get a spawner that spawns tasks in this executor.
446 /// 512 ///
@@ -483,8 +549,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue {
483 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { 549 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
484 let task = waker::task_from_waker(waker); 550 let task = waker::task_from_waker(waker);
485 let task = task.header(); 551 let task = task.header();
486 let expires_at = task.expires_at.get(); 552 unsafe {
487 task.expires_at.set(expires_at.min(at)); 553 let expires_at = task.expires_at.get();
554 task.expires_at.set(expires_at.min(at));
555 }
488 } 556 }
489} 557}
490 558
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 57d6d3cda..dc71c95b1 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,28 +1,32 @@
1use core::cell::Cell;
2use core::cmp::min; 1use core::cmp::min;
3 2
4use atomic_polyfill::Ordering; 3use atomic_polyfill::Ordering;
5use embassy_time::Instant; 4use embassy_time::Instant;
6 5
7use super::{TaskRef, STATE_TIMER_QUEUED}; 6use super::{TaskRef, STATE_TIMER_QUEUED};
7use crate::raw::util::SyncUnsafeCell;
8 8
9pub(crate) struct TimerQueueItem { 9pub(crate) struct TimerQueueItem {
10 next: Cell<Option<TaskRef>>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
11} 11}
12 12
13impl TimerQueueItem { 13impl TimerQueueItem {
14 pub const fn new() -> Self { 14 pub const fn new() -> Self {
15 Self { next: Cell::new(None) } 15 Self {
16 next: SyncUnsafeCell::new(None),
17 }
16 } 18 }
17} 19}
18 20
19pub(crate) struct TimerQueue { 21pub(crate) struct TimerQueue {
20 head: Cell<Option<TaskRef>>, 22 head: SyncUnsafeCell<Option<TaskRef>>,
21} 23}
22 24
23impl TimerQueue { 25impl TimerQueue {
24 pub const fn new() -> Self { 26 pub const fn new() -> Self {
25 Self { head: Cell::new(None) } 27 Self {
28 head: SyncUnsafeCell::new(None),
29 }
26 } 30 }
27 31
28 pub(crate) unsafe fn update(&self, p: TaskRef) { 32 pub(crate) unsafe fn update(&self, p: TaskRef) {
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index 2b1f6b6f3..e2e8f4df8 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -25,3 +25,32 @@ impl<T> UninitCell<T> {
25 ptr::drop_in_place(self.as_mut_ptr()) 25 ptr::drop_in_place(self.as_mut_ptr())
26 } 26 }
27} 27}
28
29unsafe impl<T> Sync for UninitCell<T> {}
30
31#[repr(transparent)]
32pub struct SyncUnsafeCell<T> {
33 value: UnsafeCell<T>,
34}
35
36unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
37
38impl<T> SyncUnsafeCell<T> {
39 #[inline]
40 pub const fn new(value: T) -> Self {
41 Self {
42 value: UnsafeCell::new(value),
43 }
44 }
45
46 pub unsafe fn set(&self, value: T) {
47 *self.value.get() = value;
48 }
49
50 pub unsafe fn get(&self) -> T
51 where
52 T: Copy,
53 {
54 *self.value.get()
55 }
56}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 7c0a0183c..2b6224045 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -92,6 +92,7 @@ impl Spawner {
92 poll_fn(|cx| { 92 poll_fn(|cx| {
93 let task = raw::task_from_waker(cx.waker()); 93 let task = raw::task_from_waker(cx.waker());
94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
95 let executor = unsafe { raw::Executor::wrap(executor) };
95 Poll::Ready(Self::new(executor)) 96 Poll::Ready(Self::new(executor))
96 }) 97 })
97 .await 98 .await
@@ -130,9 +131,7 @@ impl Spawner {
130 /// spawner to other threads, but the spawner loses the ability to spawn 131 /// spawner to other threads, but the spawner loses the ability to spawn
131 /// non-Send tasks. 132 /// non-Send tasks.
132 pub fn make_send(&self) -> SendSpawner { 133 pub fn make_send(&self) -> SendSpawner {
133 SendSpawner { 134 SendSpawner::new(&self.executor.inner)
134 executor: self.executor,
135 }
136 } 135 }
137} 136}
138 137
@@ -145,14 +144,11 @@ impl Spawner {
145/// If you want to spawn non-Send tasks, use [Spawner]. 144/// If you want to spawn non-Send tasks, use [Spawner].
146#[derive(Copy, Clone)] 145#[derive(Copy, Clone)]
147pub struct SendSpawner { 146pub struct SendSpawner {
148 executor: &'static raw::Executor, 147 executor: &'static raw::SyncExecutor,
149} 148}
150 149
151unsafe impl Send for SendSpawner {}
152unsafe impl Sync for SendSpawner {}
153
154impl SendSpawner { 150impl SendSpawner {
155 pub(crate) fn new(executor: &'static raw::Executor) -> Self { 151 pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self {
156 Self { executor } 152 Self { executor }
157 } 153 }
158 154