aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorRuben De Smet <[email protected]>2023-08-11 12:04:30 +0200
committerDario Nieuwenhuis <[email protected]>2023-09-04 22:13:27 +0200
commita2656f402b1c59461cec5f5dc685b2692119b996 (patch)
tree44d7d511c0101f042fbc0c85e55038da47f5c53e /embassy-sync
parent3295ec94e5ea26e75a9debe17071808a64f9e517 (diff)
Move embassy-net-driver-channel::zerocopy_channel to embassy_sync::zero_copy_channel
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/lib.rs1
-rw-r--r--embassy-sync/src/zero_copy_channel.rs209
2 files changed, 210 insertions, 0 deletions
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 53d95d081..48a7b13f6 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -17,3 +17,4 @@ pub mod pipe;
17pub mod pubsub; 17pub mod pubsub;
18pub mod signal; 18pub mod signal;
19pub mod waitqueue; 19pub mod waitqueue;
20pub mod zero_copy_channel;
diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs
new file mode 100644
index 000000000..3701ccf1a
--- /dev/null
+++ b/embassy-sync/src/zero_copy_channel.rs
@@ -0,0 +1,209 @@
1use core::cell::RefCell;
2use core::future::poll_fn;
3use core::marker::PhantomData;
4use core::task::{Context, Poll};
5
6use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex;
8use crate::waitqueue::WakerRegistration;
9
10pub struct Channel<'a, M: RawMutex, T> {
11 buf: *mut T,
12 phantom: PhantomData<&'a mut T>,
13 state: Mutex<M, RefCell<State>>,
14}
15
16impl<'a, M: RawMutex, T> Channel<'a, M, T> {
17 pub fn new(buf: &'a mut [T]) -> Self {
18 let len = buf.len();
19 assert!(len != 0);
20
21 Self {
22 buf: buf.as_mut_ptr(),
23 phantom: PhantomData,
24 state: Mutex::new(RefCell::new(State {
25 len,
26 front: 0,
27 back: 0,
28 full: false,
29 send_waker: WakerRegistration::new(),
30 recv_waker: WakerRegistration::new(),
31 })),
32 }
33 }
34
35 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
36 (Sender { channel: self }, Receiver { channel: self })
37 }
38}
39
40pub struct Sender<'a, M: RawMutex, T> {
41 channel: &'a Channel<'a, M, T>,
42}
43
44impl<'a, M: RawMutex, T> Sender<'a, M, T> {
45 pub fn borrow(&mut self) -> Sender<'_, M, T> {
46 Sender { channel: self.channel }
47 }
48
49 pub fn try_send(&mut self) -> Option<&mut T> {
50 self.channel.state.lock(|s| {
51 let s = &mut *s.borrow_mut();
52 match s.push_index() {
53 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
54 None => None,
55 }
56 })
57 }
58
59 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
60 self.channel.state.lock(|s| {
61 let s = &mut *s.borrow_mut();
62 match s.push_index() {
63 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
64 None => {
65 s.recv_waker.register(cx.waker());
66 Poll::Pending
67 }
68 }
69 })
70 }
71
72 pub async fn send(&mut self) -> &mut T {
73 let i = poll_fn(|cx| {
74 self.channel.state.lock(|s| {
75 let s = &mut *s.borrow_mut();
76 match s.push_index() {
77 Some(i) => Poll::Ready(i),
78 None => {
79 s.recv_waker.register(cx.waker());
80 Poll::Pending
81 }
82 }
83 })
84 })
85 .await;
86 unsafe { &mut *self.channel.buf.add(i) }
87 }
88
89 pub fn send_done(&mut self) {
90 self.channel.state.lock(|s| s.borrow_mut().push_done())
91 }
92}
93pub struct Receiver<'a, M: RawMutex, T> {
94 channel: &'a Channel<'a, M, T>,
95}
96
97impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
98 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
99 Receiver { channel: self.channel }
100 }
101
102 pub fn try_recv(&mut self) -> Option<&mut T> {
103 self.channel.state.lock(|s| {
104 let s = &mut *s.borrow_mut();
105 match s.pop_index() {
106 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
107 None => None,
108 }
109 })
110 }
111
112 pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
113 self.channel.state.lock(|s| {
114 let s = &mut *s.borrow_mut();
115 match s.pop_index() {
116 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
117 None => {
118 s.send_waker.register(cx.waker());
119 Poll::Pending
120 }
121 }
122 })
123 }
124
125 pub async fn recv(&mut self) -> &mut T {
126 let i = poll_fn(|cx| {
127 self.channel.state.lock(|s| {
128 let s = &mut *s.borrow_mut();
129 match s.pop_index() {
130 Some(i) => Poll::Ready(i),
131 None => {
132 s.send_waker.register(cx.waker());
133 Poll::Pending
134 }
135 }
136 })
137 })
138 .await;
139 unsafe { &mut *self.channel.buf.add(i) }
140 }
141
142 pub fn recv_done(&mut self) {
143 self.channel.state.lock(|s| s.borrow_mut().pop_done())
144 }
145}
146
147struct State {
148 len: usize,
149
150 /// Front index. Always 0..=(N-1)
151 front: usize,
152 /// Back index. Always 0..=(N-1).
153 back: usize,
154
155 /// Used to distinguish "empty" and "full" cases when `front == back`.
156 /// May only be `true` if `front == back`, always `false` otherwise.
157 full: bool,
158
159 send_waker: WakerRegistration,
160 recv_waker: WakerRegistration,
161}
162
163impl State {
164 fn increment(&self, i: usize) -> usize {
165 if i + 1 == self.len {
166 0
167 } else {
168 i + 1
169 }
170 }
171
172 fn is_full(&self) -> bool {
173 self.full
174 }
175
176 fn is_empty(&self) -> bool {
177 self.front == self.back && !self.full
178 }
179
180 fn push_index(&mut self) -> Option<usize> {
181 match self.is_full() {
182 true => None,
183 false => Some(self.back),
184 }
185 }
186
187 fn push_done(&mut self) {
188 assert!(!self.is_full());
189 self.back = self.increment(self.back);
190 if self.back == self.front {
191 self.full = true;
192 }
193 self.send_waker.wake();
194 }
195
196 fn pop_index(&mut self) -> Option<usize> {
197 match self.is_empty() {
198 true => None,
199 false => Some(self.front),
200 }
201 }
202
203 fn pop_done(&mut self) {
204 assert!(!self.is_empty());
205 self.front = self.increment(self.front);
206 self.full = false;
207 self.recv_waker.wake();
208 }
209}