aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDániel Buga <[email protected]>2024-12-16 15:56:55 +0100
committerDániel Buga <[email protected]>2024-12-16 16:01:08 +0100
commitb44ef5ccb40d6b778e623e6e68a234c2e0615d25 (patch)
tree0baaa602127b047c329e351517f0fb76ec452e15
parent47e96beff4fc4c8dff8bf6d6a67e1d2e81c40495 (diff)
Fix racy access of TaskHeader::executor
-rw-r--r--embassy-executor/src/raw/mod.rs70
-rw-r--r--embassy-executor/src/raw/state_atomics.rs5
-rw-r--r--embassy-executor/src/raw/state_atomics_arm.rs5
-rw-r--r--embassy-executor/src/spawner.rs8
4 files changed, 75 insertions, 13 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index bcbd214a9..808a78389 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -14,7 +14,58 @@ mod run_queue;
14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")] 14#[cfg_attr(all(cortex_m, target_has_atomic = "8"), path = "state_atomics_arm.rs")]
15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")] 15#[cfg_attr(all(not(cortex_m), target_has_atomic = "8"), path = "state_atomics.rs")]
16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")] 16#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
17mod state; 17pub(crate) mod state;
18
19#[cfg(target_has_atomic = "ptr")]
20mod owner {
21 use core::sync::atomic::{AtomicPtr, Ordering};
22
23 use super::{state::Token, SyncExecutor};
24
25 pub(crate) struct ExecutorRef(AtomicPtr<SyncExecutor>);
26
27 impl ExecutorRef {
28 pub const fn new() -> Self {
29 Self(AtomicPtr::new(core::ptr::null_mut()))
30 }
31
32 pub fn set(&self, executor: Option<&'static SyncExecutor>, _: Token) {
33 let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null());
34 self.0.store(ptr.cast_mut(), Ordering::Release);
35 }
36
37 pub fn get(&self, _: Token) -> *const SyncExecutor {
38 self.0.load(Ordering::Acquire).cast_const()
39 }
40 }
41}
42#[cfg(not(target_has_atomic = "ptr"))]
43mod owner {
44 use super::{state::Token, SyncExecutor};
45 use core::cell::Cell;
46
47 use critical_section::Mutex;
48
49 pub(crate) struct ExecutorRef(Mutex<Cell<*const SyncExecutor>>);
50
51 unsafe impl Send for ExecutorRef {}
52 unsafe impl Sync for ExecutorRef {}
53
54 impl ExecutorRef {
55 pub const fn new() -> Self {
56 Self(Mutex::new(Cell::new(core::ptr::null())))
57 }
58
59 pub fn set(&self, executor: Option<&'static SyncExecutor>, cs: Token) {
60 let ptr = executor.map(|e| e as *const SyncExecutor).unwrap_or(core::ptr::null());
61 self.0.borrow(cs).set(ptr);
62 }
63
64 pub fn get(&self, cs: Token) -> *const SyncExecutor {
65 self.0.borrow(cs).get()
66 }
67 }
68}
18 69
19pub mod timer_queue; 70pub mod timer_queue;
20#[cfg(feature = "trace")] 71#[cfg(feature = "trace")]
@@ -30,6 +81,8 @@ use core::pin::Pin;
30use core::ptr::NonNull; 81use core::ptr::NonNull;
31use core::task::{Context, Poll}; 82use core::task::{Context, Poll};
32 83
84use crate::raw::owner::ExecutorRef;
85
33use self::run_queue::{RunQueue, RunQueueItem}; 86use self::run_queue::{RunQueue, RunQueueItem};
34use self::state::State; 87use self::state::State;
35use self::util::{SyncUnsafeCell, UninitCell}; 88use self::util::{SyncUnsafeCell, UninitCell};
@@ -40,7 +93,7 @@ use super::SpawnToken;
40pub(crate) struct TaskHeader { 93pub(crate) struct TaskHeader {
41 pub(crate) state: State, 94 pub(crate) state: State,
42 pub(crate) run_queue_item: RunQueueItem, 95 pub(crate) run_queue_item: RunQueueItem,
43 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, 96 pub(crate) executor: ExecutorRef,
44 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, 97 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
45 98
46 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. 99 /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
@@ -86,7 +139,8 @@ impl TaskRef {
86 139
87 /// Returns a reference to the executor that the task is currently running on. 140 /// Returns a reference to the executor that the task is currently running on.
88 pub unsafe fn executor(self) -> Option<&'static Executor> { 141 pub unsafe fn executor(self) -> Option<&'static Executor> {
89 self.header().executor.get().map(|e| Executor::wrap(e)) 142 let executor = state::locked(|token| self.header().executor.get(token));
143 executor.as_ref().map(|e| Executor::wrap(e))
90 } 144 }
91 145
92 /// Returns a reference to the timer queue item. 146 /// Returns a reference to the timer queue item.
@@ -153,7 +207,7 @@ impl<F: Future + 'static> TaskStorage<F> {
153 raw: TaskHeader { 207 raw: TaskHeader {
154 state: State::new(), 208 state: State::new(),
155 run_queue_item: RunQueueItem::new(), 209 run_queue_item: RunQueueItem::new(),
156 executor: SyncUnsafeCell::new(None), 210 executor: ExecutorRef::new(),
157 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` 211 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
158 poll_fn: SyncUnsafeCell::new(None), 212 poll_fn: SyncUnsafeCell::new(None),
159 213
@@ -396,7 +450,9 @@ impl SyncExecutor {
396 } 450 }
397 451
398 pub(super) unsafe fn spawn(&'static self, task: TaskRef) { 452 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
399 task.header().executor.set(Some(self)); 453 state::locked(|l| {
454 task.header().executor.set(Some(self), l);
455 });
400 456
401 #[cfg(feature = "trace")] 457 #[cfg(feature = "trace")]
402 trace::task_new(self, &task); 458 trace::task_new(self, &task);
@@ -549,7 +605,7 @@ pub fn wake_task(task: TaskRef) {
549 header.state.run_enqueue(|l| { 605 header.state.run_enqueue(|l| {
550 // We have just marked the task as scheduled, so enqueue it. 606 // We have just marked the task as scheduled, so enqueue it.
551 unsafe { 607 unsafe {
552 let executor = header.executor.get().unwrap_unchecked(); 608 let executor = header.executor.get(l).as_ref().unwrap_unchecked();
553 executor.enqueue(task, l); 609 executor.enqueue(task, l);
554 } 610 }
555 }); 611 });
@@ -563,7 +619,7 @@ pub fn wake_task_no_pend(task: TaskRef) {
563 header.state.run_enqueue(|l| { 619 header.state.run_enqueue(|l| {
564 // We have just marked the task as scheduled, so enqueue it. 620 // We have just marked the task as scheduled, so enqueue it.
565 unsafe { 621 unsafe {
566 let executor = header.executor.get().unwrap_unchecked(); 622 let executor = header.executor.get(l).as_ref().unwrap_unchecked();
567 executor.run_queue.enqueue(task, l); 623 executor.run_queue.enqueue(task, l);
568 } 624 }
569 }); 625 });
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
index abfe94486..d7350464f 100644
--- a/embassy-executor/src/raw/state_atomics.rs
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -2,13 +2,14 @@ use core::sync::atomic::{AtomicU32, Ordering};
2 2
3use super::timer_queue::TimerEnqueueOperation; 3use super::timer_queue::TimerEnqueueOperation;
4 4
5#[derive(Clone, Copy)]
5pub(crate) struct Token(()); 6pub(crate) struct Token(());
6 7
7/// Creates a token and passes it to the closure. 8/// Creates a token and passes it to the closure.
8/// 9///
9/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. 10/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
10pub(crate) fn locked(f: impl FnOnce(Token)) { 11pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
11 f(Token(())); 12 f(Token(()))
12} 13}
13 14
14/// Task is spawned (has a future) 15/// Task is spawned (has a future)
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs
index f0f014652..c1e8f69ab 100644
--- a/embassy-executor/src/raw/state_atomics_arm.rs
+++ b/embassy-executor/src/raw/state_atomics_arm.rs
@@ -3,13 +3,14 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
3 3
4use super::timer_queue::TimerEnqueueOperation; 4use super::timer_queue::TimerEnqueueOperation;
5 5
6#[derive(Clone, Copy)]
6pub(crate) struct Token(()); 7pub(crate) struct Token(());
7 8
8/// Creates a token and passes it to the closure. 9/// Creates a token and passes it to the closure.
9/// 10///
10/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. 11/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
11pub(crate) fn locked(f: impl FnOnce(Token)) { 12pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
12 f(Token(())); 13 f(Token(()))
13} 14}
14 15
15// Must be kept in sync with the layout of `State`! 16// Must be kept in sync with the layout of `State`!
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 271606244..bc243bee7 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -92,7 +92,9 @@ impl Spawner {
92 pub async fn for_current_executor() -> Self { 92 pub async fn for_current_executor() -> Self {
93 poll_fn(|cx| { 93 poll_fn(|cx| {
94 let task = raw::task_from_waker(cx.waker()); 94 let task = raw::task_from_waker(cx.waker());
95 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 95 let executor = raw::state::locked(|l| {
96 unsafe { task.header().executor.get(l).as_ref().unwrap_unchecked() }
97 });
96 let executor = unsafe { raw::Executor::wrap(executor) }; 98 let executor = unsafe { raw::Executor::wrap(executor) };
97 Poll::Ready(Self::new(executor)) 99 Poll::Ready(Self::new(executor))
98 }) 100 })
@@ -164,7 +166,9 @@ impl SendSpawner {
164 pub async fn for_current_executor() -> Self { 166 pub async fn for_current_executor() -> Self {
165 poll_fn(|cx| { 167 poll_fn(|cx| {
166 let task = raw::task_from_waker(cx.waker()); 168 let task = raw::task_from_waker(cx.waker());
167 let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; 169 let executor = raw::state::locked(|l| {
170 unsafe { task.header().executor.get(l).as_ref().unwrap_unchecked() }
171 });
168 Poll::Ready(Self::new(executor)) 172 Poll::Ready(Self::new(executor))
169 }) 173 })
170 .await 174 .await