aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-executor')
-rw-r--r--embassy-executor/src/raw/run_queue.rs25
1 files changed, 16 insertions, 9 deletions
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
index a88174a0c..f1ec19ac1 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -4,15 +4,16 @@ use core::ptr::NonNull;
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5 5
6use super::{TaskHeader, TaskRef}; 6use super::{TaskHeader, TaskRef};
7use crate::raw::util::SyncUnsafeCell;
7 8
8pub(crate) struct RunQueueItem { 9pub(crate) struct RunQueueItem {
9 next: AtomicPtr<TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
10} 11}
11 12
12impl RunQueueItem { 13impl RunQueueItem {
13 pub const fn new() -> Self { 14 pub const fn new() -> Self {
14 Self { 15 Self {
15 next: AtomicPtr::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
16 } 17 }
17 } 18 }
18} 19}
@@ -51,7 +52,12 @@ impl RunQueue {
51 self.head 52 self.head
52 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { 53 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
53 was_empty = prev.is_null(); 54 was_empty = prev.is_null();
54 task.header().run_queue_item.next.store(prev, Ordering::Relaxed); 55 unsafe {
56 // safety: the pointer is either null or valid
57 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
58 // safety: there are no concurrent accesses to `next`
59 task.header().run_queue_item.next.set(prev);
60 }
55 Some(task.as_ptr() as *mut _) 61 Some(task.as_ptr() as *mut _)
56 }) 62 })
57 .ok(); 63 .ok();
@@ -64,18 +70,19 @@ impl RunQueue {
64 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 70 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
65 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { 71 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
66 // Atomically empty the queue. 72 // Atomically empty the queue.
67 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 73 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
74
75 // safety: the pointer is either null or valid
76 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
68 77
69 // Iterate the linked list of tasks that were previously in the queue. 78 // Iterate the linked list of tasks that were previously in the queue.
70 while let Some(task) = NonNull::new(ptr) { 79 while let Some(task) = next {
71 let task = unsafe { TaskRef::from_ptr(task.as_ptr()) };
72 // If the task re-enqueues itself, the `next` pointer will get overwritten. 80 // If the task re-enqueues itself, the `next` pointer will get overwritten.
73 // Therefore, first read the next pointer, and only then process the task. 81 // Therefore, first read the next pointer, and only then process the task.
74 let next = task.header().run_queue_item.next.load(Ordering::Relaxed); 82 // safety: there are no concurrent accesses to `next`
83 next = unsafe { task.header().run_queue_item.next.get() };
75 84
76 on_task(task); 85 on_task(task);
77
78 ptr = next
79 } 86 }
80 } 87 }
81} 88}