aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-13 20:32:31 +0000
committerdiogo464 <[email protected]>2025-12-13 20:32:31 +0000
commit871b67e886ed9853e3fa392ff33a05c7da09fe9b (patch)
treee08c9a0312f1e37187d8e6860b8b8221a7e25a77 /src/lib.rs
parentb456db36b7d973154f0a10bc1adca235de2c5c12 (diff)
use 30s mqtt keepalive
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs50
1 files changed, 28 insertions, 22 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 64057a4..7d31ac5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -72,6 +72,7 @@ pub use unit::*;
72 72
73const AVAILABLE_PAYLOAD: &str = "online"; 73const AVAILABLE_PAYLOAD: &str = "online";
74const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 74const NOT_AVAILABLE_PAYLOAD: &str = "offline";
75const DEFAULT_KEEPALIVE_TIME: u16 = 30;
75 76
76#[derive(Debug)] 77#[derive(Debug)]
77pub struct Error(&'static str); 78pub struct Error(&'static str);
@@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
673 .expect("device availability buffer too small"); 674 .expect("device availability buffer too small");
674 let availability_topic = device.availability_topic_buffer.as_str(); 675 let availability_topic = device.availability_topic_buffer.as_str();
675 676
677 let mut ping_ticker =
678 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
676 let mut client = mqtt::Client::new(device.mqtt_resources, transport); 679 let mut client = mqtt::Client::new(device.mqtt_resources, transport);
677 let connect_params = mqtt::ConnectParams { 680 let connect_params = mqtt::ConnectParams {
678 will_topic: Some(availability_topic), 681 will_topic: Some(availability_topic),
679 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), 682 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
680 will_retain: true, 683 will_retain: true,
684 keepalive: Some(DEFAULT_KEEPALIVE_TIME),
681 ..Default::default() 685 ..Default::default()
682 }; 686 };
683 match embassy_time::with_timeout( 687 match embassy_time::with_timeout(
@@ -977,29 +981,31 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
977 981
978 let receive = client.receive(); 982 let receive = client.receive();
979 let waker = wait_on_atomic_waker(device.waker); 983 let waker = wait_on_atomic_waker(device.waker);
980 let publish = match embassy_time::with_timeout( 984 let publish =
981 MQTT_TIMEOUT, 985 match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
982 embassy_futures::select::select(receive, waker), 986 embassy_futures::select::Either3::First(packet) => match packet {
983 ) 987 Ok(mqtt::Packet::Publish(publish)) => publish,
984 .await 988 Err(err) => {
985 { 989 crate::log::error!(
986 Ok(embassy_futures::select::Either::First(packet)) => match packet { 990 "mqtt receive failed with: {:?}",
987 Ok(mqtt::Packet::Publish(publish)) => publish, 991 crate::log::Debug2Format(&err)
988 Err(err) => { 992 );
989 crate::log::error!( 993 return Err(Error::new("mqtt receive failed"));
990 "mqtt receive failed with: {:?}", 994 }
991 crate::log::Debug2Format(&err) 995 _ => continue,
992 ); 996 },
993 return Err(Error::new("mqtt receive failed")); 997 embassy_futures::select::Either3::Second(_) => continue,
998 embassy_futures::select::Either3::Third(_) => {
999 if let Err(err) = client.ping().await {
1000 crate::log::error!(
1001 "mqtt ping failed with: {:?}",
1002 crate::log::Debug2Format(&err)
1003 );
1004 return Err(Error::new("mqtt ping failed"));
1005 }
1006 continue;
994 } 1007 }
995 _ => continue, 1008 };
996 },
997 Ok(embassy_futures::select::Either::Second(_)) => continue,
998 Err(_) => {
999 crate::log::error!("mqtt receive timed out");
1000 return Err(Error::new("mqtt receive timed out"));
1001 }
1002 };
1003 1009
1004 let entity = 'entity_search_block: { 1010 let entity = 'entity_search_block: {
1005 for entity in device.entities { 1011 for entity in device.entities {