aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/settings.json3
-rw-r--r--embassy-macros/src/lib.rs20
-rw-r--r--embassy-nrf-examples/src/bin/buffered_uart.rs11
-rw-r--r--embassy-nrf-examples/src/bin/executor_fairness_test.rs17
-rw-r--r--embassy-nrf-examples/src/bin/gpiote.rs11
-rw-r--r--embassy-nrf-examples/src/bin/gpiote_port.rs11
-rw-r--r--embassy-nrf-examples/src/bin/multiprio.rs75
-rw-r--r--embassy-nrf-examples/src/bin/qspi.rs11
-rw-r--r--embassy-nrf-examples/src/bin/rtc_async.rs15
-rw-r--r--embassy-nrf-examples/src/bin/rtc_raw.rs2
-rw-r--r--embassy-nrf-examples/src/bin/uart.rs66
-rw-r--r--embassy-nrf/src/rtc.rs12
-rw-r--r--embassy-stm32f4-examples/src/bin/exti.rs11
-rw-r--r--embassy-stm32f4-examples/src/bin/serial.rs11
-rw-r--r--embassy/src/executor/mod.rs327
-rw-r--r--embassy/src/executor/raw.rs154
-rw-r--r--embassy/src/executor/run_queue.rs13
-rw-r--r--embassy/src/executor/timer.rs5
-rw-r--r--embassy/src/executor/timer_queue.rs47
-rw-r--r--embassy/src/executor/waker.rs22
-rw-r--r--embassy/src/interrupt.rs1
-rw-r--r--embassy/src/lib.rs1
-rw-r--r--embassy/src/time/traits.rs6
-rw-r--r--embassy/src/util/signal.rs8
-rwxr-xr-xtest-build.sh2
25 files changed, 490 insertions, 372 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 8c53d2097..a6d083ad9 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,4 +1,5 @@
1{ 1{
2 "rust-analyzer.assist.importMergeBehavior": "last",
2 "editor.formatOnSave": true, 3 "editor.formatOnSave": true,
3 "rust-analyzer.cargo.allFeatures": false, 4 "rust-analyzer.cargo.allFeatures": false,
4 "rust-analyzer.checkOnSave.allFeatures": false, 5 "rust-analyzer.checkOnSave.allFeatures": false,
@@ -8,4 +9,4 @@
8 "**/.git/subtree-cache/**": true, 9 "**/.git/subtree-cache/**": true,
9 "**/target/**": true 10 "**/target/**": true
10 } 11 }
11} 12} \ No newline at end of file
diff --git a/embassy-macros/src/lib.rs b/embassy-macros/src/lib.rs
index cb16f65aa..23f1cda99 100644
--- a/embassy-macros/src/lib.rs
+++ b/embassy-macros/src/lib.rs
@@ -11,21 +11,23 @@ use syn::spanned::Spanned;
11struct MacroArgs { 11struct MacroArgs {
12 #[darling(default)] 12 #[darling(default)]
13 pool_size: Option<usize>, 13 pool_size: Option<usize>,
14 #[darling(default)]
15 send: bool,
14} 16}
15 17
16#[proc_macro_attribute] 18#[proc_macro_attribute]
17pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { 19pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
18 let args = syn::parse_macro_input!(args as syn::AttributeArgs); 20 let macro_args = syn::parse_macro_input!(args as syn::AttributeArgs);
19 let mut task_fn = syn::parse_macro_input!(item as syn::ItemFn); 21 let mut task_fn = syn::parse_macro_input!(item as syn::ItemFn);
20 22
21 let args = match MacroArgs::from_list(&args) { 23 let macro_args = match MacroArgs::from_list(&macro_args) {
22 Ok(v) => v, 24 Ok(v) => v,
23 Err(e) => { 25 Err(e) => {
24 return TokenStream::from(e.write_errors()); 26 return TokenStream::from(e.write_errors());
25 } 27 }
26 }; 28 };
27 29
28 let pool_size: usize = args.pool_size.unwrap_or(1); 30 let pool_size: usize = macro_args.pool_size.unwrap_or(1);
29 31
30 let mut fail = false; 32 let mut fail = false;
31 if task_fn.sig.asyncness.is_none() { 33 if task_fn.sig.asyncness.is_none() {
@@ -90,11 +92,16 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
90 92
91 let visibility = &task_fn.vis; 93 let visibility = &task_fn.vis;
92 task_fn.sig.ident = format_ident!("task"); 94 task_fn.sig.ident = format_ident!("task");
95 let impl_ty = if macro_args.send {
96 quote!(impl ::core::future::Future + Send + 'static)
97 } else {
98 quote!(impl ::core::future::Future + 'static)
99 };
93 100
94 let result = quote! { 101 let result = quote! {
95 #visibility fn #name(#args) -> ::embassy::executor::SpawnToken { 102 #visibility fn #name(#args) -> ::embassy::executor::SpawnToken<#impl_ty> {
96 #task_fn 103 #task_fn
97 type F = impl ::core::future::Future + 'static; 104 type F = #impl_ty;
98 static POOL: [::embassy::executor::Task<F>; #pool_size] = [::embassy::executor::Task::new(); #pool_size]; 105 static POOL: [::embassy::executor::Task<F>; #pool_size] = [::embassy::executor::Task::new(); #pool_size];
99 unsafe { ::embassy::executor::Task::spawn(&POOL, move || task(#arg_names)) } 106 unsafe { ::embassy::executor::Task::spawn(&POOL, move || task(#arg_names)) }
100 } 107 }
@@ -119,6 +126,9 @@ pub fn interrupt_declare(item: TokenStream) -> TokenStream {
119 let irq = Interrupt::#name; 126 let irq = Interrupt::#name;
120 irq.nr() as u8 127 irq.nr() as u8
121 } 128 }
129 unsafe fn steal() -> Self {
130 Self(())
131 }
122 unsafe fn __handler(&self) -> &'static ::embassy::interrupt::Handler { 132 unsafe fn __handler(&self) -> &'static ::embassy::interrupt::Handler {
123 #[export_name = #name_handler] 133 #[export_name = #name_handler]
124 static HANDLER: ::embassy::interrupt::Handler = ::embassy::interrupt::Handler::new(); 134 static HANDLER: ::embassy::interrupt::Handler = ::embassy::interrupt::Handler::new();
diff --git a/embassy-nrf-examples/src/bin/buffered_uart.rs b/embassy-nrf-examples/src/bin/buffered_uart.rs
index 57c6b4cf4..7c7283fc2 100644
--- a/embassy-nrf-examples/src/bin/buffered_uart.rs
+++ b/embassy-nrf-examples/src/bin/buffered_uart.rs
@@ -83,11 +83,8 @@ static EXECUTOR: Forever<Executor> = Forever::new();
83fn main() -> ! { 83fn main() -> ! {
84 info!("Hello World!"); 84 info!("Hello World!");
85 85
86 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 86 let executor = EXECUTOR.put(Executor::new());
87 unwrap!(executor.spawn(run())); 87 executor.run(|spawner| {
88 88 unwrap!(spawner.spawn(run()));
89 loop { 89 });
90 executor.run();
91 cortex_m::asm::wfe();
92 }
93} 90}
diff --git a/embassy-nrf-examples/src/bin/executor_fairness_test.rs b/embassy-nrf-examples/src/bin/executor_fairness_test.rs
index 9b2c1bd26..1b9955739 100644
--- a/embassy-nrf-examples/src/bin/executor_fairness_test.rs
+++ b/embassy-nrf-examples/src/bin/executor_fairness_test.rs
@@ -61,14 +61,11 @@ fn main() -> ! {
61 unsafe { embassy::time::set_clock(rtc) }; 61 unsafe { embassy::time::set_clock(rtc) };
62 62
63 let alarm = ALARM.put(rtc.alarm0()); 63 let alarm = ALARM.put(rtc.alarm0());
64 let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); 64 let executor = EXECUTOR.put(Executor::new());
65 65 executor.set_alarm(alarm);
66 unwrap!(executor.spawn(run1())); 66 executor.run(|spawner| {
67 unwrap!(executor.spawn(run2())); 67 unwrap!(spawner.spawn(run1()));
68 unwrap!(executor.spawn(run3())); 68 unwrap!(spawner.spawn(run2()));
69 69 unwrap!(spawner.spawn(run3()));
70 loop { 70 });
71 executor.run();
72 cortex_m::asm::wfe();
73 }
74} 71}
diff --git a/embassy-nrf-examples/src/bin/gpiote.rs b/embassy-nrf-examples/src/bin/gpiote.rs
index afa1b85d5..f5315d6a6 100644
--- a/embassy-nrf-examples/src/bin/gpiote.rs
+++ b/embassy-nrf-examples/src/bin/gpiote.rs
@@ -73,11 +73,8 @@ static EXECUTOR: Forever<Executor> = Forever::new();
73fn main() -> ! { 73fn main() -> ! {
74 info!("Hello World!"); 74 info!("Hello World!");
75 75
76 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 76 let executor = EXECUTOR.put(Executor::new());
77 unwrap!(executor.spawn(run())); 77 executor.run(|spawner| {
78 78 unwrap!(spawner.spawn(run()));
79 loop { 79 });
80 executor.run();
81 cortex_m::asm::wfe();
82 }
83} 80}
diff --git a/embassy-nrf-examples/src/bin/gpiote_port.rs b/embassy-nrf-examples/src/bin/gpiote_port.rs
index f5aa81322..833096f3a 100644
--- a/embassy-nrf-examples/src/bin/gpiote_port.rs
+++ b/embassy-nrf-examples/src/bin/gpiote_port.rs
@@ -52,11 +52,8 @@ static EXECUTOR: Forever<Executor> = Forever::new();
52fn main() -> ! { 52fn main() -> ! {
53 info!("Hello World!"); 53 info!("Hello World!");
54 54
55 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 55 let executor = EXECUTOR.put(Executor::new());
56 unwrap!(executor.spawn(run())); 56 executor.run(|spawner| {
57 57 unwrap!(spawner.spawn(run()));
58 loop { 58 });
59 executor.run();
60 cortex_m::asm::wfe();
61 }
62} 59}
diff --git a/embassy-nrf-examples/src/bin/multiprio.rs b/embassy-nrf-examples/src/bin/multiprio.rs
index c821e3dba..8c2ec19af 100644
--- a/embassy-nrf-examples/src/bin/multiprio.rs
+++ b/embassy-nrf-examples/src/bin/multiprio.rs
@@ -66,9 +66,10 @@ use cortex_m_rt::entry;
66use defmt::panic; 66use defmt::panic;
67use nrf52840_hal::clocks; 67use nrf52840_hal::clocks;
68 68
69use embassy::executor::{task, Executor}; 69use embassy::executor::{task, Executor, IrqExecutor};
70use embassy::time::{Duration, Instant, Timer}; 70use embassy::time::{Duration, Instant, Timer};
71use embassy::util::Forever; 71use embassy::util::Forever;
72use embassy_nrf::interrupt::OwnedInterrupt;
72use embassy_nrf::{interrupt, pac, rtc}; 73use embassy_nrf::{interrupt, pac, rtc};
73 74
74#[task] 75#[task]
@@ -114,12 +115,12 @@ async fn run_low() {
114} 115}
115 116
116static RTC: Forever<rtc::RTC<pac::RTC1>> = Forever::new(); 117static RTC: Forever<rtc::RTC<pac::RTC1>> = Forever::new();
118static ALARM_HIGH: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
119static EXECUTOR_HIGH: Forever<IrqExecutor<interrupt::SWI1_EGU1Interrupt>> = Forever::new();
120static ALARM_MED: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
121static EXECUTOR_MED: Forever<IrqExecutor<interrupt::SWI0_EGU0Interrupt>> = Forever::new();
117static ALARM_LOW: Forever<rtc::Alarm<pac::RTC1>> = Forever::new(); 122static ALARM_LOW: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
118static EXECUTOR_LOW: Forever<Executor> = Forever::new(); 123static EXECUTOR_LOW: Forever<Executor> = Forever::new();
119static ALARM_MED: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
120static EXECUTOR_MED: Forever<Executor> = Forever::new();
121static ALARM_HIGH: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
122static EXECUTOR_HIGH: Forever<Executor> = Forever::new();
123 124
124#[entry] 125#[entry]
125fn main() -> ! { 126fn main() -> ! {
@@ -136,41 +137,31 @@ fn main() -> ! {
136 rtc.start(); 137 rtc.start();
137 unsafe { embassy::time::set_clock(rtc) }; 138 unsafe { embassy::time::set_clock(rtc) };
138 139
139 let alarm_low = ALARM_LOW.put(rtc.alarm0()); 140 // High-priority executor: SWI1_EGU1, priority level 6
140 let executor_low = EXECUTOR_LOW.put(Executor::new_with_alarm(alarm_low, cortex_m::asm::sev)); 141 let irq = interrupt::take!(SWI1_EGU1);
141 let alarm_med = ALARM_MED.put(rtc.alarm1()); 142 irq.set_priority(interrupt::Priority::Level6);
142 let executor_med = EXECUTOR_MED.put(Executor::new_with_alarm(alarm_med, || { 143 let alarm = ALARM_HIGH.put(rtc.alarm2());
143 NVIC::pend(interrupt::SWI0_EGU0) 144 let executor = EXECUTOR_HIGH.put(IrqExecutor::new(irq));
144 })); 145 executor.set_alarm(alarm);
145 let alarm_high = ALARM_HIGH.put(rtc.alarm2()); 146 executor.start(|spawner| {
146 let executor_high = EXECUTOR_HIGH.put(Executor::new_with_alarm(alarm_high, || { 147 unwrap!(spawner.spawn(run_high()));
147 NVIC::pend(interrupt::SWI1_EGU1) 148 });
148 })); 149
149 150 // Medium-priority executor: SWI0_EGU0, priority level 7
150 unsafe { 151 let irq = interrupt::take!(SWI0_EGU0);
151 let mut nvic: NVIC = core::mem::transmute(()); 152 irq.set_priority(interrupt::Priority::Level7);
152 nvic.set_priority(interrupt::SWI0_EGU0, 7 << 5); 153 let alarm = ALARM_MED.put(rtc.alarm1());
153 nvic.set_priority(interrupt::SWI1_EGU1, 6 << 5); 154 let executor = EXECUTOR_MED.put(IrqExecutor::new(irq));
154 NVIC::unmask(interrupt::SWI0_EGU0); 155 executor.set_alarm(alarm);
155 NVIC::unmask(interrupt::SWI1_EGU1); 156 executor.start(|spawner| {
156 } 157 unwrap!(spawner.spawn(run_med()));
157 158 });
158 unwrap!(executor_low.spawn(run_low())); 159
159 unwrap!(executor_med.spawn(run_med())); 160 // Low priority executor: runs in thread mode, using WFE/SEV
160 unwrap!(executor_high.spawn(run_high())); 161 let alarm = ALARM_LOW.put(rtc.alarm0());
161 162 let executor = EXECUTOR_LOW.put(Executor::new());
162 loop { 163 executor.set_alarm(alarm);
163 executor_low.run(); 164 executor.run(|spawner| {
164 cortex_m::asm::wfe(); 165 unwrap!(spawner.spawn(run_low()));
165 } 166 });
166}
167
168#[interrupt]
169unsafe fn SWI0_EGU0() {
170 EXECUTOR_MED.steal().run()
171}
172
173#[interrupt]
174unsafe fn SWI1_EGU1() {
175 EXECUTOR_HIGH.steal().run()
176} 167}
diff --git a/embassy-nrf-examples/src/bin/qspi.rs b/embassy-nrf-examples/src/bin/qspi.rs
index a7d47f79c..4edbd3f91 100644
--- a/embassy-nrf-examples/src/bin/qspi.rs
+++ b/embassy-nrf-examples/src/bin/qspi.rs
@@ -124,11 +124,8 @@ static EXECUTOR: Forever<Executor> = Forever::new();
124fn main() -> ! { 124fn main() -> ! {
125 info!("Hello World!"); 125 info!("Hello World!");
126 126
127 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 127 let executor = EXECUTOR.put(Executor::new());
128 unwrap!(executor.spawn(run())); 128 executor.run(|spawner| {
129 129 unwrap!(spawner.spawn(run()));
130 loop { 130 });
131 executor.run();
132 cortex_m::asm::wfe();
133 }
134} 131}
diff --git a/embassy-nrf-examples/src/bin/rtc_async.rs b/embassy-nrf-examples/src/bin/rtc_async.rs
index dcdeb7049..5260c69a8 100644
--- a/embassy-nrf-examples/src/bin/rtc_async.rs
+++ b/embassy-nrf-examples/src/bin/rtc_async.rs
@@ -53,13 +53,10 @@ fn main() -> ! {
53 unsafe { embassy::time::set_clock(rtc) }; 53 unsafe { embassy::time::set_clock(rtc) };
54 54
55 let alarm = ALARM.put(rtc.alarm0()); 55 let alarm = ALARM.put(rtc.alarm0());
56 let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); 56 let executor = EXECUTOR.put(Executor::new());
57 57 executor.set_alarm(alarm);
58 unwrap!(executor.spawn(run1())); 58 executor.run(|spawner| {
59 unwrap!(executor.spawn(run2())); 59 unwrap!(spawner.spawn(run1()));
60 60 unwrap!(spawner.spawn(run2()));
61 loop { 61 });
62 executor.run();
63 cortex_m::asm::wfe();
64 }
65} 62}
diff --git a/embassy-nrf-examples/src/bin/rtc_raw.rs b/embassy-nrf-examples/src/bin/rtc_raw.rs
index 438585460..7c60bb565 100644
--- a/embassy-nrf-examples/src/bin/rtc_raw.rs
+++ b/embassy-nrf-examples/src/bin/rtc_raw.rs
@@ -38,7 +38,7 @@ fn main() -> ! {
38 38
39 rtc.start(); 39 rtc.start();
40 40
41 alarm.set_callback(|| info!("ALARM TRIGGERED")); 41 alarm.set_callback(|_| info!("ALARM TRIGGERED"), core::ptr::null_mut());
42 alarm.set(53719); 42 alarm.set(53719);
43 43
44 info!("initialized!"); 44 info!("initialized!");
diff --git a/embassy-nrf-examples/src/bin/uart.rs b/embassy-nrf-examples/src/bin/uart.rs
index cb38e8fcb..c5468d32b 100644
--- a/embassy-nrf-examples/src/bin/uart.rs
+++ b/embassy-nrf-examples/src/bin/uart.rs
@@ -18,7 +18,31 @@ use nrf52840_hal::clocks;
18use nrf52840_hal::gpio; 18use nrf52840_hal::gpio;
19 19
20#[task] 20#[task]
21async fn run(mut uart: uarte::Uarte<pac::UARTE0>) { 21async fn run(uart: pac::UARTE0, port: pac::P0) {
22 // Init UART
23 let port0 = gpio::p0::Parts::new(port);
24
25 let pins = uarte::Pins {
26 rxd: port0.p0_08.into_floating_input().degrade(),
27 txd: port0
28 .p0_06
29 .into_push_pull_output(gpio::Level::Low)
30 .degrade(),
31 cts: None,
32 rts: None,
33 };
34
35 // NOTE(unsafe): Safe becasue we do not use `mem::forget` anywhere.
36 let mut uart = unsafe {
37 uarte::Uarte::new(
38 uart,
39 interrupt::take!(UARTE0_UART0),
40 pins,
41 uarte::Parity::EXCLUDED,
42 uarte::Baudrate::BAUD115200,
43 )
44 };
45
22 info!("uarte initialized!"); 46 info!("uarte initialized!");
23 47
24 // Message must be in SRAM 48 // Message must be in SRAM
@@ -81,36 +105,12 @@ fn main() -> ! {
81 unsafe { embassy::time::set_clock(rtc) }; 105 unsafe { embassy::time::set_clock(rtc) };
82 106
83 let alarm = ALARM.put(rtc.alarm0()); 107 let alarm = ALARM.put(rtc.alarm0());
84 let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); 108 let executor = EXECUTOR.put(Executor::new());
85 109 executor.set_alarm(alarm);
86 // Init UART 110
87 let port0 = gpio::p0::Parts::new(p.P0); 111 let uarte0 = p.UARTE0;
88 112 let p0 = p.P0;
89 let pins = uarte::Pins { 113 executor.run(|spawner| {
90 rxd: port0.p0_08.into_floating_input().degrade(), 114 unwrap!(spawner.spawn(run(uarte0, p0)));
91 txd: port0 115 });
92 .p0_06
93 .into_push_pull_output(gpio::Level::Low)
94 .degrade(),
95 cts: None,
96 rts: None,
97 };
98
99 // NOTE(unsafe): Safe becasue we do not use `mem::forget` anywhere.
100 let uart = unsafe {
101 uarte::Uarte::new(
102 p.UARTE0,
103 interrupt::take!(UARTE0_UART0),
104 pins,
105 uarte::Parity::EXCLUDED,
106 uarte::Baudrate::BAUD115200,
107 )
108 };
109
110 unwrap!(executor.spawn(run(uart)));
111
112 loop {
113 executor.run();
114 cortex_m::asm::wfe();
115 }
116} 116}
diff --git a/embassy-nrf/src/rtc.rs b/embassy-nrf/src/rtc.rs
index 015583943..dde0fd4ca 100644
--- a/embassy-nrf/src/rtc.rs
+++ b/embassy-nrf/src/rtc.rs
@@ -40,7 +40,7 @@ mod test {
40 40
41struct AlarmState { 41struct AlarmState {
42 timestamp: Cell<u64>, 42 timestamp: Cell<u64>,
43 callback: Cell<Option<fn()>>, 43 callback: Cell<Option<(fn(*mut ()), *mut ())>>,
44} 44}
45 45
46impl AlarmState { 46impl AlarmState {
@@ -159,13 +159,13 @@ impl<T: Instance> RTC<T> {
159 alarm.timestamp.set(u64::MAX); 159 alarm.timestamp.set(u64::MAX);
160 160
161 // Call after clearing alarm, so the callback can set another alarm. 161 // Call after clearing alarm, so the callback can set another alarm.
162 alarm.callback.get().map(|f| f()); 162 alarm.callback.get().map(|(f, ctx)| f(ctx));
163 } 163 }
164 164
165 fn set_alarm_callback(&self, n: usize, callback: fn()) { 165 fn set_alarm_callback(&self, n: usize, callback: fn(*mut ()), ctx: *mut ()) {
166 interrupt::free(|cs| { 166 interrupt::free(|cs| {
167 let alarm = &self.alarms.borrow(cs)[n]; 167 let alarm = &self.alarms.borrow(cs)[n];
168 alarm.callback.set(Some(callback)); 168 alarm.callback.set(Some((callback, ctx)));
169 }) 169 })
170 } 170 }
171 171
@@ -220,8 +220,8 @@ pub struct Alarm<T: Instance> {
220} 220}
221 221
222impl<T: Instance> embassy::time::Alarm for Alarm<T> { 222impl<T: Instance> embassy::time::Alarm for Alarm<T> {
223 fn set_callback(&self, callback: fn()) { 223 fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()) {
224 self.rtc.set_alarm_callback(self.n, callback); 224 self.rtc.set_alarm_callback(self.n, callback, ctx);
225 } 225 }
226 226
227 fn set(&self, timestamp: u64) { 227 fn set(&self, timestamp: u64) {
diff --git a/embassy-stm32f4-examples/src/bin/exti.rs b/embassy-stm32f4-examples/src/bin/exti.rs
index 879d0fa2a..ec4490b16 100644
--- a/embassy-stm32f4-examples/src/bin/exti.rs
+++ b/embassy-stm32f4-examples/src/bin/exti.rs
@@ -49,11 +49,8 @@ fn main() -> ! {
49 let dp = stm32::Peripherals::take().unwrap(); 49 let dp = stm32::Peripherals::take().unwrap();
50 let cp = cortex_m::peripheral::Peripherals::take().unwrap(); 50 let cp = cortex_m::peripheral::Peripherals::take().unwrap();
51 51
52 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 52 let executor = EXECUTOR.put(Executor::new());
53 executor.spawn(run(dp, cp)).unwrap(); 53 executor.run(|spawner| {
54 54 unwrap!(spawner.spawn(run(dp, cp)));
55 loop { 55 });
56 executor.run();
57 //cortex_m::asm::wfe(); // wfe causes RTT to stop working on stm32
58 }
59} 56}
diff --git a/embassy-stm32f4-examples/src/bin/serial.rs b/embassy-stm32f4-examples/src/bin/serial.rs
index 93c32b3f4..7338d4fe0 100644
--- a/embassy-stm32f4-examples/src/bin/serial.rs
+++ b/embassy-stm32f4-examples/src/bin/serial.rs
@@ -59,11 +59,8 @@ fn main() -> ! {
59 let dp = stm32::Peripherals::take().unwrap(); 59 let dp = stm32::Peripherals::take().unwrap();
60 let cp = cortex_m::peripheral::Peripherals::take().unwrap(); 60 let cp = cortex_m::peripheral::Peripherals::take().unwrap();
61 61
62 let executor = EXECUTOR.put(Executor::new(cortex_m::asm::sev)); 62 let executor = EXECUTOR.put(Executor::new());
63 executor.spawn(run(dp, cp)).unwrap(); 63 executor.run(|spawner| {
64 64 unwrap!(spawner.spawn(run(dp, cp)));
65 loop { 65 });
66 executor.run();
67 //cortex_m::asm::wfe(); // wfe causes RTT to stop working on stm32
68 }
69} 66}
diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs
index 922b0fe0c..7c74fa583 100644
--- a/embassy/src/executor/mod.rs
+++ b/embassy/src/executor/mod.rs
@@ -2,129 +2,68 @@ pub use embassy_macros::task;
2 2
3use core::future::Future; 3use core::future::Future;
4use core::marker::PhantomData; 4use core::marker::PhantomData;
5use core::mem;
6use core::pin::Pin; 5use core::pin::Pin;
7use core::ptr;
8use core::ptr::NonNull; 6use core::ptr::NonNull;
9use core::sync::atomic::{AtomicU32, Ordering}; 7use core::sync::atomic::Ordering;
10use core::task::{Context, Poll, Waker}; 8use core::task::{Context, Poll};
11use core::{ 9use core::{mem, ptr};
12 cell::{Cell, UnsafeCell},
13 cmp::min,
14};
15 10
11pub mod raw;
16mod run_queue; 12mod run_queue;
17pub(crate) mod timer; 13pub(crate) mod timer;
18mod timer_queue; 14mod timer_queue;
19mod util; 15mod util;
20mod waker; 16mod waker;
21 17
22use self::run_queue::{RunQueue, RunQueueItem};
23use self::timer_queue::{TimerQueue, TimerQueueItem};
24use self::util::UninitCell; 18use self::util::UninitCell;
25use crate::{ 19use crate::fmt::{panic, *};
26 fmt::{panic, *}, 20use crate::interrupt::OwnedInterrupt;
27 time::{Alarm, Instant}, 21use crate::time::Alarm;
28};
29
30/// Task is spawned (has a future)
31pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
32/// Task is in the executor run queue
33pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
34/// Task is in the executor timer queue
35pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
36
37pub(crate) struct TaskHeader {
38 state: AtomicU32,
39 run_queue_item: RunQueueItem,
40 expires_at: Cell<Instant>,
41 timer_queue_item: TimerQueueItem,
42 executor: Cell<*const Executor>, // Valid if state != 0
43 poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
44}
45
46impl TaskHeader {
47 const fn new() -> Self {
48 Self {
49 state: AtomicU32::new(0),
50 expires_at: Cell::new(Instant::from_ticks(0)),
51 run_queue_item: RunQueueItem::new(),
52 timer_queue_item: TimerQueueItem::new(),
53 executor: Cell::new(ptr::null()),
54 poll_fn: UninitCell::uninit(),
55 }
56 }
57
58 pub(crate) unsafe fn enqueue(&self) {
59 let mut current = self.state.load(Ordering::Acquire);
60 loop {
61 // If already scheduled, or if not started,
62 if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
63 return;
64 }
65 22
66 // Mark it as scheduled 23// repr(C) is needed to guarantee that the raw::Task is located at offset 0
67 let new = current | STATE_RUN_QUEUED; 24// This makes it safe to cast between raw::Task and Task pointers.
68
69 match self.state.compare_exchange_weak(
70 current,
71 new,
72 Ordering::AcqRel,
73 Ordering::Acquire,
74 ) {
75 Ok(_) => break,
76 Err(next_current) => current = next_current,
77 }
78 }
79
80 // We have just marked the task as scheduled, so enqueue it.
81 let executor = &*self.executor.get();
82 executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
83 }
84}
85
86// repr(C) is needed to guarantee that header is located at offset 0
87// This makes it safe to cast between Header and Task pointers.
88#[repr(C)] 25#[repr(C)]
89pub struct Task<F: Future + 'static> { 26pub struct Task<F: Future + 'static> {
90 header: TaskHeader, 27 raw: raw::Task,
91 future: UninitCell<F>, // Valid if STATE_SPAWNED 28 future: UninitCell<F>, // Valid if STATE_SPAWNED
92} 29}
93 30
94impl<F: Future + 'static> Task<F> { 31impl<F: Future + 'static> Task<F> {
95 pub const fn new() -> Self { 32 pub const fn new() -> Self {
96 Self { 33 Self {
97 header: TaskHeader::new(), 34 raw: raw::Task::new(),
98 future: UninitCell::uninit(), 35 future: UninitCell::uninit(),
99 } 36 }
100 } 37 }
101 38
102 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken { 39 pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
103 for task in pool { 40 for task in pool {
104 let state = STATE_SPAWNED | STATE_RUN_QUEUED; 41 let state = raw::STATE_SPAWNED | raw::STATE_RUN_QUEUED;
105 if task 42 if task
106 .header 43 .raw
107 .state 44 .state
108 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire) 45 .compare_exchange(0, state, Ordering::AcqRel, Ordering::Acquire)
109 .is_ok() 46 .is_ok()
110 { 47 {
111 // Initialize the task 48 // Initialize the task
112 task.header.poll_fn.write(Self::poll); 49 task.raw.poll_fn.write(Self::poll);
113 task.future.write(future()); 50 task.future.write(future());
114 51
115 return SpawnToken { 52 return SpawnToken {
116 header: Some(NonNull::new_unchecked( 53 raw_task: Some(NonNull::new_unchecked(&task.raw as *const raw::Task as _)),
117 &task.header as *const TaskHeader as _, 54 phantom: PhantomData,
118 )),
119 }; 55 };
120 } 56 }
121 } 57 }
122 58
123 return SpawnToken { header: None }; 59 return SpawnToken {
60 raw_task: None,
61 phantom: PhantomData,
62 };
124 } 63 }
125 64
126 unsafe fn poll(p: *mut TaskHeader) { 65 unsafe fn poll(p: NonNull<raw::Task>) {
127 let this = &*(p as *const Task<F>); 66 let this = &*(p.as_ptr() as *const Task<F>);
128 67
129 let future = Pin::new_unchecked(this.future.as_mut()); 68 let future = Pin::new_unchecked(this.future.as_mut());
130 let waker = waker::from_task(p); 69 let waker = waker::from_task(p);
@@ -132,9 +71,9 @@ impl<F: Future + 'static> Task<F> {
132 match future.poll(&mut cx) { 71 match future.poll(&mut cx) {
133 Poll::Ready(_) => { 72 Poll::Ready(_) => {
134 this.future.drop_in_place(); 73 this.future.drop_in_place();
135 this.header 74 this.raw
136 .state 75 .state
137 .fetch_and(!STATE_SPAWNED, Ordering::AcqRel); 76 .fetch_and(!raw::STATE_SPAWNED, Ordering::AcqRel);
138 } 77 }
139 Poll::Pending => {} 78 Poll::Pending => {}
140 } 79 }
@@ -144,11 +83,12 @@ impl<F: Future + 'static> Task<F> {
144unsafe impl<F: Future + 'static> Sync for Task<F> {} 83unsafe impl<F: Future + 'static> Sync for Task<F> {}
145 84
146#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"] 85#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
147pub struct SpawnToken { 86pub struct SpawnToken<F> {
148 header: Option<NonNull<TaskHeader>>, 87 raw_task: Option<NonNull<raw::Task>>,
88 phantom: PhantomData<*mut F>,
149} 89}
150 90
151impl Drop for SpawnToken { 91impl<F> Drop for SpawnToken<F> {
152 fn drop(&mut self) { 92 fn drop(&mut self) {
153 // TODO deallocate the task instead. 93 // TODO deallocate the task instead.
154 panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()") 94 panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
@@ -161,116 +101,167 @@ pub enum SpawnError {
161 Busy, 101 Busy,
162} 102}
163 103
164pub struct Executor { 104/// Handle to spawn tasks into an executor.
165 alarm: Option<&'static dyn Alarm>, 105///
166 run_queue: RunQueue, 106/// This Spawner can spawn any task (Send and non-Send ones), but it can
167 timer_queue: TimerQueue, 107/// only be used in the executor thread (it is not Send itself).
168 signal_fn: fn(), 108///
109/// If you want to spawn tasks from another thread, use [SendSpawner].
110pub struct Spawner {
111 executor: &'static raw::Executor,
169 not_send: PhantomData<*mut ()>, 112 not_send: PhantomData<*mut ()>,
170} 113}
171 114
172impl Executor { 115impl Spawner {
173 pub const fn new(signal_fn: fn()) -> Self { 116 fn new(executor: &'static raw::Executor) -> Self {
174 Self { 117 Self {
175 alarm: None, 118 executor,
176 run_queue: RunQueue::new(),
177 timer_queue: TimerQueue::new(),
178 signal_fn: signal_fn,
179 not_send: PhantomData, 119 not_send: PhantomData,
180 } 120 }
181 } 121 }
182 pub const fn new_with_alarm(alarm: &'static dyn Alarm, signal_fn: fn()) -> Self { 122
183 Self { 123 pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
184 alarm: Some(alarm), 124 let task = token.raw_task;
185 run_queue: RunQueue::new(), 125 mem::forget(token);
186 timer_queue: TimerQueue::new(), 126
187 signal_fn: signal_fn, 127 match task {
128 Some(task) => {
129 unsafe { self.executor.spawn(task) };
130 Ok(())
131 }
132 None => Err(SpawnError::Busy),
133 }
134 }
135
136 /// Convert this Spawner to a SendSpawner. This allows you to send the
137 /// spawner to other threads, but the spawner loses the ability to spawn
138 /// non-Send tasks.
139 pub fn make_send(&self) -> SendSpawner {
140 SendSpawner {
141 executor: self.executor,
188 not_send: PhantomData, 142 not_send: PhantomData,
189 } 143 }
190 } 144 }
145}
146
147/// Handle to spawn tasks into an executor from any thread.
148///
149/// This Spawner can be used from any thread (it implements Send and Sync, so after any task (Send and non-Send ones), but it can
150/// only be used in the executor thread (it is not Send itself).
151///
152/// If you want to spawn tasks from another thread, use [SendSpawner].
153pub struct SendSpawner {
154 executor: &'static raw::Executor,
155 not_send: PhantomData<*mut ()>,
156}
191 157
192 unsafe fn enqueue(&self, item: *mut TaskHeader) { 158unsafe impl Send for SendSpawner {}
193 if self.run_queue.enqueue(item) { 159unsafe impl Sync for SendSpawner {}
194 (self.signal_fn)() 160
161/// Handle to spawn tasks to an executor.
162///
163/// This Spawner can spawn any task (Send and non-Send ones), but it can
164/// only be used in the executor thread (it is not Send itself).
165///
166/// If you want to spawn tasks from another thread, use [SendSpawner].
167impl SendSpawner {
168 fn new(executor: &'static raw::Executor) -> Self {
169 Self {
170 executor,
171 not_send: PhantomData,
195 } 172 }
196 } 173 }
197 174
198 /// Spawn a future on this executor. 175 pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
199 pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> { 176 let header = token.raw_task;
200 let header = token.header;
201 mem::forget(token); 177 mem::forget(token);
202 178
203 match header { 179 match header {
204 Some(header) => unsafe { 180 Some(header) => {
205 let header = header.as_ref(); 181 unsafe { self.executor.spawn(header) };
206 header.executor.set(self);
207 self.enqueue(header as *const _ as _);
208 Ok(()) 182 Ok(())
209 }, 183 }
210 None => Err(SpawnError::Busy), 184 None => Err(SpawnError::Busy),
211 } 185 }
212 } 186 }
187}
213 188
214 /// Runs the executor until the queue is empty. 189pub struct Executor {
215 pub fn run(&self) { 190 inner: raw::Executor,
216 unsafe { 191 not_send: PhantomData<*mut ()>,
217 if self.alarm.is_some() { 192}
218 self.timer_queue.dequeue_expired(Instant::now(), |p| {
219 let header = &*p;
220 header.enqueue();
221 });
222 }
223 193
224 self.run_queue.dequeue_all(|p| { 194impl Executor {
225 let header = &*p; 195 pub const fn new() -> Self {
226 header.expires_at.set(Instant::MAX); 196 Self {
227 197 inner: raw::Executor::new(|_| cortex_m::asm::sev(), ptr::null_mut()),
228 let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); 198 not_send: PhantomData,
229 if state & STATE_SPAWNED == 0 { 199 }
230 // If task is not running, ignore it. This can happen in the following scenario: 200 }
231 // - Task gets dequeued, poll starts 201
232 // - While task is being polled, it gets woken. It gets placed in the queue. 202 pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
233 // - Task poll finishes, returning done=true 203 self.inner.set_alarm(alarm);
234 // - RUNNING bit is cleared, but the task is already in the queue. 204 }
235 return; 205
236 } 206 /// Runs the executor.
237 207 ///
238 // Run the task 208 /// This function never returns.
239 header.poll_fn.read()(p as _); 209 pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
240 210 init(Spawner::new(&self.inner));
241 // Enqueue or update into timer_queue 211
242 self.timer_queue.update(p); 212 loop {
243 }); 213 unsafe { self.inner.run_queued() };
244 214 cortex_m::asm::wfe();
245 // If this is in the past, set_alarm will immediately trigger the alarm,
246 // which will make the wfe immediately return so we do another loop iteration.
247 if let Some(alarm) = self.alarm {
248 let next_expiration = self.timer_queue.next_expiration();
249 alarm.set_callback(self.signal_fn);
250 alarm.set(next_expiration.as_ticks());
251 }
252 } 215 }
253 } 216 }
254} 217}
255 218
256pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) { 219fn pend_by_number(n: u8) {
257 let p = waker::task_from_waker(waker); 220 struct N(u8);
258 let header = &*p; 221 unsafe impl cortex_m::interrupt::Nr for N {
259 let expires_at = header.expires_at.get(); 222 fn nr(&self) -> u8 {
260 header.expires_at.set(min(expires_at, at)); 223 self.0
224 }
225 }
226 cortex_m::peripheral::NVIC::pend(N(n))
261} 227}
262 228
263pub mod raw { 229pub struct IrqExecutor<I: OwnedInterrupt> {
264 use super::waker; 230 irq: I,
265 use core::ptr::NonNull; 231 inner: raw::Executor,
266 use core::task::Waker; 232 not_send: PhantomData<*mut ()>,
233}
267 234
268 pub fn task_from_waker(waker: &Waker) -> NonNull<()> { 235impl<I: OwnedInterrupt> IrqExecutor<I> {
269 unsafe { NonNull::new_unchecked(waker::task_from_waker(waker) as *mut ()) } 236 pub fn new(irq: I) -> Self {
237 let ctx = irq.number() as *mut ();
238 Self {
239 irq,
240 inner: raw::Executor::new(|ctx| pend_by_number(ctx as u8), ctx),
241 not_send: PhantomData,
242 }
243 }
244
245 pub fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
246 self.inner.set_alarm(alarm);
270 } 247 }
271 248
272 pub unsafe fn wake_task(task: NonNull<()>) { 249 /// Start the executor.
273 let header = &*waker::task_from_ptr(task.as_ptr()); 250 ///
274 header.enqueue(); 251 /// `init` is called in the interrupt context, then the interrupt is
252 /// configured to run the executor.
253 pub fn start(&'static mut self, init: impl FnOnce(Spawner) + Send) {
254 self.irq.disable();
255
256 init(Spawner::new(&self.inner));
257
258 self.irq.set_handler(
259 |ctx| unsafe {
260 let executor = &*(ctx as *const raw::Executor);
261 executor.run_queued();
262 },
263 &self.inner as *const _ as _,
264 );
265 self.irq.enable();
275 } 266 }
276} 267}
diff --git a/embassy/src/executor/raw.rs b/embassy/src/executor/raw.rs
new file mode 100644
index 000000000..927b6a421
--- /dev/null
+++ b/embassy/src/executor/raw.rs
@@ -0,0 +1,154 @@
1use core::cell::Cell;
2use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
5use core::sync::atomic::{AtomicU32, Ordering};
6use core::task::Waker;
7
8use super::run_queue::{RunQueue, RunQueueItem};
9use super::timer_queue::{TimerQueue, TimerQueueItem};
10use super::util::UninitCell;
11use super::waker;
12use crate::time::{Alarm, Instant};
13
14/// Task is spawned (has a future)
15pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
16/// Task is in the executor run queue
17pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
18/// Task is in the executor timer queue
19pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
20
21pub struct Task {
22 pub(crate) state: AtomicU32,
23 pub(crate) run_queue_item: RunQueueItem,
24 pub(crate) expires_at: Cell<Instant>,
25 pub(crate) timer_queue_item: TimerQueueItem,
26 pub(crate) executor: Cell<*const Executor>, // Valid if state != 0
27 pub(crate) poll_fn: UninitCell<unsafe fn(NonNull<Task>)>, // Valid if STATE_SPAWNED
28}
29
30impl Task {
31 pub(crate) const fn new() -> Self {
32 Self {
33 state: AtomicU32::new(0),
34 expires_at: Cell::new(Instant::from_ticks(0)),
35 run_queue_item: RunQueueItem::new(),
36 timer_queue_item: TimerQueueItem::new(),
37 executor: Cell::new(ptr::null()),
38 poll_fn: UninitCell::uninit(),
39 }
40 }
41
42 pub(crate) unsafe fn enqueue(&self) {
43 let mut current = self.state.load(Ordering::Acquire);
44 loop {
45 // If already scheduled, or if not started,
46 if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
47 return;
48 }
49
50 // Mark it as scheduled
51 let new = current | STATE_RUN_QUEUED;
52
53 match self.state.compare_exchange_weak(
54 current,
55 new,
56 Ordering::AcqRel,
57 Ordering::Acquire,
58 ) {
59 Ok(_) => break,
60 Err(next_current) => current = next_current,
61 }
62 }
63
64 // We have just marked the task as scheduled, so enqueue it.
65 let executor = &*self.executor.get();
66 executor.enqueue(self as *const Task as *mut Task);
67 }
68}
69
70pub(crate) struct Executor {
71 run_queue: RunQueue,
72 timer_queue: TimerQueue,
73 signal_fn: fn(*mut ()),
74 signal_ctx: *mut (),
75 alarm: Option<&'static dyn Alarm>,
76}
77
78impl Executor {
79 pub(crate) const fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
80 Self {
81 run_queue: RunQueue::new(),
82 timer_queue: TimerQueue::new(),
83 signal_fn,
84 signal_ctx,
85 alarm: None,
86 }
87 }
88
89 pub(crate) fn set_alarm(&mut self, alarm: &'static dyn Alarm) {
90 self.alarm = Some(alarm);
91 }
92
93 unsafe fn enqueue(&self, item: *mut Task) {
94 if self.run_queue.enqueue(item) {
95 (self.signal_fn)(self.signal_ctx)
96 }
97 }
98
99 pub(crate) unsafe fn spawn(&'static self, task: NonNull<Task>) {
100 let task = task.as_ref();
101 task.executor.set(self);
102 self.enqueue(task as *const _ as _);
103 }
104
105 pub(crate) unsafe fn run_queued(&self) {
106 if self.alarm.is_some() {
107 self.timer_queue.dequeue_expired(Instant::now(), |p| {
108 p.as_ref().enqueue();
109 });
110 }
111
112 self.run_queue.dequeue_all(|p| {
113 let task = p.as_ref();
114 task.expires_at.set(Instant::MAX);
115
116 let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
117 if state & STATE_SPAWNED == 0 {
118 // If task is not running, ignore it. This can happen in the following scenario:
119 // - Task gets dequeued, poll starts
120 // - While task is being polled, it gets woken. It gets placed in the queue.
121 // - Task poll finishes, returning done=true
122 // - RUNNING bit is cleared, but the task is already in the queue.
123 return;
124 }
125
126 // Run the task
127 task.poll_fn.read()(p as _);
128
129 // Enqueue or update into timer_queue
130 self.timer_queue.update(p);
131 });
132
133 // If this is in the past, set_alarm will immediately trigger the alarm,
134 // which will make the wfe immediately return so we do another loop iteration.
135 if let Some(alarm) = self.alarm {
136 let next_expiration = self.timer_queue.next_expiration();
137 alarm.set_callback(self.signal_fn, self.signal_ctx);
138 alarm.set(next_expiration.as_ticks());
139 }
140 }
141}
142
143pub use super::waker::task_from_waker;
144
145pub unsafe fn wake_task(task: NonNull<Task>) {
146 task.as_ref().enqueue();
147}
148
149pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) {
150 let task = waker::task_from_waker(waker);
151 let task = task.as_ref();
152 let expires_at = task.expires_at.get();
153 task.expires_at.set(min(expires_at, at));
154}
diff --git a/embassy/src/executor/run_queue.rs b/embassy/src/executor/run_queue.rs
index 1cdecee33..397d71225 100644
--- a/embassy/src/executor/run_queue.rs
+++ b/embassy/src/executor/run_queue.rs
@@ -1,10 +1,11 @@
1use core::ptr; 1use core::ptr;
2use core::ptr::NonNull;
2use core::sync::atomic::{AtomicPtr, Ordering}; 3use core::sync::atomic::{AtomicPtr, Ordering};
3 4
4use super::TaskHeader; 5use super::raw::Task;
5 6
6pub(crate) struct RunQueueItem { 7pub(crate) struct RunQueueItem {
7 next: AtomicPtr<TaskHeader>, 8 next: AtomicPtr<Task>,
8} 9}
9 10
10impl RunQueueItem { 11impl RunQueueItem {
@@ -27,7 +28,7 @@ impl RunQueueItem {
27/// current batch is completely processed, so even if a task enqueues itself instantly (for example 28/// current batch is completely processed, so even if a task enqueues itself instantly (for example
28/// by waking its own waker) can't prevent other tasks from running. 29/// by waking its own waker) can't prevent other tasks from running.
29pub(crate) struct RunQueue { 30pub(crate) struct RunQueue {
30 head: AtomicPtr<TaskHeader>, 31 head: AtomicPtr<Task>,
31} 32}
32 33
33impl RunQueue { 34impl RunQueue {
@@ -38,7 +39,7 @@ impl RunQueue {
38 } 39 }
39 40
40 /// Enqueues an item. Returns true if the queue was empty. 41 /// Enqueues an item. Returns true if the queue was empty.
41 pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool { 42 pub(crate) unsafe fn enqueue(&self, item: *mut Task) -> bool {
42 let mut prev = self.head.load(Ordering::Acquire); 43 let mut prev = self.head.load(Ordering::Acquire);
43 loop { 44 loop {
44 (*item).run_queue_item.next.store(prev, Ordering::Relaxed); 45 (*item).run_queue_item.next.store(prev, Ordering::Relaxed);
@@ -54,7 +55,7 @@ impl RunQueue {
54 prev.is_null() 55 prev.is_null()
55 } 56 }
56 57
57 pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) { 58 pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(NonNull<Task>)) {
58 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel); 59 let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
59 60
60 while !task.is_null() { 61 while !task.is_null() {
@@ -62,7 +63,7 @@ impl RunQueue {
62 // Therefore, first read the next pointer, and only then process the task. 63 // Therefore, first read the next pointer, and only then process the task.
63 let next = (*task).run_queue_item.next.load(Ordering::Relaxed); 64 let next = (*task).run_queue_item.next.load(Ordering::Relaxed);
64 65
65 on_task(task); 66 on_task(NonNull::new_unchecked(task));
66 67
67 task = next 68 task = next
68 } 69 }
diff --git a/embassy/src/executor/timer.rs b/embassy/src/executor/timer.rs
index 56236a058..9bd98925c 100644
--- a/embassy/src/executor/timer.rs
+++ b/embassy/src/executor/timer.rs
@@ -3,6 +3,7 @@ use core::pin::Pin;
3use core::task::{Context, Poll}; 3use core::task::{Context, Poll};
4use futures::Stream; 4use futures::Stream;
5 5
6use super::raw;
6use crate::time::{Duration, Instant}; 7use crate::time::{Duration, Instant};
7 8
8pub struct Timer { 9pub struct Timer {
@@ -34,7 +35,7 @@ impl Future for Timer {
34 if self.yielded_once && self.expires_at <= Instant::now() { 35 if self.yielded_once && self.expires_at <= Instant::now() {
35 Poll::Ready(()) 36 Poll::Ready(())
36 } else { 37 } else {
37 unsafe { super::register_timer(self.expires_at, cx.waker()) }; 38 unsafe { raw::register_timer(self.expires_at, cx.waker()) };
38 self.yielded_once = true; 39 self.yielded_once = true;
39 Poll::Pending 40 Poll::Pending
40 } 41 }
@@ -66,7 +67,7 @@ impl Stream for Ticker {
66 self.expires_at += dur; 67 self.expires_at += dur;
67 Poll::Ready(Some(())) 68 Poll::Ready(Some(()))
68 } else { 69 } else {
69 unsafe { super::register_timer(self.expires_at, cx.waker()) }; 70 unsafe { raw::register_timer(self.expires_at, cx.waker()) };
70 Poll::Pending 71 Poll::Pending
71 } 72 }
72 } 73 }
diff --git a/embassy/src/executor/timer_queue.rs b/embassy/src/executor/timer_queue.rs
index 428b6cf63..c722ae00b 100644
--- a/embassy/src/executor/timer_queue.rs
+++ b/embassy/src/executor/timer_queue.rs
@@ -1,13 +1,14 @@
1use core::cell::Cell; 1use core::cell::Cell;
2use core::cmp::min;
3use core::ptr;
4use core::ptr::NonNull;
2use core::sync::atomic::{AtomicPtr, Ordering}; 5use core::sync::atomic::{AtomicPtr, Ordering};
3use core::{cmp::min, ptr};
4 6
7use super::raw::{Task, STATE_TIMER_QUEUED};
5use crate::time::Instant; 8use crate::time::Instant;
6 9
7use super::{TaskHeader, STATE_TIMER_QUEUED};
8
9pub(crate) struct TimerQueueItem { 10pub(crate) struct TimerQueueItem {
10 next: Cell<*mut TaskHeader>, 11 next: Cell<*mut Task>,
11} 12}
12 13
13impl TimerQueueItem { 14impl TimerQueueItem {
@@ -19,7 +20,7 @@ impl TimerQueueItem {
19} 20}
20 21
21pub(crate) struct TimerQueue { 22pub(crate) struct TimerQueue {
22 head: Cell<*mut TaskHeader>, 23 head: Cell<*mut Task>,
23} 24}
24 25
25impl TimerQueue { 26impl TimerQueue {
@@ -29,15 +30,15 @@ impl TimerQueue {
29 } 30 }
30 } 31 }
31 32
32 pub(crate) unsafe fn update(&self, p: *mut TaskHeader) { 33 pub(crate) unsafe fn update(&self, p: NonNull<Task>) {
33 let header = &*p; 34 let task = p.as_ref();
34 if header.expires_at.get() != Instant::MAX { 35 if task.expires_at.get() != Instant::MAX {
35 let old_state = header.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); 36 let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
36 let is_new = old_state & STATE_TIMER_QUEUED == 0; 37 let is_new = old_state & STATE_TIMER_QUEUED == 0;
37 38
38 if is_new { 39 if is_new {
39 header.timer_queue_item.next.set(self.head.get()); 40 task.timer_queue_item.next.set(self.head.get());
40 self.head.set(p); 41 self.head.set(p.as_ptr());
41 } 42 }
42 } 43 }
43 } 44 }
@@ -45,18 +46,18 @@ impl TimerQueue {
45 pub(crate) unsafe fn next_expiration(&self) -> Instant { 46 pub(crate) unsafe fn next_expiration(&self) -> Instant {
46 let mut res = Instant::MAX; 47 let mut res = Instant::MAX;
47 self.retain(|p| { 48 self.retain(|p| {
48 let header = &*p; 49 let task = p.as_ref();
49 let expires = header.expires_at.get(); 50 let expires = task.expires_at.get();
50 res = min(res, expires); 51 res = min(res, expires);
51 expires != Instant::MAX 52 expires != Instant::MAX
52 }); 53 });
53 res 54 res
54 } 55 }
55 56
56 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(*mut TaskHeader)) { 57 pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(NonNull<Task>)) {
57 self.retain(|p| { 58 self.retain(|p| {
58 let header = &*p; 59 let task = p.as_ref();
59 if header.expires_at.get() <= now { 60 if task.expires_at.get() <= now {
60 on_task(p); 61 on_task(p);
61 false 62 false
62 } else { 63 } else {
@@ -65,20 +66,18 @@ impl TimerQueue {
65 }); 66 });
66 } 67 }
67 68
68 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(*mut TaskHeader) -> bool) { 69 pub(crate) unsafe fn retain(&self, mut f: impl FnMut(NonNull<Task>) -> bool) {
69 let mut prev = &self.head; 70 let mut prev = &self.head;
70 while !prev.get().is_null() { 71 while !prev.get().is_null() {
71 let p = prev.get(); 72 let p = NonNull::new_unchecked(prev.get());
72 let header = &*p; 73 let task = &*p.as_ptr();
73 if f(p) { 74 if f(p) {
74 // Skip to next 75 // Skip to next
75 prev = &header.timer_queue_item.next; 76 prev = &task.timer_queue_item.next;
76 } else { 77 } else {
77 // Remove it 78 // Remove it
78 prev.set(header.timer_queue_item.next.get()); 79 prev.set(task.timer_queue_item.next.get());
79 header 80 task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
80 .state
81 .fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
82 } 81 }
83 } 82 }
84 } 83 }
diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs
index 5a604d865..bc02c51df 100644
--- a/embassy/src/executor/waker.rs
+++ b/embassy/src/executor/waker.rs
@@ -1,7 +1,8 @@
1use core::mem; 1use core::mem;
2use core::ptr::NonNull;
2use core::task::{RawWaker, RawWakerVTable, Waker}; 3use core::task::{RawWaker, RawWakerVTable, Waker};
3 4
4use super::TaskHeader; 5use super::raw::Task;
5 6
6const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); 7const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
7 8
@@ -10,26 +11,21 @@ unsafe fn clone(p: *const ()) -> RawWaker {
10} 11}
11 12
12unsafe fn wake(p: *const ()) { 13unsafe fn wake(p: *const ()) {
13 let header = &*task_from_ptr(p); 14 (*(p as *mut Task)).enqueue()
14 header.enqueue();
15} 15}
16 16
17unsafe fn drop(_: *const ()) { 17unsafe fn drop(_: *const ()) {
18 // nop 18 // nop
19} 19}
20 20
21pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker { 21pub(crate) unsafe fn from_task(p: NonNull<Task>) -> Waker {
22 Waker::from_raw(RawWaker::new(p as _, &VTABLE)) 22 Waker::from_raw(RawWaker::new(p.as_ptr() as _, &VTABLE))
23} 23}
24 24
25pub(crate) unsafe fn task_from_ptr(p: *const ()) -> *mut TaskHeader { 25pub unsafe fn task_from_waker(waker: &Waker) -> NonNull<Task> {
26 p as *mut TaskHeader 26 let hack: &WakerHack = mem::transmute(waker);
27} 27 assert_eq!(hack.vtable, &VTABLE);
28 28 NonNull::new_unchecked(hack.data as *mut Task)
29pub(crate) unsafe fn task_from_waker(w: &Waker) -> *mut TaskHeader {
30 let w: &WakerHack = mem::transmute(w);
31 assert_eq!(w.vtable, &VTABLE);
32 task_from_ptr(w.data)
33} 29}
34 30
35struct WakerHack { 31struct WakerHack {
diff --git a/embassy/src/interrupt.rs b/embassy/src/interrupt.rs
index 7690bea0c..45676b31c 100644
--- a/embassy/src/interrupt.rs
+++ b/embassy/src/interrupt.rs
@@ -32,6 +32,7 @@ unsafe impl cortex_m::interrupt::Nr for NrWrap {
32pub unsafe trait OwnedInterrupt { 32pub unsafe trait OwnedInterrupt {
33 type Priority: From<u8> + Into<u8> + Copy; 33 type Priority: From<u8> + Into<u8> + Copy;
34 fn number(&self) -> u8; 34 fn number(&self) -> u8;
35 unsafe fn steal() -> Self;
35 36
36 /// Implementation detail, do not use outside embassy crates. 37 /// Implementation detail, do not use outside embassy crates.
37 #[doc(hidden)] 38 #[doc(hidden)]
diff --git a/embassy/src/lib.rs b/embassy/src/lib.rs
index 74c69b541..baa449dbb 100644
--- a/embassy/src/lib.rs
+++ b/embassy/src/lib.rs
@@ -2,7 +2,6 @@
2#![feature(generic_associated_types)] 2#![feature(generic_associated_types)]
3#![feature(const_fn)] 3#![feature(const_fn)]
4#![feature(const_fn_fn_ptr_basics)] 4#![feature(const_fn_fn_ptr_basics)]
5#![feature(const_in_array_repeat_expressions)]
6#![feature(const_option)] 5#![feature(const_option)]
7 6
8// This mod MUST go first, so that the others see its macros. 7// This mod MUST go first, so that the others see its macros.
diff --git a/embassy/src/time/traits.rs b/embassy/src/time/traits.rs
index 7faa27cdb..2c97b13af 100644
--- a/embassy/src/time/traits.rs
+++ b/embassy/src/time/traits.rs
@@ -16,7 +16,7 @@ impl<T: Clock + ?Sized> Clock for &T {
16pub trait Alarm { 16pub trait Alarm {
17 /// Sets the callback function to be called when the alarm triggers. 17 /// Sets the callback function to be called when the alarm triggers.
18 /// The callback may be called from any context (interrupt or thread mode). 18 /// The callback may be called from any context (interrupt or thread mode).
19 fn set_callback(&self, callback: fn()); 19 fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ());
20 20
21 /// Sets an alarm at the given timestamp. When the clock reaches that 21 /// Sets an alarm at the given timestamp. When the clock reaches that
22 /// timestamp, the provided callback funcion will be called. 22 /// timestamp, the provided callback funcion will be called.
@@ -32,8 +32,8 @@ pub trait Alarm {
32} 32}
33 33
34impl<T: Alarm + ?Sized> Alarm for &T { 34impl<T: Alarm + ?Sized> Alarm for &T {
35 fn set_callback(&self, callback: fn()) { 35 fn set_callback(&self, callback: fn(*mut ()), ctx: *mut ()) {
36 T::set_callback(self, callback); 36 T::set_callback(self, callback, ctx);
37 } 37 }
38 fn set(&self, timestamp: u64) { 38 fn set(&self, timestamp: u64) {
39 T::set(self, timestamp); 39 T::set(self, timestamp);
diff --git a/embassy/src/util/signal.rs b/embassy/src/util/signal.rs
index 2c9c52f1e..c8b7a61a6 100644
--- a/embassy/src/util/signal.rs
+++ b/embassy/src/util/signal.rs
@@ -110,7 +110,7 @@ impl<'a, I: OwnedInterrupt> InterruptFuture<'a, I> {
110 }; 110 };
111 111
112 if ctx as *const _ != ptr::null() { 112 if ctx as *const _ != ptr::null() {
113 executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx)); 113 executor::raw::wake_task(ptr::NonNull::new_unchecked(ctx as _));
114 } 114 }
115 115
116 NVIC::mask(NrWrap(irq)); 116 NVIC::mask(NrWrap(irq));
@@ -124,10 +124,8 @@ impl<'a, I: OwnedInterrupt> Future for InterruptFuture<'a, I> {
124 124
125 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 125 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
126 let s = unsafe { self.get_unchecked_mut() }; 126 let s = unsafe { self.get_unchecked_mut() };
127 s.interrupt.set_handler( 127 let ctx = unsafe { executor::raw::task_from_waker(&cx.waker()).cast().as_ptr() };
128 Self::interrupt_handler, 128 s.interrupt.set_handler(Self::interrupt_handler, ctx);
129 executor::raw::task_from_waker(&cx.waker()).cast().as_ptr(),
130 );
131 if s.interrupt.is_enabled() { 129 if s.interrupt.is_enabled() {
132 Poll::Pending 130 Poll::Pending
133 } else { 131 } else {
diff --git a/test-build.sh b/test-build.sh
index 04e1a95e8..92945457b 100755
--- a/test-build.sh
+++ b/test-build.sh
@@ -3,7 +3,7 @@
3set -euxo pipefail 3set -euxo pipefail
4 4
5# embassy std 5# embassy std
6(cd embassy; cargo build --features log,std) 6#(cd embassy; cargo build --features log,std)
7 7
8# embassy embedded 8# embassy embedded
9(cd embassy; cargo build --target thumbv7em-none-eabi) 9(cd embassy; cargo build --target thumbv7em-none-eabi)