aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/Cargo.toml8
-rw-r--r--embassy/src/util/mod.rs1
-rw-r--r--embassy/src/util/mpsc.rs854
-rw-r--r--embassy/src/util/mutex.rs66
-rw-r--r--examples/nrf/src/bin/mpsc.rs65
5 files changed, 994 insertions, 0 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml
index b2ad80495..c03fc0df5 100644
--- a/embassy/Cargo.toml
+++ b/embassy/Cargo.toml
@@ -3,6 +3,7 @@ name = "embassy"
3version = "0.1.0" 3version = "0.1.0"
4authors = ["Dario Nieuwenhuis <[email protected]>"] 4authors = ["Dario Nieuwenhuis <[email protected]>"]
5edition = "2018" 5edition = "2018"
6resolver = "2"
6 7
7[features] 8[features]
8default = [] 9default = []
@@ -36,3 +37,10 @@ embedded-hal = "0.2.5"
36 37
37# Workaround https://github.com/japaric/cast.rs/pull/27 38# Workaround https://github.com/japaric/cast.rs/pull/27
38cast = { version = "=0.2.3", default-features = false } 39cast = { version = "=0.2.3", default-features = false }
40
41[dev-dependencies]
42embassy = { path = ".", features = ["executor-agnostic"] }
43futures-executor = { version = "0.3", features = [ "thread-pool" ] }
44futures-test = "0.3"
45futures-timer = "0.3"
46futures-util = { version = "0.3", features = [ "channel" ] }
diff --git a/embassy/src/util/mod.rs b/embassy/src/util/mod.rs
index 88ae5c285..87d313e28 100644
--- a/embassy/src/util/mod.rs
+++ b/embassy/src/util/mod.rs
@@ -11,6 +11,7 @@ mod waker;
11 11
12pub use drop_bomb::*; 12pub use drop_bomb::*;
13pub use forever::*; 13pub use forever::*;
14pub mod mpsc;
14pub use mutex::*; 15pub use mutex::*;
15pub use on_drop::*; 16pub use on_drop::*;
16pub use portal::*; 17pub use portal::*;
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
new file mode 100644
index 000000000..cc9e2a5dd
--- /dev/null
+++ b/embassy/src/util/mpsc.rs
@@ -0,0 +1,854 @@
1//! A multi-producer, single-consumer queue for sending values between
2//! asynchronous tasks. This queue takes a Mutex type so that various
3//! targets can be attained. For example, a ThreadModeMutex can be used
4//! for single-core Cortex-M targets where messages are only passed
5//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
6//! can also be used for single-core targets where messages are to be
7//! passed from exception mode e.g. out of an interrupt handler.
8//!
9//! This module provides a bounded channel that has a limit on the number of
10//! messages that it can store, and if this limit is reached, trying to send
11//! another message will result in an error being returned.
12//!
13//! Similar to the `mpsc` channels provided by `std`, the channel constructor
14//! functions provide separate send and receive handles, [`Sender`] and
15//! [`Receiver`]. If there is no message to read, the current task will be
16//! notified when a new value is sent. [`Sender`] allows sending values into
17//! the channel. If the bounded channel is at capacity, the send is rejected.
18//!
19//! # Disconnection
20//!
21//! When all [`Sender`] handles have been dropped, it is no longer
22//! possible to send values into the channel. This is considered the termination
23//! event of the stream.
24//!
25//! If the [`Receiver`] handle is dropped, then messages can no longer
26//! be read out of the channel. In this case, all further attempts to send will
27//! result in an error.
28//!
29//! # Clean Shutdown
30//!
31//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to
32//! remain in the channel. Instead, it is usually desirable to perform a "clean"
33//! shutdown. To do this, the receiver first calls `close`, which will prevent
34//! any further messages to be sent into the channel. Then, the receiver
35//! consumes the channel to completion, at which point the receiver can be
36//! dropped.
37//!
38//! This channel and its associated types were derived from <https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html>
39
40use core::cell::UnsafeCell;
41use core::fmt;
42use core::marker::PhantomData;
43use core::mem::MaybeUninit;
44use core::pin::Pin;
45use core::ptr;
46use core::task::Context;
47use core::task::Poll;
48use core::task::Waker;
49
50use futures::Future;
51
52use super::CriticalSectionMutex;
53use super::Mutex;
54use super::NoopMutex;
55use super::ThreadModeMutex;
56use super::WakerRegistration;
57
58/// Send values to the associated `Receiver`.
59///
60/// Instances are created by the [`split`](split) function.
61pub struct Sender<'ch, M, T, const N: usize>
62where
63 M: Mutex<Data = ()>,
64{
65 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
66}
67
68// Safe to pass the sender around
69unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
70{}
71unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
72{}
73
74/// Receive values from the associated `Sender`.
75///
76/// Instances are created by the [`split`](split) function.
77pub struct Receiver<'ch, M, T, const N: usize>
78where
79 M: Mutex<Data = ()>,
80{
81 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
82 _receiver_consumed: &'ch mut PhantomData<()>,
83}
84
85// Safe to pass the receiver around
86unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where
87 M: Mutex<Data = ()> + Sync
88{
89}
90unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where
91 M: Mutex<Data = ()> + Sync
92{
93}
94
95/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
96///
97/// All data sent on `Sender` will become available on `Receiver` in the same
98/// order as it was sent.
99///
100/// The `Sender` can be cloned to `send` to the same channel from multiple code
101/// locations. Only one `Receiver` is valid.
102///
103/// If the `Receiver` is disconnected while trying to `send`, the `send` method
104/// will return a `SendError`. Similarly, if `Sender` is disconnected while
105/// trying to `recv`, the `recv` method will return a `RecvError`.
106///
107/// Note that when splitting the channel, the sender and receiver cannot outlive
108/// their channel. The following will therefore fail compilation:
109///
110/// ```compile_fail
111/// use embassy::util::mpsc;
112/// use embassy::util::mpsc::{Channel, WithThreadModeOnly};
113///
114/// let (sender, receiver) = {
115/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
116/// mpsc::split(&mut channel)
117/// };
118/// ```
119pub fn split<M, T, const N: usize>(
120 channel: &mut Channel<M, T, N>,
121) -> (Sender<M, T, N>, Receiver<M, T, N>)
122where
123 M: Mutex<Data = ()>,
124{
125 let sender = Sender {
126 channel_cell: &channel.channel_cell,
127 };
128 let receiver = Receiver {
129 channel_cell: &channel.channel_cell,
130 _receiver_consumed: &mut channel.receiver_consumed,
131 };
132 Channel::lock(&channel.channel_cell, |c| {
133 c.register_receiver();
134 c.register_sender();
135 });
136 (sender, receiver)
137}
138
139impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
140where
141 M: Mutex<Data = ()>,
142{
143 /// Receives the next value for this receiver.
144 ///
145 /// This method returns `None` if the channel has been closed and there are
146 /// no remaining messages in the channel's buffer. This indicates that no
147 /// further values can ever be received from this `Receiver`. The channel is
148 /// closed when all senders have been dropped, or when [`close`] is called.
149 ///
150 /// If there are no messages in the channel's buffer, but the channel has
151 /// not yet been closed, this method will sleep until a message is sent or
152 /// the channel is closed.
153 ///
154 /// Note that if [`close`] is called, but there are still outstanding
155 /// messages from before it was closed, the channel is not considered
156 /// closed by `recv` until they are all consumed.
157 ///
158 /// [`close`]: Self::close
159 pub async fn recv(&mut self) -> Option<T> {
160 futures::future::poll_fn(|cx| self.recv_poll(cx)).await
161 }
162
163 fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
164 Channel::lock(self.channel_cell, |c| {
165 match c.try_recv_with_context(Some(cx)) {
166 Ok(v) => Poll::Ready(Some(v)),
167 Err(TryRecvError::Closed) => Poll::Ready(None),
168 Err(TryRecvError::Empty) => Poll::Pending,
169 }
170 })
171 }
172
173 /// Attempts to immediately receive a message on this `Receiver`
174 ///
175 /// This method will either receive a message from the channel immediately or return an error
176 /// if the channel is empty.
177 pub fn try_recv(&self) -> Result<T, TryRecvError> {
178 Channel::lock(self.channel_cell, |c| c.try_recv())
179 }
180
181 /// Closes the receiving half of a channel without dropping it.
182 ///
183 /// This prevents any further messages from being sent on the channel while
184 /// still enabling the receiver to drain messages that are buffered.
185 ///
186 /// To guarantee that no messages are dropped, after calling `close()`,
187 /// `recv()` must be called until `None` is returned. If there are
188 /// outstanding messages, the `recv` method will not return `None`
189 /// until those are released.
190 ///
191 pub fn close(&mut self) {
192 Channel::lock(self.channel_cell, |c| c.close())
193 }
194}
195
196impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
197where
198 M: Mutex<Data = ()>,
199{
200 fn drop(&mut self) {
201 Channel::lock(self.channel_cell, |c| c.deregister_receiver())
202 }
203}
204
205impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
206where
207 M: Mutex<Data = ()>,
208{
209 /// Sends a value, waiting until there is capacity.
210 ///
211 /// A successful send occurs when it is determined that the other end of the
212 /// channel has not hung up already. An unsuccessful send would be one where
213 /// the corresponding receiver has already been closed. Note that a return
214 /// value of `Err` means that the data will never be received, but a return
215 /// value of `Ok` does not mean that the data will be received. It is
216 /// possible for the corresponding receiver to hang up immediately after
217 /// this function returns `Ok`.
218 ///
219 /// # Errors
220 ///
221 /// If the receive half of the channel is closed, either due to [`close`]
222 /// being called or the [`Receiver`] handle dropping, the function returns
223 /// an error. The error includes the value passed to `send`.
224 ///
225 /// [`close`]: Receiver::close
226 /// [`Receiver`]: Receiver
227 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
228 SendFuture {
229 sender: self.clone(),
230 message: Some(message),
231 }
232 .await
233 }
234
235 /// Attempts to immediately send a message on this `Sender`
236 ///
237 /// This method differs from [`send`] by returning immediately if the channel's
238 /// buffer is full or no receiver is waiting to acquire some data. Compared
239 /// with [`send`], this function has two failure cases instead of one (one for
240 /// disconnection, one for a full buffer).
241 ///
242 /// # Errors
243 ///
244 /// If the channel capacity has been reached, i.e., the channel has `n`
245 /// buffered values where `n` is the argument passed to [`channel`], then an
246 /// error is returned.
247 ///
248 /// If the receive half of the channel is closed, either due to [`close`]
249 /// being called or the [`Receiver`] handle dropping, the function returns
250 /// an error. The error includes the value passed to `send`.
251 ///
252 /// [`send`]: Sender::send
253 /// [`channel`]: channel
254 /// [`close`]: Receiver::close
255 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
256 Channel::lock(self.channel_cell, |c| c.try_send(message))
257 }
258
259 /// Completes when the receiver has dropped.
260 ///
261 /// This allows the producers to get notified when interest in the produced
262 /// values is canceled and immediately stop doing work.
263 pub async fn closed(&self) {
264 CloseFuture {
265 sender: self.clone(),
266 }
267 .await
268 }
269
270 /// Checks if the channel has been closed. This happens when the
271 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
272 /// called.
273 ///
274    /// [`Receiver`]: crate::util::mpsc::Receiver
275    /// [`Receiver::close`]: crate::util::mpsc::Receiver::close
276 pub fn is_closed(&self) -> bool {
277 Channel::lock(self.channel_cell, |c| c.is_closed())
278 }
279}
280
281struct SendFuture<'ch, M, T, const N: usize>
282where
283 M: Mutex<Data = ()>,
284{
285 sender: Sender<'ch, M, T, N>,
286 message: Option<T>,
287}
288
289impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
290where
291 M: Mutex<Data = ()>,
292{
293 type Output = Result<(), SendError<T>>;
294
295 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296 match self.message.take() {
297 Some(m) => match Channel::lock(self.sender.channel_cell, |c| {
298 c.try_send_with_context(m, Some(cx))
299 }) {
300 Ok(..) => Poll::Ready(Ok(())),
301 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
302 Err(TrySendError::Full(m)) => {
303 self.message.insert(m);
304 Poll::Pending
305 }
306 },
307 None => panic!("Message cannot be None"),
308 }
309 }
310}
311
312impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {}
313
314struct CloseFuture<'ch, M, T, const N: usize>
315where
316 M: Mutex<Data = ()>,
317{
318 sender: Sender<'ch, M, T, N>,
319}
320
321impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
322where
323 M: Mutex<Data = ()>,
324{
325 type Output = ();
326
327 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328 if Channel::lock(self.sender.channel_cell, |c| {
329 c.is_closed_with_context(Some(cx))
330 }) {
331 Poll::Ready(())
332 } else {
333 Poll::Pending
334 }
335 }
336}
337
338impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
339where
340 M: Mutex<Data = ()>,
341{
342 fn drop(&mut self) {
343 Channel::lock(self.channel_cell, |c| c.deregister_sender())
344 }
345}
346
347impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
348where
349 M: Mutex<Data = ()>,
350{
351 #[allow(clippy::clone_double_ref)]
352 fn clone(&self) -> Self {
353 Channel::lock(self.channel_cell, |c| c.register_sender());
354 Sender {
355 channel_cell: self.channel_cell.clone(),
356 }
357 }
358}
359
360/// An error returned from the [`try_recv`] method.
361///
362/// [`try_recv`]: super::Receiver::try_recv
363#[derive(PartialEq, Eq, Clone, Copy, Debug)]
364pub enum TryRecvError {
365 /// A message could not be received because the channel is empty.
366 Empty,
367
368 /// The message could not be received because the channel is empty and closed.
369 Closed,
370}
371
372/// Error returned by the `Sender`.
373#[derive(Debug)]
374pub struct SendError<T>(pub T);
375
376impl<T> fmt::Display for SendError<T> {
377 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
378 write!(fmt, "channel closed")
379 }
380}
381
382/// This enumeration is the list of the possible error outcomes for the
383/// [try_send](super::Sender::try_send) method.
384#[derive(Debug)]
385pub enum TrySendError<T> {
386 /// The data could not be sent on the channel because the channel is
387 /// currently full and sending would require blocking.
388 Full(T),
389
390 /// The receive half of the channel was explicitly closed or has been
391 /// dropped.
392 Closed(T),
393}
394
395impl<T> fmt::Display for TrySendError<T> {
396 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
397 write!(
398 fmt,
399 "{}",
400 match self {
401 TrySendError::Full(..) => "no available capacity",
402 TrySendError::Closed(..) => "channel closed",
403 }
404 )
405 }
406}
407
408struct ChannelState<T, const N: usize> {
409 buf: [MaybeUninit<UnsafeCell<T>>; N],
410 read_pos: usize,
411 write_pos: usize,
412 full: bool,
413 closed: bool,
414 receiver_registered: bool,
415 senders_registered: u32,
416 receiver_waker: WakerRegistration,
417 senders_waker: WakerRegistration,
418}
419
420impl<T, const N: usize> ChannelState<T, N> {
421 const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit();
422
423 const fn new() -> Self {
424 ChannelState {
425 buf: [Self::INIT; N],
426 read_pos: 0,
427 write_pos: 0,
428 full: false,
429 closed: false,
430 receiver_registered: false,
431 senders_registered: 0,
432 receiver_waker: WakerRegistration::new(),
433 senders_waker: WakerRegistration::new(),
434 }
435 }
436
437 fn try_recv(&mut self) -> Result<T, TryRecvError> {
438 self.try_recv_with_context(None)
439 }
440
441 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
442 if self.read_pos != self.write_pos || self.full {
443 if self.full {
444 self.full = false;
445 self.senders_waker.wake();
446 }
447 let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() };
448 self.read_pos = (self.read_pos + 1) % self.buf.len();
449 Ok(message)
450 } else if !self.closed {
451 cx.into_iter()
452 .for_each(|cx| self.set_receiver_waker(&cx.waker()));
453 Err(TryRecvError::Empty)
454 } else {
455 Err(TryRecvError::Closed)
456 }
457 }
458
459 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
460 self.try_send_with_context(message, None)
461 }
462
463 fn try_send_with_context(
464 &mut self,
465 message: T,
466 cx: Option<&mut Context<'_>>,
467 ) -> Result<(), TrySendError<T>> {
468 if !self.closed {
469 if !self.full {
470 self.buf[self.write_pos] = MaybeUninit::new(message.into());
471 self.write_pos = (self.write_pos + 1) % self.buf.len();
472 if self.write_pos == self.read_pos {
473 self.full = true;
474 }
475 self.receiver_waker.wake();
476 Ok(())
477 } else {
478 cx.into_iter()
479 .for_each(|cx| self.set_senders_waker(&cx.waker()));
480 Err(TrySendError::Full(message))
481 }
482 } else {
483 Err(TrySendError::Closed(message))
484 }
485 }
486
487 fn close(&mut self) {
488 self.receiver_waker.wake();
489 self.closed = true;
490 }
491
492 fn is_closed(&mut self) -> bool {
493 self.is_closed_with_context(None)
494 }
495
496 fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool {
497 if self.closed {
498 cx.into_iter()
499 .for_each(|cx| self.set_senders_waker(&cx.waker()));
500 true
501 } else {
502 false
503 }
504 }
505
506 fn register_receiver(&mut self) {
507 assert!(!self.receiver_registered);
508 self.receiver_registered = true;
509 }
510
511 fn deregister_receiver(&mut self) {
512 if self.receiver_registered {
513 self.closed = true;
514 self.senders_waker.wake();
515 }
516 self.receiver_registered = false;
517 }
518
519 fn register_sender(&mut self) {
520 self.senders_registered += 1;
521 }
522
523 fn deregister_sender(&mut self) {
524 assert!(self.senders_registered > 0);
525 self.senders_registered -= 1;
526 if self.senders_registered == 0 {
527 self.receiver_waker.wake();
528 self.closed = true;
529 }
530 }
531
532 fn set_receiver_waker(&mut self, receiver_waker: &Waker) {
533 self.receiver_waker.register(receiver_waker);
534 }
535
536 fn set_senders_waker(&mut self, senders_waker: &Waker) {
537        // Dispose of any existing senders, causing them to be polled again.
538 // This could cause a spin given multiple concurrent senders, however given that
539 // most sends only block waiting for the receiver to become active, this should
540 // be a short-lived activity. The upside is a greatly simplified implementation
541 // that avoids the need for intrusive linked-lists and unsafe operations on pinned
542 // pointers.
543 self.senders_waker.wake();
544 self.senders_waker.register(senders_waker);
545 }
546}
547
548impl<T, const N: usize> Drop for ChannelState<T, N> {
549 fn drop(&mut self) {
550 while self.read_pos != self.write_pos || self.full {
551 self.full = false;
552 unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) };
553 self.read_pos = (self.read_pos + 1) % N;
554 }
555 }
556}
557
558/// A bounded mpsc channel for communicating between asynchronous tasks
559/// with backpressure.
560///
561/// The channel will buffer up to the provided number of messages. Once the
562/// buffer is full, attempts to `send` new messages will wait until a message is
563/// received from the channel.
564///
565/// All data sent will become available in the same order as it was sent.
566pub struct Channel<M, T, const N: usize>
567where
568 M: Mutex<Data = ()>,
569{
570 channel_cell: UnsafeCell<ChannelCell<M, T, N>>,
571 receiver_consumed: PhantomData<()>,
572}
573
574struct ChannelCell<M, T, const N: usize>
575where
576 M: Mutex<Data = ()>,
577{
578 mutex: M,
579 state: ChannelState<T, N>,
580}
581
582pub type WithCriticalSections = CriticalSectionMutex<()>;
583
584pub type WithThreadModeOnly = ThreadModeMutex<()>;
585
586pub type WithNoThreads = NoopMutex<()>;
587
588impl<M, T, const N: usize> Channel<M, T, N>
589where
590 M: Mutex<Data = ()>,
591{
592 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
593 ///
594 /// ```
595 /// use embassy::util::mpsc;
596 /// use embassy::util::mpsc::{Channel, WithNoThreads};
597 ///
598 /// // Declare a bounded channel of 3 u32s.
599 /// let mut channel = Channel::<WithNoThreads, u32, 3>::new();
600 /// // once we have a channel, obtain its sender and receiver
601 /// let (sender, receiver) = mpsc::split(&mut channel);
602 /// ```
603 pub fn new() -> Self {
604 let mutex = M::new(());
605 let state = ChannelState::new();
606 let channel_cell = ChannelCell { mutex, state };
607 Channel {
608 channel_cell: UnsafeCell::new(channel_cell),
609 receiver_consumed: PhantomData,
610 }
611 }
612
613 fn lock<R>(
614 channel_cell: &UnsafeCell<ChannelCell<M, T, N>>,
615 f: impl FnOnce(&mut ChannelState<T, N>) -> R,
616 ) -> R {
617 unsafe {
618 let channel_cell = &mut *(channel_cell.get());
619 let mutex = &mut channel_cell.mutex;
620 let mut state = &mut channel_cell.state;
621 mutex.lock(|_| f(&mut state))
622 }
623 }
624}
625
626#[cfg(test)]
627mod tests {
628 use core::time::Duration;
629
630 use futures::task::SpawnExt;
631 use futures_executor::ThreadPool;
632 use futures_timer::Delay;
633
634 use crate::util::Forever;
635
636 use super::*;
637
638 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
639 if !c.full {
640 if c.write_pos > c.read_pos {
641 (c.buf.len() - c.write_pos) + c.read_pos
642 } else {
643 (c.buf.len() - c.read_pos) + c.write_pos
644 }
645 } else {
646 0
647 }
648 }
649
650 #[test]
651 fn sending_once() {
652 let mut c = ChannelState::<u32, 3>::new();
653 assert!(c.try_send(1).is_ok());
654 assert_eq!(capacity(&c), 2);
655 }
656
657 #[test]
658 fn sending_when_full() {
659 let mut c = ChannelState::<u32, 3>::new();
660 let _ = c.try_send(1);
661 let _ = c.try_send(1);
662 let _ = c.try_send(1);
663 match c.try_send(2) {
664 Err(TrySendError::Full(2)) => assert!(true),
665 _ => assert!(false),
666 }
667 assert_eq!(capacity(&c), 0);
668 }
669
670 #[test]
671 fn sending_when_closed() {
672 let mut c = ChannelState::<u32, 3>::new();
673 c.closed = true;
674 match c.try_send(2) {
675 Err(TrySendError::Closed(2)) => assert!(true),
676 _ => assert!(false),
677 }
678 }
679
680 #[test]
681 fn receiving_once_with_one_send() {
682 let mut c = ChannelState::<u32, 3>::new();
683 assert!(c.try_send(1).is_ok());
684 assert_eq!(c.try_recv().unwrap(), 1);
685 assert_eq!(capacity(&c), 3);
686 }
687
688 #[test]
689 fn receiving_when_empty() {
690 let mut c = ChannelState::<u32, 3>::new();
691 match c.try_recv() {
692 Err(TryRecvError::Empty) => assert!(true),
693 _ => assert!(false),
694 }
695 assert_eq!(capacity(&c), 3);
696 }
697
698 #[test]
699 fn receiving_when_closed() {
700 let mut c = ChannelState::<u32, 3>::new();
701 c.closed = true;
702 match c.try_recv() {
703 Err(TryRecvError::Closed) => assert!(true),
704 _ => assert!(false),
705 }
706 }
707
708 #[test]
709 fn simple_send_and_receive() {
710 let mut c = Channel::<WithNoThreads, u32, 3>::new();
711 let (s, r) = split(&mut c);
712 assert!(s.clone().try_send(1).is_ok());
713 assert_eq!(r.try_recv().unwrap(), 1);
714 }
715
716 #[test]
717 fn should_close_without_sender() {
718 let mut c = Channel::<WithNoThreads, u32, 3>::new();
719 let (s, r) = split(&mut c);
720 drop(s);
721 match r.try_recv() {
722 Err(TryRecvError::Closed) => assert!(true),
723 _ => assert!(false),
724 }
725 }
726
727 #[test]
728 fn should_close_once_drained() {
729 let mut c = Channel::<WithNoThreads, u32, 3>::new();
730 let (s, r) = split(&mut c);
731 assert!(s.try_send(1).is_ok());
732 drop(s);
733 assert_eq!(r.try_recv().unwrap(), 1);
734 match r.try_recv() {
735 Err(TryRecvError::Closed) => assert!(true),
736 _ => assert!(false),
737 }
738 }
739
740 #[test]
741 fn should_reject_send_when_receiver_dropped() {
742 let mut c = Channel::<WithNoThreads, u32, 3>::new();
743 let (s, r) = split(&mut c);
744 drop(r);
745 match s.try_send(1) {
746 Err(TrySendError::Closed(1)) => assert!(true),
747 _ => assert!(false),
748 }
749 }
750
751 #[test]
752 fn should_reject_send_when_channel_closed() {
753 let mut c = Channel::<WithNoThreads, u32, 3>::new();
754 let (s, mut r) = split(&mut c);
755 assert!(s.try_send(1).is_ok());
756 r.close();
757 assert_eq!(r.try_recv().unwrap(), 1);
758 match r.try_recv() {
759 Err(TryRecvError::Closed) => assert!(true),
760 _ => assert!(false),
761 }
762 assert!(s.is_closed());
763 }
764
765 #[futures_test::test]
766 async fn receiver_closes_when_sender_dropped_async() {
767 let executor = ThreadPool::new().unwrap();
768
769 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
770 let c = CHANNEL.put(Channel::new());
771 let (s, mut r) = split(c);
772 assert!(executor
773 .spawn(async move {
774 drop(s);
775 })
776 .is_ok());
777 assert_eq!(r.recv().await, None);
778 }
779
780 #[futures_test::test]
781 async fn receiver_receives_given_try_send_async() {
782 let executor = ThreadPool::new().unwrap();
783
784 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
785 let c = CHANNEL.put(Channel::new());
786 let (s, mut r) = split(c);
787 assert!(executor
788 .spawn(async move {
789 assert!(s.try_send(1).is_ok());
790 })
791 .is_ok());
792 assert_eq!(r.recv().await, Some(1));
793 }
794
795 #[futures_test::test]
796 async fn sender_send_completes_if_capacity() {
797 let mut c = Channel::<WithCriticalSections, u32, 1>::new();
798 let (s, mut r) = split(&mut c);
799 assert!(s.send(1).await.is_ok());
800 assert_eq!(r.recv().await, Some(1));
801 }
802
803 #[futures_test::test]
804 async fn sender_send_completes_if_closed() {
805 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
806 let c = CHANNEL.put(Channel::new());
807 let (s, r) = split(c);
808 drop(r);
809 match s.send(1).await {
810 Err(SendError(1)) => assert!(true),
811 _ => assert!(false),
812 }
813 }
814
815 #[futures_test::test]
816 async fn senders_sends_wait_until_capacity() {
817 let executor = ThreadPool::new().unwrap();
818
819 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
820 let c = CHANNEL.put(Channel::new());
821 let (s0, mut r) = split(c);
822 assert!(s0.try_send(1).is_ok());
823 let s1 = s0.clone();
824 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
825 let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await });
826 // Wish I could think of a means of determining that the async send is waiting instead.
827 // However, I've used the debugger to observe that the send does indeed wait.
828 assert!(Delay::new(Duration::from_millis(500)).await.is_ok());
829 assert_eq!(r.recv().await, Some(1));
830 assert!(executor
831 .spawn(async move { while let Some(_) = r.recv().await {} })
832 .is_ok());
833 assert!(send_task_1.unwrap().await.is_ok());
834 assert!(send_task_2.unwrap().await.is_ok());
835 }
836
837 #[futures_test::test]
838 async fn sender_close_completes_if_closing() {
839 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
840 let c = CHANNEL.put(Channel::new());
841 let (s, mut r) = split(c);
842 r.close();
843 s.closed().await;
844 }
845
846 #[futures_test::test]
847 async fn sender_close_completes_if_closed() {
848 static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
849 let c = CHANNEL.put(Channel::new());
850 let (s, r) = split(c);
851 drop(r);
852 s.closed().await;
853 }
854}
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs
index e4b7764ce..0506ffe6f 100644
--- a/embassy/src/util/mutex.rs
+++ b/embassy/src/util/mutex.rs
@@ -1,6 +1,19 @@
1use core::cell::UnsafeCell; 1use core::cell::UnsafeCell;
2use critical_section::CriticalSection; 2use critical_section::CriticalSection;
3 3
4/// Any object implementing this trait guarantees exclusive access to the data contained
5/// within the mutex for the duration of the lock.
6/// Adapted from https://github.com/rust-embedded/mutex-trait.
7pub trait Mutex {
8 /// Data protected by the mutex.
9 type Data;
10
11 fn new(data: Self::Data) -> Self;
12
13 /// Creates a critical section and grants temporary access to the protected data.
14 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R;
15}
16
4/// A "mutex" based on critical sections 17/// A "mutex" based on critical sections
5/// 18///
6/// # Safety 19/// # Safety
@@ -33,6 +46,18 @@ impl<T> CriticalSectionMutex<T> {
33 } 46 }
34} 47}
35 48
49impl<T> Mutex for CriticalSectionMutex<T> {
50 type Data = T;
51
52 fn new(data: T) -> Self {
53 Self::new(data)
54 }
55
56 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
57 critical_section::with(|cs| f(self.borrow(cs)))
58 }
59}
60
36/// A "mutex" that only allows borrowing from thread mode. 61/// A "mutex" that only allows borrowing from thread mode.
37/// 62///
38/// # Safety 63/// # Safety
@@ -70,6 +95,18 @@ impl<T> ThreadModeMutex<T> {
70 } 95 }
71} 96}
72 97
98impl<T> Mutex for ThreadModeMutex<T> {
99 type Data = T;
100
101 fn new(data: T) -> Self {
102 Self::new(data)
103 }
104
105 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
106 f(self.borrow())
107 }
108}
109
73pub fn in_thread_mode() -> bool { 110pub fn in_thread_mode() -> bool {
74 #[cfg(feature = "std")] 111 #[cfg(feature = "std")]
75 return Some("main") == std::thread::current().name(); 112 return Some("main") == std::thread::current().name();
@@ -78,3 +115,32 @@ pub fn in_thread_mode() -> bool {
78 return cortex_m::peripheral::SCB::vect_active() 115 return cortex_m::peripheral::SCB::vect_active()
79 == cortex_m::peripheral::scb::VectActive::ThreadMode; 116 == cortex_m::peripheral::scb::VectActive::ThreadMode;
80} 117}
118
119/// A "mutex" that does nothing and cannot be shared between threads.
120pub struct NoopMutex<T> {
121 inner: T,
122}
123
124impl<T> NoopMutex<T> {
125 pub const fn new(value: T) -> Self {
126 NoopMutex { inner: value }
127 }
128}
129
130impl<T> NoopMutex<T> {
131 pub fn borrow(&self) -> &T {
132 &self.inner
133 }
134}
135
136impl<T> Mutex for NoopMutex<T> {
137 type Data = T;
138
139 fn new(data: T) -> Self {
140 Self::new(data)
141 }
142
143 fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
144 f(self.borrow())
145 }
146}
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
new file mode 100644
index 000000000..443955239
--- /dev/null
+++ b/examples/nrf/src/bin/mpsc.rs
@@ -0,0 +1,65 @@
1#![no_std]
2#![no_main]
3#![feature(min_type_alias_impl_trait)]
4#![feature(impl_trait_in_bindings)]
5#![feature(type_alias_impl_trait)]
6#![allow(incomplete_features)]
7
8#[path = "../example_common.rs"]
9mod example_common;
10
11use defmt::panic;
12use embassy::executor::Spawner;
13use embassy::time::{Duration, Timer};
14use embassy::util::mpsc::TryRecvError;
15use embassy::util::{mpsc, Forever};
16use embassy_nrf::gpio::{Level, Output, OutputDrive};
17use embassy_nrf::Peripherals;
18use embedded_hal::digital::v2::OutputPin;
19use mpsc::{Channel, Sender, WithNoThreads};
20
21enum LedState {
22 On,
23 Off,
24}
25
26static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new();
27
28#[embassy::task(pool_size = 1)]
29async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) {
30 loop {
31 let _ = sender.send(LedState::On).await;
32 Timer::after(Duration::from_secs(1)).await;
33 let _ = sender.send(LedState::Off).await;
34 Timer::after(Duration::from_secs(1)).await;
35 }
36}
37
38#[embassy::main]
39async fn main(spawner: Spawner, p: Peripherals) {
40
41 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
42
43 let channel = CHANNEL.put(Channel::new());
44 let (sender, mut receiver) = mpsc::split(channel);
45
46 spawner.spawn(my_task(sender)).unwrap();
47
48 // We could just loop on `receiver.recv()` for simplicity. The code below
49 // is optimized to drain the queue as fast as possible in the spirit of
50 // handling events as fast as possible. This optimization is benign when in
51 // thread mode, but can be useful when interrupts are sending messages
52 // with the channel having been created via with_critical_sections.
53 loop {
54 let maybe_message = match receiver.try_recv() {
55 m @ Ok(..) => m.ok(),
56 Err(TryRecvError::Empty) => receiver.recv().await,
57 Err(TryRecvError::Closed) => break,
58 };
59 match maybe_message {
60 Some(LedState::On) => led.set_high().unwrap(),
61 Some(LedState::Off) => led.set_low().unwrap(),
62 _ => (),
63 }
64 }
65}