diff options
Diffstat (limited to 'src/mqtt/rx.rs')
| -rw-r--r-- | src/mqtt/rx.rs | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/src/mqtt/rx.rs b/src/mqtt/rx.rs index 10b775a..0dfd858 100644 --- a/src/mqtt/rx.rs +++ b/src/mqtt/rx.rs | |||
| @@ -37,7 +37,9 @@ impl From<varint::Error> for Error { | |||
| 37 | fn from(value: varint::Error) -> Self { | 37 | fn from(value: varint::Error) -> Self { |
| 38 | match value { | 38 | match value { |
| 39 | varint::Error::NeedMoreData => Self::NeedMoreData, | 39 | varint::Error::NeedMoreData => Self::NeedMoreData, |
| 40 | varint::Error::InvalidVarInt => Self::InvalidPacket("invalid variable integer encoding"), | 40 | varint::Error::InvalidVarInt => { |
| 41 | Self::InvalidPacket("invalid variable integer encoding") | ||
| 42 | } | ||
| 41 | } | 43 | } |
| 42 | } | 44 | } |
| 43 | } | 45 | } |
| @@ -92,8 +94,10 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | |||
| 92 | protocol::PACKET_TYPE_PUBLISH => { | 94 | protocol::PACKET_TYPE_PUBLISH => { |
| 93 | // Extract flags from the fixed header | 95 | // Extract flags from the fixed header |
| 94 | let retain = (packet_flags & protocol::PUBLISH_FLAG_RETAIN) != 0; | 96 | let retain = (packet_flags & protocol::PUBLISH_FLAG_RETAIN) != 0; |
| 95 | let qos_value = (packet_flags & protocol::PUBLISH_FLAG_QOS_MASK) >> protocol::PUBLISH_FLAG_QOS_SHIFT; | 97 | let qos_value = (packet_flags & protocol::PUBLISH_FLAG_QOS_MASK) |
| 96 | let qos = Qos::from_u8(qos_value).ok_or(Error::InvalidPacket("PUBLISH has invalid QoS value"))?; | 98 | >> protocol::PUBLISH_FLAG_QOS_SHIFT; |
| 99 | let qos = Qos::from_u8(qos_value) | ||
| 100 | .ok_or(Error::InvalidPacket("PUBLISH has invalid QoS value"))?; | ||
| 97 | let dup = (packet_flags & protocol::PUBLISH_FLAG_DUP) != 0; | 101 | let dup = (packet_flags & protocol::PUBLISH_FLAG_DUP) != 0; |
| 98 | 102 | ||
| 99 | // Track position after fixed header to calculate data length | 103 | // Track position after fixed header to calculate data length |
| @@ -113,7 +117,9 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | |||
| 113 | let variable_header_len = reader.num_read() - variable_header_start; | 117 | let variable_header_len = reader.num_read() - variable_header_start; |
| 114 | let data_len = (packet_len as usize) | 118 | let data_len = (packet_len as usize) |
| 115 | .checked_sub(variable_header_len) | 119 | .checked_sub(variable_header_len) |
| 116 | .ok_or(Error::InvalidPacket("PUBLISH remaining length is too short for headers"))?; | 120 | .ok_or(Error::InvalidPacket( |
| 121 | "PUBLISH remaining length is too short for headers", | ||
| 122 | ))?; | ||
| 117 | 123 | ||
| 118 | Packet::Publish { | 124 | Packet::Publish { |
| 119 | topic, | 125 | topic, |
| @@ -140,7 +146,9 @@ pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> { | |||
| 140 | } | 146 | } |
| 141 | if packet_len < 3 { | 147 | if packet_len < 3 { |
| 142 | // Minimum: 2 bytes packet ID + 1 byte return code | 148 | // Minimum: 2 bytes packet ID + 1 byte return code |
| 143 | return Err(Error::InvalidPacket("SUBACK remaining length must be at least 3")); | 149 | return Err(Error::InvalidPacket( |
| 150 | "SUBACK remaining length must be at least 3", | ||
| 151 | )); | ||
| 144 | } | 152 | } |
| 145 | let packet_id = PacketId::from(reader.read_u16()?); | 153 | let packet_id = PacketId::from(reader.read_u16()?); |
| 146 | let return_code = reader.read_u8()?; | 154 | let return_code = reader.read_u8()?; |
