aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--Cargo.lock454
-rw-r--r--Cargo.toml16
-rw-r--r--embedded-mqtt/Cargo.lock248
-rw-r--r--embedded-mqtt/Cargo.toml11
-rw-r--r--embedded-mqtt/mqtt-wire-format.md1089
-rw-r--r--embedded-mqtt/src/connect_code.rs58
-rw-r--r--embedded-mqtt/src/field.rs89
-rw-r--r--embedded-mqtt/src/lib.rs486
-rw-r--r--embedded-mqtt/src/packet_id.rs14
-rw-r--r--embedded-mqtt/src/protocol.rs65
-rw-r--r--embedded-mqtt/src/qos.rs64
-rw-r--r--embedded-mqtt/src/rx.rs241
-rw-r--r--embedded-mqtt/src/transport.rs39
-rw-r--r--embedded-mqtt/src/tx.rs203
-rw-r--r--embedded-mqtt/src/varint.rs69
-rw-r--r--src/constants.rs97
-rw-r--r--src/lib.rs967
-rw-r--r--src/transport.rs3
-rw-r--r--src/unit.rs45
20 files changed, 4260 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f2564bc
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
1.DS_Store
2target/
diff --git a/Cargo.lock b/Cargo.lock
new file mode 100644
index 0000000..1fadd29
--- /dev/null
+++ b/Cargo.lock
@@ -0,0 +1,454 @@
1# This file is automatically @generated by Cargo.
2# It is not intended for manual editing.
3version = 4
4
5[[package]]
6name = "bitflags"
7version = "1.3.2"
8source = "registry+https://github.com/rust-lang/crates.io-index"
9checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
10
11[[package]]
12name = "byteorder"
13version = "1.5.0"
14source = "registry+https://github.com/rust-lang/crates.io-index"
15checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
16
17[[package]]
18name = "cfg-if"
19version = "1.0.4"
20source = "registry+https://github.com/rust-lang/crates.io-index"
21checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
22
23[[package]]
24name = "critical-section"
25version = "1.2.0"
26source = "registry+https://github.com/rust-lang/crates.io-index"
27checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
28
29[[package]]
30name = "defmt"
31version = "0.3.100"
32source = "registry+https://github.com/rust-lang/crates.io-index"
33checksum = "f0963443817029b2024136fc4dd07a5107eb8f977eaf18fcd1fdeb11306b64ad"
34dependencies = [
35 "defmt 1.0.1",
36]
37
38[[package]]
39name = "defmt"
40version = "1.0.1"
41source = "registry+https://github.com/rust-lang/crates.io-index"
42checksum = "548d977b6da32fa1d1fda2876453da1e7df63ad0304c8b3dae4dbe7b96f39b78"
43dependencies = [
44 "bitflags",
45 "defmt-macros",
46]
47
48[[package]]
49name = "defmt-macros"
50version = "1.0.1"
51source = "registry+https://github.com/rust-lang/crates.io-index"
52checksum = "3d4fc12a85bcf441cfe44344c4b72d58493178ce635338a3f3b78943aceb258e"
53dependencies = [
54 "defmt-parser",
55 "proc-macro-error2",
56 "proc-macro2",
57 "quote",
58 "syn",
59]
60
61[[package]]
62name = "defmt-parser"
63version = "1.0.0"
64source = "registry+https://github.com/rust-lang/crates.io-index"
65checksum = "10d60334b3b2e7c9d91ef8150abfb6fa4c1c39ebbcf4a81c2e346aad939fee3e"
66dependencies = [
67 "thiserror",
68]
69
70[[package]]
71name = "document-features"
72version = "0.2.12"
73source = "registry+https://github.com/rust-lang/crates.io-index"
74checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61"
75dependencies = [
76 "litrs",
77]
78
79[[package]]
80name = "embassy-futures"
81version = "0.1.2"
82source = "registry+https://github.com/rust-lang/crates.io-index"
83checksum = "dc2d050bdc5c21e0862a89256ed8029ae6c290a93aecefc73084b3002cdebb01"
84
85[[package]]
86name = "embassy-ha"
87version = "0.1.0"
88dependencies = [
89 "defmt 1.0.1",
90 "embassy-futures",
91 "embassy-net",
92 "embassy-sync",
93 "embassy-time",
94 "embedded-io-async",
95 "embedded-mqtt",
96 "heapless 0.9.2",
97 "serde",
98 "serde-json-core",
99]
100
101[[package]]
102name = "embassy-net"
103version = "0.7.1"
104source = "registry+https://github.com/rust-lang/crates.io-index"
105checksum = "0558a231a47e7d4a06a28b5278c92e860f1200f24821d2f365a2f40fe3f3c7b2"
106dependencies = [
107 "defmt 1.0.1",
108 "document-features",
109 "embassy-net-driver",
110 "embassy-sync",
111 "embassy-time",
112 "embedded-io-async",
113 "embedded-nal-async",
114 "heapless 0.8.0",
115 "managed",
116 "smoltcp",
117]
118
119[[package]]
120name = "embassy-net-driver"
121version = "0.2.0"
122source = "registry+https://github.com/rust-lang/crates.io-index"
123checksum = "524eb3c489760508f71360112bca70f6e53173e6fe48fc5f0efd0f5ab217751d"
124dependencies = [
125 "defmt 0.3.100",
126]
127
128[[package]]
129name = "embassy-sync"
130version = "0.7.2"
131source = "registry+https://github.com/rust-lang/crates.io-index"
132checksum = "73974a3edbd0bd286759b3d483540f0ebef705919a5f56f4fc7709066f71689b"
133dependencies = [
134 "cfg-if",
135 "critical-section",
136 "defmt 1.0.1",
137 "embedded-io-async",
138 "futures-core",
139 "futures-sink",
140 "heapless 0.8.0",
141]
142
143[[package]]
144name = "embassy-time"
145version = "0.5.0"
146source = "registry+https://github.com/rust-lang/crates.io-index"
147checksum = "f4fa65b9284d974dad7a23bb72835c4ec85c0b540d86af7fc4098c88cff51d65"
148dependencies = [
149 "cfg-if",
150 "critical-section",
151 "defmt 1.0.1",
152 "document-features",
153 "embassy-time-driver",
154 "embedded-hal 0.2.7",
155 "embedded-hal 1.0.0",
156 "embedded-hal-async",
157 "futures-core",
158]
159
160[[package]]
161name = "embassy-time-driver"
162version = "0.2.1"
163source = "registry+https://github.com/rust-lang/crates.io-index"
164checksum = "a0a244c7dc22c8d0289379c8d8830cae06bb93d8f990194d0de5efb3b5ae7ba6"
165dependencies = [
166 "document-features",
167]
168
169[[package]]
170name = "embedded-hal"
171version = "0.2.7"
172source = "registry+https://github.com/rust-lang/crates.io-index"
173checksum = "35949884794ad573cf46071e41c9b60efb0cb311e3ca01f7af807af1debc66ff"
174dependencies = [
175 "nb 0.1.3",
176 "void",
177]
178
179[[package]]
180name = "embedded-hal"
181version = "1.0.0"
182source = "registry+https://github.com/rust-lang/crates.io-index"
183checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89"
184
185[[package]]
186name = "embedded-hal-async"
187version = "1.0.0"
188source = "registry+https://github.com/rust-lang/crates.io-index"
189checksum = "0c4c685bbef7fe13c3c6dd4da26841ed3980ef33e841cddfa15ce8a8fb3f1884"
190dependencies = [
191 "embedded-hal 1.0.0",
192]
193
194[[package]]
195name = "embedded-io"
196version = "0.6.1"
197source = "registry+https://github.com/rust-lang/crates.io-index"
198checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d"
199
200[[package]]
201name = "embedded-io-async"
202version = "0.6.1"
203source = "registry+https://github.com/rust-lang/crates.io-index"
204checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f"
205dependencies = [
206 "embedded-io",
207]
208
209[[package]]
210name = "embedded-mqtt"
211version = "0.1.0"
212dependencies = [
213 "embassy-net",
214 "embedded-io-async",
215]
216
217[[package]]
218name = "embedded-nal"
219version = "0.9.0"
220source = "registry+https://github.com/rust-lang/crates.io-index"
221checksum = "c56a28be191a992f28f178ec338a0bf02f63d7803244add736d026a471e6ed77"
222dependencies = [
223 "nb 1.1.0",
224]
225
226[[package]]
227name = "embedded-nal-async"
228version = "0.8.0"
229source = "registry+https://github.com/rust-lang/crates.io-index"
230checksum = "76959917cd2b86f40a98c28dd5624eddd1fa69d746241c8257eac428d83cb211"
231dependencies = [
232 "embedded-io-async",
233 "embedded-nal",
234]
235
236[[package]]
237name = "futures-core"
238version = "0.3.31"
239source = "registry+https://github.com/rust-lang/crates.io-index"
240checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
241
242[[package]]
243name = "futures-sink"
244version = "0.3.31"
245source = "registry+https://github.com/rust-lang/crates.io-index"
246checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
247
248[[package]]
249name = "hash32"
250version = "0.3.1"
251source = "registry+https://github.com/rust-lang/crates.io-index"
252checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
253dependencies = [
254 "byteorder",
255]
256
257[[package]]
258name = "heapless"
259version = "0.8.0"
260source = "registry+https://github.com/rust-lang/crates.io-index"
261checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad"
262dependencies = [
263 "defmt 0.3.100",
264 "hash32",
265 "serde",
266 "stable_deref_trait",
267]
268
269[[package]]
270name = "heapless"
271version = "0.9.2"
272source = "registry+https://github.com/rust-lang/crates.io-index"
273checksum = "2af2455f757db2b292a9b1768c4b70186d443bcb3b316252d6b540aec1cd89ed"
274dependencies = [
275 "hash32",
276 "stable_deref_trait",
277]
278
279[[package]]
280name = "litrs"
281version = "1.0.0"
282source = "registry+https://github.com/rust-lang/crates.io-index"
283checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
284
285[[package]]
286name = "managed"
287version = "0.8.0"
288source = "registry+https://github.com/rust-lang/crates.io-index"
289checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"
290
291[[package]]
292name = "nb"
293version = "0.1.3"
294source = "registry+https://github.com/rust-lang/crates.io-index"
295checksum = "801d31da0513b6ec5214e9bf433a77966320625a37860f910be265be6e18d06f"
296dependencies = [
297 "nb 1.1.0",
298]
299
300[[package]]
301name = "nb"
302version = "1.1.0"
303source = "registry+https://github.com/rust-lang/crates.io-index"
304checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d"
305
306[[package]]
307name = "proc-macro-error-attr2"
308version = "2.0.0"
309source = "registry+https://github.com/rust-lang/crates.io-index"
310checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
311dependencies = [
312 "proc-macro2",
313 "quote",
314]
315
316[[package]]
317name = "proc-macro-error2"
318version = "2.0.1"
319source = "registry+https://github.com/rust-lang/crates.io-index"
320checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
321dependencies = [
322 "proc-macro-error-attr2",
323 "proc-macro2",
324 "quote",
325 "syn",
326]
327
328[[package]]
329name = "proc-macro2"
330version = "1.0.103"
331source = "registry+https://github.com/rust-lang/crates.io-index"
332checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
333dependencies = [
334 "unicode-ident",
335]
336
337[[package]]
338name = "quote"
339version = "1.0.42"
340source = "registry+https://github.com/rust-lang/crates.io-index"
341checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
342dependencies = [
343 "proc-macro2",
344]
345
346[[package]]
347name = "ryu"
348version = "1.0.20"
349source = "registry+https://github.com/rust-lang/crates.io-index"
350checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
351
352[[package]]
353name = "serde"
354version = "1.0.228"
355source = "registry+https://github.com/rust-lang/crates.io-index"
356checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
357dependencies = [
358 "serde_core",
359 "serde_derive",
360]
361
362[[package]]
363name = "serde-json-core"
364version = "0.6.0"
365source = "registry+https://github.com/rust-lang/crates.io-index"
366checksum = "5b81787e655bd59cecadc91f7b6b8651330b2be6c33246039a65e5cd6f4e0828"
367dependencies = [
368 "heapless 0.8.0",
369 "ryu",
370 "serde",
371]
372
373[[package]]
374name = "serde_core"
375version = "1.0.228"
376source = "registry+https://github.com/rust-lang/crates.io-index"
377checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
378dependencies = [
379 "serde_derive",
380]
381
382[[package]]
383name = "serde_derive"
384version = "1.0.228"
385source = "registry+https://github.com/rust-lang/crates.io-index"
386checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
387dependencies = [
388 "proc-macro2",
389 "quote",
390 "syn",
391]
392
393[[package]]
394name = "smoltcp"
395version = "0.12.0"
396source = "registry+https://github.com/rust-lang/crates.io-index"
397checksum = "dad095989c1533c1c266d9b1e8d70a1329dd3723c3edac6d03bbd67e7bf6f4bb"
398dependencies = [
399 "bitflags",
400 "byteorder",
401 "cfg-if",
402 "defmt 0.3.100",
403 "heapless 0.8.0",
404 "managed",
405]
406
407[[package]]
408name = "stable_deref_trait"
409version = "1.2.1"
410source = "registry+https://github.com/rust-lang/crates.io-index"
411checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
412
413[[package]]
414name = "syn"
415version = "2.0.111"
416source = "registry+https://github.com/rust-lang/crates.io-index"
417checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
418dependencies = [
419 "proc-macro2",
420 "quote",
421 "unicode-ident",
422]
423
424[[package]]
425name = "thiserror"
426version = "2.0.17"
427source = "registry+https://github.com/rust-lang/crates.io-index"
428checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
429dependencies = [
430 "thiserror-impl",
431]
432
433[[package]]
434name = "thiserror-impl"
435version = "2.0.17"
436source = "registry+https://github.com/rust-lang/crates.io-index"
437checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
438dependencies = [
439 "proc-macro2",
440 "quote",
441 "syn",
442]
443
444[[package]]
445name = "unicode-ident"
446version = "1.0.22"
447source = "registry+https://github.com/rust-lang/crates.io-index"
448checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
449
450[[package]]
451name = "void"
452version = "1.0.2"
453source = "registry+https://github.com/rust-lang/crates.io-index"
454checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..908e183
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,16 @@
1[package]
2name = "embassy-ha"
3version = "0.1.0"
4edition = "2024"
5
6[dependencies]
7embedded-mqtt = { path = "./embedded-mqtt" , features = ["embassy-net"] }
8embassy-net = { version = "0.7.1", features = ["defmt", "medium-ip", "proto-ipv4", "tcp"] }
9heapless = "0.9.2"
10embassy-time = { version = "0.5.0" }
11serde-json-core = "0.6.0"
12serde = { version = "1.0.228", default-features = false, features = ["derive"] }
13defmt = "1.0.1"
14embassy-sync = { version = "0.7.2", features = ["defmt"] }
15embassy-futures = "0.1.2"
16embedded-io-async = "0.6"
diff --git a/embedded-mqtt/Cargo.lock b/embedded-mqtt/Cargo.lock
new file mode 100644
index 0000000..4fd3a20
--- /dev/null
+++ b/embedded-mqtt/Cargo.lock
@@ -0,0 +1,248 @@
1# This file is automatically @generated by Cargo.
2# It is not intended for manual editing.
3version = 4
4
5[[package]]
6name = "bitflags"
7version = "1.3.2"
8source = "registry+https://github.com/rust-lang/crates.io-index"
9checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
10
11[[package]]
12name = "byteorder"
13version = "1.5.0"
14source = "registry+https://github.com/rust-lang/crates.io-index"
15checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
16
17[[package]]
18name = "cfg-if"
19version = "1.0.4"
20source = "registry+https://github.com/rust-lang/crates.io-index"
21checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
22
23[[package]]
24name = "critical-section"
25version = "1.2.0"
26source = "registry+https://github.com/rust-lang/crates.io-index"
27checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
28
29[[package]]
30name = "document-features"
31version = "0.2.12"
32source = "registry+https://github.com/rust-lang/crates.io-index"
33checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61"
34dependencies = [
35 "litrs",
36]
37
38[[package]]
39name = "embassy-net"
40version = "0.7.1"
41source = "registry+https://github.com/rust-lang/crates.io-index"
42checksum = "0558a231a47e7d4a06a28b5278c92e860f1200f24821d2f365a2f40fe3f3c7b2"
43dependencies = [
44 "document-features",
45 "embassy-net-driver",
46 "embassy-sync",
47 "embassy-time",
48 "embedded-io-async",
49 "embedded-nal-async",
50 "heapless",
51 "managed",
52 "smoltcp",
53]
54
55[[package]]
56name = "embassy-net-driver"
57version = "0.2.0"
58source = "registry+https://github.com/rust-lang/crates.io-index"
59checksum = "524eb3c489760508f71360112bca70f6e53173e6fe48fc5f0efd0f5ab217751d"
60
61[[package]]
62name = "embassy-sync"
63version = "0.7.2"
64source = "registry+https://github.com/rust-lang/crates.io-index"
65checksum = "73974a3edbd0bd286759b3d483540f0ebef705919a5f56f4fc7709066f71689b"
66dependencies = [
67 "cfg-if",
68 "critical-section",
69 "embedded-io-async",
70 "futures-core",
71 "futures-sink",
72 "heapless",
73]
74
75[[package]]
76name = "embassy-time"
77version = "0.5.0"
78source = "registry+https://github.com/rust-lang/crates.io-index"
79checksum = "f4fa65b9284d974dad7a23bb72835c4ec85c0b540d86af7fc4098c88cff51d65"
80dependencies = [
81 "cfg-if",
82 "critical-section",
83 "document-features",
84 "embassy-time-driver",
85 "embedded-hal 0.2.7",
86 "embedded-hal 1.0.0",
87 "embedded-hal-async",
88 "futures-core",
89]
90
91[[package]]
92name = "embassy-time-driver"
93version = "0.2.1"
94source = "registry+https://github.com/rust-lang/crates.io-index"
95checksum = "a0a244c7dc22c8d0289379c8d8830cae06bb93d8f990194d0de5efb3b5ae7ba6"
96dependencies = [
97 "document-features",
98]
99
100[[package]]
101name = "embedded-hal"
102version = "0.2.7"
103source = "registry+https://github.com/rust-lang/crates.io-index"
104checksum = "35949884794ad573cf46071e41c9b60efb0cb311e3ca01f7af807af1debc66ff"
105dependencies = [
106 "nb 0.1.3",
107 "void",
108]
109
110[[package]]
111name = "embedded-hal"
112version = "1.0.0"
113source = "registry+https://github.com/rust-lang/crates.io-index"
114checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89"
115
116[[package]]
117name = "embedded-hal-async"
118version = "1.0.0"
119source = "registry+https://github.com/rust-lang/crates.io-index"
120checksum = "0c4c685bbef7fe13c3c6dd4da26841ed3980ef33e841cddfa15ce8a8fb3f1884"
121dependencies = [
122 "embedded-hal 1.0.0",
123]
124
125[[package]]
126name = "embedded-io"
127version = "0.6.1"
128source = "registry+https://github.com/rust-lang/crates.io-index"
129checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d"
130
131[[package]]
132name = "embedded-io-async"
133version = "0.6.1"
134source = "registry+https://github.com/rust-lang/crates.io-index"
135checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f"
136dependencies = [
137 "embedded-io",
138]
139
140[[package]]
141name = "embedded-mqtt"
142version = "0.1.0"
143dependencies = [
144 "embassy-net",
145 "embedded-io-async",
146]
147
148[[package]]
149name = "embedded-nal"
150version = "0.9.0"
151source = "registry+https://github.com/rust-lang/crates.io-index"
152checksum = "c56a28be191a992f28f178ec338a0bf02f63d7803244add736d026a471e6ed77"
153dependencies = [
154 "nb 1.1.0",
155]
156
157[[package]]
158name = "embedded-nal-async"
159version = "0.8.0"
160source = "registry+https://github.com/rust-lang/crates.io-index"
161checksum = "76959917cd2b86f40a98c28dd5624eddd1fa69d746241c8257eac428d83cb211"
162dependencies = [
163 "embedded-io-async",
164 "embedded-nal",
165]
166
167[[package]]
168name = "futures-core"
169version = "0.3.31"
170source = "registry+https://github.com/rust-lang/crates.io-index"
171checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
172
173[[package]]
174name = "futures-sink"
175version = "0.3.31"
176source = "registry+https://github.com/rust-lang/crates.io-index"
177checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
178
179[[package]]
180name = "hash32"
181version = "0.3.1"
182source = "registry+https://github.com/rust-lang/crates.io-index"
183checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
184dependencies = [
185 "byteorder",
186]
187
188[[package]]
189name = "heapless"
190version = "0.8.0"
191source = "registry+https://github.com/rust-lang/crates.io-index"
192checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad"
193dependencies = [
194 "hash32",
195 "stable_deref_trait",
196]
197
198[[package]]
199name = "litrs"
200version = "1.0.0"
201source = "registry+https://github.com/rust-lang/crates.io-index"
202checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
203
204[[package]]
205name = "managed"
206version = "0.8.0"
207source = "registry+https://github.com/rust-lang/crates.io-index"
208checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"
209
210[[package]]
211name = "nb"
212version = "0.1.3"
213source = "registry+https://github.com/rust-lang/crates.io-index"
214checksum = "801d31da0513b6ec5214e9bf433a77966320625a37860f910be265be6e18d06f"
215dependencies = [
216 "nb 1.1.0",
217]
218
219[[package]]
220name = "nb"
221version = "1.1.0"
222source = "registry+https://github.com/rust-lang/crates.io-index"
223checksum = "8d5439c4ad607c3c23abf66de8c8bf57ba8adcd1f129e699851a6e43935d339d"
224
225[[package]]
226name = "smoltcp"
227version = "0.12.0"
228source = "registry+https://github.com/rust-lang/crates.io-index"
229checksum = "dad095989c1533c1c266d9b1e8d70a1329dd3723c3edac6d03bbd67e7bf6f4bb"
230dependencies = [
231 "bitflags",
232 "byteorder",
233 "cfg-if",
234 "heapless",
235 "managed",
236]
237
238[[package]]
239name = "stable_deref_trait"
240version = "1.2.1"
241source = "registry+https://github.com/rust-lang/crates.io-index"
242checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
243
244[[package]]
245name = "void"
246version = "1.0.2"
247source = "registry+https://github.com/rust-lang/crates.io-index"
248checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
diff --git a/embedded-mqtt/Cargo.toml b/embedded-mqtt/Cargo.toml
new file mode 100644
index 0000000..749eb2b
--- /dev/null
+++ b/embedded-mqtt/Cargo.toml
@@ -0,0 +1,11 @@
1[package]
2name = "embedded-mqtt"
3version = "0.1.0"
4edition = "2024"
5
6[features]
7embassy-net = ["dep:embassy-net"]
8
9[dependencies]
10embassy-net = { version = "0.7", features = ["tcp", "proto-ipv4", "medium-ip"], optional = true }
11embedded-io-async = "0.6"
diff --git a/embedded-mqtt/mqtt-wire-format.md b/embedded-mqtt/mqtt-wire-format.md
new file mode 100644
index 0000000..b4d4f65
--- /dev/null
+++ b/embedded-mqtt/mqtt-wire-format.md
@@ -0,0 +1,1089 @@
1# MQTT Wire Format for Home Assistant
2
3This document covers the essential MQTT wire format details needed for implementing Home Assistant's MQTT integration.
4
5## Overview
6
7MQTT uses a binary protocol over TCP. Each message consists of:
81. **Fixed Header** (always present)
92. **Variable Header** (present in most message types)
103. **Payload** (optional, depends on message type)
11
12## Message Flow: When to Wait for Responses
13
14Understanding which messages require broker responses is critical for implementing the protocol correctly.
15
16### Client-Initiated Messages
17
18| Client Sends | Broker Responds | Must Wait? | Notes |
19| --------------------------------------- | --------------------------------------- | --------------------------------------- | --------------------------------------- |
20| CONNECT | CONNACK | ✅ Yes | Must wait before sending other messages |
21| PUBLISH QoS 0 | *(none)* | ❌ No | Fire and forget |
22| PUBLISH QoS 1 | PUBACK | ✅ Yes | Wait for acknowledgment |
23| PUBLISH QoS 2 | PUBREC → PUBREL → PUBCOMP | ✅ Yes | Four-way handshake |
24| SUBSCRIBE | SUBACK | ✅ Yes | Contains subscription result codes |
25| UNSUBSCRIBE | UNSUBACK | ✅ Yes | Confirms unsubscription |
26| PINGREQ | PINGRESP | ✅ Yes | Keep-alive mechanism |
27| DISCONNECT | *(none)* | ❌ No | Graceful shutdown |
28
29
30### Broker-Initiated Messages
31
32The broker can send these messages to the client at any time:
33
34| Broker Sends | When | Client Action |
35|--------------|------|---------------|
36| PUBLISH | When a message arrives on a subscribed topic | Send PUBACK if QoS 1, or PUBREC if QoS 2 |
37| PINGRESP | In response to PINGREQ | No further action needed |
38
39### Implementation Notes
40
411. **CONNECT**: Always the first message. Block until CONNACK received before sending anything else.
42
432. **PUBLISH QoS 0**: Most common for Home Assistant. Send and continue immediately.
44
453. **PUBLISH QoS 1**: For important messages (e.g., config updates). Wait for PUBACK with matching Packet ID.
46
474. **SUBSCRIBE**: Wait for SUBACK to confirm subscriptions were accepted before assuming you'll receive messages.
48
495. **PINGREQ**: Send when no other messages have been sent during keep-alive period. Wait for PINGRESP.
50
516. **Receiving PUBLISH**: When broker sends PUBLISH with QoS 1, you must respond with PUBACK containing the same Packet ID.
52
53## Fixed Header Format
54
55Every MQTT message starts with a 2+ byte fixed header:
56
57```
58Byte 1: Control Packet Type + Flags
59┌────────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
60│ Bit │ 7 │ 6 │ 5 │ 4 │ 3 │ 2 │ 1 │ 0 │
61├────────┼─────┴─────┴─────┴─────┼─────┴─────┴─────┴─────┤
62│ Field │ Message Type (4) │ Flags (4) │
63└────────┴───────────────────────┴───────────────────────┘
64
65Byte 2+: Remaining Length (1-4 bytes, variable length encoding)
66```
67
68### Message Types
69- `1` = CONNECT
70- `2` = CONNACK
71- `3` = PUBLISH
72- `4` = PUBACK
73- `8` = SUBSCRIBE
74- `9` = SUBACK
75- `10` = UNSUBSCRIBE
76- `11` = UNSUBACK
77- `12` = PINGREQ
78- `13` = PINGRESP
79- `14` = DISCONNECT
80
81### Fixed Header Flags (Bits 0-3)
82
83The meaning of the 4 flag bits depends on the message type.
84
85#### PUBLISH Flags (Message Type 3)
86
87For PUBLISH messages, all 4 flag bits are used:
88
89```
90Bit 3: DUP (Duplicate delivery)
91Bit 2-1: QoS level (2 bits)
92Bit 0: RETAIN
93```
94
95**Bit 3 - DUP (Duplicate)**:
96- `0` = First delivery attempt
97- `1` = Message is being re-delivered (only for QoS > 0)
98
99**Bits 2-1 - QoS Level**:
100- `00` (0) = At most once (fire and forget)
101- `01` (1) = At least once (acknowledged)
102- `10` (2) = Exactly once (four-way handshake)
103- `11` (3) = Reserved (invalid)
104
105**Bit 0 - RETAIN**:
106- `0` = Normal message
107- `1` = Broker should retain this message for new subscribers
108
109**Examples**:
110```
1110x30 = 0011 0000 = PUBLISH, DUP=0, QoS=0, RETAIN=0
1120x31 = 0011 0001 = PUBLISH, DUP=0, QoS=0, RETAIN=1 (retained message)
1130x32 = 0011 0010 = PUBLISH, DUP=0, QoS=1, RETAIN=0
1140x33 = 0011 0011 = PUBLISH, DUP=0, QoS=1, RETAIN=1
1150x34 = 0011 0100 = PUBLISH, DUP=0, QoS=2, RETAIN=0
1160x3A = 0011 1010 = PUBLISH, DUP=1, QoS=1, RETAIN=0 (re-delivery)
117```
118
119#### SUBSCRIBE Flags (Message Type 8)
120
121For SUBSCRIBE, flags MUST be `0010` (bit 1 = 1):
122
123```
124Fixed header byte 1: 0x82 = 1000 0010
125```
126
127This is mandated by the MQTT specification. Other flag values are protocol violations.
128
129#### UNSUBSCRIBE Flags (Message Type 10)
130
131For UNSUBSCRIBE, flags MUST be `0010` (bit 1 = 1):
132
133```
134Fixed header byte 1: 0xA2 = 1010 0010
135```
136
137#### All Other Message Types
138
139For all other message types (CONNECT, CONNACK, PUBACK, SUBACK, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT), the flags MUST be `0000`:
140
141```
142CONNECT: 0x10 = 0001 0000
143CONNACK: 0x20 = 0010 0000
144PUBACK: 0x40 = 0100 0000
145SUBACK: 0x90 = 1001 0000
146UNSUBACK: 0xB0 = 1011 0000
147PINGREQ: 0xC0 = 1100 0000
148PINGRESP: 0xD0 = 1101 0000
149DISCONNECT: 0xE0 = 1110 0000
150```
151
152Any other flag values for these message types are protocol violations.
153
154### Summary Table
155
156| Message Type | Required Flags | Flag Meaning |
157|--------------|----------------|--------------|
158| CONNECT (1) | `0000` | Reserved |
159| CONNACK (2) | `0000` | Reserved |
160| **PUBLISH (3)** | **`DQQR`** | **D=DUP, QQ=QoS, R=RETAIN** |
161| PUBACK (4) | `0000` | Reserved |
162| SUBSCRIBE (8) | `0010` | Fixed |
163| SUBACK (9) | `0000` | Reserved |
164| UNSUBSCRIBE (10) | `0010` | Fixed |
165| UNSUBACK (11) | `0000` | Reserved |
166| PINGREQ (12) | `0000` | Reserved |
167| PINGRESP (13) | `0000` | Reserved |
168| DISCONNECT (14) | `0000` | Reserved |
169
170**Key Point**: Only PUBLISH messages use flags meaningfully. SUBSCRIBE and UNSUBSCRIBE have fixed flag values, and all other message types must have flags set to 0.
171
172### Remaining Length Encoding
173
174The remaining length uses a variable-length encoding (1-4 bytes):
175- Each byte encodes 7 bits of data and 1 continuation bit
176- Bit 7 = continuation bit (1 = more bytes follow, 0 = last byte)
177- Bits 6-0 = length value
178
179Example decoding:
180```
1810x7F = 127 bytes
1820x80 0x01 = 128 bytes
1830xFF 0x7F = 16,383 bytes
1840xFF 0xFF 0x7F = 2,097,151 bytes
185```
186
187## Data Types
188
189### UTF-8 String
190
191**Important**: String length is a **fixed 2-byte unsigned integer (u16)**, NOT a variable-length integer like the Remaining Length field.
192
193#### Format
194```
195┌──────────────┬──────────────────┬─────────────────────────┐
196│ Length MSB │ Length LSB │ UTF-8 Encoded String │
197│ (byte 0) │ (byte 1) │ (N bytes) │
198└──────────────┴──────────────────┴─────────────────────────┘
199 ↑──── u16 big-endian ────↑
200
201Total size: 2 bytes + string length
202Maximum string length: 65,535 bytes (0xFFFF)
203```
204
205#### Encoding Details
206- **Length prefix**: 2 bytes, unsigned 16-bit integer
207- **Byte order**: Big-endian (MSB first, then LSB)
208- **String encoding**: UTF-8
209- **Maximum length**: 65,535 bytes (not 65,536 - zero-length strings are allowed)
210- This is **different** from the variable-length encoding used for "Remaining Length"
211
212#### Examples
213
214**Example 1: "MQTT" (4 bytes)**
215```
2160x00 0x04 'M' 'Q' 'T' 'T'
217
218Breakdown:
2190x00 0x04 = length 4 (u16 big-endian)
220'M' 'Q' 'T' 'T' = 0x4D 0x51 0x54 0x54
221```
222
223**Example 2: "ha-client" (9 bytes)**
224```
2250x00 0x09 'h' 'a' '-' 'c' 'l' 'i' 'e' 'n' 't'
226
227Breakdown:
2280x00 0x09 = length 9
229```
230
231**Example 3: "homeassistant/sensor/temperature/state" (39 bytes)**
232```
2330x00 0x27 'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' ...
234
235Breakdown:
2360x00 0x27 = length 39 (0x27 = 39 in decimal)
237```
238
239**Example 4: Empty string (0 bytes)**
240```
2410x00 0x00
242
243Breakdown:
2440x00 0x00 = length 0 (valid in MQTT)
245```
246
247**Example 5: Long string (300 bytes)**
248```
2490x01 0x2C ...300 bytes of UTF-8 data...
250
251Breakdown:
2520x01 0x2C = length 300 (256 + 44 = 300)
253```
254
255#### Parsing String in Rust
256```rust
257fn parse_mqtt_string(buf: &[u8]) -> Result<(&str, &[u8])> {
258 // Read u16 big-endian length
259 if buf.len() < 2 {
260 return Err(Error::InsufficientData);
261 }
262 let len = u16::from_be_bytes([buf[0], buf[1]]) as usize;
263
264 // Read string data
265 if buf.len() < 2 + len {
266 return Err(Error::InsufficientData);
267 }
268 let string_bytes = &buf[2..2 + len];
269 let string = core::str::from_utf8(string_bytes)?;
270 let remaining = &buf[2 + len..];
271
272 Ok((string, remaining))
273}
274```
275
276#### Writing String in Rust
277```rust
278fn write_mqtt_string(buf: &mut [u8], s: &str) -> Result<usize> {
279 let len = s.len();
280 if len > 65535 {
281 return Err(Error::StringTooLong);
282 }
283 if buf.len() < 2 + len {
284 return Err(Error::InsufficientBuffer);
285 }
286
287 // Write u16 big-endian length
288 buf[0] = (len >> 8) as u8; // MSB
289 buf[1] = (len & 0xFF) as u8; // LSB
290
291 // Write string bytes
292 buf[2..2 + len].copy_from_slice(s.as_bytes());
293
294 Ok(2 + len)
295}
296```
297
298### Binary Data
299Same format as UTF-8 string (2-byte u16 length prefix + data), but contains raw binary data instead of UTF-8 text. Used for passwords and message payloads.
300
301## Building MQTT Messages: Handling the Length Challenge
302
303### The Problem
304
305The fixed header contains the "Remaining Length" field which specifies the number of bytes following the fixed header. But you need to write the fixed header first, before you know how long the rest of the message will be.
306
307### Solution Approaches
308
309#### Approach 1: Calculate Length First (Recommended for Embedded)
310
311Calculate the message size before writing anything. This avoids buffering the entire message.
312
313```rust
314// Step 1: Calculate sizes
315let client_id = "ha-sensor";
316let topic = "homeassistant/status";
317
318let client_id_len = 2 + client_id.len(); // u16 prefix + string
319let topic_len = 2 + topic.len();
320
321// For CONNECT: protocol name + level + flags + keep-alive + payload
322let remaining_length =
323 (2 + 4) + // Protocol name "MQTT" (2 byte len + 4 bytes)
324 1 + // Protocol level
325 1 + // Connect flags
326 2 + // Keep-alive
327 client_id_len; // Client ID
328
329// Step 2: Write fixed header with calculated length
330let control_byte = 0x10; // CONNECT
331write_u8(buf, control_byte);
332write_varint(buf, remaining_length);
333
334// Step 3: Write variable header and payload
335write_mqtt_string(buf, "MQTT");
336write_u8(buf, 0x04); // Protocol level
337write_u8(buf, 0x02); // Connect flags
338write_u16_be(buf, 60); // Keep-alive
339write_mqtt_string(buf, client_id);
340```
341
342**Pros**:
343- No buffering needed (good for embedded)
344- Direct write to TCP socket
345- Minimal memory usage
346
347**Cons**:
348- Must calculate sizes carefully
349- Easy to make mistakes
350
351#### Approach 2: Reserve Space and Backfill
352
353Reserve maximum space for the header (5 bytes), write the message, then go back and fill in the actual length.
354
355```rust
356fn build_publish(buf: &mut [u8], topic: &str, payload: &[u8]) -> usize {
357 let mut pos = 0;
358
359 // Step 1: Reserve space for fixed header (max 5 bytes)
360 let header_start = pos;
361 pos += 5; // Maximum: 1 byte control + 4 bytes remaining length
362
363 // Step 2: Write variable header and payload
364 let payload_start = pos;
365 pos += write_mqtt_string(&mut buf[pos..], topic);
366 buf[pos..pos + payload.len()].copy_from_slice(payload);
367 pos += payload.len();
368
369 // Step 3: Calculate actual remaining length
370 let remaining_length = pos - payload_start;
371
372 // Step 4: Encode remaining length to temp buffer
373 let mut temp = [0u8; 4];
374 let varint_len = encode_varint(remaining_length as u32, &mut temp);
375
376 // Step 5: Backfill header (shift if needed)
377 let actual_header_len = 1 + varint_len;
378 let shift = 5 - actual_header_len;
379 if shift > 0 {
380 // Shift payload left to remove unused header bytes
381 buf.copy_within(payload_start..pos, payload_start - shift);
382 pos -= shift;
383 }
384
385 // Step 6: Write actual header
386 buf[0] = 0x30; // PUBLISH QoS 0
387 buf[1..1 + varint_len].copy_from_slice(&temp[..varint_len]);
388
389 pos
390}
391```
392
393**Pros**:
394- Single pass through data
395- No size calculation needed upfront
396
397**Cons**:
398- May need to shift data (costly on embedded)
399- Wastes up to 4 bytes initially
400- More complex
401
402#### Approach 3: Two-Buffer Strategy
403
404Write payload to one buffer, then construct final message.
405
406```rust
407fn build_connect(out: &mut [u8]) -> usize {
408 let mut payload_buf = [0u8; 256];
409 let mut payload_pos = 0;
410
411 // Step 1: Build variable header + payload in temp buffer
412 payload_pos += write_mqtt_string(&mut payload_buf[payload_pos..], "MQTT");
413 payload_buf[payload_pos] = 0x04; payload_pos += 1; // Protocol level
414 payload_buf[payload_pos] = 0x02; payload_pos += 1; // Connect flags
415 payload_pos += write_u16_be(&mut payload_buf[payload_pos..], 60); // Keep-alive
416 payload_pos += write_mqtt_string(&mut payload_buf[payload_pos..], "ha-client");
417
418 let remaining_length = payload_pos;
419
420 // Step 2: Write fixed header to output
421 let mut pos = 0;
422 out[pos] = 0x10; pos += 1; // CONNECT
423 pos += write_varint(&mut out[pos..], remaining_length as u32);
424
425 // Step 3: Copy payload to output
426 out[pos..pos + payload_pos].copy_from_slice(&payload_buf[..payload_pos]);
427 pos + payload_pos
428}
429```
430
431**Pros**:
432- Simple and clear
433- No backfilling needed
434
435**Cons**:
436- Uses 2x memory (bad for embedded)
437- Extra copy operation
438
439#### Approach 4: Builder Pattern with Deferred Write
440
441Build a message description, calculate size, then serialize.
442
443```rust
444struct ConnectMessage<'a> {
445 client_id: &'a str,
446 clean_session: bool,
447 keep_alive: u16,
448}
449
450impl<'a> ConnectMessage<'a> {
451 fn calculate_size(&self) -> usize {
452 let mut size = 0;
453 size += 2 + 4; // "MQTT"
454 size += 1; // Protocol level
455 size += 1; // Connect flags
456 size += 2; // Keep-alive
457 size += 2 + self.client_id.len(); // Client ID
458 size
459 }
460
461 fn serialize(&self, buf: &mut [u8]) -> usize {
462 let remaining_length = self.calculate_size();
463 let mut pos = 0;
464
465 // Write fixed header
466 buf[pos] = 0x10; pos += 1;
467 pos += write_varint(&mut buf[pos..], remaining_length as u32);
468
469 // Write variable header + payload
470 pos += write_mqtt_string(&mut buf[pos..], "MQTT");
471 buf[pos] = 0x04; pos += 1;
472 let flags = if self.clean_session { 0x02 } else { 0x00 };
473 buf[pos] = flags; pos += 1;
474 pos += write_u16_be(&mut buf[pos..], self.keep_alive);
475 pos += write_mqtt_string(&mut buf[pos..], self.client_id);
476
477 pos
478 }
479}
480```
481
482**Pros**:
483- Clean API
484- Size calculation is explicit and testable
485- No wasted space or copying
486
487**Cons**:
488- More code
489- Need to keep calculation in sync with serialization
490
491### Recommended Approach for Embedded Systems
492
493For embedded systems like RP2350 with Embassy:
494
495**Use Approach 1 (Calculate First) or Approach 4 (Builder Pattern)**
496
497```rust
498// Example: Helper to calculate MQTT string size
499fn mqtt_string_size(s: &str) -> usize {
500 2 + s.len()
501}
502
503// Example: Build PUBLISH message
504fn build_publish_qos0(
505 buf: &mut [u8],
506 topic: &str,
507 payload: &[u8]
508) -> usize {
509 // Calculate remaining length
510 let remaining_length =
511 mqtt_string_size(topic) + // Topic
512 payload.len(); // Payload
513
514 let mut pos = 0;
515
516 // Write fixed header
517 buf[pos] = 0x30; pos += 1; // PUBLISH QoS 0
518 pos += write_varint(&mut buf[pos..], remaining_length as u32);
519
520 // Write variable header
521 pos += write_mqtt_string(&mut buf[pos..], topic);
522
523 // Write payload
524 buf[pos..pos + payload.len()].copy_from_slice(payload);
525 pos += payload.len();
526
527 pos
528}
529```
530
531### Key Tips
532
5331. **Create size calculation helpers**: Make functions like `mqtt_string_size()`, `connect_size()` to avoid mistakes
534
5352. **Maximum varint size**: The remaining length varint can be 1-4 bytes. For small messages (<128 bytes), it's always 1 byte. This is predictable.
536
5373. **QoS 0 PUBLISH is simplest**: No packet identifier needed, making size calculation trivial
538
5394. **Test your calculations**: Write unit tests comparing calculated size vs actual serialized size
540
5415. **Consider const generics**: For fixed message types, you can calculate sizes at compile time
542
543## CONNECT Message
544
545Client initiates connection to broker.
546
547### Fixed Header
548```
549Byte 1: 0x10 (Message Type = 1, Flags = 0)
550Byte 2+: Remaining Length
551```
552
553### Variable Header
554```
555┌─────────────────────────────────────┐
556│ Protocol Name (UTF-8 String) │
557│ "MQTT" (0x00 0x04 0x4D 0x51 0x54 0x54) for MQTT 3.1.1
558├─────────────────────────────────────┤
559│ Protocol Level (1 byte) │
560│ 0x04 for MQTT 3.1.1 │
561│ 0x05 for MQTT 5.0 │
562├─────────────────────────────────────┤
563│ Connect Flags (1 byte) │
564│ Bit 7: User Name Flag │
565│ Bit 6: Password Flag │
566│ Bit 5: Will Retain │
567│ Bit 4-3: Will QoS (2 bits) │
568│ Bit 2: Will Flag │
569│ Bit 1: Clean Session (v3.1.1) │
570│ Clean Start (v5.0) │
571│ Bit 0: Reserved (must be 0) │
572├─────────────────────────────────────┤
573│ Keep Alive (2 bytes, MSB first) │
574│ Seconds, 0 = disabled │
575└─────────────────────────────────────┘
576```
577
578### Payload (in order)
5791. **Client ID** (UTF-8 String) - Required
5802. **Will Topic** (UTF-8 String) - If Will Flag = 1
5813. **Will Payload** (Binary Data) - If Will Flag = 1
5824. **User Name** (UTF-8 String) - If User Name Flag = 1
5835. **Password** (Binary Data) - If Password Flag = 1
584
585
586#### Connect Flags Breakdown
587
588The Connect Flags byte (byte 10 of CONNECT message) controls authentication and the Last Will and Testament.
589
590**Last Will and Testament (LWT)**: A message the broker will automatically publish if the client disconnects unexpectedly (network failure, crash, etc.). This is useful for Home Assistant to detect when a device goes offline.
591
592**Connect Flags bit layout:**
593```
594Bit: 7 6 5 4 3 2 1 0
595 ┌──────┬──────┬──────┬──────┬──────┬──────┬──────────────┬──────┐
596 │ User │ Pass │ Will │ Will QoS │ Will │ Clean │ Res. │
597 │ Name │ word │Retain│ │ Flag │ Session │ (0) │
598 └──────┴──────┴──────┴──────┴──────┴──────┴──────────────┴──────┘
599```
600
601**Bits 4-3: Will QoS** (only applies if Bit 2 Will Flag = 1):
602- `00` (0) = QoS 0 for Will message (fire and forget)
603- `01` (1) = QoS 1 for Will message (acknowledged)
604- `10` (2) = QoS 2 for Will message (exactly once)
605- `11` (3) = Invalid, must not be used
606
607**Important**: If Will Flag (bit 2) = 0, then Will QoS MUST be set to 00 and Will Retain (bit 5) MUST be 0.
608
609#### Connect Flags Examples
610
611**Example 1: Minimal connection (no Will, no auth)**
612```
613Binary: 0000 0010
614Hex: 0x02
615- User Name Flag: 0 (no username)
616- Password Flag: 0 (no password)
617- Will Retain: 0
618- Will QoS: 00 (not used, Will Flag = 0)
619- Will Flag: 0 (no Will message)
620- Clean Session: 1
621- Reserved: 0
622```
623
624**Example 2: With Last Will (QoS 0, no retain)**
625```
626Binary: 0000 0110
627Hex: 0x06
628- User Name Flag: 0
629- Password Flag: 0
630- Will Retain: 0 (don't retain Will message)
631- Will QoS: 00 (QoS 0 for Will)
632- Will Flag: 1 (Will message enabled)
633- Clean Session: 1
634- Reserved: 0
635
636Must include Will Topic and Will Payload in CONNECT payload.
637```
638
639**Example 3: With Last Will (QoS 1, retain)**
640```
641Binary: 0011 0110
642Hex: 0x36
643- User Name Flag: 0
644- Password Flag: 0
645- Will Retain: 1 (broker retains Will message)
646- Will QoS: 01 (QoS 1 for Will)
647- Will Flag: 1 (Will message enabled)
648- Clean Session: 1
649- Reserved: 0
650
651Will message will be retained and delivered with QoS 1.
652```
653
654**Example 4: With authentication and Last Will**
655```
656Binary: 1100 0110
657Hex: 0xC6
658- User Name Flag: 1 (username provided)
659- Password Flag: 1 (password provided)
660- Will Retain: 0
661- Will QoS: 00 (QoS 0 for Will)
662- Will Flag: 1 (Will message enabled)
663- Clean Session: 1
664- Reserved: 0
665
666Must include Username, Password, Will Topic, and Will Payload in payload.
667```
668
669### Example CONNECT (Simple)
670```
671Client ID: "ha-client"
672Clean Session: true
673Keep Alive: 60 seconds
674No Will, No Auth
675
6760x10 // Fixed header: CONNECT
6770x17 // Remaining length: 23 bytes
6780x00 0x04 'M' 'Q' 'T' 'T' // Protocol name
6790x04 // Protocol level: 3.1.1
6800x02 // Connect flags: 0000 0010 = Clean Session only
6810x00 0x3C // Keep Alive: 60 seconds
6820x00 0x09 'h' 'a' '-' 'c' 'l' 'i' 'e' 'n' 't' // Client ID: "ha-client"
683```
684
685### Example CONNECT (With Last Will)
686```
687Client ID: "sensor1"
688Clean Session: true
689Keep Alive: 60 seconds
690Will Topic: "homeassistant/sensor1/availability"
691Will Payload: "offline"
692Will QoS: 0
693Will Retain: true
694
6950x10 // Fixed header: CONNECT
6960x3C // Remaining length: 60 bytes
6970x00 0x04 'M' 'Q' 'T' 'T' // Protocol name: "MQTT"
6980x04 // Protocol level: 3.1.1
6990x26 // Connect flags: 0010 0110
700 // Bit 5: Will Retain = 1
701 // Bit 4-3: Will QoS = 00
702 // Bit 2: Will Flag = 1
703 // Bit 1: Clean Session = 1
7040x00 0x3C // Keep Alive: 60 seconds
7050x00 0x07 's' 'e' 'n' 's' 'o' 'r' '1' // Client ID: "sensor1"
7060x00 0x25 // Will Topic length: 37 bytes
707'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/'
708's' 'e' 'n' 's' 'o' 'r' '1' '/'
709'a' 'v' 'a' 'i' 'l' 'a' 'b' 'i' 'l' 'i' 't' 'y'
7100x00 0x07 // Will Payload length: 7 bytes
711'o' 'f' 'f' 'l' 'i' 'n' 'e' // Will Payload: "offline"
712```
713
714When this client disconnects unexpectedly, the broker will publish:
715- Topic: `homeassistant/sensor1/availability`
716- Payload: `offline`
717- QoS: 0
718- Retained: Yes (so Home Assistant sees it even if it reconnects later)
719
720## DISCONNECT Message
721
722Client graceful disconnect (MQTT 3.1.1).
723
724### Fixed Header
725```
726Byte 1: 0xE0 (Message Type = 14, Flags = 0)
727Byte 2: 0x00 (Remaining Length = 0)
728```
729
730No Variable Header or Payload in MQTT 3.1.1.
731
732### Complete Message
733```
7340xE0 0x00
735```
736
737## PUBLISH Message
738
739Publish message to a topic.
740
741### Fixed Header
742```
743Byte 1: 0x3X (Message Type = 3, Flags in bits 0-3)
744 Bit 3: DUP flag (duplicate delivery)
745 Bit 2-1: QoS level (00, 01, or 10)
746 Bit 0: RETAIN flag
747
748Examples:
749 0x30 = PUBLISH, QoS 0, no retain
750 0x31 = PUBLISH, QoS 0, retain
751 0x32 = PUBLISH, QoS 1, no retain
752 0x34 = PUBLISH, QoS 2, no retain
753
754Byte 2+: Remaining Length
755```
756
757### Variable Header
758```
759┌─────────────────────────────────────┐
760│ Topic Name (UTF-8 String) │
761├─────────────────────────────────────┤
762│ Packet Identifier (2 bytes) │
763│ Only present if QoS > 0 │
764└─────────────────────────────────────┘
765```
766
767### Payload
768Raw message data (can be text or binary).
769
770### Example PUBLISH
771```
772Topic: "homeassistant/sensor/temp/state"
773Payload: "23.5"
774QoS: 0
775Retain: false
776
7770x30 // Fixed header: PUBLISH, QoS 0
7780x29 // Remaining length: 41 bytes
7790x00 0x23 // Topic length: 35 bytes
780'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/'
781's' 'e' 'n' 's' 'o' 'r' '/' 't' 'e' 'm' 'p' '/'
782's' 't' 'a' 't' 'e'
783'2' '3' '.' '5' // Payload: "23.5"
784```
785
786### Example PUBLISH (QoS 1)
787```
788Topic: "homeassistant/switch/state"
789Payload: "ON"
790QoS: 1
791Packet ID: 1
792
7930x32 // Fixed header: PUBLISH, QoS 1
7940x1F // Remaining length
7950x00 0x1B // Topic length: 27 bytes
796'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/'
797's' 'w' 'i' 't' 'c' 'h' '/' 's' 't' 'a' 't' 'e'
7980x00 0x01 // Packet Identifier: 1
799'O' 'N' // Payload
800```
801
802## SUBSCRIBE Message
803
804Subscribe to one or more topics.
805
806### Fixed Header
807```
808Byte 1: 0x82 (Message Type = 8, Flags = 0010)
809 Bit 1 MUST be 1 for SUBSCRIBE
810Byte 2+: Remaining Length
811```
812
813### Variable Header
814```
815┌─────────────────────────────────────┐
816│ Packet Identifier (2 bytes, MSB) │
817└─────────────────────────────────────┘
818```
819
820### Payload
821List of topic filters with QoS levels:
822```
823For each subscription:
824┌─────────────────────────────────────┐
825│ Topic Filter (UTF-8 String) │
826│ Supports wildcards: │
827│ + = single level wildcard │
828│ # = multi level wildcard │
829├─────────────────────────────────────┤
830│ QoS (1 byte) │
831│ Bits 7-2: Reserved (must be 0) │
832│ Bits 1-0: QoS level (0, 1, or 2) │
833└─────────────────────────────────────┘
834```
835
836### Example SUBSCRIBE
837```
838Subscribe to: "homeassistant/#" (all topics under homeassistant/)
839QoS: 0
840Packet ID: 2
841
8420x82 // Fixed header: SUBSCRIBE
8430x13 // Remaining length: 19 bytes
8440x00 0x02 // Packet Identifier: 2
8450x00 0x0F // Topic length: 15 bytes
846'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/' '#'
8470x00 // QoS: 0
848```
849
850### Example SUBSCRIBE (Multiple Topics)
851```
852Subscribe to:
853 1. "homeassistant/status" QoS 0
854 2. "homeassistant/+/state" QoS 1
855Packet ID: 3
856
8570x82 // Fixed header
8580x37 // Remaining length
8590x00 0x03 // Packet Identifier: 3
8600x00 0x16 // Topic 1 length: 22 bytes
861'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/'
862's' 't' 'a' 't' 'u' 's'
8630x00 // QoS: 0
8640x00 0x16 // Topic 2 length: 22 bytes
865'h' 'o' 'm' 'e' 'a' 's' 's' 'i' 's' 't' 'a' 'n' 't' '/'
866'+' '/' 's' 't' 'a' 't' 'e'
8670x01 // QoS: 1
868```
869
870## QoS Levels
871
872Home Assistant typically uses:
873- **QoS 0**: At most once delivery (fire and forget)
874- **QoS 1**: At least once delivery (requires PUBACK)
875- **QoS 2**: Exactly once delivery (requires PUBREC, PUBREL, PUBCOMP)
876
877For most Home Assistant integrations, QoS 0 is sufficient. Use QoS 1 for important state changes.
878
879## Topic Wildcards
880
881- `+` (single-level): Matches one topic level
882 - `homeassistant/+/state` matches `homeassistant/sensor/state` and `homeassistant/switch/state`
883- `#` (multi-level): Matches zero or more levels, must be last character
884 - `homeassistant/#` matches everything under `homeassistant/`
885
886## Home Assistant Topic Structure
887
888Common Home Assistant MQTT topics:
889```
890homeassistant/<component>/<node_id>/<object_id>/config
891homeassistant/<component>/<node_id>/<object_id>/state
892homeassistant/<component>/<node_id>/<object_id>/command
893homeassistant/status
894```
895
896Example:
897```
898homeassistant/sensor/living_room/temperature/config
899homeassistant/sensor/living_room/temperature/state
900homeassistant/switch/bedroom/light/config
901homeassistant/switch/bedroom/light/state
902homeassistant/switch/bedroom/light/command
903```
904
905## Typical Flow for Home Assistant Device
906
9071. **Connect** to broker
9082. **Subscribe** to:
909 - `homeassistant/status` (to detect HA restarts)
910 - Command topics for your devices (e.g., `homeassistant/switch/+/command`)
9113. **Publish** discovery configs to `homeassistant/.../config` with retain=true
9124. **Publish** state updates to `homeassistant/.../state`
9135. **Receive** commands on subscribed topics
9146. On disconnect, broker sends Will message if configured
915
916## Keep-Alive and PINGREQ/PINGRESP
917
918If no messages are sent within keep-alive period:
919- Client sends PINGREQ: `0xC0 0x00`
920- Broker responds with PINGRESP: `0xD0 0x00`
921
922If broker doesn't receive any message within 1.5x keep-alive, it disconnects the client.
923
924## Broker Response Messages
925
926These are messages the broker sends back to the client that you need to parse.
927
928### CONNACK (Connection Acknowledgment)
929
930Broker response to CONNECT.
931
932#### Fixed Header
933```
934Byte 1: 0x20 (Message Type = 2, Flags = 0)
935Byte 2: 0x02 (Remaining Length = 2)
936```
937
938#### Variable Header
939```
940┌─────────────────────────────────────┐
941│ Connect Acknowledge Flags (1 byte) │
942│ Bit 0: Session Present │
943│ Bits 7-1: Reserved (must be 0) │
944├─────────────────────────────────────┤
945│ Connect Return Code (1 byte) │
946│ 0x00 = Connection Accepted │
947│ 0x01 = Unacceptable protocol ver │
948│ 0x02 = Identifier rejected │
949│ 0x03 = Server unavailable │
950│ 0x04 = Bad username/password │
951│ 0x05 = Not authorized │
952└─────────────────────────────────────┘
953```
954
955#### Example CONNACK (Success)
956```
9570x20 0x02 // Fixed header
9580x00 // Session Present = 0
9590x00 // Return code = 0 (accepted)
960```
961
962#### Example CONNACK (Connection Refused)
963```
9640x20 0x02 // Fixed header
9650x00 // Session Present = 0
9660x05 // Return code = 5 (not authorized)
967```
968
969**Implementation**: Check return code is 0x00 before proceeding. If non-zero, connection failed.
970
971---
972
973### PUBACK (Publish Acknowledgment)
974
975Broker response to PUBLISH with QoS 1.
976
977#### Fixed Header
978```
979Byte 1: 0x40 (Message Type = 4, Flags = 0)
980Byte 2: 0x02 (Remaining Length = 2)
981```
982
983#### Variable Header
984```
985┌─────────────────────────────────────┐
986│ Packet Identifier (2 bytes, MSB) │
987│ Must match the PUBLISH packet ID │
988└─────────────────────────────────────┘
989```
990
991#### Example PUBACK
992```
993For PUBLISH with Packet ID = 42 (0x002A):
994
9950x40 0x02 // Fixed header
9960x00 0x2A // Packet Identifier: 42
997```
998
999**Implementation**: Match Packet ID with the PUBLISH you sent. Once received, the message is acknowledged.
1000
1001---
1002
1003### SUBACK (Subscribe Acknowledgment)
1004
1005Broker response to SUBSCRIBE.
1006
1007#### Fixed Header
1008```
1009Byte 1: 0x90 (Message Type = 9, Flags = 0)
1010Byte 2+: Remaining Length
1011```
1012
1013#### Variable Header
1014```
1015┌─────────────────────────────────────┐
1016│ Packet Identifier (2 bytes, MSB) │
1017│ Must match the SUBSCRIBE packet ID │
1018└─────────────────────────────────────┘
1019```
1020
1021#### Payload
1022One return code for each topic filter in the SUBSCRIBE request:
1023```
1024┌─────────────────────────────────────┐
1025│ Return Code (1 byte per topic) │
1026│ 0x00 = Success, QoS 0 │
1027│ 0x01 = Success, QoS 1 │
1028│ 0x02 = Success, QoS 2 │
1029│ 0x80 = Failure │
1030└─────────────────────────────────────┘
1031```
1032
1033#### Example SUBACK (Single Topic)
1034```
1035For SUBSCRIBE with Packet ID = 2, one topic requesting QoS 0:
1036
10370x90 0x03 // Fixed header, remaining = 3 bytes
10380x00 0x02 // Packet Identifier: 2
10390x00 // Return code: Success, QoS 0
1040```
1041
1042#### Example SUBACK (Multiple Topics)
1043```
1044For SUBSCRIBE with Packet ID = 3, two topics:
1045
10460x90 0x04 // Fixed header, remaining = 4 bytes
10470x00 0x03 // Packet Identifier: 3
10480x00 // Topic 1: Success, QoS 0
10490x01 // Topic 2: Success, QoS 1
1050```
1051
1052#### Example SUBACK (Failed Subscription)
1053```
10540x90 0x03 // Fixed header
10550x00 0x04 // Packet Identifier: 4
10560x80 // Return code: Failure
1057```
1058
1059**Implementation**: Check each return code. 0x80 means that topic subscription failed.
1060
1061---
1062
1063### UNSUBACK (Unsubscribe Acknowledgment)
1064
1065Broker response to UNSUBSCRIBE.
1066
1067#### Fixed Header
1068```
1069Byte 1: 0xB0 (Message Type = 11, Flags = 0)
1070Byte 2: 0x02 (Remaining Length = 2)
1071```
1072
1073#### Variable Header
1074```
1075┌─────────────────────────────────────┐
1076│ Packet Identifier (2 bytes, MSB) │
1077│ Must match UNSUBSCRIBE packet ID │
1078└─────────────────────────────────────┘
1079```
1080
1081#### Example UNSUBACK
1082```
1083For UNSUBSCRIBE with Packet ID = 5:
1084
10850xB0 0x02 // Fixed header
10860x00 0x05 // Packet Identifier: 5
1087```
1088
1089**Implementation**: Once received, unsubscription is confirmed.
diff --git a/embedded-mqtt/src/connect_code.rs b/embedded-mqtt/src/connect_code.rs
new file mode 100644
index 0000000..148eff6
--- /dev/null
+++ b/embedded-mqtt/src/connect_code.rs
@@ -0,0 +1,58 @@
1use crate::protocol;
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4pub enum ConnectCode {
5 ConnectionAccepted,
6 UnacceptableProtocolVersion,
7 IdentifierRejected,
8 ServerUnavailable,
9 BadUsernamePassword,
10 NotAuthorized,
11 Unknown(u8),
12}
13
14impl core::fmt::Display for ConnectCode {
15 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
16 match self {
17 ConnectCode::ConnectionAccepted => write!(f, "Connection Accepted"),
18 ConnectCode::UnacceptableProtocolVersion => write!(f, "Unacceptable Protocol Version"),
19 ConnectCode::IdentifierRejected => write!(f, "Identifier Rejected"),
20 ConnectCode::ServerUnavailable => write!(f, "Server Unavailable"),
21 ConnectCode::BadUsernamePassword => write!(f, "Bad Username or Password"),
22 ConnectCode::NotAuthorized => write!(f, "Not Authorized"),
23 ConnectCode::Unknown(code) => write!(f, "Unknown({})", code),
24 }
25 }
26}
27
28impl From<u8> for ConnectCode {
29 fn from(value: u8) -> Self {
30 match value {
31 protocol::CONNACK_CODE_ACCEPTED => ConnectCode::ConnectionAccepted,
32 protocol::CONNACK_CODE_UNACCEPTABLE_PROTOCOL_VERSION => {
33 ConnectCode::UnacceptableProtocolVersion
34 }
35 protocol::CONNACK_CODE_IDENTIFIER_REJECTED => ConnectCode::IdentifierRejected,
36 protocol::CONNACK_CODE_SERVER_UNAVAILABLE => ConnectCode::ServerUnavailable,
37 protocol::CONNACK_CODE_BAD_USERNAME_PASSWORD => ConnectCode::BadUsernamePassword,
38 protocol::CONNACK_CODE_NOT_AUTHORIZED => ConnectCode::NotAuthorized,
39 code => ConnectCode::Unknown(code),
40 }
41 }
42}
43
44impl From<ConnectCode> for u8 {
45 fn from(value: ConnectCode) -> Self {
46 match value {
47 ConnectCode::ConnectionAccepted => protocol::CONNACK_CODE_ACCEPTED,
48 ConnectCode::UnacceptableProtocolVersion => {
49 protocol::CONNACK_CODE_UNACCEPTABLE_PROTOCOL_VERSION
50 }
51 ConnectCode::IdentifierRejected => protocol::CONNACK_CODE_IDENTIFIER_REJECTED,
52 ConnectCode::ServerUnavailable => protocol::CONNACK_CODE_SERVER_UNAVAILABLE,
53 ConnectCode::BadUsernamePassword => protocol::CONNACK_CODE_BAD_USERNAME_PASSWORD,
54 ConnectCode::NotAuthorized => protocol::CONNACK_CODE_NOT_AUTHORIZED,
55 ConnectCode::Unknown(code) => code,
56 }
57 }
58}
diff --git a/embedded-mqtt/src/field.rs b/embedded-mqtt/src/field.rs
new file mode 100644
index 0000000..d921b27
--- /dev/null
+++ b/embedded-mqtt/src/field.rs
@@ -0,0 +1,89 @@
1use core::{mem::MaybeUninit, ops::Deref};
2
3use crate::varint;
4
5const DEFAULT_FIELD_BUFFER_CAP: usize = 32;
6
7pub enum Field<'a> {
8 U8(u8),
9 U16(u16),
10 VarInt(u32),
11 Buffer(&'a [u8]),
12 LenPrefixedBuffer(&'a [u8]),
13 LenPrefixedString(&'a str),
14}
15
16pub struct FieldBuffer<'a, const N: usize = DEFAULT_FIELD_BUFFER_CAP> {
17 data: [MaybeUninit<Field<'a>>; N],
18 len: usize,
19}
20
21impl<'a, const N: usize> Default for FieldBuffer<'a, N> {
22 fn default() -> Self {
23 Self {
24 data: [const { MaybeUninit::uninit() }; N],
25 len: 0,
26 }
27 }
28}
29
30impl<'a, const N: usize> FieldBuffer<'a, N> {
31 pub fn clear(&mut self) {
32 self.len = 0;
33 }
34
35 pub fn push(&mut self, field: Field<'a>) {
36 assert!(self.len < N, "field buffer lenght limit exceeded");
37 self.data[self.len].write(field);
38 self.len += 1;
39 }
40
41 pub fn set(&mut self, n: usize, field: Field<'a>) {
42 assert!(self.len > n);
43 self.data[n].write(field);
44 }
45
46 pub fn as_slice<'s>(&'s self) -> &'s [Field<'a>] {
47 unsafe {
48 core::mem::transmute::<&'s [MaybeUninit<Field<'a>>], &'s [Field<'a>]>(
49 &self.data[..self.len],
50 )
51 }
52 }
53}
54
55impl<'a, const N: usize> AsRef<[Field<'a>]> for FieldBuffer<'a, N> {
56 fn as_ref(&self) -> &[Field<'a>] {
57 self.as_slice()
58 }
59}
60
61impl<'a, const N: usize> Deref for FieldBuffer<'a, N> {
62 type Target = [Field<'a>];
63
64 fn deref(&self) -> &Self::Target {
65 self.as_slice()
66 }
67}
68
69pub fn field_size(field: &Field) -> usize {
70 match field {
71 Field::U8(_) => 1,
72 Field::U16(_) => 2,
73 Field::VarInt(v) => {
74 let (_, n) = varint::encode(*v);
75 n
76 }
77 Field::Buffer(v) => v.len(),
78 Field::LenPrefixedBuffer(v) => v.len().strict_add(2),
79 Field::LenPrefixedString(v) => v.len().strict_add(2),
80 }
81}
82
83pub fn fields_size(fields: &[Field]) -> usize {
84 let mut total_size = 0usize;
85 for field in fields {
86 total_size = total_size.strict_add(field_size(field));
87 }
88 total_size
89}
diff --git a/embedded-mqtt/src/lib.rs b/embedded-mqtt/src/lib.rs
new file mode 100644
index 0000000..096af63
--- /dev/null
+++ b/embedded-mqtt/src/lib.rs
@@ -0,0 +1,486 @@
1#![no_std]
2
3mod connect_code;
4mod field;
5mod packet_id;
6mod protocol;
7mod qos;
8mod rx;
9mod transport;
10mod tx;
11mod varint;
12
13pub use connect_code::ConnectCode;
14use embedded_io_async::ReadExactError;
15pub use packet_id::PacketId;
16pub use qos::Qos;
17pub use transport::Transport;
18
19use crate::{field::FieldBuffer, transport::TransportExt as _};
20
21const DEFAULT_CLIENT_RX_BUFFER_SIZE: usize = 512;
22const DEFAULT_CLIENT_TX_BUFFER_SIZE: usize = 512;
23
24pub enum Error<T: Transport> {
25 Transport(T::Error),
26 TransportEOF,
27 InsufficientBufferSpace,
28 ProtocolError(&'static str),
29 ConnectFailed(ConnectCode),
30}
31
32impl<T: Transport> core::fmt::Debug for Error<T> {
33 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
34 match self {
35 Error::Transport(err) => f.debug_tuple("Transport").field(err).finish(),
36 Error::TransportEOF => f.write_str("TransportEOF"),
37 Error::InsufficientBufferSpace => f.write_str("InsufficientBufferSpace"),
38 Error::ProtocolError(msg) => f.debug_tuple("ProtocolError").field(msg).finish(),
39 Error::ConnectFailed(code) => f.debug_tuple("ConnectFailed").field(code).finish(),
40 }
41 }
42}
43
44impl<T: Transport> core::fmt::Display for Error<T> {
45 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
46 match self {
47 Error::Transport(err) => write!(f, "transport error: {:?}", err),
48 Error::TransportEOF => write!(f, "unexpected end of transport stream"),
49 Error::InsufficientBufferSpace => {
50 write!(f, "insufficient buffer space to receive packet")
51 }
52 Error::ProtocolError(msg) => write!(f, "MQTT protocol error: {}", msg),
53 Error::ConnectFailed(code) => write!(f, "connection failed: {}", code),
54 }
55 }
56}
57
58impl<T: Transport> core::error::Error for Error<T>
59where
60 T::Error: core::error::Error + 'static,
61{
62 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
63 match self {
64 Error::Transport(err) => Some(err),
65 _ => None,
66 }
67 }
68}
69
70#[derive(Debug, Default)]
71pub struct ConnectParams<'a> {
72 pub will_topic: Option<&'a str>,
73 pub will_payload: Option<&'a [u8]>,
74 pub username: Option<&'a str>,
75 pub password: Option<&'a [u8]>,
76 pub keepalive: Option<u16>,
77}
78
79#[derive(Debug, Default)]
80pub struct PublishParams {
81 pub qos: Qos,
82 pub retain: bool,
83}
84
85#[derive(Debug)]
86pub enum PublishData<'a> {
87 Inline(&'a [u8]),
88 Deferred(usize),
89}
90
91#[derive(Debug)]
92pub struct Publish<'a> {
93 pub topic: &'a str,
94 pub packet_id: Option<PacketId>,
95 pub qos: Qos,
96 pub retain: bool,
97 pub data_len: usize,
98}
99
100#[derive(Debug)]
101pub struct PublishAck {
102 pub packet_id: PacketId,
103}
104
105#[derive(Debug)]
106pub struct SubscribeAck {
107 pub packet_id: PacketId,
108 pub success: bool,
109}
110
111#[derive(Debug)]
112pub struct UnsubscribeAck {
113 pub packet_id: PacketId,
114}
115
116#[derive(Debug)]
117pub enum Packet<'a> {
118 Publish(Publish<'a>),
119 PublishAck(PublishAck),
120 SubscribeAck(SubscribeAck),
121 UnsubscribeAck(UnsubscribeAck),
122}
123
124pub struct ClientResources<
125 const RX: usize = DEFAULT_CLIENT_RX_BUFFER_SIZE,
126 const TX: usize = DEFAULT_CLIENT_TX_BUFFER_SIZE,
127> {
128 rx_buffer: [u8; RX],
129 tx_buffer: [u8; TX],
130}
131
132impl<const RX: usize, const TX: usize> Default for ClientResources<RX, TX> {
133 fn default() -> Self {
134 Self {
135 rx_buffer: [0u8; RX],
136 tx_buffer: [0u8; TX],
137 }
138 }
139}
140
141pub struct Client<'a, T> {
142 transport: T,
143 rx_buffer: &'a mut [u8],
144 rx_buffer_len: usize,
145 rx_buffer_skip: usize,
146 rx_buffer_data: usize,
147 tx_buffer: &'a mut [u8],
148 next_packet_id: u16,
149}
150
151impl<'a, T> Client<'a, T> {
152 pub fn new<const RX: usize, const TX: usize>(
153 resources: &'a mut ClientResources<RX, TX>,
154 transport: T,
155 ) -> Self {
156 Self {
157 transport,
158 rx_buffer: &mut resources.rx_buffer,
159 rx_buffer_len: 0,
160 rx_buffer_skip: 0,
161 rx_buffer_data: 0,
162 tx_buffer: &mut resources.tx_buffer,
163 next_packet_id: 1,
164 }
165 }
166}
167
168impl<'a, T> Client<'a, T>
169where
170 T: Transport,
171{
172 fn allocate_packet_id(&mut self) -> PacketId {
173 let packet_id = self.next_packet_id;
174 self.next_packet_id = self.next_packet_id.wrapping_add(1);
175 if self.next_packet_id == 0 {
176 self.next_packet_id = 1;
177 }
178 PacketId::from(packet_id)
179 }
180
181 pub async fn connect(&mut self, client_id: &str) -> Result<(), Error<T>> {
182 self.connect_with(client_id, Default::default()).await
183 }
184
185 pub async fn connect_with(
186 &mut self,
187 client_id: &str,
188 params: ConnectParams<'_>,
189 ) -> Result<(), Error<T>> {
190 let mut buffer = FieldBuffer::default();
191 tx::connect(
192 &mut buffer,
193 tx::Connect {
194 client_id,
195 clean_session: true,
196 username: params.username,
197 password: params.password,
198 will_topic: params.will_topic,
199 will_payload: params.will_payload,
200 will_retain: false,
201 keepalive: None,
202 },
203 );
204 self.transport
205 .write_fields(&buffer)
206 .await
207 .map_err(Error::Transport)?;
208 self.transport.flush().await.map_err(Error::Transport)?;
209
210 // Wait for CONNACK response
211 match self.receive_inner().await? {
212 rx::Packet::ConnAck {
213 session_present,
214 code,
215 } => {
216 if code == ConnectCode::ConnectionAccepted {
217 Ok(())
218 } else {
219 Err(Error::ConnectFailed(code))
220 }
221 }
222 _ => Err(Error::ProtocolError(
223 "expected CONNACK packet after CONNECT",
224 )),
225 }
226 }
227
228 pub async fn publish(&mut self, topic: &str, data: &[u8]) -> Result<PacketId, Error<T>> {
229 self.publish_with(topic, data, Default::default()).await
230 }
231
232 pub async fn publish_with(
233 &mut self,
234 topic: &str,
235 data: &[u8],
236 params: PublishParams,
237 ) -> Result<PacketId, Error<T>> {
238 let packet_id = if params.qos.to_u8() > 0 {
239 Some(self.allocate_packet_id())
240 } else {
241 None
242 };
243
244 let mut buffer = FieldBuffer::default();
245 tx::publish(
246 &mut buffer,
247 tx::Publish {
248 topic,
249 payload: data,
250 qos: params.qos,
251 retain: params.retain,
252 dup: false,
253 packet_id,
254 },
255 );
256
257 self.transport
258 .write_fields(&buffer)
259 .await
260 .map_err(Error::Transport)?;
261 self.transport.flush().await.map_err(Error::Transport)?;
262
263 Ok(packet_id.unwrap_or(PacketId::from(0)))
264 }
265
266 pub async fn publish_ack(&mut self, packet_id: PacketId, qos: Qos) -> Result<(), Error<T>> {
267 let mut buffer = FieldBuffer::default();
268
269 match qos {
270 Qos::AtMostOnce => {
271 // QoS 0: No acknowledgment needed
272 return Ok(());
273 }
274 Qos::AtLeastOnce => {
275 // QoS 1: Send PUBACK
276 tx::puback(&mut buffer, packet_id);
277 }
278 Qos::ExactlyOnce => todo!("not implemented"),
279 }
280
281 self.transport
282 .write_fields(&buffer)
283 .await
284 .map_err(Error::Transport)?;
285 self.transport.flush().await.map_err(Error::Transport)?;
286
287 Ok(())
288 }
289
290 pub async fn subscribe(&mut self, topic: &str) -> Result<PacketId, Error<T>> {
291 self.subscribe_with(topic, Qos::AtMostOnce).await
292 }
293
294 pub async fn subscribe_with(&mut self, topic: &str, qos: Qos) -> Result<PacketId, Error<T>> {
295 let packet_id = self.allocate_packet_id();
296
297 let mut buffer = FieldBuffer::default();
298 tx::subscribe(
299 &mut buffer,
300 tx::Subscribe {
301 topic,
302 qos,
303 packet_id,
304 },
305 );
306
307 self.transport
308 .write_fields(&buffer)
309 .await
310 .map_err(Error::Transport)?;
311 self.transport.flush().await.map_err(Error::Transport)?;
312
313 Ok(packet_id)
314 }
315
316 pub async fn unsubscribe(&mut self, topic: &str) -> Result<PacketId, Error<T>> {
317 let packet_id = self.allocate_packet_id();
318
319 let mut buffer = FieldBuffer::default();
320 tx::unsubscribe(&mut buffer, tx::Unsubscribe { topic, packet_id });
321
322 self.transport
323 .write_fields(&buffer)
324 .await
325 .map_err(Error::Transport)?;
326 self.transport.flush().await.map_err(Error::Transport)?;
327
328 Ok(packet_id)
329 }
330
331 async fn receive_inner<'s>(&'s mut self) -> Result<rx::Packet<'s>, Error<T>> {
332 self.skip_if_required();
333 self.discard_data().await?;
334
335 loop {
336 let buf = &self.rx_buffer[..self.rx_buffer_len];
337 match rx::decode(buf) {
338 Ok(_) => {
339 // NOTE: stupid workaround for borrow checker, should not
340 // need to decode twice
341 let buf = &self.rx_buffer[..self.rx_buffer_len];
342 let (packet, n) = rx::decode(buf).unwrap();
343 self.rx_buffer_skip = n;
344 if let rx::Packet::Publish { data_len, .. } = &packet {
345 self.rx_buffer_data = *data_len;
346 }
347 return Ok(packet);
348 }
349 Err(err) => match err {
350 rx::Error::NeedMoreData => {
351 if self.rx_buffer.len() == self.rx_buffer_len {
352 return Err(Error::InsufficientBufferSpace);
353 }
354 }
355 rx::Error::InvalidPacket(msg) => return Err(Error::ProtocolError(msg)),
356 rx::Error::UnsupportedPacket { packet_type, .. } => {
357 return Err(Error::ProtocolError("unsupported packet type"));
358 }
359 rx::Error::UnknownPacket { packet_type, .. } => {
360 return Err(Error::ProtocolError("unknown packet type"));
361 }
362 },
363 }
364
365 self.fill_rx_buffer().await?;
366 }
367 }
368
369 pub async fn receive<'s>(&'s mut self) -> Result<Packet<'s>, Error<T>> {
370 match self.receive_inner().await? {
371 rx::Packet::ConnAck { .. } => {
372 return Err(Error::ProtocolError("unexpected CONNACK packet"));
373 }
374 rx::Packet::Publish {
375 topic,
376 packet_id,
377 qos,
378 retain,
379 dup: _dup,
380 data_len,
381 } => {
382 return Ok(Packet::Publish(Publish {
383 topic,
384 packet_id,
385 qos,
386 retain,
387 data_len,
388 }));
389 }
390 rx::Packet::PubAck { packet_id } => {
391 return Ok(Packet::PublishAck(PublishAck { packet_id }));
392 }
393 rx::Packet::SubscribeAck { packet_id, success } => {
394 return Ok(Packet::SubscribeAck(SubscribeAck { packet_id, success }));
395 }
396 rx::Packet::UnsubscribeAck { packet_id } => {
397 return Ok(Packet::UnsubscribeAck(UnsubscribeAck { packet_id }));
398 }
399 }
400 }
401
402 pub async fn receive_data(&mut self, buf: &mut [u8]) -> Result<(), Error<T>> {
403 self.skip_if_required();
404 if buf.len() != self.rx_buffer_data {
405 return Err(Error::InsufficientBufferSpace);
406 }
407
408 assert_eq!(self.rx_buffer_skip, 0);
409 let from_buffer = self.rx_buffer_data.min(self.rx_buffer_len);
410 let from_transport = self.rx_buffer_data.strict_sub(from_buffer);
411
412 buf[..from_buffer].copy_from_slice(&self.rx_buffer[..from_buffer]);
413 self.rx_buffer_len -= from_buffer;
414
415 if from_transport > 0 {
416 assert_eq!(self.rx_buffer_len, 0);
417 self.transport
418 .read_exact(&mut buf[from_buffer..])
419 .await
420 .map_err(|err| match err {
421 ReadExactError::UnexpectedEof => Error::<T>::TransportEOF,
422 ReadExactError::Other(e) => Error::Transport(e),
423 })?;
424 }
425 self.rx_buffer_data = 0;
426
427 Ok(())
428 }
429
430 pub async fn disconnect(&mut self) -> Result<(), Error<T>> {
431 let mut buffer = FieldBuffer::default();
432 tx::disconnect(&mut buffer);
433
434 self.transport
435 .write_fields(&buffer)
436 .await
437 .map_err(Error::Transport)?;
438 self.transport.flush().await.map_err(Error::Transport)?;
439
440 Ok(())
441 }
442
443 async fn fill_rx_buffer(&mut self) -> Result<(), Error<T>> {
444 let n = self
445 .transport
446 .read(&mut self.rx_buffer[self.rx_buffer_len..])
447 .await
448 .map_err(Error::Transport)?;
449 if n == 0 {
450 return Err(Error::TransportEOF);
451 }
452 self.rx_buffer_len += n;
453
454 Ok(())
455 }
456
457 fn skip_if_required(&mut self) {
458 assert!(self.rx_buffer_len >= self.rx_buffer_skip);
459 if self.rx_buffer_skip != 0 {
460 self.rx_buffer.copy_within(self.rx_buffer_skip.., 0);
461 self.rx_buffer_len = self.rx_buffer_len.strict_sub(self.rx_buffer_skip);
462 self.rx_buffer_skip = 0;
463 }
464 }
465
466 async fn discard_data(&mut self) -> Result<(), Error<T>> {
467 if self.rx_buffer_data == 0 {
468 return Ok(());
469 }
470
471 assert_eq!(self.rx_buffer_skip, 0);
472 while self.rx_buffer_data > 0 {
473 if self.rx_buffer_len <= self.rx_buffer_data {
474 self.rx_buffer_data -= self.rx_buffer_len;
475 self.rx_buffer_len = 0;
476 } else {
477 self.rx_buffer.copy_within(self.rx_buffer_data.., 0);
478 self.rx_buffer_len -= self.rx_buffer_data;
479 self.rx_buffer_data = 0;
480 }
481 self.fill_rx_buffer().await?;
482 }
483
484 Ok(())
485 }
486}
diff --git a/embedded-mqtt/src/packet_id.rs b/embedded-mqtt/src/packet_id.rs
new file mode 100644
index 0000000..4e0158f
--- /dev/null
+++ b/embedded-mqtt/src/packet_id.rs
@@ -0,0 +1,14 @@
1#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2pub struct PacketId(u16);
3
4impl From<u16> for PacketId {
5 fn from(value: u16) -> Self {
6 Self(value)
7 }
8}
9
10impl From<PacketId> for u16 {
11 fn from(value: PacketId) -> Self {
12 value.0
13 }
14}
diff --git a/embedded-mqtt/src/protocol.rs b/embedded-mqtt/src/protocol.rs
new file mode 100644
index 0000000..bf77d78
--- /dev/null
+++ b/embedded-mqtt/src/protocol.rs
@@ -0,0 +1,65 @@
1pub const PACKET_TYPE_CONNECT: u8 = 1;
2pub const PACKET_TYPE_CONNACK: u8 = 2;
3pub const PACKET_TYPE_PUBLISH: u8 = 3;
4pub const PACKET_TYPE_PUBACK: u8 = 4;
5pub const PACKET_TYPE_PUBREC: u8 = 5;
6pub const PACKET_TYPE_PUBREL: u8 = 6;
7pub const PACKET_TYPE_PUBCOMP: u8 = 7;
8pub const PACKET_TYPE_SUBSCRIBE: u8 = 8;
9pub const PACKET_TYPE_SUBACK: u8 = 9;
10pub const PACKET_TYPE_UNSUBSCRIBE: u8 = 10;
11pub const PACKET_TYPE_UNSUBACK: u8 = 11;
12pub const PACKET_TYPE_PINGREQ: u8 = 12;
13pub const PACKET_TYPE_PINGRESP: u8 = 13;
14pub const PACKET_TYPE_DISCONNECT: u8 = 14;
15
16pub const PROTOCOL_NAME: &str = "MQTT";
17
18pub const PROTOCOL_LEVEL_3_1_1: u8 = 0x04;
19pub const PROTOCOL_LEVEL_5_0_0: u8 = 0x05;
20
21pub const CONNECT_FLAG_USERNAME: u8 = 1 << 7;
22pub const CONNECT_FLAG_PASSWORD: u8 = 1 << 6;
23pub const CONNECT_FLAG_WILL_RETAIN: u8 = 1 << 5;
24pub const CONNECT_FLAG_WILL_FLAG: u8 = 1 << 2;
25pub const CONNECT_FLAG_CLEAN_SESSION: u8 = 1 << 1;
26
27pub const SUBSCRIBE_HEADER_FLAGS: u8 = 0x02;
28pub const UNSUBSCRIBE_HEADER_FLAGS: u8 = 0x02;
29pub const PUBREL_HEADER_FLAGS: u8 = 0x02;
30
31pub const CONNACK_CODE_ACCEPTED: u8 = 0;
32pub const CONNACK_CODE_UNACCEPTABLE_PROTOCOL_VERSION: u8 = 1;
33pub const CONNACK_CODE_IDENTIFIER_REJECTED: u8 = 2;
34pub const CONNACK_CODE_SERVER_UNAVAILABLE: u8 = 3;
35pub const CONNACK_CODE_BAD_USERNAME_PASSWORD: u8 = 4;
36pub const CONNACK_CODE_NOT_AUTHORIZED: u8 = 5;
37
38pub const CONNACK_FLAG_SESSION_PRESENT: u8 = 0x01;
39pub const CONNACK_FLAG_RESERVED: u8 = 0xFE;
40
41pub const SUBACK_FAILURE: u8 = 0x80;
42
43pub const PUBLISH_FLAG_RETAIN: u8 = 0x01;
44pub const PUBLISH_FLAG_QOS_MASK: u8 = 0x06;
45pub const PUBLISH_FLAG_QOS_SHIFT: u8 = 1;
46pub const PUBLISH_FLAG_DUP: u8 = 0x08;
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub struct HeaderControl {
50 pub packet_type: u8,
51 pub packet_flags: u8,
52}
53
54pub fn create_header_control(packet_type: u8, flags: u8) -> u8 {
55 assert!(packet_type & 0xF0 == 0);
56 assert!(flags & 0xF0 == 0);
57 packet_type << 4 | flags
58}
59
60pub fn split_header_control(control: u8) -> HeaderControl {
61 HeaderControl {
62 packet_type: control >> 4,
63 packet_flags: control & 0x0F,
64 }
65}
diff --git a/embedded-mqtt/src/qos.rs b/embedded-mqtt/src/qos.rs
new file mode 100644
index 0000000..0d464b4
--- /dev/null
+++ b/embedded-mqtt/src/qos.rs
@@ -0,0 +1,64 @@
1#[derive(Debug)]
2pub struct InvalidQos(u8);
3
4impl core::fmt::Display for InvalidQos {
5 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
6 write!(f, "invalid QoS value: '{}'", self.0)
7 }
8}
9
10impl core::error::Error for InvalidQos {}
11
12#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
13pub enum Qos {
14 #[default]
15 AtMostOnce,
16 AtLeastOnce,
17 ExactlyOnce,
18}
19
20impl core::fmt::Display for Qos {
21 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
22 f.write_str(match self {
23 Qos::AtMostOnce => "QoS::AtMostOnce",
24 Qos::AtLeastOnce => "Qos::AtLeastOnce",
25 Qos::ExactlyOnce => "Qos::ExactlyOnce",
26 })
27 }
28}
29
30impl Qos {
31 pub fn to_u8(&self) -> u8 {
32 match self {
33 Qos::AtMostOnce => 0,
34 Qos::AtLeastOnce => 1,
35 Qos::ExactlyOnce => 2,
36 }
37 }
38
39 pub fn from_u8(v: u8) -> Option<Self> {
40 match v {
41 0 => Some(Self::AtMostOnce),
42 1 => Some(Self::AtLeastOnce),
43 2 => Some(Self::ExactlyOnce),
44 _ => None,
45 }
46 }
47}
48
49impl TryFrom<u8> for Qos {
50 type Error = InvalidQos;
51
52 fn try_from(value: u8) -> Result<Self, Self::Error> {
53 match Self::from_u8(value) {
54 Some(v) => Ok(v),
55 None => Err(InvalidQos(value)),
56 }
57 }
58}
59
60impl From<Qos> for u8 {
61 fn from(value: Qos) -> Self {
62 value.to_u8()
63 }
64}
diff --git a/embedded-mqtt/src/rx.rs b/embedded-mqtt/src/rx.rs
new file mode 100644
index 0000000..2d2164f
--- /dev/null
+++ b/embedded-mqtt/src/rx.rs
@@ -0,0 +1,241 @@
1use crate::{ConnectCode, PacketId, Qos, protocol, varint};
2
3#[derive(Debug)]
4pub enum Error {
5 NeedMoreData,
6 InvalidPacket(&'static str),
7 UnsupportedPacket { packet_type: u8, packet_len: u32 },
8 UnknownPacket { packet_type: u8, packet_len: u32 },
9}
10
11impl core::fmt::Display for Error {
12 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
13 match self {
14 Error::NeedMoreData => f.write_str("need more data"),
15 Error::InvalidPacket(msg) => write!(f, "invalid packet: {}", msg),
16 Error::UnsupportedPacket {
17 packet_type,
18 packet_len,
19 } => write!(
20 f,
21 "unsupported packet type {} with length {}",
22 packet_type, packet_len
23 ),
24 Error::UnknownPacket {
25 packet_type,
26 packet_len,
27 } => write!(
28 f,
29 "unknown packet type {} with length {}",
30 packet_type, packet_len
31 ),
32 }
33 }
34}
35
36impl From<varint::Error> for Error {
37 fn from(value: varint::Error) -> Self {
38 match value {
39 varint::Error::NeedMoreData => Self::NeedMoreData,
40 varint::Error::InvalidVarInt => Self::InvalidPacket("invalid variable integer encoding"),
41 }
42 }
43}
44
45pub enum Packet<'a> {
46 ConnAck {
47 session_present: bool,
48 code: ConnectCode,
49 },
50 Publish {
51 topic: &'a str,
52 packet_id: Option<PacketId>,
53 qos: Qos,
54 retain: bool,
55 dup: bool,
56 data_len: usize,
57 },
58 PubAck {
59 packet_id: PacketId,
60 },
61 SubscribeAck {
62 packet_id: PacketId,
63 success: bool,
64 },
65 UnsubscribeAck {
66 packet_id: PacketId,
67 },
68}
69
70pub fn decode<'a>(buf: &'a [u8]) -> Result<(Packet<'a>, usize), Error> {
71 let mut reader = Reader::new(buf);
72 let protocol::HeaderControl {
73 packet_type,
74 packet_flags,
75 } = protocol::split_header_control(reader.read_u8()?);
76 let packet_len = reader.read_varint()?;
77
78 let packet = match packet_type {
79 protocol::PACKET_TYPE_CONNACK => {
80 let flags = reader.read_u8()?;
81 let code = ConnectCode::from(reader.read_u8()?);
82 let session_present = flags & protocol::CONNACK_FLAG_SESSION_PRESENT != 0;
83 if flags & protocol::CONNACK_FLAG_RESERVED != 0 {
84 return Err(Error::InvalidPacket("CONNACK reserved flags must be zero"));
85 }
86 Packet::ConnAck {
87 session_present,
88 code,
89 }
90 }
91 protocol::PACKET_TYPE_PUBLISH => {
92 // Extract flags from the fixed header
93 let retain = (packet_flags & protocol::PUBLISH_FLAG_RETAIN) != 0;
94 let qos_value = (packet_flags & protocol::PUBLISH_FLAG_QOS_MASK) >> protocol::PUBLISH_FLAG_QOS_SHIFT;
95 let qos = Qos::from_u8(qos_value).ok_or(Error::InvalidPacket("PUBLISH has invalid QoS value"))?;
96 let dup = (packet_flags & protocol::PUBLISH_FLAG_DUP) != 0;
97
98 // Track position after fixed header to calculate data length
99 let variable_header_start = reader.num_read();
100
101 // Read topic name
102 let topic = reader.read_len_prefix_str()?;
103
104 // Read packet ID if QoS > 0
105 let packet_id = if qos.to_u8() > 0 {
106 Some(PacketId::from(reader.read_u16()?))
107 } else {
108 None
109 };
110
111 // Calculate payload length without reading it
112 let variable_header_len = reader.num_read() - variable_header_start;
113 let data_len = (packet_len as usize)
114 .checked_sub(variable_header_len)
115 .ok_or(Error::InvalidPacket("PUBLISH remaining length is too short for headers"))?;
116
117 Packet::Publish {
118 topic,
119 packet_id,
120 qos,
121 retain,
122 dup,
123 data_len,
124 }
125 }
126 protocol::PACKET_TYPE_PUBACK => {
127 if packet_flags != 0 {
128 return Err(Error::InvalidPacket("PUBACK flags must be zero"));
129 }
130 if packet_len != 2 {
131 return Err(Error::InvalidPacket("PUBACK remaining length must be 2"));
132 }
133 let packet_id = PacketId::from(reader.read_u16()?);
134 Packet::PubAck { packet_id }
135 }
136 protocol::PACKET_TYPE_SUBACK => {
137 if packet_flags != 0 {
138 return Err(Error::InvalidPacket("SUBACK flags must be zero"));
139 }
140 if packet_len < 3 {
141 // Minimum: 2 bytes packet ID + 1 byte return code
142 return Err(Error::InvalidPacket("SUBACK remaining length must be at least 3"));
143 }
144 let packet_id = PacketId::from(reader.read_u16()?);
145 let return_code = reader.read_u8()?;
146 let success = return_code != protocol::SUBACK_FAILURE;
147 Packet::SubscribeAck { packet_id, success }
148 }
149 protocol::PACKET_TYPE_UNSUBACK => {
150 if packet_flags != 0 {
151 return Err(Error::InvalidPacket("UNSUBACK flags must be zero"));
152 }
153 if packet_len != 2 {
154 return Err(Error::InvalidPacket("UNSUBACK remaining length must be 2"));
155 }
156 let packet_id = PacketId::from(reader.read_u16()?);
157 Packet::UnsubscribeAck { packet_id }
158 }
159 protocol::PACKET_TYPE_CONNECT
160 | protocol::PACKET_TYPE_PUBREC
161 | protocol::PACKET_TYPE_PUBREL
162 | protocol::PACKET_TYPE_PUBCOMP
163 | protocol::PACKET_TYPE_DISCONNECT
164 | protocol::PACKET_TYPE_SUBSCRIBE
165 | protocol::PACKET_TYPE_UNSUBSCRIBE
166 | protocol::PACKET_TYPE_PINGREQ
167 | protocol::PACKET_TYPE_PINGRESP => {
168 return Err(Error::UnsupportedPacket {
169 packet_type,
170 packet_len,
171 });
172 }
173 _ => {
174 return Err(Error::UnknownPacket {
175 packet_type,
176 packet_len,
177 });
178 }
179 };
180
181 Ok((packet, reader.num_read()))
182}
183
184struct Reader<'a> {
185 buf: &'a [u8],
186 off: usize,
187}
188
189impl<'a> Reader<'a> {
190 fn new(buf: &'a [u8]) -> Self {
191 Self { buf, off: 0 }
192 }
193
194 fn remain(&self) -> usize {
195 self.buf.len() - self.off
196 }
197
198 fn remain_slice(&self) -> &'a [u8] {
199 &self.buf[self.off..]
200 }
201
202 fn num_read(&self) -> usize {
203 self.off
204 }
205
206 fn read_buf(&mut self, n: usize) -> Result<&'a [u8], Error> {
207 if self.remain() < n {
208 return Err(Error::NeedMoreData);
209 }
210 let v = &self.buf[self.off..self.off + n];
211 self.off += n;
212 Ok(v)
213 }
214
215 fn read_u8(&mut self) -> Result<u8, Error> {
216 let v = self.read_buf(1)?;
217 Ok(v[0])
218 }
219
220 fn read_u16(&mut self) -> Result<u16, Error> {
221 let v = self.read_buf(2)?;
222 Ok(u16::from_be_bytes([v[0], v[1]]))
223 }
224
225 fn read_len_prefix_buf(&mut self) -> Result<&'a [u8], Error> {
226 let l = self.read_u16()?;
227 let v = self.read_buf(usize::from(l))?;
228 Ok(v)
229 }
230
231 fn read_len_prefix_str(&mut self) -> Result<&'a str, Error> {
232 let v = self.read_len_prefix_buf()?;
233 Ok(str::from_utf8(v).unwrap())
234 }
235
236 fn read_varint(&mut self) -> Result<u32, Error> {
237 let (value, consumed) = varint::decode(self.remain_slice())?;
238 self.off += consumed;
239 Ok(value)
240 }
241}
diff --git a/embedded-mqtt/src/transport.rs b/embedded-mqtt/src/transport.rs
new file mode 100644
index 0000000..5a6a016
--- /dev/null
+++ b/embedded-mqtt/src/transport.rs
@@ -0,0 +1,39 @@
1use crate::{field::Field, varint};
2
3pub trait Transport: embedded_io_async::Read + embedded_io_async::Write {}
4
5impl<T> Transport for T where T: embedded_io_async::Read + embedded_io_async::Write {}
6
7pub(crate) trait TransportExt: Transport {
8 async fn write_fields(&mut self, fields: &[Field]) -> Result<(), Self::Error>;
9}
10
11impl<T> TransportExt for T
12where
13 T: Transport,
14{
15 async fn write_fields(&mut self, fields: &[Field<'_>]) -> Result<(), Self::Error> {
16 for field in fields {
17 match field {
18 Field::U8(v) => self.write_all(&[*v]).await?,
19 Field::U16(v) => self.write_all(&u16::to_be_bytes(*v)).await?,
20 Field::VarInt(v) => {
21 let (v_buf, v_len) = varint::encode(*v);
22 self.write_all(&v_buf[..v_len]).await?;
23 }
24 Field::Buffer(v) => self.write_all(v).await?,
25 Field::LenPrefixedBuffer(v) => {
26 self.write_all(&u16::to_be_bytes(u16::try_from(v.len()).unwrap()))
27 .await?;
28 self.write_all(v).await?;
29 }
30 Field::LenPrefixedString(v) => {
31 self.write_all(&u16::to_be_bytes(u16::try_from(v.len()).unwrap()))
32 .await?;
33 self.write_all(v.as_bytes()).await?;
34 }
35 }
36 }
37 Ok(())
38 }
39}
diff --git a/embedded-mqtt/src/tx.rs b/embedded-mqtt/src/tx.rs
new file mode 100644
index 0000000..ae3d641
--- /dev/null
+++ b/embedded-mqtt/src/tx.rs
@@ -0,0 +1,203 @@
1use crate::{
2 PacketId,
3 field::{self, Field, FieldBuffer},
4 protocol,
5 qos::Qos,
6};
7
8pub struct Connect<'a> {
9 pub client_id: &'a str,
10 pub clean_session: bool,
11 pub username: Option<&'a str>,
12 pub password: Option<&'a [u8]>,
13 pub will_topic: Option<&'a str>,
14 pub will_payload: Option<&'a [u8]>,
15 pub will_retain: bool,
16 pub keepalive: Option<u16>,
17}
18
19pub fn connect<'a>(buffer: &mut FieldBuffer<'a>, connect: Connect<'a>) {
20 let mut flags = 0;
21 if connect.clean_session {
22 flags |= protocol::CONNECT_FLAG_CLEAN_SESSION;
23 }
24 if connect.username.is_some() {
25 flags |= protocol::CONNECT_FLAG_USERNAME;
26 }
27 if connect.password.is_some() {
28 flags |= protocol::CONNECT_FLAG_PASSWORD;
29 }
30 if connect.will_topic.is_some() {
31 flags |= protocol::CONNECT_FLAG_WILL_FLAG;
32 }
33 if connect.will_retain {
34 flags |= protocol::CONNECT_FLAG_WILL_RETAIN;
35 }
36
37 buffer.push(Field::U8(protocol::create_header_control(
38 protocol::PACKET_TYPE_CONNECT,
39 0,
40 )));
41 buffer.push(Field::VarInt(0));
42
43 buffer.push(Field::LenPrefixedString(protocol::PROTOCOL_NAME));
44 buffer.push(Field::U8(protocol::PROTOCOL_LEVEL_3_1_1));
45 buffer.push(Field::U8(flags));
46 buffer.push(Field::U16(connect.keepalive.unwrap_or(0)));
47 buffer.push(Field::LenPrefixedString(connect.client_id));
48 if let Some(will_topic) = connect.will_topic {
49 buffer.push(Field::LenPrefixedString(will_topic));
50 buffer.push(Field::LenPrefixedBuffer(
51 connect.will_payload.unwrap_or(&[]),
52 ));
53 }
54 if let Some(username) = connect.username {
55 buffer.push(Field::LenPrefixedString(username));
56 }
57 if let Some(password) = connect.password {
58 buffer.push(Field::LenPrefixedBuffer(password));
59 }
60
61 let message_size = field::fields_size(&buffer.as_slice()[2..]);
62 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
63}
64
65pub struct Publish<'a> {
66 pub topic: &'a str,
67 pub payload: &'a [u8],
68 pub qos: Qos,
69 pub retain: bool,
70 pub dup: bool,
71 pub packet_id: Option<PacketId>,
72}
73
74pub fn publish<'a>(buffer: &mut FieldBuffer<'a>, publish: Publish<'a>) {
75 let mut flags = 0u8;
76
77 // Set QoS bits (bits 1-2)
78 flags |= (publish.qos.to_u8() & 0x03) << 1;
79
80 // Set RETAIN flag (bit 0)
81 if publish.retain {
82 flags |= 0x01;
83 }
84
85 // Set DUP flag (bit 3)
86 if publish.dup {
87 flags |= 0x08;
88 }
89
90 buffer.push(Field::U8(protocol::create_header_control(
91 protocol::PACKET_TYPE_PUBLISH,
92 flags,
93 )));
94 buffer.push(Field::VarInt(0));
95
96 buffer.push(Field::LenPrefixedString(publish.topic));
97
98 // Packet ID is only present for QoS 1 and 2
99 if publish.qos.to_u8() > 0 {
100 // TODO: turn this into a warning
101 let packet_id = publish.packet_id.expect("packet_id required for QoS > 0");
102 buffer.push(Field::U16(packet_id.into()));
103 }
104
105 buffer.push(Field::Buffer(publish.payload));
106
107 let message_size = field::fields_size(&buffer.as_slice()[2..]);
108 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
109}
110
111pub struct Subscribe<'a> {
112 pub topic: &'a str,
113 pub qos: Qos,
114 pub packet_id: PacketId,
115}
116
117pub fn subscribe<'a>(buffer: &mut FieldBuffer<'a>, subscribe: Subscribe<'a>) {
118 // SUBSCRIBE packets have fixed header flags (reserved bits)
119 buffer.push(Field::U8(protocol::create_header_control(
120 protocol::PACKET_TYPE_SUBSCRIBE,
121 protocol::SUBSCRIBE_HEADER_FLAGS,
122 )));
123 buffer.push(Field::VarInt(0));
124
125 // Variable header: packet identifier
126 buffer.push(Field::U16(subscribe.packet_id.into()));
127
128 // Payload: topic filter + QoS
129 buffer.push(Field::LenPrefixedString(subscribe.topic));
130 buffer.push(Field::U8(subscribe.qos.to_u8()));
131
132 let message_size = field::fields_size(&buffer.as_slice()[2..]);
133 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
134}
135
136pub struct Unsubscribe<'a> {
137 pub topic: &'a str,
138 pub packet_id: PacketId,
139}
140
141pub fn unsubscribe<'a>(buffer: &mut FieldBuffer<'a>, unsubscribe: Unsubscribe<'a>) {
142 // UNSUBSCRIBE packets have fixed header flags (reserved bits)
143 buffer.push(Field::U8(protocol::create_header_control(
144 protocol::PACKET_TYPE_UNSUBSCRIBE,
145 protocol::UNSUBSCRIBE_HEADER_FLAGS,
146 )));
147 buffer.push(Field::VarInt(0));
148
149 // Variable header: packet identifier
150 buffer.push(Field::U16(unsubscribe.packet_id.into()));
151
152 // Payload: topic filter (no QoS)
153 buffer.push(Field::LenPrefixedString(unsubscribe.topic));
154
155 let message_size = field::fields_size(&buffer.as_slice()[2..]);
156 buffer.set(1, Field::VarInt(u32::try_from(message_size).unwrap()));
157}
158
159pub fn disconnect(buffer: &mut FieldBuffer) {
160 // DISCONNECT has no variable header or payload
161 buffer.push(Field::U8(protocol::create_header_control(
162 protocol::PACKET_TYPE_DISCONNECT,
163 0,
164 )));
165 buffer.push(Field::VarInt(0));
166}
167
168pub fn puback(buffer: &mut FieldBuffer, packet_id: PacketId) {
169 buffer.push(Field::U8(protocol::create_header_control(
170 protocol::PACKET_TYPE_PUBACK,
171 0,
172 )));
173 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
174 buffer.push(Field::U16(packet_id.into()));
175}
176
177pub fn pubrec(buffer: &mut FieldBuffer, packet_id: PacketId) {
178 buffer.push(Field::U8(protocol::create_header_control(
179 protocol::PACKET_TYPE_PUBREC,
180 0,
181 )));
182 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
183 buffer.push(Field::U16(packet_id.into()));
184}
185
186pub fn pubrel(buffer: &mut FieldBuffer, packet_id: PacketId) {
187 buffer.push(Field::U8(protocol::create_header_control(
188 protocol::PACKET_TYPE_PUBREL,
189 protocol::PUBREL_HEADER_FLAGS,
190 )));
191 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
192 buffer.push(Field::U16(packet_id.into()));
193}
194
195pub fn pubcomp(buffer: &mut FieldBuffer, packet_id: PacketId) {
196 buffer.push(Field::U8(protocol::create_header_control(
197 protocol::PACKET_TYPE_PUBCOMP,
198 0,
199 )));
200 buffer.push(Field::VarInt(2)); // Remaining length is always 2 (packet ID)
201 buffer.push(Field::U16(packet_id.into()));
202}
203
diff --git a/embedded-mqtt/src/varint.rs b/embedded-mqtt/src/varint.rs
new file mode 100644
index 0000000..63bdd06
--- /dev/null
+++ b/embedded-mqtt/src/varint.rs
@@ -0,0 +1,69 @@
1#[derive(Debug)]
2pub enum Error {
3 NeedMoreData,
4 InvalidVarInt,
5}
6
7impl core::fmt::Display for Error {
8 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9 match self {
10 Error::NeedMoreData => f.write_str("NeedMoreData"),
11 Error::InvalidVarInt => f.write_str("InvalidVarInt"),
12 }
13 }
14}
15
16impl core::error::Error for Error {}
17
18pub fn encode(mut v: u32) -> ([u8; 4], usize) {
19 let mut encoded = [0u8; 4];
20 let mut count = 0;
21
22 loop {
23 let mut byte = (v % 128) as u8;
24 v /= 128;
25
26 if v > 0 {
27 byte |= 0x80; // Set continuation bit
28 }
29
30 encoded[count] = byte;
31 count += 1;
32
33 if v == 0 {
34 break;
35 }
36 }
37
38 (encoded, count)
39}
40
41pub fn decode(buf: &[u8]) -> Result<(u32, usize), Error> {
42 let mut value = 0u32;
43
44 let v = buf.get(0).ok_or(Error::NeedMoreData)?;
45 value |= ((v & 0x7F) as u32) << 0;
46 if v & 0x80 == 0 {
47 return Ok((value, 1));
48 }
49
50 let v = buf.get(1).ok_or(Error::NeedMoreData)?;
51 value |= ((v & 0x7F) as u32) << 7;
52 if v & 0x80 == 0 {
53 return Ok((value, 2));
54 }
55
56 let v = buf.get(2).ok_or(Error::NeedMoreData)?;
57 value |= ((v & 0x7F) as u32) << 14;
58 if v & 0x80 == 0 {
59 return Ok((value, 3));
60 }
61
62 let v = buf.get(3).ok_or(Error::NeedMoreData)?;
63 value |= ((v & 0x7F) as u32) << 21;
64 if v & 0x80 != 0 {
65 return Err(Error::InvalidVarInt);
66 }
67
68 Ok((value, 4))
69}
diff --git a/src/constants.rs b/src/constants.rs
new file mode 100644
index 0000000..a67b6fe
--- /dev/null
+++ b/src/constants.rs
@@ -0,0 +1,97 @@
1#![allow(unused)]
2
3pub const HA_DOMAIN_SENSOR: &str = "sensor";
4pub const HA_DOMAIN_BINARY_SENSOR: &str = "binary_sensor";
5pub const HA_DOMAIN_SWITCH: &str = "switch";
6pub const HA_DOMAIN_LIGHT: &str = "light";
7pub const HA_DOMAIN_BUTTON: &str = "button";
8pub const HA_DOMAIN_SELECT: &str = "select";
9
10pub const HA_DEVICE_CLASS_SENSOR_APPARENT_POWER: &str = "apparent_power";
11pub const HA_DEVICE_CLASS_SENSOR_AQI: &str = "aqi";
12pub const HA_DEVICE_CLASS_SENSOR_ATMOSPHERIC_PRESSURE: &str = "atmospheric_pressure";
13pub const HA_DEVICE_CLASS_SENSOR_BATTERY: &str = "battery";
14pub const HA_DEVICE_CLASS_SENSOR_CARBON_DIOXIDE: &str = "carbon_dioxide";
15pub const HA_DEVICE_CLASS_SENSOR_CARBON_MONOXIDE: &str = "carbon_monoxide";
16pub const HA_DEVICE_CLASS_SENSOR_CURRENT: &str = "current";
17pub const HA_DEVICE_CLASS_SENSOR_DATA_RATE: &str = "data_rate";
18pub const HA_DEVICE_CLASS_SENSOR_DATA_SIZE: &str = "data_size";
19pub const HA_DEVICE_CLASS_SENSOR_DATE: &str = "date";
20pub const HA_DEVICE_CLASS_SENSOR_DISTANCE: &str = "distance";
21pub const HA_DEVICE_CLASS_SENSOR_DURATION: &str = "duration";
22pub const HA_DEVICE_CLASS_SENSOR_ENERGY: &str = "energy";
23pub const HA_DEVICE_CLASS_SENSOR_ENERGY_STORAGE: &str = "energy_storage";
24pub const HA_DEVICE_CLASS_SENSOR_ENUM: &str = "enum";
25pub const HA_DEVICE_CLASS_SENSOR_FREQUENCY: &str = "frequency";
26pub const HA_DEVICE_CLASS_SENSOR_GAS: &str = "gas";
27pub const HA_DEVICE_CLASS_SENSOR_HUMIDITY: &str = "humidity";
28pub const HA_DEVICE_CLASS_SENSOR_ILLUMINANCE: &str = "illuminance";
29pub const HA_DEVICE_CLASS_SENSOR_IRRADIANCE: &str = "irradiance";
30pub const HA_DEVICE_CLASS_SENSOR_MOISTURE: &str = "moisture";
31pub const HA_DEVICE_CLASS_SENSOR_MONETARY: &str = "monetary";
32pub const HA_DEVICE_CLASS_SENSOR_NITROGEN_DIOXIDE: &str = "nitrogen_dioxide";
33pub const HA_DEVICE_CLASS_SENSOR_NITROGEN_MONOXIDE: &str = "nitrogen_monoxide";
34pub const HA_DEVICE_CLASS_SENSOR_NITROUS_OXIDE: &str = "nitrous_oxide";
35pub const HA_DEVICE_CLASS_SENSOR_OZONE: &str = "ozone";
36pub const HA_DEVICE_CLASS_SENSOR_PH: &str = "ph";
37pub const HA_DEVICE_CLASS_SENSOR_PM1: &str = "pm1";
38pub const HA_DEVICE_CLASS_SENSOR_PM25: &str = "pm25";
39pub const HA_DEVICE_CLASS_SENSOR_PM10: &str = "pm10";
40pub const HA_DEVICE_CLASS_SENSOR_POWER_FACTOR: &str = "power_factor";
41pub const HA_DEVICE_CLASS_SENSOR_POWER: &str = "power";
42pub const HA_DEVICE_CLASS_SENSOR_PRECIPITATION: &str = "precipitation";
43pub const HA_DEVICE_CLASS_SENSOR_PRECIPITATION_INTENSITY: &str = "precipitation_intensity";
44pub const HA_DEVICE_CLASS_SENSOR_PRESSURE: &str = "pressure";
45pub const HA_DEVICE_CLASS_SENSOR_REACTIVE_POWER: &str = "reactive_power";
46pub const HA_DEVICE_CLASS_SENSOR_SIGNAL_STRENGTH: &str = "signal_strength";
47pub const HA_DEVICE_CLASS_SENSOR_SOUND_PRESSURE: &str = "sound_pressure";
48pub const HA_DEVICE_CLASS_SENSOR_SPEED: &str = "speed";
49pub const HA_DEVICE_CLASS_SENSOR_SULPHUR_DIOXIDE: &str = "sulphur_dioxide";
50pub const HA_DEVICE_CLASS_SENSOR_TEMPERATURE: &str = "temperature";
51pub const HA_DEVICE_CLASS_SENSOR_TIMESTAMP: &str = "timestamp";
52pub const HA_DEVICE_CLASS_SENSOR_VOLATILE_ORGANIC_COMPOUNDS: &str = "volatile_organic_compounds";
53pub const HA_DEVICE_CLASS_SENSOR_VOLATILE_ORGANIC_COMPOUNDS_PARTS: &str =
54 "volatile_organic_compounds_parts";
55pub const HA_DEVICE_CLASS_SENSOR_VOLTAGE: &str = "voltage";
56pub const HA_DEVICE_CLASS_SENSOR_VOLUME: &str = "volume";
57pub const HA_DEVICE_CLASS_SENSOR_VOLUME_FLOW_RATE: &str = "volume_flow_rate";
58pub const HA_DEVICE_CLASS_SENSOR_VOLUME_STORAGE: &str = "volume_storage";
59pub const HA_DEVICE_CLASS_SENSOR_WATER: &str = "water";
60pub const HA_DEVICE_CLASS_SENSOR_WEIGHT: &str = "weight";
61pub const HA_DEVICE_CLASS_SENSOR_WIND_SPEED: &str = "wind_speed";
62
63pub const HA_DEVICE_CLASS_BINARY_SENSOR_BATTERY: &str = "battery";
64pub const HA_DEVICE_CLASS_BINARY_SENSOR_BATTERY_CHARGING: &str = "battery_charging";
65pub const HA_DEVICE_CLASS_BINARY_SENSOR_CARBON_MONOXIDE: &str = "carbon_monoxide";
66pub const HA_DEVICE_CLASS_BINARY_SENSOR_COLD: &str = "cold";
67pub const HA_DEVICE_CLASS_BINARY_SENSOR_CONNECTIVITY: &str = "connectivity";
68pub const HA_DEVICE_CLASS_BINARY_SENSOR_DOOR: &str = "door";
69pub const HA_DEVICE_CLASS_BINARY_SENSOR_GARAGE_DOOR: &str = "garage_door";
70pub const HA_DEVICE_CLASS_BINARY_SENSOR_GAS: &str = "gas";
71pub const HA_DEVICE_CLASS_BINARY_SENSOR_HEAT: &str = "heat";
72pub const HA_DEVICE_CLASS_BINARY_SENSOR_LIGHT: &str = "light";
73pub const HA_DEVICE_CLASS_BINARY_SENSOR_LOCK: &str = "lock";
74pub const HA_DEVICE_CLASS_BINARY_SENSOR_MOISTURE: &str = "moisture";
75pub const HA_DEVICE_CLASS_BINARY_SENSOR_MOTION: &str = "motion";
76pub const HA_DEVICE_CLASS_BINARY_SENSOR_MOVING: &str = "moving";
77pub const HA_DEVICE_CLASS_BINARY_SENSOR_OCCUPANCY: &str = "occupancy";
78pub const HA_DEVICE_CLASS_BINARY_SENSOR_OPENING: &str = "opening";
79pub const HA_DEVICE_CLASS_BINARY_SENSOR_PLUG: &str = "plug";
80pub const HA_DEVICE_CLASS_BINARY_SENSOR_POWER: &str = "power";
81pub const HA_DEVICE_CLASS_BINARY_SENSOR_PRESENCE: &str = "presence";
82pub const HA_DEVICE_CLASS_BINARY_SENSOR_PROBLEM: &str = "problem";
83pub const HA_DEVICE_CLASS_BINARY_SENSOR_RUNNING: &str = "running";
84pub const HA_DEVICE_CLASS_BINARY_SENSOR_SAFETY: &str = "safety";
85pub const HA_DEVICE_CLASS_BINARY_SENSOR_SMOKE: &str = "smoke";
86pub const HA_DEVICE_CLASS_BINARY_SENSOR_SOUND: &str = "sound";
87pub const HA_DEVICE_CLASS_BINARY_SENSOR_TAMPER: &str = "tamper";
88pub const HA_DEVICE_CLASS_BINARY_SENSOR_UPDATE: &str = "update";
89pub const HA_DEVICE_CLASS_BINARY_SENSOR_VIBRATION: &str = "vibration";
90pub const HA_DEVICE_CLASS_BINARY_SENSOR_WINDOW: &str = "window";
91
92pub const HA_DEVICE_CLASS_BUTTON_IDENTIFY: &str = "identify";
93pub const HA_DEVICE_CLASS_BUTTON_RESTART: &str = "restart";
94pub const HA_DEVICE_CLASS_BUTTON_UPDATE: &str = "update";
95
96pub const HA_DEVICE_CLASS_SWITCH_OUTLET: &str = "outlet";
97pub const HA_DEVICE_CLASS_SWITCH_SWITCH: &str = "switch";
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..5ecd5ea
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,967 @@
1#![no_std]
2
3use core::{
4 cell::RefCell,
5 net::SocketAddrV4,
6 sync::atomic::{AtomicBool, AtomicU32},
7 task::Waker,
8};
9
10use defmt::Format;
11use embassy_net::tcp::TcpSocket;
12use embassy_sync::waitqueue::AtomicWaker;
13use embassy_time::Timer;
14use heapless::{
15 Vec, VecView,
16 string::{String, StringView},
17};
18use serde::Serialize;
19
20mod constants;
21mod transport;
22mod unit;
23
24pub use constants::*;
25pub use transport::Transport;
26pub use unit::*;
27
28enum Unit {
29 Temperature(TemperatureUnit),
30}
31
32impl Unit {
33 fn as_str(&self) -> &'static str {
34 match self {
35 Unit::Temperature(unit) => unit.as_str(),
36 }
37 }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41pub enum ComponentType {
42 Sensor,
43 BinarySensor,
44}
45
46impl core::fmt::Display for ComponentType {
47 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
48 f.write_str(self.as_str())
49 }
50}
51
52impl ComponentType {
53 fn as_str(&self) -> &'static str {
54 match self {
55 ComponentType::Sensor => "sensor",
56 ComponentType::BinarySensor => "binary_sensor",
57 }
58 }
59}
60
61// TODO: see what classes need this and defaults
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum StateClass {
64 Measurement,
65 Total,
66 TotalIncreasing,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum DeviceClass {
71 Temperature {
72 unit: TemperatureUnit,
73 },
74 Humidity {
75 unit: HumidityUnit,
76 },
77
78 // binary sensors
79 Door,
80 Window,
81 Motion,
82 Occupancy,
83 Opening,
84 Plug,
85 Presence,
86 Problem,
87 Safety,
88 Smoke,
89 Sound,
90 Vibration,
91
92 Battery {
93 unit: BatteryUnit,
94 },
95 Illuminance {
96 unit: LightUnit,
97 },
98 Pressure {
99 unit: PressureUnit,
100 },
101 Generic {
102 device_class: Option<&'static str>,
103 unit: Option<&'static str>,
104 },
105 Energy {
106 unit: EnergyUnit,
107 },
108}
109
110impl DeviceClass {
111 fn tag(&self) -> &'static str {
112 match self {
113 DeviceClass::Temperature { .. } => "temperature",
114 DeviceClass::Humidity { .. } => "humidity",
115 _ => todo!(),
116 }
117 }
118
119 fn unit_of_measurement(&self) -> Option<Unit> {
120 // TODO: fix
121 Some(Unit::Temperature(TemperatureUnit::Celcius))
122 }
123
124 fn component_type(&self) -> ComponentType {
125 match self {
126 DeviceClass::Temperature { .. } => ComponentType::Sensor,
127 DeviceClass::Humidity { .. } => ComponentType::Sensor,
128 DeviceClass::Door => ComponentType::BinarySensor,
129 DeviceClass::Window => ComponentType::BinarySensor,
130 _ => todo!(),
131 }
132 }
133}
134
135pub trait Entity {
136 // TODO: possibly collapse all these functions into a single one that returns a struct
137 fn id(&self) -> &'static str;
138 fn name(&self) -> &'static str;
139 fn device_class(&self) -> DeviceClass;
140 fn register_waker(&self, waker: &Waker);
141 fn value(&self) -> Option<StateValue>;
142}
143
144// TODO: figure out proper atomic orderings
145
146struct StateContainer {
147 dirty: AtomicBool,
148 waker: AtomicWaker,
149 value: StateContainerValue,
150}
151
152impl StateContainer {
153 const fn new(value: StateContainerValue) -> Self {
154 Self {
155 dirty: AtomicBool::new(false),
156 waker: AtomicWaker::new(),
157 value,
158 }
159 }
160
161 pub const fn new_u32() -> Self {
162 Self::new(StateContainerValue::U32(AtomicU32::new(0)))
163 }
164
165 pub const fn new_f32() -> Self {
166 Self::new(StateContainerValue::F32(AtomicU32::new(0)))
167 }
168}
169
170enum StateContainerValue {
171 U32(AtomicU32),
172 F32(AtomicU32),
173}
174
175pub enum StateValue {
176 U32(u32),
177 F32(f32),
178}
179
180#[derive(Debug, Format, Clone, Copy, Serialize)]
181struct DeviceDiscovery<'a> {
182 identifiers: &'a [&'a str],
183 name: &'a str,
184 manufacturer: &'a str,
185 model: &'a str,
186}
187
188pub enum SensorKind {
189 Generic,
190 Temperature { unit: TemperatureUnit },
191 Humidity { unit: HumidityUnit },
192 // TODO: complete
193}
194
195impl SensorKind {
196 fn as_str(&self) -> &'static str {
197 match self {
198 SensorKind::Generic => "sensor",
199 SensorKind::Temperature { .. } => "temperature",
200 SensorKind::Humidity { .. } => "humidity",
201 }
202 }
203}
204
205impl core::fmt::Display for SensorKind {
206 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
207 f.write_str(self.as_str())
208 }
209}
210
211enum BinarySensorKind {
212 Generic,
213 Motion,
214 Door,
215 Window,
216 Occupancy,
217 // TODO: complete
218}
219
220impl BinarySensorKind {
221 fn as_str(&self) -> &'static str {
222 match self {
223 BinarySensorKind::Generic => "binary_sensor",
224 BinarySensorKind::Motion => "motion",
225 BinarySensorKind::Door => "door",
226 BinarySensorKind::Window => "window",
227 BinarySensorKind::Occupancy => "occupancy",
228 }
229 }
230}
231
232impl core::fmt::Display for BinarySensorKind {
233 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
234 f.write_str(self.as_str())
235 }
236}
237
238enum SwitchKind {
239 Generic,
240 Outlet,
241 Switch,
242}
243
244impl SwitchKind {
245 fn as_str(&self) -> &'static str {
246 match self {
247 SwitchKind::Generic => "switch",
248 SwitchKind::Outlet => "outlet",
249 SwitchKind::Switch => "switch",
250 }
251 }
252}
253
254impl core::fmt::Display for SwitchKind {
255 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
256 f.write_str(self.as_str())
257 }
258}
259
260enum ButtonKind {
261 Generic,
262 Identify,
263 Restart,
264 Update,
265}
266
267impl ButtonKind {
268 fn as_str(&self) -> &'static str {
269 match self {
270 ButtonKind::Generic => "button",
271 ButtonKind::Identify => "identify",
272 ButtonKind::Restart => "restart",
273 ButtonKind::Update => "update",
274 }
275 }
276}
277
278impl core::fmt::Display for ButtonKind {
279 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
280 f.write_str(self.as_str())
281 }
282}
283
284enum NumberKind {
285 Generic,
286 // TODO: alot of different ones
287 // https://www.home-assistant.io/integrations/number
288}
289
290impl NumberKind {
291 fn as_str(&self) -> &'static str {
292 match self {
293 NumberKind::Generic => "number",
294 }
295 }
296}
297
298impl core::fmt::Display for NumberKind {
299 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
300 f.write_str(self.as_str())
301 }
302}
303
304// this is called the component type in the ha api
305pub enum EntityDomain {
306 Sensor(SensorKind),
307 BinarySensor(BinarySensorKind),
308 Switch(SwitchKind),
309 Light,
310 Button(ButtonKind),
311 Select,
312}
313
314impl core::fmt::Display for EntityDomain {
315 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
316 f.write_str(self.as_str())
317 }
318}
319
320impl EntityDomain {
321 fn as_str(&self) -> &'static str {
322 match self {
323 EntityDomain::Sensor(_) => "sensor",
324 EntityDomain::BinarySensor(_) => "binary_sensor",
325 EntityDomain::Switch(_) => "switch",
326 EntityDomain::Light => "light",
327 EntityDomain::Button(_) => "button",
328 EntityDomain::Select => "select",
329 }
330 }
331}
332
333#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
334enum EntityCategory {
335 Config,
336 Diagnostic,
337}
338
339#[derive(Debug, Format, Serialize)]
340struct EntityDiscovery<'a> {
341 #[serde(rename = "unique_id")]
342 id: &'a str,
343
344 name: &'a str,
345
346 #[serde(skip_serializing_if = "Option::is_none")]
347 device_class: Option<&'a str>,
348
349 #[serde(skip_serializing_if = "Option::is_none")]
350 state_topic: Option<&'a str>,
351
352 #[serde(skip_serializing_if = "Option::is_none")]
353 command_topic: Option<&'a str>,
354
355 #[serde(skip_serializing_if = "Option::is_none")]
356 unit_of_measurement: Option<&'a str>,
357
358 #[serde(skip_serializing_if = "Option::is_none")]
359 schema: Option<&'a str>,
360
361 #[serde(skip_serializing_if = "Option::is_none")]
362 state_class: Option<&'a str>,
363
364 #[serde(skip_serializing_if = "Option::is_none")]
365 icon: Option<&'a str>,
366
367 device: &'a DeviceDiscovery<'a>,
368}
369
370struct DiscoveryTopicDisplay<'a> {
371 domain: &'a str,
372 device_id: &'a str,
373 entity_id: &'a str,
374}
375
376impl<'a> core::fmt::Display for DiscoveryTopicDisplay<'a> {
377 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
378 write!(
379 f,
380 "homeassistant/{}/{}_{}/config",
381 self.domain, self.device_id, self.entity_id
382 )
383 }
384}
385
386struct StateTopicDisplay<'a> {
387 device_id: &'a str,
388 entity_id: &'a str,
389}
390
391impl<'a> core::fmt::Display for StateTopicDisplay<'a> {
392 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
393 write!(f, "embassy-ha/{}/{}/state", self.device_id, self.entity_id)
394 }
395}
396
397struct CommandTopicDisplay<'a> {
398 device_id: &'a str,
399 entity_id: &'a str,
400}
401
402impl<'a> core::fmt::Display for CommandTopicDisplay<'a> {
403 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
404 write!(
405 f,
406 "embassy-ha/{}/{}/command",
407 self.device_id, self.entity_id
408 )
409 }
410}
411
412pub struct DeviceConfig {
413 pub device_id: &'static str,
414 pub device_name: &'static str,
415 pub manufacturer: &'static str,
416 pub model: &'static str,
417}
418
419pub struct DeviceResources {
420 waker: AtomicWaker,
421 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT],
422
423 mqtt_resources: embedded_mqtt::ClientResources,
424 publish_buffer: Vec<u8, 2048>,
425 subscribe_buffer: Vec<u8, 128>,
426 discovery_buffer: Vec<u8, 2048>,
427 discovery_topic_buffer: String<128>,
428 state_topic_buffer: String<128>,
429 command_topic_buffer: String<128>,
430}
431
432impl DeviceResources {
433 const RX_BUFFER_LEN: usize = 2048;
434 const TX_BUFFER_LEN: usize = 2048;
435 const ENTITY_LIMIT: usize = 16;
436}
437
438impl Default for DeviceResources {
439 fn default() -> Self {
440 Self {
441 waker: AtomicWaker::new(),
442 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT],
443
444 mqtt_resources: Default::default(),
445 publish_buffer: Default::default(),
446 subscribe_buffer: Default::default(),
447 discovery_buffer: Default::default(),
448 discovery_topic_buffer: Default::default(),
449 state_topic_buffer: Default::default(),
450 command_topic_buffer: Default::default(),
451 }
452 }
453}
454
455pub struct TemperatureSensor<'a>(Entity2<'a>);
456
457impl<'a> TemperatureSensor<'a> {
458 pub fn publish(&mut self, temperature: f32) {
459 use core::fmt::Write;
460 self.0
461 .publish_with(|view| write!(view, "{}", temperature).unwrap());
462 }
463}
464
465pub struct Button<'a>(Entity2<'a>);
466
467impl<'a> Button<'a> {
468 pub async fn pressed(&mut self) {
469 self.0.wait_command().await;
470 }
471}
472
473pub struct EntityConfig {
474 pub id: &'static str,
475 pub name: &'static str,
476 pub domain: &'static str,
477 pub device_class: Option<&'static str>,
478 pub measurement_unit: Option<&'static str>,
479 pub icon: Option<&'static str>,
480 pub category: Option<&'static str>,
481 pub state_class: Option<&'static str>,
482 pub schema: Option<&'static str>,
483}
484
485struct EntityData {
486 config: EntityConfig,
487 publish_dirty: bool,
488 publish_value: heapless::Vec<u8, 64>,
489 command_dirty: bool,
490 command_value: heapless::Vec<u8, 64>,
491 command_wait_waker: Option<Waker>,
492}
493
494pub struct Entity2<'a> {
495 data: &'a RefCell<Option<EntityData>>,
496 waker: &'a AtomicWaker,
497}
498
499impl<'a> Entity2<'a> {
500 pub fn publish(&mut self, payload: &[u8]) {
501 self.publish_with(|view| view.extend_from_slice(payload).unwrap());
502 }
503
504 pub fn publish_with<F>(&mut self, f: F)
505 where
506 F: FnOnce(&mut VecView<u8>),
507 {
508 self.with_data(move |data| {
509 data.publish_value.clear();
510 f(data.publish_value.as_mut_view());
511 data.publish_dirty = true;
512 });
513 self.waker.wake();
514 }
515
516 pub async fn wait_command(&mut self) {
517 struct Fut<'a, 'b>(&'a mut Entity2<'b>);
518
519 impl<'a, 'b> core::future::Future for Fut<'a, 'b> {
520 type Output = ();
521
522 fn poll(
523 mut self: core::pin::Pin<&mut Self>,
524 cx: &mut core::task::Context<'_>,
525 ) -> core::task::Poll<Self::Output> {
526 let this = &mut self.as_mut().0;
527 this.with_data(|data| {
528 let dirty = data.command_dirty;
529 if dirty {
530 data.command_dirty = false;
531 data.command_wait_waker = None;
532 core::task::Poll::Ready(())
533 } else {
534 // TODO: avoid clone if waker would wake
535 data.command_wait_waker = Some(cx.waker().clone());
536 core::task::Poll::Pending
537 }
538 })
539 }
540 }
541
542 Fut(self).await
543 }
544
545 pub fn with_command<F, R>(&mut self, f: F) -> R
546 where
547 F: FnOnce(&[u8]) -> R,
548 {
549 self.with_data(|data| f(data.command_value.as_slice()))
550 }
551
552 fn with_data<F, R>(&self, f: F) -> R
553 where
554 F: FnOnce(&mut EntityData) -> R,
555 {
556 f(self.data.borrow_mut().as_mut().unwrap())
557 }
558}
559
560pub struct Device<'a> {
561 config: DeviceConfig,
562
563 // resources
564 waker: &'a AtomicWaker,
565 entities: &'a [RefCell<Option<EntityData>>],
566
567 mqtt_resources: &'a mut embedded_mqtt::ClientResources,
568 publish_buffer: &'a mut VecView<u8>,
569 subscribe_buffer: &'a mut VecView<u8>,
570 discovery_buffer: &'a mut VecView<u8>,
571 discovery_topic_buffer: &'a mut StringView,
572 state_topic_buffer: &'a mut StringView,
573 command_topic_buffer: &'a mut StringView,
574}
575
576impl<'a> Device<'a> {
577 pub fn new(resources: &'a mut DeviceResources, config: DeviceConfig) -> Self {
578 Self {
579 config,
580 waker: &resources.waker,
581 entities: &resources.entities,
582
583 mqtt_resources: &mut resources.mqtt_resources,
584 publish_buffer: &mut resources.publish_buffer,
585 subscribe_buffer: &mut resources.subscribe_buffer,
586 discovery_buffer: &mut resources.discovery_buffer,
587 discovery_topic_buffer: &mut resources.discovery_topic_buffer,
588 state_topic_buffer: &mut resources.state_topic_buffer,
589 command_topic_buffer: &mut resources.command_topic_buffer,
590 }
591 }
592
593 pub fn create_entity(&self, config: EntityConfig) -> Entity2<'a> {
594 let index = 'outer: {
595 for idx in 0..self.entities.len() {
596 if self.entities[idx].borrow().is_none() {
597 break 'outer idx;
598 }
599 }
600 panic!("device entity limit reached");
601 };
602
603 let data = EntityData {
604 config,
605 publish_dirty: false,
606 publish_value: Default::default(),
607 command_dirty: false,
608 command_value: Default::default(),
609 command_wait_waker: None,
610 };
611 self.entities[index].replace(Some(data));
612
613 Entity2 {
614 data: &self.entities[index],
615 waker: self.waker,
616 }
617 }
618
619 pub fn create_temperature_sensor(
620 &self,
621 id: &'static str,
622 name: &'static str,
623 unit: TemperatureUnit,
624 ) -> TemperatureSensor<'a> {
625 let entity = self.create_entity(EntityConfig {
626 id,
627 name,
628 domain: HA_DOMAIN_SENSOR,
629 device_class: Some(HA_DEVICE_CLASS_SENSOR_TEMPERATURE),
630 measurement_unit: Some(unit.as_str()),
631 icon: None,
632 category: None,
633 state_class: None,
634 schema: None,
635 });
636 TemperatureSensor(entity)
637 }
638
639 pub fn create_button(&self, id: &'static str, name: &'static str) -> Button<'a> {
640 let entity = self.create_entity(EntityConfig {
641 id,
642 name,
643 domain: HA_DOMAIN_BUTTON,
644 device_class: None,
645 measurement_unit: None,
646 icon: None,
647 category: None,
648 state_class: None,
649 schema: None,
650 });
651 Button(entity)
652 }
653
654 pub async fn run<T: Transport>(&mut self, transport: &mut T) -> ! {
655 loop {
656 self.run_iteration(&mut *transport).await;
657 Timer::after_millis(5000).await;
658 }
659 }
660
661 async fn run_iteration<T: Transport>(&mut self, transport: T) {
662 let mut client = embedded_mqtt::Client::new(self.mqtt_resources, transport);
663 client.connect("embassy-ha-client-id").await.unwrap();
664
665 defmt::info!("sending discover messages");
666 let device_discovery = DeviceDiscovery {
667 identifiers: &[self.config.device_id],
668 name: self.config.device_name,
669 manufacturer: self.config.manufacturer,
670 model: self.config.model,
671 };
672
673 for entity in self.entities {
674 use core::fmt::Write;
675
676 self.publish_buffer.clear();
677 self.subscribe_buffer.clear();
678 self.discovery_buffer.clear();
679 self.discovery_topic_buffer.clear();
680 self.state_topic_buffer.clear();
681 self.command_topic_buffer.clear();
682
683 // borrow the entity and fill out the buffers to be sent
684 // this should be done inside a block so that we do not hold the RefMut across an
685 // await
686 {
687 let mut entity = entity.borrow_mut();
688 let entity = match entity.as_mut() {
689 Some(entity) => entity,
690 None => break,
691 };
692 let entity_config = &entity.config;
693
694 write!(
695 self.discovery_topic_buffer,
696 "{}",
697 DiscoveryTopicDisplay {
698 domain: entity_config.domain,
699 device_id: self.config.device_id,
700 entity_id: entity_config.id,
701 }
702 )
703 .unwrap();
704
705 write!(
706 self.state_topic_buffer,
707 "{}",
708 StateTopicDisplay {
709 device_id: self.config.device_id,
710 entity_id: entity_config.id
711 }
712 )
713 .unwrap();
714
715 write!(
716 self.command_topic_buffer,
717 "{}",
718 CommandTopicDisplay {
719 device_id: self.config.device_id,
720 entity_id: entity_config.id
721 }
722 )
723 .unwrap();
724
725 let discovery = EntityDiscovery {
726 id: entity_config.id,
727 name: entity_config.name,
728 device_class: entity_config.device_class,
729 state_topic: Some(self.state_topic_buffer.as_str()),
730 command_topic: Some(self.command_topic_buffer.as_str()),
731 unit_of_measurement: entity_config.measurement_unit,
732 schema: entity_config.schema,
733 state_class: entity_config.state_class,
734 icon: entity_config.icon,
735 device: &device_discovery,
736 };
737 defmt::info!("discovery: {}", discovery);
738
739 self.discovery_buffer
740 .resize(self.discovery_buffer.capacity(), 0)
741 .unwrap();
742 let n = serde_json_core::to_slice(&discovery, &mut self.discovery_buffer).unwrap();
743 self.discovery_buffer.truncate(n);
744 }
745
746 defmt::info!(
747 "sending discovery to {}",
748 self.discovery_topic_buffer.as_str()
749 );
750 client
751 .publish(&self.discovery_topic_buffer, &self.discovery_buffer)
752 .await
753 .unwrap();
754 client.subscribe(&self.command_topic_buffer).await.unwrap();
755 }
756
757 loop {
758 use core::fmt::Write;
759
760 for entity in self.entities {
761 {
762 let mut entity = entity.borrow_mut();
763 let entity = match entity.as_mut() {
764 Some(entity) => entity,
765 None => break,
766 };
767
768 if !entity.publish_dirty {
769 continue;
770 }
771
772 entity.publish_dirty = false;
773
774 self.state_topic_buffer.clear();
775 write!(
776 self.state_topic_buffer,
777 "{}",
778 StateTopicDisplay {
779 device_id: self.config.device_id,
780 entity_id: entity.config.id
781 }
782 )
783 .unwrap();
784
785 self.publish_buffer.clear();
786 self.publish_buffer
787 .extend_from_slice(entity.publish_value.as_slice())
788 .unwrap();
789 }
790
791 client
792 .publish(&self.state_topic_buffer, self.publish_buffer)
793 .await
794 .unwrap();
795 }
796
797 let receive = client.receive();
798 let waker = wait_on_atomic_waker(self.waker);
799 match embassy_futures::select::select(receive, waker).await {
800 embassy_futures::select::Either::First(packet) => {
801 let packet = packet.unwrap();
802 let mut read_buffer = [0u8; 128];
803 if let embedded_mqtt::Packet::Publish(publish) = packet {
804 if publish.data_len > 128 {
805 defmt::warn!("mqtt publish payload too large, ignoring message");
806 } else {
807 let b = &mut read_buffer[..publish.data_len];
808 client.receive_data(b).await.unwrap();
809 defmt::info!("receive value {}", str::from_utf8(b).unwrap());
810 for entity in self.entities {
811 let mut entity = entity.borrow_mut();
812 if let Some(entity) = entity.as_mut() {
813 entity.command_dirty = true;
814 entity.command_value.clear();
815 entity.command_value.extend_from_slice(b"ON").unwrap();
816 if let Some(ref waker) = entity.command_wait_waker {
817 waker.wake_by_ref();
818 }
819 }
820 }
821 }
822 }
823 }
824 embassy_futures::select::Either::Second(_) => {}
825 }
826 }
827 }
828}
829
830async fn wait_on_atomic_waker(waker: &AtomicWaker) {
831 struct F<'a>(&'a AtomicWaker, bool);
832 impl<'a> core::future::Future for F<'a> {
833 type Output = ();
834
835 fn poll(
836 self: core::pin::Pin<&mut Self>,
837 cx: &mut core::task::Context<'_>,
838 ) -> core::task::Poll<Self::Output> {
839 if !self.1 {
840 self.0.register(cx.waker());
841 self.get_mut().1 = true;
842 core::task::Poll::Pending
843 } else {
844 core::task::Poll::Ready(())
845 }
846 }
847 }
848 F(waker, false).await
849}
850
851/*
852 Step-by-Step Process
853
854 1. What are you measuring/controlling?
855
856 Start with the physical thing:
857 - "I want to measure temperature"
858 - "I want to detect if a door is open"
859 - "I want to control a relay"
860 - "I want a button to restart the device"
861
862 2. Pick the component type based on behavior
863
864 Ask yourself:
865 - Is it read-only or controllable?
866 - Does it have numeric values or on/off states?
867
868 Decision tree:
869 Read-only measurement?
870 ├─ Numeric value (23.5, 65%, etc.)
871 │ └─ Component: sensor
872 └─ On/off state (open/closed, detected/not detected)
873 └─ Component: binary_sensor
874
875 Controllable?
876 ├─ On/off control
877 │ └─ Component: switch (or light for LEDs)
878 ├─ Adjustable number
879 │ └─ Component: number
880 ├─ Select from options
881 │ └─ Component: select
882 └─ Trigger action (no state)
883 └─ Component: button
884
885 3. Pick the device_class (if applicable)
886
887 Now look at the component type you chose:
888
889 For sensor - What kind of measurement?
890 - Temperature → device_class: "temperature"
891 - Humidity → device_class: "humidity"
892 - Pressure → device_class: "pressure"
893 - Custom metric → device_class: None
894
895 For binary_sensor - What kind of detection?
896 - Door → device_class: "door"
897 - Motion → device_class: "motion"
898 - Window → device_class: "window"
899 - Generic → device_class: None
900
901 For button - No device_class needed!
902
903 4. Pick units (if applicable)
904
905 Based on your device_class:
906 - Temperature → "°C" or "°F"
907 - Humidity → "%"
908 - Pressure → "hPa"
909
910 Examples
911
912 Example 1: DHT22 Temperature Reading
913
914 1. What? → Measure temperature
915 2. Component? → sensor (numeric, read-only)
916 3. Device class? → "temperature"
917 4. Unit? → "°C"
918
919 Result:
920 - Discovery: homeassistant/sensor/pico2w_temp/config
921 - JSON: device_class: "temperature", unit_of_measurement: "°C"
922
923 Example 2: Reed Switch on Door
924
925 1. What? → Detect door open/closed
926 2. Component? → binary_sensor (on/off state, read-only)
927 3. Device class? → "door"
928 4. Unit? → N/A
929
930 Result:
931 - Discovery: homeassistant/binary_sensor/pico2w_door/config
932 - JSON: device_class: "door"
933
934 Example 3: Relay Control
935
936 1. What? → Control a relay
937 2. Component? → switch (on/off, controllable)
938 3. Device class? → None (switches typically don't have device_class)
939 4. Unit? → N/A
940
941 Result:
942 - Discovery: homeassistant/switch/pico2w_relay/config
943 - JSON: No device_class needed
944
945 Example 4: Restart Button
946
947 1. What? → Trigger device restart
948 2. Component? → button (action trigger, no state)
949 3. Device class? → None (buttons don't have device_class)
950 4. Unit? → N/A
951
952 Result:
953 - Discovery: homeassistant/button/pico2w_restart/config
954 - JSON: No device_class, no state_topic
955
956 TL;DR Workflow
957
958 Physical thing
959
960 Component type (behavior: read-only numeric? binary? controllable?)
961
962 Device class (what specific type?)
963
964 Units (if numeric)
965
966 Does this mental model make sense now?
967*/
diff --git a/src/transport.rs b/src/transport.rs
new file mode 100644
index 0000000..5214b37
--- /dev/null
+++ b/src/transport.rs
@@ -0,0 +1,3 @@
1pub trait Transport: embedded_io_async::Read + embedded_io_async::Write {}
2
3impl<T> Transport for T where T: embedded_io_async::Read + embedded_io_async::Write {}
diff --git a/src/unit.rs b/src/unit.rs
new file mode 100644
index 0000000..4f3ca19
--- /dev/null
+++ b/src/unit.rs
@@ -0,0 +1,45 @@
1#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2pub enum TemperatureUnit {
3 Celcius,
4 Kelvin,
5 Fahrenheit,
6 Other(&'static str),
7}
8
9impl TemperatureUnit {
10 pub fn as_str(&self) -> &'static str {
11 // TODO: improve
12 match self {
13 TemperatureUnit::Celcius => "C",
14 TemperatureUnit::Kelvin => "k",
15 TemperatureUnit::Fahrenheit => "F",
16 TemperatureUnit::Other(other) => other,
17 }
18 }
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum HumidityUnit {
23 Percentage,
24 Other(&'static str),
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum BatteryUnit {
29 Percentage,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum LightUnit {
34 Lux,
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum PressureUnit {
39 HectoPascal,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum EnergyUnit {
44 KiloWattHour,
45}