aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src
diff options
context:
space:
mode:
authorMathias <[email protected]>2023-02-13 14:55:15 +0100
committerMathias <[email protected]>2023-02-13 14:55:15 +0100
commit218b44652c149f895919b606a660b6eff30e8177 (patch)
tree5f985f6edd12926a6f374c17a3a0c3a4226088e7 /embassy-executor/src
parent86113e199f37fe0888979608a08bfdaf21bff19a (diff)
parent41a563aae3e474955892b27487e185f5f486f525 (diff)
Rebase on master
Diffstat (limited to 'embassy-executor/src')
-rw-r--r--embassy-executor/src/arch/riscv32.rs3
-rw-r--r--embassy-executor/src/arch/xtensa.rs3
-rw-r--r--embassy-executor/src/lib.rs10
-rw-r--r--embassy-executor/src/raw/mod.rs245
-rw-r--r--embassy-executor/src/raw/run_queue.rs13
-rw-r--r--embassy-executor/src/raw/timer_queue.rs35
-rw-r--r--embassy-executor/src/raw/util.rs6
-rw-r--r--embassy-executor/src/raw/waker.rs13
-rw-r--r--embassy-executor/src/spawner.rs17
9 files changed, 192 insertions, 153 deletions
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index 2a4b006da..e97a56cda 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -1,7 +1,6 @@
1use core::marker::PhantomData; 1use core::marker::PhantomData;
2use core::ptr; 2use core::ptr;
3 3use core::sync::atomic::{AtomicBool, Ordering};
4use atomic_polyfill::{AtomicBool, Ordering};
5 4
6use super::{raw, Spawner}; 5use super::{raw, Spawner};
7 6
diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs
index f908aaa70..4ee0d9f78 100644
--- a/embassy-executor/src/arch/xtensa.rs
+++ b/embassy-executor/src/arch/xtensa.rs
@@ -1,7 +1,6 @@
1use core::marker::PhantomData; 1use core::marker::PhantomData;
2use core::ptr; 2use core::ptr;
3 3use core::sync::atomic::{AtomicBool, Ordering};
4use atomic_polyfill::{AtomicBool, Ordering};
5 4
6use super::{raw, Spawner}; 5use super::{raw, Spawner};
7 6
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs
index e4cbd04b9..4c7e2f4cd 100644
--- a/embassy-executor/src/lib.rs
+++ b/embassy-executor/src/lib.rs
@@ -8,18 +8,22 @@
8pub(crate) mod fmt; 8pub(crate) mod fmt;
9 9
10#[cfg(feature = "nightly")] 10#[cfg(feature = "nightly")]
11pub use embassy_macros::{main, task}; 11pub use embassy_macros::task;
12 12
13cfg_if::cfg_if! { 13cfg_if::cfg_if! {
14 if #[cfg(cortex_m)] { 14 if #[cfg(cortex_m)] {
15 #[path="arch/cortex_m.rs"] 15 #[path="arch/cortex_m.rs"]
16 mod arch; 16 mod arch;
17 pub use arch::*; 17 pub use arch::*;
18 #[cfg(feature = "nightly")]
19 pub use embassy_macros::main_cortex_m as main;
18 } 20 }
19 else if #[cfg(target_arch="riscv32")] { 21 else if #[cfg(target_arch="riscv32")] {
20 #[path="arch/riscv32.rs"] 22 #[path="arch/riscv32.rs"]
21 mod arch; 23 mod arch;
22 pub use arch::*; 24 pub use arch::*;
25 #[cfg(feature = "nightly")]
26 pub use embassy_macros::main_riscv as main;
23 } 27 }
24 else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { 28 else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] {
25 #[path="arch/xtensa.rs"] 29 #[path="arch/xtensa.rs"]
@@ -30,11 +34,15 @@ cfg_if::cfg_if! {
30 #[path="arch/wasm.rs"] 34 #[path="arch/wasm.rs"]
31 mod arch; 35 mod arch;
32 pub use arch::*; 36 pub use arch::*;
37 #[cfg(feature = "nightly")]
38 pub use embassy_macros::main_wasm as main;
33 } 39 }
34 else if #[cfg(feature="std")] { 40 else if #[cfg(feature="std")] {
35 #[path="arch/std.rs"] 41 #[path="arch/std.rs"]
36 mod arch; 42 mod arch;
37 pub use arch::*; 43 pub use arch::*;
44 #[cfg(feature = "nightly")]
45 pub use embassy_macros::main_std as main;
38 } 46 }
39} 47}
40 48
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index e1258ebb5..42bd82262 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -15,10 +15,10 @@ mod waker;
15 15
16use core::cell::Cell; 16use core::cell::Cell;
17use core::future::Future; 17use core::future::Future;
18use core::mem;
18use core::pin::Pin; 19use core::pin::Pin;
19use core::ptr::NonNull; 20use core::ptr::NonNull;
20use core::task::{Context, Poll}; 21use core::task::{Context, Poll};
21use core::{mem, ptr};
22 22
23use atomic_polyfill::{AtomicU32, Ordering}; 23use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection; 24use critical_section::CriticalSection;
@@ -43,14 +43,11 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
44 44
45/// Raw task header for use in task pointers. 45/// Raw task header for use in task pointers.
46/// 46pub(crate) struct TaskHeader {
47/// This is an opaque struct, used for raw pointers to tasks, for use
48/// with funtions like [`wake_task`] and [`task_from_waker`].
49pub struct TaskHeader {
50 pub(crate) state: AtomicU32, 47 pub(crate) state: AtomicU32,
51 pub(crate) run_queue_item: RunQueueItem, 48 pub(crate) run_queue_item: RunQueueItem,
52 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 49 pub(crate) executor: Cell<Option<&'static Executor>>,
53 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED 50 poll_fn: Cell<Option<unsafe fn(TaskRef)>>,
54 51
55 #[cfg(feature = "integrated-timers")] 52 #[cfg(feature = "integrated-timers")]
56 pub(crate) expires_at: Cell<Instant>, 53 pub(crate) expires_at: Cell<Instant>,
@@ -58,20 +55,34 @@ pub struct TaskHeader {
58 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
59} 56}
60 57
61impl TaskHeader { 58/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
62 pub(crate) const fn new() -> Self { 59#[derive(Clone, Copy)]
60pub struct TaskRef {
61 ptr: NonNull<TaskHeader>,
62}
63
64impl TaskRef {
65 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
63 Self { 66 Self {
64 state: AtomicU32::new(0), 67 ptr: NonNull::from(task).cast(),
65 run_queue_item: RunQueueItem::new(), 68 }
66 executor: Cell::new(ptr::null()), 69 }
67 poll_fn: UninitCell::uninit(),
68 70
69 #[cfg(feature = "integrated-timers")] 71 /// Safety: The pointer must have been obtained with `Task::as_ptr`
70 expires_at: Cell::new(Instant::from_ticks(0)), 72 pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
71 #[cfg(feature = "integrated-timers")] 73 Self {
72 timer_queue_item: timer_queue::TimerQueueItem::new(), 74 ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
73 } 75 }
74 } 76 }
77
78 pub(crate) fn header(self) -> &'static TaskHeader {
79 unsafe { self.ptr.as_ref() }
80 }
81
82 /// The returned pointer is valid for the entire TaskStorage.
83 pub(crate) fn as_ptr(self) -> *const TaskHeader {
84 self.ptr.as_ptr()
85 }
75} 86}
76 87
77/// Raw storage in which a task can be spawned. 88/// Raw storage in which a task can be spawned.
@@ -101,7 +112,18 @@ impl<F: Future + 'static> TaskStorage<F> {
101 /// Create a new TaskStorage, in not-spawned state. 112 /// Create a new TaskStorage, in not-spawned state.
102 pub const fn new() -> Self { 113 pub const fn new() -> Self {
103 Self { 114 Self {
104 raw: TaskHeader::new(), 115 raw: TaskHeader {
116 state: AtomicU32::new(0),
117 run_queue_item: RunQueueItem::new(),
118 executor: Cell::new(None),
119 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
120 poll_fn: Cell::new(None),
121
122 #[cfg(feature = "integrated-timers")]
123 expires_at: Cell::new(Instant::from_ticks(0)),
124 #[cfg(feature = "integrated-timers")]
125 timer_queue_item: timer_queue::TimerQueueItem::new(),
126 },
105 future: UninitCell::uninit(), 127 future: UninitCell::uninit(),
106 } 128 }
107 } 129 }
@@ -120,29 +142,17 @@ impl<F: Future + 'static> TaskStorage<F> {
120 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 142 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
121 /// on a different executor. 143 /// on a different executor.
122 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 144 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
123 if self.spawn_mark_used() { 145 let task = AvailableTask::claim(self);
124 return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; 146 match task {
147 Some(task) => {
148 let task = task.initialize(future);
149 unsafe { SpawnToken::<F>::new(task) }
150 }
151 None => SpawnToken::new_failed(),
125 } 152 }
126
127 SpawnToken::<F>::new_failed()
128 }
129
130 fn spawn_mark_used(&'static self) -> bool {
131 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
132 self.raw
133 .state
134 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
135 .is_ok()
136 }
137
138 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
139 // Initialize the task
140 self.raw.poll_fn.write(Self::poll);
141 self.future.write(future());
142 NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
143 } 153 }
144 154
145 unsafe fn poll(p: NonNull<TaskHeader>) { 155 unsafe fn poll(p: TaskRef) {
146 let this = &*(p.as_ptr() as *const TaskStorage<F>); 156 let this = &*(p.as_ptr() as *const TaskStorage<F>);
147 157
148 let future = Pin::new_unchecked(this.future.as_mut()); 158 let future = Pin::new_unchecked(this.future.as_mut());
@@ -164,6 +174,28 @@ impl<F: Future + 'static> TaskStorage<F> {
164 174
165unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 175unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
166 176
177struct AvailableTask<F: Future + 'static> {
178 task: &'static TaskStorage<F>,
179}
180
181impl<F: Future + 'static> AvailableTask<F> {
182 fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
183 task.raw
184 .state
185 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
186 .ok()
187 .map(|_| Self { task })
188 }
189
190 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef {
191 unsafe {
192 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
193 self.task.future.write(future());
194 }
195 TaskRef::new(self.task)
196 }
197}
198
167/// Raw storage that can hold up to N tasks of the same type. 199/// Raw storage that can hold up to N tasks of the same type.
168/// 200///
169/// This is essentially a `[TaskStorage<F>; N]`. 201/// This is essentially a `[TaskStorage<F>; N]`.
@@ -187,13 +219,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
187 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 219 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
188 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 220 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
189 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 221 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
190 for task in &self.pool { 222 let task = self.pool.iter().find_map(AvailableTask::claim);
191 if task.spawn_mark_used() { 223 match task {
192 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; 224 Some(task) => {
225 let task = task.initialize(future);
226 unsafe { SpawnToken::<F>::new(task) }
193 } 227 }
228 None => SpawnToken::new_failed(),
194 } 229 }
195
196 SpawnToken::<F>::new_failed()
197 } 230 }
198 231
199 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 232 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
@@ -235,13 +268,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
235 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 268 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
236 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 269 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
237 270
238 for task in &self.pool { 271 let task = self.pool.iter().find_map(AvailableTask::claim);
239 if task.spawn_mark_used() { 272 match task {
240 return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); 273 Some(task) => {
274 let task = task.initialize(future);
275 unsafe { SpawnToken::<FutFn>::new(task) }
241 } 276 }
277 None => SpawnToken::new_failed(),
242 } 278 }
243
244 SpawnToken::<FutFn>::new_failed()
245 } 279 }
246} 280}
247 281
@@ -307,7 +341,7 @@ impl Executor {
307 /// - `task` must be set up to run in this executor. 341 /// - `task` must be set up to run in this executor.
308 /// - `task` must NOT be already enqueued (in this executor or another one). 342 /// - `task` must NOT be already enqueued (in this executor or another one).
309 #[inline(always)] 343 #[inline(always)]
310 unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { 344 unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) {
311 #[cfg(feature = "rtos-trace")] 345 #[cfg(feature = "rtos-trace")]
312 trace::task_ready_begin(task.as_ptr() as u32); 346 trace::task_ready_begin(task.as_ptr() as u32);
313 347
@@ -325,8 +359,8 @@ impl Executor {
325 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. 359 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
326 /// In this case, the task's Future must be Send. This is because this is effectively 360 /// In this case, the task's Future must be Send. This is because this is effectively
327 /// sending the task to the executor thread. 361 /// sending the task to the executor thread.
328 pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { 362 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
329 task.as_ref().executor.set(self); 363 task.header().executor.set(Some(self));
330 364
331 #[cfg(feature = "rtos-trace")] 365 #[cfg(feature = "rtos-trace")]
332 trace::task_new(task.as_ptr() as u32); 366 trace::task_new(task.as_ptr() as u32);
@@ -354,46 +388,54 @@ impl Executor {
354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 388 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
355 /// no `poll()` already running. 389 /// no `poll()` already running.
356 pub unsafe fn poll(&'static self) { 390 pub unsafe fn poll(&'static self) {
357 #[cfg(feature = "integrated-timers")] 391 loop {
358 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 392 #[cfg(feature = "integrated-timers")]
393 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
359 394
360 self.run_queue.dequeue_all(|p| { 395 self.run_queue.dequeue_all(|p| {
361 let task = p.as_ref(); 396 let task = p.header();
362 397
363 #[cfg(feature = "integrated-timers")] 398 #[cfg(feature = "integrated-timers")]
364 task.expires_at.set(Instant::MAX); 399 task.expires_at.set(Instant::MAX);
365 400
366 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 401 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
367 if state & STATE_SPAWNED == 0 { 402 if state & STATE_SPAWNED == 0 {
368 // If task is not running, ignore it. This can happen in the following scenario: 403 // If task is not running, ignore it. This can happen in the following scenario:
369 // - Task gets dequeued, poll starts 404 // - Task gets dequeued, poll starts
370 // - While task is being polled, it gets woken. It gets placed in the queue. 405 // - While task is being polled, it gets woken. It gets placed in the queue.
371 // - Task poll finishes, returning done=true 406 // - Task poll finishes, returning done=true
372 // - RUNNING bit is cleared, but the task is already in the queue. 407 // - RUNNING bit is cleared, but the task is already in the queue.
373 return; 408 return;
374 } 409 }
410
411 #[cfg(feature = "rtos-trace")]
412 trace::task_exec_begin(p.as_ptr() as u32);
375 413
376 #[cfg(feature = "rtos-trace")] 414 // Run the task
377 trace::task_exec_begin(p.as_ptr() as u32); 415 task.poll_fn.get().unwrap_unchecked()(p);
378 416
379 // Run the task 417 #[cfg(feature = "rtos-trace")]
380 task.poll_fn.read()(p as _); 418 trace::task_exec_end();
381 419
382 #[cfg(feature = "rtos-trace")] 420 // Enqueue or update into timer_queue
383 trace::task_exec_end(); 421 #[cfg(feature = "integrated-timers")]
422 self.timer_queue.update(p);
423 });
384 424
385 // Enqueue or update into timer_queue
386 #[cfg(feature = "integrated-timers")] 425 #[cfg(feature = "integrated-timers")]
387 self.timer_queue.update(p); 426 {
388 }); 427 // If this is already in the past, set_alarm might return false
428 // In that case do another poll loop iteration.
429 let next_expiration = self.timer_queue.next_expiration();
430 if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
431 break;
432 }
433 }
389 434
390 #[cfg(feature = "integrated-timers")] 435 #[cfg(not(feature = "integrated-timers"))]
391 { 436 {
392 // If this is already in the past, set_alarm will immediately trigger the alarm. 437 break;
393 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, 438 }
394 // so we immediately do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 driver::set_alarm(self.alarm, next_expiration.as_ticks());
397 } 439 }
398 440
399 #[cfg(feature = "rtos-trace")] 441 #[cfg(feature = "rtos-trace")]
@@ -409,16 +451,12 @@ impl Executor {
409 } 451 }
410} 452}
411 453
412/// Wake a task by raw pointer. 454/// Wake a task by `TaskRef`.
413/// 455///
414/// You can obtain task pointers from `Waker`s using [`task_from_waker`]. 456/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
415/// 457pub fn wake_task(task: TaskRef) {
416/// # Safety
417///
418/// `task` must be a valid task pointer obtained from [`task_from_waker`].
419pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
420 critical_section::with(|cs| { 458 critical_section::with(|cs| {
421 let header = task.as_ref(); 459 let header = task.header();
422 let state = header.state.load(Ordering::Relaxed); 460 let state = header.state.load(Ordering::Relaxed);
423 461
424 // If already scheduled, or if not started, 462 // If already scheduled, or if not started,
@@ -430,20 +468,29 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
430 header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); 468 header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
431 469
432 // We have just marked the task as scheduled, so enqueue it. 470 // We have just marked the task as scheduled, so enqueue it.
433 let executor = &*header.executor.get(); 471 unsafe {
434 executor.enqueue(cs, task); 472 let executor = header.executor.get().unwrap_unchecked();
473 executor.enqueue(cs, task);
474 }
435 }) 475 })
436} 476}
437 477
438#[cfg(feature = "integrated-timers")] 478#[cfg(feature = "integrated-timers")]
439#[no_mangle] 479struct TimerQueue;
440unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { 480
441 let task = waker::task_from_waker(waker); 481#[cfg(feature = "integrated-timers")]
442 let task = task.as_ref(); 482impl embassy_time::queue::TimerQueue for TimerQueue {
443 let expires_at = task.expires_at.get(); 483 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
444 task.expires_at.set(expires_at.min(at)); 484 let task = waker::task_from_waker(waker);
485 let task = task.header();
486 let expires_at = task.expires_at.get();
487 task.expires_at.set(expires_at.min(at));
488 }
445} 489}
446 490
491#[cfg(feature = "integrated-timers")]
492embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
493
447#[cfg(feature = "rtos-trace")] 494#[cfg(feature = "rtos-trace")]
448impl rtos_trace::RtosTraceOSCallbacks for Executor { 495impl rtos_trace::RtosTraceOSCallbacks for Executor {
449 fn task_list() { 496 fn task_list() {
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
index ed8c82a5c..362157535 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -4,7 +4,7 @@ use core::ptr::NonNull;
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5use critical_section::CriticalSection; 5use critical_section::CriticalSection;
6 6
7use super::TaskHeader; 7use super::{TaskHeader, TaskRef};
8 8
9pub(crate) struct RunQueueItem { 9pub(crate) struct RunQueueItem {
10 next: AtomicPtr<TaskHeader>, 10 next: AtomicPtr<TaskHeader>,
@@ -46,25 +46,26 @@ impl RunQueue {
46 /// 46 ///
47 /// `item` must NOT be already enqueued in any queue. 47 /// `item` must NOT be already enqueued in any queue.
48 #[inline(always)] 48 #[inline(always)]
49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { 49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool {
50 let prev = self.head.load(Ordering::Relaxed); 50 let prev = self.head.load(Ordering::Relaxed);
51 task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); 51 task.header().run_queue_item.next.store(prev, Ordering::Relaxed);
52 self.head.store(task.as_ptr(), Ordering::Relaxed); 52 self.head.store(task.as_ptr() as _, Ordering::Relaxed);
53 prev.is_null() 53 prev.is_null()
54 } 54 }
55 55
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 56 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { 59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue. 60 // Atomically empty the queue.
61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
62 62
63 // Iterate the linked list of tasks that were previously in the queue. 63 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = NonNull::new(ptr) { 64 while let Some(task) = NonNull::new(ptr) {
65 let task = unsafe { TaskRef::from_ptr(task.as_ptr()) };
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 66 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 67 // Therefore, first read the next pointer, and only then process the task.
67 let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); 68 let next = task.header().run_queue_item.next.load(Ordering::Relaxed);
68 69
69 on_task(task); 70 on_task(task);
70 71
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 24c31892a..57d6d3cda 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,45 +1,39 @@
1use core::cell::Cell; 1use core::cell::Cell;
2use core::cmp::min; 2use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
5 3
6use atomic_polyfill::Ordering; 4use atomic_polyfill::Ordering;
7use embassy_time::Instant; 5use embassy_time::Instant;
8 6
9use super::{TaskHeader, STATE_TIMER_QUEUED}; 7use super::{TaskRef, STATE_TIMER_QUEUED};
10 8
11pub(crate) struct TimerQueueItem { 9pub(crate) struct TimerQueueItem {
12 next: Cell<*mut TaskHeader>, 10 next: Cell<Option<TaskRef>>,
13} 11}
14 12
15impl TimerQueueItem { 13impl TimerQueueItem {
16 pub const fn new() -> Self { 14 pub const fn new() -> Self {
17 Self { 15 Self { next: Cell::new(None) }
18 next: Cell::new(ptr::null_mut()),
19 }
20 } 16 }
21} 17}
22 18
23pub(crate) struct TimerQueue { 19pub(crate) struct TimerQueue {
24 head: Cell<*mut TaskHeader>, 20 head: Cell<Option<TaskRef>>,
25} 21}
26 22
27impl TimerQueue { 23impl TimerQueue {
28 pub const fn new() -> Self { 24 pub const fn new() -> Self {
29 Self { 25 Self { head: Cell::new(None) }
30 head: Cell::new(ptr::null_mut()),
31 }
32 } 26 }
33 27
34 pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) { 28 pub(crate) unsafe fn update(&self, p: TaskRef) {
35 let task = p.as_ref(); 29 let task = p.header();
36 if task.expires_at.get() != Instant::MAX { 30 if task.expires_at.get() != Instant::MAX {
37 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 31 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
38 let is_new = old_state & STATE_TIMER_QUEUED == 0; 32 let is_new = old_state & STATE_TIMER_QUEUED == 0;
39 33
40 if is_new { 34 if is_new {
41 task.timer_queue_item.next.set(self.head.get()); 35 task.timer_queue_item.next.set(self.head.get());
42 self.head.set(p.as_ptr()); 36 self.head.set(Some(p));
43 } 37 }
44 } 38 }
45 } 39 }
@@ -47,7 +41,7 @@ impl TimerQueue {
47 pub(crate) unsafe fn next_expiration(&self) -> Instant { 41 pub(crate) unsafe fn next_expiration(&self) -> Instant {
48 let mut res = Instant::MAX; 42 let mut res = Instant::MAX;
49 self.retain(|p| { 43 self.retain(|p| {
50 let task = p.as_ref(); 44 let task = p.header();
51 let expires = task.expires_at.get(); 45 let expires = task.expires_at.get();
52 res = min(res, expires); 46 res = min(res, expires);
53 expires != Instant::MAX 47 expires != Instant::MAX
@@ -55,9 +49,9 @@ impl TimerQueue {
55 res 49 res
56 } 50 }
57 51
58 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) { 52 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) {
59 self.retain(|p| { 53 self.retain(|p| {
60 let task = p.as_ref(); 54 let task = p.header();
61 if task.expires_at.get() <= now { 55 if task.expires_at.get() <= now {
62 on_task(p); 56 on_task(p);
63 false 57 false
@@ -67,11 +61,10 @@ impl TimerQueue {
67 }); 61 });
68 } 62 }
69 63
70 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) { 64 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
71 let mut prev = &self.head; 65 let mut prev = &self.head;
72 while !prev.get().is_null() { 66 while let Some(p) = prev.get() {
73 let p = NonNull::new_unchecked(prev.get()); 67 let task = p.header();
74 let task = &*p.as_ptr();
75 if f(p) { 68 if f(p) {
76 // Skip to next 69 // Skip to next
77 prev = &task.timer_queue_item.next; 70 prev = &task.timer_queue_item.next;
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index ed5822188..2b1f6b6f3 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -25,9 +25,3 @@ impl<T> UninitCell<T> {
25 ptr::drop_in_place(self.as_mut_ptr()) 25 ptr::drop_in_place(self.as_mut_ptr())
26 } 26 }
27} 27}
28
29impl<T: Copy> UninitCell<T> {
30 pub unsafe fn read(&self) -> T {
31 ptr::read(self.as_mut_ptr())
32 }
33}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 5765259f2..400b37fa9 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -1,8 +1,7 @@
1use core::mem; 1use core::mem;
2use core::ptr::NonNull;
3use core::task::{RawWaker, RawWakerVTable, Waker}; 2use core::task::{RawWaker, RawWakerVTable, Waker};
4 3
5use super::{wake_task, TaskHeader}; 4use super::{wake_task, TaskHeader, TaskRef};
6 5
7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 6const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
8 7
@@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker {
11} 10}
12 11
13unsafe fn wake(p: *const ()) { 12unsafe fn wake(p: *const ()) {
14 wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) 13 wake_task(TaskRef::from_ptr(p as *const TaskHeader))
15} 14}
16 15
17unsafe fn drop(_: *const ()) { 16unsafe fn drop(_: *const ()) {
18 // nop 17 // nop
19} 18}
20 19
21pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { 20pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) 21 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23} 22}
24 23
@@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
33/// # Panics 32/// # Panics
34/// 33///
35/// Panics if the waker is not created by the Embassy executor. 34/// Panics if the waker is not created by the Embassy executor.
36pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { 35pub fn task_from_waker(waker: &Waker) -> TaskRef {
37 // safety: OK because WakerHack has the same layout as Waker. 36 // safety: OK because WakerHack has the same layout as Waker.
38 // This is not really guaranteed because the structs are `repr(Rust)`, it is 37 // This is not really guaranteed because the structs are `repr(Rust)`, it is
39 // indeed the case in the current implementation. 38 // indeed the case in the current implementation.
@@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
43 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") 42 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
44 } 43 }
45 44
46 // safety: we never create a waker with a null data pointer. 45 // safety: our wakers are always created with `TaskRef::as_ptr`
47 unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } 46 unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) }
48} 47}
49 48
50struct WakerHack { 49struct WakerHack {
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs
index 400d973ff..7c0a0183c 100644
--- a/embassy-executor/src/spawner.rs
+++ b/embassy-executor/src/spawner.rs
@@ -1,7 +1,6 @@
1use core::future::poll_fn; 1use core::future::poll_fn;
2use core::marker::PhantomData; 2use core::marker::PhantomData;
3use core::mem; 3use core::mem;
4use core::ptr::NonNull;
5use core::task::Poll; 4use core::task::Poll;
6 5
7use super::raw; 6use super::raw;
@@ -22,12 +21,12 @@ use super::raw;
22/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 21/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
23#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 22#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
24pub struct SpawnToken<S> { 23pub struct SpawnToken<S> {
25 raw_task: Option<NonNull<raw::TaskHeader>>, 24 raw_task: Option<raw::TaskRef>,
26 phantom: PhantomData<*mut S>, 25 phantom: PhantomData<*mut S>,
27} 26}
28 27
29impl<S> SpawnToken<S> { 28impl<S> SpawnToken<S> {
30 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { 29 pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self {
31 Self { 30 Self {
32 raw_task: Some(raw_task), 31 raw_task: Some(raw_task),
33 phantom: PhantomData, 32 phantom: PhantomData,
@@ -90,10 +89,10 @@ impl Spawner {
90 /// 89 ///
91 /// Panics if the current executor is not an Embassy executor. 90 /// Panics if the current executor is not an Embassy executor.
92 pub async fn for_current_executor() -> Self { 91 pub async fn for_current_executor() -> Self {
93 poll_fn(|cx| unsafe { 92 poll_fn(|cx| {
94 let task = raw::task_from_waker(cx.waker()); 93 let task = raw::task_from_waker(cx.waker());
95 let executor = (*task.as_ptr()).executor.get(); 94 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
96 Poll::Ready(Self::new(&*executor)) 95 Poll::Ready(Self::new(executor))
97 }) 96 })
98 .await 97 .await
99 } 98 }
@@ -166,10 +165,10 @@ impl SendSpawner {
166 /// 165 ///
167 /// Panics if the current executor is not an Embassy executor. 166 /// Panics if the current executor is not an Embassy executor.
168 pub async fn for_current_executor() -> Self { 167 pub async fn for_current_executor() -> Self {
169 poll_fn(|cx| unsafe { 168 poll_fn(|cx| {
170 let task = raw::task_from_waker(cx.waker()); 169 let task = raw::task_from_waker(cx.waker());
171 let executor = (*task.as_ptr()).executor.get(); 170 let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
172 Poll::Ready(Self::new(&*executor)) 171 Poll::Ready(Self::new(executor))
173 }) 172 })
174 .await 173 .await
175 } 174 }