aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2021-09-12 23:36:52 +0200
committerDario Nieuwenhuis <[email protected]>2021-09-13 00:08:41 +0200
commit70e5877d6823ba0894241e8bedc5cefa7e21bceb (patch)
treec8f8671800eff4f2497ea8e4c7a96ba752db4aaa
parent5be5bdfd20900ee1973097cf46ec946cad547e30 (diff)
embassy/channel: switch to use MutexKind
-rw-r--r--embassy/src/channel/mpsc.rs188
-rw-r--r--examples/nrf/src/bin/mpsc.rs7
2 files changed, 69 insertions, 126 deletions
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs
index 3585b8039..9a57c0b19 100644
--- a/embassy/src/channel/mpsc.rs
+++ b/embassy/src/channel/mpsc.rs
@@ -37,7 +37,7 @@
37//! 37//!
38//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html 38//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html
39 39
40use core::cell::UnsafeCell; 40use core::cell::RefCell;
41use core::fmt; 41use core::fmt;
42use core::pin::Pin; 42use core::pin::Pin;
43use core::task::Context; 43use core::task::Context;
@@ -47,7 +47,8 @@ use core::task::Waker;
47use futures::Future; 47use futures::Future;
48use heapless::Deque; 48use heapless::Deque;
49 49
50use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; 50use crate::blocking_mutex::kind::MutexKind;
51use crate::blocking_mutex::Mutex;
51use crate::waitqueue::WakerRegistration; 52use crate::waitqueue::WakerRegistration;
52 53
53/// Send values to the associated `Receiver`. 54/// Send values to the associated `Receiver`.
@@ -55,35 +56,19 @@ use crate::waitqueue::WakerRegistration;
55/// Instances are created by the [`split`](split) function. 56/// Instances are created by the [`split`](split) function.
56pub struct Sender<'ch, M, T, const N: usize> 57pub struct Sender<'ch, M, T, const N: usize>
57where 58where
58 M: Mutex<Data = ()>, 59 M: MutexKind,
59{ 60{
60 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, 61 channel: &'ch Channel<M, T, N>,
61} 62}
62 63
63// Safe to pass the sender around
64unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
65{}
66unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
67{}
68
69/// Receive values from the associated `Sender`. 64/// Receive values from the associated `Sender`.
70/// 65///
71/// Instances are created by the [`split`](split) function. 66/// Instances are created by the [`split`](split) function.
72pub struct Receiver<'ch, M, T, const N: usize> 67pub struct Receiver<'ch, M, T, const N: usize>
73where 68where
74 M: Mutex<Data = ()>, 69 M: MutexKind,
75{
76 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
77}
78
79// Safe to pass the receiver around
80unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where
81 M: Mutex<Data = ()> + Sync
82{
83}
84unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where
85 M: Mutex<Data = ()> + Sync
86{ 70{
71 channel: &'ch Channel<M, T, N>,
87} 72}
88 73
89/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. 74/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
@@ -114,15 +99,11 @@ pub fn split<M, T, const N: usize>(
114 channel: &mut Channel<M, T, N>, 99 channel: &mut Channel<M, T, N>,
115) -> (Sender<M, T, N>, Receiver<M, T, N>) 100) -> (Sender<M, T, N>, Receiver<M, T, N>)
116where 101where
117 M: Mutex<Data = ()>, 102 M: MutexKind,
118{ 103{
119 let sender = Sender { 104 let sender = Sender { channel };
120 channel_cell: &channel.channel_cell, 105 let receiver = Receiver { channel };
121 }; 106 channel.lock(|c| {
122 let receiver = Receiver {
123 channel_cell: &channel.channel_cell,
124 };
125 Channel::lock(&channel.channel_cell, |c| {
126 c.register_receiver(); 107 c.register_receiver();
127 c.register_sender(); 108 c.register_sender();
128 }); 109 });
@@ -131,7 +112,7 @@ where
131 112
132impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> 113impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
133where 114where
134 M: Mutex<Data = ()>, 115 M: MutexKind,
135{ 116{
136 /// Receives the next value for this receiver. 117 /// Receives the next value for this receiver.
137 /// 118 ///
@@ -151,7 +132,7 @@ where
151 /// [`close`]: Self::close 132 /// [`close`]: Self::close
152 pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { 133 pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
153 RecvFuture { 134 RecvFuture {
154 channel_cell: self.channel_cell, 135 channel: self.channel,
155 } 136 }
156 } 137 }
157 138
@@ -160,7 +141,7 @@ where
160 /// This method will either receive a message from the channel immediately or return an error 141 /// This method will either receive a message from the channel immediately or return an error
161 /// if the channel is empty. 142 /// if the channel is empty.
162 pub fn try_recv(&self) -> Result<T, TryRecvError> { 143 pub fn try_recv(&self) -> Result<T, TryRecvError> {
163 Channel::lock(self.channel_cell, |c| c.try_recv()) 144 self.channel.lock(|c| c.try_recv())
164 } 145 }
165 146
166 /// Closes the receiving half of a channel without dropping it. 147 /// Closes the receiving half of a channel without dropping it.
@@ -174,56 +155,45 @@ where
174 /// until those are released. 155 /// until those are released.
175 /// 156 ///
176 pub fn close(&mut self) { 157 pub fn close(&mut self) {
177 Channel::lock(self.channel_cell, |c| c.close()) 158 self.channel.lock(|c| c.close())
178 } 159 }
179} 160}
180 161
181impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> 162impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
182where 163where
183 M: Mutex<Data = ()>, 164 M: MutexKind,
184{ 165{
185 fn drop(&mut self) { 166 fn drop(&mut self) {
186 Channel::lock(self.channel_cell, |c| c.deregister_receiver()) 167 self.channel.lock(|c| c.deregister_receiver())
187 } 168 }
188} 169}
189 170
190pub struct RecvFuture<'ch, M, T, const N: usize> 171pub struct RecvFuture<'ch, M, T, const N: usize>
191where 172where
192 M: Mutex<Data = ()>, 173 M: MutexKind,
193{ 174{
194 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, 175 channel: &'ch Channel<M, T, N>,
195} 176}
196 177
197impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> 178impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
198where 179where
199 M: Mutex<Data = ()>, 180 M: MutexKind,
200{ 181{
201 type Output = Option<T>; 182 type Output = Option<T>;
202 183
203 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { 184 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
204 Channel::lock(self.channel_cell, |c| { 185 self.channel
205 match c.try_recv_with_context(Some(cx)) { 186 .lock(|c| match c.try_recv_with_context(Some(cx)) {
206 Ok(v) => Poll::Ready(Some(v)), 187 Ok(v) => Poll::Ready(Some(v)),
207 Err(TryRecvError::Closed) => Poll::Ready(None), 188 Err(TryRecvError::Closed) => Poll::Ready(None),
208 Err(TryRecvError::Empty) => Poll::Pending, 189 Err(TryRecvError::Empty) => Poll::Pending,
209 } 190 })
210 })
211 } 191 }
212} 192}
213 193
214// Safe to pass the receive future around since it locks channel whenever polled
215unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where
216 M: Mutex<Data = ()> + Sync
217{
218}
219unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where
220 M: Mutex<Data = ()> + Sync
221{
222}
223
224impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> 194impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
225where 195where
226 M: Mutex<Data = ()>, 196 M: MutexKind,
227{ 197{
228 /// Sends a value, waiting until there is capacity. 198 /// Sends a value, waiting until there is capacity.
229 /// 199 ///
@@ -245,7 +215,7 @@ where
245 /// [`Receiver`]: Receiver 215 /// [`Receiver`]: Receiver
246 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { 216 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
247 SendFuture { 217 SendFuture {
248 sender: self.clone(), 218 channel: self.channel,
249 message: Some(message), 219 message: Some(message),
250 } 220 }
251 } 221 }
@@ -271,7 +241,7 @@ where
271 /// [`channel`]: channel 241 /// [`channel`]: channel
272 /// [`close`]: Receiver::close 242 /// [`close`]: Receiver::close
273 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 243 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
274 Channel::lock(self.channel_cell, |c| c.try_send(message)) 244 self.channel.lock(|c| c.try_send(message))
275 } 245 }
276 246
277 /// Completes when the receiver has dropped. 247 /// Completes when the receiver has dropped.
@@ -280,7 +250,7 @@ where
280 /// values is canceled and immediately stop doing work. 250 /// values is canceled and immediately stop doing work.
281 pub async fn closed(&self) { 251 pub async fn closed(&self) {
282 CloseFuture { 252 CloseFuture {
283 sender: self.clone(), 253 channel: self.channel,
284 } 254 }
285 .await 255 .await
286 } 256 }
@@ -292,29 +262,27 @@ where
292 /// [`Receiver`]: crate::sync::mpsc::Receiver 262 /// [`Receiver`]: crate::sync::mpsc::Receiver
293 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close 263 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
294 pub fn is_closed(&self) -> bool { 264 pub fn is_closed(&self) -> bool {
295 Channel::lock(self.channel_cell, |c| c.is_closed()) 265 self.channel.lock(|c| c.is_closed())
296 } 266 }
297} 267}
298 268
299pub struct SendFuture<'ch, M, T, const N: usize> 269pub struct SendFuture<'ch, M, T, const N: usize>
300where 270where
301 M: Mutex<Data = ()>, 271 M: MutexKind,
302{ 272{
303 sender: Sender<'ch, M, T, N>, 273 channel: &'ch Channel<M, T, N>,
304 message: Option<T>, 274 message: Option<T>,
305} 275}
306 276
307impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> 277impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
308where 278where
309 M: Mutex<Data = ()>, 279 M: MutexKind,
310{ 280{
311 type Output = Result<(), SendError<T>>; 281 type Output = Result<(), SendError<T>>;
312 282
313 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 283 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
314 match self.message.take() { 284 match self.message.take() {
315 Some(m) => match Channel::lock(self.sender.channel_cell, |c| { 285 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
316 c.try_send_with_context(m, Some(cx))
317 }) {
318 Ok(..) => Poll::Ready(Ok(())), 286 Ok(..) => Poll::Ready(Ok(())),
319 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), 287 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
320 Err(TrySendError::Full(m)) => { 288 Err(TrySendError::Full(m)) => {
@@ -327,25 +295,23 @@ where
327 } 295 }
328} 296}
329 297
330impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {} 298impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: MutexKind {}
331 299
332struct CloseFuture<'ch, M, T, const N: usize> 300struct CloseFuture<'ch, M, T, const N: usize>
333where 301where
334 M: Mutex<Data = ()>, 302 M: MutexKind,
335{ 303{
336 sender: Sender<'ch, M, T, N>, 304 channel: &'ch Channel<M, T, N>,
337} 305}
338 306
339impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> 307impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
340where 308where
341 M: Mutex<Data = ()>, 309 M: MutexKind,
342{ 310{
343 type Output = (); 311 type Output = ();
344 312
345 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 313 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
346 if Channel::lock(self.sender.channel_cell, |c| { 314 if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) {
347 c.is_closed_with_context(Some(cx))
348 }) {
349 Poll::Ready(()) 315 Poll::Ready(())
350 } else { 316 } else {
351 Poll::Pending 317 Poll::Pending
@@ -355,22 +321,21 @@ where
355 321
356impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> 322impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
357where 323where
358 M: Mutex<Data = ()>, 324 M: MutexKind,
359{ 325{
360 fn drop(&mut self) { 326 fn drop(&mut self) {
361 Channel::lock(self.channel_cell, |c| c.deregister_sender()) 327 self.channel.lock(|c| c.deregister_sender())
362 } 328 }
363} 329}
364 330
365impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> 331impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
366where 332where
367 M: Mutex<Data = ()>, 333 M: MutexKind,
368{ 334{
369 #[allow(clippy::clone_double_ref)]
370 fn clone(&self) -> Self { 335 fn clone(&self) -> Self {
371 Channel::lock(self.channel_cell, |c| c.register_sender()); 336 self.channel.lock(|c| c.register_sender());
372 Sender { 337 Sender {
373 channel_cell: self.channel_cell.clone(), 338 channel: self.channel,
374 } 339 }
375 } 340 }
376} 341}
@@ -581,59 +546,35 @@ impl<T, const N: usize> ChannelState<T, N> {
581/// All data sent will become available in the same order as it was sent. 546/// All data sent will become available in the same order as it was sent.
582pub struct Channel<M, T, const N: usize> 547pub struct Channel<M, T, const N: usize>
583where 548where
584 M: Mutex<Data = ()>, 549 M: MutexKind,
585{
586 channel_cell: UnsafeCell<ChannelCell<M, T, N>>,
587}
588
589struct ChannelCell<M, T, const N: usize>
590where
591 M: Mutex<Data = ()>,
592{ 550{
593 mutex: M, 551 inner: M::Mutex<RefCell<ChannelState<T, N>>>,
594 state: ChannelState<T, N>,
595} 552}
596 553
597pub type WithCriticalSections = CriticalSectionMutex<()>;
598
599pub type WithThreadModeOnly = ThreadModeMutex<()>;
600
601pub type WithNoThreads = NoopMutex<()>;
602
603impl<M, T, const N: usize> Channel<M, T, N> 554impl<M, T, const N: usize> Channel<M, T, N>
604where 555where
605 M: Mutex<Data = ()>, 556 M: MutexKind,
606{ 557{
607 /// Establish a new bounded channel. For example, to create one with a NoopMutex: 558 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
608 /// 559 ///
609 /// ``` 560 /// ```
610 /// use embassy::channel::mpsc; 561 /// use embassy::channel::mpsc;
611 /// use embassy::channel::mpsc::{Channel, WithNoThreads}; 562 /// use embassy::blocking_mutex::kind::Noop;
563 /// use embassy::channel::mpsc::Channel;
612 /// 564 ///
613 /// // Declare a bounded channel of 3 u32s. 565 /// // Declare a bounded channel of 3 u32s.
614 /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); 566 /// let mut channel = Channel::<Noop, u32, 3>::new();
615 /// // once we have a channel, obtain its sender and receiver 567 /// // once we have a channel, obtain its sender and receiver
616 /// let (sender, receiver) = mpsc::split(&mut channel); 568 /// let (sender, receiver) = mpsc::split(&mut channel);
617 /// ``` 569 /// ```
618 pub fn new() -> Self { 570 pub fn new() -> Self {
619 let mutex = M::new(()); 571 Self {
620 let state = ChannelState::new(); 572 inner: M::Mutex::new(RefCell::new(ChannelState::new())),
621 let channel_cell = ChannelCell { mutex, state };
622 Channel {
623 channel_cell: UnsafeCell::new(channel_cell),
624 } 573 }
625 } 574 }
626 575
627 fn lock<R>( 576 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
628 channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, 577 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
629 f: impl FnOnce(&mut ChannelState<T, N>) -> R,
630 ) -> R {
631 unsafe {
632 let channel_cell = &mut *(channel_cell.get());
633 let mutex = &mut channel_cell.mutex;
634 let mut state = &mut channel_cell.state;
635 mutex.lock(|_| f(&mut state))
636 }
637 } 578 }
638} 579}
639 580
@@ -645,6 +586,7 @@ mod tests {
645 use futures_executor::ThreadPool; 586 use futures_executor::ThreadPool;
646 use futures_timer::Delay; 587 use futures_timer::Delay;
647 588
589 use crate::blocking_mutex::kind::{CriticalSection, Noop};
648 use crate::util::Forever; 590 use crate::util::Forever;
649 591
650 use super::*; 592 use super::*;
@@ -713,7 +655,7 @@ mod tests {
713 655
714 #[test] 656 #[test]
715 fn simple_send_and_receive() { 657 fn simple_send_and_receive() {
716 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 658 let mut c = Channel::<Noop, u32, 3>::new();
717 let (s, r) = split(&mut c); 659 let (s, r) = split(&mut c);
718 assert!(s.clone().try_send(1).is_ok()); 660 assert!(s.clone().try_send(1).is_ok());
719 assert_eq!(r.try_recv().unwrap(), 1); 661 assert_eq!(r.try_recv().unwrap(), 1);
@@ -721,7 +663,7 @@ mod tests {
721 663
722 #[test] 664 #[test]
723 fn should_close_without_sender() { 665 fn should_close_without_sender() {
724 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 666 let mut c = Channel::<Noop, u32, 3>::new();
725 let (s, r) = split(&mut c); 667 let (s, r) = split(&mut c);
726 drop(s); 668 drop(s);
727 match r.try_recv() { 669 match r.try_recv() {
@@ -732,7 +674,7 @@ mod tests {
732 674
733 #[test] 675 #[test]
734 fn should_close_once_drained() { 676 fn should_close_once_drained() {
735 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 677 let mut c = Channel::<Noop, u32, 3>::new();
736 let (s, r) = split(&mut c); 678 let (s, r) = split(&mut c);
737 assert!(s.try_send(1).is_ok()); 679 assert!(s.try_send(1).is_ok());
738 drop(s); 680 drop(s);
@@ -745,7 +687,7 @@ mod tests {
745 687
746 #[test] 688 #[test]
747 fn should_reject_send_when_receiver_dropped() { 689 fn should_reject_send_when_receiver_dropped() {
748 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 690 let mut c = Channel::<Noop, u32, 3>::new();
749 let (s, r) = split(&mut c); 691 let (s, r) = split(&mut c);
750 drop(r); 692 drop(r);
751 match s.try_send(1) { 693 match s.try_send(1) {
@@ -756,7 +698,7 @@ mod tests {
756 698
757 #[test] 699 #[test]
758 fn should_reject_send_when_channel_closed() { 700 fn should_reject_send_when_channel_closed() {
759 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 701 let mut c = Channel::<Noop, u32, 3>::new();
760 let (s, mut r) = split(&mut c); 702 let (s, mut r) = split(&mut c);
761 assert!(s.try_send(1).is_ok()); 703 assert!(s.try_send(1).is_ok());
762 r.close(); 704 r.close();
@@ -772,7 +714,7 @@ mod tests {
772 async fn receiver_closes_when_sender_dropped_async() { 714 async fn receiver_closes_when_sender_dropped_async() {
773 let executor = ThreadPool::new().unwrap(); 715 let executor = ThreadPool::new().unwrap();
774 716
775 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); 717 static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
776 let c = CHANNEL.put(Channel::new()); 718 let c = CHANNEL.put(Channel::new());
777 let (s, mut r) = split(c); 719 let (s, mut r) = split(c);
778 assert!(executor 720 assert!(executor
@@ -787,7 +729,7 @@ mod tests {
787 async fn receiver_receives_given_try_send_async() { 729 async fn receiver_receives_given_try_send_async() {
788 let executor = ThreadPool::new().unwrap(); 730 let executor = ThreadPool::new().unwrap();
789 731
790 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); 732 static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
791 let c = CHANNEL.put(Channel::new()); 733 let c = CHANNEL.put(Channel::new());
792 let (s, mut r) = split(c); 734 let (s, mut r) = split(c);
793 assert!(executor 735 assert!(executor
@@ -800,7 +742,7 @@ mod tests {
800 742
801 #[futures_test::test] 743 #[futures_test::test]
802 async fn sender_send_completes_if_capacity() { 744 async fn sender_send_completes_if_capacity() {
803 let mut c = Channel::<WithCriticalSections, u32, 1>::new(); 745 let mut c = Channel::<CriticalSection, u32, 1>::new();
804 let (s, mut r) = split(&mut c); 746 let (s, mut r) = split(&mut c);
805 assert!(s.send(1).await.is_ok()); 747 assert!(s.send(1).await.is_ok());
806 assert_eq!(r.recv().await, Some(1)); 748 assert_eq!(r.recv().await, Some(1));
@@ -808,7 +750,7 @@ mod tests {
808 750
809 #[futures_test::test] 751 #[futures_test::test]
810 async fn sender_send_completes_if_closed() { 752 async fn sender_send_completes_if_closed() {
811 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 753 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
812 let c = CHANNEL.put(Channel::new()); 754 let c = CHANNEL.put(Channel::new());
813 let (s, r) = split(c); 755 let (s, r) = split(c);
814 drop(r); 756 drop(r);
@@ -822,7 +764,7 @@ mod tests {
822 async fn senders_sends_wait_until_capacity() { 764 async fn senders_sends_wait_until_capacity() {
823 let executor = ThreadPool::new().unwrap(); 765 let executor = ThreadPool::new().unwrap();
824 766
825 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 767 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
826 let c = CHANNEL.put(Channel::new()); 768 let c = CHANNEL.put(Channel::new());
827 let (s0, mut r) = split(c); 769 let (s0, mut r) = split(c);
828 assert!(s0.try_send(1).is_ok()); 770 assert!(s0.try_send(1).is_ok());
@@ -842,7 +784,7 @@ mod tests {
842 784
843 #[futures_test::test] 785 #[futures_test::test]
844 async fn sender_close_completes_if_closing() { 786 async fn sender_close_completes_if_closing() {
845 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 787 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
846 let c = CHANNEL.put(Channel::new()); 788 let c = CHANNEL.put(Channel::new());
847 let (s, mut r) = split(c); 789 let (s, mut r) = split(c);
848 r.close(); 790 r.close();
@@ -851,7 +793,7 @@ mod tests {
851 793
852 #[futures_test::test] 794 #[futures_test::test]
853 async fn sender_close_completes_if_closed() { 795 async fn sender_close_completes_if_closed() {
854 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 796 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
855 let c = CHANNEL.put(Channel::new()); 797 let c = CHANNEL.put(Channel::new());
856 let (s, r) = split(c); 798 let (s, r) = split(c);
857 drop(r); 799 drop(r);
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index 79fa3dfb9..c85b7c282 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -6,7 +6,8 @@
6mod example_common; 6mod example_common;
7 7
8use defmt::unwrap; 8use defmt::unwrap;
9use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads}; 9use embassy::blocking_mutex::kind::Noop;
10use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError};
10use embassy::executor::Spawner; 11use embassy::executor::Spawner;
11use embassy::time::{Duration, Timer}; 12use embassy::time::{Duration, Timer};
12use embassy::util::Forever; 13use embassy::util::Forever;
@@ -19,10 +20,10 @@ enum LedState {
19 Off, 20 Off,
20} 21}
21 22
22static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); 23static CHANNEL: Forever<Channel<Noop, LedState, 1>> = Forever::new();
23 24
24#[embassy::task(pool_size = 1)] 25#[embassy::task(pool_size = 1)]
25async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { 26async fn my_task(sender: Sender<'static, Noop, LedState, 1>) {
26 loop { 27 loop {
27 let _ = sender.send(LedState::On).await; 28 let _ = sender.send(LedState::On).await;
28 Timer::after(Duration::from_secs(1)).await; 29 Timer::after(Duration::from_secs(1)).await;