aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2023-03-26 22:23:00 +0000
committerGitHub <[email protected]>2023-03-26 22:23:00 +0000
commit8a3a7c65a89e27883a2ab7524162f26851c4c10a (patch)
tree47ad806df2a2a05f874cff215c2c3f1cd0a088d4
parent7186e038012bf8c3430334c4838cb8ff508040b1 (diff)
parent21400da073d7173e4c2445cbbcd2cd430f120ad1 (diff)
Merge #1291
1291: executor: Allow TaskStorage to auto-implement `Sync` r=Dirbaio a=GrantM11235 Co-authored-by: Grant Miller <[email protected]> Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy-executor/src/raw/mod.rs179
-rw-r--r--embassy-executor/src/raw/timer_queue.rs14
-rw-r--r--embassy-executor/src/raw/util.rs29
-rw-r--r--embassy-executor/src/spawner.rs12
4 files changed, 153 insertions, 81 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 42bd82262..15ff18fc8 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -13,11 +13,12 @@ mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14mod waker; 14mod waker;
15 15
16use core::cell::Cell;
17use core::future::Future; 16use core::future::Future;
17use core::marker::PhantomData;
18use core::mem; 18use core::mem;
19use core::pin::Pin; 19use core::pin::Pin;
20use core::ptr::NonNull; 20use core::ptr::NonNull;
21use core::sync::atomic::AtomicPtr;
21use core::task::{Context, Poll}; 22use core::task::{Context, Poll};
22 23
23use atomic_polyfill::{AtomicU32, Ordering}; 24use atomic_polyfill::{AtomicU32, Ordering};
@@ -30,7 +31,7 @@ use embassy_time::Instant;
30use rtos_trace::trace; 31use rtos_trace::trace;
31 32
32use self::run_queue::{RunQueue, RunQueueItem}; 33use self::run_queue::{RunQueue, RunQueueItem};
33use self::util::UninitCell; 34use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 35pub use self::waker::task_from_waker;
35use super::SpawnToken; 36use super::SpawnToken;
36 37
@@ -46,11 +47,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
46pub(crate) struct TaskHeader { 47pub(crate) struct TaskHeader {
47 pub(crate) state: AtomicU32, 48 pub(crate) state: AtomicU32,
48 pub(crate) run_queue_item: RunQueueItem, 49 pub(crate) run_queue_item: RunQueueItem,
49 pub(crate) executor: Cell<Option<&'static Executor>>, 50 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
50 poll_fn: Cell<Option<unsafe fn(TaskRef)>>, 51 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
51 52
52 #[cfg(feature = "integrated-timers")] 53 #[cfg(feature = "integrated-timers")]
53 pub(crate) expires_at: Cell<Instant>, 54 pub(crate) expires_at: SyncUnsafeCell<Instant>,
54 #[cfg(feature = "integrated-timers")] 55 #[cfg(feature = "integrated-timers")]
55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 56 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
56} 57}
@@ -61,6 +62,9 @@ pub struct TaskRef {
61 ptr: NonNull<TaskHeader>, 62 ptr: NonNull<TaskHeader>,
62} 63}
63 64
65unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
66unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
67
64impl TaskRef { 68impl TaskRef {
65 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { 69 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
66 Self { 70 Self {
@@ -115,12 +119,12 @@ impl<F: Future + 'static> TaskStorage<F> {
115 raw: TaskHeader { 119 raw: TaskHeader {
116 state: AtomicU32::new(0), 120 state: AtomicU32::new(0),
117 run_queue_item: RunQueueItem::new(), 121 run_queue_item: RunQueueItem::new(),
118 executor: Cell::new(None), 122 executor: SyncUnsafeCell::new(None),
119 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 123 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
120 poll_fn: Cell::new(None), 124 poll_fn: SyncUnsafeCell::new(None),
121 125
122 #[cfg(feature = "integrated-timers")] 126 #[cfg(feature = "integrated-timers")]
123 expires_at: Cell::new(Instant::from_ticks(0)), 127 expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
124 #[cfg(feature = "integrated-timers")] 128 #[cfg(feature = "integrated-timers")]
125 timer_queue_item: timer_queue::TimerQueueItem::new(), 129 timer_queue_item: timer_queue::TimerQueueItem::new(),
126 }, 130 },
@@ -170,9 +174,15 @@ impl<F: Future + 'static> TaskStorage<F> {
170 // it's a noop for our waker. 174 // it's a noop for our waker.
171 mem::forget(waker); 175 mem::forget(waker);
172 } 176 }
173}
174 177
175unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 178 #[doc(hidden)]
179 #[allow(dead_code)]
180 fn _assert_sync(self) {
181 fn assert_sync<T: Sync>(_: T) {}
182
183 assert_sync(self)
184 }
185}
176 186
177struct AvailableTask<F: Future + 'static> { 187struct AvailableTask<F: Future + 'static> {
178 task: &'static TaskStorage<F>, 188 task: &'static TaskStorage<F>,
@@ -279,29 +289,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
279 } 289 }
280} 290}
281 291
282/// Raw executor. 292pub(crate) struct SyncExecutor {
283///
284/// This is the core of the Embassy executor. It is low-level, requiring manual
285/// handling of wakeups and task polling. If you can, prefer using one of the
286/// [higher level executors](crate::Executor).
287///
288/// The raw executor leaves it up to you to handle wakeups and scheduling:
289///
290/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
291/// that "want to run").
292/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
293/// to do. You must arrange for `poll()` to be called as soon as possible.
294///
295/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
296/// level, etc. It may be called synchronously from any `Executor` method call as well.
297/// You must deal with this correctly.
298///
299/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
300/// the requirement for `poll` to not be called reentrantly.
301pub struct Executor {
302 run_queue: RunQueue, 293 run_queue: RunQueue,
303 signal_fn: fn(*mut ()), 294 signal_fn: fn(*mut ()),
304 signal_ctx: *mut (), 295 signal_ctx: AtomicPtr<()>,
305 296
306 #[cfg(feature = "integrated-timers")] 297 #[cfg(feature = "integrated-timers")]
307 pub(crate) timer_queue: timer_queue::TimerQueue, 298 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -309,14 +300,8 @@ pub struct Executor {
309 alarm: AlarmHandle, 300 alarm: AlarmHandle,
310} 301}
311 302
312impl Executor { 303impl SyncExecutor {
313 /// Create a new executor. 304 pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
314 ///
315 /// When the executor has work to do, it will call `signal_fn` with
316 /// `signal_ctx` as argument.
317 ///
318 /// See [`Executor`] docs for details on `signal_fn`.
319 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
320 #[cfg(feature = "integrated-timers")] 305 #[cfg(feature = "integrated-timers")]
321 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 306 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
322 #[cfg(feature = "integrated-timers")] 307 #[cfg(feature = "integrated-timers")]
@@ -325,7 +310,7 @@ impl Executor {
325 Self { 310 Self {
326 run_queue: RunQueue::new(), 311 run_queue: RunQueue::new(),
327 signal_fn, 312 signal_fn,
328 signal_ctx, 313 signal_ctx: AtomicPtr::new(signal_ctx),
329 314
330 #[cfg(feature = "integrated-timers")] 315 #[cfg(feature = "integrated-timers")]
331 timer_queue: timer_queue::TimerQueue::new(), 316 timer_queue: timer_queue::TimerQueue::new(),
@@ -346,19 +331,10 @@ impl Executor {
346 trace::task_ready_begin(task.as_ptr() as u32); 331 trace::task_ready_begin(task.as_ptr() as u32);
347 332
348 if self.run_queue.enqueue(cs, task) { 333 if self.run_queue.enqueue(cs, task) {
349 (self.signal_fn)(self.signal_ctx) 334 (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed))
350 } 335 }
351 } 336 }
352 337
353 /// Spawn a task in this executor.
354 ///
355 /// # Safety
356 ///
357 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
358 ///
359 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
360 /// In this case, the task's Future must be Send. This is because this is effectively
361 /// sending the task to the executor thread.
362 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 338 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
363 task.header().executor.set(Some(self)); 339 task.header().executor.set(Some(self));
364 340
@@ -370,24 +346,11 @@ impl Executor {
370 }) 346 })
371 } 347 }
372 348
373 /// Poll all queued tasks in this executor.
374 ///
375 /// This loops over all tasks that are queued to be polled (i.e. they're
376 /// freshly spawned or they've been woken). Other tasks are not polled.
377 ///
378 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
379 /// to call `poll` even when not requested by `signal_fn`, but it wastes
380 /// energy.
381 ///
382 /// # Safety 349 /// # Safety
383 /// 350 ///
384 /// You must NOT call `poll` reentrantly on the same executor. 351 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
385 /// 352 pub(crate) unsafe fn poll(&'static self) {
386 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you 353 #[allow(clippy::never_loop)]
387 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
388 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
389 /// no `poll()` already running.
390 pub unsafe fn poll(&'static self) {
391 loop { 354 loop {
392 #[cfg(feature = "integrated-timers")] 355 #[cfg(feature = "integrated-timers")]
393 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 356 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
@@ -441,6 +404,84 @@ impl Executor {
441 #[cfg(feature = "rtos-trace")] 404 #[cfg(feature = "rtos-trace")]
442 trace::system_idle(); 405 trace::system_idle();
443 } 406 }
407}
408
409/// Raw executor.
410///
411/// This is the core of the Embassy executor. It is low-level, requiring manual
412/// handling of wakeups and task polling. If you can, prefer using one of the
413/// [higher level executors](crate::Executor).
414///
415/// The raw executor leaves it up to you to handle wakeups and scheduling:
416///
417/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
418/// that "want to run").
419/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
420/// to do. You must arrange for `poll()` to be called as soon as possible.
421///
422/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
423/// level, etc. It may be called synchronously from any `Executor` method call as well.
424/// You must deal with this correctly.
425///
426/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
427/// the requirement for `poll` to not be called reentrantly.
428#[repr(transparent)]
429pub struct Executor {
430 pub(crate) inner: SyncExecutor,
431
432 _not_sync: PhantomData<*mut ()>,
433}
434
435impl Executor {
436 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
437 mem::transmute(inner)
438 }
439 /// Create a new executor.
440 ///
441 /// When the executor has work to do, it will call `signal_fn` with
442 /// `signal_ctx` as argument.
443 ///
444 /// See [`Executor`] docs for details on `signal_fn`.
445 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
446 Self {
447 inner: SyncExecutor::new(signal_fn, signal_ctx),
448 _not_sync: PhantomData,
449 }
450 }
451
452 /// Spawn a task in this executor.
453 ///
454 /// # Safety
455 ///
456 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
457 ///
458 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
459 /// In this case, the task's Future must be Send. This is because this is effectively
460 /// sending the task to the executor thread.
461 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
462 self.inner.spawn(task)
463 }
464
465 /// Poll all queued tasks in this executor.
466 ///
467 /// This loops over all tasks that are queued to be polled (i.e. they're
468 /// freshly spawned or they've been woken). Other tasks are not polled.
469 ///
470 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
471 /// to call `poll` even when not requested by `signal_fn`, but it wastes
472 /// energy.
473 ///
474 /// # Safety
475 ///
476 /// You must NOT call `poll` reentrantly on the same executor.
477 ///
478 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
479 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
480 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
481 /// no `poll()` already running.
482 pub unsafe fn poll(&'static self) {
483 self.inner.poll()
484 }
444 485
445 /// Get a spawner that spawns tasks in this executor. 486 /// Get a spawner that spawns tasks in this executor.
446 /// 487 ///
@@ -483,8 +524,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue {
483 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { 524 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
484 let task = waker::task_from_waker(waker); 525 let task = waker::task_from_waker(waker);
485 let task = task.header(); 526 let task = task.header();
486 let expires_at = task.expires_at.get(); 527 unsafe {
487 task.expires_at.set(expires_at.min(at)); 528 let expires_at = task.expires_at.get();
529 task.expires_at.set(expires_at.min(at));
530 }
488 } 531 }
489} 532}
490 533
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 57d6d3cda..dc71c95b1 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,28 +1,32 @@
1use core::cell::Cell;
2use core::cmp::min; 1use core::cmp::min;
3 2
4use atomic_polyfill::Ordering; 3use atomic_polyfill::Ordering;
5use embassy_time::Instant; 4use embassy_time::Instant;
6 5
7use super::{TaskRef, STATE_TIMER_QUEUED}; 6use super::{TaskRef, STATE_TIMER_QUEUED};
7use crate::raw::util::SyncUnsafeCell;
8 8
9pub(crate) struct TimerQueueItem { 9pub(crate) struct TimerQueueItem {
10 next: Cell<Option<TaskRef>>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
11} 11}
12 12
13impl TimerQueueItem { 13impl TimerQueueItem {
14 pub const fn new() -> Self { 14 pub const fn new() -> Self {
15 Self { next: Cell::new(None) } 15 Self {
16 next: SyncUnsafeCell::new(None),
17 }
16 } 18 }
17} 19}
18 20
19pub(crate) struct TimerQueue { 21pub(crate) struct TimerQueue {
20 head: Cell<Option<TaskRef>>, 22 head: SyncUnsafeCell<Option<TaskRef>>,
21} 23}
22 24
23impl TimerQueue { 25impl TimerQueue {
24 pub const fn new() -> Self { 26 pub const fn new() -> Self {
25 Self { head: Cell::new(None) } 27 Self {
28 head: SyncUnsafeCell::new(None),
29 }
26 } 30 }
27 31
28 pub(crate) unsafe fn update(&self, p: TaskRef) { 32 pub(crate) unsafe fn update(&self, p: TaskRef) {
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index 2b1f6b6f3..e2e8f4df8 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -25,3 +25,32 @@ impl<T> UninitCell<T> {
25 ptr::drop_in_place(self.as_mut_ptr()) 25 ptr::drop_in_place(self.as_mut_ptr())
26 } 26 }
27} 27}
28
29unsafe impl<T> Sync for UninitCell<T> {}
30
31#[repr(transparent)]
32pub struct SyncUnsafeCell<T> {
33 value: UnsafeCell<T>,
34}
35
36unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
37
38impl<T> SyncUnsafeCell<T> {
39 #[inline]
40 pub const fn new(value: T) -> Self {
41 Self {
42 value: UnsafeCell::new(value),
43 }
44 }
45
46 pub unsafe fn set(&self, value: T) {
47 *self.value.get() = value;
48 }
49
50 pub unsafe fn get(&self) -> T
51 where
52 T: Copy,
53 {
54 *self.value.get()
55 }
56}
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 7c0a0183c..2b6224045 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -92,6 +92,7 @@ impl Spawner {
92 poll_fn(|cx| { 92 poll_fn(|cx| {
93 let task = raw::task_from_waker(cx.waker()); 93 let task = raw::task_from_waker(cx.waker());
94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
95 let executor = unsafe { raw::Executor::wrap(executor) };
95 Poll::Ready(Self::new(executor)) 96 Poll::Ready(Self::new(executor))
96 }) 97 })
97 .await 98 .await
@@ -130,9 +131,7 @@ impl Spawner {
130 /// spawner to other threads, but the spawner loses the ability to spawn 131 /// spawner to other threads, but the spawner loses the ability to spawn
131 /// non-Send tasks. 132 /// non-Send tasks.
132 pub fn make_send(&self) -> SendSpawner { 133 pub fn make_send(&self) -> SendSpawner {
133 SendSpawner { 134 SendSpawner::new(&self.executor.inner)
134 executor: self.executor,
135 }
136 } 135 }
137} 136}
138 137
@@ -145,14 +144,11 @@ impl Spawner {
145/// If you want to spawn non-Send tasks, use [Spawner]. 144/// If you want to spawn non-Send tasks, use [Spawner].
146#[derive(Copy, Clone)] 145#[derive(Copy, Clone)]
147pub struct SendSpawner { 146pub struct SendSpawner {
148 executor: &'static raw::Executor, 147 executor: &'static raw::SyncExecutor,
149} 148}
150 149
151unsafe impl Send for SendSpawner {}
152unsafe impl Sync for SendSpawner {}
153
154impl SendSpawner { 150impl SendSpawner {
155 pub(crate) fn new(executor: &'static raw::Executor) -> Self { 151 pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self {
156 Self { executor } 152 Self { executor }
157 } 153 }
158 154