diff options
| -rw-r--r-- | src/lib.rs | 50 | ||||
| -rw-r--r-- | src/mqtt/mod.rs | 21 | ||||
| -rw-r--r-- | src/mqtt/rx.rs | 13 | ||||
| -rw-r--r-- | src/mqtt/tx.rs | 9 |
4 files changed, 65 insertions, 28 deletions
| @@ -72,6 +72,7 @@ pub use unit::*; | |||
| 72 | 72 | ||
| 73 | const AVAILABLE_PAYLOAD: &str = "online"; | 73 | const AVAILABLE_PAYLOAD: &str = "online"; |
| 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; | 74 | const NOT_AVAILABLE_PAYLOAD: &str = "offline"; |
| 75 | const DEFAULT_KEEPALIVE_TIME: u16 = 30; | ||
| 75 | 76 | ||
| 76 | #[derive(Debug)] | 77 | #[derive(Debug)] |
| 77 | pub struct Error(&'static str); | 78 | pub struct Error(&'static str); |
| @@ -673,11 +674,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 673 | .expect("device availability buffer too small"); | 674 | .expect("device availability buffer too small"); |
| 674 | let availability_topic = device.availability_topic_buffer.as_str(); | 675 | let availability_topic = device.availability_topic_buffer.as_str(); |
| 675 | 676 | ||
| 677 | let mut ping_ticker = | ||
| 678 | embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME))); | ||
| 676 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); | 679 | let mut client = mqtt::Client::new(device.mqtt_resources, transport); |
| 677 | let connect_params = mqtt::ConnectParams { | 680 | let connect_params = mqtt::ConnectParams { |
| 678 | will_topic: Some(availability_topic), | 681 | will_topic: Some(availability_topic), |
| 679 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), | 682 | will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()), |
| 680 | will_retain: true, | 683 | will_retain: true, |
| 684 | keepalive: Some(DEFAULT_KEEPALIVE_TIME), | ||
| 681 | ..Default::default() | 685 | ..Default::default() |
| 682 | }; | 686 | }; |
| 683 | match embassy_time::with_timeout( | 687 | match embassy_time::with_timeout( |
| @@ -977,29 +981,31 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 977 | 981 | ||
| 978 | let receive = client.receive(); | 982 | let receive = client.receive(); |
| 979 | let waker = wait_on_atomic_waker(device.waker); | 983 | let waker = wait_on_atomic_waker(device.waker); |
| 980 | let publish = match embassy_time::with_timeout( | 984 | let publish = |
| 981 | MQTT_TIMEOUT, | 985 | match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await { |
| 982 | embassy_futures::select::select(receive, waker), | 986 | embassy_futures::select::Either3::First(packet) => match packet { |
| 983 | ) | 987 | Ok(mqtt::Packet::Publish(publish)) => publish, |
| 984 | .await | 988 | Err(err) => { |
| 985 | { | 989 | crate::log::error!( |
| 986 | Ok(embassy_futures::select::Either::First(packet)) => match packet { | 990 | "mqtt receive failed with: {:?}", |
| 987 | Ok(mqtt::Packet::Publish(publish)) => publish, | 991 | crate::log::Debug2Format(&err) |
| 988 | Err(err) => { | 992 | ); |
| 989 | crate::log::error!( | 993 | return Err(Error::new("mqtt receive failed")); |
| 990 | "mqtt receive failed with: {:?}", | 994 | } |
| 991 | crate::log::Debug2Format(&err) | 995 | _ => continue, |
| 992 | ); | 996 | }, |
| 993 | return Err(Error::new("mqtt receive failed")); | 997 | embassy_futures::select::Either3::Second(_) => continue, |
| 998 | embassy_futures::select::Either3::Third(_) => { | ||
| 999 | if let Err(err) = client.ping().await { | ||
| 1000 | crate::log::error!( | ||
| 1001 | "mqtt ping failed with: {:?}", | ||
| 1002 | crate::log::Debug2Format(&err) | ||
| 1003 | ); | ||
| 1004 | return Err(Error::new("mqtt ping failed")); | ||
| 1005 | } | ||
| 1006 | continue; | ||
| 994 | } | 1007 | } |
| 995 | _ => continue, | 1008 | }; |
| 996 | }, | ||
| 997 | Ok(embassy_futures::select::Either::Second(_)) => continue, | ||
| 998 | Err(_) => { | ||
| 999 | crate::log::error!("mqtt receive timed out"); | ||
| 1000 | return Err(Error::new("mqtt receive timed out")); | ||
| 1001 | } | ||
| 1002 | }; | ||
| 1003 | 1009 | ||
| 1004 | let entity = 'entity_search_block: { | 1010 | let entity = 'entity_search_block: { |
| 1005 | for entity in device.entities { | 1011 | for entity in device.entities { |
diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 04e63b6..d502f85 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs | |||
| @@ -120,6 +120,7 @@ pub enum Packet<'a> { | |||
| 120 | PublishAck(PublishAck), | 120 | PublishAck(PublishAck), |
| 121 | SubscribeAck(SubscribeAck), | 121 | SubscribeAck(SubscribeAck), |
| 122 | UnsubscribeAck(UnsubscribeAck), | 122 | UnsubscribeAck(UnsubscribeAck), |
| 123 | PingResponse, | ||
| 123 | } | 124 | } |
| 124 | 125 | ||
| 125 | pub struct ClientResources< | 126 | pub struct ClientResources< |
| @@ -199,7 +200,7 @@ where | |||
| 199 | will_topic: params.will_topic, | 200 | will_topic: params.will_topic, |
| 200 | will_payload: params.will_payload, | 201 | will_payload: params.will_payload, |
| 201 | will_retain: params.will_retain, | 202 | will_retain: params.will_retain, |
| 202 | keepalive: None, | 203 | keepalive: params.keepalive, |
| 203 | }, | 204 | }, |
| 204 | ); | 205 | ); |
| 205 | self.transport | 206 | self.transport |
| @@ -220,12 +221,23 @@ where | |||
| 220 | Err(Error::ConnectFailed(code)) | 221 | Err(Error::ConnectFailed(code)) |
| 221 | } | 222 | } |
| 222 | } | 223 | } |
| 223 | _ => Err(Error::Protocol( | 224 | _ => Err(Error::Protocol("expected CONNACK packet after CONNECT")), |
| 224 | "expected CONNACK packet after CONNECT", | ||
| 225 | )), | ||
| 226 | } | 225 | } |
| 227 | } | 226 | } |
| 228 | 227 | ||
| 228 | pub async fn ping(&mut self) -> Result<(), Error<T>> { | ||
| 229 | let mut buffer = FieldBuffer::default(); | ||
| 230 | tx::pingreq(&mut buffer); | ||
| 231 | |||
| 232 | self.transport | ||
| 233 | .write_fields(&buffer) | ||
| 234 | .await | ||
| 235 | .map_err(Error::Transport)?; | ||
| 236 | self.transport.flush().await.map_err(Error::Transport)?; | ||
| 237 | |||
| 238 | Ok(()) | ||
| 239 | } | ||
| 240 | |||
| 229 | pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> { | 241 | pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> { |
| 230 | self.publish_with(topic, data, Default::default()).await | 242 | self.publish_with(topic, data, Default::default()).await |
| 231 | } | 243 | } |
| @@ -391,6 +403,7 @@ where | |||
| 391 | rx::Packet::UnsubscribeAck { packet_id } => { | 403 | rx::Packet::UnsubscribeAck { packet_id } => { |
| 392 | Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id })) | 404 | Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id })) |
| 393 | } | 405 | } |
| 406 | rx::Packet::PingResp => Ok(Packet::PingResponse), | ||
| 394 | } | 407 | } |
| 395 | } | 408 | } |
| 396 | 409 | ||
diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs index e81171d..10b775a 100644 --- a/src/mqtt/rx.rs +++ b/src/mqtt/rx.rs | |||
| @@ -65,6 +65,7 @@ pub enum Packet<'a> { | |||
| 65 | UnsubscribeAck { | 65 | UnsubscribeAck { |
| 66 | packet_id: PacketId, | 66 | packet_id: PacketId, |
| 67 | }, | 67 | }, |
| 68 | PingResp, | ||
| 68 | } | 69 | } |
| 69 | 70 | ||
| 70 | pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | 71 | pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { |
| @@ -156,6 +157,15 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | |||
| 156 | let packet_id = PacketId::from(reader.read_u16()?); | 157 | let packet_id = PacketId::from(reader.read_u16()?); |
| 157 | Packet::UnsubscribeAck { packet_id } | 158 | Packet::UnsubscribeAck { packet_id } |
| 158 | } | 159 | } |
| 160 | protocol::PACKET_TYPE_PINGRESP => { | ||
| 161 | if packet_flags != 0 { | ||
| 162 | return Err(Error::InvalidPacket("PINGRESP flags must be zero")); | ||
| 163 | } | ||
| 164 | if packet_len != 0 { | ||
| 165 | return Err(Error::InvalidPacket("PINGRESP remaining length must be 0")); | ||
| 166 | } | ||
| 167 | Packet::PingResp | ||
| 168 | } | ||
| 159 | protocol::PACKET_TYPE_CONNECT | 169 | protocol::PACKET_TYPE_CONNECT |
| 160 | | protocol::PACKET_TYPE_PUBREC | 170 | | protocol::PACKET_TYPE_PUBREC |
| 161 | | protocol::PACKET_TYPE_PUBREL | 171 | | protocol::PACKET_TYPE_PUBREL |
| @@ -163,8 +173,7 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | |||
| 163 | | protocol::PACKET_TYPE_DISCONNECT | 173 | | protocol::PACKET_TYPE_DISCONNECT |
| 164 | | protocol::PACKET_TYPE_SUBSCRIBE | 174 | | protocol::PACKET_TYPE_SUBSCRIBE |
| 165 | | protocol::PACKET_TYPE_UNSUBSCRIBE | 175 | | protocol::PACKET_TYPE_UNSUBSCRIBE |
| 166 | | protocol::PACKET_TYPE_PINGREQ | 176 | | protocol::PACKET_TYPE_PINGREQ => { |
| 167 | | protocol::PACKET_TYPE_PINGRESP => { | ||
| 168 | return Err(Error::UnsupportedPacket { | 177 | return Err(Error::UnsupportedPacket { |
| 169 | packet_type, | 178 | packet_type, |
| 170 | packet_len, | 179 | packet_len, |
diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs index cdf1c75..7a4d443 100644 --- a/src/mqtt/tx.rs +++ b/src/mqtt/tx.rs | |||
| @@ -201,3 +201,12 @@ pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) { | |||
| 201 | buffer.push(Field::U16(packet_id.into())); | 201 | buffer.push(Field::U16(packet_id.into())); |
| 202 | } | 202 | } |
| 203 | 203 | ||
| 204 | pub fn pingreq(buffer: &mut FieldBuffer) { | ||
| 205 | // PINGREQ has no variable header or payload | ||
| 206 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 207 | protocol::PACKET_TYPE_PINGREQ, | ||
| 208 | 0, | ||
| 209 | ))); | ||
| 210 | buffer.push(Field::VarInt(0)); | ||
| 211 | } | ||
| 212 | |||
