aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorTyler Gilbert <[email protected]>2023-09-29 21:08:21 -0500
committerTyler Gilbert <[email protected]>2023-09-29 21:08:21 -0500
commit2addfc4b86621f1012441a8cea07d69cb6d42edc (patch)
treec4a9122a5f2ede4fd8762a6f00c33b961c24a893 /embassy-sync
parent92df87781d6a0f0e85753d3d58c6594846bb2f0b (diff)
parenta35d149cb151d53fd95b5073cef927116b34fe0d (diff)
Merge branch 'issue-1974-add-sai-driver' of https://github.com/tyler-gilbert/embassy into issue-1974-add-sai-driver
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/CHANGELOG.md9
-rw-r--r--embassy-sync/Cargo.toml2
-rw-r--r--embassy-sync/src/channel.rs2
-rw-r--r--embassy-sync/src/fmt.rs45
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/mutex.rs2
-rw-r--r--embassy-sync/src/pipe.rs306
-rw-r--r--embassy-sync/src/ring_buffer.rs46
-rw-r--r--embassy-sync/src/zerocopy_channel.rs260
9 files changed, 538 insertions, 135 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
index a60f3f7c4..2c53dd0f8 100644
--- a/embassy-sync/CHANGELOG.md
+++ b/embassy-sync/CHANGELOG.md
@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), 5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7 7
8## 0.3.0 - 2023-09-14
9
10- switch to embedded-io 0.5
11- add api for polling channels with context
12- standardise fn names on channels
13- add zero-copy channel
14
8## 0.2.0 - 2023-04-13 15## 0.2.0 - 2023-04-13
9 16
10- pubsub: Fix messages not getting popped when the last subscriber that needed them gets dropped. 17- pubsub: Fix messages not getting popped when the last subscriber that needed them gets dropped.
@@ -19,4 +26,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
19 26
20## 0.1.0 - 2022-08-26 27## 0.1.0 - 2022-08-26
21 28
22- First release \ No newline at end of file 29- First release
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 94d6799e5..f7739f305 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -1,6 +1,6 @@
1[package] 1[package]
2name = "embassy-sync" 2name = "embassy-sync"
3version = "0.2.0" 3version = "0.3.0"
4edition = "2021" 4edition = "2021"
5description = "no-std, no-alloc synchronization primitives with async support" 5description = "no-std, no-alloc synchronization primitives with async support"
6repository = "https://github.com/embassy-rs/embassy" 6repository = "https://github.com/embassy-rs/embassy"
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index 62ea1307d..a512e0c41 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -471,7 +471,7 @@ where
471 } 471 }
472 472
473 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { 473 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
474 self.inner.lock(|rc| f(&mut *rc.borrow_mut())) 474 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
475 } 475 }
476 476
477 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { 477 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index 066970813..78e583c1c 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -1,6 +1,8 @@
1#![macro_use] 1#![macro_use]
2#![allow(unused_macros)] 2#![allow(unused_macros)]
3 3
4use core::fmt::{Debug, Display, LowerHex};
5
4#[cfg(all(feature = "defmt", feature = "log"))] 6#[cfg(all(feature = "defmt", feature = "log"))]
5compile_error!("You may not enable both `defmt` and `log` features."); 7compile_error!("You may not enable both `defmt` and `log` features.");
6 8
@@ -81,14 +83,17 @@ macro_rules! todo {
81 }; 83 };
82} 84}
83 85
86#[cfg(not(feature = "defmt"))]
84macro_rules! unreachable { 87macro_rules! unreachable {
85 ($($x:tt)*) => { 88 ($($x:tt)*) => {
86 { 89 ::core::unreachable!($($x)*)
87 #[cfg(not(feature = "defmt"))] 90 };
88 ::core::unreachable!($($x)*); 91}
89 #[cfg(feature = "defmt")] 92
90 ::defmt::unreachable!($($x)*); 93#[cfg(feature = "defmt")]
91 } 94macro_rules! unreachable {
95 ($($x:tt)*) => {
96 ::defmt::unreachable!($($x)*)
92 }; 97 };
93} 98}
94 99
@@ -223,3 +228,31 @@ impl<T, E> Try for Result<T, E> {
223 self 228 self
224 } 229 }
225} 230}
231
232#[allow(unused)]
233pub(crate) struct Bytes<'a>(pub &'a [u8]);
234
235impl<'a> Debug for Bytes<'a> {
236 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
237 write!(f, "{:#02x?}", self.0)
238 }
239}
240
241impl<'a> Display for Bytes<'a> {
242 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
243 write!(f, "{:#02x?}", self.0)
244 }
245}
246
247impl<'a> LowerHex for Bytes<'a> {
248 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
249 write!(f, "{:#02x?}", self.0)
250 }
251}
252
253#[cfg(feature = "defmt")]
254impl<'a> defmt::Format for Bytes<'a> {
255 fn format(&self, fmt: defmt::Formatter) {
256 defmt::write!(fmt, "{:02x}", self.0)
257 }
258}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 53d95d081..8a9f841ee 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -17,3 +17,4 @@ pub mod pipe;
17pub mod pubsub; 17pub mod pubsub;
18pub mod signal; 18pub mod signal;
19pub mod waitqueue; 19pub mod waitqueue;
20pub mod zerocopy_channel;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index fcf056d36..72459d660 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -149,7 +149,7 @@ where
149{ 149{
150 fn drop(&mut self) { 150 fn drop(&mut self) {
151 self.mutex.state.lock(|s| { 151 self.mutex.state.lock(|s| {
152 let mut s = s.borrow_mut(); 152 let mut s = unwrap!(s.try_borrow_mut());
153 s.locked = false; 153 s.locked = false;
154 s.waker.wake(); 154 s.waker.wake();
155 }) 155 })
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 21d451ea6..ec0cbbf2a 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -1,7 +1,8 @@
1//! Async byte stream pipe. 1//! Async byte stream pipe.
2 2
3use core::cell::RefCell; 3use core::cell::{RefCell, UnsafeCell};
4use core::future::Future; 4use core::future::Future;
5use core::ops::Range;
5use core::pin::Pin; 6use core::pin::Pin;
6use core::task::{Context, Poll}; 7use core::task::{Context, Poll};
7 8
@@ -82,17 +83,6 @@ where
82 pipe: &'p Pipe<M, N>, 83 pipe: &'p Pipe<M, N>,
83} 84}
84 85
85impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
86where
87 M: RawMutex,
88{
89 fn clone(&self) -> Self {
90 Reader { pipe: self.pipe }
91 }
92}
93
94impl<'p, M, const N: usize> Copy for Reader<'p, M, N> where M: RawMutex {}
95
96impl<'p, M, const N: usize> Reader<'p, M, N> 86impl<'p, M, const N: usize> Reader<'p, M, N>
97where 87where
98 M: RawMutex, 88 M: RawMutex,
@@ -110,6 +100,29 @@ where
110 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 100 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
111 self.pipe.try_read(buf) 101 self.pipe.try_read(buf)
112 } 102 }
103
104 /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
105 ///
106 /// If no bytes are currently available to read, this function waits until at least one byte is available.
107 ///
108 /// If the reader is at end-of-file (EOF), an empty slice is returned.
109 pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
110 FillBufFuture { pipe: Some(self.pipe) }
111 }
112
113 /// Try returning contents of the internal buffer.
114 ///
115 /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
116 ///
117 /// If the reader is at end-of-file (EOF), an empty slice is returned.
118 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
119 unsafe { self.pipe.try_fill_buf_with_context(None) }
120 }
121
122 /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
123 pub fn consume(&mut self, amt: usize) {
124 self.pipe.consume(amt)
125 }
113} 126}
114 127
115/// Future returned by [`Pipe::read`] and [`Reader::read`]. 128/// Future returned by [`Pipe::read`] and [`Reader::read`].
@@ -138,6 +151,35 @@ where
138 151
139impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} 152impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
140 153
154/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`].
155#[must_use = "futures do nothing unless you `.await` or poll them"]
156pub struct FillBufFuture<'p, M, const N: usize>
157where
158 M: RawMutex,
159{
160 pipe: Option<&'p Pipe<M, N>>,
161}
162
163impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
164where
165 M: RawMutex,
166{
167 type Output = &'p [u8];
168
169 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170 let pipe = self.pipe.take().unwrap();
171 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
172 Ok(buf) => Poll::Ready(buf),
173 Err(TryReadError::Empty) => {
174 self.pipe = Some(pipe);
175 Poll::Pending
176 }
177 }
178 }
179}
180
181impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
182
141/// Error returned by [`try_read`](Pipe::try_read). 183/// Error returned by [`try_read`](Pipe::try_read).
142#[derive(PartialEq, Eq, Clone, Copy, Debug)] 184#[derive(PartialEq, Eq, Clone, Copy, Debug)]
143#[cfg_attr(feature = "defmt", derive(defmt::Format))] 185#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -162,67 +204,24 @@ struct PipeState<const N: usize> {
162 write_waker: WakerRegistration, 204 write_waker: WakerRegistration,
163} 205}
164 206
165impl<const N: usize> PipeState<N> { 207#[repr(transparent)]
166 const fn new() -> Self { 208struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
167 PipeState {
168 buffer: RingBuffer::new(),
169 read_waker: WakerRegistration::new(),
170 write_waker: WakerRegistration::new(),
171 }
172 }
173
174 fn clear(&mut self) {
175 self.buffer.clear();
176 self.write_waker.wake();
177 }
178
179 fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
180 self.try_read_with_context(None, buf)
181 }
182
183 fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
184 if self.buffer.is_full() {
185 self.write_waker.wake();
186 }
187
188 let available = self.buffer.pop_buf();
189 if available.is_empty() {
190 if let Some(cx) = cx {
191 self.read_waker.register(cx.waker());
192 }
193 return Err(TryReadError::Empty);
194 }
195
196 let n = available.len().min(buf.len());
197 buf[..n].copy_from_slice(&available[..n]);
198 self.buffer.pop(n);
199 Ok(n)
200 }
201 209
202 fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> { 210impl<const N: usize> Buffer<N> {
203 self.try_write_with_context(None, buf) 211 unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
212 let p = self.0.get() as *const u8;
213 core::slice::from_raw_parts(p.add(r.start), r.end - r.start)
204 } 214 }
205 215
206 fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { 216 unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
207 if self.buffer.is_empty() { 217 let p = self.0.get() as *mut u8;
208 self.read_waker.wake(); 218 core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start)
209 }
210
211 let available = self.buffer.push_buf();
212 if available.is_empty() {
213 if let Some(cx) = cx {
214 self.write_waker.register(cx.waker());
215 }
216 return Err(TryWriteError::Full);
217 }
218
219 let n = available.len().min(buf.len());
220 available[..n].copy_from_slice(&buf[..n]);
221 self.buffer.push(n);
222 Ok(n)
223 } 219 }
224} 220}
225 221
222unsafe impl<const N: usize> Send for Buffer<N> {}
223unsafe impl<const N: usize> Sync for Buffer<N> {}
224
226/// A bounded byte-oriented pipe for communicating between asynchronous tasks 225/// A bounded byte-oriented pipe for communicating between asynchronous tasks
227/// with backpressure. 226/// with backpressure.
228/// 227///
@@ -234,6 +233,7 @@ pub struct Pipe<M, const N: usize>
234where 233where
235 M: RawMutex, 234 M: RawMutex,
236{ 235{
236 buf: Buffer<N>,
237 inner: Mutex<M, RefCell<PipeState<N>>>, 237 inner: Mutex<M, RefCell<PipeState<N>>>,
238} 238}
239 239
@@ -252,7 +252,12 @@ where
252 /// ``` 252 /// ```
253 pub const fn new() -> Self { 253 pub const fn new() -> Self {
254 Self { 254 Self {
255 inner: Mutex::new(RefCell::new(PipeState::new())), 255 buf: Buffer(UnsafeCell::new([0; N])),
256 inner: Mutex::new(RefCell::new(PipeState {
257 buffer: RingBuffer::new(),
258 read_waker: WakerRegistration::new(),
259 write_waker: WakerRegistration::new(),
260 })),
256 } 261 }
257 } 262 }
258 263
@@ -261,21 +266,91 @@ where
261 } 266 }
262 267
263 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { 268 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
264 self.lock(|c| c.try_read_with_context(cx, buf)) 269 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
270 let s = &mut *rc.borrow_mut();
271
272 if s.buffer.is_full() {
273 s.write_waker.wake();
274 }
275
276 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
277 if available.is_empty() {
278 if let Some(cx) = cx {
279 s.read_waker.register(cx.waker());
280 }
281 return Err(TryReadError::Empty);
282 }
283
284 let n = available.len().min(buf.len());
285 buf[..n].copy_from_slice(&available[..n]);
286 s.buffer.pop(n);
287 Ok(n)
288 })
265 } 289 }
266 290
267 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { 291 // safety: While the returned slice is alive,
268 self.lock(|c| c.try_write_with_context(cx, buf)) 292 // no `read` or `consume` methods in the pipe must be called.
293 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
294 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
295 let s = &mut *rc.borrow_mut();
296
297 if s.buffer.is_full() {
298 s.write_waker.wake();
299 }
300
301 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
302 if available.is_empty() {
303 if let Some(cx) = cx {
304 s.read_waker.register(cx.waker());
305 }
306 return Err(TryReadError::Empty);
307 }
308
309 Ok(available)
310 })
269 } 311 }
270 312
271 /// Get a writer for this pipe. 313 fn consume(&self, amt: usize) {
272 pub fn writer(&self) -> Writer<'_, M, N> { 314 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
273 Writer { pipe: self } 315 let s = &mut *rc.borrow_mut();
316 let available = s.buffer.pop_buf();
317 assert!(amt <= available.len());
318 s.buffer.pop(amt);
319 })
274 } 320 }
275 321
276 /// Get a reader for this pipe. 322 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
277 pub fn reader(&self) -> Reader<'_, M, N> { 323 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
278 Reader { pipe: self } 324 let s = &mut *rc.borrow_mut();
325
326 if s.buffer.is_empty() {
327 s.read_waker.wake();
328 }
329
330 let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
331 if available.is_empty() {
332 if let Some(cx) = cx {
333 s.write_waker.register(cx.waker());
334 }
335 return Err(TryWriteError::Full);
336 }
337
338 let n = available.len().min(buf.len());
339 available[..n].copy_from_slice(&buf[..n]);
340 s.buffer.push(n);
341 Ok(n)
342 })
343 }
344
345 /// Split this pipe into a BufRead-capable reader and a writer.
346 ///
347 /// The reader and writer borrow the current pipe mutably, so it is not
348 /// possible to use it directly while they exist. This is needed because
349 /// implementing `BufRead` requires there is a single reader.
350 ///
351 /// The writer is cloneable, the reader is not.
352 pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
353 (Reader { pipe: self }, Writer { pipe: self })
279 } 354 }
280 355
281 /// Write some bytes to the pipe. 356 /// Write some bytes to the pipe.
@@ -312,7 +387,7 @@ where
 312 /// or return an error if the pipe is full. See [`write`](Self::write) for a variant 387 /// or return an error if the pipe is full. See [`write`](Self::write) for a variant
