diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/constants.rs | 4 | ||||
| -rw-r--r-- | src/lib.rs | 690 |
2 files changed, 421 insertions, 273 deletions
diff --git a/src/constants.rs b/src/constants.rs index 73266f7..c9935b1 100644 --- a/src/constants.rs +++ b/src/constants.rs | |||
| @@ -1,5 +1,9 @@ | |||
| 1 | #![allow(unused)] | 1 | #![allow(unused)] |
| 2 | 2 | ||
| 3 | pub const HA_STATUS_TOPIC: &str = "homeassistant/status"; | ||
| 4 | pub const HA_STATUS_PAYLOAD_ONLINE: &str = "online"; | ||
| 5 | pub const HA_STATUS_PAYLOAD_OFFLINE: &str = "offline"; | ||
| 6 | |||
| 3 | pub const HA_DOMAIN_SENSOR: &str = "sensor"; | 7 | pub const HA_DOMAIN_SENSOR: &str = "sensor"; |
| 4 | pub const HA_DOMAIN_BINARY_SENSOR: &str = "binary_sensor"; | 8 | pub const HA_DOMAIN_BINARY_SENSOR: &str = "binary_sensor"; |
| 5 | pub const HA_DOMAIN_SWITCH: &str = "switch"; | 9 | pub const HA_DOMAIN_SWITCH: &str = "switch"; |
| @@ -153,6 +153,7 @@ pub use unit::*; | |||
| 153 | const AVAILABLE_PAYLOAD: &str = "online"; | 153 | const AVAILABLE_PAYLOAD: &str = "online"; |
| 154 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; | 154 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; |
| 155 | const DEFAULT_KEEPALIVE_TIME: u16 = 30; | 155 | const DEFAULT_KEEPALIVE_TIME: u16 = 30; |
| 156 | const MQTT_TIMEOUT: Duration = Duration::from_secs(30); | ||
| 156 | 157 | ||
| 157 | #[derive(Debug)] | 158 | #[derive(Debug)] |
| 158 | pub struct Error(&'static str); | 159 | pub struct Error(&'static str); |
| @@ -323,19 +324,77 @@ pub struct DeviceConfig { | |||
| 323 | pub model: &'static str, | 324 | pub model: &'static str, |
| 324 | } | 325 | } |
| 325 | 326 | ||
| 327 | pub struct DeviceBuffersOwned { | ||
| 328 | pub publish: Vec<u8, 2048>, | ||
| 329 | pub subscribe: Vec<u8, 128>, | ||
| 330 | pub discovery: Vec<u8, 2048>, | ||
| 331 | pub availability_topic: String<128>, | ||
| 332 | pub discovery_topic: String<128>, | ||
| 333 | pub state_topic: String<128>, | ||
| 334 | pub command_topic: String<128>, | ||
| 335 | pub attributes_topic: String<128>, | ||
| 336 | } | ||
| 337 | |||
| 338 | impl Default for DeviceBuffersOwned { | ||
| 339 | fn default() -> Self { | ||
| 340 | Self { | ||
| 341 | publish: Default::default(), | ||
| 342 | subscribe: Default::default(), | ||
| 343 | discovery: Default::default(), | ||
| 344 | availability_topic: Default::default(), | ||
| 345 | discovery_topic: Default::default(), | ||
| 346 | state_topic: Default::default(), | ||
| 347 | command_topic: Default::default(), | ||
| 348 | attributes_topic: Default::default(), | ||
| 349 | } | ||
| 350 | } | ||
| 351 | } | ||
| 352 | |||
| 353 | impl DeviceBuffersOwned { | ||
| 354 | pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> { | ||
| 355 | DeviceBuffers { | ||
| 356 | publish: &mut self.publish, | ||
| 357 | subscribe: &mut self.subscribe, | ||
| 358 | discovery: &mut self.discovery, | ||
| 359 | availability_topic: &mut self.availability_topic, | ||
| 360 | discovery_topic: &mut self.discovery_topic, | ||
| 361 | state_topic: &mut self.state_topic, | ||
| 362 | command_topic: &mut self.command_topic, | ||
| 363 | attributes_topic: &mut self.attributes_topic, | ||
| 364 | } | ||
| 365 | } | ||
| 366 | } | ||
| 367 | |||
| 368 | pub struct DeviceBuffers<'a> { | ||
| 369 | pub publish: &'a mut VecView<u8>, | ||
| 370 | pub subscribe: &'a mut VecView<u8>, | ||
| 371 | pub discovery: &'a mut VecView<u8>, | ||
| 372 | pub availability_topic: &'a mut StringView, | ||
| 373 | pub discovery_topic: &'a mut StringView, | ||
| 374 | pub state_topic: &'a mut StringView, | ||
| 375 | pub command_topic: &'a mut StringView, | ||
| 376 | pub attributes_topic: &'a mut StringView, | ||
| 377 | } | ||
| 378 | |||
| 379 | impl<'a> DeviceBuffers<'a> { | ||
| 380 | pub fn clear(&mut self) { | ||
| 381 | self.publish.clear(); | ||
| 382 | self.subscribe.clear(); | ||
| 383 | self.discovery.clear(); | ||
| 384 | self.availability_topic.clear(); | ||
| 385 | self.discovery_topic.clear(); | ||
| 386 | self.state_topic.clear(); | ||
| 387 | self.command_topic.clear(); | ||
| 388 | self.attributes_topic.clear(); | ||
| 389 | } | ||
| 390 | } | ||
| 391 | |||
| 326 | pub struct DeviceResources { | 392 | pub struct DeviceResources { |
| 327 | waker: AtomicWaker, | 393 | waker: AtomicWaker, |
| 328 | entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT], | 394 | entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT], |
| 329 | 395 | ||
| 330 | mqtt_resources: mqtt::ClientResources, | 396 | mqtt_resources: mqtt::ClientResources, |
| 331 | publish_buffer: Vec<u8, 2048>, | 397 | buffers: DeviceBuffersOwned, |
| 332 | subscribe_buffer: Vec<u8, 128>, | ||
| 333 | discovery_buffer: Vec<u8, 2048>, | ||
| 334 | availability_topic_buffer: String<128>, | ||
| 335 | discovery_topic_buffer: String<128>, | ||
| 336 | state_topic_buffer: String<128>, | ||
| 337 | command_topic_buffer: String<128>, | ||
| 338 | attributes_topic_buffer: String<128>, | ||
| 339 | } | 398 | } |
| 340 | 399 | ||
| 341 | impl DeviceResources { | 400 | impl DeviceResources { |
| @@ -349,14 +408,7 @@ impl Default for DeviceResources { | |||
| 349 | entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT], | 408 | entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT], |
| 350 | 409 | ||
| 351 | mqtt_resources: Default::default(), | 410 | mqtt_resources: Default::default(), |
| 352 | publish_buffer: Default::default(), | 411 | buffers: Default::default(), |
| 353 | subscribe_buffer: Default::default(), | ||
| 354 | discovery_buffer: Default::default(), | ||
| 355 | availability_topic_buffer: Default::default(), | ||
| 356 | discovery_topic_buffer: Default::default(), | ||
| 357 | state_topic_buffer: Default::default(), | ||
| 358 | command_topic_buffer: Default::default(), | ||
| 359 | attributes_topic_buffer: Default::default(), | ||
| 360 | } | 412 | } |
| 361 | } | 413 | } |
| 362 | } | 414 | } |
| @@ -564,14 +616,7 @@ pub struct Device<'a> { | |||
| 564 | entities: &'a [RefCell<Option<EntityData>>], | 616 | entities: &'a [RefCell<Option<EntityData>>], |
| 565 | 617 | ||
| 566 | mqtt_resources: &'a mut mqtt::ClientResources, | 618 | mqtt_resources: &'a mut mqtt::ClientResources, |
| 567 | publish_buffer: &'a mut VecView<u8>, | 619 | buffers: DeviceBuffers<'a>, |
| 568 | subscribe_buffer: &'a mut VecView<u8>, | ||
| 569 | discovery_buffer: &'a mut VecView<u8>, | ||
| 570 | availability_topic_buffer: &'a mut StringView, | ||
| 571 | discovery_topic_buffer: &'a mut StringView, | ||
| 572 | state_topic_buffer: &'a mut StringView, | ||
| 573 | command_topic_buffer: &'a mut StringView, | ||
| 574 | attributes_topic_buffer: &'a mut StringView, | ||
| 575 | } | 620 | } |
| 576 | 621 | ||
| 577 | pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { | 622 | pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { |
| @@ -581,14 +626,7 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi | |||
| 581 | entities: &resources.entities, | 626 | entities: &resources.entities, |
| 582 | 627 | ||
| 583 | mqtt_resources: &mut resources.mqtt_resources, | 628 | mqtt_resources: &mut resources.mqtt_resources, |
| 584 | publish_buffer: &mut resources.publish_buffer, | 629 | buffers: resources.buffers.as_buffers_mut(), |
| 585 | subscribe_buffer: &mut resources.subscribe_buffer, | ||
| 586 | discovery_buffer: &mut resources.discovery_buffer, | ||
| 587 | availability_topic_buffer: &mut resources.availability_topic_buffer, | ||
| 588 | discovery_topic_buffer: &mut resources.discovery_topic_buffer, | ||
| 589 | state_topic_buffer: &mut resources.state_topic_buffer, | ||
| 590 | command_topic_buffer: &mut resources.command_topic_buffer, | ||
| 591 | attributes_topic_buffer: &mut resources.attributes_topic_buffer, | ||
| 592 | } | 630 | } |
| 593 | } | 631 | } |
| 594 | 632 | ||
| @@ -741,6 +779,272 @@ pub fn create_device_tracker<'a>( | |||
| 741 | DeviceTracker::new(entity) | 779 | DeviceTracker::new(entity) |
| 742 | } | 780 | } |
| 743 | 781 | ||
| 782 | async fn device_mqtt_subscribe<T: Transport>( | ||
| 783 | client: &mut mqtt::Client<'_, T>, | ||
| 784 | topic: impl core::fmt::Display, | ||
| 785 | ) -> Result<(), Error> { | ||
| 786 | use core::fmt::Write; | ||
| 787 | |||
| 788 | // Format topic to string for both subscribe call and logging | ||
| 789 | let mut topic_buffer = heapless::String::<128>::new(); | ||
| 790 | write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small"); | ||
| 791 | let topic_str = topic_buffer.as_str(); | ||
| 792 | |||
| 793 | match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await { | ||
| 794 | Ok(Ok(_)) => Ok(()), | ||
| 795 | Ok(Err(err)) => { | ||
| 796 | crate::log::error!( | ||
| 797 | "mqtt subscribe to '{}' failed with: {:?}", | ||
| 798 | topic_str, | ||
| 799 | crate::log::Debug2Format(&err) | ||
| 800 | ); | ||
| 801 | Err(Error::new("mqtt subscribe failed")) | ||
| 802 | } | ||
| 803 | Err(_) => { | ||
| 804 | crate::log::error!("mqtt subscribe to '{}' timed out", topic_str); | ||
| 805 | Err(Error::new("mqtt subscribe timed out")) | ||
| 806 | } | ||
| 807 | } | ||
| 808 | } | ||
| 809 | |||
| 810 | async fn device_mqtt_publish<T: Transport>( | ||
| 811 | client: &mut mqtt::Client<'_, T>, | ||
| 812 | topic: impl core::fmt::Display, | ||
| 813 | data: &[u8], | ||
| 814 | retain: bool, | ||
| 815 | ) -> Result<(), Error> { | ||
| 816 | use core::fmt::Write; | ||
| 817 | |||
| 818 | // Format topic to string for both publish call and logging | ||
| 819 | let mut topic_buffer = heapless::String::<128>::new(); | ||
| 820 | write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small"); | ||
| 821 | let topic_str = topic_buffer.as_str(); | ||
| 822 | |||
| 823 | let result = if retain { | ||
| 824 | embassy_time::with_timeout( | ||
| 825 | MQTT_TIMEOUT, | ||
| 826 | client.publish_with( | ||
| 827 | topic_str, | ||
| 828 | data, | ||
| 829 | mqtt::PublishParams { | ||
| 830 | retain: true, | ||
| 831 | ..Default::default() | ||
| 832 | }, | ||
| 833 | ), | ||
| 834 | ) | ||
| 835 | .await | ||
| 836 | } else { | ||
| 837 | embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await | ||
| 838 | }; | ||
| 839 | |||
| 840 | match result { | ||
| 841 | Ok(Ok(_)) => Ok(()), | ||
| 842 | Ok(Err(err)) => { | ||
| 843 | crate::log::error!( | ||
| 844 | "mqtt publish to '{}' failed with: {:?}", | ||
| 845 | topic_str, | ||
| 846 | crate::log::Debug2Format(&err) | ||
| 847 | ); | ||
| 848 | Err(Error::new("mqtt publish failed")) | ||
| 849 | } | ||
| 850 | Err(_) => { | ||
| 851 | crate::log::error!("mqtt publish to '{}' timed out", topic_str); | ||
| 852 | Err(Error::new("mqtt publish timed out")) | ||
| 853 | } | ||
| 854 | } | ||
| 855 | } | ||
| 856 | |||
| 857 | /// Receives MQTT publish data with timeout and proper error handling. | ||
| 858 | /// | ||
| 859 | /// This helper function handles the common pattern of receiving MQTT data with: | ||
| 860 | /// - Automatic timeout handling using MQTT_TIMEOUT | ||
| 861 | /// - Consistent error logging | ||
| 862 | /// - Size validation against buffer capacity | ||
| 863 | /// | ||
| 864 | /// # Arguments | ||
| 865 | /// * `client` - MQTT client for receiving data | ||
| 866 | /// * `data_len` - Expected length of data to receive | ||
| 867 | /// * `buffer` - Buffer to receive the data into | ||
| 868 | /// | ||
| 869 | /// # Returns | ||
| 870 | /// * `Ok(&[u8])` - Slice of the buffer containing the received data | ||
| 871 | /// * `Err(Error)` - If operation fails, times out, or data exceeds buffer size | ||
| 872 | /// | ||
| 873 | /// # Errors | ||
| 874 | /// Returns error if: | ||
| 875 | /// - `data_len` is greater than `buffer.len()` (buffer too small) | ||
| 876 | /// - The receive operation times out after MQTT_TIMEOUT seconds | ||
| 877 | /// - The underlying MQTT receive operation fails | ||
| 878 | async fn mqtt_receive_data<'a, T: Transport>( | ||
| 879 | client: &mut mqtt::Client<'_, T>, | ||
| 880 | data_len: usize, | ||
| 881 | buffer: &'a mut [u8], | ||
| 882 | ) -> Result<&'a [u8], Error> { | ||
| 883 | // Validate buffer size - reject if too small (per user requirement) | ||
| 884 | if data_len > buffer.len() { | ||
| 885 | crate::log::warn!( | ||
| 886 | "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting", | ||
| 887 | data_len, | ||
| 888 | buffer.len() | ||
| 889 | ); | ||
| 890 | return Err(Error::new("mqtt payload too large for buffer")); | ||
| 891 | } | ||
| 892 | |||
| 893 | crate::log::debug!("mqtt receiving {} bytes of data", data_len); | ||
| 894 | |||
| 895 | match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len])) | ||
| 896 | .await | ||
| 897 | { | ||
| 898 | Ok(Ok(())) => Ok(&buffer[..data_len]), | ||
| 899 | Ok(Err(err)) => { | ||
| 900 | crate::log::error!( | ||
| 901 | "mqtt receive data failed with: {:?}", | ||
| 902 | crate::log::Debug2Format(&err) | ||
| 903 | ); | ||
| 904 | Err(Error::new("mqtt receive data failed")) | ||
| 905 | } | ||
| 906 | Err(_) => { | ||
| 907 | crate::log::error!("mqtt receive data timed out"); | ||
| 908 | Err(Error::new("mqtt receive data timed out")) | ||
| 909 | } | ||
| 910 | } | ||
| 911 | } | ||
| 912 | |||
| 913 | /// Publishes discovery messages for all entities in the device. | ||
| 914 | /// | ||
| 915 | /// This function iterates over all entities, generates their discovery messages, | ||
| 916 | /// publishes them to MQTT, and optionally subscribes to their command topics. | ||
| 917 | /// | ||
| 918 | /// # Arguments | ||
| 919 | /// * `client` - MQTT client for publishing and subscribing | ||
| 920 | /// * `entities` - Slice of entities to publish discoveries for | ||
| 921 | /// * `buffers` - Device buffers for generating discovery messages | ||
| 922 | /// * `device_config` - Device configuration | ||
| 923 | /// * `availability_topic` - The device availability topic string | ||
| 924 | /// * `subscribe_to_commands` - Whether to subscribe to command topics (true for initial discovery, false for rediscovery) | ||
| 925 | /// | ||
| 926 | /// # Returns | ||
| 927 | /// `Ok(())` if all discoveries were published successfully, or an error if any operation fails | ||
| 928 | async fn publish_entity_discoveries<T: Transport>( | ||
| 929 | client: &mut mqtt::Client<'_, T>, | ||
| 930 | entities: &[RefCell<Option<EntityData>>], | ||
| 931 | buffers: &mut DeviceBuffers<'_>, | ||
| 932 | device_config: &DeviceConfig, | ||
| 933 | availability_topic: &str, | ||
| 934 | subscribe_to_commands: bool, | ||
| 935 | ) -> Result<(), Error> { | ||
| 936 | crate::log::debug!("publishing entity discovery messages"); | ||
| 937 | |||
| 938 | for entity in entities { | ||
| 939 | buffers.clear(); | ||
| 940 | |||
| 941 | // Borrow the entity and fill out the buffers to be sent | ||
| 942 | // This should be done inside a block so that we do not hold the RefMut across an await | ||
| 943 | { | ||
| 944 | let mut entity = entity.borrow_mut(); | ||
| 945 | let entity = match entity.as_mut() { | ||
| 946 | Some(entity) => entity, | ||
| 947 | None => break, | ||
| 948 | }; | ||
| 949 | |||
| 950 | generate_entity_discovery(buffers, device_config, &entity.config, availability_topic); | ||
| 951 | } | ||
| 952 | |||
| 953 | let discovery_topic = buffers.discovery_topic.as_str(); | ||
| 954 | crate::log::debug!("sending discovery to topic '{}'", discovery_topic); | ||
| 955 | device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?; | ||
| 956 | |||
| 957 | if subscribe_to_commands { | ||
| 958 | let command_topic = buffers.command_topic.as_str(); | ||
| 959 | crate::log::debug!("subscribing to command topic '{}'", command_topic); | ||
| 960 | device_mqtt_subscribe(client, command_topic).await?; | ||
| 961 | } | ||
| 962 | } | ||
| 963 | |||
| 964 | Ok(()) | ||
| 965 | } | ||
| 966 | |||
| 967 | fn generate_entity_discovery( | ||
| 968 | buffers: &mut DeviceBuffers<'_>, | ||
| 969 | device_config: &DeviceConfig, | ||
| 970 | entity_config: &EntityConfig, | ||
| 971 | availability_topic: &str, | ||
| 972 | ) { | ||
| 973 | use core::fmt::Write; | ||
| 974 | |||
| 975 | let discovery_topic_display = DiscoveryTopicDisplay { | ||
| 976 | domain: entity_config.domain, | ||
| 977 | device_id: device_config.device_id, | ||
| 978 | entity_id: entity_config.id, | ||
| 979 | }; | ||
| 980 | let state_topic_display = StateTopicDisplay { | ||
| 981 | device_id: device_config.device_id, | ||
| 982 | entity_id: entity_config.id, | ||
| 983 | }; | ||
| 984 | let command_topic_display = CommandTopicDisplay { | ||
| 985 | device_id: device_config.device_id, | ||
| 986 | entity_id: entity_config.id, | ||
| 987 | }; | ||
| 988 | let attributes_topic_display = AttributesTopicDisplay { | ||
| 989 | device_id: device_config.device_id, | ||
| 990 | entity_id: entity_config.id, | ||
| 991 | }; | ||
| 992 | |||
| 993 | write!(buffers.discovery_topic, "{discovery_topic_display}") | ||
| 994 | .expect("discovery topic buffer too small"); | ||
| 995 | write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small"); | ||
| 996 | write!(buffers.command_topic, "{command_topic_display}") | ||
| 997 | .expect("command topic buffer too small"); | ||
| 998 | write!(buffers.attributes_topic, "{attributes_topic_display}") | ||
| 999 | .expect("attributes topic buffer too small"); | ||
| 1000 | |||
| 1001 | let device_discovery = DeviceDiscovery { | ||
| 1002 | identifiers: &[device_config.device_id], | ||
| 1003 | name: device_config.device_name, | ||
| 1004 | manufacturer: device_config.manufacturer, | ||
| 1005 | model: device_config.model, | ||
| 1006 | }; | ||
| 1007 | |||
| 1008 | let discovery = EntityDiscovery { | ||
| 1009 | id: entity_config.id, | ||
| 1010 | name: entity_config.name, | ||
| 1011 | device_class: entity_config.device_class, | ||
| 1012 | state_topic: Some(buffers.state_topic.as_str()), | ||
| 1013 | command_topic: Some(buffers.command_topic.as_str()), | ||
| 1014 | json_attributes_topic: Some(buffers.attributes_topic.as_str()), | ||
| 1015 | unit_of_measurement: entity_config.measurement_unit, | ||
| 1016 | schema: entity_config.schema, | ||
| 1017 | platform: entity_config.platform, | ||
| 1018 | state_class: entity_config.state_class, | ||
| 1019 | icon: entity_config.icon, | ||
| 1020 | entity_category: entity_config.category, | ||
| 1021 | entity_picture: entity_config.picture, | ||
| 1022 | min: entity_config.min, | ||
| 1023 | max: entity_config.max, | ||
| 1024 | step: entity_config.step, | ||
| 1025 | mode: entity_config.mode, | ||
| 1026 | suggested_display_precision: entity_config.suggested_display_precision, | ||
| 1027 | availability_topic: Some(availability_topic), | ||
| 1028 | payload_available: Some(AVAILABLE_PAYLOAD), | ||
| 1029 | payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), | ||
| 1030 | device: &device_discovery, | ||
| 1031 | }; | ||
| 1032 | |||
| 1033 | crate::log::debug!( | ||
| 1034 | "discovery for entity '{}': {:?}", | ||
| 1035 | entity_config.id, | ||
| 1036 | discovery | ||
| 1037 | ); | ||
| 1038 | |||
| 1039 | buffers | ||
| 1040 | .discovery | ||
| 1041 | .resize(buffers.discovery.capacity(), 0) | ||
| 1042 | .unwrap(); | ||
| 1043 | let n = serde_json_core::to_slice(&discovery, buffers.discovery) | ||
| 1044 | .expect("discovery buffer too small"); | ||
| 1045 | buffers.discovery.truncate(n); | ||
| 1046 | } | ||
| 1047 | |||
| 744 | /// Runs the main Home Assistant device event loop. | 1048 | /// Runs the main Home Assistant device event loop. |
| 745 | /// | 1049 | /// |
| 746 | /// This function handles MQTT communication, entity discovery, and state updates. It will run | 1050 | /// This function handles MQTT communication, entity discovery, and state updates. It will run |
| @@ -787,18 +1091,22 @@ pub fn create_device_tracker<'a>( | |||
| 787 | pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { | 1091 | pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { |
| 788 | use core::fmt::Write; | 1092 | use core::fmt::Write; |
| 789 | 1093 | ||
| 790 | const MQTT_TIMEOUT: Duration = Duration::from_secs(30); | 1094 | device.buffers.availability_topic.clear(); |
| 791 | |||
| 792 | device.availability_topic_buffer.clear(); | ||
| 793 | write!( | 1095 | write!( |
| 794 | device.availability_topic_buffer, | 1096 | device.buffers.availability_topic, |
| 795 | "{}", | 1097 | "{}", |
| 796 | DeviceAvailabilityTopic { | 1098 | DeviceAvailabilityTopic { |
| 797 | device_id: device.config.device_id | 1099 | device_id: device.config.device_id |
| 798 | } | 1100 | } |
| 799 | ) | 1101 | ) |
| 800 | .expect("device availability buffer too small"); | 1102 | .expect("device availability buffer too small"); |
| 801 | let availability_topic = device.availability_topic_buffer.as_str(); | 1103 | |
| 1104 | // Store availability_topic in a separate buffer to avoid borrow conflicts | ||
| 1105 | let mut availability_topic_copy = heapless::String::<128>::new(); | ||
| 1106 | availability_topic_copy | ||
| 1107 | .push_str(device.buffers.availability_topic.as_str()) | ||
| 1108 | .expect("availability topic too large"); | ||
| 1109 | let availability_topic = availability_topic_copy.as_str(); | ||
| 802 | 1110 | ||
| 803 | let mut ping_ticker = | 1111 | let mut ping_ticker = |
| 804 | embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); | 1112 | embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); |
| @@ -830,169 +1138,25 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 830 | } | 1138 | } |
| 831 | } | 1139 | } |
| 832 | 1140 | ||
| 833 | crate::log::debug!("sending discover messages"); | 1141 | device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?; |
| 834 | let device_discovery = DeviceDiscovery { | ||
| 835 | identifiers: &[device.config.device_id], | ||
| 836 | name: device.config.device_name, | ||
| 837 | manufacturer: device.config.manufacturer, | ||
| 838 | model: device.config.model, | ||
| 839 | }; | ||
| 840 | |||
| 841 | for entity in device.entities { | ||
| 842 | device.publish_buffer.clear(); | ||
| 843 | device.subscribe_buffer.clear(); | ||
| 844 | device.discovery_buffer.clear(); | ||
| 845 | device.discovery_topic_buffer.clear(); | ||
| 846 | device.state_topic_buffer.clear(); | ||
| 847 | device.command_topic_buffer.clear(); | ||
| 848 | device.attributes_topic_buffer.clear(); | ||
| 849 | |||
| 850 | // borrow the entity and fill out the buffers to be sent | ||
| 851 | // this should be done inside a block so that we do not hold the RefMut across an | ||
| 852 | // await | ||
| 853 | { | ||
| 854 | let mut entity = entity.borrow_mut(); | ||
| 855 | let entity = match entity.as_mut() { | ||
| 856 | Some(entity) => entity, | ||
| 857 | None => break, | ||
| 858 | }; | ||
| 859 | let entity_config = &entity.config; | ||
| 860 | |||
| 861 | let discovery_topic_display = DiscoveryTopicDisplay { | ||
| 862 | domain: entity_config.domain, | ||
| 863 | device_id: device.config.device_id, | ||
| 864 | entity_id: entity_config.id, | ||
| 865 | }; | ||
| 866 | let state_topic_display = StateTopicDisplay { | ||
| 867 | device_id: device.config.device_id, | ||
| 868 | entity_id: entity_config.id, | ||
| 869 | }; | ||
| 870 | let command_topic_display = CommandTopicDisplay { | ||
| 871 | device_id: device.config.device_id, | ||
| 872 | entity_id: entity_config.id, | ||
| 873 | }; | ||
| 874 | let attributes_topic_display = AttributesTopicDisplay { | ||
| 875 | device_id: device.config.device_id, | ||
| 876 | entity_id: entity_config.id, | ||
| 877 | }; | ||
| 878 | |||
| 879 | write!(device.discovery_topic_buffer, "{discovery_topic_display}") | ||
| 880 | .expect("discovery topic buffer too small"); | ||
| 881 | write!(device.state_topic_buffer, "{state_topic_display}") | ||
| 882 | .expect("state topic buffer too small"); | ||
| 883 | write!(device.command_topic_buffer, "{command_topic_display}") | ||
| 884 | .expect("command topic buffer too small"); | ||
| 885 | write!(device.attributes_topic_buffer, "{attributes_topic_display}") | ||
| 886 | .expect("attributes topic buffer too small"); | ||
| 887 | |||
| 888 | let discovery = EntityDiscovery { | ||
| 889 | id: entity_config.id, | ||
| 890 | name: entity_config.name, | ||
| 891 | device_class: entity_config.device_class, | ||
| 892 | state_topic: Some(device.state_topic_buffer.as_str()), | ||
| 893 | command_topic: Some(device.command_topic_buffer.as_str()), | ||
| 894 | json_attributes_topic: Some(device.attributes_topic_buffer.as_str()), | ||
| 895 | unit_of_measurement: entity_config.measurement_unit, | ||
| 896 | schema: entity_config.schema, | ||
| 897 | platform: entity_config.platform, | ||
| 898 | state_class: entity_config.state_class, | ||
| 899 | icon: entity_config.icon, | ||
| 900 | entity_category: entity_config.category, | ||
| 901 | entity_picture: entity_config.picture, | ||
| 902 | min: entity_config.min, | ||
| 903 | max: entity_config.max, | ||
| 904 | step: entity_config.step, | ||
| 905 | mode: entity_config.mode, | ||
| 906 | suggested_display_precision: entity_config.suggested_display_precision, | ||
| 907 | availability_topic: Some(availability_topic), | ||
| 908 | payload_available: Some(AVAILABLE_PAYLOAD), | ||
| 909 | payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), | ||
| 910 | device: &device_discovery, | ||
| 911 | }; | ||
| 912 | crate::log::debug!( | ||
| 913 | "discovery for entity '{}': {:?}", | ||
| 914 | entity_config.id, | ||
| 915 | discovery | ||
| 916 | ); | ||
| 917 | |||
| 918 | device | ||
| 919 | .discovery_buffer | ||
| 920 | .resize(device.discovery_buffer.capacity(), 0) | ||
| 921 | .unwrap(); | ||
| 922 | let n = serde_json_core::to_slice(&discovery, device.discovery_buffer) | ||
| 923 | .expect("discovery buffer too small"); | ||
| 924 | device.discovery_buffer.truncate(n); | ||
| 925 | } | ||
| 926 | |||
| 927 | let discovery_topic = device.discovery_topic_buffer.as_str(); | ||
| 928 | crate::log::debug!("sending discovery to topic '{}'", discovery_topic); | ||
| 929 | match embassy_time::with_timeout( | ||
| 930 | MQTT_TIMEOUT, | ||
| 931 | client.publish(discovery_topic, device.discovery_buffer), | ||
| 932 | ) | ||
| 933 | .await | ||
| 934 | { | ||
| 935 | Ok(Ok(_)) => {} | ||
| 936 | Ok(Err(err)) => { | ||
| 937 | crate::log::error!( | ||
| 938 | "mqtt discovery publish failed with: {:?}", | ||
| 939 | crate::log::Debug2Format(&err) | ||
| 940 | ); | ||
| 941 | return Err(Error::new("mqtt discovery publish failed")); | ||
| 942 | } | ||
| 943 | Err(_) => { | ||
| 944 | crate::log::error!("mqtt discovery publish timed out"); | ||
| 945 | return Err(Error::new("mqtt discovery publish timed out")); | ||
| 946 | } | ||
| 947 | } | ||
| 948 | 1142 | ||
| 949 | let command_topic = device.command_topic_buffer.as_str(); | 1143 | publish_entity_discoveries( |
| 950 | crate::log::debug!("subscribing to command topic '{}'", command_topic); | 1144 | &mut client, |
| 951 | match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { | 1145 | device.entities, |
| 952 | Ok(Ok(_)) => {} | 1146 | &mut device.buffers, |
| 953 | Ok(Err(err)) => { | 1147 | &device.config, |
| 954 | crate::log::error!( | 1148 | availability_topic, |
| 955 | "mqtt subscribe to '{}' failed with: {:?}", | 1149 | true, |
| 956 | command_topic, | 1150 | ) |
| 957 | crate::log::Debug2Format(&err) | 1151 | .await?; |
| 958 | ); | ||
| 959 | return Err(Error::new( | ||
| 960 | "mqtt subscription to entity command topic failed", | ||
| 961 | )); | ||
| 962 | } | ||
| 963 | Err(_) => { | ||
| 964 | crate::log::error!("mqtt subscribe to '{}' timed out", command_topic); | ||
| 965 | return Err(Error::new("mqtt subscribe timed out")); | ||
| 966 | } | ||
| 967 | } | ||
| 968 | } | ||
| 969 | 1152 | ||
| 970 | match embassy_time::with_timeout( | 1153 | device_mqtt_publish( |
| 971 | MQTT_TIMEOUT, | 1154 | &mut client, |
| 972 | client.publish_with( | 1155 | availability_topic, |
| 973 | availability_topic, | 1156 | AVAILABLE_PAYLOAD.as_bytes(), |
| 974 | AVAILABLE_PAYLOAD.as_bytes(), | 1157 | true, |
| 975 | mqtt::PublishParams { | ||
| 976 | retain: true, | ||
| 977 | ..Default::default() | ||
| 978 | }, | ||
| 979 | ), | ||
| 980 | ) | 1158 | ) |
| 981 | .await | 1159 | .await?; |
| 982 | { | ||
| 983 | Ok(Ok(_)) => {} | ||
| 984 | Ok(Err(err)) => { | ||
| 985 | crate::log::error!( | ||
| 986 | "mqtt availability publish failed with: {:?}", | ||
| 987 | crate::log::Debug2Format(&err) | ||
| 988 | ); | ||
| 989 | return Err(Error::new("mqtt availability publish failed")); | ||
| 990 | } | ||
| 991 | Err(_) => { | ||
| 992 | crate::log::error!("mqtt availability publish timed out"); | ||
| 993 | return Err(Error::new("mqtt availability publish timed out")); | ||
| 994 | } | ||
| 995 | } | ||
| 996 | 1160 | ||
| 997 | let mut first_iteration_push = true; | 1161 | let mut first_iteration_push = true; |
| 998 | 'outer_loop: loop { | 1162 | 'outer_loop: loop { |
| @@ -1011,7 +1175,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1011 | } | 1175 | } |
| 1012 | 1176 | ||
| 1013 | entity.publish = false; | 1177 | entity.publish = false; |
| 1014 | device.publish_buffer.clear(); | 1178 | device.buffers.publish.clear(); |
| 1015 | 1179 | ||
| 1016 | let mut publish_to_attributes = false; | 1180 | let mut publish_to_attributes = false; |
| 1017 | match &entity.storage { | 1181 | match &entity.storage { |
| @@ -1019,37 +1183,39 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1019 | state: Some(SwitchState { value, .. }), | 1183 | state: Some(SwitchState { value, .. }), |
| 1020 | .. | 1184 | .. |
| 1021 | }) => device | 1185 | }) => device |
| 1022 | .publish_buffer | 1186 | .buffers |
| 1187 | .publish | ||
| 1023 | .extend_from_slice(value.as_str().as_bytes()) | 1188 | .extend_from_slice(value.as_str().as_bytes()) |
| 1024 | .expect("publish buffer too small for switch state payload"), | 1189 | .expect("publish buffer too small for switch state payload"), |
| 1025 | EntityStorage::BinarySensor(BinarySensorStorage { | 1190 | EntityStorage::BinarySensor(BinarySensorStorage { |
| 1026 | state: Some(BinarySensorState { value, .. }), | 1191 | state: Some(BinarySensorState { value, .. }), |
| 1027 | }) => device | 1192 | }) => device |
| 1028 | .publish_buffer | 1193 | .buffers |
| 1194 | .publish | ||
| 1029 | .extend_from_slice(value.as_str().as_bytes()) | 1195 | .extend_from_slice(value.as_str().as_bytes()) |
| 1030 | .expect("publish buffer too small for binary sensor state payload"), | 1196 | .expect("publish buffer too small for binary sensor state payload"), |
| 1031 | EntityStorage::NumericSensor(NumericSensorStorage { | 1197 | EntityStorage::NumericSensor(NumericSensorStorage { |
| 1032 | state: Some(NumericSensorState { value, .. }), | 1198 | state: Some(NumericSensorState { value, .. }), |
| 1033 | .. | 1199 | .. |
| 1034 | }) => write!(device.publish_buffer, "{}", value) | 1200 | }) => write!(device.buffers.publish, "{}", value) |
| 1035 | .expect("publish buffer too small for numeric sensor payload"), | 1201 | .expect("publish buffer too small for numeric sensor payload"), |
| 1036 | EntityStorage::Number(NumberStorage { | 1202 | EntityStorage::Number(NumberStorage { |
| 1037 | state: Some(NumberState { value, .. }), | 1203 | state: Some(NumberState { value, .. }), |
| 1038 | .. | 1204 | .. |
| 1039 | }) => write!(device.publish_buffer, "{}", value) | 1205 | }) => write!(device.buffers.publish, "{}", value) |
| 1040 | .expect("publish buffer too small for number state payload"), | 1206 | .expect("publish buffer too small for number state payload"), |
| 1041 | EntityStorage::DeviceTracker(DeviceTrackerStorage { | 1207 | EntityStorage::DeviceTracker(DeviceTrackerStorage { |
| 1042 | state: Some(tracker_state), | 1208 | state: Some(tracker_state), |
| 1043 | }) => { | 1209 | }) => { |
| 1044 | publish_to_attributes = true; | 1210 | publish_to_attributes = true; |
| 1045 | device | 1211 | device |
| 1046 | .publish_buffer | 1212 | .buffers |
| 1047 | .resize(device.publish_buffer.capacity(), 0) | 1213 | .publish |
| 1214 | .resize(device.buffers.publish.capacity(), 0) | ||
| 1048 | .expect("resize to capacity should never fail"); | 1215 | .expect("resize to capacity should never fail"); |
| 1049 | let n = | 1216 | let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish) |
| 1050 | serde_json_core::to_slice(&tracker_state, device.publish_buffer) | 1217 | .expect("publish buffer too small for tracker state payload"); |
| 1051 | .expect("publish buffer too small for tracker state payload"); | 1218 | device.buffers.publish.truncate(n); |
| 1052 | device.publish_buffer.truncate(n); | ||
| 1053 | } | 1219 | } |
| 1054 | _ => { | 1220 | _ => { |
| 1055 | if !first_iteration_push { | 1221 | if !first_iteration_push { |
| @@ -1067,42 +1233,26 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1067 | device_id: device.config.device_id, | 1233 | device_id: device.config.device_id, |
| 1068 | entity_id: entity.config.id, | 1234 | entity_id: entity.config.id, |
| 1069 | }; | 1235 | }; |
| 1070 | device.attributes_topic_buffer.clear(); | 1236 | device.buffers.attributes_topic.clear(); |
| 1071 | write!(device.attributes_topic_buffer, "{attributes_topic_display}") | 1237 | write!( |
| 1072 | .expect("attributes topic buffer too small"); | 1238 | device.buffers.attributes_topic, |
| 1073 | device.attributes_topic_buffer.as_str() | 1239 | "{attributes_topic_display}" |
| 1240 | ) | ||
| 1241 | .expect("attributes topic buffer too small"); | ||
| 1242 | device.buffers.attributes_topic.as_str() | ||
| 1074 | } else { | 1243 | } else { |
| 1075 | let state_topic_display = StateTopicDisplay { | 1244 | let state_topic_display = StateTopicDisplay { |
| 1076 | device_id: device.config.device_id, | 1245 | device_id: device.config.device_id, |
| 1077 | entity_id: entity.config.id, | 1246 | entity_id: entity.config.id, |
| 1078 | }; | 1247 | }; |
| 1079 | device.state_topic_buffer.clear(); | 1248 | device.buffers.state_topic.clear(); |
| 1080 | write!(device.state_topic_buffer, "{state_topic_display}") | 1249 | write!(device.buffers.state_topic, "{state_topic_display}") |
| 1081 | .expect("state topic buffer too small"); | 1250 | .expect("state topic buffer too small"); |
| 1082 | device.state_topic_buffer.as_str() | 1251 | device.buffers.state_topic.as_str() |
| 1083 | } | 1252 | } |
| 1084 | }; | 1253 | }; |
| 1085 | 1254 | ||
| 1086 | match embassy_time::with_timeout( | 1255 | device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?; |
| 1087 | MQTT_TIMEOUT, | ||
| 1088 | client.publish(publish_topic, device.publish_buffer), | ||
| 1089 | ) | ||
| 1090 | .await | ||
| 1091 | { | ||
| 1092 | Ok(Ok(_)) => {} | ||
| 1093 | Ok(Err(err)) => { | ||
| 1094 | crate::log::error!( | ||
| 1095 | "mqtt state publish on topic '{}' failed with: {:?}", | ||
| 1096 | publish_topic, | ||
| 1097 | crate::log::Debug2Format(&err) | ||
| 1098 | ); | ||
| 1099 | return Err(Error::new("mqtt publish failed")); | ||
| 1100 | } | ||
| 1101 | Err(_) => { | ||
| 1102 | crate::log::error!("mqtt state publish on topic '{}' timed out", publish_topic); | ||
| 1103 | return Err(Error::new("mqtt publish timed out")); | ||
| 1104 | } | ||
| 1105 | } | ||
| 1106 | } | 1256 | } |
| 1107 | first_iteration_push = false; | 1257 | first_iteration_push = false; |
| 1108 | 1258 | ||
| @@ -1134,6 +1284,29 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1134 | } | 1284 | } |
| 1135 | }; | 1285 | }; |
| 1136 | 1286 | ||
| 1287 | if publish.topic == constants::HA_STATUS_TOPIC { | ||
| 1288 | let mut receive_buffer = [0u8; 64]; | ||
| 1289 | let receive_data_len = publish.data_len; | ||
| 1290 | let receive_data = | ||
| 1291 | mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?; | ||
| 1292 | |||
| 1293 | if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() { | ||
| 1294 | first_iteration_push = true; | ||
| 1295 | |||
| 1296 | crate::log::debug!("home assistant came online, republishing discoveries"); | ||
| 1297 | publish_entity_discoveries( | ||
| 1298 | &mut client, | ||
| 1299 | device.entities, | ||
| 1300 | &mut device.buffers, | ||
| 1301 | &device.config, | ||
| 1302 | availability_topic, | ||
| 1303 | false, | ||
| 1304 | ) | ||
| 1305 | .await?; | ||
| 1306 | } | ||
| 1307 | continue; | ||
| 1308 | } | ||
| 1309 | |||
| 1137 | let entity = 'entity_search_block: { | 1310 | let entity = 'entity_search_block: { |
| 1138 | for entity in device.entities { | 1311 | for entity in device.entities { |
| 1139 | let mut data = entity.borrow_mut(); | 1312 | let mut data = entity.borrow_mut(); |
| @@ -1146,11 +1319,11 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1146 | device_id: device.config.device_id, | 1319 | device_id: device.config.device_id, |
| 1147 | entity_id: data.config.id, | 1320 | entity_id: data.config.id, |
| 1148 | }; | 1321 | }; |
| 1149 | device.command_topic_buffer.clear(); | 1322 | device.buffers.command_topic.clear(); |
| 1150 | write!(device.command_topic_buffer, "{command_topic_display}") | 1323 | write!(device.buffers.command_topic, "{command_topic_display}") |
| 1151 | .expect("command topic buffer too small"); | 1324 | .expect("command topic buffer too small"); |
| 1152 | 1325 | ||
| 1153 | if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { | 1326 | if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() { |
| 1154 | break 'entity_search_block entity; | 1327 | break 'entity_search_block entity; |
| 1155 | } | 1328 | } |
| 1156 | } | 1329 | } |
| @@ -1158,43 +1331,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 1158 | }; | 1331 | }; |
| 1159 | 1332 | ||
| 1160 | let mut read_buffer = [0u8; 128]; | 1333 | let mut read_buffer = [0u8; 128]; |
| 1161 | if publish.data_len > read_buffer.len() { | ||
| 1162 | crate::log::warn!( | ||
| 1163 | "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", | ||
| 1164 | publish.topic, | ||
| 1165 | publish.data_len | ||
| 1166 | ); | ||
| 1167 | continue; | ||
| 1168 | } | ||
| 1169 | |||
| 1170 | crate::log::debug!( | ||
| 1171 | "mqtt receiving {} bytes of data on topic {}", | ||
| 1172 | publish.data_len, | ||
| 1173 | publish.topic | ||
| 1174 | ); | ||
| 1175 | |||
| 1176 | let data_len = publish.data_len; | 1334 | let data_len = publish.data_len; |
| 1177 | match embassy_time::with_timeout( | 1335 | let receive_data = |
| 1178 | MQTT_TIMEOUT, | 1336 | match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await { |
| 1179 | client.receive_data(&mut read_buffer[..data_len]), | 1337 | Ok(data) => data, |
| 1180 | ) | 1338 | Err(_) => continue 'outer_loop, |
| 1181 | .await | 1339 | }; |
| 1182 | { | ||
| 1183 | Ok(Ok(())) => {} | ||
| 1184 | Ok(Err(err)) => { | ||
| 1185 | crate::log::error!( | ||
| 1186 | "mqtt receive data failed with: {:?}", | ||
| 1187 | crate::log::Debug2Format(&err) | ||
| 1188 | ); | ||
| 1189 | return Err(Error::new("mqtt receive data failed")); | ||
| 1190 | } | ||
| 1191 | Err(_) => { | ||
| 1192 | crate::log::error!("mqtt receive data timed out"); | ||
| 1193 | return Err(Error::new("mqtt receive data timed out")); | ||
| 1194 | } | ||
| 1195 | } | ||
| 1196 | 1340 | ||
| 1197 | let command = match str::from_utf8(&read_buffer[..data_len]) { | 1341 | let command = match str::from_utf8(receive_data) { |
| 1198 | Ok(command) => command, | 1342 | Ok(command) => command, |
| 1199 | Err(_) => { | 1343 | Err(_) => { |
| 1200 | crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); | 1344 | crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); |
