From 1fc7ac328e4828a208be56fd574e81156d81117f Mon Sep 17 00:00:00 2001 From: diogo464 Date: Fri, 5 Dec 2025 16:30:21 +0000 Subject: improved error handling and log messages --- examples/binary_sensor.rs | 2 +- examples/button.rs | 2 +- examples/number.rs | 2 +- examples/switch.rs | 2 +- examples/temperature.rs | 2 +- src/lib.rs | 252 ++++++++++++++++++++++++++++++---------------- src/transport.rs | 7 +- 7 files changed, 178 insertions(+), 91 deletions(-) diff --git a/examples/binary_sensor.rs b/examples/binary_sensor.rs index c0afa4a..7363a6c 100644 --- a/examples/binary_sensor.rs +++ b/examples/binary_sensor.rs @@ -34,7 +34,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(binary_sensor_class(sensor)); - device.run(&mut stream).await; + device.run(&mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/button.rs b/examples/button.rs index 3ac1de8..d1d2159 100644 --- a/examples/button.rs +++ b/examples/button.rs @@ -24,7 +24,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(button_task(button)); - device.run(&mut stream).await; + device.run(&mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/number.rs b/examples/number.rs index 5cad84b..19f11fa 100644 --- a/examples/number.rs +++ b/examples/number.rs @@ -39,7 +39,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(number_task(number)); - device.run(&mut stream).await; + device.run(&mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/switch.rs b/examples/switch.rs index 815a0f9..49ff353 100644 --- a/examples/switch.rs +++ b/examples/switch.rs @@ -34,7 +34,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(switch_task(switch)); - device.run(&mut stream).await; + device.run(&mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/temperature.rs b/examples/temperature.rs index 4cf0d42..d449cd4 100644 --- a/examples/temperature.rs +++ b/examples/temperature.rs @@ -46,7 +46,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(constant_temperature_task(constant_temperature_sensor)); spawner.must_spawn(random_temperature_task(random_temperature_sensor)); - device.run(&mut stream).await; + device.run(&mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/src/lib.rs b/src/lib.rs index 325df1d..b16169a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,6 @@ use core::{cell::RefCell, task::Waker}; use defmt::Format; use embassy_sync::waitqueue::AtomicWaker; -use embassy_time::Timer; use heapless::{ Vec, VecView, string::{String, StringView}, @@ -43,6 +42,23 @@ pub use transport::Transport; mod unit; pub use unit::*; +#[derive(Debug)] +pub struct Error(&'static str); + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_str(self.0) + } +} + +impl core::error::Error for Error {} + +impl Error { + pub(crate) fn new(message: &'static str) -> Self { + Self(message) + } +} + #[derive(Debug, Format, Clone, Copy, Serialize)] struct DeviceDiscovery<'a> { identifiers: &'a [&'a str], @@ -464,16 +480,12 @@ impl<'a> Device<'a> { BinarySensor::new(entity) } - pub async fn run(&mut self, transport: &mut T) -> ! { - loop { - self.run_iteration(transport).await; - Timer::after_millis(5000).await; - } - } - - async fn run_iteration(&mut self, transport: &mut T) { + pub async fn run(&mut self, transport: &mut T) -> Result<(), Error> { let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); - client.connect(self.config.device_id).await.unwrap(); + if let Err(err) = client.connect(self.config.device_id).await { + defmt::error!("mqtt connect failed with: {:?}", defmt::Debug2Format(&err)); + return Err(Error::new("mqtt connection failed")); + } defmt::info!("sending discover messages"); let device_discovery = DeviceDiscovery { @@ -504,36 +516,26 @@ impl<'a> Device<'a> { }; let entity_config = &entity.config; - write!( - self.discovery_topic_buffer, - "{}", - DiscoveryTopicDisplay { - domain: entity_config.domain, - device_id: self.config.device_id, - entity_id: entity_config.id, - } - ) - .unwrap(); - - write!( - self.state_topic_buffer, - "{}", - StateTopicDisplay { - device_id: self.config.device_id, - entity_id: entity_config.id - } - ) - .unwrap(); + let discovery_topic_display = DiscoveryTopicDisplay { + domain: entity_config.domain, + device_id: self.config.device_id, + entity_id: entity_config.id, + }; + let state_topic_display = StateTopicDisplay { + device_id: self.config.device_id, + entity_id: entity_config.id, + }; + let command_topic_display = CommandTopicDisplay { + device_id: self.config.device_id, + entity_id: entity_config.id, + }; - write!( - self.command_topic_buffer, - "{}", - CommandTopicDisplay { - device_id: self.config.device_id, - entity_id: entity_config.id - } - ) - .unwrap(); + write!(self.discovery_topic_buffer, "{discovery_topic_display}") + .expect("discovery topic buffer too small"); + write!(self.state_topic_buffer, "{state_topic_display}") + .expect("state topic buffer too small"); + write!(self.command_topic_buffer, "{command_topic_display}") + .expect("command topic buffer too small"); let discovery = EntityDiscovery { id: entity_config.id, @@ -552,24 +554,41 @@ impl<'a> Device<'a> { mode: entity_config.mode, device: &device_discovery, }; - defmt::info!("discovery: {}", discovery); + defmt::info!("discovery for entity '{}': {}", entity_config.id, discovery); self.discovery_buffer .resize(self.discovery_buffer.capacity(), 0) .unwrap(); - let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer).unwrap(); + let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer) + .expect("discovery buffer too small"); self.discovery_buffer.truncate(n); } - defmt::info!( - "sending discovery to {}", - self.discovery_topic_buffer.as_str() - ); - client - .publish(&self.discovery_topic_buffer, &self.discovery_buffer) + let discovery_topic = self.discovery_topic_buffer.as_str(); + defmt::info!("sending discovery to topic '{}'", discovery_topic); + if let Err(err) = client + .publish(discovery_topic, &self.discovery_buffer) .await - .unwrap(); - client.subscribe(&self.command_topic_buffer).await.unwrap(); + { + defmt::error!( + "mqtt discovery publish failed with: {:?}", + defmt::Debug2Format(&err) + ); + return Err(Error::new("mqtt discovery publish failed")); + } + + let command_topic = self.command_topic_buffer.as_str(); + defmt::info!("subscribing to command topic '{}'", command_topic); + if let Err(err) = client.subscribe(command_topic).await { + defmt::error!( + "mqtt subscribe to '{}' failed with: {:?}", + command_topic, + defmt::Debug2Format(&err) + ); + return Err(Error::new( + "mqtt subscription to entity command topic failed", + )); + } } 'outer_loop: loop { @@ -597,47 +616,61 @@ impl<'a> Device<'a> { }) => self .publish_buffer .extend_from_slice(value.as_str().as_bytes()) - .unwrap(), + .expect("publish buffer too small for switch state payload"), EntityStorage::BinarySensor(BinarySensorStorage { state: Some(BinarySensorState { value, .. }), }) => self .publish_buffer .extend_from_slice(value.as_str().as_bytes()) - .unwrap(), + .expect("publish buffer too small for binary sensor state payload"), EntityStorage::NumericSensor(NumericSensorStorage { state: Some(NumericSensorState { value, .. }), .. - }) => write!(self.publish_buffer, "{}", value).unwrap(), + }) => write!(self.publish_buffer, "{}", value) + .expect("publish buffer too small for numeric sensor payload"), EntityStorage::Number(NumberStorage { state: Some(NumberState { value, .. }), .. - }) => write!(self.publish_buffer, "{}", value).unwrap(), - _ => continue, // TODO: print warning + }) => write!(self.publish_buffer, "{}", value) + .expect("publish buffer too small for number state payload"), + _ => { + defmt::warn!( + "entity '{}' requested state publish but its storage does not support it", + entity.config.id + ); + continue; + } } + let state_topic_display = StateTopicDisplay { + device_id: self.config.device_id, + entity_id: entity.config.id, + }; self.state_topic_buffer.clear(); - write!( - self.state_topic_buffer, - "{}", - StateTopicDisplay { - device_id: self.config.device_id, - entity_id: entity.config.id - } - ) - .unwrap(); + write!(self.state_topic_buffer, "{state_topic_display}") + .expect("state topic buffer too small"); } - client - .publish(&self.state_topic_buffer, self.publish_buffer) - .await - .unwrap(); + let state_topic = self.state_topic_buffer.as_str(); + if let Err(err) = client.publish(state_topic, self.publish_buffer).await { + defmt::error!( + "mqtt state publish on topic '{}' failed with: {:?}", + state_topic, + defmt::Debug2Format(&err) + ); + return Err(Error::new("mqtt publish failed")); + } } let receive = client.receive(); let waker = wait_on_atomic_waker(self.waker); let publish = match embassy_futures::select::select(receive, waker).await { - embassy_futures::select::Either::First(packet) => match packet.unwrap() { - embedded_mqtt::Packet::Publish(publish) => publish, + embassy_futures::select::Either::First(packet) => match packet { + Ok(embedded_mqtt::Packet::Publish(publish)) => publish, + Err(err) => { + defmt::error!("mqtt receive failed with: {:?}", defmt::Debug2Format(&err)); + return Err(Error::new("mqtt receive failed")); + } _ => continue, }, embassy_futures::select::Either::Second(_) => continue, @@ -651,16 +684,13 @@ impl<'a> Device<'a> { None => break, }; + let command_topic_display = CommandTopicDisplay { + device_id: self.config.device_id, + entity_id: data.config.id, + }; self.command_topic_buffer.clear(); - write!( - self.command_topic_buffer, - "{}", - CommandTopicDisplay { - device_id: self.config.device_id, - entity_id: data.config.id - } - ) - .unwrap(); + write!(self.command_topic_buffer, "{command_topic_display}") + .expect("command topic buffer too small"); if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { break 'entity_search_block entity; @@ -671,31 +701,83 @@ impl<'a> Device<'a> { let mut read_buffer = [0u8; 128]; if publish.data_len > read_buffer.len() { - defmt::warn!("mqtt publish payload too large, ignoring message"); + defmt::warn!( + "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", + publish.topic, + publish.data_len + ); continue; } - let b = &mut read_buffer[..publish.data_len]; - client.receive_data(b).await.unwrap(); - let command = str::from_utf8(b).unwrap(); + + defmt::info!( + "mqtt receiving {} bytes of data on topic {}", + publish.data_len, + publish.topic + ); + + let data_len = publish.data_len; + if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { + defmt::error!( + "mqtt receive data failed with: {:?}", + defmt::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive data failed")); + } + + let command = match str::from_utf8(&read_buffer[..data_len]) { + Ok(command) => command, + Err(_) => { + defmt::warn!("mqtt message contained invalid utf-8, ignoring it"); + continue; + } + }; let mut entity = entity.borrow_mut(); let data = entity.as_mut().unwrap(); match &mut data.storage { EntityStorage::Button(button_storage) => { - assert_eq!(command, constants::HA_BUTTON_PAYLOAD_PRESS); + if command != constants::HA_BUTTON_PAYLOAD_PRESS { + defmt::warn!( + "button '{}' received unexpected command '{}', expected '{}', ignoring it", + data.config.id, + command, + constants::HA_BUTTON_PAYLOAD_PRESS + ); + continue; + } button_storage.consumed = false; button_storage.timestamp = Some(embassy_time::Instant::now()); } EntityStorage::Switch(switch_storage) => { - let command = command.parse::().unwrap(); + let command = match command.parse::() { + Ok(command) => command, + Err(_) => { + defmt::warn!( + "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it", + data.config.id, + command + ); + continue; + } + }; switch_storage.command = Some(SwitchCommand { value: command, timestamp: embassy_time::Instant::now(), }); } EntityStorage::Number(number_storage) => { - let command = command.parse::().unwrap(); + let command = match command.parse::() { + Ok(command) => command, + Err(_) => { + defmt::warn!( + "number '{}' received invalid command '{}', expected a valid number, ignoring it", + data.config.id, + command + ); + continue; + } + }; number_storage.command = Some(NumberCommand { value: command, timestamp: embassy_time::Instant::now(), diff --git a/src/transport.rs b/src/transport.rs index 5214b37..2e7889e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,3 +1,8 @@ pub trait Transport: embedded_io_async::Read + embedded_io_async::Write {} -impl Transport for T where T: embedded_io_async::Read + embedded_io_async::Write {} +impl Transport for T +where + T: embedded_io_async::Read + embedded_io_async::Write, + ::Error: core::fmt::Debug, +{ +} -- cgit