diff options
| author | Dario Nieuwenhuis <[email protected]> | 2023-08-15 13:08:05 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-15 13:08:05 +0000 |
| commit | cb3644856d8a83dfce35e915aa20ce471a02de09 (patch) | |
| tree | 69027bded3a896a3213f41f13f146b41f6f27afe | |
| parent | 03576b9e83a33466e839dabb8db4ca574654a0ff (diff) | |
| parent | fbec797d6403348f5c8c306c1d9a7c396b063f10 (diff) | |
Merge pull request #1777 from MabezDev/embassy-net/async-send-recv
embassy-net:tcp:write_with/read_with
| -rw-r--r-- | embassy-net/src/tcp.rs | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs index c903fb245..b4ce40945 100644 --- a/embassy-net/src/tcp.rs +++ b/embassy-net/src/tcp.rs | |||
| @@ -82,6 +82,17 @@ impl<'a> TcpReader<'a> { | |||
| 82 | pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { | 82 | pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { |
| 83 | self.io.read(buf).await | 83 | self.io.read(buf).await |
| 84 | } | 84 | } |
| 85 | |||
| 86 | /// Call `f` with the largest contiguous slice of octets in the receive buffer, | ||
| 87 | /// and dequeue the amount of elements returned by `f`. | ||
| 88 | /// | ||
| 89 | /// If no data is available, it waits until there is at least one byte available. | ||
| 90 | pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 91 | where | ||
| 92 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 93 | { | ||
| 94 | self.io.read_with(f).await | ||
| 95 | } | ||
| 85 | } | 96 | } |
| 86 | 97 | ||
| 87 | impl<'a> TcpWriter<'a> { | 98 | impl<'a> TcpWriter<'a> { |
| @@ -100,6 +111,17 @@ impl<'a> TcpWriter<'a> { | |||
| 100 | pub async fn flush(&mut self) -> Result<(), Error> { | 111 | pub async fn flush(&mut self) -> Result<(), Error> { |
| 101 | self.io.flush().await | 112 | self.io.flush().await |
| 102 | } | 113 | } |
| 114 | |||
| 115 | /// Call `f` with the largest contiguous slice of octets in the transmit buffer, | ||
| 116 | /// and enqueue the amount of elements returned by `f`. | ||
| 117 | /// | ||
| 118 | /// If the socket is not ready to accept data, it waits until it is. | ||
| 119 | pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 120 | where | ||
| 121 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 122 | { | ||
| 123 | self.io.write_with(f).await | ||
| 124 | } | ||
| 103 | } | 125 | } |
| 104 | 126 | ||
| 105 | impl<'a> TcpSocket<'a> { | 127 | impl<'a> TcpSocket<'a> { |
| @@ -121,6 +143,28 @@ impl<'a> TcpSocket<'a> { | |||
| 121 | } | 143 | } |
| 122 | } | 144 | } |
| 123 | 145 | ||
| 146 | /// Call `f` with the largest contiguous slice of octets in the transmit buffer, | ||
| 147 | /// and enqueue the amount of elements returned by `f`. | ||
| 148 | /// | ||
| 149 | /// If the socket is not ready to accept data, it waits until it is. | ||
| 150 | pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 151 | where | ||
| 152 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 153 | { | ||
| 154 | self.io.write_with(f).await | ||
| 155 | } | ||
| 156 | |||
| 157 | /// Call `f` with the largest contiguous slice of octets in the receive buffer, | ||
| 158 | /// and dequeue the amount of elements returned by `f`. | ||
| 159 | /// | ||
| 160 | /// If no data is available, it waits until there is at least one byte available. | ||
| 161 | pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 162 | where | ||
| 163 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 164 | { | ||
| 165 | self.io.read_with(f).await | ||
| 166 | } | ||
| 167 | |||
| 124 | /// Split the socket into reader and a writer halves. | 168 | /// Split the socket into reader and a writer halves. |
| 125 | pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) { | 169 | pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) { |
| 126 | (TcpReader { io: self.io }, TcpWriter { io: self.io }) | 170 | (TcpReader { io: self.io }, TcpWriter { io: self.io }) |
| @@ -359,6 +403,64 @@ impl<'d> TcpIo<'d> { | |||
| 359 | .await | 403 | .await |
| 360 | } | 404 | } |
| 361 | 405 | ||
| 406 | async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 407 | where | ||
| 408 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 409 | { | ||
| 410 | let mut f = Some(f); | ||
| 411 | poll_fn(move |cx| { | ||
| 412 | self.with_mut(|s, _| { | ||
| 413 | if !s.can_send() { | ||
| 414 | if s.may_send() { | ||
| 415 | // socket buffer is full wait until it has atleast one byte free | ||
| 416 | s.register_send_waker(cx.waker()); | ||
| 417 | Poll::Pending | ||
| 418 | } else { | ||
| 419 | // if we can't transmit because the transmit half of the duplex connection is closed then return an error | ||
| 420 | Poll::Ready(Err(Error::ConnectionReset)) | ||
| 421 | } | ||
| 422 | } else { | ||
| 423 | Poll::Ready(match s.send(f.take().unwrap()) { | ||
| 424 | // Connection reset. TODO: this can also be timeouts etc, investigate. | ||
| 425 | Err(tcp::SendError::InvalidState) => Err(Error::ConnectionReset), | ||
| 426 | Ok(r) => Ok(r), | ||
| 427 | }) | ||
| 428 | } | ||
| 429 | }) | ||
| 430 | }) | ||
| 431 | .await | ||
| 432 | } | ||
| 433 | |||
| 434 | async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error> | ||
| 435 | where | ||
| 436 | F: FnOnce(&mut [u8]) -> (usize, R), | ||
| 437 | { | ||
| 438 | let mut f = Some(f); | ||
| 439 | poll_fn(move |cx| { | ||
| 440 | self.with_mut(|s, _| { | ||
| 441 | if !s.can_recv() { | ||
| 442 | if s.may_recv() { | ||
| 443 | // socket buffer is empty wait until it has atleast one byte has arrived | ||
| 444 | s.register_recv_waker(cx.waker()); | ||
| 445 | Poll::Pending | ||
| 446 | } else { | ||
| 447 | // if we can't receive because the recieve half of the duplex connection is closed then return an error | ||
| 448 | Poll::Ready(Err(Error::ConnectionReset)) | ||
| 449 | } | ||
| 450 | } else { | ||
| 451 | Poll::Ready(match s.recv(f.take().unwrap()) { | ||
| 452 | // Connection reset. TODO: this can also be timeouts etc, investigate. | ||
| 453 | Err(tcp::RecvError::Finished) | Err(tcp::RecvError::InvalidState) => { | ||
| 454 | Err(Error::ConnectionReset) | ||
| 455 | } | ||
| 456 | Ok(r) => Ok(r), | ||
| 457 | }) | ||
| 458 | } | ||
| 459 | }) | ||
| 460 | }) | ||
| 461 | .await | ||
| 462 | } | ||
| 463 | |||
| 362 | async fn flush(&mut self) -> Result<(), Error> { | 464 | async fn flush(&mut self) -> Result<(), Error> { |
| 363 | poll_fn(move |cx| { | 465 | poll_fn(move |cx| { |
| 364 | self.with_mut(|s, _| { | 466 | self.with_mut(|s, _| { |
