aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2021-03-18 00:20:02 +0100
committerDario Nieuwenhuis <[email protected]>2021-03-18 00:20:11 +0100
commit5c2bf3981ed81a7e2ff6300056c671733c810566 (patch)
tree1b894c4b4f6c326335b1c1dd24980e369ad33dc7
parent0cc2c671949937faed038b9d8d764a5f77f6b84a (diff)
Move Task into raw
-rw-r--r--embassy-macros/src/lib.rs7
-rw-r--r--embassy/src/executor/mod.rs64
-rw-r--r--embassy/src/executor/raw.rs79
-rw-r--r--embassy/src/executor/run_queue.rs10
-rw-r--r--embassy/src/executor/timer_queue.rs16
-rw-r--r--embassy/src/executor/waker.rs10
-rw-r--r--embassy/src/util/waker.rs6
7 files changed, 99 insertions, 93 deletions
diff --git a/embassy-macros/src/lib.rs b/embassy-macros/src/lib.rs
index 710c5a151..f207497d5 100644
--- a/embassy-macros/src/lib.rs
+++ b/embassy-macros/src/lib.rs
@@ -100,11 +100,12 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
100 100
101 let result = quote! { 101 let result = quote! {
102 #visibility fn #name(#args) -> ::embassy::executor::SpawnToken<#impl_ty> { 102 #visibility fn #name(#args) -> ::embassy::executor::SpawnToken<#impl_ty> {
103 use ::embassy::executor::raw::Task;
103 #task_fn 104 #task_fn
104 type F = #impl_ty; 105 type F = #impl_ty;
105 const NEW_TASK: ::embassy::executor::Task<F> = ::embassy::executor::Task::new(); 106 const NEW_TASK: Task<F> = Task::new();
106 static POOL: [::embassy::executor::Task<F>; #pool_size] = [NEW_TASK; #pool_size]; 107 static POOL: [Task<F>; #pool_size] = [NEW_TASK; #pool_size];
107 unsafe { ::embassy::executor::Task::spawn(&POOL, move || task(#arg_names)) } 108 unsafe { Task::spawn(&POOL, move || task(#arg_names)) }
108 } 109 }
109 }; 110 };
110 result.into() 111 result.into()
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs
index 10e543307..8b23264de 100644
--- a/embassy/src/executor/mod.rs
+++ b/embassy/src/executor/mod.rs
@@ -20,71 +20,9 @@ use crate::fmt::panic;
20use crate::interrupt::{Interrupt, InterruptExt}; 20use crate::interrupt::{Interrupt, InterruptExt};
21use crate::time::Alarm; 21use crate::time::Alarm;
22 22
23// repr(C) is needed to guarantee that the raw::Task is located at offset 0
24// This makes it safe to cast between raw::Task and Task pointers.
25#[repr(C)]
26pub struct Task<F: Future + 'static> {
27 raw: raw::Task,
28 future: UninitCell<F>, // Valid if STATE_SPAWNED
29}
30
31impl<F: Future + 'static> Task<F> {
32 pub const fn new() -> Self {
33 Self {
34 raw: raw::Task::new(),
35 future: UninitCell::uninit(),
36 }
37 }
38
39 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
40 for task in pool {
41 let state = raw::STATE_SPAWNED | raw::STATE_RUN_QUEUED;
42 if task
43 .raw
44 .state
45 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
46 .is_ok()
47 {
48 // Initialize the task
49 task.raw.poll_fn.write(Self::poll);
50 task.future.write(future());
51
52 return SpawnToken {
53 raw_task: Some(NonNull::new_unchecked(&task.raw as *const raw::Task as _)),
54 phantom: PhantomData,
55 };
56 }
57 }
58
59 SpawnToken {
60 raw_task: None,
61 phantom: PhantomData,
62 }
63 }
64
65 unsafe fn poll(p: NonNull<raw::Task>) {
66 let this = &*(p.as_ptr() as *const Task<F>);
67
68 let future = Pin::new_unchecked(this.future.as_mut());
69 let waker = waker::from_task(p);
70 let mut cx = Context::from_waker(&waker);
71 match future.poll(&mut cx) {
72 Poll::Ready(_) => {
73 this.future.drop_in_place();
74 this.raw
75 .state
76 .fetch_and(!raw::STATE_SPAWNED, Ordering::AcqRel);
77 }
78 Poll::Pending => {}
79 }
80 }
81}
82
83unsafe impl<F: Future + 'static> Sync for Task<F> {}
84
85#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] 23#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
86pub struct SpawnToken<F> { 24pub struct SpawnToken<F> {
87 raw_task: Option<NonNull<raw::Task>>, 25 raw_task: Option<NonNull<raw::TaskHeader>>,
88 phantom: PhantomData<*mut F>, 26 phantom: PhantomData<*mut F>,
89} 27}
90 28
diff --git a/embassy/src/executor/raw.rs b/embassy/src/executor/raw.rs
index 84e171dff..edc6d8053 100644
--- a/embassy/src/executor/raw.rs
+++ b/embassy/src/executor/raw.rs
@@ -1,15 +1,18 @@
1use atomic_polyfill::{AtomicU32, Ordering}; 1use atomic_polyfill::{AtomicU32, Ordering};
2use core::cell::Cell; 2use core::cell::Cell;
3use core::cmp::min; 3use core::cmp::min;
4use core::future::Future;
4use core::marker::PhantomData; 5use core::marker::PhantomData;
6use core::pin::Pin;
5use core::ptr; 7use core::ptr;
6use core::ptr::NonNull; 8use core::ptr::NonNull;
7use core::task::Waker; 9use core::task::{Context, Poll, Waker};
8 10
9use super::run_queue::{RunQueue, RunQueueItem}; 11use super::run_queue::{RunQueue, RunQueueItem};
10use super::timer_queue::{TimerQueue, TimerQueueItem}; 12use super::timer_queue::{TimerQueue, TimerQueueItem};
11use super::util::UninitCell; 13use super::util::UninitCell;
12use super::waker; 14use super::waker;
15use super::SpawnToken;
13use crate::time::{Alarm, Instant}; 16use crate::time::{Alarm, Instant};
14 17
15/// Task is spawned (has a future) 18/// Task is spawned (has a future)
@@ -19,16 +22,16 @@ pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
19/// Task is in the executor timer queue 22/// Task is in the executor timer queue
20pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; 23pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
21 24
22pub struct Task { 25pub struct TaskHeader {
23 pub(crate) state: AtomicU32, 26 pub(crate) state: AtomicU32,
24 pub(crate) run_queue_item: RunQueueItem, 27 pub(crate) run_queue_item: RunQueueItem,
25 pub(crate) expires_at: Cell<Instant>, 28 pub(crate) expires_at: Cell<Instant>,
26 pub(crate) timer_queue_item: TimerQueueItem, 29 pub(crate) timer_queue_item: TimerQueueItem,
27 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0 30 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
28 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<Task>)>, // Valid if STATE_SPAWNED 31 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<TaskHeader>)>, // Valid if STATE_SPAWNED
29} 32}
30 33
31impl Task { 34impl TaskHeader {
32 pub(crate) const fn new() -> Self { 35 pub(crate) const fn new() -> Self {
33 Self { 36 Self {
34 state: AtomicU32::new(0), 37 state: AtomicU32::new(0),
@@ -64,10 +67,70 @@ impl Task {
64 67
65 // We have just marked the task as scheduled, so enqueue it. 68 // We have just marked the task as scheduled, so enqueue it.
66 let executor = &*self.executor.get(); 69 let executor = &*self.executor.get();
67 executor.enqueue(self as *const Task as *mut Task); 70 executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
68 } 71 }
69} 72}
70 73
74// repr(C) is needed to guarantee that the Task is located at offset 0
75// This makes it safe to cast between Task and Task pointers.
76#[repr(C)]
77pub struct Task<F: Future + 'static> {
78 raw: TaskHeader,
79 future: UninitCell<F>, // Valid if STATE_SPAWNED
80}
81
82impl<F: Future + 'static> Task<F> {
83 pub const fn new() -> Self {
84 Self {
85 raw: TaskHeader::new(),
86 future: UninitCell::uninit(),
87 }
88 }
89
90 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
91 for task in pool {
92 let state = STATE_SPAWNED | STATE_RUN_QUEUED;
93 if task
94 .raw
95 .state
96 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
97 .is_ok()
98 {
99 // Initialize the task
100 task.raw.poll_fn.write(Self::poll);
101 task.future.write(future());
102
103 return SpawnToken {
104 raw_task: Some(NonNull::new_unchecked(&task.raw as *const TaskHeader as _)),
105 phantom: PhantomData,
106 };
107 }
108 }
109
110 SpawnToken {
111 raw_task: None,
112 phantom: PhantomData,
113 }
114 }
115
116 unsafe fn poll(p: NonNull<TaskHeader>) {
117 let this = &*(p.as_ptr() as *const Task<F>);
118
119 let future = Pin::new_unchecked(this.future.as_mut());
120 let waker = waker::from_task(p);
121 let mut cx = Context::from_waker(&waker);
122 match future.poll(&mut cx) {
123 Poll::Ready(_) => {
124 this.future.drop_in_place();
125 this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
126 }
127 Poll::Pending => {}
128 }
129 }
130}
131
132unsafe impl<F: Future + 'static> Sync for Task<F> {}
133
71pub struct Executor { 134pub struct Executor {
72 run_queue: RunQueue, 135 run_queue: RunQueue,
73 timer_queue: TimerQueue, 136 timer_queue: TimerQueue,
@@ -95,13 +158,13 @@ impl Executor {
95 self.signal_ctx = signal_ctx; 158 self.signal_ctx = signal_ctx;
96 } 159 }
97 160
98 unsafe fn enqueue(&self, item: *mut Task) { 161 unsafe fn enqueue(&self, item: *mut TaskHeader) {
99 if self.run_queue.enqueue(item) { 162 if self.run_queue.enqueue(item) {
100 (self.signal_fn)(self.signal_ctx) 163 (self.signal_fn)(self.signal_ctx)
101 } 164 }
102 } 165 }
103 166
104 pub unsafe fn spawn(&'static self, task: NonNull<Task>) { 167 pub unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
105 let task = task.as_ref(); 168 let task = task.as_ref();
106 task.executor.set(self); 169 task.executor.set(self);
107 self.enqueue(task as *const _ as _); 170 self.enqueue(task as *const _ as _);
@@ -154,7 +217,7 @@ impl Executor {
154 217
155pub use super::waker::task_from_waker; 218pub use super::waker::task_from_waker;
156 219
157pub unsafe fn wake_task(task: NonNull<Task>) { 220pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
158 task.as_ref().enqueue(); 221 task.as_ref().enqueue();
159} 222}
160 223
diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs
index 1d1023e51..083916139 100644
--- a/embassy/src/executor/run_queue.rs
+++ b/embassy/src/executor/run_queue.rs
@@ -2,10 +2,10 @@ use atomic_polyfill::{AtomicPtr, Ordering};
2use core::ptr; 2use core::ptr;
3use core::ptr::NonNull; 3use core::ptr::NonNull;
4 4
5use super::raw::Task; 5use super::raw::TaskHeader;
6 6
7pub(crate) struct RunQueueItem { 7pub(crate) struct RunQueueItem {
8 next: AtomicPtr<Task>, 8 next: AtomicPtr<TaskHeader>,
9} 9}
10 10
11impl RunQueueItem { 11impl RunQueueItem {
@@ -28,7 +28,7 @@ impl RunQueueItem {
28/// current batch is completely processed, so even if a task enqueues itself instantly (for example 28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
29/// by waking its own waker) can't prevent other tasks from running. 29/// by waking its own waker) can't prevent other tasks from running.
30pub(crate) struct RunQueue { 30pub(crate) struct RunQueue {
31 head: AtomicPtr<Task>, 31 head: AtomicPtr<TaskHeader>,
32} 32}
33 33
34impl RunQueue { 34impl RunQueue {
@@ -39,7 +39,7 @@ impl RunQueue {
39 } 39 }
40 40
41 /// Enqueues an item. Returns true if the queue was empty. 41 /// Enqueues an item. Returns true if the queue was empty.
42 pub(crate) unsafe fn enqueue(&self, item: *mut Task) -> bool { 42 pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool {
43 let mut prev = self.head.load(Ordering::Acquire); 43 let mut prev = self.head.load(Ordering::Acquire);
44 loop { 44 loop {
45 (*item).run_queue_item.next.store(prev, Ordering::Relaxed); 45 (*item).run_queue_item.next.store(prev, Ordering::Relaxed);
@@ -55,7 +55,7 @@ impl RunQueue {
55 prev.is_null() 55 prev.is_null()
56 } 56 }
57 57
58 pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull<Task>)) { 58 pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull<TaskHeader>)) {
59 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 59 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
60 60
61 while !task.is_null() { 61 while !task.is_null() {
diff --git a/embassy/src/executor/timer_queue.rs b/embassy/src/executor/timer_queue.rs
index d72eb93b1..a6939f110 100644
--- a/embassy/src/executor/timer_queue.rs
+++ b/embassy/src/executor/timer_queue.rs
@@ -4,11 +4,11 @@ use core::cmp::min;
4use core::ptr; 4use core::ptr;
5use core::ptr::NonNull; 5use core::ptr::NonNull;
6 6
7use super::raw::{Task, STATE_TIMER_QUEUED}; 7use super::raw::{TaskHeader, STATE_TIMER_QUEUED};
8use crate::time::Instant; 8use crate::time::Instant;
9 9
10pub(crate) struct TimerQueueItem { 10pub(crate) struct TimerQueueItem {
11 next: Cell<*mut Task>, 11 next: Cell<*mut TaskHeader>,
12} 12}
13 13
14impl TimerQueueItem { 14impl TimerQueueItem {
@@ -20,7 +20,7 @@ impl TimerQueueItem {
20} 20}
21 21
22pub(crate) struct TimerQueue { 22pub(crate) struct TimerQueue {
23 head: Cell<*mut Task>, 23 head: Cell<*mut TaskHeader>,
24} 24}
25 25
26impl TimerQueue { 26impl TimerQueue {
@@ -30,7 +30,7 @@ impl TimerQueue {
30 } 30 }
31 } 31 }
32 32
33 pub(crate) unsafe fn update(&self, p: NonNull<Task>) { 33 pub(crate) unsafe fn update(&self, p: NonNull<TaskHeader>) {
34 let task = p.as_ref(); 34 let task = p.as_ref();
35 if task.expires_at.get() != Instant::MAX { 35 if task.expires_at.get() != Instant::MAX {
36 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 36 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
@@ -54,7 +54,11 @@ impl TimerQueue {
54 res 54 res
55 } 55 }
56 56
57 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<Task>)) { 57 pub(crate) unsafe fn dequeue_expired(
58 &self,
59 now: Instant,
60 on_task: impl Fn(NonNull<TaskHeader>),
61 ) {
58 self.retain(|p| { 62 self.retain(|p| {
59 let task = p.as_ref(); 63 let task = p.as_ref();
60 if task.expires_at.get() <= now { 64 if task.expires_at.get() <= now {
@@ -66,7 +70,7 @@ impl TimerQueue {
66 }); 70 });
67 } 71 }
68 72
69 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<Task>) -> bool) { 73 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<TaskHeader>) -> bool) {
70 let mut prev = &self.head; 74 let mut prev = &self.head;
71 while !prev.get().is_null() { 75 while !prev.get().is_null() {
72 let p = NonNull::new_unchecked(prev.get()); 76 let p = NonNull::new_unchecked(prev.get());
diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs
index bc02c51df..050f6a1cf 100644
--- a/embassy/src/executor/waker.rs
+++ b/embassy/src/executor/waker.rs
@@ -2,7 +2,7 @@ use core::mem;
2use core::ptr::NonNull; 2use core::ptr::NonNull;
3use core::task::{RawWaker, RawWakerVTable, Waker}; 3use core::task::{RawWaker, RawWakerVTable, Waker};
4 4
5use super::raw::Task; 5use super::raw::TaskHeader;
6 6
7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
8 8
@@ -11,21 +11,21 @@ unsafe fn clone(p: *const ()) -> RawWaker {
11} 11}
12 12
13unsafe fn wake(p: *const ()) { 13unsafe fn wake(p: *const ()) {
14 (*(p as *mut Task)).enqueue() 14 (*(p as *mut TaskHeader)).enqueue()
15} 15}
16 16
17unsafe fn drop(_: *const ()) { 17unsafe fn drop(_: *const ()) {
18 // nop 18 // nop
19} 19}
20 20
21pub(crate) unsafe fn from_task(p: NonNull<Task>) -> Waker { 21pub(crate) unsafe fn from_task(p: NonNull<TaskHeader>) -> Waker {
22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE)) 22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23} 23}
24 24
25pub unsafe fn task_from_waker(waker: &Waker) -> NonNull<Task> { 25pub unsafe fn task_from_waker(waker: &Waker) -> NonNull<TaskHeader> {
26 let hack: &WakerHack = mem::transmute(waker); 26 let hack: &WakerHack = mem::transmute(waker);
27 assert_eq!(hack.vtable, &VTABLE); 27 assert_eq!(hack.vtable, &VTABLE);
28 NonNull::new_unchecked(hack.data as *mut Task) 28 NonNull::new_unchecked(hack.data as *mut TaskHeader)
29} 29}
30 30
31struct WakerHack { 31struct WakerHack {
diff --git a/embassy/src/util/waker.rs b/embassy/src/util/waker.rs
index 1f2d3a770..2b72fd560 100644
--- a/embassy/src/util/waker.rs
+++ b/embassy/src/util/waker.rs
@@ -3,12 +3,12 @@ use core::task::Waker;
3 3
4use atomic_polyfill::{AtomicPtr, Ordering}; 4use atomic_polyfill::{AtomicPtr, Ordering};
5 5
6use crate::executor::raw::{task_from_waker, wake_task, Task}; 6use crate::executor::raw::{task_from_waker, wake_task, TaskHeader};
7 7
8/// Utility struct to register and wake a waker. 8/// Utility struct to register and wake a waker.
9#[derive(Debug)] 9#[derive(Debug)]
10pub struct WakerRegistration { 10pub struct WakerRegistration {
11 waker: Option<NonNull<Task>>, 11 waker: Option<NonNull<TaskHeader>>,
12} 12}
13 13
14impl WakerRegistration { 14impl WakerRegistration {
@@ -49,7 +49,7 @@ impl WakerRegistration {
49} 49}
50 50
51pub struct AtomicWakerRegistration { 51pub struct AtomicWakerRegistration {
52 waker: AtomicPtr<Task>, 52 waker: AtomicPtr<TaskHeader>,
53} 53}
54 54
55impl AtomicWakerRegistration { 55impl AtomicWakerRegistration {