diff options
| author | diogo464 <[email protected]> | 2025-12-13 20:32:31 +0000 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-12-13 20:32:31 +0000 |
| commit | 871b67e886ed9853e3fa392ff33a05c7da09fe9b (patch) | |
| tree | e08c9a0312f1e37187d8e6860b8b8221a7e25a77 /src/lib.rs | |
| parent | b456db36b7d973154f0a10bc1adca235de2c5c12 (diff) | |
use 30s mqtt keepalive
Diffstat (limited to 'src/lib.rs')
| -rw-r--r-- | src/lib.rs | 50 |
1 files changed, 28 insertions, 22 deletions
| @@ -72,6 +72,7 @@ pub use unit::*; | |||
| 72 | 72 | ||
| 73 | const AVAILABLE_PAYLOAD: &str = "online"; | 73 | const AVAILABLE_PAYLOAD: &str = "online"; |
| 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; | 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; |
| 75 | const DEFAULT_KEEPALIVE_TIME: u16 = 30; | ||
| 75 | 76 | ||
| 76 | #[derive(Debug)] | 77 | #[derive(Debug)] |
| 77 | pub struct Error(&'static str); | 78 | pub struct Error(&'static str); |
| @@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 673 | .expect("device availability buffer too small"); | 674 | .expect("device availability buffer too small"); |
| 674 | let availability_topic = device.availability_topic_buffer.as_str(); | 675 | let availability_topic = device.availability_topic_buffer.as_str(); |
| 675 | 676 | ||
| 677 | let mut ping_ticker = | ||
| 678 | embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); | ||
| 676 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); | 679 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); |
| 677 | let connect_params = mqtt::ConnectParams { | 680 | let connect_params = mqtt::ConnectParams { |
| 678 | will_topic: Some(availability_topic), | 681 | will_topic: Some(availability_topic), |
| 679 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), | 682 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), |
| 680 | will_retain: true, | 683 | will_retain: true, |
| 684 | keepalive: Some(DEFAULT_KEEPALIVE_TIME), | ||
| 681 | ..Default::default() | 685 | ..Default::default() |
| 682 | }; | 686 | }; |
| 683 | match embassy_time::with_timeout( | 687 | match embassy_time::with_timeout( |
| @@ -977,29 +981,31 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 977 | 981 | ||
| 978 | let receive = client.receive(); | 982 | let receive = client.receive(); |
| 979 | let waker = wait_on_atomic_waker(device.waker); | 983 | let waker = wait_on_atomic_waker(device.waker); |
| 980 | let publish = match embassy_time::with_timeout( | 984 | let publish = |
| 981 | MQTT_TIMEOUT, | 985 | match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await { |
| 982 | embassy_futures::select::select(receive, waker), | 986 | embassy_futures::select::Either3::First(packet) => match packet { |
| 983 | ) | 987 | Ok(mqtt::Packet::Publish(publish)) => publish, |
| 984 | .await | 988 | Err(err) => { |
| 985 | { | 989 | crate::log::error!( |
| 986 | Ok(embassy_futures::select::Either::First(packet)) => match packet { | 990 | "mqtt receive failed with: {:?}", |
| 987 | Ok(mqtt::Packet::Publish(publish)) => publish, | 991 | crate::log::Debug2Format(&err) |
| 988 | Err(err) => { | 992 | ); |
| 989 | crate::log::error!( | 993 | return Err(Error::new("mqtt receive failed")); |
| 990 | "mqtt receive failed with: {:?}", | 994 | } |
| 991 | crate::log::Debug2Format(&err) | 995 | _ => continue, |
| 992 | ); | 996 | }, |
| 993 | return Err(Error::new("mqtt receive failed")); | 997 | embassy_futures::select::Either3::Second(_) => continue, |
| 998 | embassy_futures::select::Either3::Third(_) => { | ||
| 999 | if let Err(err) = client.ping().await { | ||
| 1000 | crate::log::error!( | ||
| 1001 | "mqtt ping failed with: {:?}", | ||
| 1002 | crate::log::Debug2Format(&err) | ||
| 1003 | ); | ||
| 1004 | return Err(Error::new("mqtt ping failed")); | ||
| 1005 | } | ||
| 1006 | continue; | ||
| 994 | } | 1007 | } |
| 995 | _ => continue, | 1008 | }; |
| 996 | }, | ||
| 997 | Ok(embassy_futures::select::Either::Second(_)) => continue, | ||
| 998 | Err(_) => { | ||
| 999 | crate::log::error!("mqtt receive timed out"); | ||
| 1000 | return Err(Error::new("mqtt receive timed out")); | ||
| 1001 | } | ||
| 1002 | }; | ||
| 1003 | 1009 | ||
| 1004 | let entity = 'entity_search_block: { | 1010 | let entity = 'entity_search_block: { |
| 1005 | for entity in device.entities { | 1011 | for entity in device.entities { |
