From 036e6ae30c9e772ef8ef20439f121e108b9106f0 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Fri, 28 Jul 2023 13:23:22 +0200 Subject: Rename embassy-hal-common to embassy-hal-internal, document it's for internal use only. (#1700) --- embassy-hal-internal/src/atomic_ring_buffer.rs | 556 +++++++++++++++++++++++++ 1 file changed, 556 insertions(+) create mode 100644 embassy-hal-internal/src/atomic_ring_buffer.rs (limited to 'embassy-hal-internal/src/atomic_ring_buffer.rs') diff --git a/embassy-hal-internal/src/atomic_ring_buffer.rs b/embassy-hal-internal/src/atomic_ring_buffer.rs new file mode 100644 index 000000000..ea84925c4 --- /dev/null +++ b/embassy-hal-internal/src/atomic_ring_buffer.rs @@ -0,0 +1,556 @@ +use core::slice; +use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +/// Atomic reusable ringbuffer +/// +/// This ringbuffer implementation is designed to be stored in a `static`, +/// therefore all methods take `&self` and not `&mut self`. +/// +/// It is "reusable": when created it has no backing buffer, you can give it +/// one with `init` and take it back with `deinit`, and init it again in the +/// future if needed. This is very non-idiomatic, but helps a lot when storing +/// it in a `static`. +/// +/// One concurrent writer and one concurrent reader are supported, even at +/// different execution priorities (like main and irq). +pub struct RingBuffer { + pub buf: AtomicPtr, + pub len: AtomicUsize, + + // start and end wrap at len*2, not at len. + // This allows distinguishing "full" and "empty". + // full is when start+len == end (modulo len*2) + // empty is when start == end + // + // This avoids having to consider the ringbuffer "full" at len-1 instead of len. + // The usual solution is adding a "full" flag, but that can't be made atomic + pub start: AtomicUsize, + pub end: AtomicUsize, +} + +pub struct Reader<'a>(&'a RingBuffer); +pub struct Writer<'a>(&'a RingBuffer); + +impl RingBuffer { + /// Create a new empty ringbuffer. + pub const fn new() -> Self { + Self { + buf: AtomicPtr::new(core::ptr::null_mut()), + len: AtomicUsize::new(0), + start: AtomicUsize::new(0), + end: AtomicUsize::new(0), + } + } + + /// Initialize the ring buffer with a buffer. + /// + /// # Safety + /// - The buffer (`buf .. buf+len`) must be valid memory until `deinit` is called. + /// - Must not be called concurrently with any other methods. + pub unsafe fn init(&self, buf: *mut u8, len: usize) { + // Ordering: it's OK to use `Relaxed` because this is not called + // concurrently with other methods. + self.buf.store(buf, Ordering::Relaxed); + self.len.store(len, Ordering::Relaxed); + self.start.store(0, Ordering::Relaxed); + self.end.store(0, Ordering::Relaxed); + } + + /// Deinitialize the ringbuffer. + /// + /// After calling this, the ringbuffer becomes empty, as if it was + /// just created with `new()`. + /// + /// # Safety + /// - Must not be called concurrently with any other methods. + pub unsafe fn deinit(&self) { + // Ordering: it's OK to use `Relaxed` because this is not called + // concurrently with other methods. + self.len.store(0, Ordering::Relaxed); + self.start.store(0, Ordering::Relaxed); + self.end.store(0, Ordering::Relaxed); + } + + /// Create a reader. + /// + /// # Safety + /// + /// Only one reader can exist at a time. + pub unsafe fn reader(&self) -> Reader<'_> { + Reader(self) + } + + /// Create a writer. + /// + /// # Safety + /// + /// Only one writer can exist at a time. + pub unsafe fn writer(&self) -> Writer<'_> { + Writer(self) + } + + pub fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) + } + + pub fn is_full(&self) -> bool { + let len = self.len.load(Ordering::Relaxed); + let start = self.start.load(Ordering::Relaxed); + let end = self.end.load(Ordering::Relaxed); + + self.wrap(start + len) == end + } + + pub fn is_empty(&self) -> bool { + let start = self.start.load(Ordering::Relaxed); + let end = self.end.load(Ordering::Relaxed); + + start == end + } + + fn wrap(&self, mut n: usize) -> usize { + let len = self.len.load(Ordering::Relaxed); + + if n >= len * 2 { + n -= len * 2 + } + n + } +} + +impl<'a> Writer<'a> { + /// Push data into the buffer in-place. + /// + /// The closure `f` is called with a free part of the buffer, it must write + /// some data to it and return the amount of bytes written. + pub fn push(&mut self, f: impl FnOnce(&mut [u8]) -> usize) -> usize { + let (p, n) = self.push_buf(); + let buf = unsafe { slice::from_raw_parts_mut(p, n) }; + let n = f(buf); + self.push_done(n); + n + } + + /// Push one data byte. + /// + /// Returns true if pushed successfully. + pub fn push_one(&mut self, val: u8) -> bool { + let n = self.push(|f| match f { + [] => 0, + [x, ..] => { + *x = val; + 1 + } + }); + n != 0 + } + + /// Get a buffer where data can be pushed to. + /// + /// Equivalent to [`Self::push_buf`] but returns a slice. + pub fn push_slice(&mut self) -> &mut [u8] { + let (data, len) = self.push_buf(); + unsafe { slice::from_raw_parts_mut(data, len) } + } + + /// Get up to two buffers where data can be pushed to. + /// + /// Equivalent to [`Self::push_bufs`] but returns slices. + pub fn push_slices(&mut self) -> [&mut [u8]; 2] { + let [(d0, l0), (d1, l1)] = self.push_bufs(); + unsafe { [slice::from_raw_parts_mut(d0, l0), slice::from_raw_parts_mut(d1, l1)] } + } + + /// Get a buffer where data can be pushed to. + /// + /// Write data to the start of the buffer, then call `push_done` with + /// however many bytes you've pushed. + /// + /// The buffer is suitable to DMA to. + /// + /// If the ringbuf is full, size=0 will be returned. + /// + /// The buffer stays valid as long as no other `Writer` method is called + /// and `init`/`deinit` aren't called on the ringbuf. + pub fn push_buf(&mut self) -> (*mut u8, usize) { + // Ordering: popping writes `start` last, so we read `start` first. + // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. + let mut start = self.0.start.load(Ordering::Acquire); + let buf = self.0.buf.load(Ordering::Relaxed); + let len = self.0.len.load(Ordering::Relaxed); + let mut end = self.0.end.load(Ordering::Relaxed); + + let empty = start == end; + + if start >= len { + start -= len + } + if end >= len { + end -= len + } + + if start == end && !empty { + // full + return (buf, 0); + } + let n = if start > end { start - end } else { len - end }; + + trace!(" ringbuf: push_buf {:?}..{:?}", end, end + n); + (unsafe { buf.add(end) }, n) + } + + /// Get up to two buffers where data can be pushed to. + /// + /// Write data starting at the beginning of the first buffer, then call + /// `push_done` with however many bytes you've pushed. + /// + /// The buffers are suitable to DMA to. + /// + /// If the ringbuf is full, both buffers will be zero length. + /// If there is only area available, the second buffer will be zero length. + /// + /// The buffer stays valid as long as no other `Writer` method is called + /// and `init`/`deinit` aren't called on the ringbuf. + pub fn push_bufs(&mut self) -> [(*mut u8, usize); 2] { + // Ordering: as per push_buf() + let mut start = self.0.start.load(Ordering::Acquire); + let buf = self.0.buf.load(Ordering::Relaxed); + let len = self.0.len.load(Ordering::Relaxed); + let mut end = self.0.end.load(Ordering::Relaxed); + + let empty = start == end; + + if start >= len { + start -= len + } + if end >= len { + end -= len + } + + if start == end && !empty { + // full + return [(buf, 0), (buf, 0)]; + } + let n0 = if start > end { start - end } else { len - end }; + let n1 = if start <= end { start } else { 0 }; + + trace!(" ringbuf: push_bufs [{:?}..{:?}, {:?}..{:?}]", end, end + n0, 0, n1); + [(unsafe { buf.add(end) }, n0), (buf, n1)] + } + + pub fn push_done(&mut self, n: usize) { + trace!(" ringbuf: push {:?}", n); + let end = self.0.end.load(Ordering::Relaxed); + + // Ordering: write `end` last, with Release ordering. + // The ordering ensures no preceding memory accesses (such as writing + // the actual data in the buffer) can be reordered down past it, which + // will guarantee the reader sees them after reading from `end`. + self.0.end.store(self.0.wrap(end + n), Ordering::Release); + } +} + +impl<'a> Reader<'a> { + /// Pop data from the buffer in-place. + /// + /// The closure `f` is called with the next data, it must process + /// some data from it and return the amount of bytes processed. + pub fn pop(&mut self, f: impl FnOnce(&[u8]) -> usize) -> usize { + let (p, n) = self.pop_buf(); + let buf = unsafe { slice::from_raw_parts(p, n) }; + let n = f(buf); + self.pop_done(n); + n + } + + /// Pop one data byte. + /// + /// Returns true if popped successfully. + pub fn pop_one(&mut self) -> Option { + let mut res = None; + self.pop(|f| match f { + &[] => 0, + &[x, ..] => { + res = Some(x); + 1 + } + }); + res + } + + /// Get a buffer where data can be popped from. + /// + /// Equivalent to [`Self::pop_buf`] but returns a slice. + pub fn pop_slice(&mut self) -> &mut [u8] { + let (data, len) = self.pop_buf(); + unsafe { slice::from_raw_parts_mut(data, len) } + } + + /// Get a buffer where data can be popped from. + /// + /// Read data from the start of the buffer, then call `pop_done` with + /// however many bytes you've processed. + /// + /// The buffer is suitable to DMA from. + /// + /// If the ringbuf is empty, size=0 will be returned. + /// + /// The buffer stays valid as long as no other `Reader` method is called + /// and `init`/`deinit` aren't called on the ringbuf. + pub fn pop_buf(&mut self) -> (*mut u8, usize) { + // Ordering: pushing writes `end` last, so we read `end` first. + // Read it with Acquire ordering, so that the next accesses can't be reordered up past it. + // This is needed to guarantee we "see" the data written by the writer. + let mut end = self.0.end.load(Ordering::Acquire); + let buf = self.0.buf.load(Ordering::Relaxed); + let len = self.0.len.load(Ordering::Relaxed); + let mut start = self.0.start.load(Ordering::Relaxed); + + if start == end { + return (buf, 0); + } + + if start >= len { + start -= len + } + if end >= len { + end -= len + } + + let n = if end > start { end - start } else { len - start }; + + trace!(" ringbuf: pop_buf {:?}..{:?}", start, start + n); + (unsafe { buf.add(start) }, n) + } + + pub fn pop_done(&mut self, n: usize) { + trace!(" ringbuf: pop {:?}", n); + + let start = self.0.start.load(Ordering::Relaxed); + + // Ordering: write `start` last, with Release ordering. + // The ordering ensures no preceding memory accesses (such as reading + // the actual data) can be reordered down past it. This is necessary + // because writing to `start` is effectively freeing the read part of the + // buffer, which "gives permission" to the writer to write to it again. + // Therefore, all buffer accesses must be completed before this. + self.0.start.store(self.0.wrap(start + n), Ordering::Release); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_pop() { + let mut b = [0; 4]; + let rb = RingBuffer::new(); + unsafe { + rb.init(b.as_mut_ptr(), 4); + + assert_eq!(rb.is_empty(), true); + assert_eq!(rb.is_full(), false); + + rb.writer().push(|buf| { + assert_eq!(4, buf.len()); + buf[0] = 1; + buf[1] = 2; + buf[2] = 3; + buf[3] = 4; + 4 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + + rb.writer().push(|buf| { + // If it's full, we can push 0 bytes. + assert_eq!(0, buf.len()); + 0 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + + rb.reader().pop(|buf| { + assert_eq!(4, buf.len()); + assert_eq!(1, buf[0]); + 1 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(3, buf.len()); + 0 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(3, buf.len()); + assert_eq!(2, buf[0]); + assert_eq!(3, buf[1]); + 2 + }); + rb.reader().pop(|buf| { + assert_eq!(1, buf.len()); + assert_eq!(4, buf[0]); + 1 + }); + + assert_eq!(rb.is_empty(), true); + assert_eq!(rb.is_full(), false); + + rb.reader().pop(|buf| { + assert_eq!(0, buf.len()); + 0 + }); + + rb.writer().push(|buf| { + assert_eq!(4, buf.len()); + buf[0] = 10; + 1 + }); + + rb.writer().push(|buf| { + assert_eq!(3, buf.len()); + buf[0] = 11; + buf[1] = 12; + 2 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), false); + + rb.writer().push(|buf| { + assert_eq!(1, buf.len()); + buf[0] = 13; + 1 + }); + + assert_eq!(rb.is_empty(), false); + assert_eq!(rb.is_full(), true); + } + } + + #[test] + fn zero_len() { + let rb = RingBuffer::new(); + unsafe { + assert_eq!(rb.is_empty(), true); + assert_eq!(rb.is_full(), true); + + rb.writer().push(|buf| { + assert_eq!(0, buf.len()); + 0 + }); + + rb.reader().pop(|buf| { + assert_eq!(0, buf.len()); + 0 + }); + } + } + + #[test] + fn push_slices() { + let mut b = [0; 4]; + let rb = RingBuffer::new(); + unsafe { + rb.init(b.as_mut_ptr(), 4); + + /* push 3 -> [1 2 3 x] */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(4, ps[0].len()); + assert_eq!(0, ps[1].len()); + ps[0][0] = 1; + ps[0][1] = 2; + ps[0][2] = 3; + w.push_done(3); + drop(w); + + /* pop 2 -> [x x 3 x] */ + rb.reader().pop(|buf| { + assert_eq!(3, buf.len()); + assert_eq!(1, buf[0]); + assert_eq!(2, buf[1]); + assert_eq!(3, buf[2]); + 2 + }); + + /* push 3 -> [5 6 3 4] */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(1, ps[0].len()); + assert_eq!(2, ps[1].len()); + ps[0][0] = 4; + ps[1][0] = 5; + ps[1][1] = 6; + w.push_done(3); + drop(w); + + /* buf is now full */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(0, ps[0].len()); + assert_eq!(0, ps[1].len()); + + /* pop 2 -> [5 6 x x] */ + rb.reader().pop(|buf| { + assert_eq!(2, buf.len()); + assert_eq!(3, buf[0]); + assert_eq!(4, buf[1]); + 2 + }); + + /* should now have one push slice again */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(2, ps[0].len()); + assert_eq!(0, ps[1].len()); + drop(w); + + /* pop 2 -> [x x x x] */ + rb.reader().pop(|buf| { + assert_eq!(2, buf.len()); + assert_eq!(5, buf[0]); + assert_eq!(6, buf[1]); + 2 + }); + + /* should now have two push slices */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(2, ps[0].len()); + assert_eq!(2, ps[1].len()); + drop(w); + + /* make sure we exercise all wrap around cases properly */ + for _ in 0..10 { + /* should be empty, push 1 */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(4, ps[0].len() + ps[1].len()); + w.push_done(1); + drop(w); + + /* should have 1 element */ + let mut w = rb.writer(); + let ps = w.push_slices(); + assert_eq!(3, ps[0].len() + ps[1].len()); + drop(w); + + /* pop 1 */ + rb.reader().pop(|buf| { + assert_eq!(1, buf.len()); + 1 + }); + } + } + } +} -- cgit