aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2023-11-14 22:32:48 +0100
committerDario Nieuwenhuis <[email protected]>2023-11-15 18:43:27 +0100
commitbef9b7a8539c3dddb1cf6ab46db161f1ca56b1a1 (patch)
tree6d15736eec0029c13093bee120bd2189aa9537ac /embassy-executor/src
parent50a983fd9b8f10fa5153757593e9f8cfccc902ac (diff)
executor: remove atomic-polyfill.
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/cortex_m.rs21
-rw-r--r--embassy-executor/src/arch/riscv32.rs2
-rw-r--r--embassy-executor/src/raw/mod.rs58
-rw-r--r--embassy-executor/src/raw/run_queue_atomics.rs (renamed from embassy-executor/src/raw/run_queue.rs)3
-rw-r--r--embassy-executor/src/raw/run_queue_critical_section.rs75
-rw-r--r--embassy-executor/src/raw/state_atomics.rs73
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs93
-rw-r--r--embassy-executor/src/raw/timer_queue.rs10
8 files changed, 270 insertions, 65 deletions
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index fde862f3c..55299c94f 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -115,12 +115,12 @@ mod thread {
115pub use interrupt::*; 115pub use interrupt::*;
116#[cfg(feature = "executor-interrupt")] 116#[cfg(feature = "executor-interrupt")]
117mod interrupt { 117mod interrupt {
118 use core::cell::UnsafeCell; 118 use core::cell::{Cell, UnsafeCell};
119 use core::mem::MaybeUninit; 119 use core::mem::MaybeUninit;
120 120
121 use atomic_polyfill::{AtomicBool, Ordering};
122 use cortex_m::interrupt::InterruptNumber; 121 use cortex_m::interrupt::InterruptNumber;
123 use cortex_m::peripheral::NVIC; 122 use cortex_m::peripheral::NVIC;
123 use critical_section::Mutex;
124 124
125 use crate::raw; 125 use crate::raw;
126 126
@@ -146,7 +146,7 @@ mod interrupt {
146 /// It is somewhat more complex to use, it's recommended to use the thread-mode 146 /// It is somewhat more complex to use, it's recommended to use the thread-mode
147 /// [`Executor`] instead, if it works for your use case. 147 /// [`Executor`] instead, if it works for your use case.
148 pub struct InterruptExecutor { 148 pub struct InterruptExecutor {
149 started: AtomicBool, 149 started: Mutex<Cell<bool>>,
150 executor: UnsafeCell<MaybeUninit<raw::Executor>>, 150 executor: UnsafeCell<MaybeUninit<raw::Executor>>,
151 } 151 }
152 152
@@ -158,7 +158,7 @@ mod interrupt {
158 #[inline] 158 #[inline]
159 pub const fn new() -> Self { 159 pub const fn new() -> Self {
160 Self { 160 Self {
161 started: AtomicBool::new(false), 161 started: Mutex::new(Cell::new(false)),
162 executor: UnsafeCell::new(MaybeUninit::uninit()), 162 executor: UnsafeCell::new(MaybeUninit::uninit()),
163 } 163 }
164 } 164 }
@@ -167,7 +167,8 @@ mod interrupt {
167 /// 167 ///
168 /// # Safety 168 /// # Safety
169 /// 169 ///
170 /// You MUST call this from the interrupt handler, and from nowhere else. 170 /// - You MUST call this from the interrupt handler, and from nowhere else.
171 /// - You must not call this before calling `start()`.
171 pub unsafe fn on_interrupt(&'static self) { 172 pub unsafe fn on_interrupt(&'static self) {
172 let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; 173 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
173 executor.poll(); 174 executor.poll();
@@ -196,11 +197,7 @@ mod interrupt {
196 /// do it after. 197 /// do it after.
197 /// 198 ///
198 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { 199 pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
199 if self 200 if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
200 .started
201 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
202 .is_err()
203 {
204 panic!("InterruptExecutor::start() called multiple times on the same executor."); 201 panic!("InterruptExecutor::start() called multiple times on the same executor.");
205 } 202 }
206 203
@@ -222,10 +219,10 @@ mod interrupt {
222 /// This returns a [`SendSpawner`] you can use to spawn tasks on this 219 /// This returns a [`SendSpawner`] you can use to spawn tasks on this
223 /// executor. 220 /// executor.
224 /// 221 ///
225 /// This MUST only be called on an executor that has already been spawned. 222 /// This MUST only be called on an executor that has already been started.
226 /// The function will panic otherwise. 223 /// The function will panic otherwise.
227 pub fn spawner(&'static self) -> crate::SendSpawner { 224 pub fn spawner(&'static self) -> crate::SendSpawner {
228 if !self.started.load(Ordering::Acquire) { 225 if !critical_section::with(|cs| self.started.borrow(cs).get()) {
229 panic!("InterruptExecutor::spawner() called on uninitialized executor."); 226 panic!("InterruptExecutor::spawner() called on uninitialized executor.");
230 } 227 }
231 let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; 228 let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index e5c0ff2ec..6814e7844 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -7,9 +7,9 @@ pub use thread::*;
7mod thread { 7mod thread {
8 use core::marker::PhantomData; 8 use core::marker::PhantomData;
9 9
10 use atomic_polyfill::{AtomicBool, Ordering};
11 #[cfg(feature = "nightly")] 10 #[cfg(feature = "nightly")]
12 pub use embassy_macros::main_riscv as main; 11 pub use embassy_macros::main_riscv as main;
12 use portable_atomic::{AtomicBool, Ordering};
13 13
14 use crate::{raw, Spawner}; 14 use crate::{raw, Spawner};
15 15
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 6d2c1c18a..ed0bedd25 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,7 +7,14 @@
7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe 7//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe. 8//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
9 9
10#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
11#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
10mod run_queue; 12mod run_queue;
13
14#[cfg_attr(target_has_atomic = "8", path = "state_atomics.rs")]
15#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
16mod state;
17
11#[cfg(feature = "integrated-timers")] 18#[cfg(feature = "integrated-timers")]
12mod timer_queue; 19mod timer_queue;
13pub(crate) mod util; 20pub(crate) mod util;
@@ -21,7 +28,6 @@ use core::pin::Pin;
21use core::ptr::NonNull; 28use core::ptr::NonNull;
22use core::task::{Context, Poll}; 29use core::task::{Context, Poll};
23 30
24use atomic_polyfill::{AtomicU32, Ordering};
25#[cfg(feature = "integrated-timers")] 31#[cfg(feature = "integrated-timers")]
26use embassy_time::driver::{self, AlarmHandle}; 32use embassy_time::driver::{self, AlarmHandle};
27#[cfg(feature = "integrated-timers")] 33#[cfg(feature = "integrated-timers")]
@@ -30,21 +36,14 @@ use embassy_time::Instant;
30use rtos_trace::trace; 36use rtos_trace::trace;
31 37
32use self::run_queue::{RunQueue, RunQueueItem}; 38use self::run_queue::{RunQueue, RunQueueItem};
39use self::state::State;
33use self::util::{SyncUnsafeCell, UninitCell}; 40use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 41pub use self::waker::task_from_waker;
35use super::SpawnToken; 42use super::SpawnToken;
36 43
37/// Task is spawned (has a future)
38pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
39/// Task is in the executor run queue
40pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
41/// Task is in the executor timer queue
42#[cfg(feature = "integrated-timers")]
43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
44
45/// Raw task header for use in task pointers. 44/// Raw task header for use in task pointers.
46pub(crate) struct TaskHeader { 45pub(crate) struct TaskHeader {
47 pub(crate) state: AtomicU32, 46 pub(crate) state: State,
48 pub(crate) run_queue_item: RunQueueItem, 47 pub(crate) run_queue_item: RunQueueItem,
49 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 48 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
50 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 49 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
@@ -116,7 +115,7 @@ impl<F: Future + 'static> TaskStorage<F> {
116 pub const fn new() -> Self { 115 pub const fn new() -> Self {
117 Self { 116 Self {
118 raw: TaskHeader { 117 raw: TaskHeader {
119 state: AtomicU32::new(0), 118 state: State::new(),
120 run_queue_item: RunQueueItem::new(), 119 run_queue_item: RunQueueItem::new(),
121 executor: SyncUnsafeCell::new(None), 120 executor: SyncUnsafeCell::new(None),
122 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 121 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -161,7 +160,7 @@ impl<F: Future + 'static> TaskStorage<F> {
161 match future.poll(&mut cx) { 160 match future.poll(&mut cx) {
162 Poll::Ready(_) => { 161 Poll::Ready(_) => {
163 this.future.drop_in_place(); 162 this.future.drop_in_place();
164 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 163 this.raw.state.despawn();
165 164
166 #[cfg(feature = "integrated-timers")] 165 #[cfg(feature = "integrated-timers")]
167 this.raw.expires_at.set(Instant::MAX); 166 this.raw.expires_at.set(Instant::MAX);
@@ -193,11 +192,7 @@ impl<F: Future + 'static> AvailableTask<F> {
193 /// 192 ///
194 /// This function returns `None` if a task has already been spawned and has not finished running. 193 /// This function returns `None` if a task has already been spawned and has not finished running.
195 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> { 194 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
196 task.raw 195 task.raw.state.spawn().then(|| Self { task })
197 .state
198 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
199 .ok()
200 .map(|_| Self { task })
201 } 196 }
202 197
203 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { 198 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
@@ -394,8 +389,7 @@ impl SyncExecutor {
394 #[cfg(feature = "integrated-timers")] 389 #[cfg(feature = "integrated-timers")]
395 task.expires_at.set(Instant::MAX); 390 task.expires_at.set(Instant::MAX);
396 391
397 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 392 if !task.state.run_dequeue() {
398 if state & STATE_SPAWNED == 0 {
399 // If task is not running, ignore it. This can happen in the following scenario: 393 // If task is not running, ignore it. This can happen in the following scenario:
400 // - Task gets dequeued, poll starts 394 // - Task gets dequeued, poll starts
401 // - While task is being polled, it gets woken. It gets placed in the queue. 395 // - While task is being polled, it gets woken. It gets placed in the queue.
@@ -546,18 +540,7 @@ impl Executor {
546/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 540/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
547pub fn wake_task(task: TaskRef) { 541pub fn wake_task(task: TaskRef) {
548 let header = task.header(); 542 let header = task.header();
549 543 if header.state.run_enqueue() {
550 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
551 // If already scheduled, or if not started,
552 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
553 None
554 } else {
555 // Mark it as scheduled
556 Some(state | STATE_RUN_QUEUED)
557 }
558 });
559
560 if res.is_ok() {
561 // We have just marked the task as scheduled, so enqueue it. 544 // We have just marked the task as scheduled, so enqueue it.
562 unsafe { 545 unsafe {
563 let executor = header.executor.get().unwrap_unchecked(); 546 let executor = header.executor.get().unwrap_unchecked();
@@ -571,18 +554,7 @@ pub fn wake_task(task: TaskRef) {
571/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. 554/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
572pub fn wake_task_no_pend(task: TaskRef) { 555pub fn wake_task_no_pend(task: TaskRef) {
573 let header = task.header(); 556 let header = task.header();
574 557 if header.state.run_enqueue() {
575 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
576 // If already scheduled, or if not started,
577 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
578 None
579 } else {
580 // Mark it as scheduled
581 Some(state | STATE_RUN_QUEUED)
582 }
583 });
584
585 if res.is_ok() {
586 // We have just marked the task as scheduled, so enqueue it. 558 // We have just marked the task as scheduled, so enqueue it.
587 unsafe { 559 unsafe {
588 let executor = header.executor.get().unwrap_unchecked(); 560 let executor = header.executor.get().unwrap_unchecked();
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue_atomics.rs
index f1ec19ac1..90907cfda 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -1,7 +1,6 @@
1use core::ptr; 1use core::ptr;
2use core::ptr::NonNull; 2use core::ptr::NonNull;
3 3use core::sync::atomic::{AtomicPtr, Ordering};
4use atomic_polyfill::{AtomicPtr, Ordering};
5 4
6use super::{TaskHeader, TaskRef}; 5use super::{TaskHeader, TaskRef};
7use crate::raw::util::SyncUnsafeCell; 6use crate::raw::util::SyncUnsafeCell;
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
new file mode 100644
index 000000000..ba59c8f29
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -0,0 +1,75 @@
1use core::cell::Cell;
2
3use critical_section::{CriticalSection, Mutex};
4
5use super::TaskRef;
6
7pub(crate) struct RunQueueItem {
8 next: Mutex<Cell<Option<TaskRef>>>,
9}
10
11impl RunQueueItem {
12 pub const fn new() -> Self {
13 Self {
14 next: Mutex::new(Cell::new(None)),
15 }
16 }
17}
18
19/// Atomic task queue using a very, very simple lock-free linked-list queue:
20///
21/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
22///
23/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
24/// null. Then the batch is iterated following the next pointers until null is reached.
25///
26/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
27/// for our purposes: it can't create fairness problems since the next batch won't run until the
28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
29/// by waking its own waker) can't prevent other tasks from running.
30pub(crate) struct RunQueue {
31 head: Mutex<Cell<Option<TaskRef>>>,
32}
33
34impl RunQueue {
35 pub const fn new() -> Self {
36 Self {
37 head: Mutex::new(Cell::new(None)),
38 }
39 }
40
41 /// Enqueues an item. Returns true if the queue was empty.
42 ///
43 /// # Safety
44 ///
45 /// `item` must NOT be already enqueued in any queue.
46 #[inline(always)]
47 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
48 critical_section::with(|cs| {
49 let prev = self.head.borrow(cs).replace(Some(task));
50 task.header().run_queue_item.next.borrow(cs).set(prev);
51
52 prev.is_none()
53 })
54 }
55
56 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue.
61 let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
62
63 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task.
67
68 // safety: we know if the task is enqueued, no one else will touch the `next` pointer.
69 let cs = unsafe { CriticalSection::new() };
70 next = task.header().run_queue_item.next.borrow(cs).get();
71
72 on_task(task);
73 }
74 }
75}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
new file mode 100644
index 000000000..e1279ac0b
--- /dev/null
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -0,0 +1,73 @@
1use core::sync::atomic::{AtomicU32, Ordering};
2
3/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
5/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
7/// Task is in the executor timer queue
8#[cfg(feature = "integrated-timers")]
9pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
10
11pub(crate) struct State {
12 state: AtomicU32,
13}
14
15impl State {
16 pub const fn new() -> State {
17 Self {
18 state: AtomicU32::new(0),
19 }
20 }
21
22 /// If task is idle, mark it as spawned + run_queued and return true.
23 #[inline(always)]
24 pub fn spawn(&self) -> bool {
25 self.state
26 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
27 .is_ok()
28 }
29
30 /// Unmark the task as spawned.
31 #[inline(always)]
32 pub fn despawn(&self) {
33 self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
34 }
35
36 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
37 #[inline(always)]
38 pub fn run_enqueue(&self) -> bool {
39 self.state
40 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
41 // If already scheduled, or if not started,
42 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
43 None
44 } else {
45 // Mark it as scheduled
46 Some(state | STATE_RUN_QUEUED)
47 }
48 })
49 .is_ok()
50 }
51
52 /// Unmark the task as run-queued. Return whether the task is spawned.
53 #[inline(always)]
54 pub fn run_dequeue(&self) -> bool {
55 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
56 state & STATE_SPAWNED != 0
57 }
58
59 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
60 #[cfg(feature = "integrated-timers")]
61 #[inline(always)]
62 pub fn timer_enqueue(&self) -> bool {
63 let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
64 old_state & STATE_TIMER_QUEUED == 0
65 }
66
67 /// Unmark the task as timer-queued.
68 #[cfg(feature = "integrated-timers")]
69 #[inline(always)]
70 pub fn timer_dequeue(&self) {
71 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
72 }
73}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
new file mode 100644
index 000000000..c3cc1b0b7
--- /dev/null
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -0,0 +1,93 @@
1use core::cell::Cell;
2
3use critical_section::Mutex;
4
5/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
9/// Task is in the executor timer queue
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
12
13pub(crate) struct State {
14 state: Mutex<Cell<u32>>,
15}
16
17impl State {
18 pub const fn new() -> State {
19 Self {
20 state: Mutex::new(Cell::new(0)),
21 }
22 }
23
24 fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
25 critical_section::with(|cs| {
26 let s = self.state.borrow(cs);
27 let mut val = s.get();
28 let r = f(&mut val);
29 s.set(val);
30 r
31 })
32 }
33
34 /// If task is idle, mark it as spawned + run_queued and return true.
35 #[inline(always)]
36 pub fn spawn(&self) -> bool {
37 self.update(|s| {
38 if *s == 0 {
39 *s = STATE_SPAWNED | STATE_RUN_QUEUED;
40 true
41 } else {
42 false
43 }
44 })
45 }
46
47 /// Unmark the task as spawned.
48 #[inline(always)]
49 pub fn despawn(&self) {
50 self.update(|s| *s &= !STATE_SPAWNED);
51 }
52
53 /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
54 #[inline(always)]
55 pub fn run_enqueue(&self) -> bool {
56 self.update(|s| {
57 if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
58 false
59 } else {
60 *s |= STATE_RUN_QUEUED;
61 true
62 }
63 })
64 }
65
66 /// Unmark the task as run-queued. Return whether the task is spawned.
67 #[inline(always)]
68 pub fn run_dequeue(&self) -> bool {
69 self.update(|s| {
70 let ok = *s & STATE_SPAWNED != 0;
71 *s &= !STATE_RUN_QUEUED;
72 ok
73 })
74 }
75
76 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
77 #[cfg(feature = "integrated-timers")]
78 #[inline(always)]
79 pub fn timer_enqueue(&self) -> bool {
80 self.update(|s| {
81 let ok = *s & STATE_TIMER_QUEUED == 0;
82 *s |= STATE_TIMER_QUEUED;
83 ok
84 })
85 }
86
87 /// Unmark the task as timer-queued.
88 #[cfg(feature = "integrated-timers")]
89 #[inline(always)]
90 pub fn timer_dequeue(&self) {
91 self.update(|s| *s &= !STATE_TIMER_QUEUED);
92 }
93}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index dc71c95b1..59a3b43f5 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,9 +1,8 @@
1use core::cmp::min; 1use core::cmp::min;
2 2
3use atomic_polyfill::Ordering;
4use embassy_time::Instant; 3use embassy_time::Instant;
5 4
6use super::{TaskRef, STATE_TIMER_QUEUED}; 5use super::TaskRef;
7use crate::raw::util::SyncUnsafeCell; 6use crate::raw::util::SyncUnsafeCell;
8 7
9pub(crate) struct TimerQueueItem { 8pub(crate) struct TimerQueueItem {
@@ -32,10 +31,7 @@ impl TimerQueue {
32 pub(crate) unsafe fn update(&self, p: TaskRef) { 31 pub(crate) unsafe fn update(&self, p: TaskRef) {
33 let task = p.header(); 32 let task = p.header();
34 if task.expires_at.get() != Instant::MAX { 33 if task.expires_at.get() != Instant::MAX {
35 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 34 if task.state.timer_enqueue() {
36 let is_new = old_state & STATE_TIMER_QUEUED == 0;
37
38 if is_new {
39 task.timer_queue_item.next.set(self.head.get()); 35 task.timer_queue_item.next.set(self.head.get());
40 self.head.set(Some(p)); 36 self.head.set(Some(p));
41 } 37 }
@@ -75,7 +71,7 @@ impl TimerQueue {
75 } else { 71 } else {
76 // Remove it 72 // Remove it
77 prev.set(task.timer_queue_item.next.get()); 73 prev.set(task.timer_queue_item.next.get());
78 task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); 74 task.state.timer_dequeue();
79 } 75 }
80 } 76 }
81 } 77 }