From 8ac9ddd2cbc9cf454eae066e5e60d05ee714a83e Mon Sep 17 00:00:00 2001 From: diogo464 Date: Mon, 8 Dec 2025 20:49:23 +0000 Subject: formatting and improved timeout handling --- src/lib.rs | 230 ++++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 168 insertions(+), 62 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index c1b5191..672fde9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,14 @@ #![no_std] -use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker}; +use core::{ + cell::RefCell, + net::{Ipv4Addr, SocketAddrV4}, + task::Waker, +}; use embassy_net::tcp::TcpSocket; use embassy_sync::waitqueue::AtomicWaker; -use embassy_time::Timer; +use embassy_time::{Duration, Timer}; use heapless::{ Vec, VecView, string::{String, StringView}, @@ -432,7 +436,11 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi } } -fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { +fn create_entity<'a>( + device: &Device<'a>, + config: EntityConfig, + storage: EntityStorage, +) -> Entity<'a> { let index = 'outer: { for idx in 0..device.entities.len() { if device.entities[idx].borrow().is_none() { @@ -457,7 +465,11 @@ fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityS } } -pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorConfig) -> Sensor<'a> { +pub fn create_sensor<'a>( + device: &Device<'a>, + id: &'static str, + config: SensorConfig, +) -> Sensor<'a> { let mut entity_config = EntityConfig::default(); entity_config.id = id; config.populate(&mut entity_config); @@ -470,16 +482,28 @@ pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorCo Sensor::new(entity) } -pub fn create_button<'a>(device: &Device<'a>, id: &'static str, config: ButtonConfig) -> Button<'a> { +pub fn create_button<'a>( + device: &Device<'a>, + id: &'static str, + config: ButtonConfig, +) -> Button<'a> { let mut entity_config = EntityConfig::default(); entity_config.id = id; config.populate(&mut entity_config); - let entity = create_entity(device, entity_config, EntityStorage::Button(Default::default())); + let entity = create_entity( + device, + entity_config, + EntityStorage::Button(Default::default()), + ); Button::new(entity) } -pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberConfig) -> Number<'a> { +pub fn create_number<'a>( + device: &Device<'a>, + id: &'static str, + config: NumberConfig, +) -> Number<'a> { let mut entity_config = EntityConfig::default(); entity_config.id = id; config.populate(&mut entity_config); @@ -495,7 +519,11 @@ pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberCo Number::new(entity) } -pub fn create_switch<'a>(device: &Device<'a>, id: &'static str, config: SwitchConfig) -> Switch<'a> { +pub fn create_switch<'a>( + device: &Device<'a>, + id: &'static str, + config: SwitchConfig, +) -> Switch<'a> { let mut entity_config = EntityConfig::default(); entity_config.id = id; config.populate(&mut entity_config); @@ -531,6 +559,8 @@ pub fn create_binary_sensor<'a>( pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { use core::fmt::Write; + const MQTT_TIMEOUT: Duration = Duration::from_secs(30); + device.availability_topic_buffer.clear(); write!( device.availability_topic_buffer, @@ -549,15 +579,24 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re will_retain: true, ..Default::default() }; - if let Err(err) = client - .connect_with(device.config.device_id, connect_params) - .await + match embassy_time::with_timeout( + MQTT_TIMEOUT, + client.connect_with(device.config.device_id, connect_params), + ) + .await { - crate::log::error!( - "mqtt connect failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt connection failed")); + Ok(Ok(())) => {} + Ok(Err(err)) => { + crate::log::error!( + "mqtt connect failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt connection failed")); + } + Err(_) => { + crate::log::error!("mqtt connect timed out"); + return Err(Error::new("mqtt connect timed out")); + } } crate::log::debug!("sending discover messages"); @@ -635,7 +674,8 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re discovery ); - device.discovery_buffer + device + .discovery_buffer .resize(device.discovery_buffer.capacity(), 0) .unwrap(); let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer) @@ -645,48 +685,73 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re let discovery_topic = device.discovery_topic_buffer.as_str(); crate::log::debug!("sending discovery to topic '{}'", discovery_topic); - if let Err(err) = client - .publish(discovery_topic, &device.discovery_buffer) - .await + match embassy_time::with_timeout( + MQTT_TIMEOUT, + client.publish(discovery_topic, &device.discovery_buffer), + ) + .await { - crate::log::error!( - "mqtt discovery publish failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt discovery publish failed")); + Ok(Ok(_)) => {} + Ok(Err(err)) => { + crate::log::error!( + "mqtt discovery publish failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt discovery publish failed")); + } + Err(_) => { + crate::log::error!("mqtt discovery publish timed out"); + return Err(Error::new("mqtt discovery publish timed out")); + } } let command_topic = device.command_topic_buffer.as_str(); crate::log::debug!("subscribing to command topic '{}'", command_topic); - if let Err(err) = client.subscribe(command_topic).await { - crate::log::error!( - "mqtt subscribe to '{}' failed with: {:?}", - command_topic, - crate::log::Debug2Format(&err) - ); - return Err(Error::new( - "mqtt subscription to entity command topic failed", - )); + match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { + Ok(Ok(_)) => {} + Ok(Err(err)) => { + crate::log::error!( + "mqtt subscribe to '{}' failed with: {:?}", + command_topic, + crate::log::Debug2Format(&err) + ); + return Err(Error::new( + "mqtt subscription to entity command topic failed", + )); + } + Err(_) => { + crate::log::error!("mqtt subscribe to '{}' timed out", command_topic); + return Err(Error::new("mqtt subscribe timed out")); + } } } - if let Err(err) = client - .publish_with( - availability_topic, - AVAILABLE_PAYLOAD.as_bytes(), - embedded_mqtt::PublishParams { - retain: true, - ..Default::default() - }, - ) - .await - { + match embassy_time::with_timeout( + MQTT_TIMEOUT, + client.publish_with( + availability_topic, + AVAILABLE_PAYLOAD.as_bytes(), + embedded_mqtt::PublishParams { + retain: true, + ..Default::default() + }, + ), + ) + .await + { + Ok(Ok(_)) => {} + Ok(Err(err)) => { crate::log::error!( "mqtt availability publish failed with: {:?}", crate::log::Debug2Format(&err) ); return Err(Error::new("mqtt availability publish failed")); } + Err(_) => { + crate::log::error!("mqtt availability publish timed out"); + return Err(Error::new("mqtt availability publish timed out")); + } + } 'outer_loop: loop { use core::fmt::Write; @@ -749,7 +814,14 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } let state_topic = device.state_topic_buffer.as_str(); - if let Err(err) = client.publish(state_topic, device.publish_buffer).await { + match embassy_time::with_timeout( + MQTT_TIMEOUT, + client.publish(state_topic, device.publish_buffer), + ) + .await + { + Ok(Ok(_)) => {} + Ok(Err(err)) => { crate::log::error!( "mqtt state publish on topic '{}' failed with: {:?}", state_topic, @@ -757,12 +829,22 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re ); return Err(Error::new("mqtt publish failed")); } + Err(_) => { + crate::log::error!("mqtt state publish on topic '{}' timed out", state_topic); + return Err(Error::new("mqtt publish timed out")); + } } + } let receive = client.receive(); let waker = wait_on_atomic_waker(device.waker); - let publish = match embassy_futures::select::select(receive, waker).await { - embassy_futures::select::Either::First(packet) => match packet { + let publish = match embassy_time::with_timeout( + MQTT_TIMEOUT, + embassy_futures::select::select(receive, waker), + ) + .await + { + Ok(embassy_futures::select::Either::First(packet)) => match packet { Ok(embedded_mqtt::Packet::Publish(publish)) => publish, Err(err) => { crate::log::error!( @@ -773,7 +855,11 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } _ => continue, }, - embassy_futures::select::Either::Second(_) => continue, + Ok(embassy_futures::select::Either::Second(_)) => continue, + Err(_) => { + crate::log::error!("mqtt receive timed out"); + return Err(Error::new("mqtt receive timed out")); + } }; let entity = 'entity_search_block: { @@ -816,12 +902,24 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re ); let data_len = publish.data_len; - if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { - crate::log::error!( - "mqtt receive data failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive data failed")); + match embassy_time::with_timeout( + MQTT_TIMEOUT, + client.receive_data(&mut read_buffer[..data_len]), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(err)) => { + crate::log::error!( + "mqtt receive data failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive data failed")); + } + Err(_) => { + crate::log::error!("mqtt receive data timed out"); + return Err(Error::new("mqtt receive data timed out")); + } } let command = match str::from_utf8(&read_buffer[..data_len]) { @@ -990,13 +1088,21 @@ pub async fn connect_and_run( let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); - if let Err(err) = socket.connect(addr).await { - crate::log::error!( - "TCP connect to {} failed with: {:?}", - addr, - crate::log::Debug2Format(&err) - ); - continue; + let connect_fut = embassy_time::with_timeout(Duration::from_secs(10), socket.connect(addr)); + match connect_fut.await { + Ok(Err(err)) => { + crate::log::error!( + "TCP connect to {} failed with: {:?}", + addr, + crate::log::Debug2Format(&err) + ); + continue; + } + Err(_) => { + crate::log::error!("TCP connect to {} timed out", addr); + continue; + } + _ => {} } socket.set_timeout(None); -- cgit