diff options
| -rw-r--r-- | embassy-macros/src/macros/task.rs | 24 | ||||
| -rw-r--r-- | embassy/src/executor/raw/mod.rs | 121 | ||||
| -rw-r--r-- | embassy/src/executor/spawner.rs | 20 |
3 files changed, 111 insertions, 54 deletions
diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index 396ce18f2..e48de3d63 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs | |||
| @@ -73,26 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke | |||
| 73 | // in the user's code. | 73 | // in the user's code. |
| 74 | #task_inner | 74 | #task_inner |
| 75 | 75 | ||
| 76 | #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> { | 76 | #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> { |
| 77 | use ::core::future::Future; | 77 | type Fut = impl ::core::future::Future + 'static; |
| 78 | use #embassy_path::executor::SpawnToken; | 78 | static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new(); |
| 79 | use #embassy_path::executor::raw::TaskStorage; | 79 | unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) } |
| 80 | |||
| 81 | type Fut = impl Future + 'static; | ||
| 82 | |||
| 83 | #[allow(clippy::declare_interior_mutable_const)] | ||
| 84 | const NEW_TS: TaskStorage<Fut> = TaskStorage::new(); | ||
| 85 | |||
| 86 | static POOL: [TaskStorage<Fut>; #pool_size] = [NEW_TS; #pool_size]; | ||
| 87 | |||
| 88 | // Opaque type laundering, to obscure its origin! | ||
| 89 | // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope" | ||
| 90 | // https://github.com/rust-lang/rust/issues/96406 | ||
| 91 | fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> { | ||
| 92 | token | ||
| 93 | } | ||
| 94 | |||
| 95 | launder_tait(unsafe { TaskStorage::spawn_pool(&POOL, move || #task_inner_ident(#(#arg_names,)*)) }) | ||
| 96 | } | 80 | } |
| 97 | }; | 81 | }; |
| 98 | 82 | ||
diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 3ae52ae31..5cf399cdf 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs | |||
| @@ -110,8 +110,8 @@ impl TaskHeader { | |||
| 110 | /// Raw storage in which a task can be spawned. | 110 | /// Raw storage in which a task can be spawned. |
| 111 | /// | 111 | /// |
| 112 | /// This struct holds the necessary memory to spawn one task whose future is `F`. | 112 | /// This struct holds the necessary memory to spawn one task whose future is `F`. |
| 113 | /// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it | 113 | /// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You |
| 114 | /// with [`Task::spawn()`], which will fail if it is already spawned. | 114 | /// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. |
| 115 | /// | 115 | /// |
| 116 | /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished | 116 | /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished |
| 117 | /// running. Hence the relevant methods require `&'static self`. It may be reused, however. | 117 | /// running. Hence the relevant methods require `&'static self`. It may be reused, however. |
| @@ -121,7 +121,7 @@ impl TaskHeader { | |||
| 121 | /// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. | 121 | /// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. |
| 122 | 122 | ||
| 123 | // repr(C) is needed to guarantee that the Task is located at offset 0 | 123 | // repr(C) is needed to guarantee that the Task is located at offset 0 |
| 124 | // This makes it safe to cast between Task and Task pointers. | 124 | // This makes it safe to cast between TaskHeader and TaskStorage pointers. |
| 125 | #[repr(C)] | 125 | #[repr(C)] |
| 126 | pub struct TaskStorage<F: Future + 'static> { | 126 | pub struct TaskStorage<F: Future + 'static> { |
| 127 | raw: TaskHeader, | 127 | raw: TaskHeader, |
| @@ -129,6 +129,9 @@ pub struct TaskStorage<F: Future + 'static> { | |||
| 129 | } | 129 | } |
| 130 | 130 | ||
| 131 | impl<F: Future + 'static> TaskStorage<F> { | 131 | impl<F: Future + 'static> TaskStorage<F> { |
| 132 | #[cfg(feature = "nightly")] | ||
| 133 | const NEW: Self = Self::new(); | ||
| 134 | |||
| 132 | /// Create a new TaskStorage, in not-spawned state. | 135 | /// Create a new TaskStorage, in not-spawned state. |
| 133 | #[cfg(feature = "nightly")] | 136 | #[cfg(feature = "nightly")] |
| 134 | pub const fn new() -> Self { | 137 | pub const fn new() -> Self { |
| @@ -147,22 +150,6 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 147 | } | 150 | } |
| 148 | } | 151 | } |
| 149 | 152 | ||
| 150 | /// Try to spawn a task in a pool. | ||
| 151 | /// | ||
| 152 | /// See [`Self::spawn()`] for details. | ||
| 153 | /// | ||
| 154 | /// This will loop over the pool and spawn the task in the first storage that | ||
| 155 | /// is currently free. If none is free, | ||
| 156 | pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> { | ||
| 157 | for task in pool { | ||
| 158 | if task.spawn_allocate() { | ||
| 159 | return unsafe { task.spawn_initialize(future) }; | ||
| 160 | } | ||
| 161 | } | ||
| 162 | |||
| 163 | SpawnToken::new_failed() | ||
| 164 | } | ||
| 165 | |||
| 166 | /// Try to spawn the task. | 153 | /// Try to spawn the task. |
| 167 | /// | 154 | /// |
| 168 | /// The `future` closure constructs the future. It's only called if spawning is | 155 | /// The `future` closure constructs the future. It's only called if spawning is |
| @@ -172,15 +159,15 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 172 | /// | 159 | /// |
| 173 | /// This function will fail if the task is already spawned and has not finished running. | 160 | /// This function will fail if the task is already spawned and has not finished running. |
| 174 | /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will | 161 | /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will |
| 175 | /// cause [`Executor::spawn()`] to return the error. | 162 | /// cause [`Spawner::spawn()`] to return the error. |
| 176 | /// | 163 | /// |
| 177 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it | 164 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it |
| 178 | /// on a different executor. | 165 | /// on a different executor. |
| 179 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { | 166 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 180 | if self.spawn_allocate() { | 167 | if self.spawn_allocate() { |
| 181 | unsafe { self.spawn_initialize(future) } | 168 | unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) } |
| 182 | } else { | 169 | } else { |
| 183 | SpawnToken::new_failed() | 170 | SpawnToken::<F>::new_failed() |
| 184 | } | 171 | } |
| 185 | } | 172 | } |
| 186 | 173 | ||
| @@ -192,12 +179,11 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 192 | .is_ok() | 179 | .is_ok() |
| 193 | } | 180 | } |
| 194 | 181 | ||
| 195 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { | 182 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> { |
| 196 | // Initialize the task | 183 | // Initialize the task |
| 197 | self.raw.poll_fn.write(Self::poll); | 184 | self.raw.poll_fn.write(Self::poll); |
| 198 | self.future.write(future()); | 185 | self.future.write(future()); |
| 199 | 186 | NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader) | |
| 200 | SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) | ||
| 201 | } | 187 | } |
| 202 | 188 | ||
| 203 | unsafe fn poll(p: NonNull<TaskHeader>) { | 189 | unsafe fn poll(p: NonNull<TaskHeader>) { |
| @@ -222,6 +208,89 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 222 | 208 | ||
| 223 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} | 209 | unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} |
| 224 | 210 | ||
| 211 | /// Raw storage that can hold up to N tasks of the same type. | ||
| 212 | /// | ||
| 213 | /// This is essentially a `[TaskStorage<F>; N]`. | ||
| 214 | #[cfg(feature = "nightly")] | ||
| 215 | pub struct TaskPool<F: Future + 'static, const N: usize> { | ||
| 216 | pool: [TaskStorage<F>; N], | ||
| 217 | } | ||
| 218 | |||
| 219 | #[cfg(feature = "nightly")] | ||
| 220 | impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | ||
| 221 | /// Create a new TaskPool, with all tasks in non-spawned state. | ||
| 222 | pub const fn new() -> Self { | ||
| 223 | Self { | ||
| 224 | pool: [TaskStorage::NEW; N], | ||
| 225 | } | ||
| 226 | } | ||
| 227 | |||
| 228 | /// Try to spawn a task in the pool. | ||
| 229 | /// | ||
| 230 | /// See [`TaskStorage::spawn()`] for details. | ||
| 231 | /// | ||
| 232 | /// This will loop over the pool and spawn the task in the first storage that | ||
| 233 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, | ||
| 234 | /// which will cause [`Spawner::spawn()`] to return the error. | ||
| 235 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { | ||
| 236 | for task in &self.pool { | ||
| 237 | if task.spawn_allocate() { | ||
| 238 | return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) }; | ||
| 239 | } | ||
| 240 | } | ||
| 241 | |||
| 242 | SpawnToken::<F>::new_failed() | ||
| 243 | } | ||
| 244 | |||
| 245 | /// Like spawn(), but allows the task to be send-spawned if the args are Send even if | ||
| 246 | /// the future is !Send. | ||
| 247 | /// | ||
| 248 | /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used | ||
| 249 | /// by the Embassy macros ONLY. | ||
| 250 | /// | ||
| 251 | /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` | ||
| 252 | /// is an `async fn`, NOT a hand-written `Future`. | ||
| 253 | #[doc(hidden)] | ||
| 254 | pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> | ||
| 255 | where | ||
| 256 | FutFn: FnOnce() -> F, | ||
| 257 | { | ||
| 258 | // When send-spawning a task, we construct the future in this thread, and effectively | ||
| 259 | // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, | ||
| 260 | // send-spawning should require the future `F` to be `Send`. | ||
| 261 | // | ||
| 262 | // The problem is this is more restrictive than needed. Once the future is executing, | ||
| 263 | // it is never sent to another thread. It is only sent when spawning. It should be | ||
| 264 | // enough for the task's arguments to be Send. (and in practice it's super easy to | ||
| 265 | // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) | ||
| 266 | // | ||
| 267 | // We can do it by sending the task args and constructing the future in the executor thread | ||
| 268 | // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy | ||
| 269 | // of the args. | ||
| 270 | // | ||
| 271 | // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the | ||
| 272 | // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. | ||
| 273 | // | ||
| 274 | // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, | ||
| 275 | // but it's possible it'll be guaranteed in the future. See zulip thread: | ||
| 276 | // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) | ||
| 277 | // | ||
| 278 | // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. | ||
| 279 | // This is why we return `SpawnToken<FutFn>` below. | ||
| 280 | // | ||
| 281 | // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly | ||
| 282 | // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. | ||
| 283 | |||
| 284 | for task in &self.pool { | ||
| 285 | if task.spawn_allocate() { | ||
| 286 | return SpawnToken::<FutFn>::new(task.spawn_initialize(future)); | ||
| 287 | } | ||
| 288 | } | ||
| 289 | |||
| 290 | SpawnToken::<FutFn>::new_failed() | ||
| 291 | } | ||
| 292 | } | ||
| 293 | |||
| 225 | /// Raw executor. | 294 | /// Raw executor. |
| 226 | /// | 295 | /// |
| 227 | /// This is the core of the Embassy executor. It is low-level, requiring manual | 296 | /// This is the core of the Embassy executor. It is low-level, requiring manual |
diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs index e6770e299..73c1f786f 100644 --- a/embassy/src/executor/spawner.rs +++ b/embassy/src/executor/spawner.rs | |||
| @@ -12,17 +12,21 @@ use super::raw; | |||
| 12 | /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must | 12 | /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must |
| 13 | /// then spawn it into an executor, typically with [`Spawner::spawn()`]. | 13 | /// then spawn it into an executor, typically with [`Spawner::spawn()`]. |
| 14 | /// | 14 | /// |
| 15 | /// The generic parameter `S` determines whether the task can be spawned in executors | ||
| 16 | /// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. | ||
| 17 | /// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. | ||
| 18 | /// | ||
| 15 | /// # Panics | 19 | /// # Panics |
| 16 | /// | 20 | /// |
| 17 | /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. | 21 | /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. |
| 18 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. | 22 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. |
| 19 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] | 23 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] |
| 20 | pub struct SpawnToken<F> { | 24 | pub struct SpawnToken<S> { |
| 21 | raw_task: Option<NonNull<raw::TaskHeader>>, | 25 | raw_task: Option<NonNull<raw::TaskHeader>>, |
| 22 | phantom: PhantomData<*mut F>, | 26 | phantom: PhantomData<*mut S>, |
| 23 | } | 27 | } |
| 24 | 28 | ||
| 25 | impl<F> SpawnToken<F> { | 29 | impl<S> SpawnToken<S> { |
| 26 | pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { | 30 | pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { |
| 27 | Self { | 31 | Self { |
| 28 | raw_task: Some(raw_task), | 32 | raw_task: Some(raw_task), |
| @@ -38,7 +42,7 @@ impl<F> SpawnToken<F> { | |||
| 38 | } | 42 | } |
| 39 | } | 43 | } |
| 40 | 44 | ||
| 41 | impl<F> Drop for SpawnToken<F> { | 45 | impl<S> Drop for SpawnToken<S> { |
| 42 | fn drop(&mut self) { | 46 | fn drop(&mut self) { |
| 43 | // TODO deallocate the task instead. | 47 | // TODO deallocate the task instead. |
| 44 | panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") | 48 | panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") |
| @@ -97,7 +101,7 @@ impl Spawner { | |||
| 97 | /// Spawn a task into an executor. | 101 | /// Spawn a task into an executor. |
| 98 | /// | 102 | /// |
| 99 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). | 103 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). |
| 100 | pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { | 104 | pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { |
| 101 | let task = token.raw_task; | 105 | let task = token.raw_task; |
| 102 | mem::forget(token); | 106 | mem::forget(token); |
| 103 | 107 | ||
| @@ -119,7 +123,7 @@ impl Spawner { | |||
| 119 | /// # Panics | 123 | /// # Panics |
| 120 | /// | 124 | /// |
| 121 | /// Panics if the spawning fails. | 125 | /// Panics if the spawning fails. |
| 122 | pub fn must_spawn<F>(&self, token: SpawnToken<F>) { | 126 | pub fn must_spawn<S>(&self, token: SpawnToken<S>) { |
| 123 | unwrap!(self.spawn(token)); | 127 | unwrap!(self.spawn(token)); |
| 124 | } | 128 | } |
| 125 | 129 | ||
| @@ -173,7 +177,7 @@ impl SendSpawner { | |||
| 173 | /// Spawn a task into an executor. | 177 | /// Spawn a task into an executor. |
| 174 | /// | 178 | /// |
| 175 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). | 179 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). |
| 176 | pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { | 180 | pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { |
| 177 | let header = token.raw_task; | 181 | let header = token.raw_task; |
| 178 | mem::forget(token); | 182 | mem::forget(token); |
| 179 | 183 | ||
| @@ -191,7 +195,7 @@ impl SendSpawner { | |||
| 191 | /// # Panics | 195 | /// # Panics |
| 192 | /// | 196 | /// |
| 193 | /// Panics if the spawning fails. | 197 | /// Panics if the spawning fails. |
| 194 | pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) { | 198 | pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) { |
| 195 | unwrap!(self.spawn(token)); | 199 | unwrap!(self.spawn(token)); |
| 196 | } | 200 | } |
| 197 | } | 201 | } |
