aboutsummaryrefslogtreecommitdiff
path: root/embassy-net/src
diff options
context:
space:
mode:
authorHenrik Alsér <[email protected]>2022-05-07 09:47:29 +0200
committerGitHub <[email protected]>2022-05-07 09:47:29 +0200
commit1ca5475010a1cae6ebc55a27948ca4320decd5cd (patch)
tree2484384d601823b4dfe96c2bc4bf2d260b34c5f1 /embassy-net/src
parent108a98136096f8b530266aa6687bdbbed4a6a382 (diff)
parenta4bf190f2f0ce28a298626de6de1c8059269cedc (diff)
Merge branch 'embassy-rs:master' into qdec
Diffstat (limited to 'embassy-net/src')
-rw-r--r--embassy-net/src/device.rs9
-rw-r--r--embassy-net/src/lib.rs9
-rw-r--r--embassy-net/src/tcp/io_impl.rs67
-rw-r--r--embassy-net/src/tcp/mod.rs (renamed from embassy-net/src/tcp_socket.rs)131
4 files changed, 135 insertions, 81 deletions
diff --git a/embassy-net/src/device.rs b/embassy-net/src/device.rs
index f66ebc193..1f4fa5208 100644
--- a/embassy-net/src/device.rs
+++ b/embassy-net/src/device.rs
@@ -4,7 +4,6 @@ use smoltcp::phy::DeviceCapabilities;
4use smoltcp::time::Instant as SmolInstant; 4use smoltcp::time::Instant as SmolInstant;
5 5
6use crate::packet_pool::PacketBoxExt; 6use crate::packet_pool::PacketBoxExt;
7use crate::Result;
8use crate::{Packet, PacketBox, PacketBuf}; 7use crate::{Packet, PacketBox, PacketBuf};
9 8
10#[derive(PartialEq, Eq, Clone, Copy)] 9#[derive(PartialEq, Eq, Clone, Copy)]
@@ -78,9 +77,9 @@ pub struct RxToken {
78} 77}
79 78
80impl smoltcp::phy::RxToken for RxToken { 79impl smoltcp::phy::RxToken for RxToken {
81 fn consume<R, F>(mut self, _timestamp: SmolInstant, f: F) -> Result<R> 80 fn consume<R, F>(mut self, _timestamp: SmolInstant, f: F) -> smoltcp::Result<R>
82 where 81 where
83 F: FnOnce(&mut [u8]) -> Result<R>, 82 F: FnOnce(&mut [u8]) -> smoltcp::Result<R>,
84 { 83 {
85 f(&mut self.pkt) 84 f(&mut self.pkt)
86 } 85 }
@@ -92,9 +91,9 @@ pub struct TxToken<'a> {
92} 91}
93 92
94impl<'a> smoltcp::phy::TxToken for TxToken<'a> { 93impl<'a> smoltcp::phy::TxToken for TxToken<'a> {
95 fn consume<R, F>(self, _timestamp: SmolInstant, len: usize, f: F) -> Result<R> 94 fn consume<R, F>(self, _timestamp: SmolInstant, len: usize, f: F) -> smoltcp::Result<R>
96 where 95 where
97 F: FnOnce(&mut [u8]) -> Result<R>, 96 F: FnOnce(&mut [u8]) -> smoltcp::Result<R>,
98 { 97 {
99 let mut buf = self.pkt.slice(0..len); 98 let mut buf = self.pkt.slice(0..len);
100 let r = f(&mut buf)?; 99 let r = f(&mut buf)?;
diff --git a/embassy-net/src/lib.rs b/embassy-net/src/lib.rs
index ffe786b36..ded841909 100644
--- a/embassy-net/src/lib.rs
+++ b/embassy-net/src/lib.rs
@@ -1,5 +1,9 @@
1#![cfg_attr(not(feature = "std"), no_std)] 1#![cfg_attr(not(feature = "std"), no_std)]
2#![allow(clippy::new_without_default)] 2#![allow(clippy::new_without_default)]
3#![cfg_attr(
4 feature = "nightly",
5 feature(generic_associated_types, type_alias_impl_trait)
6)]
3 7
4// This mod MUST go first, so that the others see its macros. 8// This mod MUST go first, so that the others see its macros.
5pub(crate) mod fmt; 9pub(crate) mod fmt;
@@ -20,9 +24,7 @@ pub use stack::{
20}; 24};
21 25
22#[cfg(feature = "tcp")] 26#[cfg(feature = "tcp")]
23mod tcp_socket; 27pub mod tcp;
24#[cfg(feature = "tcp")]
25pub use tcp_socket::TcpSocket;
26 28
27// smoltcp reexports 29// smoltcp reexports
28pub use smoltcp::phy::{DeviceCapabilities, Medium}; 30pub use smoltcp::phy::{DeviceCapabilities, Medium};
@@ -32,4 +34,3 @@ pub use smoltcp::time::Instant as SmolInstant;
32pub use smoltcp::wire::{EthernetAddress, HardwareAddress}; 34pub use smoltcp::wire::{EthernetAddress, HardwareAddress};
33pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; 35pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr};
34pub type Interface = smoltcp::iface::Interface<'static, device::DeviceAdapter>; 36pub type Interface = smoltcp::iface::Interface<'static, device::DeviceAdapter>;
35pub use smoltcp::{Error, Result};
diff --git a/embassy-net/src/tcp/io_impl.rs b/embassy-net/src/tcp/io_impl.rs
new file mode 100644
index 000000000..155733497
--- /dev/null
+++ b/embassy-net/src/tcp/io_impl.rs
@@ -0,0 +1,67 @@
1use core::future::Future;
2use core::task::Poll;
3use futures::future::poll_fn;
4
5use super::{Error, TcpSocket};
6
7impl<'d> embedded_io::asynch::Read for TcpSocket<'d> {
8 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
9 where
10 Self: 'a;
11
12 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
13 poll_fn(move |cx| {
14 // CAUTION: smoltcp semantics around EOF are different to what you'd expect
15 // from posix-like IO, so we have to tweak things here.
16 self.with(|s, _| match s.recv_slice(buf) {
17 // No data ready
18 Ok(0) => {
19 s.register_recv_waker(cx.waker());
20 Poll::Pending
21 }
22 // Data ready!
23 Ok(n) => Poll::Ready(Ok(n)),
24 // EOF
25 Err(smoltcp::Error::Finished) => Poll::Ready(Ok(0)),
26 // Connection reset. TODO: this can also be timeouts etc, investigate.
27 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)),
28 // smoltcp returns no errors other than the above.
29 Err(_) => unreachable!(),
30 })
31 })
32 }
33}
34
35impl<'d> embedded_io::asynch::Write for TcpSocket<'d> {
36 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
37 where
38 Self: 'a;
39
40 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
41 poll_fn(move |cx| {
42 self.with(|s, _| match s.send_slice(buf) {
43 // Not ready to send (no space in the tx buffer)
44 Ok(0) => {
45 s.register_send_waker(cx.waker());
46 Poll::Pending
47 }
48 // Some data sent
49 Ok(n) => Poll::Ready(Ok(n)),
50 // Connection reset. TODO: this can also be timeouts etc, investigate.
51 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)),
52 // smoltcp returns no errors other than the above.
53 Err(_) => unreachable!(),
54 })
55 })
56 }
57
58 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
59 where
60 Self: 'a;
61
62 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
63 poll_fn(move |_| {
64 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
65 })
66 }
67}
diff --git a/embassy-net/src/tcp_socket.rs b/embassy-net/src/tcp/mod.rs
index 5637505d4..3bfd4c7b6 100644
--- a/embassy-net/src/tcp_socket.rs
+++ b/embassy-net/src/tcp/mod.rs
@@ -1,17 +1,46 @@
1use core::marker::PhantomData; 1use core::marker::PhantomData;
2use core::mem; 2use core::mem;
3use core::pin::Pin; 3use core::task::Poll;
4use core::task::{Context, Poll};
5use embassy::io;
6use embassy::io::{AsyncBufRead, AsyncWrite};
7use smoltcp::iface::{Context as SmolContext, SocketHandle}; 4use smoltcp::iface::{Context as SmolContext, SocketHandle};
8use smoltcp::socket::TcpSocket as SyncTcpSocket; 5use smoltcp::socket::TcpSocket as SyncTcpSocket;
9use smoltcp::socket::{TcpSocketBuffer, TcpState}; 6use smoltcp::socket::{TcpSocketBuffer, TcpState};
10use smoltcp::time::Duration; 7use smoltcp::time::Duration;
11use smoltcp::wire::IpEndpoint; 8use smoltcp::wire::IpEndpoint;
12 9
10#[cfg(feature = "nightly")]
11mod io_impl;
12
13use super::stack::Stack; 13use super::stack::Stack;
14use crate::{Error, Result}; 14
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub enum Error {
18 ConnectionReset,
19}
20
21#[derive(PartialEq, Eq, Clone, Copy, Debug)]
22#[cfg_attr(feature = "defmt", derive(defmt::Format))]
23pub enum ConnectError {
24 /// The socket is already connected or listening.
25 InvalidState,
26 /// The remote host rejected the connection with a RST packet.
27 ConnectionReset,
28 /// Connect timed out.
29 TimedOut,
30 /// No route to host.
31 NoRoute,
32}
33
34#[derive(PartialEq, Eq, Clone, Copy, Debug)]
35#[cfg_attr(feature = "defmt", derive(defmt::Format))]
36pub enum AcceptError {
37 /// The socket is already connected or listening.
38 InvalidState,
39 /// Invalid listen port
40 InvalidPort,
41 /// The remote host rejected the connection with a RST packet.
42 ConnectionReset,
43}
15 44
16pub struct TcpSocket<'a> { 45pub struct TcpSocket<'a> {
17 handle: SocketHandle, 46 handle: SocketHandle,
@@ -37,17 +66,25 @@ impl<'a> TcpSocket<'a> {
37 } 66 }
38 } 67 }
39 68
40 pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<()> 69 pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<(), ConnectError>
41 where 70 where
42 T: Into<IpEndpoint>, 71 T: Into<IpEndpoint>,
43 { 72 {
44 let local_port = Stack::with(|stack| stack.get_local_port()); 73 let local_port = Stack::with(|stack| stack.get_local_port());
45 self.with(|s, cx| s.connect(cx, remote_endpoint, local_port))?; 74 match self.with(|s, cx| s.connect(cx, remote_endpoint, local_port)) {
75 Ok(()) => {}
76 Err(smoltcp::Error::Illegal) => return Err(ConnectError::InvalidState),
77 Err(smoltcp::Error::Unaddressable) => return Err(ConnectError::NoRoute),
78 // smoltcp returns no errors other than the above.
79 Err(_) => unreachable!(),
80 }
46 81
47 futures::future::poll_fn(|cx| { 82 futures::future::poll_fn(|cx| {
48 self.with(|s, _| match s.state() { 83 self.with(|s, _| match s.state() {
49 TcpState::Closed | TcpState::TimeWait => Poll::Ready(Err(Error::Unaddressable)), 84 TcpState::Closed | TcpState::TimeWait => {
50 TcpState::Listen => Poll::Ready(Err(Error::Illegal)), 85 Poll::Ready(Err(ConnectError::ConnectionReset))
86 }
87 TcpState::Listen => unreachable!(),
51 TcpState::SynSent | TcpState::SynReceived => { 88 TcpState::SynSent | TcpState::SynReceived => {
52 s.register_send_waker(cx.waker()); 89 s.register_send_waker(cx.waker());
53 Poll::Pending 90 Poll::Pending
@@ -58,11 +95,17 @@ impl<'a> TcpSocket<'a> {
58 .await 95 .await
59 } 96 }
60 97
61 pub async fn accept<T>(&mut self, local_endpoint: T) -> Result<()> 98 pub async fn accept<T>(&mut self, local_endpoint: T) -> Result<(), AcceptError>
62 where 99 where
63 T: Into<IpEndpoint>, 100 T: Into<IpEndpoint>,
64 { 101 {
65 self.with(|s, _| s.listen(local_endpoint))?; 102 match self.with(|s, _| s.listen(local_endpoint)) {
103 Ok(()) => {}
104 Err(smoltcp::Error::Illegal) => return Err(AcceptError::InvalidState),
105 Err(smoltcp::Error::Unaddressable) => return Err(AcceptError::InvalidPort),
106 // smoltcp returns no errors other than the above.
107 Err(_) => unreachable!(),
108 }
66 109
67 futures::future::poll_fn(|cx| { 110 futures::future::poll_fn(|cx| {
68 self.with(|s, _| match s.state() { 111 self.with(|s, _| match s.state() {
@@ -130,11 +173,6 @@ impl<'a> TcpSocket<'a> {
130 } 173 }
131} 174}
132 175
133fn to_ioerr(_err: Error) -> io::Error {
134 // todo
135 io::Error::Other
136}
137
138impl<'a> Drop for TcpSocket<'a> { 176impl<'a> Drop for TcpSocket<'a> {
139 fn drop(&mut self) { 177 fn drop(&mut self) {
140 Stack::with(|stack| { 178 Stack::with(|stack| {
@@ -143,63 +181,12 @@ impl<'a> Drop for TcpSocket<'a> {
143 } 181 }
144} 182}
145 183
146impl<'a> AsyncBufRead for TcpSocket<'a> { 184impl embedded_io::Error for Error {
147 fn poll_fill_buf<'z>( 185 fn kind(&self) -> embedded_io::ErrorKind {
148 self: Pin<&'z mut Self>, 186 embedded_io::ErrorKind::Other
149 cx: &mut Context<'_>,
150 ) -> Poll<io::Result<&'z [u8]>> {
151 self.with(|s, _| match s.peek(1 << 30) {
152 // No data ready
153 Ok(buf) if buf.is_empty() => {
154 s.register_recv_waker(cx.waker());
155 Poll::Pending
156 }
157 // Data ready!
158 Ok(buf) => {
159 // Safety:
160 // - User can't touch the inner TcpSocket directly at all.
161 // - The socket itself won't touch these bytes until consume() is called, which
162 // requires the user to release this borrow.
163 let buf: &'z [u8] = unsafe { core::mem::transmute(&*buf) };
164 Poll::Ready(Ok(buf))
165 }
166 // EOF
167 Err(Error::Finished) => Poll::Ready(Ok(&[][..])),
168 // Error
169 Err(e) => Poll::Ready(Err(to_ioerr(e))),
170 })
171 }
172
173 fn consume(self: Pin<&mut Self>, amt: usize) {
174 if amt == 0 {
175 // smoltcp's recv returns Finished if we're at EOF,
176 // even if we're "reading" 0 bytes.
177 return;
178 }
179 self.with(|s, _| s.recv(|_| (amt, ()))).unwrap()
180 } 187 }
181} 188}
182 189
183impl<'a> AsyncWrite for TcpSocket<'a> { 190impl<'d> embedded_io::Io for TcpSocket<'d> {
184 fn poll_write( 191 type Error = Error;
185 self: Pin<&mut Self>,
186 cx: &mut Context<'_>,
187 buf: &[u8],
188 ) -> Poll<io::Result<usize>> {
189 self.with(|s, _| match s.send_slice(buf) {
190 // Not ready to send (no space in the tx buffer)
191 Ok(0) => {
192 s.register_send_waker(cx.waker());
193 Poll::Pending
194 }
195 // Some data sent
196 Ok(n) => Poll::Ready(Ok(n)),
197 // Error
198 Err(e) => Poll::Ready(Err(to_ioerr(e))),
199 })
200 }
201
202 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
203 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
204 }
205} 192}