aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2026-01-09 11:28:49 +0000
committerdiogo464 <[email protected]>2026-01-09 11:30:08 +0000
commit27779ecf0d7c00ae0acb650c0d75d28edcc3aa89 (patch)
treec9312c4baf2f0f1a9e19b80feb0a7637c31ad5e2 /src/lib.rs
parent0a1659dbd57180a7c1337d2ec4138f8c29427ce3 (diff)
add mqtt helper functions and refactor buffer management
Created helper functions to eliminate code duplication in MQTT operations: - device_mqtt_subscribe: handles subscription with timeout and error logging - device_mqtt_publish: handles publishing with optional retain flag - mqtt_receive_data: handles data receiving with buffer validation and timeout - publish_entity_discoveries: publishes all entity discoveries - generate_entity_discovery: generates entity discovery payloads Refactored buffer management by introducing DeviceBuffersOwned and DeviceBuffers structures to group related buffers together, making them easier to pass to helper functions. Added MQTT_TIMEOUT constant at module level for consistent timeout handling. Fixed bug where only the last entity's discovery was republished when Home Assistant came back online. Now correctly republishes all entities by subscribing to homeassistant/status and using publish_entity_discoveries when the "online" status is received. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs690
1 files changed, 417 insertions, 273 deletions
diff --git a/src/lib.rs b/src/lib.rs
index edc60f2..b09bda2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -153,6 +153,7 @@ pub use unit::*;
153const AVAILABLE_PAYLOAD: &str = "online"; 153const AVAILABLE_PAYLOAD: &str = "online";
154const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 154const NOT_AVAILABLE_PAYLOAD: &str = "offline";
155const DEFAULT_KEEPALIVE_TIME: u16 = 30; 155const DEFAULT_KEEPALIVE_TIME: u16 = 30;
156const MQTT_TIMEOUT: Duration = Duration::from_secs(30);
156 157
157#[derive(Debug)] 158#[derive(Debug)]
158pub struct Error(&'static str); 159pub struct Error(&'static str);
@@ -323,19 +324,77 @@ pub struct DeviceConfig {
323 pub model: &'static str, 324 pub model: &'static str,
324} 325}
325 326
327pub struct DeviceBuffersOwned {
328 pub publish: Vec<u8, 2048>,
329 pub subscribe: Vec<u8, 128>,
330 pub discovery: Vec<u8, 2048>,
331 pub availability_topic: String<128>,
332 pub discovery_topic: String<128>,
333 pub state_topic: String<128>,
334 pub command_topic: String<128>,
335 pub attributes_topic: String<128>,
336}
337
338impl Default for DeviceBuffersOwned {
339 fn default() -> Self {
340 Self {
341 publish: Default::default(),
342 subscribe: Default::default(),
343 discovery: Default::default(),
344 availability_topic: Default::default(),
345 discovery_topic: Default::default(),
346 state_topic: Default::default(),
347 command_topic: Default::default(),
348 attributes_topic: Default::default(),
349 }
350 }
351}
352
353impl DeviceBuffersOwned {
354 pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> {
355 DeviceBuffers {
356 publish: &mut self.publish,
357 subscribe: &mut self.subscribe,
358 discovery: &mut self.discovery,
359 availability_topic: &mut self.availability_topic,
360 discovery_topic: &mut self.discovery_topic,
361 state_topic: &mut self.state_topic,
362 command_topic: &mut self.command_topic,
363 attributes_topic: &mut self.attributes_topic,
364 }
365 }
366}
367
368pub struct DeviceBuffers<'a> {
369 pub publish: &'a mut VecView<u8>,
370 pub subscribe: &'a mut VecView<u8>,
371 pub discovery: &'a mut VecView<u8>,
372 pub availability_topic: &'a mut StringView,
373 pub discovery_topic: &'a mut StringView,
374 pub state_topic: &'a mut StringView,
375 pub command_topic: &'a mut StringView,
376 pub attributes_topic: &'a mut StringView,
377}
378
379impl<'a> DeviceBuffers<'a> {
380 pub fn clear(&mut self) {
381 self.publish.clear();
382 self.subscribe.clear();
383 self.discovery.clear();
384 self.availability_topic.clear();
385 self.discovery_topic.clear();
386 self.state_topic.clear();
387 self.command_topic.clear();
388 self.attributes_topic.clear();
389 }
390}
391
326pub struct DeviceResources { 392pub struct DeviceResources {
327 waker: AtomicWaker, 393 waker: AtomicWaker,
328 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT], 394 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT],
329 395
330 mqtt_resources: mqtt::ClientResources, 396 mqtt_resources: mqtt::ClientResources,
331 publish_buffer: Vec<u8, 2048>, 397 buffers: DeviceBuffersOwned,
332 subscribe_buffer: Vec<u8, 128>,
333 discovery_buffer: Vec<u8, 2048>,
334 availability_topic_buffer: String<128>,
335 discovery_topic_buffer: String<128>,
336 state_topic_buffer: String<128>,
337 command_topic_buffer: String<128>,
338 attributes_topic_buffer: String<128>,
339} 398}
340 399
341impl DeviceResources { 400impl DeviceResources {
@@ -349,14 +408,7 @@ impl Default for DeviceResources {
349 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT], 408 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT],
350 409
351 mqtt_resources: Default::default(), 410 mqtt_resources: Default::default(),
352 publish_buffer: Default::default(), 411 buffers: Default::default(),
353 subscribe_buffer: Default::default(),
354 discovery_buffer: Default::default(),
355 availability_topic_buffer: Default::default(),
356 discovery_topic_buffer: Default::default(),
357 state_topic_buffer: Default::default(),
358 command_topic_buffer: Default::default(),
359 attributes_topic_buffer: Default::default(),
360 } 412 }
361 } 413 }
362} 414}
@@ -564,14 +616,7 @@ pub struct Device<'a> {
564 entities: &'a [RefCell<Option<EntityData>>], 616 entities: &'a [RefCell<Option<EntityData>>],
565 617
566 mqtt_resources: &'a mut mqtt::ClientResources, 618 mqtt_resources: &'a mut mqtt::ClientResources,
567 publish_buffer: &'a mut VecView<u8>, 619 buffers: DeviceBuffers<'a>,
568 subscribe_buffer: &'a mut VecView<u8>,
569 discovery_buffer: &'a mut VecView<u8>,
570 availability_topic_buffer: &'a mut StringView,
571 discovery_topic_buffer: &'a mut StringView,
572 state_topic_buffer: &'a mut StringView,
573 command_topic_buffer: &'a mut StringView,
574 attributes_topic_buffer: &'a mut StringView,
575} 620}
576 621
577pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { 622pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> {
@@ -581,14 +626,7 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi
581 entities: &resources.entities, 626 entities: &resources.entities,
582 627
583 mqtt_resources: &mut resources.mqtt_resources, 628 mqtt_resources: &mut resources.mqtt_resources,
584 publish_buffer: &mut resources.publish_buffer, 629 buffers: resources.buffers.as_buffers_mut(),
585 subscribe_buffer: &mut resources.subscribe_buffer,
586 discovery_buffer: &mut resources.discovery_buffer,
587 availability_topic_buffer: &mut resources.availability_topic_buffer,
588 discovery_topic_buffer: &mut resources.discovery_topic_buffer,
589 state_topic_buffer: &mut resources.state_topic_buffer,
590 command_topic_buffer: &mut resources.command_topic_buffer,
591 attributes_topic_buffer: &mut resources.attributes_topic_buffer,
592 } 630 }
593} 631}
594 632
@@ -741,6 +779,272 @@ pub fn create_device_tracker<'a>(
741 DeviceTracker::new(entity) 779 DeviceTracker::new(entity)
742} 780}
743 781
782async fn device_mqtt_subscribe<T: Transport>(
783 client: &mut mqtt::Client<'_, T>,
784 topic: impl core::fmt::Display,
785) -> Result<(), Error> {
786 use core::fmt::Write;
787
788 // Format topic to string for both subscribe call and logging
789 let mut topic_buffer = heapless::String::<128>::new();
790 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
791 let topic_str = topic_buffer.as_str();
792
793 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await {
794 Ok(Ok(_)) => Ok(()),
795 Ok(Err(err)) => {
796 crate::log::error!(
797 "mqtt subscribe to '{}' failed with: {:?}",
798 topic_str,
799 crate::log::Debug2Format(&err)
800 );
801 Err(Error::new("mqtt subscribe failed"))
802 }
803 Err(_) => {
804 crate::log::error!("mqtt subscribe to '{}' timed out", topic_str);
805 Err(Error::new("mqtt subscribe timed out"))
806 }
807 }
808}
809
810async fn device_mqtt_publish<T: Transport>(
811 client: &mut mqtt::Client<'_, T>,
812 topic: impl core::fmt::Display,
813 data: &[u8],
814 retain: bool,
815) -> Result<(), Error> {
816 use core::fmt::Write;
817
818 // Format topic to string for both publish call and logging
819 let mut topic_buffer = heapless::String::<128>::new();
820 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
821 let topic_str = topic_buffer.as_str();
822
823 let result = if retain {
824 embassy_time::with_timeout(
825 MQTT_TIMEOUT,
826 client.publish_with(
827 topic_str,
828 data,
829 mqtt::PublishParams {
830 retain: true,
831 ..Default::default()
832 },
833 ),
834 )
835 .await
836 } else {
837 embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await
838 };
839
840 match result {
841 Ok(Ok(_)) => Ok(()),
842 Ok(Err(err)) => {
843 crate::log::error!(
844 "mqtt publish to '{}' failed with: {:?}",
845 topic_str,
846 crate::log::Debug2Format(&err)
847 );
848 Err(Error::new("mqtt publish failed"))
849 }
850 Err(_) => {
851 crate::log::error!("mqtt publish to '{}' timed out", topic_str);
852 Err(Error::new("mqtt publish timed out"))
853 }
854 }
855}
856
857/// Receives MQTT publish data with timeout and proper error handling.
858///
859/// This helper function handles the common pattern of receiving MQTT data with:
860/// - Automatic timeout handling using MQTT_TIMEOUT
861/// - Consistent error logging
862/// - Size validation against buffer capacity
863///
864/// # Arguments
865/// * `client` - MQTT client for receiving data
866/// * `data_len` - Expected length of data to receive
867/// * `buffer` - Buffer to receive the data into
868///
869/// # Returns
870/// * `Ok(&[u8])` - Slice of the buffer containing the received data
871/// * `Err(Error)` - If operation fails, times out, or data exceeds buffer size
872///
873/// # Errors
874/// Returns error if:
875/// - `data_len` is greater than `buffer.len()` (buffer too small)
876/// - The receive operation times out after MQTT_TIMEOUT seconds
877/// - The underlying MQTT receive operation fails
878async fn mqtt_receive_data<'a, T: Transport>(
879 client: &mut mqtt::Client<'_, T>,
880 data_len: usize,
881 buffer: &'a mut [u8],
882) -> Result<&'a [u8], Error> {
883 // Validate buffer size - reject if too small (per user requirement)
884 if data_len > buffer.len() {
885 crate::log::warn!(
886 "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting",
887 data_len,
888 buffer.len()
889 );
890 return Err(Error::new("mqtt payload too large for buffer"));
891 }
892
893 crate::log::debug!("mqtt receiving {} bytes of data", data_len);
894
895 match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len]))
896 .await
897 {
898 Ok(Ok(())) => Ok(&buffer[..data_len]),
899 Ok(Err(err)) => {
900 crate::log::error!(
901 "mqtt receive data failed with: {:?}",
902 crate::log::Debug2Format(&err)
903 );
904 Err(Error::new("mqtt receive data failed"))
905 }
906 Err(_) => {
907 crate::log::error!("mqtt receive data timed out");
908 Err(Error::new("mqtt receive data timed out"))
909 }
910 }
911}
912
913/// Publishes discovery messages for all entities in the device.
914///
915/// This function iterates over all entities, generates their discovery messages,
916/// publishes them to MQTT, and optionally subscribes to their command topics.
917///
918/// # Arguments
919/// * `client` - MQTT client for publishing and subscribing
920/// * `entities` - Slice of entities to publish discoveries for
921/// * `buffers` - Device buffers for generating discovery messages
922/// * `device_config` - Device configuration
923/// * `availability_topic` - The device availability topic string
924/// * `subscribe_to_commands` - Whether to subscribe to command topics (true for initial discovery, false for rediscovery)
925///
926/// # Returns
927/// `Ok(())` if all discoveries were published successfully, or an error if any operation fails
928async fn publish_entity_discoveries<T: Transport>(
929 client: &mut mqtt::Client<'_, T>,
930 entities: &[RefCell<Option<EntityData>>],
931 buffers: &mut DeviceBuffers<'_>,
932 device_config: &DeviceConfig,
933 availability_topic: &str,
934 subscribe_to_commands: bool,
935) -> Result<(), Error> {
936 crate::log::debug!("publishing entity discovery messages");
937
938 for entity in entities {
939 buffers.clear();
940
941 // Borrow the entity and fill out the buffers to be sent
942 // This should be done inside a block so that we do not hold the RefMut across an await
943 {
944 let mut entity = entity.borrow_mut();
945 let entity = match entity.as_mut() {
946 Some(entity) => entity,
947 None => break,
948 };
949
950 generate_entity_discovery(buffers, device_config, &entity.config, availability_topic);
951 }
952
953 let discovery_topic = buffers.discovery_topic.as_str();
954 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
955 device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?;
956
957 if subscribe_to_commands {
958 let command_topic = buffers.command_topic.as_str();
959 crate::log::debug!("subscribing to command topic '{}'", command_topic);
960 device_mqtt_subscribe(client, command_topic).await?;
961 }
962 }
963
964 Ok(())
965}
966
967fn generate_entity_discovery(
968 buffers: &mut DeviceBuffers<'_>,
969 device_config: &DeviceConfig,
970 entity_config: &EntityConfig,
971 availability_topic: &str,
972) {
973 use core::fmt::Write;
974
975 let discovery_topic_display = DiscoveryTopicDisplay {
976 domain: entity_config.domain,
977 device_id: device_config.device_id,
978 entity_id: entity_config.id,
979 };
980 let state_topic_display = StateTopicDisplay {
981 device_id: device_config.device_id,
982 entity_id: entity_config.id,
983 };
984 let command_topic_display = CommandTopicDisplay {
985 device_id: device_config.device_id,
986 entity_id: entity_config.id,
987 };
988 let attributes_topic_display = AttributesTopicDisplay {
989 device_id: device_config.device_id,
990 entity_id: entity_config.id,
991 };
992
993 write!(buffers.discovery_topic, "{discovery_topic_display}")
994 .expect("discovery topic buffer too small");
995 write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small");
996 write!(buffers.command_topic, "{command_topic_display}")
997 .expect("command topic buffer too small");
998 write!(buffers.attributes_topic, "{attributes_topic_display}")
999 .expect("attributes topic buffer too small");
1000
1001 let device_discovery = DeviceDiscovery {
1002 identifiers: &[device_config.device_id],
1003 name: device_config.device_name,
1004 manufacturer: device_config.manufacturer,
1005 model: device_config.model,
1006 };
1007
1008 let discovery = EntityDiscovery {
1009 id: entity_config.id,
1010 name: entity_config.name,
1011 device_class: entity_config.device_class,
1012 state_topic: Some(buffers.state_topic.as_str()),
1013 command_topic: Some(buffers.command_topic.as_str()),
1014 json_attributes_topic: Some(buffers.attributes_topic.as_str()),
1015 unit_of_measurement: entity_config.measurement_unit,
1016 schema: entity_config.schema,
1017 platform: entity_config.platform,
1018 state_class: entity_config.state_class,
1019 icon: entity_config.icon,
1020 entity_category: entity_config.category,
1021 entity_picture: entity_config.picture,
1022 min: entity_config.min,
1023 max: entity_config.max,
1024 step: entity_config.step,
1025 mode: entity_config.mode,
1026 suggested_display_precision: entity_config.suggested_display_precision,
1027 availability_topic: Some(availability_topic),
1028 payload_available: Some(AVAILABLE_PAYLOAD),
1029 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
1030 device: &device_discovery,
1031 };
1032
1033 crate::log::debug!(
1034 "discovery for entity '{}': {:?}",
1035 entity_config.id,
1036 discovery
1037 );
1038
1039 buffers
1040 .discovery
1041 .resize(buffers.discovery.capacity(), 0)
1042 .unwrap();
1043 let n = serde_json_core::to_slice(&discovery, buffers.discovery)
1044 .expect("discovery buffer too small");
1045 buffers.discovery.truncate(n);
1046}
1047
744/// Runs the main Home Assistant device event loop. 1048/// Runs the main Home Assistant device event loop.
745/// 1049///
746/// This function handles MQTT communication, entity discovery, and state updates. It will run 1050/// This function handles MQTT communication, entity discovery, and state updates. It will run
@@ -787,18 +1091,22 @@ pub fn create_device_tracker<'a>(
787pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { 1091pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
788 use core::fmt::Write; 1092 use core::fmt::Write;
789 1093
790 const MQTT_TIMEOUT: Duration = Duration::from_secs(30); 1094 device.buffers.availability_topic.clear();
791
792 device.availability_topic_buffer.clear();
793 write!( 1095 write!(
794 device.availability_topic_buffer, 1096 device.buffers.availability_topic,
795 "{}", 1097 "{}",
796 DeviceAvailabilityTopic { 1098 DeviceAvailabilityTopic {
797 device_id: device.config.device_id 1099 device_id: device.config.device_id
798 } 1100 }
799 ) 1101 )
800 .expect("device availability buffer too small"); 1102 .expect("device availability buffer too small");
801 let availability_topic = device.availability_topic_buffer.as_str(); 1103
1104 // Store availability_topic in a separate buffer to avoid borrow conflicts
1105 let mut availability_topic_copy = heapless::String::<128>::new();
1106 availability_topic_copy
1107 .push_str(device.buffers.availability_topic.as_str())
1108 .expect("availability topic too large");
1109 let availability_topic = availability_topic_copy.as_str();
802 1110
803 let mut ping_ticker = 1111 let mut ping_ticker =
804 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); 1112 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
@@ -830,169 +1138,25 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
830 } 1138 }
831 } 1139 }
832 1140
833 crate::log::debug!("sending discover messages"); 1141 device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?;
834 let device_discovery = DeviceDiscovery {
835 identifiers: &[device.config.device_id],
836 name: device.config.device_name,
837 manufacturer: device.config.manufacturer,
838 model: device.config.model,
839 };
840
841 for entity in device.entities {
842 device.publish_buffer.clear();
843 device.subscribe_buffer.clear();
844 device.discovery_buffer.clear();
845 device.discovery_topic_buffer.clear();
846 device.state_topic_buffer.clear();
847 device.command_topic_buffer.clear();
848 device.attributes_topic_buffer.clear();
849
850 // borrow the entity and fill out the buffers to be sent
851 // this should be done inside a block so that we do not hold the RefMut across an
852 // await
853 {
854 let mut entity = entity.borrow_mut();
855 let entity = match entity.as_mut() {
856 Some(entity) => entity,
857 None => break,
858 };
859 let entity_config = &entity.config;
860
861 let discovery_topic_display = DiscoveryTopicDisplay {
862 domain: entity_config.domain,
863 device_id: device.config.device_id,
864 entity_id: entity_config.id,
865 };
866 let state_topic_display = StateTopicDisplay {
867 device_id: device.config.device_id,
868 entity_id: entity_config.id,
869 };
870 let command_topic_display = CommandTopicDisplay {
871 device_id: device.config.device_id,
872 entity_id: entity_config.id,
873 };
874 let attributes_topic_display = AttributesTopicDisplay {
875 device_id: device.config.device_id,
876 entity_id: entity_config.id,
877 };
878
879 write!(device.discovery_topic_buffer, "{discovery_topic_display}")
880 .expect("discovery topic buffer too small");
881 write!(device.state_topic_buffer, "{state_topic_display}")
882 .expect("state topic buffer too small");
883 write!(device.command_topic_buffer, "{command_topic_display}")
884 .expect("command topic buffer too small");
885 write!(device.attributes_topic_buffer, "{attributes_topic_display}")
886 .expect("attributes topic buffer too small");
887
888 let discovery = EntityDiscovery {
889 id: entity_config.id,
890 name: entity_config.name,
891 device_class: entity_config.device_class,
892 state_topic: Some(device.state_topic_buffer.as_str()),
893 command_topic: Some(device.command_topic_buffer.as_str()),
894 json_attributes_topic: Some(device.attributes_topic_buffer.as_str()),
895 unit_of_measurement: entity_config.measurement_unit,
896 schema: entity_config.schema,
897 platform: entity_config.platform,
898 state_class: entity_config.state_class,
899 icon: entity_config.icon,
900 entity_category: entity_config.category,
901 entity_picture: entity_config.picture,
902 min: entity_config.min,
903 max: entity_config.max,
904 step: entity_config.step,
905 mode: entity_config.mode,
906 suggested_display_precision: entity_config.suggested_display_precision,
907 availability_topic: Some(availability_topic),
908 payload_available: Some(AVAILABLE_PAYLOAD),
909 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
910 device: &device_discovery,
911 };
912 crate::log::debug!(
913 "discovery for entity '{}': {:?}",
914 entity_config.id,
915 discovery
916 );
917
918 device
919 .discovery_buffer
920 .resize(device.discovery_buffer.capacity(), 0)
921 .unwrap();
922 let n = serde_json_core::to_slice(&discovery, device.discovery_buffer)
923 .expect("discovery buffer too small");
924 device.discovery_buffer.truncate(n);
925 }
926
927 let discovery_topic = device.discovery_topic_buffer.as_str();
928 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
929 match embassy_time::with_timeout(
930 MQTT_TIMEOUT,
931 client.publish(discovery_topic, device.discovery_buffer),
932 )
933 .await
934 {
935 Ok(Ok(_)) => {}
936 Ok(Err(err)) => {
937 crate::log::error!(
938 "mqtt discovery publish failed with: {:?}",
939 crate::log::Debug2Format(&err)
940 );
941 return Err(Error::new("mqtt discovery publish failed"));
942 }
943 Err(_) => {
944 crate::log::error!("mqtt discovery publish timed out");
945 return Err(Error::new("mqtt discovery publish timed out"));
946 }
947 }
948 1142
949 let command_topic = device.command_topic_buffer.as_str(); 1143 publish_entity_discoveries(
950 crate::log::debug!("subscribing to command topic '{}'", command_topic); 1144 &mut client,
951 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { 1145 device.entities,
952 Ok(Ok(_)) => {} 1146 &mut device.buffers,
953 Ok(Err(err)) => { 1147 &device.config,
954 crate::log::error!( 1148 availability_topic,
955 "mqtt subscribe to '{}' failed with: {:?}", 1149 true,
956 command_topic, 1150 )
957 crate::log::Debug2Format(&err) 1151 .await?;
958 );
959 return Err(Error::new(
960 "mqtt subscription to entity command topic failed",
961 ));
962 }
963 Err(_) => {
964 crate::log::error!("mqtt subscribe to '{}' timed out", command_topic);
965 return Err(Error::new("mqtt subscribe timed out"));
966 }
967 }
968 }
969 1152
970 match embassy_time::with_timeout( 1153 device_mqtt_publish(
971 MQTT_TIMEOUT, 1154 &mut client,
972 client.publish_with( 1155 availability_topic,
973 availability_topic, 1156 AVAILABLE_PAYLOAD.as_bytes(),
974 AVAILABLE_PAYLOAD.as_bytes(), 1157 true,
975 mqtt::PublishParams {
976 retain: true,
977 ..Default::default()
978 },
979 ),
980 ) 1158 )
981 .await 1159 .await?;
982 {
983 Ok(Ok(_)) => {}
984 Ok(Err(err)) => {
985 crate::log::error!(
986 "mqtt availability publish failed with: {:?}",
987 crate::log::Debug2Format(&err)
988 );
989 return Err(Error::new("mqtt availability publish failed"));
990 }
991 Err(_) => {
992 crate::log::error!("mqtt availability publish timed out");
993 return Err(Error::new("mqtt availability publish timed out"));
994 }
995 }
996 1160
997 let mut first_iteration_push = true; 1161 let mut first_iteration_push = true;
998 'outer_loop: loop { 1162 'outer_loop: loop {
@@ -1011,7 +1175,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1011 } 1175 }
1012 1176
1013 entity.publish = false; 1177 entity.publish = false;
1014 device.publish_buffer.clear(); 1178 device.buffers.publish.clear();
1015 1179
1016 let mut publish_to_attributes = false; 1180 let mut publish_to_attributes = false;
1017 match &entity.storage { 1181 match &entity.storage {
@@ -1019,37 +1183,39 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1019 state: Some(SwitchState { value, .. }), 1183 state: Some(SwitchState { value, .. }),
1020 .. 1184 ..
1021 }) => device 1185 }) => device
1022 .publish_buffer 1186 .buffers
1187 .publish
1023 .extend_from_slice(value.as_str().as_bytes()) 1188 .extend_from_slice(value.as_str().as_bytes())
1024 .expect("publish buffer too small for switch state payload"), 1189 .expect("publish buffer too small for switch state payload"),
1025 EntityStorage::BinarySensor(BinarySensorStorage { 1190 EntityStorage::BinarySensor(BinarySensorStorage {
1026 state: Some(BinarySensorState { value, .. }), 1191 state: Some(BinarySensorState { value, .. }),
1027 }) => device 1192 }) => device
1028 .publish_buffer 1193 .buffers
1194 .publish
1029 .extend_from_slice(value.as_str().as_bytes()) 1195 .extend_from_slice(value.as_str().as_bytes())
1030 .expect("publish buffer too small for binary sensor state payload"), 1196 .expect("publish buffer too small for binary sensor state payload"),
1031 EntityStorage::NumericSensor(NumericSensorStorage { 1197 EntityStorage::NumericSensor(NumericSensorStorage {
1032 state: Some(NumericSensorState { value, .. }), 1198 state: Some(NumericSensorState { value, .. }),
1033 .. 1199 ..
1034 }) => write!(device.publish_buffer, "{}", value) 1200 }) => write!(device.buffers.publish, "{}", value)
1035 .expect("publish buffer too small for numeric sensor payload"), 1201 .expect("publish buffer too small for numeric sensor payload"),
1036 EntityStorage::Number(NumberStorage { 1202 EntityStorage::Number(NumberStorage {
1037 state: Some(NumberState { value, .. }), 1203 state: Some(NumberState { value, .. }),
1038 .. 1204 ..
1039 }) => write!(device.publish_buffer, "{}", value) 1205 }) => write!(device.buffers.publish, "{}", value)
1040 .expect("publish buffer too small for number state payload"), 1206 .expect("publish buffer too small for number state payload"),
1041 EntityStorage::DeviceTracker(DeviceTrackerStorage { 1207 EntityStorage::DeviceTracker(DeviceTrackerStorage {
1042 state: Some(tracker_state), 1208 state: Some(tracker_state),
1043 }) => { 1209 }) => {
1044 publish_to_attributes = true; 1210 publish_to_attributes = true;
1045 device 1211 device
1046 .publish_buffer 1212 .buffers
1047 .resize(device.publish_buffer.capacity(), 0) 1213 .publish
1214 .resize(device.buffers.publish.capacity(), 0)
1048 .expect("resize to capacity should never fail"); 1215 .expect("resize to capacity should never fail");
1049 let n = 1216 let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish)
1050 serde_json_core::to_slice(&tracker_state, device.publish_buffer) 1217 .expect("publish buffer too small for tracker state payload");
1051 .expect("publish buffer too small for tracker state payload"); 1218 device.buffers.publish.truncate(n);
1052 device.publish_buffer.truncate(n);
1053 } 1219 }
1054 _ => { 1220 _ => {
1055 if !first_iteration_push { 1221 if !first_iteration_push {
@@ -1067,42 +1233,26 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1067 device_id: device.config.device_id, 1233 device_id: device.config.device_id,
1068 entity_id: entity.config.id, 1234 entity_id: entity.config.id,
1069 }; 1235 };
1070 device.attributes_topic_buffer.clear(); 1236 device.buffers.attributes_topic.clear();
1071 write!(device.attributes_topic_buffer, "{attributes_topic_display}") 1237 write!(
1072 .expect("attributes topic buffer too small"); 1238 device.buffers.attributes_topic,
1073 device.attributes_topic_buffer.as_str() 1239 "{attributes_topic_display}"
1240 )
1241 .expect("attributes topic buffer too small");
1242 device.buffers.attributes_topic.as_str()
1074 } else { 1243 } else {
1075 let state_topic_display = StateTopicDisplay { 1244 let state_topic_display = StateTopicDisplay {
1076 device_id: device.config.device_id, 1245 device_id: device.config.device_id,
1077 entity_id: entity.config.id, 1246 entity_id: entity.config.id,
1078 }; 1247 };
1079 device.state_topic_buffer.clear(); 1248 device.buffers.state_topic.clear();
1080 write!(device.state_topic_buffer, "{state_topic_display}") 1249 write!(device.buffers.state_topic, "{state_topic_display}")
1081 .expect("state topic buffer too small"); 1250 .expect("state topic buffer too small");
1082 device.state_topic_buffer.as_str() 1251 device.buffers.state_topic.as_str()
1083 } 1252 }
1084 }; 1253 };
1085 1254
1086 match embassy_time::with_timeout( 1255 device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?;
1087 MQTT_TIMEOUT,
1088 client.publish(publish_topic, device.publish_buffer),
1089 )
1090 .await
1091 {
1092 Ok(Ok(_)) => {}
1093 Ok(Err(err)) => {
1094 crate::log::error!(
1095 "mqtt state publish on topic '{}' failed with: {:?}",
1096 publish_topic,
1097 crate::log::Debug2Format(&err)
1098 );
1099 return Err(Error::new("mqtt publish failed"));
1100 }
1101 Err(_) => {
1102 crate::log::error!("mqtt state publish on topic '{}' timed out", publish_topic);
1103 return Err(Error::new("mqtt publish timed out"));
1104 }
1105 }
1106 } 1256 }
1107 first_iteration_push = false; 1257 first_iteration_push = false;
1108 1258
@@ -1134,6 +1284,29 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1134 } 1284 }
1135 }; 1285 };
1136 1286
1287 if publish.topic == constants::HA_STATUS_TOPIC {
1288 let mut receive_buffer = [0u8; 64];
1289 let receive_data_len = publish.data_len;
1290 let receive_data =
1291 mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?;
1292
1293 if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() {
1294 first_iteration_push = true;
1295
1296 crate::log::debug!("home assistant came online, republishing discoveries");
1297 publish_entity_discoveries(
1298 &mut client,
1299 device.entities,
1300 &mut device.buffers,
1301 &device.config,
1302 availability_topic,
1303 false,
1304 )
1305 .await?;
1306 }
1307 continue;
1308 }
1309
1137 let entity = 'entity_search_block: { 1310 let entity = 'entity_search_block: {
1138 for entity in device.entities { 1311 for entity in device.entities {
1139 let mut data = entity.borrow_mut(); 1312 let mut data = entity.borrow_mut();
@@ -1146,11 +1319,11 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1146 device_id: device.config.device_id, 1319 device_id: device.config.device_id,
1147 entity_id: data.config.id, 1320 entity_id: data.config.id,
1148 }; 1321 };
1149 device.command_topic_buffer.clear(); 1322 device.buffers.command_topic.clear();
1150 write!(device.command_topic_buffer, "{command_topic_display}") 1323 write!(device.buffers.command_topic, "{command_topic_display}")
1151 .expect("command topic buffer too small"); 1324 .expect("command topic buffer too small");
1152 1325
1153 if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { 1326 if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() {
1154 break 'entity_search_block entity; 1327 break 'entity_search_block entity;
1155 } 1328 }
1156 } 1329 }
@@ -1158,43 +1331,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1158 }; 1331 };
1159 1332
1160 let mut read_buffer = [0u8; 128]; 1333 let mut read_buffer = [0u8; 128];
1161 if publish.data_len > read_buffer.len() {
1162 crate::log::warn!(
1163 "mqtt publish payload on topic {} is too large ({} bytes), ignoring it",
1164 publish.topic,
1165 publish.data_len
1166 );
1167 continue;
1168 }
1169
1170 crate::log::debug!(
1171 "mqtt receiving {} bytes of data on topic {}",
1172 publish.data_len,
1173 publish.topic
1174 );
1175
1176 let data_len = publish.data_len; 1334 let data_len = publish.data_len;
1177 match embassy_time::with_timeout( 1335 let receive_data =
1178 MQTT_TIMEOUT, 1336 match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await {
1179 client.receive_data(&mut read_buffer[..data_len]), 1337 Ok(data) => data,
1180 ) 1338 Err(_) => continue 'outer_loop,
1181 .await 1339 };
1182 {
1183 Ok(Ok(())) => {}
1184 Ok(Err(err)) => {
1185 crate::log::error!(
1186 "mqtt receive data failed with: {:?}",
1187 crate::log::Debug2Format(&err)
1188 );
1189 return Err(Error::new("mqtt receive data failed"));
1190 }
1191 Err(_) => {
1192 crate::log::error!("mqtt receive data timed out");
1193 return Err(Error::new("mqtt receive data timed out"));
1194 }
1195 }
1196 1340
1197 let command = match str::from_utf8(&read_buffer[..data_len]) { 1341 let command = match str::from_utf8(receive_data) {
1198 Ok(command) => command, 1342 Ok(command) => command,
1199 Err(_) => { 1343 Err(_) => {
1200 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); 1344 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it");