diff options
Diffstat (limited to 'embassy-executor/src/raw/run_queue.rs')
| -rw-r--r-- | embassy-executor/src/raw/run_queue.rs | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index ed8c82a5c..f1ec19ac1 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs | |||
| @@ -2,18 +2,18 @@ use core::ptr; | |||
| 2 | use core::ptr::NonNull; | 2 | use core::ptr::NonNull; |
| 3 | 3 | ||
| 4 | use atomic_polyfill::{AtomicPtr, Ordering}; | 4 | use atomic_polyfill::{AtomicPtr, Ordering}; |
| 5 | use critical_section::CriticalSection; | ||
| 6 | 5 | ||
| 7 | use super::TaskHeader; | 6 | use super::{TaskHeader, TaskRef}; |
| 7 | use crate::raw::util::SyncUnsafeCell; | ||
| 8 | 8 | ||
| 9 | pub(crate) struct RunQueueItem { | 9 | pub(crate) struct RunQueueItem { |
| 10 | next: AtomicPtr<TaskHeader>, | 10 | next: SyncUnsafeCell<Option<TaskRef>>, |
| 11 | } | 11 | } |
| 12 | 12 | ||
| 13 | impl RunQueueItem { | 13 | impl RunQueueItem { |
| 14 | pub const fn new() -> Self { | 14 | pub const fn new() -> Self { |
| 15 | Self { | 15 | Self { |
| 16 | next: AtomicPtr::new(ptr::null_mut()), | 16 | next: SyncUnsafeCell::new(None), |
| 17 | } | 17 | } |
| 18 | } | 18 | } |
| 19 | } | 19 | } |
| @@ -46,29 +46,43 @@ impl RunQueue { | |||
| 46 | /// | 46 | /// |
| 47 | /// `item` must NOT be already enqueued in any queue. | 47 | /// `item` must NOT be already enqueued in any queue. |
| 48 | #[inline(always)] | 48 | #[inline(always)] |
| 49 | pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { | 49 | pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { |
| 50 | let prev = self.head.load(Ordering::Relaxed); | 50 | let mut was_empty = false; |
| 51 | task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); | 51 | |
| 52 | self.head.store(task.as_ptr(), Ordering::Relaxed); | 52 | self.head |
| 53 | prev.is_null() | 53 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { |
| 54 | was_empty = prev.is_null(); | ||
| 55 | unsafe { | ||
| 56 | // safety: the pointer is either null or valid | ||
| 57 | let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); | ||
| 58 | // safety: there are no concurrent accesses to `next` | ||
| 59 | task.header().run_queue_item.next.set(prev); | ||
| 60 | } | ||
| 61 | Some(task.as_ptr() as *mut _) | ||
| 62 | }) | ||
| 63 | .ok(); | ||
| 64 | |||
| 65 | was_empty | ||
| 54 | } | 66 | } |
| 55 | 67 | ||
| 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. | 68 | /// Empty the queue, then call `on_task` for each task that was in the queue. |
| 57 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | 69 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue |
| 58 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | 70 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. |
| 59 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { | 71 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { |
| 60 | // Atomically empty the queue. | 72 | // Atomically empty the queue. |
| 61 | let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); | 73 | let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); |
| 74 | |||
| 75 | // safety: the pointer is either null or valid | ||
| 76 | let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; | ||
| 62 | 77 | ||
| 63 | // Iterate the linked list of tasks that were previously in the queue. | 78 | // Iterate the linked list of tasks that were previously in the queue. |
| 64 | while let Some(task) = NonNull::new(ptr) { | 79 | while let Some(task) = next { |
| 65 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | 80 | // If the task re-enqueues itself, the `next` pointer will get overwritten. |
| 66 | // Therefore, first read the next pointer, and only then process the task. | 81 | // Therefore, first read the next pointer, and only then process the task. |
| 67 | let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); | 82 | // safety: there are no concurrent accesses to `next` |
| 83 | next = unsafe { task.header().run_queue_item.next.get() }; | ||
| 68 | 84 | ||
| 69 | on_task(task); | 85 | on_task(task); |
| 70 | |||
| 71 | ptr = next | ||
| 72 | } | 86 | } |
| 73 | } | 87 | } |
| 74 | } | 88 | } |
