aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuntc <[email protected]>2021-07-09 12:04:22 +1000
committerhuntc <[email protected]>2021-07-15 12:31:52 +1000
commit5f87c7808c9d896a2a2d5e064a58ed2ac23a4348 (patch)
tree4e3eec5de3d804bb6b90684cca912936195b1ea2
parent56b3e927fe2c779c4bc6d556ff9fc836d2a4f2d4 (diff)
Remove the cell and trait
At the expense of exposing the channel types again. We do this as we want to avoid using dyn traits given their overhead for embedded environments.
-rw-r--r--embassy/src/util/mpsc.rs169
-rw-r--r--examples/nrf/src/bin/mpsc.rs8
2 files changed, 90 insertions, 87 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index fc8006c38..65e4bf7b7 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -51,55 +51,33 @@ use super::CriticalSectionMutex;
51use super::Mutex; 51use super::Mutex;
52use super::ThreadModeMutex; 52use super::ThreadModeMutex;
53 53
54/// A ChannelCell permits a channel to be shared between senders and their receivers.
55// Derived from UnsafeCell.
56#[repr(transparent)]
57pub struct ChannelCell<T: ?Sized> {
58 _value: T,
59}
60
61impl<T> ChannelCell<T> {
62 #[inline(always)]
63 pub const fn new<U>(value: T) -> ChannelCell<T>
64 where
65 T: ChannelLike<U>,
66 {
67 ChannelCell { _value: value }
68 }
69}
70
71impl<T: ?Sized> ChannelCell<T> {
72 #[inline(always)]
73 const fn get(&self) -> *mut T {
74 // As per UnsafeCell:
75 // We can just cast the pointer from `ChannelCell<T>` to `T` because of
76 // #[repr(transparent)]. This exploits libstd's special status, there is
77 // no guarantee for user code that this will work in future versions of the compiler!
78 self as *const ChannelCell<T> as *const T as *mut T
79 }
80}
81
82/// Send values to the associated `Receiver`. 54/// Send values to the associated `Receiver`.
83/// 55///
84/// Instances are created by the [`split`](split) function. 56/// Instances are created by the [`split`](split) function.
85pub struct Sender<'ch, T> { 57pub struct Sender<'ch, M, T, const N: usize>
86 channel: &'ch ChannelCell<dyn ChannelLike<T>>, 58where
59 M: Mutex<Data = ()>,
60{
61 channel: &'ch Channel<M, T, N>,
87} 62}
88 63
89// Safe to pass the sender around 64// Safe to pass the sender around
90unsafe impl<'ch, T> Send for Sender<'ch, T> {} 65unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {}
91unsafe impl<'ch, T> Sync for Sender<'ch, T> {} 66unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {}
92 67
93/// Receive values from the associated `Sender`. 68/// Receive values from the associated `Sender`.
94/// 69///
95/// Instances are created by the [`split`](split) function. 70/// Instances are created by the [`split`](split) function.
96pub struct Receiver<'ch, T> { 71pub struct Receiver<'ch, M, T, const N: usize>
97 channel: &'ch ChannelCell<dyn ChannelLike<T>>, 72where
73 M: Mutex<Data = ()>,
74{
75 channel: &'ch Channel<M, T, N>,
98} 76}
99 77
100// Safe to pass the receiver around 78// Safe to pass the receiver around
101unsafe impl<'ch, T> Send for Receiver<'ch, T> {} 79unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {}
102unsafe impl<'ch, T> Sync for Receiver<'ch, T> {} 80unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {}
103 81
104/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. 82/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
105/// 83///
@@ -125,18 +103,26 @@ unsafe impl<'ch, T> Sync for Receiver<'ch, T> {}
125/// mpsc::split(&channel) 103/// mpsc::split(&channel)
126/// }; 104/// };
127/// ``` 105/// ```
128pub fn split<T>(channel: &ChannelCell<dyn ChannelLike<T>>) -> (Sender<T>, Receiver<T>) { 106pub fn split<M, T, const N: usize>(
107 channel: &Channel<M, T, N>,
108) -> (Sender<M, T, N>, Receiver<M, T, N>)
109where
110 M: Mutex<Data = ()>,
111{
129 let sender = Sender { channel: &channel }; 112 let sender = Sender { channel: &channel };
130 let receiver = Receiver { channel: &channel }; 113 let receiver = Receiver { channel: &channel };
131 { 114 {
132 let c = unsafe { &mut *channel.get() }; 115 let c = channel.get();
133 c.register_receiver(); 116 c.register_receiver();
134 c.register_sender(); 117 c.register_sender();
135 } 118 }
136 (sender, receiver) 119 (sender, receiver)
137} 120}
138 121
139impl<'ch, T> Receiver<'ch, T> { 122impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
123where
124 M: Mutex<Data = ()>,
125{
140 /// Receives the next value for this receiver. 126 /// Receives the next value for this receiver.
141 /// 127 ///
142 /// This method returns `None` if the channel has been closed and there are 128 /// This method returns `None` if the channel has been closed and there are
@@ -162,7 +148,7 @@ impl<'ch, T> Receiver<'ch, T> {
162 /// This method will either receive a message from the channel immediately or return an error 148 /// This method will either receive a message from the channel immediately or return an error
163 /// if the channel is empty. 149 /// if the channel is empty.
164 pub fn try_recv(&self) -> Result<T, TryRecvError> { 150 pub fn try_recv(&self) -> Result<T, TryRecvError> {
165 unsafe { &mut *self.channel.get() }.try_recv() 151 self.channel.get().try_recv()
166 } 152 }
167 153
168 /// Closes the receiving half of a channel without dropping it. 154 /// Closes the receiving half of a channel without dropping it.
@@ -176,11 +162,14 @@ impl<'ch, T> Receiver<'ch, T> {
176 /// until those are released. 162 /// until those are released.
177 /// 163 ///
178 pub fn close(&mut self) { 164 pub fn close(&mut self) {
179 unsafe { &mut *self.channel.get() }.close() 165 self.channel.get().close()
180 } 166 }
181} 167}
182 168
183impl<'ch, T> Future for Receiver<'ch, T> { 169impl<'ch, M, T, const N: usize> Future for Receiver<'ch, M, T, N>
170where
171 M: Mutex<Data = ()>,
172{
184 type Output = Option<T>; 173 type Output = Option<T>;
185 174
186 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 175 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -188,20 +177,26 @@ impl<'ch, T> Future for Receiver<'ch, T> {
188 Ok(v) => Poll::Ready(Some(v)), 177 Ok(v) => Poll::Ready(Some(v)),
189 Err(TryRecvError::Closed) => Poll::Ready(None), 178 Err(TryRecvError::Closed) => Poll::Ready(None),
190 Err(TryRecvError::Empty) => { 179 Err(TryRecvError::Empty) => {
191 unsafe { &mut *self.channel.get() }.set_receiver_waker(cx.waker().clone()); 180 self.channel.get().set_receiver_waker(cx.waker().clone());
192 Poll::Pending 181 Poll::Pending
193 } 182 }
194 } 183 }
195 } 184 }
196} 185}
197 186
198impl<'ch, T> Drop for Receiver<'ch, T> { 187impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
188where
189 M: Mutex<Data = ()>,
190{
199 fn drop(&mut self) { 191 fn drop(&mut self) {
200 unsafe { &mut *self.channel.get() }.deregister_receiver() 192 self.channel.get().deregister_receiver()
201 } 193 }
202} 194}
203 195
204impl<'ch, T> Sender<'ch, T> { 196impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
197where
198 M: Mutex<Data = ()>,
199{
205 /// Sends a value, waiting until there is capacity. 200 /// Sends a value, waiting until there is capacity.
206 /// 201 ///
207 /// A successful send occurs when it is determined that the other end of the 202 /// A successful send occurs when it is determined that the other end of the
@@ -249,7 +244,7 @@ impl<'ch, T> Sender<'ch, T> {
249 /// [`channel`]: channel 244 /// [`channel`]: channel
250 /// [`close`]: Receiver::close 245 /// [`close`]: Receiver::close
251 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 246 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
252 unsafe { &mut *self.channel.get() }.try_send(message) 247 self.channel.get().try_send(message)
253 } 248 }
254 249
255 /// Completes when the receiver has dropped. 250 /// Completes when the receiver has dropped.
@@ -270,16 +265,22 @@ impl<'ch, T> Sender<'ch, T> {
270 /// [`Receiver`]: crate::sync::mpsc::Receiver 265 /// [`Receiver`]: crate::sync::mpsc::Receiver
271 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close 266 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
272 pub fn is_closed(&self) -> bool { 267 pub fn is_closed(&self) -> bool {
273 unsafe { &mut *self.channel.get() }.is_closed() 268 self.channel.get().is_closed()
274 } 269 }
275} 270}
276 271
277struct SendFuture<'ch, T> { 272struct SendFuture<'ch, M, T, const N: usize>
278 sender: Sender<'ch, T>, 273where
274 M: Mutex<Data = ()>,
275{
276 sender: Sender<'ch, M, T, N>,
279 message: UnsafeCell<T>, 277 message: UnsafeCell<T>,
280} 278}
281 279
282impl<'ch, T> Future for SendFuture<'ch, T> { 280impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
281where
282 M: Mutex<Data = ()>,
283{
283 type Output = Result<(), SendError<T>>; 284 type Output = Result<(), SendError<T>>;
284 285
285 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 286 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -287,7 +288,10 @@ impl<'ch, T> Future for SendFuture<'ch, T> {
287 Ok(..) => Poll::Ready(Ok(())), 288 Ok(..) => Poll::Ready(Ok(())),
288 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), 289 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
289 Err(TrySendError::Full(..)) => { 290 Err(TrySendError::Full(..)) => {
290 unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); 291 self.sender
292 .channel
293 .get()
294 .set_senders_waker(cx.waker().clone());
291 Poll::Pending 295 Poll::Pending
292 // Note we leave the existing UnsafeCell contents - they still 296 // Note we leave the existing UnsafeCell contents - they still
293 // contain the original message. We could create another UnsafeCell 297 // contain the original message. We could create another UnsafeCell
@@ -297,33 +301,48 @@ impl<'ch, T> Future for SendFuture<'ch, T> {
297 } 301 }
298} 302}
299 303
300struct CloseFuture<'ch, T> { 304struct CloseFuture<'ch, M, T, const N: usize>
301 sender: Sender<'ch, T>, 305where
306 M: Mutex<Data = ()>,
307{
308 sender: Sender<'ch, M, T, N>,
302} 309}
303 310
304impl<'ch, T> Future for CloseFuture<'ch, T> { 311impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
312where
313 M: Mutex<Data = ()>,
314{
305 type Output = (); 315 type Output = ();
306 316
307 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 317 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
308 if self.sender.is_closed() { 318 if self.sender.is_closed() {
309 Poll::Ready(()) 319 Poll::Ready(())
310 } else { 320 } else {
311 unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); 321 self.sender
322 .channel
323 .get()
324 .set_senders_waker(cx.waker().clone());
312 Poll::Pending 325 Poll::Pending
313 } 326 }
314 } 327 }
315} 328}
316 329
317impl<'ch, T> Drop for Sender<'ch, T> { 330impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
331where
332 M: Mutex<Data = ()>,
333{
318 fn drop(&mut self) { 334 fn drop(&mut self) {
319 unsafe { &mut *self.channel.get() }.deregister_sender() 335 self.channel.get().deregister_sender()
320 } 336 }
321} 337}
322 338
323impl<'ch, T> Clone for Sender<'ch, T> { 339impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
340where
341 M: Mutex<Data = ()>,
342{
324 #[allow(clippy::clone_double_ref)] 343 #[allow(clippy::clone_double_ref)]
325 fn clone(&self) -> Self { 344 fn clone(&self) -> Self {
326 unsafe { &mut *self.channel.get() }.register_sender(); 345 self.channel.get().register_sender();
327 Sender { 346 Sender {
328 channel: self.channel.clone(), 347 channel: self.channel.clone(),
329 } 348 }
@@ -378,28 +397,6 @@ impl<T> fmt::Display for TrySendError<T> {
378 } 397 }
379} 398}
380 399
381pub trait ChannelLike<T> {
382 fn try_recv(&mut self) -> Result<T, TryRecvError>;
383
384 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>>;
385
386 fn close(&mut self);
387
388 fn is_closed(&mut self) -> bool;
389
390 fn register_receiver(&mut self);
391
392 fn deregister_receiver(&mut self);
393
394 fn register_sender(&mut self);
395
396 fn deregister_sender(&mut self);
397
398 fn set_receiver_waker(&mut self, receiver_waker: Waker);
399
400 fn set_senders_waker(&mut self, senders_waker: Waker);
401}
402
403struct ChannelState<T, const N: usize> { 400struct ChannelState<T, const N: usize> {
404 buf: [MaybeUninit<UnsafeCell<T>>; N], 401 buf: [MaybeUninit<UnsafeCell<T>>; N],
405 read_pos: usize, 402 read_pos: usize,
@@ -505,10 +502,16 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
505 } 502 }
506} 503}
507 504
508impl<M, T, const N: usize> ChannelLike<T> for Channel<M, T, N> 505impl<M, T, const N: usize> Channel<M, T, N>
509where 506where
510 M: Mutex<Data = ()>, 507 M: Mutex<Data = ()>,
511{ 508{
509 fn get(&self) -> &mut Self {
510 let const_ptr = self as *const Self;
511 let mut_ptr = const_ptr as *mut Self;
512 unsafe { &mut *mut_ptr }
513 }
514
512 fn try_recv(&mut self) -> Result<T, TryRecvError> { 515 fn try_recv(&mut self) -> Result<T, TryRecvError> {
513 let state = &mut self.state; 516 let state = &mut self.state;
514 self.mutex.lock(|_| { 517 self.mutex.lock(|_| {
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index eafa29e60..6a0f8f471 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -11,7 +11,7 @@ mod example_common;
11use defmt::panic; 11use defmt::panic;
12use embassy::executor::Spawner; 12use embassy::executor::Spawner;
13use embassy::time::{Duration, Timer}; 13use embassy::time::{Duration, Timer};
14use embassy::util::mpsc::{ChannelCell, TryRecvError}; 14use embassy::util::mpsc::TryRecvError;
15use embassy::util::{mpsc, Forever}; 15use embassy::util::{mpsc, Forever};
16use embassy_nrf::gpio::{Level, Output, OutputDrive}; 16use embassy_nrf::gpio::{Level, Output, OutputDrive};
17use embassy_nrf::Peripherals; 17use embassy_nrf::Peripherals;
@@ -23,10 +23,10 @@ enum LedState {
23 Off, 23 Off,
24} 24}
25 25
26static CHANNEL: Forever<ChannelCell<Channel<WithThreadModeOnly, LedState, 1>>> = Forever::new(); 26static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new();
27 27
28#[embassy::task(pool_size = 1)] 28#[embassy::task(pool_size = 1)]
29async fn my_task(sender: Sender<'static, LedState>) { 29async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) {
30 loop { 30 loop {
31 let _ = sender.send(LedState::On).await; 31 let _ = sender.send(LedState::On).await;
32 Timer::after(Duration::from_secs(1)).await; 32 Timer::after(Duration::from_secs(1)).await;
@@ -39,7 +39,7 @@ async fn my_task(sender: Sender<'static, LedState>) {
39async fn main(spawner: Spawner, p: Peripherals) { 39async fn main(spawner: Spawner, p: Peripherals) {
40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); 40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
41 41
42 let channel = CHANNEL.put(ChannelCell::new(Channel::with_thread_mode_only())); 42 let channel = CHANNEL.put(Channel::with_thread_mode_only());
43 let (sender, mut receiver) = mpsc::split(channel); 43 let (sender, mut receiver) = mpsc::split(channel);
44 44
45 spawner.spawn(my_task(sender)).unwrap(); 45 spawner.spawn(my_task(sender)).unwrap();