aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-13 20:34:27 +0000
committerdiogo464 <[email protected]>2025-12-13 20:34:27 +0000
commit2889e53ed5218669c2e53ad68ed6366c08a1495a (patch)
tree67397fd0f274d4b5474dfa2bbf458122ee6bbedf /src/lib.rs
parenteb35788c70a1855a4691df77946821c33702dafb (diff)
parent871b67e886ed9853e3fa392ff33a05c7da09fe9b (diff)
Merge branch 'main' into embassy-git
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs64
1 files changed, 37 insertions, 27 deletions
diff --git a/src/lib.rs b/src/lib.rs
index f28d2bd..7d31ac5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -72,6 +72,7 @@ pub use unit::*;
72 72
73const AVAILABLE_PAYLOAD: &str = "online"; 73const AVAILABLE_PAYLOAD: &str = "online";
74const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 74const NOT_AVAILABLE_PAYLOAD: &str = "offline";
75const DEFAULT_KEEPALIVE_TIME: u16 = 30;
75 76
76#[derive(Debug)] 77#[derive(Debug)]
77pub struct Error(&'static str); 78pub struct Error(&'static str);
@@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
673 .expect("device availability buffer too small"); 674 .expect("device availability buffer too small");
674 let availability_topic = device.availability_topic_buffer.as_str(); 675 let availability_topic = device.availability_topic_buffer.as_str();
675 676
677 let mut ping_ticker =
678 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
676 let mut client = mqtt::Client::new(device.mqtt_resources, transport); 679 let mut client = mqtt::Client::new(device.mqtt_resources, transport);
677 let connect_params = mqtt::ConnectParams { 680 let connect_params = mqtt::ConnectParams {
678 will_topic: Some(availability_topic), 681 will_topic: Some(availability_topic),
679 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), 682 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
680 will_retain: true, 683 will_retain: true,
684 keepalive: Some(DEFAULT_KEEPALIVE_TIME),
681 ..Default::default() 685 ..Default::default()
682 }; 686 };
683 match embassy_time::with_timeout( 687 match embassy_time::with_timeout(
@@ -863,6 +867,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
863 } 867 }
864 } 868 }
865 869
870 let mut first_iteration_push = true;
866 'outer_loop: loop { 871 'outer_loop: loop {
867 use core::fmt::Write; 872 use core::fmt::Write;
868 873
@@ -874,7 +879,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
874 None => break, 879 None => break,
875 }; 880 };
876 881
877 if !entity.publish { 882 if !entity.publish && !first_iteration_push {
878 continue; 883 continue;
879 } 884 }
880 885
@@ -920,10 +925,12 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
920 device.publish_buffer.truncate(n); 925 device.publish_buffer.truncate(n);
921 } 926 }
922 _ => { 927 _ => {
923 crate::log::warn!( 928 if !first_iteration_push {
924 "entity '{}' requested state publish but its storage does not support it", 929 crate::log::warn!(
925 entity.config.id 930 "entity '{}' requested state publish but its storage does not support it",
926 ); 931 entity.config.id
932 );
933 }
927 continue; 934 continue;
928 } 935 }
929 } 936 }
@@ -970,32 +977,35 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
970 } 977 }
971 } 978 }
972 } 979 }
980 first_iteration_push = false;
973 981
974 let receive = client.receive(); 982 let receive = client.receive();
975 let waker = wait_on_atomic_waker(device.waker); 983 let waker = wait_on_atomic_waker(device.waker);
976 let publish = match embassy_time::with_timeout( 984 let publish =
977 MQTT_TIMEOUT, 985 match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
978 embassy_futures::select::select(receive, waker), 986 embassy_futures::select::Either3::First(packet) => match packet {
979 ) 987 Ok(mqtt::Packet::Publish(publish)) => publish,
980 .await 988 Err(err) => {
981 { 989 crate::log::error!(
982 Ok(embassy_futures::select::Either::First(packet)) => match packet { 990 "mqtt receive failed with: {:?}",
983 Ok(mqtt::Packet::Publish(publish)) => publish, 991 crate::log::Debug2Format(&err)
984 Err(err) => { 992 );
985 crate::log::error!( 993 return Err(Error::new("mqtt receive failed"));
986 "mqtt receive failed with: {:?}", 994 }
987 crate::log::Debug2Format(&err) 995 _ => continue,
988 ); 996 },
989 return Err(Error::new("mqtt receive failed")); 997 embassy_futures::select::Either3::Second(_) => continue,
998 embassy_futures::select::Either3::Third(_) => {
999 if let Err(err) = client.ping().await {
1000 crate::log::error!(
1001 "mqtt ping failed with: {:?}",
1002 crate::log::Debug2Format(&err)
1003 );
1004 return Err(Error::new("mqtt ping failed"));
1005 }
1006 continue;
990 } 1007 }
991 _ => continue, 1008 };
992 },
993 Ok(embassy_futures::select::Either::Second(_)) => continue,
994 Err(_) => {
995 crate::log::error!("mqtt receive timed out");
996 return Err(Error::new("mqtt receive timed out"));
997 }
998 };
999 1009
1000 let entity = 'entity_search_block: { 1010 let entity = 'entity_search_block: {
1001 for entity in device.entities { 1011 for entity in device.entities {