diff options
| author | huntc <[email protected]> | 2021-12-10 12:08:00 +1100 |
|---|---|---|
| committer | huntc <[email protected]> | 2021-12-10 12:16:08 +1100 |
| commit | 7256ff3e71ceea9091349b040a2ebc987aca590c (patch) | |
| tree | 6f23c25b08c027db1dd133a7489b50661e08da7c | |
| parent | 60b7c50d8b02f92844287b150c5f504750846625 (diff) | |
Provides AsyncWrite with flush
As per Tokio and others, this commit provides a `poll_flush` method on `AsyncWrite` so that a best-effort attempt at wakening once all bytes are flushed can be made.
| -rw-r--r-- | embassy-hal-common/src/ring_buffer.rs | 7 | ||||
| -rw-r--r-- | embassy-hal-common/src/usb/usb_serial.rs | 15 | ||||
| -rw-r--r-- | embassy-nrf/src/buffered_uarte.rs | 14 | ||||
| -rw-r--r-- | embassy/src/io/traits.rs | 17 | ||||
| -rw-r--r-- | embassy/src/io/util/flush.rs | 32 | ||||
| -rw-r--r-- | embassy/src/io/util/mod.rs | 12 | ||||
| -rw-r--r-- | examples/nrf/src/bin/buffered_uart.rs | 3 |
7 files changed, 100 insertions, 0 deletions
diff --git a/embassy-hal-common/src/ring_buffer.rs b/embassy-hal-common/src/ring_buffer.rs index 6829f62f5..fcad68bb1 100644 --- a/embassy-hal-common/src/ring_buffer.rs +++ b/embassy-hal-common/src/ring_buffer.rs | |||
| @@ -125,5 +125,12 @@ mod tests { | |||
| 125 | let buf = rb.pop_buf(); | 125 | let buf = rb.pop_buf(); |
| 126 | assert_eq!(1, buf.len()); | 126 | assert_eq!(1, buf.len()); |
| 127 | assert_eq!(4, buf[0]); | 127 | assert_eq!(4, buf[0]); |
| 128 | rb.pop(1); | ||
| 129 | |||
| 130 | let buf = rb.pop_buf(); | ||
| 131 | assert_eq!(0, buf.len()); | ||
| 132 | |||
| 133 | let buf = rb.push_buf(); | ||
| 134 | assert_eq!(4, buf.len()); | ||
| 128 | } | 135 | } |
| 129 | } | 136 | } |
diff --git a/embassy-hal-common/src/usb/usb_serial.rs b/embassy-hal-common/src/usb/usb_serial.rs index ca43a4d73..2592d05a6 100644 --- a/embassy-hal-common/src/usb/usb_serial.rs +++ b/embassy-hal-common/src/usb/usb_serial.rs | |||
| @@ -106,6 +106,17 @@ where | |||
| 106 | serial.poll_write(cx, buf) | 106 | serial.poll_write(cx, buf) |
| 107 | }) | 107 | }) |
| 108 | } | 108 | } |
| 109 | |||
| 110 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| 111 | let this = self.get_mut(); | ||
| 112 | let mut mutex = this.inner.borrow_mut(); | ||
| 113 | mutex.with(|state| { | ||
| 114 | let serial = state.classes.get_serial(); | ||
| 115 | let serial = Pin::new(serial); | ||
| 116 | |||
| 117 | serial.poll_flush(cx) | ||
| 118 | }) | ||
| 119 | } | ||
| 109 | } | 120 | } |
| 110 | 121 | ||
| 111 | pub struct UsbSerial<'bus, 'a, B: UsbBus> { | 122 | pub struct UsbSerial<'bus, 'a, B: UsbBus> { |
| @@ -167,6 +178,10 @@ impl<'bus, 'a, B: UsbBus> AsyncWrite for UsbSerial<'bus, 'a, B> { | |||
| 167 | this.flush_write(); | 178 | this.flush_write(); |
| 168 | Poll::Ready(Ok(count)) | 179 | Poll::Ready(Ok(count)) |
| 169 | } | 180 | } |
| 181 | |||
| 182 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
| 183 | Poll::Ready(Ok(())) | ||
| 184 | } | ||
| 170 | } | 185 | } |
| 171 | 186 | ||
| 172 | /// Keeps track of the type of the last written packet. | 187 | /// Keeps track of the type of the last written packet. |
diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 9b0451c12..e3ca74384 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs | |||
| @@ -266,6 +266,20 @@ impl<'d, U: UarteInstance, T: TimerInstance> AsyncWrite for BufferedUarte<'d, U, | |||
| 266 | 266 | ||
| 267 | poll | 267 | poll |
| 268 | } | 268 | } |
| 269 | |||
| 270 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<embassy::io::Result<()>> { | ||
| 271 | self.inner.with(|state| { | ||
| 272 | trace!("poll_flush"); | ||
| 273 | |||
| 274 | if !state.tx.is_empty() { | ||
| 275 | trace!("poll_flush: pending"); | ||
| 276 | state.tx_waker.register(cx.waker()); | ||
| 277 | return Poll::Pending; | ||
| 278 | } | ||
| 279 | |||
| 280 | Poll::Ready(Ok(())) | ||
| 281 | }) | ||
| 282 | } | ||
| 269 | } | 283 | } |
| 270 | 284 | ||
| 271 | impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { | 285 | impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { |
diff --git a/embassy/src/io/traits.rs b/embassy/src/io/traits.rs index 8e4a981da..06500a687 100644 --- a/embassy/src/io/traits.rs +++ b/embassy/src/io/traits.rs | |||
| @@ -89,6 +89,15 @@ pub trait AsyncWrite { | |||
| 89 | /// `poll_write` must try to make progress by flushing the underlying object if | 89 | /// `poll_write` must try to make progress by flushing the underlying object if |
| 90 | /// that is the only way the underlying object can become writable again. | 90 | /// that is the only way the underlying object can become writable again. |
| 91 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; | 91 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; |
| 92 | |||
| 93 | /// Attempt to flush the object, ensuring that any buffered data reach their destination. | ||
| 94 | /// | ||
| 95 | /// On success, returns Poll::Ready(Ok(())). | ||
| 96 | /// | ||
| 97 | /// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the | ||
| 98 | /// current task (via cx.waker()) to receive a notification when the object can make progress | ||
| 99 | /// towards flushing. | ||
| 100 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>; | ||
| 92 | } | 101 | } |
| 93 | 102 | ||
| 94 | macro_rules! defer_async_read { | 103 | macro_rules! defer_async_read { |
| @@ -135,6 +144,10 @@ macro_rules! deref_async_write { | |||
| 135 | ) -> Poll<Result<usize>> { | 144 | ) -> Poll<Result<usize>> { |
| 136 | Pin::new(&mut **self).poll_write(cx, buf) | 145 | Pin::new(&mut **self).poll_write(cx, buf) |
| 137 | } | 146 | } |
| 147 | |||
| 148 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 149 | Pin::new(&mut **self).poll_flush(cx) | ||
| 150 | } | ||
| 138 | }; | 151 | }; |
| 139 | } | 152 | } |
| 140 | 153 | ||
| @@ -155,4 +168,8 @@ where | |||
| 155 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { | 168 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
| 156 | self.get_mut().as_mut().poll_write(cx, buf) | 169 | self.get_mut().as_mut().poll_write(cx, buf) |
| 157 | } | 170 | } |
| 171 | |||
| 172 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 173 | self.get_mut().as_mut().poll_flush(cx) | ||
| 174 | } | ||
| 158 | } | 175 | } |
diff --git a/embassy/src/io/util/flush.rs b/embassy/src/io/util/flush.rs new file mode 100644 index 000000000..966ef10fb --- /dev/null +++ b/embassy/src/io/util/flush.rs | |||
| @@ -0,0 +1,32 @@ | |||
| 1 | use core::pin::Pin; | ||
| 2 | use futures::future::Future; | ||
| 3 | use futures::ready; | ||
| 4 | use futures::task::{Context, Poll}; | ||
| 5 | |||
| 6 | use super::super::error::Result; | ||
| 7 | use super::super::traits::AsyncWrite; | ||
| 8 | |||
| 9 | /// Future for the [`flush`](super::AsyncWriteExt::flush) method. | ||
| 10 | #[derive(Debug)] | ||
| 11 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 12 | pub struct Flush<'a, W: ?Sized> { | ||
| 13 | writer: &'a mut W, | ||
| 14 | } | ||
| 15 | |||
| 16 | impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {} | ||
| 17 | |||
| 18 | impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { | ||
| 19 | pub(super) fn new(writer: &'a mut W) -> Self { | ||
| 20 | Flush { writer } | ||
| 21 | } | ||
| 22 | } | ||
| 23 | |||
| 24 | impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> { | ||
| 25 | type Output = Result<()>; | ||
| 26 | |||
| 27 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||
| 28 | let this = &mut *self; | ||
| 29 | let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?; | ||
| 30 | Poll::Ready(Ok(())) | ||
| 31 | } | ||
| 32 | } | ||
diff --git a/embassy/src/io/util/mod.rs b/embassy/src/io/util/mod.rs index de6643cb3..49758ba99 100644 --- a/embassy/src/io/util/mod.rs +++ b/embassy/src/io/util/mod.rs | |||
| @@ -27,6 +27,9 @@ pub use self::skip_while::SkipWhile; | |||
| 27 | mod drain; | 27 | mod drain; |
| 28 | pub use self::drain::Drain; | 28 | pub use self::drain::Drain; |
| 29 | 29 | ||
| 30 | mod flush; | ||
| 31 | pub use self::flush::Flush; | ||
| 32 | |||
| 30 | mod write; | 33 | mod write; |
| 31 | pub use self::write::Write; | 34 | pub use self::write::Write; |
| 32 | 35 | ||
| @@ -160,6 +163,15 @@ pub trait AsyncWriteExt: AsyncWrite { | |||
| 160 | { | 163 | { |
| 161 | Write::new(self, buf) | 164 | Write::new(self, buf) |
| 162 | } | 165 | } |
| 166 | |||
| 167 | /// Awaits until all bytes have actually been written, and | ||
| 168 | /// not just enqueued as per the other "write" methods. | ||
| 169 | fn flush<'a>(&mut self) -> Flush<Self> | ||
| 170 | where | ||
| 171 | Self: Unpin, | ||
| 172 | { | ||
| 173 | Flush::new(self) | ||
| 174 | } | ||
| 163 | } | 175 | } |
| 164 | 176 | ||
| 165 | impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} | 177 | impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} |
diff --git a/examples/nrf/src/bin/buffered_uart.rs b/examples/nrf/src/bin/buffered_uart.rs index 5d9075edf..c3e07e44a 100644 --- a/examples/nrf/src/bin/buffered_uart.rs +++ b/examples/nrf/src/bin/buffered_uart.rs | |||
| @@ -61,5 +61,8 @@ async fn main(_spawner: Spawner, p: Peripherals) { | |||
| 61 | info!("writing..."); | 61 | info!("writing..."); |
| 62 | unwrap!(u.write_all(&buf).await); | 62 | unwrap!(u.write_all(&buf).await); |
| 63 | info!("write done"); | 63 | info!("write done"); |
| 64 | |||
| 65 | // Wait until the bytes are actually finished being transmitted | ||
| 66 | unwrap!(u.flush().await); | ||
| 64 | } | 67 | } |
| 65 | } | 68 | } |
