diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2021-12-10 04:26:11 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-12-10 04:26:11 +0000 |
| commit | dce3f8c47df611b51c47559ba8f4c301eb86af95 (patch) | |
| tree | c88ff4746929fa3918436a5702f5145cac2ad860 | |
| parent | 0338fd2237d0b3d0703ff6dc4eb6708d2e323068 (diff) | |
| parent | 45ef9444579c8d7af72fc2d42093f26ef6f6ac3c (diff) | |
Merge #534
534: Provides AsyncWrite with flush r=huntc a=huntc
As per Tokio and others, this commit provides a `poll_flush` method on `AsyncWrite` so that a best-effort attempt at wakening once all bytes are flushed can be made.
Co-authored-by: huntc <[email protected]>
| -rw-r--r-- | embassy-hal-common/src/ring_buffer.rs | 7 | ||||
| -rw-r--r-- | embassy-hal-common/src/usb/usb_serial.rs | 15 | ||||
| -rw-r--r-- | embassy-net/src/tcp_socket.rs | 4 | ||||
| -rw-r--r-- | embassy-nrf/src/buffered_uarte.rs | 14 | ||||
| -rw-r--r-- | embassy-stm32/src/usart/mod.rs | 14 | ||||
| -rw-r--r-- | embassy/src/io/std.rs | 6 | ||||
| -rw-r--r-- | embassy/src/io/traits.rs | 17 | ||||
| -rw-r--r-- | embassy/src/io/util/flush.rs | 32 | ||||
| -rw-r--r-- | embassy/src/io/util/mod.rs | 12 | ||||
| -rw-r--r-- | embassy/src/io/util/split.rs | 3 | ||||
| -rw-r--r-- | examples/nrf/src/bin/buffered_uart.rs | 3 |
11 files changed, 127 insertions, 0 deletions
diff --git a/embassy-hal-common/src/ring_buffer.rs b/embassy-hal-common/src/ring_buffer.rs index 6829f62f5..fcad68bb1 100644 --- a/embassy-hal-common/src/ring_buffer.rs +++ b/embassy-hal-common/src/ring_buffer.rs | |||
| @@ -125,5 +125,12 @@ mod tests { | |||
| 125 | let buf = rb.pop_buf(); | 125 | let buf = rb.pop_buf(); |
| 126 | assert_eq!(1, buf.len()); | 126 | assert_eq!(1, buf.len()); |
| 127 | assert_eq!(4, buf[0]); | 127 | assert_eq!(4, buf[0]); |
| 128 | rb.pop(1); | ||
| 129 | |||
| 130 | let buf = rb.pop_buf(); | ||
| 131 | assert_eq!(0, buf.len()); | ||
| 132 | |||
| 133 | let buf = rb.push_buf(); | ||
| 134 | assert_eq!(4, buf.len()); | ||
| 128 | } | 135 | } |
| 129 | } | 136 | } |
diff --git a/embassy-hal-common/src/usb/usb_serial.rs b/embassy-hal-common/src/usb/usb_serial.rs index ca43a4d73..2592d05a6 100644 --- a/embassy-hal-common/src/usb/usb_serial.rs +++ b/embassy-hal-common/src/usb/usb_serial.rs | |||
| @@ -106,6 +106,17 @@ where | |||
| 106 | serial.poll_write(cx, buf) | 106 | serial.poll_write(cx, buf) |
| 107 | }) | 107 | }) |
| 108 | } | 108 | } |
| 109 | |||
| 110 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| 111 | let this = self.get_mut(); | ||
| 112 | let mut mutex = this.inner.borrow_mut(); | ||
| 113 | mutex.with(|state| { | ||
| 114 | let serial = state.classes.get_serial(); | ||
| 115 | let serial = Pin::new(serial); | ||
| 116 | |||
| 117 | serial.poll_flush(cx) | ||
| 118 | }) | ||
| 119 | } | ||
| 109 | } | 120 | } |
| 110 | 121 | ||
| 111 | pub struct UsbSerial<'bus, 'a, B: UsbBus> { | 122 | pub struct UsbSerial<'bus, 'a, B: UsbBus> { |
| @@ -167,6 +178,10 @@ impl<'bus, 'a, B: UsbBus> AsyncWrite for UsbSerial<'bus, 'a, B> { | |||
| 167 | this.flush_write(); | 178 | this.flush_write(); |
| 168 | Poll::Ready(Ok(count)) | 179 | Poll::Ready(Ok(count)) |
| 169 | } | 180 | } |
| 181 | |||
| 182 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| 183 | Poll::Ready(Ok(())) | ||
| 184 | } | ||
| 170 | } | 185 | } |
| 171 | 186 | ||
| 172 | /// Keeps track of the type of the last written packet. | 187 | /// Keeps track of the type of the last written packet. |
diff --git a/embassy-net/src/tcp_socket.rs b/embassy-net/src/tcp_socket.rs index 39bdd0c18..4836f8075 100644 --- a/embassy-net/src/tcp_socket.rs +++ b/embassy-net/src/tcp_socket.rs | |||
| @@ -200,4 +200,8 @@ impl<'a> AsyncWrite for TcpSocket<'a> { | |||
| 200 | Err(e) => Poll::Ready(Err(to_ioerr(e))), | 200 | Err(e) => Poll::Ready(Err(to_ioerr(e))), |
| 201 | }) | 201 | }) |
| 202 | } | 202 | } |
| 203 | |||
| 204 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| 205 | Poll::Ready(Ok(())) // TODO: Is there a better implementation for this? | ||
| 206 | } | ||
| 203 | } | 207 | } |
diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 9b0451c12..e3ca74384 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs | |||
| @@ -266,6 +266,20 @@ impl<'d, U: UarteInstance, T: TimerInstance> AsyncWrite for BufferedUarte<'d, U, | |||
| 266 | 266 | ||
| 267 | poll | 267 | poll |
| 268 | } | 268 | } |
| 269 | |||
| 270 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<embassy::io::Result<()>> { | ||
| 271 | self.inner.with(|state| { | ||
| 272 | trace!("poll_flush"); | ||
| 273 | |||
| 274 | if !state.tx.is_empty() { | ||
| 275 | trace!("poll_flush: pending"); | ||
| 276 | state.tx_waker.register(cx.waker()); | ||
| 277 | return Poll::Pending; | ||
| 278 | } | ||
| 279 | |||
| 280 | Poll::Ready(Ok(())) | ||
| 281 | }) | ||
| 282 | } | ||
| 269 | } | 283 | } |
| 270 | 284 | ||
| 271 | impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { | 285 | impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { |
diff --git a/embassy-stm32/src/usart/mod.rs b/embassy-stm32/src/usart/mod.rs index a835093c5..a87b7c020 100644 --- a/embassy-stm32/src/usart/mod.rs +++ b/embassy-stm32/src/usart/mod.rs | |||
| @@ -449,6 +449,20 @@ mod buffered { | |||
| 449 | } | 449 | } |
| 450 | poll | 450 | poll |
| 451 | } | 451 | } |
| 452 | |||
| 453 | fn poll_flush( | ||
| 454 | mut self: Pin<&mut Self>, | ||
| 455 | cx: &mut Context<'_>, | ||
| 456 | ) -> Poll<Result<(), embassy::io::Error>> { | ||
| 457 | self.inner.with(|state| { | ||
| 458 | if !state.tx.is_empty() { | ||
| 459 | state.tx_waker.register(cx.waker()); | ||
| 460 | return Poll::Pending; | ||
| 461 | } | ||
| 462 | |||
| 463 | Poll::Ready(Ok(())) | ||
| 464 | }) | ||
| 465 | } | ||
| 452 | } | 466 | } |
| 453 | } | 467 | } |
| 454 | 468 | ||
diff --git a/embassy/src/io/std.rs b/embassy/src/io/std.rs index ddec8d56d..580d52891 100644 --- a/embassy/src/io/std.rs +++ b/embassy/src/io/std.rs | |||
| @@ -32,4 +32,10 @@ impl<T: std_io::AsyncWrite> AsyncWrite for FromStdIo<T> { | |||
| 32 | .poll_write(cx, buf) | 32 | .poll_write(cx, buf) |
| 33 | .map_err(|e| e.into()) | 33 | .map_err(|e| e.into()) |
| 34 | } | 34 | } |
| 35 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 36 | let Self(inner) = unsafe { self.get_unchecked_mut() }; | ||
| 37 | unsafe { Pin::new_unchecked(inner) } | ||
| 38 | .poll_flush(cx) | ||
| 39 | .map_err(|e| e.into()) | ||
| 40 | } | ||
| 35 | } | 41 | } |
diff --git a/embassy/src/io/traits.rs b/embassy/src/io/traits.rs index 8e4a981da..06500a687 100644 --- a/embassy/src/io/traits.rs +++ b/embassy/src/io/traits.rs | |||
| @@ -89,6 +89,15 @@ pub trait AsyncWrite { | |||
| 89 | /// `poll_write` must try to make progress by flushing the underlying object if | 89 | /// `poll_write` must try to make progress by flushing the underlying object if |
| 90 | /// that is the only way the underlying object can become writable again. | 90 | /// that is the only way the underlying object can become writable again. |
| 91 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; | 91 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; |
| 92 | |||
| 93 | /// Attempt to flush the object, ensuring that any buffered data reach their destination. | ||
| 94 | /// | ||
| 95 | /// On success, returns Poll::Ready(Ok(())). | ||
| 96 | /// | ||
| 97 | /// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the | ||
| 98 | /// current task (via cx.waker()) to receive a notification when the object can make progress | ||
| 99 | /// towards flushing. | ||
| 100 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>; | ||
| 92 | } | 101 | } |
| 93 | 102 | ||
| 94 | macro_rules! defer_async_read { | 103 | macro_rules! defer_async_read { |
| @@ -135,6 +144,10 @@ macro_rules! deref_async_write { | |||
| 135 | ) -> Poll<Result<usize>> { | 144 | ) -> Poll<Result<usize>> { |
| 136 | Pin::new(&mut **self).poll_write(cx, buf) | 145 | Pin::new(&mut **self).poll_write(cx, buf) |
| 137 | } | 146 | } |
| 147 | |||
| 148 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 149 | Pin::new(&mut **self).poll_flush(cx) | ||
| 150 | } | ||
| 138 | }; | 151 | }; |
| 139 | } | 152 | } |
| 140 | 153 | ||
| @@ -155,4 +168,8 @@ where | |||
| 155 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { | 168 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
| 156 | self.get_mut().as_mut().poll_write(cx, buf) | 169 | self.get_mut().as_mut().poll_write(cx, buf) |
| 157 | } | 170 | } |
| 171 | |||
| 172 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 173 | self.get_mut().as_mut().poll_flush(cx) | ||
| 174 | } | ||
| 158 | } | 175 | } |
diff --git a/embassy/src/io/util/flush.rs b/embassy/src/io/util/flush.rs new file mode 100644 index 000000000..966ef10fb --- /dev/null +++ b/embassy/src/io/util/flush.rs | |||
| @@ -0,0 +1,32 @@ | |||
| 1 | use core::pin::Pin; | ||
| 2 | use futures::future::Future; | ||
| 3 | use futures::ready; | ||
| 4 | use futures::task::{Context, Poll}; | ||
| 5 | |||
| 6 | use super::super::error::Result; | ||
| 7 | use super::super::traits::AsyncWrite; | ||
| 8 | |||
| 9 | /// Future for the [`flush`](super::AsyncWriteExt::flush) method. | ||
| 10 | #[derive(Debug)] | ||
| 11 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 12 | pub struct Flush<'a, W: ?Sized> { | ||
| 13 | writer: &'a mut W, | ||
| 14 | } | ||
| 15 | |||
| 16 | impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {} | ||
| 17 | |||
| 18 | impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { | ||
| 19 | pub(super) fn new(writer: &'a mut W) -> Self { | ||
| 20 | Flush { writer } | ||
| 21 | } | ||
| 22 | } | ||
| 23 | |||
| 24 | impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> { | ||
| 25 | type Output = Result<()>; | ||
| 26 | |||
| 27 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 28 | let this = &mut *self; | ||
| 29 | let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?; | ||
| 30 | Poll::Ready(Ok(())) | ||
| 31 | } | ||
| 32 | } | ||
diff --git a/embassy/src/io/util/mod.rs b/embassy/src/io/util/mod.rs index de6643cb3..49758ba99 100644 --- a/embassy/src/io/util/mod.rs +++ b/embassy/src/io/util/mod.rs | |||
| @@ -27,6 +27,9 @@ pub use self::skip_while::SkipWhile; | |||
| 27 | mod drain; | 27 | mod drain; |
| 28 | pub use self::drain::Drain; | 28 | pub use self::drain::Drain; |
| 29 | 29 | ||
| 30 | mod flush; | ||
| 31 | pub use self::flush::Flush; | ||
| 32 | |||
| 30 | mod write; | 33 | mod write; |
| 31 | pub use self::write::Write; | 34 | pub use self::write::Write; |
| 32 | 35 | ||
| @@ -160,6 +163,15 @@ pub trait AsyncWriteExt: AsyncWrite { | |||
| 160 | { | 163 | { |
| 161 | Write::new(self, buf) | 164 | Write::new(self, buf) |
| 162 | } | 165 | } |
| 166 | |||
| 167 | /// Awaits until all bytes have actually been written, and | ||
| 168 | /// not just enqueued as per the other "write" methods. | ||
| 169 | fn flush<'a>(&mut self) -> Flush<Self> | ||
| 170 | where | ||
| 171 | Self: Unpin, | ||
| 172 | { | ||
| 173 | Flush::new(self) | ||
| 174 | } | ||
| 163 | } | 175 | } |
| 164 | 176 | ||
| 165 | impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} | 177 | impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} |
diff --git a/embassy/src/io/util/split.rs b/embassy/src/io/util/split.rs index 0cebb5cbd..cc029aa53 100644 --- a/embassy/src/io/util/split.rs +++ b/embassy/src/io/util/split.rs | |||
| @@ -32,6 +32,9 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> { | |||
| 32 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { | 32 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
| 33 | Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf) | 33 | Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf) |
| 34 | } | 34 | } |
| 35 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 36 | Pin::new(unsafe { &mut *self.handle.get() }).poll_flush(cx) | ||
| 37 | } | ||
| 35 | } | 38 | } |
| 36 | 39 | ||
| 37 | pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { | 40 | pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { |
diff --git a/examples/nrf/src/bin/buffered_uart.rs b/examples/nrf/src/bin/buffered_uart.rs index 5d9075edf..c3e07e44a 100644 --- a/examples/nrf/src/bin/buffered_uart.rs +++ b/examples/nrf/src/bin/buffered_uart.rs | |||
| @@ -61,5 +61,8 @@ async fn main(_spawner: Spawner, p: Peripherals) { | |||
| 61 | info!("writing..."); | 61 | info!("writing..."); |
| 62 | unwrap!(u.write_all(&buf).await); | 62 | unwrap!(u.write_all(&buf).await); |
| 63 | info!("write done"); | 63 | info!("write done"); |
| 64 | |||
| 65 | // Wait until the bytes are actually finished being transmitted | ||
| 66 | unwrap!(u.flush().await); | ||
| 64 | } | 67 | } |
| 65 | } | 68 | } |
