diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2023-02-22 14:27:19 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-22 14:27:19 +0000 |
| commit | 464faa2a04416a30b728910ac9504c06d484eb9a (patch) | |
| tree | 17433add913f78d5d19c2a61b3248c203f445d07 | |
| parent | b05cd77a62bc8902db2ae2dd36ed58b88acee932 (diff) | |
| parent | 035de6f3ff7bae31dce25b55e822f75b46ebe047 (diff) | |
Merge #1226
1226: embassy-net: Implement flush for TcpSocket r=Dirbaio a=kbleeke
Implements flush for TcpSocket by checking the send queue.
Flushing is implemented by checking if smoltcp's send_queue/tx_buffer is empty. The flush is completed when all outstanding octets are acknowledged. Smoltcp wakes the send waker [here](https://docs.rs/smoltcp/latest/src/smoltcp/socket/tcp.rs.html#1712) when ACKs are processed and data is removed from the send buffer. So we can re-check in our flush implementation, if the buffer is now empty.
fixes #1223
Co-authored-by: kbleeke <[email protected]>
| -rw-r--r-- | embassy-net/src/tcp.rs | 23 |
1 files changed, 20 insertions, 3 deletions
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs index d46bd4dbf..c3d8764b0 100644 --- a/embassy-net/src/tcp.rs +++ b/embassy-net/src/tcp.rs | |||
| @@ -63,6 +63,10 @@ impl<'a> TcpWriter<'a> { | |||
| 63 | pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> { | 63 | pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> { |
| 64 | self.io.write(buf).await | 64 | self.io.write(buf).await |
| 65 | } | 65 | } |
| 66 | |||
| 67 | pub async fn flush(&mut self) -> Result<(), Error> { | ||
| 68 | self.io.flush().await | ||
| 69 | } | ||
| 66 | } | 70 | } |
| 67 | 71 | ||
| 68 | impl<'a> TcpSocket<'a> { | 72 | impl<'a> TcpSocket<'a> { |
| @@ -146,6 +150,10 @@ impl<'a> TcpSocket<'a> { | |||
| 146 | self.io.write(buf).await | 150 | self.io.write(buf).await |
| 147 | } | 151 | } |
| 148 | 152 | ||
| 153 | pub async fn flush(&mut self) -> Result<(), Error> { | ||
| 154 | self.io.flush().await | ||
| 155 | } | ||
| 156 | |||
| 149 | pub fn set_timeout(&mut self, duration: Option<Duration>) { | 157 | pub fn set_timeout(&mut self, duration: Option<Duration>) { |
| 150 | self.io.with_mut(|s, _| s.set_timeout(duration)) | 158 | self.io.with_mut(|s, _| s.set_timeout(duration)) |
| 151 | } | 159 | } |
| @@ -254,10 +262,19 @@ impl<'d> TcpIo<'d> { | |||
| 254 | .await | 262 | .await |
| 255 | } | 263 | } |
| 256 | 264 | ||
| 257 | #[allow(unused)] | ||
| 258 | async fn flush(&mut self) -> Result<(), Error> { | 265 | async fn flush(&mut self) -> Result<(), Error> { |
| 259 | poll_fn(move |_| { | 266 | poll_fn(move |cx| { |
| 260 | Poll::Ready(Ok(())) // TODO: Is there a better implementation for this? | 267 | self.with_mut(|s, _| { |
| 268 | // If there are outstanding send operations, register for wake up and wait | ||
| 269 | // smoltcp issues wake-ups when octets are dequeued from the send buffer | ||
| 270 | if s.send_queue() > 0 { | ||
| 271 | s.register_send_waker(cx.waker()); | ||
| 272 | Poll::Pending | ||
| 273 | // No outstanding sends, socket is flushed | ||
| 274 | } else { | ||
| 275 | Poll::Ready(Ok(())) | ||
| 276 | } | ||
| 277 | }) | ||
| 261 | }) | 278 | }) |
| 262 | .await | 279 | .await |
| 263 | } | 280 | } |
