From 28d9961141a38ebde8bd6144636c3021eb2755a5 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sun, 7 Dec 2025 15:57:17 +0000 Subject: code style change --- examples/binary_sensor.rs | 7 +- examples/button.rs | 6 +- examples/number.rs | 7 +- examples/sensor.rs | 10 +- examples/switch.rs | 7 +- src/lib.rs | 810 +++++++++++++++++++++++----------------------- 6 files changed, 427 insertions(+), 420 deletions(-) diff --git a/examples/binary_sensor.rs b/examples/binary_sensor.rs index 27cfdb5..a52a5fb 100644 --- a/examples/binary_sensor.rs +++ b/examples/binary_sensor.rs @@ -11,7 +11,7 @@ static RESOURCES: StaticCell = StaticCell::new(); async fn main_task(spawner: Spawner) { let mut stream = AsyncTcp::connect(std::env!("MQTT_ADDRESS")); - let mut device = embassy_ha::Device::new( + let mut device = embassy_ha::new( RESOURCES.init(Default::default()), embassy_ha::DeviceConfig { device_id: "example-device-id", @@ -21,7 +21,8 @@ async fn main_task(spawner: Spawner) { }, ); - let sensor = device.create_binary_sensor( + let sensor = embassy_ha::create_binary_sensor( + &device, "binary-sensor-id", embassy_ha::BinarySensorConfig { common: embassy_ha::EntityCommonConfig { @@ -34,7 +35,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(binary_sensor_class(sensor)); - device.run(&mut stream).await.unwrap(); + embassy_ha::run(&mut device, &mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/button.rs b/examples/button.rs index e3d7086..4a2a228 100644 --- a/examples/button.rs +++ b/examples/button.rs @@ -10,7 +10,7 @@ static RESOURCES: StaticCell = StaticCell::new(); async fn main_task(spawner: Spawner) { let mut stream = AsyncTcp::connect(std::env!("MQTT_ADDRESS")); - let mut device = embassy_ha::Device::new( + let mut device = embassy_ha::new( RESOURCES.init(Default::default()), embassy_ha::DeviceConfig { device_id: "example-device-id", @@ -20,11 +20,11 @@ async fn main_task(spawner: Spawner) { }, ); - let button = device.create_button("button-sensor-id", embassy_ha::ButtonConfig::default()); + let button = embassy_ha::create_button(&device, "button-sensor-id", embassy_ha::ButtonConfig::default()); spawner.must_spawn(button_task(button)); - device.run(&mut stream).await.unwrap(); + embassy_ha::run(&mut device, &mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/number.rs b/examples/number.rs index 2231be1..8ef6656 100644 --- a/examples/number.rs +++ b/examples/number.rs @@ -11,7 +11,7 @@ static RESOURCES: StaticCell = StaticCell::new(); async fn main_task(spawner: Spawner) { let mut stream = AsyncTcp::connect(std::env!("MQTT_ADDRESS")); - let mut device = embassy_ha::Device::new( + let mut device = embassy_ha::new( RESOURCES.init(Default::default()), embassy_ha::DeviceConfig { device_id: "example-device-id", @@ -21,7 +21,8 @@ async fn main_task(spawner: Spawner) { }, ); - let number = device.create_number( + let number = embassy_ha::create_number( + &device, "number-id", embassy_ha::NumberConfig { common: embassy_ha::EntityCommonConfig { @@ -40,7 +41,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(number_task(number)); - device.run(&mut stream).await.unwrap(); + embassy_ha::run(&mut device, &mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/sensor.rs b/examples/sensor.rs index 69bd87a..2fd2d1f 100644 --- a/examples/sensor.rs +++ b/examples/sensor.rs @@ -11,7 +11,7 @@ static RESOURCES: StaticCell = StaticCell::new(); async fn main_task(spawner: Spawner) { let mut stream = AsyncTcp::connect(std::env!("MQTT_ADDRESS")); - let mut device = embassy_ha::Device::new( + let mut device = embassy_ha::new( RESOURCES.init(Default::default()), embassy_ha::DeviceConfig { device_id: "example-device-id", @@ -21,7 +21,8 @@ async fn main_task(spawner: Spawner) { }, ); - let temperature_sensor = device.create_sensor( + let temperature_sensor = embassy_ha::create_sensor( + &device, "random-temperature-sensor-id", embassy_ha::SensorConfig { common: embassy_ha::EntityCommonConfig { @@ -35,7 +36,8 @@ async fn main_task(spawner: Spawner) { }, ); - let humidity_sensor = device.create_sensor( + let humidity_sensor = embassy_ha::create_sensor( + &device, "random-humidity-sensor-id", embassy_ha::SensorConfig { common: embassy_ha::EntityCommonConfig { @@ -52,7 +54,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(random_temperature_task(temperature_sensor)); spawner.must_spawn(random_humidity_task(humidity_sensor)); - device.run(&mut stream).await.unwrap(); + embassy_ha::run(&mut device, &mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/examples/switch.rs b/examples/switch.rs index 67767d7..0158ede 100644 --- a/examples/switch.rs +++ b/examples/switch.rs @@ -11,7 +11,7 @@ static RESOURCES: StaticCell = StaticCell::new(); async fn main_task(spawner: Spawner) { let mut stream = AsyncTcp::connect(std::env!("MQTT_ADDRESS")); - let mut device = embassy_ha::Device::new( + let mut device = embassy_ha::new( RESOURCES.init(Default::default()), embassy_ha::DeviceConfig { device_id: "example-device-id", @@ -21,7 +21,8 @@ async fn main_task(spawner: Spawner) { }, ); - let switch = device.create_switch( + let switch = embassy_ha::create_switch( + &device, "switch-id", embassy_ha::SwitchConfig { common: embassy_ha::EntityCommonConfig { @@ -34,7 +35,7 @@ async fn main_task(spawner: Spawner) { spawner.must_spawn(switch_task(switch)); - device.run(&mut stream).await.unwrap(); + embassy_ha::run(&mut device, &mut stream).await.unwrap(); } #[embassy_executor::task] diff --git a/src/lib.rs b/src/lib.rs index 4e0896c..c1b5191 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -415,257 +415,260 @@ pub struct Device<'a> { command_topic_buffer: &'a mut StringView, } -impl<'a> Device<'a> { - pub fn new(resources: &'a mut DeviceResources, config: DeviceConfig) -> Self { - Self { - config, - waker: &resources.waker, - entities: &resources.entities, - - mqtt_resources: &mut resources.mqtt_resources, - publish_buffer: &mut resources.publish_buffer, - subscribe_buffer: &mut resources.subscribe_buffer, - discovery_buffer: &mut resources.discovery_buffer, - availability_topic_buffer: &mut resources.availability_topic_buffer, - discovery_topic_buffer: &mut resources.discovery_topic_buffer, - state_topic_buffer: &mut resources.state_topic_buffer, - command_topic_buffer: &mut resources.command_topic_buffer, - } +pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> { + Device { + config, + waker: &resources.waker, + entities: &resources.entities, + + mqtt_resources: &mut resources.mqtt_resources, + publish_buffer: &mut resources.publish_buffer, + subscribe_buffer: &mut resources.subscribe_buffer, + discovery_buffer: &mut resources.discovery_buffer, + availability_topic_buffer: &mut resources.availability_topic_buffer, + discovery_topic_buffer: &mut resources.discovery_topic_buffer, + state_topic_buffer: &mut resources.state_topic_buffer, + command_topic_buffer: &mut resources.command_topic_buffer, } +} - pub fn create_entity(&self, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { - let index = 'outer: { - for idx in 0..self.entities.len() { - if self.entities[idx].borrow().is_none() { - break 'outer idx; - } +fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { + let index = 'outer: { + for idx in 0..device.entities.len() { + if device.entities[idx].borrow().is_none() { + break 'outer idx; } - panic!("device entity limit reached"); - }; - - let data = EntityData { - config, - storage, - publish: false, - command: false, - command_waker: None, - }; - self.entities[index].replace(Some(data)); - - Entity { - data: &self.entities[index], - waker: self.waker, } + panic!("device entity limit reached"); + }; + + let data = EntityData { + config, + storage, + publish: false, + command: false, + command_waker: None, + }; + device.entities[index].replace(Some(data)); + + Entity { + data: &device.entities[index], + waker: device.waker, } +} - pub fn create_sensor(&self, id: &'static str, config: SensorConfig) -> Sensor<'a> { - let mut entity_config = EntityConfig::default(); - entity_config.id = id; - config.populate(&mut entity_config); +pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorConfig) -> Sensor<'a> { + let mut entity_config = EntityConfig::default(); + entity_config.id = id; + config.populate(&mut entity_config); + + let entity = create_entity( + device, + entity_config, + EntityStorage::NumericSensor(Default::default()), + ); + Sensor::new(entity) +} - let entity = self.create_entity( - entity_config, - EntityStorage::NumericSensor(Default::default()), - ); - Sensor::new(entity) - } +pub fn create_button<'a>(device: &Device<'a>, id: &'static str, config: ButtonConfig) -> Button<'a> { + let mut entity_config = EntityConfig::default(); + entity_config.id = id; + config.populate(&mut entity_config); - pub fn create_button(&self, id: &'static str, config: ButtonConfig) -> Button<'a> { - let mut entity_config = EntityConfig::default(); - entity_config.id = id; - config.populate(&mut entity_config); + let entity = create_entity(device, entity_config, EntityStorage::Button(Default::default())); + Button::new(entity) +} - let entity = self.create_entity(entity_config, EntityStorage::Button(Default::default())); - Button::new(entity) - } +pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberConfig) -> Number<'a> { + let mut entity_config = EntityConfig::default(); + entity_config.id = id; + config.populate(&mut entity_config); - pub fn create_number(&self, id: &'static str, config: NumberConfig) -> Number<'a> { - let mut entity_config = EntityConfig::default(); - entity_config.id = id; - config.populate(&mut entity_config); - - let entity = self.create_entity( - entity_config, - EntityStorage::Number(NumberStorage { - publish_on_command: config.publish_on_command, - ..Default::default() - }), - ); - Number::new(entity) - } + let entity = create_entity( + device, + entity_config, + EntityStorage::Number(NumberStorage { + publish_on_command: config.publish_on_command, + ..Default::default() + }), + ); + Number::new(entity) +} - pub fn create_switch(&self, id: &'static str, config: SwitchConfig) -> Switch<'a> { - let mut entity_config = EntityConfig::default(); - entity_config.id = id; - config.populate(&mut entity_config); - - let entity = self.create_entity( - entity_config, - EntityStorage::Switch(SwitchStorage { - publish_on_command: config.publish_on_command, - ..Default::default() - }), - ); - Switch::new(entity) - } +pub fn create_switch<'a>(device: &Device<'a>, id: &'static str, config: SwitchConfig) -> Switch<'a> { + let mut entity_config = EntityConfig::default(); + entity_config.id = id; + config.populate(&mut entity_config); + + let entity = create_entity( + device, + entity_config, + EntityStorage::Switch(SwitchStorage { + publish_on_command: config.publish_on_command, + ..Default::default() + }), + ); + Switch::new(entity) +} + +pub fn create_binary_sensor<'a>( + device: &Device<'a>, + id: &'static str, + config: BinarySensorConfig, +) -> BinarySensor<'a> { + let mut entity_config = EntityConfig::default(); + entity_config.id = id; + config.populate(&mut entity_config); + + let entity = create_entity( + device, + entity_config, + EntityStorage::BinarySensor(Default::default()), + ); + BinarySensor::new(entity) +} + +pub async fn run(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { + use core::fmt::Write; - pub fn create_binary_sensor( - &self, - id: &'static str, - config: BinarySensorConfig, - ) -> BinarySensor<'a> { - let mut entity_config = EntityConfig::default(); - entity_config.id = id; - config.populate(&mut entity_config); - - let entity = self.create_entity( - entity_config, - EntityStorage::BinarySensor(Default::default()), + device.availability_topic_buffer.clear(); + write!( + device.availability_topic_buffer, + "{}", + DeviceAvailabilityTopic { + device_id: device.config.device_id + } + ) + .expect("device availability buffer too small"); + let availability_topic = device.availability_topic_buffer.as_str(); + + let mut client = embedded_mqtt::Client::new(device.mqtt_resources, transport); + let connect_params = embedded_mqtt::ConnectParams { + will_topic: Some(availability_topic), + will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), + will_retain: true, + ..Default::default() + }; + if let Err(err) = client + .connect_with(device.config.device_id, connect_params) + .await + { + crate::log::error!( + "mqtt connect failed with: {:?}", + crate::log::Debug2Format(&err) ); - BinarySensor::new(entity) + return Err(Error::new("mqtt connection failed")); } - pub async fn run(&mut self, transport: &mut T) -> Result<(), Error> { - use core::fmt::Write; + crate::log::debug!("sending discover messages"); + let device_discovery = DeviceDiscovery { + identifiers: &[device.config.device_id], + name: device.config.device_name, + manufacturer: device.config.manufacturer, + model: device.config.model, + }; + + for entity in device.entities { + device.publish_buffer.clear(); + device.subscribe_buffer.clear(); + device.discovery_buffer.clear(); + device.discovery_topic_buffer.clear(); + device.state_topic_buffer.clear(); + device.command_topic_buffer.clear(); + + // borrow the entity and fill out the buffers to be sent + // this should be done inside a block so that we do not hold the RefMut across an + // await + { + let mut entity = entity.borrow_mut(); + let entity = match entity.as_mut() { + Some(entity) => entity, + None => break, + }; + let entity_config = &entity.config; - self.availability_topic_buffer.clear(); - write!( - self.availability_topic_buffer, - "{}", - DeviceAvailabilityTopic { - device_id: self.config.device_id - } - ) - .expect("device availability buffer too small"); - let availability_topic = self.availability_topic_buffer.as_str(); - - let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); - let connect_params = embedded_mqtt::ConnectParams { - will_topic: Some(availability_topic), - will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), - will_retain: true, - ..Default::default() - }; + let discovery_topic_display = DiscoveryTopicDisplay { + domain: entity_config.domain, + device_id: device.config.device_id, + entity_id: entity_config.id, + }; + let state_topic_display = StateTopicDisplay { + device_id: device.config.device_id, + entity_id: entity_config.id, + }; + let command_topic_display = CommandTopicDisplay { + device_id: device.config.device_id, + entity_id: entity_config.id, + }; + + write!(device.discovery_topic_buffer, "{discovery_topic_display}") + .expect("discovery topic buffer too small"); + write!(device.state_topic_buffer, "{state_topic_display}") + .expect("state topic buffer too small"); + write!(device.command_topic_buffer, "{command_topic_display}") + .expect("command topic buffer too small"); + + let discovery = EntityDiscovery { + id: entity_config.id, + name: entity_config.name, + device_class: entity_config.device_class, + state_topic: Some(device.state_topic_buffer.as_str()), + command_topic: Some(device.command_topic_buffer.as_str()), + unit_of_measurement: entity_config.measurement_unit, + schema: entity_config.schema, + state_class: entity_config.state_class, + icon: entity_config.icon, + entity_picture: entity_config.picture, + min: entity_config.min, + max: entity_config.max, + step: entity_config.step, + mode: entity_config.mode, + suggested_display_precision: entity_config.suggested_display_precision, + availability_topic: Some(availability_topic), + payload_available: Some(AVAILABLE_PAYLOAD), + payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), + device: &device_discovery, + }; + crate::log::debug!( + "discovery for entity '{}': {:?}", + entity_config.id, + discovery + ); + + device.discovery_buffer + .resize(device.discovery_buffer.capacity(), 0) + .unwrap(); + let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer) + .expect("discovery buffer too small"); + device.discovery_buffer.truncate(n); + } + + let discovery_topic = device.discovery_topic_buffer.as_str(); + crate::log::debug!("sending discovery to topic '{}'", discovery_topic); if let Err(err) = client - .connect_with(self.config.device_id, connect_params) + .publish(discovery_topic, &device.discovery_buffer) .await { crate::log::error!( - "mqtt connect failed with: {:?}", + "mqtt discovery publish failed with: {:?}", crate::log::Debug2Format(&err) ); - return Err(Error::new("mqtt connection failed")); + return Err(Error::new("mqtt discovery publish failed")); } - crate::log::debug!("sending discover messages"); - let device_discovery = DeviceDiscovery { - identifiers: &[self.config.device_id], - name: self.config.device_name, - manufacturer: self.config.manufacturer, - model: self.config.model, - }; - - for entity in self.entities { - self.publish_buffer.clear(); - self.subscribe_buffer.clear(); - self.discovery_buffer.clear(); - self.discovery_topic_buffer.clear(); - self.state_topic_buffer.clear(); - self.command_topic_buffer.clear(); - - // borrow the entity and fill out the buffers to be sent - // this should be done inside a block so that we do not hold the RefMut across an - // await - { - let mut entity = entity.borrow_mut(); - let entity = match entity.as_mut() { - Some(entity) => entity, - None => break, - }; - let entity_config = &entity.config; - - let discovery_topic_display = DiscoveryTopicDisplay { - domain: entity_config.domain, - device_id: self.config.device_id, - entity_id: entity_config.id, - }; - let state_topic_display = StateTopicDisplay { - device_id: self.config.device_id, - entity_id: entity_config.id, - }; - let command_topic_display = CommandTopicDisplay { - device_id: self.config.device_id, - entity_id: entity_config.id, - }; - - write!(self.discovery_topic_buffer, "{discovery_topic_display}") - .expect("discovery topic buffer too small"); - write!(self.state_topic_buffer, "{state_topic_display}") - .expect("state topic buffer too small"); - write!(self.command_topic_buffer, "{command_topic_display}") - .expect("command topic buffer too small"); - - let discovery = EntityDiscovery { - id: entity_config.id, - name: entity_config.name, - device_class: entity_config.device_class, - state_topic: Some(self.state_topic_buffer.as_str()), - command_topic: Some(self.command_topic_buffer.as_str()), - unit_of_measurement: entity_config.measurement_unit, - schema: entity_config.schema, - state_class: entity_config.state_class, - icon: entity_config.icon, - entity_picture: entity_config.picture, - min: entity_config.min, - max: entity_config.max, - step: entity_config.step, - mode: entity_config.mode, - suggested_display_precision: entity_config.suggested_display_precision, - availability_topic: Some(availability_topic), - payload_available: Some(AVAILABLE_PAYLOAD), - payload_not_available: Some(NOT_AVAILABLE_PAYLOAD), - device: &device_discovery, - }; - crate::log::debug!( - "discovery for entity '{}': {:?}", - entity_config.id, - discovery - ); - - self.discovery_buffer - .resize(self.discovery_buffer.capacity(), 0) - .unwrap(); - let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer) - .expect("discovery buffer too small"); - self.discovery_buffer.truncate(n); - } - - let discovery_topic = self.discovery_topic_buffer.as_str(); - crate::log::debug!("sending discovery to topic '{}'", discovery_topic); - if let Err(err) = client - .publish(discovery_topic, &self.discovery_buffer) - .await - { - crate::log::error!( - "mqtt discovery publish failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt discovery publish failed")); - } - - let command_topic = self.command_topic_buffer.as_str(); - crate::log::debug!("subscribing to command topic '{}'", command_topic); - if let Err(err) = client.subscribe(command_topic).await { - crate::log::error!( - "mqtt subscribe to '{}' failed with: {:?}", - command_topic, - crate::log::Debug2Format(&err) - ); - return Err(Error::new( - "mqtt subscription to entity command topic failed", - )); - } + let command_topic = device.command_topic_buffer.as_str(); + crate::log::debug!("subscribing to command topic '{}'", command_topic); + if let Err(err) = client.subscribe(command_topic).await { + crate::log::error!( + "mqtt subscribe to '{}' failed with: {:?}", + command_topic, + crate::log::Debug2Format(&err) + ); + return Err(Error::new( + "mqtt subscription to entity command topic failed", + )); } + } if let Err(err) = client .publish_with( @@ -685,68 +688,68 @@ impl<'a> Device<'a> { return Err(Error::new("mqtt availability publish failed")); } - 'outer_loop: loop { - use core::fmt::Write; + 'outer_loop: loop { + use core::fmt::Write; - for entity in self.entities { - { - let mut entity = entity.borrow_mut(); - let entity = match entity.as_mut() { - Some(entity) => entity, - None => break, - }; + for entity in device.entities { + { + let mut entity = entity.borrow_mut(); + let entity = match entity.as_mut() { + Some(entity) => entity, + None => break, + }; - if !entity.publish { - continue; - } + if !entity.publish { + continue; + } - entity.publish = false; - self.publish_buffer.clear(); - - match &entity.storage { - EntityStorage::Switch(SwitchStorage { - state: Some(SwitchState { value, .. }), - .. - }) => self - .publish_buffer - .extend_from_slice(value.as_str().as_bytes()) - .expect("publish buffer too small for switch state payload"), - EntityStorage::BinarySensor(BinarySensorStorage { - state: Some(BinarySensorState { value, .. }), - }) => self - .publish_buffer - .extend_from_slice(value.as_str().as_bytes()) - .expect("publish buffer too small for binary sensor state payload"), - EntityStorage::NumericSensor(NumericSensorStorage { - state: Some(NumericSensorState { value, .. }), - .. - }) => write!(self.publish_buffer, "{}", value) - .expect("publish buffer too small for numeric sensor payload"), - EntityStorage::Number(NumberStorage { - state: Some(NumberState { value, .. }), - .. - }) => write!(self.publish_buffer, "{}", value) - .expect("publish buffer too small for number state payload"), - _ => { - crate::log::warn!( - "entity '{}' requested state publish but its storage does not support it", - entity.config.id - ); - continue; - } + entity.publish = false; + device.publish_buffer.clear(); + + match &entity.storage { + EntityStorage::Switch(SwitchStorage { + state: Some(SwitchState { value, .. }), + .. + }) => device + .publish_buffer + .extend_from_slice(value.as_str().as_bytes()) + .expect("publish buffer too small for switch state payload"), + EntityStorage::BinarySensor(BinarySensorStorage { + state: Some(BinarySensorState { value, .. }), + }) => device + .publish_buffer + .extend_from_slice(value.as_str().as_bytes()) + .expect("publish buffer too small for binary sensor state payload"), + EntityStorage::NumericSensor(NumericSensorStorage { + state: Some(NumericSensorState { value, .. }), + .. + }) => write!(device.publish_buffer, "{}", value) + .expect("publish buffer too small for numeric sensor payload"), + EntityStorage::Number(NumberStorage { + state: Some(NumberState { value, .. }), + .. + }) => write!(device.publish_buffer, "{}", value) + .expect("publish buffer too small for number state payload"), + _ => { + crate::log::warn!( + "entity '{}' requested state publish but its storage does not support it", + entity.config.id + ); + continue; } - - let state_topic_display = StateTopicDisplay { - device_id: self.config.device_id, - entity_id: entity.config.id, - }; - self.state_topic_buffer.clear(); - write!(self.state_topic_buffer, "{state_topic_display}") - .expect("state topic buffer too small"); } - let state_topic = self.state_topic_buffer.as_str(); - if let Err(err) = client.publish(state_topic, self.publish_buffer).await { + let state_topic_display = StateTopicDisplay { + device_id: device.config.device_id, + entity_id: entity.config.id, + }; + device.state_topic_buffer.clear(); + write!(device.state_topic_buffer, "{state_topic_display}") + .expect("state topic buffer too small"); + } + + let state_topic = device.state_topic_buffer.as_str(); + if let Err(err) = client.publish(state_topic, device.publish_buffer).await { crate::log::error!( "mqtt state publish on topic '{}' failed with: {:?}", state_topic, @@ -756,153 +759,152 @@ impl<'a> Device<'a> { } } - let receive = client.receive(); - let waker = wait_on_atomic_waker(self.waker); - let publish = match embassy_futures::select::select(receive, waker).await { - embassy_futures::select::Either::First(packet) => match packet { - Ok(embedded_mqtt::Packet::Publish(publish)) => publish, - Err(err) => { - crate::log::error!( - "mqtt receive failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive failed")); - } - _ => continue, - }, - embassy_futures::select::Either::Second(_) => continue, - }; - - let entity = 'entity_search_block: { - for entity in self.entities { - let mut data = entity.borrow_mut(); - let data = match data.as_mut() { - Some(data) => data, - None => break, - }; - - let command_topic_display = CommandTopicDisplay { - device_id: self.config.device_id, - entity_id: data.config.id, - }; - self.command_topic_buffer.clear(); - write!(self.command_topic_buffer, "{command_topic_display}") - .expect("command topic buffer too small"); - - if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { - break 'entity_search_block entity; - } + let receive = client.receive(); + let waker = wait_on_atomic_waker(device.waker); + let publish = match embassy_futures::select::select(receive, waker).await { + embassy_futures::select::Either::First(packet) => match packet { + Ok(embedded_mqtt::Packet::Publish(publish)) => publish, + Err(err) => { + crate::log::error!( + "mqtt receive failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive failed")); } - continue 'outer_loop; - }; + _ => continue, + }, + embassy_futures::select::Either::Second(_) => continue, + }; - let mut read_buffer = [0u8; 128]; - if publish.data_len > read_buffer.len() { - crate::log::warn!( - "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", - publish.topic, - publish.data_len - ); - continue; + let entity = 'entity_search_block: { + for entity in device.entities { + let mut data = entity.borrow_mut(); + let data = match data.as_mut() { + Some(data) => data, + None => break, + }; + + let command_topic_display = CommandTopicDisplay { + device_id: device.config.device_id, + entity_id: data.config.id, + }; + device.command_topic_buffer.clear(); + write!(device.command_topic_buffer, "{command_topic_display}") + .expect("command topic buffer too small"); + + if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { + break 'entity_search_block entity; + } } + continue 'outer_loop; + }; - crate::log::debug!( - "mqtt receiving {} bytes of data on topic {}", - publish.data_len, - publish.topic + let mut read_buffer = [0u8; 128]; + if publish.data_len > read_buffer.len() { + crate::log::warn!( + "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", + publish.topic, + publish.data_len ); + continue; + } + + crate::log::debug!( + "mqtt receiving {} bytes of data on topic {}", + publish.data_len, + publish.topic + ); - let data_len = publish.data_len; - if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { - crate::log::error!( - "mqtt receive data failed with: {:?}", - crate::log::Debug2Format(&err) - ); - return Err(Error::new("mqtt receive data failed")); + let data_len = publish.data_len; + if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { + crate::log::error!( + "mqtt receive data failed with: {:?}", + crate::log::Debug2Format(&err) + ); + return Err(Error::new("mqtt receive data failed")); + } + + let command = match str::from_utf8(&read_buffer[..data_len]) { + Ok(command) => command, + Err(_) => { + crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); + continue; } + }; - let command = match str::from_utf8(&read_buffer[..data_len]) { - Ok(command) => command, - Err(_) => { - crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); + let mut entity = entity.borrow_mut(); + let data = entity.as_mut().unwrap(); + + match &mut data.storage { + EntityStorage::Button(button_storage) => { + if command != constants::HA_BUTTON_PAYLOAD_PRESS { + crate::log::warn!( + "button '{}' received unexpected command '{}', expected '{}', ignoring it", + data.config.id, + command, + constants::HA_BUTTON_PAYLOAD_PRESS + ); continue; } - }; - - let mut entity = entity.borrow_mut(); - let data = entity.as_mut().unwrap(); - - match &mut data.storage { - EntityStorage::Button(button_storage) => { - if command != constants::HA_BUTTON_PAYLOAD_PRESS { + button_storage.consumed = false; + button_storage.timestamp = Some(embassy_time::Instant::now()); + } + EntityStorage::Switch(switch_storage) => { + let command = match command.parse::() { + Ok(command) => command, + Err(_) => { crate::log::warn!( - "button '{}' received unexpected command '{}', expected '{}', ignoring it", + "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it", data.config.id, - command, - constants::HA_BUTTON_PAYLOAD_PRESS + command ); continue; } - button_storage.consumed = false; - button_storage.timestamp = Some(embassy_time::Instant::now()); - } - EntityStorage::Switch(switch_storage) => { - let command = match command.parse::() { - Ok(command) => command, - Err(_) => { - crate::log::warn!( - "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it", - data.config.id, - command - ); - continue; - } - }; - let timestamp = embassy_time::Instant::now(); - if switch_storage.publish_on_command { - data.publish = true; - switch_storage.state = Some(SwitchState { - value: command, - timestamp, - }); - } - switch_storage.command = Some(SwitchCommand { + }; + let timestamp = embassy_time::Instant::now(); + if switch_storage.publish_on_command { + data.publish = true; + switch_storage.state = Some(SwitchState { value: command, timestamp, }); } - EntityStorage::Number(number_storage) => { - let command = match command.parse::() { - Ok(command) => command, - Err(_) => { - crate::log::warn!( - "number '{}' received invalid command '{}', expected a valid number, ignoring it", - data.config.id, - command - ); - continue; - } - }; - let timestamp = embassy_time::Instant::now(); - if number_storage.publish_on_command { - data.publish = true; - number_storage.state = Some(NumberState { - value: command, - timestamp, - }); + switch_storage.command = Some(SwitchCommand { + value: command, + timestamp, + }); + } + EntityStorage::Number(number_storage) => { + let command = match command.parse::() { + Ok(command) => command, + Err(_) => { + crate::log::warn!( + "number '{}' received invalid command '{}', expected a valid number, ignoring it", + data.config.id, + command + ); + continue; } - number_storage.command = Some(NumberCommand { + }; + let timestamp = embassy_time::Instant::now(); + if number_storage.publish_on_command { + data.publish = true; + number_storage.state = Some(NumberState { value: command, timestamp, }); } - _ => continue 'outer_loop, + number_storage.command = Some(NumberCommand { + value: command, + timestamp, + }); } + _ => continue 'outer_loop, + } - data.command = true; - if let Some(waker) = data.command_waker.take() { - waker.wake(); - } + data.command = true; + if let Some(waker) = data.command_waker.take() { + waker.wake(); } } } @@ -999,7 +1001,7 @@ pub async fn connect_and_run( socket.set_timeout(None); - if let Err(err) = device.run(&mut socket).await { + if let Err(err) = run(&mut device, &mut socket).await { crate::log::error!( "Device run failed with: {:?}", crate::log::Debug2Format(&err) -- cgit