aboutsummaryrefslogtreecommitdiff
path: root/src/mqtt/rx.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mqtt/rx.rs')
-rw-r--r--src/mqtt/rx.rs18
1 files changed, 13 insertions, 5 deletions
diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs
index 10b775a..0dfd858 100644
--- a/src/mqtt/rx.rs
+++ b/src/mqtt/rx.rs
@@ -37,7 +37,9 @@ impl From<varint::Error> for Error {
37 fn from(value: varint::Error) -> Self { 37 fn from(value: varint::Error) -> Self {
38 match value { 38 match value {
39 varint::Error::NeedMoreData => Self::NeedMoreData, 39 varint::Error::NeedMoreData => Self::NeedMoreData,
40 varint::Error::InvalidVarInt => Self::InvalidPacket("invalid variable integer encoding"), 40 varint::Error::InvalidVarInt => {
41 Self::InvalidPacket("invalid variable integer encoding")
42 }
41 } 43 }
42 } 44 }
43} 45}
@@ -92,8 +94,10 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
92 protocol::PACKET_TYPE_PUBLISH => { 94 protocol::PACKET_TYPE_PUBLISH => {
93 // Extract flags from the fixed header 95 // Extract flags from the fixed header
94 let retain = (packet_flags & protocol::PUBLISH_FLAG_RETAIN) != 0; 96 let retain = (packet_flags & protocol::PUBLISH_FLAG_RETAIN) != 0;
95 let qos_value = (packet_flags & protocol::PUBLISH_FLAG_QOS_MASK) >> protocol::PUBLISH_FLAG_QOS_SHIFT; 97 let qos_value = (packet_flags & protocol::PUBLISH_FLAG_QOS_MASK)
96 let qos = Qos::from_u8(qos_value).ok_or(Error::InvalidPacket("PUBLISH has invalid QoS value"))?; 98 >> protocol::PUBLISH_FLAG_QOS_SHIFT;
99 let qos = Qos::from_u8(qos_value)
100 .ok_or(Error::InvalidPacket("PUBLISH has invalid QoS value"))?;
97 let dup = (packet_flags & protocol::PUBLISH_FLAG_DUP) != 0; 101 let dup = (packet_flags & protocol::PUBLISH_FLAG_DUP) != 0;
98 102
99 // Track position after fixed header to calculate data length 103 // Track position after fixed header to calculate data length
@@ -113,7 +117,9 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
113 let variable_header_len = reader.num_read() - variable_header_start; 117 let variable_header_len = reader.num_read() - variable_header_start;
114 let data_len = (packet_len as usize) 118 let data_len = (packet_len as usize)
115 .checked_sub(variable_header_len) 119 .checked_sub(variable_header_len)
116 .ok_or(Error::InvalidPacket("PUBLISH remaining length is too short for headers"))?; 120 .ok_or(Error::InvalidPacket(
121 "PUBLISH remaining length is too short for headers",
122 ))?;
117 123
118 Packet::Publish { 124 Packet::Publish {
119 topic, 125 topic,
@@ -140,7 +146,9 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
140 } 146 }
141 if packet_len < 3 { 147 if packet_len < 3 {
142 // Minimum: 2 bytes packet ID + 1 byte return code 148 // Minimum: 2 bytes packet ID + 1 byte return code
143 return Err(Error::InvalidPacket("SUBACK remaining length must be at least 3")); 149 return Err(Error::InvalidPacket(
150 "SUBACK remaining length must be at least 3",
151 ));
144 } 152 }
145 let packet_id = PacketId::from(reader.read_u16()?); 153 let packet_id = PacketId::from(reader.read_u16()?);
146 let return_code = reader.read_u8()?; 154 let return_code = reader.read_u8()?;