diff options
| author | diogo464 <[email protected]> | 2025-12-06 15:05:48 +0000 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-12-06 15:06:44 +0000 |
| commit | 077b4c04d1eba229d2bdd42952f5617966c6b3fe (patch) | |
| tree | edce665979c90dabe61f96e1769c1bd63d96ab25 | |
| parent | 3a974cec36cd8a48992e05629325d8279cf289b7 (diff) | |
Implement connect_and_run helper function
Added a convenience function to handle connection lifecycle with automatic reconnection. The function supports:
- Parsing addresses as IP:port, IP with default port (1883), or hostname with DNS resolution
- Filtering DNS results for IPv4 addresses only
- Automatic reconnection with 5-second delay on failures
- Comprehensive error logging for DNS, TCP, and device run failures
- TCP socket timeout of 10 seconds
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | src/lib.rs | 103 |
2 files changed, 103 insertions, 2 deletions
| @@ -10,7 +10,7 @@ tracing = ["dep:tracing"] | |||
| 10 | 10 | ||
| 11 | [dependencies] | 11 | [dependencies] |
| 12 | embedded-mqtt = { path = "./embedded-mqtt" , features = ["embassy-net"] } | 12 | embedded-mqtt = { path = "./embedded-mqtt" , features = ["embassy-net"] } |
| 13 | embassy-net = { version = "0.7.1", features = ["medium-ip", "proto-ipv4", "tcp"] } | 13 | embassy-net = { version = "0.7.1", features = ["medium-ip", "proto-ipv4", "tcp", "dns"] } |
| 14 | heapless = "0.9.2" | 14 | heapless = "0.9.2" |
| 15 | embassy-time = { version = "0.5.0" } | 15 | embassy-time = { version = "0.5.0" } |
| 16 | serde-json-core = "0.6.0" | 16 | serde-json-core = "0.6.0" |
| @@ -1,8 +1,10 @@ | |||
| 1 | #![no_std] | 1 | #![no_std] |
| 2 | 2 | ||
| 3 | use core::{cell::RefCell, task::Waker}; | 3 | use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker}; |
| 4 | 4 | ||
| 5 | use embassy_net::tcp::TcpSocket; | ||
| 5 | use embassy_sync::waitqueue::AtomicWaker; | 6 | use embassy_sync::waitqueue::AtomicWaker; |
| 7 | use embassy_time::Timer; | ||
| 6 | use heapless::{ | 8 | use heapless::{ |
| 7 | Vec, VecView, | 9 | Vec, VecView, |
| 8 | string::{String, StringView}, | 10 | string::{String, StringView}, |
| @@ -905,6 +907,105 @@ impl<'a> Device<'a> { | |||
| 905 | } | 907 | } |
| 906 | } | 908 | } |
| 907 | 909 | ||
| 910 | pub async fn connect_and_run( | ||
| 911 | stack: embassy_net::Stack<'_>, | ||
| 912 | mut device: Device<'_>, | ||
| 913 | address: &str, | ||
| 914 | ) -> ! { | ||
| 915 | const DEFAULT_MQTT_PORT: u16 = 1883; | ||
| 916 | |||
| 917 | let mut rx_buffer = [0u8; 1024]; | ||
| 918 | let mut tx_buffer = [0u8; 1024]; | ||
| 919 | let mut delay = false; | ||
| 920 | |||
| 921 | loop { | ||
| 922 | if !delay { | ||
| 923 | delay = true; | ||
| 924 | } else { | ||
| 925 | crate::log::info!("Retrying connection in 5 seconds..."); | ||
| 926 | Timer::after_secs(5).await; | ||
| 927 | } | ||
| 928 | |||
| 929 | let addr = { | ||
| 930 | // Try to parse as complete SocketAddrV4 first (e.g., "192.168.1.1:1883") | ||
| 931 | if let Ok(sock_addr) = address.parse::<SocketAddrV4>() { | ||
| 932 | sock_addr | ||
| 933 | } | ||
| 934 | // Try to parse as Ipv4Addr with default port (e.g., "192.168.1.1") | ||
| 935 | else if let Ok(ip_addr) = address.parse::<Ipv4Addr>() { | ||
| 936 | SocketAddrV4::new(ip_addr, DEFAULT_MQTT_PORT) | ||
| 937 | } | ||
| 938 | // Otherwise, parse as hostname:port or hostname | ||
| 939 | else { | ||
| 940 | let (addr_str, port) = match address.split_once(':') { | ||
| 941 | Some((addr_str, port_str)) => { | ||
| 942 | let port = port_str | ||
| 943 | .parse::<u16>() | ||
| 944 | .expect("Invalid port number in address"); | ||
| 945 | (addr_str, port) | ||
| 946 | } | ||
| 947 | None => (address, DEFAULT_MQTT_PORT), | ||
| 948 | }; | ||
| 949 | |||
| 950 | let addrs = match stack | ||
| 951 | .dns_query(addr_str, embassy_net::dns::DnsQueryType::A) | ||
| 952 | .await | ||
| 953 | { | ||
| 954 | Ok(addrs) => addrs, | ||
| 955 | Err(err) => { | ||
| 956 | crate::log::error!( | ||
| 957 | "DNS query for '{}' failed with: {:?}", | ||
| 958 | addr_str, | ||
| 959 | crate::log::Debug2Format(&err) | ||
| 960 | ); | ||
| 961 | continue; | ||
| 962 | } | ||
| 963 | }; | ||
| 964 | |||
| 965 | let ipv4_addr = match addrs | ||
| 966 | .iter() | ||
| 967 | .filter_map(|addr| match addr { | ||
| 968 | embassy_net::IpAddress::Ipv4(ipv4) => Some((*ipv4).into()), | ||
| 969 | }) | ||
| 970 | .next() | ||
| 971 | { | ||
| 972 | Some(addr) => addr, | ||
| 973 | None => { | ||
| 974 | crate::log::error!( | ||
| 975 | "DNS query for '{}' returned no IPv4 addresses", | ||
| 976 | addr_str | ||
| 977 | ); | ||
| 978 | continue; | ||
| 979 | } | ||
| 980 | }; | ||
| 981 | |||
| 982 | SocketAddrV4::new(ipv4_addr, port) | ||
| 983 | } | ||
| 984 | }; | ||
| 985 | |||
| 986 | crate::log::info!("Connecting to MQTT broker at {}", addr); | ||
| 987 | |||
| 988 | let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); | ||
| 989 | socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); | ||
| 990 | |||
| 991 | if let Err(err) = socket.connect(addr).await { | ||
| 992 | crate::log::error!( | ||
| 993 | "TCP connect to {} failed with: {:?}", | ||
| 994 | addr, | ||
| 995 | crate::log::Debug2Format(&err) | ||
| 996 | ); | ||
| 997 | continue; | ||
| 998 | } | ||
| 999 | |||
| 1000 | if let Err(err) = device.run(&mut socket).await { | ||
| 1001 | crate::log::error!( | ||
| 1002 | "Device run failed with: {:?}", | ||
| 1003 | crate::log::Debug2Format(&err) | ||
| 1004 | ); | ||
| 1005 | } | ||
| 1006 | } | ||
| 1007 | } | ||
| 1008 | |||
| 908 | async fn wait_on_atomic_waker(waker: &AtomicWaker) { | 1009 | async fn wait_on_atomic_waker(waker: &AtomicWaker) { |
| 909 | struct F<'a>(&'a AtomicWaker, bool); | 1010 | struct F<'a>(&'a AtomicWaker, bool); |
| 910 | impl<'a> core::future::Future for F<'a> { | 1011 | impl<'a> core::future::Future for F<'a> { |
