From b456db36b7d973154f0a10bc1adca235de2c5c12 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 13 Dec 2025 20:09:33 +0000 Subject: publish values on first iteration of run if the connection fails its possible some value that needed to be published did not get published so this should fix that. --- src/lib.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f28d2bd..64057a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -863,6 +863,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } } + let mut first_iteration_push = true; 'outer_loop: loop { use core::fmt::Write; @@ -874,7 +875,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re None => break, }; - if !entity.publish { + if !entity.publish && !first_iteration_push { continue; } @@ -920,10 +921,12 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re device.publish_buffer.truncate(n); } _ => { - crate::log::warn!( - "entity '{}' requested state publish but its storage does not support it", - entity.config.id - ); + if !first_iteration_push { + crate::log::warn!( + "entity '{}' requested state publish but its storage does not support it", + entity.config.id + ); + } continue; } } @@ -970,6 +973,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } } } + first_iteration_push = false; let receive = client.receive(); let waker = wait_on_atomic_waker(device.waker); -- cgit From 871b67e886ed9853e3fa392ff33a05c7da09fe9b Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 13 Dec 2025 20:32:31 +0000 Subject: use 30s mqtt keepalive --- src/lib.rs | 50 ++++++++++++++++++++++++++++---------------------- src/mqtt/mod.rs | 21 +++++++++++++++++---- src/mqtt/rx.rs | 13 +++++++++++-- src/mqtt/tx.rs | 9 +++++++++ 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 64057a4..7d31ac5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ pub use unit::*; const AVAILABLE_PAYLOAD: &str = "online"; const NOT_AVAILABLE_PAYLOAD: &str = "offline"; +const DEFAULT_KEEPALIVE_TIME: u16 = 30; #[derive(Debug)] pub struct Error(&'static str); @@ -673,11 +674,14 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re .expect("device availability buffer too small"); let availability_topic = device.availability_topic_buffer.as_str(); + let mut ping_ticker = + embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); let mut client = mqtt::Client::new(device.mqtt_resources, transport); let connect_params = mqtt::ConnectParams { will_topic: Some(availability_topic), will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), will_retain: true, + keepalive: Some(DEFAULT_KEEPALIVE_TIME), ..Default::default() }; match embassy_time::with_timeout( @@ -977,29 +981,31 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re let receive = client.receive(); let waker = wait_on_atomic_waker(device.waker); - let publish = match embassy_time::with_timeout( - MQTT_TIMEOUT, - embassy_futures::select::select(receive, waker), - ) - .await - { - Ok(embassy_futures::select::Either::First(packet)) => match packet { - Ok(mqtt::Packet::Publish(publish)) => publish, - Err(err) => { - crate::log::error!( - "mqtt receive failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive failed")); + let publish = + match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await { + embassy_futures::select::Either3::First(packet) => match packet { + Ok(mqtt::Packet::Publish(publish)) => publish, + Err(err) => { + crate::log::error!( + "mqtt receive failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive failed")); + } + _ => continue, + }, + embassy_futures::select::Either3::Second(_) => continue, + embassy_futures::select::Either3::Third(_) => { + if let Err(err) = client.ping().await { + crate::log::error!( + "mqtt ping failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt ping failed")); + } + continue; } - _ => continue, - }, - Ok(embassy_futures::select::Either::Second(_)) => continue, - Err(_) => { - crate::log::error!("mqtt receive timed out"); - return Err(Error::new("mqtt receive timed out")); - } - }; + }; let entity = 'entity_search_block: { for entity in device.entities { diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 04e63b6..d502f85 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -120,6 +120,7 @@ pub enum Packet<'a> { PublishAck(PublishAck), SubscribeAck(SubscribeAck), UnsubscribeAck(UnsubscribeAck), + PingResponse, } pub struct ClientResources< @@ -199,7 +200,7 @@ where will_topic: params.will_topic, will_payload: params.will_payload, will_retain: params.will_retain, - keepalive: None, + keepalive: params.keepalive, }, ); self.transport @@ -220,12 +221,23 @@ where Err(Error::ConnectFailed(code)) } } - _ => Err(Error::Protocol( - "expected CONNACK packet after CONNECT", - )), + _ => Err(Error::Protocol("expected CONNACK packet after CONNECT")), } } + pub async fn ping(&mut self) -> Result<(), Error> { + let mut buffer = FieldBuffer::default(); + tx::pingreq(&mut buffer); + + self.transport + .write_fields(&buffer) + .await + .map_err(Error::Transport)?; + self.transport.flush().await.map_err(Error::Transport)?; + + Ok(()) + } + pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result> { self.publish_with(topic, data, Default::default()).await } @@ -391,6 +403,7 @@ where rx::Packet::UnsubscribeAck { packet_id } => { Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id })) } + rx::Packet::PingResp => Ok(Packet::PingResponse), } } diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs index e81171d..10b775a 100644 --- a/src/mqtt/rx.rs +++ b/src/mqtt/rx.rs @@ -65,6 +65,7 @@ pub enum Packet<'a> { UnsubscribeAck { packet_id: PacketId, }, + PingResp, } pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { @@ -156,6 +157,15 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { let packet_id = PacketId::from(reader.read_u16()?); Packet::UnsubscribeAck { packet_id } } + protocol::PACKET_TYPE_PINGRESP => { + if packet_flags != 0 { + return Err(Error::InvalidPacket("PINGRESP flags must be zero")); + } + if packet_len != 0 { + return Err(Error::InvalidPacket("PINGRESP remaining length must be 0")); + } + Packet::PingResp + } protocol::PACKET_TYPE_CONNECT | protocol::PACKET_TYPE_PUBREC | protocol::PACKET_TYPE_PUBREL @@ -163,8 +173,7 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | protocol::PACKET_TYPE_DISCONNECT | protocol::PACKET_TYPE_SUBSCRIBE | protocol::PACKET_TYPE_UNSUBSCRIBE - | protocol::PACKET_TYPE_PINGREQ - | protocol::PACKET_TYPE_PINGRESP => { + | protocol::PACKET_TYPE_PINGREQ => { return Err(Error::UnsupportedPacket { packet_type, packet_len, diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs index cdf1c75..7a4d443 100644 --- a/src/mqtt/tx.rs +++ b/src/mqtt/tx.rs @@ -201,3 +201,12 @@ pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) { buffer.push(Field::U16(packet_id.into())); } +pub fn pingreq(buffer: &mut FieldBuffer) { + // PINGREQ has no variable header or payload + buffer.push(Field::U8(protocol::create_header_control( + protocol::PACKET_TYPE_PINGREQ, + 0, + ))); + buffer.push(Field::VarInt(0)); +} + -- cgit