diff options
| author | Dániel Buga <[email protected]> | 2024-12-09 08:43:57 +0100 |
|---|---|---|
| committer | Dániel Buga <[email protected]> | 2024-12-13 21:20:57 +0100 |
| commit | ec96395d084d5edc8be25ddaea8547e2ebd447a6 (patch) | |
| tree | b1edf825c8d67013df3cec1283376a7558951a3f /embassy-executor/src | |
| parent | d45ea43892198484b5f6dcea4c351dc11d226cc4 (diff) | |
Prevent task from respawning while in the timer queue
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 36 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 36 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 40 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_critical_section.rs | 29 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 15 |
5 files changed, 152 insertions, 4 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f9c6509f1..14d689900 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -50,7 +50,7 @@ pub(crate) struct TaskHeader { | |||
| 50 | } | 50 | } |
| 51 | 51 | ||
| 52 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. | 52 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 53 | #[derive(Clone, Copy)] | 53 | #[derive(Clone, Copy, PartialEq)] |
| 54 | pub struct TaskRef { | 54 | pub struct TaskRef { |
| 55 | ptr: NonNull<TaskHeader>, | 55 | ptr: NonNull<TaskHeader>, |
| 56 | } | 56 | } |
| @@ -72,6 +72,16 @@ impl TaskRef { | |||
| 72 | } | 72 | } |
| 73 | } | 73 | } |
| 74 | 74 | ||
| 75 | /// # Safety | ||
| 76 | /// | ||
| 77 | /// The result of this function must only be compared | ||
| 78 | /// for equality, or stored, but not used. | ||
| 79 | pub const unsafe fn dangling() -> Self { | ||
| 80 | Self { | ||
| 81 | ptr: NonNull::dangling(), | ||
| 82 | } | ||
| 83 | } | ||
| 84 | |||
| 75 | pub(crate) fn header(self) -> &'static TaskHeader { | 85 | pub(crate) fn header(self) -> &'static TaskHeader { |
| 76 | unsafe { self.ptr.as_ref() } | 86 | unsafe { self.ptr.as_ref() } |
| 77 | } | 87 | } |
| @@ -88,6 +98,30 @@ impl TaskRef { | |||
| 88 | &self.header().timer_queue_item | 98 | &self.header().timer_queue_item |
| 89 | } | 99 | } |
| 90 | 100 | ||
| 101 | /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) | ||
| 102 | /// | ||
| 103 | /// Entering this state prevents the task from being respawned while in a timer queue. | ||
| 104 | /// | ||
| 105 | /// Safety: | ||
| 106 | /// | ||
| 107 | /// This functions should only be called by the timer queue implementation, before | ||
| 108 | /// enqueueing the timer item. | ||
| 109 | #[cfg(feature = "integrated-timers")] | ||
| 110 | pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation { | ||
| 111 | self.header().state.timer_enqueue() | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Unmark the task as timer-queued. | ||
| 115 | /// | ||
| 116 | /// Safety: | ||
| 117 | /// | ||
| 118 | /// This functions should only be called by the timer queue implementation, after the task has | ||
| 119 | /// been removed from the timer queue. | ||
| 120 | #[cfg(feature = "integrated-timers")] | ||
| 121 | pub unsafe fn timer_dequeue(&self) { | ||
| 122 | self.header().state.timer_dequeue() | ||
| 123 | } | ||
| 124 | |||
| 91 | /// The returned pointer is valid for the entire TaskStorage. | 125 | /// The returned pointer is valid for the entire TaskStorage. |
| 92 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | 126 | pub(crate) fn as_ptr(self) -> *const TaskHeader { |
| 93 | self.ptr.as_ptr() | 127 | self.ptr.as_ptr() |
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e4127897e..d03c61ade 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -1,9 +1,15 @@ | |||
| 1 | use core::sync::atomic::{AtomicU32, Ordering}; | 1 | use core::sync::atomic::{AtomicU32, Ordering}; |
| 2 | 2 | ||
| 3 | #[cfg(feature = "integrated-timers")] | ||
| 4 | use super::timer_queue::TimerEnqueueOperation; | ||
| 5 | |||
| 3 | /// Task is spawned (has a future) | 6 | /// Task is spawned (has a future) |
| 4 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 7 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 5 | /// Task is in the executor run queue | 8 | /// Task is in the executor run queue |
| 6 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | 9 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; |
| 10 | /// Task is in the executor timer queue | ||
| 11 | #[cfg(feature = "integrated-timers")] | ||
| 12 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 7 | 13 | ||
| 8 | pub(crate) struct State { | 14 | pub(crate) struct State { |
| 9 | state: AtomicU32, | 15 | state: AtomicU32, |
| @@ -52,4 +58,34 @@ impl State { | |||
| 52 | let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 58 | let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 53 | state & STATE_SPAWNED != 0 | 59 | state & STATE_SPAWNED != 0 |
| 54 | } | 60 | } |
| 61 | |||
| 62 | /// Mark the task as timer-queued. Return whether it can be enqueued. | ||
| 63 | #[cfg(feature = "integrated-timers")] | ||
| 64 | #[inline(always)] | ||
| 65 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { | ||
| 66 | if self | ||
| 67 | .state | ||
| 68 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 69 | // If not started, ignore it | ||
| 70 | if state & STATE_SPAWNED == 0 { | ||
| 71 | None | ||
| 72 | } else { | ||
| 73 | // Mark it as enqueued | ||
| 74 | Some(state | STATE_TIMER_QUEUED) | ||
| 75 | } | ||
| 76 | }) | ||
| 77 | .is_ok() | ||
| 78 | { | ||
| 79 | TimerEnqueueOperation::Enqueue | ||
| 80 | } else { | ||
| 81 | TimerEnqueueOperation::Ignore | ||
| 82 | } | ||
| 83 | } | ||
| 84 | |||
| 85 | /// Unmark the task as timer-queued. | ||
| 86 | #[cfg(feature = "integrated-timers")] | ||
| 87 | #[inline(always)] | ||
| 88 | pub fn timer_dequeue(&self) { | ||
| 89 | self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); | ||
| 90 | } | ||
| 55 | } | 91 | } |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index b673c7359..f6f2e8f08 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -1,9 +1,14 @@ | |||
| 1 | use core::arch::asm; | 1 | use core::arch::asm; |
| 2 | use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; | 2 | use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; |
| 3 | 3 | ||
| 4 | #[cfg(feature = "integrated-timers")] | ||
| 5 | use super::timer_queue::TimerEnqueueOperation; | ||
| 6 | |||
| 4 | // Must be kept in sync with the layout of `State`! | 7 | // Must be kept in sync with the layout of `State`! |
| 5 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 8 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 6 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; | 9 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; |
| 10 | #[cfg(feature = "integrated-timers")] | ||
| 11 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16; | ||
| 7 | 12 | ||
| 8 | #[repr(C, align(4))] | 13 | #[repr(C, align(4))] |
| 9 | pub(crate) struct State { | 14 | pub(crate) struct State { |
| @@ -11,8 +16,9 @@ pub(crate) struct State { | |||
| 11 | spawned: AtomicBool, | 16 | spawned: AtomicBool, |
| 12 | /// Task is in the executor run queue | 17 | /// Task is in the executor run queue |
| 13 | run_queued: AtomicBool, | 18 | run_queued: AtomicBool, |
| 19 | /// Task is in the executor timer queue | ||
| 20 | timer_queued: AtomicBool, | ||
| 14 | pad: AtomicBool, | 21 | pad: AtomicBool, |
| 15 | pad2: AtomicBool, | ||
| 16 | } | 22 | } |
| 17 | 23 | ||
| 18 | impl State { | 24 | impl State { |
| @@ -20,8 +26,8 @@ impl State { | |||
| 20 | Self { | 26 | Self { |
| 21 | spawned: AtomicBool::new(false), | 27 | spawned: AtomicBool::new(false), |
| 22 | run_queued: AtomicBool::new(false), | 28 | run_queued: AtomicBool::new(false), |
| 29 | timer_queued: AtomicBool::new(false), | ||
| 23 | pad: AtomicBool::new(false), | 30 | pad: AtomicBool::new(false), |
| 24 | pad2: AtomicBool::new(false), | ||
| 25 | } | 31 | } |
| 26 | } | 32 | } |
| 27 | 33 | ||
| @@ -85,4 +91,34 @@ impl State { | |||
| 85 | self.run_queued.store(false, Ordering::Relaxed); | 91 | self.run_queued.store(false, Ordering::Relaxed); |
| 86 | r | 92 | r |
| 87 | } | 93 | } |
| 94 | |||
| 95 | /// Mark the task as timer-queued. Return whether it can be enqueued. | ||
| 96 | #[cfg(feature = "integrated-timers")] | ||
| 97 | #[inline(always)] | ||
| 98 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { | ||
| 99 | if self | ||
| 100 | .as_u32() | ||
| 101 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 102 | // If not started, ignore it | ||
| 103 | if state & STATE_SPAWNED == 0 { | ||
| 104 | None | ||
| 105 | } else { | ||
| 106 | // Mark it as enqueued | ||
| 107 | Some(state | STATE_TIMER_QUEUED) | ||
| 108 | } | ||
| 109 | }) | ||
| 110 | .is_ok() | ||
| 111 | { | ||
| 112 | TimerEnqueueOperation::Enqueue | ||
| 113 | } else { | ||
| 114 | TimerEnqueueOperation::Ignore | ||
| 115 | } | ||
| 116 | } | ||
| 117 | |||
| 118 | /// Unmark the task as timer-queued. | ||
| 119 | #[cfg(feature = "integrated-timers")] | ||
| 120 | #[inline(always)] | ||
| 121 | pub fn timer_dequeue(&self) { | ||
| 122 | self.timer_queued.store(false, Ordering::Relaxed); | ||
| 123 | } | ||
| 88 | } | 124 | } |
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index b92eed006..c0ec2f530 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs | |||
| @@ -2,10 +2,16 @@ use core::cell::Cell; | |||
| 2 | 2 | ||
| 3 | use critical_section::Mutex; | 3 | use critical_section::Mutex; |
| 4 | 4 | ||
| 5 | #[cfg(feature = "integrated-timers")] | ||
| 6 | use super::timer_queue::TimerEnqueueOperation; | ||
| 7 | |||
| 5 | /// Task is spawned (has a future) | 8 | /// Task is spawned (has a future) |
| 6 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; | 9 | pub(crate) const STATE_SPAWNED: u32 = 1 << 0; |
| 7 | /// Task is in the executor run queue | 10 | /// Task is in the executor run queue |
| 8 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | 11 | pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; |
| 12 | /// Task is in the executor timer queue | ||
| 13 | #[cfg(feature = "integrated-timers")] | ||
| 14 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 9 | 15 | ||
| 10 | pub(crate) struct State { | 16 | pub(crate) struct State { |
| 11 | state: Mutex<Cell<u32>>, | 17 | state: Mutex<Cell<u32>>, |
| @@ -69,4 +75,27 @@ impl State { | |||
| 69 | ok | 75 | ok |
| 70 | }) | 76 | }) |
| 71 | } | 77 | } |
| 78 | |||
| 79 | /// Mark the task as timer-queued. Return whether it can be enqueued. | ||
| 80 | #[cfg(feature = "integrated-timers")] | ||
| 81 | #[inline(always)] | ||
| 82 | pub fn timer_enqueue(&self) -> TimerEnqueueOperation { | ||
| 83 | self.update(|s| { | ||
| 84 | // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is | ||
| 85 | // just being spawned, because its executor pointer may still be changing. | ||
| 86 | if *s & STATE_SPAWNED == STATE_SPAWNED { | ||
| 87 | *s |= STATE_TIMER_QUEUED; | ||
| 88 | TimerEnqueueOperation::Enqueue | ||
| 89 | } else { | ||
| 90 | TimerEnqueueOperation::Ignore | ||
| 91 | } | ||
| 92 | }) | ||
| 93 | } | ||
| 94 | |||
| 95 | /// Unmark the task as timer-queued. | ||
| 96 | #[cfg(feature = "integrated-timers")] | ||
| 97 | #[inline(always)] | ||
| 98 | pub fn timer_dequeue(&self) { | ||
| 99 | self.update(|s| *s &= !STATE_TIMER_QUEUED); | ||
| 100 | } | ||
| 72 | } | 101 | } |
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 46e346c1b..c36708401 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -7,6 +7,9 @@ use super::TaskRef; | |||
| 7 | /// An item in the timer queue. | 7 | /// An item in the timer queue. |
| 8 | pub struct TimerQueueItem { | 8 | pub struct TimerQueueItem { |
| 9 | /// The next item in the queue. | 9 | /// The next item in the queue. |
| 10 | /// | ||
| 11 | /// If this field contains `Some`, the item is in the queue. The last item in the queue has a | ||
| 12 | /// value of `Some(dangling_pointer)` | ||
| 10 | pub next: Cell<Option<TaskRef>>, | 13 | pub next: Cell<Option<TaskRef>>, |
| 11 | 14 | ||
| 12 | /// The time at which this item expires. | 15 | /// The time at which this item expires. |
| @@ -19,7 +22,17 @@ impl TimerQueueItem { | |||
| 19 | pub(crate) const fn new() -> Self { | 22 | pub(crate) const fn new() -> Self { |
| 20 | Self { | 23 | Self { |
| 21 | next: Cell::new(None), | 24 | next: Cell::new(None), |
| 22 | expires_at: Cell::new(0), | 25 | expires_at: Cell::new(u64::MAX), |
| 23 | } | 26 | } |
| 24 | } | 27 | } |
| 25 | } | 28 | } |
| 29 | |||
| 30 | /// The operation to perform after `timer_enqueue` is called. | ||
| 31 | #[derive(Debug, Copy, Clone, PartialEq)] | ||
| 32 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 33 | pub enum TimerEnqueueOperation { | ||
| 34 | /// Enqueue the task. | ||
| 35 | Enqueue, | ||
| 36 | /// Update the task's expiration time. | ||
| 37 | Ignore, | ||
| 38 | } | ||
