aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuntc <[email protected]>2021-07-06 23:20:47 +1000
committerhuntc <[email protected]>2021-07-15 12:31:52 +1000
commit816b78c0d9733362d8653eb2032f126e6a710030 (patch)
tree5748746b9f292d8f383e30ecf48a82bc916862e7
parent1b9d5e50710cefde4bd1e234695783d62e824c68 (diff)
Reduces the types on sender and receiver
In exchange for an UnsafeCell being passed into split
-rw-r--r--embassy/src/util/mpsc.rs248
-rw-r--r--examples/nrf/src/bin/mpsc.rs8
2 files changed, 110 insertions, 146 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index d24eb00bf..d8a010d7d 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -39,7 +39,6 @@
39 39
40use core::cell::UnsafeCell; 40use core::cell::UnsafeCell;
41use core::fmt; 41use core::fmt;
42use core::marker::PhantomData;
43use core::mem::MaybeUninit; 42use core::mem::MaybeUninit;
44use core::pin::Pin; 43use core::pin::Pin;
45use core::task::Context; 44use core::task::Context;
@@ -55,32 +54,24 @@ use super::ThreadModeMutex;
55/// Send values to the associated `Receiver`. 54/// Send values to the associated `Receiver`.
56/// 55///
57/// Instances are created by the [`split`](split) function. 56/// Instances are created by the [`split`](split) function.
58pub struct Sender<'ch, M, T, const N: usize> 57pub struct Sender<'ch, T> {
59where 58 channel: &'ch UnsafeCell<dyn ChannelLike<T>>,
60 M: Mutex<Data = ()>,
61{
62 channel: *mut Channel<M, T, N>,
63 phantom_data: &'ch PhantomData<T>,
64} 59}
65 60
66// Safe to pass the sender around 61// Safe to pass the sender around
67unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {} 62unsafe impl<'ch, T> Send for Sender<'ch, T> {}
68unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {} 63unsafe impl<'ch, T> Sync for Sender<'ch, T> {}
69 64
70/// Receive values from the associated `Sender`. 65/// Receive values from the associated `Sender`.
71/// 66///
72/// Instances are created by the [`split`](split) function. 67/// Instances are created by the [`split`](split) function.
73pub struct Receiver<'ch, M, T, const N: usize> 68pub struct Receiver<'ch, T> {
74where 69 channel: &'ch UnsafeCell<dyn ChannelLike<T>>,
75 M: Mutex<Data = ()>,
76{
77 channel: *mut Channel<M, T, N>,
78 _phantom_data: &'ch PhantomData<T>,
79} 70}
80 71
81// Safe to pass the receiver around 72// Safe to pass the receiver around
82unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {} 73unsafe impl<'ch, T> Send for Receiver<'ch, T> {}
83unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {} 74unsafe impl<'ch, T> Sync for Receiver<'ch, T> {}
84 75
85/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. 76/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
86/// 77///
@@ -98,37 +89,29 @@ unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M:
98/// their channel. The following will therefore fail compilation: 89/// their channel. The following will therefore fail compilation:
99//// 90////
100/// ```compile_fail 91/// ```compile_fail
92/// use core::cell::UnsafeCell;
101/// use embassy::util::mpsc; 93/// use embassy::util::mpsc;
102/// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; 94/// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
103/// 95///
104/// let (sender, receiver) = { 96/// let (sender, receiver) = {
105/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); 97/// let mut channel = UnsafeCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only());
106/// mpsc::split(&mut channel) 98/// mpsc::split(&channel)
107/// }; 99/// };
108/// ``` 100/// ```
109pub fn split<'ch, M, T, const N: usize>( 101pub fn split<'ch, T>(
110 channel: &'ch mut Channel<M, T, N>, 102 channel: &'ch UnsafeCell<dyn ChannelLike<T>>,
111) -> (Sender<'ch, M, T, N>, Receiver<'ch, M, T, N>) 103) -> (Sender<'ch, T>, Receiver<'ch, T>) {
112where 104 let sender = Sender { channel: &channel };
113 M: Mutex<Data = ()>, 105 let receiver = Receiver { channel: &channel };
114{ 106 {
115 let sender = Sender { 107 let c = unsafe { &mut *channel.get() };
116 channel, 108 c.register_receiver();
117 phantom_data: &PhantomData, 109 c.register_sender();
118 }; 110 }
119 let receiver = Receiver {
120 channel,
121 _phantom_data: &PhantomData,
122 };
123 channel.register_receiver();
124 channel.register_sender();
125 (sender, receiver) 111 (sender, receiver)
126} 112}
127 113
128impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> 114impl<'ch, T> Receiver<'ch, T> {
129where
130 M: Mutex<Data = ()>,
131{
132 /// Receives the next value for this receiver. 115 /// Receives the next value for this receiver.
133 /// 116 ///
134 /// This method returns `None` if the channel has been closed and there are 117 /// This method returns `None` if the channel has been closed and there are
@@ -154,7 +137,7 @@ where
154 /// This method will either receive a message from the channel immediately or return an error 137 /// This method will either receive a message from the channel immediately or return an error
155 /// if the channel is empty. 138 /// if the channel is empty.
156 pub fn try_recv(&self) -> Result<T, TryRecvError> { 139 pub fn try_recv(&self) -> Result<T, TryRecvError> {
157 unsafe { self.channel.as_mut().unwrap().try_recv() } 140 unsafe { &mut *self.channel.get() }.try_recv()
158 } 141 }
159 142
160 /// Closes the receiving half of a channel without dropping it. 143 /// Closes the receiving half of a channel without dropping it.
@@ -168,14 +151,11 @@ where
168 /// until those are released. 151 /// until those are released.
169 /// 152 ///
170 pub fn close(&mut self) { 153 pub fn close(&mut self) {
171 unsafe { self.channel.as_mut().unwrap().close() } 154 unsafe { &mut *self.channel.get() }.close()
172 } 155 }
173} 156}
174 157
175impl<'ch, M, T, const N: usize> Future for Receiver<'ch, M, T, N> 158impl<'ch, T> Future for Receiver<'ch, T> {
176where
177 M: Mutex<Data = ()>,
178{
179 type Output = Option<T>; 159 type Output = Option<T>;
180 160
181 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 161 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -183,31 +163,20 @@ where
183 Ok(v) => Poll::Ready(Some(v)), 163 Ok(v) => Poll::Ready(Some(v)),
184 Err(TryRecvError::Closed) => Poll::Ready(None), 164 Err(TryRecvError::Closed) => Poll::Ready(None),
185 Err(TryRecvError::Empty) => { 165 Err(TryRecvError::Empty) => {
186 unsafe { 166 unsafe { &mut *self.channel.get() }.set_receiver_waker(cx.waker().clone());
187 self.channel
188 .as_mut()
189 .unwrap()
190 .set_receiver_waker(cx.waker().clone());
191 };
192 Poll::Pending 167 Poll::Pending
193 } 168 }
194 } 169 }
195 } 170 }
196} 171}
197 172
198impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> 173impl<'ch, T> Drop for Receiver<'ch, T> {
199where
200 M: Mutex<Data = ()>,
201{
202 fn drop(&mut self) { 174 fn drop(&mut self) {
203 unsafe { self.channel.as_mut().unwrap().deregister_receiver() } 175 unsafe { &mut *self.channel.get() }.deregister_receiver()
204 } 176 }
205} 177}
206 178
207impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> 179impl<'ch, T> Sender<'ch, T> {
208where
209 M: Mutex<Data = ()>,
210{
211 /// Sends a value, waiting until there is capacity. 180 /// Sends a value, waiting until there is capacity.
212 /// 181 ///
213 /// A successful send occurs when it is determined that the other end of the 182 /// A successful send occurs when it is determined that the other end of the
@@ -255,7 +224,7 @@ where
255 /// [`channel`]: channel 224 /// [`channel`]: channel
256 /// [`close`]: Receiver::close 225 /// [`close`]: Receiver::close
257 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 226 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
258 unsafe { self.channel.as_mut().unwrap().try_send(message) } 227 unsafe { &mut *self.channel.get() }.try_send(message)
259 } 228 }
260 229
261 /// Completes when the receiver has dropped. 230 /// Completes when the receiver has dropped.
@@ -276,22 +245,16 @@ where
276 /// [`Receiver`]: crate::sync::mpsc::Receiver 245 /// [`Receiver`]: crate::sync::mpsc::Receiver
277 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close 246 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
278 pub fn is_closed(&self) -> bool { 247 pub fn is_closed(&self) -> bool {
279 unsafe { self.channel.as_mut().unwrap().is_closed() } 248 unsafe { &mut *self.channel.get() }.is_closed()
280 } 249 }
281} 250}
282 251
283struct SendFuture<'ch, M, T, const N: usize> 252struct SendFuture<'ch, T> {
284where 253 sender: Sender<'ch, T>,
285 M: Mutex<Data = ()>,
286{
287 sender: Sender<'ch, M, T, N>,
288 message: UnsafeCell<T>, 254 message: UnsafeCell<T>,
289} 255}
290 256
291impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> 257impl<'ch, T> Future for SendFuture<'ch, T> {
292where
293 M: Mutex<Data = ()>,
294{
295 type Output = Result<(), SendError<T>>; 258 type Output = Result<(), SendError<T>>;
296 259
297 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 260 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -299,13 +262,7 @@ where
299 Ok(..) => Poll::Ready(Ok(())), 262 Ok(..) => Poll::Ready(Ok(())),
300 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), 263 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
301 Err(TrySendError::Full(..)) => { 264 Err(TrySendError::Full(..)) => {
302 unsafe { 265 unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone());
303 self.sender
304 .channel
305 .as_mut()
306 .unwrap()
307 .set_senders_waker(cx.waker().clone());
308 };
309 Poll::Pending 266 Poll::Pending
310 // Note we leave the existing UnsafeCell contents - they still 267 // Note we leave the existing UnsafeCell contents - they still
311 // contain the original message. We could create another UnsafeCell 268 // contain the original message. We could create another UnsafeCell
@@ -315,53 +272,34 @@ where
315 } 272 }
316} 273}
317 274
318struct CloseFuture<'ch, M, T, const N: usize> 275struct CloseFuture<'ch, T> {
319where 276 sender: Sender<'ch, T>,
320 M: Mutex<Data = ()>,
321{
322 sender: Sender<'ch, M, T, N>,
323} 277}
324 278
325impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> 279impl<'ch, T> Future for CloseFuture<'ch, T> {
326where
327 M: Mutex<Data = ()>,
328{
329 type Output = (); 280 type Output = ();
330 281
331 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 282 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
332 if self.sender.is_closed() { 283 if self.sender.is_closed() {
333 Poll::Ready(()) 284 Poll::Ready(())
334 } else { 285 } else {
335 unsafe { 286 unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone());
336 self.sender
337 .channel
338 .as_mut()
339 .unwrap()
340 .set_senders_waker(cx.waker().clone());
341 };
342 Poll::Pending 287 Poll::Pending
343 } 288 }
344 } 289 }
345} 290}
346 291
347impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> 292impl<'ch, T> Drop for Sender<'ch, T> {
348where
349 M: Mutex<Data = ()>,
350{
351 fn drop(&mut self) { 293 fn drop(&mut self) {
352 unsafe { self.channel.as_mut().unwrap().deregister_sender() } 294 unsafe { &mut *self.channel.get() }.deregister_sender()
353 } 295 }
354} 296}
355 297
356impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> 298impl<'ch, T> Clone for Sender<'ch, T> {
357where
358 M: Mutex<Data = ()>,
359{
360 fn clone(&self) -> Self { 299 fn clone(&self) -> Self {
361 unsafe { self.channel.as_mut().unwrap().register_sender() }; 300 unsafe { &mut *self.channel.get() }.register_sender();
362 Sender { 301 Sender {
363 channel: self.channel, 302 channel: self.channel.clone(),
364 phantom_data: self.phantom_data,
365 } 303 }
366 } 304 }
367} 305}
@@ -414,6 +352,28 @@ impl<T> fmt::Display for TrySendError<T> {
414 } 352 }
415} 353}
416 354
355pub trait ChannelLike<T> {
356 fn try_recv(&mut self) -> Result<T, TryRecvError>;
357
358 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>>;
359
360 fn close(&mut self);
361
362 fn is_closed(&mut self) -> bool;
363
364 fn register_receiver(&mut self);
365
366 fn deregister_receiver(&mut self);
367
368 fn register_sender(&mut self);
369
370 fn deregister_sender(&mut self);
371
372 fn set_receiver_waker(&mut self, receiver_waker: Waker);
373
374 fn set_senders_waker(&mut self, senders_waker: Waker);
375}
376
417pub struct ChannelState<T, const N: usize> { 377pub struct ChannelState<T, const N: usize> {
418 buf: [MaybeUninit<UnsafeCell<T>>; N], 378 buf: [MaybeUninit<UnsafeCell<T>>; N],
419 read_pos: usize, 379 read_pos: usize,
@@ -480,13 +440,14 @@ impl<T, const N: usize> Channel<WithCriticalSections, T, N> {
480 /// from exception mode e.g. interrupt handlers. To create one: 440 /// from exception mode e.g. interrupt handlers. To create one:
481 /// 441 ///
482 /// ``` 442 /// ```
443 /// use core::cell::UnsafeCell;
483 /// use embassy::util::mpsc; 444 /// use embassy::util::mpsc;
484 /// use embassy::util::mpsc::{Channel, WithCriticalSections}; 445 /// use embassy::util::mpsc::{Channel, WithCriticalSections};
485 /// 446 ///
486 /// // Declare a bounded channel of 3 u32s. 447 /// // Declare a bounded channel of 3 u32s.
487 /// let mut channel = mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections(); 448 /// let mut channel = UnsafeCell::new(mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections());
488 /// // once we have a channel, obtain its sender and receiver 449 /// // once we have a channel, obtain its sender and receiver
489 /// let (sender, receiver) = mpsc::split(&mut channel); 450 /// let (sender, receiver) = mpsc::split(&channel);
490 /// ``` 451 /// ```
491 pub const fn with_critical_sections() -> Self { 452 pub const fn with_critical_sections() -> Self {
492 let mutex = CriticalSectionMutex::new(()); 453 let mutex = CriticalSectionMutex::new(());
@@ -504,13 +465,14 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
504 /// channel avoids all locks. To create one: 465 /// channel avoids all locks. To create one:
505 /// 466 ///
506 /// ``` no_run 467 /// ``` no_run
468 /// use core::cell::UnsafeCell;
507 /// use embassy::util::mpsc; 469 /// use embassy::util::mpsc;
508 /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; 470 /// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
509 /// 471 ///
510 /// // Declare a bounded channel of 3 u32s. 472 /// // Declare a bounded channel of 3 u32s.
511 /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); 473 /// let mut channel = UnsafeCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only());
512 /// // once we have a channel, obtain its sender and receiver 474 /// // once we have a channel, obtain its sender and receiver
513 /// let (sender, receiver) = mpsc::split(&mut channel); 475 /// let (sender, receiver) = mpsc::split(&channel);
514 /// ``` 476 /// ```
515 pub const fn with_thread_mode_only() -> Self { 477 pub const fn with_thread_mode_only() -> Self {
516 let mutex = ThreadModeMutex::new(()); 478 let mutex = ThreadModeMutex::new(());
@@ -519,7 +481,7 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> {
519 } 481 }
520} 482}
521 483
522impl<M, T, const N: usize> Channel<M, T, N> 484impl<M, T, const N: usize> ChannelLike<T> for Channel<M, T, N>
523where 485where
524 M: Mutex<Data = ()>, 486 M: Mutex<Data = ()>,
525{ 487{
@@ -771,16 +733,16 @@ mod tests {
771 733
772 #[test] 734 #[test]
773 fn simple_send_and_receive() { 735 fn simple_send_and_receive() {
774 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 736 let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads());
775 let (s, r) = split(&mut c); 737 let (s, r) = split(&c);
776 assert!(s.clone().try_send(1).is_ok()); 738 assert!(s.clone().try_send(1).is_ok());
777 assert_eq!(r.try_recv().unwrap(), 1); 739 assert_eq!(r.try_recv().unwrap(), 1);
778 } 740 }
779 741
780 #[test] 742 #[test]
781 fn should_close_without_sender() { 743 fn should_close_without_sender() {
782 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 744 let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads());
783 let (s, r) = split(&mut c); 745 let (s, r) = split(&c);
784 drop(s); 746 drop(s);
785 match r.try_recv() { 747 match r.try_recv() {
786 Err(TryRecvError::Closed) => assert!(true), 748 Err(TryRecvError::Closed) => assert!(true),
@@ -790,8 +752,8 @@ mod tests {
790 752
791 #[test] 753 #[test]
792 fn should_close_once_drained() { 754 fn should_close_once_drained() {
793 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 755 let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads());
794 let (s, r) = split(&mut c); 756 let (s, r) = split(&c);
795 assert!(s.try_send(1).is_ok()); 757 assert!(s.try_send(1).is_ok());
796 drop(s); 758 drop(s);
797 assert_eq!(r.try_recv().unwrap(), 1); 759 assert_eq!(r.try_recv().unwrap(), 1);
@@ -803,8 +765,8 @@ mod tests {
803 765
804 #[test] 766 #[test]
805 fn should_reject_send_when_receiver_dropped() { 767 fn should_reject_send_when_receiver_dropped() {
806 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 768 let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads());
807 let (s, r) = split(&mut c); 769 let (s, r) = split(&c);
808 drop(r); 770 drop(r);
809 match s.try_send(1) { 771 match s.try_send(1) {
810 Err(TrySendError::Closed(1)) => assert!(true), 772 Err(TrySendError::Closed(1)) => assert!(true),
@@ -814,8 +776,8 @@ mod tests {
814 776
815 #[test] 777 #[test]
816 fn should_reject_send_when_channel_closed() { 778 fn should_reject_send_when_channel_closed() {
817 let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); 779 let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads());
818 let (s, mut r) = split(&mut c); 780 let (s, mut r) = split(&c);
819 assert!(s.try_send(1).is_ok()); 781 assert!(s.try_send(1).is_ok());
820 r.close(); 782 r.close();
821 assert_eq!(r.try_recv().unwrap(), 1); 783 assert_eq!(r.try_recv().unwrap(), 1);
@@ -830,9 +792,9 @@ mod tests {
830 async fn receiver_closes_when_sender_dropped_async() { 792 async fn receiver_closes_when_sender_dropped_async() {
831 let executor = ThreadPool::new().unwrap(); 793 let executor = ThreadPool::new().unwrap();
832 794
833 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = 795 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 3>> =
834 Channel::with_critical_sections(); 796 UnsafeCell::new(Channel::with_critical_sections());
835 let (s, mut r) = split(unsafe { &mut CHANNEL }); 797 let (s, mut r) = split(unsafe { &CHANNEL });
836 assert!(executor 798 assert!(executor
837 .spawn(async move { 799 .spawn(async move {
838 drop(s); 800 drop(s);
@@ -845,12 +807,12 @@ mod tests {
845 async fn receiver_receives_given_try_send_async() { 807 async fn receiver_receives_given_try_send_async() {
846 let executor = ThreadPool::new().unwrap(); 808 let executor = ThreadPool::new().unwrap();
847 809
848 static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = 810 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 3>> =
849 Channel::with_critical_sections(); 811 UnsafeCell::new(Channel::with_critical_sections());
850 let (s, mut r) = split(unsafe { &mut CHANNEL }); 812 let (s, mut r) = split(unsafe { &CHANNEL });
851 assert!(executor 813 assert!(executor
852 .spawn(async move { 814 .spawn(async move {
853 let _ = s.try_send(1); 815 assert!(s.try_send(1).is_ok());
854 }) 816 })
855 .is_ok()); 817 .is_ok());
856 assert_eq!(r.recv().await, Some(1)); 818 assert_eq!(r.recv().await, Some(1));
@@ -858,18 +820,18 @@ mod tests {
858 820
859 #[futures_test::test] 821 #[futures_test::test]
860 async fn sender_send_completes_if_capacity() { 822 async fn sender_send_completes_if_capacity() {
861 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 823 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> =
862 Channel::with_critical_sections(); 824 UnsafeCell::new(Channel::with_critical_sections());
863 let (s, mut r) = split(unsafe { &mut CHANNEL }); 825 let (s, mut r) = split(unsafe { &CHANNEL });
864 assert!(s.send(1).await.is_ok()); 826 assert!(s.send(1).await.is_ok());
865 assert_eq!(r.recv().await, Some(1)); 827 assert_eq!(r.recv().await, Some(1));
866 } 828 }
867 829
868 #[futures_test::test] 830 #[futures_test::test]
869 async fn sender_send_completes_if_closed() { 831 async fn sender_send_completes_if_closed() {
870 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 832 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> =
871 Channel::with_critical_sections(); 833 UnsafeCell::new(Channel::with_critical_sections());
872 let (s, r) = split(unsafe { &mut CHANNEL }); 834 let (s, r) = split(unsafe { &CHANNEL });
873 drop(r); 835 drop(r);
874 match s.send(1).await { 836 match s.send(1).await {
875 Err(SendError(1)) => assert!(true), 837 Err(SendError(1)) => assert!(true),
@@ -881,9 +843,9 @@ mod tests {
881 async fn senders_sends_wait_until_capacity() { 843 async fn senders_sends_wait_until_capacity() {
882 let executor = ThreadPool::new().unwrap(); 844 let executor = ThreadPool::new().unwrap();
883 845
884 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 846 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> =
885 Channel::with_critical_sections(); 847 UnsafeCell::new(Channel::with_critical_sections());
886 let (s0, mut r) = split(unsafe { &mut CHANNEL }); 848 let (s0, mut r) = split(unsafe { &CHANNEL });
887 assert!(s0.try_send(1).is_ok()); 849 assert!(s0.try_send(1).is_ok());
888 let s1 = s0.clone(); 850 let s1 = s0.clone();
889 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); 851 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
@@ -901,18 +863,18 @@ mod tests {
901 863
902 #[futures_test::test] 864 #[futures_test::test]
903 async fn sender_close_completes_if_closing() { 865 async fn sender_close_completes_if_closing() {
904 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 866 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> =
905 Channel::with_critical_sections(); 867 UnsafeCell::new(Channel::with_critical_sections());
906 let (s, mut r) = split(unsafe { &mut CHANNEL }); 868 let (s, mut r) = split(unsafe { &CHANNEL });
907 r.close(); 869 r.close();
908 s.closed().await; 870 s.closed().await;
909 } 871 }
910 872
911 #[futures_test::test] 873 #[futures_test::test]
912 async fn sender_close_completes_if_closed() { 874 async fn sender_close_completes_if_closed() {
913 static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = 875 static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> =
914 Channel::with_critical_sections(); 876 UnsafeCell::new(Channel::with_critical_sections());
915 let (s, r) = split(unsafe { &mut CHANNEL }); 877 let (s, r) = split(unsafe { &CHANNEL });
916 drop(r); 878 drop(r);
917 s.closed().await; 879 s.closed().await;
918 } 880 }
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index 6a0f8f471..d692abee2 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -8,6 +8,8 @@
8#[path = "../example_common.rs"] 8#[path = "../example_common.rs"]
9mod example_common; 9mod example_common;
10 10
11use core::cell::UnsafeCell;
12
11use defmt::panic; 13use defmt::panic;
12use embassy::executor::Spawner; 14use embassy::executor::Spawner;
13use embassy::time::{Duration, Timer}; 15use embassy::time::{Duration, Timer};
@@ -23,10 +25,10 @@ enum LedState {
23 Off, 25 Off,
24} 26}
25 27
26static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new(); 28static CHANNEL: Forever<UnsafeCell<Channel<WithThreadModeOnly, LedState, 1>>> = Forever::new();
27 29
28#[embassy::task(pool_size = 1)] 30#[embassy::task(pool_size = 1)]
29async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { 31async fn my_task(sender: Sender<'static, LedState>) {
30 loop { 32 loop {
31 let _ = sender.send(LedState::On).await; 33 let _ = sender.send(LedState::On).await;
32 Timer::after(Duration::from_secs(1)).await; 34 Timer::after(Duration::from_secs(1)).await;
@@ -39,7 +41,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) {
39async fn main(spawner: Spawner, p: Peripherals) { 41async fn main(spawner: Spawner, p: Peripherals) {
40 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); 42 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
41 43
42 let channel = CHANNEL.put(Channel::with_thread_mode_only()); 44 let channel = CHANNEL.put(UnsafeCell::new(Channel::with_thread_mode_only()));
43 let (sender, mut receiver) = mpsc::split(channel); 45 let (sender, mut receiver) = mpsc::split(channel);
44 46
45 spawner.spawn(my_task(sender)).unwrap(); 47 spawner.spawn(my_task(sender)).unwrap();