aboutsummaryrefslogtreecommitdiff
path: root/embassy-executor/src/raw
diff options
context:
space:
mode:
authorxoviat <[email protected]>2023-08-22 16:58:43 -0500
committerxoviat <[email protected]>2023-08-22 16:58:43 -0500
commit7d6edd7b15d2209ac0b96ff8814ecefce2964e36 (patch)
tree7988a9b46855ac187a92cbfc5f38cbbbff695e8d /embassy-executor/src/raw
parent9e3266b74554ea397bdd963ff12a26aa51e77b63 (diff)
parent7bff2ebab3b36cc922505e9db961840109c509ed (diff)
Merge branch 'main' of https://github.com/embassy-rs/embassy into rtc-lp
Diffstat (limited to 'embassy-executor/src/raw')
-rw-r--r--embassy-executor/src/raw/mod.rs228
1 files changed, 123 insertions, 105 deletions
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index f3760f589..c1d82e18a 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -147,10 +147,7 @@ impl<F: Future + 'static> TaskStorage<F> {
147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { 147 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
148 let task = AvailableTask::claim(self); 148 let task = AvailableTask::claim(self);
149 match task { 149 match task {
150 Some(task) => { 150 Some(task) => task.initialize(future),
151 let task = task.initialize(future);
152 unsafe { SpawnToken::<F>::new(task) }
153 }
154 None => SpawnToken::new_failed(), 151 None => SpawnToken::new_failed(),
155 } 152 }
156 } 153 }
@@ -186,12 +183,16 @@ impl<F: Future + 'static> TaskStorage<F> {
186 } 183 }
187} 184}
188 185
189struct AvailableTask<F: Future + 'static> { 186/// An uninitialized [`TaskStorage`].
187pub struct AvailableTask<F: Future + 'static> {
190 task: &'static TaskStorage<F>, 188 task: &'static TaskStorage<F>,
191} 189}
192 190
193impl<F: Future + 'static> AvailableTask<F> { 191impl<F: Future + 'static> AvailableTask<F> {
194 fn claim(task: &'static TaskStorage<F>) -> Option<Self> { 192 /// Try to claim a [`TaskStorage`].
193 ///
194 /// This function returns `None` if a task has already been spawned and has not finished running.
195 pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
195 task.raw 196 task.raw
196 .state 197 .state
197 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire) 198 .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
@@ -199,61 +200,30 @@ impl<F: Future + 'static> AvailableTask<F> {
199 .map(|_| Self { task }) 200 .map(|_| Self { task })
200 } 201 }
201 202
202 fn initialize(self, future: impl FnOnce() -> F) -> TaskRef { 203 fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
203 unsafe { 204 unsafe {
204 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); 205 self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
205 self.task.future.write(future()); 206 self.task.future.write(future());
206 }
207 TaskRef::new(self.task)
208 }
209}
210 207
211/// Raw storage that can hold up to N tasks of the same type. 208 let task = TaskRef::new(self.task);
212///
213/// This is essentially a `[TaskStorage<F>; N]`.
214pub struct TaskPool<F: Future + 'static, const N: usize> {
215 pool: [TaskStorage<F>; N],
216}
217 209
218impl<F: Future + 'static, const N: usize> TaskPool<F, N> { 210 SpawnToken::new(task)
219 /// Create a new TaskPool, with all tasks in non-spawned state.
220 pub const fn new() -> Self {
221 Self {
222 pool: [TaskStorage::NEW; N],
223 } 211 }
224 } 212 }
225 213
226 /// Try to spawn a task in the pool. 214 /// Initialize the [`TaskStorage`] to run the given future.
227 /// 215 pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
228 /// See [`TaskStorage::spawn()`] for details. 216 self.initialize_impl::<F>(future)
229 ///
230 /// This will loop over the pool and spawn the task in the first storage that
231 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
232 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
233 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
234 let task = self.pool.iter().find_map(AvailableTask::claim);
235 match task {
236 Some(task) => {
237 let task = task.initialize(future);
238 unsafe { SpawnToken::<F>::new(task) }
239 }
240 None => SpawnToken::new_failed(),
241 }
242 } 217 }
243 218
244 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if 219 /// Initialize the [`TaskStorage`] to run the given future.
245 /// the future is !Send.
246 /// 220 ///
247 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used 221 /// # Safety
248 /// by the Embassy macros ONLY.
249 /// 222 ///
250 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn` 223 /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
251 /// is an `async fn`, NOT a hand-written `Future`. 224 /// is an `async fn`, NOT a hand-written `Future`.
252 #[doc(hidden)] 225 #[doc(hidden)]
253 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized> 226 pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
254 where
255 FutFn: FnOnce() -> F,
256 {
257 // When send-spawning a task, we construct the future in this thread, and effectively 227 // When send-spawning a task, we construct the future in this thread, and effectively
258 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory, 228 // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
259 // send-spawning should require the future `F` to be `Send`. 229 // send-spawning should require the future `F` to be `Send`.
@@ -279,66 +249,73 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
279 // 249 //
280 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly 250 // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
281 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`. 251 // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
282 252 self.initialize_impl::<FutFn>(future)
283 let task = self.pool.iter().find_map(AvailableTask::claim);
284 match task {
285 Some(task) => {
286 let task = task.initialize(future);
287 unsafe { SpawnToken::<FutFn>::new(task) }
288 }
289 None => SpawnToken::new_failed(),
290 }
291 } 253 }
292} 254}
293 255
294#[derive(Clone, Copy)] 256/// Raw storage that can hold up to N tasks of the same type.
295pub(crate) enum PenderInner { 257///
296 #[cfg(feature = "executor-thread")] 258/// This is essentially a `[TaskStorage<F>; N]`.
297 Thread(crate::arch::ThreadPender), 259pub struct TaskPool<F: Future + 'static, const N: usize> {
298 #[cfg(feature = "executor-interrupt")] 260 pool: [TaskStorage<F>; N],
299 Interrupt(crate::arch::InterruptPender),
300 #[cfg(feature = "pender-callback")]
301 Callback { func: fn(*mut ()), context: *mut () },
302} 261}
303 262
304unsafe impl Send for PenderInner {} 263impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
305unsafe impl Sync for PenderInner {} 264 /// Create a new TaskPool, with all tasks in non-spawned state.
265 pub const fn new() -> Self {
266 Self {
267 pool: [TaskStorage::NEW; N],
268 }
269 }
306 270
307/// Platform/architecture-specific action executed when an executor has pending work. 271 fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
308/// 272 match self.pool.iter().find_map(AvailableTask::claim) {
309/// When a task within an executor is woken, the `Pender` is called. This does a 273 Some(task) => task.initialize_impl::<T>(future),
310/// platform/architecture-specific action to signal there is pending work in the executor. 274 None => SpawnToken::new_failed(),
311/// When this happens, you must arrange for [`Executor::poll`] to be called. 275 }
312/// 276 }
313/// You can think of it as a waker, but for the whole executor.
314pub struct Pender(pub(crate) PenderInner);
315 277
316impl Pender { 278 /// Try to spawn a task in the pool.
317 /// Create a `Pender` that will call an arbitrary function pointer. 279 ///
280 /// See [`TaskStorage::spawn()`] for details.
281 ///
282 /// This will loop over the pool and spawn the task in the first storage that
283 /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
284 /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
285 pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
286 self.spawn_impl::<F>(future)
287 }
288
289 /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
290 /// the future is !Send.
318 /// 291 ///
319 /// # Arguments 292 /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
293 /// by the Embassy macros ONLY.
320 /// 294 ///
321 /// - `func`: The function pointer to call. 295 /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
322 /// - `context`: Opaque context pointer, that will be passed to the function pointer. 296 /// is an `async fn`, NOT a hand-written `Future`.
323 #[cfg(feature = "pender-callback")] 297 #[doc(hidden)]
324 pub fn new_from_callback(func: fn(*mut ()), context: *mut ()) -> Self { 298 pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
325 Self(PenderInner::Callback { 299 where
326 func, 300 FutFn: FnOnce() -> F,
327 context: context.into(), 301 {
328 }) 302 // See the comment in AvailableTask::__initialize_async_fn for explanation.
303 self.spawn_impl::<FutFn>(future)
329 } 304 }
330} 305}
331 306
307#[derive(Clone, Copy)]
308pub(crate) struct Pender(*mut ());
309
310unsafe impl Send for Pender {}
311unsafe impl Sync for Pender {}
312
332impl Pender { 313impl Pender {
333 pub(crate) fn pend(&self) { 314 pub(crate) fn pend(self) {
334 match self.0 { 315 extern "Rust" {
335 #[cfg(feature = "executor-thread")] 316 fn __pender(context: *mut ());
336 PenderInner::Thread(x) => x.pend(),
337 #[cfg(feature = "executor-interrupt")]
338 PenderInner::Interrupt(x) => x.pend(),
339 #[cfg(feature = "pender-callback")]
340 PenderInner::Callback { func, context } => func(context),
341 } 317 }
318 unsafe { __pender(self.0) };
342 } 319 }
343} 320}
344 321
@@ -409,7 +386,7 @@ impl SyncExecutor {
409 #[allow(clippy::never_loop)] 386 #[allow(clippy::never_loop)]
410 loop { 387 loop {
411 #[cfg(feature = "integrated-timers")] 388 #[cfg(feature = "integrated-timers")]
412 self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); 389 self.timer_queue.dequeue_expired(Instant::now(), wake_task_no_pend);
413 390
414 self.run_queue.dequeue_all(|p| { 391 self.run_queue.dequeue_all(|p| {
415 let task = p.header(); 392 let task = p.header();
@@ -472,15 +449,31 @@ impl SyncExecutor {
472/// 449///
473/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks 450/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
474/// that "want to run"). 451/// that "want to run").
475/// - You must supply a [`Pender`]. The executor will call it to notify you it has work 452/// - You must supply a pender function, as shown below. The executor will call it to notify you
476/// to do. You must arrange for `poll()` to be called as soon as possible. 453/// it has work to do. You must arrange for `poll()` to be called as soon as possible.
454/// - Enabling `arch-xx` features will define a pender function for you. This means that you
455/// are limited to using the executors provided to you by the architecture/platform
456/// implementation. If you need a different executor, you must not enable `arch-xx` features.
477/// 457///
478/// The [`Pender`] can be called from *any* context: any thread, any interrupt priority 458/// The pender can be called from *any* context: any thread, any interrupt priority
479/// level, etc. It may be called synchronously from any `Executor` method call as well. 459/// level, etc. It may be called synchronously from any `Executor` method call as well.
480/// You must deal with this correctly. 460/// You must deal with this correctly.
481/// 461///
482/// In particular, you must NOT call `poll` directly from the pender callback, as this violates 462/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
483/// the requirement for `poll` to not be called reentrantly. 463/// the requirement for `poll` to not be called reentrantly.
464///
465/// The pender function must be exported with the name `__pender` and have the following signature:
466///
467/// ```rust
468/// #[export_name = "__pender"]
469/// fn pender(context: *mut ()) {
470/// // schedule `poll()` to be called
471/// }
472/// ```
473///
474/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
475/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
476/// differentiate between executors, or to pass a pointer to a callback that should be called.
484#[repr(transparent)] 477#[repr(transparent)]
485pub struct Executor { 478pub struct Executor {
486 pub(crate) inner: SyncExecutor, 479 pub(crate) inner: SyncExecutor,
@@ -495,12 +488,12 @@ impl Executor {
495 488
496 /// Create a new executor. 489 /// Create a new executor.
497 /// 490 ///
498 /// When the executor has work to do, it will call the [`Pender`]. 491 /// When the executor has work to do, it will call the pender function and pass `context` to it.
499 /// 492 ///
500 /// See [`Executor`] docs for details on `Pender`. 493 /// See [`Executor`] docs for details on the pender.
501 pub fn new(pender: Pender) -> Self { 494 pub fn new(context: *mut ()) -> Self {
502 Self { 495 Self {
503 inner: SyncExecutor::new(pender), 496 inner: SyncExecutor::new(Pender(context)),
504 _not_sync: PhantomData, 497 _not_sync: PhantomData,
505 } 498 }
506 } 499 }
@@ -523,16 +516,16 @@ impl Executor {
523 /// This loops over all tasks that are queued to be polled (i.e. they're 516 /// This loops over all tasks that are queued to be polled (i.e. they're
524 /// freshly spawned or they've been woken). Other tasks are not polled. 517 /// freshly spawned or they've been woken). Other tasks are not polled.
525 /// 518 ///
526 /// You must call `poll` after receiving a call to the [`Pender`]. It is OK 519 /// You must call `poll` after receiving a call to the pender. It is OK
527 /// to call `poll` even when not requested by the `Pender`, but it wastes 520 /// to call `poll` even when not requested by the pender, but it wastes
528 /// energy. 521 /// energy.
529 /// 522 ///
530 /// # Safety 523 /// # Safety
531 /// 524 ///
532 /// You must NOT call `poll` reentrantly on the same executor. 525 /// You must NOT call `poll` reentrantly on the same executor.
533 /// 526 ///
534 /// In particular, note that `poll` may call the `Pender` synchronously. Therefore, you 527 /// In particular, note that `poll` may call the pender synchronously. Therefore, you
535 /// must NOT directly call `poll()` from the `Pender` callback. Instead, the callback has to 528 /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
536 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's 529 /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
537 /// no `poll()` already running. 530 /// no `poll()` already running.
538 pub unsafe fn poll(&'static self) { 531 pub unsafe fn poll(&'static self) {
@@ -573,6 +566,31 @@ pub fn wake_task(task: TaskRef) {
573 } 566 }
574} 567}
575 568
569/// Wake a task by `TaskRef` without calling pend.
570///
571/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
572pub fn wake_task_no_pend(task: TaskRef) {
573 let header = task.header();
574
575 let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
576 // If already scheduled, or if not started,
577 if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
578 None
579 } else {
580 // Mark it as scheduled
581 Some(state | STATE_RUN_QUEUED)
582 }
583 });
584
585 if res.is_ok() {
586 // We have just marked the task as scheduled, so enqueue it.
587 unsafe {
588 let executor = header.executor.get().unwrap_unchecked();
589 executor.run_queue.enqueue(task);
590 }
591 }
592}
593
576#[cfg(feature = "integrated-timers")] 594#[cfg(feature = "integrated-timers")]
577struct TimerQueue; 595struct TimerQueue;
578 596