diff options
| author | diogo464 <[email protected]> | 2025-12-05 16:30:21 +0000 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-12-05 16:30:21 +0000 |
| commit | 1fc7ac328e4828a208be56fd574e81156d81117f (patch) | |
| tree | 83603cab8a454ef7bfa11be93ea31317f7a57a4a /src/lib.rs | |
| parent | 51027d838eb4fbf298c546b163d650a5b0eb2599 (diff) | |
improved error handling and log messages
Diffstat (limited to 'src/lib.rs')
| -rw-r--r-- | src/lib.rs | 252 |
1 files changed, 167 insertions, 85 deletions
| @@ -4,7 +4,6 @@ use core::{cell::RefCell, task::Waker}; | |||
| 4 | 4 | ||
| 5 | use defmt::Format; | 5 | use defmt::Format; |
| 6 | use embassy_sync::waitqueue::AtomicWaker; | 6 | use embassy_sync::waitqueue::AtomicWaker; |
| 7 | use embassy_time::Timer; | ||
| 8 | use heapless::{ | 7 | use heapless::{ |
| 9 | Vec, VecView, | 8 | Vec, VecView, |
| 10 | string::{String, StringView}, | 9 | string::{String, StringView}, |
| @@ -43,6 +42,23 @@ pub use transport::Transport; | |||
| 43 | mod unit; | 42 | mod unit; |
| 44 | pub use unit::*; | 43 | pub use unit::*; |
| 45 | 44 | ||
| 45 | #[derive(Debug)] | ||
| 46 | pub struct Error(&'static str); | ||
| 47 | |||
| 48 | impl core::fmt::Display for Error { | ||
| 49 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | ||
| 50 | f.write_str(self.0) | ||
| 51 | } | ||
| 52 | } | ||
| 53 | |||
| 54 | impl core::error::Error for Error {} | ||
| 55 | |||
| 56 | impl Error { | ||
| 57 | pub(crate) fn new(message: &'static str) -> Self { | ||
| 58 | Self(message) | ||
| 59 | } | ||
| 60 | } | ||
| 61 | |||
| 46 | #[derive(Debug, Format, Clone, Copy, Serialize)] | 62 | #[derive(Debug, Format, Clone, Copy, Serialize)] |
| 47 | struct DeviceDiscovery<'a> { | 63 | struct DeviceDiscovery<'a> { |
| 48 | identifiers: &'a [&'a str], | 64 | identifiers: &'a [&'a str], |
| @@ -464,16 +480,12 @@ impl<'a> Device<'a> { | |||
| 464 | BinarySensor::new(entity) | 480 | BinarySensor::new(entity) |
| 465 | } | 481 | } |
| 466 | 482 | ||
| 467 | pub async fn run<T: Transport>(&mut self, transport: &mut T) -> ! { | 483 | pub async fn run<T: Transport>(&mut self, transport: &mut T) -> Result<(), Error> { |
| 468 | loop { | ||
| 469 | self.run_iteration(transport).await; | ||
| 470 | Timer::after_millis(5000).await; | ||
| 471 | } | ||
| 472 | } | ||
| 473 | |||
| 474 | async fn run_iteration<T: Transport>(&mut self, transport: &mut T) { | ||
| 475 | let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); | 484 | let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); |
| 476 | client.connect(self.config.device_id).await.unwrap(); | 485 | if let Err(err) = client.connect(self.config.device_id).await { |
| 486 | defmt::error!("mqtt connect failed with: {:?}", defmt::Debug2Format(&err)); | ||
| 487 | return Err(Error::new("mqtt connection failed")); | ||
| 488 | } | ||
| 477 | 489 | ||
| 478 | defmt::info!("sending discover messages"); | 490 | defmt::info!("sending discover messages"); |
| 479 | let device_discovery = DeviceDiscovery { | 491 | let device_discovery = DeviceDiscovery { |
| @@ -504,36 +516,26 @@ impl<'a> Device<'a> { | |||
| 504 | }; | 516 | }; |
| 505 | let entity_config = &entity.config; | 517 | let entity_config = &entity.config; |
| 506 | 518 | ||
| 507 | write!( | 519 | let discovery_topic_display = DiscoveryTopicDisplay { |
| 508 | self.discovery_topic_buffer, | 520 | domain: entity_config.domain, |
| 509 | "{}", | 521 | device_id: self.config.device_id, |
| 510 | DiscoveryTopicDisplay { | 522 | entity_id: entity_config.id, |
| 511 | domain: entity_config.domain, | 523 | }; |
| 512 | device_id: self.config.device_id, | 524 | let state_topic_display = StateTopicDisplay { |
| 513 | entity_id: entity_config.id, | 525 | device_id: self.config.device_id, |
| 514 | } | 526 | entity_id: entity_config.id, |
| 515 | ) | 527 | }; |
| 516 | .unwrap(); | 528 | let command_topic_display = CommandTopicDisplay { |
| 517 | 529 | device_id: self.config.device_id, | |
| 518 | write!( | 530 | entity_id: entity_config.id, |
| 519 | self.state_topic_buffer, | 531 | }; |
| 520 | "{}", | ||
| 521 | StateTopicDisplay { | ||
| 522 | device_id: self.config.device_id, | ||
| 523 | entity_id: entity_config.id | ||
| 524 | } | ||
| 525 | ) | ||
| 526 | .unwrap(); | ||
| 527 | 532 | ||
| 528 | write!( | 533 | write!(self.discovery_topic_buffer, "{discovery_topic_display}") |
| 529 | self.command_topic_buffer, | 534 | .expect("discovery topic buffer too small"); |
| 530 | "{}", | 535 | write!(self.state_topic_buffer, "{state_topic_display}") |
| 531 | CommandTopicDisplay { | 536 | .expect("state topic buffer too small"); |
| 532 | device_id: self.config.device_id, | 537 | write!(self.command_topic_buffer, "{command_topic_display}") |
| 533 | entity_id: entity_config.id | 538 | .expect("command topic buffer too small"); |
| 534 | } | ||
| 535 | ) | ||
| 536 | .unwrap(); | ||
| 537 | 539 | ||
| 538 | let discovery = EntityDiscovery { | 540 | let discovery = EntityDiscovery { |
| 539 | id: entity_config.id, | 541 | id: entity_config.id, |
| @@ -552,24 +554,41 @@ impl<'a> Device<'a> { | |||
| 552 | mode: entity_config.mode, | 554 | mode: entity_config.mode, |
| 553 | device: &device_discovery, | 555 | device: &device_discovery, |
| 554 | }; | 556 | }; |
| 555 | defmt::info!("discovery: {}", discovery); | 557 | defmt::info!("discovery for entity '{}': {}", entity_config.id, discovery); |
| 556 | 558 | ||
| 557 | self.discovery_buffer | 559 | self.discovery_buffer |
| 558 | .resize(self.discovery_buffer.capacity(), 0) | 560 | .resize(self.discovery_buffer.capacity(), 0) |
| 559 | .unwrap(); | 561 | .unwrap(); |
| 560 | let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer).unwrap(); | 562 | let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer) |
| 563 | .expect("discovery buffer too small"); | ||
| 561 | self.discovery_buffer.truncate(n); | 564 | self.discovery_buffer.truncate(n); |
| 562 | } | 565 | } |
| 563 | 566 | ||
| 564 | defmt::info!( | 567 | let discovery_topic = self.discovery_topic_buffer.as_str(); |
| 565 | "sending discovery to {}", | 568 | defmt::info!("sending discovery to topic '{}'", discovery_topic); |
| 566 | self.discovery_topic_buffer.as_str() | 569 | if let Err(err) = client |
| 567 | ); | 570 | .publish(discovery_topic, &self.discovery_buffer) |
| 568 | client | ||
| 569 | .publish(&self.discovery_topic_buffer, &self.discovery_buffer) | ||
| 570 | .await | 571 | .await |
| 571 | .unwrap(); | 572 | { |
| 572 | client.subscribe(&self.command_topic_buffer).await.unwrap(); | 573 | defmt::error!( |
| 574 | "mqtt discovery publish failed with: {:?}", | ||
| 575 | defmt::Debug2Format(&err) | ||
| 576 | ); | ||
| 577 | return Err(Error::new("mqtt discovery publish failed")); | ||
| 578 | } | ||
| 579 | |||
| 580 | let command_topic = self.command_topic_buffer.as_str(); | ||
| 581 | defmt::info!("subscribing to command topic '{}'", command_topic); | ||
| 582 | if let Err(err) = client.subscribe(command_topic).await { | ||
| 583 | defmt::error!( | ||
| 584 | "mqtt subscribe to '{}' failed with: {:?}", | ||
| 585 | command_topic, | ||
| 586 | defmt::Debug2Format(&err) | ||
| 587 | ); | ||
| 588 | return Err(Error::new( | ||
| 589 | "mqtt subscription to entity command topic failed", | ||
| 590 | )); | ||
| 591 | } | ||
| 573 | } | 592 | } |
| 574 | 593 | ||
| 575 | 'outer_loop: loop { | 594 | 'outer_loop: loop { |
| @@ -597,47 +616,61 @@ impl<'a> Device<'a> { | |||
| 597 | }) => self | 616 | }) => self |
| 598 | .publish_buffer | 617 | .publish_buffer |
| 599 | .extend_from_slice(value.as_str().as_bytes()) | 618 | .extend_from_slice(value.as_str().as_bytes()) |
| 600 | .unwrap(), | 619 | .expect("publish buffer too small for switch state payload"), |
| 601 | EntityStorage::BinarySensor(BinarySensorStorage { | 620 | EntityStorage::BinarySensor(BinarySensorStorage { |
| 602 | state: Some(BinarySensorState { value, .. }), | 621 | state: Some(BinarySensorState { value, .. }), |
| 603 | }) => self | 622 | }) => self |
| 604 | .publish_buffer | 623 | .publish_buffer |
| 605 | .extend_from_slice(value.as_str().as_bytes()) | 624 | .extend_from_slice(value.as_str().as_bytes()) |
| 606 | .unwrap(), | 625 | .expect("publish buffer too small for binary sensor state payload"), |
| 607 | EntityStorage::NumericSensor(NumericSensorStorage { | 626 | EntityStorage::NumericSensor(NumericSensorStorage { |
| 608 | state: Some(NumericSensorState { value, .. }), | 627 | state: Some(NumericSensorState { value, .. }), |
| 609 | .. | 628 | .. |
| 610 | }) => write!(self.publish_buffer, "{}", value).unwrap(), | 629 | }) => write!(self.publish_buffer, "{}", value) |
| 630 | .expect("publish buffer too small for numeric sensor payload"), | ||
| 611 | EntityStorage::Number(NumberStorage { | 631 | EntityStorage::Number(NumberStorage { |
| 612 | state: Some(NumberState { value, .. }), | 632 | state: Some(NumberState { value, .. }), |
| 613 | .. | 633 | .. |
| 614 | }) => write!(self.publish_buffer, "{}", value).unwrap(), | 634 | }) => write!(self.publish_buffer, "{}", value) |
| 615 | _ => continue, // TODO: print warning | 635 | .expect("publish buffer too small for number state payload"), |
| 636 | _ => { | ||
| 637 | defmt::warn!( | ||
| 638 | "entity '{}' requested state publish but its storage does not support it", | ||
| 639 | entity.config.id | ||
| 640 | ); | ||
| 641 | continue; | ||
| 642 | } | ||
| 616 | } | 643 | } |
| 617 | 644 | ||
| 645 | let state_topic_display = StateTopicDisplay { | ||
| 646 | device_id: self.config.device_id, | ||
| 647 | entity_id: entity.config.id, | ||
| 648 | }; | ||
| 618 | self.state_topic_buffer.clear(); | 649 | self.state_topic_buffer.clear(); |
| 619 | write!( | 650 | write!(self.state_topic_buffer, "{state_topic_display}") |
| 620 | self.state_topic_buffer, | 651 | .expect("state topic buffer too small"); |
| 621 | "{}", | ||
| 622 | StateTopicDisplay { | ||
| 623 | device_id: self.config.device_id, | ||
| 624 | entity_id: entity.config.id | ||
| 625 | } | ||
| 626 | ) | ||
| 627 | .unwrap(); | ||
| 628 | } | 652 | } |
| 629 | 653 | ||
| 630 | client | 654 | let state_topic = self.state_topic_buffer.as_str(); |
| 631 | .publish(&self.state_topic_buffer, self.publish_buffer) | 655 | if let Err(err) = client.publish(state_topic, self.publish_buffer).await { |
| 632 | .await | 656 | defmt::error!( |
| 633 | .unwrap(); | 657 | "mqtt state publish on topic '{}' failed with: {:?}", |
| 658 | state_topic, | ||
| 659 | defmt::Debug2Format(&err) | ||
| 660 | ); | ||
| 661 | return Err(Error::new("mqtt publish failed")); | ||
| 662 | } | ||
| 634 | } | 663 | } |
| 635 | 664 | ||
| 636 | let receive = client.receive(); | 665 | let receive = client.receive(); |
| 637 | let waker = wait_on_atomic_waker(self.waker); | 666 | let waker = wait_on_atomic_waker(self.waker); |
| 638 | let publish = match embassy_futures::select::select(receive, waker).await { | 667 | let publish = match embassy_futures::select::select(receive, waker).await { |
| 639 | embassy_futures::select::Either::First(packet) => match packet.unwrap() { | 668 | embassy_futures::select::Either::First(packet) => match packet { |
| 640 | embedded_mqtt::Packet::Publish(publish) => publish, | 669 | Ok(embedded_mqtt::Packet::Publish(publish)) => publish, |
| 670 | Err(err) => { | ||
| 671 | defmt::error!("mqtt receive failed with: {:?}", defmt::Debug2Format(&err)); | ||
| 672 | return Err(Error::new("mqtt receive failed")); | ||
| 673 | } | ||
| 641 | _ => continue, | 674 | _ => continue, |
| 642 | }, | 675 | }, |
| 643 | embassy_futures::select::Either::Second(_) => continue, | 676 | embassy_futures::select::Either::Second(_) => continue, |
| @@ -651,16 +684,13 @@ impl<'a> Device<'a> { | |||
| 651 | None => break, | 684 | None => break, |
| 652 | }; | 685 | }; |
| 653 | 686 | ||
| 687 | let command_topic_display = CommandTopicDisplay { | ||
| 688 | device_id: self.config.device_id, | ||
| 689 | entity_id: data.config.id, | ||
| 690 | }; | ||
| 654 | self.command_topic_buffer.clear(); | 691 | self.command_topic_buffer.clear(); |
| 655 | write!( | 692 | write!(self.command_topic_buffer, "{command_topic_display}") |
| 656 | self.command_topic_buffer, | 693 | .expect("command topic buffer too small"); |
| 657 | "{}", | ||
| 658 | CommandTopicDisplay { | ||
| 659 | device_id: self.config.device_id, | ||
| 660 | entity_id: data.config.id | ||
| 661 | } | ||
| 662 | ) | ||
| 663 | .unwrap(); | ||
| 664 | 694 | ||
| 665 | if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { | 695 | if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { |
| 666 | break 'entity_search_block entity; | 696 | break 'entity_search_block entity; |
| @@ -671,31 +701,83 @@ impl<'a> Device<'a> { | |||
| 671 | 701 | ||
| 672 | let mut read_buffer = [0u8; 128]; | 702 | let mut read_buffer = [0u8; 128]; |
| 673 | if publish.data_len > read_buffer.len() { | 703 | if publish.data_len > read_buffer.len() { |
| 674 | defmt::warn!("mqtt publish payload too large, ignoring message"); | 704 | defmt::warn!( |
| 705 | "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", | ||
| 706 | publish.topic, | ||
| 707 | publish.data_len | ||
| 708 | ); | ||
| 675 | continue; | 709 | continue; |
| 676 | } | 710 | } |
| 677 | let b = &mut read_buffer[..publish.data_len]; | 711 | |
| 678 | client.receive_data(b).await.unwrap(); | 712 | defmt::info!( |
| 679 | let command = str::from_utf8(b).unwrap(); | 713 | "mqtt receiving {} bytes of data on topic {}", |
| 714 | publish.data_len, | ||
| 715 | publish.topic | ||
| 716 | ); | ||
| 717 | |||
| 718 | let data_len = publish.data_len; | ||
| 719 | if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { | ||
| 720 | defmt::error!( | ||
| 721 | "mqtt receive data failed with: {:?}", | ||
| 722 | defmt::Debug2Format(&err) | ||
| 723 | ); | ||
| 724 | return Err(Error::new("mqtt receive data failed")); | ||
| 725 | } | ||
| 726 | |||
| 727 | let command = match str::from_utf8(&read_buffer[..data_len]) { | ||
| 728 | Ok(command) => command, | ||
| 729 | Err(_) => { | ||
| 730 | defmt::warn!("mqtt message contained invalid utf-8, ignoring it"); | ||
| 731 | continue; | ||
| 732 | } | ||
| 733 | }; | ||
| 680 | 734 | ||
| 681 | let mut entity = entity.borrow_mut(); | 735 | let mut entity = entity.borrow_mut(); |
| 682 | let data = entity.as_mut().unwrap(); | 736 | let data = entity.as_mut().unwrap(); |
| 683 | 737 | ||
| 684 | match &mut data.storage { | 738 | match &mut data.storage { |
| 685 | EntityStorage::Button(button_storage) => { | 739 | EntityStorage::Button(button_storage) => { |
| 686 | assert_eq!(command, constants::HA_BUTTON_PAYLOAD_PRESS); | 740 | if command != constants::HA_BUTTON_PAYLOAD_PRESS { |
| 741 | defmt::warn!( | ||
| 742 | "button '{}' received unexpected command '{}', expected '{}', ignoring it", | ||
| 743 | data.config.id, | ||
| 744 | command, | ||
| 745 | constants::HA_BUTTON_PAYLOAD_PRESS | ||
| 746 | ); | ||
| 747 | continue; | ||
| 748 | } | ||
| 687 | button_storage.consumed = false; | 749 | button_storage.consumed = false; |
| 688 | button_storage.timestamp = Some(embassy_time::Instant::now()); | 750 | button_storage.timestamp = Some(embassy_time::Instant::now()); |
| 689 | } | 751 | } |
| 690 | EntityStorage::Switch(switch_storage) => { | 752 | EntityStorage::Switch(switch_storage) => { |
| 691 | let command = command.parse::<BinaryState>().unwrap(); | 753 | let command = match command.parse::<BinaryState>() { |
| 754 | Ok(command) => command, | ||
| 755 | Err(_) => { | ||
| 756 | defmt::warn!( | ||
| 757 | "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it", | ||
| 758 | data.config.id, | ||
| 759 | command | ||
| 760 | ); | ||
| 761 | continue; | ||
| 762 | } | ||
| 763 | }; | ||
| 692 | switch_storage.command = Some(SwitchCommand { | 764 | switch_storage.command = Some(SwitchCommand { |
| 693 | value: command, | 765 | value: command, |
| 694 | timestamp: embassy_time::Instant::now(), | 766 | timestamp: embassy_time::Instant::now(), |
| 695 | }); | 767 | }); |
| 696 | } | 768 | } |
| 697 | EntityStorage::Number(number_storage) => { | 769 | EntityStorage::Number(number_storage) => { |
| 698 | let command = command.parse::<f32>().unwrap(); | 770 | let command = match command.parse::<f32>() { |
| 771 | Ok(command) => command, | ||
| 772 | Err(_) => { | ||
| 773 | defmt::warn!( | ||
| 774 | "number '{}' received invalid command '{}', expected a valid number, ignoring it", | ||
| 775 | data.config.id, | ||
| 776 | command | ||
| 777 | ); | ||
| 778 | continue; | ||
| 779 | } | ||
| 780 | }; | ||
| 699 | number_storage.command = Some(NumberCommand { | 781 | number_storage.command = Some(NumberCommand { |
| 700 | value: command, | 782 | value: command, |
| 701 | timestamp: embassy_time::Instant::now(), | 783 | timestamp: embassy_time::Instant::now(), |
