aboutsummaryrefslogtreecommitdiff
path: root/embassy-time-queue-utils/src/queue_integrated.rs
blob: 2731d1ac622687b538b6aa210c0cef2c6420a325 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Timer queue operations.
use core::cell::Cell;
use core::cmp::min;
use core::ptr::NonNull;
use core::task::Waker;

use embassy_executor_timer_queue::TimerQueueItem;

/// An item in the timer queue.
///
/// Items are chained together through `next`; `Queue` holds the head of the chain.
#[derive(Default)]
struct QueueItem {
    /// The next item in the queue.
    ///
    /// This is `None` both for the last item in the queue and for an item that is not enqueued:
    /// `Queue::schedule_wake` stores the previous head here (which is `None` when the queue was
    /// empty) and `Queue::retain` resets it to `None` when it unlinks the item. Whether an item
    /// is enqueued is therefore decided by `waker`, not by this field.
    pub next: Cell<Option<NonNull<QueueItem>>>,

    /// The time at which this item expires.
    ///
    /// `u64::MAX` is treated as "never expires": `Queue::next_expiration` removes such items
    /// from the queue.
    pub expires_at: u64,

    /// The registered waker.
    ///
    /// `Queue::schedule_wake` treats `Some` as "already enqueued" and only links the item into
    /// the queue when this is `None`. `Queue::next_expiration` takes the waker out when it wakes
    /// an expired item.
    pub waker: Option<Waker>,
}

// NOTE(review): no safety justification is written down for this impl. `QueueItem` contains
// `Cell` and `NonNull` fields and so is not `Sync` automatically; this presumably relies on
// items only ever being touched through a `&mut Queue` — TODO confirm.
unsafe impl Sync for QueueItem {}

/// A timer queue, with items integrated into tasks.
///
/// The queue is a singly linked list threaded through each item's `next` field. It is not kept
/// sorted by expiration: `schedule_wake` pushes new items at the head and `next_expiration`
/// visits every item.
///
/// # Safety
///
/// **This Queue is only safe when there is a single integrated queue in the system.**
///
/// If there are multiple integrated queues, additional checks are necessary to ensure that a
/// Waker is never enqueued in more than one queue at a time.
pub struct Queue {
    /// The first item of the list, or `None` when the queue is empty.
    head: Cell<Option<NonNull<QueueItem>>>,
}

impl core::fmt::Debug for Queue {
    /// Formats the queue as its bare type name; no fields are included in the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("Queue")
    }
}

// NOTE(review): `Queue` holds a `Cell<Option<NonNull<_>>>`, which is neither `Send` nor `Sync`
// on its own. Every method that reads or writes `head` takes `&mut self`, which is presumably
// what these impls rely on, together with the single-queue requirement documented on `Queue`
// — TODO confirm.
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}

impl Queue {
    /// Creates a new, empty timer queue.
    pub const fn new() -> Self {
        Self { head: Cell::new(None) }
    }

    /// Schedules a task to run at a specific time.
    ///
    /// `waker` identifies the task; the queue item that gets linked into the queue is the one
    /// obtained from that waker. `at` is the requested expiration time.
    ///
    /// * If the task is not enqueued yet, it is pushed at the head of the queue.
    /// * If it is already enqueued and `at` is not later than its current expiration, the
    ///   expiration is moved to `at`.
    /// * Otherwise nothing changes, because the task will already be woken before `at`.
    ///
    /// If this function returns `true`, the caller should find the next expiration time (see
    /// `next_expiration`) and set a new alarm for that time.
    pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
        let item = unsafe {
            // SAFETY: the `&mut self`, along with the Safety note of the Queue, are sufficient to
            // ensure that this function creates the only mutable reference to the queue item.
            TimerQueueItem::from_embassy_waker(waker)
        };
        // SAFETY: NOTE(review): relies on the item's storage only ever being interpreted as a
        // `QueueItem`, which the single-queue requirement is presumably meant to guarantee —
        // confirm against the contract of `TimerQueueItem::as_mut`.
        let item = unsafe { item.as_mut::<QueueItem>() };

        if item.waker.is_some() {
            // Already enqueued.
            if at <= item.expires_at {
                // If expiration is sooner than previously set, update.
                item.expires_at = at;
                // The waker is always stored in its own queue item, so we don't need to update it.

                // Trigger a queue update in case this item can be immediately dequeued.
                true
            } else {
                // Queue item does not need to be updated, the task will be scheduled to be woken
                // before the new expiration.
                false
            }
        } else {
            // Not in the queue: fill in the item, then push it at the head. All fields are
            // written through the reference we already hold, before it is turned into the
            // pointer stored in `head`.
            item.expires_at = at;
            item.waker = Some(waker.clone());
            item.next.set(self.head.get());
            self.head.set(Some(NonNull::from(item)));

            true
        }
    }

    /// Dequeues expired timers and returns the next alarm time.
    ///
    /// Every item whose expiration time is at or before `now` is removed from the queue and its
    /// waker is woken. Items that never expire (`expires_at == u64::MAX`) are removed as well,
    /// but their waker is dropped without being woken.
    ///
    /// Returns the earliest expiration time among the items that stay in the queue, or
    /// `u64::MAX` if none remain.
    pub fn next_expiration(&mut self, now: u64) -> u64 {
        let mut next_expiration = u64::MAX;

        self.retain(|item| {
            if item.expires_at <= now {
                // Timer expired, process task.
                if let Some(waker) = item.waker.take() {
                    waker.wake();
                }
                false
            } else if item.expires_at == u64::MAX {
                // Never expires: drop it from the queue without waking. The waker must be
                // cleared too, because `schedule_wake` uses `waker.is_some()` to decide whether
                // the item is still linked; leaving it set would make a later `schedule_wake`
                // update `expires_at` on an item that is no longer in the queue.
                item.waker = None;
                false
            } else {
                // Timer didn't yet expire.
                next_expiration = min(next_expiration, item.expires_at);
                true
            }
        });

        next_expiration
    }

    /// Visits every item in the queue, keeping those for which `f` returns `true` and unlinking
    /// the others.
    ///
    /// An unlinked item gets its `next` reset to `None`; any other cleanup (such as taking the
    /// waker) is up to `f`.
    fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) {
        // `prev` is the link that points at the item under inspection: first `head`, then the
        // `next` field of the last item that was kept.
        let mut prev = &self.head;
        while let Some(mut p) = prev.get() {
            // SAFETY: every pointer reachable from `head` was created in `schedule_wake` from a
            // live `&mut QueueItem`, and `&mut self` gives exclusive access to the list.
            // NOTE(review): also relies on items staying alive while they are linked — confirm
            // with the executor's guarantees.
            let item = unsafe { p.as_mut() };

            if f(&mut *item) {
                // Skip to next
                prev = &item.next;
            } else {
                // Remove it
                prev.set(item.next.get());
                item.next.set(None);
            }
        }
    }
}