aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2020-12-26 17:22:36 +0100
committerDario Nieuwenhuis <[email protected]>2020-12-26 17:22:36 +0100
commit8b7a42a4f9271c337d550be1f34e7d163f9eb905 (patch)
treefe0c7a367f3e61eed759877976b778b75a733c11
parent3df66c44e3bd84df659e2f4d30bb18128cd89427 (diff)
Split waker to separate file.
-rw-r--r--embassy/src/executor/mod.rs106
-rw-r--r--embassy/src/executor/waker.rs22
2 files changed, 65 insertions, 63 deletions
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs
index 435c97db8..6c76eed76 100644
--- a/embassy/src/executor/mod.rs
+++ b/embassy/src/executor/mod.rs
@@ -8,16 +8,17 @@ use core::pin::Pin;
8use core::ptr; 8use core::ptr;
9use core::ptr::NonNull; 9use core::ptr::NonNull;
10use core::sync::atomic::{AtomicU32, Ordering}; 10use core::sync::atomic::{AtomicU32, Ordering};
11use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; 11use core::task::{Context, Poll, RawWaker, Waker};
12 12
13mod run_queue; 13mod run_queue;
14mod util; 14mod util;
15mod waker;
15 16
16use self::run_queue::{RunQueue, RunQueueItem}; 17use self::run_queue::{RunQueue, RunQueueItem};
17use self::util::UninitCell; 18use self::util::UninitCell;
18 19
19/// Task is spawned and future hasn't finished running yet. 20/// Task is spawned (has a future)
20const STATE_RUNNING: u32 = 1 << 0; 21const STATE_SPAWNED: u32 = 1 << 0;
21/// Task is in the executor run queue 22/// Task is in the executor run queue
22const STATE_RUN_QUEUED: u32 = 1 << 1; 23const STATE_RUN_QUEUED: u32 = 1 << 1;
23/// Task is in the executor timer queue 24/// Task is in the executor timer queue
@@ -27,67 +28,46 @@ pub(crate) struct TaskHeader {
27 state: AtomicU32, 28 state: AtomicU32,
28 run_queue_item: RunQueueItem, 29 run_queue_item: RunQueueItem,
29 executor: Cell<*const Executor>, // Valid if state != 0 30 executor: Cell<*const Executor>, // Valid if state != 0
30 poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING 31 poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
31} 32}
32 33
33// repr(C) is needed to guarantee that header is located at offset 0 34impl TaskHeader {
34// This makes it safe to cast between Header and Task pointers. 35 pub(crate) unsafe fn enqueue(&self) {
35#[repr(C)] 36 let mut current = self.state.load(Ordering::Acquire);
36pub struct Task<F: Future + 'static> { 37 loop {
37 header: TaskHeader, 38 // If already scheduled, or if not started,
38 future: UninitCell<F>, // Valid if STATE_RUNNING 39 if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
39} 40 return;
40 41 }
41#[derive(Copy, Clone, Debug)]
42#[cfg_attr(feature = "defmt", derive(defmt::Format))]
43pub enum SpawnError {
44 Busy,
45}
46
47//=============
48// Waker
49
50static WAKER_VTABLE: RawWakerVTable =
51 RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
52
53unsafe fn waker_clone(p: *const ()) -> RawWaker {
54 RawWaker::new(p, &WAKER_VTABLE)
55}
56
57unsafe fn waker_wake(p: *const ()) {
58 let header = &*(p as *const TaskHeader);
59 42
60 let mut current = header.state.load(Ordering::Acquire); 43 // Mark it as scheduled
61 loop { 44 let new = current | STATE_RUN_QUEUED;
62 // If already scheduled, or if not started, 45
63 if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) { 46 match self.state.compare_exchange_weak(
64 return; 47 current,
48 new,
49 Ordering::AcqRel,
50 Ordering::Acquire,
51 ) {
52 Ok(_) => break,
53 Err(next_current) => current = next_current,
54 }
65 } 55 }
66 56
67 // Mark it as scheduled 57 // We have just marked the task as scheduled, so enqueue it.
68 let new = current | STATE_RUN_QUEUED; 58 let executor = &*self.executor.get();
69 59 executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
70 match header
71 .state
72 .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
73 {
74 Ok(_) => break,
75 Err(next_current) => current = next_current,
76 }
77 } 60 }
78
79 // We have just marked the task as scheduled, so enqueue it.
80 let executor = &*header.executor.get();
81 executor.enqueue(p as *mut TaskHeader);
82} 61}
83 62
84unsafe fn waker_drop(_: *const ()) { 63// repr(C) is needed to guarantee that header is located at offset 0
85 // nop 64// This makes it safe to cast between Header and Task pointers.
65#[repr(C)]
66pub struct Task<F: Future + 'static> {
67 header: TaskHeader,
68 future: UninitCell<F>, // Valid if STATE_SPAWNED
86} 69}
87 70
88//=============
89// Task
90
91impl<F: Future + 'static> Task<F> { 71impl<F: Future + 'static> Task<F> {
92 pub const fn new() -> Self { 72 pub const fn new() -> Self {
93 Self { 73 Self {
@@ -103,7 +83,7 @@ impl<F: Future + 'static> Task<F> {
103 83
104 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { 84 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
105 for task in pool { 85 for task in pool {
106 let state = STATE_RUNNING | STATE_RUN_QUEUED; 86 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
107 if task 87 if task
108 .header 88 .header
109 .state 89 .state
@@ -129,14 +109,14 @@ impl<F: Future + 'static> Task<F> {
129 let this = &*(p as *const Task<F>); 109 let this = &*(p as *const Task<F>);
130 110
131 let future = Pin::new_unchecked(this.future.as_mut()); 111 let future = Pin::new_unchecked(this.future.as_mut());
132 let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE)); 112 let waker = waker::from_task(p);
133 let mut cx = Context::from_waker(&waker); 113 let mut cx = Context::from_waker(&waker);
134 match future.poll(&mut cx) { 114 match future.poll(&mut cx) {
135 Poll::Ready(_) => { 115 Poll::Ready(_) => {
136 this.future.drop_in_place(); 116 this.future.drop_in_place();
137 this.header 117 this.header
138 .state 118 .state
139 .fetch_and(!STATE_RUNNING, Ordering::AcqRel); 119 .fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
140 } 120 }
141 Poll::Pending => {} 121 Poll::Pending => {}
142 } 122 }
@@ -145,9 +125,6 @@ impl<F: Future + 'static> Task<F> {
145 125
146unsafe impl<F: Future + 'static> Sync for Task<F> {} 126unsafe impl<F: Future + 'static> Sync for Task<F> {}
147 127
148//=============
149// Spawn token
150
151#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] 128#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
152pub struct SpawnToken { 129pub struct SpawnToken {
153 header: Option<NonNull<TaskHeader>>, 130 header: Option<NonNull<TaskHeader>>,
@@ -160,8 +137,11 @@ impl Drop for SpawnToken {
160 } 137 }
161} 138}
162 139
163//============= 140#[derive(Copy, Clone, Debug)]
164// Executor 141#[cfg_attr(feature = "defmt", derive(defmt::Format))]
142pub enum SpawnError {
143 Busy,
144}
165 145
166pub struct Executor { 146pub struct Executor {
167 run_queue: RunQueue, 147 run_queue: RunQueue,
@@ -207,7 +187,7 @@ impl Executor {
207 let header = &*p; 187 let header = &*p;
208 188
209 let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 189 let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
210 if state & STATE_RUNNING == 0 { 190 if state & STATE_SPAWNED == 0 {
211 // If task is not running, ignore it. This can happen in the following scenario: 191 // If task is not running, ignore it. This can happen in the following scenario:
212 // - Task gets dequeued, poll starts 192 // - Task gets dequeued, poll starts
213 // - While task is being polled, it gets woken. It gets placed in the queue. 193 // - While task is being polled, it gets woken. It gets placed in the queue.
diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs
new file mode 100644
index 000000000..662857dea
--- /dev/null
+++ b/embassy/src/executor/waker.rs
@@ -0,0 +1,22 @@
1use core::task::{RawWaker, RawWakerVTable, Waker};
2
3use super::TaskHeader;
4
5static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
6
7unsafe fn clone(p: *const ()) -> RawWaker {
8 RawWaker::new(p, &VTABLE)
9}
10
11unsafe fn wake(p: *const ()) {
12 let header = &*(p as *const TaskHeader);
13 header.enqueue();
14}
15
16unsafe fn drop(_: *const ()) {
17 // nop
18}
19
20pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker {
21 Waker::from_raw(RawWaker::new(p as _, &VTABLE))
22}