diff options
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 20 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_atomics.rs | 2 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue_critical_section.rs | 10 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 20 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 19 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 42 |
6 files changed, 73 insertions, 40 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 7da14468d..bcbd214a9 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -386,11 +386,11 @@ impl SyncExecutor { | |||
| 386 | /// - `task` must be set up to run in this executor. | 386 | /// - `task` must be set up to run in this executor. |
| 387 | /// - `task` must NOT be already enqueued (in this executor or another one). | 387 | /// - `task` must NOT be already enqueued (in this executor or another one). |
| 388 | #[inline(always)] | 388 | #[inline(always)] |
| 389 | unsafe fn enqueue(&self, task: TaskRef) { | 389 | unsafe fn enqueue(&self, task: TaskRef, l: state::Token) { |
| 390 | #[cfg(feature = "trace")] | 390 | #[cfg(feature = "trace")] |
| 391 | trace::task_ready_begin(self, &task); | 391 | trace::task_ready_begin(self, &task); |
| 392 | 392 | ||
| 393 | if self.run_queue.enqueue(task) { | 393 | if self.run_queue.enqueue(task, l) { |
| 394 | self.pender.pend(); | 394 | self.pender.pend(); |
| 395 | } | 395 | } |
| 396 | } | 396 | } |
| @@ -401,7 +401,9 @@ impl SyncExecutor { | |||
| 401 | #[cfg(feature = "trace")] | 401 | #[cfg(feature = "trace")] |
| 402 | trace::task_new(self, &task); | 402 | trace::task_new(self, &task); |
| 403 | 403 | ||
| 404 | self.enqueue(task); | 404 | state::locked(|l| { |
| 405 | self.enqueue(task, l); | ||
| 406 | }) | ||
| 405 | } | 407 | } |
| 406 | 408 | ||
| 407 | /// # Safety | 409 | /// # Safety |
| @@ -544,13 +546,13 @@ impl Executor { | |||
| 544 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. | 546 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 545 | pub fn wake_task(task: TaskRef) { | 547 | pub fn wake_task(task: TaskRef) { |
| 546 | let header = task.header(); | 548 | let header = task.header(); |
| 547 | if header.state.run_enqueue() { | 549 | header.state.run_enqueue(|l| { |
| 548 | // We have just marked the task as scheduled, so enqueue it. | 550 | // We have just marked the task as scheduled, so enqueue it. |
| 549 | unsafe { | 551 | unsafe { |
| 550 | let executor = header.executor.get().unwrap_unchecked(); | 552 | let executor = header.executor.get().unwrap_unchecked(); |
| 551 | executor.enqueue(task); | 553 | executor.enqueue(task, l); |
| 552 | } | 554 | } |
| 553 | } | 555 | }); |
| 554 | } | 556 | } |
| 555 | 557 | ||
| 556 | /// Wake a task by `TaskRef` without calling pend. | 558 | /// Wake a task by `TaskRef` without calling pend. |
| @@ -558,11 +560,11 @@ pub fn wake_task(task: TaskRef) { | |||
| 558 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. | 560 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 559 | pub fn wake_task_no_pend(task: TaskRef) { | 561 | pub fn wake_task_no_pend(task: TaskRef) { |
| 560 | let header = task.header(); | 562 | let header = task.header(); |
| 561 | if header.state.run_enqueue() { | 563 | header.state.run_enqueue(|l| { |
| 562 | // We have just marked the task as scheduled, so enqueue it. | 564 | // We have just marked the task as scheduled, so enqueue it. |
| 563 | unsafe { | 565 | unsafe { |
| 564 | let executor = header.executor.get().unwrap_unchecked(); | 566 | let executor = header.executor.get().unwrap_unchecked(); |
| 565 | executor.run_queue.enqueue(task); | 567 | executor.run_queue.enqueue(task, l); |
| 566 | } | 568 | } |
| 567 | } | 569 | }); |
| 568 | } | 570 | } |
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs index 90907cfda..efdafdff0 100644 --- a/embassy-executor/src/raw/run_queue_atomics.rs +++ b/embassy-executor/src/raw/run_queue_atomics.rs | |||
| @@ -45,7 +45,7 @@ impl RunQueue { | |||
| 45 | /// | 45 | /// |
| 46 | /// `item` must NOT be already enqueued in any queue. | 46 | /// `item` must NOT be already enqueued in any queue. |
| 47 | #[inline(always)] | 47 | #[inline(always)] |
| 48 | pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { | 48 | pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool { |
| 49 | let mut was_empty = false; | 49 | let mut was_empty = false; |
| 50 | 50 | ||
| 51 | self.head | 51 | self.head |
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs index ba59c8f29..90f09e8c8 100644 --- a/embassy-executor/src/raw/run_queue_critical_section.rs +++ b/embassy-executor/src/raw/run_queue_critical_section.rs | |||
| @@ -44,13 +44,11 @@ impl RunQueue { | |||
| 44 | /// | 44 | /// |
| 45 | /// `item` must NOT be already enqueued in any queue. | 45 | /// `item` must NOT be already enqueued in any queue. |
| 46 | #[inline(always)] | 46 | #[inline(always)] |
| 47 | pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { | 47 | pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool { |
| 48 | critical_section::with(|cs| { | 48 | let prev = self.head.borrow(cs).replace(Some(task)); |
| 49 | let prev = self.head.borrow(cs).replace(Some(task)); | 49 | task.header().run_queue_item.next.borrow(cs).set(prev); |
| 50 | task.header().run_queue_item.next.borrow(cs).set(prev); | ||
| 51 | 50 | ||
| 52 | prev.is_none() | 51 | prev.is_none() |
| 53 | }) | ||
| 54 | } | 52 | } |
| 55 | 53 | ||
| 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. | 54 | /// Empty the queue, then call `on_task` for each task that was in the queue. |
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index 15eb9a368..abfe94486 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering}; | |||
| 2 | 2 | ||
| 3 | use super::timer_queue::TimerEnqueueOperation; | 3 | use super::timer_queue::TimerEnqueueOperation; |
| 4 | 4 | ||
| 5 | pub(crate) struct Token(()); | ||
| 6 | |||
| 7 | /// Creates a token and passes it to the closure. | ||
| 8 | /// | ||
| 9 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. | ||
| 10 | pub(crate) fn locked(f: impl FnOnce(Token)) { | ||
| 11 | f(Token(())); | ||
| 12 | } | ||
| 13 | |||
| 5 | /// Task is spawned (has a future) | 14 | /// Task is spawned (has a future) |
| 6 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 15 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 7 | /// Task is in the executor run queue | 16 | /// Task is in the executor run queue |
| @@ -34,10 +43,12 @@ impl State { | |||
| 34 | self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); | 43 | self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); |
| 35 | } | 44 | } |
| 36 | 45 | ||
| 37 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. | 46 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given |
| 47 | /// function if the task was successfully marked. | ||
| 38 | #[inline(always)] | 48 | #[inline(always)] |
| 39 | pub fn run_enqueue(&self) -> bool { | 49 | pub fn run_enqueue(&self, f: impl FnOnce(Token)) { |
| 40 | self.state | 50 | if self |
| 51 | .state | ||
| 41 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | 52 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { |
| 42 | // If already scheduled, or if not started, | 53 | // If already scheduled, or if not started, |
| 43 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | 54 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { |
| @@ -48,6 +59,9 @@ impl State { | |||
| 48 | } | 59 | } |
| 49 | }) | 60 | }) |
| 50 | .is_ok() | 61 | .is_ok() |
| 62 | { | ||
| 63 | locked(f); | ||
| 64 | } | ||
| 51 | } | 65 | } |
| 52 | 66 | ||
| 53 | /// Unmark the task as run-queued. Return whether the task is spawned. | 67 | /// Unmark the task as run-queued. Return whether the task is spawned. |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index 7a152e8c0..f0f014652 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; | |||
| 3 | 3 | ||
| 4 | use super::timer_queue::TimerEnqueueOperation; | 4 | use super::timer_queue::TimerEnqueueOperation; |
| 5 | 5 | ||
| 6 | pub(crate) struct Token(()); | ||
| 7 | |||
| 8 | /// Creates a token and passes it to the closure. | ||
| 9 | /// | ||
| 10 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. | ||
| 11 | pub(crate) fn locked(f: impl FnOnce(Token)) { | ||
| 12 | f(Token(())); | ||
| 13 | } | ||
| 14 | |||
| 6 | // Must be kept in sync with the layout of `State`! | 15 | // Must be kept in sync with the layout of `State`! |
| 7 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 16 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; | 17 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; |
| @@ -57,9 +66,10 @@ impl State { | |||
| 57 | self.spawned.store(false, Ordering::Relaxed); | 66 | self.spawned.store(false, Ordering::Relaxed); |
| 58 | } | 67 | } |
| 59 | 68 | ||
| 60 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. | 69 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given |
| 70 | /// function if the task was successfully marked. | ||
| 61 | #[inline(always)] | 71 | #[inline(always)] |
| 62 | pub fn run_enqueue(&self) -> bool { | 72 | pub fn run_enqueue(&self, f: impl FnOnce(Token)) { |
| 63 | unsafe { | 73 | unsafe { |
| 64 | loop { | 74 | loop { |
| 65 | let state: u32; | 75 | let state: u32; |
| @@ -67,14 +77,15 @@ impl State { | |||
| 67 | 77 | ||
| 68 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | 78 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { |
| 69 | asm!("clrex", options(nomem, nostack)); | 79 | asm!("clrex", options(nomem, nostack)); |
| 70 | return false; | 80 | return; |
| 71 | } | 81 | } |
| 72 | 82 | ||
| 73 | let outcome: usize; | 83 | let outcome: usize; |
| 74 | let new_state = state | STATE_RUN_QUEUED; | 84 | let new_state = state | STATE_RUN_QUEUED; |
| 75 | asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); | 85 | asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); |
| 76 | if outcome == 0 { | 86 | if outcome == 0 { |
| 77 | return true; | 87 | locked(f); |
| 88 | return; | ||
| 78 | } | 89 | } |
| 79 | } | 90 | } |
| 80 | } | 91 | } |
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index 367162ba2..8e570b33c 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -1,6 +1,7 @@ | |||
| 1 | use core::cell::Cell; | 1 | use core::cell::Cell; |
| 2 | 2 | ||
| 3 | use critical_section::Mutex; | 3 | pub(crate) use critical_section::{with as locked, CriticalSection as Token}; |
| 4 | use critical_section::{CriticalSection, Mutex}; | ||
| 4 | 5 | ||
| 5 | use super::timer_queue::TimerEnqueueOperation; | 6 | use super::timer_queue::TimerEnqueueOperation; |
| 6 | 7 | ||
| @@ -23,13 +24,15 @@ impl State { | |||
| 23 | } | 24 | } |
| 24 | 25 | ||
| 25 | fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { | 26 | fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { |
| 26 | critical_section::with(|cs| { | 27 | critical_section::with(|cs| self.update_with_cs(cs, f)) |
| 27 | let s = self.state.borrow(cs); | 28 | } |
| 28 | let mut val = s.get(); | 29 | |
| 29 | let r = f(&mut val); | 30 | fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R { |
| 30 | s.set(val); | 31 | let s = self.state.borrow(cs); |
| 31 | r | 32 | let mut val = s.get(); |
| 32 | }) | 33 | let r = f(&mut val); |
| 34 | s.set(val); | ||
| 35 | r | ||
| 33 | } | 36 | } |
| 34 | 37 | ||
| 35 | /// If task is idle, mark it as spawned + run_queued and return true. | 38 | /// If task is idle, mark it as spawned + run_queued and return true. |
| @@ -51,17 +54,22 @@ impl State { | |||
| 51 | self.update(|s| *s &= !STATE_SPAWNED); | 54 | self.update(|s| *s &= !STATE_SPAWNED); |
| 52 | } | 55 | } |
| 53 | 56 | ||
| 54 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. | 57 | /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given |
| 58 | /// function if the task was successfully marked. | ||
| 55 | #[inline(always)] | 59 | #[inline(always)] |
| 56 | pub fn run_enqueue(&self) -> bool { | 60 | pub fn run_enqueue(&self, f: impl FnOnce(Token)) { |
| 57 | self.update(|s| { | 61 | critical_section::with(|cs| { |
| 58 | if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { | 62 | if self.update_with_cs(cs, |s| { |
| 59 | false | 63 | if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { |
| 60 | } else { | 64 | false |
| 61 | *s |= STATE_RUN_QUEUED; | 65 | } else { |
| 62 | true | 66 | *s |= STATE_RUN_QUEUED; |
| 67 | true | ||
| 68 | } | ||
| 69 | }) { | ||
| 70 | f(cs); | ||
| 63 | } | 71 | } |
| 64 | }) | 72 | }); |
| 65 | } | 73 | } |
| 66 | 74 | ||
| 67 | /// Unmark the task as run-queued. Return whether the task is spawned. | 75 | /// Unmark the task as run-queued. Return whether the task is spawned. |
