aboutsummaryrefslogtreecommitdiff
path: root/src/mqtt/tx.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mqtt/tx.rs')
-rw-r--r--src/mqtt/tx.rs203
1 files changed, 203 insertions, 0 deletions
diff --git a/src/mqtt/tx.rs b/src/mqtt/tx.rs
new file mode 100644
index 0000000..cdf1c75
--- /dev/null
+++ b/src/mqtt/tx.rs
@@ -0,0 +1,203 @@
1use super::{
2 PacketId,
3 field::{self, Field, FieldBuffer},
4 protocol,
5 qos::Qos,
6};
7
8pub struct Connect<'a> {
9 pub client_id: &'a str,
10 pub clean_session: bool,
11 pub username: Option<&'a str>,
12 pub password: Option<&'a [u8]>,
13 pub will_topic: Option<&'a str>,
14 pub will_payload: Option<&'a [u8]>,
15 pub will_retain: bool,
16 pub keepalive: Option<u16>,
17}
18
19pub fn connect<'a>(buffer: &mut FieldBuffer<'a>, connect: Connect<'a>) {
20 let mut flags = 0;
21 if connect.clean_session {
22 flags |= protocol::CONNECT_FLAG_CLEAN_SESSION;
23 }
24 if connect.username.is_some() {
25 flags |= protocol::CONNECT_FLAG_USERNAME;
26 }
27 if connect.password.is_some() {
28 flags |= protocol::CONNECT_FLAG_PASSWORD;
29 }
30 if connect.will_topic.is_some() {
31 flags |= protocol::CONNECT_FLAG_WILL_FLAG;
32 }
33 if connect.will_retain {
34 flags |= protocol::CONNECT_FLAG_WILL_RETAIN;
35 }
36
37 buffer.push(Field::U8(protocol::create_header_control(
38 protocol::PACKET_TYPE_CONNECT,
39 0,
40 )));
41 buffer.push(Field::VarInt(0));
42
43 buffer.push(Field::LenPrefixedString(protocol::PROTOCOL_NAME));
44 buffer.push(Field::U8(protocol::PROTOCOL_LEVEL_3_1_1));
45 buffer.push(Field::U8(flags));
46 buffer.push(Field::U16(connect.keepalive.unwrap_or(0)));
47 buffer.push(Field::LenPrefixedString(connect.client_id));
48 if let Some(will_topic) = connect.will_topic {
49 buffer.push(Field::LenPrefixedString(will_topic));
50 buffer.push(Field::LenPrefixedBuffer(
51 connect.will_payload.unwrap_or(&[]),
52 ));
53 }
54 if let Some(username) = connect.username {
55 buffer.push(Field::LenPrefixedString(username));
56 }
57 if let Some(password) = connect.password {
58 buffer.push(Field::LenPrefixedBuffer(password));
59 }
60
61 let message_size = field::fields_size(&buffer.as_slice()[2..]);
62 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
63}
64
65pub struct Publish<'a> {
66 pub topic: &'a str,
67 pub payload: &'a [u8],
68 pub qos: Qos,
69 pub retain: bool,
70 pub dup: bool,
71 pub packet_id: Option<PacketId>,
72}
73
74pub fn publish<'a>(buffer: &mut FieldBuffer<'a>, publish: Publish<'a>) {
75 let mut flags = 0u8;
76
77 // Set QoS bits (bits 1-2)
78 flags |= (publish.qos.to_u8() & 0x03) << 1;
79
80 // Set RETAIN flag (bit 0)
81 if publish.retain {
82 flags |= 0x01;
83 }
84
85 // Set DUP flag (bit 3)
86 if publish.dup {
87 flags |= 0x08;
88 }
89
90 buffer.push(Field::U8(protocol::create_header_control(
91 protocol::PACKET_TYPE_PUBLISH,
92 flags,
93 )));
94 buffer.push(Field::VarInt(0));
95
96 buffer.push(Field::LenPrefixedString(publish.topic));
97
98 // Packet ID is only present for QoS 1 and 2
99 if publish.qos.to_u8() > 0 {
100 // TODO: turn this into a warning
101 let packet_id = publish.packet_id.expect("packet_id required for QoS > 0");
102 buffer.push(Field::U16(packet_id.into()));
103 }
104
105 buffer.push(Field::Buffer(publish.payload));
106
107 let message_size = field::fields_size(&buffer.as_slice()[2..]);
108 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
109}
110
111pub struct Subscribe<'a> {
112 pub topic: &'a str,
113 pub qos: Qos,
114 pub packet_id: PacketId,
115}
116
117pub fn subscribe<'a>(buffer: &mut FieldBuffer<'a>, subscribe: Subscribe<'a>) {
118 // SUBSCRIBE packets have fixed header flags (reserved bits)
119 buffer.push(Field::U8(protocol::create_header_control(
120 protocol::PACKET_TYPE_SUBSCRIBE,
121 protocol::SUBSCRIBE_HEADER_FLAGS,
122 )));
123 buffer.push(Field::VarInt(0));
124
125 // Variable header: packet identifier
126 buffer.push(Field::U16(subscribe.packet_id.into()));
127
128 // Payload: topic filter + QoS
129 buffer.push(Field::LenPrefixedString(subscribe.topic));
130 buffer.push(Field::U8(subscribe.qos.to_u8()));
131
132 let message_size = field::fields_size(&buffer.as_slice()[2..]);
133 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
134}
135
136pub struct Unsubscribe<'a> {
137 pub topic: &'a str,
138 pub packet_id: PacketId,
139}
140
141pub fn unsubscribe<'a>(buffer: &mut FieldBuffer<'a>, unsubscribe: Unsubscribe<'a>) {
142 // UNSUBSCRIBE packets have fixed header flags (reserved bits)
143 buffer.push(Field::U8(protocol::create_header_control(
144 protocol::PACKET_TYPE_UNSUBSCRIBE,
145 protocol::UNSUBSCRIBE_HEADER_FLAGS,
146 )));
147 buffer.push(Field::VarInt(0));
148
149 // Variable header: packet identifier
150 buffer.push(Field::U16(unsubscribe.packet_id.into()));
151
152 // Payload: topic filter (no QoS)
153 buffer.push(Field::LenPrefixedString(unsubscribe.topic));
154
155 let message_size = field::fields_size(&buffer.as_slice()[2..]);
156 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
157}
158
159pub fn disconnect(buffer: &mut FieldBuffer) {
160 // DISCONNECT has no variable header or payload
161 buffer.push(Field::U8(protocol::create_header_control(
162 protocol::PACKET_TYPE_DISCONNECT,
163 0,
164 )));
165 buffer.push(Field::VarInt(0));
166}
167
168pub fn puback(buffer: &mut FieldBuffer, packet_id: PacketId) {
169 buffer.push(Field::U8(protocol::create_header_control(
170 protocol::PACKET_TYPE_PUBACK,
171 0,
172 )));
173 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
174 buffer.push(Field::U16(packet_id.into()));
175}
176
177pub fn pubrec(buffer: &mut FieldBuffer, packet_id: PacketId) {
178 buffer.push(Field::U8(protocol::create_header_control(
179 protocol::PACKET_TYPE_PUBREC,
180 0,
181 )));
182 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
183 buffer.push(Field::U16(packet_id.into()));
184}
185
186pub fn pubrel(buffer: &mut FieldBuffer, packet_id: PacketId) {
187 buffer.push(Field::U8(protocol::create_header_control(
188 protocol::PACKET_TYPE_PUBREL,
189 protocol::PUBREL_HEADER_FLAGS,
190 )));
191 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
192 buffer.push(Field::U16(packet_id.into()));
193}
194
195pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) {
196 buffer.push(Field::U8(protocol::create_header_control(
197 protocol::PACKET_TYPE_PUBCOMP,
198 0,
199 )));
200 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
201 buffer.push(Field::U16(packet_id.into()));
202}
203