aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/mod.rs
diff options
context:
space:
mode:
authorsander <[email protected]>2023-03-30 14:37:51 +0200
committersander <[email protected]>2023-03-30 14:37:51 +0200
commit6b2aaacf830d69fcb05f9611d3780f56b4ae82bc (patch)
treea6e4d7628cd5153bbfd122b902a598b0862feeb9 /embassy-executor/src/raw/mod.rs
parentba9afbc26d06ab38065cbff5b17a7f76db297ad4 (diff)
parent754bb802ba377c19be97d092c4b2afe542de20b5 (diff)
Update embassy
Merge commit '9dd3719f09835f646e3a8f3abaa33726a1e3f9ca'
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r--embassy-executor/src/raw/mod.rs179
1 files changed, 111 insertions, 68 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 42bd82262..15ff18fc8 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -13,11 +13,12 @@ mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14mod waker; 14mod waker;
15 15
16use core::cell::Cell;
17use core::future::Future; 16use core::future::Future;
17use core::marker::PhantomData;
18use core::mem; 18use core::mem;
19use core::pin::Pin; 19use core::pin::Pin;
20use core::ptr::NonNull; 20use core::ptr::NonNull;
21use core::sync::atomic::AtomicPtr;
21use core::task::{Context, Poll}; 22use core::task::{Context, Poll};
22 23
23use atomic_polyfill::{AtomicU32, Ordering}; 24use atomic_polyfill::{AtomicU32, Ordering};
@@ -30,7 +31,7 @@ use embassy_time::Instant;
30use rtos_trace::trace; 31use rtos_trace::trace;
31 32
32use self::run_queue::{RunQueue, RunQueueItem}; 33use self::run_queue::{RunQueue, RunQueueItem};
33use self::util::UninitCell; 34use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 35pub use self::waker::task_from_waker;
35use super::SpawnToken; 36use super::SpawnToken;
36 37
@@ -46,11 +47,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
46pub(crate) struct TaskHeader { 47pub(crate) struct TaskHeader {
47 pub(crate) state: AtomicU32, 48 pub(crate) state: AtomicU32,
48 pub(crate) run_queue_item: RunQueueItem, 49 pub(crate) run_queue_item: RunQueueItem,
49 pub(crate) executor: Cell<Option<&'static Executor>>, 50 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
50 poll_fn: Cell<Option<unsafe fn(TaskRef)>>, 51 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
51 52
52 #[cfg(feature = "integrated-timers")] 53 #[cfg(feature = "integrated-timers")]
53 pub(crate) expires_at: Cell<Instant>, 54 pub(crate) expires_at: SyncUnsafeCell<Instant>,
54 #[cfg(feature = "integrated-timers")] 55 #[cfg(feature = "integrated-timers")]
55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 56 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
56} 57}
@@ -61,6 +62,9 @@ pub struct TaskRef {
61 ptr: NonNull<TaskHeader>, 62 ptr: NonNull<TaskHeader>,
62} 63}
63 64
65unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
66unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
67
64impl TaskRef { 68impl TaskRef {
65 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { 69 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
66 Self { 70 Self {
@@ -115,12 +119,12 @@ impl<F: Future + 'static> TaskStorage<F> {
115 raw: TaskHeader { 119 raw: TaskHeader {
116 state: AtomicU32::new(0), 120 state: AtomicU32::new(0),
117 run_queue_item: RunQueueItem::new(), 121 run_queue_item: RunQueueItem::new(),
118 executor: Cell::new(None), 122 executor: SyncUnsafeCell::new(None),
119 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 123 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
120 poll_fn: Cell::new(None), 124 poll_fn: SyncUnsafeCell::new(None),
121 125
122 #[cfg(feature = "integrated-timers")] 126 #[cfg(feature = "integrated-timers")]
123 expires_at: Cell::new(Instant::from_ticks(0)), 127 expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
124 #[cfg(feature = "integrated-timers")] 128 #[cfg(feature = "integrated-timers")]
125 timer_queue_item: timer_queue::TimerQueueItem::new(), 129 timer_queue_item: timer_queue::TimerQueueItem::new(),
126 }, 130 },
@@ -170,9 +174,15 @@ impl<F: Future + 'static> TaskStorage<F> {
170 // it's a noop for our waker. 174 // it's a noop for our waker.
171 mem::forget(waker); 175 mem::forget(waker);
172 } 176 }
173}
174 177
175unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 178 #[doc(hidden)]
179 #[allow(dead_code)]
180 fn _assert_sync(self) {
181 fn assert_sync<T: Sync>(_: T) {}
182
183 assert_sync(self)
184 }
185}
176 186
177struct AvailableTask<F: Future + 'static> { 187struct AvailableTask<F: Future + 'static> {
178 task: &'static TaskStorage<F>, 188 task: &'static TaskStorage<F>,
@@ -279,29 +289,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
279 } 289 }
280} 290}
281 291
282/// Raw executor. 292pub(crate) struct SyncExecutor {
283///
284/// This is the core of the Embassy executor. It is low-level, requiring manual
285/// handling of wakeups and task polling. If you can, prefer using one of the
286/// [higher level executors](crate::Executor).
287///
288/// The raw executor leaves it up to you to handle wakeups and scheduling:
289///
290/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
291/// that "want to run").
292/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
293/// to do. You must arrange for `poll()` to be called as soon as possible.
294///
295/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
296/// level, etc. It may be called synchronously from any `Executor` method call as well.
297/// You must deal with this correctly.
298///
299/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
300/// the requirement for `poll` to not be called reentrantly.
301pub struct Executor {
302 run_queue: RunQueue, 293 run_queue: RunQueue,
303 signal_fn: fn(*mut ()), 294 signal_fn: fn(*mut ()),
304 signal_ctx: *mut (), 295 signal_ctx: AtomicPtr<()>,
305 296
306 #[cfg(feature = "integrated-timers")] 297 #[cfg(feature = "integrated-timers")]
307 pub(crate) timer_queue: timer_queue::TimerQueue, 298 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -309,14 +300,8 @@ pub struct Executor {
309 alarm: AlarmHandle, 300 alarm: AlarmHandle,
310} 301}
311 302
312impl Executor { 303impl SyncExecutor {
313 /// Create a new executor. 304 pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
314 ///
315 /// When the executor has work to do, it will call `signal_fn` with
316 /// `signal_ctx` as argument.
317 ///
318 /// See [`Executor`] docs for details on `signal_fn`.
319 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
320 #[cfg(feature = "integrated-timers")] 305 #[cfg(feature = "integrated-timers")]
321 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 306 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
322 #[cfg(feature = "integrated-timers")] 307 #[cfg(feature = "integrated-timers")]
@@ -325,7 +310,7 @@ impl Executor {
325 Self { 310 Self {
326 run_queue: RunQueue::new(), 311 run_queue: RunQueue::new(),
327 signal_fn, 312 signal_fn,
328 signal_ctx, 313 signal_ctx: AtomicPtr::new(signal_ctx),
329 314
330 #[cfg(feature = "integrated-timers")] 315 #[cfg(feature = "integrated-timers")]
331 timer_queue: timer_queue::TimerQueue::new(), 316 timer_queue: timer_queue::TimerQueue::new(),
@@ -346,19 +331,10 @@ impl Executor {
346 trace::task_ready_begin(task.as_ptr() as u32); 331 trace::task_ready_begin(task.as_ptr() as u32);
347 332
348 if self.run_queue.enqueue(cs, task) { 333 if self.run_queue.enqueue(cs, task) {
349 (self.signal_fn)(self.signal_ctx) 334 (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed))
350 } 335 }
351 } 336 }
352 337
353 /// Spawn a task in this executor.
354 ///
355 /// # Safety
356 ///
357 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
358 ///
359 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
360 /// In this case, the task's Future must be Send. This is because this is effectively
361 /// sending the task to the executor thread.
362 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 338 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
363 task.header().executor.set(Some(self)); 339 task.header().executor.set(Some(self));
364 340
@@ -370,24 +346,11 @@ impl Executor {
370 }) 346 })
371 } 347 }
372 348
373 /// Poll all queued tasks in this executor.
374 ///
375 /// This loops over all tasks that are queued to be polled (i.e. they're
376 /// freshly spawned or they've been woken). Other tasks are not polled.
377 ///
378 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
379 /// to call `poll` even when not requested by `signal_fn`, but it wastes
380 /// energy.
381 ///
382 /// # Safety 349 /// # Safety
383 /// 350 ///
384 /// You must NOT call `poll` reentrantly on the same executor. 351 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
385 /// 352 pub(crate) unsafe fn poll(&'static self) {
386 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you 353 #[allow(clippy::never_loop)]
387 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
388 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
389 /// no `poll()` already running.
390 pub unsafe fn poll(&'static self) {
391 loop { 354 loop {
392 #[cfg(feature = "integrated-timers")] 355 #[cfg(feature = "integrated-timers")]
393 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 356 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
@@ -441,6 +404,84 @@ impl Executor {
441 #[cfg(feature = "rtos-trace")] 404 #[cfg(feature = "rtos-trace")]
442 trace::system_idle(); 405 trace::system_idle();
443 } 406 }
407}
408
409/// Raw executor.
410///
411/// This is the core of the Embassy executor. It is low-level, requiring manual
412/// handling of wakeups and task polling. If you can, prefer using one of the
413/// [higher level executors](crate::Executor).
414///
415/// The raw executor leaves it up to you to handle wakeups and scheduling:
416///
417/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
418/// that "want to run").
419/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
420/// to do. You must arrange for `poll()` to be called as soon as possible.
421///
422/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
423/// level, etc. It may be called synchronously from any `Executor` method call as well.
424/// You must deal with this correctly.
425///
426/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
427/// the requirement for `poll` to not be called reentrantly.
428#[repr(transparent)]
429pub struct Executor {
430 pub(crate) inner: SyncExecutor,
431
432 _not_sync: PhantomData<*mut ()>,
433}
434
435impl Executor {
436 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
437 mem::transmute(inner)
438 }
439 /// Create a new executor.
440 ///
441 /// When the executor has work to do, it will call `signal_fn` with
442 /// `signal_ctx` as argument.
443 ///
444 /// See [`Executor`] docs for details on `signal_fn`.
445 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
446 Self {
447 inner: SyncExecutor::new(signal_fn, signal_ctx),
448 _not_sync: PhantomData,
449 }
450 }
451
452 /// Spawn a task in this executor.
453 ///
454 /// # Safety
455 ///
456 /// `task` must be a valid pointer to an initialized but not-already-spawned task.
457 ///
458 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
459 /// In this case, the task's Future must be Send. This is because this is effectively
460 /// sending the task to the executor thread.
461 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
462 self.inner.spawn(task)
463 }
464
465 /// Poll all queued tasks in this executor.
466 ///
467 /// This loops over all tasks that are queued to be polled (i.e. they're
468 /// freshly spawned or they've been woken). Other tasks are not polled.
469 ///
470 /// You must call `poll` after receiving a call to `signal_fn`. It is OK
471 /// to call `poll` even when not requested by `signal_fn`, but it wastes
472 /// energy.
473 ///
474 /// # Safety
475 ///
476 /// You must NOT call `poll` reentrantly on the same executor.
477 ///
478 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
479 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
480 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
481 /// no `poll()` already running.
482 pub unsafe fn poll(&'static self) {
483 self.inner.poll()
484 }
444 485
445 /// Get a spawner that spawns tasks in this executor. 486 /// Get a spawner that spawns tasks in this executor.
446 /// 487 ///
@@ -483,8 +524,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue {
483 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { 524 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
484 let task = waker::task_from_waker(waker); 525 let task = waker::task_from_waker(waker);
485 let task = task.header(); 526 let task = task.header();
486 let expires_at = task.expires_at.get(); 527 unsafe {
487 task.expires_at.set(expires_at.min(at)); 528 let expires_at = task.expires_at.get();
529 task.expires_at.set(expires_at.min(at));
530 }
488 } 531 }
489} 532}
490 533