aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-macros/src/macros/task.rs24
-rw-r--r--embassy/src/executor/raw/mod.rs121
-rw-r--r--embassy/src/executor/spawner.rs20
3 files changed, 111 insertions, 54 deletions
diff --git a/embassy-macros/src/macros/task.rs b/embassy-macros/src/macros/task.rs
index 396ce18f2..e48de3d63 100644
--- a/embassy-macros/src/macros/task.rs
+++ b/embassy-macros/src/macros/task.rs
@@ -73,26 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke
73 // in the user's code. 73 // in the user's code.
74 #task_inner 74 #task_inner
75 75
76 #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> { 76 #visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> {
77 use ::core::future::Future; 77 type Fut = impl ::core::future::Future + 'static;
78 use #embassy_path::executor::SpawnToken; 78 static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new();
79 use #embassy_path::executor::raw::TaskStorage; 79 unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) }
80
81 type Fut = impl Future + 'static;
82
83 #[allow(clippy::declare_interior_mutable_const)]
84 const NEW_TS: TaskStorage<Fut> = TaskStorage::new();
85
86 static POOL: [TaskStorage<Fut>; #pool_size] = [NEW_TS; #pool_size];
87
88 // Opaque type laundering, to obscure its origin!
89 // Workaround for "opaque type's hidden type cannot be another opaque type from the same scope"
90 // https://github.com/rust-lang/rust/issues/96406
91 fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> {
92 token
93 }
94
95 launder_tait(unsafe { TaskStorage::spawn_pool(&POOL, move || #task_inner_ident(#(#arg_names,)*)) })
96 } 80 }
97 }; 81 };
98 82
diff --git a/embassy/src/executor/raw/mod.rs b/embassy/src/executor/raw/mod.rs
index 3ae52ae31..5cf399cdf 100644
--- a/embassy/src/executor/raw/mod.rs
+++ b/embassy/src/executor/raw/mod.rs
@@ -110,8 +110,8 @@ impl TaskHeader {
110/// Raw storage in which a task can be spawned. 110/// Raw storage in which a task can be spawned.
111/// 111///
112/// This struct holds the necessary memory to spawn one task whose future is `F`. 112/// This struct holds the necessary memory to spawn one task whose future is `F`.
113/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it 113/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
114/// with [`Task::spawn()`], which will fail if it is already spawned. 114/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
115/// 115///
116/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished 116/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
117/// running. Hence the relevant methods require `&'static self`. It may be reused, however. 117/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
@@ -121,7 +121,7 @@ impl TaskHeader {
121/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc. 121/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
122 122
123// repr(C) is needed to guarantee that the Task is located at offset 0 123// repr(C) is needed to guarantee that the Task is located at offset 0
124// This makes it safe to cast between Task and Task pointers. 124// This makes it safe to cast between TaskHeader and TaskStorage pointers.
125#[repr(C)] 125#[repr(C)]
126pub struct TaskStorage<F: Future + 'static> { 126pub struct TaskStorage<F: Future + 'static> {
127 raw: TaskHeader, 127 raw: TaskHeader,
@@ -129,6 +129,9 @@ pub struct TaskStorage<F: Future + 'static> {
129} 129}
130 130
131impl<F: Future + 'static> TaskStorage<F> { 131impl<F: Future + 'static> TaskStorage<F> {
132 #[cfg(feature = "nightly")]
133 const NEW: Self = Self::new();
134
132 /// Create a new TaskStorage, in not-spawned state. 135 /// Create a new TaskStorage, in not-spawned state.
133 #[cfg(feature = "nightly")] 136 #[cfg(feature = "nightly")]
134 pub const fn new() -> Self { 137 pub const fn new() -> Self {
@@ -147,22 +150,6 @@ impl<F: Future + 'static> TaskStorage<F> {
147 } 150 }
148 } 151 }
149 152
150 /// Try to spawn a task in a pool.
151 ///
152 /// See [`Self::spawn()`] for details.
153 ///
154 /// This will loop over the pool and spawn the task in the first storage that
155 /// is currently free. If none is free,
156 pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
157 for task in pool {
158 if task.spawn_allocate() {
159 return unsafe { task.spawn_initialize(future) };
160 }
161 }
162
163 SpawnToken::new_failed()
164 }
165
166 /// Try to spawn the task. 153 /// Try to spawn the task.
167 /// 154 ///
168 /// The `future` closure constructs the future. It's only called if spawning is 155 /// The `future` closure constructs the future. It's only called if spawning is
@@ -172,15 +159,15 @@ impl<F: Future + 'static> TaskStorage<F> {
172 /// 159 ///
173 /// This function will fail if the task is already spawned and has not finished running. 160 /// This function will fail if the task is already spawned and has not finished running.
174 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will 161 /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
175 /// cause [`Executor::spawn()`] to return the error. 162 /// cause [`Spawner::spawn()`] to return the error.
176 /// 163 ///
177 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it 164 /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
178 /// on a different executor. 165 /// on a different executor.
179 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { 166 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
180 if self.spawn_allocate() { 167 if self.spawn_allocate() {
181 unsafe { self.spawn_initialize(future) } 168 unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }
182 } else { 169 } else {
183 SpawnToken::new_failed() 170 SpawnToken::<F>::new_failed()
184 } 171 }
185 } 172 }
186 173
@@ -192,12 +179,11 @@ impl<F: Future + 'static> TaskStorage<F> {
192 .is_ok() 179 .is_ok()
193 } 180 }
194 181
195 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> { 182 unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
196 // Initialize the task 183 // Initialize the task
197 self.raw.poll_fn.write(Self::poll); 184 self.raw.poll_fn.write(Self::poll);
198 self.future.write(future()); 185 self.future.write(future());
199 186 NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
200 SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _))
201 } 187 }
202 188
203 unsafe fn poll(p: NonNull<TaskHeader>) { 189 unsafe fn poll(p: NonNull<TaskHeader>) {
@@ -222,6 +208,89 @@ impl<F: Future + 'static> TaskStorage<F> {
222 208
223unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} 209unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
224 210
211/// Raw storage that can hold up to N tasks of the same type.
212///
213/// This is essentially a `[TaskStorage<F>; N]`.
214#[cfg(feature = "nightly")]
215pub struct TaskPool<F: Future + 'static, const N: usize> {
216 pool: [TaskStorage<F>; N],
217}
218
219#[cfg(feature = "nightly")]
220impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
221 /// Create a new TaskPool, with all tasks in non-spawned state.
222 pub const fn new() -> Self {
223 Self {
224 pool: [TaskStorage::NEW; N],
225 }
226 }
227
228 /// Try to spawn a task in the pool.
229 ///
230 /// See [`TaskStorage::spawn()`] for details.
231 ///
232 /// This will loop over the pool and spawn the task in the first storage that
233 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
234 /// which will cause [`Spawner::spawn()`] to return the error.
235 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
236 for task in &self.pool {
237 if task.spawn_allocate() {
238 return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
239 }
240 }
241
242 SpawnToken::<F>::new_failed()
243 }
244
245 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
246 /// the future is !Send.
247 ///
248 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
249 /// by the Embassy macros ONLY.
250 ///
251 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
252 /// is an `async fn`, NOT a hand-written `Future`.
253 #[doc(hidden)]
254 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
255 where
256 FutFn: FnOnce() -> F,
257 {
258 // When send-spawning a task, we construct the future in this thread, and effectively
259 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
260 // send-spawning should require the future `F` to be `Send`.
261 //
262 // The problem is this is more restrictive than needed. Once the future is executing,
263 // it is never sent to another thread. It is only sent when spawning. It should be
264 // enough for the task's arguments to be Send. (and in practice it's super easy to
265 // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
266 //
267 // We can do it by sending the task args and constructing the future in the executor thread
268 // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
269 // of the args.
270 //
271 // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
272 // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
273 //
274 // (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
275 // but it's possible it'll be guaranteed in the future. See zulip thread:
276 // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
277 //
278 // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
279 // This is why we return `SpawnToken<FutFn>` below.
280 //
281 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
282 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
283
284 for task in &self.pool {
285 if task.spawn_allocate() {
286 return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
287 }
288 }
289
290 SpawnToken::<FutFn>::new_failed()
291 }
292}
293
225/// Raw executor. 294/// Raw executor.
226/// 295///
227/// This is the core of the Embassy executor. It is low-level, requiring manual 296/// This is the core of the Embassy executor. It is low-level, requiring manual
diff --git a/embassy/src/executor/spawner.rs b/embassy/src/executor/spawner.rs
index e6770e299..73c1f786f 100644
--- a/embassy/src/executor/spawner.rs
+++ b/embassy/src/executor/spawner.rs
@@ -12,17 +12,21 @@ use super::raw;
12/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must 12/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
13/// then spawn it into an executor, typically with [`Spawner::spawn()`]. 13/// then spawn it into an executor, typically with [`Spawner::spawn()`].
14/// 14///
15/// The generic parameter `S` determines whether the task can be spawned in executors
16/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
17/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
18///
15/// # Panics 19/// # Panics
16/// 20///
17/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way. 21/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
18/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it. 22/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
19#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"] 23#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
20pub struct SpawnToken<F> { 24pub struct SpawnToken<S> {
21 raw_task: Option<NonNull<raw::TaskHeader>>, 25 raw_task: Option<NonNull<raw::TaskHeader>>,
22 phantom: PhantomData<*mut F>, 26 phantom: PhantomData<*mut S>,
23} 27}
24 28
25impl<F> SpawnToken<F> { 29impl<S> SpawnToken<S> {
26 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self { 30 pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
27 Self { 31 Self {
28 raw_task: Some(raw_task), 32 raw_task: Some(raw_task),
@@ -38,7 +42,7 @@ impl<F> SpawnToken<F> {
38 } 42 }
39} 43}
40 44
41impl<F> Drop for SpawnToken<F> { 45impl<S> Drop for SpawnToken<S> {
42 fn drop(&mut self) { 46 fn drop(&mut self) {
43 // TODO deallocate the task instead. 47 // TODO deallocate the task instead.
44 panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()") 48 panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
@@ -97,7 +101,7 @@ impl Spawner {
97 /// Spawn a task into an executor. 101 /// Spawn a task into an executor.
98 /// 102 ///
99 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). 103 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
100 pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { 104 pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
101 let task = token.raw_task; 105 let task = token.raw_task;
102 mem::forget(token); 106 mem::forget(token);
103 107
@@ -119,7 +123,7 @@ impl Spawner {
119 /// # Panics 123 /// # Panics
120 /// 124 ///
121 /// Panics if the spawning fails. 125 /// Panics if the spawning fails.
122 pub fn must_spawn<F>(&self, token: SpawnToken<F>) { 126 pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
123 unwrap!(self.spawn(token)); 127 unwrap!(self.spawn(token));
124 } 128 }
125 129
@@ -173,7 +177,7 @@ impl SendSpawner {
173 /// Spawn a task into an executor. 177 /// Spawn a task into an executor.
174 /// 178 ///
175 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`). 179 /// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
176 pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> { 180 pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
177 let header = token.raw_task; 181 let header = token.raw_task;
178 mem::forget(token); 182 mem::forget(token);
179 183
@@ -191,7 +195,7 @@ impl SendSpawner {
191 /// # Panics 195 /// # Panics
192 /// 196 ///
193 /// Panics if the spawning fails. 197 /// Panics if the spawning fails.
194 pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) { 198 pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
195 unwrap!(self.spawn(token)); 199 unwrap!(self.spawn(token));
196 } 200 }
197} 201}