aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-06 15:05:48 +0000
committerdiogo464 <[email protected]>2025-12-06 15:06:44 +0000
commit077b4c04d1eba229d2bdd42952f5617966c6b3fe (patch)
treeedce665979c90dabe61f96e1769c1bd63d96ab25
parent3a974cec36cd8a48992e05629325d8279cf289b7 (diff)
Implement connect_and_run helper function
Added a convenience function to handle connection lifecycle with automatic reconnection. The function supports: - Parsing addresses as IP:port, IP with default port (1883), or hostname with DNS resolution - Filtering DNS results for IPv4 addresses only - Automatic reconnection with 5-second delay on failures - Comprehensive error logging for DNS, TCP, and device run failures - TCP socket timeout of 10 seconds 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
-rw-r--r--Cargo.toml2
-rw-r--r--src/lib.rs103
2 files changed, 103 insertions, 2 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 13a11fe..5f679a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ tracing = ["dep:tracing"]
10 10
11[dependencies] 11[dependencies]
12embedded-mqtt = { path = "./embedded-mqtt" , features = ["embassy-net"] } 12embedded-mqtt = { path = "./embedded-mqtt" , features = ["embassy-net"] }
13embassy-net = { version = "0.7.1", features = ["medium-ip", "proto-ipv4", "tcp"] } 13embassy-net = { version = "0.7.1", features = ["medium-ip", "proto-ipv4", "tcp", "dns"] }
14heapless = "0.9.2" 14heapless = "0.9.2"
15embassy-time = { version = "0.5.0" } 15embassy-time = { version = "0.5.0" }
16serde-json-core = "0.6.0" 16serde-json-core = "0.6.0"
diff --git a/src/lib.rs b/src/lib.rs
index ca2ab82..abb803f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,8 +1,10 @@
1#![no_std] 1#![no_std]
2 2
3use core::{cell::RefCell, task::Waker}; 3use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker};
4 4
5use embassy_net::tcp::TcpSocket;
5use embassy_sync::waitqueue::AtomicWaker; 6use embassy_sync::waitqueue::AtomicWaker;
7use embassy_time::Timer;
6use heapless::{ 8use heapless::{
7 Vec, VecView, 9 Vec, VecView,
8 string::{String, StringView}, 10 string::{String, StringView},
@@ -905,6 +907,105 @@ impl<'a> Device<'a> {
905 } 907 }
906} 908}
907 909
910pub async fn connect_and_run(
911 stack: embassy_net::Stack<'_>,
912 mut device: Device<'_>,
913 address: &str,
914) -> ! {
915 const DEFAULT_MQTT_PORT: u16 = 1883;
916
917 let mut rx_buffer = [0u8; 1024];
918 let mut tx_buffer = [0u8; 1024];
919 let mut delay = false;
920
921 loop {
922 if !delay {
923 delay = true;
924 } else {
925 crate::log::info!("Retrying connection in 5 seconds...");
926 Timer::after_secs(5).await;
927 }
928
929 let addr = {
930 // Try to parse as complete SocketAddrV4 first (e.g., "192.168.1.1:1883")
931 if let Ok(sock_addr) = address.parse::<SocketAddrV4>() {
932 sock_addr
933 }
934 // Try to parse as Ipv4Addr with default port (e.g., "192.168.1.1")
935 else if let Ok(ip_addr) = address.parse::<Ipv4Addr>() {
936 SocketAddrV4::new(ip_addr, DEFAULT_MQTT_PORT)
937 }
938 // Otherwise, parse as hostname:port or hostname
939 else {
940 let (addr_str, port) = match address.split_once(':') {
941 Some((addr_str, port_str)) => {
942 let port = port_str
943 .parse::<u16>()
944 .expect("Invalid port number in address");
945 (addr_str, port)
946 }
947 None => (address, DEFAULT_MQTT_PORT),
948 };
949
950 let addrs = match stack
951 .dns_query(addr_str, embassy_net::dns::DnsQueryType::A)
952 .await
953 {
954 Ok(addrs) => addrs,
955 Err(err) => {
956 crate::log::error!(
957 "DNS query for '{}' failed with: {:?}",
958 addr_str,
959 crate::log::Debug2Format(&err)
960 );
961 continue;
962 }
963 };
964
965 let ipv4_addr = match addrs
966 .iter()
967 .filter_map(|addr| match addr {
968 embassy_net::IpAddress::Ipv4(ipv4) => Some((*ipv4).into()),
969 })
970 .next()
971 {
972 Some(addr) => addr,
973 None => {
974 crate::log::error!(
975 "DNS query for '{}' returned no IPv4 addresses",
976 addr_str
977 );
978 continue;
979 }
980 };
981
982 SocketAddrV4::new(ipv4_addr, port)
983 }
984 };
985
986 crate::log::info!("Connecting to MQTT broker at {}", addr);
987
988 let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
989 socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));
990
991 if let Err(err) = socket.connect(addr).await {
992 crate::log::error!(
993 "TCP connect to {} failed with: {:?}",
994 addr,
995 crate::log::Debug2Format(&err)
996 );
997 continue;
998 }
999
1000 if let Err(err) = device.run(&mut socket).await {
1001 crate::log::error!(
1002 "Device run failed with: {:?}",
1003 crate::log::Debug2Format(&err)
1004 );
1005 }
1006 }
1007}
1008
908async fn wait_on_atomic_waker(waker: &AtomicWaker) { 1009async fn wait_on_atomic_waker(waker: &AtomicWaker) {
909 struct F<'a>(&'a AtomicWaker, bool); 1010 struct F<'a>(&'a AtomicWaker, bool);
910 impl<'a> core::future::Future for F<'a> { 1011 impl<'a> core::future::Future for F<'a> {