aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pipe.rs
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src/pipe.rs')
-rw-r--r--embassy-sync/src/pipe.rs551
1 files changed, 551 insertions, 0 deletions
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
new file mode 100644
index 000000000..7d64b648e
--- /dev/null
+++ b/embassy-sync/src/pipe.rs
@@ -0,0 +1,551 @@
1//! Async byte stream pipe.
2
3use core::cell::RefCell;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use crate::blocking_mutex::raw::RawMutex;
9use crate::blocking_mutex::Mutex;
10use crate::ring_buffer::RingBuffer;
11use crate::waitqueue::WakerRegistration;
12
13/// Write-only access to a [`Pipe`].
14#[derive(Copy)]
15pub struct Writer<'p, M, const N: usize>
16where
17 M: RawMutex,
18{
19 pipe: &'p Pipe<M, N>,
20}
21
22impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
23where
24 M: RawMutex,
25{
26 fn clone(&self) -> Self {
27 Writer { pipe: self.pipe }
28 }
29}
30
31impl<'p, M, const N: usize> Writer<'p, M, N>
32where
33 M: RawMutex,
34{
35 /// Writes a value.
36 ///
37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf)
40 }
41
42 /// Attempt to immediately write a message.
43 ///
44 /// See [`Pipe::write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf)
47 }
48}
49
50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51pub struct WriteFuture<'p, M, const N: usize>
52where
53 M: RawMutex,
54{
55 pipe: &'p Pipe<M, N>,
56 buf: &'p [u8],
57}
58
59impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
60where
61 M: RawMutex,
62{
63 type Output = usize;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 match self.pipe.try_write_with_context(Some(cx), self.buf) {
67 Ok(n) => Poll::Ready(n),
68 Err(TryWriteError::Full) => Poll::Pending,
69 }
70 }
71}
72
73impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
74
75/// Read-only access to a [`Pipe`].
76#[derive(Copy)]
77pub struct Reader<'p, M, const N: usize>
78where
79 M: RawMutex,
80{
81 pipe: &'p Pipe<M, N>,
82}
83
84impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
85where
86 M: RawMutex,
87{
88 fn clone(&self) -> Self {
89 Reader { pipe: self.pipe }
90 }
91}
92
93impl<'p, M, const N: usize> Reader<'p, M, N>
94where
95 M: RawMutex,
96{
97 /// Reads a value.
98 ///
99 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf)
102 }
103
104 /// Attempt to immediately read a message.
105 ///
106 /// See [`Pipe::read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf)
109 }
110}
111
112/// Future returned by [`Pipe::read`] and [`Reader::read`].
113pub struct ReadFuture<'p, M, const N: usize>
114where
115 M: RawMutex,
116{
117 pipe: &'p Pipe<M, N>,
118 buf: &'p mut [u8],
119}
120
121impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
122where
123 M: RawMutex,
124{
125 type Output = usize;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 match self.pipe.try_read_with_context(Some(cx), self.buf) {
129 Ok(n) => Poll::Ready(n),
130 Err(TryReadError::Empty) => Poll::Pending,
131 }
132 }
133}
134
135impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
136
137/// Error returned by [`try_read`](Pipe::try_read).
138#[derive(PartialEq, Eq, Clone, Copy, Debug)]
139#[cfg_attr(feature = "defmt", derive(defmt::Format))]
140pub enum TryReadError {
141 /// No data could be read from the pipe because it is currently
142 /// empty, and reading would require blocking.
143 Empty,
144}
145
146/// Error returned by [`try_write`](Pipe::try_write).
147#[derive(PartialEq, Eq, Clone, Copy, Debug)]
148#[cfg_attr(feature = "defmt", derive(defmt::Format))]
149pub enum TryWriteError {
150 /// No data could be written to the pipe because it is
151 /// currently full, and writing would require blocking.
152 Full,
153}
154
155struct PipeState<const N: usize> {
156 buffer: RingBuffer<N>,
157 read_waker: WakerRegistration,
158 write_waker: WakerRegistration,
159}
160
161impl<const N: usize> PipeState<N> {
162 const fn new() -> Self {
163 PipeState {
164 buffer: RingBuffer::new(),
165 read_waker: WakerRegistration::new(),
166 write_waker: WakerRegistration::new(),
167 }
168 }
169
170 fn clear(&mut self) {
171 self.buffer.clear();
172 self.write_waker.wake();
173 }
174
175 fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
176 self.try_read_with_context(None, buf)
177 }
178
179 fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
180 if self.buffer.is_full() {
181 self.write_waker.wake();
182 }
183
184 let available = self.buffer.pop_buf();
185 if available.is_empty() {
186 if let Some(cx) = cx {
187 self.read_waker.register(cx.waker());
188 }
189 return Err(TryReadError::Empty);
190 }
191
192 let n = available.len().min(buf.len());
193 buf[..n].copy_from_slice(&available[..n]);
194 self.buffer.pop(n);
195 Ok(n)
196 }
197
198 fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
199 self.try_write_with_context(None, buf)
200 }
201
202 fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
203 if self.buffer.is_empty() {
204 self.read_waker.wake();
205 }
206
207 let available = self.buffer.push_buf();
208 if available.is_empty() {
209 if let Some(cx) = cx {
210 self.write_waker.register(cx.waker());
211 }
212 return Err(TryWriteError::Full);
213 }
214
215 let n = available.len().min(buf.len());
216 available[..n].copy_from_slice(&buf[..n]);
217 self.buffer.push(n);
218 Ok(n)
219 }
220}
221
222/// A bounded pipe for communicating between asynchronous tasks
223/// with backpressure.
224///
225/// The pipe will buffer up to the provided number of messages. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is
227/// read from the pipe.
228///
229/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize>
231where
232 M: RawMutex,
233{
234 inner: Mutex<M, RefCell<PipeState<N>>>,
235}
236
237impl<M, const N: usize> Pipe<M, N>
238where
239 M: RawMutex,
240{
241 /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
242 ///
243 /// ```
244 /// use embassy_sync::pipe::Pipe;
245 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
246 ///
247 /// // Declare a bounded pipe, with a buffer of 256 bytes.
248 /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
249 /// ```
250 pub const fn new() -> Self {
251 Self {
252 inner: Mutex::new(RefCell::new(PipeState::new())),
253 }
254 }
255
256 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
257 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
258 }
259
260 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
261 self.lock(|c| c.try_read_with_context(cx, buf))
262 }
263
264 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
265 self.lock(|c| c.try_write_with_context(cx, buf))
266 }
267
268 /// Get a writer for this pipe.
269 pub fn writer(&self) -> Writer<'_, M, N> {
270 Writer { pipe: self }
271 }
272
273 /// Get a reader for this pipe.
274 pub fn reader(&self) -> Reader<'_, M, N> {
275 Reader { pipe: self }
276 }
277
278 /// Write a value, waiting until there is capacity.
279 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue.
281 /// This doesn't mean the value has been read yet.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf }
284 }
285
286 /// Attempt to immediately write a message.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 ///
291 /// # Errors
292 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n`
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an
295 /// error is returned.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf))
298 }
299
300 /// Receive the next value.
301 ///
302 /// If there are no messages in the pipe's buffer, this method will
303 /// wait until a message is written.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf }
306 }
307
308 /// Attempt to immediately read a message.
309 ///
310 /// This method will either read a message from the pipe immediately or return an error
311 /// if the pipe is empty.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf))
314 }
315
316 /// Clear the data in the pipe's buffer.
317 pub fn clear(&self) {
318 self.lock(|c| c.clear())
319 }
320
321 /// Return whether the pipe is full (no free space in the buffer)
322 pub fn is_full(&self) -> bool {
323 self.len() == N
324 }
325
326 /// Return whether the pipe is empty (no data buffered)
327 pub fn is_empty(&self) -> bool {
328 self.len() == 0
329 }
330
331 /// Total byte capacity.
332 ///
333 /// This is the same as the `N` generic param.
334 pub fn capacity(&self) -> usize {
335 N
336 }
337
338 /// Used byte capacity.
339 pub fn len(&self) -> usize {
340 self.lock(|c| c.buffer.len())
341 }
342
343 /// Free byte capacity.
344 ///
345 /// This is equivalent to `capacity() - len()`
346 pub fn free_capacity(&self) -> usize {
347 N - self.len()
348 }
349}
350
351#[cfg(feature = "nightly")]
352mod io_impls {
353 use core::convert::Infallible;
354
355 use futures_util::FutureExt;
356
357 use super::*;
358
359 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
360 type Error = Infallible;
361 }
362
363 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
364 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
365 where
366 Self: 'a;
367
368 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
369 Pipe::read(self, buf).map(Ok)
370 }
371 }
372
373 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
374 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
375 where
376 Self: 'a;
377
378 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
379 Pipe::write(self, buf).map(Ok)
380 }
381
382 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
383 where
384 Self: 'a;
385
386 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
387 futures_util::future::ready(Ok(()))
388 }
389 }
390
391 impl<M: RawMutex, const N: usize> embedded_io::Io for &Pipe<M, N> {
392 type Error = Infallible;
393 }
394
395 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
396 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
397 where
398 Self: 'a;
399
400 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
401 Pipe::read(self, buf).map(Ok)
402 }
403 }
404
405 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
406 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
407 where
408 Self: 'a;
409
410 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
411 Pipe::write(self, buf).map(Ok)
412 }
413
414 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
415 where
416 Self: 'a;
417
418 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
419 futures_util::future::ready(Ok(()))
420 }
421 }
422
423 impl<M: RawMutex, const N: usize> embedded_io::Io for Reader<'_, M, N> {
424 type Error = Infallible;
425 }
426
427 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
428 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
429 where
430 Self: 'a;
431
432 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
433 Reader::read(self, buf).map(Ok)
434 }
435 }
436
437 impl<M: RawMutex, const N: usize> embedded_io::Io for Writer<'_, M, N> {
438 type Error = Infallible;
439 }
440
441 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
442 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
443 where
444 Self: 'a;
445
446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
447 Writer::write(self, buf).map(Ok)
448 }
449
450 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
451 where
452 Self: 'a;
453
454 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
455 futures_util::future::ready(Ok(()))
456 }
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use futures_executor::ThreadPool;
463 use futures_util::task::SpawnExt;
464 use static_cell::StaticCell;
465
466 use super::*;
467 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
468
469 fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
470 N - c.buffer.len()
471 }
472
473 #[test]
474 fn writing_once() {
475 let mut c = PipeState::<3>::new();
476 assert!(c.try_write(&[1]).is_ok());
477 assert_eq!(capacity(&c), 2);
478 }
479
480 #[test]
481 fn writing_when_full() {
482 let mut c = PipeState::<3>::new();
483 assert_eq!(c.try_write(&[42]), Ok(1));
484 assert_eq!(c.try_write(&[43]), Ok(1));
485 assert_eq!(c.try_write(&[44]), Ok(1));
486 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
487 assert_eq!(capacity(&c), 0);
488 }
489
490 #[test]
491 fn receiving_once_with_one_send() {
492 let mut c = PipeState::<3>::new();
493 assert!(c.try_write(&[42]).is_ok());
494 let mut buf = [0; 16];
495 assert_eq!(c.try_read(&mut buf), Ok(1));
496 assert_eq!(buf[0], 42);
497 assert_eq!(capacity(&c), 3);
498 }
499
500 #[test]
501 fn receiving_when_empty() {
502 let mut c = PipeState::<3>::new();
503 let mut buf = [0; 16];
504 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
505 assert_eq!(capacity(&c), 3);
506 }
507
508 #[test]
509 fn simple_send_and_receive() {
510 let c = Pipe::<NoopRawMutex, 3>::new();
511 assert!(c.try_write(&[42]).is_ok());
512 let mut buf = [0; 16];
513 assert_eq!(c.try_read(&mut buf), Ok(1));
514 assert_eq!(buf[0], 42);
515 }
516
517 #[test]
518 fn cloning() {
519 let c = Pipe::<NoopRawMutex, 3>::new();
520 let r1 = c.reader();
521 let w1 = c.writer();
522
523 let _ = r1.clone();
524 let _ = w1.clone();
525 }
526
527 #[futures_test::test]
528 async fn receiver_receives_given_try_write_async() {
529 let executor = ThreadPool::new().unwrap();
530
531 static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
532 let c = &*CHANNEL.init(Pipe::new());
533 let c2 = c;
534 let f = async move {
535 assert_eq!(c2.try_write(&[42]), Ok(1));
536 };
537 executor.spawn(f).unwrap();
538 let mut buf = [0; 16];
539 assert_eq!(c.read(&mut buf).await, 1);
540 assert_eq!(buf[0], 42);
541 }
542
543 #[futures_test::test]
544 async fn sender_send_completes_if_capacity() {
545 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
546 c.write(&[42]).await;
547 let mut buf = [0; 16];
548 assert_eq!(c.read(&mut buf).await, 1);
549 assert_eq!(buf[0], 42);
550 }
551}