From b268b1795fed58544c166c41842ce0d66328aa3e Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sun, 8 Dec 2024 23:27:32 +0100 Subject: Merge time-driver and time-queue-driver traits, make HALs own and handle the queue. --- embassy-time/src/driver_mock.rs | 113 ++++++++++------------------- embassy-time/src/driver_std.rs | 155 ++++++++++++---------------------------- embassy-time/src/driver_wasm.rs | 112 +++++++++++++---------------- 3 files changed, 130 insertions(+), 250 deletions(-) (limited to 'embassy-time/src') diff --git a/embassy-time/src/driver_mock.rs b/embassy-time/src/driver_mock.rs index 829eb0437..138d60499 100644 --- a/embassy-time/src/driver_mock.rs +++ b/embassy-time/src/driver_mock.rs @@ -1,7 +1,9 @@ use core::cell::RefCell; +use core::task::Waker; use critical_section::Mutex as CsMutex; use embassy_time_driver::Driver; +use embassy_time_queue_driver::Queue; use crate::{Duration, Instant}; @@ -52,50 +54,12 @@ impl MockDriver { /// Advances the time by the specified [`Duration`]. /// Calling any alarm callbacks that are due. pub fn advance(&self, duration: Duration) { - let notify = { - critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); - - inner.now += duration; - - let now = inner.now.as_ticks(); - - if inner.alarm.timestamp <= now { - inner.alarm.timestamp = u64::MAX; - - Some((inner.alarm.callback, inner.alarm.ctx)) - } else { - None - } - }) - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); - } - } - - /// Configures a callback to be called when the alarm fires. - pub fn set_alarm_callback(&self, callback: fn(*mut ()), ctx: *mut ()) { critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); + let inner = &mut *self.0.borrow_ref_mut(cs); - inner.alarm.callback = callback; - inner.alarm.ctx = ctx; - }); - } - - /// Sets the alarm to fire at the specified timestamp. - pub fn set_alarm(&self, timestamp: u64) -> bool { - critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); - - if timestamp <= inner.now.as_ticks() { - false - } else { - inner.alarm.timestamp = timestamp; - true - } + inner.now += duration; + // wake expired tasks. + inner.queue.next_expiration(inner.now.as_ticks()); }) } } @@ -104,44 +68,38 @@ impl Driver for MockDriver { fn now(&self) -> u64 { critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks() } + + fn schedule_wake(&self, at: u64, waker: &Waker) { + critical_section::with(|cs| { + let inner = &mut *self.0.borrow_ref_mut(cs); + // enqueue it + inner.queue.schedule_wake(at, waker); + // wake it if it's in the past. + inner.queue.next_expiration(inner.now.as_ticks()); + }) + } } struct InnerMockDriver { now: Instant, - alarm: AlarmState, + queue: Queue, } impl InnerMockDriver { const fn new() -> Self { Self { now: Instant::from_ticks(0), - alarm: AlarmState::new(), - } - } -} - -struct AlarmState { - timestamp: u64, - callback: fn(*mut ()), - ctx: *mut (), -} - -impl AlarmState { - const fn new() -> Self { - Self { - timestamp: u64::MAX, - callback: Self::noop, - ctx: core::ptr::null_mut(), + queue: Queue::new(), } } - - fn noop(_ctx: *mut ()) {} } -unsafe impl Send for AlarmState {} - #[cfg(test)] mod tests { + use core::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::task::Wake; + use serial_test::serial; use super::*; @@ -163,24 +121,25 @@ mod tests { #[test] #[serial] - fn test_set_alarm_not_in_future() { + fn test_schedule_wake() { setup(); - let driver = MockDriver::get(); - assert_eq!(false, driver.set_alarm(driver.now())); - } + static CALLBACK_CALLED: AtomicBool = AtomicBool::new(false); - #[test] - #[serial] - fn test_alarm() { - setup(); + struct MockWaker; + + impl Wake for MockWaker { + fn wake(self: Arc) { + CALLBACK_CALLED.store(true, Ordering::Relaxed); + } + } + let waker = Arc::new(MockWaker).into(); let driver = MockDriver::get(); - static mut CALLBACK_CALLED: bool = false; - driver.set_alarm_callback(|_| unsafe { CALLBACK_CALLED = true }, core::ptr::null_mut()); - driver.set_alarm(driver.now() + 1); - assert_eq!(false, unsafe { CALLBACK_CALLED }); + + driver.schedule_wake(driver.now() + 1, &waker); + assert_eq!(false, CALLBACK_CALLED.load(Ordering::Relaxed)); driver.advance(Duration::from_secs(1)); - assert_eq!(true, unsafe { CALLBACK_CALLED }); + assert_eq!(true, CALLBACK_CALLED.load(Ordering::Relaxed)); } } diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs index 45467f09b..35888fddd 100644 --- a/embassy-time/src/driver_std.rs +++ b/embassy-time/src/driver_std.rs @@ -1,96 +1,66 @@ -use std::cell::{RefCell, UnsafeCell}; -use std::mem::MaybeUninit; -use std::sync::{Condvar, Mutex, Once}; +use std::sync::{Condvar, Mutex}; +use std::thread; use std::time::{Duration as StdDuration, Instant as StdInstant}; -use std::{ptr, thread}; -use critical_section::Mutex as CsMutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; -struct AlarmState { - timestamp: u64, -} - -unsafe impl Send for AlarmState {} - -impl AlarmState { - const fn new() -> Self { - Self { timestamp: u64::MAX } - } +struct TimeDriver { + signaler: Signaler, + inner: Mutex, } -struct TimeDriver { - once: Once, - // The STD Driver implementation requires the alarm's mutex to be reentrant, which the STD Mutex isn't - // Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections - // themselves are reentrant - alarm: UninitCell>>, - zero_instant: UninitCell, - signaler: UninitCell, +struct Inner { + zero_instant: Option, + queue: Queue, } embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { - once: Once::new(), - alarm: UninitCell::uninit(), - zero_instant: UninitCell::uninit(), - signaler: UninitCell::uninit(), + inner: Mutex::new(Inner{ + zero_instant: None, + queue: Queue::new(), + }), + signaler: Signaler::new(), }); -impl TimeDriver { - fn init(&self) { - self.once.call_once(|| unsafe { - self.alarm - .write(CsMutex::new(RefCell::new(const { AlarmState::new() }))); - self.zero_instant.write(StdInstant::now()); - self.signaler.write(Signaler::new()); - - thread::spawn(Self::alarm_thread); - }); +impl Inner { + fn init(&mut self) -> StdInstant { + *self.zero_instant.get_or_insert_with(|| { + thread::spawn(alarm_thread); + StdInstant::now() + }) } +} - fn alarm_thread() { - let zero = unsafe { DRIVER.zero_instant.read() }; - loop { - let now = DRIVER.now(); - - let next_alarm = critical_section::with(|cs| { - let mut alarm = unsafe { DRIVER.alarm.as_ref() }.borrow_ref_mut(cs); - if alarm.timestamp <= now { - alarm.timestamp = u64::MAX; - - TIMER_QUEUE_DRIVER.dispatch(); - } - alarm.timestamp - }); - - // Ensure we don't overflow - let until = zero - .checked_add(StdDuration::from_micros(next_alarm)) - .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1)); +impl Driver for TimeDriver { + fn now(&self) -> u64 { + let mut inner = self.inner.lock().unwrap(); + let zero = inner.init(); + StdInstant::now().duration_since(zero).as_micros() as u64 + } - unsafe { DRIVER.signaler.as_ref() }.wait_until(until); + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + let mut inner = self.inner.lock().unwrap(); + inner.init(); + if inner.queue.schedule_wake(at, waker) { + self.signaler.signal(); } } +} - fn set_alarm(&self, timestamp: u64) -> bool { - self.init(); - critical_section::with(|cs| { - let mut alarm = unsafe { self.alarm.as_ref() }.borrow_ref_mut(cs); - alarm.timestamp = timestamp; - unsafe { self.signaler.as_ref() }.signal(); - }); +fn alarm_thread() { + let zero = DRIVER.inner.lock().unwrap().zero_instant.unwrap(); + loop { + let now = DRIVER.now(); - true - } -} + let next_alarm = DRIVER.inner.lock().unwrap().queue.next_expiration(now); -impl Driver for TimeDriver { - fn now(&self) -> u64 { - self.init(); + // Ensure we don't overflow + let until = zero + .checked_add(StdDuration::from_micros(next_alarm)) + .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1)); - let zero = unsafe { self.zero_instant.read() }; - StdInstant::now().duration_since(zero).as_micros() as u64 + DRIVER.signaler.wait_until(until); } } @@ -100,7 +70,7 @@ struct Signaler { } impl Signaler { - fn new() -> Self { + const fn new() -> Self { Self { mutex: Mutex::new(false), condvar: Condvar::new(), @@ -132,40 +102,3 @@ impl Signaler { self.condvar.notify_one(); } } - -pub(crate) struct UninitCell(MaybeUninit>); -unsafe impl Send for UninitCell {} -unsafe impl Sync for UninitCell {} - -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - - pub unsafe fn as_ptr(&self) -> *const T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_ref(&self) -> &T { - &*self.as_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) - } -} - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs index dcc935fde..bcdd1670b 100644 --- a/embassy-time/src/driver_wasm.rs +++ b/embassy-time/src/driver_wasm.rs @@ -1,10 +1,7 @@ -use std::cell::UnsafeCell; -use std::mem::MaybeUninit; -use std::ptr; -use std::sync::{Mutex, Once}; +use std::sync::Mutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; use wasm_bindgen::prelude::*; use wasm_timer::Instant as StdInstant; @@ -12,8 +9,6 @@ struct AlarmState { token: Option, } -unsafe impl Send for AlarmState {} - impl AlarmState { const fn new() -> Self { Self { token: None } @@ -27,33 +22,38 @@ extern "C" { } struct TimeDriver { - once: Once, - alarm: UninitCell>, - zero_instant: UninitCell, - closure: UninitCell>, + inner: Mutex, +} + +struct Inner { + alarm: AlarmState, + zero_instant: Option, + queue: Queue, + closure: Option>, } +unsafe impl Send for Inner {} + embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { - once: Once::new(), - alarm: UninitCell::uninit(), - zero_instant: UninitCell::uninit(), - closure: UninitCell::uninit() + inner: Mutex::new(Inner{ + zero_instant: None, + queue: Queue::new(), + alarm: AlarmState::new(), + closure: None, + }), }); -impl TimeDriver { - fn init(&self) { - self.once.call_once(|| unsafe { - self.alarm.write(Mutex::new(const { AlarmState::new() })); - self.zero_instant.write(StdInstant::now()); - self.closure - .write(Closure::new(Box::new(|| TIMER_QUEUE_DRIVER.dispatch()))); - }); +impl Inner { + fn init(&mut self) -> StdInstant { + *self.zero_instant.get_or_insert_with(StdInstant::now) + } + + fn now(&mut self) -> u64 { + StdInstant::now().duration_since(self.zero_instant.unwrap()).as_micros() as u64 } - fn set_alarm(&self, timestamp: u64) -> bool { - self.init(); - let mut alarm = unsafe { self.alarm.as_ref() }.lock().unwrap(); - if let Some(token) = alarm.token { + fn set_alarm(&mut self, timestamp: u64) -> bool { + if let Some(token) = self.alarm.token { clearTimeout(token); } @@ -62,7 +62,8 @@ impl TimeDriver { false } else { let timeout = (timestamp - now) as u32; - alarm.token = Some(setTimeout(unsafe { self.closure.as_ref() }, timeout / 1000)); + let closure = self.closure.get_or_insert_with(|| Closure::new(dispatch)); + self.alarm.token = Some(setTimeout(closure, timeout / 1000)); true } @@ -71,45 +72,32 @@ impl TimeDriver { impl Driver for TimeDriver { fn now(&self) -> u64 { - self.init(); - - let zero = unsafe { self.zero_instant.read() }; + let mut inner = self.inner.lock().unwrap(); + let zero = inner.init(); StdInstant::now().duration_since(zero).as_micros() as u64 } -} -pub(crate) struct UninitCell(MaybeUninit>); -unsafe impl Send for UninitCell {} -unsafe impl Sync for UninitCell {} - -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - unsafe fn as_ptr(&self) -> *const T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_ref(&self) -> &T { - &*self.as_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + let mut inner = self.inner.lock().unwrap(); + inner.init(); + if inner.queue.schedule_wake(at, waker) { + let now = inner.now(); + let mut next = inner.queue.next_expiration(now); + while !inner.set_alarm(next) { + let now = inner.now(); + next = inner.queue.next_expiration(now); + } + } } } -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) +fn dispatch() { + let inner = &mut *DRIVER.inner.lock().unwrap(); + + let now = inner.now(); + let mut next = inner.queue.next_expiration(now); + while !inner.set_alarm(next) { + let now = inner.now(); + next = inner.queue.next_expiration(now); } } - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); -- cgit