313 /// that waits instead of returning an error. 388 /// that waits instead of returning an error.
314 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { 389 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
315 self.lock(|c| c.try_write(buf)) 390 self.try_write_with_context(None, buf)
316 } 391 }
317 392
318 /// Read some bytes from the pipe. 393 /// Read some bytes from the pipe.
@@ -339,12 +414,17 @@ where
339 /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant 414 /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
340 /// that waits instead of returning an error. 415 /// that waits instead of returning an error.
341 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { 416 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
342 self.lock(|c| c.try_read(buf)) 417 self.try_read_with_context(None, buf)
343 } 418 }
344 419
345 /// Clear the data in the pipe's buffer. 420 /// Clear the data in the pipe's buffer.
346 pub fn clear(&self) { 421 pub fn clear(&self) {
347 self.lock(|c| c.clear()) 422 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
423 let s = &mut *rc.borrow_mut();
424
425 s.buffer.clear();
426 s.write_waker.wake();
427 })
348 } 428 }
349 429
350 /// Return whether the pipe is full (no free space in the buffer) 430 /// Return whether the pipe is full (no free space in the buffer)
@@ -433,6 +513,16 @@ mod io_impls {
433 } 513 }
434 } 514 }
435 515
516 impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
517 async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
518 Ok(Reader::fill_buf(self).await)
519 }
520
521 fn consume(&mut self, amt: usize) {
522 Reader::consume(self, amt)
523 }
524 }
525
436 impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> { 526 impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
437 type Error = Infallible; 527 type Error = Infallible;
438 } 528 }
@@ -457,43 +547,39 @@ mod tests {
457 use super::*; 547 use super::*;
458 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; 548 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
459 549
460 fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
461 N - c.buffer.len()
462 }
463
464 #[test] 550 #[test]
465 fn writing_once() { 551 fn writing_once() {
466 let mut c = PipeState::<3>::new(); 552 let c = Pipe::<NoopRawMutex, 3>::new();
467 assert!(c.try_write(&[1]).is_ok()); 553 assert!(c.try_write(&[1]).is_ok());
468 assert_eq!(capacity(&c), 2); 554 assert_eq!(c.free_capacity(), 2);
469 } 555 }
470 556
471 #[test] 557 #[test]
472 fn writing_when_full() { 558 fn writing_when_full() {
473 let mut c = PipeState::<3>::new(); 559 let c = Pipe::<NoopRawMutex, 3>::new();
474 assert_eq!(c.try_write(&[42]), Ok(1)); 560 assert_eq!(c.try_write(&[42]), Ok(1));
475 assert_eq!(c.try_write(&[43]), Ok(1)); 561 assert_eq!(c.try_write(&[43]), Ok(1));
476 assert_eq!(c.try_write(&[44]), Ok(1)); 562 assert_eq!(c.try_write(&[44]), Ok(1));
477 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); 563 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
478 assert_eq!(capacity(&c), 0); 564 assert_eq!(c.free_capacity(), 0);
479 } 565 }
480 566
481 #[test] 567 #[test]
482 fn receiving_once_with_one_send() { 568 fn receiving_once_with_one_send() {
483 let mut c = PipeState::<3>::new(); 569 let c = Pipe::<NoopRawMutex, 3>::new();
484 assert!(c.try_write(&[42]).is_ok()); 570 assert!(c.try_write(&[42]).is_ok());
485 let mut buf = [0; 16]; 571 let mut buf = [0; 16];
486 assert_eq!(c.try_read(&mut buf), Ok(1)); 572 assert_eq!(c.try_read(&mut buf), Ok(1));
487 assert_eq!(buf[0], 42); 573 assert_eq!(buf[0], 42);
488 assert_eq!(capacity(&c), 3); 574 assert_eq!(c.free_capacity(), 3);
489 } 575 }
490 576
491 #[test] 577 #[test]
492 fn receiving_when_empty() { 578 fn receiving_when_empty() {
493 let mut c = PipeState::<3>::new(); 579 let c = Pipe::<NoopRawMutex, 3>::new();
494 let mut buf = [0; 16]; 580 let mut buf = [0; 16];
495 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); 581 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
496 assert_eq!(capacity(&c), 3); 582 assert_eq!(c.free_capacity(), 3);
497 } 583 }
498 584
499 #[test] 585 #[test]
@@ -506,13 +592,37 @@ mod tests {
506 } 592 }
507 593
508 #[test] 594 #[test]
509 fn cloning() { 595 fn read_buf() {
510 let c = Pipe::<NoopRawMutex, 3>::new(); 596 let mut c = Pipe::<NoopRawMutex, 3>::new();
511 let r1 = c.reader(); 597 let (mut r, w) = c.split();
512 let w1 = c.writer(); 598 assert!(w.try_write(&[42, 43]).is_ok());
599 let buf = r.try_fill_buf().unwrap();
600 assert_eq!(buf, &[42, 43]);
601 let buf = r.try_fill_buf().unwrap();
602 assert_eq!(buf, &[42, 43]);
603 r.consume(1);
604 let buf = r.try_fill_buf().unwrap();
605 assert_eq!(buf, &[43]);
606 r.consume(1);
607 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
608 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
609 assert_eq!(w.try_write(&[45, 46]), Ok(2));
610 let buf = r.try_fill_buf().unwrap();
611 assert_eq!(buf, &[44]); // only one byte due to wraparound.
612 r.consume(1);
613 let buf = r.try_fill_buf().unwrap();
614 assert_eq!(buf, &[45, 46]);
615 assert!(w.try_write(&[47]).is_ok());
616 let buf = r.try_fill_buf().unwrap();
617 assert_eq!(buf, &[45, 46, 47]);
618 r.consume(3);
619 }
513 620
514 let _ = r1.clone(); 621 #[test]
515 let _ = w1.clone(); 622 fn writer_is_cloneable() {
623 let mut c = Pipe::<NoopRawMutex, 3>::new();
624 let (_r, w) = c.split();
625 let _ = w.clone();
516 } 626 }
517 627
518 #[futures_test::test] 628 #[futures_test::test]
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs
index 521084024..d95ffa7c9 100644
--- a/embassy-sync/src/ring_buffer.rs
+++ b/embassy-sync/src/ring_buffer.rs
@@ -1,5 +1,6 @@
1use core::ops::Range;
2
1pub struct RingBuffer<const N: usize> { 3pub struct RingBuffer<const N: usize> {
2 buf: [u8; N],
3 start: usize, 4 start: usize,
4 end: usize, 5 end: usize,
5 empty: bool, 6 empty: bool,
@@ -8,27 +9,26 @@ pub struct RingBuffer<const N: usize> {
8impl<const N: usize> RingBuffer<N> { 9impl<const N: usize> RingBuffer<N> {
9 pub const fn new() -> Self { 10 pub const fn new() -> Self {
10 Self { 11 Self {
11 buf: [0; N],
12 start: 0, 12 start: 0,
13 end: 0, 13 end: 0,
14 empty: true, 14 empty: true,
15 } 15 }
16 } 16 }
17 17
18 pub fn push_buf(&mut self) -> &mut [u8] { 18 pub fn push_buf(&mut self) -> Range<usize> {
19 if self.start == self.end && !self.empty { 19 if self.start == self.end && !self.empty {
20 trace!(" ringbuf: push_buf empty"); 20 trace!(" ringbuf: push_buf empty");
21 return &mut self.buf[..0]; 21 return 0..0;
22 } 22 }
23 23
24 let n = if self.start <= self.end { 24 let n = if self.start <= self.end {
25 self.buf.len() - self.end 25 N - self.end
26 } else { 26 } else {
27 self.start - self.end 27 self.start - self.end
28 }; 28 };
29 29
30 trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); 30 trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
31 &mut self.buf[self.end..self.end + n] 31 self.end..self.end + n
32 } 32 }
33 33
34 pub fn push(&mut self, n: usize) { 34 pub fn push(&mut self, n: usize) {
@@ -41,20 +41,20 @@ impl<const N: usize> RingBuffer<N> {
41 self.empty = false; 41 self.empty = false;
42 } 42 }
43 43
44 pub fn pop_buf(&mut self) -> &mut [u8] { 44 pub fn pop_buf(&mut self) -> Range<usize> {
45 if self.empty { 45 if self.empty {
46 trace!(" ringbuf: pop_buf empty"); 46 trace!(" ringbuf: pop_buf empty");
47 return &mut self.buf[..0]; 47 return 0..0;
48 } 48 }
49 49
50 let n = if self.end <= self.start { 50 let n = if self.end <= self.start {
51 self.buf.len() - self.start 51 N - self.start
52 } else { 52 } else {
53 self.end - self.start 53 self.end - self.start
54 }; 54 };
55 55
56 trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); 56 trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
57 &mut self.buf[self.start..self.start + n] 57 self.start..self.start + n
58 } 58 }
59 59
60 pub fn pop(&mut self, n: usize) { 60 pub fn pop(&mut self, n: usize) {
@@ -93,8 +93,8 @@ impl<const N: usize> RingBuffer<N> {
93 } 93 }
94 94
95 fn wrap(&self, n: usize) -> usize { 95 fn wrap(&self, n: usize) -> usize {
96 assert!(n <= self.buf.len()); 96 assert!(n <= N);
97 if n == self.buf.len() { 97 if n == N {
98 0 98 0
99 } else { 99 } else {
100 n 100 n
@@ -110,37 +110,29 @@ mod tests {
110 fn push_pop() { 110 fn push_pop() {
111 let mut rb: RingBuffer<4> = RingBuffer::new(); 111 let mut rb: RingBuffer<4> = RingBuffer::new();
112 let buf = rb.push_buf(); 112 let buf = rb.push_buf();
113 assert_eq!(4, buf.len()); 113 assert_eq!(0..4, buf);
114 buf[0] = 1;
115 buf[1] = 2;
116 buf[2] = 3;
117 buf[3] = 4;
118 rb.push(4); 114 rb.push(4);
119 115
120 let buf = rb.pop_buf(); 116 let buf = rb.pop_buf();
121 assert_eq!(4, buf.len()); 117 assert_eq!(0..4, buf);
122 assert_eq!(1, buf[0]);
123 rb.pop(1); 118 rb.pop(1);
124 119
125 let buf = rb.pop_buf(); 120 let buf = rb.pop_buf();
126 assert_eq!(3, buf.len()); 121 assert_eq!(1..4, buf);
127 assert_eq!(2, buf[0]);
128 rb.pop(1); 122 rb.pop(1);
129 123
130 let buf = rb.pop_buf(); 124 let buf = rb.pop_buf();
131 assert_eq!(2, buf.len()); 125 assert_eq!(2..4, buf);
132 assert_eq!(3, buf[0]);
133 rb.pop(1); 126 rb.pop(1);
134 127
135 let buf = rb.pop_buf(); 128 let buf = rb.pop_buf();
136 assert_eq!(1, buf.len()); 129 assert_eq!(3..4, buf);
137 assert_eq!(4, buf[0]);
138 rb.pop(1); 130 rb.pop(1);
139 131
140 let buf = rb.pop_buf(); 132 let buf = rb.pop_buf();
141 assert_eq!(0, buf.len()); 133 assert_eq!(0..0, buf);
142 134
143 let buf = rb.push_buf(); 135 let buf = rb.push_buf();
144 assert_eq!(4, buf.len()); 136 assert_eq!(0..4, buf);
145 } 137 }
146} 138}
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
new file mode 100644
index 000000000..f704cbd5d
--- /dev/null
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -0,0 +1,260 @@
1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19
20use core::cell::RefCell;
21use core::future::poll_fn;
22use core::marker::PhantomData;
23use core::task::{Context, Poll};
24
25use crate::blocking_mutex::raw::RawMutex;
26use crate::blocking_mutex::Mutex;
27use crate::waitqueue::WakerRegistration;
28
29/// A bounded zero-copy channel for communicating between asynchronous tasks
30/// with backpressure.
31///
32/// The channel will buffer up to the provided number of messages. Once the
33/// buffer is full, attempts to `send` new messages will wait until a message is
34/// received from the channel.
35///
36/// All data sent will become available in the same order as it was sent.
37///
38/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
39/// an `&mut T`.
40pub struct Channel<'a, M: RawMutex, T> {
41 buf: *mut T,
42 phantom: PhantomData<&'a mut T>,
43 state: Mutex<M, RefCell<State>>,
44}
45
46impl<'a, M: RawMutex, T> Channel<'a, M, T> {
47 /// Initialize a new [`Channel`].
48 ///
49 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
50 /// channel's capacity.
51 pub fn new(buf: &'a mut [T]) -> Self {
52 let len = buf.len();
53 assert!(len != 0);
54
55 Self {
56 buf: buf.as_mut_ptr(),
57 phantom: PhantomData,
58 state: Mutex::new(RefCell::new(State {
59 len,
60 front: 0,
61 back: 0,
62 full: false,
63 send_waker: WakerRegistration::new(),
64 receive_waker: WakerRegistration::new(),
65 })),
66 }
67 }
68
69 /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
70 ///
71 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
72 /// [`Receiver::borrow`] respectively.
73 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
74 (Sender { channel: self }, Receiver { channel: self })
75 }
76}
77
78/// Send-only access to a [`Channel`].
79pub struct Sender<'a, M: RawMutex, T> {
80 channel: &'a Channel<'a, M, T>,
81}
82
83impl<'a, M: RawMutex, T> Sender<'a, M, T> {
84 /// Creates one further [`Sender`] over the same channel.
85 pub fn borrow(&mut self) -> Sender<'_, M, T> {
86 Sender { channel: self.channel }
87 }
88
89 /// Attempts to send a value over the channel.
90 pub fn try_send(&mut self) -> Option<&mut T> {
91 self.channel.state.lock(|s| {
92 let s = &mut *s.borrow_mut();
93 match s.push_index() {
94 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
95 None => None,
96 }
97 })
98 }
99
100 /// Attempts to send a value over the channel.
101 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
102 self.channel.state.lock(|s| {
103 let s = &mut *s.borrow_mut();
104 match s.push_index() {
105 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
106 None => {
107 s.receive_waker.register(cx.waker());
108 Poll::Pending
109 }
110 }
111 })
112 }
113
114 /// Asynchronously send a value over the channel.
115 pub async fn send(&mut self) -> &mut T {
116 let i = poll_fn(|cx| {
117 self.channel.state.lock(|s| {
118 let s = &mut *s.borrow_mut();
119 match s.push_index() {
120 Some(i) => Poll::Ready(i),
121 None => {
122 s.receive_waker.register(cx.waker());
123 Poll::Pending
124 }
125 }
126 })
127 })
128 .await;
129 unsafe { &mut *self.channel.buf.add(i) }
130 }
131
132 /// Notify the channel that the sending of the value has been finalized.
133 pub fn send_done(&mut self) {
134 self.channel.state.lock(|s| s.borrow_mut().push_done())
135 }
136}
137
138/// Receive-only access to a [`Channel`].
139pub struct Receiver<'a, M: RawMutex, T> {
140 channel: &'a Channel<'a, M, T>,
141}
142
143impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
 144 /// Creates one further [`Receiver`] over the same channel.
145 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
146 Receiver { channel: self.channel }
147 }
148
149 /// Attempts to receive a value over the channel.
150 pub fn try_receive(&mut self) -> Option<&mut T> {
151 self.channel.state.lock(|s| {
152 let s = &mut *s.borrow_mut();
153 match s.pop_index() {
154 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
155 None => None,
156 }
157 })
158 }
159
160 /// Attempts to asynchronously receive a value over the channel.
161 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
162 self.channel.state.lock(|s| {
163 let s = &mut *s.borrow_mut();
164 match s.pop_index() {
165 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
166 None => {
167 s.send_waker.register(cx.waker());
168 Poll::Pending
169 }
170 }
171 })
172 }
173
174 /// Asynchronously receive a value over the channel.
175 pub async fn receive(&mut self) -> &mut T {
176 let i = poll_fn(|cx| {
177 self.channel.state.lock(|s| {
178 let s = &mut *s.borrow_mut();
179 match s.pop_index() {
180 Some(i) => Poll::Ready(i),
181 None => {
182 s.send_waker.register(cx.waker());
183 Poll::Pending
184 }
185 }
186 })
187 })
188 .await;
189 unsafe { &mut *self.channel.buf.add(i) }
190 }
191
192 /// Notify the channel that the receiving of the value has been finalized.
193 pub fn receive_done(&mut self) {
194 self.channel.state.lock(|s| s.borrow_mut().pop_done())
195 }
196}
197
198struct State {
199 len: usize,
200
201 /// Front index. Always 0..=(N-1)
202 front: usize,
203 /// Back index. Always 0..=(N-1).
204 back: usize,
205
206 /// Used to distinguish "empty" and "full" cases when `front == back`.
207 /// May only be `true` if `front == back`, always `false` otherwise.
208 full: bool,
209
210 send_waker: WakerRegistration,
211 receive_waker: WakerRegistration,
212}
213
214impl State {
215 fn increment(&self, i: usize) -> usize {
216 if i + 1 == self.len {
217 0
218 } else {
219 i + 1
220 }
221 }
222
223 fn is_full(&self) -> bool {
224 self.full
225 }
226
227 fn is_empty(&self) -> bool {
228 self.front == self.back && !self.full
229 }
230
231 fn push_index(&mut self) -> Option<usize> {
232 match self.is_full() {
233 true => None,
234 false => Some(self.back),
235 }
236 }
237
238 fn push_done(&mut self) {
239 assert!(!self.is_full());
240 self.back = self.increment(self.back);
241 if self.back == self.front {
242 self.full = true;
243 }
244 self.send_waker.wake();
245 }
246
247 fn pop_index(&mut self) -> Option<usize> {
248 match self.is_empty() {
249 true => None,
250 false => Some(self.front),
251 }
252 }
253
254 fn pop_done(&mut self) {
255 assert!(!self.is_empty());
256 self.front = self.increment(self.front);
257 self.full = false;
258 self.receive_waker.wake();
259 }
260}