aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-07 15:57:17 +0000
committerdiogo464 <[email protected]>2025-12-07 15:57:17 +0000
commit28d9961141a38ebde8bd6144636c3021eb2755a5 (patch)
treef33f300157b50f1ebfe7af7d8630d8fcc7537a04 /src
parent42c899931074fc1a8adcf30e4bd103163ee84b1a (diff)
code style change
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs810
1 files changed, 406 insertions, 404 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 4e0896c..c1b5191 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -415,257 +415,260 @@ pub struct Device<'a> {
415 command_topic_buffer: &'a mut StringView, 415 command_topic_buffer: &'a mut StringView,
416} 416}
417 417
418impl<'a> Device<'a> { 418pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> {
419 pub fn new(resources: &'a mut DeviceResources, config: DeviceConfig) -> Self { 419 Device {
420 Self { 420 config,
421 config, 421 waker: &resources.waker,
422 waker: &resources.waker, 422 entities: &resources.entities,
423 entities: &resources.entities, 423
424 424 mqtt_resources: &mut resources.mqtt_resources,
425 mqtt_resources: &mut resources.mqtt_resources, 425 publish_buffer: &mut resources.publish_buffer,
426 publish_buffer: &mut resources.publish_buffer, 426 subscribe_buffer: &mut resources.subscribe_buffer,
427 subscribe_buffer: &mut resources.subscribe_buffer, 427 discovery_buffer: &mut resources.discovery_buffer,
428 discovery_buffer: &mut resources.discovery_buffer, 428 availability_topic_buffer: &mut resources.availability_topic_buffer,
429 availability_topic_buffer: &mut resources.availability_topic_buffer, 429 discovery_topic_buffer: &mut resources.discovery_topic_buffer,
430 discovery_topic_buffer: &mut resources.discovery_topic_buffer, 430 state_topic_buffer: &mut resources.state_topic_buffer,
431 state_topic_buffer: &mut resources.state_topic_buffer, 431 command_topic_buffer: &mut resources.command_topic_buffer,
432 command_topic_buffer: &mut resources.command_topic_buffer,
433 }
434 } 432 }
433}
435 434
436 pub fn create_entity(&self, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { 435fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityStorage) -> Entity<'a> {
437 let index = 'outer: { 436 let index = 'outer: {
438 for idx in 0..self.entities.len() { 437 for idx in 0..device.entities.len() {
439 if self.entities[idx].borrow().is_none() { 438 if device.entities[idx].borrow().is_none() {
440 break 'outer idx; 439 break 'outer idx;
441 }
442 } 440 }
443 panic!("device entity limit reached");
444 };
445
446 let data = EntityData {
447 config,
448 storage,
449 publish: false,
450 command: false,
451 command_waker: None,
452 };
453 self.entities[index].replace(Some(data));
454
455 Entity {
456 data: &self.entities[index],
457 waker: self.waker,
458 } 441 }
442 panic!("device entity limit reached");
443 };
444
445 let data = EntityData {
446 config,
447 storage,
448 publish: false,
449 command: false,
450 command_waker: None,
451 };
452 device.entities[index].replace(Some(data));
453
454 Entity {
455 data: &device.entities[index],
456 waker: device.waker,
459 } 457 }
458}
460 459
461 pub fn create_sensor(&self, id: &'static str, config: SensorConfig) -> Sensor<'a> { 460pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorConfig) -> Sensor<'a> {
462 let mut entity_config = EntityConfig::default(); 461 let mut entity_config = EntityConfig::default();
463 entity_config.id = id; 462 entity_config.id = id;
464 config.populate(&mut entity_config); 463 config.populate(&mut entity_config);
464
465 let entity = create_entity(
466 device,
467 entity_config,
468 EntityStorage::NumericSensor(Default::default()),
469 );
470 Sensor::new(entity)
471}
465 472
466 let entity = self.create_entity( 473pub fn create_button<'a>(device: &Device<'a>, id: &'static str, config: ButtonConfig) -> Button<'a> {
467 entity_config, 474 let mut entity_config = EntityConfig::default();
468 EntityStorage::NumericSensor(Default::default()), 475 entity_config.id = id;
469 ); 476 config.populate(&mut entity_config);
470 Sensor::new(entity)
471 }
472 477
473 pub fn create_button(&self, id: &'static str, config: ButtonConfig) -> Button<'a> { 478 let entity = create_entity(device, entity_config, EntityStorage::Button(Default::default()));
474 let mut entity_config = EntityConfig::default(); 479 Button::new(entity)
475 entity_config.id = id; 480}
476 config.populate(&mut entity_config);
477 481
478 let entity = self.create_entity(entity_config, EntityStorage::Button(Default::default())); 482pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberConfig) -> Number<'a> {
479 Button::new(entity) 483 let mut entity_config = EntityConfig::default();
480 } 484 entity_config.id = id;
485 config.populate(&mut entity_config);
481 486
482 pub fn create_number(&self, id: &'static str, config: NumberConfig) -> Number<'a> { 487 let entity = create_entity(
483 let mut entity_config = EntityConfig::default(); 488 device,
484 entity_config.id = id; 489 entity_config,
485 config.populate(&mut entity_config); 490 EntityStorage::Number(NumberStorage {
486 491 publish_on_command: config.publish_on_command,
487 let entity = self.create_entity( 492 ..Default::default()
488 entity_config, 493 }),
489 EntityStorage::Number(NumberStorage { 494 );
490 publish_on_command: config.publish_on_command, 495 Number::new(entity)
491 ..Default::default() 496}
492 }),
493 );
494 Number::new(entity)
495 }
496 497
497 pub fn create_switch(&self, id: &'static str, config: SwitchConfig) -> Switch<'a> { 498pub fn create_switch<'a>(device: &Device<'a>, id: &'static str, config: SwitchConfig) -> Switch<'a> {
498 let mut entity_config = EntityConfig::default(); 499 let mut entity_config = EntityConfig::default();
499 entity_config.id = id; 500 entity_config.id = id;
500 config.populate(&mut entity_config); 501 config.populate(&mut entity_config);
501 502
502 let entity = self.create_entity( 503 let entity = create_entity(
503 entity_config, 504 device,
504 EntityStorage::Switch(SwitchStorage { 505 entity_config,
505 publish_on_command: config.publish_on_command, 506 EntityStorage::Switch(SwitchStorage {
506 ..Default::default() 507 publish_on_command: config.publish_on_command,
507 }), 508 ..Default::default()
508 ); 509 }),
509 Switch::new(entity) 510 );
510 } 511 Switch::new(entity)
512}
513
514pub fn create_binary_sensor<'a>(
515 device: &Device<'a>,
516 id: &'static str,
517 config: BinarySensorConfig,
518) -> BinarySensor<'a> {
519 let mut entity_config = EntityConfig::default();
520 entity_config.id = id;
521 config.populate(&mut entity_config);
522
523 let entity = create_entity(
524 device,
525 entity_config,
526 EntityStorage::BinarySensor(Default::default()),
527 );
528 BinarySensor::new(entity)
529}
530
531pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
532 use core::fmt::Write;
511 533
512 pub fn create_binary_sensor( 534 device.availability_topic_buffer.clear();
513 &self, 535 write!(
514 id: &'static str, 536 device.availability_topic_buffer,
515 config: BinarySensorConfig, 537 "{}",
516 ) -> BinarySensor<'a> { 538 DeviceAvailabilityTopic {
517 let mut entity_config = EntityConfig::default(); 539 device_id: device.config.device_id
518 entity_config.id = id; 540 }
519 config.populate(&mut entity_config); 541 )
520 542 .expect("device availability buffer too small");
521 let entity = self.create_entity( 543 let availability_topic = device.availability_topic_buffer.as_str();
522 entity_config, 544
523 EntityStorage::BinarySensor(Default::default()), 545 let mut client = embedded_mqtt::Client::new(device.mqtt_resources, transport);
546 let connect_params = embedded_mqtt::ConnectParams {
547 will_topic: Some(availability_topic),
548 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
549 will_retain: true,
550 ..Default::default()
551 };
552 if let Err(err) = client
553 .connect_with(device.config.device_id, connect_params)
554 .await
555 {
556 crate::log::error!(
557 "mqtt connect failed with: {:?}",
558 crate::log::Debug2Format(&err)
524 ); 559 );
525 BinarySensor::new(entity) 560 return Err(Error::new("mqtt connection failed"));
526 } 561 }
527 562
528 pub async fn run<T: Transport>(&mut self, transport: &mut T) -> Result<(), Error> { 563 crate::log::debug!("sending discover messages");
529 use core::fmt::Write; 564 let device_discovery = DeviceDiscovery {
565 identifiers: &[device.config.device_id],
566 name: device.config.device_name,
567 manufacturer: device.config.manufacturer,
568 model: device.config.model,
569 };
570
571 for entity in device.entities {
572 device.publish_buffer.clear();
573 device.subscribe_buffer.clear();
574 device.discovery_buffer.clear();
575 device.discovery_topic_buffer.clear();
576 device.state_topic_buffer.clear();
577 device.command_topic_buffer.clear();
578
579 // borrow the entity and fill out the buffers to be sent
580 // this should be done inside a block so that we do not hold the RefMut across an
581 // await
582 {
583 let mut entity = entity.borrow_mut();
584 let entity = match entity.as_mut() {
585 Some(entity) => entity,
586 None => break,
587 };
588 let entity_config = &entity.config;
530 589
531 self.availability_topic_buffer.clear(); 590 let discovery_topic_display = DiscoveryTopicDisplay {
532 write!( 591 domain: entity_config.domain,
533 self.availability_topic_buffer, 592 device_id: device.config.device_id,
534 "{}", 593 entity_id: entity_config.id,
535 DeviceAvailabilityTopic { 594 };
536 device_id: self.config.device_id 595 let state_topic_display = StateTopicDisplay {
537 } 596 device_id: device.config.device_id,
538 ) 597 entity_id: entity_config.id,
539 .expect("device availability buffer too small"); 598 };
540 let availability_topic = self.availability_topic_buffer.as_str(); 599 let command_topic_display = CommandTopicDisplay {
541 600 device_id: device.config.device_id,
542 let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport); 601 entity_id: entity_config.id,
543 let connect_params = embedded_mqtt::ConnectParams { 602 };
544 will_topic: Some(availability_topic), 603
545 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), 604 write!(device.discovery_topic_buffer, "{discovery_topic_display}")
546 will_retain: true, 605 .expect("discovery topic buffer too small");
547 ..Default::default() 606 write!(device.state_topic_buffer, "{state_topic_display}")
548 }; 607 .expect("state topic buffer too small");
608 write!(device.command_topic_buffer, "{command_topic_display}")
609 .expect("command topic buffer too small");
610
611 let discovery = EntityDiscovery {
612 id: entity_config.id,
613 name: entity_config.name,
614 device_class: entity_config.device_class,
615 state_topic: Some(device.state_topic_buffer.as_str()),
616 command_topic: Some(device.command_topic_buffer.as_str()),
617 unit_of_measurement: entity_config.measurement_unit,
618 schema: entity_config.schema,
619 state_class: entity_config.state_class,
620 icon: entity_config.icon,
621 entity_picture: entity_config.picture,
622 min: entity_config.min,
623 max: entity_config.max,
624 step: entity_config.step,
625 mode: entity_config.mode,
626 suggested_display_precision: entity_config.suggested_display_precision,
627 availability_topic: Some(availability_topic),
628 payload_available: Some(AVAILABLE_PAYLOAD),
629 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
630 device: &device_discovery,
631 };
632 crate::log::debug!(
633 "discovery for entity '{}': {:?}",
634 entity_config.id,
635 discovery
636 );
637
638 device.discovery_buffer
639 .resize(device.discovery_buffer.capacity(), 0)
640 .unwrap();
641 let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer)
642 .expect("discovery buffer too small");
643 device.discovery_buffer.truncate(n);
644 }
645
646 let discovery_topic = device.discovery_topic_buffer.as_str();
647 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
549 if let Err(err) = client 648 if let Err(err) = client
550 .connect_with(self.config.device_id, connect_params) 649 .publish(discovery_topic, &device.discovery_buffer)
551 .await 650 .await
552 { 651 {
553 crate::log::error!( 652 crate::log::error!(
554 "mqtt connect failed with: {:?}", 653 "mqtt discovery publish failed with: {:?}",
555 crate::log::Debug2Format(&err) 654 crate::log::Debug2Format(&err)
556 ); 655 );
557 return Err(Error::new("mqtt connection failed")); 656 return Err(Error::new("mqtt discovery publish failed"));
558 } 657 }
559 658
560 crate::log::debug!("sending discover messages"); 659 let command_topic = device.command_topic_buffer.as_str();
561 let device_discovery = DeviceDiscovery { 660 crate::log::debug!("subscribing to command topic '{}'", command_topic);
562 identifiers: &[self.config.device_id], 661 if let Err(err) = client.subscribe(command_topic).await {
563 name: self.config.device_name, 662 crate::log::error!(
564 manufacturer: self.config.manufacturer, 663 "mqtt subscribe to '{}' failed with: {:?}",
565 model: self.config.model, 664 command_topic,
566 }; 665 crate::log::Debug2Format(&err)
567 666 );
568 for entity in self.entities { 667 return Err(Error::new(
569 self.publish_buffer.clear(); 668 "mqtt subscription to entity command topic failed",
570 self.subscribe_buffer.clear(); 669 ));
571 self.discovery_buffer.clear();
572 self.discovery_topic_buffer.clear();
573 self.state_topic_buffer.clear();
574 self.command_topic_buffer.clear();
575
576 // borrow the entity and fill out the buffers to be sent
577 // this should be done inside a block so that we do not hold the RefMut across an
578 // await
579 {
580 let mut entity = entity.borrow_mut();
581 let entity = match entity.as_mut() {
582 Some(entity) => entity,
583 None => break,
584 };
585 let entity_config = &entity.config;
586
587 let discovery_topic_display = DiscoveryTopicDisplay {
588 domain: entity_config.domain,
589 device_id: self.config.device_id,
590 entity_id: entity_config.id,
591 };
592 let state_topic_display = StateTopicDisplay {
593 device_id: self.config.device_id,
594 entity_id: entity_config.id,
595 };
596 let command_topic_display = CommandTopicDisplay {
597 device_id: self.config.device_id,
598 entity_id: entity_config.id,
599 };
600
601 write!(self.discovery_topic_buffer, "{discovery_topic_display}")
602 .expect("discovery topic buffer too small");
603 write!(self.state_topic_buffer, "{state_topic_display}")
604 .expect("state topic buffer too small");
605 write!(self.command_topic_buffer, "{command_topic_display}")
606 .expect("command topic buffer too small");
607
608 let discovery = EntityDiscovery {
609 id: entity_config.id,
610 name: entity_config.name,
611 device_class: entity_config.device_class,
612 state_topic: Some(self.state_topic_buffer.as_str()),
613 command_topic: Some(self.command_topic_buffer.as_str()),
614 unit_of_measurement: entity_config.measurement_unit,
615 schema: entity_config.schema,
616 state_class: entity_config.state_class,
617 icon: entity_config.icon,
618 entity_picture: entity_config.picture,
619 min: entity_config.min,
620 max: entity_config.max,
621 step: entity_config.step,
622 mode: entity_config.mode,
623 suggested_display_precision: entity_config.suggested_display_precision,
624 availability_topic: Some(availability_topic),
625 payload_available: Some(AVAILABLE_PAYLOAD),
626 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
627 device: &device_discovery,
628 };
629 crate::log::debug!(
630 "discovery for entity '{}': {:?}",
631 entity_config.id,
632 discovery
633 );
634
635 self.discovery_buffer
636 .resize(self.discovery_buffer.capacity(), 0)
637 .unwrap();
638 let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer)
639 .expect("discovery buffer too small");
640 self.discovery_buffer.truncate(n);
641 }
642
643 let discovery_topic = self.discovery_topic_buffer.as_str();
644 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
645 if let Err(err) = client
646 .publish(discovery_topic, &self.discovery_buffer)
647 .await
648 {
649 crate::log::error!(
650 "mqtt discovery publish failed with: {:?}",
651 crate::log::Debug2Format(&err)
652 );
653 return Err(Error::new("mqtt discovery publish failed"));
654 }
655
656 let command_topic = self.command_topic_buffer.as_str();
657 crate::log::debug!("subscribing to command topic '{}'", command_topic);
658 if let Err(err) = client.subscribe(command_topic).await {
659 crate::log::error!(
660 "mqtt subscribe to '{}' failed with: {:?}",
661 command_topic,
662 crate::log::Debug2Format(&err)
663 );
664 return Err(Error::new(
665 "mqtt subscription to entity command topic failed",
666 ));
667 }
668 } 670 }
671 }
669 672
670 if let Err(err) = client 673 if let Err(err) = client
671 .publish_with( 674 .publish_with(
@@ -685,68 +688,68 @@ impl<'a> Device<'a> {
685 return Err(Error::new("mqtt availability publish failed")); 688 return Err(Error::new("mqtt availability publish failed"));
686 } 689 }
687 690
688 'outer_loop: loop { 691 'outer_loop: loop {
689 use core::fmt::Write; 692 use core::fmt::Write;
690 693
691 for entity in self.entities { 694 for entity in device.entities {
692 { 695 {
693 let mut entity = entity.borrow_mut(); 696 let mut entity = entity.borrow_mut();
694 let entity = match entity.as_mut() { 697 let entity = match entity.as_mut() {
695 Some(entity) => entity, 698 Some(entity) => entity,
696 None => break, 699 None => break,
697 }; 700 };
698 701
699 if !entity.publish { 702 if !entity.publish {
700 continue; 703 continue;
701 } 704 }
702 705
703 entity.publish = false; 706 entity.publish = false;
704 self.publish_buffer.clear(); 707 device.publish_buffer.clear();
705 708
706 match &entity.storage { 709 match &entity.storage {
707 EntityStorage::Switch(SwitchStorage { 710 EntityStorage::Switch(SwitchStorage {
708 state: Some(SwitchState { value, .. }), 711 state: Some(SwitchState { value, .. }),
709 .. 712 ..
710 }) => self 713 }) => device
711 .publish_buffer 714 .publish_buffer
712 .extend_from_slice(value.as_str().as_bytes()) 715 .extend_from_slice(value.as_str().as_bytes())
713 .expect("publish buffer too small for switch state payload"), 716 .expect("publish buffer too small for switch state payload"),
714 EntityStorage::BinarySensor(BinarySensorStorage { 717 EntityStorage::BinarySensor(BinarySensorStorage {
715 state: Some(BinarySensorState { value, .. }), 718 state: Some(BinarySensorState { value, .. }),
716 }) => self 719 }) => device
717 .publish_buffer 720 .publish_buffer
718 .extend_from_slice(value.as_str().as_bytes()) 721 .extend_from_slice(value.as_str().as_bytes())
719 .expect("publish buffer too small for binary sensor state payload"), 722 .expect("publish buffer too small for binary sensor state payload"),
720 EntityStorage::NumericSensor(NumericSensorStorage { 723 EntityStorage::NumericSensor(NumericSensorStorage {
721 state: Some(NumericSensorState { value, .. }), 724 state: Some(NumericSensorState { value, .. }),
722 .. 725 ..
723 }) => write!(self.publish_buffer, "{}", value) 726 }) => write!(device.publish_buffer, "{}", value)
724 .expect("publish buffer too small for numeric sensor payload"), 727 .expect("publish buffer too small for numeric sensor payload"),
725 EntityStorage::Number(NumberStorage { 728 EntityStorage::Number(NumberStorage {
726 state: Some(NumberState { value, .. }), 729 state: Some(NumberState { value, .. }),
727 .. 730 ..
728 }) => write!(self.publish_buffer, "{}", value) 731 }) => write!(device.publish_buffer, "{}", value)
729 .expect("publish buffer too small for number state payload"), 732 .expect("publish buffer too small for number state payload"),
730 _ => { 733 _ => {
731 crate::log::warn!( 734 crate::log::warn!(
732 "entity '{}' requested state publish but its storage does not support it", 735 "entity '{}' requested state publish but its storage does not support it",
733 entity.config.id 736 entity.config.id
734 ); 737 );
735 continue; 738 continue;
736 }
737 } 739 }
738
739 let state_topic_display = StateTopicDisplay {
740 device_id: self.config.device_id,
741 entity_id: entity.config.id,
742 };
743 self.state_topic_buffer.clear();
744 write!(self.state_topic_buffer, "{state_topic_display}")
745 .expect("state topic buffer too small");
746 } 740 }
747 741
748 let state_topic = self.state_topic_buffer.as_str(); 742 let state_topic_display = StateTopicDisplay {
749 if let Err(err) = client.publish(state_topic, self.publish_buffer).await { 743 device_id: device.config.device_id,
744 entity_id: entity.config.id,
745 };
746 device.state_topic_buffer.clear();
747 write!(device.state_topic_buffer, "{state_topic_display}")
748 .expect("state topic buffer too small");
749 }
750
751 let state_topic = device.state_topic_buffer.as_str();
752 if let Err(err) = client.publish(state_topic, device.publish_buffer).await {
750 crate::log::error!( 753 crate::log::error!(
751 "mqtt state publish on topic '{}' failed with: {:?}", 754 "mqtt state publish on topic '{}' failed with: {:?}",
752 state_topic, 755 state_topic,
@@ -756,153 +759,152 @@ impl<'a> Device<'a> {
756 } 759 }
757 } 760 }
758 761
759 let receive = client.receive(); 762 let receive = client.receive();
760 let waker = wait_on_atomic_waker(self.waker); 763 let waker = wait_on_atomic_waker(device.waker);
761 let publish = match embassy_futures::select::select(receive, waker).await { 764 let publish = match embassy_futures::select::select(receive, waker).await {
762 embassy_futures::select::Either::First(packet) => match packet { 765 embassy_futures::select::Either::First(packet) => match packet {
763 Ok(embedded_mqtt::Packet::Publish(publish)) => publish, 766 Ok(embedded_mqtt::Packet::Publish(publish)) => publish,
764 Err(err) => { 767 Err(err) => {
765 crate::log::error!( 768 crate::log::error!(
766 "mqtt receive failed with: {:?}", 769 "mqtt receive failed with: {:?}",
767 crate::log::Debug2Format(&err) 770 crate::log::Debug2Format(&err)
768 ); 771 );
769 return Err(Error::new("mqtt receive failed")); 772 return Err(Error::new("mqtt receive failed"));
770 }
771 _ => continue,
772 },
773 embassy_futures::select::Either::Second(_) => continue,
774 };
775
776 let entity = 'entity_search_block: {
777 for entity in self.entities {
778 let mut data = entity.borrow_mut();
779 let data = match data.as_mut() {
780 Some(data) => data,
781 None => break,
782 };
783
784 let command_topic_display = CommandTopicDisplay {
785 device_id: self.config.device_id,
786 entity_id: data.config.id,
787 };
788 self.command_topic_buffer.clear();
789 write!(self.command_topic_buffer, "{command_topic_display}")
790 .expect("command topic buffer too small");
791
792 if self.command_topic_buffer.as_bytes() == publish.topic.as_bytes() {
793 break 'entity_search_block entity;
794 }
795 } 773 }
796 continue 'outer_loop; 774 _ => continue,
797 }; 775 },
776 embassy_futures::select::Either::Second(_) => continue,
777 };
798 778
799 let mut read_buffer = [0u8; 128]; 779 let entity = 'entity_search_block: {
800 if publish.data_len > read_buffer.len() { 780 for entity in device.entities {
801 crate::log::warn!( 781 let mut data = entity.borrow_mut();
802 "mqtt publish payload on topic {} is too large ({} bytes), ignoring it", 782 let data = match data.as_mut() {
803 publish.topic, 783 Some(data) => data,
804 publish.data_len 784 None => break,
805 ); 785 };
806 continue; 786
787 let command_topic_display = CommandTopicDisplay {
788 device_id: device.config.device_id,
789 entity_id: data.config.id,
790 };
791 device.command_topic_buffer.clear();
792 write!(device.command_topic_buffer, "{command_topic_display}")
793 .expect("command topic buffer too small");
794
795 if device.command_topic_buffer.as_bytes() == publish.topic.as_bytes() {
796 break 'entity_search_block entity;
797 }
807 } 798 }
799 continue 'outer_loop;
800 };
808 801
809 crate::log::debug!( 802 let mut read_buffer = [0u8; 128];
810 "mqtt receiving {} bytes of data on topic {}", 803 if publish.data_len > read_buffer.len() {
811 publish.data_len, 804 crate::log::warn!(
812 publish.topic 805 "mqtt publish payload on topic {} is too large ({} bytes), ignoring it",
806 publish.topic,
807 publish.data_len
813 ); 808 );
809 continue;
810 }
811
812 crate::log::debug!(
813 "mqtt receiving {} bytes of data on topic {}",
814 publish.data_len,
815 publish.topic
816 );
814 817
815 let data_len = publish.data_len; 818 let data_len = publish.data_len;
816 if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { 819 if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await {
817 crate::log::error!( 820 crate::log::error!(
818 "mqtt receive data failed with: {:?}", 821 "mqtt receive data failed with: {:?}",
819 crate::log::Debug2Format(&err) 822 crate::log::Debug2Format(&err)
820 ); 823 );
821 return Err(Error::new("mqtt receive data failed")); 824 return Err(Error::new("mqtt receive data failed"));
825 }
826
827 let command = match str::from_utf8(&read_buffer[..data_len]) {
828 Ok(command) => command,
829 Err(_) => {
830 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it");
831 continue;
822 } 832 }
833 };
823 834
824 let command = match str::from_utf8(&read_buffer[..data_len]) { 835 let mut entity = entity.borrow_mut();
825 Ok(command) => command, 836 let data = entity.as_mut().unwrap();
826 Err(_) => { 837
827 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it"); 838 match &mut data.storage {
839 EntityStorage::Button(button_storage) => {
840 if command != constants::HA_BUTTON_PAYLOAD_PRESS {
841 crate::log::warn!(
842 "button '{}' received unexpected command '{}', expected '{}', ignoring it",
843 data.config.id,
844 command,
845 constants::HA_BUTTON_PAYLOAD_PRESS
846 );
828 continue; 847 continue;
829 } 848 }
830 }; 849 button_storage.consumed = false;
831 850 button_storage.timestamp = Some(embassy_time::Instant::now());
832 let mut entity = entity.borrow_mut(); 851 }
833 let data = entity.as_mut().unwrap(); 852 EntityStorage::Switch(switch_storage) => {
834 853 let command = match command.parse::<BinaryState>() {
835 match &mut data.storage { 854 Ok(command) => command,
836 EntityStorage::Button(button_storage) => { 855 Err(_) => {
837 if command != constants::HA_BUTTON_PAYLOAD_PRESS {
838 crate::log::warn!( 856 crate::log::warn!(
839 "button '{}' received unexpected command '{}', expected '{}', ignoring it", 857 "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it",
840 data.config.id, 858 data.config.id,
841 command, 859 command
842 constants::HA_BUTTON_PAYLOAD_PRESS
843 ); 860 );
844 continue; 861 continue;
845 } 862 }
846 button_storage.consumed = false; 863 };
847 button_storage.timestamp = Some(embassy_time::Instant::now()); 864 let timestamp = embassy_time::Instant::now();
848 } 865 if switch_storage.publish_on_command {
849 EntityStorage::Switch(switch_storage) => { 866 data.publish = true;
850 let command = match command.parse::<BinaryState>() { 867 switch_storage.state = Some(SwitchState {
851 Ok(command) => command,
852 Err(_) => {
853 crate::log::warn!(
854 "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it",
855 data.config.id,
856 command
857 );
858 continue;
859 }
860 };
861 let timestamp = embassy_time::Instant::now();
862 if switch_storage.publish_on_command {
863 data.publish = true;
864 switch_storage.state = Some(SwitchState {
865 value: command,
866 timestamp,
867 });
868 }
869 switch_storage.command = Some(SwitchCommand {
870 value: command, 868 value: command,
871 timestamp, 869 timestamp,
872 }); 870 });
873 } 871 }
874 EntityStorage::Number(number_storage) => { 872 switch_storage.command = Some(SwitchCommand {
875 let command = match command.parse::<f32>() { 873 value: command,
876 Ok(command) => command, 874 timestamp,
877 Err(_) => { 875 });
878 crate::log::warn!( 876 }
879 "number '{}' received invalid command '{}', expected a valid number, ignoring it", 877 EntityStorage::Number(number_storage) => {
880 data.config.id, 878 let command = match command.parse::<f32>() {
881 command 879 Ok(command) => command,
882 ); 880 Err(_) => {
883 continue; 881 crate::log::warn!(
884 } 882 "number '{}' received invalid command '{}', expected a valid number, ignoring it",
885 }; 883 data.config.id,
886 let timestamp = embassy_time::Instant::now(); 884 command
887 if number_storage.publish_on_command { 885 );
888 data.publish = true; 886 continue;
889 number_storage.state = Some(NumberState {
890 value: command,
891 timestamp,
892 });
893 } 887 }
894 number_storage.command = Some(NumberCommand { 888 };
889 let timestamp = embassy_time::Instant::now();
890 if number_storage.publish_on_command {
891 data.publish = true;
892 number_storage.state = Some(NumberState {
895 value: command, 893 value: command,
896 timestamp, 894 timestamp,
897 }); 895 });
898 } 896 }
899 _ => continue 'outer_loop, 897 number_storage.command = Some(NumberCommand {
898 value: command,
899 timestamp,
900 });
900 } 901 }
902 _ => continue 'outer_loop,
903 }
901 904
902 data.command = true; 905 data.command = true;
903 if let Some(waker) = data.command_waker.take() { 906 if let Some(waker) = data.command_waker.take() {
904 waker.wake(); 907 waker.wake();
905 }
906 } 908 }
907 } 909 }
908} 910}
@@ -999,7 +1001,7 @@ pub async fn connect_and_run(
999 1001
1000 socket.set_timeout(None); 1002 socket.set_timeout(None);
1001 1003
1002 if let Err(err) = device.run(&mut socket).await { 1004 if let Err(err) = run(&mut device, &mut socket).await {
1003 crate::log::error!( 1005 crate::log::error!(
1004 "Device run failed with: {:?}", 1006 "Device run failed with: {:?}",
1005 crate::log::Debug2Format(&err) 1007 crate::log::Debug2Format(&err)