aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw/run_queue.rs
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-executor/src/raw/run_queue.rs
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-executor/src/raw/run_queue.rs')
-rw-r--r--embassy-executor/src/raw/run_queue.rs44
1 files changed, 29 insertions, 15 deletions
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
index ed8c82a5c..f1ec19ac1 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -2,18 +2,18 @@ use core::ptr;
2use core::ptr::NonNull; 2use core::ptr::NonNull;
3 3
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5use critical_section::CriticalSection;
6 5
7use super::TaskHeader; 6use super::{TaskHeader, TaskRef};
7use crate::raw::util::SyncUnsafeCell;
8 8
9pub(crate) struct RunQueueItem { 9pub(crate) struct RunQueueItem {
10 next: AtomicPtr<TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
11} 11}
12 12
13impl RunQueueItem { 13impl RunQueueItem {
14 pub const fn new() -> Self { 14 pub const fn new() -> Self {
15 Self { 15 Self {
16 next: AtomicPtr::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
17 } 17 }
18 } 18 }
19} 19}
@@ -46,29 +46,43 @@ impl RunQueue {
46 /// 46 ///
47 /// `item` must NOT be already enqueued in any queue. 47 /// `item` must NOT be already enqueued in any queue.
48 #[inline(always)] 48 #[inline(always)]
49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { 49 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
50 let prev = self.head.load(Ordering::Relaxed); 50 let mut was_empty = false;
51 task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); 51
52 self.head.store(task.as_ptr(), Ordering::Relaxed); 52 self.head
53 prev.is_null() 53 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
54 was_empty = prev.is_null();
55 unsafe {
56 // safety: the pointer is either null or valid
57 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
58 // safety: there are no concurrent accesses to `next`
59 task.header().run_queue_item.next.set(prev);
60 }
61 Some(task.as_ptr() as *mut _)
62 })
63 .ok();
64
65 was_empty
54 } 66 }
55 67
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 68 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 69 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 70 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { 71 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue. 72 // Atomically empty the queue.
61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 73 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
74
75 // safety: the pointer is either null or valid
76 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
62 77
63 // Iterate the linked list of tasks that were previously in the queue. 78 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = NonNull::new(ptr) { 79 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 80 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 81 // Therefore, first read the next pointer, and only then process the task.
67 let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); 82 // safety: there are no concurrent accesses to `next`
83 next = unsafe { task.header().run_queue_item.next.get() };
68 84
69 on_task(task); 85 on_task(task);
70
71 ptr = next
72 } 86 }
73 } 87 }
74} 88}