From 6c165f8dc03455863bc123101d4f4a0dabdfcfbf Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 28 Aug 2023 01:53:15 +0200 Subject: sync/pipe: impl BufRead. --- embassy-sync/src/pipe.rs | 306 +++++++++++++++++++++++++++------------- embassy-sync/src/ring_buffer.rs | 46 +++--- 2 files changed, 227 insertions(+), 125 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 21d451ea6..ec0cbbf2a 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -1,7 +1,8 @@ //! Async byte stream pipe. -use core::cell::RefCell; +use core::cell::{RefCell, UnsafeCell}; use core::future::Future; +use core::ops::Range; use core::pin::Pin; use core::task::{Context, Poll}; @@ -82,17 +83,6 @@ where pipe: &'p Pipe, } -impl<'p, M, const N: usize> Clone for Reader<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Reader { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Copy for Reader<'p, M, N> where M: RawMutex {} - impl<'p, M, const N: usize> Reader<'p, M, N> where M: RawMutex, @@ -110,6 +100,29 @@ where pub fn try_read(&self, buf: &mut [u8]) -> Result { self.pipe.try_read(buf) } + + /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. + /// + /// If no bytes are currently available to read, this function waits until at least one byte is available. + /// + /// If the reader is at end-of-file (EOF), an empty slice is returned. + pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> { + FillBufFuture { pipe: Some(self.pipe) } + } + + /// Try returning contents of the internal buffer. + /// + /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. + /// + /// If the reader is at end-of-file (EOF), an empty slice is returned. + pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { + unsafe { self.pipe.try_fill_buf_with_context(None) } + } + + /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. + pub fn consume(&mut self, amt: usize) { + self.pipe.consume(amt) + } } /// Future returned by [`Pipe::read`] and [`Reader::read`]. @@ -138,6 +151,35 @@ where impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} +/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct FillBufFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: Option<&'p Pipe>, +} + +impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = &'p [u8]; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pipe = self.pipe.take().unwrap(); + match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { + Ok(buf) => Poll::Ready(buf), + Err(TryReadError::Empty) => { + self.pipe = Some(pipe); + Poll::Pending + } + } + } +} + +impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {} + /// Error returned by [`try_read`](Pipe::try_read). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -162,67 +204,24 @@ struct PipeState { write_waker: WakerRegistration, } -impl PipeState { - const fn new() -> Self { - PipeState { - buffer: RingBuffer::new(), - read_waker: WakerRegistration::new(), - write_waker: WakerRegistration::new(), - } - } - - fn clear(&mut self) { - self.buffer.clear(); - self.write_waker.wake(); - } - - fn try_read(&mut self, buf: &mut [u8]) -> Result { - self.try_read_with_context(None, buf) - } - - fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - if self.buffer.is_full() { - self.write_waker.wake(); - } - - let available = self.buffer.pop_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.read_waker.register(cx.waker()); - } - return Err(TryReadError::Empty); - } - - let n = available.len().min(buf.len()); - buf[..n].copy_from_slice(&available[..n]); - self.buffer.pop(n); - Ok(n) - } +#[repr(transparent)] +struct Buffer(UnsafeCell<[u8; N]>); - fn try_write(&mut self, buf: &[u8]) -> Result { - self.try_write_with_context(None, buf) +impl Buffer { + unsafe fn get<'a>(&self, r: Range) -> &'a [u8] { + let p = self.0.get() as *const u8; + core::slice::from_raw_parts(p.add(r.start), r.end - r.start) } - fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - if self.buffer.is_empty() { - self.read_waker.wake(); - } - - let available = self.buffer.push_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.write_waker.register(cx.waker()); - } - return Err(TryWriteError::Full); - } - - let n = available.len().min(buf.len()); - available[..n].copy_from_slice(&buf[..n]); - self.buffer.push(n); - Ok(n) + unsafe fn get_mut<'a>(&self, r: Range) -> &'a mut [u8] { + let p = self.0.get() as *mut u8; + core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start) } } +unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} + /// A bounded byte-oriented pipe for communicating between asynchronous tasks /// with backpressure. /// @@ -234,6 +233,7 @@ pub struct Pipe where M: RawMutex, { + buf: Buffer, inner: Mutex>>, } @@ -252,7 +252,12 @@ where /// ``` pub const fn new() -> Self { Self { - inner: Mutex::new(RefCell::new(PipeState::new())), + buf: Buffer(UnsafeCell::new([0; N])), + inner: Mutex::new(RefCell::new(PipeState { + buffer: RingBuffer::new(), + read_waker: WakerRegistration::new(), + write_waker: WakerRegistration::new(), + })), } } @@ -261,21 +266,91 @@ where } fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read_with_context(cx, buf)) + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_full() { + s.write_waker.wake(); + } + + let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + let n = available.len().min(buf.len()); + buf[..n].copy_from_slice(&available[..n]); + s.buffer.pop(n); + Ok(n) + }) } - fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - self.lock(|c| c.try_write_with_context(cx, buf)) + // safety: While the returned slice is alive, + // no `read` or `consume` methods in the pipe must be called. + unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_full() { + s.write_waker.wake(); + } + + let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + Ok(available) + }) } - /// Get a writer for this pipe. - pub fn writer(&self) -> Writer<'_, M, N> { - Writer { pipe: self } + fn consume(&self, amt: usize) { + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + let available = s.buffer.pop_buf(); + assert!(amt <= available.len()); + s.buffer.pop(amt); + }) } - /// Get a reader for this pipe. - pub fn reader(&self) -> Reader<'_, M, N> { - Reader { pipe: self } + fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_empty() { + s.read_waker.wake(); + } + + let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.write_waker.register(cx.waker()); + } + return Err(TryWriteError::Full); + } + + let n = available.len().min(buf.len()); + available[..n].copy_from_slice(&buf[..n]); + s.buffer.push(n); + Ok(n) + }) + } + + /// Split this pipe into a BufRead-capable reader and a writer. + /// + /// The reader and writer borrow the current pipe mutably, so it is not + /// possible to use it directly while they exist. This is needed because + /// implementing `BufRead` requires there is a single reader. + /// + /// The writer is cloneable, the reader is not. + pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) { + (Reader { pipe: self }, Writer { pipe: self }) } /// Write some bytes to the pipe. @@ -312,7 +387,7 @@ where /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant /// that waits instead of returning an error. pub fn try_write(&self, buf: &[u8]) -> Result { - self.lock(|c| c.try_write(buf)) + self.try_write_with_context(None, buf) } /// Read some bytes from the pipe. @@ -339,12 +414,17 @@ where /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant /// that waits instead of returning an error. pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read(buf)) + self.try_read_with_context(None, buf) } /// Clear the data in the pipe's buffer. pub fn clear(&self) { - self.lock(|c| c.clear()) + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + s.buffer.clear(); + s.write_waker.wake(); + }) } /// Return whether the pipe is full (no free space in the buffer) @@ -433,6 +513,16 @@ mod io_impls { } } + impl embedded_io_async::BufRead for Reader<'_, M, N> { + async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { + Ok(Reader::fill_buf(self).await) + } + + fn consume(&mut self, amt: usize) { + Reader::consume(self, amt) + } + } + impl embedded_io_async::ErrorType for Writer<'_, M, N> { type Error = Infallible; } @@ -457,43 +547,39 @@ mod tests { use super::*; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - fn capacity(c: &PipeState) -> usize { - N - c.buffer.len() - } - #[test] fn writing_once() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert!(c.try_write(&[1]).is_ok()); - assert_eq!(capacity(&c), 2); + assert_eq!(c.free_capacity(), 2); } #[test] fn writing_when_full() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert_eq!(c.try_write(&[42]), Ok(1)); assert_eq!(c.try_write(&[43]), Ok(1)); assert_eq!(c.try_write(&[44]), Ok(1)); assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); - assert_eq!(capacity(&c), 0); + assert_eq!(c.free_capacity(), 0); } #[test] fn receiving_once_with_one_send() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert!(c.try_write(&[42]).is_ok()); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Ok(1)); assert_eq!(buf[0], 42); - assert_eq!(capacity(&c), 3); + assert_eq!(c.free_capacity(), 3); } #[test] fn receiving_when_empty() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); - assert_eq!(capacity(&c), 3); + assert_eq!(c.free_capacity(), 3); } #[test] @@ -506,13 +592,37 @@ mod tests { } #[test] - fn cloning() { - let c = Pipe::::new(); - let r1 = c.reader(); - let w1 = c.writer(); + fn read_buf() { + let mut c = Pipe::::new(); + let (mut r, w) = c.split(); + assert!(w.try_write(&[42, 43]).is_ok()); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[42, 43]); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[42, 43]); + r.consume(1); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[43]); + r.consume(1); + assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); + assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); + assert_eq!(w.try_write(&[45, 46]), Ok(2)); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[44]); // only one byte due to wraparound. + r.consume(1); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[45, 46]); + assert!(w.try_write(&[47]).is_ok()); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[45, 46, 47]); + r.consume(3); + } - let _ = r1.clone(); - let _ = w1.clone(); + #[test] + fn writer_is_cloneable() { + let mut c = Pipe::::new(); + let (_r, w) = c.split(); + let _ = w.clone(); } #[futures_test::test] diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index 521084024..d95ffa7c9 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs @@ -1,5 +1,6 @@ +use core::ops::Range; + pub struct RingBuffer { - buf: [u8; N], start: usize, end: usize, empty: bool, @@ -8,27 +9,26 @@ pub struct RingBuffer { impl RingBuffer { pub const fn new() -> Self { Self { - buf: [0; N], start: 0, end: 0, empty: true, } } - pub fn push_buf(&mut self) -> &mut [u8] { + pub fn push_buf(&mut self) -> Range { if self.start == self.end && !self.empty { trace!(" ringbuf: push_buf empty"); - return &mut self.buf[..0]; + return 0..0; } let n = if self.start <= self.end { - self.buf.len() - self.end + N - self.end } else { self.start - self.end }; trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); - &mut self.buf[self.end..self.end + n] + self.end..self.end + n } pub fn push(&mut self, n: usize) { @@ -41,20 +41,20 @@ impl RingBuffer { self.empty = false; } - pub fn pop_buf(&mut self) -> &mut [u8] { + pub fn pop_buf(&mut self) -> Range { if self.empty { trace!(" ringbuf: pop_buf empty"); - return &mut self.buf[..0]; + return 0..0; } let n = if self.end <= self.start { - self.buf.len() - self.start + N - self.start } else { self.end - self.start }; trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); - &mut self.buf[self.start..self.start + n] + self.start..self.start + n } pub fn pop(&mut self, n: usize) { @@ -93,8 +93,8 @@ impl RingBuffer { } fn wrap(&self, n: usize) -> usize { - assert!(n <= self.buf.len()); - if n == self.buf.len() { + assert!(n <= N); + if n == N { 0 } else { n @@ -110,37 +110,29 @@ mod tests { fn push_pop() { let mut rb: RingBuffer<4> = RingBuffer::new(); let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - buf[0] = 1; - buf[1] = 2; - buf[2] = 3; - buf[3] = 4; + assert_eq!(0..4, buf); rb.push(4); let buf = rb.pop_buf(); - assert_eq!(4, buf.len()); - assert_eq!(1, buf[0]); + assert_eq!(0..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(3, buf.len()); - assert_eq!(2, buf[0]); + assert_eq!(1..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(2, buf.len()); - assert_eq!(3, buf[0]); + assert_eq!(2..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(1, buf.len()); - assert_eq!(4, buf[0]); + assert_eq!(3..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(0, buf.len()); + assert_eq!(0..0, buf); let buf = rb.push_buf(); - assert_eq!(4, buf.len()); + assert_eq!(0..4, buf); } } -- cgit From 5e613d9abbb945e7fc7d4c895d645bfad6a3d2c8 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 30 Aug 2023 01:37:18 +0200 Subject: Sync all fmt.rs files. --- embassy-sync/src/fmt.rs | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 066970813..78e583c1c 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs @@ -1,6 +1,8 @@ #![macro_use] #![allow(unused_macros)] +use core::fmt::{Debug, Display, LowerHex}; + #[cfg(all(feature = "defmt", feature = "log"))] compile_error!("You may not enable both `defmt` and `log` features."); @@ -81,14 +83,17 @@ macro_rules! todo { }; } +#[cfg(not(feature = "defmt"))] macro_rules! unreachable { ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::unreachable!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::unreachable!($($x)*); - } + ::core::unreachable!($($x)*) + }; +} + +#[cfg(feature = "defmt")] +macro_rules! unreachable { + ($($x:tt)*) => { + ::defmt::unreachable!($($x)*) }; } @@ -223,3 +228,31 @@ impl Try for Result { self } } + +#[allow(unused)] +pub(crate) struct Bytes<'a>(pub &'a [u8]); + +impl<'a> Debug for Bytes<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#02x?}", self.0) + } +} + +impl<'a> Display for Bytes<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#02x?}", self.0) + } +} + +impl<'a> LowerHex for Bytes<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#02x?}", self.0) + } +} + +#[cfg(feature = "defmt")] +impl<'a> defmt::Format for Bytes<'a> { + fn format(&self, fmt: defmt::Formatter) { + defmt::write!(fmt, "{:02x}", self.0) + } +} -- cgit From 0c66636d003979c3aecff36c9e03e87f34147a94 Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Sat, 2 Sep 2023 07:44:10 +0200 Subject: Use fmt::unwrap --- embassy-sync/src/channel.rs | 2 +- embassy-sync/src/mutex.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 62ea1307d..a512e0c41 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -471,7 +471,7 @@ where } fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut()))) } fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index fcf056d36..72459d660 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs @@ -149,7 +149,7 @@ where { fn drop(&mut self) { self.mutex.state.lock(|s| { - let mut s = s.borrow_mut(); + let mut s = unwrap!(s.try_borrow_mut()); s.locked = false; s.waker.wake(); }) -- cgit From a2656f402b1c59461cec5f5dc685b2692119b996 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 12:04:30 +0200 Subject: Move embassy-net-driver-channel::zerocopy_channel to embassy_sync::zero_copy_channel --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/zero_copy_channel.rs | 209 ++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 embassy-sync/src/zero_copy_channel.rs (limited to 'embassy-sync') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 53d95d081..48a7b13f6 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -17,3 +17,4 @@ pub mod pipe; pub mod pubsub; pub mod signal; pub mod waitqueue; +pub mod zero_copy_channel; diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs new file mode 100644 index 000000000..3701ccf1a --- /dev/null +++ b/embassy-sync/src/zero_copy_channel.rs @@ -0,0 +1,209 @@ +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +pub struct Channel<'a, M: RawMutex, T> { + buf: *mut T, + phantom: PhantomData<&'a mut T>, + state: Mutex>, +} + +impl<'a, M: RawMutex, T> Channel<'a, M, T> { + pub fn new(buf: &'a mut [T]) -> Self { + let len = buf.len(); + assert!(len != 0); + + Self { + buf: buf.as_mut_ptr(), + phantom: PhantomData, + state: Mutex::new(RefCell::new(State { + len, + front: 0, + back: 0, + full: false, + send_waker: WakerRegistration::new(), + recv_waker: WakerRegistration::new(), + })), + } + } + + pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + (Sender { channel: self }, Receiver { channel: self }) + } +} + +pub struct Sender<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Sender<'a, M, T> { + pub fn borrow(&mut self) -> Sender<'_, M, T> { + Sender { channel: self.channel } + } + + pub fn try_send(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.recv_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + pub async fn send(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(i), + None => { + s.recv_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + pub fn send_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().push_done()) + } +} +pub struct Receiver<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + pub fn borrow(&mut self) -> Receiver<'_, M, T> { + Receiver { channel: self.channel } + } + + pub fn try_recv(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + pub async fn recv(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(i), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + pub fn recv_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().pop_done()) + } +} + +struct State { + len: usize, + + /// Front index. Always 0..=(N-1) + front: usize, + /// Back index. Always 0..=(N-1). + back: usize, + + /// Used to distinguish "empty" and "full" cases when `front == back`. + /// May only be `true` if `front == back`, always `false` otherwise. + full: bool, + + send_waker: WakerRegistration, + recv_waker: WakerRegistration, +} + +impl State { + fn increment(&self, i: usize) -> usize { + if i + 1 == self.len { + 0 + } else { + i + 1 + } + } + + fn is_full(&self) -> bool { + self.full + } + + fn is_empty(&self) -> bool { + self.front == self.back && !self.full + } + + fn push_index(&mut self) -> Option { + match self.is_full() { + true => None, + false => Some(self.back), + } + } + + fn push_done(&mut self) { + assert!(!self.is_full()); + self.back = self.increment(self.back); + if self.back == self.front { + self.full = true; + } + self.send_waker.wake(); + } + + fn pop_index(&mut self) -> Option { + match self.is_empty() { + true => None, + false => Some(self.front), + } + } + + fn pop_done(&mut self) { + assert!(!self.is_empty()); + self.front = self.increment(self.front); + self.full = false; + self.recv_waker.wake(); + } +} -- cgit From 1eb03dc41a4a5fa8435f9a49d26e29ceea6d498e Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 12:07:30 +0200 Subject: Prefer `receive` over `recv` --- embassy-sync/src/zero_copy_channel.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index 3701ccf1a..cbb8cb526 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs @@ -27,7 +27,7 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { back: 0, full: false, send_waker: WakerRegistration::new(), - recv_waker: WakerRegistration::new(), + receive_waker: WakerRegistration::new(), })), } } @@ -62,7 +62,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { match s.push_index() { Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), None => { - s.recv_waker.register(cx.waker()); + s.receive_waker.register(cx.waker()); Poll::Pending } } @@ -76,7 +76,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { match s.push_index() { Some(i) => Poll::Ready(i), None => { - s.recv_waker.register(cx.waker()); + s.receive_waker.register(cx.waker()); Poll::Pending } } @@ -99,7 +99,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { Receiver { channel: self.channel } } - pub fn try_recv(&mut self) -> Option<&mut T> { + pub fn try_receive(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); match s.pop_index() { @@ -109,7 +109,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } - pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { + pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); match s.pop_index() { @@ -122,7 +122,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } - pub async fn recv(&mut self) -> &mut T { + pub async fn receive(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -139,7 +139,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } - pub fn recv_done(&mut self) { + pub fn receive_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().pop_done()) } } @@ -157,7 +157,7 @@ struct State { full: bool, send_waker: WakerRegistration, - recv_waker: WakerRegistration, + receive_waker: WakerRegistration, } impl State { @@ -204,6 +204,6 @@ impl State { assert!(!self.is_empty()); self.front = self.increment(self.front); self.full = false; - self.recv_waker.wake(); + self.receive_waker.wake(); } } -- cgit From 6e38b0764253ba07d3106ce3d57c2fd3509d7beb Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 13:50:12 +0200 Subject: Add docs to zero-copy-channel --- embassy-sync/src/zero_copy_channel.rs | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'embassy-sync') diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index cbb8cb526..f704cbd5d 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs @@ -1,3 +1,22 @@ +//! A zero-copy queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. + use core::cell::RefCell; use core::future::poll_fn; use core::marker::PhantomData; @@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +/// +/// The channel requires a buffer of recyclable elements. Writing to the channel is done through +/// an `&mut T`. pub struct Channel<'a, M: RawMutex, T> { buf: *mut T, phantom: PhantomData<&'a mut T>, @@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> { } impl<'a, M: RawMutex, T> Channel<'a, M, T> { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. pub fn new(buf: &'a mut [T]) -> Self { let len = buf.len(); assert!(len != 0); @@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { } } + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { (Sender { channel: self }, Receiver { channel: self }) } } +/// Send-only access to a [`Channel`]. pub struct Sender<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Sender<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Sender<'_, M, T> { Sender { channel: self.channel } } + /// Attempts to send a value over the channel. pub fn try_send(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Attempts to send a value over the channel. pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Asynchronously send a value over the channel. pub async fn send(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the sending of the value has been finalized. pub fn send_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().push_done()) } } + +/// Receive-only access to a [`Channel`]. pub struct Receiver<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Receiver<'_, M, T> { Receiver { channel: self.channel } } + /// Attempts to receive a value over the channel. pub fn try_receive(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Attempts to asynchronously receive a value over the channel. pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Asynchronously receive a value over the channel. pub async fn receive(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the receiving of the value has been finalized. pub fn receive_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().pop_done()) } -- cgit From 615882ebd67f4e7e60fb8aa1505b1272655c4fa4 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 4 Sep 2023 22:16:28 +0200 Subject: Rename zero_copy -> zerocopy. --- embassy-sync/src/lib.rs | 2 +- embassy-sync/src/zero_copy_channel.rs | 260 ---------------------------------- embassy-sync/src/zerocopy_channel.rs | 260 ++++++++++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 261 deletions(-) delete mode 100644 embassy-sync/src/zero_copy_channel.rs create mode 100644 embassy-sync/src/zerocopy_channel.rs (limited to 'embassy-sync') diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 48a7b13f6..8a9f841ee 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -17,4 +17,4 @@ pub mod pipe; pub mod pubsub; pub mod signal; pub mod waitqueue; -pub mod zero_copy_channel; +pub mod zerocopy_channel; diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs deleted file mode 100644 index f704cbd5d..000000000 --- a/embassy-sync/src/zero_copy_channel.rs +++ /dev/null @@ -1,260 +0,0 @@ -//! A zero-copy queue for sending values between asynchronous tasks. -//! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. -//! -//! This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. - -use core::cell::RefCell; -use core::future::poll_fn; -use core::marker::PhantomData; -use core::task::{Context, Poll}; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; - -/// A bounded zero-copy channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -/// -/// The channel requires a buffer of recyclable elements. Writing to the channel is done through -/// an `&mut T`. -pub struct Channel<'a, M: RawMutex, T> { - buf: *mut T, - phantom: PhantomData<&'a mut T>, - state: Mutex>, -} - -impl<'a, M: RawMutex, T> Channel<'a, M, T> { - /// Initialize a new [`Channel`]. - /// - /// The provided buffer will be used and reused by the channel's logic, and thus dictates the - /// channel's capacity. - pub fn new(buf: &'a mut [T]) -> Self { - let len = buf.len(); - assert!(len != 0); - - Self { - buf: buf.as_mut_ptr(), - phantom: PhantomData, - state: Mutex::new(RefCell::new(State { - len, - front: 0, - back: 0, - full: false, - send_waker: WakerRegistration::new(), - receive_waker: WakerRegistration::new(), - })), - } - } - - /// Creates a [`Sender`] and [`Receiver`] from an existing channel. - /// - /// Further Senders and Receivers can be created through [`Sender::borrow`] and - /// [`Receiver::borrow`] respectively. - pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { - (Sender { channel: self }, Receiver { channel: self }) - } -} - -/// Send-only access to a [`Channel`]. -pub struct Sender<'a, M: RawMutex, T> { - channel: &'a Channel<'a, M, T>, -} - -impl<'a, M: RawMutex, T> Sender<'a, M, T> { - /// Creates one further [`Sender`] over the same channel. - pub fn borrow(&mut self) -> Sender<'_, M, T> { - Sender { channel: self.channel } - } - - /// Attempts to send a value over the channel. - pub fn try_send(&mut self) -> Option<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), - None => None, - } - }) - } - - /// Attempts to send a value over the channel. - pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), - None => { - s.receive_waker.register(cx.waker()); - Poll::Pending - } - } - }) - } - - /// Asynchronously send a value over the channel. - pub async fn send(&mut self) -> &mut T { - let i = poll_fn(|cx| { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.push_index() { - Some(i) => Poll::Ready(i), - None => { - s.receive_waker.register(cx.waker()); - Poll::Pending - } - } - }) - }) - .await; - unsafe { &mut *self.channel.buf.add(i) } - } - - /// Notify the channel that the sending of the value has been finalized. - pub fn send_done(&mut self) { - self.channel.state.lock(|s| s.borrow_mut().push_done()) - } -} - -/// Receive-only access to a [`Channel`]. -pub struct Receiver<'a, M: RawMutex, T> { - channel: &'a Channel<'a, M, T>, -} - -impl<'a, M: RawMutex, T> Receiver<'a, M, T> { - /// Creates one further [`Sender`] over the same channel. - pub fn borrow(&mut self) -> Receiver<'_, M, T> { - Receiver { channel: self.channel } - } - - /// Attempts to receive a value over the channel. - pub fn try_receive(&mut self) -> Option<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), - None => None, - } - }) - } - - /// Attempts to asynchronously receive a value over the channel. - pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), - None => { - s.send_waker.register(cx.waker()); - Poll::Pending - } - } - }) - } - - /// Asynchronously receive a value over the channel. - pub async fn receive(&mut self) -> &mut T { - let i = poll_fn(|cx| { - self.channel.state.lock(|s| { - let s = &mut *s.borrow_mut(); - match s.pop_index() { - Some(i) => Poll::Ready(i), - None => { - s.send_waker.register(cx.waker()); - Poll::Pending - } - } - }) - }) - .await; - unsafe { &mut *self.channel.buf.add(i) } - } - - /// Notify the channel that the receiving of the value has been finalized. - pub fn receive_done(&mut self) { - self.channel.state.lock(|s| s.borrow_mut().pop_done()) - } -} - -struct State { - len: usize, - - /// Front index. Always 0..=(N-1) - front: usize, - /// Back index. Always 0..=(N-1). - back: usize, - - /// Used to distinguish "empty" and "full" cases when `front == back`. - /// May only be `true` if `front == back`, always `false` otherwise. - full: bool, - - send_waker: WakerRegistration, - receive_waker: WakerRegistration, -} - -impl State { - fn increment(&self, i: usize) -> usize { - if i + 1 == self.len { - 0 - } else { - i + 1 - } - } - - fn is_full(&self) -> bool { - self.full - } - - fn is_empty(&self) -> bool { - self.front == self.back && !self.full - } - - fn push_index(&mut self) -> Option { - match self.is_full() { - true => None, - false => Some(self.back), - } - } - - fn push_done(&mut self) { - assert!(!self.is_full()); - self.back = self.increment(self.back); - if self.back == self.front { - self.full = true; - } - self.send_waker.wake(); - } - - fn pop_index(&mut self) -> Option { - match self.is_empty() { - true => None, - false => Some(self.front), - } - } - - fn pop_done(&mut self) { - assert!(!self.is_empty()); - self.front = self.increment(self.front); - self.full = false; - self.receive_waker.wake(); - } -} diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs new file mode 100644 index 000000000..f704cbd5d --- /dev/null +++ b/embassy-sync/src/zerocopy_channel.rs @@ -0,0 +1,260 @@ +//! A zero-copy queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. + +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +/// +/// The channel requires a buffer of recyclable elements. Writing to the channel is done through +/// an `&mut T`. +pub struct Channel<'a, M: RawMutex, T> { + buf: *mut T, + phantom: PhantomData<&'a mut T>, + state: Mutex>, +} + +impl<'a, M: RawMutex, T> Channel<'a, M, T> { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. + pub fn new(buf: &'a mut [T]) -> Self { + let len = buf.len(); + assert!(len != 0); + + Self { + buf: buf.as_mut_ptr(), + phantom: PhantomData, + state: Mutex::new(RefCell::new(State { + len, + front: 0, + back: 0, + full: false, + send_waker: WakerRegistration::new(), + receive_waker: WakerRegistration::new(), + })), + } + } + + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. + pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { + (Sender { channel: self }, Receiver { channel: self }) + } +} + +/// Send-only access to a [`Channel`]. +pub struct Sender<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Sender<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. + pub fn borrow(&mut self) -> Sender<'_, M, T> { + Sender { channel: self.channel } + } + + /// Attempts to send a value over the channel. + pub fn try_send(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + /// Attempts to send a value over the channel. + pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.receive_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + /// Asynchronously send a value over the channel. + pub async fn send(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.push_index() { + Some(i) => Poll::Ready(i), + None => { + s.receive_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + /// Notify the channel that the sending of the value has been finalized. + pub fn send_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().push_done()) + } +} + +/// Receive-only access to a [`Channel`]. +pub struct Receiver<'a, M: RawMutex, T> { + channel: &'a Channel<'a, M, T>, +} + +impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. + pub fn borrow(&mut self) -> Receiver<'_, M, T> { + Receiver { channel: self.channel } + } + + /// Attempts to receive a value over the channel. + pub fn try_receive(&mut self) -> Option<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), + None => None, + } + }) + } + + /// Attempts to asynchronously receive a value over the channel. + pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + } + + /// Asynchronously receive a value over the channel. + pub async fn receive(&mut self) -> &mut T { + let i = poll_fn(|cx| { + self.channel.state.lock(|s| { + let s = &mut *s.borrow_mut(); + match s.pop_index() { + Some(i) => Poll::Ready(i), + None => { + s.send_waker.register(cx.waker()); + Poll::Pending + } + } + }) + }) + .await; + unsafe { &mut *self.channel.buf.add(i) } + } + + /// Notify the channel that the receiving of the value has been finalized. + pub fn receive_done(&mut self) { + self.channel.state.lock(|s| s.borrow_mut().pop_done()) + } +} + +struct State { + len: usize, + + /// Front index. Always 0..=(N-1) + front: usize, + /// Back index. Always 0..=(N-1). + back: usize, + + /// Used to distinguish "empty" and "full" cases when `front == back`. + /// May only be `true` if `front == back`, always `false` otherwise. + full: bool, + + send_waker: WakerRegistration, + receive_waker: WakerRegistration, +} + +impl State { + fn increment(&self, i: usize) -> usize { + if i + 1 == self.len { + 0 + } else { + i + 1 + } + } + + fn is_full(&self) -> bool { + self.full + } + + fn is_empty(&self) -> bool { + self.front == self.back && !self.full + } + + fn push_index(&mut self) -> Option { + match self.is_full() { + true => None, + false => Some(self.back), + } + } + + fn push_done(&mut self) { + assert!(!self.is_full()); + self.back = self.increment(self.back); + if self.back == self.front { + self.full = true; + } + self.send_waker.wake(); + } + + fn pop_index(&mut self) -> Option { + match self.is_empty() { + true => None, + false => Some(self.front), + } + } + + fn pop_done(&mut self) { + assert!(!self.is_empty()); + self.front = self.increment(self.front); + self.full = false; + self.receive_waker.wake(); + } +} -- cgit From 1b20ba27b12a93ae94b4eff39160da884c592db4 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Thu, 14 Sep 2023 18:26:00 +0200 Subject: feat: bump embassy-sync version to 0.3.0 Update changelog in preparation for release --- embassy-sync/CHANGELOG.md | 9 ++++++++- embassy-sync/Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'embassy-sync') diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md index a60f3f7c4..2c53dd0f8 100644 --- a/embassy-sync/CHANGELOG.md +++ b/embassy-sync/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.3.0 - 2023-09-14 + +- switch to embedded-io 0.5 +- add api for polling channels with context +- standardise fn names on channels +- add zero-copy channel + ## 0.2.0 - 2023-04-13 - pubsub: Fix messages not getting popped when the last subscriber that needed them gets dropped. @@ -19,4 +26,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 0.1.0 - 2022-08-26 -- First release \ No newline at end of file +- First release diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml index 94d6799e5..f7739f305 100644 --- a/embassy-sync/Cargo.toml +++ b/embassy-sync/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "embassy-sync" -version = "0.2.0" +version = "0.3.0" edition = "2021" description = "no-std, no-alloc synchronization primitives with async support" repository = "https://github.com/embassy-rs/embassy" -- cgit