aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-executor/src/raw/mod.rs20
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs2
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs10
-rw-r--r--embassy-executor/src/raw/state_atomics.rs20
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs19
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs42
6 files changed, 73 insertions, 40 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 7da14468d..bcbd214a9 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -386,11 +386,11 @@ impl SyncExecutor {
386 /// - `task` must be set up to run in this executor. 386 /// - `task` must be set up to run in this executor.
387 /// - `task` must NOT be already enqueued (in this executor or another one). 387 /// - `task` must NOT be already enqueued (in this executor or another one).
388 #[inline(always)] 388 #[inline(always)]
389 unsafe fn enqueue(&self, task: TaskRef) { 389 unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
390 #[cfg(feature = "trace")] 390 #[cfg(feature = "trace")]
391 trace::task_ready_begin(self, &task); 391 trace::task_ready_begin(self, &task);
392 392
393 if self.run_queue.enqueue(task) { 393 if self.run_queue.enqueue(task, l) {
394 self.pender.pend(); 394 self.pender.pend();
395 } 395 }
396 } 396 }
@@ -401,7 +401,9 @@ impl SyncExecutor {
401 #[cfg(feature = "trace")] 401 #[cfg(feature = "trace")]
402 trace::task_new(self, &task); 402 trace::task_new(self, &task);
403 403
404 self.enqueue(task); 404 state::locked(|l| {
405 self.enqueue(task, l);
406 })
405 } 407 }
406 408
407 /// # Safety 409 /// # Safety
@@ -544,13 +546,13 @@ impl Executor {
544/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 546/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
545pub fn wake_task(task: TaskRef) { 547pub fn wake_task(task: TaskRef) {
546 let header = task.header(); 548 let header = task.header();
547 if header.state.run_enqueue() { 549 header.state.run_enqueue(|l| {
548 // We have just marked the task as scheduled, so enqueue it. 550 // We have just marked the task as scheduled, so enqueue it.
549 unsafe { 551 unsafe {
550 let executor = header.executor.get().unwrap_unchecked(); 552 let executor = header.executor.get().unwrap_unchecked();
551 executor.enqueue(task); 553 executor.enqueue(task, l);
552 } 554 }
553 } 555 });
554} 556}
555 557
556/// Wake a task by `TaskRef` without calling pend. 558/// Wake a task by `TaskRef` without calling pend.
@@ -558,11 +560,11 @@ pub fn wake_task(task: TaskRef) {
558/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 560/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
559pub fn wake_task_no_pend(task: TaskRef) { 561pub fn wake_task_no_pend(task: TaskRef) {
560 let header = task.header(); 562 let header = task.header();
561 if header.state.run_enqueue() { 563 header.state.run_enqueue(|l| {
562 // We have just marked the task as scheduled, so enqueue it. 564 // We have just marked the task as scheduled, so enqueue it.
563 unsafe { 565 unsafe {
564 let executor = header.executor.get().unwrap_unchecked(); 566 let executor = header.executor.get().unwrap_unchecked();
565 executor.run_queue.enqueue(task); 567 executor.run_queue.enqueue(task, l);
566 } 568 }
567 } 569 });
568} 570}
diff --git a/embassy-executor/src/raw/run_queue_atomics.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index 90907cfda..efdafdff0 100644
--- a/embassy-executor/src/raw/run_queue_atomics.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -45,7 +45,7 @@ impl RunQueue {
45 /// 45 ///
46 /// `item` must NOT be already enqueued in any queue. 46 /// `item` must NOT be already enqueued in any queue.
47 #[inline(always)] 47 #[inline(always)]
48 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 48 pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
49 let mut was_empty = false; 49 let mut was_empty = false;
50 50
51 self.head 51 self.head
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
index ba59c8f29..90f09e8c8 100644
--- a/embassy-executor/src/raw/run_queue_critical_section.rs
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -44,13 +44,11 @@ impl RunQueue {
44 /// 44 ///
45 /// `item` must NOT be already enqueued in any queue. 45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)] 46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { 47 pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
48 critical_section::with(|cs| { 48 let prev = self.head.borrow(cs).replace(Some(task));
49 let prev = self.head.borrow(cs).replace(Some(task)); 49 task.header().run_queue_item.next.borrow(cs).set(prev);
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51 50
52 prev.is_none() 51 prev.is_none()
53 })
54 } 52 }
55 53
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 54 /// Empty the queue, then call `on_task` for each task that was in the queue.
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index 15eb9a368..abfe94486 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering};
2 2
3use super::timer_queue::TimerEnqueueOperation; 3use super::timer_queue::TimerEnqueueOperation;
4 4
5pub(crate) struct Token(());
6
7/// Creates a token and passes it to the closure.
8///
9/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
10pub(crate) fn locked(f: impl FnOnce(Token)) {
11 f(Token(()));
12}
13
5/// Task is spawned (has a future) 14/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 15pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue 16/// Task is in the executor run queue
@@ -34,10 +43,12 @@ impl State {
34 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 43 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
35 } 44 }
36 45
37 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 46 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
47 /// function if the task was successfully marked.
38 #[inline(always)] 48 #[inline(always)]
39 pub fn run_enqueue(&self) -> bool { 49 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
40 self.state 50 if self
51 .state
41 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { 52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
42 // If already scheduled, or if not started, 53 // If already scheduled, or if not started,
43 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 54 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
@@ -48,6 +59,9 @@ impl State {
48 } 59 }
49 }) 60 })
50 .is_ok() 61 .is_ok()
62 {
63 locked(f);
64 }
51 } 65 }
52 66
53 /// Unmark the task as run-queued. Return whether the task is spawned. 67 /// Unmark the task as run-queued. Return whether the task is spawned.
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index 7a152e8c0..f0f014652 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 3
4use super::timer_queue::TimerEnqueueOperation; 4use super::timer_queue::TimerEnqueueOperation;
5 5
6pub(crate) struct Token(());
7
8/// Creates a token and passes it to the closure.
9///
10/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
11pub(crate) fn locked(f: impl FnOnce(Token)) {
12 f(Token(()));
13}
14
6// Must be kept in sync with the layout of `State`! 15// Must be kept in sync with the layout of `State`!
7pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 16pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 17pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
@@ -57,9 +66,10 @@ impl State {
57 self.spawned.store(false, Ordering::Relaxed); 66 self.spawned.store(false, Ordering::Relaxed);
58 } 67 }
59 68
60 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 69 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
70 /// function if the task was successfully marked.
61 #[inline(always)] 71 #[inline(always)]
62 pub fn run_enqueue(&self) -> bool { 72 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
63 unsafe { 73 unsafe {
64 loop { 74 loop {
65 let state: u32; 75 let state: u32;
@@ -67,14 +77,15 @@ impl State {
67 77
68 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 78 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
69 asm!("clrex", options(nomem, nostack)); 79 asm!("clrex", options(nomem, nostack));
70 return false; 80 return;
71 } 81 }
72 82
73 let outcome: usize; 83 let outcome: usize;
74 let new_state = state | STATE_RUN_QUEUED; 84 let new_state = state | STATE_RUN_QUEUED;
75 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack)); 85 asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
76 if outcome == 0 { 86 if outcome == 0 {
77 return true; 87 locked(f);
88 return;
78 } 89 }
79 } 90 }
80 } 91 }
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index 367162ba2..8e570b33c 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -1,6 +1,7 @@
1use core::cell::Cell; 1use core::cell::Cell;
2 2
3use critical_section::Mutex; 3pub(crate) use critical_section::{with as locked, CriticalSection as Token};
4use critical_section::{CriticalSection, Mutex};
4 5
5use super::timer_queue::TimerEnqueueOperation; 6use super::timer_queue::TimerEnqueueOperation;
6 7
@@ -23,13 +24,15 @@ impl State {
23 } 24 }
24 25
25 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R { 26 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
26 critical_section::with(|cs| { 27 critical_section::with(|cs| self.update_with_cs(cs, f))
27 let s = self.state.borrow(cs); 28 }
28 let mut val = s.get(); 29
29 let r = f(&mut val); 30 fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
30 s.set(val); 31 let s = self.state.borrow(cs);
31 r 32 let mut val = s.get();
32 }) 33 let r = f(&mut val);
34 s.set(val);
35 r
33 } 36 }
34 37
35 /// If task is idle, mark it as spawned + run_queued and return true. 38 /// If task is idle, mark it as spawned + run_queued and return true.
@@ -51,17 +54,22 @@ impl State {
51 self.update(|s| *s &= !STATE_SPAWNED); 54 self.update(|s| *s &= !STATE_SPAWNED);
52 } 55 }
53 56
54 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success. 57 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
58 /// function if the task was successfully marked.
55 #[inline(always)] 59 #[inline(always)]
56 pub fn run_enqueue(&self) -> bool { 60 pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
57 self.update(|s| { 61 critical_section::with(|cs| {
58 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) { 62 if self.update_with_cs(cs, |s| {
59 false 63 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
60 } else { 64 false
61 *s |= STATE_RUN_QUEUED; 65 } else {
62 true 66 *s |= STATE_RUN_QUEUED;
67 true
68 }
69 }) {
70 f(cs);
63 } 71 }
64 }) 72 });
65 } 73 }
66 74
67 /// Unmark the task as run-queued. Return whether the task is spawned. 75 /// Unmark the task as run-queued. Return whether the task is spawned.