diff options
| author | Dario Nieuwenhuis <[email protected]> | 2024-12-16 16:52:21 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-12-16 16:52:21 +0000 |
| commit | d3f0294fb12e060c4a3ba557ff95766d1c3686e0 (patch) | |
| tree | 7709019cf01d0b2d341ad4ee760dd09c57f0c74d | |
| parent | 50d67083b899f4398ded93966df350935e12f343 (diff) | |
| parent | b47a631abf0c200c3b29b8e4ec199421835a0525 (diff) | |
Merge pull request #3655 from bugadani/header-executor
Fix racy access of TaskHeader::executor
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 16 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics.rs | 5 | ||||
| -rw-r--r-- | embassy-executor/src/raw/state_atomics_arm.rs | 5 | ||||
| -rw-r--r-- | embassy-executor/src/spawner.rs | 17 |
4 files changed, 31 insertions, 12 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index bcbd214a9..5a476213b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -28,6 +28,7 @@ use core::marker::PhantomData; | |||
| 28 | use core::mem; | 28 | use core::mem; |
| 29 | use core::pin::Pin; | 29 | use core::pin::Pin; |
| 30 | use core::ptr::NonNull; | 30 | use core::ptr::NonNull; |
| 31 | use core::sync::atomic::{AtomicPtr, Ordering}; | ||
| 31 | use core::task::{Context, Poll}; | 32 | use core::task::{Context, Poll}; |
| 32 | 33 | ||
| 33 | use self::run_queue::{RunQueue, RunQueueItem}; | 34 | use self::run_queue::{RunQueue, RunQueueItem}; |
| @@ -40,7 +41,7 @@ use super::SpawnToken; | |||
| 40 | pub(crate) struct TaskHeader { | 41 | pub(crate) struct TaskHeader { |
| 41 | pub(crate) state: State, | 42 | pub(crate) state: State, |
| 42 | pub(crate) run_queue_item: RunQueueItem, | 43 | pub(crate) run_queue_item: RunQueueItem, |
| 43 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, | 44 | pub(crate) executor: AtomicPtr<SyncExecutor>, |
| 44 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, | 45 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 45 | 46 | ||
| 46 | /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. | 47 | /// Integrated timer queue storage. This field should not be accessed outside of the timer queue. |
| @@ -86,7 +87,8 @@ impl TaskRef { | |||
| 86 | 87 | ||
| 87 | /// Returns a reference to the executor that the task is currently running on. | 88 | /// Returns a reference to the executor that the task is currently running on. |
| 88 | pub unsafe fn executor(self) -> Option<&'static Executor> { | 89 | pub unsafe fn executor(self) -> Option<&'static Executor> { |
| 89 | self.header().executor.get().map(|e| Executor::wrap(e)) | 90 | let executor = self.header().executor.load(Ordering::Relaxed); |
| 91 | executor.as_ref().map(|e| Executor::wrap(e)) | ||
| 90 | } | 92 | } |
| 91 | 93 | ||
| 92 | /// Returns a reference to the timer queue item. | 94 | /// Returns a reference to the timer queue item. |
| @@ -153,7 +155,7 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 153 | raw: TaskHeader { | 155 | raw: TaskHeader { |
| 154 | state: State::new(), | 156 | state: State::new(), |
| 155 | run_queue_item: RunQueueItem::new(), | 157 | run_queue_item: RunQueueItem::new(), |
| 156 | executor: SyncUnsafeCell::new(None), | 158 | executor: AtomicPtr::new(core::ptr::null_mut()), |
| 157 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | 159 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` |
| 158 | poll_fn: SyncUnsafeCell::new(None), | 160 | poll_fn: SyncUnsafeCell::new(None), |
| 159 | 161 | ||
| @@ -396,7 +398,9 @@ impl SyncExecutor { | |||
| 396 | } | 398 | } |
| 397 | 399 | ||
| 398 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | 400 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 399 | task.header().executor.set(Some(self)); | 401 | task.header() |
| 402 | .executor | ||
| 403 | .store((self as *const Self).cast_mut(), Ordering::Relaxed); | ||
| 400 | 404 | ||
| 401 | #[cfg(feature = "trace")] | 405 | #[cfg(feature = "trace")] |
| 402 | trace::task_new(self, &task); | 406 | trace::task_new(self, &task); |
| @@ -549,7 +553,7 @@ pub fn wake_task(task: TaskRef) { | |||
| 549 | header.state.run_enqueue(|l| { | 553 | header.state.run_enqueue(|l| { |
| 550 | // We have just marked the task as scheduled, so enqueue it. | 554 | // We have just marked the task as scheduled, so enqueue it. |
| 551 | unsafe { | 555 | unsafe { |
| 552 | let executor = header.executor.get().unwrap_unchecked(); | 556 | let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked(); |
| 553 | executor.enqueue(task, l); | 557 | executor.enqueue(task, l); |
| 554 | } | 558 | } |
| 555 | }); | 559 | }); |
| @@ -563,7 +567,7 @@ pub fn wake_task_no_pend(task: TaskRef) { | |||
| 563 | header.state.run_enqueue(|l| { | 567 | header.state.run_enqueue(|l| { |
| 564 | // We have just marked the task as scheduled, so enqueue it. | 568 | // We have just marked the task as scheduled, so enqueue it. |
| 565 | unsafe { | 569 | unsafe { |
| 566 | let executor = header.executor.get().unwrap_unchecked(); | 570 | let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked(); |
| 567 | executor.run_queue.enqueue(task, l); | 571 | executor.run_queue.enqueue(task, l); |
| 568 | } | 572 | } |
| 569 | }); | 573 | }); |
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index abfe94486..d7350464f 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs | |||
| @@ -2,13 +2,14 @@ use core::sync::atomic::{AtomicU32, Ordering}; | |||
| 2 | 2 | ||
| 3 | use super::timer_queue::TimerEnqueueOperation; | 3 | use super::timer_queue::TimerEnqueueOperation; |
| 4 | 4 | ||
| 5 | #[derive(Clone, Copy)] | ||
| 5 | pub(crate) struct Token(()); | 6 | pub(crate) struct Token(()); |
| 6 | 7 | ||
| 7 | /// Creates a token and passes it to the closure. | 8 | /// Creates a token and passes it to the closure. |
| 8 | /// | 9 | /// |
| 9 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. | 10 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. |
| 10 | pub(crate) fn locked(f: impl FnOnce(Token)) { | 11 | pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R { |
| 11 | f(Token(())); | 12 | f(Token(())) |
| 12 | } | 13 | } |
| 13 | 14 | ||
| 14 | /// Task is spawned (has a future) | 15 | /// Task is spawned (has a future) |
diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index f0f014652..c1e8f69ab 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs | |||
| @@ -3,13 +3,14 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering}; | |||
| 3 | 3 | ||
| 4 | use super::timer_queue::TimerEnqueueOperation; | 4 | use super::timer_queue::TimerEnqueueOperation; |
| 5 | 5 | ||
| 6 | #[derive(Clone, Copy)] | ||
| 6 | pub(crate) struct Token(()); | 7 | pub(crate) struct Token(()); |
| 7 | 8 | ||
| 8 | /// Creates a token and passes it to the closure. | 9 | /// Creates a token and passes it to the closure. |
| 9 | /// | 10 | /// |
| 10 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. | 11 | /// This is a no-op replacement for `CriticalSection::with` because we don't need any locking. |
| 11 | pub(crate) fn locked(f: impl FnOnce(Token)) { | 12 | pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R { |
| 12 | f(Token(())); | 13 | f(Token(())) |
| 13 | } | 14 | } |
| 14 | 15 | ||
| 15 | // Must be kept in sync with the layout of `State`! | 16 | // Must be kept in sync with the layout of `State`! |
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 271606244..16347ad71 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs | |||
| @@ -1,6 +1,7 @@ | |||
| 1 | use core::future::poll_fn; | 1 | use core::future::poll_fn; |
| 2 | use core::marker::PhantomData; | 2 | use core::marker::PhantomData; |
| 3 | use core::mem; | 3 | use core::mem; |
| 4 | use core::sync::atomic::Ordering; | ||
| 4 | use core::task::Poll; | 5 | use core::task::Poll; |
| 5 | 6 | ||
| 6 | use super::raw; | 7 | use super::raw; |
| @@ -92,7 +93,13 @@ impl Spawner { | |||
| 92 | pub async fn for_current_executor() -> Self { | 93 | pub async fn for_current_executor() -> Self { |
| 93 | poll_fn(|cx| { | 94 | poll_fn(|cx| { |
| 94 | let task = raw::task_from_waker(cx.waker()); | 95 | let task = raw::task_from_waker(cx.waker()); |
| 95 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; | 96 | let executor = unsafe { |
| 97 | task.header() | ||
| 98 | .executor | ||
| 99 | .load(Ordering::Relaxed) | ||
| 100 | .as_ref() | ||
| 101 | .unwrap_unchecked() | ||
| 102 | }; | ||
| 96 | let executor = unsafe { raw::Executor::wrap(executor) }; | 103 | let executor = unsafe { raw::Executor::wrap(executor) }; |
| 97 | Poll::Ready(Self::new(executor)) | 104 | Poll::Ready(Self::new(executor)) |
| 98 | }) | 105 | }) |
| @@ -164,7 +171,13 @@ impl SendSpawner { | |||
| 164 | pub async fn for_current_executor() -> Self { | 171 | pub async fn for_current_executor() -> Self { |
| 165 | poll_fn(|cx| { | 172 | poll_fn(|cx| { |
| 166 | let task = raw::task_from_waker(cx.waker()); | 173 | let task = raw::task_from_waker(cx.waker()); |
| 167 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; | 174 | let executor = unsafe { |
| 175 | task.header() | ||
| 176 | .executor | ||
| 177 | .load(Ordering::Relaxed) | ||
| 178 | .as_ref() | ||
| 179 | .unwrap_unchecked() | ||
| 180 | }; | ||
| 168 | Poll::Ready(Self::new(executor)) | 181 | Poll::Ready(Self::new(executor)) |
| 169 | }) | 182 | }) |
| 170 | .await | 183 | .await |
