aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-executor/src/raw/mod.rs36
-rw-r--r--embassy-executor/src/raw/state_atomics.rs36
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs40
-rw-r--r--embassy-executor/src/raw/state_critical_section.rs29
-rw-r--r--embassy-executor/src/raw/timer_queue.rs15
-rw-r--r--embassy-time-queue-driver/src/lib.rs14
-rw-r--r--embassy-time-queue-driver/src/queue_integrated.rs20
7 files changed, 181 insertions, 9 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index f9c6509f1..14d689900 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -50,7 +50,7 @@ pub(crate) struct TaskHeader {
50} 50}
51 51
52/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. 52/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
53#[derive(Clone, Copy)] 53#[derive(Clone, Copy, PartialEq)]
54pub struct TaskRef { 54pub struct TaskRef {
55 ptr: NonNull<TaskHeader>, 55 ptr: NonNull<TaskHeader>,
56} 56}
@@ -72,6 +72,16 @@ impl TaskRef {
72 } 72 }
73 } 73 }
74 74
75 /// # Safety
76 ///
77 /// The result of this function must only be compared
78 /// for equality, or stored, but not used.
79 pub const unsafe fn dangling() -> Self {
80 Self {
81 ptr: NonNull::dangling(),
82 }
83 }
84
75 pub(crate) fn header(self) -> &'static TaskHeader { 85 pub(crate) fn header(self) -> &'static TaskHeader {
76 unsafe { self.ptr.as_ref() } 86 unsafe { self.ptr.as_ref() }
77 } 87 }
@@ -88,6 +98,30 @@ impl TaskRef {
88 &self.header().timer_queue_item 98 &self.header().timer_queue_item
89 } 99 }
90 100
101 /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
102 ///
103 /// Entering this state prevents the task from being respawned while in a timer queue.
104 ///
105 /// Safety:
106 ///
107 /// This functions should only be called by the timer queue implementation, before
108 /// enqueueing the timer item.
109 #[cfg(feature = "integrated-timers")]
110 pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
111 self.header().state.timer_enqueue()
112 }
113
114 /// Unmark the task as timer-queued.
115 ///
116 /// Safety:
117 ///
118 /// This functions should only be called by the timer queue implementation, after the task has
119 /// been removed from the timer queue.
120 #[cfg(feature = "integrated-timers")]
121 pub unsafe fn timer_dequeue(&self) {
122 self.header().state.timer_dequeue()
123 }
124
91 /// The returned pointer is valid for the entire TaskStorage. 125 /// The returned pointer is valid for the entire TaskStorage.
92 pub(crate) fn as_ptr(self) -> *const TaskHeader { 126 pub(crate) fn as_ptr(self) -> *const TaskHeader {
93 self.ptr.as_ptr() 127 self.ptr.as_ptr()
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index e4127897e..d03c61ade 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -1,9 +1,15 @@
1use core::sync::atomic::{AtomicU32, Ordering}; 1use core::sync::atomic::{AtomicU32, Ordering};
2 2
3#[cfg(feature = "integrated-timers")]
4use super::timer_queue::TimerEnqueueOperation;
5
3/// Task is spawned (has a future) 6/// Task is spawned (has a future)
4pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 7pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
5/// Task is in the executor run queue 8/// Task is in the executor run queue
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
10/// Task is in the executor timer queue
11#[cfg(feature = "integrated-timers")]
12pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
7 13
8pub(crate) struct State { 14pub(crate) struct State {
9 state: AtomicU32, 15 state: AtomicU32,
@@ -52,4 +58,34 @@ impl State {
52 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 58 let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
53 state & STATE_SPAWNED != 0 59 state & STATE_SPAWNED != 0
54 } 60 }
61
62 /// Mark the task as timer-queued. Return whether it can be enqueued.
63 #[cfg(feature = "integrated-timers")]
64 #[inline(always)]
65 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
66 if self
67 .state
68 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
69 // If not started, ignore it
70 if state & STATE_SPAWNED == 0 {
71 None
72 } else {
73 // Mark it as enqueued
74 Some(state | STATE_TIMER_QUEUED)
75 }
76 })
77 .is_ok()
78 {
79 TimerEnqueueOperation::Enqueue
80 } else {
81 TimerEnqueueOperation::Ignore
82 }
83 }
84
85 /// Unmark the task as timer-queued.
86 #[cfg(feature = "integrated-timers")]
87 #[inline(always)]
88 pub fn timer_dequeue(&self) {
89 self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed);
90 }
55} 91}
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index b673c7359..f6f2e8f08 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -1,9 +1,14 @@
1use core::arch::asm; 1use core::arch::asm;
2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; 2use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 3
4#[cfg(feature = "integrated-timers")]
5use super::timer_queue::TimerEnqueueOperation;
6
4// Must be kept in sync with the layout of `State`! 7// Must be kept in sync with the layout of `State`!
5pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 8pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
6pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8; 9pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
10#[cfg(feature = "integrated-timers")]
11pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16;
7 12
8#[repr(C, align(4))] 13#[repr(C, align(4))]
9pub(crate) struct State { 14pub(crate) struct State {
@@ -11,8 +16,9 @@ pub(crate) struct State {
11 spawned: AtomicBool, 16 spawned: AtomicBool,
12 /// Task is in the executor run queue 17 /// Task is in the executor run queue
13 run_queued: AtomicBool, 18 run_queued: AtomicBool,
19 /// Task is in the executor timer queue
20 timer_queued: AtomicBool,
14 pad: AtomicBool, 21 pad: AtomicBool,
15 pad2: AtomicBool,
16} 22}
17 23
18impl State { 24impl State {
@@ -20,8 +26,8 @@ impl State {
20 Self { 26 Self {
21 spawned: AtomicBool::new(false), 27 spawned: AtomicBool::new(false),
22 run_queued: AtomicBool::new(false), 28 run_queued: AtomicBool::new(false),
29 timer_queued: AtomicBool::new(false),
23 pad: AtomicBool::new(false), 30 pad: AtomicBool::new(false),
24 pad2: AtomicBool::new(false),
25 } 31 }
26 } 32 }
27 33
@@ -85,4 +91,34 @@ impl State {
85 self.run_queued.store(false, Ordering::Relaxed); 91 self.run_queued.store(false, Ordering::Relaxed);
86 r 92 r
87 } 93 }
94
95 /// Mark the task as timer-queued. Return whether it can be enqueued.
96 #[cfg(feature = "integrated-timers")]
97 #[inline(always)]
98 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
99 if self
100 .as_u32()
101 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
102 // If not started, ignore it
103 if state & STATE_SPAWNED == 0 {
104 None
105 } else {
106 // Mark it as enqueued
107 Some(state | STATE_TIMER_QUEUED)
108 }
109 })
110 .is_ok()
111 {
112 TimerEnqueueOperation::Enqueue
113 } else {
114 TimerEnqueueOperation::Ignore
115 }
116 }
117
118 /// Unmark the task as timer-queued.
119 #[cfg(feature = "integrated-timers")]
120 #[inline(always)]
121 pub fn timer_dequeue(&self) {
122 self.timer_queued.store(false, Ordering::Relaxed);
123 }
88} 124}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
index b92eed006..c0ec2f530 100644
--- a/embassy-executor/src/raw/state_critical_section.rs
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -2,10 +2,16 @@ use core::cell::Cell;
2 2
3use critical_section::Mutex; 3use critical_section::Mutex;
4 4
5#[cfg(feature = "integrated-timers")]
6use super::timer_queue::TimerEnqueueOperation;
7
5/// Task is spawned (has a future) 8/// Task is spawned (has a future)
6pub(crate) const STATE_SPAWNED: u32 = 1 << 0; 9pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
7/// Task is in the executor run queue 10/// Task is in the executor run queue
8pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; 11pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
12/// Task is in the executor timer queue
13#[cfg(feature = "integrated-timers")]
14pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
9 15
10pub(crate) struct State { 16pub(crate) struct State {
11 state: Mutex<Cell<u32>>, 17 state: Mutex<Cell<u32>>,
@@ -69,4 +75,27 @@ impl State {
69 ok 75 ok
70 }) 76 })
71 } 77 }
78
79 /// Mark the task as timer-queued. Return whether it can be enqueued.
80 #[cfg(feature = "integrated-timers")]
81 #[inline(always)]
82 pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
83 self.update(|s| {
84 // FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is
85 // just being spawned, because its executor pointer may still be changing.
86 if *s & STATE_SPAWNED == STATE_SPAWNED {
87 *s |= STATE_TIMER_QUEUED;
88 TimerEnqueueOperation::Enqueue
89 } else {
90 TimerEnqueueOperation::Ignore
91 }
92 })
93 }
94
95 /// Unmark the task as timer-queued.
96 #[cfg(feature = "integrated-timers")]
97 #[inline(always)]
98 pub fn timer_dequeue(&self) {
99 self.update(|s| *s &= !STATE_TIMER_QUEUED);
100 }
72} 101}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 46e346c1b..c36708401 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -7,6 +7,9 @@ use super::TaskRef;
7/// An item in the timer queue. 7/// An item in the timer queue.
8pub struct TimerQueueItem { 8pub struct TimerQueueItem {
9 /// The next item in the queue. 9 /// The next item in the queue.
10 ///
11 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
12 /// value of `Some(dangling_pointer)`
10 pub next: Cell<Option<TaskRef>>, 13 pub next: Cell<Option<TaskRef>>,
11 14
12 /// The time at which this item expires. 15 /// The time at which this item expires.
@@ -19,7 +22,17 @@ impl TimerQueueItem {
19 pub(crate) const fn new() -> Self { 22 pub(crate) const fn new() -> Self {
20 Self { 23 Self {
21 next: Cell::new(None), 24 next: Cell::new(None),
22 expires_at: Cell::new(0), 25 expires_at: Cell::new(u64::MAX),
23 } 26 }
24 } 27 }
25} 28}
29
30/// The operation to perform after `timer_enqueue` is called.
31#[derive(Debug, Copy, Clone, PartialEq)]
32#[cfg_attr(feature = "defmt", derive(defmt::Format))]
33pub enum TimerEnqueueOperation {
34 /// Enqueue the task.
35 Enqueue,
36 /// Update the task's expiration time.
37 Ignore,
38}
diff --git a/embassy-time-queue-driver/src/lib.rs b/embassy-time-queue-driver/src/lib.rs
index 0c78921ed..2d5fd449a 100644
--- a/embassy-time-queue-driver/src/lib.rs
+++ b/embassy-time-queue-driver/src/lib.rs
@@ -73,6 +73,20 @@ extern "Rust" {
73 73
74/// Schedule the given waker to be woken at `at`. 74/// Schedule the given waker to be woken at `at`.
75pub fn schedule_wake(at: u64, waker: &Waker) { 75pub fn schedule_wake(at: u64, waker: &Waker) {
76 #[cfg(feature = "integrated-timers")]
77 {
78 use embassy_executor::raw::task_from_waker;
79 use embassy_executor::raw::timer_queue::TimerEnqueueOperation;
80 // The very first thing we must do, before we even access the timer queue, is to
81 // mark the task a TIMER_QUEUED. This ensures that the task that is being scheduled
82 // can not be respawn while we are accessing the timer queue.
83 let task = task_from_waker(waker);
84 if unsafe { task.timer_enqueue() } == TimerEnqueueOperation::Ignore {
85 // We are not allowed to enqueue the task in the timer queue. This is because the
86 // task is not spawned, and so it makes no sense to schedule it.
87 return;
88 }
89 }
76 unsafe { _embassy_time_schedule_wake(at, waker) } 90 unsafe { _embassy_time_schedule_wake(at, waker) }
77} 91}
78 92
diff --git a/embassy-time-queue-driver/src/queue_integrated.rs b/embassy-time-queue-driver/src/queue_integrated.rs
index cb0f79356..b905c00c3 100644
--- a/embassy-time-queue-driver/src/queue_integrated.rs
+++ b/embassy-time-queue-driver/src/queue_integrated.rs
@@ -24,16 +24,21 @@ impl TimerQueue {
24 if item.next.get().is_none() { 24 if item.next.get().is_none() {
25 // If not in the queue, add it and update. 25 // If not in the queue, add it and update.
26 let prev = self.head.replace(Some(p)); 26 let prev = self.head.replace(Some(p));
27 item.next.set(prev); 27 item.next.set(if prev.is_none() {
28 Some(unsafe { TaskRef::dangling() })
29 } else {
30 prev
31 });
32 item.expires_at.set(at);
33 true
28 } else if at <= item.expires_at.get() { 34 } else if at <= item.expires_at.get() {
29 // If expiration is sooner than previously set, update. 35 // If expiration is sooner than previously set, update.
36 item.expires_at.set(at);
37 true
30 } else { 38 } else {
31 // Task does not need to be updated. 39 // Task does not need to be updated.
32 return false; 40 false
33 } 41 }
34
35 item.expires_at.set(at);
36 true
37 } 42 }
38 43
39 /// Dequeues expired timers and returns the next alarm time. 44 /// Dequeues expired timers and returns the next alarm time.
@@ -64,6 +69,10 @@ impl TimerQueue {
64 fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { 69 fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
65 let mut prev = &self.head; 70 let mut prev = &self.head;
66 while let Some(p) = prev.get() { 71 while let Some(p) = prev.get() {
72 if unsafe { p == TaskRef::dangling() } {
73 // prev was the last item, stop
74 break;
75 }
67 let item = p.timer_queue_item(); 76 let item = p.timer_queue_item();
68 if f(p) { 77 if f(p) {
69 // Skip to next 78 // Skip to next
@@ -72,6 +81,7 @@ impl TimerQueue {
72 // Remove it 81 // Remove it
73 prev.set(item.next.get()); 82 prev.set(item.next.get());
74 item.next.set(None); 83 item.next.set(None);
84 unsafe { p.timer_dequeue() };
75 } 85 }
76 } 86 }
77 } 87 }