aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-08 20:49:23 +0000
committerdiogo464 <[email protected]>2025-12-08 20:49:23 +0000
commit8ac9ddd2cbc9cf454eae066e5e60d05ee714a83e (patch)
treee4797159d8fdae7e8521295fd8178cadc8c2974a /src/lib.rs
parent28d9961141a38ebde8bd6144636c3021eb2755a5 (diff)
formatting and improved timeout handling
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs230
1 files changed, 168 insertions, 62 deletions
diff --git a/src/lib.rs b/src/lib.rs
index c1b5191..672fde9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,10 +1,14 @@
1#![no_std] 1#![no_std]
2 2
3use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker}; 3use core::{
4 cell::RefCell,
5 net::{Ipv4Addr, SocketAddrV4},
6 task::Waker,
7};
4 8
5use embassy_net::tcp::TcpSocket; 9use embassy_net::tcp::TcpSocket;
6use embassy_sync::waitqueue::AtomicWaker; 10use embassy_sync::waitqueue::AtomicWaker;
7use embassy_time::Timer; 11use embassy_time::{Duration, Timer};
8use heapless::{ 12use heapless::{
9 Vec, VecView, 13 Vec, VecView,
10 string::{String, StringView}, 14 string::{String, StringView},
@@ -432,7 +436,11 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi
432 } 436 }
433} 437}
434 438
435fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { 439fn create_entity<'a>(
440 device: &Device<'a>,
441 config: EntityConfig,
442 storage: EntityStorage,
443) -> Entity<'a> {
436 let index = 'outer: { 444 let index = 'outer: {
437 for idx in 0..device.entities.len() { 445 for idx in 0..device.entities.len() {
438 if device.entities[idx].borrow().is_none() { 446 if device.entities[idx].borrow().is_none() {
@@ -457,7 +465,11 @@ fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityS
457 } 465 }
458} 466}
459 467
460pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorConfig) -> Sensor<'a> { 468pub fn create_sensor<'a>(
469 device: &Device<'a>,
470 id: &'static str,
471 config: SensorConfig,
472) -> Sensor<'a> {
461 let mut entity_config = EntityConfig::default(); 473 let mut entity_config = EntityConfig::default();
462 entity_config.id = id; 474 entity_config.id = id;
463 config.populate(&mut entity_config); 475 config.populate(&mut entity_config);
@@ -470,16 +482,28 @@ pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorCo
470 Sensor::new(entity) 482 Sensor::new(entity)
471} 483}
472 484
473pub fn create_button<'a>(device: &Device<'a>, id: &'static str, config: ButtonConfig) -> Button<'a> { 485pub fn create_button<'a>(
486 device: &Device<'a>,
487 id: &'static str,
488 config: ButtonConfig,
489) -> Button<'a> {
474 let mut entity_config = EntityConfig::default(); 490 let mut entity_config = EntityConfig::default();
475 entity_config.id = id; 491 entity_config.id = id;
476 config.populate(&mut entity_config); 492 config.populate(&mut entity_config);
477 493
478 let entity = create_entity(device, entity_config, EntityStorage::Button(Default::default())); 494 let entity = create_entity(
495 device,
496 entity_config,
497 EntityStorage::Button(Default::default()),
498 );
479 Button::new(entity) 499 Button::new(entity)
480} 500}
481 501
482pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberConfig) -> Number<'a> { 502pub fn create_number<'a>(
503 device: &Device<'a>,
504 id: &'static str,
505 config: NumberConfig,
506) -> Number<'a> {
483 let mut entity_config = EntityConfig::default(); 507 let mut entity_config = EntityConfig::default();
484 entity_config.id = id; 508 entity_config.id = id;
485 config.populate(&mut entity_config); 509 config.populate(&mut entity_config);
@@ -495,7 +519,11 @@ pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberCo
495 Number::new(entity) 519 Number::new(entity)
496} 520}
497 521
498pub fn create_switch<'a>(device: &Device<'a>, id: &'static str, config: SwitchConfig) -> Switch<'a> { 522pub fn create_switch<'a>(
523 device: &Device<'a>,
524 id: &'static str,
525 config: SwitchConfig,
526) -> Switch<'a> {
499 let mut entity_config = EntityConfig::default(); 527 let mut entity_config = EntityConfig::default();
500 entity_config.id = id; 528 entity_config.id = id;
501 config.populate(&mut entity_config); 529 config.populate(&mut entity_config);
@@ -531,6 +559,8 @@ pub fn create_binary_sensor<'a>(
531pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { 559pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
532 use core::fmt::Write; 560 use core::fmt::Write;
533 561
562 const MQTT_TIMEOUT: Duration = Duration::from_secs(30);
563
534 device.availability_topic_buffer.clear(); 564 device.availability_topic_buffer.clear();
535 write!( 565 write!(
536 device.availability_topic_buffer, 566 device.availability_topic_buffer,
@@ -549,15 +579,24 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
549 will_retain: true, 579 will_retain: true,
550 ..Default::default() 580 ..Default::default()
551 }; 581 };
552 if let Err(err) = client 582 match embassy_time::with_timeout(
553 .connect_with(device.config.device_id, connect_params) 583 MQTT_TIMEOUT,
554 .await 584 client.connect_with(device.config.device_id, connect_params),
585 )
586 .await
555 { 587 {
556 crate::log::error!( 588 Ok(Ok(())) => {}
557 "mqtt connect failed with: {:?}", 589 Ok(Err(err)) => {
558 crate::log::Debug2Format(&err) 590 crate::log::error!(
559 ); 591 "mqtt connect failed with: {:?}",
560 return Err(Error::new("mqtt connection failed")); 592 crate::log::Debug2Format(&err)
593 );
594 return Err(Error::new("mqtt connection failed"));
595 }
596 Err(_) => {
597 crate::log::error!("mqtt connect timed out");
598 return Err(Error::new("mqtt connect timed out"));
599 }
561 } 600 }
562 601
563 crate::log::debug!("sending discover messages"); 602 crate::log::debug!("sending discover messages");
@@ -635,7 +674,8 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
635 discovery 674 discovery
636 ); 675 );
637 676
638 device.discovery_buffer 677 device
678 .discovery_buffer
639 .resize(device.discovery_buffer.capacity(), 0) 679 .resize(device.discovery_buffer.capacity(), 0)
640 .unwrap(); 680 .unwrap();
641 let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer) 681 let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer)
@@ -645,48 +685,73 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
645 685
646 let discovery_topic = device.discovery_topic_buffer.as_str(); 686 let discovery_topic = device.discovery_topic_buffer.as_str();
647 crate::log::debug!("sending discovery to topic '{}'", discovery_topic); 687 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
648 if let Err(err) = client 688 match embassy_time::with_timeout(
649 .publish(discovery_topic, &device.discovery_buffer) 689 MQTT_TIMEOUT,
650 .await 690 client.publish(discovery_topic, &device.discovery_buffer),
691 )
692 .await
651 { 693 {
652 crate::log::error!( 694 Ok(Ok(_)) => {}
653 "mqtt discovery publish failed with: {:?}", 695 Ok(Err(err)) => {
654 crate::log::Debug2Format(&err) 696 crate::log::error!(
655 ); 697 "mqtt discovery publish failed with: {:?}",
656 return Err(Error::new("mqtt discovery publish failed")); 698 crate::log::Debug2Format(&err)
699 );
700 return Err(Error::new("mqtt discovery publish failed"));
701 }
702 Err(_) => {
703 crate::log::error!("mqtt discovery publish timed out");
704 return Err(Error::new("mqtt discovery publish timed out"));
705 }
657 } 706 }
658 707
659 let command_topic = device.command_topic_buffer.as_str(); 708 let command_topic = device.command_topic_buffer.as_str();
660 crate::log::debug!("subscribing to command topic '{}'", command_topic); 709 crate::log::debug!("subscribing to command topic '{}'", command_topic);
661 if let Err(err) = client.subscribe(command_topic).await { 710 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await {
662 crate::log::error!( 711 Ok(Ok(_)) => {}
663 "mqtt subscribe to '{}' failed with: {:?}", 712 Ok(Err(err)) => {
664 command_topic, 713 crate::log::error!(
665 crate::log::Debug2Format(&err) 714 "mqtt subscribe to '{}' failed with: {:?}",
666 ); 715 command_topic,
667 return Err(Error::new( 716 crate::log::Debug2Format(&err)
668 "mqtt subscription to entity command topic failed", 717 );
669 )); 718 return Err(Error::new(
719 "mqtt subscription to entity command topic failed",
720 ));
721 }
722 Err(_) => {
723 crate::log::error!("mqtt subscribe to '{}' timed out", command_topic);
724 return Err(Error::new("mqtt subscribe timed out"));
725 }
670 } 726 }
671 } 727 }
672 728
673 if let Err(err) = client 729 match embassy_time::with_timeout(
674 .publish_with( 730 MQTT_TIMEOUT,
675 availability_topic, 731 client.publish_with(
676 AVAILABLE_PAYLOAD.as_bytes(), 732 availability_topic,
677 embedded_mqtt::PublishParams { 733 AVAILABLE_PAYLOAD.as_bytes(),
678 retain: true, 734 embedded_mqtt::PublishParams {
679 ..Default::default() 735 retain: true,
680 }, 736 ..Default::default()
681 ) 737 },
682 .await 738 ),
683 { 739 )
740 .await
741 {
742 Ok(Ok(_)) => {}
743 Ok(Err(err)) => {
684 crate::log::error!( 744 crate::log::error!(
685 "mqtt availability publish failed with: {:?}", 745 "mqtt availability publish failed with: {:?}",
686 crate::log::Debug2Format(&err) 746 crate::log::Debug2Format(&err)
687 ); 747 );
688 return Err(Error::new("mqtt availability publish failed")); 748 return Err(Error::new("mqtt availability publish failed"));
689 } 749 }
750 Err(_) => {
751 crate::log::error!("mqtt availability publish timed out");
752 return Err(Error::new("mqtt availability publish timed out"));
753 }
754 }
690 755
691 'outer_loop: loop { 756 'outer_loop: loop {
692 use core::fmt::Write; 757 use core::fmt::Write;
@@ -749,7 +814,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
749 } 814 }
750 815
751 let state_topic = device.state_topic_buffer.as_str(); 816 let state_topic = device.state_topic_buffer.as_str();
752 if let Err(err) = client.publish(state_topic, device.publish_buffer).await { 817 match embassy_time::with_timeout(
818 MQTT_TIMEOUT,
819 client.publish(state_topic, device.publish_buffer),
820 )
821 .await
822 {
823 Ok(Ok(_)) => {}
824 Ok(Err(err)) => {
753 crate::log::error!( 825 crate::log::error!(
754 "mqtt state publish on topic '{}' failed with: {:?}", 826 "mqtt state publish on topic '{}' failed with: {:?}",
755 state_topic, 827 state_topic,
@@ -757,12 +829,22 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
757 ); 829 );
758 return Err(Error::new("mqtt publish failed")); 830 return Err(Error::new("mqtt publish failed"));
759 } 831 }
832 Err(_) => {
833 crate::log::error!("mqtt state publish on topic '{}' timed out", state_topic);
834 return Err(Error::new("mqtt publish timed out"));
835 }
760 } 836 }
837 }
761 838
762 let receive = client.receive(); 839 let receive = client.receive();
763 let waker = wait_on_atomic_waker(device.waker); 840 let waker = wait_on_atomic_waker(device.waker);
764 let publish = match embassy_futures::select::select(receive, waker).await { 841 let publish = match embassy_time::with_timeout(
765 embassy_futures::select::Either::First(packet) => match packet { 842 MQTT_TIMEOUT,
843 embassy_futures::select::select(receive, waker),
844 )
845 .await
846 {
847 Ok(embassy_futures::select::Either::First(packet)) => match packet {
766 Ok(embedded_mqtt::Packet::Publish(publish)) => publish, 848 Ok(embedded_mqtt::Packet::Publish(publish)) => publish,
767 Err(err) => { 849 Err(err) => {
768 crate::log::error!( 850 crate::log::error!(
@@ -773,7 +855,11 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
773 } 855 }
774 _ => continue, 856 _ => continue,
775 }, 857 },
776 embassy_futures::select::Either::Second(_) => continue, 858 Ok(embassy_futures::select::Either::Second(_)) => continue,
859 Err(_) => {
860 crate::log::error!("mqtt receive timed out");
861 return Err(Error::new("mqtt receive timed out"));
862 }
777 }; 863 };
778 864
779 let entity = 'entity_search_block: { 865 let entity = 'entity_search_block: {
@@ -816,12 +902,24 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
816 ); 902 );
817 903
818 let data_len = publish.data_len; 904 let data_len = publish.data_len;
819 if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { 905 match embassy_time::with_timeout(
820 crate::log::error!( 906 MQTT_TIMEOUT,
821 "mqtt receive data failed with: {:?}", 907 client.receive_data(&mut read_buffer[..data_len]),
822 crate::log::Debug2Format(&err) 908 )
823 ); 909 .await
824 return Err(Error::new("mqtt receive data failed")); 910 {
911 Ok(Ok(())) => {}
912 Ok(Err(err)) => {
913 crate::log::error!(
914 "mqtt receive data failed with: {:?}",
915 crate::log::Debug2Format(&err)
916 );
917 return Err(Error::new("mqtt receive data failed"));
918 }
919 Err(_) => {
920 crate::log::error!("mqtt receive data timed out");
921 return Err(Error::new("mqtt receive data timed out"));
922 }
825 } 923 }
826 924
827 let command = match str::from_utf8(&read_buffer[..data_len]) { 925 let command = match str::from_utf8(&read_buffer[..data_len]) {
@@ -990,13 +1088,21 @@ pub async fn connect_and_run(
990 let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); 1088 let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
991 socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); 1089 socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));
992 1090
993 if let Err(err) = socket.connect(addr).await { 1091 let connect_fut = embassy_time::with_timeout(Duration::from_secs(10), socket.connect(addr));
994 crate::log::error!( 1092 match connect_fut.await {
995 "TCP connect to {} failed with: {:?}", 1093 Ok(Err(err)) => {
996 addr, 1094 crate::log::error!(
997 crate::log::Debug2Format(&err) 1095 "TCP connect to {} failed with: {:?}",
998 ); 1096 addr,
999 continue; 1097 crate::log::Debug2Format(&err)
1098 );
1099 continue;
1100 }
1101 Err(_) => {
1102 crate::log::error!("TCP connect to {} timed out", addr);
1103 continue;
1104 }
1105 _ => {}
1000 } 1106 }
1001 1107
1002 socket.set_timeout(None); 1108 socket.set_timeout(None);