aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2021-09-13 00:25:53 +0200
committerGitHub <[email protected]>2021-09-13 00:25:53 +0200
commitf1c35b40c74db489da8e04f1c2e87a1d4030c617 (patch)
treec8f8671800eff4f2497ea8e4c7a96ba752db4aaa
parent67fa6b06fafc8635d2063e687904d30864f45a05 (diff)
parent70e5877d6823ba0894241e8bedc5cefa7e21bceb (diff)
Merge pull request #396 from embassy-rs/channel-fixes
embassy/channel: several improvements
-rw-r--r--embassy/Cargo.toml1
-rw-r--r--embassy/src/blocking_mutex/kind.rs19
-rw-r--r--embassy/src/blocking_mutex/mod.rs10
-rw-r--r--embassy/src/channel/mpsc.rs264
-rw-r--r--examples/nrf/src/bin/mpsc.rs7
5 files changed, 116 insertions, 185 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml
index 0a8ab4434..ae06bc198 100644
--- a/embassy/Cargo.toml
+++ b/embassy/Cargo.toml
@@ -42,6 +42,7 @@ embassy-traits = { version = "0.1.0", path = "../embassy-traits"}
42atomic-polyfill = "0.1.3" 42atomic-polyfill = "0.1.3"
43critical-section = "0.2.1" 43critical-section = "0.2.1"
44embedded-hal = "0.2.6" 44embedded-hal = "0.2.6"
45heapless = "0.7.5"
45 46
46[dev-dependencies] 47[dev-dependencies]
47embassy = { path = ".", features = ["executor-agnostic"] } 48embassy = { path = ".", features = ["executor-agnostic"] }
diff --git a/embassy/src/blocking_mutex/kind.rs b/embassy/src/blocking_mutex/kind.rs
new file mode 100644
index 000000000..30fc90497
--- /dev/null
+++ b/embassy/src/blocking_mutex/kind.rs
@@ -0,0 +1,19 @@
1use super::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex};
2
3pub trait MutexKind {
4 type Mutex<T>: Mutex<Data = T>;
5}
6
7pub enum CriticalSection {}
8impl MutexKind for CriticalSection {
9 type Mutex<T> = CriticalSectionMutex<T>;
10}
11
12pub enum ThreadMode {}
13impl MutexKind for ThreadMode {
14 type Mutex<T> = ThreadModeMutex<T>;
15}
16pub enum Noop {}
17impl MutexKind for Noop {
18 type Mutex<T> = NoopMutex<T>;
19}
diff --git a/embassy/src/blocking_mutex/mod.rs b/embassy/src/blocking_mutex/mod.rs
index d112d2ede..641a1ed93 100644
--- a/embassy/src/blocking_mutex/mod.rs
+++ b/embassy/src/blocking_mutex/mod.rs
@@ -1,5 +1,7 @@
1//! Blocking mutex (not async) 1//! Blocking mutex (not async)
2 2
3pub mod kind;
4
3use core::cell::UnsafeCell; 5use core::cell::UnsafeCell;
4use critical_section::CriticalSection; 6use critical_section::CriticalSection;
5 7
@@ -13,7 +15,7 @@ pub trait Mutex {
13 fn new(data: Self::Data) -> Self; 15 fn new(data: Self::Data) -> Self;
14 16
15 /// Creates a critical section and grants temporary access to the protected data. 17 /// Creates a critical section and grants temporary access to the protected data.
16 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; 18 fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R;
17} 19}
18 20
19/// A "mutex" based on critical sections 21/// A "mutex" based on critical sections
@@ -55,7 +57,7 @@ impl<T> Mutex for CriticalSectionMutex<T> {
55 Self::new(data) 57 Self::new(data)
56 } 58 }
57 59
58 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 60 fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
59 critical_section::with(|cs| f(self.borrow(cs))) 61 critical_section::with(|cs| f(self.borrow(cs)))
60 } 62 }
61} 63}
@@ -102,7 +104,7 @@ impl<T> Mutex for ThreadModeMutex<T> {
102 Self::new(data) 104 Self::new(data)
103 } 105 }
104 106
105 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 107 fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
106 f(self.borrow()) 108 f(self.borrow())
107 } 109 }
108} 110}
@@ -155,7 +157,7 @@ impl<T> Mutex for NoopMutex<T> {
155 Self::new(data) 157 Self::new(data)
156 } 158 }
157 159
158 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { 160 fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
159 f(self.borrow()) 161 f(self.borrow())
160 } 162 }
161} 163}
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs
index b20d48a95..9a57c0b19 100644
--- a/embassy/src/channel/mpsc.rs
+++ b/embassy/src/channel/mpsc.rs
@@ -37,19 +37,18 @@
37//! 37//!
38//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html 38//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html
39 39
40use core::cell::UnsafeCell; 40use core::cell::RefCell;
41use core::fmt; 41use core::fmt;
42use core::marker::PhantomData;
43use core::mem::MaybeUninit;
44use core::pin::Pin; 42use core::pin::Pin;
45use core::ptr;
46use core::task::Context; 43use core::task::Context;
47use core::task::Poll; 44use core::task::Poll;
48use core::task::Waker; 45use core::task::Waker;
49 46
50use futures::Future; 47use futures::Future;
48use heapless::Deque;
51 49
52use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; 50use crate::blocking_mutex::kind::MutexKind;
51use crate::blocking_mutex::Mutex;
53use crate::waitqueue::WakerRegistration; 52use crate::waitqueue::WakerRegistration;
54 53
55/// Send values to the associated `Receiver`. 54/// Send values to the associated `Receiver`.
@@ -57,36 +56,19 @@ use crate::waitqueue::WakerRegistration;
57/// Instances are created by the [`split`](split) function. 56/// Instances are created by the [`split`](split) function.
58pub struct Sender<'ch, M, T, const N: usize> 57pub struct Sender<'ch, M, T, const N: usize>
59where 58where
60 M: Mutex<Data = ()>, 59 M: MutexKind,
61{ 60{
62 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, 61 channel: &'ch Channel<M, T, N>,
63} 62}
64 63
65// Safe to pass the sender around
66unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
67{}
68unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
69{}
70
71/// Receive values from the associated `Sender`. 64/// Receive values from the associated `Sender`.
72/// 65///
73/// Instances are created by the [`split`](split) function. 66/// Instances are created by the [`split`](split) function.
74pub struct Receiver<'ch, M, T, const N: usize> 67pub struct Receiver<'ch, M, T, const N: usize>
75where 68where
76 M: Mutex<Data = ()>, 69 M: MutexKind,
77{
78 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
79 _receiver_consumed: &'ch mut PhantomData<()>,
80}
81
82// Safe to pass the receiver around
83unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where
84 M: Mutex<Data = ()> + Sync
85{
86}
87unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where
88 M: Mutex<Data = ()> + Sync
89{ 70{
71 channel: &'ch Channel<M, T, N>,
90} 72}
91 73
92/// Splits a bounded mpsc channel into a `Sender` and `Receiver`. 74/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
@@ -117,16 +99,11 @@ pub fn split<M, T, const N: usize>(
117 channel: &mut Channel<M, T, N>, 99 channel: &mut Channel<M, T, N>,
118) -> (Sender<M, T, N>, Receiver<M, T, N>) 100) -> (Sender<M, T, N>, Receiver<M, T, N>)
119where 101where
120 M: Mutex<Data = ()>, 102 M: MutexKind,
121{ 103{
122 let sender = Sender { 104 let sender = Sender { channel };
123 channel_cell: &channel.channel_cell, 105 let receiver = Receiver { channel };
124 }; 106 channel.lock(|c| {
125 let receiver = Receiver {
126 channel_cell: &channel.channel_cell,
127 _receiver_consumed: &mut channel.receiver_consumed,
128 };
129 Channel::lock(&channel.channel_cell, |c| {
130 c.register_receiver(); 107 c.register_receiver();
131 c.register_sender(); 108 c.register_sender();
132 }); 109 });
@@ -135,7 +112,7 @@ where
135 112
136impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> 113impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
137where 114where
138 M: Mutex<Data = ()>, 115 M: MutexKind,
139{ 116{
140 /// Receives the next value for this receiver. 117 /// Receives the next value for this receiver.
141 /// 118 ///
@@ -155,7 +132,7 @@ where
155 /// [`close`]: Self::close 132 /// [`close`]: Self::close
156 pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { 133 pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
157 RecvFuture { 134 RecvFuture {
158 channel_cell: self.channel_cell, 135 channel: self.channel,
159 } 136 }
160 } 137 }
161 138
@@ -164,7 +141,7 @@ where
164 /// This method will either receive a message from the channel immediately or return an error 141 /// This method will either receive a message from the channel immediately or return an error
165 /// if the channel is empty. 142 /// if the channel is empty.
166 pub fn try_recv(&self) -> Result<T, TryRecvError> { 143 pub fn try_recv(&self) -> Result<T, TryRecvError> {
167 Channel::lock(self.channel_cell, |c| c.try_recv()) 144 self.channel.lock(|c| c.try_recv())
168 } 145 }
169 146
170 /// Closes the receiving half of a channel without dropping it. 147 /// Closes the receiving half of a channel without dropping it.
@@ -178,56 +155,45 @@ where
178 /// until those are released. 155 /// until those are released.
179 /// 156 ///
180 pub fn close(&mut self) { 157 pub fn close(&mut self) {
181 Channel::lock(self.channel_cell, |c| c.close()) 158 self.channel.lock(|c| c.close())
182 } 159 }
183} 160}
184 161
185impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> 162impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
186where 163where
187 M: Mutex<Data = ()>, 164 M: MutexKind,
188{ 165{
189 fn drop(&mut self) { 166 fn drop(&mut self) {
190 Channel::lock(self.channel_cell, |c| c.deregister_receiver()) 167 self.channel.lock(|c| c.deregister_receiver())
191 } 168 }
192} 169}
193 170
194pub struct RecvFuture<'ch, M, T, const N: usize> 171pub struct RecvFuture<'ch, M, T, const N: usize>
195where 172where
196 M: Mutex<Data = ()>, 173 M: MutexKind,
197{ 174{
198 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, 175 channel: &'ch Channel<M, T, N>,
199} 176}
200 177
201impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> 178impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
202where 179where
203 M: Mutex<Data = ()>, 180 M: MutexKind,
204{ 181{
205 type Output = Option<T>; 182 type Output = Option<T>;
206 183
207 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { 184 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
208 Channel::lock(self.channel_cell, |c| { 185 self.channel
209 match c.try_recv_with_context(Some(cx)) { 186 .lock(|c| match c.try_recv_with_context(Some(cx)) {
210 Ok(v) => Poll::Ready(Some(v)), 187 Ok(v) => Poll::Ready(Some(v)),
211 Err(TryRecvError::Closed) => Poll::Ready(None), 188 Err(TryRecvError::Closed) => Poll::Ready(None),
212 Err(TryRecvError::Empty) => Poll::Pending, 189 Err(TryRecvError::Empty) => Poll::Pending,
213 } 190 })
214 })
215 } 191 }
216} 192}
217 193
218// Safe to pass the receive future around since it locks channel whenever polled
219unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where
220 M: Mutex<Data = ()> + Sync
221{
222}
223unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where
224 M: Mutex<Data = ()> + Sync
225{
226}
227
228impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> 194impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
229where 195where
230 M: Mutex<Data = ()>, 196 M: MutexKind,
231{ 197{
232 /// Sends a value, waiting until there is capacity. 198 /// Sends a value, waiting until there is capacity.
233 /// 199 ///
@@ -249,7 +215,7 @@ where
249 /// [`Receiver`]: Receiver 215 /// [`Receiver`]: Receiver
250 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { 216 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
251 SendFuture { 217 SendFuture {
252 sender: self.clone(), 218 channel: self.channel,
253 message: Some(message), 219 message: Some(message),
254 } 220 }
255 } 221 }
@@ -275,7 +241,7 @@ where
275 /// [`channel`]: channel 241 /// [`channel`]: channel
276 /// [`close`]: Receiver::close 242 /// [`close`]: Receiver::close
277 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 243 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
278 Channel::lock(self.channel_cell, |c| c.try_send(message)) 244 self.channel.lock(|c| c.try_send(message))
279 } 245 }
280 246
281 /// Completes when the receiver has dropped. 247 /// Completes when the receiver has dropped.
@@ -284,7 +250,7 @@ where
284 /// values is canceled and immediately stop doing work. 250 /// values is canceled and immediately stop doing work.
285 pub async fn closed(&self) { 251 pub async fn closed(&self) {
286 CloseFuture { 252 CloseFuture {
287 sender: self.clone(), 253 channel: self.channel,
288 } 254 }
289 .await 255 .await
290 } 256 }
@@ -296,29 +262,27 @@ where
296 /// [`Receiver`]: crate::sync::mpsc::Receiver 262 /// [`Receiver`]: crate::sync::mpsc::Receiver
297 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close 263 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
298 pub fn is_closed(&self) -> bool { 264 pub fn is_closed(&self) -> bool {
299 Channel::lock(self.channel_cell, |c| c.is_closed()) 265 self.channel.lock(|c| c.is_closed())
300 } 266 }
301} 267}
302 268
303pub struct SendFuture<'ch, M, T, const N: usize> 269pub struct SendFuture<'ch, M, T, const N: usize>
304where 270where
305 M: Mutex<Data = ()>, 271 M: MutexKind,
306{ 272{
307 sender: Sender<'ch, M, T, N>, 273 channel: &'ch Channel<M, T, N>,
308 message: Option<T>, 274 message: Option<T>,
309} 275}
310 276
311impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> 277impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
312where 278where
313 M: Mutex<Data = ()>, 279 M: MutexKind,
314{ 280{
315 type Output = Result<(), SendError<T>>; 281 type Output = Result<(), SendError<T>>;
316 282
317 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 283 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
318 match self.message.take() { 284 match self.message.take() {
319 Some(m) => match Channel::lock(self.sender.channel_cell, |c| { 285 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
320 c.try_send_with_context(m, Some(cx))
321 }) {
322 Ok(..) => Poll::Ready(Ok(())), 286 Ok(..) => Poll::Ready(Ok(())),
323 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), 287 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
324 Err(TrySendError::Full(m)) => { 288 Err(TrySendError::Full(m)) => {
@@ -331,25 +295,23 @@ where
331 } 295 }
332} 296}
333 297
334impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {} 298impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: MutexKind {}
335 299
336struct CloseFuture<'ch, M, T, const N: usize> 300struct CloseFuture<'ch, M, T, const N: usize>
337where 301where
338 M: Mutex<Data = ()>, 302 M: MutexKind,
339{ 303{
340 sender: Sender<'ch, M, T, N>, 304 channel: &'ch Channel<M, T, N>,
341} 305}
342 306
343impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> 307impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
344where 308where
345 M: Mutex<Data = ()>, 309 M: MutexKind,
346{ 310{
347 type Output = (); 311 type Output = ();
348 312
349 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 313 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350 if Channel::lock(self.sender.channel_cell, |c| { 314 if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) {
351 c.is_closed_with_context(Some(cx))
352 }) {
353 Poll::Ready(()) 315 Poll::Ready(())
354 } else { 316 } else {
355 Poll::Pending 317 Poll::Pending
@@ -359,22 +321,21 @@ where
359 321
360impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> 322impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
361where 323where
362 M: Mutex<Data = ()>, 324 M: MutexKind,
363{ 325{
364 fn drop(&mut self) { 326 fn drop(&mut self) {
365 Channel::lock(self.channel_cell, |c| c.deregister_sender()) 327 self.channel.lock(|c| c.deregister_sender())
366 } 328 }
367} 329}
368 330
369impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> 331impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
370where 332where
371 M: Mutex<Data = ()>, 333 M: MutexKind,
372{ 334{
373 #[allow(clippy::clone_double_ref)]
374 fn clone(&self) -> Self { 335 fn clone(&self) -> Self {
375 Channel::lock(self.channel_cell, |c| c.register_sender()); 336 self.channel.lock(|c| c.register_sender());
376 Sender { 337 Sender {
377 channel_cell: self.channel_cell.clone(), 338 channel: self.channel,
378 } 339 }
379 } 340 }
380} 341}
@@ -446,10 +407,7 @@ impl<T> defmt::Format for TrySendError<T> {
446} 407}
447 408
448struct ChannelState<T, const N: usize> { 409struct ChannelState<T, const N: usize> {
449 buf: [MaybeUninit<UnsafeCell<T>>; N], 410 queue: Deque<T, N>,
450 read_pos: usize,
451 write_pos: usize,
452 full: bool,
453 closed: bool, 411 closed: bool,
454 receiver_registered: bool, 412 receiver_registered: bool,
455 senders_registered: u32, 413 senders_registered: u32,
@@ -458,14 +416,9 @@ struct ChannelState<T, const N: usize> {
458} 416}
459 417
460impl<T, const N: usize> ChannelState<T, N> { 418impl<T, const N: usize> ChannelState<T, N> {
461 const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit();
462
463 const fn new() -> Self { 419 const fn new() -> Self {
464 ChannelState { 420 ChannelState {
465 buf: [Self::INIT; N], 421 queue: Deque::new(),
466 read_pos: 0,
467 write_pos: 0,
468 full: false,
469 closed: false, 422 closed: false,
470 receiver_registered: false, 423 receiver_registered: false,
471 senders_registered: 0, 424 senders_registered: 0,
@@ -479,17 +432,16 @@ impl<T, const N: usize> ChannelState<T, N> {
479 } 432 }
480 433
481 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { 434 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
482 if self.read_pos != self.write_pos || self.full { 435 if self.queue.is_full() {
483 if self.full { 436 self.senders_waker.wake();
484 self.full = false; 437 }
485 self.senders_waker.wake(); 438
486 } 439 if let Some(message) = self.queue.pop_front() {
487 let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() };
488 self.read_pos = (self.read_pos + 1) % self.buf.len();
489 Ok(message) 440 Ok(message)
490 } else if !self.closed { 441 } else if !self.closed {
491 cx.into_iter() 442 if let Some(cx) = cx {
492 .for_each(|cx| self.set_receiver_waker(&cx.waker())); 443 self.set_receiver_waker(cx.waker());
444 }
493 Err(TryRecvError::Empty) 445 Err(TryRecvError::Empty)
494 } else { 446 } else {
495 Err(TryRecvError::Closed) 447 Err(TryRecvError::Closed)
@@ -505,22 +457,21 @@ impl<T, const N: usize> ChannelState<T, N> {
505 message: T, 457 message: T,
506 cx: Option<&mut Context<'_>>, 458 cx: Option<&mut Context<'_>>,
507 ) -> Result<(), TrySendError<T>> { 459 ) -> Result<(), TrySendError<T>> {
508 if !self.closed { 460 if self.closed {
509 if !self.full { 461 return Err(TrySendError::Closed(message));
510 self.buf[self.write_pos] = MaybeUninit::new(message.into()); 462 }
511 self.write_pos = (self.write_pos + 1) % self.buf.len(); 463
512 if self.write_pos == self.read_pos { 464 match self.queue.push_back(message) {
513 self.full = true; 465 Ok(()) => {
514 }
515 self.receiver_waker.wake(); 466 self.receiver_waker.wake();
467
516 Ok(()) 468 Ok(())
517 } else { 469 }
470 Err(message) => {
518 cx.into_iter() 471 cx.into_iter()
519 .for_each(|cx| self.set_senders_waker(&cx.waker())); 472 .for_each(|cx| self.set_senders_waker(&cx.waker()));
520 Err(TrySendError::Full(message)) 473 Err(TrySendError::Full(message))
521 } 474 }
522 } else {
523 Err(TrySendError::Closed(message))
524 } 475 }
525 } 476 }
526 477
@@ -585,16 +536,6 @@ impl<T, const N: usize> ChannelState<T, N> {
585 } 536 }
586} 537}
587 538
588impl<T, const N: usize> Drop for ChannelState<T, N> {
589 fn drop(&mut self) {
590 while self.read_pos != self.write_pos || self.full {
591 self.full = false;
592 unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) };
593 self.read_pos = (self.read_pos + 1) % N;
594 }
595 }
596}
597
598/// A a bounded mpsc channel for communicating between asynchronous tasks 539/// A a bounded mpsc channel for communicating between asynchronous tasks
599/// with backpressure. 540/// with backpressure.
600/// 541///
@@ -605,61 +546,35 @@ impl<T, const N: usize> Drop for ChannelState<T, N> {
605/// All data sent will become available in the same order as it was sent. 546/// All data sent will become available in the same order as it was sent.
606pub struct Channel<M, T, const N: usize> 547pub struct Channel<M, T, const N: usize>
607where 548where
608 M: Mutex<Data = ()>, 549 M: MutexKind,
609{ 550{
610 channel_cell: UnsafeCell<ChannelCell<M, T, N>>, 551 inner: M::Mutex<RefCell<ChannelState<T, N>>>,
611 receiver_consumed: PhantomData<()>,
612} 552}
613 553
614struct ChannelCell<M, T, const N: usize>
615where
616 M: Mutex<Data = ()>,
617{
618 mutex: M,
619 state: ChannelState<T, N>,
620}
621
622pub type WithCriticalSections = CriticalSectionMutex<()>;
623
624pub type WithThreadModeOnly = ThreadModeMutex<()>;
625
626pub type WithNoThreads = NoopMutex<()>;
627
628impl<M, T, const N: usize> Channel<M, T, N> 554impl<M, T, const N: usize> Channel<M, T, N>
629where 555where
630 M: Mutex<Data = ()>, 556 M: MutexKind,
631{ 557{
632 /// Establish a new bounded channel. For example, to create one with a NoopMutex: 558 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
633 /// 559 ///
634 /// ``` 560 /// ```
635 /// use embassy::channel::mpsc; 561 /// use embassy::channel::mpsc;
636 /// use embassy::channel::mpsc::{Channel, WithNoThreads}; 562 /// use embassy::blocking_mutex::kind::Noop;
563 /// use embassy::channel::mpsc::Channel;
637 /// 564 ///
638 /// // Declare a bounded channel of 3 u32s. 565 /// // Declare a bounded channel of 3 u32s.
639 /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); 566 /// let mut channel = Channel::<Noop, u32, 3>::new();
640 /// // once we have a channel, obtain its sender and receiver 567 /// // once we have a channel, obtain its sender and receiver
641 /// let (sender, receiver) = mpsc::split(&mut channel); 568 /// let (sender, receiver) = mpsc::split(&mut channel);
642 /// ``` 569 /// ```
643 pub fn new() -> Self { 570 pub fn new() -> Self {
644 let mutex = M::new(()); 571 Self {
645 let state = ChannelState::new(); 572 inner: M::Mutex::new(RefCell::new(ChannelState::new())),
646 let channel_cell = ChannelCell { mutex, state };
647 Channel {
648 channel_cell: UnsafeCell::new(channel_cell),
649 receiver_consumed: PhantomData,
650 } 573 }
651 } 574 }
652 575
653 fn lock<R>( 576 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
654 channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, 577 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
655 f: impl FnOnce(&mut ChannelState<T, N>) -> R,
656 ) -> R {
657 unsafe {
658 let channel_cell = &mut *(channel_cell.get());
659 let mutex = &mut channel_cell.mutex;
660 let mut state = &mut channel_cell.state;
661 mutex.lock(|_| f(&mut state))
662 }
663 } 578 }
664} 579}
665 580
@@ -671,20 +586,13 @@ mod tests {
671 use futures_executor::ThreadPool; 586 use futures_executor::ThreadPool;
672 use futures_timer::Delay; 587 use futures_timer::Delay;
673 588
589 use crate::blocking_mutex::kind::{CriticalSection, Noop};
674 use crate::util::Forever; 590 use crate::util::Forever;
675 591
676 use super::*; 592 use super::*;
677 593
678 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { 594 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
679 if !c.full { 595 c.queue.capacity() - c.queue.len()
680 if c.write_pos > c.read_pos {
681 (c.buf.len() - c.write_pos) + c.read_pos
682 } else {
683 (c.buf.len() - c.read_pos) + c.write_pos
684 }
685 } else {
686 0
687 }
688 } 596 }
689 597
690 #[test] 598 #[test]
@@ -747,7 +655,7 @@ mod tests {
747 655
748 #[test] 656 #[test]
749 fn simple_send_and_receive() { 657 fn simple_send_and_receive() {
750 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 658 let mut c = Channel::<Noop, u32, 3>::new();
751 let (s, r) = split(&mut c); 659 let (s, r) = split(&mut c);
752 assert!(s.clone().try_send(1).is_ok()); 660 assert!(s.clone().try_send(1).is_ok());
753 assert_eq!(r.try_recv().unwrap(), 1); 661 assert_eq!(r.try_recv().unwrap(), 1);
@@ -755,7 +663,7 @@ mod tests {
755 663
756 #[test] 664 #[test]
757 fn should_close_without_sender() { 665 fn should_close_without_sender() {
758 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 666 let mut c = Channel::<Noop, u32, 3>::new();
759 let (s, r) = split(&mut c); 667 let (s, r) = split(&mut c);
760 drop(s); 668 drop(s);
761 match r.try_recv() { 669 match r.try_recv() {
@@ -766,7 +674,7 @@ mod tests {
766 674
767 #[test] 675 #[test]
768 fn should_close_once_drained() { 676 fn should_close_once_drained() {
769 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 677 let mut c = Channel::<Noop, u32, 3>::new();
770 let (s, r) = split(&mut c); 678 let (s, r) = split(&mut c);
771 assert!(s.try_send(1).is_ok()); 679 assert!(s.try_send(1).is_ok());
772 drop(s); 680 drop(s);
@@ -779,7 +687,7 @@ mod tests {
779 687
780 #[test] 688 #[test]
781 fn should_reject_send_when_receiver_dropped() { 689 fn should_reject_send_when_receiver_dropped() {
782 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 690 let mut c = Channel::<Noop, u32, 3>::new();
783 let (s, r) = split(&mut c); 691 let (s, r) = split(&mut c);
784 drop(r); 692 drop(r);
785 match s.try_send(1) { 693 match s.try_send(1) {
@@ -790,7 +698,7 @@ mod tests {
790 698
791 #[test] 699 #[test]
792 fn should_reject_send_when_channel_closed() { 700 fn should_reject_send_when_channel_closed() {
793 let mut c = Channel::<WithNoThreads, u32, 3>::new(); 701 let mut c = Channel::<Noop, u32, 3>::new();
794 let (s, mut r) = split(&mut c); 702 let (s, mut r) = split(&mut c);
795 assert!(s.try_send(1).is_ok()); 703 assert!(s.try_send(1).is_ok());
796 r.close(); 704 r.close();
@@ -806,7 +714,7 @@ mod tests {
806 async fn receiver_closes_when_sender_dropped_async() { 714 async fn receiver_closes_when_sender_dropped_async() {
807 let executor = ThreadPool::new().unwrap(); 715 let executor = ThreadPool::new().unwrap();
808 716
809 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); 717 static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
810 let c = CHANNEL.put(Channel::new()); 718 let c = CHANNEL.put(Channel::new());
811 let (s, mut r) = split(c); 719 let (s, mut r) = split(c);
812 assert!(executor 720 assert!(executor
@@ -821,7 +729,7 @@ mod tests {
821 async fn receiver_receives_given_try_send_async() { 729 async fn receiver_receives_given_try_send_async() {
822 let executor = ThreadPool::new().unwrap(); 730 let executor = ThreadPool::new().unwrap();
823 731
824 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); 732 static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
825 let c = CHANNEL.put(Channel::new()); 733 let c = CHANNEL.put(Channel::new());
826 let (s, mut r) = split(c); 734 let (s, mut r) = split(c);
827 assert!(executor 735 assert!(executor
@@ -834,7 +742,7 @@ mod tests {
834 742
835 #[futures_test::test] 743 #[futures_test::test]
836 async fn sender_send_completes_if_capacity() { 744 async fn sender_send_completes_if_capacity() {
837 let mut c = Channel::<WithCriticalSections, u32, 1>::new(); 745 let mut c = Channel::<CriticalSection, u32, 1>::new();
838 let (s, mut r) = split(&mut c); 746 let (s, mut r) = split(&mut c);
839 assert!(s.send(1).await.is_ok()); 747 assert!(s.send(1).await.is_ok());
840 assert_eq!(r.recv().await, Some(1)); 748 assert_eq!(r.recv().await, Some(1));
@@ -842,7 +750,7 @@ mod tests {
842 750
843 #[futures_test::test] 751 #[futures_test::test]
844 async fn sender_send_completes_if_closed() { 752 async fn sender_send_completes_if_closed() {
845 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 753 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
846 let c = CHANNEL.put(Channel::new()); 754 let c = CHANNEL.put(Channel::new());
847 let (s, r) = split(c); 755 let (s, r) = split(c);
848 drop(r); 756 drop(r);
@@ -856,7 +764,7 @@ mod tests {
856 async fn senders_sends_wait_until_capacity() { 764 async fn senders_sends_wait_until_capacity() {
857 let executor = ThreadPool::new().unwrap(); 765 let executor = ThreadPool::new().unwrap();
858 766
859 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 767 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
860 let c = CHANNEL.put(Channel::new()); 768 let c = CHANNEL.put(Channel::new());
861 let (s0, mut r) = split(c); 769 let (s0, mut r) = split(c);
862 assert!(s0.try_send(1).is_ok()); 770 assert!(s0.try_send(1).is_ok());
@@ -876,7 +784,7 @@ mod tests {
876 784
877 #[futures_test::test] 785 #[futures_test::test]
878 async fn sender_close_completes_if_closing() { 786 async fn sender_close_completes_if_closing() {
879 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 787 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
880 let c = CHANNEL.put(Channel::new()); 788 let c = CHANNEL.put(Channel::new());
881 let (s, mut r) = split(c); 789 let (s, mut r) = split(c);
882 r.close(); 790 r.close();
@@ -885,7 +793,7 @@ mod tests {
885 793
886 #[futures_test::test] 794 #[futures_test::test]
887 async fn sender_close_completes_if_closed() { 795 async fn sender_close_completes_if_closed() {
888 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); 796 static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
889 let c = CHANNEL.put(Channel::new()); 797 let c = CHANNEL.put(Channel::new());
890 let (s, r) = split(c); 798 let (s, r) = split(c);
891 drop(r); 799 drop(r);
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
index 79fa3dfb9..c85b7c282 100644
--- a/examples/nrf/src/bin/mpsc.rs
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -6,7 +6,8 @@
6mod example_common; 6mod example_common;
7 7
8use defmt::unwrap; 8use defmt::unwrap;
9use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads}; 9use embassy::blocking_mutex::kind::Noop;
10use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError};
10use embassy::executor::Spawner; 11use embassy::executor::Spawner;
11use embassy::time::{Duration, Timer}; 12use embassy::time::{Duration, Timer};
12use embassy::util::Forever; 13use embassy::util::Forever;
@@ -19,10 +20,10 @@ enum LedState {
19 Off, 20 Off,
20} 21}
21 22
22static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); 23static CHANNEL: Forever<Channel<Noop, LedState, 1>> = Forever::new();
23 24
24#[embassy::task(pool_size = 1)] 25#[embassy::task(pool_size = 1)]
25async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { 26async fn my_task(sender: Sender<'static, Noop, LedState, 1>) {
26 loop { 27 loop {
27 let _ = sender.send(LedState::On).await; 28 let _ = sender.send(LedState::On).await;
28 Timer::after(Duration::from_secs(1)).await; 29 Timer::after(Duration::from_secs(1)).await;