aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2022-08-03 13:55:46 +0200
committerDario Nieuwenhuis <[email protected]>2022-08-03 13:55:46 +0200
commit3967c4194b1f28d2179fd30ec7fa688bcf23ab1b (patch)
tree015391fb93a3e96eeb0b09ce259506684638b5a9
parent1924f2d67d32a4466e71ef0aabc84305a9e8e165 (diff)
util: add pipe
-rw-r--r--embassy-util/src/lib.rs4
-rw-r--r--embassy-util/src/pipe.rs413
-rw-r--r--embassy-util/src/ring_buffer.rs146
3 files changed, 563 insertions, 0 deletions
diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs
index 07b1633ea..a65ebd518 100644
--- a/embassy-util/src/lib.rs
+++ b/embassy-util/src/lib.rs
@@ -8,9 +8,13 @@
8// This mod MUST go first, so that the others see its macros. 8// This mod MUST go first, so that the others see its macros.
9pub(crate) mod fmt; 9pub(crate) mod fmt;
10 10
11// internal use
12mod ring_buffer;
13
11pub mod blocking_mutex; 14pub mod blocking_mutex;
12pub mod channel; 15pub mod channel;
13pub mod mutex; 16pub mod mutex;
17pub mod pipe;
14pub mod waitqueue; 18pub mod waitqueue;
15 19
16mod forever; 20mod forever;
diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs
new file mode 100644
index 000000000..e4f21732a
--- /dev/null
+++ b/embassy-util/src/pipe.rs
@@ -0,0 +1,413 @@
1//! Async byte stream pipe.
2
3use core::cell::RefCell;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use crate::blocking_mutex::raw::RawMutex;
9use crate::blocking_mutex::Mutex;
10use crate::ring_buffer::RingBuffer;
11use crate::waitqueue::WakerRegistration;
12
13/// Write-only access to a [`Pipe`].
14#[derive(Copy)]
15pub struct Writer<'p, M, const N: usize>
16where
17 M: RawMutex,
18{
19 pipe: &'p Pipe<M, N>,
20}
21
22impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
23where
24 M: RawMutex,
25{
26 fn clone(&self) -> Self {
27 Writer { pipe: self.pipe }
28 }
29}
30
31impl<'p, M, const N: usize> Writer<'p, M, N>
32where
33 M: RawMutex,
34{
35 /// Writes a value.
36 ///
37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf)
40 }
41
42 /// Attempt to immediately write a message.
43 ///
44 /// See [`Pipe::write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf)
47 }
48}
49
50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51pub struct WriteFuture<'p, M, const N: usize>
52where
53 M: RawMutex,
54{
55 pipe: &'p Pipe<M, N>,
56 buf: &'p [u8],
57}
58
59impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
60where
61 M: RawMutex,
62{
63 type Output = usize;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 match self.pipe.try_write_with_context(Some(cx), self.buf) {
67 Ok(n) => Poll::Ready(n),
68 Err(TryWriteError::Full) => Poll::Pending,
69 }
70 }
71}
72
73impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
74
75/// Read-only access to a [`Pipe`].
76#[derive(Copy)]
77pub struct Reader<'p, M, const N: usize>
78where
79 M: RawMutex,
80{
81 pipe: &'p Pipe<M, N>,
82}
83
84impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
85where
86 M: RawMutex,
87{
88 fn clone(&self) -> Self {
89 Reader { pipe: self.pipe }
90 }
91}
92
93impl<'p, M, const N: usize> Reader<'p, M, N>
94where
95 M: RawMutex,
96{
97 /// Reads a value.
98 ///
99 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf)
102 }
103
104 /// Attempt to immediately read a message.
105 ///
106 /// See [`Pipe::read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf)
109 }
110}
111
112/// Future returned by [`Pipe::read`] and [`Reader::read`].
113pub struct ReadFuture<'p, M, const N: usize>
114where
115 M: RawMutex,
116{
117 pipe: &'p Pipe<M, N>,
118 buf: &'p mut [u8],
119}
120
121impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
122where
123 M: RawMutex,
124{
125 type Output = usize;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 match self.pipe.try_read_with_context(Some(cx), self.buf) {
129 Ok(n) => Poll::Ready(n),
130 Err(TryReadError::Empty) => Poll::Pending,
131 }
132 }
133}
134
135impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
136
137/// Error returned by [`try_read`](Pipe::try_read).
138#[derive(PartialEq, Eq, Clone, Copy, Debug)]
139#[cfg_attr(feature = "defmt", derive(defmt::Format))]
140pub enum TryReadError {
141 /// No data could be read from the pipe because it is currently
142 /// empty, and reading would require blocking.
143 Empty,
144}
145
146/// Error returned by [`try_write`](Pipe::try_write).
147#[derive(PartialEq, Eq, Clone, Copy, Debug)]
148#[cfg_attr(feature = "defmt", derive(defmt::Format))]
149pub enum TryWriteError {
150 /// No data could be written to the pipe because it is
151 /// currently full, and writing would require blocking.
152 Full,
153}
154
155struct PipeState<const N: usize> {
156 buffer: RingBuffer<N>,
157 read_waker: WakerRegistration,
158 write_waker: WakerRegistration,
159}
160
161impl<const N: usize> PipeState<N> {
162 const fn new() -> Self {
163 PipeState {
164 buffer: RingBuffer::new(),
165 read_waker: WakerRegistration::new(),
166 write_waker: WakerRegistration::new(),
167 }
168 }
169
170 fn clear(&mut self) {
171 self.buffer.clear();
172 self.write_waker.wake();
173 }
174
175 fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
176 self.try_read_with_context(None, buf)
177 }
178
179 fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
180 if self.buffer.is_full() {
181 self.write_waker.wake();
182 }
183
184 let available = self.buffer.pop_buf();
185 if available.is_empty() {
186 if let Some(cx) = cx {
187 self.read_waker.register(cx.waker());
188 }
189 return Err(TryReadError::Empty);
190 }
191
192 let n = available.len().min(buf.len());
193 buf[..n].copy_from_slice(&available[..n]);
194 self.buffer.pop(n);
195 Ok(n)
196 }
197
198 fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
199 self.try_write_with_context(None, buf)
200 }
201
202 fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
203 if self.buffer.is_empty() {
204 self.read_waker.wake();
205 }
206
207 let available = self.buffer.push_buf();
208 if available.is_empty() {
209 if let Some(cx) = cx {
210 self.write_waker.register(cx.waker());
211 }
212 return Err(TryWriteError::Full);
213 }
214
215 let n = available.len().min(buf.len());
216 available[..n].copy_from_slice(&buf[..n]);
217 self.buffer.push(n);
218 Ok(n)
219 }
220}
221
222/// A bounded pipe for communicating between asynchronous tasks
223/// with backpressure.
224///
225/// The pipe will buffer up to the provided number of messages. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is
227/// read from the pipe.
228///
229/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize>
231where
232 M: RawMutex,
233{
234 inner: Mutex<M, RefCell<PipeState<N>>>,
235}
236
237impl<M, const N: usize> Pipe<M, N>
238where
239 M: RawMutex,
240{
241 /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
242 ///
243 /// ```
244 /// use embassy_util::pipe::Pipe;
245 /// use embassy_util::blocking_mutex::raw::NoopRawMutex;
246 ///
247 /// // Declare a bounded pipe, with a buffer of 256 bytes.
248 /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
249 /// ```
250 pub const fn new() -> Self {
251 Self {
252 inner: Mutex::new(RefCell::new(PipeState::new())),
253 }
254 }
255
256 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
257 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
258 }
259
260 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
261 self.lock(|c| c.try_read_with_context(cx, buf))
262 }
263
264 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
265 self.lock(|c| c.try_write_with_context(cx, buf))
266 }
267
268 /// Get a writer for this pipe.
269 pub fn writer(&self) -> Writer<'_, M, N> {
270 Writer { pipe: self }
271 }
272
273 /// Get a reader for this pipe.
274 pub fn reader(&self) -> Reader<'_, M, N> {
275 Reader { pipe: self }
276 }
277
278 /// Write a value, waiting until there is capacity.
279 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue.
281 /// This doesn't mean the value has been read yet.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf }
284 }
285
286 /// Attempt to immediately write a message.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 ///
291 /// # Errors
292 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n`
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an
295 /// error is returned.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf))
298 }
299
300 /// Receive the next value.
301 ///
302 /// If there are no messages in the pipe's buffer, this method will
303 /// wait until a message is written.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf }
306 }
307
308 /// Attempt to immediately read a message.
309 ///
310 /// This method will either read a message from the pipe immediately or return an error
311 /// if the pipe is empty.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf))
314 }
315
316 /// Clear the data in the pipe's buffer.
317 pub fn clear(&self) {
318 self.lock(|c| c.clear())
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use futures_executor::ThreadPool;
325 use futures_util::task::SpawnExt;
326
327 use super::*;
328 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
329 use crate::Forever;
330
331 fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
332 N - c.buffer.len()
333 }
334
335 #[test]
336 fn writing_once() {
337 let mut c = PipeState::<3>::new();
338 assert!(c.try_write(&[1]).is_ok());
339 assert_eq!(capacity(&c), 2);
340 }
341
342 #[test]
343 fn writing_when_full() {
344 let mut c = PipeState::<3>::new();
345 assert_eq!(c.try_write(&[42]), Ok(1));
346 assert_eq!(c.try_write(&[43]), Ok(1));
347 assert_eq!(c.try_write(&[44]), Ok(1));
348 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
349 assert_eq!(capacity(&c), 0);
350 }
351
352 #[test]
353 fn receiving_once_with_one_send() {
354 let mut c = PipeState::<3>::new();
355 assert!(c.try_write(&[42]).is_ok());
356 let mut buf = [0; 16];
357 assert_eq!(c.try_read(&mut buf), Ok(1));
358 assert_eq!(buf[0], 42);
359 assert_eq!(capacity(&c), 3);
360 }
361
362 #[test]
363 fn receiving_when_empty() {
364 let mut c = PipeState::<3>::new();
365 let mut buf = [0; 16];
366 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
367 assert_eq!(capacity(&c), 3);
368 }
369
370 #[test]
371 fn simple_send_and_receive() {
372 let c = Pipe::<NoopRawMutex, 3>::new();
373 assert!(c.try_write(&[42]).is_ok());
374 let mut buf = [0; 16];
375 assert_eq!(c.try_read(&mut buf), Ok(1));
376 assert_eq!(buf[0], 42);
377 }
378
379 #[test]
380 fn cloning() {
381 let c = Pipe::<NoopRawMutex, 3>::new();
382 let r1 = c.reader();
383 let w1 = c.writer();
384
385 let _ = r1.clone();
386 let _ = w1.clone();
387 }
388
389 #[futures_test::test]
390 async fn receiver_receives_given_try_write_async() {
391 let executor = ThreadPool::new().unwrap();
392
393 static CHANNEL: Forever<Pipe<CriticalSectionRawMutex, 3>> = Forever::new();
394 let c = &*CHANNEL.put(Pipe::new());
395 let c2 = c;
396 let f = async move {
397 assert_eq!(c2.try_write(&[42]), Ok(1));
398 };
399 executor.spawn(f).unwrap();
400 let mut buf = [0; 16];
401 assert_eq!(c.read(&mut buf).await, 1);
402 assert_eq!(buf[0], 42);
403 }
404
405 #[futures_test::test]
406 async fn sender_send_completes_if_capacity() {
407 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
408 c.write(&[42]).await;
409 let mut buf = [0; 16];
410 assert_eq!(c.read(&mut buf).await, 1);
411 assert_eq!(buf[0], 42);
412 }
413}
diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs
new file mode 100644
index 000000000..521084024
--- /dev/null
+++ b/embassy-util/src/ring_buffer.rs
@@ -0,0 +1,146 @@
1pub struct RingBuffer<const N: usize> {
2 buf: [u8; N],
3 start: usize,
4 end: usize,
5 empty: bool,
6}
7
8impl<const N: usize> RingBuffer<N> {
9 pub const fn new() -> Self {
10 Self {
11 buf: [0; N],
12 start: 0,
13 end: 0,
14 empty: true,
15 }
16 }
17
18 pub fn push_buf(&mut self) -> &mut [u8] {
19 if self.start == self.end && !self.empty {
20 trace!(" ringbuf: push_buf empty");
21 return &mut self.buf[..0];
22 }
23
24 let n = if self.start <= self.end {
25 self.buf.len() - self.end
26 } else {
27 self.start - self.end
28 };
29
30 trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
31 &mut self.buf[self.end..self.end + n]
32 }
33
34 pub fn push(&mut self, n: usize) {
35 trace!(" ringbuf: push {:?}", n);
36 if n == 0 {
37 return;
38 }
39
40 self.end = self.wrap(self.end + n);
41 self.empty = false;
42 }
43
44 pub fn pop_buf(&mut self) -> &mut [u8] {
45 if self.empty {
46 trace!(" ringbuf: pop_buf empty");
47 return &mut self.buf[..0];
48 }
49
50 let n = if self.end <= self.start {
51 self.buf.len() - self.start
52 } else {
53 self.end - self.start
54 };
55
56 trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
57 &mut self.buf[self.start..self.start + n]
58 }
59
60 pub fn pop(&mut self, n: usize) {
61 trace!(" ringbuf: pop {:?}", n);
62 if n == 0 {
63 return;
64 }
65
66 self.start = self.wrap(self.start + n);
67 self.empty = self.start == self.end;
68 }
69
70 pub fn is_full(&self) -> bool {
71 self.start == self.end && !self.empty
72 }
73
74 pub fn is_empty(&self) -> bool {
75 self.empty
76 }
77
78 #[allow(unused)]
79 pub fn len(&self) -> usize {
80 if self.empty {
81 0
82 } else if self.start < self.end {
83 self.end - self.start
84 } else {
85 N + self.end - self.start
86 }
87 }
88
89 pub fn clear(&mut self) {
90 self.start = 0;
91 self.end = 0;
92 self.empty = true;
93 }
94
95 fn wrap(&self, n: usize) -> usize {
96 assert!(n <= self.buf.len());
97 if n == self.buf.len() {
98 0
99 } else {
100 n
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[test]
110 fn push_pop() {
111 let mut rb: RingBuffer<4> = RingBuffer::new();
112 let buf = rb.push_buf();
113 assert_eq!(4, buf.len());
114 buf[0] = 1;
115 buf[1] = 2;
116 buf[2] = 3;
117 buf[3] = 4;
118 rb.push(4);
119
120 let buf = rb.pop_buf();
121 assert_eq!(4, buf.len());
122 assert_eq!(1, buf[0]);
123 rb.pop(1);
124
125 let buf = rb.pop_buf();
126 assert_eq!(3, buf.len());
127 assert_eq!(2, buf[0]);
128 rb.pop(1);
129
130 let buf = rb.pop_buf();
131 assert_eq!(2, buf.len());
132 assert_eq!(3, buf[0]);
133 rb.pop(1);
134
135 let buf = rb.pop_buf();
136 assert_eq!(1, buf.len());
137 assert_eq!(4, buf[0]);
138 rb.pop(1);
139
140 let buf = rb.pop_buf();
141 assert_eq!(0, buf.len());
142
143 let buf = rb.push_buf();
144 assert_eq!(4, buf.len());
145 }
146}