aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/constant-temperature.rs171
1 files changed, 171 insertions, 0 deletions
diff --git a/examples/constant-temperature.rs b/examples/constant-temperature.rs
new file mode 100644
index 0000000..27e1076
--- /dev/null
+++ b/examples/constant-temperature.rs
@@ -0,0 +1,171 @@
1use std::{
2 io::{Read, Write},
3 net::{TcpStream, ToSocketAddrs},
4 sync::{Arc, Mutex},
5 thread::JoinHandle,
6};
7
8use embassy_executor::{Executor, Spawner};
9use embassy_sync::waitqueue::AtomicWaker;
10use embassy_time::Timer;
11use static_cell::StaticCell;
12
13static EXECUTOR: StaticCell<Executor> = StaticCell::new();
14static RESOURCES: StaticCell<embassy_ha::DeviceResources> = StaticCell::new();
15
16struct AsyncTcp {
17 write_handle: JoinHandle<()>,
18 write_buffer: Arc<Mutex<Vec<u8>>>,
19 read_buffer: Arc<Mutex<Vec<u8>>>,
20 waker: Arc<AtomicWaker>,
21}
22
23impl AsyncTcp {
24 fn connect(addr: impl ToSocketAddrs) -> Self {
25 let stream = TcpStream::connect(addr).expect("failed to connect to remote");
26 let mut read_stream = stream.try_clone().unwrap();
27 let mut write_stream = stream;
28
29 let read_buffer: Arc<Mutex<Vec<u8>>> = Default::default();
30 let write_buffer: Arc<Mutex<Vec<u8>>> = Default::default();
31
32 let waker = Arc::new(AtomicWaker::new());
33
34 let write_handle = std::thread::spawn({
35 let write_buffer = write_buffer.clone();
36 move || {
37 loop {
38 let buffer = {
39 let mut buffer = write_buffer.lock().unwrap();
40 std::mem::take(&mut *buffer)
41 };
42 if !buffer.is_empty() {
43 println!("writing {} bytes", buffer.len());
44 write_stream.write_all(&buffer).unwrap();
45 write_stream.flush().unwrap();
46 } else {
47 std::thread::park();
48 }
49 }
50 }
51 });
52
53 std::thread::spawn({
54 let read_buffer = read_buffer.clone();
55 let waker = waker.clone();
56 move || {
57 let mut scratch = [0u8; 1024];
58 loop {
59 let n = read_stream.read(&mut scratch).unwrap();
60 if n == 0 {
61 panic!("EOF");
62 }
63
64 {
65 let mut buffer = read_buffer.lock().unwrap();
66 buffer.extend_from_slice(&scratch[..n]);
67 waker.wake();
68 }
69 }
70 }
71 });
72
73 Self {
74 write_handle,
75 write_buffer,
76 read_buffer,
77 waker,
78 }
79 }
80}
81
82impl embedded_io_async::ErrorType for AsyncTcp {
83 type Error = std::io::Error;
84}
85
86impl embedded_io_async::Write for AsyncTcp {
87 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
88 {
89 let mut buffer = self.write_buffer.lock().unwrap();
90 buffer.extend_from_slice(buf);
91 }
92 self.write_handle.thread().unpark();
93 Ok(buf.len())
94 }
95}
96
97impl embedded_io_async::Read for AsyncTcp {
98 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
99 struct WaitForWaker<'a>(&'a AtomicWaker, bool);
100
101 impl<'a> Future for WaitForWaker<'a> {
102 type Output = ();
103
104 fn poll(
105 mut self: std::pin::Pin<&mut Self>,
106 cx: &mut std::task::Context<'_>,
107 ) -> std::task::Poll<Self::Output> {
108 if self.1 {
109 std::task::Poll::Ready(())
110 } else {
111 self.as_mut().1 = true;
112 self.0.register(cx.waker());
113 std::task::Poll::Pending
114 }
115 }
116 }
117
118 loop {
119 {
120 let mut buffer = self.read_buffer.lock().unwrap();
121 if !buffer.is_empty() {
122 let copy_n = buf.len().min(buffer.len());
123 buf[..copy_n].copy_from_slice(&buffer[..copy_n]);
124 buffer.drain(..copy_n);
125 return Ok(copy_n);
126 }
127 }
128 WaitForWaker(&self.waker, false).await
129 }
130 }
131}
132
133#[embassy_executor::task]
134async fn main_task(spawner: Spawner) {
135 let mut stream = AsyncTcp::connect("mqtt.d464.sh:1883");
136
137 let mut device = embassy_ha::Device::new(
138 RESOURCES.init(Default::default()),
139 embassy_ha::DeviceConfig {
140 device_id: "example-device-id",
141 device_name: "Example Device Name",
142 manufacturer: "Example Device Manufacturer",
143 model: "Example Device Model",
144 },
145 );
146
147 let temperature_sensor = device.create_temperature_sensor(
148 "temperature-sensor-id",
149 "Temperature Sensor Name",
150 embassy_ha::TemperatureUnit::Celcius,
151 );
152
153 spawner.must_spawn(temperature(temperature_sensor));
154
155 device.run(&mut stream).await;
156}
157
158#[embassy_executor::task]
159async fn temperature(mut sensor: embassy_ha::TemperatureSensor<'static>) {
160 loop {
161 sensor.publish(42.0);
162 Timer::after_secs(1).await;
163 }
164}
165
166fn main() {
167 let executor = EXECUTOR.init(Executor::new());
168 executor.run(|spawner| {
169 spawner.must_spawn(main_task(spawner));
170 });
171}