aboutsummaryrefslogtreecommitdiff
path: root/embassy-net/src
diff options
context:
space:
mode:
authorArtur Kowalski <[email protected]>2022-07-28 10:25:47 +0200
committerArtur Kowalski <[email protected]>2022-08-10 19:40:35 +0200
commitd5ab0d3ebb119c7ffd95da4b67325f75cae05b7e (patch)
treeda0c94e370118c9df5e3a1e9582aa684035063fa /embassy-net/src
parent0e524247fa4adc524c546b0d073e7061ad6c1b83 (diff)
Add UDP socket support
Diffstat (limited to 'embassy-net/src')
-rw-r--r--embassy-net/src/lib.rs5
-rw-r--r--embassy-net/src/udp.rs227
2 files changed, 232 insertions, 0 deletions
diff --git a/embassy-net/src/lib.rs b/embassy-net/src/lib.rs
index 1c5ba103a..83d364715 100644
--- a/embassy-net/src/lib.rs
+++ b/embassy-net/src/lib.rs
@@ -16,6 +16,9 @@ pub use stack::{Config, ConfigStrategy, Stack, StackResources};
16#[cfg(feature = "tcp")] 16#[cfg(feature = "tcp")]
17pub mod tcp; 17pub mod tcp;
18 18
19#[cfg(feature = "udp")]
20pub mod udp;
21
19// smoltcp reexports 22// smoltcp reexports
20pub use smoltcp::phy::{DeviceCapabilities, Medium}; 23pub use smoltcp::phy::{DeviceCapabilities, Medium};
21pub use smoltcp::time::{Duration as SmolDuration, Instant as SmolInstant}; 24pub use smoltcp::time::{Duration as SmolDuration, Instant as SmolInstant};
@@ -24,3 +27,5 @@ pub use smoltcp::wire::{EthernetAddress, HardwareAddress};
24pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; 27pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr};
25#[cfg(feature = "proto-ipv6")] 28#[cfg(feature = "proto-ipv6")]
26pub use smoltcp::wire::{Ipv6Address, Ipv6Cidr}; 29pub use smoltcp::wire::{Ipv6Address, Ipv6Cidr};
30#[cfg(feature = "udp")]
31pub use smoltcp::{socket::udp::PacketMetadata, wire::IpListenEndpoint};
diff --git a/embassy-net/src/udp.rs b/embassy-net/src/udp.rs
new file mode 100644
index 000000000..6b15805c2
--- /dev/null
+++ b/embassy-net/src/udp.rs
@@ -0,0 +1,227 @@
1use core::cell::UnsafeCell;
2use core::mem;
3use core::task::Poll;
4
5use futures::future::poll_fn;
6use smoltcp::iface::{Interface, SocketHandle};
7use smoltcp::socket::udp::{self, PacketMetadata};
8use smoltcp::wire::{IpEndpoint, IpListenEndpoint};
9
10use super::stack::SocketStack;
11use crate::{Device, Stack};
12
13#[derive(PartialEq, Eq, Clone, Copy, Debug)]
14#[cfg_attr(feature = "defmt", derive(defmt::Format))]
15pub enum BindError {
16 /// The socket was already open.
17 InvalidState,
18 /// No route to host.
19 NoRoute,
20}
21
22#[derive(PartialEq, Eq, Clone, Copy, Debug)]
23#[cfg_attr(feature = "defmt", derive(defmt::Format))]
24pub enum Error {
25 /// No route to host.
26 NoRoute,
27}
28
29pub struct UdpSocket<'a> {
30 io: UdpIo<'a>,
31}
32
33pub struct UdpReader<'a> {
34 io: UdpIo<'a>,
35}
36
37pub struct UdpWriter<'a> {
38 io: UdpIo<'a>,
39}
40
41impl<'a> UdpSocket<'a> {
42 pub fn new<D: Device>(
43 stack: &'a Stack<D>,
44 rx_meta: &'a mut [PacketMetadata],
45 rx_buffer: &'a mut [u8],
46 tx_meta: &'a mut [PacketMetadata],
47 tx_buffer: &'a mut [u8],
48 ) -> Self {
49 // safety: not accessed reentrantly.
50 let s = unsafe { &mut *stack.socket.get() };
51
52 let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
53 let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
54 let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
55 let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
56 let handle = s.sockets.add(udp::Socket::new(
57 udp::PacketBuffer::new(rx_meta, rx_buffer),
58 udp::PacketBuffer::new(tx_meta, tx_buffer),
59 ));
60
61 Self {
62 io: UdpIo {
63 stack: &stack.socket,
64 handle,
65 },
66 }
67 }
68
69 pub fn split(&mut self) -> (UdpReader<'_>, UdpWriter<'_>) {
70 (UdpReader { io: self.io }, UdpWriter { io: self.io })
71 }
72
73 pub fn bind<T>(&mut self, endpoint: T) -> Result<(), BindError>
74 where
75 T: Into<IpListenEndpoint>,
76 {
77 let mut endpoint = endpoint.into();
78
79 // safety: not accessed reentrantly.
80 if endpoint.port == 0 {
81 // If user didn't specify port allocate a dynamic port.
82 endpoint.port = unsafe { &mut *self.io.stack.get() }.get_local_port();
83 }
84
85 // safety: not accessed reentrantly.
86 match unsafe { self.io.with_mut(|s, _| s.bind(endpoint)) } {
87 Ok(()) => Ok(()),
88 Err(udp::BindError::InvalidState) => Err(BindError::InvalidState),
89 Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute),
90 }
91 }
92
93 pub async fn send_to<T>(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error>
94 where
95 T: Into<IpEndpoint>,
96 {
97 self.io.write(buf, remote_endpoint.into()).await
98 }
99
100 pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> {
101 self.io.read(buf).await
102 }
103
104 pub async fn flush(&mut self) -> Result<(), Error> {
105 self.io.flush().await
106 }
107
108 pub fn endpoint(&self) -> IpListenEndpoint {
109 unsafe { self.io.with(|s, _| s.endpoint()) }
110 }
111
112 pub fn is_open(&self) -> bool {
113 unsafe { self.io.with(|s, _| s.is_open()) }
114 }
115
116 pub fn close(&mut self) {
117 unsafe { self.io.with_mut(|s, _| s.close()) }
118 }
119
120 pub fn may_send(&self) -> bool {
121 unsafe { self.io.with(|s, _| s.can_send()) }
122 }
123
124 pub fn may_recv(&self) -> bool {
125 unsafe { self.io.with(|s, _| s.can_recv()) }
126 }
127}
128
129impl Drop for UdpSocket<'_> {
130 fn drop(&mut self) {
131 // safety: not accessed reentrantly.
132 let s = unsafe { &mut *self.io.stack.get() };
133 s.sockets.remove(self.io.handle);
134 }
135}
136
137#[derive(Copy, Clone)]
138pub struct UdpIo<'a> {
139 stack: &'a UnsafeCell<SocketStack>,
140 handle: SocketHandle,
141}
142
143impl UdpIo<'_> {
144 /// SAFETY: must not call reentrantly.
145 unsafe fn with<R>(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R {
146 let s = &*self.stack.get();
147 let socket = s.sockets.get::<udp::Socket>(self.handle);
148 f(socket, &s.iface)
149 }
150
151 /// SAFETY: must not call reentrantly.
152 unsafe fn with_mut<R>(&mut self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R {
153 let s = &mut *self.stack.get();
154 let socket = s.sockets.get_mut::<udp::Socket>(self.handle);
155 let res = f(socket, &mut s.iface);
156 s.waker.wake();
157 res
158 }
159
160 async fn read(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> {
161 poll_fn(move |cx| unsafe {
162 self.with_mut(|s, _| match s.recv_slice(buf) {
163 Ok(x) => Poll::Ready(Ok(x)),
164 // No data ready
165 Err(udp::RecvError::Exhausted) => {
166 //s.register_recv_waker(cx.waker());
167 cx.waker().wake_by_ref();
168 Poll::Pending
169 }
170 })
171 })
172 .await
173 }
174
175 async fn write(&mut self, buf: &[u8], ep: IpEndpoint) -> Result<(), Error> {
176 poll_fn(move |cx| unsafe {
177 self.with_mut(|s, _| match s.send_slice(buf, ep) {
178 // Entire datagram has been sent
179 Ok(()) => Poll::Ready(Ok(())),
180 Err(udp::SendError::BufferFull) => {
181 s.register_send_waker(cx.waker());
182 Poll::Pending
183 }
184 Err(udp::SendError::Unaddressable) => Poll::Ready(Err(Error::NoRoute)),
185 })
186 })
187 .await
188 }
189
190 async fn flush(&mut self) -> Result<(), Error> {
191 poll_fn(move |_| {
192 Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
193 })
194 .await
195 }
196}
197
198impl UdpReader<'_> {
199 pub async fn recv_from(&mut self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> {
200 self.io.read(buf).await
201 }
202}
203
204impl UdpWriter<'_> {
205 pub async fn send_to<T>(&mut self, buf: &[u8], remote_endpoint: T) -> Result<(), Error>
206 where
207 T: Into<IpEndpoint>,
208 {
209 self.io.write(buf, remote_endpoint.into()).await
210 }
211
212 pub async fn flush(&mut self) -> Result<(), Error> {
213 self.io.flush().await
214 }
215}
216
217impl embedded_io::Error for BindError {
218 fn kind(&self) -> embedded_io::ErrorKind {
219 embedded_io::ErrorKind::Other
220 }
221}
222
223impl embedded_io::Error for Error {
224 fn kind(&self) -> embedded_io::ErrorKind {
225 embedded_io::ErrorKind::Other
226 }
227}