diff options
| author | Dario Nieuwenhuis <[email protected]> | 2022-04-27 04:27:42 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2022-04-27 04:56:41 +0200 |
| commit | 6f6c16f44924de4d71d0e5e3acc0908f2dd474e6 (patch) | |
| tree | 563881c926fd9c0fcc104d119d9c8a26d8c319d9 | |
| parent | 293f54d13406850d24d1226eb77989f4fa8db9f4 (diff) | |
executor: make send-spawning only require the task args to be Send, not the whole future.
| -rw-r--r-- | embassy-macros/src/macros/task.rs | 21 | ||||
| -rw-r--r-- | embassy/src/executor/raw/mod.rs | 41 | ||||
| -rw-r--r-- | embassy/src/executor/spawner.rs | 20 |
3 files changed, 49 insertions, 33 deletions
diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs index c450982c9..96932d77c 100644 --- a/embassy-macros/src/macros/task.rs +++ b/embassy-macros/src/macros/task.rs | |||
| @@ -73,23 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke | |||
| 73 | // in the user's code. | 73 | // in the user's code. |
| 74 | #task_inner | 74 | #task_inner |
| 75 | 75 | ||
| 76 | #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> { | 76 | #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> { |
| 77 | use ::core::future::Future; | 77 | type Fut = impl ::core::future::Future + 'static; |
| 78 | use #embassy_path::executor::SpawnToken; | 78 | static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new(); |
| 79 | use #embassy_path::executor::raw::TaskPool; | 79 | POOL.spawn(move || #task_inner_ident(#(#arg_names,)*)) |
| 80 | |||
| 81 | type Fut = impl Future + 'static; | ||
| 82 | |||
| 83 | static POOL: TaskPool<Fut, #pool_size> = TaskPool::new(); | ||
| 84 | |||
| 85 | // Opaque type laundering, to obscure its origin! | ||
| 86 | // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope" | ||
| 87 | // https://github.com/rust-lang/rust/issues/96406 | ||
| 88 | fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> { | ||
| 89 | token | ||
| 90 | } | ||
| 91 | |||
| 92 | launder_tait(POOL.spawn(move || #task_inner_ident(#(#arg_names,)*))) | ||
| 93 | } | 80 | } |
| 94 | }; | 81 | }; |
| 95 | 82 | ||
diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs index 5af35d868..6b14b8e8c 100644 --- a/embassy/src/executor/raw/mod.rs +++ b/embassy/src/executor/raw/mod.rs | |||
| @@ -110,8 +110,8 @@ impl TaskHeader { | |||
| 110 | /// Raw storage in which a task can be spawned. | 110 | /// Raw storage in which a task can be spawned. |
| 111 | /// | 111 | /// |
| 112 | /// This struct holds the necessary memory to spawn one task whose future is `F`. | 112 | /// This struct holds the necessary memory to spawn one task whose future is `F`. |
| 113 | /// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it | 113 | /// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You |
| 114 | /// with [`Task::spawn()`], which will fail if it is already spawned. | 114 | /// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned. |
| 115 | /// | 115 | /// |
| 116 | /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished | 116 | /// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished |
| 117 | /// running. Hence the relevant methods require `&'static self`. It may be reused, however. | 117 | /// running. Hence the relevant methods require `&'static self`. It may be reused, however. |
| @@ -159,11 +159,11 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 159 | /// | 159 | /// |
| 160 | /// This function will fail if the task is already spawned and has not finished running. | 160 | /// This function will fail if the task is already spawned and has not finished running. |
| 161 | /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will | 161 | /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will |
| 162 | /// cause [`Executor::spawn()`] to return the error. | 162 | /// cause [`Spawner::spawn()`] to return the error. |
| 163 | /// | 163 | /// |
| 164 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it | 164 | /// Once the task has finished running, you may spawn it again. It is allowed to spawn it |
| 165 | /// on a different executor. | 165 | /// on a different executor. |
| 166 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { | 166 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 167 | if self.spawn_allocate() { | 167 | if self.spawn_allocate() { |
| 168 | unsafe { self.spawn_initialize(future) } | 168 | unsafe { self.spawn_initialize(future) } |
| 169 | } else { | 169 | } else { |
| @@ -179,12 +179,37 @@ impl<F: Future + 'static> TaskStorage<F> { | |||
| 179 | .is_ok() | 179 | .is_ok() |
| 180 | } | 180 | } |
| 181 | 181 | ||
| 182 | unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { | 182 | unsafe fn spawn_initialize<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> |
| 183 | where | ||
| 184 | FutFn: FnOnce() -> F, | ||
| 185 | { | ||
| 183 | // Initialize the task | 186 | // Initialize the task |
| 184 | self.raw.poll_fn.write(Self::poll); | 187 | self.raw.poll_fn.write(Self::poll); |
| 185 | self.future.write(future()); | 188 | self.future.write(future()); |
| 186 | 189 | ||
| 187 | SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) | 190 | // When send-spawning a task, we construct the future in this thread, and effectively |
| 191 | // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, | ||
| 192 | // send-spawning should require the future `F` to be `Send`. | ||
| 193 | // | ||
| 194 | // The problem is this is more restrictive than needed. Once the future is executing, | ||
| 195 | // it is never sent to another thread. It is only sent when spawning. It should be | ||
| 196 | // enough for the task's arguments to be Send. (and in practice it's super easy to | ||
| 197 | // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) | ||
| 198 | // | ||
| 199 | // We can do it by sending the task args and constructing the future in the executor thread | ||
| 200 | // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy | ||
| 201 | // of the args. | ||
| 202 | // | ||
| 203 | // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the | ||
| 204 | // args are Send, it's OK to send a !Send future, as long as we do it before first polling it. | ||
| 205 | // | ||
| 206 | // (Note: this is how the generators are implemented today, it's not officially guaranteed yet, | ||
| 207 | // but it's possible it'll be guaranteed in the future. See zulip thread: | ||
| 208 | // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures ) | ||
| 209 | // | ||
| 210 | // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned. | ||
| 211 | // This is why we return `SpawnToken<FutFn>` below. | ||
| 212 | SpawnToken::<FutFn>::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _)) | ||
| 188 | } | 213 | } |
| 189 | 214 | ||
| 190 | unsafe fn poll(p: NonNull<TaskHeader>) { | 215 | unsafe fn poll(p: NonNull<TaskHeader>) { |
| @@ -232,8 +257,8 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> { | |||
| 232 | /// | 257 | /// |
| 233 | /// This will loop over the pool and spawn the task in the first storage that | 258 | /// This will loop over the pool and spawn the task in the first storage that |
| 234 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, | 259 | /// is currently free. If none is free, a "poisoned" SpawnToken is returned, |
| 235 | /// which will cause [`Executor::spawn()`] to return the error. | 260 | /// which will cause [`Spawner::spawn()`] to return the error. |
| 236 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { | 261 | pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { |
| 237 | for task in &self.pool { | 262 | for task in &self.pool { |
| 238 | if task.spawn_allocate() { | 263 | if task.spawn_allocate() { |
| 239 | return unsafe { task.spawn_initialize(future) }; | 264 | return unsafe { task.spawn_initialize(future) }; |
diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs index e6770e299..73c1f786f 100644 --- a/embassy/src/executor/spawner.rs +++ b/embassy/src/executor/spawner.rs | |||
| @@ -12,17 +12,21 @@ use super::raw; | |||
| 12 | /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must | 12 | /// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must |
| 13 | /// then spawn it into an executor, typically with [`Spawner::spawn()`]. | 13 | /// then spawn it into an executor, typically with [`Spawner::spawn()`]. |
| 14 | /// | 14 | /// |
| 15 | /// The generic parameter `S` determines whether the task can be spawned in executors | ||
| 16 | /// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`]. | ||
| 17 | /// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`]. | ||
| 18 | /// | ||
| 15 | /// # Panics | 19 | /// # Panics |
| 16 | /// | 20 | /// |
| 17 | /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. | 21 | /// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. |
| 18 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. | 22 | /// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. |
| 19 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] | 23 | #[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] |
| 20 | pub struct SpawnToken<F> { | 24 | pub struct SpawnToken<S> { |
| 21 | raw_task: Option<NonNull<raw::TaskHeader>>, | 25 | raw_task: Option<NonNull<raw::TaskHeader>>, |
| 22 | phantom: PhantomData<*mut F>, | 26 | phantom: PhantomData<*mut S>, |
| 23 | } | 27 | } |
| 24 | 28 | ||
| 25 | impl<F> SpawnToken<F> { | 29 | impl<S> SpawnToken<S> { |
| 26 | pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { | 30 | pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { |
| 27 | Self { | 31 | Self { |
| 28 | raw_task: Some(raw_task), | 32 | raw_task: Some(raw_task), |
| @@ -38,7 +42,7 @@ impl<F> SpawnToken<F> { | |||
| 38 | } | 42 | } |
| 39 | } | 43 | } |
| 40 | 44 | ||
| 41 | impl<F> Drop for SpawnToken<F> { | 45 | impl<S> Drop for SpawnToken<S> { |
| 42 | fn drop(&mut self) { | 46 | fn drop(&mut self) { |
| 43 | // TODO deallocate the task instead. | 47 | // TODO deallocate the task instead. |
| 44 | panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") | 48 | panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") |
| @@ -97,7 +101,7 @@ impl Spawner { | |||
| 97 | /// Spawn a task into an executor. | 101 | /// Spawn a task into an executor. |
| 98 | /// | 102 | /// |
| 99 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). | 103 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). |
| 100 | pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { | 104 | pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { |
| 101 | let task = token.raw_task; | 105 | let task = token.raw_task; |
| 102 | mem::forget(token); | 106 | mem::forget(token); |
| 103 | 107 | ||
| @@ -119,7 +123,7 @@ impl Spawner { | |||
| 119 | /// # Panics | 123 | /// # Panics |
| 120 | /// | 124 | /// |
| 121 | /// Panics if the spawning fails. | 125 | /// Panics if the spawning fails. |
| 122 | pub fn must_spawn<F>(&self, token: SpawnToken<F>) { | 126 | pub fn must_spawn<S>(&self, token: SpawnToken<S>) { |
| 123 | unwrap!(self.spawn(token)); | 127 | unwrap!(self.spawn(token)); |
| 124 | } | 128 | } |
| 125 | 129 | ||
| @@ -173,7 +177,7 @@ impl SendSpawner { | |||
| 173 | /// Spawn a task into an executor. | 177 | /// Spawn a task into an executor. |
| 174 | /// | 178 | /// |
| 175 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). | 179 | /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). |
| 176 | pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { | 180 | pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> { |
| 177 | let header = token.raw_task; | 181 | let header = token.raw_task; |
| 178 | mem::forget(token); | 182 | mem::forget(token); |
| 179 | 183 | ||
| @@ -191,7 +195,7 @@ impl SendSpawner { | |||
| 191 | /// # Panics | 195 | /// # Panics |
| 192 | /// | 196 | /// |
| 193 | /// Panics if the spawning fails. | 197 | /// Panics if the spawning fails. |
| 194 | pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) { | 198 | pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) { |
| 195 | unwrap!(self.spawn(token)); | 199 | unwrap!(self.spawn(token)); |
| 196 | } | 200 | } |
| 197 | } | 201 | } |
