aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-12-13 20:32:31 +0000
committerdiogo464 <[email protected]>2025-12-13 20:32:31 +0000
commit871b67e886ed9853e3fa392ff33a05c7da09fe9b (patch)
treee08c9a0312f1e37187d8e6860b8b8221a7e25a77
parentb456db36b7d973154f0a10bc1adca235de2c5c12 (diff)
use 30s mqtt keepalive
-rw-r--r--src/lib.rs50
-rw-r--r--src/mqtt/mod.rs21
-rw-r--r--src/mqtt/rx.rs13
-rw-r--r--src/mqtt/tx.rs9
4 files changed, 65 insertions, 28 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 64057a4..7d31ac5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -72,6 +72,7 @@ pub use unit::*;
72 72
73const AVAILABLE_PAYLOAD: &str = "online"; 73const AVAILABLE_PAYLOAD: &str = "online";
74const NOT_AVAILABLE_PAYLOAD: &str = "offline"; 74const NOT_AVAILABLE_PAYLOAD: &str = "offline";
75const DEFAULT_KEEPALIVE_TIME: u16 = 30;
75 76
76#[derive(Debug)] 77#[derive(Debug)]
77pub struct Error(&'static str); 78pub struct Error(&'static str);
@@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
673 .expect("device availability buffer too small"); 674 .expect("device availability buffer too small");
674 let availability_topic = device.availability_topic_buffer.as_str(); 675 let availability_topic = device.availability_topic_buffer.as_str();
675 676
677 let mut ping_ticker =
678 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
676 let mut client = mqtt::Client::new(device.mqtt_resources, transport); 679 let mut client = mqtt::Client::new(device.mqtt_resources, transport);
677 let connect_params = mqtt::ConnectParams { 680 let connect_params = mqtt::ConnectParams {
678 will_topic: Some(availability_topic), 681 will_topic: Some(availability_topic),
679 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), 682 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
680 will_retain: true, 683 will_retain: true,
684 keepalive: Some(DEFAULT_KEEPALIVE_TIME),
681 ..Default::default() 685 ..Default::default()
682 }; 686 };
683 match embassy_time::with_timeout( 687 match embassy_time::with_timeout(
@@ -977,29 +981,31 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re
977 981
978 let receive = client.receive(); 982 let receive = client.receive();
979 let waker = wait_on_atomic_waker(device.waker); 983 let waker = wait_on_atomic_waker(device.waker);
980 let publish = match embassy_time::with_timeout( 984 let publish =
981 MQTT_TIMEOUT, 985 match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
982 embassy_futures::select::select(receive, waker), 986 embassy_futures::select::Either3::First(packet) => match packet {
983 ) 987 Ok(mqtt::Packet::Publish(publish)) => publish,
984 .await 988 Err(err) => {
985 { 989 crate::log::error!(
986 Ok(embassy_futures::select::Either::First(packet)) => match packet { 990 "mqtt receive failed with: {:?}",
987 Ok(mqtt::Packet::Publish(publish)) => publish, 991 crate::log::Debug2Format(&err)
988 Err(err) => { 992 );
989 crate::log::error!( 993 return Err(Error::new("mqtt receive failed"));
990 "mqtt receive failed with: {:?}", 994 }
991 crate::log::Debug2Format(&err) 995 _ => continue,
992 ); 996 },
993 return Err(Error::new("mqtt receive failed")); 997 embassy_futures::select::Either3::Second(_) => continue,
998 embassy_futures::select::Either3::Third(_) => {
999 if let Err(err) = client.ping().await {
1000 crate::log::error!(
1001 "mqtt ping failed with: {:?}",
1002 crate::log::Debug2Format(&err)
1003 );
1004 return Err(Error::new("mqtt ping failed"));
1005 }
1006 continue;
994 } 1007 }
995 _ => continue, 1008 };
996 },
997 Ok(embassy_futures::select::Either::Second(_)) => continue,
998 Err(_) => {
999 crate::log::error!("mqtt receive timed out");
1000 return Err(Error::new("mqtt receive timed out"));
1001 }
1002 };
1003 1009
1004 let entity = 'entity_search_block: { 1010 let entity = 'entity_search_block: {
1005 for entity in device.entities { 1011 for entity in device.entities {
diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs
index 04e63b6..d502f85 100644
--- a/src/mqtt/mod.rs
+++ b/src/mqtt/mod.rs
@@ -120,6 +120,7 @@ pub enum Packet<'a> {
120 PublishAck(PublishAck), 120 PublishAck(PublishAck),
121 SubscribeAck(SubscribeAck), 121 SubscribeAck(SubscribeAck),
122 UnsubscribeAck(UnsubscribeAck), 122 UnsubscribeAck(UnsubscribeAck),
123 PingResponse,
123} 124}
124 125
125pub struct ClientResources< 126pub struct ClientResources<
@@ -199,7 +200,7 @@ where
199 will_topic: params.will_topic, 200 will_topic: params.will_topic,
200 will_payload: params.will_payload, 201 will_payload: params.will_payload,
201 will_retain: params.will_retain, 202 will_retain: params.will_retain,
202 keepalive: None, 203 keepalive: params.keepalive,
203 }, 204 },
204 ); 205 );
205 self.transport 206 self.transport
@@ -220,12 +221,23 @@ where
220 Err(Error::ConnectFailed(code)) 221 Err(Error::ConnectFailed(code))
221 } 222 }
222 } 223 }
223 _ => Err(Error::Protocol( 224 _ => Err(Error::Protocol("expected CONNACK packet after CONNECT")),
224 "expected CONNACK packet after CONNECT",
225 )),
226 } 225 }
227 } 226 }
228 227
228 pub async fn ping(&mut self) -> Result<(), Error<T>> {
229 let mut buffer = FieldBuffer::default();
230 tx::pingreq(&mut buffer);
231
232 self.transport
233 .write_fields(&buffer)
234 .await
235 .map_err(Error::Transport)?;
236 self.transport.flush().await.map_err(Error::Transport)?;
237
238 Ok(())
239 }
240
229 pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> { 241 pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> {
230 self.publish_with(topic, data, Default::default()).await 242 self.publish_with(topic, data, Default::default()).await
231 } 243 }
@@ -391,6 +403,7 @@ where
391 rx::Packet::UnsubscribeAck { packet_id } => { 403 rx::Packet::UnsubscribeAck { packet_id } => {
392 Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id })) 404 Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id }))
393 } 405 }
406 rx::Packet::PingResp => Ok(Packet::PingResponse),
394 } 407 }
395 } 408 }
396 409
diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs
index e81171d..10b775a 100644
--- a/src/mqtt/rx.rs
+++ b/src/mqtt/rx.rs
@@ -65,6 +65,7 @@ pub enum Packet<'a> {
65 UnsubscribeAck { 65 UnsubscribeAck {
66 packet_id: PacketId, 66 packet_id: PacketId,
67 }, 67 },
68 PingResp,
68} 69}
69 70
70pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { 71pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
@@ -156,6 +157,15 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
156 let packet_id = PacketId::from(reader.read_u16()?); 157 let packet_id = PacketId::from(reader.read_u16()?);
157 Packet::UnsubscribeAck { packet_id } 158 Packet::UnsubscribeAck { packet_id }
158 } 159 }
160 protocol::PACKET_TYPE_PINGRESP => {
161 if packet_flags != 0 {
162 return Err(Error::InvalidPacket("PINGRESP flags must be zero"));
163 }
164 if packet_len != 0 {
165 return Err(Error::InvalidPacket("PINGRESP remaining length must be 0"));
166 }
167 Packet::PingResp
168 }
159 protocol::PACKET_TYPE_CONNECT 169 protocol::PACKET_TYPE_CONNECT
160 | protocol::PACKET_TYPE_PUBREC 170 | protocol::PACKET_TYPE_PUBREC
161 | protocol::PACKET_TYPE_PUBREL 171 | protocol::PACKET_TYPE_PUBREL
@@ -163,8 +173,7 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
163 | protocol::PACKET_TYPE_DISCONNECT 173 | protocol::PACKET_TYPE_DISCONNECT
164 | protocol::PACKET_TYPE_SUBSCRIBE 174 | protocol::PACKET_TYPE_SUBSCRIBE
165 | protocol::PACKET_TYPE_UNSUBSCRIBE 175 | protocol::PACKET_TYPE_UNSUBSCRIBE
166 | protocol::PACKET_TYPE_PINGREQ 176 | protocol::PACKET_TYPE_PINGREQ => {
167 | protocol::PACKET_TYPE_PINGRESP => {
168 return Err(Error::UnsupportedPacket { 177 return Err(Error::UnsupportedPacket {
169 packet_type, 178 packet_type,
170 packet_len, 179 packet_len,
diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs
index cdf1c75..7a4d443 100644
--- a/src/mqtt/tx.rs
+++ b/src/mqtt/tx.rs
@@ -201,3 +201,12 @@ pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) {
201 buffer.push(Field::U16(packet_id.into())); 201 buffer.push(Field::U16(packet_id.into()));
202} 202}
203 203
204pub fn pingreq(buffer: &mut FieldBuffer) {
205 // PINGREQ has no variable header or payload
206 buffer.push(Field::U8(protocol::create_header_control(
207 protocol::PACKET_TYPE_PINGREQ,
208 0,
209 )));
210 buffer.push(Field::VarInt(0));
211}
212