aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-04-27 12:21:25 +0000
committerGitHub <[email protected]>2022-04-27 12:21:25 +0000
commit9c283cd44504d6d9d6f9e352e4c7a8d043bd673f (patch)
treeacd5a0bf15ece515c576673cfcfd8a81bd8d91f9
parentdf814f9bbd5cc03f4d60eb8cb19374d23f0a84a0 (diff)
parent1599009a4f5fe1a0f9596b7b27bfd9fd84366377 (diff)
Merge #736
736: executor: allow Send-spawning of tasks if their args are Send. r=Dirbaio a=Dirbaio This allows send-spawning (spawning into an executor in another thread) tasks if their args are Send. Previously this would require the entire future to be Send. -- When send-spawning a task, we construct the future in this thread, and effectively "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, send-spawning should require the future `F` to be `Send`. The problem is this is more restrictive than needed. Once the future is executing, it is never sent to another thread. It is only sent when spawning. It should be enough for the task's arguments to be Send. (and in practice it's super easy to accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.) Luckily, an `async fn` future contains just the args when freshly constructed. So, if the args are Send, it's OK to send a !Send future, as long as we do it before first polling it. Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy-macros/src/macros/task.rs24
-rw-r--r--embassy/src/executor/raw/mod.rs121
-rw-r--r--embassy/src/executor/spawner.rs20
3 files changed, 111 insertions, 54 deletions
diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs
index 396ce18f2..e48de3d63 100644
--- a/embassy-macros/src/macros/task.rs
+++ b/embassy-macros/src/macros/task.rs
@@ -73,26 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke
73 // in the user's code. 73 // in the user's code.
74 #task_inner 74 #task_inner
75 75
76 #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> { 76 #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> {
77 use ::core::future::Future; 77 type Fut = impl ::core::future::Future + 'static;
78 use #embassy_path::executor::SpawnToken; 78 static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new();
79 use #embassy_path::executor::raw::TaskStorage; 79 unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) }
80
81 type Fut = impl Future + 'static;
82
83 #[allow(clippy::declare_interior_mutable_const)]
84 const NEW_TS: TaskStorage<Fut> = TaskStorage::new();
85
86 static POOL: [TaskStorage<Fut>; #pool_size] = [NEW_TS; #pool_size];
87
88 // Opaque type laundering, to obscure its origin!
89 // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope"
90 // https://github.com/rust-lang/rust/issues/96406
91 fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> {
92 token
93 }
94
95 launder_tait(unsafe { TaskStorage::spawn_pool(&POOL, move || #task_inner_ident(#(#arg_names,)*)) })
96 } 80 }
97 }; 81 };
98 82
diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs
index 3ae52ae31..5cf399cdf 100644
--- a/embassy/src/executor/raw/mod.rs
+++ b/embassy/src/executor/raw/mod.rs
@@ -110,8 +110,8 @@ impl TaskHeader {
110/// Raw storage in which a task can be spawned. 110/// Raw storage in which a task can be spawned.
111/// 111///
112/// This struct holds the necessary memory to spawn one task whose future is `F`. 112/// This struct holds the necessary memory to spawn one task whose future is `F`.
113/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it 113/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
114/// with [`Task::spawn()`], which will fail if it is already spawned. 114/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
115/// 115///
116/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished 116/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
117/// running. Hence the relevant methods require `&'static self`. It may be reused, however. 117/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
@@ -121,7 +121,7 @@ impl TaskHeader {
121/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. 121/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
122 122
123// repr(C) is needed to guarantee that the Task is located at offset 0 123// repr(C) is needed to guarantee that the Task is located at offset 0
124// This makes it safe to cast between Task and Task pointers. 124// This makes it safe to cast between TaskHeader and TaskStorage pointers.
125#[repr(C)] 125#[repr(C)]
126pub struct TaskStorage<F: Future + 'static> { 126pub struct TaskStorage<F: Future + 'static> {
127 raw: TaskHeader, 127 raw: TaskHeader,
@@ -129,6 +129,9 @@ pub struct TaskStorage<F: Future + 'static> {
129} 129}
130 130
131impl<F: Future + 'static> TaskStorage<F> { 131impl<F: Future + 'static> TaskStorage<F> {
132 #[cfg(feature = "nightly")]
133 const NEW: Self = Self::new();
134
132 /// Create a new TaskStorage, in not-spawned state. 135 /// Create a new TaskStorage, in not-spawned state.
133 #[cfg(feature = "nightly")] 136 #[cfg(feature = "nightly")]
134 pub const fn new() -> Self { 137 pub const fn new() -> Self {
@@ -147,22 +150,6 @@ impl<F: Future + 'static> TaskStorage<F> {
147 } 150 }
148 } 151 }
149 152
150 /// Try to spawn a task in a pool.
151 ///
152 /// See [`Self::spawn()`] for details.
153 ///
154 /// This will loop over the pool and spawn the task in the first storage that
155 /// is currently free. If none is free,
156 pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
157 for task in pool {
158 if task.spawn_allocate() {
159 return unsafe { task.spawn_initialize(future) };
160 }
161 }
162
163 SpawnToken::new_failed()
164 }
165
166 /// Try to spawn the task. 153 /// Try to spawn the task.
167 /// 154 ///
168 /// The `future` closure constructs the future. It's only called if spawning is 155 /// The `future` closure constructs the future. It's only called if spawning is
@@ -172,15 +159,15 @@ impl<F: Future + 'static> TaskStorage<F> {
172 /// 159 ///
173 /// This function will fail if the task is already spawned and has not finished running. 160 /// This function will fail if the task is already spawned and has not finished running.
174 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will 161 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
175 /// cause [`Executor::spawn()`] to return the error. 162 /// cause [`Spawner::spawn()`] to return the error.
176 /// 163 ///
177 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 164 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
178 /// on a different executor. 165 /// on a different executor.
179 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { 166 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
180 if self.spawn_allocate() { 167 if self.spawn_allocate() {
181 unsafe { self.spawn_initialize(future) } 168 unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }
182 } else { 169 } else {
183 SpawnToken::new_failed() 170 SpawnToken::<F>::new_failed()
184 } 171 }
185 } 172 }
186 173
@@ -192,12 +179,11 @@ impl<F: Future + 'static> TaskStorage<F> {
192 .is_ok() 179 .is_ok()
193 } 180 }
194 181
195 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { 182 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
196 // Initialize the task 183 // Initialize the task
197 self.raw.poll_fn.write(Self::poll); 184 self.raw.poll_fn.write(Self::poll);
198 self.future.write(future()); 185 self.future.write(future());
199 186 NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
200 SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _))
201 } 187 }
202 188
203 unsafe fn poll(p: NonNull<TaskHeader>) { 189 unsafe fn poll(p: NonNull<TaskHeader>) {
@@ -222,6 +208,89 @@ impl<F: Future + 'static> TaskStorage<F> {
222 208
223unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 209unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
224 210
211/// Raw storage that can hold up to N tasks of the same type.
212///
213/// This is essentially a `[TaskStorage<F>; N]`.
214#[cfg(feature = "nightly")]
215pub struct TaskPool<F: Future + 'static, const N: usize> {
216 pool: [TaskStorage<F>; N],
217}
218
219#[cfg(feature = "nightly")]
220impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
221 /// Create a new TaskPool, with all tasks in non-spawned state.
222 pub const fn new() -> Self {
223 Self {
224 pool: [TaskStorage::NEW; N],
225 }
226 }
227
228 /// Try to spawn a task in the pool.
229 ///
230 /// See [`TaskStorage::spawn()`] for details.
231 ///
232 /// This will loop over the pool and spawn the task in the first storage that
233 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
234 /// which will cause [`Spawner::spawn()`] to return the error.
235 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
236 for task in &self.pool {
237 if task.spawn_allocate() {
238 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
239 }
240 }
241
242 SpawnToken::<F>::new_failed()
243 }
244
245 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
246 /// the future is !Send.
247 ///
248 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
249 /// by the Embassy macros ONLY.
250 ///
251 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
252 /// is an `async fn`, NOT a hand-written `Future`.
253 #[doc(hidden)]
254 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
255 where
256 FutFn: FnOnce() -> F,
257 {
258 // When send-spawning a task, we construct the future in this thread, and effectively
259 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
260 // send-spawning should require the future `F` to be `Send`.
261 //
262 // The problem is this is more restrictive than needed. Once the future is executing,
263 // it is never sent to another thread. It is only sent when spawning. It should be
264 // enough for the task's arguments to be Send. (and in practice it's super easy to
265 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
266 //
267 // We can do it by sending the task args and constructing the future in the executor thread
268 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
269 // of the args.
270 //
271 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
272 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
273 //
274 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
275 // but it's possible it'll be guaranteed in the future. See zulip thread:
276 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
277 //
278 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
279 // This is why we return `SpawnToken<FutFn>` below.
280 //
281 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
282 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
283
284 for task in &self.pool {
285 if task.spawn_allocate() {
286 return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
287 }
288 }
289
290 SpawnToken::<FutFn>::new_failed()
291 }
292}
293
225/// Raw executor. 294/// Raw executor.
226/// 295///
227/// This is the core of the Embassy executor. It is low-level, requiring manual 296/// This is the core of the Embassy executor. It is low-level, requiring manual
diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs
index e6770e299..73c1f786f 100644
--- a/embassy/src/executor/spawner.rs
+++ b/embassy/src/executor/spawner.rs
@@ -12,17 +12,21 @@ use super::raw;
12/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must 12/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
13/// then spawn it into an executor, typically with [`Spawner::spawn()`]. 13/// then spawn it into an executor, typically with [`Spawner::spawn()`].
14/// 14///
15/// The generic parameter `S` determines whether the task can be spawned in executors
16/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
17/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
18///
15/// # Panics 19/// # Panics
16/// 20///
17/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. 21/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
18/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 22/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
19#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 23#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
20pub struct SpawnToken<F> { 24pub struct SpawnToken<S> {
21 raw_task: Option<NonNull<raw::TaskHeader>>, 25 raw_task: Option<NonNull<raw::TaskHeader>>,
22 phantom: PhantomData<*mut F>, 26 phantom: PhantomData<*mut S>,
23} 27}
24 28
25impl<F> SpawnToken<F> { 29impl<S> SpawnToken<S> {
26 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { 30 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
27 Self { 31 Self {
28 raw_task: Some(raw_task), 32 raw_task: Some(raw_task),
@@ -38,7 +42,7 @@ impl<F> SpawnToken<F> {
38 } 42 }
39} 43}
40 44
41impl<F> Drop for SpawnToken<F> { 45impl<S> Drop for SpawnToken<S> {
42 fn drop(&mut self) { 46 fn drop(&mut self) {
43 // TODO deallocate the task instead. 47 // TODO deallocate the task instead.
44 panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") 48 panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
@@ -97,7 +101,7 @@ impl Spawner {
97 /// Spawn a task into an executor. 101 /// Spawn a task into an executor.
98 /// 102 ///
99 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). 103 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
100 pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { 104 pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
101 let task = token.raw_task; 105 let task = token.raw_task;
102 mem::forget(token); 106 mem::forget(token);
103 107
@@ -119,7 +123,7 @@ impl Spawner {
119 /// # Panics 123 /// # Panics
120 /// 124 ///
121 /// Panics if the spawning fails. 125 /// Panics if the spawning fails.
122 pub fn must_spawn<F>(&self, token: SpawnToken<F>) { 126 pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
123 unwrap!(self.spawn(token)); 127 unwrap!(self.spawn(token));
124 } 128 }
125 129
@@ -173,7 +177,7 @@ impl SendSpawner {
173 /// Spawn a task into an executor. 177 /// Spawn a task into an executor.
174 /// 178 ///
175 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). 179 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
176 pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { 180 pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
177 let header = token.raw_task; 181 let header = token.raw_task;
178 mem::forget(token); 182 mem::forget(token);
179 183
@@ -191,7 +195,7 @@ impl SendSpawner {
191 /// # Panics 195 /// # Panics
192 /// 196 ///
193 /// Panics if the spawning fails. 197 /// Panics if the spawning fails.
194 pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) { 198 pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
195 unwrap!(self.spawn(token)); 199 unwrap!(self.spawn(token));
196 } 200 }
197} 201}