aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor
diff options
context:
space:
mode:
authorDániel Buga <[email protected]>2024-12-09 08:43:57 +0100
committerDániel Buga <[email protected]>2024-12-13 21:20:57 +0100
commitec96395d084d5edc8be25ddaea8547e2ebd447a6 (patch)
treeb1edf825c8d67013df3cec1283376a7558951a3f /embassy-executor
parentd45ea43892198484b5f6dcea4c351dc11d226cc4 (diff)
Prevent task from respawning while in the timer queue
Diffstat (limited to 'embassy-executor')
-rw-r--r--embassy-executor/src/raw/mod.rs36
-rw-r--r--embassy-executor/src/raw/state_atomics.rs36
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs40
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs29
-rw-r--r--embassy-executor/src/raw/timer_queue.rs15
5 files changed, 152 insertions, 4 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index f9c6509f1..14d689900 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -50,7 +50,7 @@ pub(crate) struct TaskHeader {
50} 50}
51 51
52/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 52/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
53#[derive(Clone, Copy)] 53#[derive(Clone, Copy, PartialEq)]
54pub struct TaskRef { 54pub struct TaskRef {
55 ptr: NonNull<TaskHeader>, 55 ptr: NonNull<TaskHeader>,
56} 56}
@@ -72,6 +72,16 @@ impl TaskRef {
72 } 72 }
73 } 73 }
74 74
75 /// # Safety
76 ///
77 /// The result of this function must only be compared
78 /// for equality, or stored, but not used.
79 pub const unsafe fn dangling() -> Self {
80 Self {
81 ptr: NonNull::dangling(),
82 }
83 }
84
75 pub(crate) fn header(self) -> &'static TaskHeader { 85 pub(crate) fn header(self) -> &'static TaskHeader {
76 unsafe { self.ptr.as_ref() } 86 unsafe { self.ptr.as_ref() }
77 } 87 }
@@ -88,6 +98,30 @@ impl TaskRef {
88 &self.header().timer_queue_item 98 &self.header().timer_queue_item
89 } 99 }
90 100
101 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
102 ///
103 /// Entering this state prevents the task from being respawned while in a timer queue.
104 ///
105 /// Safety:
106 ///
107 /// This functions should only be called by the timer queue implementation, before
108 /// enqueueing the timer item.
109 #[cfg(feature = "integrated-timers")]
110 pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
111 self.header().state.timer_enqueue()
112 }
113
114 /// Unmark the task as timer-queued.
115 ///
116 /// Safety:
117 ///
118 /// This functions should only be called by the timer queue implementation, after the task has
119 /// been removed from the timer queue.
120 #[cfg(feature = "integrated-timers")]
121 pub unsafe fn timer_dequeue(&self) {
122 self.header().state.timer_dequeue()
123 }
124
91 /// The returned pointer is valid for the entire TaskStorage. 125 /// The returned pointer is valid for the entire TaskStorage.
92 pub(crate) fn as_ptr(self) -> *const TaskHeader { 126 pub(crate) fn as_ptr(self) -> *const TaskHeader {
93 self.ptr.as_ptr() 127 self.ptr.as_ptr()
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e4127897e..d03c61ade 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,9 +1,15 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1use core::sync::atomic::{AtomicU32, Ordering};
2 2
3#[cfg(feature = "integrated-timers")]
4use super::timer_queue::TimerEnqueueOperation;
5
3/// Task is spawned (has a future) 6/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 7pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
5/// Task is in the executor run queue 8/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
10/// Task is in the executor timer queue
11#[cfg(feature = "integrated-timers")]
12pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
7 13
8pub(crate) struct State { 14pub(crate) struct State {
9 state: AtomicU32, 15 state: AtomicU32,
@@ -52,4 +58,34 @@ impl State {
52 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 58 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
53 state & STATE_SPAWNED != 0 59 state & STATE_SPAWNED != 0
54 } 60 }
61
62 /// Mark the task as timer-queued. Return whether it can be enqueued.
63 #[cfg(feature = "integrated-timers")]
64 #[inline(always)]
65 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
66 if self
67 .state
68 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
69 // If not started, ignore it
70 if state & STATE_SPAWNED == 0 {
71 None
72 } else {
73 // Mark it as enqueued
74 Some(state | STATE_TIMER_QUEUED)
75 }
76 })
77 .is_ok()
78 {
79 TimerEnqueueOperation::Enqueue
80 } else {
81 TimerEnqueueOperation::Ignore
82 }
83 }
84
85 /// Unmark the task as timer-queued.
86 #[cfg(feature = "integrated-timers")]
87 #[inline(always)]
88 pub fn timer_dequeue(&self) {
89 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed);
90 }
55} 91}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index b673c7359..f6f2e8f08 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,9 +1,14 @@
1use core::arch::asm; 1use core::arch::asm;
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 3
4#[cfg(feature = "integrated-timers")]
5use super::timer_queue::TimerEnqueueOperation;
6
4// Must be kept in sync with the layout of `State`! 7// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 8pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16;
7 12
8#[repr(C, align(4))] 13#[repr(C, align(4))]
9pub(crate) struct State { 14pub(crate) struct State {
@@ -11,8 +16,9 @@ pub(crate) struct State {
11 spawned: AtomicBool, 16 spawned: AtomicBool,
12 /// Task is in the executor run queue 17 /// Task is in the executor run queue
13 run_queued: AtomicBool, 18 run_queued: AtomicBool,
19 /// Task is in the executor timer queue
20 timer_queued: AtomicBool,
14 pad: AtomicBool, 21 pad: AtomicBool,
15 pad2: AtomicBool,
16} 22}
17 23
18impl State { 24impl State {
@@ -20,8 +26,8 @@ impl State {
20 Self { 26 Self {
21 spawned: AtomicBool::new(false), 27 spawned: AtomicBool::new(false),
22 run_queued: AtomicBool::new(false), 28 run_queued: AtomicBool::new(false),
29 timer_queued: AtomicBool::new(false),
23 pad: AtomicBool::new(false), 30 pad: AtomicBool::new(false),
24 pad2: AtomicBool::new(false),
25 } 31 }
26 } 32 }
27 33
@@ -85,4 +91,34 @@ impl State {
85 self.run_queued.store(false, Ordering::Relaxed); 91 self.run_queued.store(false, Ordering::Relaxed);
86 r 92 r
87 } 93 }
94
95 /// Mark the task as timer-queued. Return whether it can be enqueued.
96 #[cfg(feature = "integrated-timers")]
97 #[inline(always)]
98 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
99 if self
100 .as_u32()
101 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
102 // If not started, ignore it
103 if state & STATE_SPAWNED == 0 {
104 None
105 } else {
106 // Mark it as enqueued
107 Some(state | STATE_TIMER_QUEUED)
108 }
109 })
110 .is_ok()
111 {
112 TimerEnqueueOperation::Enqueue
113 } else {
114 TimerEnqueueOperation::Ignore
115 }
116 }
117
118 /// Unmark the task as timer-queued.
119 #[cfg(feature = "integrated-timers")]
120 #[inline(always)]
121 pub fn timer_dequeue(&self) {
122 self.timer_queued.store(false, Ordering::Relaxed);
123 }
88} 124}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index b92eed006..c0ec2f530 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -2,10 +2,16 @@ use core::cell::Cell;
2 2
3use critical_section::Mutex; 3use critical_section::Mutex;
4 4
5#[cfg(feature = "integrated-timers")]
6use super::timer_queue::TimerEnqueueOperation;
7
5/// Task is spawned (has a future) 8/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 9pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue 10/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 11pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
12/// Task is in the executor timer queue
13#[cfg(feature = "integrated-timers")]
14pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
9 15
10pub(crate) struct State { 16pub(crate) struct State {
11 state: Mutex<Cell<u32>>, 17 state: Mutex<Cell<u32>>,
@@ -69,4 +75,27 @@ impl State {
69 ok 75 ok
70 }) 76 })
71 } 77 }
78
79 /// Mark the task as timer-queued. Return whether it can be enqueued.
80 #[cfg(feature = "integrated-timers")]
81 #[inline(always)]
82 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
83 self.update(|s| {
84 // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is
85 // just being spawned, because its executor pointer may still be changing.
86 if *s & STATE_SPAWNED == STATE_SPAWNED {
87 *s |= STATE_TIMER_QUEUED;
88 TimerEnqueueOperation::Enqueue
89 } else {
90 TimerEnqueueOperation::Ignore
91 }
92 })
93 }
94
95 /// Unmark the task as timer-queued.
96 #[cfg(feature = "integrated-timers")]
97 #[inline(always)]
98 pub fn timer_dequeue(&self) {
99 self.update(|s| *s &= !STATE_TIMER_QUEUED);
100 }
72} 101}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 46e346c1b..c36708401 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -7,6 +7,9 @@ use super::TaskRef;
7/// An item in the timer queue. 7/// An item in the timer queue.
8pub struct TimerQueueItem { 8pub struct TimerQueueItem {
9 /// The next item in the queue. 9 /// The next item in the queue.
10 ///
11 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
12 /// value of `Some(dangling_pointer)`
10 pub next: Cell<Option<TaskRef>>, 13 pub next: Cell<Option<TaskRef>>,
11 14
12 /// The time at which this item expires. 15 /// The time at which this item expires.
@@ -19,7 +22,17 @@ impl TimerQueueItem {
19 pub(crate) const fn new() -> Self { 22 pub(crate) const fn new() -> Self {
20 Self { 23 Self {
21 next: Cell::new(None), 24 next: Cell::new(None),
22 expires_at: Cell::new(0), 25 expires_at: Cell::new(u64::MAX),
23 } 26 }
24 } 27 }
25} 28}
29
30/// The operation to perform after `timer_enqueue` is called.
31#[derive(Debug, Copy, Clone, PartialEq)]
32#[cfg_attr(feature = "defmt", derive(defmt::Format))]
33pub enum TimerEnqueueOperation {
34 /// Enqueue the task.
35 Enqueue,
36 /// Update the task's expiration time.
37 Ignore,
38}