diff options
| author | Quentin Smith <[email protected]> | 2023-07-17 21:31:43 -0400 |
|---|---|---|
| committer | Quentin Smith <[email protected]> | 2023-07-17 21:31:43 -0400 |
| commit | 6f02403184eb7fb7990fb88fc9df9c4328a690a3 (patch) | |
| tree | 748f510e190bb2724750507a6e69ed1a8e08cb20 /embassy-executor/src | |
| parent | d896f80405aa8963877049ed999e4aba25d6e2bb (diff) | |
| parent | 6b5df4523aa1c4902f02e803450ae4b418e0e3ca (diff) | |
Merge remote-tracking branch 'origin/main' into nrf-pdm
Diffstat (limited to 'embassy-executor/src')
| -rw-r--r-- | embassy-executor/src/arch/cortex_m.rs | 253 | ||||
| -rw-r--r-- | embassy-executor/src/arch/riscv32.rs | 135 | ||||
| -rw-r--r-- | embassy-executor/src/arch/std.rs | 150 | ||||
| -rw-r--r-- | embassy-executor/src/arch/wasm.rs | 134 | ||||
| -rw-r--r-- | embassy-executor/src/arch/xtensa.rs | 126 | ||||
| -rw-r--r-- | embassy-executor/src/lib.rs | 77 | ||||
| -rw-r--r-- | embassy-executor/src/raw/mod.rs | 483 | ||||
| -rw-r--r-- | embassy-executor/src/raw/run_queue.rs | 44 | ||||
| -rw-r--r-- | embassy-executor/src/raw/timer_queue.rs | 33 | ||||
| -rw-r--r-- | embassy-executor/src/raw/util.rs | 29 | ||||
| -rw-r--r-- | embassy-executor/src/raw/waker.rs | 13 | ||||
| -rw-r--r-- | embassy-executor/src/raw/waker_turbo.rs | 34 | ||||
| -rw-r--r-- | embassy-executor/src/spawner.rs | 32 |
13 files changed, 985 insertions, 558 deletions
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs index 4b27a264e..94c8134d6 100644 --- a/embassy-executor/src/arch/cortex_m.rs +++ b/embassy-executor/src/arch/cortex_m.rs | |||
| @@ -1,59 +1,224 @@ | |||
| 1 | use core::arch::asm; | 1 | #[cfg(feature = "executor-thread")] |
| 2 | use core::marker::PhantomData; | 2 | pub use thread::*; |
| 3 | use core::ptr; | 3 | #[cfg(feature = "executor-thread")] |
| 4 | 4 | mod thread { | |
| 5 | use super::{raw, Spawner}; | 5 | use core::arch::asm; |
| 6 | 6 | use core::marker::PhantomData; | |
| 7 | /// Thread mode executor, using WFE/SEV. | 7 | |
| 8 | /// | 8 | #[cfg(feature = "nightly")] |
| 9 | /// This is the simplest and most common kind of executor. It runs on | 9 | pub use embassy_macros::main_cortex_m as main; |
| 10 | /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction | 10 | |
| 11 | /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction | 11 | use crate::raw::{Pender, PenderInner}; |
| 12 | /// is executed, to make the `WFE` exit from sleep and poll the task. | 12 | use crate::{raw, Spawner}; |
| 13 | /// | 13 | |
| 14 | /// This executor allows for ultra low power consumption for chips where `WFE` | 14 | #[derive(Copy, Clone)] |
| 15 | /// triggers low-power sleep without extra steps. If your chip requires extra steps, | 15 | pub(crate) struct ThreadPender; |
| 16 | /// you may use [`raw::Executor`] directly to program custom behavior. | 16 | |
| 17 | pub struct Executor { | 17 | impl ThreadPender { |
| 18 | inner: raw::Executor, | 18 | pub(crate) fn pend(self) { |
| 19 | not_send: PhantomData<*mut ()>, | 19 | unsafe { core::arch::asm!("sev") } |
| 20 | } | ||
| 21 | } | ||
| 22 | |||
| 23 | /// Thread mode executor, using WFE/SEV. | ||
| 24 | /// | ||
| 25 | /// This is the simplest and most common kind of executor. It runs on | ||
| 26 | /// thread mode (at the lowest priority level), and uses the `WFE` ARM instruction | ||
| 27 | /// to sleep when it has no more work to do. When a task is woken, a `SEV` instruction | ||
| 28 | /// is executed, to make the `WFE` exit from sleep and poll the task. | ||
| 29 | /// | ||
| 30 | /// This executor allows for ultra low power consumption for chips where `WFE` | ||
| 31 | /// triggers low-power sleep without extra steps. If your chip requires extra steps, | ||
| 32 | /// you may use [`raw::Executor`] directly to program custom behavior. | ||
| 33 | pub struct Executor { | ||
| 34 | inner: raw::Executor, | ||
| 35 | not_send: PhantomData<*mut ()>, | ||
| 36 | } | ||
| 37 | |||
| 38 | impl Executor { | ||
| 39 | /// Create a new Executor. | ||
| 40 | pub fn new() -> Self { | ||
| 41 | Self { | ||
| 42 | inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), | ||
| 43 | not_send: PhantomData, | ||
| 44 | } | ||
| 45 | } | ||
| 46 | |||
| 47 | /// Run the executor. | ||
| 48 | /// | ||
| 49 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | ||
| 50 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | ||
| 51 | /// the executor starts running the tasks. | ||
| 52 | /// | ||
| 53 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | ||
| 54 | /// for example by passing it as an argument to the initial tasks. | ||
| 55 | /// | ||
| 56 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 57 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 58 | /// access. There's a few ways to do this: | ||
| 59 | /// | ||
| 60 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 61 | /// - a `static mut` (unsafe) | ||
| 62 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 63 | /// | ||
| 64 | /// This function never returns. | ||
| 65 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 66 | init(self.inner.spawner()); | ||
| 67 | |||
| 68 | loop { | ||
| 69 | unsafe { | ||
| 70 | self.inner.poll(); | ||
| 71 | asm!("wfe"); | ||
| 72 | }; | ||
| 73 | } | ||
| 74 | } | ||
| 75 | } | ||
| 20 | } | 76 | } |
| 21 | 77 | ||
| 22 | impl Executor { | 78 | #[cfg(feature = "executor-interrupt")] |
| 23 | /// Create a new Executor. | 79 | pub use interrupt::*; |
| 24 | pub fn new() -> Self { | 80 | #[cfg(feature = "executor-interrupt")] |
| 25 | Self { | 81 | mod interrupt { |
| 26 | inner: raw::Executor::new(|_| unsafe { asm!("sev") }, ptr::null_mut()), | 82 | use core::cell::UnsafeCell; |
| 27 | not_send: PhantomData, | 83 | use core::mem::MaybeUninit; |
| 84 | |||
| 85 | use atomic_polyfill::{AtomicBool, Ordering}; | ||
| 86 | use cortex_m::interrupt::InterruptNumber; | ||
| 87 | use cortex_m::peripheral::NVIC; | ||
| 88 | |||
| 89 | use crate::raw::{self, Pender, PenderInner}; | ||
| 90 | |||
| 91 | #[derive(Clone, Copy)] | ||
| 92 | pub(crate) struct InterruptPender(u16); | ||
| 93 | |||
| 94 | impl InterruptPender { | ||
| 95 | pub(crate) fn pend(self) { | ||
| 96 | // STIR is faster, but is only available in v7 and higher. | ||
| 97 | #[cfg(not(armv6m))] | ||
| 98 | { | ||
| 99 | let mut nvic: cortex_m::peripheral::NVIC = unsafe { core::mem::transmute(()) }; | ||
| 100 | nvic.request(self); | ||
| 101 | } | ||
| 102 | |||
| 103 | #[cfg(armv6m)] | ||
| 104 | cortex_m::peripheral::NVIC::pend(self); | ||
| 28 | } | 105 | } |
| 29 | } | 106 | } |
| 30 | 107 | ||
| 31 | /// Run the executor. | 108 | unsafe impl cortex_m::interrupt::InterruptNumber for InterruptPender { |
| 109 | fn number(self) -> u16 { | ||
| 110 | self.0 | ||
| 111 | } | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Interrupt mode executor. | ||
| 32 | /// | 115 | /// |
| 33 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | 116 | /// This executor runs tasks in interrupt mode. The interrupt handler is set up |
| 34 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | 117 | /// to poll tasks, and when a task is woken the interrupt is pended from software. |
| 35 | /// the executor starts running the tasks. | ||
| 36 | /// | 118 | /// |
| 37 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | 119 | /// This allows running async tasks at a priority higher than thread mode. One |
| 38 | /// for example by passing it as an argument to the initial tasks. | 120 | /// use case is to leave thread mode free for non-async tasks. Another use case is |
| 121 | /// to run multiple executors: one in thread mode for low priority tasks and another in | ||
| 122 | /// interrupt mode for higher priority tasks. Higher priority tasks will preempt lower | ||
| 123 | /// priority ones. | ||
| 39 | /// | 124 | /// |
| 40 | /// This function requires `&'static mut self`. This means you have to store the | 125 | /// It is even possible to run multiple interrupt mode executors at different priorities, |
| 41 | /// Executor instance in a place where it'll live forever and grants you mutable | 126 | /// by assigning different priorities to the interrupts. For an example on how to do this, |
| 42 | /// access. There's a few ways to do this: | 127 | /// See the 'multiprio' example for 'embassy-nrf'. |
| 43 | /// | 128 | /// |
| 44 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | 129 | /// To use it, you have to pick an interrupt that won't be used by the hardware. |
| 45 | /// - a `static mut` (unsafe) | 130 | /// Some chips reserve some interrupts for this purpose, sometimes named "software interrupts" (SWI). |
| 46 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | 131 | /// If this is not the case, you may use an interrupt from any unused peripheral. |
| 47 | /// | 132 | /// |
| 48 | /// This function never returns. | 133 | /// It is somewhat more complex to use, it's recommended to use the thread-mode |
| 49 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 134 | /// [`Executor`] instead, if it works for your use case. |
| 50 | init(self.inner.spawner()); | 135 | pub struct InterruptExecutor { |
| 136 | started: AtomicBool, | ||
| 137 | executor: UnsafeCell<MaybeUninit<raw::Executor>>, | ||
| 138 | } | ||
| 139 | |||
| 140 | unsafe impl Send for InterruptExecutor {} | ||
| 141 | unsafe impl Sync for InterruptExecutor {} | ||
| 142 | |||
| 143 | impl InterruptExecutor { | ||
| 144 | /// Create a new, not started `InterruptExecutor`. | ||
| 145 | #[inline] | ||
| 146 | pub const fn new() -> Self { | ||
| 147 | Self { | ||
| 148 | started: AtomicBool::new(false), | ||
| 149 | executor: UnsafeCell::new(MaybeUninit::uninit()), | ||
| 150 | } | ||
| 151 | } | ||
| 152 | |||
| 153 | /// Executor interrupt callback. | ||
| 154 | /// | ||
| 155 | /// # Safety | ||
| 156 | /// | ||
| 157 | /// You MUST call this from the interrupt handler, and from nowhere else. | ||
| 158 | pub unsafe fn on_interrupt(&'static self) { | ||
| 159 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | ||
| 160 | executor.poll(); | ||
| 161 | } | ||
| 162 | |||
| 163 | /// Start the executor. | ||
| 164 | /// | ||
| 165 | /// This initializes the executor, enables the interrupt, and returns. | ||
| 166 | /// The executor keeps running in the background through the interrupt. | ||
| 167 | /// | ||
| 168 | /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A [`SendSpawner`] | ||
| 169 | /// is returned instead of a [`Spawner`](embassy_executor::Spawner) because the executor effectively runs in a | ||
| 170 | /// different "thread" (the interrupt), so spawning tasks on it is effectively | ||
| 171 | /// sending them. | ||
| 172 | /// | ||
| 173 | /// To obtain a [`Spawner`](embassy_executor::Spawner) for this executor, use [`Spawner::for_current_executor()`](embassy_executor::Spawner::for_current_executor()) from | ||
| 174 | /// a task running in it. | ||
| 175 | /// | ||
| 176 | /// # Interrupt requirements | ||
| 177 | /// | ||
| 178 | /// You must write the interrupt handler yourself, and make it call [`on_interrupt()`](Self::on_interrupt). | ||
| 179 | /// | ||
| 180 | /// This method already enables (unmasks) the interrupt, you must NOT do it yourself. | ||
| 181 | /// | ||
| 182 | /// You must set the interrupt priority before calling this method. You MUST NOT | ||
| 183 | /// do it after. | ||
| 184 | /// | ||
| 185 | pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner { | ||
| 186 | if self | ||
| 187 | .started | ||
| 188 | .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) | ||
| 189 | .is_err() | ||
| 190 | { | ||
| 191 | panic!("InterruptExecutor::start() called multiple times on the same executor."); | ||
| 192 | } | ||
| 51 | 193 | ||
| 52 | loop { | ||
| 53 | unsafe { | 194 | unsafe { |
| 54 | self.inner.poll(); | 195 | (&mut *self.executor.get()) |
| 55 | asm!("wfe"); | 196 | .as_mut_ptr() |
| 56 | }; | 197 | .write(raw::Executor::new(Pender(PenderInner::Interrupt(InterruptPender( |
| 198 | irq.number(), | ||
| 199 | ))))) | ||
| 200 | } | ||
| 201 | |||
| 202 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | ||
| 203 | |||
| 204 | unsafe { NVIC::unmask(irq) } | ||
| 205 | |||
| 206 | executor.spawner().make_send() | ||
| 207 | } | ||
| 208 | |||
| 209 | /// Get a SendSpawner for this executor | ||
| 210 | /// | ||
| 211 | /// This returns a [`SendSpawner`] you can use to spawn tasks on this | ||
| 212 | /// executor. | ||
| 213 | /// | ||
| 214 | /// This MUST only be called on an executor that has already been spawned. | ||
| 215 | /// The function will panic otherwise. | ||
| 216 | pub fn spawner(&'static self) -> crate::SendSpawner { | ||
| 217 | if !self.started.load(Ordering::Acquire) { | ||
| 218 | panic!("InterruptExecutor::spawner() called on uninitialized executor."); | ||
| 219 | } | ||
| 220 | let executor = unsafe { (&*self.executor.get()).assume_init_ref() }; | ||
| 221 | executor.spawner().make_send() | ||
| 57 | } | 222 | } |
| 58 | } | 223 | } |
| 59 | } | 224 | } |
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs index 2a4b006da..ff7ec1575 100644 --- a/embassy-executor/src/arch/riscv32.rs +++ b/embassy-executor/src/arch/riscv32.rs | |||
| @@ -1,73 +1,86 @@ | |||
| 1 | use core::marker::PhantomData; | 1 | #[cfg(feature = "executor-interrupt")] |
| 2 | use core::ptr; | 2 | compile_error!("`executor-interrupt` is not supported with `arch-riscv32`."); |
| 3 | 3 | ||
| 4 | use atomic_polyfill::{AtomicBool, Ordering}; | 4 | #[cfg(feature = "executor-thread")] |
| 5 | pub use thread::*; | ||
| 6 | #[cfg(feature = "executor-thread")] | ||
| 7 | mod thread { | ||
| 8 | use core::marker::PhantomData; | ||
| 9 | use core::sync::atomic::{AtomicBool, Ordering}; | ||
| 5 | 10 | ||
| 6 | use super::{raw, Spawner}; | 11 | #[cfg(feature = "nightly")] |
| 12 | pub use embassy_macros::main_riscv as main; | ||
| 7 | 13 | ||
| 8 | /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV | 14 | use crate::raw::{Pender, PenderInner}; |
| 9 | /// | 15 | use crate::{raw, Spawner}; |
| 10 | static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); | ||
| 11 | 16 | ||
| 12 | /// RISCV32 Executor | 17 | #[derive(Copy, Clone)] |
| 13 | pub struct Executor { | 18 | pub(crate) struct ThreadPender; |
| 14 | inner: raw::Executor, | ||
| 15 | not_send: PhantomData<*mut ()>, | ||
| 16 | } | ||
| 17 | 19 | ||
| 18 | impl Executor { | 20 | impl ThreadPender { |
| 19 | /// Create a new Executor. | 21 | #[allow(unused)] |
| 20 | pub fn new() -> Self { | 22 | pub(crate) fn pend(self) { |
| 21 | Self { | 23 | SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); |
| 22 | // use Signal_Work_Thread_Mode as substitute for local interrupt register | ||
| 23 | inner: raw::Executor::new( | ||
| 24 | |_| { | ||
| 25 | SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); | ||
| 26 | }, | ||
| 27 | ptr::null_mut(), | ||
| 28 | ), | ||
| 29 | not_send: PhantomData, | ||
| 30 | } | 24 | } |
| 31 | } | 25 | } |
| 32 | 26 | ||
| 33 | /// Run the executor. | 27 | /// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV |
| 34 | /// | 28 | static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); |
| 35 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | 29 | |
| 36 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | 30 | /// RISCV32 Executor |
| 37 | /// the executor starts running the tasks. | 31 | pub struct Executor { |
| 38 | /// | 32 | inner: raw::Executor, |
| 39 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | 33 | not_send: PhantomData<*mut ()>, |
| 40 | /// for example by passing it as an argument to the initial tasks. | 34 | } |
| 41 | /// | 35 | |
| 42 | /// This function requires `&'static mut self`. This means you have to store the | 36 | impl Executor { |
| 43 | /// Executor instance in a place where it'll live forever and grants you mutable | 37 | /// Create a new Executor. |
| 44 | /// access. There's a few ways to do this: | 38 | pub fn new() -> Self { |
| 45 | /// | 39 | Self { |
| 46 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | 40 | inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), |
| 47 | /// - a `static mut` (unsafe) | 41 | not_send: PhantomData, |
| 48 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | 42 | } |
| 49 | /// | 43 | } |
| 50 | /// This function never returns. | 44 | |
| 51 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | 45 | /// Run the executor. |
| 52 | init(self.inner.spawner()); | 46 | /// |
| 47 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | ||
| 48 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | ||
| 49 | /// the executor starts running the tasks. | ||
| 50 | /// | ||
| 51 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | ||
| 52 | /// for example by passing it as an argument to the initial tasks. | ||
| 53 | /// | ||
| 54 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 55 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 56 | /// access. There's a few ways to do this: | ||
| 57 | /// | ||
| 58 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 59 | /// - a `static mut` (unsafe) | ||
| 60 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 61 | /// | ||
| 62 | /// This function never returns. | ||
| 63 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 64 | init(self.inner.spawner()); | ||
| 53 | 65 | ||
| 54 | loop { | 66 | loop { |
| 55 | unsafe { | 67 | unsafe { |
| 56 | self.inner.poll(); | 68 | self.inner.poll(); |
| 57 | // we do not care about race conditions between the load and store operations, interrupts | 69 | // we do not care about race conditions between the load and store operations, interrupts |
| 58 | //will only set this value to true. | 70 | //will only set this value to true. |
| 59 | critical_section::with(|_| { | 71 | critical_section::with(|_| { |
| 60 | // if there is work to do, loop back to polling | 72 | // if there is work to do, loop back to polling |
| 61 | // TODO can we relax this? | 73 | // TODO can we relax this? |
| 62 | if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { | 74 | if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { |
| 63 | SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); | 75 | SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); |
| 64 | } | 76 | } |
| 65 | // if not, wait for interrupt | 77 | // if not, wait for interrupt |
| 66 | else { | 78 | else { |
| 67 | core::arch::asm!("wfi"); | 79 | core::arch::asm!("wfi"); |
| 68 | } | 80 | } |
| 69 | }); | 81 | }); |
| 70 | // if an interrupt occurred while waiting, it will be serviced here | 82 | // if an interrupt occurred while waiting, it will be serviced here |
| 83 | } | ||
| 71 | } | 84 | } |
| 72 | } | 85 | } |
| 73 | } | 86 | } |
diff --git a/embassy-executor/src/arch/std.rs b/embassy-executor/src/arch/std.rs index 701f0eb18..4e4a178f0 100644 --- a/embassy-executor/src/arch/std.rs +++ b/embassy-executor/src/arch/std.rs | |||
| @@ -1,84 +1,100 @@ | |||
| 1 | use std::marker::PhantomData; | 1 | #[cfg(feature = "executor-interrupt")] |
| 2 | use std::sync::{Condvar, Mutex}; | 2 | compile_error!("`executor-interrupt` is not supported with `arch-std`."); |
| 3 | 3 | ||
| 4 | use super::{raw, Spawner}; | 4 | #[cfg(feature = "executor-thread")] |
| 5 | pub use thread::*; | ||
| 6 | #[cfg(feature = "executor-thread")] | ||
| 7 | mod thread { | ||
| 8 | use std::marker::PhantomData; | ||
| 9 | use std::sync::{Condvar, Mutex}; | ||
| 5 | 10 | ||
| 6 | /// Single-threaded std-based executor. | 11 | #[cfg(feature = "nightly")] |
| 7 | pub struct Executor { | 12 | pub use embassy_macros::main_std as main; |
| 8 | inner: raw::Executor, | 13 | |
| 9 | not_send: PhantomData<*mut ()>, | 14 | use crate::raw::{Pender, PenderInner}; |
| 10 | signaler: &'static Signaler, | 15 | use crate::{raw, Spawner}; |
| 11 | } | ||
| 12 | 16 | ||
| 13 | impl Executor { | 17 | #[derive(Copy, Clone)] |
| 14 | /// Create a new Executor. | 18 | pub(crate) struct ThreadPender(&'static Signaler); |
| 15 | pub fn new() -> Self { | 19 | |
| 16 | let signaler = &*Box::leak(Box::new(Signaler::new())); | 20 | impl ThreadPender { |
| 17 | Self { | 21 | #[allow(unused)] |
| 18 | inner: raw::Executor::new( | 22 | pub(crate) fn pend(self) { |
| 19 | |p| unsafe { | 23 | self.0.signal() |
| 20 | let s = &*(p as *const () as *const Signaler); | ||
| 21 | s.signal() | ||
| 22 | }, | ||
| 23 | signaler as *const _ as _, | ||
| 24 | ), | ||
| 25 | not_send: PhantomData, | ||
| 26 | signaler, | ||
| 27 | } | 24 | } |
| 28 | } | 25 | } |
| 29 | 26 | ||
| 30 | /// Run the executor. | 27 | /// Single-threaded std-based executor. |
| 31 | /// | 28 | pub struct Executor { |
| 32 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | 29 | inner: raw::Executor, |
| 33 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | 30 | not_send: PhantomData<*mut ()>, |
| 34 | /// the executor starts running the tasks. | 31 | signaler: &'static Signaler, |
| 35 | /// | 32 | } |
| 36 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | ||
| 37 | /// for example by passing it as an argument to the initial tasks. | ||
| 38 | /// | ||
| 39 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 40 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 41 | /// access. There's a few ways to do this: | ||
| 42 | /// | ||
| 43 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 44 | /// - a `static mut` (unsafe) | ||
| 45 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 46 | /// | ||
| 47 | /// This function never returns. | ||
| 48 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 49 | init(self.inner.spawner()); | ||
| 50 | 33 | ||
| 51 | loop { | 34 | impl Executor { |
| 52 | unsafe { self.inner.poll() }; | 35 | /// Create a new Executor. |
| 53 | self.signaler.wait() | 36 | pub fn new() -> Self { |
| 37 | let signaler = &*Box::leak(Box::new(Signaler::new())); | ||
| 38 | Self { | ||
| 39 | inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(signaler)))), | ||
| 40 | not_send: PhantomData, | ||
| 41 | signaler, | ||
| 42 | } | ||
| 54 | } | 43 | } |
| 55 | } | ||
| 56 | } | ||
| 57 | 44 | ||
| 58 | struct Signaler { | 45 | /// Run the executor. |
| 59 | mutex: Mutex<bool>, | 46 | /// |
| 60 | condvar: Condvar, | 47 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on |
| 61 | } | 48 | /// this executor. Use it to spawn the initial task(s). After `init` returns, |
| 49 | /// the executor starts running the tasks. | ||
| 50 | /// | ||
| 51 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | ||
| 52 | /// for example by passing it as an argument to the initial tasks. | ||
| 53 | /// | ||
| 54 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 55 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 56 | /// access. There's a few ways to do this: | ||
| 57 | /// | ||
| 58 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 59 | /// - a `static mut` (unsafe) | ||
| 60 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 61 | /// | ||
| 62 | /// This function never returns. | ||
| 63 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 64 | init(self.inner.spawner()); | ||
| 62 | 65 | ||
| 63 | impl Signaler { | 66 | loop { |
| 64 | fn new() -> Self { | 67 | unsafe { self.inner.poll() }; |
| 65 | Self { | 68 | self.signaler.wait() |
| 66 | mutex: Mutex::new(false), | 69 | } |
| 67 | condvar: Condvar::new(), | ||
| 68 | } | 70 | } |
| 69 | } | 71 | } |
| 70 | 72 | ||
| 71 | fn wait(&self) { | 73 | struct Signaler { |
| 72 | let mut signaled = self.mutex.lock().unwrap(); | 74 | mutex: Mutex<bool>, |
| 73 | while !*signaled { | 75 | condvar: Condvar, |
| 74 | signaled = self.condvar.wait(signaled).unwrap(); | ||
| 75 | } | ||
| 76 | *signaled = false; | ||
| 77 | } | 76 | } |
| 78 | 77 | ||
| 79 | fn signal(&self) { | 78 | impl Signaler { |
| 80 | let mut signaled = self.mutex.lock().unwrap(); | 79 | fn new() -> Self { |
| 81 | *signaled = true; | 80 | Self { |
| 82 | self.condvar.notify_one(); | 81 | mutex: Mutex::new(false), |
| 82 | condvar: Condvar::new(), | ||
| 83 | } | ||
| 84 | } | ||
| 85 | |||
| 86 | fn wait(&self) { | ||
| 87 | let mut signaled = self.mutex.lock().unwrap(); | ||
| 88 | while !*signaled { | ||
| 89 | signaled = self.condvar.wait(signaled).unwrap(); | ||
| 90 | } | ||
| 91 | *signaled = false; | ||
| 92 | } | ||
| 93 | |||
| 94 | fn signal(&self) { | ||
| 95 | let mut signaled = self.mutex.lock().unwrap(); | ||
| 96 | *signaled = true; | ||
| 97 | self.condvar.notify_one(); | ||
| 98 | } | ||
| 83 | } | 99 | } |
| 84 | } | 100 | } |
diff --git a/embassy-executor/src/arch/wasm.rs b/embassy-executor/src/arch/wasm.rs index 98091cfbb..08ab16b99 100644 --- a/embassy-executor/src/arch/wasm.rs +++ b/embassy-executor/src/arch/wasm.rs | |||
| @@ -1,74 +1,88 @@ | |||
| 1 | use core::marker::PhantomData; | 1 | #[cfg(feature = "executor-interrupt")] |
| 2 | compile_error!("`executor-interrupt` is not supported with `arch-wasm`."); | ||
| 2 | 3 | ||
| 3 | use js_sys::Promise; | 4 | #[cfg(feature = "executor-thread")] |
| 4 | use wasm_bindgen::prelude::*; | 5 | pub use thread::*; |
| 6 | #[cfg(feature = "executor-thread")] | ||
| 7 | mod thread { | ||
| 5 | 8 | ||
| 6 | use super::raw::util::UninitCell; | 9 | use core::marker::PhantomData; |
| 7 | use super::raw::{self}; | ||
| 8 | use super::Spawner; | ||
| 9 | 10 | ||
| 10 | /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. | 11 | #[cfg(feature = "nightly")] |
| 11 | pub struct Executor { | 12 | pub use embassy_macros::main_wasm as main; |
| 12 | inner: raw::Executor, | 13 | use js_sys::Promise; |
| 13 | ctx: &'static WasmContext, | 14 | use wasm_bindgen::prelude::*; |
| 14 | not_send: PhantomData<*mut ()>, | ||
| 15 | } | ||
| 16 | 15 | ||
| 17 | pub(crate) struct WasmContext { | 16 | use crate::raw::util::UninitCell; |
| 18 | promise: Promise, | 17 | use crate::raw::{Pender, PenderInner}; |
| 19 | closure: UninitCell<Closure<dyn FnMut(JsValue)>>, | 18 | use crate::{raw, Spawner}; |
| 20 | } | 19 | |
| 20 | /// WASM executor, wasm_bindgen to schedule tasks on the JS event loop. | ||
| 21 | pub struct Executor { | ||
| 22 | inner: raw::Executor, | ||
| 23 | ctx: &'static WasmContext, | ||
| 24 | not_send: PhantomData<*mut ()>, | ||
| 25 | } | ||
| 26 | |||
| 27 | pub(crate) struct WasmContext { | ||
| 28 | promise: Promise, | ||
| 29 | closure: UninitCell<Closure<dyn FnMut(JsValue)>>, | ||
| 30 | } | ||
| 31 | |||
| 32 | #[derive(Copy, Clone)] | ||
| 33 | pub(crate) struct ThreadPender(&'static WasmContext); | ||
| 21 | 34 | ||
| 22 | impl WasmContext { | 35 | impl ThreadPender { |
| 23 | pub fn new() -> Self { | 36 | #[allow(unused)] |
| 24 | Self { | 37 | pub(crate) fn pend(self) { |
| 25 | promise: Promise::resolve(&JsValue::undefined()), | 38 | let _ = self.0.promise.then(unsafe { self.0.closure.as_mut() }); |
| 26 | closure: UninitCell::uninit(), | ||
| 27 | } | 39 | } |
| 28 | } | 40 | } |
| 29 | } | ||
| 30 | 41 | ||
| 31 | impl Executor { | 42 | impl WasmContext { |
| 32 | /// Create a new Executor. | 43 | pub fn new() -> Self { |
| 33 | pub fn new() -> Self { | 44 | Self { |
| 34 | let ctx = &*Box::leak(Box::new(WasmContext::new())); | 45 | promise: Promise::resolve(&JsValue::undefined()), |
| 35 | let inner = raw::Executor::new( | 46 | closure: UninitCell::uninit(), |
| 36 | |p| unsafe { | 47 | } |
| 37 | let ctx = &*(p as *const () as *const WasmContext); | ||
| 38 | let _ = ctx.promise.then(ctx.closure.as_mut()); | ||
| 39 | }, | ||
| 40 | ctx as *const _ as _, | ||
| 41 | ); | ||
| 42 | Self { | ||
| 43 | inner, | ||
| 44 | not_send: PhantomData, | ||
| 45 | ctx, | ||
| 46 | } | 48 | } |
| 47 | } | 49 | } |
| 48 | 50 | ||
| 49 | /// Run the executor. | 51 | impl Executor { |
| 50 | /// | 52 | /// Create a new Executor. |
| 51 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | 53 | pub fn new() -> Self { |
| 52 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | 54 | let ctx = &*Box::leak(Box::new(WasmContext::new())); |
| 53 | /// the executor starts running the tasks. | 55 | Self { |
| 54 | /// | 56 | inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender(ctx)))), |
| 55 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | 57 | not_send: PhantomData, |
| 56 | /// for example by passing it as an argument to the initial tasks. | 58 | ctx, |
| 57 | /// | 59 | } |
| 58 | /// This function requires `&'static mut self`. This means you have to store the | 60 | } |
| 59 | /// Executor instance in a place where it'll live forever and grants you mutable | 61 | |
| 60 | /// access. There's a few ways to do this: | 62 | /// Run the executor. |
| 61 | /// | 63 | /// |
| 62 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | 64 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on |
| 63 | /// - a `static mut` (unsafe) | 65 | /// this executor. Use it to spawn the initial task(s). After `init` returns, |
| 64 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | 66 | /// the executor starts running the tasks. |
| 65 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { | 67 | /// |
| 66 | unsafe { | 68 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), |
| 67 | let executor = &self.inner; | 69 | /// for example by passing it as an argument to the initial tasks. |
| 68 | self.ctx.closure.write(Closure::new(move |_| { | 70 | /// |
| 69 | executor.poll(); | 71 | /// This function requires `&'static mut self`. This means you have to store the |
| 70 | })); | 72 | /// Executor instance in a place where it'll live forever and grants you mutable |
| 71 | init(self.inner.spawner()); | 73 | /// access. There's a few ways to do this: |
| 74 | /// | ||
| 75 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 76 | /// - a `static mut` (unsafe) | ||
| 77 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 78 | pub fn start(&'static mut self, init: impl FnOnce(Spawner)) { | ||
| 79 | unsafe { | ||
| 80 | let executor = &self.inner; | ||
| 81 | self.ctx.closure.write(Closure::new(move |_| { | ||
| 82 | executor.poll(); | ||
| 83 | })); | ||
| 84 | init(self.inner.spawner()); | ||
| 85 | } | ||
| 72 | } | 86 | } |
| 73 | } | 87 | } |
| 74 | } | 88 | } |
diff --git a/embassy-executor/src/arch/xtensa.rs b/embassy-executor/src/arch/xtensa.rs index f908aaa70..017b2c52b 100644 --- a/embassy-executor/src/arch/xtensa.rs +++ b/embassy-executor/src/arch/xtensa.rs | |||
| @@ -1,66 +1,84 @@ | |||
| 1 | use core::marker::PhantomData; | 1 | #[cfg(feature = "executor-interrupt")] |
| 2 | use core::ptr; | 2 | compile_error!("`executor-interrupt` is not supported with `arch-xtensa`."); |
| 3 | 3 | ||
| 4 | use atomic_polyfill::{AtomicBool, Ordering}; | 4 | #[cfg(feature = "executor-thread")] |
| 5 | pub use thread::*; | ||
| 6 | #[cfg(feature = "executor-thread")] | ||
| 7 | mod thread { | ||
| 8 | use core::marker::PhantomData; | ||
| 9 | use core::sync::atomic::{AtomicBool, Ordering}; | ||
| 5 | 10 | ||
| 6 | use super::{raw, Spawner}; | 11 | use crate::raw::{Pender, PenderInner}; |
| 12 | use crate::{raw, Spawner}; | ||
| 7 | 13 | ||
| 8 | /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa | 14 | #[derive(Copy, Clone)] |
| 9 | /// | 15 | pub(crate) struct ThreadPender; |
| 10 | static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); | ||
| 11 | 16 | ||
| 12 | /// Xtensa Executor | 17 | impl ThreadPender { |
| 13 | pub struct Executor { | 18 | #[allow(unused)] |
| 14 | inner: raw::Executor, | 19 | pub(crate) fn pend(self) { |
| 15 | not_send: PhantomData<*mut ()>, | 20 | SIGNAL_WORK_THREAD_MODE.store(true, core::sync::atomic::Ordering::SeqCst); |
| 16 | } | ||
| 17 | |||
| 18 | impl Executor { | ||
| 19 | /// Create a new Executor. | ||
| 20 | pub fn new() -> Self { | ||
| 21 | Self { | ||
| 22 | // use Signal_Work_Thread_Mode as substitute for local interrupt register | ||
| 23 | inner: raw::Executor::new( | ||
| 24 | |_| { | ||
| 25 | SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst); | ||
| 26 | }, | ||
| 27 | ptr::null_mut(), | ||
| 28 | ), | ||
| 29 | not_send: PhantomData, | ||
| 30 | } | 21 | } |
| 31 | } | 22 | } |
| 32 | 23 | ||
| 33 | /// Run the executor. | 24 | /// global atomic used to keep track of whether there is work to do since sev() is not available on Xtensa |
| 34 | /// | 25 | static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false); |
| 35 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | 26 | |
| 36 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | 27 | /// Xtensa Executor |
| 37 | /// the executor starts running the tasks. | 28 | pub struct Executor { |
| 38 | /// | 29 | inner: raw::Executor, |
| 39 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | 30 | not_send: PhantomData<*mut ()>, |
| 40 | /// for example by passing it as an argument to the initial tasks. | 31 | } |
| 41 | /// | ||
| 42 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 43 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 44 | /// access. There's a few ways to do this: | ||
| 45 | /// | ||
| 46 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 47 | /// - a `static mut` (unsafe) | ||
| 48 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 49 | /// | ||
| 50 | /// This function never returns. | ||
| 51 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 52 | init(self.inner.spawner()); | ||
| 53 | 32 | ||
| 54 | loop { | 33 | impl Executor { |
| 55 | unsafe { | 34 | /// Create a new Executor. |
| 56 | self.inner.poll(); | 35 | pub fn new() -> Self { |
| 57 | // we do not care about race conditions between the load and store operations, interrupts | 36 | Self { |
| 58 | // will only set this value to true. | 37 | inner: raw::Executor::new(Pender(PenderInner::Thread(ThreadPender))), |
| 59 | // if there is work to do, loop back to polling | 38 | not_send: PhantomData, |
| 60 | // TODO can we relax this? | 39 | } |
| 61 | critical_section::with(|_| { | 40 | } |
| 41 | |||
| 42 | /// Run the executor. | ||
| 43 | /// | ||
| 44 | /// The `init` closure is called with a [`Spawner`] that spawns tasks on | ||
| 45 | /// this executor. Use it to spawn the initial task(s). After `init` returns, | ||
| 46 | /// the executor starts running the tasks. | ||
| 47 | /// | ||
| 48 | /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is `Copy`), | ||
| 49 | /// for example by passing it as an argument to the initial tasks. | ||
| 50 | /// | ||
| 51 | /// This function requires `&'static mut self`. This means you have to store the | ||
| 52 | /// Executor instance in a place where it'll live forever and grants you mutable | ||
| 53 | /// access. There's a few ways to do this: | ||
| 54 | /// | ||
| 55 | /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe) | ||
| 56 | /// - a `static mut` (unsafe) | ||
| 57 | /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe) | ||
| 58 | /// | ||
| 59 | /// This function never returns. | ||
| 60 | pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { | ||
| 61 | init(self.inner.spawner()); | ||
| 62 | |||
| 63 | loop { | ||
| 64 | unsafe { | ||
| 65 | self.inner.poll(); | ||
| 66 | |||
| 67 | // Manual critical section implementation that only masks interrupts handlers. | ||
| 68 | // We must not acquire the cross-core on dual-core systems because that would | ||
| 69 | // prevent the other core from doing useful work while this core is sleeping. | ||
| 70 | let token: critical_section::RawRestoreState; | ||
| 71 | core::arch::asm!("rsil {0}, 5", out(reg) token); | ||
| 72 | |||
| 73 | // we do not care about race conditions between the load and store operations, interrupts | ||
| 74 | // will only set this value to true. | ||
| 75 | // if there is work to do, loop back to polling | ||
| 62 | if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { | 76 | if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) { |
| 63 | SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); | 77 | SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst); |
| 78 | |||
| 79 | core::arch::asm!( | ||
| 80 | "wsr.ps {0}", | ||
| 81 | "rsync", in(reg) token) | ||
| 64 | } else { | 82 | } else { |
| 65 | // waiti sets the PS.INTLEVEL when slipping into sleep | 83 | // waiti sets the PS.INTLEVEL when slipping into sleep |
| 66 | // because critical sections in Xtensa are implemented via increasing | 84 | // because critical sections in Xtensa are implemented via increasing |
| @@ -68,7 +86,7 @@ impl Executor { | |||
| 68 | // take care not add code after `waiti` if it needs to be inside the CS | 86 | // take care not add code after `waiti` if it needs to be inside the CS |
| 69 | core::arch::asm!("waiti 0"); // critical section ends here | 87 | core::arch::asm!("waiti 0"); // critical section ends here |
| 70 | } | 88 | } |
| 71 | }); | 89 | } |
| 72 | } | 90 | } |
| 73 | } | 91 | } |
| 74 | } | 92 | } |
diff --git a/embassy-executor/src/lib.rs b/embassy-executor/src/lib.rs index e4cbd04b9..3ce687eb6 100644 --- a/embassy-executor/src/lib.rs +++ b/embassy-executor/src/lib.rs | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] | 1 | #![cfg_attr(not(any(feature = "arch-std", feature = "arch-wasm")), no_std)] |
| 2 | #![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))] | 2 | #![cfg_attr(all(feature = "nightly", feature = "arch-xtensa"), feature(asm_experimental_arch))] |
| 3 | #![allow(clippy::new_without_default)] | 3 | #![allow(clippy::new_without_default)] |
| 4 | #![doc = include_str!("../README.md")] | 4 | #![doc = include_str!("../README.md")] |
| 5 | #![warn(missing_docs)] | 5 | #![warn(missing_docs)] |
| @@ -8,41 +8,45 @@ | |||
| 8 | pub(crate) mod fmt; | 8 | pub(crate) mod fmt; |
| 9 | 9 | ||
| 10 | #[cfg(feature = "nightly")] | 10 | #[cfg(feature = "nightly")] |
| 11 | pub use embassy_macros::{main, task}; | 11 | pub use embassy_macros::task; |
| 12 | 12 | ||
| 13 | cfg_if::cfg_if! { | 13 | macro_rules! check_at_most_one { |
| 14 | if #[cfg(cortex_m)] { | 14 | (@amo [$($feats:literal)*] [] [$($res:tt)*]) => { |
| 15 | #[path="arch/cortex_m.rs"] | 15 | #[cfg(any($($res)*))] |
| 16 | mod arch; | 16 | compile_error!(concat!("At most one of these features can be enabled at the same time:", $(" `", $feats, "`",)*)); |
| 17 | pub use arch::*; | 17 | }; |
| 18 | } | 18 | (@amo $feats:tt [$curr:literal $($rest:literal)*] [$($res:tt)*]) => { |
| 19 | else if #[cfg(target_arch="riscv32")] { | 19 | check_at_most_one!(@amo $feats [$($rest)*] [$($res)* $(all(feature=$curr, feature=$rest),)*]); |
| 20 | #[path="arch/riscv32.rs"] | 20 | }; |
| 21 | mod arch; | 21 | ($($f:literal),*$(,)?) => { |
| 22 | pub use arch::*; | 22 | check_at_most_one!(@amo [$($f)*] [$($f)*] []); |
| 23 | } | 23 | }; |
| 24 | else if #[cfg(all(target_arch="xtensa", feature = "nightly"))] { | ||
| 25 | #[path="arch/xtensa.rs"] | ||
| 26 | mod arch; | ||
| 27 | pub use arch::*; | ||
| 28 | } | ||
| 29 | else if #[cfg(feature="wasm")] { | ||
| 30 | #[path="arch/wasm.rs"] | ||
| 31 | mod arch; | ||
| 32 | pub use arch::*; | ||
| 33 | } | ||
| 34 | else if #[cfg(feature="std")] { | ||
| 35 | #[path="arch/std.rs"] | ||
| 36 | mod arch; | ||
| 37 | pub use arch::*; | ||
| 38 | } | ||
| 39 | } | 24 | } |
| 25 | check_at_most_one!("arch-cortex-m", "arch-riscv32", "arch-xtensa", "arch-std", "arch-wasm",); | ||
| 26 | |||
| 27 | #[cfg(feature = "_arch")] | ||
| 28 | #[cfg_attr(feature = "arch-cortex-m", path = "arch/cortex_m.rs")] | ||
| 29 | #[cfg_attr(feature = "arch-riscv32", path = "arch/riscv32.rs")] | ||
| 30 | #[cfg_attr(feature = "arch-xtensa", path = "arch/xtensa.rs")] | ||
| 31 | #[cfg_attr(feature = "arch-std", path = "arch/std.rs")] | ||
| 32 | #[cfg_attr(feature = "arch-wasm", path = "arch/wasm.rs")] | ||
| 33 | mod arch; | ||
| 34 | |||
| 35 | #[cfg(feature = "_arch")] | ||
| 36 | pub use arch::*; | ||
| 40 | 37 | ||
| 38 | pub mod raw; | ||
| 39 | |||
| 40 | mod spawner; | ||
| 41 | pub use spawner::*; | ||
| 42 | |||
| 43 | /// Implementation details for embassy macros. | ||
| 44 | /// Do not use. Used for macros and HALs only. Not covered by semver guarantees. | ||
| 41 | #[doc(hidden)] | 45 | #[doc(hidden)] |
| 42 | /// Implementation details for embassy macros. DO NOT USE. | 46 | pub mod _export { |
| 43 | pub mod export { | ||
| 44 | #[cfg(feature = "rtos-trace")] | 47 | #[cfg(feature = "rtos-trace")] |
| 45 | pub use rtos_trace::trace; | 48 | pub use rtos_trace::trace; |
| 49 | pub use static_cell::StaticCell; | ||
| 46 | 50 | ||
| 47 | /// Expands the given block of code when `embassy-executor` is compiled with | 51 | /// Expands the given block of code when `embassy-executor` is compiled with |
| 48 | /// the `rtos-trace-interrupt` feature. | 52 | /// the `rtos-trace-interrupt` feature. |
| @@ -62,14 +66,3 @@ pub mod export { | |||
| 62 | ($($tt:tt)*) => {}; | 66 | ($($tt:tt)*) => {}; |
| 63 | } | 67 | } |
| 64 | } | 68 | } |
| 65 | |||
| 66 | pub mod raw; | ||
| 67 | |||
| 68 | mod spawner; | ||
| 69 | pub use spawner::*; | ||
| 70 | |||
| 71 | /// Do not use. Used for macros and HALs only. Not covered by semver guarantees. | ||
| 72 | #[doc(hidden)] | ||
| 73 | pub mod _export { | ||
| 74 | pub use static_cell::StaticCell; | ||
| 75 | } | ||
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb5..f3760f589 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs | |||
| @@ -11,17 +11,17 @@ mod run_queue; | |||
| 11 | #[cfg(feature = "integrated-timers")] | 11 | #[cfg(feature = "integrated-timers")] |
| 12 | mod timer_queue; | 12 | mod timer_queue; |
| 13 | pub(crate) mod util; | 13 | pub(crate) mod util; |
| 14 | #[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")] | ||
| 14 | mod waker; | 15 | mod waker; |
| 15 | 16 | ||
| 16 | use core::cell::Cell; | ||
| 17 | use core::future::Future; | 17 | use core::future::Future; |
| 18 | use core::marker::PhantomData; | ||
| 19 | use core::mem; | ||
| 18 | use core::pin::Pin; | 20 | use core::pin::Pin; |
| 19 | use core::ptr::NonNull; | 21 | use core::ptr::NonNull; |
| 20 | use core::task::{Context, Poll}; | 22 | use core::task::{Context, Poll}; |
| 21 | use core::{mem, ptr}; | ||
| 22 | 23 | ||
| 23 | use atomic_polyfill::{AtomicU32, Ordering}; | 24 | use atomic_polyfill::{AtomicU32, Ordering}; |
| 24 | use critical_section::CriticalSection; | ||
| 25 | #[cfg(feature = "integrated-timers")] | 25 | #[cfg(feature = "integrated-timers")] |
| 26 | use embassy_time::driver::{self, AlarmHandle}; | 26 | use embassy_time::driver::{self, AlarmHandle}; |
| 27 | #[cfg(feature = "integrated-timers")] | 27 | #[cfg(feature = "integrated-timers")] |
| @@ -30,7 +30,7 @@ use embassy_time::Instant; | |||
| 30 | use rtos_trace::trace; | 30 | use rtos_trace::trace; |
| 31 | 31 | ||
| 32 | use self::run_queue::{RunQueue, RunQueueItem}; | 32 | use self::run_queue::{RunQueue, RunQueueItem}; |
| 33 | use self::util::UninitCell; | 33 | use self::util::{SyncUnsafeCell, UninitCell}; |
| 34 | pub use self::waker::task_from_waker; | 34 | pub use self::waker::task_from_waker; |
| 35 | use super::SpawnToken; | 35 | use super::SpawnToken; |
| 36 | 36 | ||
| @@ -43,35 +43,49 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; | |||
| 43 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; | 43 | pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; |
| 44 | 44 | ||
| 45 | /// Raw task header for use in task pointers. | 45 | /// Raw task header for use in task pointers. |
| 46 | /// | 46 | pub(crate) struct TaskHeader { |
| 47 | /// This is an opaque struct, used for raw pointers to tasks, for use | ||
| 48 | /// with funtions like [`wake_task`] and [`task_from_waker`]. | ||
| 49 | pub struct TaskHeader { | ||
| 50 | pub(crate) state: AtomicU32, | 47 | pub(crate) state: AtomicU32, |
| 51 | pub(crate) run_queue_item: RunQueueItem, | 48 | pub(crate) run_queue_item: RunQueueItem, |
| 52 | pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 | 49 | pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>, |
| 53 | pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED | 50 | poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>, |
| 54 | 51 | ||
| 55 | #[cfg(feature = "integrated-timers")] | 52 | #[cfg(feature = "integrated-timers")] |
| 56 | pub(crate) expires_at: Cell<Instant>, | 53 | pub(crate) expires_at: SyncUnsafeCell<Instant>, |
| 57 | #[cfg(feature = "integrated-timers")] | 54 | #[cfg(feature = "integrated-timers")] |
| 58 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, | 55 | pub(crate) timer_queue_item: timer_queue::TimerQueueItem, |
| 59 | } | 56 | } |
| 60 | 57 | ||
| 61 | impl TaskHeader { | 58 | /// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased. |
| 62 | pub(crate) const fn new() -> Self { | 59 | #[derive(Clone, Copy)] |
| 60 | pub struct TaskRef { | ||
| 61 | ptr: NonNull<TaskHeader>, | ||
| 62 | } | ||
| 63 | |||
| 64 | unsafe impl Send for TaskRef where &'static TaskHeader: Send {} | ||
| 65 | unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {} | ||
| 66 | |||
| 67 | impl TaskRef { | ||
| 68 | fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { | ||
| 63 | Self { | 69 | Self { |
| 64 | state: AtomicU32::new(0), | 70 | ptr: NonNull::from(task).cast(), |
| 65 | run_queue_item: RunQueueItem::new(), | 71 | } |
| 66 | executor: Cell::new(ptr::null()), | 72 | } |
| 67 | poll_fn: UninitCell::uninit(), | ||
| 68 | 73 | ||
| 69 | #[cfg(feature = "integrated-timers")] | 74 | /// Safety: The pointer must have been obtained with `Task::as_ptr` |
| 70 | expires_at: Cell::new(Instant::from_ticks(0)), | 75 | pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self { |
| 71 | #[cfg(feature = "integrated-timers")] | 76 | Self { |
| 72 | timer_queue_item: timer_queue::TimerQueueItem::new(), | 77 | ptr: NonNull::new_unchecked(ptr as *mut TaskHeader), |
| 73 | } | 78 | } |
| 74 | } | 79 | } |
| 80 | |||
| 81 | pub(crate) fn header(self) -> &'static TaskHeader { | ||
| 82 | unsafe { self.ptr.as_ref() } | ||
| 83 | } | ||
| 84 | |||
| 85 | /// The returned pointer is valid for the entire TaskStorage. | ||
| 86 | pub(crate) fn as_ptr(self) -> *const TaskHeader { | ||
| 87 | self.ptr.as_ptr() | ||
| 88 | } | ||
| 75 | } | 89 | } |
| 76 | 90 | ||
| 77 | /// Raw storage in which a task can be spawned. | 91 | /// Raw storage in which a task can be spawned. |
| @@ -101,7 +115,18 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 101 | /// Create a new TaskStorage, in not-spawned state. | 115 | /// Create a new TaskStorage, in not-spawned state. |
| 102 | pub const fn new() -> Self { | 116 | pub const fn new() -> Self { |
| 103 | Self { | 117 | Self { |
| 104 | raw: TaskHeader::new(), | 118 | raw: TaskHeader { |
| 119 | state: AtomicU32::new(0), | ||
| 120 | run_queue_item: RunQueueItem::new(), | ||
| 121 | executor: SyncUnsafeCell::new(None), | ||
| 122 | // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` | ||
| 123 | poll_fn: SyncUnsafeCell::new(None), | ||
| 124 | |||
| 125 | #[cfg(feature = "integrated-timers")] | ||
| 126 | expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)), | ||
| 127 | #[cfg(feature = "integrated-timers")] | ||
| 128 | timer_queue_item: timer_queue::TimerQueueItem::new(), | ||
| 129 | }, | ||
| 105 | future: UninitCell::uninit(), | 130 | future: UninitCell::uninit(), |
| 106 | } | 131 | } |
| 107 | } | 132 | } |
| @@ -120,29 +145,17 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 120 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it | 145 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it |
| 121 | /// on a different executor. | 146 | /// on a different executor. |
| 122 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 147 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 123 | if self.spawn_mark_used() { | 148 | let task = AvailableTask::claim(self); |
| 124 | return unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }; | 149 | match task { |
| 150 | Some(task) => { | ||
| 151 | let task = task.initialize(future); | ||
| 152 | unsafe { SpawnToken::<F>::new(task) } | ||
| 153 | } | ||
| 154 | None => SpawnToken::new_failed(), | ||
| 125 | } | 155 | } |
| 126 | |||
| 127 | SpawnToken::<F>::new_failed() | ||
| 128 | } | ||
| 129 | |||
| 130 | fn spawn_mark_used(&'static self) -> bool { | ||
| 131 | let state = STATE_SPAWNED | STATE_RUN_QUEUED; | ||
| 132 | self.raw | ||
| 133 | .state | ||
| 134 | .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) | ||
| 135 | .is_ok() | ||
| 136 | } | 156 | } |
| 137 | 157 | ||
| 138 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { | 158 | unsafe fn poll(p: TaskRef) { |
| 139 | // Initialize the task | ||
| 140 | self.raw.poll_fn.write(Self::poll); | ||
| 141 | self.future.write(future()); | ||
| 142 | NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader) | ||
| 143 | } | ||
| 144 | |||
| 145 | unsafe fn poll(p: NonNull<TaskHeader>) { | ||
| 146 | let this = &*(p.as_ptr() as *const TaskStorage<F>); | 159 | let this = &*(p.as_ptr() as *const TaskStorage<F>); |
| 147 | 160 | ||
| 148 | let future = Pin::new_unchecked(this.future.as_mut()); | 161 | let future = Pin::new_unchecked(this.future.as_mut()); |
| @@ -152,6 +165,9 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 152 | Poll::Ready(_) => { | 165 | Poll::Ready(_) => { |
| 153 | this.future.drop_in_place(); | 166 | this.future.drop_in_place(); |
| 154 | this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); | 167 | this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel); |
| 168 | |||
| 169 | #[cfg(feature = "integrated-timers")] | ||
| 170 | this.raw.expires_at.set(Instant::MAX); | ||
| 155 | } | 171 | } |
| 156 | Poll::Pending => {} | 172 | Poll::Pending => {} |
| 157 | } | 173 | } |
| @@ -160,9 +176,37 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 160 | // it's a noop for our waker. | 176 | // it's a noop for our waker. |
| 161 | mem::forget(waker); | 177 | mem::forget(waker); |
| 162 | } | 178 | } |
| 179 | |||
| 180 | #[doc(hidden)] | ||
| 181 | #[allow(dead_code)] | ||
| 182 | fn _assert_sync(self) { | ||
| 183 | fn assert_sync<T: Sync>(_: T) {} | ||
| 184 | |||
| 185 | assert_sync(self) | ||
| 186 | } | ||
| 187 | } | ||
| 188 | |||
| 189 | struct AvailableTask<F: Future + 'static> { | ||
| 190 | task: &'static TaskStorage<F>, | ||
| 163 | } | 191 | } |
| 164 | 192 | ||
| 165 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} | 193 | impl<F: Future + 'static> AvailableTask<F> { |
| 194 | fn claim(task: &'static TaskStorage<F>) -> Option<Self> { | ||
| 195 | task.raw | ||
| 196 | .state | ||
| 197 | .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) | ||
| 198 | .ok() | ||
| 199 | .map(|_| Self { task }) | ||
| 200 | } | ||
| 201 | |||
| 202 | fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { | ||
| 203 | unsafe { | ||
| 204 | self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); | ||
| 205 | self.task.future.write(future()); | ||
| 206 | } | ||
| 207 | TaskRef::new(self.task) | ||
| 208 | } | ||
| 209 | } | ||
| 166 | 210 | ||
| 167 | /// Raw storage that can hold up to N tasks of the same type. | 211 | /// Raw storage that can hold up to N tasks of the same type. |
| 168 | /// | 212 | /// |
| @@ -187,13 +231,14 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 187 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, | 231 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, |
| 188 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. | 232 | /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error. |
| 189 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | 233 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 190 | for task in &self.pool { | 234 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 191 | if task.spawn_mark_used() { | 235 | match task { |
| 192 | return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; | 236 | Some(task) => { |
| 237 | let task = task.initialize(future); | ||
| 238 | unsafe { SpawnToken::<F>::new(task) } | ||
| 193 | } | 239 | } |
| 240 | None => SpawnToken::new_failed(), | ||
| 194 | } | 241 | } |
| 195 | |||
| 196 | SpawnToken::<F>::new_failed() | ||
| 197 | } | 242 | } |
| 198 | 243 | ||
| 199 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if | 244 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if |
| @@ -235,39 +280,71 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 235 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly | 280 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly |
| 236 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. | 281 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. |
| 237 | 282 | ||
| 238 | for task in &self.pool { | 283 | let task = self.pool.iter().find_map(AvailableTask::claim); |
| 239 | if task.spawn_mark_used() { | 284 | match task { |
| 240 | return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); | 285 | Some(task) => { |
| 286 | let task = task.initialize(future); | ||
| 287 | unsafe { SpawnToken::<FutFn>::new(task) } | ||
| 241 | } | 288 | } |
| 289 | None => SpawnToken::new_failed(), | ||
| 242 | } | 290 | } |
| 243 | |||
| 244 | SpawnToken::<FutFn>::new_failed() | ||
| 245 | } | 291 | } |
| 246 | } | 292 | } |
| 247 | 293 | ||
| 248 | /// Raw executor. | 294 | #[derive(Clone, Copy)] |
| 249 | /// | 295 | pub(crate) enum PenderInner { |
| 250 | /// This is the core of the Embassy executor. It is low-level, requiring manual | 296 | #[cfg(feature = "executor-thread")] |
| 251 | /// handling of wakeups and task polling. If you can, prefer using one of the | 297 | Thread(crate::arch::ThreadPender), |
| 252 | /// [higher level executors](crate::Executor). | 298 | #[cfg(feature = "executor-interrupt")] |
| 253 | /// | 299 | Interrupt(crate::arch::InterruptPender), |
| 254 | /// The raw executor leaves it up to you to handle wakeups and scheduling: | 300 | #[cfg(feature = "pender-callback")] |
| 255 | /// | 301 | Callback { func: fn(*mut ()), context: *mut () }, |
| 256 | /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks | 302 | } |
| 257 | /// that "want to run"). | 303 | |
| 258 | /// - You must supply a `signal_fn`. The executor will call it to notify you it has work | 304 | unsafe impl Send for PenderInner {} |
| 259 | /// to do. You must arrange for `poll()` to be called as soon as possible. | 305 | unsafe impl Sync for PenderInner {} |
| 306 | |||
| 307 | /// Platform/architecture-specific action executed when an executor has pending work. | ||
| 260 | /// | 308 | /// |
| 261 | /// `signal_fn` can be called from *any* context: any thread, any interrupt priority | 309 | /// When a task within an executor is woken, the `Pender` is called. This does a |
| 262 | /// level, etc. It may be called synchronously from any `Executor` method call as well. | 310 | /// platform/architecture-specific action to signal there is pending work in the executor. |
| 263 | /// You must deal with this correctly. | 311 | /// When this happens, you must arrange for [`Executor::poll`] to be called. |
| 264 | /// | 312 | /// |
| 265 | /// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates | 313 | /// You can think of it as a waker, but for the whole executor. |
| 266 | /// the requirement for `poll` to not be called reentrantly. | 314 | pub struct Pender(pub(crate) PenderInner); |
| 267 | pub struct Executor { | 315 | |
| 316 | impl Pender { | ||
| 317 | /// Create a `Pender` that will call an arbitrary function pointer. | ||
| 318 | /// | ||
| 319 | /// # Arguments | ||
| 320 | /// | ||
| 321 | /// - `func`: The function pointer to call. | ||
| 322 | /// - `context`: Opaque context pointer, that will be passed to the function pointer. | ||
| 323 | #[cfg(feature = "pender-callback")] | ||
| 324 | pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self { | ||
| 325 | Self(PenderInner::Callback { | ||
| 326 | func, | ||
| 327 | context: context.into(), | ||
| 328 | }) | ||
| 329 | } | ||
| 330 | } | ||
| 331 | |||
| 332 | impl Pender { | ||
| 333 | pub(crate) fn pend(&self) { | ||
| 334 | match self.0 { | ||
| 335 | #[cfg(feature = "executor-thread")] | ||
| 336 | PenderInner::Thread(x) => x.pend(), | ||
| 337 | #[cfg(feature = "executor-interrupt")] | ||
| 338 | PenderInner::Interrupt(x) => x.pend(), | ||
| 339 | #[cfg(feature = "pender-callback")] | ||
| 340 | PenderInner::Callback { func, context } => func(context), | ||
| 341 | } | ||
| 342 | } | ||
| 343 | } | ||
| 344 | |||
| 345 | pub(crate) struct SyncExecutor { | ||
| 268 | run_queue: RunQueue, | 346 | run_queue: RunQueue, |
| 269 | signal_fn: fn(*mut ()), | 347 | pender: Pender, |
| 270 | signal_ctx: *mut (), | ||
| 271 | 348 | ||
| 272 | #[cfg(feature = "integrated-timers")] | 349 | #[cfg(feature = "integrated-timers")] |
| 273 | pub(crate) timer_queue: timer_queue::TimerQueue, | 350 | pub(crate) timer_queue: timer_queue::TimerQueue, |
| @@ -275,23 +352,14 @@ pub struct Executor { | |||
| 275 | alarm: AlarmHandle, | 352 | alarm: AlarmHandle, |
| 276 | } | 353 | } |
| 277 | 354 | ||
| 278 | impl Executor { | 355 | impl SyncExecutor { |
| 279 | /// Create a new executor. | 356 | pub(crate) fn new(pender: Pender) -> Self { |
| 280 | /// | ||
| 281 | /// When the executor has work to do, it will call `signal_fn` with | ||
| 282 | /// `signal_ctx` as argument. | ||
| 283 | /// | ||
| 284 | /// See [`Executor`] docs for details on `signal_fn`. | ||
| 285 | pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self { | ||
| 286 | #[cfg(feature = "integrated-timers")] | 357 | #[cfg(feature = "integrated-timers")] |
| 287 | let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; | 358 | let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; |
| 288 | #[cfg(feature = "integrated-timers")] | ||
| 289 | driver::set_alarm_callback(alarm, signal_fn, signal_ctx); | ||
| 290 | 359 | ||
| 291 | Self { | 360 | Self { |
| 292 | run_queue: RunQueue::new(), | 361 | run_queue: RunQueue::new(), |
| 293 | signal_fn, | 362 | pender, |
| 294 | signal_ctx, | ||
| 295 | 363 | ||
| 296 | #[cfg(feature = "integrated-timers")] | 364 | #[cfg(feature = "integrated-timers")] |
| 297 | timer_queue: timer_queue::TimerQueue::new(), | 365 | timer_queue: timer_queue::TimerQueue::new(), |
| @@ -307,12 +375,133 @@ impl Executor { | |||
| 307 | /// - `task` must be set up to run in this executor. | 375 | /// - `task` must be set up to run in this executor. |
| 308 | /// - `task` must NOT be already enqueued (in this executor or another one). | 376 | /// - `task` must NOT be already enqueued (in this executor or another one). |
| 309 | #[inline(always)] | 377 | #[inline(always)] |
| 310 | unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) { | 378 | unsafe fn enqueue(&self, task: TaskRef) { |
| 311 | #[cfg(feature = "rtos-trace")] | 379 | #[cfg(feature = "rtos-trace")] |
| 312 | trace::task_ready_begin(task.as_ptr() as u32); | 380 | trace::task_ready_begin(task.as_ptr() as u32); |
| 313 | 381 | ||
| 314 | if self.run_queue.enqueue(cs, task) { | 382 | if self.run_queue.enqueue(task) { |
| 315 | (self.signal_fn)(self.signal_ctx) | 383 | self.pender.pend(); |
| 384 | } | ||
| 385 | } | ||
| 386 | |||
| 387 | #[cfg(feature = "integrated-timers")] | ||
| 388 | fn alarm_callback(ctx: *mut ()) { | ||
| 389 | let this: &Self = unsafe { &*(ctx as *const Self) }; | ||
| 390 | this.pender.pend(); | ||
| 391 | } | ||
| 392 | |||
| 393 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { | ||
| 394 | task.header().executor.set(Some(self)); | ||
| 395 | |||
| 396 | #[cfg(feature = "rtos-trace")] | ||
| 397 | trace::task_new(task.as_ptr() as u32); | ||
| 398 | |||
| 399 | self.enqueue(task); | ||
| 400 | } | ||
| 401 | |||
| 402 | /// # Safety | ||
| 403 | /// | ||
| 404 | /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. | ||
| 405 | pub(crate) unsafe fn poll(&'static self) { | ||
| 406 | #[cfg(feature = "integrated-timers")] | ||
| 407 | driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ()); | ||
| 408 | |||
| 409 | #[allow(clippy::never_loop)] | ||
| 410 | loop { | ||
| 411 | #[cfg(feature = "integrated-timers")] | ||
| 412 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | ||
| 413 | |||
| 414 | self.run_queue.dequeue_all(|p| { | ||
| 415 | let task = p.header(); | ||
| 416 | |||
| 417 | #[cfg(feature = "integrated-timers")] | ||
| 418 | task.expires_at.set(Instant::MAX); | ||
| 419 | |||
| 420 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||
| 421 | if state & STATE_SPAWNED == 0 { | ||
| 422 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 423 | // - Task gets dequeued, poll starts | ||
| 424 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 425 | // - Task poll finishes, returning done=true | ||
| 426 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 427 | return; | ||
| 428 | } | ||
| 429 | |||
| 430 | #[cfg(feature = "rtos-trace")] | ||
| 431 | trace::task_exec_begin(p.as_ptr() as u32); | ||
| 432 | |||
| 433 | // Run the task | ||
| 434 | task.poll_fn.get().unwrap_unchecked()(p); | ||
| 435 | |||
| 436 | #[cfg(feature = "rtos-trace")] | ||
| 437 | trace::task_exec_end(); | ||
| 438 | |||
| 439 | // Enqueue or update into timer_queue | ||
| 440 | #[cfg(feature = "integrated-timers")] | ||
| 441 | self.timer_queue.update(p); | ||
| 442 | }); | ||
| 443 | |||
| 444 | #[cfg(feature = "integrated-timers")] | ||
| 445 | { | ||
| 446 | // If this is already in the past, set_alarm might return false | ||
| 447 | // In that case do another poll loop iteration. | ||
| 448 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 449 | if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { | ||
| 450 | break; | ||
| 451 | } | ||
| 452 | } | ||
| 453 | |||
| 454 | #[cfg(not(feature = "integrated-timers"))] | ||
| 455 | { | ||
| 456 | break; | ||
| 457 | } | ||
| 458 | } | ||
| 459 | |||
| 460 | #[cfg(feature = "rtos-trace")] | ||
| 461 | trace::system_idle(); | ||
| 462 | } | ||
| 463 | } | ||
| 464 | |||
| 465 | /// Raw executor. | ||
| 466 | /// | ||
| 467 | /// This is the core of the Embassy executor. It is low-level, requiring manual | ||
| 468 | /// handling of wakeups and task polling. If you can, prefer using one of the | ||
| 469 | /// [higher level executors](crate::Executor). | ||
| 470 | /// | ||
| 471 | /// The raw executor leaves it up to you to handle wakeups and scheduling: | ||
| 472 | /// | ||
| 473 | /// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks | ||
| 474 | /// that "want to run"). | ||
| 475 | /// - You must supply a [`Pender`]. The executor will call it to notify you it has work | ||
| 476 | /// to do. You must arrange for `poll()` to be called as soon as possible. | ||
| 477 | /// | ||
| 478 | /// The [`Pender`] can be called from *any* context: any thread, any interrupt priority | ||
| 479 | /// level, etc. It may be called synchronously from any `Executor` method call as well. | ||
| 480 | /// You must deal with this correctly. | ||
| 481 | /// | ||
| 482 | /// In particular, you must NOT call `poll` directly from the pender callback, as this violates | ||
| 483 | /// the requirement for `poll` to not be called reentrantly. | ||
| 484 | #[repr(transparent)] | ||
| 485 | pub struct Executor { | ||
| 486 | pub(crate) inner: SyncExecutor, | ||
| 487 | |||
| 488 | _not_sync: PhantomData<*mut ()>, | ||
| 489 | } | ||
| 490 | |||
| 491 | impl Executor { | ||
| 492 | pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self { | ||
| 493 | mem::transmute(inner) | ||
| 494 | } | ||
| 495 | |||
| 496 | /// Create a new executor. | ||
| 497 | /// | ||
| 498 | /// When the executor has work to do, it will call the [`Pender`]. | ||
| 499 | /// | ||
| 500 | /// See [`Executor`] docs for details on `Pender`. | ||
| 501 | pub fn new(pender: Pender) -> Self { | ||
| 502 | Self { | ||
| 503 | inner: SyncExecutor::new(pender), | ||
| 504 | _not_sync: PhantomData, | ||
| 316 | } | 505 | } |
| 317 | } | 506 | } |
| 318 | 507 | ||
| @@ -325,15 +514,8 @@ impl Executor { | |||
| 325 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. | 514 | /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. |
| 326 | /// In this case, the task's Future must be Send. This is because this is effectively | 515 | /// In this case, the task's Future must be Send. This is because this is effectively |
| 327 | /// sending the task to the executor thread. | 516 | /// sending the task to the executor thread. |
| 328 | pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) { | 517 | pub(super) unsafe fn spawn(&'static self, task: TaskRef) { |
| 329 | task.as_ref().executor.set(self); | 518 | self.inner.spawn(task) |
| 330 | |||
| 331 | #[cfg(feature = "rtos-trace")] | ||
| 332 | trace::task_new(task.as_ptr() as u32); | ||
| 333 | |||
| 334 | critical_section::with(|cs| { | ||
| 335 | self.enqueue(cs, task); | ||
| 336 | }) | ||
| 337 | } | 519 | } |
| 338 | 520 | ||
| 339 | /// Poll all queued tasks in this executor. | 521 | /// Poll all queued tasks in this executor. |
| @@ -341,63 +523,20 @@ impl Executor { | |||
| 341 | /// This loops over all tasks that are queued to be polled (i.e. they're | 523 | /// This loops over all tasks that are queued to be polled (i.e. they're |
| 342 | /// freshly spawned or they've been woken). Other tasks are not polled. | 524 | /// freshly spawned or they've been woken). Other tasks are not polled. |
| 343 | /// | 525 | /// |
| 344 | /// You must call `poll` after receiving a call to `signal_fn`. It is OK | 526 | /// You must call `poll` after receiving a call to the [`Pender`]. It is OK |
| 345 | /// to call `poll` even when not requested by `signal_fn`, but it wastes | 527 | /// to call `poll` even when not requested by the `Pender`, but it wastes |
| 346 | /// energy. | 528 | /// energy. |
| 347 | /// | 529 | /// |
| 348 | /// # Safety | 530 | /// # Safety |
| 349 | /// | 531 | /// |
| 350 | /// You must NOT call `poll` reentrantly on the same executor. | 532 | /// You must NOT call `poll` reentrantly on the same executor. |
| 351 | /// | 533 | /// |
| 352 | /// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you | 534 | /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you |
| 353 | /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to | 535 | /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to |
| 354 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's | 536 | /// somehow schedule for `poll()` to be called later, at a time you know for sure there's |
| 355 | /// no `poll()` already running. | 537 | /// no `poll()` already running. |
| 356 | pub unsafe fn poll(&'static self) { | 538 | pub unsafe fn poll(&'static self) { |
| 357 | #[cfg(feature = "integrated-timers")] | 539 | self.inner.poll() |
| 358 | self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); | ||
| 359 | |||
| 360 | self.run_queue.dequeue_all(|p| { | ||
| 361 | let task = p.as_ref(); | ||
| 362 | |||
| 363 | #[cfg(feature = "integrated-timers")] | ||
| 364 | task.expires_at.set(Instant::MAX); | ||
| 365 | |||
| 366 | let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||
| 367 | if state & STATE_SPAWNED == 0 { | ||
| 368 | // If task is not running, ignore it. This can happen in the following scenario: | ||
| 369 | // - Task gets dequeued, poll starts | ||
| 370 | // - While task is being polled, it gets woken. It gets placed in the queue. | ||
| 371 | // - Task poll finishes, returning done=true | ||
| 372 | // - RUNNING bit is cleared, but the task is already in the queue. | ||
| 373 | return; | ||
| 374 | } | ||
| 375 | |||
| 376 | #[cfg(feature = "rtos-trace")] | ||
| 377 | trace::task_exec_begin(p.as_ptr() as u32); | ||
| 378 | |||
| 379 | // Run the task | ||
| 380 | task.poll_fn.read()(p as _); | ||
| 381 | |||
| 382 | #[cfg(feature = "rtos-trace")] | ||
| 383 | trace::task_exec_end(); | ||
| 384 | |||
| 385 | // Enqueue or update into timer_queue | ||
| 386 | #[cfg(feature = "integrated-timers")] | ||
| 387 | self.timer_queue.update(p); | ||
| 388 | }); | ||
| 389 | |||
| 390 | #[cfg(feature = "integrated-timers")] | ||
| 391 | { | ||
| 392 | // If this is already in the past, set_alarm will immediately trigger the alarm. | ||
| 393 | // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, | ||
| 394 | // so we immediately do another poll loop iteration. | ||
| 395 | let next_expiration = self.timer_queue.next_expiration(); | ||
| 396 | driver::set_alarm(self.alarm, next_expiration.as_ticks()); | ||
| 397 | } | ||
| 398 | |||
| 399 | #[cfg(feature = "rtos-trace")] | ||
| 400 | trace::system_idle(); | ||
| 401 | } | 540 | } |
| 402 | 541 | ||
| 403 | /// Get a spawner that spawns tasks in this executor. | 542 | /// Get a spawner that spawns tasks in this executor. |
| @@ -409,41 +548,49 @@ impl Executor { | |||
| 409 | } | 548 | } |
| 410 | } | 549 | } |
| 411 | 550 | ||
| 412 | /// Wake a task by raw pointer. | 551 | /// Wake a task by `TaskRef`. |
| 413 | /// | ||
| 414 | /// You can obtain task pointers from `Waker`s using [`task_from_waker`]. | ||
| 415 | /// | 552 | /// |
| 416 | /// # Safety | 553 | /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`]. |
| 417 | /// | 554 | pub fn wake_task(task: TaskRef) { |
| 418 | /// `task` must be a valid task pointer obtained from [`task_from_waker`]. | 555 | let header = task.header(); |
| 419 | pub unsafe fn wake_task(task: NonNull<TaskHeader>) { | ||
| 420 | critical_section::with(|cs| { | ||
| 421 | let header = task.as_ref(); | ||
| 422 | let state = header.state.load(Ordering::Relaxed); | ||
| 423 | 556 | ||
| 557 | let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { | ||
| 424 | // If already scheduled, or if not started, | 558 | // If already scheduled, or if not started, |
| 425 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { | 559 | if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) { |
| 426 | return; | 560 | None |
| 561 | } else { | ||
| 562 | // Mark it as scheduled | ||
| 563 | Some(state | STATE_RUN_QUEUED) | ||
| 427 | } | 564 | } |
| 565 | }); | ||
| 428 | 566 | ||
| 429 | // Mark it as scheduled | 567 | if res.is_ok() { |
| 430 | header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed); | ||
| 431 | |||
| 432 | // We have just marked the task as scheduled, so enqueue it. | 568 | // We have just marked the task as scheduled, so enqueue it. |
| 433 | let executor = &*header.executor.get(); | 569 | unsafe { |
| 434 | executor.enqueue(cs, task); | 570 | let executor = header.executor.get().unwrap_unchecked(); |
| 435 | }) | 571 | executor.enqueue(task); |
| 572 | } | ||
| 573 | } | ||
| 436 | } | 574 | } |
| 437 | 575 | ||
| 438 | #[cfg(feature = "integrated-timers")] | 576 | #[cfg(feature = "integrated-timers")] |
| 439 | #[no_mangle] | 577 | struct TimerQueue; |
| 440 | unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { | 578 | |
| 441 | let task = waker::task_from_waker(waker); | 579 | #[cfg(feature = "integrated-timers")] |
| 442 | let task = task.as_ref(); | 580 | impl embassy_time::queue::TimerQueue for TimerQueue { |
| 443 | let expires_at = task.expires_at.get(); | 581 | fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { |
| 444 | task.expires_at.set(expires_at.min(at)); | 582 | let task = waker::task_from_waker(waker); |
| 583 | let task = task.header(); | ||
| 584 | unsafe { | ||
| 585 | let expires_at = task.expires_at.get(); | ||
| 586 | task.expires_at.set(expires_at.min(at)); | ||
| 587 | } | ||
| 588 | } | ||
| 445 | } | 589 | } |
| 446 | 590 | ||
| 591 | #[cfg(feature = "integrated-timers")] | ||
| 592 | embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); | ||
| 593 | |||
| 447 | #[cfg(feature = "rtos-trace")] | 594 | #[cfg(feature = "rtos-trace")] |
| 448 | impl rtos_trace::RtosTraceOSCallbacks for Executor { | 595 | impl rtos_trace::RtosTraceOSCallbacks for Executor { |
| 449 | fn task_list() { | 596 | fn task_list() { |
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue.rs index ed8c82a5c..f1ec19ac1 100644 --- a/embassy-executor/src/raw/run_queue.rs +++ b/embassy-executor/src/raw/run_queue.rs | |||
| @@ -2,18 +2,18 @@ use core::ptr; | |||
| 2 | use core::ptr::NonNull; | 2 | use core::ptr::NonNull; |
| 3 | 3 | ||
| 4 | use atomic_polyfill::{AtomicPtr, Ordering}; | 4 | use atomic_polyfill::{AtomicPtr, Ordering}; |
| 5 | use critical_section::CriticalSection; | ||
| 6 | 5 | ||
| 7 | use super::TaskHeader; | 6 | use super::{TaskHeader, TaskRef}; |
| 7 | use crate::raw::util::SyncUnsafeCell; | ||
| 8 | 8 | ||
| 9 | pub(crate) struct RunQueueItem { | 9 | pub(crate) struct RunQueueItem { |
| 10 | next: AtomicPtr<TaskHeader>, | 10 | next: SyncUnsafeCell<Option<TaskRef>>, |
| 11 | } | 11 | } |
| 12 | 12 | ||
| 13 | impl RunQueueItem { | 13 | impl RunQueueItem { |
| 14 | pub const fn new() -> Self { | 14 | pub const fn new() -> Self { |
| 15 | Self { | 15 | Self { |
| 16 | next: AtomicPtr::new(ptr::null_mut()), | 16 | next: SyncUnsafeCell::new(None), |
| 17 | } | 17 | } |
| 18 | } | 18 | } |
| 19 | } | 19 | } |
| @@ -46,29 +46,43 @@ impl RunQueue { | |||
| 46 | /// | 46 | /// |
| 47 | /// `item` must NOT be already enqueued in any queue. | 47 | /// `item` must NOT be already enqueued in any queue. |
| 48 | #[inline(always)] | 48 | #[inline(always)] |
| 49 | pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool { | 49 | pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool { |
| 50 | let prev = self.head.load(Ordering::Relaxed); | 50 | let mut was_empty = false; |
| 51 | task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed); | 51 | |
| 52 | self.head.store(task.as_ptr(), Ordering::Relaxed); | 52 | self.head |
| 53 | prev.is_null() | 53 | .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| { |
| 54 | was_empty = prev.is_null(); | ||
| 55 | unsafe { | ||
| 56 | // safety: the pointer is either null or valid | ||
| 57 | let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())); | ||
| 58 | // safety: there are no concurrent accesses to `next` | ||
| 59 | task.header().run_queue_item.next.set(prev); | ||
| 60 | } | ||
| 61 | Some(task.as_ptr() as *mut _) | ||
| 62 | }) | ||
| 63 | .ok(); | ||
| 64 | |||
| 65 | was_empty | ||
| 54 | } | 66 | } |
| 55 | 67 | ||
| 56 | /// Empty the queue, then call `on_task` for each task that was in the queue. | 68 | /// Empty the queue, then call `on_task` for each task that was in the queue. |
| 57 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue | 69 | /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue |
| 58 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. | 70 | /// and will be processed by the *next* call to `dequeue_all`, *not* the current one. |
| 59 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) { | 71 | pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) { |
| 60 | // Atomically empty the queue. | 72 | // Atomically empty the queue. |
| 61 | let mut ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); | 73 | let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel); |
| 74 | |||
| 75 | // safety: the pointer is either null or valid | ||
| 76 | let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) }; | ||
| 62 | 77 | ||
| 63 | // Iterate the linked list of tasks that were previously in the queue. | 78 | // Iterate the linked list of tasks that were previously in the queue. |
| 64 | while let Some(task) = NonNull::new(ptr) { | 79 | while let Some(task) = next { |
| 65 | // If the task re-enqueues itself, the `next` pointer will get overwritten. | 80 | // If the task re-enqueues itself, the `next` pointer will get overwritten. |
| 66 | // Therefore, first read the next pointer, and only then process the task. | 81 | // Therefore, first read the next pointer, and only then process the task. |
| 67 | let next = unsafe { task.as_ref() }.run_queue_item.next.load(Ordering::Relaxed); | 82 | // safety: there are no concurrent accesses to `next` |
| 83 | next = unsafe { task.header().run_queue_item.next.get() }; | ||
| 68 | 84 | ||
| 69 | on_task(task); | 85 | on_task(task); |
| 70 | |||
| 71 | ptr = next | ||
| 72 | } | 86 | } |
| 73 | } | 87 | } |
| 74 | } | 88 | } |
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 24c31892a..dc71c95b1 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs | |||
| @@ -1,45 +1,43 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::cmp::min; | 1 | use core::cmp::min; |
| 3 | use core::ptr; | ||
| 4 | use core::ptr::NonNull; | ||
| 5 | 2 | ||
| 6 | use atomic_polyfill::Ordering; | 3 | use atomic_polyfill::Ordering; |
| 7 | use embassy_time::Instant; | 4 | use embassy_time::Instant; |
| 8 | 5 | ||
| 9 | use super::{TaskHeader, STATE_TIMER_QUEUED}; | 6 | use super::{TaskRef, STATE_TIMER_QUEUED}; |
| 7 | use crate::raw::util::SyncUnsafeCell; | ||
| 10 | 8 | ||
| 11 | pub(crate) struct TimerQueueItem { | 9 | pub(crate) struct TimerQueueItem { |
| 12 | next: Cell<*mut TaskHeader>, | 10 | next: SyncUnsafeCell<Option<TaskRef>>, |
| 13 | } | 11 | } |
| 14 | 12 | ||
| 15 | impl TimerQueueItem { | 13 | impl TimerQueueItem { |
| 16 | pub const fn new() -> Self { | 14 | pub const fn new() -> Self { |
| 17 | Self { | 15 | Self { |
| 18 | next: Cell::new(ptr::null_mut()), | 16 | next: SyncUnsafeCell::new(None), |
| 19 | } | 17 | } |
| 20 | } | 18 | } |
| 21 | } | 19 | } |
| 22 | 20 | ||
| 23 | pub(crate) struct TimerQueue { | 21 | pub(crate) struct TimerQueue { |
| 24 | head: Cell<*mut TaskHeader>, | 22 | head: SyncUnsafeCell<Option<TaskRef>>, |
| 25 | } | 23 | } |
| 26 | 24 | ||
| 27 | impl TimerQueue { | 25 | impl TimerQueue { |
| 28 | pub const fn new() -> Self { | 26 | pub const fn new() -> Self { |
| 29 | Self { | 27 | Self { |
| 30 | head: Cell::new(ptr::null_mut()), | 28 | head: SyncUnsafeCell::new(None), |
| 31 | } | 29 | } |
| 32 | } | 30 | } |
| 33 | 31 | ||
| 34 | pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) { | 32 | pub(crate) unsafe fn update(&self, p: TaskRef) { |
| 35 | let task = p.as_ref(); | 33 | let task = p.header(); |
| 36 | if task.expires_at.get() != Instant::MAX { | 34 | if task.expires_at.get() != Instant::MAX { |
| 37 | let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); | 35 | let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); |
| 38 | let is_new = old_state & STATE_TIMER_QUEUED == 0; | 36 | let is_new = old_state & STATE_TIMER_QUEUED == 0; |
| 39 | 37 | ||
| 40 | if is_new { | 38 | if is_new { |
| 41 | task.timer_queue_item.next.set(self.head.get()); | 39 | task.timer_queue_item.next.set(self.head.get()); |
| 42 | self.head.set(p.as_ptr()); | 40 | self.head.set(Some(p)); |
| 43 | } | 41 | } |
| 44 | } | 42 | } |
| 45 | } | 43 | } |
| @@ -47,7 +45,7 @@ impl TimerQueue { | |||
| 47 | pub(crate) unsafe fn next_expiration(&self) -> Instant { | 45 | pub(crate) unsafe fn next_expiration(&self) -> Instant { |
| 48 | let mut res = Instant::MAX; | 46 | let mut res = Instant::MAX; |
| 49 | self.retain(|p| { | 47 | self.retain(|p| { |
| 50 | let task = p.as_ref(); | 48 | let task = p.header(); |
| 51 | let expires = task.expires_at.get(); | 49 | let expires = task.expires_at.get(); |
| 52 | res = min(res, expires); | 50 | res = min(res, expires); |
| 53 | expires != Instant::MAX | 51 | expires != Instant::MAX |
| @@ -55,9 +53,9 @@ impl TimerQueue { | |||
| 55 | res | 53 | res |
| 56 | } | 54 | } |
| 57 | 55 | ||
| 58 | pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<TaskHeader>)) { | 56 | pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(TaskRef)) { |
| 59 | self.retain(|p| { | 57 | self.retain(|p| { |
| 60 | let task = p.as_ref(); | 58 | let task = p.header(); |
| 61 | if task.expires_at.get() <= now { | 59 | if task.expires_at.get() <= now { |
| 62 | on_task(p); | 60 | on_task(p); |
| 63 | false | 61 | false |
| @@ -67,11 +65,10 @@ impl TimerQueue { | |||
| 67 | }); | 65 | }); |
| 68 | } | 66 | } |
| 69 | 67 | ||
| 70 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) { | 68 | pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) { |
| 71 | let mut prev = &self.head; | 69 | let mut prev = &self.head; |
| 72 | while !prev.get().is_null() { | 70 | while let Some(p) = prev.get() { |
| 73 | let p = NonNull::new_unchecked(prev.get()); | 71 | let task = p.header(); |
| 74 | let task = &*p.as_ptr(); | ||
| 75 | if f(p) { | 72 | if f(p) { |
| 76 | // Skip to next | 73 | // Skip to next |
| 77 | prev = &task.timer_queue_item.next; | 74 | prev = &task.timer_queue_item.next; |
diff --git a/embassy-executor/src/raw/util.rs b/embassy-executor/src/raw/util.rs index ed5822188..e2e8f4df8 100644 --- a/embassy-executor/src/raw/util.rs +++ b/embassy-executor/src/raw/util.rs | |||
| @@ -26,8 +26,31 @@ impl<T> UninitCell<T> { | |||
| 26 | } | 26 | } |
| 27 | } | 27 | } |
| 28 | 28 | ||
| 29 | impl<T: Copy> UninitCell<T> { | 29 | unsafe impl<T> Sync for UninitCell<T> {} |
| 30 | pub unsafe fn read(&self) -> T { | 30 | |
| 31 | ptr::read(self.as_mut_ptr()) | 31 | #[repr(transparent)] |
| 32 | pub struct SyncUnsafeCell<T> { | ||
| 33 | value: UnsafeCell<T>, | ||
| 34 | } | ||
| 35 | |||
| 36 | unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {} | ||
| 37 | |||
| 38 | impl<T> SyncUnsafeCell<T> { | ||
| 39 | #[inline] | ||
| 40 | pub const fn new(value: T) -> Self { | ||
| 41 | Self { | ||
| 42 | value: UnsafeCell::new(value), | ||
| 43 | } | ||
| 44 | } | ||
| 45 | |||
| 46 | pub unsafe fn set(&self, value: T) { | ||
| 47 | *self.value.get() = value; | ||
| 48 | } | ||
| 49 | |||
| 50 | pub unsafe fn get(&self) -> T | ||
| 51 | where | ||
| 52 | T: Copy, | ||
| 53 | { | ||
| 54 | *self.value.get() | ||
| 32 | } | 55 | } |
| 33 | } | 56 | } |
diff --git a/embassy-executor/src/raw/waker.rs b/embassy-executor/src/raw/waker.rs index 5765259f2..400b37fa9 100644 --- a/embassy-executor/src/raw/waker.rs +++ b/embassy-executor/src/raw/waker.rs | |||
| @@ -1,8 +1,7 @@ | |||
| 1 | use core::mem; | 1 | use core::mem; |
| 2 | use core::ptr::NonNull; | ||
| 3 | use core::task::{RawWaker, RawWakerVTable, Waker}; | 2 | use core::task::{RawWaker, RawWakerVTable, Waker}; |
| 4 | 3 | ||
| 5 | use super::{wake_task, TaskHeader}; | 4 | use super::{wake_task, TaskHeader, TaskRef}; |
| 6 | 5 | ||
| 7 | const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); | 6 | const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); |
| 8 | 7 | ||
| @@ -11,14 +10,14 @@ unsafe fn clone(p: *const ()) -> RawWaker { | |||
| 11 | } | 10 | } |
| 12 | 11 | ||
| 13 | unsafe fn wake(p: *const ()) { | 12 | unsafe fn wake(p: *const ()) { |
| 14 | wake_task(NonNull::new_unchecked(p as *mut TaskHeader)) | 13 | wake_task(TaskRef::from_ptr(p as *const TaskHeader)) |
| 15 | } | 14 | } |
| 16 | 15 | ||
| 17 | unsafe fn drop(_: *const ()) { | 16 | unsafe fn drop(_: *const ()) { |
| 18 | // nop | 17 | // nop |
| 19 | } | 18 | } |
| 20 | 19 | ||
| 21 | pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { | 20 | pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { |
| 22 | Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) | 21 | Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) |
| 23 | } | 22 | } |
| 24 | 23 | ||
| @@ -33,7 +32,7 @@ pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker { | |||
| 33 | /// # Panics | 32 | /// # Panics |
| 34 | /// | 33 | /// |
| 35 | /// Panics if the waker is not created by the Embassy executor. | 34 | /// Panics if the waker is not created by the Embassy executor. |
| 36 | pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { | 35 | pub fn task_from_waker(waker: &Waker) -> TaskRef { |
| 37 | // safety: OK because WakerHack has the same layout as Waker. | 36 | // safety: OK because WakerHack has the same layout as Waker. |
| 38 | // This is not really guaranteed because the structs are `repr(Rust)`, it is | 37 | // This is not really guaranteed because the structs are `repr(Rust)`, it is |
| 39 | // indeed the case in the current implementation. | 38 | // indeed the case in the current implementation. |
| @@ -43,8 +42,8 @@ pub fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> { | |||
| 43 | panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") | 42 | panic!("Found waker not created by the Embassy executor. `embassy_time::Timer` only works with the Embassy executor.") |
| 44 | } | 43 | } |
| 45 | 44 | ||
| 46 | // safety: we never create a waker with a null data pointer. | 45 | // safety: our wakers are always created with `TaskRef::as_ptr` |
| 47 | unsafe { NonNull::new_unchecked(hack.data as *mut TaskHeader) } | 46 | unsafe { TaskRef::from_ptr(hack.data as *const TaskHeader) } |
| 48 | } | 47 | } |
| 49 | 48 | ||
| 50 | struct WakerHack { | 49 | struct WakerHack { |
diff --git a/embassy-executor/src/raw/waker_turbo.rs b/embassy-executor/src/raw/waker_turbo.rs new file mode 100644 index 000000000..435a0ff7e --- /dev/null +++ b/embassy-executor/src/raw/waker_turbo.rs | |||
| @@ -0,0 +1,34 @@ | |||
| 1 | use core::ptr::NonNull; | ||
| 2 | use core::task::Waker; | ||
| 3 | |||
| 4 | use super::{wake_task, TaskHeader, TaskRef}; | ||
| 5 | |||
| 6 | pub(crate) unsafe fn from_task(p: TaskRef) -> Waker { | ||
| 7 | Waker::from_turbo_ptr(NonNull::new_unchecked(p.as_ptr() as _)) | ||
| 8 | } | ||
| 9 | |||
| 10 | /// Get a task pointer from a waker. | ||
| 11 | /// | ||
| 12 | /// This can be used as an optimization in wait queues to store task pointers | ||
| 13 | /// (1 word) instead of full Wakers (2 words). This saves a bit of RAM and helps | ||
| 14 | /// avoid dynamic dispatch. | ||
| 15 | /// | ||
| 16 | /// You can use the returned task pointer to wake the task with [`wake_task`](super::wake_task). | ||
| 17 | /// | ||
| 18 | /// # Panics | ||
| 19 | /// | ||
| 20 | /// Panics if the waker is not created by the Embassy executor. | ||
| 21 | pub fn task_from_waker(waker: &Waker) -> TaskRef { | ||
| 22 | let ptr = waker.as_turbo_ptr().as_ptr(); | ||
| 23 | |||
| 24 | // safety: our wakers are always created with `TaskRef::as_ptr` | ||
| 25 | unsafe { TaskRef::from_ptr(ptr as *const TaskHeader) } | ||
| 26 | } | ||
| 27 | |||
| 28 | #[inline(never)] | ||
| 29 | #[no_mangle] | ||
| 30 | fn _turbo_wake(ptr: NonNull<()>) { | ||
| 31 | // safety: our wakers are always created with `TaskRef::as_ptr` | ||
| 32 | let task = unsafe { TaskRef::from_ptr(ptr.as_ptr() as *const TaskHeader) }; | ||
| 33 | wake_task(task) | ||
| 34 | } | ||
diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 25a0d7dbb..2b6224045 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs | |||
| @@ -1,10 +1,8 @@ | |||
| 1 | use core::future::poll_fn; | ||
| 1 | use core::marker::PhantomData; | 2 | use core::marker::PhantomData; |
| 2 | use core::mem; | 3 | use core::mem; |
| 3 | use core::ptr::NonNull; | ||
| 4 | use core::task::Poll; | 4 | use core::task::Poll; |
| 5 | 5 | ||
| 6 | use futures_util::future::poll_fn; | ||
| 7 | |||
| 8 | use super::raw; | 6 | use super::raw; |
| 9 | 7 | ||
| 10 | /// Token to spawn a newly-created task in an executor. | 8 | /// Token to spawn a newly-created task in an executor. |
| @@ -23,12 +21,12 @@ use super::raw; | |||
| 23 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. | 21 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. |
| 24 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] | 22 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] |
| 25 | pub struct SpawnToken<S> { | 23 | pub struct SpawnToken<S> { |
| 26 | raw_task: Option<NonNull<raw::TaskHeader>>, | 24 | raw_task: Option<raw::TaskRef>, |
| 27 | phantom: PhantomData<*mut S>, | 25 | phantom: PhantomData<*mut S>, |
| 28 | } | 26 | } |
| 29 | 27 | ||
| 30 | impl<S> SpawnToken<S> { | 28 | impl<S> SpawnToken<S> { |
| 31 | pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { | 29 | pub(crate) unsafe fn new(raw_task: raw::TaskRef) -> Self { |
| 32 | Self { | 30 | Self { |
| 33 | raw_task: Some(raw_task), | 31 | raw_task: Some(raw_task), |
| 34 | phantom: PhantomData, | 32 | phantom: PhantomData, |
| @@ -91,10 +89,11 @@ impl Spawner { | |||
| 91 | /// | 89 | /// |
| 92 | /// Panics if the current executor is not an Embassy executor. | 90 | /// Panics if the current executor is not an Embassy executor. |
| 93 | pub async fn for_current_executor() -> Self { | 91 | pub async fn for_current_executor() -> Self { |
| 94 | poll_fn(|cx| unsafe { | 92 | poll_fn(|cx| { |
| 95 | let task = raw::task_from_waker(cx.waker()); | 93 | let task = raw::task_from_waker(cx.waker()); |
| 96 | let executor = (*task.as_ptr()).executor.get(); | 94 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; |
| 97 | Poll::Ready(Self::new(&*executor)) | 95 | let executor = unsafe { raw::Executor::wrap(executor) }; |
| 96 | Poll::Ready(Self::new(executor)) | ||
| 98 | }) | 97 | }) |
| 99 | .await | 98 | .await |
| 100 | } | 99 | } |
| @@ -132,9 +131,7 @@ impl Spawner { | |||
| 132 | /// spawner to other threads, but the spawner loses the ability to spawn | 131 | /// spawner to other threads, but the spawner loses the ability to spawn |
| 133 | /// non-Send tasks. | 132 | /// non-Send tasks. |
| 134 | pub fn make_send(&self) -> SendSpawner { | 133 | pub fn make_send(&self) -> SendSpawner { |
| 135 | SendSpawner { | 134 | SendSpawner::new(&self.executor.inner) |
| 136 | executor: self.executor, | ||
| 137 | } | ||
| 138 | } | 135 | } |
| 139 | } | 136 | } |
| 140 | 137 | ||
| @@ -147,14 +144,11 @@ impl Spawner { | |||
| 147 | /// If you want to spawn non-Send tasks, use [Spawner]. | 144 | /// If you want to spawn non-Send tasks, use [Spawner]. |
| 148 | #[derive(Copy, Clone)] | 145 | #[derive(Copy, Clone)] |
| 149 | pub struct SendSpawner { | 146 | pub struct SendSpawner { |
| 150 | executor: &'static raw::Executor, | 147 | executor: &'static raw::SyncExecutor, |
| 151 | } | 148 | } |
| 152 | 149 | ||
| 153 | unsafe impl Send for SendSpawner {} | ||
| 154 | unsafe impl Sync for SendSpawner {} | ||
| 155 | |||
| 156 | impl SendSpawner { | 150 | impl SendSpawner { |
| 157 | pub(crate) fn new(executor: &'static raw::Executor) -> Self { | 151 | pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self { |
| 158 | Self { executor } | 152 | Self { executor } |
| 159 | } | 153 | } |
| 160 | 154 | ||
| @@ -167,10 +161,10 @@ impl SendSpawner { | |||
| 167 | /// | 161 | /// |
| 168 | /// Panics if the current executor is not an Embassy executor. | 162 | /// Panics if the current executor is not an Embassy executor. |
| 169 | pub async fn for_current_executor() -> Self { | 163 | pub async fn for_current_executor() -> Self { |
| 170 | poll_fn(|cx| unsafe { | 164 | poll_fn(|cx| { |
| 171 | let task = raw::task_from_waker(cx.waker()); | 165 | let task = raw::task_from_waker(cx.waker()); |
| 172 | let executor = (*task.as_ptr()).executor.get(); | 166 | let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; |
| 173 | Poll::Ready(Self::new(&*executor)) | 167 | Poll::Ready(Self::new(executor)) |
| 174 | }) | 168 | }) |
| 175 | .await | 169 | .await |
| 176 | } | 170 | } |
