aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-08-04 21:40:44 +0000
committerGitHub <[email protected]>2022-08-04 21:40:44 +0000
commitba67f6d3a8c84262fcc9b2020670515d192555f1 (patch)
tree445397e11fd4e26b99c351c0d83a48080ef1d81b
parent1924f2d67d32a4466e71ef0aabc84305a9e8e165 (diff)
parent99a97ec6c9c1212c16a5411e6743979de5a040e9 (diff)
Merge #891
891: Async Pipe r=Dirbaio a=Dirbaio Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy-util/Cargo.toml6
-rw-r--r--embassy-util/src/lib.rs5
-rw-r--r--embassy-util/src/pipe.rs551
-rw-r--r--embassy-util/src/ring_buffer.rs146
4 files changed, 706 insertions, 2 deletions
diff --git a/embassy-util/Cargo.toml b/embassy-util/Cargo.toml
index 32b796c0a..ef5acc0f0 100644
--- a/embassy-util/Cargo.toml
+++ b/embassy-util/Cargo.toml
@@ -6,11 +6,14 @@ edition = "2021"
6[package.metadata.embassy_docs] 6[package.metadata.embassy_docs]
7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/" 7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/"
8src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/" 8src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/"
9features = ["nightly", "defmt", "unstable-traits", "time", "time-tick-1mhz"] 9features = ["nightly"]
10flavors = [ 10flavors = [
11 { name = "default", target = "x86_64-unknown-linux-gnu" }, 11 { name = "default", target = "x86_64-unknown-linux-gnu" },
12] 12]
13 13
14[features]
15nightly = ["embedded-io/async"]
16
14[dependencies] 17[dependencies]
15defmt = { version = "0.3", optional = true } 18defmt = { version = "0.3", optional = true }
16log = { version = "0.4.14", optional = true } 19log = { version = "0.4.14", optional = true }
@@ -20,6 +23,7 @@ atomic-polyfill = "0.1.5"
20critical-section = "0.2.5" 23critical-section = "0.2.5"
21heapless = "0.7.5" 24heapless = "0.7.5"
22cfg-if = "1.0.0" 25cfg-if = "1.0.0"
26embedded-io = "0.3.0"
23 27
24[dev-dependencies] 28[dev-dependencies]
25futures-executor = { version = "0.3.17", features = [ "thread-pool" ] } 29futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs
index 07b1633ea..110c72811 100644
--- a/embassy-util/src/lib.rs
+++ b/embassy-util/src/lib.rs
@@ -1,6 +1,5 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] 2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
3#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))]
4#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
5#![doc = include_str!("../../README.md")] 4#![doc = include_str!("../../README.md")]
6#![warn(missing_docs)] 5#![warn(missing_docs)]
@@ -8,9 +7,13 @@
8// This mod MUST go first, so that the others see its macros. 7// This mod MUST go first, so that the others see its macros.
9pub(crate) mod fmt; 8pub(crate) mod fmt;
10 9
10// internal use
11mod ring_buffer;
12
11pub mod blocking_mutex; 13pub mod blocking_mutex;
12pub mod channel; 14pub mod channel;
13pub mod mutex; 15pub mod mutex;
16pub mod pipe;
14pub mod waitqueue; 17pub mod waitqueue;
15 18
16mod forever; 19mod forever;
diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs
new file mode 100644
index 000000000..9c20aeeff
--- /dev/null
+++ b/embassy-util/src/pipe.rs
@@ -0,0 +1,551 @@
1//! Async byte stream pipe.
2
3use core::cell::RefCell;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use crate::blocking_mutex::raw::RawMutex;
9use crate::blocking_mutex::Mutex;
10use crate::ring_buffer::RingBuffer;
11use crate::waitqueue::WakerRegistration;
12
13/// Write-only access to a [`Pipe`].
14#[derive(Copy)]
15pub struct Writer<'p, M, const N: usize>
16where
17 M: RawMutex,
18{
19 pipe: &'p Pipe<M, N>,
20}
21
22impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
23where
24 M: RawMutex,
25{
26 fn clone(&self) -> Self {
27 Writer { pipe: self.pipe }
28 }
29}
30
31impl<'p, M, const N: usize> Writer<'p, M, N>
32where
33 M: RawMutex,
34{
35 /// Writes a value.
36 ///
37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf)
40 }
41
42 /// Attempt to immediately write a message.
43 ///
44 /// See [`Pipe::write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf)
47 }
48}
49
50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51pub struct WriteFuture<'p, M, const N: usize>
52where
53 M: RawMutex,
54{
55 pipe: &'p Pipe<M, N>,
56 buf: &'p [u8],
57}
58
59impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
60where
61 M: RawMutex,
62{
63 type Output = usize;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 match self.pipe.try_write_with_context(Some(cx), self.buf) {
67 Ok(n) => Poll::Ready(n),
68 Err(TryWriteError::Full) => Poll::Pending,
69 }
70 }
71}
72
73impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
74
75/// Read-only access to a [`Pipe`].
76#[derive(Copy)]
77pub struct Reader<'p, M, const N: usize>
78where
79 M: RawMutex,
80{
81 pipe: &'p Pipe<M, N>,
82}
83
84impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
85where
86 M: RawMutex,
87{
88 fn clone(&self) -> Self {
89 Reader { pipe: self.pipe }
90 }
91}
92
93impl<'p, M, const N: usize> Reader<'p, M, N>
94where
95 M: RawMutex,
96{
97 /// Reads a value.
98 ///
99 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf)
102 }
103
104 /// Attempt to immediately read a message.
105 ///
106 /// See [`Pipe::read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf)
109 }
110}
111
112/// Future returned by [`Pipe::read`] and [`Reader::read`].
113pub struct ReadFuture<'p, M, const N: usize>
114where
115 M: RawMutex,
116{
117 pipe: &'p Pipe<M, N>,
118 buf: &'p mut [u8],
119}
120
121impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
122where
123 M: RawMutex,
124{
125 type Output = usize;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 match self.pipe.try_read_with_context(Some(cx), self.buf) {
129 Ok(n) => Poll::Ready(n),
130 Err(TryReadError::Empty) => Poll::Pending,
131 }
132 }
133}
134
135impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
136
137/// Error returned by [`try_read`](Pipe::try_read).
138#[derive(PartialEq, Eq, Clone, Copy, Debug)]
139#[cfg_attr(feature = "defmt", derive(defmt::Format))]
140pub enum TryReadError {
141 /// No data could be read from the pipe because it is currently
142 /// empty, and reading would require blocking.
143 Empty,
144}
145
146/// Error returned by [`try_write`](Pipe::try_write).
147#[derive(PartialEq, Eq, Clone, Copy, Debug)]
148#[cfg_attr(feature = "defmt", derive(defmt::Format))]
149pub enum TryWriteError {
150 /// No data could be written to the pipe because it is
151 /// currently full, and writing would require blocking.
152 Full,
153}
154
155struct PipeState<const N: usize> {
156 buffer: RingBuffer<N>,
157 read_waker: WakerRegistration,
158 write_waker: WakerRegistration,
159}
160
161impl<const N: usize> PipeState<N> {
162 const fn new() -> Self {
163 PipeState {
164 buffer: RingBuffer::new(),
165 read_waker: WakerRegistration::new(),
166 write_waker: WakerRegistration::new(),
167 }
168 }
169
170 fn clear(&mut self) {
171 self.buffer.clear();
172 self.write_waker.wake();
173 }
174
175 fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
176 self.try_read_with_context(None, buf)
177 }
178
179 fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
180 if self.buffer.is_full() {
181 self.write_waker.wake();
182 }
183
184 let available = self.buffer.pop_buf();
185 if available.is_empty() {
186 if let Some(cx) = cx {
187 self.read_waker.register(cx.waker());
188 }
189 return Err(TryReadError::Empty);
190 }
191
192 let n = available.len().min(buf.len());
193 buf[..n].copy_from_slice(&available[..n]);
194 self.buffer.pop(n);
195 Ok(n)
196 }
197
198 fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
199 self.try_write_with_context(None, buf)
200 }
201
202 fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
203 if self.buffer.is_empty() {
204 self.read_waker.wake();
205 }
206
207 let available = self.buffer.push_buf();
208 if available.is_empty() {
209 if let Some(cx) = cx {
210 self.write_waker.register(cx.waker());
211 }
212 return Err(TryWriteError::Full);
213 }
214
215 let n = available.len().min(buf.len());
216 available[..n].copy_from_slice(&buf[..n]);
217 self.buffer.push(n);
218 Ok(n)
219 }
220}
221
222/// A bounded pipe for communicating between asynchronous tasks
223/// with backpressure.
224///
225/// The pipe will buffer up to the provided number of messages. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is
227/// read from the pipe.
228///
229/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize>
231where
232 M: RawMutex,
233{
234 inner: Mutex<M, RefCell<PipeState<N>>>,
235}
236
237impl<M, const N: usize> Pipe<M, N>
238where
239 M: RawMutex,
240{
241 /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
242 ///
243 /// ```
244 /// use embassy_util::pipe::Pipe;
245 /// use embassy_util::blocking_mutex::raw::NoopRawMutex;
246 ///
247 /// // Declare a bounded pipe, with a buffer of 256 bytes.
248 /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
249 /// ```
250 pub const fn new() -> Self {
251 Self {
252 inner: Mutex::new(RefCell::new(PipeState::new())),
253 }
254 }
255
256 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
257 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
258 }
259
260 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
261 self.lock(|c| c.try_read_with_context(cx, buf))
262 }
263
264 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
265 self.lock(|c| c.try_write_with_context(cx, buf))
266 }
267
268 /// Get a writer for this pipe.
269 pub fn writer(&self) -> Writer<'_, M, N> {
270 Writer { pipe: self }
271 }
272
273 /// Get a reader for this pipe.
274 pub fn reader(&self) -> Reader<'_, M, N> {
275 Reader { pipe: self }
276 }
277
278 /// Write a value, waiting until there is capacity.
279 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue.
281 /// This doesn't mean the value has been read yet.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf }
284 }
285
286 /// Attempt to immediately write a message.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 ///
291 /// # Errors
292 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n`
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an
295 /// error is returned.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf))
298 }
299
300 /// Receive the next value.
301 ///
302 /// If there are no messages in the pipe's buffer, this method will
303 /// wait until a message is written.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf }
306 }
307
308 /// Attempt to immediately read a message.
309 ///
310 /// This method will either read a message from the pipe immediately or return an error
311 /// if the pipe is empty.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf))
314 }
315
316 /// Clear the data in the pipe's buffer.
317 pub fn clear(&self) {
318 self.lock(|c| c.clear())
319 }
320
321 /// Return whether the pipe is full (no free space in the buffer)
322 pub fn is_full(&self) -> bool {
323 self.len() == N
324 }
325
326 /// Return whether the pipe is empty (no data buffered)
327 pub fn is_empty(&self) -> bool {
328 self.len() == 0
329 }
330
331 /// Total byte capacity.
332 ///
333 /// This is the same as the `N` generic param.
334 pub fn capacity(&self) -> usize {
335 N
336 }
337
338 /// Used byte capacity.
339 pub fn len(&self) -> usize {
340 self.lock(|c| c.buffer.len())
341 }
342
343 /// Free byte capacity.
344 ///
345 /// This is equivalent to `capacity() - len()`
346 pub fn free_capacity(&self) -> usize {
347 N - self.len()
348 }
349}
350
351#[cfg(feature = "nightly")]
352mod io_impls {
353 use core::convert::Infallible;
354
355 use futures_util::FutureExt;
356
357 use super::*;
358
359 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
360 type Error = Infallible;
361 }
362
363 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
364 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
365 where
366 Self: 'a;
367
368 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
369 Pipe::read(self, buf).map(Ok)
370 }
371 }
372
373 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
374 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
375 where
376 Self: 'a;
377
378 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
379 Pipe::write(self, buf).map(Ok)
380 }
381
382 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
383 where
384 Self: 'a;
385
386 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
387 futures_util::future::ready(Ok(()))
388 }
389 }
390
391 impl<M: RawMutex, const N: usize> embedded_io::Io for &Pipe<M, N> {
392 type Error = Infallible;
393 }
394
395 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
396 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
397 where
398 Self: 'a;
399
400 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
401 Pipe::read(self, buf).map(Ok)
402 }
403 }
404
405 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
406 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
407 where
408 Self: 'a;
409
410 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
411 Pipe::write(self, buf).map(Ok)
412 }
413
414 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
415 where
416 Self: 'a;
417
418 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
419 futures_util::future::ready(Ok(()))
420 }
421 }
422
423 impl<M: RawMutex, const N: usize> embedded_io::Io for Reader<'_, M, N> {
424 type Error = Infallible;
425 }
426
427 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
428 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
429 where
430 Self: 'a;
431
432 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
433 Reader::read(self, buf).map(Ok)
434 }
435 }
436
437 impl<M: RawMutex, const N: usize> embedded_io::Io for Writer<'_, M, N> {
438 type Error = Infallible;
439 }
440
441 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
442 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
443 where
444 Self: 'a;
445
446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
447 Writer::write(self, buf).map(Ok)
448 }
449
450 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
451 where
452 Self: 'a;
453
454 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
455 futures_util::future::ready(Ok(()))
456 }
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use futures_executor::ThreadPool;
463 use futures_util::task::SpawnExt;
464
465 use super::*;
466 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
467 use crate::Forever;
468
469 fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
470 N - c.buffer.len()
471 }
472
473 #[test]
474 fn writing_once() {
475 let mut c = PipeState::<3>::new();
476 assert!(c.try_write(&[1]).is_ok());
477 assert_eq!(capacity(&c), 2);
478 }
479
480 #[test]
481 fn writing_when_full() {
482 let mut c = PipeState::<3>::new();
483 assert_eq!(c.try_write(&[42]), Ok(1));
484 assert_eq!(c.try_write(&[43]), Ok(1));
485 assert_eq!(c.try_write(&[44]), Ok(1));
486 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
487 assert_eq!(capacity(&c), 0);
488 }
489
490 #[test]
491 fn receiving_once_with_one_send() {
492 let mut c = PipeState::<3>::new();
493 assert!(c.try_write(&[42]).is_ok());
494 let mut buf = [0; 16];
495 assert_eq!(c.try_read(&mut buf), Ok(1));
496 assert_eq!(buf[0], 42);
497 assert_eq!(capacity(&c), 3);
498 }
499
500 #[test]
501 fn receiving_when_empty() {
502 let mut c = PipeState::<3>::new();
503 let mut buf = [0; 16];
504 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
505 assert_eq!(capacity(&c), 3);
506 }
507
508 #[test]
509 fn simple_send_and_receive() {
510 let c = Pipe::<NoopRawMutex, 3>::new();
511 assert!(c.try_write(&[42]).is_ok());
512 let mut buf = [0; 16];
513 assert_eq!(c.try_read(&mut buf), Ok(1));
514 assert_eq!(buf[0], 42);
515 }
516
517 #[test]
518 fn cloning() {
519 let c = Pipe::<NoopRawMutex, 3>::new();
520 let r1 = c.reader();
521 let w1 = c.writer();
522
523 let _ = r1.clone();
524 let _ = w1.clone();
525 }
526
527 #[futures_test::test]
528 async fn receiver_receives_given_try_write_async() {
529 let executor = ThreadPool::new().unwrap();
530
531 static CHANNEL: Forever<Pipe<CriticalSectionRawMutex, 3>> = Forever::new();
532 let c = &*CHANNEL.put(Pipe::new());
533 let c2 = c;
534 let f = async move {
535 assert_eq!(c2.try_write(&[42]), Ok(1));
536 };
537 executor.spawn(f).unwrap();
538 let mut buf = [0; 16];
539 assert_eq!(c.read(&mut buf).await, 1);
540 assert_eq!(buf[0], 42);
541 }
542
543 #[futures_test::test]
544 async fn sender_send_completes_if_capacity() {
545 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
546 c.write(&[42]).await;
547 let mut buf = [0; 16];
548 assert_eq!(c.read(&mut buf).await, 1);
549 assert_eq!(buf[0], 42);
550 }
551}
diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs
new file mode 100644
index 000000000..521084024
--- /dev/null
+++ b/embassy-util/src/ring_buffer.rs
@@ -0,0 +1,146 @@
1pub struct RingBuffer<const N: usize> {
2 buf: [u8; N],
3 start: usize,
4 end: usize,
5 empty: bool,
6}
7
8impl<const N: usize> RingBuffer<N> {
9 pub const fn new() -> Self {
10 Self {
11 buf: [0; N],
12 start: 0,
13 end: 0,
14 empty: true,
15 }
16 }
17
18 pub fn push_buf(&mut self) -> &mut [u8] {
19 if self.start == self.end && !self.empty {
20 trace!(" ringbuf: push_buf empty");
21 return &mut self.buf[..0];
22 }
23
24 let n = if self.start <= self.end {
25 self.buf.len() - self.end
26 } else {
27 self.start - self.end
28 };
29
30 trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
31 &mut self.buf[self.end..self.end + n]
32 }
33
34 pub fn push(&mut self, n: usize) {
35 trace!(" ringbuf: push {:?}", n);
36 if n == 0 {
37 return;
38 }
39
40 self.end = self.wrap(self.end + n);
41 self.empty = false;
42 }
43
44 pub fn pop_buf(&mut self) -> &mut [u8] {
45 if self.empty {
46 trace!(" ringbuf: pop_buf empty");
47 return &mut self.buf[..0];
48 }
49
50 let n = if self.end <= self.start {
51 self.buf.len() - self.start
52 } else {
53 self.end - self.start
54 };
55
56 trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
57 &mut self.buf[self.start..self.start + n]
58 }
59
60 pub fn pop(&mut self, n: usize) {
61 trace!(" ringbuf: pop {:?}", n);
62 if n == 0 {
63 return;
64 }
65
66 self.start = self.wrap(self.start + n);
67 self.empty = self.start == self.end;
68 }
69
70 pub fn is_full(&self) -> bool {
71 self.start == self.end && !self.empty
72 }
73
74 pub fn is_empty(&self) -> bool {
75 self.empty
76 }
77
78 #[allow(unused)]
79 pub fn len(&self) -> usize {
80 if self.empty {
81 0
82 } else if self.start < self.end {
83 self.end - self.start
84 } else {
85 N + self.end - self.start
86 }
87 }
88
89 pub fn clear(&mut self) {
90 self.start = 0;
91 self.end = 0;
92 self.empty = true;
93 }
94
95 fn wrap(&self, n: usize) -> usize {
96 assert!(n <= self.buf.len());
97 if n == self.buf.len() {
98 0
99 } else {
100 n
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[test]
110 fn push_pop() {
111 let mut rb: RingBuffer<4> = RingBuffer::new();
112 let buf = rb.push_buf();
113 assert_eq!(4, buf.len());
114 buf[0] = 1;
115 buf[1] = 2;
116 buf[2] = 3;
117 buf[3] = 4;
118 rb.push(4);
119
120 let buf = rb.pop_buf();
121 assert_eq!(4, buf.len());
122 assert_eq!(1, buf[0]);
123 rb.pop(1);
124
125 let buf = rb.pop_buf();
126 assert_eq!(3, buf.len());
127 assert_eq!(2, buf[0]);
128 rb.pop(1);
129
130 let buf = rb.pop_buf();
131 assert_eq!(2, buf.len());
132 assert_eq!(3, buf[0]);
133 rb.pop(1);
134
135 let buf = rb.pop_buf();
136 assert_eq!(1, buf.len());
137 assert_eq!(4, buf[0]);
138 rb.pop(1);
139
140 let buf = rb.pop_buf();
141 assert_eq!(0, buf.len());
142
143 let buf = rb.push_buf();
144 assert_eq!(4, buf.len());
145 }
146}