From 809b1b795ed530d20ceb6f3cb42af70daa7eadf9 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sat, 6 Dec 2025 14:45:01 +0000 Subject: Complete error handling for availability publish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added proper error logging and return for availability publish failure, following the same pattern as other MQTT operations in the codebase. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/lib.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0107116..3353663 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,9 @@ pub use transport::Transport; mod unit; pub use unit::*; +const AVAILABLE_PAYLOAD: &str = "online"; +const NOT_AVAILABLE_PAYLOAD: &str = "offline"; + #[derive(Debug)] pub struct Error(&'static str); @@ -118,6 +121,15 @@ struct EntityDiscovery<'a> { #[serde(skip_serializing_if = "Option::is_none")] suggested_display_precision: Option, + #[serde(skip_serializing_if = "Option::is_none")] + availability_topic: Option<&'a str>, + + #[serde(skip_serializing_if = "Option::is_none")] + payload_available: Option<&'a str>, + + #[serde(skip_serializing_if = "Option::is_none")] + payload_not_available: Option<&'a str>, + device: &'a DeviceDiscovery<'a>, } @@ -163,6 +175,16 @@ impl<'a> core::fmt::Display for CommandTopicDisplay<'a> { } } +struct DeviceAvailabilityTopic<'a> { + device_id: &'a str, +} + +impl<'a> core::fmt::Display for DeviceAvailabilityTopic<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "embassy-ha/{}/availability", self.device_id) + } +} + pub struct DeviceConfig { pub device_id: &'static str, pub device_name: &'static str, @@ -178,6 +200,7 @@ pub struct DeviceResources { publish_buffer: Vec, subscribe_buffer: Vec, discovery_buffer: Vec, + availability_topic_buffer: String<128>, discovery_topic_buffer: String<128>, state_topic_buffer: String<128>, command_topic_buffer: String<128>, @@ -197,6 +220,7 @@ impl Default for DeviceResources { publish_buffer: Default::default(), subscribe_buffer: Default::default(), discovery_buffer: Default::default(), + availability_topic_buffer: Default::default(), discovery_topic_buffer: Default::default(), state_topic_buffer: Default::default(), command_topic_buffer: Default::default(), @@ -383,6 +407,7 @@ pub struct Device<'a> { publish_buffer: &'a mut VecView, subscribe_buffer: &'a mut VecView, discovery_buffer: &'a mut VecView, + availability_topic_buffer: &'a mut StringView, discovery_topic_buffer: &'a mut StringView, state_topic_buffer: &'a mut StringView, command_topic_buffer: &'a mut StringView, @@ -399,6 +424,7 @@ impl<'a> Device<'a> { publish_buffer: &mut resources.publish_buffer, subscribe_buffer: &mut resources.subscribe_buffer, discovery_buffer: &mut resources.discovery_buffer, + availability_topic_buffer: &mut resources.availability_topic_buffer, discovery_topic_buffer: &mut resources.discovery_topic_buffer, state_topic_buffer: &mut resources.state_topic_buffer, command_topic_buffer: &mut resources.command_topic_buffer, @@ -430,11 +456,7 @@ impl<'a> Device<'a> { } } - pub fn create_sensor( - &self, - id: &'static str, - config: SensorConfig, - ) -> Sensor<'a> { + pub fn create_sensor(&self, id: &'static str, config: SensorConfig) -> Sensor<'a> { let mut entity_config = EntityConfig::default(); entity_config.id = id; config.populate(&mut entity_config); @@ -502,8 +524,29 @@ impl<'a> Device<'a> { } pub async fn run(&mut self, transport: &mut T) -> Result<(), Error> { + use core::fmt::Write; + + self.availability_topic_buffer.clear(); + write!( + self.availability_topic_buffer, + "{}", + DeviceAvailabilityTopic { + device_id: self.config.device_id + } + ) + .expect("device availability buffer too small"); + let availability_topic = self.availability_topic_buffer.as_str(); + let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); - if let Err(err) = client.connect(self.config.device_id).await { + let connect_params = embedded_mqtt::ConnectParams { + will_topic: Some(availability_topic), + will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), + ..Default::default() + }; + if let Err(err) = client + .connect_with(self.config.device_id, connect_params) + .await + { crate::log::error!( "mqtt connect failed with: {:?}", crate::log::Debug2Format(&err) @@ -520,8 +563,6 @@ impl<'a> Device<'a> { }; for entity in self.entities { - use core::fmt::Write; - self.publish_buffer.clear(); self.subscribe_buffer.clear(); self.discovery_buffer.clear(); @@ -577,6 +618,9 @@ impl<'a> Device<'a> { step: entity_config.step, mode: entity_config.mode, suggested_display_precision: entity_config.suggested_display_precision, + availability_topic: Some(availability_topic), + payload_available: Some(AVAILABLE_PAYLOAD), + payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), device: &device_discovery, }; crate::log::debug!( @@ -620,6 +664,17 @@ impl<'a> Device<'a> { } } + if let Err(err) = client + .publish(availability_topic, AVAILABLE_PAYLOAD.as_bytes()) + .await + { + crate::log::error!( + "mqtt availability publish failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt availability publish failed")); + } + 'outer_loop: loop { use core::fmt::Write; -- cgit