aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Mabin <[email protected]>2023-08-13 23:39:06 +0100
committerScott Mabin <[email protected]>2023-08-14 15:33:02 +0100
commitfbec797d6403348f5c8c306c1d9a7c396b063f10 (patch)
tree48dd0f16f794f0ed0cec5e27c37210ca1273bd2a
parent0727f8690c4684d0622547edee2cf9dc22215a9b (diff)
embassy-net:tcp:send/recv
- Add async versions of smoltcp's `send` and `recv` closure based API.
-rw-r--r--embassy-net/src/tcp.rs102
1 files changed, 102 insertions, 0 deletions
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs
index c903fb245..b4ce40945 100644
--- a/embassy-net/src/tcp.rs
+++ b/embassy-net/src/tcp.rs
@@ -82,6 +82,17 @@ impl<'a> TcpReader<'a> {
82 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { 82 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
83 self.io.read(buf).await 83 self.io.read(buf).await
84 } 84 }
85
86 /// Call `f` with the largest contiguous slice of octets in the receive buffer,
87 /// and dequeue the amount of elements returned by `f`.
88 ///
89 /// If no data is available, it waits until there is at least one byte available.
90 pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
91 where
92 F: FnOnce(&mut [u8]) -> (usize, R),
93 {
94 self.io.read_with(f).await
95 }
85} 96}
86 97
87impl<'a> TcpWriter<'a> { 98impl<'a> TcpWriter<'a> {
@@ -100,6 +111,17 @@ impl<'a> TcpWriter<'a> {
100 pub async fn flush(&mut self) -> Result<(), Error> { 111 pub async fn flush(&mut self) -> Result<(), Error> {
101 self.io.flush().await 112 self.io.flush().await
102 } 113 }
114
115 /// Call `f` with the largest contiguous slice of octets in the transmit buffer,
116 /// and enqueue the amount of elements returned by `f`.
117 ///
118 /// If the socket is not ready to accept data, it waits until it is.
119 pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
120 where
121 F: FnOnce(&mut [u8]) -> (usize, R),
122 {
123 self.io.write_with(f).await
124 }
103} 125}
104 126
105impl<'a> TcpSocket<'a> { 127impl<'a> TcpSocket<'a> {
@@ -121,6 +143,28 @@ impl<'a> TcpSocket<'a> {
121 } 143 }
122 } 144 }
123 145
146 /// Call `f` with the largest contiguous slice of octets in the transmit buffer,
147 /// and enqueue the amount of elements returned by `f`.
148 ///
149 /// If the socket is not ready to accept data, it waits until it is.
150 pub async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
151 where
152 F: FnOnce(&mut [u8]) -> (usize, R),
153 {
154 self.io.write_with(f).await
155 }
156
157 /// Call `f` with the largest contiguous slice of octets in the receive buffer,
158 /// and dequeue the amount of elements returned by `f`.
159 ///
160 /// If no data is available, it waits until there is at least one byte available.
161 pub async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
162 where
163 F: FnOnce(&mut [u8]) -> (usize, R),
164 {
165 self.io.read_with(f).await
166 }
167
124 /// Split the socket into reader and a writer halves. 168 /// Split the socket into reader and a writer halves.
125 pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) { 169 pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) {
126 (TcpReader { io: self.io }, TcpWriter { io: self.io }) 170 (TcpReader { io: self.io }, TcpWriter { io: self.io })
@@ -359,6 +403,64 @@ impl<'d> TcpIo<'d> {
359 .await 403 .await
360 } 404 }
361 405
406 async fn write_with<F, R>(&mut self, f: F) -> Result<R, Error>
407 where
408 F: FnOnce(&mut [u8]) -> (usize, R),
409 {
410 let mut f = Some(f);
411 poll_fn(move |cx| {
412 self.with_mut(|s, _| {
413 if !s.can_send() {
414 if s.may_send() {
415 // socket buffer is full wait until it has atleast one byte free
416 s.register_send_waker(cx.waker());
417 Poll::Pending
418 } else {
419 // if we can't transmit because the transmit half of the duplex connection is closed then return an error
420 Poll::Ready(Err(Error::ConnectionReset))
421 }
422 } else {
423 Poll::Ready(match s.send(f.take().unwrap()) {
424 // Connection reset. TODO: this can also be timeouts etc, investigate.
425 Err(tcp::SendError::InvalidState) => Err(Error::ConnectionReset),
426 Ok(r) => Ok(r),
427 })
428 }
429 })
430 })
431 .await
432 }
433
434 async fn read_with<F, R>(&mut self, f: F) -> Result<R, Error>
435 where
436 F: FnOnce(&mut [u8]) -> (usize, R),
437 {
438 let mut f = Some(f);
439 poll_fn(move |cx| {
440 self.with_mut(|s, _| {
441 if !s.can_recv() {
442 if s.may_recv() {
443 // socket buffer is empty wait until it has atleast one byte has arrived
444 s.register_recv_waker(cx.waker());
445 Poll::Pending
446 } else {
447 // if we can't receive because the recieve half of the duplex connection is closed then return an error
448 Poll::Ready(Err(Error::ConnectionReset))
449 }
450 } else {
451 Poll::Ready(match s.recv(f.take().unwrap()) {
452 // Connection reset. TODO: this can also be timeouts etc, investigate.
453 Err(tcp::RecvError::Finished) | Err(tcp::RecvError::InvalidState) => {
454 Err(Error::ConnectionReset)
455 }
456 Ok(r) => Ok(r),
457 })
458 }
459 })
460 })
461 .await
462 }
463
362 async fn flush(&mut self) -> Result<(), Error> { 464 async fn flush(&mut self) -> Result<(), Error> {
363 poll_fn(move |cx| { 465 poll_fn(move |cx| {
364 self.with_mut(|s, _| { 466 self.with_mut(|s, _| {