From 74037f04933f4ec9a678e0b47fd6819e7c0489a9 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Mon, 4 Aug 2025 00:05:25 +0200 Subject: Make TimerQueueItem opaque --- embassy-time-queue-utils/src/lib.rs | 1 - embassy-time-queue-utils/src/queue_integrated.rs | 122 ++++++++++++++++------- 2 files changed, 84 insertions(+), 39 deletions(-) (limited to 'embassy-time-queue-utils/src') diff --git a/embassy-time-queue-utils/src/lib.rs b/embassy-time-queue-utils/src/lib.rs index 08e186432..a6f66913f 100644 --- a/embassy-time-queue-utils/src/lib.rs +++ b/embassy-time-queue-utils/src/lib.rs @@ -1,7 +1,6 @@ #![no_std] #![doc = include_str!("../README.md")] #![warn(missing_docs)] -#![deny(missing_debug_implementations)] #[cfg(feature = "_generic-queue")] pub mod queue_generic; diff --git a/embassy-time-queue-utils/src/queue_integrated.rs b/embassy-time-queue-utils/src/queue_integrated.rs index 748cd7843..2731d1ac6 100644 --- a/embassy-time-queue-utils/src/queue_integrated.rs +++ b/embassy-time-queue-utils/src/queue_integrated.rs @@ -1,16 +1,50 @@ //! Timer queue operations. use core::cell::Cell; use core::cmp::min; +use core::ptr::NonNull; use core::task::Waker; -use embassy_executor::raw::TaskRef; +use embassy_executor_timer_queue::TimerQueueItem; + +/// An item in the timer queue. +#[derive(Default)] +struct QueueItem { + /// The next item in the queue. + /// + /// If this field contains `Some`, the item is in the queue. The last item in the queue has a + /// value of `Some(dangling_pointer)` + pub next: Cell>>, + + /// The time at which this item expires. + pub expires_at: u64, + + /// The registered waker. If Some, the item is enqueued in the timer queue. + pub waker: Option, +} + +unsafe impl Sync for QueueItem {} /// A timer queue, with items integrated into tasks. -#[derive(Debug)] +/// +/// # Safety +/// +/// **This Queue is only safe when there is a single integrated queue in the system.** +/// +/// If there are multiple integrated queues, additional checks are necessary to ensure that a Waker +/// is not attempted to be enqueued in multiple queues. pub struct Queue { - head: Cell>, + head: Cell>>, } +impl core::fmt::Debug for Queue { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Queue").finish() + } +} + +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + impl Queue { /// Creates a new timer queue. pub const fn new() -> Self { @@ -22,25 +56,41 @@ impl Queue { /// If this function returns `true`, the called should find the next expiration time and set /// a new alarm for that time. pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { - let task = embassy_executor::raw::task_from_waker(waker); - let item = task.timer_queue_item(); - if item.next.get().is_none() { - // If not in the queue, add it and update. - let prev = self.head.replace(Some(task)); - item.next.set(if prev.is_none() { - Some(unsafe { TaskRef::dangling() }) - } else { - prev - }); - item.expires_at.set(at); - true - } else if at <= item.expires_at.get() { - // If expiration is sooner than previously set, update. - item.expires_at.set(at); - true - } else { - // Task does not need to be updated. - false + let item = unsafe { + // Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient to + // ensure that this function creates the only mutable reference to the queue item. + TimerQueueItem::from_embassy_waker(waker) + }; + let item = unsafe { item.as_mut::() }; + match item.waker.as_ref() { + Some(_) if at <= item.expires_at => { + // If expiration is sooner than previously set, update. + item.expires_at = at; + // The waker is always stored in its own queue item, so we don't need to update it. + + // Trigger a queue update in case this item can be immediately dequeued. + true + } + Some(_) => { + // Queue item does not need to be updated, the task will be scheduled to be woken + // before the new expiration. + false + } + None => { + // If not in the queue, add it and update. + let mut item_ptr = NonNull::from(item); + let prev = self.head.replace(Some(item_ptr)); + + let item = unsafe { item_ptr.as_mut() }; + + item.expires_at = at; + item.waker = Some(waker.clone()); + item.next.set(prev); + // The default implementation doesn't care about the + // opaque payload, leave it unchanged. + + true + } } } @@ -51,33 +101,29 @@ impl Queue { pub fn next_expiration(&mut self, now: u64) -> u64 { let mut next_expiration = u64::MAX; - self.retain(|p| { - let item = p.timer_queue_item(); - let expires = item.expires_at.get(); - - if expires <= now { + self.retain(|item| { + if item.expires_at <= now { // Timer expired, process task. - embassy_executor::raw::wake_task(p); + if let Some(waker) = item.waker.take() { + waker.wake(); + } false } else { // Timer didn't yet expire, or never expires. - next_expiration = min(next_expiration, expires); - expires != u64::MAX + next_expiration = min(next_expiration, item.expires_at); + item.expires_at != u64::MAX } }); next_expiration } - fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { + fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) { let mut prev = &self.head; - while let Some(p) = prev.get() { - if unsafe { p == TaskRef::dangling() } { - // prev was the last item, stop - break; - } - let item = p.timer_queue_item(); - if f(p) { + while let Some(mut p) = prev.get() { + let mut item = unsafe { p.as_mut() }; + + if f(&mut item) { // Skip to next prev = &item.next; } else { -- cgit