aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2020-12-26 16:42:44 +0100
committerDario Nieuwenhuis <[email protected]>2020-12-26 16:42:44 +0100
commit3df66c44e3bd84df659e2f4d30bb18128cd89427 (patch)
treedcae98e93c63dc027579f9906bebf8e01fa7f57e
parentdb8b4ca56533eb743f8b00b904581bc99e4c9597 (diff)
Split executor into multiple files, remove old timers implementation.
-rw-r--r--embassy/src/executor/executor.rs301
-rw-r--r--embassy/src/executor/mod.rs229
-rw-r--r--embassy/src/executor/run_queue.rs70
-rw-r--r--embassy/src/executor/timer_executor.rs77
-rw-r--r--embassy/src/executor/util.rs32
-rw-r--r--embassy/src/time/mod.rs2
-rw-r--r--embassy/src/time/timer.rs63
7 files changed, 324 insertions, 450 deletions
diff --git a/embassy/src/executor/executor.rs b/embassy/src/executor/executor.rs
deleted file mode 100644
index 81a915778..000000000
--- a/embassy/src/executor/executor.rs
+++ /dev/null
@@ -1,301 +0,0 @@
1use core::cell::Cell;
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::marker::PhantomData;
5use core::mem;
6use core::mem::MaybeUninit;
7use core::pin::Pin;
8use core::ptr;
9use core::ptr::NonNull;
10use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
11use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
12
13//=============
14// UninitCell
15
16struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
17impl<T> UninitCell<T> {
18 const fn uninit() -> Self {
19 Self(MaybeUninit::uninit())
20 }
21
22 unsafe fn as_mut_ptr(&self) -> *mut T {
23 (*self.0.as_ptr()).get()
24 }
25
26 unsafe fn as_mut(&self) -> &mut T {
27 &mut *self.as_mut_ptr()
28 }
29
30 unsafe fn write(&self, val: T) {
31 ptr::write(self.as_mut_ptr(), val)
32 }
33
34 unsafe fn drop_in_place(&self) {
35 ptr::drop_in_place(self.as_mut_ptr())
36 }
37}
38
39impl<T: Copy> UninitCell<T> {
40 unsafe fn read(&self) -> T {
41 ptr::read(self.as_mut_ptr())
42 }
43}
44
45//=============
46// Data structures
47
48const STATE_RUNNING: u32 = 1 << 0;
49const STATE_QUEUED: u32 = 1 << 1;
50
51struct Header {
52 state: AtomicU32,
53 next: AtomicPtr<Header>,
54 executor: Cell<*const Executor>,
55 poll_fn: UninitCell<unsafe fn(*mut Header)>, // Valid if STATE_RUNNING
56}
57
58// repr(C) is needed to guarantee that header is located at offset 0
59// This makes it safe to cast between Header and Task pointers.
60#[repr(C)]
61pub struct Task<F: Future + 'static> {
62 header: Header,
63 future: UninitCell<F>, // Valid if STATE_RUNNING
64}
65
66#[derive(Copy, Clone, Debug)]
67#[cfg_attr(feature = "defmt", derive(defmt::Format))]
68pub enum SpawnError {
69 Busy,
70}
71
72//=============
73// Atomic task queue using a very, very simple lock-free linked-list queue:
74//
75// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
76//
77// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
78// null. Then the batch is iterated following the next pointers until null is reached.
79//
80// Note that batches will be iterated in the opposite order as they were enqueued. This should
81// be OK for our use case. Hopefully it doesn't create executor fairness problems.
82
83struct Queue {
84 head: AtomicPtr<Header>,
85}
86
87impl Queue {
88 const fn new() -> Self {
89 Self {
90 head: AtomicPtr::new(ptr::null_mut()),
91 }
92 }
93
94 /// Enqueues an item. Returns true if the queue was empty.
95 unsafe fn enqueue(&self, item: *mut Header) -> bool {
96 let mut prev = self.head.load(Ordering::Acquire);
97 loop {
98 (*item).next.store(prev, Ordering::Relaxed);
99 match self
100 .head
101 .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire)
102 {
103 Ok(_) => break,
104 Err(next_prev) => prev = next_prev,
105 }
106 }
107
108 prev.is_null()
109 }
110
111 unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) {
112 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
113
114 while !task.is_null() {
115 // If the task re-enqueues itself, the `next` pointer will get overwritten.
116 // Therefore, first read the next pointer, and only then process the task.
117 let next = (*task).next.load(Ordering::Relaxed);
118
119 on_task(task);
120
121 task = next
122 }
123 }
124}
125
126//=============
127// Waker
128
129static WAKER_VTABLE: RawWakerVTable =
130 RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
131
132unsafe fn waker_clone(p: *const ()) -> RawWaker {
133 RawWaker::new(p, &WAKER_VTABLE)
134}
135
136unsafe fn waker_wake(p: *const ()) {
137 let header = &*(p as *const Header);
138
139 let mut current = header.state.load(Ordering::Acquire);
140 loop {
141 // If already scheduled, or if not started,
142 if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) {
143 return;
144 }
145
146 // Mark it as scheduled
147 let new = current | STATE_QUEUED;
148
149 match header
150 .state
151 .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
152 {
153 Ok(_) => break,
154 Err(next_current) => current = next_current,
155 }
156 }
157
158 // We have just marked the task as scheduled, so enqueue it.
159 let executor = &*header.executor.get();
160 executor.enqueue(p as *mut Header);
161}
162
163unsafe fn waker_drop(_: *const ()) {
164 // nop
165}
166
167//=============
168// Task
169
170impl<F: Future + 'static> Task<F> {
171 pub const fn new() -> Self {
172 Self {
173 header: Header {
174 state: AtomicU32::new(0),
175 next: AtomicPtr::new(ptr::null_mut()),
176 executor: Cell::new(ptr::null()),
177 poll_fn: UninitCell::uninit(),
178 },
179 future: UninitCell::uninit(),
180 }
181 }
182
183 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
184 for task in pool {
185 let state = STATE_RUNNING | STATE_QUEUED;
186 if task
187 .header
188 .state
189 .compare_and_swap(0, state, Ordering::AcqRel)
190 == 0
191 {
192 // Initialize the task
193 task.header.poll_fn.write(Self::poll);
194 task.future.write(future());
195
196 return SpawnToken {
197 header: Some(NonNull::new_unchecked(&task.header as *const Header as _)),
198 };
199 }
200 }
201
202 return SpawnToken { header: None };
203 }
204
205 unsafe fn poll(p: *mut Header) {
206 let this = &*(p as *const Task<F>);
207
208 let future = Pin::new_unchecked(this.future.as_mut());
209 let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
210 let mut cx = Context::from_waker(&waker);
211 match future.poll(&mut cx) {
212 Poll::Ready(_) => {
213 this.future.drop_in_place();
214 this.header
215 .state
216 .fetch_and(!STATE_RUNNING, Ordering::AcqRel);
217 }
218 Poll::Pending => {}
219 }
220 }
221}
222
223unsafe impl<F: Future + 'static> Sync for Task<F> {}
224
225//=============
226// Spawn token
227
228#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
229pub struct SpawnToken {
230 header: Option<NonNull<Header>>,
231}
232
233impl Drop for SpawnToken {
234 fn drop(&mut self) {
235 // TODO deallocate the task instead.
236 panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
237 }
238}
239
240//=============
241// Executor
242
243pub struct Executor {
244 queue: Queue,
245 signal_fn: fn(),
246 not_send: PhantomData<*mut ()>,
247}
248
249impl Executor {
250 pub const fn new(signal_fn: fn()) -> Self {
251 Self {
252 queue: Queue::new(),
253 signal_fn: signal_fn,
254 not_send: PhantomData,
255 }
256 }
257
258 unsafe fn enqueue(&self, item: *mut Header) {
259 if self.queue.enqueue(item) {
260 (self.signal_fn)()
261 }
262 }
263
264 /// Spawn a future on this executor.
265 pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
266 let header = token.header;
267 mem::forget(token);
268
269 match header {
270 Some(header) => unsafe {
271 let header = header.as_ref();
272 header.executor.set(self);
273 self.enqueue(header as *const _ as _);
274 Ok(())
275 },
276 None => Err(SpawnError::Busy),
277 }
278 }
279
280 /// Runs the executor until the queue is empty.
281 pub fn run(&self) {
282 unsafe {
283 self.queue.dequeue_all(|p| {
284 let header = &*p;
285
286 let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel);
287 if state & STATE_RUNNING == 0 {
288 // If task is not running, ignore it. This can happen in the following scenario:
289 // - Task gets dequeued, poll starts
290 // - While task is being polled, it gets woken. It gets placed in the queue.
291 // - Task poll finishes, returning done=true
292 // - RUNNING bit is cleared, but the task is already in the queue.
293 return;
294 }
295
296 // Run the task
297 header.poll_fn.read()(p as _);
298 });
299 }
300 }
301}
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs
index 1a68bdfde..435c97db8 100644
--- a/embassy/src/executor/mod.rs
+++ b/embassy/src/executor/mod.rs
@@ -1,9 +1,224 @@
1mod executor; 1pub use embassy_macros::task;
2mod timer_executor;
3 2
4// for time::Timer 3use core::cell::Cell;
5pub(crate) use timer_executor::current_timer_queue; 4use core::future::Future;
5use core::marker::PhantomData;
6use core::mem;
7use core::pin::Pin;
8use core::ptr;
9use core::ptr::NonNull;
10use core::sync::atomic::{AtomicU32, Ordering};
11use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
6 12
7pub use embassy_macros::task; 13mod run_queue;
8pub use executor::{Executor, SpawnError, SpawnToken, Task}; 14mod util;
9pub use timer_executor::TimerExecutor; 15
16use self::run_queue::{RunQueue, RunQueueItem};
17use self::util::UninitCell;
18
19/// Task is spawned and future hasn't finished running yet.
20const STATE_RUNNING: u32 = 1 << 0;
21/// Task is in the executor run queue
22const STATE_RUN_QUEUED: u32 = 1 << 1;
23/// Task is in the executor timer queue
24const STATE_TIMER_QUEUED: u32 = 1 << 2;
25
26pub(crate) struct TaskHeader {
27 state: AtomicU32,
28 run_queue_item: RunQueueItem,
29 executor: Cell<*const Executor>, // Valid if state != 0
30 poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING
31}
32
33// repr(C) is needed to guarantee that header is located at offset 0
34// This makes it safe to cast between Header and Task pointers.
35#[repr(C)]
36pub struct Task<F: Future + 'static> {
37 header: TaskHeader,
38 future: UninitCell<F>, // Valid if STATE_RUNNING
39}
40
41#[derive(Copy, Clone, Debug)]
42#[cfg_attr(feature = "defmt", derive(defmt::Format))]
43pub enum SpawnError {
44 Busy,
45}
46
47//=============
48// Waker
49
50static WAKER_VTABLE: RawWakerVTable =
51 RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
52
53unsafe fn waker_clone(p: *const ()) -> RawWaker {
54 RawWaker::new(p, &WAKER_VTABLE)
55}
56
57unsafe fn waker_wake(p: *const ()) {
58 let header = &*(p as *const TaskHeader);
59
60 let mut current = header.state.load(Ordering::Acquire);
61 loop {
62 // If already scheduled, or if not started,
63 if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) {
64 return;
65 }
66
67 // Mark it as scheduled
68 let new = current | STATE_RUN_QUEUED;
69
70 match header
71 .state
72 .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
73 {
74 Ok(_) => break,
75 Err(next_current) => current = next_current,
76 }
77 }
78
79 // We have just marked the task as scheduled, so enqueue it.
80 let executor = &*header.executor.get();
81 executor.enqueue(p as *mut TaskHeader);
82}
83
84unsafe fn waker_drop(_: *const ()) {
85 // nop
86}
87
88//=============
89// Task
90
91impl<F: Future + 'static> Task<F> {
92 pub const fn new() -> Self {
93 Self {
94 header: TaskHeader {
95 state: AtomicU32::new(0),
96 run_queue_item: RunQueueItem::new(),
97 executor: Cell::new(ptr::null()),
98 poll_fn: UninitCell::uninit(),
99 },
100 future: UninitCell::uninit(),
101 }
102 }
103
104 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
105 for task in pool {
106 let state = STATE_RUNNING | STATE_RUN_QUEUED;
107 if task
108 .header
109 .state
110 .compare_and_swap(0, state, Ordering::AcqRel)
111 == 0
112 {
113 // Initialize the task
114 task.header.poll_fn.write(Self::poll);
115 task.future.write(future());
116
117 return SpawnToken {
118 header: Some(NonNull::new_unchecked(
119 &task.header as *const TaskHeader as _,
120 )),
121 };
122 }
123 }
124
125 return SpawnToken { header: None };
126 }
127
128 unsafe fn poll(p: *mut TaskHeader) {
129 let this = &*(p as *const Task<F>);
130
131 let future = Pin::new_unchecked(this.future.as_mut());
132 let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
133 let mut cx = Context::from_waker(&waker);
134 match future.poll(&mut cx) {
135 Poll::Ready(_) => {
136 this.future.drop_in_place();
137 this.header
138 .state
139 .fetch_and(!STATE_RUNNING, Ordering::AcqRel);
140 }
141 Poll::Pending => {}
142 }
143 }
144}
145
146unsafe impl<F: Future + 'static> Sync for Task<F> {}
147
148//=============
149// Spawn token
150
151#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
152pub struct SpawnToken {
153 header: Option<NonNull<TaskHeader>>,
154}
155
156impl Drop for SpawnToken {
157 fn drop(&mut self) {
158 // TODO deallocate the task instead.
159 panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
160 }
161}
162
163//=============
164// Executor
165
166pub struct Executor {
167 run_queue: RunQueue,
168 signal_fn: fn(),
169 not_send: PhantomData<*mut ()>,
170}
171
172impl Executor {
173 pub const fn new(signal_fn: fn()) -> Self {
174 Self {
175 run_queue: RunQueue::new(),
176 signal_fn: signal_fn,
177 not_send: PhantomData,
178 }
179 }
180
181 unsafe fn enqueue(&self, item: *mut TaskHeader) {
182 if self.run_queue.enqueue(item) {
183 (self.signal_fn)()
184 }
185 }
186
187 /// Spawn a future on this executor.
188 pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
189 let header = token.header;
190 mem::forget(token);
191
192 match header {
193 Some(header) => unsafe {
194 let header = header.as_ref();
195 header.executor.set(self);
196 self.enqueue(header as *const _ as _);
197 Ok(())
198 },
199 None => Err(SpawnError::Busy),
200 }
201 }
202
203 /// Runs the executor until the queue is empty.
204 pub fn run(&self) {
205 unsafe {
206 self.run_queue.dequeue_all(|p| {
207 let header = &*p;
208
209 let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
210 if state & STATE_RUNNING == 0 {
211 // If task is not running, ignore it. This can happen in the following scenario:
212 // - Task gets dequeued, poll starts
213 // - While task is being polled, it gets woken. It gets placed in the queue.
214 // - Task poll finishes, returning done=true
215 // - RUNNING bit is cleared, but the task is already in the queue.
216 return;
217 }
218
219 // Run the task
220 header.poll_fn.read()(p as _);
221 });
222 }
223 }
224}
diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs
new file mode 100644
index 000000000..1cdecee33
--- /dev/null
+++ b/embassy/src/executor/run_queue.rs
@@ -0,0 +1,70 @@
1use core::ptr;
2use core::sync::atomic::{AtomicPtr, Ordering};
3
4use super::TaskHeader;
5
6pub(crate) struct RunQueueItem {
7 next: AtomicPtr<TaskHeader>,
8}
9
10impl RunQueueItem {
11 pub const fn new() -> Self {
12 Self {
13 next: AtomicPtr::new(ptr::null_mut()),
14 }
15 }
16}
17
18/// Atomic task queue using a very, very simple lock-free linked-list queue:
19///
20/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
21///
22/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
23/// null. Then the batch is iterated following the next pointers until null is reached.
24///
25/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
26/// for our purposes: it can't crate fairness problems since the next batch won't run until the
27/// current batch is completely processed, so even if a task enqueues itself instantly (for example
28/// by waking its own waker) can't prevent other tasks from running.
29pub(crate) struct RunQueue {
30 head: AtomicPtr<TaskHeader>,
31}
32
33impl RunQueue {
34 pub const fn new() -> Self {
35 Self {
36 head: AtomicPtr::new(ptr::null_mut()),
37 }
38 }
39
40 /// Enqueues an item. Returns true if the queue was empty.
41 pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool {
42 let mut prev = self.head.load(Ordering::Acquire);
43 loop {
44 (*item).run_queue_item.next.store(prev, Ordering::Relaxed);
45 match self
46 .head
47 .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire)
48 {
49 Ok(_) => break,
50 Err(next_prev) => prev = next_prev,
51 }
52 }
53
54 prev.is_null()
55 }
56
57 pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) {
58 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
59
60 while !task.is_null() {
61 // If the task re-enqueues itself, the `next` pointer will get overwritten.
62 // Therefore, first read the next pointer, and only then process the task.
63 let next = (*task).run_queue_item.next.load(Ordering::Relaxed);
64
65 on_task(task);
66
67 task = next
68 }
69 }
70}
diff --git a/embassy/src/executor/timer_executor.rs b/embassy/src/executor/timer_executor.rs
deleted file mode 100644
index 1f89490f2..000000000
--- a/embassy/src/executor/timer_executor.rs
+++ /dev/null
@@ -1,77 +0,0 @@
1use super::executor::{Executor, SpawnError, SpawnToken};
2use core::ptr;
3use core::sync::atomic::{AtomicPtr, Ordering};
4use futures_intrusive::timer as fi;
5
6use crate::time::Alarm;
7
8pub(crate) struct IntrusiveClock;
9
10impl fi::Clock for IntrusiveClock {
11 fn now(&self) -> u64 {
12 crate::time::now()
13 }
14}
15
16pub(crate) type TimerQueue = fi::LocalTimerService;
17
18pub struct TimerExecutor<A: Alarm> {
19 inner: Executor,
20 alarm: A,
21 timer_queue: TimerQueue,
22}
23
24impl<A: Alarm> TimerExecutor<A> {
25 pub fn new(alarm: A, signal_fn: fn()) -> Self {
26 alarm.set_callback(signal_fn);
27 Self {
28 inner: Executor::new(signal_fn),
29 alarm,
30 timer_queue: TimerQueue::new(&IntrusiveClock),
31 }
32 }
33
34 /// Spawn a future on this executor.
35 ///
36 /// safety: can only be called from the executor thread
37 pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
38 self.inner.spawn(token)
39 }
40
41 /// Runs the executor until the queue is empty.
42 ///
43 /// safety: can only be called from the executor thread
44 pub fn run(&'static self) {
45 with_timer_queue(&self.timer_queue, || {
46 self.timer_queue.check_expirations();
47 self.inner.run();
48
49 match self.timer_queue.next_expiration() {
50 // If this is in the past, set_alarm will immediately trigger the alarm,
51 // which will make the wfe immediately return so we do another loop iteration.
52 Some(at) => self.alarm.set(at),
53 None => self.alarm.clear(),
54 }
55 })
56 }
57}
58
59static CURRENT_TIMER_QUEUE: AtomicPtr<TimerQueue> = AtomicPtr::new(ptr::null_mut());
60
61fn with_timer_queue<R>(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R {
62 let svc = svc as *const _ as *mut _;
63 let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed);
64 let r = f();
65 let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed);
66 assert_eq!(svc, svc2);
67 r
68}
69
70pub(crate) fn current_timer_queue() -> &'static TimerQueue {
71 unsafe {
72 CURRENT_TIMER_QUEUE
73 .load(Ordering::Relaxed)
74 .as_ref()
75 .unwrap()
76 }
77}
diff --git a/embassy/src/executor/util.rs b/embassy/src/executor/util.rs
new file mode 100644
index 000000000..ca15b6955
--- /dev/null
+++ b/embassy/src/executor/util.rs
@@ -0,0 +1,32 @@
1use core::cell::UnsafeCell;
2use core::mem::MaybeUninit;
3use core::ptr;
4
5pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
6impl<T> UninitCell<T> {
7 pub const fn uninit() -> Self {
8 Self(MaybeUninit::uninit())
9 }
10
11 pub unsafe fn as_mut_ptr(&self) -> *mut T {
12 (*self.0.as_ptr()).get()
13 }
14
15 pub unsafe fn as_mut(&self) -> &mut T {
16 &mut *self.as_mut_ptr()
17 }
18
19 pub unsafe fn write(&self, val: T) {
20 ptr::write(self.as_mut_ptr(), val)
21 }
22
23 pub unsafe fn drop_in_place(&self) {
24 ptr::drop_in_place(self.as_mut_ptr())
25 }
26}
27
28impl<T: Copy> UninitCell<T> {
29 pub unsafe fn read(&self) -> T {
30 ptr::read(self.as_mut_ptr())
31 }
32}
diff --git a/embassy/src/time/mod.rs b/embassy/src/time/mod.rs
index 2b4631557..896838371 100644
--- a/embassy/src/time/mod.rs
+++ b/embassy/src/time/mod.rs
@@ -1,11 +1,9 @@
1mod duration; 1mod duration;
2mod instant; 2mod instant;
3mod timer;
4mod traits; 3mod traits;
5 4
6pub use duration::Duration; 5pub use duration::Duration;
7pub use instant::Instant; 6pub use instant::Instant;
8pub use timer::{Ticker, Timer};
9pub use traits::*; 7pub use traits::*;
10 8
11use crate::fmt::*; 9use crate::fmt::*;
diff --git a/embassy/src/time/timer.rs b/embassy/src/time/timer.rs
deleted file mode 100644
index 8756368c7..000000000
--- a/embassy/src/time/timer.rs
+++ /dev/null
@@ -1,63 +0,0 @@
1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use futures::Stream;
5use futures_intrusive::timer::{LocalTimer, LocalTimerFuture};
6
7use super::{Duration, Instant};
8use crate::executor::current_timer_queue;
9
10pub struct Timer {
11 inner: LocalTimerFuture<'static>,
12}
13
14impl Timer {
15 pub fn at(when: Instant) -> Self {
16 Self {
17 inner: current_timer_queue().deadline(when.as_ticks()),
18 }
19 }
20
21 pub fn after(dur: Duration) -> Self {
22 Self::at(Instant::now() + dur)
23 }
24}
25
26impl Future for Timer {
27 type Output = ();
28 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
29 unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx)
30 }
31}
32
33pub struct Ticker {
34 inner: LocalTimerFuture<'static>,
35 next: Instant,
36 dur: Duration,
37}
38
39impl Ticker {
40 pub fn every(dur: Duration) -> Self {
41 let next = Instant::now() + dur;
42 Self {
43 inner: current_timer_queue().deadline(next.as_ticks()),
44 next,
45 dur,
46 }
47 }
48}
49
50impl Stream for Ticker {
51 type Item = ();
52 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 let this = unsafe { self.get_unchecked_mut() };
54 match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
55 Poll::Ready(_) => {
56 this.next += this.dur;
57 this.inner = current_timer_queue().deadline(this.next.as_ticks());
58 Poll::Ready(Some(()))
59 }
60 Poll::Pending => Poll::Pending,
61 }
62 }
63}