aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2023-09-04 23:38:11 +0200
committerGitHub <[email protected]>2023-09-04 23:38:11 +0200
commitce662766be80d75b5f9294ae4b792f7db252ccd3 (patch)
tree3f785aa5a74823a00c980ce7e24ae62d5e89ae0f
parenta03b6be693293f8a646a96eb3f26c3e79e0882fe (diff)
parent615882ebd67f4e7e60fb8aa1505b1272655c4fa4 (diff)
Merge pull request #1771 from rubdos/zero-copy-channel
embassy_sync::zero_copy_channel
-rw-r--r--embassy-net-driver-channel/src/lib.rs235
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/zerocopy_channel.rs260
3 files changed, 273 insertions, 223 deletions
diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs
index f2aa6b254..bf7ae5217 100644
--- a/embassy-net-driver-channel/src/lib.rs
+++ b/embassy-net-driver-channel/src/lib.rs
@@ -14,6 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium};
14use embassy_sync::blocking_mutex::raw::NoopRawMutex; 14use embassy_sync::blocking_mutex::raw::NoopRawMutex;
15use embassy_sync::blocking_mutex::Mutex; 15use embassy_sync::blocking_mutex::Mutex;
16use embassy_sync::waitqueue::WakerRegistration; 16use embassy_sync::waitqueue::WakerRegistration;
17use embassy_sync::zerocopy_channel;
17 18
18pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> { 19pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> {
19 rx: [PacketBuf<MTU>; N_RX], 20 rx: [PacketBuf<MTU>; N_RX],
@@ -130,24 +131,24 @@ impl<'d, const MTU: usize> Runner<'d, MTU> {
130 } 131 }
131 132
132 pub async fn tx_buf(&mut self) -> &mut [u8] { 133 pub async fn tx_buf(&mut self) -> &mut [u8] {
133 let p = self.tx_chan.recv().await; 134 let p = self.tx_chan.receive().await;
134 &mut p.buf[..p.len] 135 &mut p.buf[..p.len]
135 } 136 }
136 137
137 pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { 138 pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> {
138 let p = self.tx_chan.try_recv()?; 139 let p = self.tx_chan.try_receive()?;
139 Some(&mut p.buf[..p.len]) 140 Some(&mut p.buf[..p.len])
140 } 141 }
141 142
142 pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { 143 pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> {
143 match self.tx_chan.poll_recv(cx) { 144 match self.tx_chan.poll_receive(cx) {
144 Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), 145 Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]),
145 Poll::Pending => Poll::Pending, 146 Poll::Pending => Poll::Pending,
146 } 147 }
147 } 148 }
148 149
149 pub fn tx_done(&mut self) { 150 pub fn tx_done(&mut self) {
150 self.tx_chan.recv_done(); 151 self.tx_chan.receive_done();
151 } 152 }
152} 153}
153 154
@@ -204,24 +205,24 @@ impl<'d, const MTU: usize> RxRunner<'d, MTU> {
204 205
205impl<'d, const MTU: usize> TxRunner<'d, MTU> { 206impl<'d, const MTU: usize> TxRunner<'d, MTU> {
206 pub async fn tx_buf(&mut self) -> &mut [u8] { 207 pub async fn tx_buf(&mut self) -> &mut [u8] {
207 let p = self.tx_chan.recv().await; 208 let p = self.tx_chan.receive().await;
208 &mut p.buf[..p.len] 209 &mut p.buf[..p.len]
209 } 210 }
210 211
211 pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { 212 pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> {
212 let p = self.tx_chan.try_recv()?; 213 let p = self.tx_chan.try_receive()?;
213 Some(&mut p.buf[..p.len]) 214 Some(&mut p.buf[..p.len])
214 } 215 }
215 216
216 pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { 217 pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> {
217 match self.tx_chan.poll_recv(cx) { 218 match self.tx_chan.poll_receive(cx) {
218 Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), 219 Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]),
219 Poll::Pending => Poll::Pending, 220 Poll::Pending => Poll::Pending,
220 } 221 }
221 } 222 }
222 223
223 pub fn tx_done(&mut self) { 224 pub fn tx_done(&mut self) {
224 self.tx_chan.recv_done(); 225 self.tx_chan.receive_done();
225 } 226 }
226} 227}
227 228
@@ -293,7 +294,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> {
293 type TxToken<'a> = TxToken<'a, MTU> where Self: 'a ; 294 type TxToken<'a> = TxToken<'a, MTU> where Self: 'a ;
294 295
295 fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { 296 fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
296 if self.rx.poll_recv(cx).is_ready() && self.tx.poll_send(cx).is_ready() { 297 if self.rx.poll_receive(cx).is_ready() && self.tx.poll_send(cx).is_ready() {
297 Some((RxToken { rx: self.rx.borrow() }, TxToken { tx: self.tx.borrow() })) 298 Some((RxToken { rx: self.rx.borrow() }, TxToken { tx: self.tx.borrow() }))
298 } else { 299 } else {
299 None 300 None
@@ -337,9 +338,9 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> {
337 F: FnOnce(&mut [u8]) -> R, 338 F: FnOnce(&mut [u8]) -> R,
338 { 339 {
339 // NOTE(unwrap): we checked the queue wasn't full when creating the token. 340 // NOTE(unwrap): we checked the queue wasn't full when creating the token.
340 let pkt = unwrap!(self.rx.try_recv()); 341 let pkt = unwrap!(self.rx.try_receive());
341 let r = f(&mut pkt.buf[..pkt.len]); 342 let r = f(&mut pkt.buf[..pkt.len]);
342 self.rx.recv_done(); 343 self.rx.receive_done();
343 r 344 r
344 } 345 }
345} 346}
@@ -361,215 +362,3 @@ impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> {
361 r 362 r
362 } 363 }
363} 364}
364
365mod zerocopy_channel {
366 use core::cell::RefCell;
367 use core::future::poll_fn;
368 use core::marker::PhantomData;
369 use core::task::{Context, Poll};
370
371 use embassy_sync::blocking_mutex::raw::RawMutex;
372 use embassy_sync::blocking_mutex::Mutex;
373 use embassy_sync::waitqueue::WakerRegistration;
374
375 pub struct Channel<'a, M: RawMutex, T> {
376 buf: *mut T,
377 phantom: PhantomData<&'a mut T>,
378 state: Mutex<M, RefCell<State>>,
379 }
380
381 impl<'a, M: RawMutex, T> Channel<'a, M, T> {
382 pub fn new(buf: &'a mut [T]) -> Self {
383 let len = buf.len();
384 assert!(len != 0);
385
386 Self {
387 buf: buf.as_mut_ptr(),
388 phantom: PhantomData,
389 state: Mutex::new(RefCell::new(State {
390 len,
391 front: 0,
392 back: 0,
393 full: false,
394 send_waker: WakerRegistration::new(),
395 recv_waker: WakerRegistration::new(),
396 })),
397 }
398 }
399
400 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
401 (Sender { channel: self }, Receiver { channel: self })
402 }
403 }
404
405 pub struct Sender<'a, M: RawMutex, T> {
406 channel: &'a Channel<'a, M, T>,
407 }
408
409 impl<'a, M: RawMutex, T> Sender<'a, M, T> {
410 pub fn borrow(&mut self) -> Sender<'_, M, T> {
411 Sender { channel: self.channel }
412 }
413
414 pub fn try_send(&mut self) -> Option<&mut T> {
415 self.channel.state.lock(|s| {
416 let s = &mut *s.borrow_mut();
417 match s.push_index() {
418 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
419 None => None,
420 }
421 })
422 }
423
424 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
425 self.channel.state.lock(|s| {
426 let s = &mut *s.borrow_mut();
427 match s.push_index() {
428 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
429 None => {
430 s.recv_waker.register(cx.waker());
431 Poll::Pending
432 }
433 }
434 })
435 }
436
437 pub async fn send(&mut self) -> &mut T {
438 let i = poll_fn(|cx| {
439 self.channel.state.lock(|s| {
440 let s = &mut *s.borrow_mut();
441 match s.push_index() {
442 Some(i) => Poll::Ready(i),
443 None => {
444 s.recv_waker.register(cx.waker());
445 Poll::Pending
446 }
447 }
448 })
449 })
450 .await;
451 unsafe { &mut *self.channel.buf.add(i) }
452 }
453
454 pub fn send_done(&mut self) {
455 self.channel.state.lock(|s| s.borrow_mut().push_done())
456 }
457 }
458 pub struct Receiver<'a, M: RawMutex, T> {
459 channel: &'a Channel<'a, M, T>,
460 }
461
462 impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
463 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
464 Receiver { channel: self.channel }
465 }
466
467 pub fn try_recv(&mut self) -> Option<&mut T> {
468 self.channel.state.lock(|s| {
469 let s = &mut *s.borrow_mut();
470 match s.pop_index() {
471 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
472 None => None,
473 }
474 })
475 }
476
477 pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
478 self.channel.state.lock(|s| {
479 let s = &mut *s.borrow_mut();
480 match s.pop_index() {
481 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
482 None => {
483 s.send_waker.register(cx.waker());
484 Poll::Pending
485 }
486 }
487 })
488 }
489
490 pub async fn recv(&mut self) -> &mut T {
491 let i = poll_fn(|cx| {
492 self.channel.state.lock(|s| {
493 let s = &mut *s.borrow_mut();
494 match s.pop_index() {
495 Some(i) => Poll::Ready(i),
496 None => {
497 s.send_waker.register(cx.waker());
498 Poll::Pending
499 }
500 }
501 })
502 })
503 .await;
504 unsafe { &mut *self.channel.buf.add(i) }
505 }
506
507 pub fn recv_done(&mut self) {
508 self.channel.state.lock(|s| s.borrow_mut().pop_done())
509 }
510 }
511
512 struct State {
513 len: usize,
514
515 /// Front index. Always 0..=(N-1)
516 front: usize,
517 /// Back index. Always 0..=(N-1).
518 back: usize,
519
520 /// Used to distinguish "empty" and "full" cases when `front == back`.
521 /// May only be `true` if `front == back`, always `false` otherwise.
522 full: bool,
523
524 send_waker: WakerRegistration,
525 recv_waker: WakerRegistration,
526 }
527
528 impl State {
529 fn increment(&self, i: usize) -> usize {
530 if i + 1 == self.len {
531 0
532 } else {
533 i + 1
534 }
535 }
536
537 fn is_full(&self) -> bool {
538 self.full
539 }
540
541 fn is_empty(&self) -> bool {
542 self.front == self.back && !self.full
543 }
544
545 fn push_index(&mut self) -> Option<usize> {
546 match self.is_full() {
547 true => None,
548 false => Some(self.back),
549 }
550 }
551
552 fn push_done(&mut self) {
553 assert!(!self.is_full());
554 self.back = self.increment(self.back);
555 if self.back == self.front {
556 self.full = true;
557 }
558 self.send_waker.wake();
559 }
560
561 fn pop_index(&mut self) -> Option<usize> {
562 match self.is_empty() {
563 true => None,
564 false => Some(self.front),
565 }
566 }
567
568 fn pop_done(&mut self) {
569 assert!(!self.is_empty());
570 self.front = self.increment(self.front);
571 self.full = false;
572 self.recv_waker.wake();
573 }
574 }
575}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 53d95d081..8a9f841ee 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -17,3 +17,4 @@ pub mod pipe;
17pub mod pubsub; 17pub mod pubsub;
18pub mod signal; 18pub mod signal;
19pub mod waitqueue; 19pub mod waitqueue;
20pub mod zerocopy_channel;
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
new file mode 100644
index 000000000..f704cbd5d
--- /dev/null
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -0,0 +1,260 @@
1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19
20use core::cell::RefCell;
21use core::future::poll_fn;
22use core::marker::PhantomData;
23use core::task::{Context, Poll};
24
25use crate::blocking_mutex::raw::RawMutex;
26use crate::blocking_mutex::Mutex;
27use crate::waitqueue::WakerRegistration;
28
29/// A bounded zero-copy channel for communicating between asynchronous tasks
30/// with backpressure.
31///
32/// The channel will buffer up to the provided number of messages. Once the
33/// buffer is full, attempts to `send` new messages will wait until a message is
34/// received from the channel.
35///
36/// All data sent will become available in the same order as it was sent.
37///
38/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
39/// an `&mut T`.
40pub struct Channel<'a, M: RawMutex, T> {
41 buf: *mut T,
42 phantom: PhantomData<&'a mut T>,
43 state: Mutex<M, RefCell<State>>,
44}
45
46impl<'a, M: RawMutex, T> Channel<'a, M, T> {
47 /// Initialize a new [`Channel`].
48 ///
49 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
50 /// channel's capacity.
51 pub fn new(buf: &'a mut [T]) -> Self {
52 let len = buf.len();
53 assert!(len != 0);
54
55 Self {
56 buf: buf.as_mut_ptr(),
57 phantom: PhantomData,
58 state: Mutex::new(RefCell::new(State {
59 len,
60 front: 0,
61 back: 0,
62 full: false,
63 send_waker: WakerRegistration::new(),
64 receive_waker: WakerRegistration::new(),
65 })),
66 }
67 }
68
69 /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
70 ///
71 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
72 /// [`Receiver::borrow`] respectively.
73 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
74 (Sender { channel: self }, Receiver { channel: self })
75 }
76}
77
78/// Send-only access to a [`Channel`].
79pub struct Sender<'a, M: RawMutex, T> {
80 channel: &'a Channel<'a, M, T>,
81}
82
83impl<'a, M: RawMutex, T> Sender<'a, M, T> {
84 /// Creates one further [`Sender`] over the same channel.
85 pub fn borrow(&mut self) -> Sender<'_, M, T> {
86 Sender { channel: self.channel }
87 }
88
89 /// Attempts to send a value over the channel.
90 pub fn try_send(&mut self) -> Option<&mut T> {
91 self.channel.state.lock(|s| {
92 let s = &mut *s.borrow_mut();
93 match s.push_index() {
94 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
95 None => None,
96 }
97 })
98 }
99
100 /// Attempts to send a value over the channel.
101 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
102 self.channel.state.lock(|s| {
103 let s = &mut *s.borrow_mut();
104 match s.push_index() {
105 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
106 None => {
107 s.receive_waker.register(cx.waker());
108 Poll::Pending
109 }
110 }
111 })
112 }
113
114 /// Asynchronously send a value over the channel.
115 pub async fn send(&mut self) -> &mut T {
116 let i = poll_fn(|cx| {
117 self.channel.state.lock(|s| {
118 let s = &mut *s.borrow_mut();
119 match s.push_index() {
120 Some(i) => Poll::Ready(i),
121 None => {
122 s.receive_waker.register(cx.waker());
123 Poll::Pending
124 }
125 }
126 })
127 })
128 .await;
129 unsafe { &mut *self.channel.buf.add(i) }
130 }
131
132 /// Notify the channel that the sending of the value has been finalized.
133 pub fn send_done(&mut self) {
134 self.channel.state.lock(|s| s.borrow_mut().push_done())
135 }
136}
137
138/// Receive-only access to a [`Channel`].
139pub struct Receiver<'a, M: RawMutex, T> {
140 channel: &'a Channel<'a, M, T>,
141}
142
143impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
144 /// Creates one further [`Sender`] over the same channel.
145 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
146 Receiver { channel: self.channel }
147 }
148
149 /// Attempts to receive a value over the channel.
150 pub fn try_receive(&mut self) -> Option<&mut T> {
151 self.channel.state.lock(|s| {
152 let s = &mut *s.borrow_mut();
153 match s.pop_index() {
154 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
155 None => None,
156 }
157 })
158 }
159
160 /// Attempts to asynchronously receive a value over the channel.
161 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
162 self.channel.state.lock(|s| {
163 let s = &mut *s.borrow_mut();
164 match s.pop_index() {
165 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
166 None => {
167 s.send_waker.register(cx.waker());
168 Poll::Pending
169 }
170 }
171 })
172 }
173
174 /// Asynchronously receive a value over the channel.
175 pub async fn receive(&mut self) -> &mut T {
176 let i = poll_fn(|cx| {
177 self.channel.state.lock(|s| {
178 let s = &mut *s.borrow_mut();
179 match s.pop_index() {
180 Some(i) => Poll::Ready(i),
181 None => {
182 s.send_waker.register(cx.waker());
183 Poll::Pending
184 }
185 }
186 })
187 })
188 .await;
189 unsafe { &mut *self.channel.buf.add(i) }
190 }
191
192 /// Notify the channel that the receiving of the value has been finalized.
193 pub fn receive_done(&mut self) {
194 self.channel.state.lock(|s| s.borrow_mut().pop_done())
195 }
196}
197
198struct State {
199 len: usize,
200
201 /// Front index. Always 0..=(N-1)
202 front: usize,
203 /// Back index. Always 0..=(N-1).
204 back: usize,
205
206 /// Used to distinguish "empty" and "full" cases when `front == back`.
207 /// May only be `true` if `front == back`, always `false` otherwise.
208 full: bool,
209
210 send_waker: WakerRegistration,
211 receive_waker: WakerRegistration,
212}
213
214impl State {
215 fn increment(&self, i: usize) -> usize {
216 if i + 1 == self.len {
217 0
218 } else {
219 i + 1
220 }
221 }
222
223 fn is_full(&self) -> bool {
224 self.full
225 }
226
227 fn is_empty(&self) -> bool {
228 self.front == self.back && !self.full
229 }
230
231 fn push_index(&mut self) -> Option<usize> {
232 match self.is_full() {
233 true => None,
234 false => Some(self.back),
235 }
236 }
237
238 fn push_done(&mut self) {
239 assert!(!self.is_full());
240 self.back = self.increment(self.back);
241 if self.back == self.front {
242 self.full = true;
243 }
244 self.send_waker.wake();
245 }
246
247 fn pop_index(&mut self) -> Option<usize> {
248 match self.is_empty() {
249 true => None,
250 false => Some(self.front),
251 }
252 }
253
254 fn pop_done(&mut self) {
255 assert!(!self.is_empty());
256 self.front = self.increment(self.front);
257 self.full = false;
258 self.receive_waker.wake();
259 }
260}