diff options
Diffstat (limited to 'src/mqtt/tx.rs')
| -rw-r--r-- | src/mqtt/tx.rs | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs new file mode 100644 index 0000000..cdf1c75 --- /dev/null +++ b/src/mqtt/tx.rs | |||
| @@ -0,0 +1,203 @@ | |||
| 1 | use super::{ | ||
| 2 | PacketId, | ||
| 3 | field::{self, Field, FieldBuffer}, | ||
| 4 | protocol, | ||
| 5 | qos::Qos, | ||
| 6 | }; | ||
| 7 | |||
| 8 | pub struct Connect<'a> { | ||
| 9 | pub client_id: &'a str, | ||
| 10 | pub clean_session: bool, | ||
| 11 | pub username: Option<&'a str>, | ||
| 12 | pub password: Option<&'a [u8]>, | ||
| 13 | pub will_topic: Option<&'a str>, | ||
| 14 | pub will_payload: Option<&'a [u8]>, | ||
| 15 | pub will_retain: bool, | ||
| 16 | pub keepalive: Option<u16>, | ||
| 17 | } | ||
| 18 | |||
| 19 | pub fn connect<'a>(buffer: &mut FieldBuffer<'a>, connect: Connect<'a>) { | ||
| 20 | let mut flags = 0; | ||
| 21 | if connect.clean_session { | ||
| 22 | flags |= protocol::CONNECT_FLAG_CLEAN_SESSION; | ||
| 23 | } | ||
| 24 | if connect.username.is_some() { | ||
| 25 | flags |= protocol::CONNECT_FLAG_USERNAME; | ||
| 26 | } | ||
| 27 | if connect.password.is_some() { | ||
| 28 | flags |= protocol::CONNECT_FLAG_PASSWORD; | ||
| 29 | } | ||
| 30 | if connect.will_topic.is_some() { | ||
| 31 | flags |= protocol::CONNECT_FLAG_WILL_FLAG; | ||
| 32 | } | ||
| 33 | if connect.will_retain { | ||
| 34 | flags |= protocol::CONNECT_FLAG_WILL_RETAIN; | ||
| 35 | } | ||
| 36 | |||
| 37 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 38 | protocol::PACKET_TYPE_CONNECT, | ||
| 39 | 0, | ||
| 40 | ))); | ||
| 41 | buffer.push(Field::VarInt(0)); | ||
| 42 | |||
| 43 | buffer.push(Field::LenPrefixedString(protocol::PROTOCOL_NAME)); | ||
| 44 | buffer.push(Field::U8(protocol::PROTOCOL_LEVEL_3_1_1)); | ||
| 45 | buffer.push(Field::U8(flags)); | ||
| 46 | buffer.push(Field::U16(connect.keepalive.unwrap_or(0))); | ||
| 47 | buffer.push(Field::LenPrefixedString(connect.client_id)); | ||
| 48 | if let Some(will_topic) = connect.will_topic { | ||
| 49 | buffer.push(Field::LenPrefixedString(will_topic)); | ||
| 50 | buffer.push(Field::LenPrefixedBuffer( | ||
| 51 | connect.will_payload.unwrap_or(&[]), | ||
| 52 | )); | ||
| 53 | } | ||
| 54 | if let Some(username) = connect.username { | ||
| 55 | buffer.push(Field::LenPrefixedString(username)); | ||
| 56 | } | ||
| 57 | if let Some(password) = connect.password { | ||
| 58 | buffer.push(Field::LenPrefixedBuffer(password)); | ||
| 59 | } | ||
| 60 | |||
| 61 | let message_size = field::fields_size(&buffer.as_slice()[2..]); | ||
| 62 | buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap())); | ||
| 63 | } | ||
| 64 | |||
| 65 | pub struct Publish<'a> { | ||
| 66 | pub topic: &'a str, | ||
| 67 | pub payload: &'a [u8], | ||
| 68 | pub qos: Qos, | ||
| 69 | pub retain: bool, | ||
| 70 | pub dup: bool, | ||
| 71 | pub packet_id: Option<PacketId>, | ||
| 72 | } | ||
| 73 | |||
| 74 | pub fn publish<'a>(buffer: &mut FieldBuffer<'a>, publish: Publish<'a>) { | ||
| 75 | let mut flags = 0u8; | ||
| 76 | |||
| 77 | // Set QoS bits (bits 1-2) | ||
| 78 | flags |= (publish.qos.to_u8() & 0x03) << 1; | ||
| 79 | |||
| 80 | // Set RETAIN flag (bit 0) | ||
| 81 | if publish.retain { | ||
| 82 | flags |= 0x01; | ||
| 83 | } | ||
| 84 | |||
| 85 | // Set DUP flag (bit 3) | ||
| 86 | if publish.dup { | ||
| 87 | flags |= 0x08; | ||
| 88 | } | ||
| 89 | |||
| 90 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 91 | protocol::PACKET_TYPE_PUBLISH, | ||
| 92 | flags, | ||
| 93 | ))); | ||
| 94 | buffer.push(Field::VarInt(0)); | ||
| 95 | |||
| 96 | buffer.push(Field::LenPrefixedString(publish.topic)); | ||
| 97 | |||
| 98 | // Packet ID is only present for QoS 1 and 2 | ||
| 99 | if publish.qos.to_u8() > 0 { | ||
| 100 | // TODO: turn this into a warning | ||
| 101 | let packet_id = publish.packet_id.expect("packet_id required for QoS > 0"); | ||
| 102 | buffer.push(Field::U16(packet_id.into())); | ||
| 103 | } | ||
| 104 | |||
| 105 | buffer.push(Field::Buffer(publish.payload)); | ||
| 106 | |||
| 107 | let message_size = field::fields_size(&buffer.as_slice()[2..]); | ||
| 108 | buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap())); | ||
| 109 | } | ||
| 110 | |||
| 111 | pub struct Subscribe<'a> { | ||
| 112 | pub topic: &'a str, | ||
| 113 | pub qos: Qos, | ||
| 114 | pub packet_id: PacketId, | ||
| 115 | } | ||
| 116 | |||
| 117 | pub fn subscribe<'a>(buffer: &mut FieldBuffer<'a>, subscribe: Subscribe<'a>) { | ||
| 118 | // SUBSCRIBE packets have fixed header flags (reserved bits) | ||
| 119 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 120 | protocol::PACKET_TYPE_SUBSCRIBE, | ||
| 121 | protocol::SUBSCRIBE_HEADER_FLAGS, | ||
| 122 | ))); | ||
| 123 | buffer.push(Field::VarInt(0)); | ||
| 124 | |||
| 125 | // Variable header: packet identifier | ||
| 126 | buffer.push(Field::U16(subscribe.packet_id.into())); | ||
| 127 | |||
| 128 | // Payload: topic filter + QoS | ||
| 129 | buffer.push(Field::LenPrefixedString(subscribe.topic)); | ||
| 130 | buffer.push(Field::U8(subscribe.qos.to_u8())); | ||
| 131 | |||
| 132 | let message_size = field::fields_size(&buffer.as_slice()[2..]); | ||
| 133 | buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap())); | ||
| 134 | } | ||
| 135 | |||
| 136 | pub struct Unsubscribe<'a> { | ||
| 137 | pub topic: &'a str, | ||
| 138 | pub packet_id: PacketId, | ||
| 139 | } | ||
| 140 | |||
| 141 | pub fn unsubscribe<'a>(buffer: &mut FieldBuffer<'a>, unsubscribe: Unsubscribe<'a>) { | ||
| 142 | // UNSUBSCRIBE packets have fixed header flags (reserved bits) | ||
| 143 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 144 | protocol::PACKET_TYPE_UNSUBSCRIBE, | ||
| 145 | protocol::UNSUBSCRIBE_HEADER_FLAGS, | ||
| 146 | ))); | ||
| 147 | buffer.push(Field::VarInt(0)); | ||
| 148 | |||
| 149 | // Variable header: packet identifier | ||
| 150 | buffer.push(Field::U16(unsubscribe.packet_id.into())); | ||
| 151 | |||
| 152 | // Payload: topic filter (no QoS) | ||
| 153 | buffer.push(Field::LenPrefixedString(unsubscribe.topic)); | ||
| 154 | |||
| 155 | let message_size = field::fields_size(&buffer.as_slice()[2..]); | ||
| 156 | buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap())); | ||
| 157 | } | ||
| 158 | |||
| 159 | pub fn disconnect(buffer: &mut FieldBuffer) { | ||
| 160 | // DISCONNECT has no variable header or payload | ||
| 161 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 162 | protocol::PACKET_TYPE_DISCONNECT, | ||
| 163 | 0, | ||
| 164 | ))); | ||
| 165 | buffer.push(Field::VarInt(0)); | ||
| 166 | } | ||
| 167 | |||
| 168 | pub fn puback(buffer: &mut FieldBuffer, packet_id: PacketId) { | ||
| 169 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 170 | protocol::PACKET_TYPE_PUBACK, | ||
| 171 | 0, | ||
| 172 | ))); | ||
| 173 | buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID) | ||
| 174 | buffer.push(Field::U16(packet_id.into())); | ||
| 175 | } | ||
| 176 | |||
| 177 | pub fn pubrec(buffer: &mut FieldBuffer, packet_id: PacketId) { | ||
| 178 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 179 | protocol::PACKET_TYPE_PUBREC, | ||
| 180 | 0, | ||
| 181 | ))); | ||
| 182 | buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID) | ||
| 183 | buffer.push(Field::U16(packet_id.into())); | ||
| 184 | } | ||
| 185 | |||
| 186 | pub fn pubrel(buffer: &mut FieldBuffer, packet_id: PacketId) { | ||
| 187 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 188 | protocol::PACKET_TYPE_PUBREL, | ||
| 189 | protocol::PUBREL_HEADER_FLAGS, | ||
| 190 | ))); | ||
| 191 | buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID) | ||
| 192 | buffer.push(Field::U16(packet_id.into())); | ||
| 193 | } | ||
| 194 | |||
| 195 | pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) { | ||
| 196 | buffer.push(Field::U8(protocol::create_header_control( | ||
| 197 | protocol::PACKET_TYPE_PUBCOMP, | ||
| 198 | 0, | ||
| 199 | ))); | ||
| 200 | buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID) | ||
| 201 | buffer.push(Field::U16(packet_id.into())); | ||
| 202 | } | ||
| 203 | |||
