aboutsummaryrefslogtreecommitdiff
path: root/embassy-net/src
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2022-05-23 03:50:43 +0200
committerDario Nieuwenhuis <[email protected]>2022-05-25 19:56:22 +0200
commita5aea995a802fea8fc1b3e4b5fe47bd6d1fca2a4 (patch)
tree0fcb4c01914347eff5b3be44b284aa9432e28678 /embassy-net/src
parent36a1f203648dcb402727ea3eb5d30cf1f6993795 (diff)
WIP embassy-net v2
Diffstat (limited to 'embassy-net/src')
-rw-r--r--embassy-net/src/config/dhcp.rs55
-rw-r--r--embassy-net/src/config/mod.rs35
-rw-r--r--embassy-net/src/config/statik.rs29
-rw-r--r--embassy-net/src/device.rs50
-rw-r--r--embassy-net/src/lib.rs10
-rw-r--r--embassy-net/src/stack.rs393
-rw-r--r--embassy-net/src/tcp.rs299
7 files changed, 405 insertions, 466 deletions
diff --git a/embassy-net/src/config/dhcp.rs b/embassy-net/src/config/dhcp.rs
deleted file mode 100644
index 298657ed6..000000000
--- a/embassy-net/src/config/dhcp.rs
+++ /dev/null
@@ -1,55 +0,0 @@
1use heapless::Vec;
2use smoltcp::iface::SocketHandle;
3use smoltcp::socket::{Dhcpv4Event, Dhcpv4Socket};
4use smoltcp::time::Instant;
5
6use super::*;
7use crate::device::LinkState;
8use crate::Interface;
9
10pub struct DhcpConfigurator {
11 handle: Option<SocketHandle>,
12}
13
14impl DhcpConfigurator {
15 pub fn new() -> Self {
16 Self { handle: None }
17 }
18}
19
20impl Configurator for DhcpConfigurator {
21 fn poll(&mut self, iface: &mut Interface, _timestamp: Instant) -> Event {
22 if self.handle.is_none() {
23 let handle = iface.add_socket(Dhcpv4Socket::new());
24 self.handle = Some(handle)
25 }
26
27 let link_up = iface.device_mut().device.link_state() == LinkState::Up;
28
29 let socket = iface.get_socket::<Dhcpv4Socket>(self.handle.unwrap());
30
31 if !link_up {
32 socket.reset();
33 return Event::Deconfigured;
34 }
35
36 match socket.poll() {
37 None => Event::NoChange,
38 Some(Dhcpv4Event::Deconfigured) => Event::Deconfigured,
39 Some(Dhcpv4Event::Configured(config)) => {
40 let mut dns_servers = Vec::new();
41 for s in &config.dns_servers {
42 if let Some(addr) = s {
43 dns_servers.push(addr.clone()).unwrap();
44 }
45 }
46
47 Event::Configured(Config {
48 address: config.address,
49 gateway: config.router,
50 dns_servers,
51 })
52 }
53 }
54 }
55}
diff --git a/embassy-net/src/config/mod.rs b/embassy-net/src/config/mod.rs
deleted file mode 100644
index eb1b6636a..000000000
--- a/embassy-net/src/config/mod.rs
+++ /dev/null
@@ -1,35 +0,0 @@
1use heapless::Vec;
2use smoltcp::time::Instant;
3use smoltcp::wire::{Ipv4Address, Ipv4Cidr};
4
5use crate::Interface;
6
7mod statik;
8pub use statik::StaticConfigurator;
9
10#[cfg(feature = "dhcpv4")]
11mod dhcp;
12#[cfg(feature = "dhcpv4")]
13pub use dhcp::DhcpConfigurator;
14
15/// Return value for the `Configurator::poll` function
16#[derive(Debug, Clone)]
17pub enum Event {
18 /// No change has occured to the configuration.
19 NoChange,
20 /// Configuration has been lost (for example, DHCP lease has expired)
21 Deconfigured,
22 /// Configuration has been newly acquired, or modified.
23 Configured(Config),
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct Config {
28 pub address: Ipv4Cidr,
29 pub gateway: Option<Ipv4Address>,
30 pub dns_servers: Vec<Ipv4Address, 3>,
31}
32
33pub trait Configurator {
34 fn poll(&mut self, iface: &mut Interface, timestamp: Instant) -> Event;
35}
diff --git a/embassy-net/src/config/statik.rs b/embassy-net/src/config/statik.rs
deleted file mode 100644
index e614db73b..000000000
--- a/embassy-net/src/config/statik.rs
+++ /dev/null
@@ -1,29 +0,0 @@
1use smoltcp::time::Instant;
2
3use super::*;
4use crate::Interface;
5
6pub struct StaticConfigurator {
7 config: Config,
8 returned: bool,
9}
10
11impl StaticConfigurator {
12 pub fn new(config: Config) -> Self {
13 Self {
14 config,
15 returned: false,
16 }
17 }
18}
19
20impl Configurator for StaticConfigurator {
21 fn poll(&mut self, _iface: &mut Interface, _timestamp: Instant) -> Event {
22 if self.returned {
23 Event::NoChange
24 } else {
25 self.returned = true;
26 Event::Configured(self.config.clone())
27 }
28 }
29}
diff --git a/embassy-net/src/device.rs b/embassy-net/src/device.rs
index 1f4fa5208..99c6a2212 100644
--- a/embassy-net/src/device.rs
+++ b/embassy-net/src/device.rs
@@ -12,24 +12,50 @@ pub enum LinkState {
12 Up, 12 Up,
13} 13}
14 14
15// 'static required due to the "fake GAT" in smoltcp::phy::Device.
16// https://github.com/smoltcp-rs/smoltcp/pull/572
15pub trait Device { 17pub trait Device {
16 fn is_transmit_ready(&mut self) -> bool; 18 fn is_transmit_ready(&mut self) -> bool;
17 fn transmit(&mut self, pkt: PacketBuf); 19 fn transmit(&mut self, pkt: PacketBuf);
18 fn receive(&mut self) -> Option<PacketBuf>; 20 fn receive(&mut self) -> Option<PacketBuf>;
19 21
20 fn register_waker(&mut self, waker: &Waker); 22 fn register_waker(&mut self, waker: &Waker);
21 fn capabilities(&mut self) -> DeviceCapabilities; 23 fn capabilities(&self) -> DeviceCapabilities;
22 fn link_state(&mut self) -> LinkState; 24 fn link_state(&mut self) -> LinkState;
23 fn ethernet_address(&self) -> [u8; 6]; 25 fn ethernet_address(&self) -> [u8; 6];
24} 26}
25 27
26pub struct DeviceAdapter { 28impl<T: ?Sized + Device> Device for &'static mut T {
27 pub device: &'static mut dyn Device, 29 fn is_transmit_ready(&mut self) -> bool {
30 T::is_transmit_ready(self)
31 }
32 fn transmit(&mut self, pkt: PacketBuf) {
33 T::transmit(self, pkt)
34 }
35 fn receive(&mut self) -> Option<PacketBuf> {
36 T::receive(self)
37 }
38 fn register_waker(&mut self, waker: &Waker) {
39 T::register_waker(self, waker)
40 }
41 fn capabilities(&self) -> DeviceCapabilities {
42 T::capabilities(self)
43 }
44 fn link_state(&mut self) -> LinkState {
45 T::link_state(self)
46 }
47 fn ethernet_address(&self) -> [u8; 6] {
48 T::ethernet_address(self)
49 }
50}
51
52pub struct DeviceAdapter<D: Device> {
53 pub device: D,
28 caps: DeviceCapabilities, 54 caps: DeviceCapabilities,
29} 55}
30 56
31impl DeviceAdapter { 57impl<D: Device> DeviceAdapter<D> {
32 pub(crate) fn new(device: &'static mut dyn Device) -> Self { 58 pub(crate) fn new(device: D) -> Self {
33 Self { 59 Self {
34 caps: device.capabilities(), 60 caps: device.capabilities(),
35 device, 61 device,
@@ -37,16 +63,16 @@ impl DeviceAdapter {
37 } 63 }
38} 64}
39 65
40impl<'a> SmolDevice<'a> for DeviceAdapter { 66impl<'a, D: Device + 'static> SmolDevice<'a> for DeviceAdapter<D> {
41 type RxToken = RxToken; 67 type RxToken = RxToken;
42 type TxToken = TxToken<'a>; 68 type TxToken = TxToken<'a, D>;
43 69
44 fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> { 70 fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
45 let tx_pkt = PacketBox::new(Packet::new())?; 71 let tx_pkt = PacketBox::new(Packet::new())?;
46 let rx_pkt = self.device.receive()?; 72 let rx_pkt = self.device.receive()?;
47 let rx_token = RxToken { pkt: rx_pkt }; 73 let rx_token = RxToken { pkt: rx_pkt };
48 let tx_token = TxToken { 74 let tx_token = TxToken {
49 device: self.device, 75 device: &mut self.device,
50 pkt: tx_pkt, 76 pkt: tx_pkt,
51 }; 77 };
52 78
@@ -61,7 +87,7 @@ impl<'a> SmolDevice<'a> for DeviceAdapter {
61 87
62 let tx_pkt = PacketBox::new(Packet::new())?; 88 let tx_pkt = PacketBox::new(Packet::new())?;
63 Some(TxToken { 89 Some(TxToken {
64 device: self.device, 90 device: &mut self.device,
65 pkt: tx_pkt, 91 pkt: tx_pkt,
66 }) 92 })
67 } 93 }
@@ -85,12 +111,12 @@ impl smoltcp::phy::RxToken for RxToken {
85 } 111 }
86} 112}
87 113
88pub struct TxToken<'a> { 114pub struct TxToken<'a, D: Device> {
89 device: &'a mut dyn Device, 115 device: &'a mut D,
90 pkt: PacketBox, 116 pkt: PacketBox,
91} 117}
92 118
93impl<'a> smoltcp::phy::TxToken for TxToken<'a> { 119impl<'a, D: Device> smoltcp::phy::TxToken for TxToken<'a, D> {
94 fn consume<R, F>(self, _timestamp: SmolInstant, len: usize, f: F) -> smoltcp::Result<R> 120 fn consume<R, F>(self, _timestamp: SmolInstant, len: usize, f: F) -> smoltcp::Result<R>
95 where 121 where
96 F: FnOnce(&mut [u8]) -> smoltcp::Result<R>, 122 F: FnOnce(&mut [u8]) -> smoltcp::Result<R>,
diff --git a/embassy-net/src/lib.rs b/embassy-net/src/lib.rs
index 18dc1ef61..7b5f29f16 100644
--- a/embassy-net/src/lib.rs
+++ b/embassy-net/src/lib.rs
@@ -5,20 +5,13 @@
5// This mod MUST go first, so that the others see its macros. 5// This mod MUST go first, so that the others see its macros.
6pub(crate) mod fmt; 6pub(crate) mod fmt;
7 7
8mod config;
9mod device; 8mod device;
10mod packet_pool; 9mod packet_pool;
11mod stack; 10mod stack;
12 11
13#[cfg(feature = "dhcpv4")]
14pub use config::DhcpConfigurator;
15pub use config::{Config, Configurator, Event as ConfigEvent, StaticConfigurator};
16
17pub use device::{Device, LinkState}; 12pub use device::{Device, LinkState};
18pub use packet_pool::{Packet, PacketBox, PacketBoxExt, PacketBuf, MTU}; 13pub use packet_pool::{Packet, PacketBox, PacketBoxExt, PacketBuf, MTU};
19pub use stack::{ 14pub use stack::{Config, ConfigStrategy, Stack, StackResources};
20 config, ethernet_address, init, is_config_up, is_init, is_link_up, run, StackResources,
21};
22 15
23#[cfg(feature = "tcp")] 16#[cfg(feature = "tcp")]
24pub mod tcp; 17pub mod tcp;
@@ -30,4 +23,3 @@ pub use smoltcp::time::Instant as SmolInstant;
30#[cfg(feature = "medium-ethernet")] 23#[cfg(feature = "medium-ethernet")]
31pub use smoltcp::wire::{EthernetAddress, HardwareAddress}; 24pub use smoltcp::wire::{EthernetAddress, HardwareAddress};
32pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; 25pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr};
33pub type Interface = smoltcp::iface::Interface<'static, device::DeviceAdapter>;
diff --git a/embassy-net/src/stack.rs b/embassy-net/src/stack.rs
index 9461f832f..e28370df8 100644
--- a/embassy-net/src/stack.rs
+++ b/embassy-net/src/stack.rs
@@ -1,13 +1,18 @@
1use core::cell::RefCell; 1use core::cell::UnsafeCell;
2use core::future::Future; 2use core::future::Future;
3use core::task::Context; 3use core::task::Context;
4use core::task::Poll; 4use core::task::Poll;
5use embassy::blocking_mutex::ThreadModeMutex;
6use embassy::time::{Instant, Timer}; 5use embassy::time::{Instant, Timer};
7use embassy::waitqueue::WakerRegistration; 6use embassy::waitqueue::WakerRegistration;
7use futures::future::poll_fn;
8use futures::pin_mut; 8use futures::pin_mut;
9use smoltcp::iface::InterfaceBuilder; 9use heapless::Vec;
10use smoltcp::iface::SocketStorage; 10#[cfg(feature = "dhcpv4")]
11use smoltcp::iface::SocketHandle;
12use smoltcp::iface::{Interface, InterfaceBuilder};
13use smoltcp::iface::{SocketSet, SocketStorage};
14#[cfg(feature = "dhcpv4")]
15use smoltcp::socket::dhcpv4;
11use smoltcp::time::Instant as SmolInstant; 16use smoltcp::time::Instant as SmolInstant;
12use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr}; 17use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr};
13 18
@@ -18,10 +23,7 @@ use smoltcp::phy::{Device as _, Medium};
18#[cfg(feature = "medium-ethernet")] 23#[cfg(feature = "medium-ethernet")]
19use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress}; 24use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress};
20 25
21use crate::config::Configurator;
22use crate::config::Event;
23use crate::device::{Device, DeviceAdapter, LinkState}; 26use crate::device::{Device, DeviceAdapter, LinkState};
24use crate::{Config, Interface};
25 27
26const LOCAL_PORT_MIN: u16 = 1025; 28const LOCAL_PORT_MIN: u16 = 1025;
27const LOCAL_PORT_MAX: u16 = 65535; 29const LOCAL_PORT_MAX: u16 = 65535;
@@ -51,24 +53,144 @@ impl<const ADDR: usize, const SOCK: usize, const NEIGHBOR: usize>
51 } 53 }
52} 54}
53 55
54static STACK: ThreadModeMutex<RefCell<Option<Stack>>> = ThreadModeMutex::new(RefCell::new(None)); 56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct Config {
58 pub address: Ipv4Cidr,
59 pub gateway: Option<Ipv4Address>,
60 pub dns_servers: Vec<Ipv4Address, 3>,
61}
62
63pub enum ConfigStrategy {
64 Static(Config),
65 #[cfg(feature = "dhcpv4")]
66 Dhcp,
67}
55 68
56pub(crate) struct Stack { 69pub struct Stack<D: Device> {
57 pub iface: Interface, 70 pub(crate) socket: UnsafeCell<SocketStack>,
71 inner: UnsafeCell<Inner<D>>,
72}
73
74struct Inner<D: Device> {
75 device: DeviceAdapter<D>,
58 link_up: bool, 76 link_up: bool,
59 config: Option<Config>, 77 config: Option<Config>,
78 #[cfg(feature = "dhcpv4")]
79 dhcp_socket: Option<SocketHandle>,
80}
81
82pub(crate) struct SocketStack {
83 pub(crate) sockets: SocketSet<'static>,
84 pub(crate) iface: Interface<'static>,
85 pub(crate) waker: WakerRegistration,
60 next_local_port: u16, 86 next_local_port: u16,
61 configurator: &'static mut dyn Configurator,
62 waker: WakerRegistration,
63} 87}
64 88
65impl Stack { 89unsafe impl<D: Device> Send for Stack<D> {}
66 pub(crate) fn with<R>(f: impl FnOnce(&mut Stack) -> R) -> R { 90
67 let mut stack = STACK.borrow().borrow_mut(); 91impl<D: Device + 'static> Stack<D> {
68 let stack = stack.as_mut().unwrap(); 92 pub fn new<const ADDR: usize, const SOCK: usize, const NEIGH: usize>(
69 f(stack) 93 device: D,
94 config: ConfigStrategy,
95 resources: &'static mut StackResources<ADDR, SOCK, NEIGH>,
96 random_seed: u64,
97 ) -> Self {
98 #[cfg(feature = "medium-ethernet")]
99 let medium = device.capabilities().medium;
100
101 #[cfg(feature = "medium-ethernet")]
102 let ethernet_addr = if medium == Medium::Ethernet {
103 device.ethernet_address()
104 } else {
105 [0, 0, 0, 0, 0, 0]
106 };
107
108 let mut device = DeviceAdapter::new(device);
109
110 let mut b = InterfaceBuilder::new();
111 b = b.ip_addrs(&mut resources.addresses[..]);
112 b = b.random_seed(random_seed);
113
114 #[cfg(feature = "medium-ethernet")]
115 if medium == Medium::Ethernet {
116 b = b.hardware_addr(HardwareAddress::Ethernet(EthernetAddress(ethernet_addr)));
117 b = b.neighbor_cache(NeighborCache::new(&mut resources.neighbor_cache[..]));
118 b = b.routes(Routes::new(&mut resources.routes[..]));
119 }
120
121 let iface = b.finalize(&mut device);
122
123 let sockets = SocketSet::new(&mut resources.sockets[..]);
124
125 let next_local_port =
126 (random_seed % (LOCAL_PORT_MAX - LOCAL_PORT_MIN) as u64) as u16 + LOCAL_PORT_MIN;
127
128 let mut inner = Inner {
129 device,
130 link_up: false,
131 config: None,
132 #[cfg(feature = "dhcpv4")]
133 dhcp_socket: None,
134 };
135 let mut socket = SocketStack {
136 sockets,
137 iface,
138 waker: WakerRegistration::new(),
139 next_local_port,
140 };
141
142 match config {
143 ConfigStrategy::Static(config) => inner.apply_config(&mut socket, config),
144 #[cfg(feature = "dhcpv4")]
145 ConfigStrategy::Dhcp => {
146 let handle = socket.sockets.add(smoltcp::socket::dhcpv4::Socket::new());
147 inner.dhcp_socket = Some(handle);
148 }
149 }
150
151 Self {
152 socket: UnsafeCell::new(socket),
153 inner: UnsafeCell::new(inner),
154 }
155 }
156
157 /// SAFETY: must not call reentrantly.
158 unsafe fn with<R>(&self, f: impl FnOnce(&SocketStack, &Inner<D>) -> R) -> R {
159 f(&*self.socket.get(), &*self.inner.get())
160 }
161
162 /// SAFETY: must not call reentrantly.
163 unsafe fn with_mut<R>(&self, f: impl FnOnce(&mut SocketStack, &mut Inner<D>) -> R) -> R {
164 f(&mut *self.socket.get(), &mut *self.inner.get())
165 }
166
167 pub fn ethernet_address(&self) -> [u8; 6] {
168 unsafe { self.with(|_s, i| i.device.device.ethernet_address()) }
169 }
170
171 pub fn is_link_up(&self) -> bool {
172 unsafe { self.with(|_s, i| i.link_up) }
173 }
174
175 pub fn is_config_up(&self) -> bool {
176 unsafe { self.with(|_s, i| i.config.is_some()) }
177 }
178
179 pub fn config(&self) -> Option<Config> {
180 unsafe { self.with(|_s, i| i.config.clone()) }
181 }
182
183 pub async fn run(&self) -> ! {
184 poll_fn(|cx| {
185 unsafe { self.with_mut(|s, i| i.poll(cx, s)) }
186 Poll::<()>::Pending
187 })
188 .await;
189 unreachable!()
70 } 190 }
191}
71 192
193impl SocketStack {
72 #[allow(clippy::absurd_extreme_comparisons)] 194 #[allow(clippy::absurd_extreme_comparisons)]
73 pub fn get_local_port(&mut self) -> u16 { 195 pub fn get_local_port(&mut self) -> u16 {
74 let res = self.next_local_port; 196 let res = self.next_local_port;
@@ -79,60 +201,68 @@ impl Stack {
79 }; 201 };
80 res 202 res
81 } 203 }
204}
205
206impl<D: Device + 'static> Inner<D> {
207 fn apply_config(&mut self, s: &mut SocketStack, config: Config) {
208 #[cfg(feature = "medium-ethernet")]
209 let medium = self.device.capabilities().medium;
210
211 debug!("Acquired IP configuration:");
82 212
83 pub(crate) fn wake(&mut self) { 213 debug!(" IP address: {}", config.address);
84 self.waker.wake() 214 self.set_ipv4_addr(s, config.address);
215
216 #[cfg(feature = "medium-ethernet")]
217 if medium == Medium::Ethernet {
218 if let Some(gateway) = config.gateway {
219 debug!(" Default gateway: {}", gateway);
220 s.iface
221 .routes_mut()
222 .add_default_ipv4_route(gateway)
223 .unwrap();
224 } else {
225 debug!(" Default gateway: None");
226 s.iface.routes_mut().remove_default_ipv4_route();
227 }
228 }
229 for (i, s) in config.dns_servers.iter().enumerate() {
230 debug!(" DNS server {}: {}", i, s);
231 }
232
233 self.config = Some(config)
85 } 234 }
86 235
87 fn poll_configurator(&mut self, timestamp: SmolInstant) { 236 #[allow(unused)] // used only with dhcp
237 fn unapply_config(&mut self, s: &mut SocketStack) {
88 #[cfg(feature = "medium-ethernet")] 238 #[cfg(feature = "medium-ethernet")]
89 let medium = self.iface.device().capabilities().medium; 239 let medium = self.device.capabilities().medium;
90
91 match self.configurator.poll(&mut self.iface, timestamp) {
92 Event::NoChange => {}
93 Event::Configured(config) => {
94 debug!("Acquired IP configuration:");
95
96 debug!(" IP address: {}", config.address);
97 set_ipv4_addr(&mut self.iface, config.address);
98
99 #[cfg(feature = "medium-ethernet")]
100 if medium == Medium::Ethernet {
101 if let Some(gateway) = config.gateway {
102 debug!(" Default gateway: {}", gateway);
103 self.iface
104 .routes_mut()
105 .add_default_ipv4_route(gateway)
106 .unwrap();
107 } else {
108 debug!(" Default gateway: None");
109 self.iface.routes_mut().remove_default_ipv4_route();
110 }
111 }
112 for (i, s) in config.dns_servers.iter().enumerate() {
113 debug!(" DNS server {}: {}", i, s);
114 }
115 240
116 self.config = Some(config) 241 debug!("Lost IP configuration");
117 } 242 self.set_ipv4_addr(s, Ipv4Cidr::new(Ipv4Address::UNSPECIFIED, 0));
118 Event::Deconfigured => { 243 #[cfg(feature = "medium-ethernet")]
119 debug!("Lost IP configuration"); 244 if medium == Medium::Ethernet {
120 set_ipv4_addr(&mut self.iface, Ipv4Cidr::new(Ipv4Address::UNSPECIFIED, 0)); 245 s.iface.routes_mut().remove_default_ipv4_route();
121 #[cfg(feature = "medium-ethernet")]
122 if medium == Medium::Ethernet {
123 self.iface.routes_mut().remove_default_ipv4_route();
124 }
125 self.config = None
126 }
127 } 246 }
247 self.config = None
248 }
249
250 fn set_ipv4_addr(&mut self, s: &mut SocketStack, cidr: Ipv4Cidr) {
251 s.iface.update_ip_addrs(|addrs| {
252 let dest = addrs.iter_mut().next().unwrap();
253 *dest = IpCidr::Ipv4(cidr);
254 });
128 } 255 }
129 256
130 fn poll(&mut self, cx: &mut Context<'_>) { 257 fn poll(&mut self, cx: &mut Context<'_>, s: &mut SocketStack) {
131 self.iface.device_mut().device.register_waker(cx.waker()); 258 self.device.device.register_waker(cx.waker());
132 self.waker.register(cx.waker()); 259 s.waker.register(cx.waker());
133 260
134 let timestamp = instant_to_smoltcp(Instant::now()); 261 let timestamp = instant_to_smoltcp(Instant::now());
135 if self.iface.poll(timestamp).is_err() { 262 if s.iface
263 .poll(timestamp, &mut self.device, &mut s.sockets)
264 .is_err()
265 {
136 // If poll() returns error, it may not be done yet, so poll again later. 266 // If poll() returns error, it may not be done yet, so poll again later.
137 cx.waker().wake_by_ref(); 267 cx.waker().wake_by_ref();
138 return; 268 return;
@@ -140,18 +270,49 @@ impl Stack {
140 270
141 // Update link up 271 // Update link up
142 let old_link_up = self.link_up; 272 let old_link_up = self.link_up;
143 self.link_up = self.iface.device_mut().device.link_state() == LinkState::Up; 273 self.link_up = self.device.device.link_state() == LinkState::Up;
144 274
145 // Print when changed 275 // Print when changed
146 if old_link_up != self.link_up { 276 if old_link_up != self.link_up {
147 info!("link_up = {:?}", self.link_up); 277 info!("link_up = {:?}", self.link_up);
148 } 278 }
149 279
150 if old_link_up || self.link_up { 280 #[cfg(feature = "dhcpv4")]
151 self.poll_configurator(timestamp) 281 if let Some(dhcp_handle) = self.dhcp_socket {
282 let socket = s.sockets.get_mut::<dhcpv4::Socket>(dhcp_handle);
283
284 if self.link_up {
285 match socket.poll() {
286 None => {}
287 Some(dhcpv4::Event::Deconfigured) => self.unapply_config(s),
288 Some(dhcpv4::Event::Configured(config)) => {
289 let mut dns_servers = Vec::new();
290 for s in &config.dns_servers {
291 if let Some(addr) = s {
292 dns_servers.push(addr.clone()).unwrap();
293 }
294 }
295
296 self.apply_config(
297 s,
298 Config {
299 address: config.address,
300 gateway: config.router,
301 dns_servers,
302 },
303 )
304 }
305 }
306 } else if old_link_up {
307 socket.reset();
308 self.unapply_config(s);
309 }
152 } 310 }
311 //if old_link_up || self.link_up {
312 // self.poll_configurator(timestamp)
313 //}
153 314
154 if let Some(poll_at) = self.iface.poll_at(timestamp) { 315 if let Some(poll_at) = s.iface.poll_at(timestamp, &mut s.sockets) {
155 let t = Timer::at(instant_from_smoltcp(poll_at)); 316 let t = Timer::at(instant_from_smoltcp(poll_at));
156 pin_mut!(t); 317 pin_mut!(t);
157 if t.poll(cx).is_ready() { 318 if t.poll(cx).is_ready() {
@@ -161,100 +322,6 @@ impl Stack {
161 } 322 }
162} 323}
163 324
164fn set_ipv4_addr(iface: &mut Interface, cidr: Ipv4Cidr) {
165 iface.update_ip_addrs(|addrs| {
166 let dest = addrs.iter_mut().next().unwrap();
167 *dest = IpCidr::Ipv4(cidr);
168 });
169}
170
171/// Initialize embassy_net.
172/// This function must be called from thread mode.
173pub fn init<const ADDR: usize, const SOCK: usize, const NEIGH: usize>(
174 device: &'static mut dyn Device,
175 configurator: &'static mut dyn Configurator,
176 resources: &'static mut StackResources<ADDR, SOCK, NEIGH>,
177) {
178 #[cfg(feature = "medium-ethernet")]
179 let medium = device.capabilities().medium;
180
181 #[cfg(feature = "medium-ethernet")]
182 let ethernet_addr = if medium == Medium::Ethernet {
183 device.ethernet_address()
184 } else {
185 [0, 0, 0, 0, 0, 0]
186 };
187
188 let mut b = InterfaceBuilder::new(DeviceAdapter::new(device), &mut resources.sockets[..]);
189 b = b.ip_addrs(&mut resources.addresses[..]);
190
191 #[cfg(feature = "medium-ethernet")]
192 if medium == Medium::Ethernet {
193 b = b.hardware_addr(HardwareAddress::Ethernet(EthernetAddress(ethernet_addr)));
194 b = b.neighbor_cache(NeighborCache::new(&mut resources.neighbor_cache[..]));
195 b = b.routes(Routes::new(&mut resources.routes[..]));
196 }
197
198 let iface = b.finalize();
199
200 let local_port = loop {
201 let mut res = [0u8; 2];
202 rand(&mut res);
203 let port = u16::from_le_bytes(res);
204 if (LOCAL_PORT_MIN..=LOCAL_PORT_MAX).contains(&port) {
205 break port;
206 }
207 };
208
209 let stack = Stack {
210 iface,
211 link_up: false,
212 config: None,
213 configurator,
214 next_local_port: local_port,
215 waker: WakerRegistration::new(),
216 };
217
218 *STACK.borrow().borrow_mut() = Some(stack);
219}
220
221pub fn ethernet_address() -> [u8; 6] {
222 STACK
223 .borrow()
224 .borrow()
225 .as_ref()
226 .unwrap()
227 .iface
228 .device()
229 .device
230 .ethernet_address()
231}
232
233pub fn is_init() -> bool {
234 STACK.borrow().borrow().is_some()
235}
236
237pub fn is_link_up() -> bool {
238 STACK.borrow().borrow().as_ref().unwrap().link_up
239}
240
241pub fn is_config_up() -> bool {
242 STACK.borrow().borrow().as_ref().unwrap().config.is_some()
243}
244
245pub fn config() -> Option<Config> {
246 STACK.borrow().borrow().as_ref().unwrap().config.clone()
247}
248
249pub async fn run() -> ! {
250 futures::future::poll_fn(|cx| {
251 Stack::with(|stack| stack.poll(cx));
252 Poll::<()>::Pending
253 })
254 .await;
255 unreachable!()
256}
257
258fn instant_to_smoltcp(instant: Instant) -> SmolInstant { 325fn instant_to_smoltcp(instant: Instant) -> SmolInstant {
259 SmolInstant::from_millis(instant.as_millis() as i64) 326 SmolInstant::from_millis(instant.as_millis() as i64)
260} 327}
@@ -262,11 +329,3 @@ fn instant_to_smoltcp(instant: Instant) -> SmolInstant {
262fn instant_from_smoltcp(instant: SmolInstant) -> Instant { 329fn instant_from_smoltcp(instant: SmolInstant) -> Instant {
263 Instant::from_millis(instant.total_millis() as u64) 330 Instant::from_millis(instant.total_millis() as u64)
264} 331}
265
266extern "Rust" {
267 fn _embassy_rand(buf: &mut [u8]);
268}
269
270fn rand(buf: &mut [u8]) {
271 unsafe { _embassy_rand(buf) }
272}
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs
index c18651b93..2d81e66bd 100644
--- a/embassy-net/src/tcp.rs
+++ b/embassy-net/src/tcp.rs
@@ -1,13 +1,16 @@
1use core::cell::UnsafeCell;
1use core::future::Future; 2use core::future::Future;
2use core::marker::PhantomData;
3use core::mem; 3use core::mem;
4use core::task::Poll; 4use core::task::Poll;
5use futures::future::poll_fn; 5use futures::future::poll_fn;
6use smoltcp::iface::{Context as SmolContext, SocketHandle}; 6use smoltcp::iface::{Interface, SocketHandle};
7use smoltcp::socket::TcpSocket as SyncTcpSocket; 7use smoltcp::socket::tcp;
8use smoltcp::socket::{TcpSocketBuffer, TcpState};
9use smoltcp::time::Duration; 8use smoltcp::time::Duration;
10use smoltcp::wire::IpEndpoint; 9use smoltcp::wire::IpEndpoint;
10use smoltcp::wire::IpListenEndpoint;
11
12use crate::stack::SocketStack;
13use crate::Device;
11 14
12use super::stack::Stack; 15use super::stack::Stack;
13 16
@@ -42,78 +45,68 @@ pub enum AcceptError {
42} 45}
43 46
44pub struct TcpSocket<'a> { 47pub struct TcpSocket<'a> {
45 handle: SocketHandle, 48 io: TcpIo<'a>,
46 ghost: PhantomData<&'a mut [u8]>,
47} 49}
48 50
49impl<'a> Unpin for TcpSocket<'a> {}
50
51pub struct TcpReader<'a> { 51pub struct TcpReader<'a> {
52 handle: SocketHandle, 52 io: TcpIo<'a>,
53 ghost: PhantomData<&'a mut [u8]>,
54} 53}
55 54
56impl<'a> Unpin for TcpReader<'a> {}
57
58pub struct TcpWriter<'a> { 55pub struct TcpWriter<'a> {
59 handle: SocketHandle, 56 io: TcpIo<'a>,
60 ghost: PhantomData<&'a mut [u8]>,
61} 57}
62 58
63impl<'a> Unpin for TcpWriter<'a> {}
64
65impl<'a> TcpSocket<'a> { 59impl<'a> TcpSocket<'a> {
66 pub fn new(rx_buffer: &'a mut [u8], tx_buffer: &'a mut [u8]) -> Self { 60 pub fn new<D: Device>(
67 let handle = Stack::with(|stack| { 61 stack: &'a Stack<D>,
68 let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) }; 62 rx_buffer: &'a mut [u8],
69 let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) }; 63 tx_buffer: &'a mut [u8],
70 stack.iface.add_socket(SyncTcpSocket::new( 64 ) -> Self {
71 TcpSocketBuffer::new(rx_buffer), 65 // safety: not accessed reentrantly.
72 TcpSocketBuffer::new(tx_buffer), 66 let s = unsafe { &mut *stack.socket.get() };
73 )) 67 let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
74 }); 68 let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
69 let handle = s.sockets.add(tcp::Socket::new(
70 tcp::SocketBuffer::new(rx_buffer),
71 tcp::SocketBuffer::new(tx_buffer),
72 ));
75 73
76 Self { 74 Self {
77 handle, 75 io: TcpIo {
78 ghost: PhantomData, 76 stack: &stack.socket,
77 handle,
78 },
79 } 79 }
80 } 80 }
81 81
82 pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) { 82 pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) {
83 ( 83 (TcpReader { io: self.io }, TcpWriter { io: self.io })
84 TcpReader {
85 handle: self.handle,
86 ghost: PhantomData,
87 },
88 TcpWriter {
89 handle: self.handle,
90 ghost: PhantomData,
91 },
92 )
93 } 84 }
94 85
95 pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<(), ConnectError> 86 pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<(), ConnectError>
96 where 87 where
97 T: Into<IpEndpoint>, 88 T: Into<IpEndpoint>,
98 { 89 {
99 let local_port = Stack::with(|stack| stack.get_local_port()); 90 // safety: not accessed reentrantly.
100 match with_socket(self.handle, |s, cx| { 91 let local_port = unsafe { &mut *self.io.stack.get() }.get_local_port();
101 s.connect(cx, remote_endpoint, local_port) 92
102 }) { 93 // safety: not accessed reentrantly.
94 match unsafe {
95 self.io
96 .with_mut(|s, i| s.connect(i, remote_endpoint, local_port))
97 } {
103 Ok(()) => {} 98 Ok(()) => {}
104 Err(smoltcp::Error::Illegal) => return Err(ConnectError::InvalidState), 99 Err(tcp::ConnectError::InvalidState) => return Err(ConnectError::InvalidState),
105 Err(smoltcp::Error::Unaddressable) => return Err(ConnectError::NoRoute), 100 Err(tcp::ConnectError::Unaddressable) => return Err(ConnectError::NoRoute),
106 // smoltcp returns no errors other than the above.
107 Err(_) => unreachable!(),
108 } 101 }
109 102
110 futures::future::poll_fn(|cx| { 103 futures::future::poll_fn(|cx| unsafe {
111 with_socket(self.handle, |s, _| match s.state() { 104 self.io.with_mut(|s, _| match s.state() {
112 TcpState::Closed | TcpState::TimeWait => { 105 tcp::State::Closed | tcp::State::TimeWait => {
113 Poll::Ready(Err(ConnectError::ConnectionReset)) 106 Poll::Ready(Err(ConnectError::ConnectionReset))
114 } 107 }
115 TcpState::Listen => unreachable!(), 108 tcp::State::Listen => unreachable!(),
116 TcpState::SynSent | TcpState::SynReceived => { 109 tcp::State::SynSent | tcp::State::SynReceived => {
117 s.register_send_waker(cx.waker()); 110 s.register_send_waker(cx.waker());
118 Poll::Pending 111 Poll::Pending
119 } 112 }
@@ -125,19 +118,18 @@ impl<'a> TcpSocket<'a> {
125 118
126 pub async fn accept<T>(&mut self, local_endpoint: T) -> Result<(), AcceptError> 119 pub async fn accept<T>(&mut self, local_endpoint: T) -> Result<(), AcceptError>
127 where 120 where
128 T: Into<IpEndpoint>, 121 T: Into<IpListenEndpoint>,
129 { 122 {
130 match with_socket(self.handle, |s, _| s.listen(local_endpoint)) { 123 // safety: not accessed reentrantly.
124 match unsafe { self.io.with_mut(|s, _| s.listen(local_endpoint)) } {
131 Ok(()) => {} 125 Ok(()) => {}
132 Err(smoltcp::Error::Illegal) => return Err(AcceptError::InvalidState), 126 Err(tcp::ListenError::InvalidState) => return Err(AcceptError::InvalidState),
133 Err(smoltcp::Error::Unaddressable) => return Err(AcceptError::InvalidPort), 127 Err(tcp::ListenError::Unaddressable) => return Err(AcceptError::InvalidPort),
134 // smoltcp returns no errors other than the above.
135 Err(_) => unreachable!(),
136 } 128 }
137 129
138 futures::future::poll_fn(|cx| { 130 futures::future::poll_fn(|cx| unsafe {
139 with_socket(self.handle, |s, _| match s.state() { 131 self.io.with_mut(|s, _| match s.state() {
140 TcpState::Listen | TcpState::SynSent | TcpState::SynReceived => { 132 tcp::State::Listen | tcp::State::SynSent | tcp::State::SynReceived => {
141 s.register_send_waker(cx.waker()); 133 s.register_send_waker(cx.waker());
142 Poll::Pending 134 Poll::Pending
143 } 135 }
@@ -148,88 +140,84 @@ impl<'a> TcpSocket<'a> {
148 } 140 }
149 141
150 pub fn set_timeout(&mut self, duration: Option<Duration>) { 142 pub fn set_timeout(&mut self, duration: Option<Duration>) {
151 with_socket(self.handle, |s, _| s.set_timeout(duration)) 143 unsafe { self.io.with_mut(|s, _| s.set_timeout(duration)) }
152 } 144 }
153 145
154 pub fn set_keep_alive(&mut self, interval: Option<Duration>) { 146 pub fn set_keep_alive(&mut self, interval: Option<Duration>) {
155 with_socket(self.handle, |s, _| s.set_keep_alive(interval)) 147 unsafe { self.io.with_mut(|s, _| s.set_keep_alive(interval)) }
156 } 148 }
157 149
158 pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) { 150 pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) {
159 with_socket(self.handle, |s, _| s.set_hop_limit(hop_limit)) 151 unsafe { self.io.with_mut(|s, _| s.set_hop_limit(hop_limit)) }
160 } 152 }
161 153
162 pub fn local_endpoint(&self) -> IpEndpoint { 154 pub fn local_endpoint(&self) -> Option<IpEndpoint> {
163 with_socket(self.handle, |s, _| s.local_endpoint()) 155 unsafe { self.io.with(|s, _| s.local_endpoint()) }
164 } 156 }
165 157
166 pub fn remote_endpoint(&self) -> IpEndpoint { 158 pub fn remote_endpoint(&self) -> Option<IpEndpoint> {
167 with_socket(self.handle, |s, _| s.remote_endpoint()) 159 unsafe { self.io.with(|s, _| s.remote_endpoint()) }
168 } 160 }
169 161
170 pub fn state(&self) -> TcpState { 162 pub fn state(&self) -> tcp::State {
171 with_socket(self.handle, |s, _| s.state()) 163 unsafe { self.io.with(|s, _| s.state()) }
172 } 164 }
173 165
174 pub fn close(&mut self) { 166 pub fn close(&mut self) {
175 with_socket(self.handle, |s, _| s.close()) 167 unsafe { self.io.with_mut(|s, _| s.close()) }
176 } 168 }
177 169
178 pub fn abort(&mut self) { 170 pub fn abort(&mut self) {
179 with_socket(self.handle, |s, _| s.abort()) 171 unsafe { self.io.with_mut(|s, _| s.abort()) }
180 } 172 }
181 173
182 pub fn may_send(&self) -> bool { 174 pub fn may_send(&self) -> bool {
183 with_socket(self.handle, |s, _| s.may_send()) 175 unsafe { self.io.with(|s, _| s.may_send()) }
184 } 176 }
185 177
186 pub fn may_recv(&self) -> bool { 178 pub fn may_recv(&self) -> bool {
187 with_socket(self.handle, |s, _| s.may_recv()) 179 unsafe { self.io.with(|s, _| s.may_recv()) }
188 } 180 }
189} 181}
190 182
191fn with_socket<R>(
192 handle: SocketHandle,
193 f: impl FnOnce(&mut SyncTcpSocket, &mut SmolContext) -> R,
194) -> R {
195 Stack::with(|stack| {
196 let res = {
197 let (s, cx) = stack.iface.get_socket_and_context::<SyncTcpSocket>(handle);
198 f(s, cx)
199 };
200 stack.wake();
201 res
202 })
203}
204
205impl<'a> Drop for TcpSocket<'a> { 183impl<'a> Drop for TcpSocket<'a> {
206 fn drop(&mut self) { 184 fn drop(&mut self) {
207 Stack::with(|stack| { 185 // safety: not accessed reentrantly.
208 stack.iface.remove_socket(self.handle); 186 let s = unsafe { &mut *self.io.stack.get() };
209 }) 187 s.sockets.remove(self.io.handle);
210 } 188 }
211} 189}
212 190
213impl embedded_io::Error for Error { 191// =======================
214 fn kind(&self) -> embedded_io::ErrorKind {
215 embedded_io::ErrorKind::Other
216 }
217}
218 192
219impl<'d> embedded_io::Io for TcpSocket<'d> { 193#[derive(Copy, Clone)]
220 type Error = Error; 194pub struct TcpIo<'a> {
195 stack: &'a UnsafeCell<SocketStack>,
196 handle: SocketHandle,
221} 197}
222 198
223impl<'d> embedded_io::asynch::Read for TcpSocket<'d> { 199impl<'d> TcpIo<'d> {
224 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> 200 /// SAFETY: must not call reentrantly.
225 where 201 unsafe fn with<R>(&self, f: impl FnOnce(&tcp::Socket, &Interface) -> R) -> R {
226 Self: 'a; 202 let s = &*self.stack.get();
203 let socket = s.sockets.get::<tcp::Socket>(self.handle);
204 f(socket, &s.iface)
205 }
227 206
228 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { 207 /// SAFETY: must not call reentrantly.
229 poll_fn(move |cx| { 208 unsafe fn with_mut<R>(&mut self, f: impl FnOnce(&mut tcp::Socket, &mut Interface) -> R) -> R {
209 let s = &mut *self.stack.get();
210 let socket = s.sockets.get_mut::<tcp::Socket>(self.handle);
211 let res = f(socket, &mut s.iface);
212 s.waker.wake();
213 res
214 }
215
216 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
217 poll_fn(move |cx| unsafe {
230 // CAUTION: smoltcp semantics around EOF are different to what you'd expect 218 // CAUTION: smoltcp semantics around EOF are different to what you'd expect
231 // from posix-like IO, so we have to tweak things here. 219 // from posix-like IO, so we have to tweak things here.
232 with_socket(self.handle, |s, _| match s.recv_slice(buf) { 220 self.with_mut(|s, _| match s.recv_slice(buf) {
233 // No data ready 221 // No data ready
234 Ok(0) => { 222 Ok(0) => {
235 s.register_recv_waker(cx.waker()); 223 s.register_recv_waker(cx.waker());
@@ -238,24 +226,17 @@ impl<'d> embedded_io::asynch::Read for TcpSocket<'d> {
238 // Data ready! 226 // Data ready!
239 Ok(n) => Poll::Ready(Ok(n)), 227 Ok(n) => Poll::Ready(Ok(n)),
240 // EOF 228 // EOF
241 Err(smoltcp::Error::Finished) => Poll::Ready(Ok(0)), 229 Err(tcp::RecvError::Finished) => Poll::Ready(Ok(0)),
242 // Connection reset. TODO: this can also be timeouts etc, investigate. 230 // Connection reset. TODO: this can also be timeouts etc, investigate.
243 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)), 231 Err(tcp::RecvError::InvalidState) => Poll::Ready(Err(Error::ConnectionReset)),
244 // smoltcp returns no errors other than the above.
245 Err(_) => unreachable!(),
246 }) 232 })
247 }) 233 })
234 .await
248 } 235 }
249}
250
251impl<'d> embedded_io::asynch::Write for TcpSocket<'d> {
252 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
253 where
254 Self: 'a;
255 236
256 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { 237 async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
257 poll_fn(move |cx| { 238 poll_fn(move |cx| unsafe {
258 with_socket(self.handle, |s, _| match s.send_slice(buf) { 239 self.with_mut(|s, _| match s.send_slice(buf) {
259 // Not ready to send (no space in the tx buffer) 240 // Not ready to send (no space in the tx buffer)
260 Ok(0) => { 241 Ok(0) => {
261 s.register_send_waker(cx.waker()); 242 s.register_send_waker(cx.waker());
@@ -264,11 +245,47 @@ impl<'d> embedded_io::asynch::Write for TcpSocket<'d> {
264 // Some data sent 245 // Some data sent
265 Ok(n) => Poll::Ready(Ok(n)), 246 Ok(n) => Poll::Ready(Ok(n)),
266 // Connection reset. TODO: this can also be timeouts etc, investigate. 247 // Connection reset. TODO: this can also be timeouts etc, investigate.
267 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)), 248 Err(tcp::SendError::InvalidState) => Poll::Ready(Err(Error::ConnectionReset)),
268 // smoltcp returns no errors other than the above.
269 Err(_) => unreachable!(),
270 }) 249 })
271 }) 250 })
251 .await
252 }
253
254 async fn flush(&mut self) -> Result<(), Error> {
255 poll_fn(move |_| {
256 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
257 })
258 .await
259 }
260}
261
262impl embedded_io::Error for Error {
263 fn kind(&self) -> embedded_io::ErrorKind {
264 embedded_io::ErrorKind::Other
265 }
266}
267
268impl<'d> embedded_io::Io for TcpSocket<'d> {
269 type Error = Error;
270}
271
272impl<'d> embedded_io::asynch::Read for TcpSocket<'d> {
273 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
274 where
275 Self: 'a;
276
277 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
278 self.io.read(buf)
279 }
280}
281
282impl<'d> embedded_io::asynch::Write for TcpSocket<'d> {
283 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
284 where
285 Self: 'a;
286
287 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
288 self.io.write(buf)
272 } 289 }
273 290
274 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 291 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
@@ -276,9 +293,7 @@ impl<'d> embedded_io::asynch::Write for TcpSocket<'d> {
276 Self: 'a; 293 Self: 'a;
277 294
278 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { 295 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
279 poll_fn(move |_| { 296 self.io.flush()
280 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
281 })
282 } 297 }
283} 298}
284 299
@@ -292,25 +307,7 @@ impl<'d> embedded_io::asynch::Read for TcpReader<'d> {
292 Self: 'a; 307 Self: 'a;
293 308
294 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { 309 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
295 poll_fn(move |cx| { 310 self.io.read(buf)
296 // CAUTION: smoltcp semantics around EOF are different to what you'd expect
297 // from posix-like IO, so we have to tweak things here.
298 with_socket(self.handle, |s, _| match s.recv_slice(buf) {
299 // No data ready
300 Ok(0) => {
301 s.register_recv_waker(cx.waker());
302 Poll::Pending
303 }
304 // Data ready!
305 Ok(n) => Poll::Ready(Ok(n)),
306 // EOF
307 Err(smoltcp::Error::Finished) => Poll::Ready(Ok(0)),
308 // Connection reset. TODO: this can also be timeouts etc, investigate.
309 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)),
310 // smoltcp returns no errors other than the above.
311 Err(_) => unreachable!(),
312 })
313 })
314 } 311 }
315} 312}
316 313
@@ -324,21 +321,7 @@ impl<'d> embedded_io::asynch::Write for TcpWriter<'d> {
324 Self: 'a; 321 Self: 'a;
325 322
326 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { 323 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
327 poll_fn(move |cx| { 324 self.io.write(buf)
328 with_socket(self.handle, |s, _| match s.send_slice(buf) {
329 // Not ready to send (no space in the tx buffer)
330 Ok(0) => {
331 s.register_send_waker(cx.waker());
332 Poll::Pending
333 }
334 // Some data sent
335 Ok(n) => Poll::Ready(Ok(n)),
336 // Connection reset. TODO: this can also be timeouts etc, investigate.
337 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)),
338 // smoltcp returns no errors other than the above.
339 Err(_) => unreachable!(),
340 })
341 })
342 } 325 }
343 326
344 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> 327 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
@@ -346,8 +329,6 @@ impl<'d> embedded_io::asynch::Write for TcpWriter<'d> {
346 Self: 'a; 329 Self: 'a;
347 330
348 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { 331 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
349 poll_fn(move |_| { 332 self.io.flush()
350 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
351 })
352 } 333 }
353} 334}