aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy-net-driver-channel/src/lib.rs237
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/zero_copy_channel.rs209
3 files changed, 223 insertions, 224 deletions
diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs
index f2aa6b254..e8cd66f8d 100644
--- a/embassy-net-driver-channel/src/lib.rs
+++ b/embassy-net-driver-channel/src/lib.rs
@@ -14,6 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium};
14use embassy_sync::blocking_mutex::raw::NoopRawMutex; 14use embassy_sync::blocking_mutex::raw::NoopRawMutex;
15use embassy_sync::blocking_mutex::Mutex; 15use embassy_sync::blocking_mutex::Mutex;
16use embassy_sync::waitqueue::WakerRegistration; 16use embassy_sync::waitqueue::WakerRegistration;
17use embassy_sync::zero_copy_channel;
17 18
18pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> { 19pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> {
19 rx: [PacketBuf<MTU>; N_RX], 20 rx: [PacketBuf<MTU>; N_RX],
@@ -34,8 +35,8 @@ impl<const MTU: usize, const N_RX: usize, const N_TX: usize> State<MTU, N_RX, N_
34} 35}
35 36
36struct StateInner<'d, const MTU: usize> { 37struct StateInner<'d, const MTU: usize> {
37 rx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>, 38 rx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
38 tx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>, 39 tx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
39 shared: Mutex<NoopRawMutex, RefCell<Shared>>, 40 shared: Mutex<NoopRawMutex, RefCell<Shared>>,
40} 41}
41 42
@@ -47,8 +48,8 @@ struct Shared {
47} 48}
48 49
49pub struct Runner<'d, const MTU: usize> { 50pub struct Runner<'d, const MTU: usize> {
50 tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>, 51 tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
51 rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>, 52 rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
52 shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>, 53 shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>,
53} 54}
54 55
@@ -58,11 +59,11 @@ pub struct StateRunner<'d> {
58} 59}
59 60
60pub struct RxRunner<'d, const MTU: usize> { 61pub struct RxRunner<'d, const MTU: usize> {
61 rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>, 62 rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
62} 63}
63 64
64pub struct TxRunner<'d, const MTU: usize> { 65pub struct TxRunner<'d, const MTU: usize> {
65 tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>, 66 tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
66} 67}
67 68
68impl<'d, const MTU: usize> Runner<'d, MTU> { 69impl<'d, const MTU: usize> Runner<'d, MTU> {
@@ -243,8 +244,8 @@ pub fn new<'d, const MTU: usize, const N_RX: usize, const N_TX: usize>(
243 let state_uninit: *mut MaybeUninit<StateInner<'d, MTU>> = 244 let state_uninit: *mut MaybeUninit<StateInner<'d, MTU>> =
244 (&mut state.inner as *mut MaybeUninit<StateInner<'static, MTU>>).cast(); 245 (&mut state.inner as *mut MaybeUninit<StateInner<'static, MTU>>).cast();
245 let state = unsafe { &mut *state_uninit }.write(StateInner { 246 let state = unsafe { &mut *state_uninit }.write(StateInner {
246 rx: zerocopy_channel::Channel::new(&mut state.rx[..]), 247 rx: zero_copy_channel::Channel::new(&mut state.rx[..]),
247 tx: zerocopy_channel::Channel::new(&mut state.tx[..]), 248 tx: zero_copy_channel::Channel::new(&mut state.tx[..]),
248 shared: Mutex::new(RefCell::new(Shared { 249 shared: Mutex::new(RefCell::new(Shared {
249 link_state: LinkState::Down, 250 link_state: LinkState::Down,
250 hardware_address, 251 hardware_address,
@@ -282,8 +283,8 @@ impl<const MTU: usize> PacketBuf<MTU> {
282} 283}
283 284
284pub struct Device<'d, const MTU: usize> { 285pub struct Device<'d, const MTU: usize> {
285 rx: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>, 286 rx: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
286 tx: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>, 287 tx: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
287 shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>, 288 shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>,
288 caps: Capabilities, 289 caps: Capabilities,
289} 290}
@@ -328,7 +329,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> {
328} 329}
329 330
330pub struct RxToken<'a, const MTU: usize> { 331pub struct RxToken<'a, const MTU: usize> {
331 rx: zerocopy_channel::Receiver<'a, NoopRawMutex, PacketBuf<MTU>>, 332 rx: zero_copy_channel::Receiver<'a, NoopRawMutex, PacketBuf<MTU>>,
332} 333}
333 334
334impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { 335impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> {
@@ -345,7 +346,7 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> {
345} 346}
346 347
347pub struct TxToken<'a, const MTU: usize> { 348pub struct TxToken<'a, const MTU: usize> {
348 tx: zerocopy_channel::Sender<'a, NoopRawMutex, PacketBuf<MTU>>, 349 tx: zero_copy_channel::Sender<'a, NoopRawMutex, PacketBuf<MTU>>,
349} 350}
350 351
351impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> { 352impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> {
@@ -361,215 +362,3 @@ impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> {
361 r 362 r
362 } 363 }
363} 364}
364
365mod zerocopy_channel {
366 use core::cell::RefCell;
367 use core::future::poll_fn;
368 use core::marker::PhantomData;
369 use core::task::{Context, Poll};
370
371 use embassy_sync::blocking_mutex::raw::RawMutex;
372 use embassy_sync::blocking_mutex::Mutex;
373 use embassy_sync::waitqueue::WakerRegistration;
374
375 pub struct Channel<'a, M: RawMutex, T> {
376 buf: *mut T,
377 phantom: PhantomData<&'a mut T>,
378 state: Mutex<M, RefCell<State>>,
379 }
380
381 impl<'a, M: RawMutex, T> Channel<'a, M, T> {
382 pub fn new(buf: &'a mut [T]) -> Self {
383 let len = buf.len();
384 assert!(len != 0);
385
386 Self {
387 buf: buf.as_mut_ptr(),
388 phantom: PhantomData,
389 state: Mutex::new(RefCell::new(State {
390 len,
391 front: 0,
392 back: 0,
393 full: false,
394 send_waker: WakerRegistration::new(),
395 recv_waker: WakerRegistration::new(),
396 })),
397 }
398 }
399
400 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
401 (Sender { channel: self }, Receiver { channel: self })
402 }
403 }
404
405 pub struct Sender<'a, M: RawMutex, T> {
406 channel: &'a Channel<'a, M, T>,
407 }
408
409 impl<'a, M: RawMutex, T> Sender<'a, M, T> {
410 pub fn borrow(&mut self) -> Sender<'_, M, T> {
411 Sender { channel: self.channel }
412 }
413
414 pub fn try_send(&mut self) -> Option<&mut T> {
415 self.channel.state.lock(|s| {
416 let s = &mut *s.borrow_mut();
417 match s.push_index() {
418 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
419 None => None,
420 }
421 })
422 }
423
424 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
425 self.channel.state.lock(|s| {
426 let s = &mut *s.borrow_mut();
427 match s.push_index() {
428 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
429 None => {
430 s.recv_waker.register(cx.waker());
431 Poll::Pending
432 }
433 }
434 })
435 }
436
437 pub async fn send(&mut self) -> &mut T {
438 let i = poll_fn(|cx| {
439 self.channel.state.lock(|s| {
440 let s = &mut *s.borrow_mut();
441 match s.push_index() {
442 Some(i) => Poll::Ready(i),
443 None => {
444 s.recv_waker.register(cx.waker());
445 Poll::Pending
446 }
447 }
448 })
449 })
450 .await;
451 unsafe { &mut *self.channel.buf.add(i) }
452 }
453
454 pub fn send_done(&mut self) {
455 self.channel.state.lock(|s| s.borrow_mut().push_done())
456 }
457 }
458 pub struct Receiver<'a, M: RawMutex, T> {
459 channel: &'a Channel<'a, M, T>,
460 }
461
462 impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
463 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
464 Receiver { channel: self.channel }
465 }
466
467 pub fn try_recv(&mut self) -> Option<&mut T> {
468 self.channel.state.lock(|s| {
469 let s = &mut *s.borrow_mut();
470 match s.pop_index() {
471 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
472 None => None,
473 }
474 })
475 }
476
477 pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
478 self.channel.state.lock(|s| {
479 let s = &mut *s.borrow_mut();
480 match s.pop_index() {
481 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
482 None => {
483 s.send_waker.register(cx.waker());
484 Poll::Pending
485 }
486 }
487 })
488 }
489
490 pub async fn recv(&mut self) -> &mut T {
491 let i = poll_fn(|cx| {
492 self.channel.state.lock(|s| {
493 let s = &mut *s.borrow_mut();
494 match s.pop_index() {
495 Some(i) => Poll::Ready(i),
496 None => {
497 s.send_waker.register(cx.waker());
498 Poll::Pending
499 }
500 }
501 })
502 })
503 .await;
504 unsafe { &mut *self.channel.buf.add(i) }
505 }
506
507 pub fn recv_done(&mut self) {
508 self.channel.state.lock(|s| s.borrow_mut().pop_done())
509 }
510 }
511
512 struct State {
513 len: usize,
514
515 /// Front index. Always 0..=(N-1)
516 front: usize,
517 /// Back index. Always 0..=(N-1).
518 back: usize,
519
520 /// Used to distinguish "empty" and "full" cases when `front == back`.
521 /// May only be `true` if `front == back`, always `false` otherwise.
522 full: bool,
523
524 send_waker: WakerRegistration,
525 recv_waker: WakerRegistration,
526 }
527
528 impl State {
529 fn increment(&self, i: usize) -> usize {
530 if i + 1 == self.len {
531 0
532 } else {
533 i + 1
534 }
535 }
536
537 fn is_full(&self) -> bool {
538 self.full
539 }
540
541 fn is_empty(&self) -> bool {
542 self.front == self.back && !self.full
543 }
544
545 fn push_index(&mut self) -> Option<usize> {
546 match self.is_full() {
547 true => None,
548 false => Some(self.back),
549 }
550 }
551
552 fn push_done(&mut self) {
553 assert!(!self.is_full());
554 self.back = self.increment(self.back);
555 if self.back == self.front {
556 self.full = true;
557 }
558 self.send_waker.wake();
559 }
560
561 fn pop_index(&mut self) -> Option<usize> {
562 match self.is_empty() {
563 true => None,
564 false => Some(self.front),
565 }
566 }
567
568 fn pop_done(&mut self) {
569 assert!(!self.is_empty());
570 self.front = self.increment(self.front);
571 self.full = false;
572 self.recv_waker.wake();
573 }
574 }
575}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 53d95d081..48a7b13f6 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -17,3 +17,4 @@ pub mod pipe;
17pub mod pubsub; 17pub mod pubsub;
18pub mod signal; 18pub mod signal;
19pub mod waitqueue; 19pub mod waitqueue;
20pub mod zero_copy_channel;
diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs
new file mode 100644
index 000000000..3701ccf1a
--- /dev/null
+++ b/embassy-sync/src/zero_copy_channel.rs
@@ -0,0 +1,209 @@
1use core::cell::RefCell;
2use core::future::poll_fn;
3use core::marker::PhantomData;
4use core::task::{Context, Poll};
5
6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex;
8use crate::waitqueue::WakerRegistration;
9
10pub struct Channel<'a, M: RawMutex, T> {
11 buf: *mut T,
12 phantom: PhantomData<&'a mut T>,
13 state: Mutex<M, RefCell<State>>,
14}
15
16impl<'a, M: RawMutex, T> Channel<'a, M, T> {
17 pub fn new(buf: &'a mut [T]) -> Self {
18 let len = buf.len();
19 assert!(len != 0);
20
21 Self {
22 buf: buf.as_mut_ptr(),
23 phantom: PhantomData,
24 state: Mutex::new(RefCell::new(State {
25 len,
26 front: 0,
27 back: 0,
28 full: false,
29 send_waker: WakerRegistration::new(),
30 recv_waker: WakerRegistration::new(),
31 })),
32 }
33 }
34
35 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
36 (Sender { channel: self }, Receiver { channel: self })
37 }
38}
39
40pub struct Sender<'a, M: RawMutex, T> {
41 channel: &'a Channel<'a, M, T>,
42}
43
44impl<'a, M: RawMutex, T> Sender<'a, M, T> {
45 pub fn borrow(&mut self) -> Sender<'_, M, T> {
46 Sender { channel: self.channel }
47 }
48
49 pub fn try_send(&mut self) -> Option<&mut T> {
50 self.channel.state.lock(|s| {
51 let s = &mut *s.borrow_mut();
52 match s.push_index() {
53 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
54 None => None,
55 }
56 })
57 }
58
59 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
60 self.channel.state.lock(|s| {
61 let s = &mut *s.borrow_mut();
62 match s.push_index() {
63 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
64 None => {
65 s.recv_waker.register(cx.waker());
66 Poll::Pending
67 }
68 }
69 })
70 }
71
72 pub async fn send(&mut self) -> &mut T {
73 let i = poll_fn(|cx| {
74 self.channel.state.lock(|s| {
75 let s = &mut *s.borrow_mut();
76 match s.push_index() {
77 Some(i) => Poll::Ready(i),
78 None => {
79 s.recv_waker.register(cx.waker());
80 Poll::Pending
81 }
82 }
83 })
84 })
85 .await;
86 unsafe { &mut *self.channel.buf.add(i) }
87 }
88
89 pub fn send_done(&mut self) {
90 self.channel.state.lock(|s| s.borrow_mut().push_done())
91 }
92}
93pub struct Receiver<'a, M: RawMutex, T> {
94 channel: &'a Channel<'a, M, T>,
95}
96
97impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
98 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
99 Receiver { channel: self.channel }
100 }
101
102 pub fn try_recv(&mut self) -> Option<&mut T> {
103 self.channel.state.lock(|s| {
104 let s = &mut *s.borrow_mut();
105 match s.pop_index() {
106 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
107 None => None,
108 }
109 })
110 }
111
112 pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
113 self.channel.state.lock(|s| {
114 let s = &mut *s.borrow_mut();
115 match s.pop_index() {
116 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
117 None => {
118 s.send_waker.register(cx.waker());
119 Poll::Pending
120 }
121 }
122 })
123 }
124
125 pub async fn recv(&mut self) -> &mut T {
126 let i = poll_fn(|cx| {
127 self.channel.state.lock(|s| {
128 let s = &mut *s.borrow_mut();
129 match s.pop_index() {
130 Some(i) => Poll::Ready(i),
131 None => {
132 s.send_waker.register(cx.waker());
133 Poll::Pending
134 }
135 }
136 })
137 })
138 .await;
139 unsafe { &mut *self.channel.buf.add(i) }
140 }
141
142 pub fn recv_done(&mut self) {
143 self.channel.state.lock(|s| s.borrow_mut().pop_done())
144 }
145}
146
147struct State {
148 len: usize,
149
150 /// Front index. Always 0..=(N-1)
151 front: usize,
152 /// Back index. Always 0..=(N-1).
153 back: usize,
154
155 /// Used to distinguish "empty" and "full" cases when `front == back`.
156 /// May only be `true` if `front == back`, always `false` otherwise.
157 full: bool,
158
159 send_waker: WakerRegistration,
160 recv_waker: WakerRegistration,
161}
162
163impl State {
164 fn increment(&self, i: usize) -> usize {
165 if i + 1 == self.len {
166 0
167 } else {
168 i + 1
169 }
170 }
171
172 fn is_full(&self) -> bool {
173 self.full
174 }
175
176 fn is_empty(&self) -> bool {
177 self.front == self.back && !self.full
178 }
179
180 fn push_index(&mut self) -> Option<usize> {
181 match self.is_full() {
182 true => None,
183 false => Some(self.back),
184 }
185 }
186
187 fn push_done(&mut self) {
188 assert!(!self.is_full());
189 self.back = self.increment(self.back);
190 if self.back == self.front {
191 self.full = true;
192 }
193 self.send_waker.wake();
194 }
195
196 fn pop_index(&mut self) -> Option<usize> {
197 match self.is_empty() {
198 true => None,
199 false => Some(self.front),
200 }
201 }
202
203 fn pop_done(&mut self) {
204 assert!(!self.is_empty());
205 self.front = self.increment(self.front);
206 self.full = false;
207 self.recv_waker.wake();
208 }
209}