diff options
Diffstat (limited to 'embassy-executor/src/raw/mod.rs')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 179 |
1 files changed, 111 insertions, 68 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 42bd82262..15ff18fc8 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -13,11 +13,12 @@ mod timer_queue; | |||
| 13 | pub(crate) mod util; | 13 | pub(crate) mod util; |
| 14 | mod waker; | 14 | mod waker; |
| 15 | 15 | ||
| 16 | use core::cell::Cell; | ||
| 17 | use core::future::Future; | 16 | use core::future::Future; |
| 17 | use core::marker::PhantomData; | ||
| 18 | use core::mem; | 18 | use core::mem; |
| 19 | use core::pin::Pin; | 19 | use core::pin::Pin; |
| 20 | use core::ptr::NonNull; | 20 | use core::ptr::NonNull; |
| 21 | use core::sync::atomic::AtomicPtr; | ||
| 21 | use core::task::{Context, Poll}; | 22 | use core::task::{Context, Poll}; |
| 22 | 23 | ||
| 23 | use atomic_polyfill::{AtomicU32, Ordering}; | 24 | use atomic_polyfill::{AtomicU32, Ordering}; |
| @@ -30,7 +31,7 @@ use embassy_time::Instant; | |||
| 30 | use rtos_trace::trace; | 31 | use rtos_trace::trace; |
| 31 | 32 | ||
| 32 | use self::run_queue::{RunQueue, RunQueueItem}; | 33 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 33 | use self::util::UninitCell; | 34 | use self::util::{SyncUnsafeCell, UninitCell}; |
| 34 | pub use self::waker::task_from_waker; | 35 | pub use self::waker::task_from_waker; |
| 35 | use super::SpawnToken; | 36 | use super::SpawnToken; |
| 36 | 37 | ||
| @@ -46,11 +47,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | |||
| 46 | pub(crate) struct TaskHeader { | 47 | pub(crate) struct TaskHeader { |
| 47 | pub(crate) state: AtomicU32, | 48 | pub(crate) state: AtomicU32, |
| 48 | pub(crate) run_queue_item: RunQueueItem, | 49 | pub(crate) run_queue_item: RunQueueItem, |
| 49 | pub(crate) executor: Cell<Option<&'static Executor>>, | 50 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 50 | poll_fn: Cell<Option<unsafe fn(TaskRef)>>, | 51 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 51 | 52 | ||
| 52 | #[cfg(feature = "integrated-timers")] | 53 | #[cfg(feature = "integrated-timers")] |
| 53 | pub(crate) expires_at: Cell<Instant>, | 54 | pub(crate) expires_at: SyncUnsafeCell<Instant>, |
| 54 | #[cfg(feature = "integrated-timers")] | 55 | #[cfg(feature = "integrated-timers")] |
| 55 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 56 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 56 | } | 57 | } |
| @@ -61,6 +62,9 @@ pub struct TaskRef { | |||
| 61 | ptr: NonNull<TaskHeader>, | 62 | ptr: NonNull<TaskHeader>, |
| 62 | } | 63 | } |
| 63 | 64 | ||
| 65 | unsafe impl Send for TaskRef where &'static TaskHeader: Send {} | ||
| 66 | unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {} | ||
| 67 | |||
| 64 | impl TaskRef { | 68 | impl TaskRef { |
| 65 | fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { | 69 | fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { |
| 66 | Self { | 70 | Self { |
| @@ -115,12 +119,12 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 115 | raw: TaskHeader { | 119 | raw: TaskHeader { |
| 116 | state: AtomicU32::new(0), | 120 | state: AtomicU32::new(0), |
| 117 | run_queue_item: RunQueueItem::new(), | 121 | run_queue_item: RunQueueItem::new(), |
| 118 | executor: Cell::new(None), | 122 | executor: SyncUnsafeCell::new(None), |
| 119 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 123 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| 120 | poll_fn: Cell::new(None), | 124 | poll_fn: SyncUnsafeCell::new(None), |
| 121 | 125 | ||
| 122 | #[cfg(feature = "integrated-timers")] | 126 | #[cfg(feature = "integrated-timers")] |
| 123 | expires_at: Cell::new(Instant::from_ticks(0)), | 127 | expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)), |
| 124 | #[cfg(feature = "integrated-timers")] | 128 | #[cfg(feature = "integrated-timers")] |
| 125 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 129 | timer_queue_item: timer_queue::TimerQueueItem::new(), |
| 126 | }, | 130 | }, |
| @@ -170,9 +174,15 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 170 | // it's a noop for our waker. | 174 | // it's a noop for our waker. |
| 171 | mem::forget(waker); | 175 | mem::forget(waker); |
| 172 | } | 176 | } |
| 173 | } | ||
| 174 | 177 | ||
| 175 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} | 178 | #[doc(hidden)] |
| 179 | #[allow(dead_code)] | ||
| 180 | fn _assert_sync(self) { | ||
| 181 | fn assert_sync<T: Sync>(_: T) {} | ||
| 182 | |||
| 183 | assert_sync(self) | ||
| 184 | } | ||
| 185 | } | ||
| 176 | 186 | ||
| 177 | struct AvailableTask<F: Future + 'static> { | 187 | struct AvailableTask<F: Future + 'static> { |
| 178 | task: &'static TaskStorage<F>, | 188 | task: &'static TaskStorage<F>, |
| @@ -279,29 +289,10 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 279 | } | 289 | } |
| 280 | } | 290 | } |
| 281 | 291 | ||
| 282 | /// Raw executor. | 292 | pub(crate) struct SyncExecutor { |
| 283 | /// | ||
| 284 | /// This is the core of the Embassy executor. It is low-level, requiring manual | ||
| 285 | /// handling of wakeups and task polling. If you can, prefer using one of the | ||
| 286 | /// [higher level executors](crate::Executor). | ||
| 287 | /// | ||
| 288 | /// The raw executor leaves it up to you to handle wakeups and scheduling: | ||
| 289 | /// | ||
| 290 | /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks | ||
| 291 | /// that "want to run"). | ||
| 292 | /// - You must supply a `signal_fn`. The executor will call it to notify you it has work | ||
| 293 | /// to do. You must arrange for `poll()` to be called as soon as possible. | ||
| 294 | /// | ||
| 295 | /// `signal_fn` can be called from *any* context: any thread, any interrupt priority | ||
| 296 | /// level, etc. It may be called synchronously from any `Executor` method call as well. | ||
| 297 | /// You must deal with this correctly. | ||
| 298 | /// | ||
| 299 | /// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates | ||
| 300 | /// the requirement for `poll` to not be called reentrantly. | ||
| 301 | pub struct Executor { | ||
| 302 | run_queue: RunQueue, | 293 | run_queue: RunQueue, |
| 303 | signal_fn: fn(*mut ()), | 294 | signal_fn: fn(*mut ()), |
| 304 | signal_ctx: *mut (), | 295 | signal_ctx: AtomicPtr<()>, |
| 305 | 296 | ||
| 306 | #[cfg(feature = "integrated-timers")] | 297 | #[cfg(feature = "integrated-timers")] |
| 307 | pub(crate) timer_queue: timer_queue::TimerQueue, | 298 | pub(crate) timer_queue: timer_queue::TimerQueue, |
| @@ -309,14 +300,8 @@ pub struct Executor { | |||
| 309 | alarm: AlarmHandle, | 300 | alarm: AlarmHandle, |
| 310 | } | 301 | } |
| 311 | 302 | ||
| 312 | impl Executor { | 303 | impl SyncExecutor { |
| 313 | /// Create a new executor. | 304 | pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { |
| 314 | /// | ||
| 315 | /// When the executor has work to do, it will call `signal_fn` with | ||
| 316 | /// `signal_ctx` as argument. | ||
| 317 | /// | ||
| 318 | /// See [`Executor`] docs for details on `signal_fn`. | ||
| 319 | pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { | ||
| 320 | #[cfg(feature = "integrated-timers")] | 305 | #[cfg(feature = "integrated-timers")] |
| 321 | let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; | 306 | let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; |
| 322 | #[cfg(feature = "integrated-timers")] | 307 | #[cfg(feature = "integrated-timers")] |
| @@ -325,7 +310,7 @@ impl Executor { | |||
| 325 | Self { | 310 | Self { |
| 326 | run_queue: RunQueue::new(), | 311 | run_queue: RunQueue::new(), |
| 327 | signal_fn, | 312 | signal_fn, |
| 328 | signal_ctx, | 313 | signal_ctx: AtomicPtr::new(signal_ctx), |
| 329 | 314 | ||
| 330 | #[cfg(feature = "integrated-timers")] | 315 | #[cfg(feature = "integrated-timers")] |
| 331 | timer_queue: timer_queue::TimerQueue::new(), | 316 | timer_queue: timer_queue::TimerQueue::new(), |
| @@ -346,19 +331,10 @@ impl Executor { | |||
| 346 | trace::task_ready_begin(task.as_ptr() as u32); | 331 | trace::task_ready_begin(task.as_ptr() as u32); |
| 347 | 332 | ||
| 348 | if self.run_queue.enqueue(cs, task) { | 333 | if self.run_queue.enqueue(cs, task) { |
| 349 | (self.signal_fn)(self.signal_ctx) | 334 | (self.signal_fn)(self.signal_ctx.load(Ordering::Relaxed)) |
| 350 | } | 335 | } |
| 351 | } | 336 | } |
| 352 | 337 | ||
| 353 | /// Spawn a task in this executor. | ||
| 354 | /// | ||
| 355 | /// # Safety | ||
| 356 | /// | ||
| 357 | /// `task` must be a valid pointer to an initialized but not-already-spawned task. | ||
| 358 | /// | ||
| 359 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. | ||
| 360 | /// In this case, the task's Future must be Send. This is because this is effectively | ||
| 361 | /// sending the task to the executor thread. | ||
| 362 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 338 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 363 | task.header().executor.set(Some(self)); | 339 | task.header().executor.set(Some(self)); |
| 364 | 340 | ||
| @@ -370,24 +346,11 @@ impl Executor { | |||
| 370 | }) | 346 | }) |
| 371 | } | 347 | } |
| 372 | 348 | ||
| 373 | /// Poll all queued tasks in this executor. | ||
| 374 | /// | ||
| 375 | /// This loops over all tasks that are queued to be polled (i.e. they're | ||
| 376 | /// freshly spawned or they've been woken). Other tasks are not polled. | ||
| 377 | /// | ||
| 378 | /// You must call `poll` after receiving a call to `signal_fn`. It is OK | ||
| 379 | /// to call `poll` even when not requested by `signal_fn`, but it wastes | ||
| 380 | /// energy. | ||
| 381 | /// | ||
| 382 | /// # Safety | 349 | /// # Safety |
| 383 | /// | 350 | /// |
| 384 | /// You must NOT call `poll` reentrantly on the same executor. | 351 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. |
| 385 | /// | 352 | pub(crate) unsafe fn poll(&'static self) { |
| 386 | /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you | 353 | #[allow(clippy::never_loop)] |
| 387 | /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to | ||
| 388 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's | ||
| 389 | /// no `poll()` already running. | ||
| 390 | pub unsafe fn poll(&'static self) { | ||
| 391 | loop { | 354 | loop { |
| 392 | #[cfg(feature = "integrated-timers")] | 355 | #[cfg(feature = "integrated-timers")] |
| 393 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | 356 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); |
| @@ -441,6 +404,84 @@ impl Executor { | |||
| 441 | #[cfg(feature = "rtos-trace")] | 404 | #[cfg(feature = "rtos-trace")] |
| 442 | trace::system_idle(); | 405 | trace::system_idle(); |
| 443 | } | 406 | } |
| 407 | } | ||
| 408 | |||
| 409 | /// Raw executor. | ||
| 410 | /// | ||
| 411 | /// This is the core of the Embassy executor. It is low-level, requiring manual | ||
| 412 | /// handling of wakeups and task polling. If you can, prefer using one of the | ||
| 413 | /// [higher level executors](crate::Executor). | ||
| 414 | /// | ||
| 415 | /// The raw executor leaves it up to you to handle wakeups and scheduling: | ||
| 416 | /// | ||
| 417 | /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks | ||
| 418 | /// that "want to run"). | ||
| 419 | /// - You must supply a `signal_fn`. The executor will call it to notify you it has work | ||
| 420 | /// to do. You must arrange for `poll()` to be called as soon as possible. | ||
| 421 | /// | ||
| 422 | /// `signal_fn` can be called from *any* context: any thread, any interrupt priority | ||
| 423 | /// level, etc. It may be called synchronously from any `Executor` method call as well. | ||
| 424 | /// You must deal with this correctly. | ||
| 425 | /// | ||
| 426 | /// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates | ||
| 427 | /// the requirement for `poll` to not be called reentrantly. | ||
| 428 | #[repr(transparent)] | ||
| 429 | pub struct Executor { | ||
| 430 | pub(crate) inner: SyncExecutor, | ||
| 431 | |||
| 432 | _not_sync: PhantomData<*mut ()>, | ||
| 433 | } | ||
| 434 | |||
| 435 | impl Executor { | ||
| 436 | pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { | ||
| 437 | mem::transmute(inner) | ||
| 438 | } | ||
| 439 | /// Create a new executor. | ||
| 440 | /// | ||
| 441 | /// When the executor has work to do, it will call `signal_fn` with | ||
| 442 | /// `signal_ctx` as argument. | ||
| 443 | /// | ||
| 444 | /// See [`Executor`] docs for details on `signal_fn`. | ||
| 445 | pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { | ||
| 446 | Self { | ||
| 447 | inner: SyncExecutor::new(signal_fn, signal_ctx), | ||
| 448 | _not_sync: PhantomData, | ||
| 449 | } | ||
| 450 | } | ||
| 451 | |||
| 452 | /// Spawn a task in this executor. | ||
| 453 | /// | ||
| 454 | /// # Safety | ||
| 455 | /// | ||
| 456 | /// `task` must be a valid pointer to an initialized but not-already-spawned task. | ||
| 457 | /// | ||
| 458 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. | ||
| 459 | /// In this case, the task's Future must be Send. This is because this is effectively | ||
| 460 | /// sending the task to the executor thread. | ||
| 461 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | ||
| 462 | self.inner.spawn(task) | ||
| 463 | } | ||
| 464 | |||
| 465 | /// Poll all queued tasks in this executor. | ||
| 466 | /// | ||
| 467 | /// This loops over all tasks that are queued to be polled (i.e. they're | ||
| 468 | /// freshly spawned or they've been woken). Other tasks are not polled. | ||
| 469 | /// | ||
| 470 | /// You must call `poll` after receiving a call to `signal_fn`. It is OK | ||
| 471 | /// to call `poll` even when not requested by `signal_fn`, but it wastes | ||
| 472 | /// energy. | ||
| 473 | /// | ||
| 474 | /// # Safety | ||
| 475 | /// | ||
| 476 | /// You must NOT call `poll` reentrantly on the same executor. | ||
| 477 | /// | ||
| 478 | /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you | ||
| 479 | /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to | ||
| 480 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's | ||
| 481 | /// no `poll()` already running. | ||
| 482 | pub unsafe fn poll(&'static self) { | ||
| 483 | self.inner.poll() | ||
| 484 | } | ||
| 444 | 485 | ||
| 445 | /// Get a spawner that spawns tasks in this executor. | 486 | /// Get a spawner that spawns tasks in this executor. |
| 446 | /// | 487 | /// |
| @@ -483,8 +524,10 @@ impl embassy_time::queue::TimerQueue for TimerQueue { | |||
| 483 | fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { | 524 | fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { |
| 484 | let task = waker::task_from_waker(waker); | 525 | let task = waker::task_from_waker(waker); |
| 485 | let task = task.header(); | 526 | let task = task.header(); |
| 486 | let expires_at = task.expires_at.get(); | 527 | unsafe { |
| 487 | task.expires_at.set(expires_at.min(at)); | 528 | let expires_at = task.expires_at.get(); |
| 529 | task.expires_at.set(expires_at.min(at)); | ||
| 530 | } | ||
| 488 | } | 531 | } |
| 489 | } | 532 | } |
| 490 | 533 | ||
