aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-05 16:30:21 +0000
committerdiogo464 <[email protected]>2025-12-05 16:30:21 +0000
commit1fc7ac328e4828a208be56fd574e81156d81117f (patch)
tree83603cab8a454ef7bfa11be93ea31317f7a57a4a /src/lib.rs
parent51027d838eb4fbf298c546b163d650a5b0eb2599 (diff)
improved error handling and log messages
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs252
1 files changed, 167 insertions, 85 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 325df1d..b16169a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,7 +4,6 @@ use core::{cell::RefCell, task::Waker};
4 4
5use defmt::Format; 5use defmt::Format;
6use embassy_sync::waitqueue::AtomicWaker; 6use embassy_sync::waitqueue::AtomicWaker;
7use embassy_time::Timer;
8use heapless::{ 7use heapless::{
9 Vec, VecView, 8 Vec, VecView,
10 string::{String, StringView}, 9 string::{String, StringView},
@@ -43,6 +42,23 @@ pub use transport::Transport;
43mod unit; 42mod unit;
44pub use unit::*; 43pub use unit::*;
45 44
45#[derive(Debug)]
46pub struct Error(&'static str);
47
48impl core::fmt::Display for Error {
49 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
50 f.write_str(self.0)
51 }
52}
53
54impl core::error::Error for Error {}
55
56impl Error {
57 pub(crate) fn new(message: &'static str) -> Self {
58 Self(message)
59 }
60}
61
46#[derive(Debug, Format, Clone, Copy, Serialize)] 62#[derive(Debug, Format, Clone, Copy, Serialize)]
47struct DeviceDiscovery<'a> { 63struct DeviceDiscovery<'a> {
48 identifiers: &'a [&'a str], 64 identifiers: &'a [&'a str],
@@ -464,16 +480,12 @@ impl<'a> Device<'a> {
464 BinarySensor::new(entity) 480 BinarySensor::new(entity)
465 } 481 }
466 482
467 pub async fn run<T: Transport>(&mut self, transport: &mut T) -> ! { 483 pub async fn run<T: Transport>(&mut self, transport: &mut T) -> Result<(), Error> {
468 loop {
469 self.run_iteration(transport).await;
470 Timer::after_millis(5000).await;
471 }
472 }
473
474 async fn run_iteration<T: Transport>(&mut self, transport: &mut T) {
475 let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); 484 let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport);
476 client.connect(self.config.device_id).await.unwrap(); 485 if let Err(err) = client.connect(self.config.device_id).await {
486 defmt::error!("mqtt connect failed with: {:?}", defmt::Debug2Format(&err));
487 return Err(Error::new("mqtt connection failed"));
488 }
477 489
478 defmt::info!("sending discover messages"); 490 defmt::info!("sending discover messages");
479 let device_discovery = DeviceDiscovery { 491 let device_discovery = DeviceDiscovery {
@@ -504,36 +516,26 @@ impl<'a> Device<'a> {
504 }; 516 };
505 let entity_config = &entity.config; 517 let entity_config = &entity.config;
506 518
507 write!( 519 let discovery_topic_display = DiscoveryTopicDisplay {
508 self.discovery_topic_buffer, 520 domain: entity_config.domain,
509 "{}", 521 device_id: self.config.device_id,
510 DiscoveryTopicDisplay { 522 entity_id: entity_config.id,
511 domain: entity_config.domain, 523 };
512 device_id: self.config.device_id, 524 let state_topic_display = StateTopicDisplay {
513 entity_id: entity_config.id, 525 device_id: self.config.device_id,
514 } 526 entity_id: entity_config.id,
515 ) 527 };
516 .unwrap(); 528 let command_topic_display = CommandTopicDisplay {
517 529 device_id: self.config.device_id,
518 write!( 530 entity_id: entity_config.id,
519 self.state_topic_buffer, 531 };
520 "{}",
521 StateTopicDisplay {
522 device_id: self.config.device_id,
523 entity_id: entity_config.id
524 }
525 )
526 .unwrap();
527 532
528 write!( 533 write!(self.discovery_topic_buffer, "{discovery_topic_display}")
529 self.command_topic_buffer, 534 .expect("discovery topic buffer too small");
530 "{}", 535 write!(self.state_topic_buffer, "{state_topic_display}")
531 CommandTopicDisplay { 536 .expect("state topic buffer too small");
532 device_id: self.config.device_id, 537 write!(self.command_topic_buffer, "{command_topic_display}")
533 entity_id: entity_config.id 538 .expect("command topic buffer too small");
534 }
535 )
536 .unwrap();
537 539
538 let discovery = EntityDiscovery { 540 let discovery = EntityDiscovery {
539 id: entity_config.id, 541 id: entity_config.id,
@@ -552,24 +554,41 @@ impl<'a> Device<'a> {
552 mode: entity_config.mode, 554 mode: entity_config.mode,
553 device: &device_discovery, 555 device: &device_discovery,
554 }; 556 };
555 defmt::info!("discovery: {}", discovery); 557 defmt::info!("discovery for entity '{}': {}", entity_config.id, discovery);
556 558
557 self.discovery_buffer 559 self.discovery_buffer
558 .resize(self.discovery_buffer.capacity(), 0) 560 .resize(self.discovery_buffer.capacity(), 0)
559 .unwrap(); 561 .unwrap();
560 let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer).unwrap(); 562 let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer)
563 .expect("discovery buffer too small");
561 self.discovery_buffer.truncate(n); 564 self.discovery_buffer.truncate(n);
562 } 565 }
563 566
564 defmt::info!( 567 let discovery_topic = self.discovery_topic_buffer.as_str();
565 "sending discovery to {}", 568 defmt::info!("sending discovery to topic '{}'", discovery_topic);
566 self.discovery_topic_buffer.as_str() 569 if let Err(err) = client
567 ); 570 .publish(discovery_topic, &self.discovery_buffer)
568 client
569 .publish(&self.discovery_topic_buffer, &self.discovery_buffer)
570 .await 571 .await
571 .unwrap(); 572 {
572 client.subscribe(&self.command_topic_buffer).await.unwrap(); 573 defmt::error!(
574 "mqtt discovery publish failed with: {:?}",
575 defmt::Debug2Format(&err)
576 );
577 return Err(Error::new("mqtt discovery publish failed"));
578 }
579
580 let command_topic = self.command_topic_buffer.as_str();
581 defmt::info!("subscribing to command topic '{}'", command_topic);
582 if let Err(err) = client.subscribe(command_topic).await {
583 defmt::error!(
584 "mqtt subscribe to '{}' failed with: {:?}",
585 command_topic,
586 defmt::Debug2Format(&err)
587 );
588 return Err(Error::new(
589 "mqtt subscription to entity command topic failed",
590 ));
591 }
573 } 592 }
574 593
575 'outer_loop: loop { 594 'outer_loop: loop {
@@ -597,47 +616,61 @@ impl<'a> Device<'a> {
597 }) => self 616 }) => self
598 .publish_buffer 617 .publish_buffer
599 .extend_from_slice(value.as_str().as_bytes()) 618 .extend_from_slice(value.as_str().as_bytes())
600 .unwrap(), 619 .expect("publish buffer too small for switch state payload"),
601 EntityStorage::BinarySensor(BinarySensorStorage { 620 EntityStorage::BinarySensor(BinarySensorStorage {
602 state: Some(BinarySensorState { value, .. }), 621 state: Some(BinarySensorState { value, .. }),
603 }) => self 622 }) => self
604 .publish_buffer 623 .publish_buffer
605 .extend_from_slice(value.as_str().as_bytes()) 624 .extend_from_slice(value.as_str().as_bytes())
606 .unwrap(), 625 .expect("publish buffer too small for binary sensor state payload"),
607 EntityStorage::NumericSensor(NumericSensorStorage { 626 EntityStorage::NumericSensor(NumericSensorStorage {
608 state: Some(NumericSensorState { value, .. }), 627 state: Some(NumericSensorState { value, .. }),
609 .. 628 ..
610 }) => write!(self.publish_buffer, "{}", value).unwrap(), 629 }) => write!(self.publish_buffer, "{}", value)
630 .expect("publish buffer too small for numeric sensor payload"),
611 EntityStorage::Number(NumberStorage { 631 EntityStorage::Number(NumberStorage {
612 state: Some(NumberState { value, .. }), 632 state: Some(NumberState { value, .. }),
613 .. 633 ..
614 }) => write!(self.publish_buffer, "{}", value).unwrap(), 634 }) => write!(self.publish_buffer, "{}", value)
615 _ => continue, // TODO: print warning 635 .expect("publish buffer too small for number state payload"),
636 _ => {
637 defmt::warn!(
638 "entity '{}' requested state publish but its storage does not support it",
639 entity.config.id
640 );
641 continue;
642 }
616 } 643 }
617 644
645 let state_topic_display = StateTopicDisplay {
646 device_id: self.config.device_id,
647 entity_id: entity.config.id,
648 };
618 self.state_topic_buffer.clear(); 649 self.state_topic_buffer.clear();
619 write!( 650 write!(self.state_topic_buffer, "{state_topic_display}")
620 self.state_topic_buffer, 651 .expect("state topic buffer too small");
621 "{}",
622 StateTopicDisplay {
623 device_id: self.config.device_id,
624 entity_id: entity.config.id
625 }
626 )
627 .unwrap();
628 } 652 }
629 653
630 client 654 let state_topic = self.state_topic_buffer.as_str();
631 .publish(&self.state_topic_buffer, self.publish_buffer) 655 if let Err(err) = client.publish(state_topic, self.publish_buffer).await {
632 .await 656 defmt::error!(
633 .unwrap(); 657 "mqtt state publish on topic '{}' failed with: {:?}",
658 state_topic,
659 defmt::Debug2Format(&err)
660 );
661 return Err(Error::new("mqtt publish failed"));
662 }
634 } 663 }
635 664
636 let receive = client.receive(); 665 let receive = client.receive();
637 let waker = wait_on_atomic_waker(self.waker); 666 let waker = wait_on_atomic_waker(self.waker);
638 let publish = match embassy_futures::select::select(receive, waker).await { 667 let publish = match embassy_futures::select::select(receive, waker).await {
639 embassy_futures::select::Either::First(packet) => match packet.unwrap() { 668 embassy_futures::select::Either::First(packet) => match packet {
640 embedded_mqtt::Packet::Publish(publish) => publish, 669 Ok(embedded_mqtt::Packet::Publish(publish)) => publish,
670 Err(err) => {
671 defmt::error!("mqtt receive failed with: {:?}", defmt::Debug2Format(&err));
672 return Err(Error::new("mqtt receive failed"));
673 }
641 _ => continue, 674 _ => continue,
642 }, 675 },
643 embassy_futures::select::Either::Second(_) => continue, 676 embassy_futures::select::Either::Second(_) => continue,
@@ -651,16 +684,13 @@ impl<'a> Device<'a> {
651 None => break, 684 None => break,
652 }; 685 };
653 686
687 let command_topic_display = CommandTopicDisplay {
688 device_id: self.config.device_id,
689 entity_id: data.config.id,
690 };
654 self.command_topic_buffer.clear(); 691 self.command_topic_buffer.clear();
655 write!( 692 write!(self.command_topic_buffer, "{command_topic_display}")
656 self.command_topic_buffer, 693 .expect("command topic buffer too small");
657 "{}",
658 CommandTopicDisplay {
659 device_id: self.config.device_id,
660 entity_id: data.config.id
661 }
662 )
663 .unwrap();
664 694
665 if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() { 695 if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() {
666 break 'entity_search_block entity; 696 break 'entity_search_block entity;
@@ -671,31 +701,83 @@ impl<'a> Device<'a> {
671 701
672 let mut read_buffer = [0u8; 128]; 702 let mut read_buffer = [0u8; 128];
673 if publish.data_len > read_buffer.len() { 703 if publish.data_len > read_buffer.len() {
674 defmt::warn!("mqtt publish payload too large, ignoring message"); 704 defmt::warn!(
705 "mqtt publish payload on topic {} is too large ({} bytes), ignoring it",
706 publish.topic,
707 publish.data_len
708 );
675 continue; 709 continue;
676 } 710 }
677 let b = &mut read_buffer[..publish.data_len]; 711
678 client.receive_data(b).await.unwrap(); 712 defmt::info!(
679 let command = str::from_utf8(b).unwrap(); 713 "mqtt receiving {} bytes of data on topic {}",
714 publish.data_len,
715 publish.topic
716 );
717
718 let data_len = publish.data_len;
719 if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await {
720 defmt::error!(
721 "mqtt receive data failed with: {:?}",
722 defmt::Debug2Format(&err)
723 );
724 return Err(Error::new("mqtt receive data failed"));
725 }
726
727 let command = match str::from_utf8(&read_buffer[..data_len]) {
728 Ok(command) => command,
729 Err(_) => {
730 defmt::warn!("mqtt message contained invalid utf-8, ignoring it");
731 continue;
732 }
733 };
680 734
681 let mut entity = entity.borrow_mut(); 735 let mut entity = entity.borrow_mut();
682 let data = entity.as_mut().unwrap(); 736 let data = entity.as_mut().unwrap();
683 737
684 match &mut data.storage { 738 match &mut data.storage {
685 EntityStorage::Button(button_storage) => { 739 EntityStorage::Button(button_storage) => {
686 assert_eq!(command, constants::HA_BUTTON_PAYLOAD_PRESS); 740 if command != constants::HA_BUTTON_PAYLOAD_PRESS {
741 defmt::warn!(
742 "button '{}' received unexpected command '{}', expected '{}', ignoring it",
743 data.config.id,
744 command,
745 constants::HA_BUTTON_PAYLOAD_PRESS
746 );
747 continue;
748 }
687 button_storage.consumed = false; 749 button_storage.consumed = false;
688 button_storage.timestamp = Some(embassy_time::Instant::now()); 750 button_storage.timestamp = Some(embassy_time::Instant::now());
689 } 751 }
690 EntityStorage::Switch(switch_storage) => { 752 EntityStorage::Switch(switch_storage) => {
691 let command = command.parse::<BinaryState>().unwrap(); 753 let command = match command.parse::<BinaryState>() {
754 Ok(command) => command,
755 Err(_) => {
756 defmt::warn!(
757 "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it",
758 data.config.id,
759 command
760 );
761 continue;
762 }
763 };
692 switch_storage.command = Some(SwitchCommand { 764 switch_storage.command = Some(SwitchCommand {
693 value: command, 765 value: command,
694 timestamp: embassy_time::Instant::now(), 766 timestamp: embassy_time::Instant::now(),
695 }); 767 });
696 } 768 }
697 EntityStorage::Number(number_storage) => { 769 EntityStorage::Number(number_storage) => {
698 let command = command.parse::<f32>().unwrap(); 770 let command = match command.parse::<f32>() {
771 Ok(command) => command,
772 Err(_) => {
773 defmt::warn!(
774 "number '{}' received invalid command '{}', expected a valid number, ignoring it",
775 data.config.id,
776 command
777 );
778 continue;
779 }
780 };
699 number_storage.command = Some(NumberCommand { 781 number_storage.command = Some(NumberCommand {
700 value: command, 782 value: command,
701 timestamp: embassy_time::Instant::now(), 783 timestamp: embassy_time::Instant::now(),