aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-net/src/tcp.rs14
-rw-r--r--examples/std/src/bin/tcp_accept.rs133
2 files changed, 144 insertions, 3 deletions
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs
index 732b6d217..7babb5293 100644
--- a/embassy-net/src/tcp.rs
+++ b/embassy-net/src/tcp.rs
@@ -95,7 +95,8 @@ impl<'a> TcpWriter<'a> {
95 95
96 /// Flushes the written data to the socket. 96 /// Flushes the written data to the socket.
97 /// 97 ///
98 /// This waits until all data has been sent, and ACKed by the remote host. 98 /// This waits until all data has been sent, and ACKed by the remote host. For a connection
99 /// closed with [`abort()`](TcpSocket::abort) it will wait for the TCP RST packet to be sent.
99 pub async fn flush(&mut self) -> Result<(), Error> { 100 pub async fn flush(&mut self) -> Result<(), Error> {
100 self.io.flush().await 101 self.io.flush().await
101 } 102 }
@@ -198,7 +199,8 @@ impl<'a> TcpSocket<'a> {
198 199
199 /// Flushes the written data to the socket. 200 /// Flushes the written data to the socket.
200 /// 201 ///
201 /// This waits until all data has been sent, and ACKed by the remote host. 202 /// This waits until all data has been sent, and ACKed by the remote host. For a connection
203 /// closed with [`abort()`](TcpSocket::abort) it will wait for the TCP RST packet to be sent.
202 pub async fn flush(&mut self) -> Result<(), Error> { 204 pub async fn flush(&mut self) -> Result<(), Error> {
203 self.io.flush().await 205 self.io.flush().await
204 } 206 }
@@ -262,6 +264,11 @@ impl<'a> TcpSocket<'a> {
262 /// 264 ///
263 /// This instantly closes both the read and write halves of the socket. Any pending data 265 /// This instantly closes both the read and write halves of the socket. Any pending data
264 /// that has not been sent will be lost. 266 /// that has not been sent will be lost.
267 ///
268 /// Note that the TCP RST packet is not sent immediately - if the `TcpSocket` is dropped too soon
269 /// the remote host may not know the connection has been closed.
270 /// `abort()` callers should wait for a [`flush()`](TcpSocket::flush) call to complete before
271 /// dropping or reusing the socket.
265 pub fn abort(&mut self) { 272 pub fn abort(&mut self) {
266 self.io.with_mut(|s, _| s.abort()) 273 self.io.with_mut(|s, _| s.abort())
267 } 274 }
@@ -347,9 +354,10 @@ impl<'d> TcpIo<'d> {
347 async fn flush(&mut self) -> Result<(), Error> { 354 async fn flush(&mut self) -> Result<(), Error> {
348 poll_fn(move |cx| { 355 poll_fn(move |cx| {
349 self.with_mut(|s, _| { 356 self.with_mut(|s, _| {
357 let waiting_close = s.state() == tcp::State::Closed && s.remote_endpoint().is_some();
350 // If there are outstanding send operations, register for wake up and wait 358 // If there are outstanding send operations, register for wake up and wait
351 // smoltcp issues wake-ups when octets are dequeued from the send buffer 359 // smoltcp issues wake-ups when octets are dequeued from the send buffer
352 if s.send_queue() > 0 { 360 if s.send_queue() > 0 || waiting_close {
353 s.register_send_waker(cx.waker()); 361 s.register_send_waker(cx.waker());
354 Poll::Pending 362 Poll::Pending
355 // No outstanding sends, socket is flushed 363 // No outstanding sends, socket is flushed
diff --git a/examples/std/src/bin/tcp_accept.rs b/examples/std/src/bin/tcp_accept.rs
new file mode 100644
index 000000000..97ce77f42
--- /dev/null
+++ b/examples/std/src/bin/tcp_accept.rs
@@ -0,0 +1,133 @@
1#![feature(type_alias_impl_trait)]
2
3use core::fmt::Write as _;
4use std::default::Default;
5
6use clap::Parser;
7use embassy_executor::{Executor, Spawner};
8use embassy_net::tcp::TcpSocket;
9use embassy_net::{Config, Ipv4Address, Ipv4Cidr, Stack, StackResources};
10use embassy_time::{Duration, Timer};
11use embedded_io::asynch::Write as _;
12use heapless::Vec;
13use log::*;
14use rand_core::{OsRng, RngCore};
15use static_cell::StaticCell;
16
17#[path = "../tuntap.rs"]
18mod tuntap;
19
20use crate::tuntap::TunTapDevice;
21
22macro_rules! singleton {
23 ($val:expr) => {{
24 type T = impl Sized;
25 static STATIC_CELL: StaticCell<T> = StaticCell::new();
26 STATIC_CELL.init_with(move || $val)
27 }};
28}
29
30#[derive(Parser)]
31#[clap(version = "1.0")]
32struct Opts {
33 /// TAP device name
34 #[clap(long, default_value = "tap0")]
35 tap: String,
36 /// use a static IP instead of DHCP
37 #[clap(long)]
38 static_ip: bool,
39}
40
41#[embassy_executor::task]
42async fn net_task(stack: &'static Stack<TunTapDevice>) -> ! {
43 stack.run().await
44}
45
46#[derive(Default)]
47struct StrWrite(pub heapless::Vec<u8, 30>);
48
49impl core::fmt::Write for StrWrite {
50 fn write_str(&mut self, s: &str) -> Result<(), core::fmt::Error> {
51 self.0.extend_from_slice(s.as_bytes()).unwrap();
52 Ok(())
53 }
54}
55
56#[embassy_executor::task]
57async fn main_task(spawner: Spawner) {
58 let opts: Opts = Opts::parse();
59
60 // Init network device
61 let device = TunTapDevice::new(&opts.tap).unwrap();
62
63 // Choose between dhcp or static ip
64 let config = if opts.static_ip {
65 Config::Static(embassy_net::StaticConfig {
66 address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 69, 2), 24),
67 dns_servers: Vec::new(),
68 gateway: Some(Ipv4Address::new(192, 168, 69, 1)),
69 })
70 } else {
71 Config::Dhcp(Default::default())
72 };
73
74 // Generate random seed
75 let mut seed = [0; 8];
76 OsRng.fill_bytes(&mut seed);
77 let seed = u64::from_le_bytes(seed);
78
79 // Init network stack
80 let stack = &*singleton!(Stack::new(device, config, singleton!(StackResources::<3>::new()), seed));
81
82 // Launch network task
83 spawner.spawn(net_task(stack)).unwrap();
84
85 // Then we can use it!
86 let mut rx_buffer = [0; 4096];
87 let mut tx_buffer = [0; 4096];
88
89 loop {
90 let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
91 socket.set_timeout(Some(Duration::from_secs(10)));
92 info!("Listening on TCP:9999...");
93 if let Err(_) = socket.accept(9999).await {
94 warn!("accept error");
95 continue;
96 }
97
98 info!("Accepted a connection");
99
100 // Write some quick output
101 for i in 1..=5 {
102 let mut w = StrWrite::default();
103 write!(w, "{}! ", i).unwrap();
104 let r = socket.write_all(&w.0).await;
105 if let Err(e) = r {
106 warn!("write error: {:?}", e);
107 return;
108 }
109
110 Timer::after(Duration::from_millis(500)).await;
111 }
112 info!("Closing the connection");
113 socket.abort();
114 info!("Flushing the RST out...");
115 socket.flush().await;
116 info!("Finished with the socket");
117 }
118}
119
120static EXECUTOR: StaticCell<Executor> = StaticCell::new();
121
122fn main() {
123 env_logger::builder()
124 .filter_level(log::LevelFilter::Debug)
125 .filter_module("async_io", log::LevelFilter::Info)
126 .format_timestamp_nanos()
127 .init();
128
129 let executor = EXECUTOR.init(Executor::new());
130 executor.run(|spawner| {
131 spawner.spawn(main_task(spawner)).unwrap();
132 });
133}