aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/zerocopy_channel.rs
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2023-09-04 22:16:28 +0200
committerDario Nieuwenhuis <[email protected]>2023-09-04 22:16:28 +0200
commit615882ebd67f4e7e60fb8aa1505b1272655c4fa4 (patch)
treeb8b79f097f86d85d5a88ff83f27841a8a21b70ef /embassy-sync/src/zerocopy_channel.rs
parent6e38b0764253ba07d3106ce3d57c2fd3509d7beb (diff)
Rename zero_copy -> zerocopy.
Diffstat (limited to 'embassy-sync/src/zerocopy_channel.rs')
-rw-r--r--embassy-sync/src/zerocopy_channel.rs260
1 files changed, 260 insertions, 0 deletions
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
new file mode 100644
index 000000000..f704cbd5d
--- /dev/null
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -0,0 +1,260 @@
1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19
20use core::cell::RefCell;
21use core::future::poll_fn;
22use core::marker::PhantomData;
23use core::task::{Context, Poll};
24
25use crate::blocking_mutex::raw::RawMutex;
26use crate::blocking_mutex::Mutex;
27use crate::waitqueue::WakerRegistration;
28
29/// A bounded zero-copy channel for communicating between asynchronous tasks
30/// with backpressure.
31///
32/// The channel will buffer up to the provided number of messages. Once the
33/// buffer is full, attempts to `send` new messages will wait until a message is
34/// received from the channel.
35///
36/// All data sent will become available in the same order as it was sent.
37///
38/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
39/// an `&mut T`.
40pub struct Channel<'a, M: RawMutex, T> {
41 buf: *mut T,
42 phantom: PhantomData<&'a mut T>,
43 state: Mutex<M, RefCell<State>>,
44}
45
46impl<'a, M: RawMutex, T> Channel<'a, M, T> {
47 /// Initialize a new [`Channel`].
48 ///
49 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
50 /// channel's capacity.
51 pub fn new(buf: &'a mut [T]) -> Self {
52 let len = buf.len();
53 assert!(len != 0);
54
55 Self {
56 buf: buf.as_mut_ptr(),
57 phantom: PhantomData,
58 state: Mutex::new(RefCell::new(State {
59 len,
60 front: 0,
61 back: 0,
62 full: false,
63 send_waker: WakerRegistration::new(),
64 receive_waker: WakerRegistration::new(),
65 })),
66 }
67 }
68
69 /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
70 ///
71 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
72 /// [`Receiver::borrow`] respectively.
73 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
74 (Sender { channel: self }, Receiver { channel: self })
75 }
76}
77
78/// Send-only access to a [`Channel`].
79pub struct Sender<'a, M: RawMutex, T> {
80 channel: &'a Channel<'a, M, T>,
81}
82
83impl<'a, M: RawMutex, T> Sender<'a, M, T> {
84 /// Creates one further [`Sender`] over the same channel.
85 pub fn borrow(&mut self) -> Sender<'_, M, T> {
86 Sender { channel: self.channel }
87 }
88
89 /// Attempts to send a value over the channel.
90 pub fn try_send(&mut self) -> Option<&mut T> {
91 self.channel.state.lock(|s| {
92 let s = &mut *s.borrow_mut();
93 match s.push_index() {
94 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
95 None => None,
96 }
97 })
98 }
99
100 /// Attempts to send a value over the channel.
101 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
102 self.channel.state.lock(|s| {
103 let s = &mut *s.borrow_mut();
104 match s.push_index() {
105 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
106 None => {
107 s.receive_waker.register(cx.waker());
108 Poll::Pending
109 }
110 }
111 })
112 }
113
114 /// Asynchronously send a value over the channel.
115 pub async fn send(&mut self) -> &mut T {
116 let i = poll_fn(|cx| {
117 self.channel.state.lock(|s| {
118 let s = &mut *s.borrow_mut();
119 match s.push_index() {
120 Some(i) => Poll::Ready(i),
121 None => {
122 s.receive_waker.register(cx.waker());
123 Poll::Pending
124 }
125 }
126 })
127 })
128 .await;
129 unsafe { &mut *self.channel.buf.add(i) }
130 }
131
132 /// Notify the channel that the sending of the value has been finalized.
133 pub fn send_done(&mut self) {
134 self.channel.state.lock(|s| s.borrow_mut().push_done())
135 }
136}
137
138/// Receive-only access to a [`Channel`].
139pub struct Receiver<'a, M: RawMutex, T> {
140 channel: &'a Channel<'a, M, T>,
141}
142
143impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
144 /// Creates one further [`Sender`] over the same channel.
145 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
146 Receiver { channel: self.channel }
147 }
148
149 /// Attempts to receive a value over the channel.
150 pub fn try_receive(&mut self) -> Option<&mut T> {
151 self.channel.state.lock(|s| {
152 let s = &mut *s.borrow_mut();
153 match s.pop_index() {
154 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
155 None => None,
156 }
157 })
158 }
159
160 /// Attempts to asynchronously receive a value over the channel.
161 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
162 self.channel.state.lock(|s| {
163 let s = &mut *s.borrow_mut();
164 match s.pop_index() {
165 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
166 None => {
167 s.send_waker.register(cx.waker());
168 Poll::Pending
169 }
170 }
171 })
172 }
173
174 /// Asynchronously receive a value over the channel.
175 pub async fn receive(&mut self) -> &mut T {
176 let i = poll_fn(|cx| {
177 self.channel.state.lock(|s| {
178 let s = &mut *s.borrow_mut();
179 match s.pop_index() {
180 Some(i) => Poll::Ready(i),
181 None => {
182 s.send_waker.register(cx.waker());
183 Poll::Pending
184 }
185 }
186 })
187 })
188 .await;
189 unsafe { &mut *self.channel.buf.add(i) }
190 }
191
192 /// Notify the channel that the receiving of the value has been finalized.
193 pub fn receive_done(&mut self) {
194 self.channel.state.lock(|s| s.borrow_mut().pop_done())
195 }
196}
197
198struct State {
199 len: usize,
200
201 /// Front index. Always 0..=(N-1)
202 front: usize,
203 /// Back index. Always 0..=(N-1).
204 back: usize,
205
206 /// Used to distinguish "empty" and "full" cases when `front == back`.
207 /// May only be `true` if `front == back`, always `false` otherwise.
208 full: bool,
209
210 send_waker: WakerRegistration,
211 receive_waker: WakerRegistration,
212}
213
214impl State {
215 fn increment(&self, i: usize) -> usize {
216 if i + 1 == self.len {
217 0
218 } else {
219 i + 1
220 }
221 }
222
223 fn is_full(&self) -> bool {
224 self.full
225 }
226
227 fn is_empty(&self) -> bool {
228 self.front == self.back && !self.full
229 }
230
231 fn push_index(&mut self) -> Option<usize> {
232 match self.is_full() {
233 true => None,
234 false => Some(self.back),
235 }
236 }
237
238 fn push_done(&mut self) {
239 assert!(!self.is_full());
240 self.back = self.increment(self.back);
241 if self.back == self.front {
242 self.full = true;
243 }
244 self.send_waker.wake();
245 }
246
247 fn pop_index(&mut self) -> Option<usize> {
248 match self.is_empty() {
249 true => None,
250 false => Some(self.front),
251 }
252 }
253
254 fn pop_done(&mut self) {
255 assert!(!self.is_empty());
256 self.front = self.increment(self.front);
257 self.full = false;
258 self.receive_waker.wake();
259 }
260}