diff options
Diffstat (limited to 'embassy-sync/src/zerocopy_channel.rs')
| -rw-r--r-- | embassy-sync/src/zerocopy_channel.rs | 260 |
1 files changed, 260 insertions, 0 deletions
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs new file mode 100644 index 000000000..f704cbd5d --- /dev/null +++ b/embassy-sync/src/zerocopy_channel.rs | |||
| @@ -0,0 +1,260 @@ | |||
| 1 | //! A zero-copy queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | ||
| 9 | //! This queue takes a Mutex type so that various | ||
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 11 | //! for single-core Cortex-M targets where messages are only passed | ||
| 12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 13 | //! can also be used for single-core targets where messages are to be | ||
| 14 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 15 | //! | ||
| 16 | //! This module provides a bounded channel that has a limit on the number of | ||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 18 | //! another message will result in an error being returned. | ||
| 19 | |||
| 20 | use core::cell::RefCell; | ||
| 21 | use core::future::poll_fn; | ||
| 22 | use core::marker::PhantomData; | ||
| 23 | use core::task::{Context, Poll}; | ||
| 24 | |||
| 25 | use crate::blocking_mutex::raw::RawMutex; | ||
| 26 | use crate::blocking_mutex::Mutex; | ||
| 27 | use crate::waitqueue::WakerRegistration; | ||
| 28 | |||
| 29 | /// A bounded zero-copy channel for communicating between asynchronous tasks | ||
| 30 | /// with backpressure. | ||
| 31 | /// | ||
| 32 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 33 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 34 | /// received from the channel. | ||
| 35 | /// | ||
| 36 | /// All data sent will become available in the same order as it was sent. | ||
| 37 | /// | ||
| 38 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through | ||
| 39 | /// an `&mut T`. | ||
| 40 | pub struct Channel<'a, M: RawMutex, T> { | ||
| 41 | buf: *mut T, | ||
| 42 | phantom: PhantomData<&'a mut T>, | ||
| 43 | state: Mutex<M, RefCell<State>>, | ||
| 44 | } | ||
| 45 | |||
| 46 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { | ||
| 47 | /// Initialize a new [`Channel`]. | ||
| 48 | /// | ||
| 49 | /// The provided buffer will be used and reused by the channel's logic, and thus dictates the | ||
| 50 | /// channel's capacity. | ||
| 51 | pub fn new(buf: &'a mut [T]) -> Self { | ||
| 52 | let len = buf.len(); | ||
| 53 | assert!(len != 0); | ||
| 54 | |||
| 55 | Self { | ||
| 56 | buf: buf.as_mut_ptr(), | ||
| 57 | phantom: PhantomData, | ||
| 58 | state: Mutex::new(RefCell::new(State { | ||
| 59 | len, | ||
| 60 | front: 0, | ||
| 61 | back: 0, | ||
| 62 | full: false, | ||
| 63 | send_waker: WakerRegistration::new(), | ||
| 64 | receive_waker: WakerRegistration::new(), | ||
| 65 | })), | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Creates a [`Sender`] and [`Receiver`] from an existing channel. | ||
| 70 | /// | ||
| 71 | /// Further Senders and Receivers can be created through [`Sender::borrow`] and | ||
| 72 | /// [`Receiver::borrow`] respectively. | ||
| 73 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | ||
| 74 | (Sender { channel: self }, Receiver { channel: self }) | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | /// Send-only access to a [`Channel`]. | ||
| 79 | pub struct Sender<'a, M: RawMutex, T> { | ||
| 80 | channel: &'a Channel<'a, M, T>, | ||
| 81 | } | ||
| 82 | |||
| 83 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { | ||
| 84 | /// Creates one further [`Sender`] over the same channel. | ||
| 85 | pub fn borrow(&mut self) -> Sender<'_, M, T> { | ||
| 86 | Sender { channel: self.channel } | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Attempts to send a value over the channel. | ||
| 90 | pub fn try_send(&mut self) -> Option<&mut T> { | ||
| 91 | self.channel.state.lock(|s| { | ||
| 92 | let s = &mut *s.borrow_mut(); | ||
| 93 | match s.push_index() { | ||
| 94 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 95 | None => None, | ||
| 96 | } | ||
| 97 | }) | ||
| 98 | } | ||
| 99 | |||
| 100 | /// Attempts to send a value over the channel. | ||
| 101 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 102 | self.channel.state.lock(|s| { | ||
| 103 | let s = &mut *s.borrow_mut(); | ||
| 104 | match s.push_index() { | ||
| 105 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 106 | None => { | ||
| 107 | s.receive_waker.register(cx.waker()); | ||
| 108 | Poll::Pending | ||
| 109 | } | ||
| 110 | } | ||
| 111 | }) | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Asynchronously send a value over the channel. | ||
| 115 | pub async fn send(&mut self) -> &mut T { | ||
| 116 | let i = poll_fn(|cx| { | ||
| 117 | self.channel.state.lock(|s| { | ||
| 118 | let s = &mut *s.borrow_mut(); | ||
| 119 | match s.push_index() { | ||
| 120 | Some(i) => Poll::Ready(i), | ||
| 121 | None => { | ||
| 122 | s.receive_waker.register(cx.waker()); | ||
| 123 | Poll::Pending | ||
| 124 | } | ||
| 125 | } | ||
| 126 | }) | ||
| 127 | }) | ||
| 128 | .await; | ||
| 129 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 130 | } | ||
| 131 | |||
| 132 | /// Notify the channel that the sending of the value has been finalized. | ||
| 133 | pub fn send_done(&mut self) { | ||
| 134 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | ||
| 135 | } | ||
| 136 | } | ||
| 137 | |||
| 138 | /// Receive-only access to a [`Channel`]. | ||
| 139 | pub struct Receiver<'a, M: RawMutex, T> { | ||
| 140 | channel: &'a Channel<'a, M, T>, | ||
| 141 | } | ||
| 142 | |||
| 143 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | ||
| 144 | /// Creates one further [`Sender`] over the same channel. | ||
| 145 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | ||
| 146 | Receiver { channel: self.channel } | ||
| 147 | } | ||
| 148 | |||
| 149 | /// Attempts to receive a value over the channel. | ||
| 150 | pub fn try_receive(&mut self) -> Option<&mut T> { | ||
| 151 | self.channel.state.lock(|s| { | ||
| 152 | let s = &mut *s.borrow_mut(); | ||
| 153 | match s.pop_index() { | ||
| 154 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 155 | None => None, | ||
| 156 | } | ||
| 157 | }) | ||
| 158 | } | ||
| 159 | |||
| 160 | /// Attempts to asynchronously receive a value over the channel. | ||
| 161 | pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 162 | self.channel.state.lock(|s| { | ||
| 163 | let s = &mut *s.borrow_mut(); | ||
| 164 | match s.pop_index() { | ||
| 165 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 166 | None => { | ||
| 167 | s.send_waker.register(cx.waker()); | ||
| 168 | Poll::Pending | ||
| 169 | } | ||
| 170 | } | ||
| 171 | }) | ||
| 172 | } | ||
| 173 | |||
| 174 | /// Asynchronously receive a value over the channel. | ||
| 175 | pub async fn receive(&mut self) -> &mut T { | ||
| 176 | let i = poll_fn(|cx| { | ||
| 177 | self.channel.state.lock(|s| { | ||
| 178 | let s = &mut *s.borrow_mut(); | ||
| 179 | match s.pop_index() { | ||
| 180 | Some(i) => Poll::Ready(i), | ||
| 181 | None => { | ||
| 182 | s.send_waker.register(cx.waker()); | ||
| 183 | Poll::Pending | ||
| 184 | } | ||
| 185 | } | ||
| 186 | }) | ||
| 187 | }) | ||
| 188 | .await; | ||
| 189 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 190 | } | ||
| 191 | |||
| 192 | /// Notify the channel that the receiving of the value has been finalized. | ||
| 193 | pub fn receive_done(&mut self) { | ||
| 194 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | struct State { | ||
| 199 | len: usize, | ||
| 200 | |||
| 201 | /// Front index. Always 0..=(N-1) | ||
| 202 | front: usize, | ||
| 203 | /// Back index. Always 0..=(N-1). | ||
| 204 | back: usize, | ||
| 205 | |||
| 206 | /// Used to distinguish "empty" and "full" cases when `front == back`. | ||
| 207 | /// May only be `true` if `front == back`, always `false` otherwise. | ||
| 208 | full: bool, | ||
| 209 | |||
| 210 | send_waker: WakerRegistration, | ||
| 211 | receive_waker: WakerRegistration, | ||
| 212 | } | ||
| 213 | |||
| 214 | impl State { | ||
| 215 | fn increment(&self, i: usize) -> usize { | ||
| 216 | if i + 1 == self.len { | ||
| 217 | 0 | ||
| 218 | } else { | ||
| 219 | i + 1 | ||
| 220 | } | ||
| 221 | } | ||
| 222 | |||
| 223 | fn is_full(&self) -> bool { | ||
| 224 | self.full | ||
| 225 | } | ||
| 226 | |||
| 227 | fn is_empty(&self) -> bool { | ||
| 228 | self.front == self.back && !self.full | ||
| 229 | } | ||
| 230 | |||
| 231 | fn push_index(&mut self) -> Option<usize> { | ||
| 232 | match self.is_full() { | ||
| 233 | true => None, | ||
| 234 | false => Some(self.back), | ||
| 235 | } | ||
| 236 | } | ||
| 237 | |||
| 238 | fn push_done(&mut self) { | ||
| 239 | assert!(!self.is_full()); | ||
| 240 | self.back = self.increment(self.back); | ||
| 241 | if self.back == self.front { | ||
| 242 | self.full = true; | ||
| 243 | } | ||
| 244 | self.send_waker.wake(); | ||
| 245 | } | ||
| 246 | |||
| 247 | fn pop_index(&mut self) -> Option<usize> { | ||
| 248 | match self.is_empty() { | ||
| 249 | true => None, | ||
| 250 | false => Some(self.front), | ||
| 251 | } | ||
| 252 | } | ||
| 253 | |||
| 254 | fn pop_done(&mut self) { | ||
| 255 | assert!(!self.is_empty()); | ||
| 256 | self.front = self.increment(self.front); | ||
| 257 | self.full = false; | ||
| 258 | self.receive_waker.wake(); | ||
| 259 | } | ||
| 260 | } | ||
