From 5a5495aac43d75610735f2ca80fb6c8e8f31ed71 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 26 Nov 2024 23:54:21 +0100 Subject: Refactor integrated-timers --- embassy-time-queue-driver/Cargo.toml | 33 ++++++ embassy-time-queue-driver/src/lib.rs | 136 ++++++++++++++++++++++- embassy-time-queue-driver/src/queue_generic.rs | 146 +++++++++++++++++++++++++ 3 files changed, 313 insertions(+), 2 deletions(-) create mode 100644 embassy-time-queue-driver/src/queue_generic.rs (limited to 'embassy-time-queue-driver') diff --git a/embassy-time-queue-driver/Cargo.toml b/embassy-time-queue-driver/Cargo.toml index 9ce9d79bb..599041a3f 100644 --- a/embassy-time-queue-driver/Cargo.toml +++ b/embassy-time-queue-driver/Cargo.toml @@ -20,6 +20,39 @@ categories = [ # This is especially common when mixing crates from crates.io and git. links = "embassy-time-queue" +[dependencies] +critical-section = "1.2.0" +heapless = "0.8" +embassy-executor = { version = "0.6.3", path = "../embassy-executor", optional = true } +embassy-time-driver = { version = "0.1.0", path = "../embassy-time-driver" } + +[features] +#! ### Generic Queue + +## Use the executor-integrated `embassy-time` timer queue. The timer items are stored inside +## the task headers, so you do not need to set a capacity for the queue. +## To use this you must have a time driver provided. +## +## If this feature is not enabled, a generic queue is available with a configurable capacity. +integrated-timers = ["embassy-executor/integrated-timers"] + +#! The following features set how many timers are used for the generic queue. At most one +#! `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used. +#! +#! When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the +#! end user to pick. + +## Generic Queue with 8 timers +generic-queue-8 = [] +## Generic Queue with 16 timers +generic-queue-16 = [] +## Generic Queue with 32 timers +generic-queue-32 = [] +## Generic Queue with 64 timers +generic-queue-64 = [] +## Generic Queue with 128 timers +generic-queue-128 = [] + [package.metadata.embassy_docs] src_base = "https://github.com/embassy-rs/embassy/blob/embassy-time-queue-driver-v$VERSION/embassy-time-queue-driver/src/" src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-time-queue-driver/src/" diff --git a/embassy-time-queue-driver/src/lib.rs b/embassy-time-queue-driver/src/lib.rs index 50736e8c7..c5e989854 100644 --- a/embassy-time-queue-driver/src/lib.rs +++ b/embassy-time-queue-driver/src/lib.rs @@ -6,7 +6,29 @@ //! //! - Define a struct `MyTimerQueue` //! - Implement [`TimerQueue`] for it -//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl). +//! - Register it as the global timer queue with [`timer_queue_impl`]. +//! - Ensure that you process the timer queue when `schedule_wake` is due. This usually involves +//! waking expired tasks, finding the next expiration time and setting an alarm. +//! +//! If a single global timer queue is sufficient for you, you can use the +//! [`GlobalTimerQueue`] type, which is a wrapper around a global timer queue +//! protected by a critical section. +//! +//! ``` +//! use embassy_time_queue_driver::GlobalTimerQueue; +//! embassy_time_queue_driver::timer_queue_impl!( +//! static TIMER_QUEUE_DRIVER: GlobalTimerQueue +//! = GlobalTimerQueue::new(|next_expiration| todo!("Set an alarm")) +//! ); +//! ``` +//! +//! You can also use the `queue_generic` or the `embassy_executor::raw::timer_queue` modules to +//! implement your own timer queue. These modules contain queue implementations which you can wrap +//! and tailor to your needs. +//! +//! If you are providing an embassy-executor implementation besides a timer queue, you can choose to +//! expose the `integrated-timers` feature in your implementation. This feature stores timer items +//! in the tasks themselves, so you don't need a fixed-size queue or dynamic memory allocation. //! //! ## Example //! @@ -14,7 +36,7 @@ //! use core::task::Waker; //! //! use embassy_time::Instant; -//! use embassy_time::queue::{TimerQueue}; +//! use embassy_time::queue::TimerQueue; //! //! struct MyTimerQueue{}; // not public! //! @@ -26,11 +48,18 @@ //! //! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{}); //! ``` + +pub mod queue_generic; + +use core::cell::RefCell; use core::task::Waker; +use critical_section::Mutex; + /// Timer queue pub trait TimerQueue { /// Schedules a waker in the queue to be awoken at moment `at`. + /// /// If this moment is in the past, the waker might be awoken immediately. fn schedule_wake(&'static self, at: u64, waker: &Waker); } @@ -58,3 +87,106 @@ macro_rules! timer_queue_impl { } }; } + +#[cfg(feature = "integrated-timers")] +type InnerQueue = embassy_executor::raw::timer_queue::TimerQueue; + +#[cfg(not(feature = "integrated-timers"))] +type InnerQueue = queue_generic::Queue; + +/// A timer queue implementation that can be used as a global timer queue. +/// +/// This implementation is not thread-safe, and should be protected by a mutex of some sort. +pub struct GenericTimerQueue bool> { + queue: InnerQueue, + set_alarm: F, +} + +impl bool> GenericTimerQueue { + /// Creates a new timer queue. + /// + /// `set_alarm` is a function that should set the next alarm time. The function should + /// return `true` if the alarm was set, and `false` if the alarm was in the past. + pub const fn new(set_alarm: F) -> Self { + Self { + queue: InnerQueue::new(), + set_alarm, + } + } + + /// Schedules a task to run at a specific time, and returns whether any changes were made. + pub fn schedule_wake(&mut self, at: u64, waker: &core::task::Waker) { + #[cfg(feature = "integrated-timers")] + let waker = embassy_executor::raw::task_from_waker(waker); + + if self.queue.schedule_wake(at, waker) { + self.dispatch() + } + } + + /// Dequeues expired timers and returns the next alarm time. + pub fn next_expiration(&mut self, now: u64) -> u64 { + self.queue.next_expiration(now) + } + + /// Handle the alarm. + /// + /// Call this function when the next alarm is due. + pub fn dispatch(&mut self) { + let mut next_expiration = self.next_expiration(embassy_time_driver::now()); + + while !(self.set_alarm)(next_expiration) { + // next_expiration is in the past, dequeue and find a new expiration + next_expiration = self.next_expiration(next_expiration); + } + } +} + +/// A [`GenericTimerQueue`] protected by a critical section. Directly useable as a [`TimerQueue`]. +pub struct GlobalTimerQueue { + inner: Mutex bool>>>, +} + +impl GlobalTimerQueue { + /// Creates a new timer queue. + /// + /// `set_alarm` is a function that should set the next alarm time. The function should + /// return `true` if the alarm was set, and `false` if the alarm was in the past. + pub const fn new(set_alarm: fn(u64) -> bool) -> Self { + Self { + inner: Mutex::new(RefCell::new(GenericTimerQueue::new(set_alarm))), + } + } + + /// Schedules a task to run at a specific time, and returns whether any changes were made. + pub fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + inner.schedule_wake(at, waker); + }); + } + + /// Dequeues expired timers and returns the next alarm time. + pub fn next_expiration(&self, now: u64) -> u64 { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + inner.next_expiration(now) + }) + } + + /// Handle the alarm. + /// + /// Call this function when the next alarm is due. + pub fn dispatch(&self) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + inner.dispatch() + }) + } +} + +impl TimerQueue for GlobalTimerQueue { + fn schedule_wake(&'static self, at: u64, waker: &Waker) { + GlobalTimerQueue::schedule_wake(self, at, waker) + } +} diff --git a/embassy-time-queue-driver/src/queue_generic.rs b/embassy-time-queue-driver/src/queue_generic.rs new file mode 100644 index 000000000..232035bc6 --- /dev/null +++ b/embassy-time-queue-driver/src/queue_generic.rs @@ -0,0 +1,146 @@ +//! Generic timer queue implementations. +//! +//! Time queue drivers may use this to simplify their implementation. + +use core::cmp::{min, Ordering}; +use core::task::Waker; + +use heapless::Vec; + +#[derive(Debug)] +struct Timer { + at: u64, + waker: Waker, +} + +impl PartialEq for Timer { + fn eq(&self, other: &Self) -> bool { + self.at == other.at + } +} + +impl Eq for Timer {} + +impl PartialOrd for Timer { + fn partial_cmp(&self, other: &Self) -> Option { + self.at.partial_cmp(&other.at) + } +} + +impl Ord for Timer { + fn cmp(&self, other: &Self) -> Ordering { + self.at.cmp(&other.at) + } +} + +/// A timer queue with a pre-determined capacity. +pub struct ConstGenericQueue { + queue: Vec, +} + +impl ConstGenericQueue { + /// Creates a new timer queue. + pub const fn new() -> Self { + Self { queue: Vec::new() } + } + + /// Schedules a task to run at a specific time, and returns whether any changes were made. + /// + /// If this function returns `true`, the called should find the next expiration time and set + /// a new alarm for that time. + pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { + self.queue + .iter_mut() + .find(|timer| timer.waker.will_wake(waker)) + .map(|timer| { + if timer.at > at { + timer.at = at; + true + } else { + false + } + }) + .unwrap_or_else(|| { + let mut timer = Timer { + waker: waker.clone(), + at, + }; + + loop { + match self.queue.push(timer) { + Ok(()) => break, + Err(e) => timer = e, + } + + self.queue.pop().unwrap().waker.wake(); + } + + true + }) + } + + /// Dequeues expired timers and returns the next alarm time. + pub fn next_expiration(&mut self, now: u64) -> u64 { + let mut next_alarm = u64::MAX; + + let mut i = 0; + while i < self.queue.len() { + let timer = &self.queue[i]; + if timer.at <= now { + let timer = self.queue.swap_remove(i); + timer.waker.wake(); + } else { + next_alarm = min(next_alarm, timer.at); + i += 1; + } + } + + next_alarm + } +} + +#[cfg(feature = "generic-queue-8")] +const QUEUE_SIZE: usize = 8; +#[cfg(feature = "generic-queue-16")] +const QUEUE_SIZE: usize = 16; +#[cfg(feature = "generic-queue-32")] +const QUEUE_SIZE: usize = 32; +#[cfg(feature = "generic-queue-64")] +const QUEUE_SIZE: usize = 64; +#[cfg(feature = "generic-queue-128")] +const QUEUE_SIZE: usize = 128; +#[cfg(not(any( + feature = "generic-queue-8", + feature = "generic-queue-16", + feature = "generic-queue-32", + feature = "generic-queue-64", + feature = "generic-queue-128" +)))] +const QUEUE_SIZE: usize = 64; + +/// A timer queue with a pre-determined capacity. +pub struct Queue { + queue: ConstGenericQueue, +} + +impl Queue { + /// Creates a new timer queue. + pub const fn new() -> Self { + Self { + queue: ConstGenericQueue::new(), + } + } + + /// Schedules a task to run at a specific time, and returns whether any changes were made. + /// + /// If this function returns `true`, the called should find the next expiration time and set + /// a new alarm for that time. + pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { + self.queue.schedule_wake(at, waker) + } + + /// Dequeues expired timers and returns the next alarm time. + pub fn next_expiration(&mut self, now: u64) -> u64 { + self.queue.next_expiration(now) + } +} -- cgit