diff options
| author | Ulf Lilleengen <[email protected]> | 2025-08-25 17:28:38 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-08-25 17:28:38 +0000 |
| commit | ac60eaeddd9c4accbe8dc20d0486382940723efb (patch) | |
| tree | 0816242eaaceb2b633d040351fdef1a3e8b5e6ed /embassy-time-queue-utils/src | |
| parent | 1975965d7710cd86c7a169a9b5ae6340bddc316c (diff) | |
| parent | 3d1ce0fb7daadf647298230b51a7fb8fbaa1feed (diff) | |
Merge pull request #4550 from bugadani/time-queue
Make timer queue item opaque, move to new crate
Diffstat (limited to 'embassy-time-queue-utils/src')
| -rw-r--r-- | embassy-time-queue-utils/src/lib.rs | 1 | ||||
| -rw-r--r-- | embassy-time-queue-utils/src/queue_integrated.rs | 122 |
2 files changed, 84 insertions, 39 deletions
diff --git a/embassy-time-queue-utils/src/lib.rs b/embassy-time-queue-utils/src/lib.rs index 08e186432..a6f66913f 100644 --- a/embassy-time-queue-utils/src/lib.rs +++ b/embassy-time-queue-utils/src/lib.rs | |||
| @@ -1,7 +1,6 @@ | |||
| 1 | #![no_std] | 1 | #![no_std] |
| 2 | #![doc = include_str!("../README.md")] | 2 | #![doc = include_str!("../README.md")] |
| 3 | #![warn(missing_docs)] | 3 | #![warn(missing_docs)] |
| 4 | #![deny(missing_debug_implementations)] | ||
| 5 | 4 | ||
| 6 | #[cfg(feature = "_generic-queue")] | 5 | #[cfg(feature = "_generic-queue")] |
| 7 | pub mod queue_generic; | 6 | pub mod queue_generic; |
diff --git a/embassy-time-queue-utils/src/queue_integrated.rs b/embassy-time-queue-utils/src/queue_integrated.rs index 748cd7843..2731d1ac6 100644 --- a/embassy-time-queue-utils/src/queue_integrated.rs +++ b/embassy-time-queue-utils/src/queue_integrated.rs | |||
| @@ -1,16 +1,50 @@ | |||
| 1 | //! Timer queue operations. | 1 | //! Timer queue operations. |
| 2 | use core::cell::Cell; | 2 | use core::cell::Cell; |
| 3 | use core::cmp::min; | 3 | use core::cmp::min; |
| 4 | use core::ptr::NonNull; | ||
| 4 | use core::task::Waker; | 5 | use core::task::Waker; |
| 5 | 6 | ||
| 6 | use embassy_executor::raw::TaskRef; | 7 | use embassy_executor_timer_queue::TimerQueueItem; |
| 8 | |||
| 9 | /// An item in the timer queue. | ||
| 10 | #[derive(Default)] | ||
| 11 | struct QueueItem { | ||
| 12 | /// The next item in the queue. | ||
| 13 | /// | ||
| 14 | /// If this field contains `Some`, the item is in the queue. The last item in the queue has a | ||
| 15 | /// value of `Some(dangling_pointer)` | ||
| 16 | pub next: Cell<Option<NonNull<QueueItem>>>, | ||
| 17 | |||
| 18 | /// The time at which this item expires. | ||
| 19 | pub expires_at: u64, | ||
| 20 | |||
| 21 | /// The registered waker. If Some, the item is enqueued in the timer queue. | ||
| 22 | pub waker: Option<Waker>, | ||
| 23 | } | ||
| 24 | |||
| 25 | unsafe impl Sync for QueueItem {} | ||
| 7 | 26 | ||
| 8 | /// A timer queue, with items integrated into tasks. | 27 | /// A timer queue, with items integrated into tasks. |
| 9 | #[derive(Debug)] | 28 | /// |
| 29 | /// # Safety | ||
| 30 | /// | ||
| 31 | /// **This Queue is only safe when there is a single integrated queue in the system.** | ||
| 32 | /// | ||
| 33 | /// If there are multiple integrated queues, additional checks are necessary to ensure that a Waker | ||
| 34 | /// is not attempted to be enqueued in multiple queues. | ||
| 10 | pub struct Queue { | 35 | pub struct Queue { |
| 11 | head: Cell<Option<TaskRef>>, | 36 | head: Cell<Option<NonNull<QueueItem>>>, |
| 12 | } | 37 | } |
| 13 | 38 | ||
| 39 | impl core::fmt::Debug for Queue { | ||
| 40 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | ||
| 41 | f.debug_struct("Queue").finish() | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | unsafe impl Send for Queue {} | ||
| 46 | unsafe impl Sync for Queue {} | ||
| 47 | |||
| 14 | impl Queue { | 48 | impl Queue { |
| 15 | /// Creates a new timer queue. | 49 | /// Creates a new timer queue. |
| 16 | pub const fn new() -> Self { | 50 | pub const fn new() -> Self { |
| @@ -22,25 +56,41 @@ impl Queue { | |||
| 22 | /// If this function returns `true`, the called should find the next expiration time and set | 56 | /// If this function returns `true`, the called should find the next expiration time and set |
| 23 | /// a new alarm for that time. | 57 | /// a new alarm for that time. |
| 24 | pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { | 58 | pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { |
| 25 | let task = embassy_executor::raw::task_from_waker(waker); | 59 | let item = unsafe { |
| 26 | let item = task.timer_queue_item(); | 60 | // Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient to |
| 27 | if item.next.get().is_none() { | 61 | // ensure that this function creates the only mutable reference to the queue item. |
| 28 | // If not in the queue, add it and update. | 62 | TimerQueueItem::from_embassy_waker(waker) |
| 29 | let prev = self.head.replace(Some(task)); | 63 | }; |
| 30 | item.next.set(if prev.is_none() { | 64 | let item = unsafe { item.as_mut::<QueueItem>() }; |
| 31 | Some(unsafe { TaskRef::dangling() }) | 65 | match item.waker.as_ref() { |
| 32 | } else { | 66 | Some(_) if at <= item.expires_at => { |
| 33 | prev | 67 | // If expiration is sooner than previously set, update. |
| 34 | }); | 68 | item.expires_at = at; |
| 35 | item.expires_at.set(at); | 69 | // The waker is always stored in its own queue item, so we don't need to update it. |
| 36 | true | 70 | |
| 37 | } else if at <= item.expires_at.get() { | 71 | // Trigger a queue update in case this item can be immediately dequeued. |
| 38 | // If expiration is sooner than previously set, update. | 72 | true |
| 39 | item.expires_at.set(at); | 73 | } |
| 40 | true | 74 | Some(_) => { |
| 41 | } else { | 75 | // Queue item does not need to be updated, the task will be scheduled to be woken |
| 42 | // Task does not need to be updated. | 76 | // before the new expiration. |
| 43 | false | 77 | false |
| 78 | } | ||
| 79 | None => { | ||
| 80 | // If not in the queue, add it and update. | ||
| 81 | let mut item_ptr = NonNull::from(item); | ||
| 82 | let prev = self.head.replace(Some(item_ptr)); | ||
| 83 | |||
| 84 | let item = unsafe { item_ptr.as_mut() }; | ||
| 85 | |||
| 86 | item.expires_at = at; | ||
| 87 | item.waker = Some(waker.clone()); | ||
| 88 | item.next.set(prev); | ||
| 89 | // The default implementation doesn't care about the | ||
| 90 | // opaque payload, leave it unchanged. | ||
| 91 | |||
| 92 | true | ||
| 93 | } | ||
| 44 | } | 94 | } |
| 45 | } | 95 | } |
| 46 | 96 | ||
| @@ -51,33 +101,29 @@ impl Queue { | |||
| 51 | pub fn next_expiration(&mut self, now: u64) -> u64 { | 101 | pub fn next_expiration(&mut self, now: u64) -> u64 { |
| 52 | let mut next_expiration = u64::MAX; | 102 | let mut next_expiration = u64::MAX; |
| 53 | 103 | ||
| 54 | self.retain(|p| { | 104 | self.retain(|item| { |
| 55 | let item = p.timer_queue_item(); | 105 | if item.expires_at <= now { |
| 56 | let expires = item.expires_at.get(); | ||
| 57 | |||
| 58 | if expires <= now { | ||
| 59 | // Timer expired, process task. | 106 | // Timer expired, process task. |
| 60 | embassy_executor::raw::wake_task(p); | 107 | if let Some(waker) = item.waker.take() { |
| 108 | waker.wake(); | ||
| 109 | } | ||
| 61 | false | 110 | false |
| 62 | } else { | 111 | } else { |
| 63 | // Timer didn't yet expire, or never expires. | 112 | // Timer didn't yet expire, or never expires. |
| 64 | next_expiration = min(next_expiration, expires); | 113 | next_expiration = min(next_expiration, item.expires_at); |
| 65 | expires != u64::MAX | 114 | item.expires_at != u64::MAX |
| 66 | } | 115 | } |
| 67 | }); | 116 | }); |
| 68 | 117 | ||
| 69 | next_expiration | 118 | next_expiration |
| 70 | } | 119 | } |
| 71 | 120 | ||
| 72 | fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { | 121 | fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) { |
| 73 | let mut prev = &self.head; | 122 | let mut prev = &self.head; |
| 74 | while let Some(p) = prev.get() { | 123 | while let Some(mut p) = prev.get() { |
| 75 | if unsafe { p == TaskRef::dangling() } { | 124 | let mut item = unsafe { p.as_mut() }; |
| 76 | // prev was the last item, stop | 125 | |
| 77 | break; | 126 | if f(&mut item) { |
| 78 | } | ||
| 79 | let item = p.timer_queue_item(); | ||
| 80 | if f(p) { | ||
| 81 | // Skip to next | 127 | // Skip to next |
| 82 | prev = &item.next; | 128 | prev = &item.next; |
| 83 | } else { | 129 | } else { |
