diff options
| author | diogo464 <[email protected]> | 2025-12-13 20:34:27 +0000 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-12-13 20:34:27 +0000 |
| commit | 2889e53ed5218669c2e53ad68ed6366c08a1495a (patch) | |
| tree | 67397fd0f274d4b5474dfa2bbf458122ee6bbedf /src/lib.rs | |
| parent | eb35788c70a1855a4691df77946821c33702dafb (diff) | |
| parent | 871b67e886ed9853e3fa392ff33a05c7da09fe9b (diff) | |
Merge branch 'main' into embassy-git
Diffstat (limited to 'src/lib.rs')
| -rw-r--r-- | src/lib.rs | 64 |
1 files changed, 37 insertions, 27 deletions
| @@ -72,6 +72,7 @@ pub use unit::*; | |||
| 72 | 72 | ||
| 73 | const AVAILABLE_PAYLOAD: &str = "online"; | 73 | const AVAILABLE_PAYLOAD: &str = "online"; |
| 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; | 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; |
| 75 | const DEFAULT_KEEPALIVE_TIME: u16 = 30; | ||
| 75 | 76 | ||
| 76 | #[derive(Debug)] | 77 | #[derive(Debug)] |
| 77 | pub struct Error(&'static str); | 78 | pub struct Error(&'static str); |
| @@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 673 | .expect("device availability buffer too small"); | 674 | .expect("device availability buffer too small"); |
| 674 | let availability_topic = device.availability_topic_buffer.as_str(); | 675 | let availability_topic = device.availability_topic_buffer.as_str(); |
| 675 | 676 | ||
| 677 | let mut ping_ticker = | ||
| 678 | embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); | ||
| 676 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); | 679 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); |
| 677 | let connect_params = mqtt::ConnectParams { | 680 | let connect_params = mqtt::ConnectParams { |
| 678 | will_topic: Some(availability_topic), | 681 | will_topic: Some(availability_topic), |
| 679 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), | 682 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), |
| 680 | will_retain: true, | 683 | will_retain: true, |
| 684 | keepalive: Some(DEFAULT_KEEPALIVE_TIME), | ||
| 681 | ..Default::default() | 685 | ..Default::default() |
| 682 | }; | 686 | }; |
| 683 | match embassy_time::with_timeout( | 687 | match embassy_time::with_timeout( |
| @@ -863,6 +867,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 863 | } | 867 | } |
| 864 | } | 868 | } |
| 865 | 869 | ||
| 870 | let mut first_iteration_push = true; | ||
| 866 | 'outer_loop: loop { | 871 | 'outer_loop: loop { |
| 867 | use core::fmt::Write; | 872 | use core::fmt::Write; |
| 868 | 873 | ||
| @@ -874,7 +879,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 874 | None => break, | 879 | None => break, |
| 875 | }; | 880 | }; |
| 876 | 881 | ||
| 877 | if !entity.publish { | 882 | if !entity.publish && !first_iteration_push { |
| 878 | continue; | 883 | continue; |
| 879 | } | 884 | } |
| 880 | 885 | ||
| @@ -920,10 +925,12 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 920 | device.publish_buffer.truncate(n); | 925 | device.publish_buffer.truncate(n); |
| 921 | } | 926 | } |
| 922 | _ => { | 927 | _ => { |
| 923 | crate::log::warn!( | 928 | if !first_iteration_push { |
| 924 | "entity '{}' requested state publish but its storage does not support it", | 929 | crate::log::warn!( |
| 925 | entity.config.id | 930 | "entity '{}' requested state publish but its storage does not support it", |
| 926 | ); | 931 | entity.config.id |
| 932 | ); | ||
| 933 | } | ||
| 927 | continue; | 934 | continue; |
| 928 | } | 935 | } |
| 929 | } | 936 | } |
| @@ -970,32 +977,35 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 970 | } | 977 | } |
| 971 | } | 978 | } |
| 972 | } | 979 | } |
| 980 | first_iteration_push = false; | ||
| 973 | 981 | ||
| 974 | let receive = client.receive(); | 982 | let receive = client.receive(); |
| 975 | let waker = wait_on_atomic_waker(device.waker); | 983 | let waker = wait_on_atomic_waker(device.waker); |
| 976 | let publish = match embassy_time::with_timeout( | 984 | let publish = |
| 977 | MQTT_TIMEOUT, | 985 | match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await { |
| 978 | embassy_futures::select::select(receive, waker), | 986 | embassy_futures::select::Either3::First(packet) => match packet { |
| 979 | ) | 987 | Ok(mqtt::Packet::Publish(publish)) => publish, |
| 980 | .await | 988 | Err(err) => { |
| 981 | { | 989 | crate::log::error!( |
| 982 | Ok(embassy_futures::select::Either::First(packet)) => match packet { | 990 | "mqtt receive failed with: {:?}", |
| 983 | Ok(mqtt::Packet::Publish(publish)) => publish, | 991 | crate::log::Debug2Format(&err) |
| 984 | Err(err) => { | 992 | ); |
| 985 | crate::log::error!( | 993 | return Err(Error::new("mqtt receive failed")); |
| 986 | "mqtt receive failed with: {:?}", | 994 | } |
| 987 | crate::log::Debug2Format(&err) | 995 | _ => continue, |
| 988 | ); | 996 | }, |
| 989 | return Err(Error::new("mqtt receive failed")); | 997 | embassy_futures::select::Either3::Second(_) => continue, |
| 998 | embassy_futures::select::Either3::Third(_) => { | ||
| 999 | if let Err(err) = client.ping().await { | ||
| 1000 | crate::log::error!( | ||
| 1001 | "mqtt ping failed with: {:?}", | ||
| 1002 | crate::log::Debug2Format(&err) | ||
| 1003 | ); | ||
| 1004 | return Err(Error::new("mqtt ping failed")); | ||
| 1005 | } | ||
| 1006 | continue; | ||
| 990 | } | 1007 | } |
| 991 | _ => continue, | 1008 | }; |
| 992 | }, | ||
| 993 | Ok(embassy_futures::select::Either::Second(_)) => continue, | ||
| 994 | Err(_) => { | ||
| 995 | crate::log::error!("mqtt receive timed out"); | ||
| 996 | return Err(Error::new("mqtt receive timed out")); | ||
| 997 | } | ||
| 998 | }; | ||
| 999 | 1009 | ||
| 1000 | let entity = 'entity_search_block: { | 1010 | let entity = 'entity_search_block: { |
| 1001 | for entity in device.entities { | 1011 | for entity in device.entities { |
