aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2021-12-10 04:26:11 +0000
committerGitHub <[email protected]>2021-12-10 04:26:11 +0000
commitdce3f8c47df611b51c47559ba8f4c301eb86af95 (patch)
treec88ff4746929fa3918436a5702f5145cac2ad860
parent0338fd2237d0b3d0703ff6dc4eb6708d2e323068 (diff)
parent45ef9444579c8d7af72fc2d42093f26ef6f6ac3c (diff)
Merge #534
534: Provides AsyncWrite with flush r=huntc a=huntc As per Tokio and others, this commit provides a `poll_flush` method on `AsyncWrite` so that a best-effort attempt at wakening once all bytes are flushed can be made. Co-authored-by: huntc <[email protected]>
-rw-r--r--embassy-hal-common/src/ring_buffer.rs7
-rw-r--r--embassy-hal-common/src/usb/usb_serial.rs15
-rw-r--r--embassy-net/src/tcp_socket.rs4
-rw-r--r--embassy-nrf/src/buffered_uarte.rs14
-rw-r--r--embassy-stm32/src/usart/mod.rs14
-rw-r--r--embassy/src/io/std.rs6
-rw-r--r--embassy/src/io/traits.rs17
-rw-r--r--embassy/src/io/util/flush.rs32
-rw-r--r--embassy/src/io/util/mod.rs12
-rw-r--r--embassy/src/io/util/split.rs3
-rw-r--r--examples/nrf/src/bin/buffered_uart.rs3
11 files changed, 127 insertions, 0 deletions
diff --git a/embassy-hal-common/src/ring_buffer.rs b/embassy-hal-common/src/ring_buffer.rs
index 6829f62f5..fcad68bb1 100644
--- a/embassy-hal-common/src/ring_buffer.rs
+++ b/embassy-hal-common/src/ring_buffer.rs
@@ -125,5 +125,12 @@ mod tests {
125 let buf = rb.pop_buf(); 125 let buf = rb.pop_buf();
126 assert_eq!(1, buf.len()); 126 assert_eq!(1, buf.len());
127 assert_eq!(4, buf[0]); 127 assert_eq!(4, buf[0]);
128 rb.pop(1);
129
130 let buf = rb.pop_buf();
131 assert_eq!(0, buf.len());
132
133 let buf = rb.push_buf();
134 assert_eq!(4, buf.len());
128 } 135 }
129} 136}
diff --git a/embassy-hal-common/src/usb/usb_serial.rs b/embassy-hal-common/src/usb/usb_serial.rs
index ca43a4d73..2592d05a6 100644
--- a/embassy-hal-common/src/usb/usb_serial.rs
+++ b/embassy-hal-common/src/usb/usb_serial.rs
@@ -106,6 +106,17 @@ where
106 serial.poll_write(cx, buf) 106 serial.poll_write(cx, buf)
107 }) 107 })
108 } 108 }
109
110 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
111 let this = self.get_mut();
112 let mut mutex = this.inner.borrow_mut();
113 mutex.with(|state| {
114 let serial = state.classes.get_serial();
115 let serial = Pin::new(serial);
116
117 serial.poll_flush(cx)
118 })
119 }
109} 120}
110 121
111pub struct UsbSerial<'bus, 'a, B: UsbBus> { 122pub struct UsbSerial<'bus, 'a, B: UsbBus> {
@@ -167,6 +178,10 @@ impl<'bus, 'a, B: UsbBus> AsyncWrite for UsbSerial<'bus, 'a, B> {
167 this.flush_write(); 178 this.flush_write();
168 Poll::Ready(Ok(count)) 179 Poll::Ready(Ok(count))
169 } 180 }
181
182 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
183 Poll::Ready(Ok(()))
184 }
170} 185}
171 186
172/// Keeps track of the type of the last written packet. 187/// Keeps track of the type of the last written packet.
diff --git a/embassy-net/src/tcp_socket.rs b/embassy-net/src/tcp_socket.rs
index 39bdd0c18..4836f8075 100644
--- a/embassy-net/src/tcp_socket.rs
+++ b/embassy-net/src/tcp_socket.rs
@@ -200,4 +200,8 @@ impl<'a> AsyncWrite for TcpSocket<'a> {
200 Err(e) => Poll::Ready(Err(to_ioerr(e))), 200 Err(e) => Poll::Ready(Err(to_ioerr(e))),
201 }) 201 })
202 } 202 }
203
204 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
205 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
206 }
203} 207}
diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs
index 9b0451c12..e3ca74384 100644
--- a/embassy-nrf/src/buffered_uarte.rs
+++ b/embassy-nrf/src/buffered_uarte.rs
@@ -266,6 +266,20 @@ impl<'d, U: UarteInstance, T: TimerInstance> AsyncWrite for BufferedUarte<'d, U,
266 266
267 poll 267 poll
268 } 268 }
269
270 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<embassy::io::Result<()>> {
271 self.inner.with(|state| {
272 trace!("poll_flush");
273
274 if !state.tx.is_empty() {
275 trace!("poll_flush: pending");
276 state.tx_waker.register(cx.waker());
277 return Poll::Pending;
278 }
279
280 Poll::Ready(Ok(()))
281 })
282 }
269} 283}
270 284
271impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { 285impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {
diff --git a/embassy-stm32/src/usart/mod.rs b/embassy-stm32/src/usart/mod.rs
index a835093c5..a87b7c020 100644
--- a/embassy-stm32/src/usart/mod.rs
+++ b/embassy-stm32/src/usart/mod.rs
@@ -449,6 +449,20 @@ mod buffered {
449 } 449 }
450 poll 450 poll
451 } 451 }
452
453 fn poll_flush(
454 mut self: Pin<&mut Self>,
455 cx: &mut Context<'_>,
456 ) -> Poll<Result<(), embassy::io::Error>> {
457 self.inner.with(|state| {
458 if !state.tx.is_empty() {
459 state.tx_waker.register(cx.waker());
460 return Poll::Pending;
461 }
462
463 Poll::Ready(Ok(()))
464 })
465 }
452 } 466 }
453} 467}
454 468
diff --git a/embassy/src/io/std.rs b/embassy/src/io/std.rs
index ddec8d56d..580d52891 100644
--- a/embassy/src/io/std.rs
+++ b/embassy/src/io/std.rs
@@ -32,4 +32,10 @@ impl<T: std_io::AsyncWrite> AsyncWrite for FromStdIo<T> {
32 .poll_write(cx, buf) 32 .poll_write(cx, buf)
33 .map_err(|e| e.into()) 33 .map_err(|e| e.into())
34 } 34 }
35 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
36 let Self(inner) = unsafe { self.get_unchecked_mut() };
37 unsafe { Pin::new_unchecked(inner) }
38 .poll_flush(cx)
39 .map_err(|e| e.into())
40 }
35} 41}
diff --git a/embassy/src/io/traits.rs b/embassy/src/io/traits.rs
index 8e4a981da..06500a687 100644
--- a/embassy/src/io/traits.rs
+++ b/embassy/src/io/traits.rs
@@ -89,6 +89,15 @@ pub trait AsyncWrite {
89 /// `poll_write` must try to make progress by flushing the underlying object if 89 /// `poll_write` must try to make progress by flushing the underlying object if
90 /// that is the only way the underlying object can become writable again. 90 /// that is the only way the underlying object can become writable again.
91 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; 91 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>;
92
93 /// Attempt to flush the object, ensuring that any buffered data reach their destination.
94 ///
95 /// On success, returns Poll::Ready(Ok(())).
96 ///
97 /// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the
98 /// current task (via cx.waker()) to receive a notification when the object can make progress
99 /// towards flushing.
100 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
92} 101}
93 102
94macro_rules! defer_async_read { 103macro_rules! defer_async_read {
@@ -135,6 +144,10 @@ macro_rules! deref_async_write {
135 ) -> Poll<Result<usize>> { 144 ) -> Poll<Result<usize>> {
136 Pin::new(&mut **self).poll_write(cx, buf) 145 Pin::new(&mut **self).poll_write(cx, buf)
137 } 146 }
147
148 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
149 Pin::new(&mut **self).poll_flush(cx)
150 }
138 }; 151 };
139} 152}
140 153
@@ -155,4 +168,8 @@ where
155 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { 168 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
156 self.get_mut().as_mut().poll_write(cx, buf) 169 self.get_mut().as_mut().poll_write(cx, buf)
157 } 170 }
171
172 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
173 self.get_mut().as_mut().poll_flush(cx)
174 }
158} 175}
diff --git a/embassy/src/io/util/flush.rs b/embassy/src/io/util/flush.rs
new file mode 100644
index 000000000..966ef10fb
--- /dev/null
+++ b/embassy/src/io/util/flush.rs
@@ -0,0 +1,32 @@
1use core::pin::Pin;
2use futures::future::Future;
3use futures::ready;
4use futures::task::{Context, Poll};
5
6use super::super::error::Result;
7use super::super::traits::AsyncWrite;
8
9/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
10#[derive(Debug)]
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12pub struct Flush<'a, W: ?Sized> {
13 writer: &'a mut W,
14}
15
16impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
17
18impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
19 pub(super) fn new(writer: &'a mut W) -> Self {
20 Flush { writer }
21 }
22}
23
24impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> {
25 type Output = Result<()>;
26
27 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
28 let this = &mut *self;
29 let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
30 Poll::Ready(Ok(()))
31 }
32}
diff --git a/embassy/src/io/util/mod.rs b/embassy/src/io/util/mod.rs
index de6643cb3..49758ba99 100644
--- a/embassy/src/io/util/mod.rs
+++ b/embassy/src/io/util/mod.rs
@@ -27,6 +27,9 @@ pub use self::skip_while::SkipWhile;
27mod drain; 27mod drain;
28pub use self::drain::Drain; 28pub use self::drain::Drain;
29 29
30mod flush;
31pub use self::flush::Flush;
32
30mod write; 33mod write;
31pub use self::write::Write; 34pub use self::write::Write;
32 35
@@ -160,6 +163,15 @@ pub trait AsyncWriteExt: AsyncWrite {
160 { 163 {
161 Write::new(self, buf) 164 Write::new(self, buf)
162 } 165 }
166
167 /// Awaits until all bytes have actually been written, and
168 /// not just enqueued as per the other "write" methods.
169 fn flush<'a>(&mut self) -> Flush<Self>
170 where
171 Self: Unpin,
172 {
173 Flush::new(self)
174 }
163} 175}
164 176
165impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} 177impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}
diff --git a/embassy/src/io/util/split.rs b/embassy/src/io/util/split.rs
index 0cebb5cbd..cc029aa53 100644
--- a/embassy/src/io/util/split.rs
+++ b/embassy/src/io/util/split.rs
@@ -32,6 +32,9 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
32 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { 32 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
33 Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf) 33 Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf)
34 } 34 }
35 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
36 Pin::new(unsafe { &mut *self.handle.get() }).poll_flush(cx)
37 }
35} 38}
36 39
37pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { 40pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
diff --git a/examples/nrf/src/bin/buffered_uart.rs b/examples/nrf/src/bin/buffered_uart.rs
index 5d9075edf..c3e07e44a 100644
--- a/examples/nrf/src/bin/buffered_uart.rs
+++ b/examples/nrf/src/bin/buffered_uart.rs
@@ -61,5 +61,8 @@ async fn main(_spawner: Spawner, p: Peripherals) {
61 info!("writing..."); 61 info!("writing...");
62 unwrap!(u.write_all(&buf).await); 62 unwrap!(u.write_all(&buf).await);
63 info!("write done"); 63 info!("write done");
64
65 // Wait until the bytes are actually finished being transmitted
66 unwrap!(u.flush().await);
64 } 67 }
65} 68}