aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
-rw-r--r--embassy-executor/src/raw/mod.rs126
1 files changed, 90 insertions, 36 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 15ff18fc8..bd0cff26b 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -11,6 +11,7 @@ mod run_queue;
11#[cfg(feature = "integrated-timers")] 11#[cfg(feature = "integrated-timers")]
12mod timer_queue; 12mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
14mod waker; 15mod waker;
15 16
16use core::future::Future; 17use core::future::Future;
@@ -18,11 +19,9 @@ use core::marker::PhantomData;
18use core::mem; 19use core::mem;
19use core::pin::Pin; 20use core::pin::Pin;
20use core::ptr::NonNull; 21use core::ptr::NonNull;
21use core::sync::atomic::AtomicPtr;
22use core::task::{Context, Poll}; 22use core::task::{Context, Poll};
23 23
24use atomic_polyfill::{AtomicU32, Ordering}; 24use atomic_polyfill::{AtomicU32, Ordering};
25use critical_section::CriticalSection;
26#[cfg(feature = "integrated-timers")] 25#[cfg(feature = "integrated-timers")]
27use embassy_time::driver::{self, AlarmHandle}; 26use embassy_time::driver::{self, AlarmHandle};
28#[cfg(feature = "integrated-timers")] 27#[cfg(feature = "integrated-timers")]
@@ -289,10 +288,60 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
289 } 288 }
290} 289}
291 290
291#[derive(Clone, Copy)]
292pub(crate) enum PenderInner {
293 #[cfg(feature = "executor-thread")]
294 Thread(crate::arch::ThreadPender),
295 #[cfg(feature = "executor-interrupt")]
296 Interrupt(crate::arch::InterruptPender),
297 #[cfg(feature = "pender-callback")]
298 Callback { func: fn(*mut ()), context: *mut () },
299}
300
301unsafe impl Send for PenderInner {}
302unsafe impl Sync for PenderInner {}
303
304/// Platform/architecture-specific action executed when an executor has pending work.
305///
306/// When a task within an executor is woken, the `Pender` is called. This does a
307/// platform/architecture-specific action to signal there is pending work in the executor.
308/// When this happens, you must arrange for [`Executor::poll`] to be called.
309///
310/// You can think of it as a waker, but for the whole executor.
311pub struct Pender(pub(crate) PenderInner);
312
313impl Pender {
314 /// Create a `Pender` that will call an arbitrary function pointer.
315 ///
316 /// # Arguments
317 ///
318 /// - `func`: The function pointer to call.
319 /// - `context`: Opaque context pointer, that will be passed to the function pointer.
320 #[cfg(feature = "pender-callback")]
321 pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self {
322 Self(PenderInner::Callback {
323 func,
324 context: context.into(),
325 })
326 }
327}
328
329impl Pender {
330 pub(crate) fn pend(&self) {
331 match self.0 {
332 #[cfg(feature = "executor-thread")]
333 PenderInner::Thread(x) => x.pend(),
334 #[cfg(feature = "executor-interrupt")]
335 PenderInner::Interrupt(x) => x.pend(),
336 #[cfg(feature = "pender-callback")]
337 PenderInner::Callback { func, context } => func(context),
338 }
339 }
340}
341
292pub(crate) struct SyncExecutor { 342pub(crate) struct SyncExecutor {
293 run_queue: RunQueue, 343 run_queue: RunQueue,
294 signal_fn: fn(*mut ()), 344 pender: Pender,
295 signal_ctx: AtomicPtr<()>,
296 345
297 #[cfg(feature = "integrated-timers")] 346 #[cfg(feature = "integrated-timers")]
298 pub(crate) timer_queue: timer_queue::TimerQueue, 347 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -301,16 +350,13 @@ pub(crate) struct SyncExecutor {
301} 350}
302 351
303impl SyncExecutor { 352impl SyncExecutor {
304 pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { 353 pub(crate) fn new(pender: Pender) -> Self {
305 #[cfg(feature = "integrated-timers")] 354 #[cfg(feature = "integrated-timers")]
306 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 355 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
307 #[cfg(feature = "integrated-timers")]
308 driver::set_alarm_callback(alarm, signal_fn, signal_ctx);
309 356
310 Self { 357 Self {
311 run_queue: RunQueue::new(), 358 run_queue: RunQueue::new(),
312 signal_fn, 359 pender,
313 signal_ctx: AtomicPtr::new(signal_ctx),
314 360
315 #[cfg(feature = "integrated-timers")] 361 #[cfg(feature = "integrated-timers")]
316 timer_queue: timer_queue::TimerQueue::new(), 362 timer_queue: timer_queue::TimerQueue::new(),
@@ -326,30 +372,37 @@ impl SyncExecutor {
326 /// - `task` must be set up to run in this executor. 372 /// - `task` must be set up to run in this executor.
327 /// - `task` must NOT be already enqueued (in this executor or another one). 373 /// - `task` must NOT be already enqueued (in this executor or another one).
328 #[inline(always)] 374 #[inline(always)]
329 unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { 375 unsafe fn enqueue(&self, task: TaskRef) {
330 #[cfg(feature = "rtos-trace")] 376 #[cfg(feature = "rtos-trace")]
331 trace::task_ready_begin(task.as_ptr() as u32); 377 trace::task_ready_begin(task.as_ptr() as u32);
332 378
333 if self.run_queue.enqueue(cs, task) { 379 if self.run_queue.enqueue(task) {
334 (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) 380 self.pender.pend();
335 } 381 }
336 } 382 }
337 383
384 #[cfg(feature = "integrated-timers")]
385 fn alarm_callback(ctx: *mut ()) {
386 let this: &Self = unsafe { &*(ctx as *const Self) };
387 this.pender.pend();
388 }
389
338 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 390 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
339 task.header().executor.set(Some(self)); 391 task.header().executor.set(Some(self));
340 392
341 #[cfg(feature = "rtos-trace")] 393 #[cfg(feature = "rtos-trace")]
342 trace::task_new(task.as_ptr() as u32); 394 trace::task_new(task.as_ptr() as u32);
343 395
344 critical_section::with(|cs| { 396 self.enqueue(task);
345 self.enqueue(cs, task);
346 })
347 } 397 }
348 398
349 /// # Safety 399 /// # Safety
350 /// 400 ///
351 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. 401 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
352 pub(crate) unsafe fn poll(&'static self) { 402 pub(crate) unsafe fn poll(&'static self) {
403 #[cfg(feature = "integrated-timers")]
404 driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
405
353 #[allow(clippy::never_loop)] 406 #[allow(clippy::never_loop)]
354 loop { 407 loop {
355 #[cfg(feature = "integrated-timers")] 408 #[cfg(feature = "integrated-timers")]
@@ -416,14 +469,14 @@ impl SyncExecutor {
416/// 469///
417/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks 470/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
418/// that "want to run"). 471/// that "want to run").
419/// - You must supply a `signal_fn`. The executor will call it to notify you it has work 472/// - You must supply a [`Pender`]. The executor will call it to notify you it has work
420/// to do. You must arrange for `poll()` to be called as soon as possible. 473/// to do. You must arrange for `poll()` to be called as soon as possible.
421/// 474///
422/// `signal_fn` can be called from *any* context: any thread, any interrupt priority 475/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority
423/// level, etc. It may be called synchronously from any `Executor` method call as well. 476/// level, etc. It may be called synchronously from any `Executor` method call as well.
424/// You must deal with this correctly. 477/// You must deal with this correctly.
425/// 478///
426/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates 479/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
427/// the requirement for `poll` to not be called reentrantly. 480/// the requirement for `poll` to not be called reentrantly.
428#[repr(transparent)] 481#[repr(transparent)]
429pub struct Executor { 482pub struct Executor {
@@ -436,15 +489,15 @@ impl Executor {
436 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { 489 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
437 mem::transmute(inner) 490 mem::transmute(inner)
438 } 491 }
492
439 /// Create a new executor. 493 /// Create a new executor.
440 /// 494 ///
441 /// When the executor has work to do, it will call `signal_fn` with 495 /// When the executor has work to do, it will call the [`Pender`].
442 /// `signal_ctx` as argument.
443 /// 496 ///
444 /// See [`Executor`] docs for details on `signal_fn`. 497 /// See [`Executor`] docs for details on `Pender`.
445 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { 498 pub fn new(pender: Pender) -> Self {
446 Self { 499 Self {
447 inner: SyncExecutor::new(signal_fn, signal_ctx), 500 inner: SyncExecutor::new(pender),
448 _not_sync: PhantomData, 501 _not_sync: PhantomData,
449 } 502 }
450 } 503 }
@@ -467,16 +520,16 @@ impl Executor {
467 /// This loops over all tasks that are queued to be polled (i.e. they're 520 /// This loops over all tasks that are queued to be polled (i.e. they're
468 /// freshly spawned or they've been woken). Other tasks are not polled. 521 /// freshly spawned or they've been woken). Other tasks are not polled.
469 /// 522 ///
470 /// You must call `poll` after receiving a call to `signal_fn`. It is OK 523 /// You must call `poll` after receiving a call to the [`Pender`]. It is OK
471 /// to call `poll` even when not requested by `signal_fn`, but it wastes 524 /// to call `poll` even when not requested by the `Pender`, but it wastes
472 /// energy. 525 /// energy.
473 /// 526 ///
474 /// # Safety 527 /// # Safety
475 /// 528 ///
476 /// You must NOT call `poll` reentrantly on the same executor. 529 /// You must NOT call `poll` reentrantly on the same executor.
477 /// 530 ///
478 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you 531 /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you
479 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to 532 /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to
480 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 533 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
481 /// no `poll()` already running. 534 /// no `poll()` already running.
482 pub unsafe fn poll(&'static self) { 535 pub unsafe fn poll(&'static self) {
@@ -496,24 +549,25 @@ impl Executor {
496/// 549///
497/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 550/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
498pub fn wake_task(task: TaskRef) { 551pub fn wake_task(task: TaskRef) {
499 critical_section::with(|cs| { 552 let header = task.header();
500 let header = task.header();
501 let state = header.state.load(Ordering::Relaxed);
502 553
554 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
503 // If already scheduled, or if not started, 555 // If already scheduled, or if not started,
504 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 556 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
505 return; 557 None
558 } else {
559 // Mark it as scheduled
560 Some(state | STATE_RUN_QUEUED)
506 } 561 }
562 });
507 563
508 // Mark it as scheduled 564 if res.is_ok() {
509 header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
510
511 // We have just marked the task as scheduled, so enqueue it. 565 // We have just marked the task as scheduled, so enqueue it.
512 unsafe { 566 unsafe {
513 let executor = header.executor.get().unwrap_unchecked(); 567 let executor = header.executor.get().unwrap_unchecked();
514 executor.enqueue(cs, task); 568 executor.enqueue(task);
515 } 569 }
516 }) 570 }
517} 571}
518 572
519#[cfg(feature = "integrated-timers")] 573#[cfg(feature = "integrated-timers")]