aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
authorQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
committerQuentin Smith <[email protected]>2023-07-17 21:31:43 -0400
commit6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch)
tree748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-executor/src/raw
parentd896f80405aa8963877049ed999e4aba25d6e2bb (diff)
parent6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff)
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/mod.rs483
-rw-r--r--embassy-executor/src/raw/run_queue.rs44
-rw-r--r--embassy-executor/src/raw/timer_queue.rs33
-rw-r--r--embassy-executor/src/raw/util.rs29
-rw-r--r--embassy-executor/src/raw/waker.rs13
-rw-r--r--embassy-executor/src/raw/waker_turbo.rs34
6 files changed, 425 insertions, 211 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index e1258ebb5..f3760f589 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -11,17 +11,17 @@ mod run_queue;
11#[cfg(feature = "integrated-timers")] 11#[cfg(feature = "integrated-timers")]
12mod timer_queue; 12mod timer_queue;
13pub(crate) mod util; 13pub(crate) mod util;
14#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
14mod waker; 15mod waker;
15 16
16use core::cell::Cell;
17use core::future::Future; 17use core::future::Future;
18use core::marker::PhantomData;
19use core::mem;
18use core::pin::Pin; 20use core::pin::Pin;
19use core::ptr::NonNull; 21use core::ptr::NonNull;
20use core::task::{Context, Poll}; 22use core::task::{Context, Poll};
21use core::{mem, ptr};
22 23
23use atomic_polyfill::{AtomicU32, Ordering}; 24use atomic_polyfill::{AtomicU32, Ordering};
24use critical_section::CriticalSection;
25#[cfg(feature = "integrated-timers")] 25#[cfg(feature = "integrated-timers")]
26use embassy_time::driver::{self, AlarmHandle}; 26use embassy_time::driver::{self, AlarmHandle};
27#[cfg(feature = "integrated-timers")] 27#[cfg(feature = "integrated-timers")]
@@ -30,7 +30,7 @@ use embassy_time::Instant;
30use rtos_trace::trace; 30use rtos_trace::trace;
31 31
32use self::run_queue::{RunQueue, RunQueueItem}; 32use self::run_queue::{RunQueue, RunQueueItem};
33use self::util::UninitCell; 33use self::util::{SyncUnsafeCell, UninitCell};
34pub use self::waker::task_from_waker; 34pub use self::waker::task_from_waker;
35use super::SpawnToken; 35use super::SpawnToken;
36 36
@@ -43,35 +43,49 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 43pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
44 44
45/// Raw task header for use in task pointers. 45/// Raw task header for use in task pointers.
46/// 46pub(crate) struct TaskHeader {
47/// This is an opaque struct, used for raw pointers to tasks, for use
48/// with funtions like [`wake_task`] and [`task_from_waker`].
49pub struct TaskHeader {
50 pub(crate) state: AtomicU32, 47 pub(crate) state: AtomicU32,
51 pub(crate) run_queue_item: RunQueueItem, 48 pub(crate) run_queue_item: RunQueueItem,
52 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 49 pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
53 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED 50 poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
54 51
55 #[cfg(feature = "integrated-timers")] 52 #[cfg(feature = "integrated-timers")]
56 pub(crate) expires_at: Cell<Instant>, 53 pub(crate) expires_at: SyncUnsafeCell<Instant>,
57 #[cfg(feature = "integrated-timers")] 54 #[cfg(feature = "integrated-timers")]
58 pub(crate) timer_queue_item: timer_queue::TimerQueueItem, 55 pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
59} 56}
60 57
61impl TaskHeader { 58/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
62 pub(crate) const fn new() -> Self { 59#[derive(Clone, Copy)]
60pub struct TaskRef {
61 ptr: NonNull<TaskHeader>,
62}
63
64unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
65unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
66
67impl TaskRef {
68 fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
63 Self { 69 Self {
64 state: AtomicU32::new(0), 70 ptr: NonNull::from(task).cast(),
65 run_queue_item: RunQueueItem::new(), 71 }
66 executor: Cell::new(ptr::null()), 72 }
67 poll_fn: UninitCell::uninit(),
68 73
69 #[cfg(feature = "integrated-timers")] 74 /// Safety: The pointer must have been obtained with `Task::as_ptr`
70 expires_at: Cell::new(Instant::from_ticks(0)), 75 pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
71 #[cfg(feature = "integrated-timers")] 76 Self {
72 timer_queue_item: timer_queue::TimerQueueItem::new(), 77 ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
73 } 78 }
74 } 79 }
80
81 pub(crate) fn header(self) -> &'static TaskHeader {
82 unsafe { self.ptr.as_ref() }
83 }
84
85 /// The returned pointer is valid for the entire TaskStorage.
86 pub(crate) fn as_ptr(self) -> *const TaskHeader {
87 self.ptr.as_ptr()
88 }
75} 89}
76 90
77/// Raw storage in which a task can be spawned. 91/// Raw storage in which a task can be spawned.
@@ -101,7 +115,18 @@ impl<F: Future + 'static> TaskStorage<F> {
101 /// Create a new TaskStorage, in not-spawned state. 115 /// Create a new TaskStorage, in not-spawned state.
102 pub const fn new() -> Self { 116 pub const fn new() -> Self {
103 Self { 117 Self {
104 raw: TaskHeader::new(), 118 raw: TaskHeader {
119 state: AtomicU32::new(0),
120 run_queue_item: RunQueueItem::new(),
121 executor: SyncUnsafeCell::new(None),
122 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
123 poll_fn: SyncUnsafeCell::new(None),
124
125 #[cfg(feature = "integrated-timers")]
126 expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
127 #[cfg(feature = "integrated-timers")]
128 timer_queue_item: timer_queue::TimerQueueItem::new(),
129 },
105 future: UninitCell::uninit(), 130 future: UninitCell::uninit(),
106 } 131 }
107 } 132 }
@@ -120,29 +145,17 @@ impl<F: Future + 'static> TaskStorage<F> {
120 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 145 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
121 /// on a different executor. 146 /// on a different executor.
122 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
123 if self.spawn_mark_used() { 148 let task = AvailableTask::claim(self);
124 return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; 149 match task {
150 Some(task) => {
151 let task = task.initialize(future);
152 unsafe { SpawnToken::<F>::new(task) }
153 }
154 None => SpawnToken::new_failed(),
125 } 155 }
126
127 SpawnToken::<F>::new_failed()
128 }
129
130 fn spawn_mark_used(&'static self) -> bool {
131 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
132 self.raw
133 .state
134 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
135 .is_ok()
136 } 156 }
137 157
138 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { 158 unsafe fn poll(p: TaskRef) {
139 // Initialize the task
140 self.raw.poll_fn.write(Self::poll);
141 self.future.write(future());
142 NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
143 }
144
145 unsafe fn poll(p: NonNull<TaskHeader>) {
146 let this = &*(p.as_ptr() as *const TaskStorage<F>); 159 let this = &*(p.as_ptr() as *const TaskStorage<F>);
147 160
148 let future = Pin::new_unchecked(this.future.as_mut()); 161 let future = Pin::new_unchecked(this.future.as_mut());
@@ -152,6 +165,9 @@ impl<F: Future + 'static> TaskStorage<F> {
152 Poll::Ready(_) => { 165 Poll::Ready(_) => {
153 this.future.drop_in_place(); 166 this.future.drop_in_place();
154 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 167 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
168
169 #[cfg(feature = "integrated-timers")]
170 this.raw.expires_at.set(Instant::MAX);
155 } 171 }
156 Poll::Pending => {} 172 Poll::Pending => {}
157 } 173 }
@@ -160,9 +176,37 @@ impl<F: Future + 'static> TaskStorage<F> {
160 // it's a noop for our waker. 176 // it's a noop for our waker.
161 mem::forget(waker); 177 mem::forget(waker);
162 } 178 }
179
180 #[doc(hidden)]
181 #[allow(dead_code)]
182 fn _assert_sync(self) {
183 fn assert_sync<T: Sync>(_: T) {}
184
185 assert_sync(self)
186 }
187}
188
189struct AvailableTask<F: Future + 'static> {
190 task: &'static TaskStorage<F>,
163} 191}
164 192
165unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 193impl<F: Future + 'static> AvailableTask<F> {
194 fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
195 task.raw
196 .state
197 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
198 .ok()
199 .map(|_| Self { task })
200 }
201
202 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef {
203 unsafe {
204 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
205 self.task.future.write(future());
206 }
207 TaskRef::new(self.task)
208 }
209}
166 210
167/// Raw storage that can hold up to N tasks of the same type. 211/// Raw storage that can hold up to N tasks of the same type.
168/// 212///
@@ -187,13 +231,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
187 /// is currently free. If none is free, a "poisoned" SpawnToken is returned, 231 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
188 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. 232 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
189 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
190 for task in &self.pool { 234 let task = self.pool.iter().find_map(AvailableTask::claim);
191 if task.spawn_mark_used() { 235 match task {
192 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; 236 Some(task) => {
237 let task = task.initialize(future);
238 unsafe { SpawnToken::<F>::new(task) }
193 } 239 }
240 None => SpawnToken::new_failed(),
194 } 241 }
195
196 SpawnToken::<F>::new_failed()
197 } 242 }
198 243
199 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 244 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
@@ -235,39 +280,71 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
235 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 280 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
236 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 281 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
237 282
238 for task in &self.pool { 283 let task = self.pool.iter().find_map(AvailableTask::claim);
239 if task.spawn_mark_used() { 284 match task {
240 return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); 285 Some(task) => {
286 let task = task.initialize(future);
287 unsafe { SpawnToken::<FutFn>::new(task) }
241 } 288 }
289 None => SpawnToken::new_failed(),
242 } 290 }
243
244 SpawnToken::<FutFn>::new_failed()
245 } 291 }
246} 292}
247 293
248/// Raw executor. 294#[derive(Clone, Copy)]
249/// 295pub(crate) enum PenderInner {
250/// This is the core of the Embassy executor. It is low-level, requiring manual 296 #[cfg(feature = "executor-thread")]
251/// handling of wakeups and task polling. If you can, prefer using one of the 297 Thread(crate::arch::ThreadPender),
252/// [higher level executors](crate::Executor). 298 #[cfg(feature = "executor-interrupt")]
253/// 299 Interrupt(crate::arch::InterruptPender),
254/// The raw executor leaves it up to you to handle wakeups and scheduling: 300 #[cfg(feature = "pender-callback")]
255/// 301 Callback { func: fn(*mut ()), context: *mut () },
256/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks 302}
257/// that "want to run"). 303
258/// - You must supply a `signal_fn`. The executor will call it to notify you it has work 304unsafe impl Send for PenderInner {}
259/// to do. You must arrange for `poll()` to be called as soon as possible. 305unsafe impl Sync for PenderInner {}
306
307/// Platform/architecture-specific action executed when an executor has pending work.
260/// 308///
261/// `signal_fn` can be called from *any* context: any thread, any interrupt priority 309/// When a task within an executor is woken, the `Pender` is called. This does a
262/// level, etc. It may be called synchronously from any `Executor` method call as well. 310/// platform/architecture-specific action to signal there is pending work in the executor.
263/// You must deal with this correctly. 311/// When this happens, you must arrange for [`Executor::poll`] to be called.
264/// 312///
265/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates 313/// You can think of it as a waker, but for the whole executor.
266/// the requirement for `poll` to not be called reentrantly. 314pub struct Pender(pub(crate) PenderInner);
267pub struct Executor { 315
316impl Pender {
317 /// Create a `Pender` that will call an arbitrary function pointer.
318 ///
319 /// # Arguments
320 ///
321 /// - `func`: The function pointer to call.
322 /// - `context`: Opaque context pointer, that will be passed to the function pointer.
323 #[cfg(feature = "pender-callback")]
324 pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self {
325 Self(PenderInner::Callback {
326 func,
327 context: context.into(),
328 })
329 }
330}
331
332impl Pender {
333 pub(crate) fn pend(&self) {
334 match self.0 {
335 #[cfg(feature = "executor-thread")]
336 PenderInner::Thread(x) => x.pend(),
337 #[cfg(feature = "executor-interrupt")]
338 PenderInner::Interrupt(x) => x.pend(),
339 #[cfg(feature = "pender-callback")]
340 PenderInner::Callback { func, context } => func(context),
341 }
342 }
343}
344
345pub(crate) struct SyncExecutor {
268 run_queue: RunQueue, 346 run_queue: RunQueue,
269 signal_fn: fn(*mut ()), 347 pender: Pender,
270 signal_ctx: *mut (),
271 348
272 #[cfg(feature = "integrated-timers")] 349 #[cfg(feature = "integrated-timers")]
273 pub(crate) timer_queue: timer_queue::TimerQueue, 350 pub(crate) timer_queue: timer_queue::TimerQueue,
@@ -275,23 +352,14 @@ pub struct Executor {
275 alarm: AlarmHandle, 352 alarm: AlarmHandle,
276} 353}
277 354
278impl Executor { 355impl SyncExecutor {
279 /// Create a new executor. 356 pub(crate) fn new(pender: Pender) -> Self {
280 ///
281 /// When the executor has work to do, it will call `signal_fn` with
282 /// `signal_ctx` as argument.
283 ///
284 /// See [`Executor`] docs for details on `signal_fn`.
285 pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
286 #[cfg(feature = "integrated-timers")] 357 #[cfg(feature = "integrated-timers")]
287 let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; 358 let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
288 #[cfg(feature = "integrated-timers")]
289 driver::set_alarm_callback(alarm, signal_fn, signal_ctx);
290 359
291 Self { 360 Self {
292 run_queue: RunQueue::new(), 361 run_queue: RunQueue::new(),
293 signal_fn, 362 pender,
294 signal_ctx,
295 363
296 #[cfg(feature = "integrated-timers")] 364 #[cfg(feature = "integrated-timers")]
297 timer_queue: timer_queue::TimerQueue::new(), 365 timer_queue: timer_queue::TimerQueue::new(),
@@ -307,12 +375,133 @@ impl Executor {
307 /// - `task` must be set up to run in this executor. 375 /// - `task` must be set up to run in this executor.
308 /// - `task` must NOT be already enqueued (in this executor or another one). 376 /// - `task` must NOT be already enqueued (in this executor or another one).
309 #[inline(always)] 377 #[inline(always)]
310 unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { 378 unsafe fn enqueue(&self, task: TaskRef) {
311 #[cfg(feature = "rtos-trace")] 379 #[cfg(feature = "rtos-trace")]
312 trace::task_ready_begin(task.as_ptr() as u32); 380 trace::task_ready_begin(task.as_ptr() as u32);
313 381
314 if self.run_queue.enqueue(cs, task) { 382 if self.run_queue.enqueue(task) {
315 (self.signal_fn)(self.signal_ctx) 383 self.pender.pend();
384 }
385 }
386
387 #[cfg(feature = "integrated-timers")]
388 fn alarm_callback(ctx: *mut ()) {
389 let this: &Self = unsafe { &*(ctx as *const Self) };
390 this.pender.pend();
391 }
392
393 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
394 task.header().executor.set(Some(self));
395
396 #[cfg(feature = "rtos-trace")]
397 trace::task_new(task.as_ptr() as u32);
398
399 self.enqueue(task);
400 }
401
402 /// # Safety
403 ///
404 /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
405 pub(crate) unsafe fn poll(&'static self) {
406 #[cfg(feature = "integrated-timers")]
407 driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
408
409 #[allow(clippy::never_loop)]
410 loop {
411 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
413
414 self.run_queue.dequeue_all(|p| {
415 let task = p.header();
416
417 #[cfg(feature = "integrated-timers")]
418 task.expires_at.set(Instant::MAX);
419
420 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
421 if state & STATE_SPAWNED == 0 {
422 // If task is not running, ignore it. This can happen in the following scenario:
423 // - Task gets dequeued, poll starts
424 // - While task is being polled, it gets woken. It gets placed in the queue.
425 // - Task poll finishes, returning done=true
426 // - RUNNING bit is cleared, but the task is already in the queue.
427 return;
428 }
429
430 #[cfg(feature = "rtos-trace")]
431 trace::task_exec_begin(p.as_ptr() as u32);
432
433 // Run the task
434 task.poll_fn.get().unwrap_unchecked()(p);
435
436 #[cfg(feature = "rtos-trace")]
437 trace::task_exec_end();
438
439 // Enqueue or update into timer_queue
440 #[cfg(feature = "integrated-timers")]
441 self.timer_queue.update(p);
442 });
443
444 #[cfg(feature = "integrated-timers")]
445 {
446 // If this is already in the past, set_alarm might return false
447 // In that case do another poll loop iteration.
448 let next_expiration = self.timer_queue.next_expiration();
449 if driver::set_alarm(self.alarm, next_expiration.as_ticks()) {
450 break;
451 }
452 }
453
454 #[cfg(not(feature = "integrated-timers"))]
455 {
456 break;
457 }
458 }
459
460 #[cfg(feature = "rtos-trace")]
461 trace::system_idle();
462 }
463}
464
465/// Raw executor.
466///
467/// This is the core of the Embassy executor. It is low-level, requiring manual
468/// handling of wakeups and task polling. If you can, prefer using one of the
469/// [higher level executors](crate::Executor).
470///
471/// The raw executor leaves it up to you to handle wakeups and scheduling:
472///
473/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
474/// that "want to run").
475/// - You must supply a [`Pender`]. The executor will call it to notify you it has work
476/// to do. You must arrange for `poll()` to be called as soon as possible.
477///
478/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority
479/// level, etc. It may be called synchronously from any `Executor` method call as well.
480/// You must deal with this correctly.
481///
482/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
483/// the requirement for `poll` to not be called reentrantly.
484#[repr(transparent)]
485pub struct Executor {
486 pub(crate) inner: SyncExecutor,
487
488 _not_sync: PhantomData<*mut ()>,
489}
490
491impl Executor {
492 pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
493 mem::transmute(inner)
494 }
495
496 /// Create a new executor.
497 ///
498 /// When the executor has work to do, it will call the [`Pender`].
499 ///
500 /// See [`Executor`] docs for details on `Pender`.
501 pub fn new(pender: Pender) -> Self {
502 Self {
503 inner: SyncExecutor::new(pender),
504 _not_sync: PhantomData,
316 } 505 }
317 } 506 }
318 507
@@ -325,15 +514,8 @@ impl Executor {
325 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. 514 /// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
326 /// In this case, the task's Future must be Send. This is because this is effectively 515 /// In this case, the task's Future must be Send. This is because this is effectively
327 /// sending the task to the executor thread. 516 /// sending the task to the executor thread.
328 pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { 517 pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
329 task.as_ref().executor.set(self); 518 self.inner.spawn(task)
330
331 #[cfg(feature = "rtos-trace")]
332 trace::task_new(task.as_ptr() as u32);
333
334 critical_section::with(|cs| {
335 self.enqueue(cs, task);
336 })
337 } 519 }
338 520
339 /// Poll all queued tasks in this executor. 521 /// Poll all queued tasks in this executor.
@@ -341,63 +523,20 @@ impl Executor {
341 /// This loops over all tasks that are queued to be polled (i.e. they're 523 /// This loops over all tasks that are queued to be polled (i.e. they're
342 /// freshly spawned or they've been woken). Other tasks are not polled. 524 /// freshly spawned or they've been woken). Other tasks are not polled.
343 /// 525 ///
344 /// You must call `poll` after receiving a call to `signal_fn`. It is OK 526 /// You must call `poll` after receiving a call to the [`Pender`]. It is OK
345 /// to call `poll` even when not requested by `signal_fn`, but it wastes 527 /// to call `poll` even when not requested by the `Pender`, but it wastes
346 /// energy. 528 /// energy.
347 /// 529 ///
348 /// # Safety 530 /// # Safety
349 /// 531 ///
350 /// You must NOT call `poll` reentrantly on the same executor. 532 /// You must NOT call `poll` reentrantly on the same executor.
351 /// 533 ///
352 /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you 534 /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you
353 /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to 535 /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to
354 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 536 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
355 /// no `poll()` already running. 537 /// no `poll()` already running.
356 pub unsafe fn poll(&'static self) { 538 pub unsafe fn poll(&'static self) {
357 #[cfg(feature = "integrated-timers")] 539 self.inner.poll()
358 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
359
360 self.run_queue.dequeue_all(|p| {
361 let task = p.as_ref();
362
363 #[cfg(feature = "integrated-timers")]
364 task.expires_at.set(Instant::MAX);
365
366 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
367 if state & STATE_SPAWNED == 0 {
368 // If task is not running, ignore it. This can happen in the following scenario:
369 // - Task gets dequeued, poll starts
370 // - While task is being polled, it gets woken. It gets placed in the queue.
371 // - Task poll finishes, returning done=true
372 // - RUNNING bit is cleared, but the task is already in the queue.
373 return;
374 }
375
376 #[cfg(feature = "rtos-trace")]
377 trace::task_exec_begin(p.as_ptr() as u32);
378
379 // Run the task
380 task.poll_fn.read()(p as _);
381
382 #[cfg(feature = "rtos-trace")]
383 trace::task_exec_end();
384
385 // Enqueue or update into timer_queue
386 #[cfg(feature = "integrated-timers")]
387 self.timer_queue.update(p);
388 });
389
390 #[cfg(feature = "integrated-timers")]
391 {
392 // If this is already in the past, set_alarm will immediately trigger the alarm.
393 // This will cause `signal_fn` to be called, which will cause `poll()` to be called again,
394 // so we immediately do another poll loop iteration.
395 let next_expiration = self.timer_queue.next_expiration();
396 driver::set_alarm(self.alarm, next_expiration.as_ticks());
397 }
398
399 #[cfg(feature = "rtos-trace")]
400 trace::system_idle();
401 } 540 }
402 541
403 /// Get a spawner that spawns tasks in this executor. 542 /// Get a spawner that spawns tasks in this executor.
@@ -409,41 +548,49 @@ impl Executor {
409 } 548 }
410} 549}
411 550
412/// Wake a task by raw pointer. 551/// Wake a task by `TaskRef`.
413///
414/// You can obtain task pointers from `Waker`s using [`task_from_waker`].
415/// 552///
416/// # Safety 553/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
417/// 554pub fn wake_task(task: TaskRef) {
418/// `task` must be a valid task pointer obtained from [`task_from_waker`]. 555 let header = task.header();
419pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
420 critical_section::with(|cs| {
421 let header = task.as_ref();
422 let state = header.state.load(Ordering::Relaxed);
423 556
557 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
424 // If already scheduled, or if not started, 558 // If already scheduled, or if not started,
425 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { 559 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
426 return; 560 None
561 } else {
562 // Mark it as scheduled
563 Some(state | STATE_RUN_QUEUED)
427 } 564 }
565 });
428 566
429 // Mark it as scheduled 567 if res.is_ok() {
430 header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
431
432 // We have just marked the task as scheduled, so enqueue it. 568 // We have just marked the task as scheduled, so enqueue it.
433 let executor = &*header.executor.get(); 569 unsafe {
434 executor.enqueue(cs, task); 570 let executor = header.executor.get().unwrap_unchecked();
435 }) 571 executor.enqueue(task);
572 }
573 }
436} 574}
437 575
438#[cfg(feature = "integrated-timers")] 576#[cfg(feature = "integrated-timers")]
439#[no_mangle] 577struct TimerQueue;
440unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { 578
441 let task = waker::task_from_waker(waker); 579#[cfg(feature = "integrated-timers")]
442 let task = task.as_ref(); 580impl embassy_time::queue::TimerQueue for TimerQueue {
443 let expires_at = task.expires_at.get(); 581 fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
444 task.expires_at.set(expires_at.min(at)); 582 let task = waker::task_from_waker(waker);
583 let task = task.header();
584 unsafe {
585 let expires_at = task.expires_at.get();
586 task.expires_at.set(expires_at.min(at));
587 }
588 }
445} 589}
446 590
591#[cfg(feature = "integrated-timers")]
592embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
593
447#[cfg(feature = "rtos-trace")] 594#[cfg(feature = "rtos-trace")]
448impl rtos_trace::RtosTraceOSCallbacks for Executor { 595impl rtos_trace::RtosTraceOSCallbacks for Executor {
449 fn task_list() { 596 fn task_list() {
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs
index ed8c82a5c..f1ec19ac1 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue.rs
@@ -2,18 +2,18 @@ use core::ptr;
2use core::ptr::NonNull; 2use core::ptr::NonNull;
3 3
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5use critical_section::CriticalSection;
6 5
7use super::TaskHeader; 6use super::{TaskHeader, TaskRef};
7use crate::raw::util::SyncUnsafeCell;
8 8
9pub(crate) struct RunQueueItem { 9pub(crate) struct RunQueueItem {
10 next: AtomicPtr<TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
11} 11}
12 12
13impl RunQueueItem { 13impl RunQueueItem {
14 pub const fn new() -> Self { 14 pub const fn new() -> Self {
15 Self { 15 Self {
16 next: AtomicPtr::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
17 } 17 }
18 } 18 }
19} 19}
@@ -46,29 +46,43 @@ impl RunQueue {
46 /// 46 ///
47 /// `item` must NOT be already enqueued in any queue. 47 /// `item` must NOT be already enqueued in any queue.
48 #[inline(always)] 48 #[inline(always)]
49 pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { 49 pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
50 let prev = self.head.load(Ordering::Relaxed); 50 let mut was_empty = false;
51 task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); 51
52 self.head.store(task.as_ptr(), Ordering::Relaxed); 52 self.head
53 prev.is_null() 53 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
54 was_empty = prev.is_null();
55 unsafe {
56 // safety: the pointer is either null or valid
57 let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
58 // safety: there are no concurrent accesses to `next`
59 task.header().run_queue_item.next.set(prev);
60 }
61 Some(task.as_ptr() as *mut _)
62 })
63 .ok();
64
65 was_empty
54 } 66 }
55 67
56 /// Empty the queue, then call `on_task` for each task that was in the queue. 68 /// Empty the queue, then call `on_task` for each task that was in the queue.
57 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue 69 /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
58 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. 70 /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
59 pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { 71 pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
60 // Atomically empty the queue. 72 // Atomically empty the queue.
61 let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 73 let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
74
75 // safety: the pointer is either null or valid
76 let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
62 77
63 // Iterate the linked list of tasks that were previously in the queue. 78 // Iterate the linked list of tasks that were previously in the queue.
64 while let Some(task) = NonNull::new(ptr) { 79 while let Some(task) = next {
65 // If the task re-enqueues itself, the `next` pointer will get overwritten. 80 // If the task re-enqueues itself, the `next` pointer will get overwritten.
66 // Therefore, first read the next pointer, and only then process the task. 81 // Therefore, first read the next pointer, and only then process the task.
67 let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); 82 // safety: there are no concurrent accesses to `next`
83 next = unsafe { task.header().run_queue_item.next.get() };
68 84
69 on_task(task); 85 on_task(task);
70
71 ptr = next
72 } 86 }
73 } 87 }
74} 88}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index 24c31892a..dc71c95b1 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,45 +1,43 @@
1use core::cell::Cell;
2use core::cmp::min; 1use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
5 2
6use atomic_polyfill::Ordering; 3use atomic_polyfill::Ordering;
7use embassy_time::Instant; 4use embassy_time::Instant;
8 5
9use super::{TaskHeader, STATE_TIMER_QUEUED}; 6use super::{TaskRef, STATE_TIMER_QUEUED};
7use crate::raw::util::SyncUnsafeCell;
10 8
11pub(crate) struct TimerQueueItem { 9pub(crate) struct TimerQueueItem {
12 next: Cell<*mut TaskHeader>, 10 next: SyncUnsafeCell<Option<TaskRef>>,
13} 11}
14 12
15impl TimerQueueItem { 13impl TimerQueueItem {
16 pub const fn new() -> Self { 14 pub const fn new() -> Self {
17 Self { 15 Self {
18 next: Cell::new(ptr::null_mut()), 16 next: SyncUnsafeCell::new(None),
19 } 17 }
20 } 18 }
21} 19}
22 20
23pub(crate) struct TimerQueue { 21pub(crate) struct TimerQueue {
24 head: Cell<*mut TaskHeader>, 22 head: SyncUnsafeCell<Option<TaskRef>>,
25} 23}
26 24
27impl TimerQueue { 25impl TimerQueue {
28 pub const fn new() -> Self { 26 pub const fn new() -> Self {
29 Self { 27 Self {
30 head: Cell::new(ptr::null_mut()), 28 head: SyncUnsafeCell::new(None),
31 } 29 }
32 } 30 }
33 31
34 pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) { 32 pub(crate) unsafe fn update(&self, p: TaskRef) {
35 let task = p.as_ref(); 33 let task = p.header();
36 if task.expires_at.get() != Instant::MAX { 34 if task.expires_at.get() != Instant::MAX {
37 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 35 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
38 let is_new = old_state & STATE_TIMER_QUEUED == 0; 36 let is_new = old_state & STATE_TIMER_QUEUED == 0;
39 37
40 if is_new { 38 if is_new {
41 task.timer_queue_item.next.set(self.head.get()); 39 task.timer_queue_item.next.set(self.head.get());
42 self.head.set(p.as_ptr()); 40 self.head.set(Some(p));
43 } 41 }
44 } 42 }
45 } 43 }
@@ -47,7 +45,7 @@ impl TimerQueue {
47 pub(crate) unsafe fn next_expiration(&self) -> Instant { 45 pub(crate) unsafe fn next_expiration(&self) -> Instant {
48 let mut res = Instant::MAX; 46 let mut res = Instant::MAX;
49 self.retain(|p| { 47 self.retain(|p| {
50 let task = p.as_ref(); 48 let task = p.header();
51 let expires = task.expires_at.get(); 49 let expires = task.expires_at.get();
52 res = min(res, expires); 50 res = min(res, expires);
53 expires != Instant::MAX 51 expires != Instant::MAX
@@ -55,9 +53,9 @@ impl TimerQueue {
55 res 53 res
56 } 54 }
57 55
58 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) { 56 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) {
59 self.retain(|p| { 57 self.retain(|p| {
60 let task = p.as_ref(); 58 let task = p.header();
61 if task.expires_at.get() <= now { 59 if task.expires_at.get() <= now {
62 on_task(p); 60 on_task(p);
63 false 61 false
@@ -67,11 +65,10 @@ impl TimerQueue {
67 }); 65 });
68 } 66 }
69 67
70 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) { 68 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
71 let mut prev = &self.head; 69 let mut prev = &self.head;
72 while !prev.get().is_null() { 70 while let Some(p) = prev.get() {
73 let p = NonNull::new_unchecked(prev.get()); 71 let task = p.header();
74 let task = &*p.as_ptr();
75 if f(p) { 72 if f(p) {
76 // Skip to next 73 // Skip to next
77 prev = &task.timer_queue_item.next; 74 prev = &task.timer_queue_item.next;
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs
index ed5822188..e2e8f4df8 100644
--- a/embassy-executor/src/raw/util.rs
+++ b/embassy-executor/src/raw/util.rs
@@ -26,8 +26,31 @@ impl<T> UninitCell<T> {
26 } 26 }
27} 27}
28 28
29impl<T: Copy> UninitCell<T> { 29unsafe impl<T> Sync for UninitCell<T> {}
30 pub unsafe fn read(&self) -> T { 30
31 ptr::read(self.as_mut_ptr()) 31#[repr(transparent)]
32pub struct SyncUnsafeCell<T> {
33 value: UnsafeCell<T>,
34}
35
36unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
37
38impl<T> SyncUnsafeCell<T> {
39 #[inline]
40 pub const fn new(value: T) -> Self {
41 Self {
42 value: UnsafeCell::new(value),
43 }
44 }
45
46 pub unsafe fn set(&self, value: T) {
47 *self.value.get() = value;
48 }
49
50 pub unsafe fn get(&self) -> T
51 where
52 T: Copy,
53 {
54 *self.value.get()
32 } 55 }
33} 56}
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs
index 5765259f2..400b37fa9 100644
--- a/embassy-executor/src/raw/waker.rs
+++ b/embassy-executor/src/raw/waker.rs
@@ -1,8 +1,7 @@
1use core::mem; 1use core::mem;
2use core::ptr::NonNull;
3use core::task::{RawWaker, RawWakerVTable, Waker}; 2use core::task::{RawWaker, RawWakerVTable, Waker};
4 3
5use super::{wake_task, TaskHeader}; 4use super::{wake_task, TaskHeader, TaskRef};
6 5
7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 6const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
8 7
@@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker {
11} 10}
12 11
13unsafe fn wake(p: *const ()) { 12unsafe fn wake(p: *const ()) {
14 wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) 13 wake_task(TaskRef::from_ptr(p as *const TaskHeader))
15} 14}
16 15
17unsafe fn drop(_: *const ()) { 16unsafe fn drop(_: *const ()) {
18 // nop 17 // nop
19} 18}
20 19
21pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { 20pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) 21 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23} 22}
24 23
@@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
33/// # Panics 32/// # Panics
34/// 33///
35/// Panics if the waker is not created by the Embassy executor. 34/// Panics if the waker is not created by the Embassy executor.
36pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { 35pub fn task_from_waker(waker: &Waker) -> TaskRef {
37 // safety: OK because WakerHack has the same layout as Waker. 36 // safety: OK because WakerHack has the same layout as Waker.
38 // This is not really guaranteed because the structs are `repr(Rust)`, it is 37 // This is not really guaranteed because the structs are `repr(Rust)`, it is
39 // indeed the case in the current implementation. 38 // indeed the case in the current implementation.
@@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
43 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") 42 panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.")
44 } 43 }
45 44
46 // safety: we never create a waker with a null data pointer. 45 // safety: our wakers are always created with `TaskRef::as_ptr`
47 unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } 46 unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) }
48} 47}
49 48
50struct WakerHack { 49struct WakerHack {
diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs
new file mode 100644
index 000000000..435a0ff7e
--- /dev/null
+++ b/embassy-executor/src/raw/waker_turbo.rs
@@ -0,0 +1,34 @@
1use core::ptr::NonNull;
2use core::task::Waker;
3
4use super::{wake_task, TaskHeader, TaskRef};
5
6pub(crate) unsafe fn from_task(p: TaskRef) -> Waker {
7 Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _))
8}
9
10/// Get a task pointer from a waker.
11///
12/// This can be used as an optimization in wait queues to store task pointers
13/// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps
14/// avoid dynamic dispatch.
15///
16/// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task).
17///
18/// # Panics
19///
20/// Panics if the waker is not created by the Embassy executor.
21pub fn task_from_waker(waker: &Waker) -> TaskRef {
22 let ptr = waker.as_turbo_ptr().as_ptr();
23
24 // safety: our wakers are always created with `TaskRef::as_ptr`
25 unsafe { TaskRef::from_ptr(ptr as *const TaskHeader) }
26}
27
28#[inline(never)]
29#[no_mangle]
30fn _turbo_wake(ptr: NonNull<()>) {
31 // safety: our wakers are always created with `TaskRef::as_ptr`
32 let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) };
33 wake_task(task)
34}