diff options
| -rw-r--r-- | embassy/src/executor/executor.rs | 301 | ||||
| -rw-r--r-- | embassy/src/executor/mod.rs | 229 | ||||
| -rw-r--r-- | embassy/src/executor/run_queue.rs | 70 | ||||
| -rw-r--r-- | embassy/src/executor/timer_executor.rs | 77 | ||||
| -rw-r--r-- | embassy/src/executor/util.rs | 32 | ||||
| -rw-r--r-- | embassy/src/time/mod.rs | 2 | ||||
| -rw-r--r-- | embassy/src/time/timer.rs | 63 |
7 files changed, 324 insertions, 450 deletions
diff --git a/embassy/src/executor/executor.rs b/embassy/src/executor/executor.rs deleted file mode 100644 index 81a915778..000000000 --- a/embassy/src/executor/executor.rs +++ /dev/null | |||
| @@ -1,301 +0,0 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::cell::UnsafeCell; | ||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::mem; | ||
| 6 | use core::mem::MaybeUninit; | ||
| 7 | use core::pin::Pin; | ||
| 8 | use core::ptr; | ||
| 9 | use core::ptr::NonNull; | ||
| 10 | use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering}; | ||
| 11 | use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; | ||
| 12 | |||
| 13 | //============= | ||
| 14 | // UninitCell | ||
| 15 | |||
| 16 | struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>); | ||
| 17 | impl<T> UninitCell<T> { | ||
| 18 | const fn uninit() -> Self { | ||
| 19 | Self(MaybeUninit::uninit()) | ||
| 20 | } | ||
| 21 | |||
| 22 | unsafe fn as_mut_ptr(&self) -> *mut T { | ||
| 23 | (*self.0.as_ptr()).get() | ||
| 24 | } | ||
| 25 | |||
| 26 | unsafe fn as_mut(&self) -> &mut T { | ||
| 27 | &mut *self.as_mut_ptr() | ||
| 28 | } | ||
| 29 | |||
| 30 | unsafe fn write(&self, val: T) { | ||
| 31 | ptr::write(self.as_mut_ptr(), val) | ||
| 32 | } | ||
| 33 | |||
| 34 | unsafe fn drop_in_place(&self) { | ||
| 35 | ptr::drop_in_place(self.as_mut_ptr()) | ||
| 36 | } | ||
| 37 | } | ||
| 38 | |||
| 39 | impl<T: Copy> UninitCell<T> { | ||
| 40 | unsafe fn read(&self) -> T { | ||
| 41 | ptr::read(self.as_mut_ptr()) | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | //============= | ||
| 46 | // Data structures | ||
| 47 | |||
| 48 | const STATE_RUNNING: u32 = 1 << 0; | ||
| 49 | const STATE_QUEUED: u32 = 1 << 1; | ||
| 50 | |||
| 51 | struct Header { | ||
| 52 | state: AtomicU32, | ||
| 53 | next: AtomicPtr<Header>, | ||
| 54 | executor: Cell<*const Executor>, | ||
| 55 | poll_fn: UninitCell<unsafe fn(*mut Header)>, // Valid if STATE_RUNNING | ||
| 56 | } | ||
| 57 | |||
| 58 | // repr(C) is needed to guarantee that header is located at offset 0 | ||
| 59 | // This makes it safe to cast between Header and Task pointers. | ||
| 60 | #[repr(C)] | ||
| 61 | pub struct Task<F: Future + 'static> { | ||
| 62 | header: Header, | ||
| 63 | future: UninitCell<F>, // Valid if STATE_RUNNING | ||
| 64 | } | ||
| 65 | |||
| 66 | #[derive(Copy, Clone, Debug)] | ||
| 67 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 68 | pub enum SpawnError { | ||
| 69 | Busy, | ||
| 70 | } | ||
| 71 | |||
| 72 | //============= | ||
| 73 | // Atomic task queue using a very, very simple lock-free linked-list queue: | ||
| 74 | // | ||
| 75 | // To enqueue a task, task.next is set to the old head, and head is atomically set to task. | ||
| 76 | // | ||
| 77 | // Dequeuing is done in batches: the queue is emptied by atomically replacing head with | ||
| 78 | // null. Then the batch is iterated following the next pointers until null is reached. | ||
| 79 | // | ||
| 80 | // Note that batches will be iterated in the opposite order as they were enqueued. This should | ||
| 81 | // be OK for our use case. Hopefully it doesn't create executor fairness problems. | ||
| 82 | |||
| 83 | struct Queue { | ||
| 84 | head: AtomicPtr<Header>, | ||
| 85 | } | ||
| 86 | |||
| 87 | impl Queue { | ||
| 88 | const fn new() -> Self { | ||
| 89 | Self { | ||
| 90 | head: AtomicPtr::new(ptr::null_mut()), | ||
| 91 | } | ||
| 92 | } | ||
| 93 | |||
| 94 | /// Enqueues an item. Returns true if the queue was empty. | ||
| 95 | unsafe fn enqueue(&self, item: *mut Header) -> bool { | ||
| 96 | let mut prev = self.head.load(Ordering::Acquire); | ||
| 97 | loop { | ||
| 98 | (*item).next.store(prev, Ordering::Relaxed); | ||
| 99 | match self | ||
| 100 | .head | ||
| 101 | .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) | ||
| 102 | { | ||
| 103 | Ok(_) => break, | ||
| 104 | Err(next_prev) => prev = next_prev, | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | prev.is_null() | ||
| 109 | } | ||
| 110 | |||
| 111 | unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) { | ||
| 112 | let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); | ||
| 113 | |||
| 114 | while !task.is_null() { | ||
| 115 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | ||
| 116 | // Therefore, first read the next pointer, and only then process the task. | ||
| 117 | let next = (*task).next.load(Ordering::Relaxed); | ||
| 118 | |||
| 119 | on_task(task); | ||
| 120 | |||
| 121 | task = next | ||
| 122 | } | ||
| 123 | } | ||
| 124 | } | ||
| 125 | |||
| 126 | //============= | ||
| 127 | // Waker | ||
| 128 | |||
| 129 | static WAKER_VTABLE: RawWakerVTable = | ||
| 130 | RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); | ||
| 131 | |||
| 132 | unsafe fn waker_clone(p: *const ()) -> RawWaker { | ||
| 133 | RawWaker::new(p, &WAKER_VTABLE) | ||
| 134 | } | ||
| 135 | |||
| 136 | unsafe fn waker_wake(p: *const ()) { | ||
| 137 | let header = &*(p as *const Header); | ||
| 138 | |||
| 139 | let mut current = header.state.load(Ordering::Acquire); | ||
| 140 | loop { | ||
| 141 | // If already scheduled, or if not started, | ||
| 142 | if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) { | ||
| 143 | return; | ||
| 144 | } | ||
| 145 | |||
| 146 | // Mark it as scheduled | ||
| 147 | let new = current | STATE_QUEUED; | ||
| 148 | |||
| 149 | match header | ||
| 150 | .state | ||
| 151 | .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) | ||
| 152 | { | ||
| 153 | Ok(_) => break, | ||
| 154 | Err(next_current) => current = next_current, | ||
| 155 | } | ||
| 156 | } | ||
| 157 | |||
| 158 | // We have just marked the task as scheduled, so enqueue it. | ||
| 159 | let executor = &*header.executor.get(); | ||
| 160 | executor.enqueue(p as *mut Header); | ||
| 161 | } | ||
| 162 | |||
| 163 | unsafe fn waker_drop(_: *const ()) { | ||
| 164 | // nop | ||
| 165 | } | ||
| 166 | |||
| 167 | //============= | ||
| 168 | // Task | ||
| 169 | |||
| 170 | impl<F: Future + 'static> Task<F> { | ||
| 171 | pub const fn new() -> Self { | ||
| 172 | Self { | ||
| 173 | header: Header { | ||
| 174 | state: AtomicU32::new(0), | ||
| 175 | next: AtomicPtr::new(ptr::null_mut()), | ||
| 176 | executor: Cell::new(ptr::null()), | ||
| 177 | poll_fn: UninitCell::uninit(), | ||
| 178 | }, | ||
| 179 | future: UninitCell::uninit(), | ||
| 180 | } | ||
| 181 | } | ||
| 182 | |||
| 183 | pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { | ||
| 184 | for task in pool { | ||
| 185 | let state = STATE_RUNNING | STATE_QUEUED; | ||
| 186 | if task | ||
| 187 | .header | ||
| 188 | .state | ||
| 189 | .compare_and_swap(0, state, Ordering::AcqRel) | ||
| 190 | == 0 | ||
| 191 | { | ||
| 192 | // Initialize the task | ||
| 193 | task.header.poll_fn.write(Self::poll); | ||
| 194 | task.future.write(future()); | ||
| 195 | |||
| 196 | return SpawnToken { | ||
| 197 | header: Some(NonNull::new_unchecked(&task.header as *const Header as _)), | ||
| 198 | }; | ||
| 199 | } | ||
| 200 | } | ||
| 201 | |||
| 202 | return SpawnToken { header: None }; | ||
| 203 | } | ||
| 204 | |||
| 205 | unsafe fn poll(p: *mut Header) { | ||
| 206 | let this = &*(p as *const Task<F>); | ||
| 207 | |||
| 208 | let future = Pin::new_unchecked(this.future.as_mut()); | ||
| 209 | let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); | ||
| 210 | let mut cx = Context::from_waker(&waker); | ||
| 211 | match future.poll(&mut cx) { | ||
| 212 | Poll::Ready(_) => { | ||
| 213 | this.future.drop_in_place(); | ||
| 214 | this.header | ||
| 215 | .state | ||
| 216 | .fetch_and(!STATE_RUNNING, Ordering::AcqRel); | ||
| 217 | } | ||
| 218 | Poll::Pending => {} | ||
| 219 | } | ||
| 220 | } | ||
| 221 | } | ||
| 222 | |||
| 223 | unsafe impl<F: Future + 'static> Sync for Task<F> {} | ||
| 224 | |||
| 225 | //============= | ||
| 226 | // Spawn token | ||
| 227 | |||
| 228 | #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] | ||
| 229 | pub struct SpawnToken { | ||
| 230 | header: Option<NonNull<Header>>, | ||
| 231 | } | ||
| 232 | |||
| 233 | impl Drop for SpawnToken { | ||
| 234 | fn drop(&mut self) { | ||
| 235 | // TODO deallocate the task instead. | ||
| 236 | panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") | ||
| 237 | } | ||
| 238 | } | ||
| 239 | |||
| 240 | //============= | ||
| 241 | // Executor | ||
| 242 | |||
| 243 | pub struct Executor { | ||
| 244 | queue: Queue, | ||
| 245 | signal_fn: fn(), | ||
| 246 | not_send: PhantomData<*mut ()>, | ||
| 247 | } | ||
| 248 | |||
| 249 | impl Executor { | ||
| 250 | pub const fn new(signal_fn: fn()) -> Self { | ||
| 251 | Self { | ||
| 252 | queue: Queue::new(), | ||
| 253 | signal_fn: signal_fn, | ||
| 254 | not_send: PhantomData, | ||
| 255 | } | ||
| 256 | } | ||
| 257 | |||
| 258 | unsafe fn enqueue(&self, item: *mut Header) { | ||
| 259 | if self.queue.enqueue(item) { | ||
| 260 | (self.signal_fn)() | ||
| 261 | } | ||
| 262 | } | ||
| 263 | |||
| 264 | /// Spawn a future on this executor. | ||
| 265 | pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { | ||
| 266 | let header = token.header; | ||
| 267 | mem::forget(token); | ||
| 268 | |||
| 269 | match header { | ||
| 270 | Some(header) => unsafe { | ||
| 271 | let header = header.as_ref(); | ||
| 272 | header.executor.set(self); | ||
| 273 | self.enqueue(header as *const _ as _); | ||
| 274 | Ok(()) | ||
| 275 | }, | ||
| 276 | None => Err(SpawnError::Busy), | ||
| 277 | } | ||
| 278 | } | ||
| 279 | |||
| 280 | /// Runs the executor until the queue is empty. | ||
| 281 | pub fn run(&self) { | ||
| 282 | unsafe { | ||
| 283 | self.queue.dequeue_all(|p| { | ||
| 284 | let header = &*p; | ||
| 285 | |||
| 286 | let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel); | ||
| 287 | if state & STATE_RUNNING == 0 { | ||
| 288 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 289 | // - Task gets dequeued, poll starts | ||
| 290 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 291 | // - Task poll finishes, returning done=true | ||
| 292 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 293 | return; | ||
| 294 | } | ||
| 295 | |||
| 296 | // Run the task | ||
| 297 | header.poll_fn.read()(p as _); | ||
| 298 | }); | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index 1a68bdfde..435c97db8 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs | |||
| @@ -1,9 +1,224 @@ | |||
| 1 | mod executor; | 1 | pub use embassy_macros::task; |
| 2 | mod timer_executor; | ||
| 3 | 2 | ||
| 4 | // for time::Timer | 3 | use core::cell::Cell; |
| 5 | pub(crate) use timer_executor::current_timer_queue; | 4 | use core::future::Future; |
| 5 | use core::marker::PhantomData; | ||
| 6 | use core::mem; | ||
| 7 | use core::pin::Pin; | ||
| 8 | use core::ptr; | ||
| 9 | use core::ptr::NonNull; | ||
| 10 | use core::sync::atomic::{AtomicU32, Ordering}; | ||
| 11 | use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; | ||
| 6 | 12 | ||
| 7 | pub use embassy_macros::task; | 13 | mod run_queue; |
| 8 | pub use executor::{Executor, SpawnError, SpawnToken, Task}; | 14 | mod util; |
| 9 | pub use timer_executor::TimerExecutor; | 15 | |
| 16 | use self::run_queue::{RunQueue, RunQueueItem}; | ||
| 17 | use self::util::UninitCell; | ||
| 18 | |||
| 19 | /// Task is spawned and future hasn't finished running yet. | ||
| 20 | const STATE_RUNNING: u32 = 1 << 0; | ||
| 21 | /// Task is in the executor run queue | ||
| 22 | const STATE_RUN_QUEUED: u32 = 1 << 1; | ||
| 23 | /// Task is in the executor timer queue | ||
| 24 | const STATE_TIMER_QUEUED: u32 = 1 << 2; | ||
| 25 | |||
| 26 | pub(crate) struct TaskHeader { | ||
| 27 | state: AtomicU32, | ||
| 28 | run_queue_item: RunQueueItem, | ||
| 29 | executor: Cell<*const Executor>, // Valid if state != 0 | ||
| 30 | poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING | ||
| 31 | } | ||
| 32 | |||
| 33 | // repr(C) is needed to guarantee that header is located at offset 0 | ||
| 34 | // This makes it safe to cast between Header and Task pointers. | ||
| 35 | #[repr(C)] | ||
| 36 | pub struct Task<F: Future + 'static> { | ||
| 37 | header: TaskHeader, | ||
| 38 | future: UninitCell<F>, // Valid if STATE_RUNNING | ||
| 39 | } | ||
| 40 | |||
| 41 | #[derive(Copy, Clone, Debug)] | ||
| 42 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 43 | pub enum SpawnError { | ||
| 44 | Busy, | ||
| 45 | } | ||
| 46 | |||
| 47 | //============= | ||
| 48 | // Waker | ||
| 49 | |||
| 50 | static WAKER_VTABLE: RawWakerVTable = | ||
| 51 | RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop); | ||
| 52 | |||
| 53 | unsafe fn waker_clone(p: *const ()) -> RawWaker { | ||
| 54 | RawWaker::new(p, &WAKER_VTABLE) | ||
| 55 | } | ||
| 56 | |||
| 57 | unsafe fn waker_wake(p: *const ()) { | ||
| 58 | let header = &*(p as *const TaskHeader); | ||
| 59 | |||
| 60 | let mut current = header.state.load(Ordering::Acquire); | ||
| 61 | loop { | ||
| 62 | // If already scheduled, or if not started, | ||
| 63 | if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) { | ||
| 64 | return; | ||
| 65 | } | ||
| 66 | |||
| 67 | // Mark it as scheduled | ||
| 68 | let new = current | STATE_RUN_QUEUED; | ||
| 69 | |||
| 70 | match header | ||
| 71 | .state | ||
| 72 | .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) | ||
| 73 | { | ||
| 74 | Ok(_) => break, | ||
| 75 | Err(next_current) => current = next_current, | ||
| 76 | } | ||
| 77 | } | ||
| 78 | |||
| 79 | // We have just marked the task as scheduled, so enqueue it. | ||
| 80 | let executor = &*header.executor.get(); | ||
| 81 | executor.enqueue(p as *mut TaskHeader); | ||
| 82 | } | ||
| 83 | |||
| 84 | unsafe fn waker_drop(_: *const ()) { | ||
| 85 | // nop | ||
| 86 | } | ||
| 87 | |||
| 88 | //============= | ||
| 89 | // Task | ||
| 90 | |||
| 91 | impl<F: Future + 'static> Task<F> { | ||
| 92 | pub const fn new() -> Self { | ||
| 93 | Self { | ||
| 94 | header: TaskHeader { | ||
| 95 | state: AtomicU32::new(0), | ||
| 96 | run_queue_item: RunQueueItem::new(), | ||
| 97 | executor: Cell::new(ptr::null()), | ||
| 98 | poll_fn: UninitCell::uninit(), | ||
| 99 | }, | ||
| 100 | future: UninitCell::uninit(), | ||
| 101 | } | ||
| 102 | } | ||
| 103 | |||
| 104 | pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { | ||
| 105 | for task in pool { | ||
| 106 | let state = STATE_RUNNING | STATE_RUN_QUEUED; | ||
| 107 | if task | ||
| 108 | .header | ||
| 109 | .state | ||
| 110 | .compare_and_swap(0, state, Ordering::AcqRel) | ||
| 111 | == 0 | ||
| 112 | { | ||
| 113 | // Initialize the task | ||
| 114 | task.header.poll_fn.write(Self::poll); | ||
| 115 | task.future.write(future()); | ||
| 116 | |||
| 117 | return SpawnToken { | ||
| 118 | header: Some(NonNull::new_unchecked( | ||
| 119 | &task.header as *const TaskHeader as _, | ||
| 120 | )), | ||
| 121 | }; | ||
| 122 | } | ||
| 123 | } | ||
| 124 | |||
| 125 | return SpawnToken { header: None }; | ||
| 126 | } | ||
| 127 | |||
| 128 | unsafe fn poll(p: *mut TaskHeader) { | ||
| 129 | let this = &*(p as *const Task<F>); | ||
| 130 | |||
| 131 | let future = Pin::new_unchecked(this.future.as_mut()); | ||
| 132 | let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); | ||
| 133 | let mut cx = Context::from_waker(&waker); | ||
| 134 | match future.poll(&mut cx) { | ||
| 135 | Poll::Ready(_) => { | ||
| 136 | this.future.drop_in_place(); | ||
| 137 | this.header | ||
| 138 | .state | ||
| 139 | .fetch_and(!STATE_RUNNING, Ordering::AcqRel); | ||
| 140 | } | ||
| 141 | Poll::Pending => {} | ||
| 142 | } | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | unsafe impl<F: Future + 'static> Sync for Task<F> {} | ||
| 147 | |||
| 148 | //============= | ||
| 149 | // Spawn token | ||
| 150 | |||
| 151 | #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] | ||
| 152 | pub struct SpawnToken { | ||
| 153 | header: Option<NonNull<TaskHeader>>, | ||
| 154 | } | ||
| 155 | |||
| 156 | impl Drop for SpawnToken { | ||
| 157 | fn drop(&mut self) { | ||
| 158 | // TODO deallocate the task instead. | ||
| 159 | panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") | ||
| 160 | } | ||
| 161 | } | ||
| 162 | |||
| 163 | //============= | ||
| 164 | // Executor | ||
| 165 | |||
| 166 | pub struct Executor { | ||
| 167 | run_queue: RunQueue, | ||
| 168 | signal_fn: fn(), | ||
| 169 | not_send: PhantomData<*mut ()>, | ||
| 170 | } | ||
| 171 | |||
| 172 | impl Executor { | ||
| 173 | pub const fn new(signal_fn: fn()) -> Self { | ||
| 174 | Self { | ||
| 175 | run_queue: RunQueue::new(), | ||
| 176 | signal_fn: signal_fn, | ||
| 177 | not_send: PhantomData, | ||
| 178 | } | ||
| 179 | } | ||
| 180 | |||
| 181 | unsafe fn enqueue(&self, item: *mut TaskHeader) { | ||
| 182 | if self.run_queue.enqueue(item) { | ||
| 183 | (self.signal_fn)() | ||
| 184 | } | ||
| 185 | } | ||
| 186 | |||
| 187 | /// Spawn a future on this executor. | ||
| 188 | pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { | ||
| 189 | let header = token.header; | ||
| 190 | mem::forget(token); | ||
| 191 | |||
| 192 | match header { | ||
| 193 | Some(header) => unsafe { | ||
| 194 | let header = header.as_ref(); | ||
| 195 | header.executor.set(self); | ||
| 196 | self.enqueue(header as *const _ as _); | ||
| 197 | Ok(()) | ||
| 198 | }, | ||
| 199 | None => Err(SpawnError::Busy), | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 203 | /// Runs the executor until the queue is empty. | ||
| 204 | pub fn run(&self) { | ||
| 205 | unsafe { | ||
| 206 | self.run_queue.dequeue_all(|p| { | ||
| 207 | let header = &*p; | ||
| 208 | |||
| 209 | let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||
| 210 | if state & STATE_RUNNING == 0 { | ||
| 211 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 212 | // - Task gets dequeued, poll starts | ||
| 213 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 214 | // - Task poll finishes, returning done=true | ||
| 215 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 216 | return; | ||
| 217 | } | ||
| 218 | |||
| 219 | // Run the task | ||
| 220 | header.poll_fn.read()(p as _); | ||
| 221 | }); | ||
| 222 | } | ||
| 223 | } | ||
| 224 | } | ||
diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs new file mode 100644 index 000000000..1cdecee33 --- /dev/null +++ b/embassy/src/executor/run_queue.rs | |||
| @@ -0,0 +1,70 @@ | |||
| 1 | use core::ptr; | ||
| 2 | use core::sync::atomic::{AtomicPtr, Ordering}; | ||
| 3 | |||
| 4 | use super::TaskHeader; | ||
| 5 | |||
| 6 | pub(crate) struct RunQueueItem { | ||
| 7 | next: AtomicPtr<TaskHeader>, | ||
| 8 | } | ||
| 9 | |||
| 10 | impl RunQueueItem { | ||
| 11 | pub const fn new() -> Self { | ||
| 12 | Self { | ||
| 13 | next: AtomicPtr::new(ptr::null_mut()), | ||
| 14 | } | ||
| 15 | } | ||
| 16 | } | ||
| 17 | |||
| 18 | /// Atomic task queue using a very, very simple lock-free linked-list queue: | ||
| 19 | /// | ||
| 20 | /// To enqueue a task, task.next is set to the old head, and head is atomically set to task. | ||
| 21 | /// | ||
| 22 | /// Dequeuing is done in batches: the queue is emptied by atomically replacing head with | ||
| 23 | /// null. Then the batch is iterated following the next pointers until null is reached. | ||
| 24 | /// | ||
| 25 | /// Note that batches will be iterated in the reverse order as they were enqueued. This is OK | ||
 | 26 | /// for our purposes: it can't create fairness problems since the next batch won't run until the | ||
| 27 | /// current batch is completely processed, so even if a task enqueues itself instantly (for example | ||
 | 28 | /// by waking its own waker) it can't prevent other tasks from running. | ||
| 29 | pub(crate) struct RunQueue { | ||
| 30 | head: AtomicPtr<TaskHeader>, | ||
| 31 | } | ||
| 32 | |||
| 33 | impl RunQueue { | ||
| 34 | pub const fn new() -> Self { | ||
| 35 | Self { | ||
| 36 | head: AtomicPtr::new(ptr::null_mut()), | ||
| 37 | } | ||
| 38 | } | ||
| 39 | |||
| 40 | /// Enqueues an item. Returns true if the queue was empty. | ||
| 41 | pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { | ||
| 42 | let mut prev = self.head.load(Ordering::Acquire); | ||
| 43 | loop { | ||
| 44 | (*item).run_queue_item.next.store(prev, Ordering::Relaxed); | ||
| 45 | match self | ||
| 46 | .head | ||
| 47 | .compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire) | ||
| 48 | { | ||
| 49 | Ok(_) => break, | ||
| 50 | Err(next_prev) => prev = next_prev, | ||
| 51 | } | ||
| 52 | } | ||
| 53 | |||
| 54 | prev.is_null() | ||
| 55 | } | ||
| 56 | |||
| 57 | pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) { | ||
| 58 | let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); | ||
| 59 | |||
| 60 | while !task.is_null() { | ||
| 61 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | ||
| 62 | // Therefore, first read the next pointer, and only then process the task. | ||
| 63 | let next = (*task).run_queue_item.next.load(Ordering::Relaxed); | ||
| 64 | |||
| 65 | on_task(task); | ||
| 66 | |||
| 67 | task = next | ||
| 68 | } | ||
| 69 | } | ||
| 70 | } | ||
diff --git a/embassy/src/executor/timer_executor.rs b/embassy/src/executor/timer_executor.rs deleted file mode 100644 index 1f89490f2..000000000 --- a/embassy/src/executor/timer_executor.rs +++ /dev/null | |||
| @@ -1,77 +0,0 @@ | |||
| 1 | use super::executor::{Executor, SpawnError, SpawnToken}; | ||
| 2 | use core::ptr; | ||
| 3 | use core::sync::atomic::{AtomicPtr, Ordering}; | ||
| 4 | use futures_intrusive::timer as fi; | ||
| 5 | |||
| 6 | use crate::time::Alarm; | ||
| 7 | |||
| 8 | pub(crate) struct IntrusiveClock; | ||
| 9 | |||
| 10 | impl fi::Clock for IntrusiveClock { | ||
| 11 | fn now(&self) -> u64 { | ||
| 12 | crate::time::now() | ||
| 13 | } | ||
| 14 | } | ||
| 15 | |||
| 16 | pub(crate) type TimerQueue = fi::LocalTimerService; | ||
| 17 | |||
| 18 | pub struct TimerExecutor<A: Alarm> { | ||
| 19 | inner: Executor, | ||
| 20 | alarm: A, | ||
| 21 | timer_queue: TimerQueue, | ||
| 22 | } | ||
| 23 | |||
| 24 | impl<A: Alarm> TimerExecutor<A> { | ||
| 25 | pub fn new(alarm: A, signal_fn: fn()) -> Self { | ||
| 26 | alarm.set_callback(signal_fn); | ||
| 27 | Self { | ||
| 28 | inner: Executor::new(signal_fn), | ||
| 29 | alarm, | ||
| 30 | timer_queue: TimerQueue::new(&IntrusiveClock), | ||
| 31 | } | ||
| 32 | } | ||
| 33 | |||
| 34 | /// Spawn a future on this executor. | ||
| 35 | /// | ||
| 36 | /// safety: can only be called from the executor thread | ||
| 37 | pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { | ||
| 38 | self.inner.spawn(token) | ||
| 39 | } | ||
| 40 | |||
| 41 | /// Runs the executor until the queue is empty. | ||
| 42 | /// | ||
| 43 | /// safety: can only be called from the executor thread | ||
| 44 | pub fn run(&'static self) { | ||
| 45 | with_timer_queue(&self.timer_queue, || { | ||
| 46 | self.timer_queue.check_expirations(); | ||
| 47 | self.inner.run(); | ||
| 48 | |||
| 49 | match self.timer_queue.next_expiration() { | ||
| 50 | // If this is in the past, set_alarm will immediately trigger the alarm, | ||
| 51 | // which will make the wfe immediately return so we do another loop iteration. | ||
| 52 | Some(at) => self.alarm.set(at), | ||
| 53 | None => self.alarm.clear(), | ||
| 54 | } | ||
| 55 | }) | ||
| 56 | } | ||
| 57 | } | ||
| 58 | |||
| 59 | static CURRENT_TIMER_QUEUE: AtomicPtr<TimerQueue> = AtomicPtr::new(ptr::null_mut()); | ||
| 60 | |||
| 61 | fn with_timer_queue<R>(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R { | ||
| 62 | let svc = svc as *const _ as *mut _; | ||
| 63 | let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed); | ||
| 64 | let r = f(); | ||
| 65 | let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed); | ||
| 66 | assert_eq!(svc, svc2); | ||
| 67 | r | ||
| 68 | } | ||
| 69 | |||
| 70 | pub(crate) fn current_timer_queue() -> &'static TimerQueue { | ||
| 71 | unsafe { | ||
| 72 | CURRENT_TIMER_QUEUE | ||
| 73 | .load(Ordering::Relaxed) | ||
| 74 | .as_ref() | ||
| 75 | .unwrap() | ||
| 76 | } | ||
| 77 | } | ||
diff --git a/embassy/src/executor/util.rs b/embassy/src/executor/util.rs new file mode 100644 index 000000000..ca15b6955 --- /dev/null +++ b/embassy/src/executor/util.rs | |||
| @@ -0,0 +1,32 @@ | |||
| 1 | use core::cell::UnsafeCell; | ||
| 2 | use core::mem::MaybeUninit; | ||
| 3 | use core::ptr; | ||
| 4 | |||
| 5 | pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>); | ||
| 6 | impl<T> UninitCell<T> { | ||
| 7 | pub const fn uninit() -> Self { | ||
| 8 | Self(MaybeUninit::uninit()) | ||
| 9 | } | ||
| 10 | |||
| 11 | pub unsafe fn as_mut_ptr(&self) -> *mut T { | ||
| 12 | (*self.0.as_ptr()).get() | ||
| 13 | } | ||
| 14 | |||
| 15 | pub unsafe fn as_mut(&self) -> &mut T { | ||
| 16 | &mut *self.as_mut_ptr() | ||
| 17 | } | ||
| 18 | |||
| 19 | pub unsafe fn write(&self, val: T) { | ||
| 20 | ptr::write(self.as_mut_ptr(), val) | ||
| 21 | } | ||
| 22 | |||
| 23 | pub unsafe fn drop_in_place(&self) { | ||
| 24 | ptr::drop_in_place(self.as_mut_ptr()) | ||
| 25 | } | ||
| 26 | } | ||
| 27 | |||
| 28 | impl<T: Copy> UninitCell<T> { | ||
| 29 | pub unsafe fn read(&self) -> T { | ||
| 30 | ptr::read(self.as_mut_ptr()) | ||
| 31 | } | ||
| 32 | } | ||
diff --git a/embassy/src/time/mod.rs b/embassy/src/time/mod.rs index 2b4631557..896838371 100644 --- a/embassy/src/time/mod.rs +++ b/embassy/src/time/mod.rs | |||
| @@ -1,11 +1,9 @@ | |||
| 1 | mod duration; | 1 | mod duration; |
| 2 | mod instant; | 2 | mod instant; |
| 3 | mod timer; | ||
| 4 | mod traits; | 3 | mod traits; |
| 5 | 4 | ||
| 6 | pub use duration::Duration; | 5 | pub use duration::Duration; |
| 7 | pub use instant::Instant; | 6 | pub use instant::Instant; |
| 8 | pub use timer::{Ticker, Timer}; | ||
| 9 | pub use traits::*; | 7 | pub use traits::*; |
| 10 | 8 | ||
| 11 | use crate::fmt::*; | 9 | use crate::fmt::*; |
diff --git a/embassy/src/time/timer.rs b/embassy/src/time/timer.rs deleted file mode 100644 index 8756368c7..000000000 --- a/embassy/src/time/timer.rs +++ /dev/null | |||
| @@ -1,63 +0,0 @@ | |||
| 1 | use core::future::Future; | ||
| 2 | use core::pin::Pin; | ||
| 3 | use core::task::{Context, Poll}; | ||
| 4 | use futures::Stream; | ||
| 5 | use futures_intrusive::timer::{LocalTimer, LocalTimerFuture}; | ||
| 6 | |||
| 7 | use super::{Duration, Instant}; | ||
| 8 | use crate::executor::current_timer_queue; | ||
| 9 | |||
| 10 | pub struct Timer { | ||
| 11 | inner: LocalTimerFuture<'static>, | ||
| 12 | } | ||
| 13 | |||
| 14 | impl Timer { | ||
| 15 | pub fn at(when: Instant) -> Self { | ||
| 16 | Self { | ||
| 17 | inner: current_timer_queue().deadline(when.as_ticks()), | ||
| 18 | } | ||
| 19 | } | ||
| 20 | |||
| 21 | pub fn after(dur: Duration) -> Self { | ||
| 22 | Self::at(Instant::now() + dur) | ||
| 23 | } | ||
| 24 | } | ||
| 25 | |||
| 26 | impl Future for Timer { | ||
| 27 | type Output = (); | ||
| 28 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 29 | unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx) | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | pub struct Ticker { | ||
| 34 | inner: LocalTimerFuture<'static>, | ||
| 35 | next: Instant, | ||
| 36 | dur: Duration, | ||
| 37 | } | ||
| 38 | |||
| 39 | impl Ticker { | ||
| 40 | pub fn every(dur: Duration) -> Self { | ||
| 41 | let next = Instant::now() + dur; | ||
| 42 | Self { | ||
| 43 | inner: current_timer_queue().deadline(next.as_ticks()), | ||
| 44 | next, | ||
| 45 | dur, | ||
| 46 | } | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | impl Stream for Ticker { | ||
| 51 | type Item = (); | ||
| 52 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 53 | let this = unsafe { self.get_unchecked_mut() }; | ||
| 54 | match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) { | ||
| 55 | Poll::Ready(_) => { | ||
| 56 | this.next += this.dur; | ||
| 57 | this.inner = current_timer_queue().deadline(this.next.as_ticks()); | ||
| 58 | Poll::Ready(Some(())) | ||
| 59 | } | ||
| 60 | Poll::Pending => Poll::Pending, | ||
| 61 | } | ||
| 62 | } | ||
| 63 | } | ||
