From 077b4c04d1eba229d2bdd42952f5617966c6b3fe Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 6 Dec 2025 15:05:48 +0000 Subject: Implement connect_and_run helper function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a convenience function to handle connection lifecycle with automatic reconnection. The function supports: - Parsing addresses as IP:port, IP with default port (1883), or hostname with DNS resolution - Filtering DNS results for IPv4 addresses only - Automatic reconnection with 5-second delay on failures - Comprehensive error logging for DNS, TCP, and device run failures - TCP socket timeout of 10 seconds 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/lib.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index ca2ab82..abb803f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,10 @@ #![no_std] -use core::{cell::RefCell, task::Waker}; +use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker}; +use embassy_net::tcp::TcpSocket; use embassy_sync::waitqueue::AtomicWaker; +use embassy_time::Timer; use heapless::{ Vec, VecView, string::{String, StringView}, @@ -905,6 +907,105 @@ impl<'a> Device<'a> { } } +pub async fn connect_and_run( + stack: embassy_net::Stack<'_>, + mut device: Device<'_>, + address: &str, +) -> ! { + const DEFAULT_MQTT_PORT: u16 = 1883; + + let mut rx_buffer = [0u8; 1024]; + let mut tx_buffer = [0u8; 1024]; + let mut delay = false; + + loop { + if !delay { + delay = true; + } else { + crate::log::info!("Retrying connection in 5 seconds..."); + Timer::after_secs(5).await; + } + + let addr = { + // Try to parse as complete SocketAddrV4 first (e.g., "192.168.1.1:1883") + if let Ok(sock_addr) = address.parse::() { + sock_addr + } + // Try to parse as Ipv4Addr with default port (e.g., "192.168.1.1") + else if let Ok(ip_addr) = address.parse::() { + SocketAddrV4::new(ip_addr, DEFAULT_MQTT_PORT) + } + // Otherwise, parse as hostname:port or hostname + else { + let (addr_str, port) = match address.split_once(':') { + Some((addr_str, port_str)) => { + let port = port_str + .parse::() + .expect("Invalid port number in address"); + (addr_str, port) + } + None => (address, DEFAULT_MQTT_PORT), + }; + + let addrs = match stack + .dns_query(addr_str, embassy_net::dns::DnsQueryType::A) + .await + { + Ok(addrs) => addrs, + Err(err) => { + crate::log::error!( + "DNS query for '{}' failed with: {:?}", + addr_str, + crate::log::Debug2Format(&err) + ); + continue; + } + }; + + let ipv4_addr = match addrs + .iter() + .filter_map(|addr| match addr { + embassy_net::IpAddress::Ipv4(ipv4) => Some((*ipv4).into()), + }) + .next() + { + Some(addr) => addr, + None => { + crate::log::error!( + "DNS query for '{}' returned no IPv4 addresses", + addr_str + ); + continue; + } + }; + + SocketAddrV4::new(ipv4_addr, port) + } + }; + + crate::log::info!("Connecting to MQTT broker at {}", addr); + + let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); + socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); + + if let Err(err) = socket.connect(addr).await { + crate::log::error!( + "TCP connect to {} failed with: {:?}", + addr, + crate::log::Debug2Format(&err) + ); + continue; + } + + if let Err(err) = device.run(&mut socket).await { + crate::log::error!( + "Device run failed with: {:?}", + crate::log::Debug2Format(&err) + ); + } + } +} + async fn wait_on_atomic_waker(waker: &AtomicWaker) { struct F<'a>(&'a AtomicWaker, bool); impl<'a> core::future::Future for F<'a> { -- cgit