aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-stm32/src/usart/buffered.rs252
1 files changed, 171 insertions, 81 deletions
diff --git a/embassy-stm32/src/usart/buffered.rs b/embassy-stm32/src/usart/buffered.rs
index 46c49a997..2a711bc06 100644
--- a/embassy-stm32/src/usart/buffered.rs
+++ b/embassy-stm32/src/usart/buffered.rs
@@ -1,3 +1,4 @@
1use core::cell::RefCell;
1use core::future::{poll_fn, Future}; 2use core::future::{poll_fn, Future};
2use core::task::Poll; 3use core::task::Poll;
3 4
@@ -29,7 +30,15 @@ unsafe impl<'d, T: BasicInstance> Send for StateInner<'d, T> {}
29unsafe impl<'d, T: BasicInstance> Sync for StateInner<'d, T> {} 30unsafe impl<'d, T: BasicInstance> Sync for StateInner<'d, T> {}
30 31
31pub struct BufferedUart<'d, T: BasicInstance> { 32pub struct BufferedUart<'d, T: BasicInstance> {
32 inner: PeripheralMutex<'d, StateInner<'d, T>>, 33 inner: RefCell<PeripheralMutex<'d, StateInner<'d, T>>>,
34}
35
36pub struct BufferedUartTx<'u, 'd, T: BasicInstance> {
37 inner: &'u BufferedUart<'d, T>,
38}
39
40pub struct BufferedUartRx<'u, 'd, T: BasicInstance> {
41 inner: &'u BufferedUart<'d, T>,
33} 42}
34 43
35impl<'d, T: BasicInstance> Unpin for BufferedUart<'d, T> {} 44impl<'d, T: BasicInstance> Unpin for BufferedUart<'d, T> {}
@@ -53,14 +62,124 @@ impl<'d, T: BasicInstance> BufferedUart<'d, T> {
53 } 62 }
54 63
55 Self { 64 Self {
56 inner: PeripheralMutex::new(irq, &mut state.0, move || StateInner { 65 inner: RefCell::new(PeripheralMutex::new(irq, &mut state.0, move || StateInner {
57 phantom: PhantomData, 66 phantom: PhantomData,
58 tx: RingBuffer::new(tx_buffer), 67 tx: RingBuffer::new(tx_buffer),
59 tx_waker: WakerRegistration::new(), 68 tx_waker: WakerRegistration::new(),
60 69
61 rx: RingBuffer::new(rx_buffer), 70 rx: RingBuffer::new(rx_buffer),
62 rx_waker: WakerRegistration::new(), 71 rx_waker: WakerRegistration::new(),
63 }), 72 })),
73 }
74 }
75
76 pub fn split<'u>(&'u mut self) -> (BufferedUartRx<'u, 'd, T>, BufferedUartTx<'u, 'd, T>) {
77 (BufferedUartRx { inner: self }, BufferedUartTx { inner: self })
78 }
79
80 async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result<usize, Error> {
81 poll_fn(move |cx| {
82 let mut do_pend = false;
83 let mut inner = self.inner.borrow_mut();
84 let res = inner.with(|state| {
85 compiler_fence(Ordering::SeqCst);
86
87 // We have data ready in buffer? Return it.
88 let data = state.rx.pop_buf();
89 if !data.is_empty() {
90 let len = data.len().min(buf.len());
91 buf[..len].copy_from_slice(&data[..len]);
92
93 if state.rx.is_full() {
94 do_pend = true;
95 }
96 state.rx.pop(len);
97
98 return Poll::Ready(Ok(len));
99 }
100
101 state.rx_waker.register(cx.waker());
102 Poll::Pending
103 });
104
105 if do_pend {
106 inner.pend();
107 }
108
109 res
110 })
111 .await
112 }
113
114 async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result<usize, Error> {
115 poll_fn(move |cx| {
116 let mut inner = self.inner.borrow_mut();
117 let (poll, empty) = inner.with(|state| {
118 let empty = state.tx.is_empty();
119 let tx_buf = state.tx.push_buf();
120 if tx_buf.is_empty() {
121 state.tx_waker.register(cx.waker());
122 return (Poll::Pending, empty);
123 }
124
125 let n = core::cmp::min(tx_buf.len(), buf.len());
126 tx_buf[..n].copy_from_slice(&buf[..n]);
127 state.tx.push(n);
128
129 (Poll::Ready(Ok(n)), empty)
130 });
131 if empty {
132 inner.pend();
133 }
134 poll
135 })
136 .await
137 }
138
139 async fn inner_flush<'a>(&'a self) -> Result<(), Error> {
140 poll_fn(move |cx| {
141 self.inner.borrow_mut().with(|state| {
142 if !state.tx.is_empty() {
143 state.tx_waker.register(cx.waker());
144 return Poll::Pending;
145 }
146
147 Poll::Ready(Ok(()))
148 })
149 })
150 .await
151 }
152
153 async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], Error> {
154 poll_fn(move |cx| {
155 self.inner.borrow_mut().with(|state| {
156 compiler_fence(Ordering::SeqCst);
157
158 // We have data ready in buffer? Return it.
159 let buf = state.rx.pop_buf();
160 if !buf.is_empty() {
161 let buf: &[u8] = buf;
162 // Safety: buffer lives as long as uart
163 let buf: &[u8] = unsafe { core::mem::transmute(buf) };
164 return Poll::Ready(Ok(buf));
165 }
166
167 state.rx_waker.register(cx.waker());
168 Poll::<Result<&[u8], Error>>::Pending
169 })
170 })
171 .await
172 }
173
174 fn inner_consume(&self, amt: usize) {
175 let mut inner = self.inner.borrow_mut();
176 let signal = inner.with(|state| {
177 let full = state.rx.is_full();
178 state.rx.pop(amt);
179 full
180 });
181 if signal {
182 inner.pend();
64 } 183 }
65 } 184 }
66} 185}
@@ -155,41 +274,31 @@ impl<'d, T: BasicInstance> embedded_io::Io for BufferedUart<'d, T> {
155 type Error = Error; 274 type Error = Error;
156} 275}
157 276
277impl<'u, 'd, T: BasicInstance> embedded_io::Io for BufferedUartRx<'u, 'd, T> {
278 type Error = Error;
279}
280
281impl<'u, 'd, T: BasicInstance> embedded_io::Io for BufferedUartTx<'u, 'd, T> {
282 type Error = Error;
283}
284
158impl<'d, T: BasicInstance> embedded_io::asynch::Read for BufferedUart<'d, T> { 285impl<'d, T: BasicInstance> embedded_io::asynch::Read for BufferedUart<'d, T> {
159 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 286 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
160 where 287 where
161 Self: 'a; 288 Self: 'a;
162 289
163 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { 290 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
164 poll_fn(move |cx| { 291 self.inner_read(buf)
165 let mut do_pend = false; 292 }
166 let res = self.inner.with(|state| { 293}
167 compiler_fence(Ordering::SeqCst);
168
169 // We have data ready in buffer? Return it.
170 let data = state.rx.pop_buf();
171 if !data.is_empty() {
172 let len = data.len().min(buf.len());
173 buf[..len].copy_from_slice(&data[..len]);
174
175 if state.rx.is_full() {
176 do_pend = true;
177 }
178 state.rx.pop(len);
179
180 return Poll::Ready(Ok(len));
181 }
182
183 state.rx_waker.register(cx.waker());
184 Poll::Pending
185 });
186 294
187 if do_pend { 295impl<'u, 'd, T: BasicInstance> embedded_io::asynch::Read for BufferedUartRx<'u, 'd, T> {
188 self.inner.pend(); 296 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
189 } 297 where
298 Self: 'a;
190 299
191 res 300 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
192 }) 301 self.inner.inner_read(buf)
193 } 302 }
194} 303}
195 304
@@ -199,34 +308,25 @@ impl<'d, T: BasicInstance> embedded_io::asynch::BufRead for BufferedUart<'d, T>
199 Self: 'a; 308 Self: 'a;
200 309
201 fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { 310 fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> {
202 poll_fn(move |cx| { 311 self.inner_fill_buf()
203 self.inner.with(|state| { 312 }
204 compiler_fence(Ordering::SeqCst);
205 313
206 // We have data ready in buffer? Return it. 314 fn consume(&mut self, amt: usize) {
207 let buf = state.rx.pop_buf(); 315 self.inner_consume(amt)
208 if !buf.is_empty() { 316 }
209 let buf: &[u8] = buf; 317}
210 // Safety: buffer lives as long as uart
211 let buf: &[u8] = unsafe { core::mem::transmute(buf) };
212 return Poll::Ready(Ok(buf));
213 }
214 318
215 state.rx_waker.register(cx.waker()); 319impl<'u, 'd, T: BasicInstance> embedded_io::asynch::BufRead for BufferedUartRx<'u, 'd, T> {
216 Poll::<Result<&[u8], Self::Error>>::Pending 320 type FillBufFuture<'a> = impl Future<Output = Result<&'a [u8], Self::Error>>
217 }) 321 where
218 }) 322 Self: 'a;
323
324 fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> {
325 self.inner.inner_fill_buf()
219 } 326 }
220 327
221 fn consume(&mut self, amt: usize) { 328 fn consume(&mut self, amt: usize) {
222 let signal = self.inner.with(|state| { 329 self.inner.inner_consume(amt)
223 let full = state.rx.is_full();
224 state.rx.pop(amt);
225 full
226 });
227 if signal {
228 self.inner.pend();
229 }
230 } 330 }
231} 331}
232 332
@@ -236,26 +336,25 @@ impl<'d, T: BasicInstance> embedded_io::asynch::Write for BufferedUart<'d, T> {
236 Self: 'a; 336 Self: 'a;
237 337
238 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { 338 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
239 poll_fn(move |cx| { 339 self.inner_write(buf)
240 let (poll, empty) = self.inner.with(|state| { 340 }
241 let empty = state.tx.is_empty();
242 let tx_buf = state.tx.push_buf();
243 if tx_buf.is_empty() {
244 state.tx_waker.register(cx.waker());
245 return (Poll::Pending, empty);
246 }
247 341
248 let n = core::cmp::min(tx_buf.len(), buf.len()); 342 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
249 tx_buf[..n].copy_from_slice(&buf[..n]); 343 where
250 state.tx.push(n); 344 Self: 'a;
251 345
252 (Poll::Ready(Ok(n)), empty) 346 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
253 }); 347 self.inner_flush()
254 if empty { 348 }
255 self.inner.pend(); 349}
256 } 350
257 poll 351impl<'u, 'd, T: BasicInstance> embedded_io::asynch::Write for BufferedUartTx<'u, 'd, T> {
258 }) 352 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
353 where
354 Self: 'a;
355
356 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
357 self.inner.inner_write(buf)
259 } 358 }
260 359
261 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 360 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
@@ -263,15 +362,6 @@ impl<'d, T: BasicInstance> embedded_io::asynch::Write for BufferedUart<'d, T> {
263 Self: 'a; 362 Self: 'a;
264 363
265 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { 364 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
266 poll_fn(move |cx| { 365 self.inner.inner_flush()
267 self.inner.with(|state| {
268 if !state.tx.is_empty() {
269 state.tx_waker.register(cx.waker());
270 return Poll::Pending;
271 }
272
273 Poll::Ready(Ok(()))
274 })
275 })
276 } 366 }
277} 367}