diff options
| author | Dario Nieuwenhuis <[email protected]> | 2020-12-26 17:22:36 +0100 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2020-12-26 17:22:36 +0100 |
| commit | 8b7a42a4f9271c337d550be1f34e7d163f9eb905 (patch) | |
| tree | fe0c7a367f3e61eed759877976b778b75a733c11 | |
| parent | 3df66c44e3bd84df659e2f4d30bb18128cd89427 (diff) | |
Split waker to separate file.
| -rw-r--r-- | embassy/src/executor/mod.rs | 106 | ||||
| -rw-r--r-- | embassy/src/executor/waker.rs | 22 |
2 files changed, 65 insertions, 63 deletions
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index 435c97db8..6c76eed76 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs | |||
| @@ -8,16 +8,17 @@ use core::pin::Pin; | |||
| 8 | use core::ptr; | 8 | use core::ptr; |
| 9 | use core::ptr::NonNull; | 9 | use core::ptr::NonNull; |
| 10 | use core::sync::atomic::{AtomicU32, Ordering}; | 10 | use core::sync::atomic::{AtomicU32, Ordering}; |
| 11 | use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; | 11 | use core::task::{Context, Poll, RawWaker, Waker}; |
| 12 | 12 | ||
| 13 | mod run_queue; | 13 | mod run_queue; |
| 14 | mod util; | 14 | mod util; |
| 15 | mod waker; | ||
| 15 | 16 | ||
| 16 | use self::run_queue::{RunQueue, RunQueueItem}; | 17 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 17 | use self::util::UninitCell; | 18 | use self::util::UninitCell; |
| 18 | 19 | ||
| 19 | /// Task is spawned and future hasn't finished running yet. | 20 | /// Task is spawned (has a future) |
| 20 | const STATE_RUNNING: u32 = 1 << 0; | 21 | const STATE_SPAWNED: u32 = 1 << 0; |
| 21 | /// Task is in the executor run queue | 22 | /// Task is in the executor run queue |
| 22 | const STATE_RUN_QUEUED: u32 = 1 << 1; | 23 | const STATE_RUN_QUEUED: u32 = 1 << 1; |
| 23 | /// Task is in the executor timer queue | 24 | /// Task is in the executor timer queue |
| @@ -27,67 +28,46 @@ pub(crate) struct TaskHeader { | |||
| 27 | state: AtomicU32, | 28 | state: AtomicU32, |
| 28 | run_queue_item: RunQueueItem, | 29 | run_queue_item: RunQueueItem, |
| 29 | executor: Cell<*const Executor>, // Valid if state != 0 | 30 | executor: Cell<*const Executor>, // Valid if state != 0 |
| 30 | poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING | 31 | poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED |
| 31 | } | 32 | } |
| 32 | 33 | ||
| 33 | // repr(C) is needed to guarantee that header is located at offset 0 | 34 | impl TaskHeader { |
| 34 | // This makes it safe to cast between Header and Task pointers. | 35 | pub(crate) unsafe fn enqueue(&self) { |
| 35 | #[repr(C)] | 36 | let mut current = self.state.load(Ordering::Acquire); |
| 36 | pub struct Task<F: Future + 'static> { | 37 | loop { |
| 37 | header: TaskHeader, | 38 | // If already scheduled, or if not started, |
| 38 | future: UninitCell<F>, // Valid if STATE_RUNNING | 39 | if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) { |
| 39 | } | 40 | return; |
| 40 | 41 | } | |
| 41 | #[derive(Copy, Clone, Debug)] | ||
| 42 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 43 | pub enum SpawnError { | ||
| 44 | Busy, | ||
| 45 | } | ||
| 46 | |||
| 47 | //============= | ||
| 48 | // Waker | ||
| 49 | |||
| 50 | static WAKER_VTABLE: RawWakerVTable = | ||
| 51 | RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); | ||
| 52 | |||
| 53 | unsafe fn waker_clone(p: *const ()) -> RawWaker { | ||
| 54 | RawWaker::new(p, &WAKER_VTABLE) | ||
| 55 | } | ||
| 56 | |||
| 57 | unsafe fn waker_wake(p: *const ()) { | ||
| 58 | let header = &*(p as *const TaskHeader); | ||
| 59 | 42 | ||
| 60 | let mut current = header.state.load(Ordering::Acquire); | 43 | // Mark it as scheduled |
| 61 | loop { | 44 | let new = current | STATE_RUN_QUEUED; |
| 62 | // If already scheduled, or if not started, | 45 | |
| 63 | if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) { | 46 | match self.state.compare_exchange_weak( |
| 64 | return; | 47 | current, |
| 48 | new, | ||
| 49 | Ordering::AcqRel, | ||
| 50 | Ordering::Acquire, | ||
| 51 | ) { | ||
| 52 | Ok(_) => break, | ||
| 53 | Err(next_current) => current = next_current, | ||
| 54 | } | ||
| 65 | } | 55 | } |
| 66 | 56 | ||
| 67 | // Mark it as scheduled | 57 | // We have just marked the task as scheduled, so enqueue it. |
| 68 | let new = current | STATE_RUN_QUEUED; | 58 | let executor = &*self.executor.get(); |
| 69 | 59 | executor.enqueue(self as *const TaskHeader as *mut TaskHeader); | |
| 70 | match header | ||
| 71 | .state | ||
| 72 | .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) | ||
| 73 | { | ||
| 74 | Ok(_) => break, | ||
| 75 | Err(next_current) => current = next_current, | ||
| 76 | } | ||
| 77 | } | 60 | } |
| 78 | |||
| 79 | // We have just marked the task as scheduled, so enqueue it. | ||
| 80 | let executor = &*header.executor.get(); | ||
| 81 | executor.enqueue(p as *mut TaskHeader); | ||
| 82 | } | 61 | } |
| 83 | 62 | ||
| 84 | unsafe fn waker_drop(_: *const ()) { | 63 | // repr(C) is needed to guarantee that header is located at offset 0 |
| 85 | // nop | 64 | // This makes it safe to cast between Header and Task pointers. |
| 65 | #[repr(C)] | ||
| 66 | pub struct Task<F: Future + 'static> { | ||
| 67 | header: TaskHeader, | ||
| 68 | future: UninitCell<F>, // Valid if STATE_SPAWNED | ||
| 86 | } | 69 | } |
| 87 | 70 | ||
| 88 | //============= | ||
| 89 | // Task | ||
| 90 | |||
| 91 | impl<F: Future + 'static> Task<F> { | 71 | impl<F: Future + 'static> Task<F> { |
| 92 | pub const fn new() -> Self { | 72 | pub const fn new() -> Self { |
| 93 | Self { | 73 | Self { |
| @@ -103,7 +83,7 @@ impl<F: Future + 'static> Task<F> { | |||
| 103 | 83 | ||
| 104 | pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { | 84 | pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { |
| 105 | for task in pool { | 85 | for task in pool { |
| 106 | let state = STATE_RUNNING | STATE_RUN_QUEUED; | 86 | let state = STATE_SPAWNED | STATE_RUN_QUEUED; |
| 107 | if task | 87 | if task |
| 108 | .header | 88 | .header |
| 109 | .state | 89 | .state |
| @@ -129,14 +109,14 @@ impl<F: Future + 'static> Task<F> { | |||
| 129 | let this = &*(p as *const Task<F>); | 109 | let this = &*(p as *const Task<F>); |
| 130 | 110 | ||
| 131 | let future = Pin::new_unchecked(this.future.as_mut()); | 111 | let future = Pin::new_unchecked(this.future.as_mut()); |
| 132 | let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); | 112 | let waker = waker::from_task(p); |
| 133 | let mut cx = Context::from_waker(&waker); | 113 | let mut cx = Context::from_waker(&waker); |
| 134 | match future.poll(&mut cx) { | 114 | match future.poll(&mut cx) { |
| 135 | Poll::Ready(_) => { | 115 | Poll::Ready(_) => { |
| 136 | this.future.drop_in_place(); | 116 | this.future.drop_in_place(); |
| 137 | this.header | 117 | this.header |
| 138 | .state | 118 | .state |
| 139 | .fetch_and(!STATE_RUNNING, Ordering::AcqRel); | 119 | .fetch_and(!STATE_SPAWNED, Ordering::AcqRel); |
| 140 | } | 120 | } |
| 141 | Poll::Pending => {} | 121 | Poll::Pending => {} |
| 142 | } | 122 | } |
| @@ -145,9 +125,6 @@ impl<F: Future + 'static> Task<F> { | |||
| 145 | 125 | ||
| 146 | unsafe impl<F: Future + 'static> Sync for Task<F> {} | 126 | unsafe impl<F: Future + 'static> Sync for Task<F> {} |
| 147 | 127 | ||
| 148 | //============= | ||
| 149 | // Spawn token | ||
| 150 | |||
| 151 | #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] | 128 | #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] |
| 152 | pub struct SpawnToken { | 129 | pub struct SpawnToken { |
| 153 | header: Option<NonNull<TaskHeader>>, | 130 | header: Option<NonNull<TaskHeader>>, |
| @@ -160,8 +137,11 @@ impl Drop for SpawnToken { | |||
| 160 | } | 137 | } |
| 161 | } | 138 | } |
| 162 | 139 | ||
| 163 | //============= | 140 | #[derive(Copy, Clone, Debug)] |
| 164 | // Executor | 141 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| 142 | pub enum SpawnError { | ||
| 143 | Busy, | ||
| 144 | } | ||
| 165 | 145 | ||
| 166 | pub struct Executor { | 146 | pub struct Executor { |
| 167 | run_queue: RunQueue, | 147 | run_queue: RunQueue, |
| @@ -207,7 +187,7 @@ impl Executor { | |||
| 207 | let header = &*p; | 187 | let header = &*p; |
| 208 | 188 | ||
| 209 | let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | 189 | let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |
| 210 | if state & STATE_RUNNING == 0 { | 190 | if state & STATE_SPAWNED == 0 { |
| 211 | // If task is not running, ignore it. This can happen in the following scenario: | 191 | // If task is not running, ignore it. This can happen in the following scenario: |
| 212 | // - Task gets dequeued, poll starts | 192 | // - Task gets dequeued, poll starts |
| 213 | // - While task is being polled, it gets woken. It gets placed in the queue. | 193 | // - While task is being polled, it gets woken. It gets placed in the queue. |
diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs new file mode 100644 index 000000000..662857dea --- /dev/null +++ b/embassy/src/executor/waker.rs | |||
| @@ -0,0 +1,22 @@ | |||
| 1 | use core::task::{RawWaker, RawWakerVTable, Waker}; | ||
| 2 | |||
| 3 | use super::TaskHeader; | ||
| 4 | |||
| 5 | static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); | ||
| 6 | |||
| 7 | unsafe fn clone(p: *const ()) -> RawWaker { | ||
| 8 | RawWaker::new(p, &VTABLE) | ||
| 9 | } | ||
| 10 | |||
| 11 | unsafe fn wake(p: *const ()) { | ||
| 12 | let header = &*(p as *const TaskHeader); | ||
| 13 | header.enqueue(); | ||
| 14 | } | ||
| 15 | |||
| 16 | unsafe fn drop(_: *const ()) { | ||
| 17 | // nop | ||
| 18 | } | ||
| 19 | |||
| 20 | pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker { | ||
| 21 | Waker::from_raw(RawWaker::new(p as _, &VTABLE)) | ||
| 22 | } | ||
