From 27779ecf0d7c00ae0acb650c0d75d28edcc3aa89 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Fri, 9 Jan 2026 11:28:49 +0000 Subject: add mqtt helper functions and refactor buffer management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Created helper functions to eliminate code duplication in MQTT operations: - device_mqtt_subscribe: handles subscription with timeout and error logging - device_mqtt_publish: handles publishing with optional retain flag - mqtt_receive_data: handles data receiving with buffer validation and timeout - publish_entity_discoveries: publishes all entity discoveries - generate_entity_discovery: generates entity discovery payloads Refactored buffer management by introducing DeviceBuffersOwned and DeviceBuffers structures to group related buffers together, making them easier to pass to helper functions. Added MQTT_TIMEOUT constant at module level for consistent timeout handling. Fixed bug where only the last entity's discovery was republished when Home Assistant came back online. Now correctly republishes all entities by subscribing to homeassistant/status and using publish_entity_discoveries when the "online" status is received. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- src/lib.rs | 690 +++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 417 insertions(+), 273 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index edc60f2..b09bda2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,6 +153,7 @@ pub use unit::*; const AVAILABLE_PAYLOAD: &str = "online"; const NOT_AVAILABLE_PAYLOAD: &str = "offline"; const DEFAULT_KEEPALIVE_TIME: u16 = 30; +const MQTT_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug)] pub struct Error(&'static str); @@ -323,19 +324,77 @@ pub struct DeviceConfig { pub model: &'static str, } +pub struct DeviceBuffersOwned { + pub publish: Vec, + pub subscribe: Vec, + pub discovery: Vec, + pub availability_topic: String<128>, + pub discovery_topic: String<128>, + pub state_topic: String<128>, + pub command_topic: String<128>, + pub attributes_topic: String<128>, +} + +impl Default for DeviceBuffersOwned { + fn default() -> Self { + Self { + publish: Default::default(), + subscribe: Default::default(), + discovery: Default::default(), + availability_topic: Default::default(), + discovery_topic: Default::default(), + state_topic: Default::default(), + command_topic: Default::default(), + attributes_topic: Default::default(), + } + } +} + +impl DeviceBuffersOwned { + pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> { + DeviceBuffers { + publish: &mut self.publish, + subscribe: &mut self.subscribe, + discovery: &mut self.discovery, + availability_topic: &mut self.availability_topic, + discovery_topic: &mut self.discovery_topic, + state_topic: &mut self.state_topic, + command_topic: &mut self.command_topic, + attributes_topic: &mut self.attributes_topic, + } + } +} + +pub struct DeviceBuffers<'a> { + pub publish: &'a mut VecView, + pub subscribe: &'a mut VecView, + pub discovery: &'a mut VecView, + pub availability_topic: &'a mut StringView, + pub discovery_topic: &'a mut StringView, + pub state_topic: &'a mut StringView, + pub command_topic: &'a mut StringView, + pub attributes_topic: &'a mut StringView, +} + +impl<'a> DeviceBuffers<'a> { + pub fn clear(&mut self) { + self.publish.clear(); + self.subscribe.clear(); + self.discovery.clear(); + self.availability_topic.clear(); + self.discovery_topic.clear(); + self.state_topic.clear(); + self.command_topic.clear(); + self.attributes_topic.clear(); + } +} + pub struct DeviceResources { waker: AtomicWaker, entities: [RefCell>; Self::ENTITY_LIMIT], mqtt_resources: mqtt::ClientResources, - publish_buffer: Vec, - subscribe_buffer: Vec, - discovery_buffer: Vec, - availability_topic_buffer: String<128>, - discovery_topic_buffer: String<128>, - state_topic_buffer: String<128>, - command_topic_buffer: String<128>, - attributes_topic_buffer: String<128>, + buffers: DeviceBuffersOwned, } impl DeviceResources { @@ -349,14 +408,7 @@ impl Default for DeviceResources { entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT], mqtt_resources: Default::default(), - publish_buffer: Default::default(), - subscribe_buffer: Default::default(), - discovery_buffer: Default::default(), - availability_topic_buffer: Default::default(), - discovery_topic_buffer: Default::default(), - state_topic_buffer: Default::default(), - command_topic_buffer: Default::default(), - attributes_topic_buffer: Default::default(), + buffers: Default::default(), } } } @@ -564,14 +616,7 @@ pub struct Device<'a> { entities: &'a [RefCell>], mqtt_resources: &'a mut mqtt::ClientResources, - publish_buffer: &'a mut VecView, - subscribe_buffer: &'a mut VecView, - discovery_buffer: &'a mut VecView, - availability_topic_buffer: &'a mut StringView, - discovery_topic_buffer: &'a mut StringView, - state_topic_buffer: &'a mut StringView, - command_topic_buffer: &'a mut StringView, - attributes_topic_buffer: &'a mut StringView, + buffers: DeviceBuffers<'a>, } pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { @@ -581,14 +626,7 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi entities: &resources.entities, mqtt_resources: &mut resources.mqtt_resources, - publish_buffer: &mut resources.publish_buffer, - subscribe_buffer: &mut resources.subscribe_buffer, - discovery_buffer: &mut resources.discovery_buffer, - availability_topic_buffer: &mut resources.availability_topic_buffer, - discovery_topic_buffer: &mut resources.discovery_topic_buffer, - state_topic_buffer: &mut resources.state_topic_buffer, - command_topic_buffer: &mut resources.command_topic_buffer, - attributes_topic_buffer: &mut resources.attributes_topic_buffer, + buffers: resources.buffers.as_buffers_mut(), } } @@ -741,6 +779,272 @@ pub fn create_device_tracker<'a>( DeviceTracker::new(entity) } +async fn device_mqtt_subscribe( + client: &mut mqtt::Client<'_, T>, + topic: impl core::fmt::Display, +) -> Result<(), Error> { + use core::fmt::Write; + + // Format topic to string for both subscribe call and logging + let mut topic_buffer = heapless::String::<128>::new(); + write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small"); + let topic_str = topic_buffer.as_str(); + + match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await { + Ok(Ok(_)) => Ok(()), + Ok(Err(err)) => { + crate::log::error!( + "mqtt subscribe to '{}' failed with: {:?}", + topic_str, + crate::log::Debug2Format(&err) + ); + Err(Error::new("mqtt subscribe failed")) + } + Err(_) => { + crate::log::error!("mqtt subscribe to '{}' timed out", topic_str); + Err(Error::new("mqtt subscribe timed out")) + } + } +} + +async fn device_mqtt_publish( + client: &mut mqtt::Client<'_, T>, + topic: impl core::fmt::Display, + data: &[u8], + retain: bool, +) -> Result<(), Error> { + use core::fmt::Write; + + // Format topic to string for both publish call and logging + let mut topic_buffer = heapless::String::<128>::new(); + write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small"); + let topic_str = topic_buffer.as_str(); + + let result = if retain { + embassy_time::with_timeout( + MQTT_TIMEOUT, + client.publish_with( + topic_str, + data, + mqtt::PublishParams { + retain: true, + ..Default::default() + }, + ), + ) + .await + } else { + embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await + }; + + match result { + Ok(Ok(_)) => Ok(()), + Ok(Err(err)) => { + crate::log::error!( + "mqtt publish to '{}' failed with: {:?}", + topic_str, + crate::log::Debug2Format(&err) + ); + Err(Error::new("mqtt publish failed")) + } + Err(_) => { + crate::log::error!("mqtt publish to '{}' timed out", topic_str); + Err(Error::new("mqtt publish timed out")) + } + } +} + +/// Receives MQTT publish data with timeout and proper error handling. +/// +/// This helper function handles the common pattern of receiving MQTT data with: +/// - Automatic timeout handling using MQTT_TIMEOUT +/// - Consistent error logging +/// - Size validation against buffer capacity +/// +/// # Arguments +/// * `client` - MQTT client for receiving data +/// * `data_len` - Expected length of data to receive +/// * `buffer` - Buffer to receive the data into +/// +/// # Returns +/// * `Ok(&[u8])` - Slice of the buffer containing the received data +/// * `Err(Error)` - If operation fails, times out, or data exceeds buffer size +/// +/// # Errors +/// Returns error if: +/// - `data_len` is greater than `buffer.len()` (buffer too small) +/// - The receive operation times out after MQTT_TIMEOUT seconds +/// - The underlying MQTT receive operation fails +async fn mqtt_receive_data<'a, T: Transport>( + client: &mut mqtt::Client<'_, T>, + data_len: usize, + buffer: &'a mut [u8], +) -> Result<&'a [u8], Error> { + // Validate buffer size - reject if too small (per user requirement) + if data_len > buffer.len() { + crate::log::warn!( + "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting", + data_len, + buffer.len() + ); + return Err(Error::new("mqtt payload too large for buffer")); + } + + crate::log::debug!("mqtt receiving {} bytes of data", data_len); + + match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len])) + .await + { + Ok(Ok(())) => Ok(&buffer[..data_len]), + Ok(Err(err)) => { + crate::log::error!( + "mqtt receive data failed with: {:?}", + crate::log::Debug2Format(&err) + ); + Err(Error::new("mqtt receive data failed")) + } + Err(_) => { + crate::log::error!("mqtt receive data timed out"); + Err(Error::new("mqtt receive data timed out")) + } + } +} + +/// Publishes discovery messages for all entities in the device. +/// +/// This function iterates over all entities, generates their discovery messages, +/// publishes them to MQTT, and optionally subscribes to their command topics. +/// +/// # Arguments +/// * `client` - MQTT client for publishing and subscribing +/// * `entities` - Slice of entities to publish discoveries for +/// * `buffers` - Device buffers for generating discovery messages +/// * `device_config` - Device configuration +/// * `availability_topic` - The device availability topic string +/// * `subscribe_to_commands` - Whether to subscribe to command topics (true for initial discovery, false for rediscovery) +/// +/// # Returns +/// `Ok(())` if all discoveries were published successfully, or an error if any operation fails +async fn publish_entity_discoveries( + client: &mut mqtt::Client<'_, T>, + entities: &[RefCell>], + buffers: &mut DeviceBuffers<'_>, + device_config: &DeviceConfig, + availability_topic: &str, + subscribe_to_commands: bool, +) -> Result<(), Error> { + crate::log::debug!("publishing entity discovery messages"); + + for entity in entities { + buffers.clear(); + + // Borrow the entity and fill out the buffers to be sent + // This should be done inside a block so that we do not hold the RefMut across an await + { + let mut entity = entity.borrow_mut(); + let entity = match entity.as_mut() { + Some(entity) => entity, + None => break, + }; + + generate_entity_discovery(buffers, device_config, &entity.config, availability_topic); + } + + let discovery_topic = buffers.discovery_topic.as_str(); + crate::log::debug!("sending discovery to topic '{}'", discovery_topic); + device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?; + + if subscribe_to_commands { + let command_topic = buffers.command_topic.as_str(); + crate::log::debug!("subscribing to command topic '{}'", command_topic); + device_mqtt_subscribe(client, command_topic).await?; + } + } + + Ok(()) +} + +fn generate_entity_discovery( + buffers: &mut DeviceBuffers<'_>, + device_config: &DeviceConfig, + entity_config: &EntityConfig, + availability_topic: &str, +) { + use core::fmt::Write; + + let discovery_topic_display = DiscoveryTopicDisplay { + domain: entity_config.domain, + device_id: device_config.device_id, + entity_id: entity_config.id, + }; + let state_topic_display = StateTopicDisplay { + device_id: device_config.device_id, + entity_id: entity_config.id, + }; + let command_topic_display = CommandTopicDisplay { + device_id: device_config.device_id, + entity_id: entity_config.id, + }; + let attributes_topic_display = AttributesTopicDisplay { + device_id: device_config.device_id, + entity_id: entity_config.id, + }; + + write!(buffers.discovery_topic, "{discovery_topic_display}") + .expect("discovery topic buffer too small"); + write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small"); + write!(buffers.command_topic, "{command_topic_display}") + .expect("command topic buffer too small"); + write!(buffers.attributes_topic, "{attributes_topic_display}") + .expect("attributes topic buffer too small"); + + let device_discovery = DeviceDiscovery { + identifiers: &[device_config.device_id], + name: device_config.device_name, + manufacturer: device_config.manufacturer, + model: device_config.model, + }; + + let discovery = EntityDiscovery { + id: entity_config.id, + name: entity_config.name, + device_class: entity_config.device_class, + state_topic: Some(buffers.state_topic.as_str()), + command_topic: Some(buffers.command_topic.as_str()), + json_attributes_topic: Some(buffers.attributes_topic.as_str()), + unit_of_measurement: entity_config.measurement_unit, + schema: entity_config.schema, + platform: entity_config.platform, + state_class: entity_config.state_class, + icon: entity_config.icon, + entity_category: entity_config.category, + entity_picture: entity_config.picture, + min: entity_config.min, + max: entity_config.max, + step: entity_config.step, + mode: entity_config.mode, + suggested_display_precision: entity_config.suggested_display_precision, + availability_topic: Some(availability_topic), + payload_available: Some(AVAILABLE_PAYLOAD), + payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), + device: &device_discovery, + }; + + crate::log::debug!( + "discovery for entity '{}': {:?}", + entity_config.id, + discovery + ); + + buffers + .discovery + .resize(buffers.discovery.capacity(), 0) + .unwrap(); + let n = serde_json_core::to_slice(&discovery, buffers.discovery) + .expect("discovery buffer too small"); + buffers.discovery.truncate(n); +} + /// Runs the main Home Assistant device event loop. /// /// This function handles MQTT communication, entity discovery, and state updates. It will run @@ -787,18 +1091,22 @@ pub fn create_device_tracker<'a>( pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { use core::fmt::Write; - const MQTT_TIMEOUT: Duration = Duration::from_secs(30); - - device.availability_topic_buffer.clear(); + device.buffers.availability_topic.clear(); write!( - device.availability_topic_buffer, + device.buffers.availability_topic, "{}", DeviceAvailabilityTopic { device_id: device.config.device_id } ) .expect("device availability buffer too small"); - let availability_topic = device.availability_topic_buffer.as_str(); + + // Store availability_topic in a separate buffer to avoid borrow conflicts + let mut availability_topic_copy = heapless::String::<128>::new(); + availability_topic_copy + .push_str(device.buffers.availability_topic.as_str()) + .expect("availability topic too large"); + let availability_topic = availability_topic_copy.as_str(); let mut ping_ticker = embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); @@ -830,169 +1138,25 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } } - crate::log::debug!("sending discover messages"); - let device_discovery = DeviceDiscovery { - identifiers: &[device.config.device_id], - name: device.config.device_name, - manufacturer: device.config.manufacturer, - model: device.config.model, - }; - - for entity in device.entities { - device.publish_buffer.clear(); - device.subscribe_buffer.clear(); - device.discovery_buffer.clear(); - device.discovery_topic_buffer.clear(); - device.state_topic_buffer.clear(); - device.command_topic_buffer.clear(); - device.attributes_topic_buffer.clear(); - - // borrow the entity and fill out the buffers to be sent - // this should be done inside a block so that we do not hold the RefMut across an - // await - { - let mut entity = entity.borrow_mut(); - let entity = match entity.as_mut() { - Some(entity) => entity, - None => break, - }; - let entity_config = &entity.config; - - let discovery_topic_display = DiscoveryTopicDisplay { - domain: entity_config.domain, - device_id: device.config.device_id, - entity_id: entity_config.id, - }; - let state_topic_display = StateTopicDisplay { - device_id: device.config.device_id, - entity_id: entity_config.id, - }; - let command_topic_display = CommandTopicDisplay { - device_id: device.config.device_id, - entity_id: entity_config.id, - }; - let attributes_topic_display = AttributesTopicDisplay { - device_id: device.config.device_id, - entity_id: entity_config.id, - }; - - write!(device.discovery_topic_buffer, "{discovery_topic_display}") - .expect("discovery topic buffer too small"); - write!(device.state_topic_buffer, "{state_topic_display}") - .expect("state topic buffer too small"); - write!(device.command_topic_buffer, "{command_topic_display}") - .expect("command topic buffer too small"); - write!(device.attributes_topic_buffer, "{attributes_topic_display}") - .expect("attributes topic buffer too small"); - - let discovery = EntityDiscovery { - id: entity_config.id, - name: entity_config.name, - device_class: entity_config.device_class, - state_topic: Some(device.state_topic_buffer.as_str()), - command_topic: Some(device.command_topic_buffer.as_str()), - json_attributes_topic: Some(device.attributes_topic_buffer.as_str()), - unit_of_measurement: entity_config.measurement_unit, - schema: entity_config.schema, - platform: entity_config.platform, - state_class: entity_config.state_class, - icon: entity_config.icon, - entity_category: entity_config.category, - entity_picture: entity_config.picture, - min: entity_config.min, - max: entity_config.max, - step: entity_config.step, - mode: entity_config.mode, - suggested_display_precision: entity_config.suggested_display_precision, - availability_topic: Some(availability_topic), - payload_available: Some(AVAILABLE_PAYLOAD), - payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), - device: &device_discovery, - }; - crate::log::debug!( - "discovery for entity '{}': {:?}", - entity_config.id, - discovery - ); - - device - .discovery_buffer - .resize(device.discovery_buffer.capacity(), 0) - .unwrap(); - let n = serde_json_core::to_slice(&discovery, device.discovery_buffer) - .expect("discovery buffer too small"); - device.discovery_buffer.truncate(n); - } - - let discovery_topic = device.discovery_topic_buffer.as_str(); - crate::log::debug!("sending discovery to topic '{}'", discovery_topic); - match embassy_time::with_timeout( - MQTT_TIMEOUT, - client.publish(discovery_topic, device.discovery_buffer), - ) - .await - { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - crate::log::error!( - "mqtt discovery publish failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt discovery publish failed")); - } - Err(_) => { - crate::log::error!("mqtt discovery publish timed out"); - return Err(Error::new("mqtt discovery publish timed out")); - } - } + device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?; - let command_topic = device.command_topic_buffer.as_str(); - crate::log::debug!("subscribing to command topic '{}'", command_topic); - match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - crate::log::error!( - "mqtt subscribe to '{}' failed with: {:?}", - command_topic, - crate::log::Debug2Format(&err) - ); - return Err(Error::new( - "mqtt subscription to entity command topic failed", - )); - } - Err(_) => { - crate::log::error!("mqtt subscribe to '{}' timed out", command_topic); - return Err(Error::new("mqtt subscribe timed out")); - } - } - } + publish_entity_discoveries( + &mut client, + device.entities, + &mut device.buffers, + &device.config, + availability_topic, + true, + ) + .await?; - match embassy_time::with_timeout( - MQTT_TIMEOUT, - client.publish_with( - availability_topic, - AVAILABLE_PAYLOAD.as_bytes(), - mqtt::PublishParams { - retain: true, - ..Default::default() - }, - ), + device_mqtt_publish( + &mut client, + availability_topic, + AVAILABLE_PAYLOAD.as_bytes(), + true, ) - .await - { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - crate::log::error!( - "mqtt availability publish failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt availability publish failed")); - } - Err(_) => { - crate::log::error!("mqtt availability publish timed out"); - return Err(Error::new("mqtt availability publish timed out")); - } - } + .await?; let mut first_iteration_push = true; 'outer_loop: loop { @@ -1011,7 +1175,7 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } entity.publish = false; - device.publish_buffer.clear(); + device.buffers.publish.clear(); let mut publish_to_attributes = false; match &entity.storage { @@ -1019,37 +1183,39 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re state: Some(SwitchState { value, .. }), .. }) => device - .publish_buffer + .buffers + .publish .extend_from_slice(value.as_str().as_bytes()) .expect("publish buffer too small for switch state payload"), EntityStorage::BinarySensor(BinarySensorStorage { state: Some(BinarySensorState { value, .. }), }) => device - .publish_buffer + .buffers + .publish .extend_from_slice(value.as_str().as_bytes()) .expect("publish buffer too small for binary sensor state payload"), EntityStorage::NumericSensor(NumericSensorStorage { state: Some(NumericSensorState { value, .. }), .. - }) => write!(device.publish_buffer, "{}", value) + }) => write!(device.buffers.publish, "{}", value) .expect("publish buffer too small for numeric sensor payload"), EntityStorage::Number(NumberStorage { state: Some(NumberState { value, .. }), .. - }) => write!(device.publish_buffer, "{}", value) + }) => write!(device.buffers.publish, "{}", value) .expect("publish buffer too small for number state payload"), EntityStorage::DeviceTracker(DeviceTrackerStorage { state: Some(tracker_state), }) => { publish_to_attributes = true; device - .publish_buffer - .resize(device.publish_buffer.capacity(), 0) + .buffers + .publish + .resize(device.buffers.publish.capacity(), 0) .expect("resize to capacity should never fail"); - let n = - serde_json_core::to_slice(&tracker_state, device.publish_buffer) - .expect("publish buffer too small for tracker state payload"); - device.publish_buffer.truncate(n); + let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish) + .expect("publish buffer too small for tracker state payload"); + device.buffers.publish.truncate(n); } _ => { if !first_iteration_push { @@ -1067,42 +1233,26 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re device_id: device.config.device_id, entity_id: entity.config.id, }; - device.attributes_topic_buffer.clear(); - write!(device.attributes_topic_buffer, "{attributes_topic_display}") - .expect("attributes topic buffer too small"); - device.attributes_topic_buffer.as_str() + device.buffers.attributes_topic.clear(); + write!( + device.buffers.attributes_topic, + "{attributes_topic_display}" + ) + .expect("attributes topic buffer too small"); + device.buffers.attributes_topic.as_str() } else { let state_topic_display = StateTopicDisplay { device_id: device.config.device_id, entity_id: entity.config.id, }; - device.state_topic_buffer.clear(); - write!(device.state_topic_buffer, "{state_topic_display}") + device.buffers.state_topic.clear(); + write!(device.buffers.state_topic, "{state_topic_display}") .expect("state topic buffer too small"); - device.state_topic_buffer.as_str() + device.buffers.state_topic.as_str() } }; - match embassy_time::with_timeout( - MQTT_TIMEOUT, - client.publish(publish_topic, device.publish_buffer), - ) - .await - { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - crate::log::error!( - "mqtt state publish on topic '{}' failed with: {:?}", - publish_topic, - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt publish failed")); - } - Err(_) => { - crate::log::error!("mqtt state publish on topic '{}' timed out", publish_topic); - return Err(Error::new("mqtt publish timed out")); - } - } + device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?; } first_iteration_push = false; @@ -1134,6 +1284,29 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re } }; + if publish.topic == constants::HA_STATUS_TOPIC { + let mut receive_buffer = [0u8; 64]; + let receive_data_len = publish.data_len; + let receive_data = + mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?; + + if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() { + first_iteration_push = true; + + crate::log::debug!("home assistant came online, republishing discoveries"); + publish_entity_discoveries( + &mut client, + device.entities, + &mut device.buffers, + &device.config, + availability_topic, + false, + ) + .await?; + } + continue; + } + let entity = 'entity_search_block: { for entity in device.entities { let mut data = entity.borrow_mut(); @@ -1146,11 +1319,11 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re device_id: device.config.device_id, entity_id: data.config.id, }; - device.command_topic_buffer.clear(); - write!(device.command_topic_buffer, "{command_topic_display}") + device.buffers.command_topic.clear(); + write!(device.buffers.command_topic, "{command_topic_display}") .expect("command topic buffer too small"); - if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { + if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() { break 'entity_search_block entity; } } @@ -1158,43 +1331,14 @@ pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Re }; let mut read_buffer = [0u8; 128]; - if publish.data_len > read_buffer.len() { - crate::log::warn!( - "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", - publish.topic, - publish.data_len - ); - continue; - } - - crate::log::debug!( - "mqtt receiving {} bytes of data on topic {}", - publish.data_len, - publish.topic - ); - let data_len = publish.data_len; - match embassy_time::with_timeout( - MQTT_TIMEOUT, - client.receive_data(&mut read_buffer[..data_len]), - ) - .await - { - Ok(Ok(())) => {} - Ok(Err(err)) => { - crate::log::error!( - "mqtt receive data failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive data failed")); - } - Err(_) => { - crate::log::error!("mqtt receive data timed out"); - return Err(Error::new("mqtt receive data timed out")); - } - } + let receive_data = + match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await { + Ok(data) => data, + Err(_) => continue 'outer_loop, + }; - let command = match str::from_utf8(&read_buffer[..data_len]) { + let command = match str::from_utf8(receive_data) { Ok(command) => command, Err(_) => { crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); -- cgit