aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-13 20:34:27 +0000
committerdiogo464 <[email protected]>2025-12-13 20:34:27 +0000
commit2889e53ed5218669c2e53ad68ed6366c08a1495a (patch)
tree67397fd0f274d4b5474dfa2bbf458122ee6bbedf
parenteb35788c70a1855a4691df77946821c33702dafb (diff)
parent871b67e886ed9853e3fa392ff33a05c7da09fe9b (diff)
Merge branch 'main' into embassy-git
-rw-r--r--src/lib.rs64
-rw-r--r--src/mqtt/mod.rs21
-rw-r--r--src/mqtt/rx.rs13
-rw-r--r--src/mqtt/tx.rs9
4 files changed, 74 insertions, 33 deletions
diff --git a/src/lib.rs b/src/lib.rs
index f28d2bd..7d31ac5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -72,6 +72,7 @@ pub use unit::*;
72 72
73const AVAILABLE_PAYLOAD: &str = "online"; 73const AVAILABLE_PAYLOAD: &str = "online";
74const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 74const NOT_AVAILABLE_PAYLOAD: &str = "offline";
75const DEFAULT_KEEPALIVE_TIME: u16 = 30;
75 76
76#[derive(Debug)] 77#[derive(Debug)]
77pub struct Error(&'static str); 78pub struct Error(&'static str);
@@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
673 .expect("device availability buffer too small"); 674 .expect("device availability buffer too small");
674 let availability_topic = device.availability_topic_buffer.as_str(); 675 let availability_topic = device.availability_topic_buffer.as_str();
675 676
677 let mut ping_ticker =
678 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
676 let mut client = mqtt::Client::new(device.mqtt_resources, transport); 679 let mut client = mqtt::Client::new(device.mqtt_resources, transport);
677 let connect_params = mqtt::ConnectParams { 680 let connect_params = mqtt::ConnectParams {
678 will_topic: Some(availability_topic), 681 will_topic: Some(availability_topic),
679 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), 682 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
680 will_retain: true, 683 will_retain: true,
684 keepalive: Some(DEFAULT_KEEPALIVE_TIME),
681 ..Default::default() 685 ..Default::default()
682 }; 686 };
683 match embassy_time::with_timeout( 687 match embassy_time::with_timeout(
@@ -863,6 +867,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
863 } 867 }
864 } 868 }
865 869
870 let mut first_iteration_push = true;
866 'outer_loop: loop { 871 'outer_loop: loop {
867 use core::fmt::Write; 872 use core::fmt::Write;
868 873
@@ -874,7 +879,7 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
874 None => break, 879 None => break,
875 }; 880 };
876 881
877 if !entity.publish { 882 if !entity.publish && !first_iteration_push {
878 continue; 883 continue;
879 } 884 }
880 885
@@ -920,10 +925,12 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
920 device.publish_buffer.truncate(n); 925 device.publish_buffer.truncate(n);
921 } 926 }
922 _ => { 927 _ => {
923 crate::log::warn!( 928 if !first_iteration_push {
924 "entity '{}' requested state publish but its storage does not support it", 929 crate::log::warn!(
925 entity.config.id 930 "entity '{}' requested state publish but its storage does not support it",
926 ); 931 entity.config.id
932 );
933 }
927 continue; 934 continue;
928 } 935 }
929 } 936 }
@@ -970,32 +977,35 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
970 } 977 }
971 } 978 }
972 } 979 }
980 first_iteration_push = false;
973 981
974 let receive = client.receive(); 982 let receive = client.receive();
975 let waker = wait_on_atomic_waker(device.waker); 983 let waker = wait_on_atomic_waker(device.waker);
976 let publish = match embassy_time::with_timeout( 984 let publish =
977 MQTT_TIMEOUT, 985 match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
978 embassy_futures::select::select(receive, waker), 986 embassy_futures::select::Either3::First(packet) => match packet {
979 ) 987 Ok(mqtt::Packet::Publish(publish)) => publish,
980 .await 988 Err(err) => {
981 { 989 crate::log::error!(
982 Ok(embassy_futures::select::Either::First(packet)) => match packet { 990 "mqtt receive failed with: {:?}",
983 Ok(mqtt::Packet::Publish(publish)) => publish, 991 crate::log::Debug2Format(&err)
984 Err(err) => { 992 );
985 crate::log::error!( 993 return Err(Error::new("mqtt receive failed"));
986 "mqtt receive failed with: {:?}", 994 }
987 crate::log::Debug2Format(&err) 995 _ => continue,
988 ); 996 },
989 return Err(Error::new("mqtt receive failed")); 997 embassy_futures::select::Either3::Second(_) => continue,
998 embassy_futures::select::Either3::Third(_) => {
999 if let Err(err) = client.ping().await {
1000 crate::log::error!(
1001 "mqtt ping failed with: {:?}",
1002 crate::log::Debug2Format(&err)
1003 );
1004 return Err(Error::new("mqtt ping failed"));
1005 }
1006 continue;
990 } 1007 }
991 _ => continue, 1008 };
992 },
993 Ok(embassy_futures::select::Either::Second(_)) => continue,
994 Err(_) => {
995 crate::log::error!("mqtt receive timed out");
996 return Err(Error::new("mqtt receive timed out"));
997 }
998 };
999 1009
1000 let entity = 'entity_search_block: { 1010 let entity = 'entity_search_block: {
1001 for entity in device.entities { 1011 for entity in device.entities {
diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs
index 04e63b6..d502f85 100644
--- a/src/mqtt/mod.rs
+++ b/src/mqtt/mod.rs
@@ -120,6 +120,7 @@ pub enum Packet<'a> {
120 PublishAck(PublishAck), 120 PublishAck(PublishAck),
121 SubscribeAck(SubscribeAck), 121 SubscribeAck(SubscribeAck),
122 UnsubscribeAck(UnsubscribeAck), 122 UnsubscribeAck(UnsubscribeAck),
123 PingResponse,
123} 124}
124 125
125pub struct ClientResources< 126pub struct ClientResources<
@@ -199,7 +200,7 @@ where
199 will_topic: params.will_topic, 200 will_topic: params.will_topic,
200 will_payload: params.will_payload, 201 will_payload: params.will_payload,
201 will_retain: params.will_retain, 202 will_retain: params.will_retain,
202 keepalive: None, 203 keepalive: params.keepalive,
203 }, 204 },
204 ); 205 );
205 self.transport 206 self.transport
@@ -220,12 +221,23 @@ where
220 Err(Error::ConnectFailed(code)) 221 Err(Error::ConnectFailed(code))
221 } 222 }
222 } 223 }
223 _ => Err(Error::Protocol( 224 _ => Err(Error::Protocol("expected CONNACK packet after CONNECT")),
224 "expected CONNACK packet after CONNECT",
225 )),
226 } 225 }
227 } 226 }
228 227
228 pub async fn ping(&mut self) -> Result<(), Error<T>> {
229 let mut buffer = FieldBuffer::default();
230 tx::pingreq(&mut buffer);
231
232 self.transport
233 .write_fields(&buffer)
234 .await
235 .map_err(Error::Transport)?;
236 self.transport.flush().await.map_err(Error::Transport)?;
237
238 Ok(())
239 }
240
229 pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> { 241 pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> {
230 self.publish_with(topic, data, Default::default()).await 242 self.publish_with(topic, data, Default::default()).await
231 } 243 }
@@ -391,6 +403,7 @@ where
391 rx::Packet::UnsubscribeAck { packet_id } => { 403 rx::Packet::UnsubscribeAck { packet_id } => {
392 Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id })) 404 Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id }))
393 } 405 }
406 rx::Packet::PingResp => Ok(Packet::PingResponse),
394 } 407 }
395 } 408 }
396 409
diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs
index e81171d..10b775a 100644
--- a/src/mqtt/rx.rs
+++ b/src/mqtt/rx.rs
@@ -65,6 +65,7 @@ pub enum Packet<'a> {
65 UnsubscribeAck { 65 UnsubscribeAck {
66 packet_id: PacketId, 66 packet_id: PacketId,
67 }, 67 },
68 PingResp,
68} 69}
69 70
70pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { 71pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
@@ -156,6 +157,15 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
156 let packet_id = PacketId::from(reader.read_u16()?); 157 let packet_id = PacketId::from(reader.read_u16()?);
157 Packet::UnsubscribeAck { packet_id } 158 Packet::UnsubscribeAck { packet_id }
158 } 159 }
160 protocol::PACKET_TYPE_PINGRESP => {
161 if packet_flags != 0 {
162 return Err(Error::InvalidPacket("PINGRESP flags must be zero"));
163 }
164 if packet_len != 0 {
165 return Err(Error::InvalidPacket("PINGRESP remaining length must be 0"));
166 }
167 Packet::PingResp
168 }
159 protocol::PACKET_TYPE_CONNECT 169 protocol::PACKET_TYPE_CONNECT
160 | protocol::PACKET_TYPE_PUBREC 170 | protocol::PACKET_TYPE_PUBREC
161 | protocol::PACKET_TYPE_PUBREL 171 | protocol::PACKET_TYPE_PUBREL
@@ -163,8 +173,7 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
163 | protocol::PACKET_TYPE_DISCONNECT 173 | protocol::PACKET_TYPE_DISCONNECT
164 | protocol::PACKET_TYPE_SUBSCRIBE 174 | protocol::PACKET_TYPE_SUBSCRIBE
165 | protocol::PACKET_TYPE_UNSUBSCRIBE 175 | protocol::PACKET_TYPE_UNSUBSCRIBE
166 | protocol::PACKET_TYPE_PINGREQ 176 | protocol::PACKET_TYPE_PINGREQ => {
167 | protocol::PACKET_TYPE_PINGRESP => {
168 return Err(Error::UnsupportedPacket { 177 return Err(Error::UnsupportedPacket {
169 packet_type, 178 packet_type,
170 packet_len, 179 packet_len,
diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs
index cdf1c75..7a4d443 100644
--- a/src/mqtt/tx.rs
+++ b/src/mqtt/tx.rs
@@ -201,3 +201,12 @@ pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) {
201 buffer.push(Field::U16(packet_id.into())); 201 buffer.push(Field::U16(packet_id.into()));
202} 202}
203 203
204pub fn pingreq(buffer: &mut FieldBuffer) {
205 // PINGREQ has no variable header or payload
206 buffer.push(Field::U8(protocol::create_header_control(
207 protocol::PACKET_TYPE_PINGREQ,
208 0,
209 )));
210 buffer.push(Field::VarInt(0));
211}
212