diff options
| author | Mathias <[email protected]> | 2023-02-13 14:55:15 +0100 |
|---|---|---|
| committer | Mathias <[email protected]> | 2023-02-13 14:55:15 +0100 |
| commit | 218b44652c149f895919b606a660b6eff30e8177 (patch) | |
| tree | 5f985f6edd12926a6f374c17a3a0c3a4226088e7 /embassy-executor/src/raw | |
| parent | 86113e199f37fe0888979608a08bfdaf21bff19a (diff) | |
| parent | 41a563aae3e474955892b27487e185f5f486f525 (diff) | |
Rebase on master
Diffstat (limited to 'embassy-executor/src/raw')
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 245 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue.rs | 13 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 35 | ||||
| -rw-r--r-- | embassy-executor/src/raw/util.rs | 6 | ||||
| -rw-r--r-- | embassy-executor/src/raw/waker.rs | 13 |
5 files changed, 173 insertions, 139 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb5..42bd82262 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -15,10 +15,10 @@ mod waker; | |||
| 15 | 15 | ||
| 16 | use core::cell::Cell; | 16 | use core::cell::Cell; |
| 17 | use core::future::Future; | 17 | use core::future::Future; |
| 18 | use core::mem; | ||
| 18 | use core::pin::Pin; | 19 | use core::pin::Pin; |
| 19 | use core::ptr::NonNull; | 20 | use core::ptr::NonNull; |
| 20 | use core::task::{Context, Poll}; | 21 | use core::task::{Context, Poll}; |
| 21 | use core::{mem, ptr}; | ||
| 22 | 22 | ||
| 23 | use atomic_polyfill::{AtomicU32, Ordering}; | 23 | use atomic_polyfill::{AtomicU32, Ordering}; |
| 24 | use critical_section::CriticalSection; | 24 | use critical_section::CriticalSection; |
| @@ -43,14 +43,11 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | |||
| 43 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | 43 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; |
| 44 | 44 | ||
| 45 | /// Raw task header for use in task pointers. | 45 | /// Raw task header for use in task pointers. |
| 46 | /// | 46 | pub(crate) struct TaskHeader { |
| 47 | /// This is an opaque struct, used for raw pointers to tasks, for use | ||
| 48 | /// with funtions like [`wake_task`] and [`task_from_waker`]. | ||
| 49 | pub struct TaskHeader { | ||
| 50 | pub(crate) state: AtomicU32, | 47 | pub(crate) state: AtomicU32, |
| 51 | pub(crate) run_queue_item: RunQueueItem, | 48 | pub(crate) run_queue_item: RunQueueItem, |
| 52 | pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 | 49 | pub(crate) executor: Cell<Option<&'static Executor>>, |
| 53 | pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED | 50 | poll_fn: Cell<Option<unsafe fn(TaskRef)>>, |
| 54 | 51 | ||
| 55 | #[cfg(feature = "integrated-timers")] | 52 | #[cfg(feature = "integrated-timers")] |
| 56 | pub(crate) expires_at: Cell<Instant>, | 53 | pub(crate) expires_at: Cell<Instant>, |
| @@ -58,20 +55,34 @@ pub struct TaskHeader { | |||
| 58 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 55 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 59 | } | 56 | } |
| 60 | 57 | ||
| 61 | impl TaskHeader { | 58 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 62 | pub(crate) const fn new() -> Self { | 59 | #[derive(Clone, Copy)] |
| 60 | pub struct TaskRef { | ||
| 61 | ptr: NonNull<TaskHeader>, | ||
| 62 | } | ||
| 63 | |||
| 64 | impl TaskRef { | ||
| 65 | fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { | ||
| 63 | Self { | 66 | Self { |
| 64 | state: AtomicU32::new(0), | 67 | ptr: NonNull::from(task).cast(), |
| 65 | run_queue_item: RunQueueItem::new(), | 68 | } |
| 66 | executor: Cell::new(ptr::null()), | 69 | } |
| 67 | poll_fn: UninitCell::uninit(), | ||
| 68 | 70 | ||
| 69 | #[cfg(feature = "integrated-timers")] | 71 | /// Safety: The pointer must have been obtained with `Task::as_ptr` |
| 70 | expires_at: Cell::new(Instant::from_ticks(0)), | 72 | pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self { |
| 71 | #[cfg(feature = "integrated-timers")] | 73 | Self { |
| 72 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 74 | ptr: NonNull::new_unchecked(ptr as *mut TaskHeader), |
| 73 | } | 75 | } |
| 74 | } | 76 | } |
| 77 | |||
| 78 | pub(crate) fn header(self) -> &'static TaskHeader { | ||
| 79 | unsafe { self.ptr.as_ref() } | ||
| 80 | } | ||
| 81 | |||
| 82 | /// The returned pointer is valid for the entire TaskStorage. | ||
| 83 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | ||
| 84 | self.ptr.as_ptr() | ||
| 85 | } | ||
| 75 | } | 86 | } |
| 76 | 87 | ||
| 77 | /// Raw storage in which a task can be spawned. | 88 | /// Raw storage in which a task can be spawned. |
| @@ -101,7 +112,18 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 101 | /// Create a new TaskStorage, in not-spawned state. | 112 | /// Create a new TaskStorage, in not-spawned state. |
| 102 | pub const fn new() -> Self { | 113 | pub const fn new() -> Self { |
| 103 | Self { | 114 | Self { |
| 104 | raw: TaskHeader::new(), | 115 | raw: TaskHeader { |
| 116 | state: AtomicU32::new(0), | ||
| 117 | run_queue_item: RunQueueItem::new(), | ||
| 118 | executor: Cell::new(None), | ||
| 119 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | ||
| 120 | poll_fn: Cell::new(None), | ||
| 121 | |||
| 122 | #[cfg(feature = "integrated-timers")] | ||
| 123 | expires_at: Cell::new(Instant::from_ticks(0)), | ||
| 124 | #[cfg(feature = "integrated-timers")] | ||
| 125 | timer_queue_item: timer_queue::TimerQueueItem::new(), | ||
| 126 | }, | ||
| 105 | future: UninitCell::uninit(), | 127 | future: UninitCell::uninit(), |
| 106 | } | 128 | } |
| 107 | } | 129 | } |
| @@ -120,29 +142,17 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 120 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it | 142 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it |
| 121 | /// on a different executor. | 143 | /// on a different executor. |
| 122 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 144 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 123 | if self.spawn_mark_used() { | 145 | let task = AvailableTask::claim(self); |
| 124 | return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; | 146 | match task { |
| 147 | Some(task) => { | ||
| 148 | let task = task.initialize(future); | ||
| 149 | unsafe { SpawnToken::<F>::new(task) } | ||
| 150 | } | ||
| 151 | None => SpawnToken::new_failed(), | ||
| 125 | } | 152 | } |
| 126 | |||
| 127 | SpawnToken::<F>::new_failed() | ||
| 128 | } | ||
| 129 | |||
| 130 | fn spawn_mark_used(&'static self) -> bool { | ||
| 131 | let state = STATE_SPAWNED | STATE_RUN_QUEUED; | ||
| 132 | self.raw | ||
| 133 | .state | ||
| 134 | .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) | ||
| 135 | .is_ok() | ||
| 136 | } | ||
| 137 | |||
| 138 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { | ||
| 139 | // Initialize the task | ||
| 140 | self.raw.poll_fn.write(Self::poll); | ||
| 141 | self.future.write(future()); | ||
| 142 | NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader) | ||
| 143 | } | 153 | } |
| 144 | 154 | ||
| 145 | unsafe fn poll(p: NonNull<TaskHeader>) { | 155 | unsafe fn poll(p: TaskRef) { |
| 146 | let this = &*(p.as_ptr() as *const TaskStorage<F>); | 156 | let this = &*(p.as_ptr() as *const TaskStorage<F>); |
| 147 | 157 | ||
| 148 | let future = Pin::new_unchecked(this.future.as_mut()); | 158 | let future = Pin::new_unchecked(this.future.as_mut()); |
| @@ -164,6 +174,28 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 164 | 174 | ||
| 165 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} | 175 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} |
| 166 | 176 | ||
| 177 | struct AvailableTask<F: Future + 'static> { | ||
| 178 | task: &'static TaskStorage<F>, | ||
| 179 | } | ||
| 180 | |||
| 181 | impl<F: Future + 'static> AvailableTask<F> { | ||
| 182 | fn claim(task: &'static TaskStorage<F>) -> Option<Self> { | ||
| 183 | task.raw | ||
| 184 | .state | ||
| 185 | .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) | ||
| 186 | .ok() | ||
| 187 | .map(|_| Self { task }) | ||
| 188 | } | ||
| 189 | |||
| 190 | fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { | ||
| 191 | unsafe { | ||
| 192 | self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); | ||
| 193 | self.task.future.write(future()); | ||
| 194 | } | ||
| 195 | TaskRef::new(self.task) | ||
| 196 | } | ||
| 197 | } | ||
| 198 | |||
| 167 | /// Raw storage that can hold up to N tasks of the same type. | 199 | /// Raw storage that can hold up to N tasks of the same type. |
| 168 | /// | 200 | /// |
| 169 | /// This is essentially a `[TaskStorage<F>; N]`. | 201 | /// This is essentially a `[TaskStorage<F>; N]`. |
| @@ -187,13 +219,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 187 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, | 219 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, |
| 188 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. | 220 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. |
| 189 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 221 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 190 | for task in &self.pool { | 222 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 191 | if task.spawn_mark_used() { | 223 | match task { |
| 192 | return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; | 224 | Some(task) => { |
| 225 | let task = task.initialize(future); | ||
| 226 | unsafe { SpawnToken::<F>::new(task) } | ||
| 193 | } | 227 | } |
| 228 | None => SpawnToken::new_failed(), | ||
| 194 | } | 229 | } |
| 195 | |||
| 196 | SpawnToken::<F>::new_failed() | ||
| 197 | } | 230 | } |
| 198 | 231 | ||
| 199 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if | 232 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if |
| @@ -235,13 +268,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 235 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly | 268 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly |
| 236 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. | 269 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. |
| 237 | 270 | ||
| 238 | for task in &self.pool { | 271 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 239 | if task.spawn_mark_used() { | 272 | match task { |
| 240 | return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); | 273 | Some(task) => { |
| 274 | let task = task.initialize(future); | ||
| 275 | unsafe { SpawnToken::<FutFn>::new(task) } | ||
| 241 | } | 276 | } |
| 277 | None => SpawnToken::new_failed(), | ||
| 242 | } | 278 | } |
| 243 | |||
| 244 | SpawnToken::<FutFn>::new_failed() | ||
| 245 | } | 279 | } |
| 246 | } | 280 | } |
| 247 | 281 | ||
| @@ -307,7 +341,7 @@ impl Executor { | |||
| 307 | /// - `task` must be set up to run in this executor. | 341 | /// - `task` must be set up to run in this executor. |
| 308 | /// - `task` must NOT be already enqueued (in this executor or another one). | 342 | /// - `task` must NOT be already enqueued (in this executor or another one). |
| 309 | #[inline(always)] | 343 | #[inline(always)] |
| 310 | unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { | 344 | unsafe fn enqueue(&self, cs: CriticalSection, task: TaskRef) { |
| 311 | #[cfg(feature = "rtos-trace")] | 345 | #[cfg(feature = "rtos-trace")] |
| 312 | trace::task_ready_begin(task.as_ptr() as u32); | 346 | trace::task_ready_begin(task.as_ptr() as u32); |
| 313 | 347 | ||
| @@ -325,8 +359,8 @@ impl Executor { | |||
| 325 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. | 359 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. |
| 326 | /// In this case, the task's Future must be Send. This is because this is effectively | 360 | /// In this case, the task's Future must be Send. This is because this is effectively |
| 327 | /// sending the task to the executor thread. | 361 | /// sending the task to the executor thread. |
| 328 | pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { | 362 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 329 | task.as_ref().executor.set(self); | 363 | task.header().executor.set(Some(self)); |
| 330 | 364 | ||
| 331 | #[cfg(feature = "rtos-trace")] | 365 | #[cfg(feature = "rtos-trace")] |
| 332 | trace::task_new(task.as_ptr() as u32); | 366 | trace::task_new(task.as_ptr() as u32); |
| @@ -354,46 +388,54 @@ impl Executor { | |||
| 354 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's | 388 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's |
| 355 | /// no `poll()` already running. | 389 | /// no `poll()` already running. |
| 356 | pub unsafe fn poll(&'static self) { | 390 | pub unsafe fn poll(&'static self) { |
| 357 | #[cfg(feature = "integrated-timers")] | 391 | loop { |
| 358 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | 392 | #[cfg(feature = "integrated-timers")] |
| 393 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | ||
| 359 | 394 | ||
| 360 | self.run_queue.dequeue_all(|p| { | 395 | self.run_queue.dequeue_all(|p| { |
| 361 | let task = p.as_ref(); | 396 | let task = p.header(); |
| 362 | 397 | ||
| 363 | #[cfg(feature = "integrated-timers")] | 398 | #[cfg(feature = "integrated-timers")] |
| 364 | task.expires_at.set(Instant::MAX); | 399 | task.expires_at.set(Instant::MAX); |
| 365 | 400 | ||
| 366 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 401 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 367 | if state & STATE_SPAWNED == 0 { | 402 | if state & STATE_SPAWNED == 0 { |
| 368 | // If task is not running, ignore it. This can happen in the following scenario: | 403 | // If task is not running, ignore it. This can happen in the following scenario: |
| 369 | // - Task gets dequeued, poll starts | 404 | // - Task gets dequeued, poll starts |
| 370 | // - While task is being polled, it gets woken. It gets placed in the queue. | 405 | // - While task is being polled, it gets woken. It gets placed in the queue. |
| 371 | // - Task poll finishes, returning done=true | 406 | // - Task poll finishes, returning done=true |
| 372 | // - RUNNING bit is cleared, but the task is already in the queue. | 407 | // - RUNNING bit is cleared, but the task is already in the queue. |
| 373 | return; | 408 | return; |
| 374 | } | 409 | } |
| 410 | |||
| 411 | #[cfg(feature = "rtos-trace")] | ||
| 412 | trace::task_exec_begin(p.as_ptr() as u32); | ||
| 375 | 413 | ||
| 376 | #[cfg(feature = "rtos-trace")] | 414 | // Run the task |
| 377 | trace::task_exec_begin(p.as_ptr() as u32); | 415 | task.poll_fn.get().unwrap_unchecked()(p); |
| 378 | 416 | ||
| 379 | // Run the task | 417 | #[cfg(feature = "rtos-trace")] |
| 380 | task.poll_fn.read()(p as _); | 418 | trace::task_exec_end(); |
| 381 | 419 | ||
| 382 | #[cfg(feature = "rtos-trace")] | 420 | // Enqueue or update into timer_queue |
| 383 | trace::task_exec_end(); | 421 | #[cfg(feature = "integrated-timers")] |
| 422 | self.timer_queue.update(p); | ||
| 423 | }); | ||
| 384 | 424 | ||
| 385 | // Enqueue or update into timer_queue | ||
| 386 | #[cfg(feature = "integrated-timers")] | 425 | #[cfg(feature = "integrated-timers")] |
| 387 | self.timer_queue.update(p); | 426 | { |
| 388 | }); | 427 | // If this is already in the past, set_alarm might return false |
| 428 | // In that case do another poll loop iteration. | ||
| 429 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 430 | if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { | ||
| 431 | break; | ||
| 432 | } | ||
| 433 | } | ||
| 389 | 434 | ||
| 390 | #[cfg(feature = "integrated-timers")] | 435 | #[cfg(not(feature = "integrated-timers"))] |
| 391 | { | 436 | { |
| 392 | // If this is already in the past, set_alarm will immediately trigger the alarm. | 437 | break; |
| 393 | // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, | 438 | } |
| 394 | // so we immediately do another poll loop iteration. | ||
| 395 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 396 | driver::set_alarm(self.alarm, next_expiration.as_ticks()); | ||
| 397 | } | 439 | } |
| 398 | 440 | ||
| 399 | #[cfg(feature = "rtos-trace")] | 441 | #[cfg(feature = "rtos-trace")] |
| @@ -409,16 +451,12 @@ impl Executor { | |||
| 409 | } | 451 | } |
| 410 | } | 452 | } |
| 411 | 453 | ||
| 412 | /// Wake a task by raw pointer. | 454 | /// Wake a task by `TaskRef`. |
| 413 | /// | 455 | /// |
| 414 | /// You can obtain task pointers from `Waker`s using [`task_from_waker`]. | 456 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 415 | /// | 457 | pub fn wake_task(task: TaskRef) { |
| 416 | /// # Safety | ||
| 417 | /// | ||
| 418 | /// `task` must be a valid task pointer obtained from [`task_from_waker`]. | ||
| 419 | pub unsafe fn wake_task(task: NonNull<TaskHeader>) { | ||
| 420 | critical_section::with(|cs| { | 458 | critical_section::with(|cs| { |
| 421 | let header = task.as_ref(); | 459 | let header = task.header(); |
| 422 | let state = header.state.load(Ordering::Relaxed); | 460 | let state = header.state.load(Ordering::Relaxed); |
| 423 | 461 | ||
| 424 | // If already scheduled, or if not started, | 462 | // If already scheduled, or if not started, |
| @@ -430,20 +468,29 @@ pub unsafe fn wake_task(task: NonNull<TaskHeader>) { | |||
| 430 | header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); | 468 | header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); |
| 431 | 469 | ||
| 432 | // We have just marked the task as scheduled, so enqueue it. | 470 | // We have just marked the task as scheduled, so enqueue it. |
| 433 | let executor = &*header.executor.get(); | 471 | unsafe { |
| 434 | executor.enqueue(cs, task); | 472 | let executor = header.executor.get().unwrap_unchecked(); |
| 473 | executor.enqueue(cs, task); | ||
| 474 | } | ||
| 435 | }) | 475 | }) |
| 436 | } | 476 | } |
| 437 | 477 | ||
| 438 | #[cfg(feature = "integrated-timers")] | 478 | #[cfg(feature = "integrated-timers")] |
| 439 | #[no_mangle] | 479 | struct TimerQueue; |
| 440 | unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { | 480 | |
| 441 | let task = waker::task_from_waker(waker); | 481 | #[cfg(feature = "integrated-timers")] |
| 442 | let task = task.as_ref(); | 482 | impl embassy_time::queue::TimerQueue for TimerQueue { |
| 443 | let expires_at = task.expires_at.get(); | 483 | fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { |
| 444 | task.expires_at.set(expires_at.min(at)); | 484 | let task = waker::task_from_waker(waker); |
| 485 | let task = task.header(); | ||
| 486 | let expires_at = task.expires_at.get(); | ||
| 487 | task.expires_at.set(expires_at.min(at)); | ||
| 488 | } | ||
| 445 | } | 489 | } |
| 446 | 490 | ||
| 491 | #[cfg(feature = "integrated-timers")] | ||
| 492 | embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
| 493 | |||
| 447 | #[cfg(feature = "rtos-trace")] | 494 | #[cfg(feature = "rtos-trace")] |
| 448 | impl rtos_trace::RtosTraceOSCallbacks for Executor { | 495 | impl rtos_trace::RtosTraceOSCallbacks for Executor { |
| 449 | fn task_list() { | 496 | fn task_list() { |
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index ed8c82a5c..362157535 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs | |||
| @@ -4,7 +4,7 @@ use core::ptr::NonNull; | |||
| 4 | use atomic_polyfill::{AtomicPtr, Ordering}; | 4 | use atomic_polyfill::{AtomicPtr, Ordering}; |
| 5 | use critical_section::CriticalSection; | 5 | use critical_section::CriticalSection; |
| 6 | 6 | ||
| 7 | use super::TaskHeader; | 7 | use super::{TaskHeader, TaskRef}; |
| 8 | 8 | ||
| 9 | pub(crate) struct RunQueueItem { | 9 | pub(crate) struct RunQueueItem { |
| 10 | next: AtomicPtr<TaskHeader>, | 10 | next: AtomicPtr<TaskHeader>, |
| @@ -46,25 +46,26 @@ impl RunQueue { | |||
| 46 | /// | 46 | /// |
| 47 | /// `item` must NOT be already enqueued in any queue. | 47 | /// `item` must NOT be already enqueued in any queue. |
| 48 | #[inline(always)] | 48 | #[inline(always)] |
| 49 | pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { | 49 | pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: TaskRef) -> bool { |
| 50 | let prev = self.head.load(Ordering::Relaxed); | 50 | let prev = self.head.load(Ordering::Relaxed); |
| 51 | task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); | 51 | task.header().run_queue_item.next.store(prev, Ordering::Relaxed); |
| 52 | self.head.store(task.as_ptr(), Ordering::Relaxed); | 52 | self.head.store(task.as_ptr() as _, Ordering::Relaxed); |
| 53 | prev.is_null() | 53 | prev.is_null() |
| 54 | } | 54 | } |
| 55 | 55 | ||
| 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. | 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. |
| 57 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | 57 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue |
| 58 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | 58 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. |
| 59 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { | 59 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { |
| 60 | // Atomically empty the queue. | 60 | // Atomically empty the queue. |
| 61 | let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); | 61 | let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); |
| 62 | 62 | ||
| 63 | // Iterate the linked list of tasks that were previously in the queue. | 63 | // Iterate the linked list of tasks that were previously in the queue. |
| 64 | while let Some(task) = NonNull::new(ptr) { | 64 | while let Some(task) = NonNull::new(ptr) { |
| 65 | let task = unsafe { TaskRef::from_ptr(task.as_ptr()) }; | ||
| 65 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | 66 | // If the task re-enqueues itself, the `next` pointer will get overwritten. |
| 66 | // Therefore, first read the next pointer, and only then process the task. | 67 | // Therefore, first read the next pointer, and only then process the task. |
| 67 | let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); | 68 | let next = task.header().run_queue_item.next.load(Ordering::Relaxed); |
| 68 | 69 | ||
| 69 | on_task(task); | 70 | on_task(task); |
| 70 | 71 | ||
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 24c31892a..57d6d3cda 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -1,45 +1,39 @@ | |||
| 1 | use core::cell::Cell; | 1 | use core::cell::Cell; |
| 2 | use core::cmp::min; | 2 | use core::cmp::min; |
| 3 | use core::ptr; | ||
| 4 | use core::ptr::NonNull; | ||
| 5 | 3 | ||
| 6 | use atomic_polyfill::Ordering; | 4 | use atomic_polyfill::Ordering; |
| 7 | use embassy_time::Instant; | 5 | use embassy_time::Instant; |
| 8 | 6 | ||
| 9 | use super::{TaskHeader, STATE_TIMER_QUEUED}; | 7 | use super::{TaskRef, STATE_TIMER_QUEUED}; |
| 10 | 8 | ||
| 11 | pub(crate) struct TimerQueueItem { | 9 | pub(crate) struct TimerQueueItem { |
| 12 | next: Cell<*mut TaskHeader>, | 10 | next: Cell<Option<TaskRef>>, |
| 13 | } | 11 | } |
| 14 | 12 | ||
| 15 | impl TimerQueueItem { | 13 | impl TimerQueueItem { |
| 16 | pub const fn new() -> Self { | 14 | pub const fn new() -> Self { |
| 17 | Self { | 15 | Self { next: Cell::new(None) } |
| 18 | next: Cell::new(ptr::null_mut()), | ||
| 19 | } | ||
| 20 | } | 16 | } |
| 21 | } | 17 | } |
| 22 | 18 | ||
| 23 | pub(crate) struct TimerQueue { | 19 | pub(crate) struct TimerQueue { |
| 24 | head: Cell<*mut TaskHeader>, | 20 | head: Cell<Option<TaskRef>>, |
| 25 | } | 21 | } |
| 26 | 22 | ||
| 27 | impl TimerQueue { | 23 | impl TimerQueue { |
| 28 | pub const fn new() -> Self { | 24 | pub const fn new() -> Self { |
| 29 | Self { | 25 | Self { head: Cell::new(None) } |
| 30 | head: Cell::new(ptr::null_mut()), | ||
| 31 | } | ||
| 32 | } | 26 | } |
| 33 | 27 | ||
| 34 | pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) { | 28 | pub(crate) unsafe fn update(&self, p: TaskRef) { |
| 35 | let task = p.as_ref(); | 29 | let task = p.header(); |
| 36 | if task.expires_at.get() != Instant::MAX { | 30 | if task.expires_at.get() != Instant::MAX { |
| 37 | let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); | 31 | let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); |
| 38 | let is_new = old_state & STATE_TIMER_QUEUED == 0; | 32 | let is_new = old_state & STATE_TIMER_QUEUED == 0; |
| 39 | 33 | ||
| 40 | if is_new { | 34 | if is_new { |
| 41 | task.timer_queue_item.next.set(self.head.get()); | 35 | task.timer_queue_item.next.set(self.head.get()); |
| 42 | self.head.set(p.as_ptr()); | 36 | self.head.set(Some(p)); |
| 43 | } | 37 | } |
| 44 | } | 38 | } |
| 45 | } | 39 | } |
| @@ -47,7 +41,7 @@ impl TimerQueue { | |||
| 47 | pub(crate) unsafe fn next_expiration(&self) -> Instant { | 41 | pub(crate) unsafe fn next_expiration(&self) -> Instant { |
| 48 | let mut res = Instant::MAX; | 42 | let mut res = Instant::MAX; |
| 49 | self.retain(|p| { | 43 | self.retain(|p| { |
| 50 | let task = p.as_ref(); | 44 | let task = p.header(); |
| 51 | let expires = task.expires_at.get(); | 45 | let expires = task.expires_at.get(); |
| 52 | res = min(res, expires); | 46 | res = min(res, expires); |
| 53 | expires != Instant::MAX | 47 | expires != Instant::MAX |
| @@ -55,9 +49,9 @@ impl TimerQueue { | |||
| 55 | res | 49 | res |
| 56 | } | 50 | } |
| 57 | 51 | ||
| 58 | pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) { | 52 | pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) { |
| 59 | self.retain(|p| { | 53 | self.retain(|p| { |
| 60 | let task = p.as_ref(); | 54 | let task = p.header(); |
| 61 | if task.expires_at.get() <= now { | 55 | if task.expires_at.get() <= now { |
| 62 | on_task(p); | 56 | on_task(p); |
| 63 | false | 57 | false |
| @@ -67,11 +61,10 @@ impl TimerQueue { | |||
| 67 | }); | 61 | }); |
| 68 | } | 62 | } |
| 69 | 63 | ||
| 70 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) { | 64 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { |
| 71 | let mut prev = &self.head; | 65 | let mut prev = &self.head; |
| 72 | while !prev.get().is_null() { | 66 | while let Some(p) = prev.get() { |
| 73 | let p = NonNull::new_unchecked(prev.get()); | 67 | let task = p.header(); |
| 74 | let task = &*p.as_ptr(); | ||
| 75 | if f(p) { | 68 | if f(p) { |
| 76 | // Skip to next | 69 | // Skip to next |
| 77 | prev = &task.timer_queue_item.next; | 70 | prev = &task.timer_queue_item.next; |
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index ed5822188..2b1f6b6f3 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs | |||
| @@ -25,9 +25,3 @@ impl<T> UninitCell<T> { | |||
| 25 | ptr::drop_in_place(self.as_mut_ptr()) | 25 | ptr::drop_in_place(self.as_mut_ptr()) |
| 26 | } | 26 | } |
| 27 | } | 27 | } |
| 28 | |||
| 29 | impl<T: Copy> UninitCell<T> { | ||
| 30 | pub unsafe fn read(&self) -> T { | ||
| 31 | ptr::read(self.as_mut_ptr()) | ||
| 32 | } | ||
| 33 | } | ||
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 5765259f2..400b37fa9 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs | |||
| @@ -1,8 +1,7 @@ | |||
| 1 | use core::mem; | 1 | use core::mem; |
| 2 | use core::ptr::NonNull; | ||
| 3 | use core::task::{RawWaker, RawWakerVTable, Waker}; | 2 | use core::task::{RawWaker, RawWakerVTable, Waker}; |
| 4 | 3 | ||
| 5 | use super::{wake_task, TaskHeader}; | 4 | use super::{wake_task, TaskHeader, TaskRef}; |
| 6 | 5 | ||
| 7 | const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); | 6 | const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); |
| 8 | 7 | ||
| @@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker { | |||
| 11 | } | 10 | } |
| 12 | 11 | ||
| 13 | unsafe fn wake(p: *const ()) { | 12 | unsafe fn wake(p: *const ()) { |
| 14 | wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) | 13 | wake_task(TaskRef::from_ptr(p as *const TaskHeader)) |
| 15 | } | 14 | } |
| 16 | 15 | ||
| 17 | unsafe fn drop(_: *const ()) { | 16 | unsafe fn drop(_: *const ()) { |
| 18 | // nop | 17 | // nop |
| 19 | } | 18 | } |
| 20 | 19 | ||
| 21 | pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { | 20 | pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { |
| 22 | Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) | 21 | Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) |
| 23 | } | 22 | } |
| 24 | 23 | ||
| @@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { | |||
| 33 | /// # Panics | 32 | /// # Panics |
| 34 | /// | 33 | /// |
| 35 | /// Panics if the waker is not created by the Embassy executor. | 34 | /// Panics if the waker is not created by the Embassy executor. |
| 36 | pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { | 35 | pub fn task_from_waker(waker: &Waker) -> TaskRef { |
| 37 | // safety: OK because WakerHack has the same layout as Waker. | 36 | // safety: OK because WakerHack has the same layout as Waker. |
| 38 | // This is not really guaranteed because the structs are `repr(Rust)`, it is | 37 | // This is not really guaranteed because the structs are `repr(Rust)`, it is |
| 39 | // indeed the case in the current implementation. | 38 | // indeed the case in the current implementation. |
| @@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { | |||
| 43 | panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") | 42 | panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") |
| 44 | } | 43 | } |
| 45 | 44 | ||
| 46 | // safety: we never create a waker with a null data pointer. | 45 | // safety: our wakers are always created with `TaskRef::as_ptr` |
| 47 | unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } | 46 | unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } |
| 48 | } | 47 | } |
| 49 | 48 | ||
| 50 | struct WakerHack { | 49 | struct WakerHack { |
