From b456db36b7d973154f0a10bc1adca235de2c5c12 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 13 Dec 2025 20:09:33 +0000 Subject: publish values on first iteration of run if the connection fails its possible some value that needed to be published did not get published so this should fix that. --- src/lib.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index f28d2bd..64057a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -863,6 +863,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } } + let mut first_iteration_push = true; 'outer_loop: loop { use core::fmt::Write; @@ -874,7 +875,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re None => break, }; - if !entity.publish { + if !entity.publish && !first_iteration_push { continue; } @@ -920,10 +921,12 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re device.publish_buffer.truncate(n); } _ => { - crate::log::warn!( - "entity '{}' requested state publish but its storage does not support it", - entity.config.id - ); + if !first_iteration_push { + crate::log::warn!( + "entity '{}' requested state publish but its storage does not support it", + entity.config.id + ); + } continue; } } @@ -970,6 +973,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } } } + first_iteration_push = false; let receive = client.receive(); let waker = wait_on_atomic_waker(device.waker); -- cgit From 871b67e886ed9853e3fa392ff33a05c7da09fe9b Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 13 Dec 2025 20:32:31 +0000 Subject: use 30s mqtt keepalive --- src/lib.rs | 50 ++++++++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 22 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index 64057a4..7d31ac5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ pub use unit::*; const AVAILABLE_PAYLOAD: &str = "online"; const NOT_AVAILABLE_PAYLOAD: &str = "offline"; +const DEFAULT_KEEPALIVE_TIME: u16 = 30; #[derive(Debug)] pub struct Error(&'static str); @@ -673,11 +674,14 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re .expect("device availability buffer too small"); let availability_topic = device.availability_topic_buffer.as_str(); + let mut ping_ticker = + embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); let mut client = mqtt::Client::new(device.mqtt_resources, transport); let connect_params = mqtt::ConnectParams { will_topic: Some(availability_topic), will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), will_retain: true, + keepalive: Some(DEFAULT_KEEPALIVE_TIME), ..Default::default() }; match embassy_time::with_timeout( @@ -977,29 +981,31 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re let receive = client.receive(); let waker = wait_on_atomic_waker(device.waker); - let publish = match embassy_time::with_timeout( - MQTT_TIMEOUT, - embassy_futures::select::select(receive, waker), - ) - .await - { - Ok(embassy_futures::select::Either::First(packet)) => match packet { - Ok(mqtt::Packet::Publish(publish)) => publish, - Err(err) => { - crate::log::error!( - "mqtt receive failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive failed")); + let publish = + match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await { + embassy_futures::select::Either3::First(packet) => match packet { + Ok(mqtt::Packet::Publish(publish)) => publish, + Err(err) => { + crate::log::error!( + "mqtt receive failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive failed")); + } + _ => continue, + }, + embassy_futures::select::Either3::Second(_) => continue, + embassy_futures::select::Either3::Third(_) => { + if let Err(err) = client.ping().await { + crate::log::error!( + "mqtt ping failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt ping failed")); + } + continue; } - _ => continue, - }, - Ok(embassy_futures::select::Either::Second(_)) => continue, - Err(_) => { - crate::log::error!("mqtt receive timed out"); - return Err(Error::new("mqtt receive timed out")); - } - }; + }; let entity = 'entity_search_block: { for entity in device.entities { -- cgit