From 48e1aab762e902ee0a132602d3c2f9ec0551cd6b Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 29 Jan 2023 12:55:06 -0600 Subject: executor: Replace `NonNull` with `TaskRef` --- embassy-executor/src/raw/run_queue.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'embassy-executor/src/raw/run_queue.rs') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index ed8c82a5c..362157535 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -4,7 +4,7 @@ use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; use critical_section::CriticalSection; -use super::TaskHeader; +use super::{TaskHeader, TaskRef}; pub(crate) struct RunQueueItem { next: AtomicPtr, @@ -46,25 +46,26 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull) -> bool { + pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { let prev = self.head.load(Ordering::Relaxed); - task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr(), Ordering::Relaxed); + task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + self.head.store(task.as_ptr() as _, Ordering::Relaxed); prev.is_null() } /// Empty the queue, then call `on_task` for each task that was in the queue. /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. - pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull)) { + pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // Atomically empty the queue. let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); // Iterate the linked list of tasks that were previously in the queue. while let Some(task) = NonNull::new(ptr) { + let task = unsafe { TaskRef::from_ptr(task.as_ptr()) }; // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); + let next = task.header().run_queue_item.next.load(Ordering::Relaxed); on_task(task); -- cgit From 8290236ed64435453a9c028c95246a86371bd4ce Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Sun, 2 Apr 2023 14:11:31 -0500 Subject: executor: Replace unsound critical sections with atomics --- embassy-executor/src/raw/run_queue.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'embassy-executor/src/raw/run_queue.rs') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index 362157535..a88174a0c 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -2,7 +2,6 @@ use core::ptr; use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; -use critical_section::CriticalSection; use super::{TaskHeader, TaskRef}; @@ -46,11 +45,18 @@ impl RunQueue { /// /// `item` must NOT be already enqueued in any queue. #[inline(always)] - pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { - let prev = self.head.load(Ordering::Relaxed); - task.header().run_queue_item.next.store(prev, Ordering::Relaxed); - self.head.store(task.as_ptr() as _, Ordering::Relaxed); - prev.is_null() + pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { + let mut was_empty = false; + + self.head + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { + was_empty = prev.is_null(); + task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + Some(task.as_ptr() as *mut _) + }) + .ok(); + + was_empty } /// Empty the queue, then call `on_task` for each task that was in the queue. -- cgit From 6a6c673c5fd1baa1d3ca3eebb55ba430a86b1438 Mon Sep 17 00:00:00 2001 From: Grant Miller Date: Thu, 13 Apr 2023 14:21:41 -0500 Subject: Executor: Replace unnecessary atomics in runqueue --- embassy-executor/src/raw/run_queue.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) (limited to 'embassy-executor/src/raw/run_queue.rs') diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index a88174a0c..f1ec19ac1 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs @@ -4,15 +4,16 @@ use core::ptr::NonNull; use atomic_polyfill::{AtomicPtr, Ordering}; use super::{TaskHeader, TaskRef}; +use crate::raw::util::SyncUnsafeCell; pub(crate) struct RunQueueItem { - next: AtomicPtr, + next: SyncUnsafeCell>, } impl RunQueueItem { pub const fn new() -> Self { Self { - next: AtomicPtr::new(ptr::null_mut()), + next: SyncUnsafeCell::new(None), } } } @@ -51,7 +52,12 @@ impl RunQueue { self.head .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { was_empty = prev.is_null(); - task.header().run_queue_item.next.store(prev, Ordering::Relaxed); + unsafe { + // safety: the pointer is either null or valid + let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); + // safety: there are no concurrent accesses to `next` + task.header().run_queue_item.next.set(prev); + } Some(task.as_ptr() as *mut _) }) .ok(); @@ -64,18 +70,19 @@ impl RunQueue { /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { // Atomically empty the queue. - let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); + + // safety: the pointer is either null or valid + let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; // Iterate the linked list of tasks that were previously in the queue. - while let Some(task) = NonNull::new(ptr) { - let task = unsafe { TaskRef::from_ptr(task.as_ptr()) }; + while let Some(task) = next { // If the task re-enqueues itself, the `next` pointer will get overwritten. // Therefore, first read the next pointer, and only then process the task. - let next = task.header().run_queue_item.next.load(Ordering::Relaxed); + // safety: there are no concurrent accesses to `next` + next = unsafe { task.header().run_queue_item.next.get() }; on_task(task); - - ptr = next } } } -- cgit