aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2026-01-09 11:33:30 +0000
committerdiogo464 <[email protected]>2026-01-09 11:33:30 +0000
commit7c200523811cfd2b77eb327fa749228339c0523e (patch)
tree3a39119c9e9e2faaec52a89f0440361474426c92
parent22ff449cd1fdf5184faa6bc28a0b634155fbed22 (diff)
parent27779ecf0d7c00ae0acb650c0d75d28edcc3aa89 (diff)
Merge branch 'main' into embassy-git
-rw-r--r--Cargo.toml1
-rw-r--r--src/constants.rs4
-rw-r--r--src/lib.rs690
3 files changed, 422 insertions, 273 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 9e2b1a0..480d32b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ repository = "https://github.com/diogo464/embassy-ha"
9readme = "README.md" 9readme = "README.md"
10keywords = ["homeassistant", "mqtt", "iot", "embedded", "embassy"] 10keywords = ["homeassistant", "mqtt", "iot", "embedded", "embassy"]
11categories = ["embedded", "network-programming"] 11categories = ["embedded", "network-programming"]
12rust-version = "1.91.1"
12 13
13[features] 14[features]
14default = [] 15default = []
diff --git a/src/constants.rs b/src/constants.rs
index 73266f7..c9935b1 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -1,5 +1,9 @@
1#![allow(unused)] 1#![allow(unused)]
2 2
3pub const HA_STATUS_TOPIC: &str = "homeassistant/status";
4pub const HA_STATUS_PAYLOAD_ONLINE: &str = "online";
5pub const HA_STATUS_PAYLOAD_OFFLINE: &str = "offline";
6
3pub const HA_DOMAIN_SENSOR: &str = "sensor"; 7pub const HA_DOMAIN_SENSOR: &str = "sensor";
4pub const HA_DOMAIN_BINARY_SENSOR: &str = "binary_sensor"; 8pub const HA_DOMAIN_BINARY_SENSOR: &str = "binary_sensor";
5pub const HA_DOMAIN_SWITCH: &str = "switch"; 9pub const HA_DOMAIN_SWITCH: &str = "switch";
diff --git a/src/lib.rs b/src/lib.rs
index edc60f2..b09bda2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -153,6 +153,7 @@ pub use unit::*;
153const AVAILABLE_PAYLOAD: &str = "online"; 153const AVAILABLE_PAYLOAD: &str = "online";
154const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 154const NOT_AVAILABLE_PAYLOAD: &str = "offline";
155const DEFAULT_KEEPALIVE_TIME: u16 = 30; 155const DEFAULT_KEEPALIVE_TIME: u16 = 30;
156const MQTT_TIMEOUT: Duration = Duration::from_secs(30);
156 157
157#[derive(Debug)] 158#[derive(Debug)]
158pub struct Error(&'static str); 159pub struct Error(&'static str);
@@ -323,19 +324,77 @@ pub struct DeviceConfig {
323 pub model: &'static str, 324 pub model: &'static str,
324} 325}
325 326
327pub struct DeviceBuffersOwned {
328 pub publish: Vec<u8, 2048>,
329 pub subscribe: Vec<u8, 128>,
330 pub discovery: Vec<u8, 2048>,
331 pub availability_topic: String<128>,
332 pub discovery_topic: String<128>,
333 pub state_topic: String<128>,
334 pub command_topic: String<128>,
335 pub attributes_topic: String<128>,
336}
337
338impl Default for DeviceBuffersOwned {
339 fn default() -> Self {
340 Self {
341 publish: Default::default(),
342 subscribe: Default::default(),
343 discovery: Default::default(),
344 availability_topic: Default::default(),
345 discovery_topic: Default::default(),
346 state_topic: Default::default(),
347 command_topic: Default::default(),
348 attributes_topic: Default::default(),
349 }
350 }
351}
352
353impl DeviceBuffersOwned {
354 pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> {
355 DeviceBuffers {
356 publish: &mut self.publish,
357 subscribe: &mut self.subscribe,
358 discovery: &mut self.discovery,
359 availability_topic: &mut self.availability_topic,
360 discovery_topic: &mut self.discovery_topic,
361 state_topic: &mut self.state_topic,
362 command_topic: &mut self.command_topic,
363 attributes_topic: &mut self.attributes_topic,
364 }
365 }
366}
367
368pub struct DeviceBuffers<'a> {
369 pub publish: &'a mut VecView<u8>,
370 pub subscribe: &'a mut VecView<u8>,
371 pub discovery: &'a mut VecView<u8>,
372 pub availability_topic: &'a mut StringView,
373 pub discovery_topic: &'a mut StringView,
374 pub state_topic: &'a mut StringView,
375 pub command_topic: &'a mut StringView,
376 pub attributes_topic: &'a mut StringView,
377}
378
379impl<'a> DeviceBuffers<'a> {
380 pub fn clear(&mut self) {
381 self.publish.clear();
382 self.subscribe.clear();
383 self.discovery.clear();
384 self.availability_topic.clear();
385 self.discovery_topic.clear();
386 self.state_topic.clear();
387 self.command_topic.clear();
388 self.attributes_topic.clear();
389 }
390}
391
326pub struct DeviceResources { 392pub struct DeviceResources {
327 waker: AtomicWaker, 393 waker: AtomicWaker,
328 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT], 394 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT],
329 395
330 mqtt_resources: mqtt::ClientResources, 396 mqtt_resources: mqtt::ClientResources,
331 publish_buffer: Vec<u8, 2048>, 397 buffers: DeviceBuffersOwned,
332 subscribe_buffer: Vec<u8, 128>,
333 discovery_buffer: Vec<u8, 2048>,
334 availability_topic_buffer: String<128>,
335 discovery_topic_buffer: String<128>,
336 state_topic_buffer: String<128>,
337 command_topic_buffer: String<128>,
338 attributes_topic_buffer: String<128>,
339} 398}
340 399
341impl DeviceResources { 400impl DeviceResources {
@@ -349,14 +408,7 @@ impl Default for DeviceResources {
349 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT], 408 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT],
350 409
351 mqtt_resources: Default::default(), 410 mqtt_resources: Default::default(),
352 publish_buffer: Default::default(), 411 buffers: Default::default(),
353 subscribe_buffer: Default::default(),
354 discovery_buffer: Default::default(),
355 availability_topic_buffer: Default::default(),
356 discovery_topic_buffer: Default::default(),
357 state_topic_buffer: Default::default(),
358 command_topic_buffer: Default::default(),
359 attributes_topic_buffer: Default::default(),
360 } 412 }
361 } 413 }
362} 414}
@@ -564,14 +616,7 @@ pub struct Device<'a> {
564 entities: &'a [RefCell<Option<EntityData>>], 616 entities: &'a [RefCell<Option<EntityData>>],
565 617
566 mqtt_resources: &'a mut mqtt::ClientResources, 618 mqtt_resources: &'a mut mqtt::ClientResources,
567 publish_buffer: &'a mut VecView<u8>, 619 buffers: DeviceBuffers<'a>,
568 subscribe_buffer: &'a mut VecView<u8>,
569 discovery_buffer: &'a mut VecView<u8>,
570 availability_topic_buffer: &'a mut StringView,
571 discovery_topic_buffer: &'a mut StringView,
572 state_topic_buffer: &'a mut StringView,
573 command_topic_buffer: &'a mut StringView,
574 attributes_topic_buffer: &'a mut StringView,
575} 620}
576 621
577pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { 622pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> {
@@ -581,14 +626,7 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi
581 entities: &resources.entities, 626 entities: &resources.entities,
582 627
583 mqtt_resources: &mut resources.mqtt_resources, 628 mqtt_resources: &mut resources.mqtt_resources,
584 publish_buffer: &mut resources.publish_buffer, 629 buffers: resources.buffers.as_buffers_mut(),
585 subscribe_buffer: &mut resources.subscribe_buffer,
586 discovery_buffer: &mut resources.discovery_buffer,
587 availability_topic_buffer: &mut resources.availability_topic_buffer,
588 discovery_topic_buffer: &mut resources.discovery_topic_buffer,
589 state_topic_buffer: &mut resources.state_topic_buffer,
590 command_topic_buffer: &mut resources.command_topic_buffer,
591 attributes_topic_buffer: &mut resources.attributes_topic_buffer,
592 } 630 }
593} 631}
594 632
@@ -741,6 +779,272 @@ pub fn create_device_tracker<'a>(
741 DeviceTracker::new(entity) 779 DeviceTracker::new(entity)
742} 780}
743 781
782async fn device_mqtt_subscribe<T: Transport>(
783 client: &mut mqtt::Client<'_, T>,
784 topic: impl core::fmt::Display,
785) -> Result<(), Error> {
786 use core::fmt::Write;
787
788 // Format topic to string for both subscribe call and logging
789 let mut topic_buffer = heapless::String::<128>::new();
790 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
791 let topic_str = topic_buffer.as_str();
792
793 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await {
794 Ok(Ok(_)) => Ok(()),
795 Ok(Err(err)) => {
796 crate::log::error!(
797 "mqtt subscribe to '{}' failed with: {:?}",
798 topic_str,
799 crate::log::Debug2Format(&err)
800 );
801 Err(Error::new("mqtt subscribe failed"))
802 }
803 Err(_) => {
804 crate::log::error!("mqtt subscribe to '{}' timed out", topic_str);
805 Err(Error::new("mqtt subscribe timed out"))
806 }
807 }
808}
809
810async fn device_mqtt_publish<T: Transport>(
811 client: &mut mqtt::Client<'_, T>,
812 topic: impl core::fmt::Display,
813 data: &[u8],
814 retain: bool,
815) -> Result<(), Error> {
816 use core::fmt::Write;
817
818 // Format topic to string for both publish call and logging
819 let mut topic_buffer = heapless::String::<128>::new();
820 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
821 let topic_str = topic_buffer.as_str();
822
823 let result = if retain {
824 embassy_time::with_timeout(
825 MQTT_TIMEOUT,
826 client.publish_with(
827 topic_str,
828 data,
829 mqtt::PublishParams {
830 retain: true,
831 ..Default::default()
832 },
833 ),
834 )
835 .await
836 } else {
837 embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await
838 };
839
840 match result {
841 Ok(Ok(_)) => Ok(()),
842 Ok(Err(err)) => {
843 crate::log::error!(
844 "mqtt publish to '{}' failed with: {:?}",
845 topic_str,
846 crate::log::Debug2Format(&err)
847 );
848 Err(Error::new("mqtt publish failed"))
849 }
850 Err(_) => {
851 crate::log::error!("mqtt publish to '{}' timed out", topic_str);
852 Err(Error::new("mqtt publish timed out"))
853 }
854 }
855}
856
857/// Receives MQTT publish data with timeout and proper error handling.
858///
859/// This helper function handles the common pattern of receiving MQTT data with:
860/// - Automatic timeout handling using MQTT_TIMEOUT
861/// - Consistent error logging
862/// - Size validation against buffer capacity
863///
864/// # Arguments
865/// * `client` - MQTT client for receiving data
866/// * `data_len` - Expected length of data to receive
867/// * `buffer` - Buffer to receive the data into
868///
869/// # Returns
870/// * `Ok(&[u8])` - Slice of the buffer containing the received data
871/// * `Err(Error)` - If operation fails, times out, or data exceeds buffer size
872///
873/// # Errors
874/// Returns error if:
875/// - `data_len` is greater than `buffer.len()` (buffer too small)
876/// - The receive operation times out after MQTT_TIMEOUT seconds
877/// - The underlying MQTT receive operation fails
878async fn mqtt_receive_data<'a, T: Transport>(
879 client: &mut mqtt::Client<'_, T>,
880 data_len: usize,
881 buffer: &'a mut [u8],
882) -> Result<&'a [u8], Error> {
883 // Validate buffer size - reject if too small (per user requirement)
884 if data_len > buffer.len() {
885 crate::log::warn!(
886 "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting",
887 data_len,
888 buffer.len()
889 );
890 return Err(Error::new("mqtt payload too large for buffer"));
891 }
892
893 crate::log::debug!("mqtt receiving {} bytes of data", data_len);
894
895 match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len]))
896 .await
897 {
898 Ok(Ok(())) => Ok(&buffer[..data_len]),
899 Ok(Err(err)) => {
900 crate::log::error!(
901 "mqtt receive data failed with: {:?}",
902 crate::log::Debug2Format(&err)
903 );
904 Err(Error::new("mqtt receive data failed"))
905 }
906 Err(_) => {
907 crate::log::error!("mqtt receive data timed out");
908 Err(Error::new("mqtt receive data timed out"))
909 }
910 }
911}
912
913/// Publishes discovery messages for all entities in the device.
914///
915/// This function iterates over all entities, generates their discovery messages,
916/// publishes them to MQTT, and optionally subscribes to their command topics.
917///
918/// # Arguments
919/// * `client` - MQTT client for publishing and subscribing
920/// * `entities` - Slice of entities to publish discoveries for
921/// * `buffers` - Device buffers for generating discovery messages
922/// * `device_config` - Device configuration
923/// * `availability_topic` - The device availability topic string
924/// * `subscribe_to_commands` - Whether to subscribe to command topics (true for initial discovery, false for rediscovery)
925///
926/// # Returns
927/// `Ok(())` if all discoveries were published successfully, or an error if any operation fails
928async fn publish_entity_discoveries<T: Transport>(
929 client: &mut mqtt::Client<'_, T>,
930 entities: &[RefCell<Option<EntityData>>],
931 buffers: &mut DeviceBuffers<'_>,
932 device_config: &DeviceConfig,
933 availability_topic: &str,
934 subscribe_to_commands: bool,
935) -> Result<(), Error> {
936 crate::log::debug!("publishing entity discovery messages");
937
938 for entity in entities {
939 buffers.clear();
940
941 // Borrow the entity and fill out the buffers to be sent
942 // This should be done inside a block so that we do not hold the RefMut across an await
943 {
944 let mut entity = entity.borrow_mut();
945 let entity = match entity.as_mut() {
946 Some(entity) => entity,
947 None => break,
948 };
949
950 generate_entity_discovery(buffers, device_config, &entity.config, availability_topic);
951 }
952
953 let discovery_topic = buffers.discovery_topic.as_str();
954 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
955 device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?;
956
957 if subscribe_to_commands {
958 let command_topic = buffers.command_topic.as_str();
959 crate::log::debug!("subscribing to command topic '{}'", command_topic);
960 device_mqtt_subscribe(client, command_topic).await?;
961 }
962 }
963
964 Ok(())
965}
966
967fn generate_entity_discovery(
968 buffers: &mut DeviceBuffers<'_>,
969 device_config: &DeviceConfig,
970 entity_config: &EntityConfig,
971 availability_topic: &str,
972) {
973 use core::fmt::Write;
974
975 let discovery_topic_display = DiscoveryTopicDisplay {
976 domain: entity_config.domain,
977 device_id: device_config.device_id,
978 entity_id: entity_config.id,
979 };
980 let state_topic_display = StateTopicDisplay {
981 device_id: device_config.device_id,
982 entity_id: entity_config.id,
983 };
984 let command_topic_display = CommandTopicDisplay {
985 device_id: device_config.device_id,
986 entity_id: entity_config.id,
987 };
988 let attributes_topic_display = AttributesTopicDisplay {
989 device_id: device_config.device_id,
990 entity_id: entity_config.id,
991 };
992
993 write!(buffers.discovery_topic, "{discovery_topic_display}")
994 .expect("discovery topic buffer too small");
995 write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small");
996 write!(buffers.command_topic, "{command_topic_display}")
997 .expect("command topic buffer too small");
998 write!(buffers.attributes_topic, "{attributes_topic_display}")
999 .expect("attributes topic buffer too small");
1000
1001 let device_discovery = DeviceDiscovery {
1002 identifiers: &[device_config.device_id],
1003 name: device_config.device_name,
1004 manufacturer: device_config.manufacturer,
1005 model: device_config.model,
1006 };
1007
1008 let discovery = EntityDiscovery {
1009 id: entity_config.id,
1010 name: entity_config.name,
1011 device_class: entity_config.device_class,
1012 state_topic: Some(buffers.state_topic.as_str()),
1013 command_topic: Some(buffers.command_topic.as_str()),
1014 json_attributes_topic: Some(buffers.attributes_topic.as_str()),
1015 unit_of_measurement: entity_config.measurement_unit,
1016 schema: entity_config.schema,
1017 platform: entity_config.platform,
1018 state_class: entity_config.state_class,
1019 icon: entity_config.icon,
1020 entity_category: entity_config.category,
1021 entity_picture: entity_config.picture,
1022 min: entity_config.min,
1023 max: entity_config.max,
1024 step: entity_config.step,
1025 mode: entity_config.mode,
1026 suggested_display_precision: entity_config.suggested_display_precision,
1027 availability_topic: Some(availability_topic),
1028 payload_available: Some(AVAILABLE_PAYLOAD),
1029 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
1030 device: &device_discovery,
1031 };
1032
1033 crate::log::debug!(
1034 "discovery for entity '{}': {:?}",
1035 entity_config.id,
1036 discovery
1037 );
1038
1039 buffers
1040 .discovery
1041 .resize(buffers.discovery.capacity(), 0)
1042 .unwrap();
1043 let n = serde_json_core::to_slice(&discovery, buffers.discovery)
1044 .expect("discovery buffer too small");
1045 buffers.discovery.truncate(n);
1046}
1047
744/// Runs the main Home Assistant device event loop. 1048/// Runs the main Home Assistant device event loop.
745/// 1049///
746/// This function handles MQTT communication, entity discovery, and state updates. It will run 1050/// This function handles MQTT communication, entity discovery, and state updates. It will run
@@ -787,18 +1091,22 @@ pub fn create_device_tracker<'a>(
787pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { 1091pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
788 use core::fmt::Write; 1092 use core::fmt::Write;
789 1093
790 const MQTT_TIMEOUT: Duration = Duration::from_secs(30); 1094 device.buffers.availability_topic.clear();
791
792 device.availability_topic_buffer.clear();
793 write!( 1095 write!(
794 device.availability_topic_buffer, 1096 device.buffers.availability_topic,
795 "{}", 1097 "{}",
796 DeviceAvailabilityTopic { 1098 DeviceAvailabilityTopic {
797 device_id: device.config.device_id 1099 device_id: device.config.device_id
798 } 1100 }
799 ) 1101 )
800 .expect("device availability buffer too small"); 1102 .expect("device availability buffer too small");
801 let availability_topic = device.availability_topic_buffer.as_str(); 1103
1104 // Store availability_topic in a separate buffer to avoid borrow conflicts
1105 let mut availability_topic_copy = heapless::String::<128>::new();
1106 availability_topic_copy
1107 .push_str(device.buffers.availability_topic.as_str())
1108 .expect("availability topic too large");
1109 let availability_topic = availability_topic_copy.as_str();
802 1110
803 let mut ping_ticker = 1111 let mut ping_ticker =
804 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); 1112 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
@@ -830,169 +1138,25 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
830 } 1138 }
831 } 1139 }
832 1140
833 crate::log::debug!("sending discover messages"); 1141 device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?;
834 let device_discovery = DeviceDiscovery {
835 identifiers: &[device.config.device_id],
836 name: device.config.device_name,
837 manufacturer: device.config.manufacturer,
838 model: device.config.model,
839 };
840
841 for entity in device.entities {
842 device.publish_buffer.clear();
843 device.subscribe_buffer.clear();
844 device.discovery_buffer.clear();
845 device.discovery_topic_buffer.clear();
846 device.state_topic_buffer.clear();
847 device.command_topic_buffer.clear();
848 device.attributes_topic_buffer.clear();
849
850 // borrow the entity and fill out the buffers to be sent
851 // this should be done inside a block so that we do not hold the RefMut across an
852 // await
853 {
854 let mut entity = entity.borrow_mut();
855 let entity = match entity.as_mut() {
856 Some(entity) => entity,
857 None => break,
858 };
859 let entity_config = &entity.config;
860
861 let discovery_topic_display = DiscoveryTopicDisplay {
862 domain: entity_config.domain,
863 device_id: device.config.device_id,
864 entity_id: entity_config.id,
865 };
866 let state_topic_display = StateTopicDisplay {
867 device_id: device.config.device_id,
868 entity_id: entity_config.id,
869 };
870 let command_topic_display = CommandTopicDisplay {
871 device_id: device.config.device_id,
872 entity_id: entity_config.id,
873 };
874 let attributes_topic_display = AttributesTopicDisplay {
875 device_id: device.config.device_id,
876 entity_id: entity_config.id,
877 };
878
879 write!(device.discovery_topic_buffer, "{discovery_topic_display}")
880 .expect("discovery topic buffer too small");
881 write!(device.state_topic_buffer, "{state_topic_display}")
882 .expect("state topic buffer too small");
883 write!(device.command_topic_buffer, "{command_topic_display}")
884 .expect("command topic buffer too small");
885 write!(device.attributes_topic_buffer, "{attributes_topic_display}")
886 .expect("attributes topic buffer too small");
887
888 let discovery = EntityDiscovery {
889 id: entity_config.id,
890 name: entity_config.name,
891 device_class: entity_config.device_class,
892 state_topic: Some(device.state_topic_buffer.as_str()),
893 command_topic: Some(device.command_topic_buffer.as_str()),
894 json_attributes_topic: Some(device.attributes_topic_buffer.as_str()),
895 unit_of_measurement: entity_config.measurement_unit,
896 schema: entity_config.schema,
897 platform: entity_config.platform,
898 state_class: entity_config.state_class,
899 icon: entity_config.icon,
900 entity_category: entity_config.category,
901 entity_picture: entity_config.picture,
902 min: entity_config.min,
903 max: entity_config.max,
904 step: entity_config.step,
905 mode: entity_config.mode,
906 suggested_display_precision: entity_config.suggested_display_precision,
907 availability_topic: Some(availability_topic),
908 payload_available: Some(AVAILABLE_PAYLOAD),
909 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
910 device: &device_discovery,
911 };
912 crate::log::debug!(
913 "discovery for entity '{}': {:?}",
914 entity_config.id,
915 discovery
916 );
917
918 device
919 .discovery_buffer
920 .resize(device.discovery_buffer.capacity(), 0)
921 .unwrap();
922 let n = serde_json_core::to_slice(&discovery, device.discovery_buffer)
923 .expect("discovery buffer too small");
924 device.discovery_buffer.truncate(n);
925 }
926
927 let discovery_topic = device.discovery_topic_buffer.as_str();
928 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
929 match embassy_time::with_timeout(
930 MQTT_TIMEOUT,
931 client.publish(discovery_topic, device.discovery_buffer),
932 )
933 .await
934 {
935 Ok(Ok(_)) => {}
936 Ok(Err(err)) => {
937 crate::log::error!(
938 "mqtt discovery publish failed with: {:?}",
939 crate::log::Debug2Format(&err)
940 );
941 return Err(Error::new("mqtt discovery publish failed"));
942 }
943 Err(_) => {
944 crate::log::error!("mqtt discovery publish timed out");
945 return Err(Error::new("mqtt discovery publish timed out"));
946 }
947 }
948 1142
949 let command_topic = device.command_topic_buffer.as_str(); 1143 publish_entity_discoveries(
950 crate::log::debug!("subscribing to command topic '{}'", command_topic); 1144 &mut client,
951 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { 1145 device.entities,
952 Ok(Ok(_)) => {} 1146 &mut device.buffers,
953 Ok(Err(err)) => { 1147 &device.config,
954 crate::log::error!( 1148 availability_topic,
955 "mqtt subscribe to '{}' failed with: {:?}", 1149 true,
956 command_topic, 1150 )
957 crate::log::Debug2Format(&err) 1151 .await?;
958 );
959 return Err(Error::new(
960 "mqtt subscription to entity command topic failed",
961 ));
962 }
963 Err(_) => {
964 crate::log::error!("mqtt subscribe to '{}' timed out", command_topic);
965 return Err(Error::new("mqtt subscribe timed out"));
966 }
967 }
968 }
969 1152
970 match embassy_time::with_timeout( 1153 device_mqtt_publish(
971 MQTT_TIMEOUT, 1154 &mut client,
972 client.publish_with( 1155 availability_topic,
973 availability_topic, 1156 AVAILABLE_PAYLOAD.as_bytes(),
974 AVAILABLE_PAYLOAD.as_bytes(), 1157 true,
975 mqtt::PublishParams {
976 retain: true,
977 ..Default::default()
978 },
979 ),
980 ) 1158 )
981 .await 1159 .await?;
982 {
983 Ok(Ok(_)) => {}
984 Ok(Err(err)) => {
985 crate::log::error!(
986 "mqtt availability publish failed with: {:?}",
987 crate::log::Debug2Format(&err)
988 );
989 return Err(Error::new("mqtt availability publish failed"));
990 }
991 Err(_) => {
992 crate::log::error!("mqtt availability publish timed out");
993 return Err(Error::new("mqtt availability publish timed out"));
994 }
995 }
996 1160
997 let mut first_iteration_push = true; 1161 let mut first_iteration_push = true;
998 'outer_loop: loop { 1162 'outer_loop: loop {
@@ -1011,7 +1175,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1011 } 1175 }
1012 1176
1013 entity.publish = false; 1177 entity.publish = false;
1014 device.publish_buffer.clear(); 1178 device.buffers.publish.clear();
1015 1179
1016 let mut publish_to_attributes = false; 1180 let mut publish_to_attributes = false;
1017 match &entity.storage { 1181 match &entity.storage {
@@ -1019,37 +1183,39 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1019 state: Some(SwitchState { value, .. }), 1183 state: Some(SwitchState { value, .. }),
1020 .. 1184 ..
1021 }) => device 1185 }) => device
1022 .publish_buffer 1186 .buffers
1187 .publish
1023 .extend_from_slice(value.as_str().as_bytes()) 1188 .extend_from_slice(value.as_str().as_bytes())
1024 .expect("publish buffer too small for switch state payload"), 1189 .expect("publish buffer too small for switch state payload"),
1025 EntityStorage::BinarySensor(BinarySensorStorage { 1190 EntityStorage::BinarySensor(BinarySensorStorage {
1026 state: Some(BinarySensorState { value, .. }), 1191 state: Some(BinarySensorState { value, .. }),
1027 }) => device 1192 }) => device
1028 .publish_buffer 1193 .buffers
1194 .publish
1029 .extend_from_slice(value.as_str().as_bytes()) 1195 .extend_from_slice(value.as_str().as_bytes())
1030 .expect("publish buffer too small for binary sensor state payload"), 1196 .expect("publish buffer too small for binary sensor state payload"),
1031 EntityStorage::NumericSensor(NumericSensorStorage { 1197 EntityStorage::NumericSensor(NumericSensorStorage {
1032 state: Some(NumericSensorState { value, .. }), 1198 state: Some(NumericSensorState { value, .. }),
1033 .. 1199 ..
1034 }) => write!(device.publish_buffer, "{}", value) 1200 }) => write!(device.buffers.publish, "{}", value)
1035 .expect("publish buffer too small for numeric sensor payload"), 1201 .expect("publish buffer too small for numeric sensor payload"),
1036 EntityStorage::Number(NumberStorage { 1202 EntityStorage::Number(NumberStorage {
1037 state: Some(NumberState { value, .. }), 1203 state: Some(NumberState { value, .. }),
1038 .. 1204 ..
1039 }) => write!(device.publish_buffer, "{}", value) 1205 }) => write!(device.buffers.publish, "{}", value)
1040 .expect("publish buffer too small for number state payload"), 1206 .expect("publish buffer too small for number state payload"),
1041 EntityStorage::DeviceTracker(DeviceTrackerStorage { 1207 EntityStorage::DeviceTracker(DeviceTrackerStorage {
1042 state: Some(tracker_state), 1208 state: Some(tracker_state),
1043 }) => { 1209 }) => {
1044 publish_to_attributes = true; 1210 publish_to_attributes = true;
1045 device 1211 device
1046 .publish_buffer 1212 .buffers
1047 .resize(device.publish_buffer.capacity(), 0) 1213 .publish
1214 .resize(device.buffers.publish.capacity(), 0)
1048 .expect("resize to capacity should never fail"); 1215 .expect("resize to capacity should never fail");
1049 let n = 1216 let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish)
1050 serde_json_core::to_slice(&tracker_state, device.publish_buffer) 1217 .expect("publish buffer too small for tracker state payload");
1051 .expect("publish buffer too small for tracker state payload"); 1218 device.buffers.publish.truncate(n);
1052 device.publish_buffer.truncate(n);
1053 } 1219 }
1054 _ => { 1220 _ => {
1055 if !first_iteration_push { 1221 if !first_iteration_push {
@@ -1067,42 +1233,26 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1067 device_id: device.config.device_id, 1233 device_id: device.config.device_id,
1068 entity_id: entity.config.id, 1234 entity_id: entity.config.id,
1069 }; 1235 };
1070 device.attributes_topic_buffer.clear(); 1236 device.buffers.attributes_topic.clear();
1071 write!(device.attributes_topic_buffer, "{attributes_topic_display}") 1237 write!(
1072 .expect("attributes topic buffer too small"); 1238 device.buffers.attributes_topic,
1073 device.attributes_topic_buffer.as_str() 1239 "{attributes_topic_display}"
1240 )
1241 .expect("attributes topic buffer too small");
1242 device.buffers.attributes_topic.as_str()
1074 } else { 1243 } else {
1075 let state_topic_display = StateTopicDisplay { 1244 let state_topic_display = StateTopicDisplay {
1076 device_id: device.config.device_id, 1245 device_id: device.config.device_id,
1077 entity_id: entity.config.id, 1246 entity_id: entity.config.id,
1078 }; 1247 };
1079 device.state_topic_buffer.clear(); 1248 device.buffers.state_topic.clear();
1080 write!(device.state_topic_buffer, "{state_topic_display}") 1249 write!(device.buffers.state_topic, "{state_topic_display}")
1081 .expect("state topic buffer too small"); 1250 .expect("state topic buffer too small");
1082 device.state_topic_buffer.as_str() 1251 device.buffers.state_topic.as_str()
1083 } 1252 }
1084 }; 1253 };
1085 1254
1086 match embassy_time::with_timeout( 1255 device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?;
1087 MQTT_TIMEOUT,
1088 client.publish(publish_topic, device.publish_buffer),
1089 )
1090 .await
1091 {
1092 Ok(Ok(_)) => {}
1093 Ok(Err(err)) => {
1094 crate::log::error!(
1095 "mqtt state publish on topic '{}' failed with: {:?}",
1096 publish_topic,
1097 crate::log::Debug2Format(&err)
1098 );
1099 return Err(Error::new("mqtt publish failed"));
1100 }
1101 Err(_) => {
1102 crate::log::error!("mqtt state publish on topic '{}' timed out", publish_topic);
1103 return Err(Error::new("mqtt publish timed out"));
1104 }
1105 }
1106 } 1256 }
1107 first_iteration_push = false; 1257 first_iteration_push = false;
1108 1258
@@ -1134,6 +1284,29 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1134 } 1284 }
1135 }; 1285 };
1136 1286
1287 if publish.topic == constants::HA_STATUS_TOPIC {
1288 let mut receive_buffer = [0u8; 64];
1289 let receive_data_len = publish.data_len;
1290 let receive_data =
1291 mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?;
1292
1293 if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() {
1294 first_iteration_push = true;
1295
1296 crate::log::debug!("home assistant came online, republishing discoveries");
1297 publish_entity_discoveries(
1298 &mut client,
1299 device.entities,
1300 &mut device.buffers,
1301 &device.config,
1302 availability_topic,
1303 false,
1304 )
1305 .await?;
1306 }
1307 continue;
1308 }
1309
1137 let entity = 'entity_search_block: { 1310 let entity = 'entity_search_block: {
1138 for entity in device.entities { 1311 for entity in device.entities {
1139 let mut data = entity.borrow_mut(); 1312 let mut data = entity.borrow_mut();
@@ -1146,11 +1319,11 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1146 device_id: device.config.device_id, 1319 device_id: device.config.device_id,
1147 entity_id: data.config.id, 1320 entity_id: data.config.id,
1148 }; 1321 };
1149 device.command_topic_buffer.clear(); 1322 device.buffers.command_topic.clear();
1150 write!(device.command_topic_buffer, "{command_topic_display}") 1323 write!(device.buffers.command_topic, "{command_topic_display}")
1151 .expect("command topic buffer too small"); 1324 .expect("command topic buffer too small");
1152 1325
1153 if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { 1326 if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() {
1154 break 'entity_search_block entity; 1327 break 'entity_search_block entity;
1155 } 1328 }
1156 } 1329 }
@@ -1158,43 +1331,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
1158 }; 1331 };
1159 1332
1160 let mut read_buffer = [0u8; 128]; 1333 let mut read_buffer = [0u8; 128];
1161 if publish.data_len > read_buffer.len() {
1162 crate::log::warn!(
1163 "mqtt publish payload on topic {} is too large ({} bytes), ignoring it",
1164 publish.topic,
1165 publish.data_len
1166 );
1167 continue;
1168 }
1169
1170 crate::log::debug!(
1171 "mqtt receiving {} bytes of data on topic {}",
1172 publish.data_len,
1173 publish.topic
1174 );
1175
1176 let data_len = publish.data_len; 1334 let data_len = publish.data_len;
1177 match embassy_time::with_timeout( 1335 let receive_data =
1178 MQTT_TIMEOUT, 1336 match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await {
1179 client.receive_data(&mut read_buffer[..data_len]), 1337 Ok(data) => data,
1180 ) 1338 Err(_) => continue 'outer_loop,
1181 .await 1339 };
1182 {
1183 Ok(Ok(())) => {}
1184 Ok(Err(err)) => {
1185 crate::log::error!(
1186 "mqtt receive data failed with: {:?}",
1187 crate::log::Debug2Format(&err)
1188 );
1189 return Err(Error::new("mqtt receive data failed"));
1190 }
1191 Err(_) => {
1192 crate::log::error!("mqtt receive data timed out");
1193 return Err(Error::new("mqtt receive data timed out"));
1194 }
1195 }
1196 1340
1197 let command = match str::from_utf8(&read_buffer[..data_len]) { 1341 let command = match str::from_utf8(receive_data) {
1198 Ok(command) => command, 1342 Ok(command) => command,
1199 Err(_) => { 1343 Err(_) => {
1200 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); 1344 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it");