aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-04-05 23:53:59 +0000
committerGitHub <[email protected]>2022-04-05 23:53:59 +0000
commitc1b382296434e762d16a36d658d2f308358e3f87 (patch)
tree6edd9fd7c2d69a5ad130dc13ae7c0bbed442e640
parentaee19185b7cf34466f7941784b55e639c925fae4 (diff)
parent27a1b0ea7316be4687e7173a73861d276974d502 (diff)
Merge #695
695: Simplify Channel. r=Dirbaio a=Dirbaio - Allow initializing in a static, without Forever. - Remove ability to close, since in embedded enviromnents channels usually live forever and don't get closed. - Remove MPSC restriction, it's MPMC now. Rename "mpsc" to "channel". - `Sender` and `Receiver` are still available if you want to enforce a piece of code only has send/receive access, but are optional: you can send/receive directly into the Channel if you want. Co-authored-by: Dario Nieuwenhuis <[email protected]>
-rw-r--r--embassy/src/channel/channel.rs430
-rw-r--r--embassy/src/channel/mod.rs2
-rw-r--r--embassy/src/channel/mpsc.rs822
-rw-r--r--embassy/src/channel/signal.rs2
-rw-r--r--examples/nrf/src/bin/channel.rs45
-rw-r--r--examples/nrf/src/bin/channel_sender_receiver.rs52
-rw-r--r--examples/nrf/src/bin/mpsc.rs60
-rw-r--r--examples/nrf/src/bin/uart_split.rs23
-rw-r--r--examples/stm32f3/src/bin/button_events.rs59
-rw-r--r--examples/stm32h7/src/bin/usart_split.rs26
10 files changed, 571 insertions, 950 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs
new file mode 100644
index 000000000..9084cd57b
--- /dev/null
+++ b/embassy/src/channel/channel.rs
@@ -0,0 +1,430 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! This queue takes a Mutex type so that various
7//! targets can be attained. For example, a ThreadModeMutex can be used
8//! for single-core Cortex-M targets where messages are only passed
9//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
10//! can also be used for single-core targets where messages are to be
11//! passed from exception mode e.g. out of an interrupt handler.
12//!
13//! This module provides a bounded channel that has a limit on the number of
14//! messages that it can store, and if this limit is reached, trying to send
15//! another message will result in an error being returned.
16//!
17
18use core::cell::RefCell;
19use core::pin::Pin;
20use core::task::Context;
21use core::task::Poll;
22
23use futures::Future;
24use heapless::Deque;
25
26use crate::blocking_mutex::raw::RawMutex;
27use crate::blocking_mutex::Mutex;
28use crate::waitqueue::WakerRegistration;
29
30/// Send-only access to a [`Channel`].
31#[derive(Copy, Clone)]
32pub struct Sender<'ch, M, T, const N: usize>
33where
34 M: RawMutex,
35{
36 channel: &'ch Channel<M, T, N>,
37}
38
39impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
40where
41 M: RawMutex,
42{
43 /// Sends a value.
44 ///
45 /// See [`Channel::send()`]
46 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
47 self.channel.send(message)
48 }
49
50 /// Attempt to immediately send a message.
51 ///
52 /// See [`Channel::send()`]
53 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
54 self.channel.try_send(message)
55 }
56}
57
58/// Receive-only access to a [`Channel`].
59#[derive(Copy, Clone)]
60pub struct Receiver<'ch, M, T, const N: usize>
61where
62 M: RawMutex,
63{
64 channel: &'ch Channel<M, T, N>,
65}
66
67impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
68where
69 M: RawMutex,
70{
71 /// Receive the next value.
72 ///
73 /// See [`Channel::recv()`].
74 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
75 self.channel.recv()
76 }
77
78 /// Attempt to immediately receive the next value.
79 ///
80 /// See [`Channel::try_recv()`]
81 pub fn try_recv(&self) -> Result<T, TryRecvError> {
82 self.channel.try_recv()
83 }
84}
85
86pub struct RecvFuture<'ch, M, T, const N: usize>
87where
88 M: RawMutex,
89{
90 channel: &'ch Channel<M, T, N>,
91}
92
93impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
94where
95 M: RawMutex,
96{
97 type Output = T;
98
99 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
100 self.channel
101 .lock(|c| match c.try_recv_with_context(Some(cx)) {
102 Ok(v) => Poll::Ready(v),
103 Err(TryRecvError::Empty) => Poll::Pending,
104 })
105 }
106}
107
108pub struct SendFuture<'ch, M, T, const N: usize>
109where
110 M: RawMutex,
111{
112 channel: &'ch Channel<M, T, N>,
113 message: Option<T>,
114}
115
116impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
117where
118 M: RawMutex,
119{
120 type Output = ();
121
122 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123 match self.message.take() {
124 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
125 Ok(..) => Poll::Ready(()),
126 Err(TrySendError::Full(m)) => {
127 self.message = Some(m);
128 Poll::Pending
129 }
130 },
131 None => panic!("Message cannot be None"),
132 }
133 }
134}
135
136impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
137
138/// Error returned by [`try_recv`](Channel::try_recv).
139#[derive(PartialEq, Eq, Clone, Copy, Debug)]
140#[cfg_attr(feature = "defmt", derive(defmt::Format))]
141pub enum TryRecvError {
142 /// A message could not be received because the channel is empty.
143 Empty,
144}
145
146/// Error returned by [`try_send`](Channel::try_send).
147#[derive(PartialEq, Eq, Clone, Copy, Debug)]
148#[cfg_attr(feature = "defmt", derive(defmt::Format))]
149pub enum TrySendError<T> {
150 /// The data could not be sent on the channel because the channel is
151 /// currently full and sending would require blocking.
152 Full(T),
153}
154
155struct ChannelState<T, const N: usize> {
156 queue: Deque<T, N>,
157 receiver_waker: WakerRegistration,
158 senders_waker: WakerRegistration,
159}
160
161impl<T, const N: usize> ChannelState<T, N> {
162 const fn new() -> Self {
163 ChannelState {
164 queue: Deque::new(),
165 receiver_waker: WakerRegistration::new(),
166 senders_waker: WakerRegistration::new(),
167 }
168 }
169
170 fn try_recv(&mut self) -> Result<T, TryRecvError> {
171 self.try_recv_with_context(None)
172 }
173
174 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
175 if self.queue.is_full() {
176 self.senders_waker.wake();
177 }
178
179 if let Some(message) = self.queue.pop_front() {
180 Ok(message)
181 } else {
182 if let Some(cx) = cx {
183 self.receiver_waker.register(cx.waker());
184 }
185 Err(TryRecvError::Empty)
186 }
187 }
188
189 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
190 self.try_send_with_context(message, None)
191 }
192
193 fn try_send_with_context(
194 &mut self,
195 message: T,
196 cx: Option<&mut Context<'_>>,
197 ) -> Result<(), TrySendError<T>> {
198 match self.queue.push_back(message) {
199 Ok(()) => {
200 self.receiver_waker.wake();
201 Ok(())
202 }
203 Err(message) => {
204 if let Some(cx) = cx {
205 self.senders_waker.register(cx.waker());
206 }
207 Err(TrySendError::Full(message))
208 }
209 }
210 }
211}
212
213/// A bounded channel for communicating between asynchronous tasks
214/// with backpressure.
215///
216/// The channel will buffer up to the provided number of messages. Once the
217/// buffer is full, attempts to `send` new messages will wait until a message is
218/// received from the channel.
219///
220/// All data sent will become available in the same order as it was sent.
221pub struct Channel<M, T, const N: usize>
222where
223 M: RawMutex,
224{
225 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
226}
227
228impl<M, T, const N: usize> Channel<M, T, N>
229where
230 M: RawMutex,
231{
232 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
233 ///
234 /// ```
235 /// use embassy::channel::channel::Channel;
236 /// use embassy::blocking_mutex::raw::NoopRawMutex;
237 ///
238 /// // Declare a bounded channel of 3 u32s.
239 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
240 /// ```
241 #[cfg(feature = "nightly")]
242 pub const fn new() -> Self {
243 Self {
244 inner: Mutex::new(RefCell::new(ChannelState::new())),
245 }
246 }
247
248 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
249 ///
250 /// ```
251 /// use embassy::channel::channel::Channel;
252 /// use embassy::blocking_mutex::raw::NoopRawMutex;
253 ///
254 /// // Declare a bounded channel of 3 u32s.
255 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
256 /// ```
257 #[cfg(not(feature = "nightly"))]
258 pub fn new() -> Self {
259 Self {
260 inner: Mutex::new(RefCell::new(ChannelState::new())),
261 }
262 }
263
264 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
265 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
266 }
267
268 /// Get a sender for this channel.
269 pub fn sender(&self) -> Sender<'_, M, T, N> {
270 Sender { channel: self }
271 }
272
273 /// Get a receiver for this channel.
274 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
275 Receiver { channel: self }
276 }
277
278 /// Send a value, waiting until there is capacity.
279 ///
280 /// Sending completes when the value has been pushed to the channel's queue.
281 /// This doesn't mean the value has been received yet.
282 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
283 SendFuture {
284 channel: self,
285 message: Some(message),
286 }
287 }
288
289 /// Attempt to immediately send a message.
290 ///
291 /// This method differs from [`send`] by returning immediately if the channel's
292 /// buffer is full, instead of waiting.
293 ///
294 /// # Errors
295 ///
296 /// If the channel capacity has been reached, i.e., the channel has `n`
297 /// buffered values where `n` is the argument passed to [`Channel`], then an
298 /// error is returned.
299 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
300 self.lock(|c| c.try_send(message))
301 }
302
303 /// Receive the next value.
304 ///
305 /// If there are no messages in the channel's buffer, this method will
306 /// wait until a message is sent.
307 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
308 RecvFuture { channel: self }
309 }
310
311 /// Attempt to immediately receive a message.
312 ///
313 /// This method will either receive a message from the channel immediately or return an error
314 /// if the channel is empty.
315 pub fn try_recv(&self) -> Result<T, TryRecvError> {
316 self.lock(|c| c.try_recv())
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use core::time::Duration;
323
324 use futures::task::SpawnExt;
325 use futures_executor::ThreadPool;
326 use futures_timer::Delay;
327
328 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
329 use crate::util::Forever;
330
331 use super::*;
332
333 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
334 c.queue.capacity() - c.queue.len()
335 }
336
337 #[test]
338 fn sending_once() {
339 let mut c = ChannelState::<u32, 3>::new();
340 assert!(c.try_send(1).is_ok());
341 assert_eq!(capacity(&c), 2);
342 }
343
344 #[test]
345 fn sending_when_full() {
346 let mut c = ChannelState::<u32, 3>::new();
347 let _ = c.try_send(1);
348 let _ = c.try_send(1);
349 let _ = c.try_send(1);
350 match c.try_send(2) {
351 Err(TrySendError::Full(2)) => assert!(true),
352 _ => assert!(false),
353 }
354 assert_eq!(capacity(&c), 0);
355 }
356
357 #[test]
358 fn receiving_once_with_one_send() {
359 let mut c = ChannelState::<u32, 3>::new();
360 assert!(c.try_send(1).is_ok());
361 assert_eq!(c.try_recv().unwrap(), 1);
362 assert_eq!(capacity(&c), 3);
363 }
364
365 #[test]
366 fn receiving_when_empty() {
367 let mut c = ChannelState::<u32, 3>::new();
368 match c.try_recv() {
369 Err(TryRecvError::Empty) => assert!(true),
370 _ => assert!(false),
371 }
372 assert_eq!(capacity(&c), 3);
373 }
374
375 #[test]
376 fn simple_send_and_receive() {
377 let c = Channel::<NoopRawMutex, u32, 3>::new();
378 assert!(c.try_send(1).is_ok());
379 assert_eq!(c.try_recv().unwrap(), 1);
380 }
381
382 #[futures_test::test]
383 async fn receiver_receives_given_try_send_async() {
384 let executor = ThreadPool::new().unwrap();
385
386 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
387 let c = &*CHANNEL.put(Channel::new());
388 let c2 = c;
389 assert!(executor
390 .spawn(async move {
391 assert!(c2.try_send(1).is_ok());
392 })
393 .is_ok());
394 assert_eq!(c.recv().await, 1);
395 }
396
397 #[futures_test::test]
398 async fn sender_send_completes_if_capacity() {
399 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
400 c.send(1).await;
401 assert_eq!(c.recv().await, 1);
402 }
403
404 #[futures_test::test]
405 async fn senders_sends_wait_until_capacity() {
406 let executor = ThreadPool::new().unwrap();
407
408 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
409 let c = &*CHANNEL.put(Channel::new());
410 assert!(c.try_send(1).is_ok());
411
412 let c2 = c;
413 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
414 let c2 = c;
415 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
416 // Wish I could think of a means of determining that the async send is waiting instead.
417 // However, I've used the debugger to observe that the send does indeed wait.
418 Delay::new(Duration::from_millis(500)).await;
419 assert_eq!(c.recv().await, 1);
420 assert!(executor
421 .spawn(async move {
422 loop {
423 c.recv().await;
424 }
425 })
426 .is_ok());
427 send_task_1.unwrap().await;
428 send_task_2.unwrap().await;
429 }
430}
diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs
index 9e8c67ee9..e51a442df 100644
--- a/embassy/src/channel/mod.rs
+++ b/embassy/src/channel/mod.rs
@@ -1,4 +1,4 @@
1//! Async channels 1//! Async channels
2 2
3pub mod mpsc; 3pub mod channel;
4pub mod signal; 4pub mod signal;
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs
deleted file mode 100644
index 32787d810..000000000
--- a/embassy/src/channel/mpsc.rs
+++ /dev/null
@@ -1,822 +0,0 @@
1//! A multi-producer, single-consumer queue for sending values between
2//! asynchronous tasks. This queue takes a Mutex type so that various
3//! targets can be attained. For example, a ThreadModeMutex can be used
4//! for single-core Cortex-M targets where messages are only passed
5//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
6//! can also be used for single-core targets where messages are to be
7//! passed from exception mode e.g. out of an interrupt handler.
8//!
9//! This module provides a bounded channel that has a limit on the number of
10//! messages that it can store, and if this limit is reached, trying to send
11//! another message will result in an error being returned.
12//!
13//! Similar to the `mpsc` channels provided by `std`, the channel constructor
14//! functions provide separate send and receive handles, [`Sender`] and
15//! [`Receiver`]. If there is no message to read, the current task will be
16//! notified when a new value is sent. [`Sender`] allows sending values into
17//! the channel. If the bounded channel is at capacity, the send is rejected.
18//!
19//! # Disconnection
20//!
21//! When all [`Sender`] handles have been dropped, it is no longer
22//! possible to send values into the channel. This is considered the termination
23//! event of the stream.
24//!
25//! If the [`Receiver`] handle is dropped, then messages can no longer
26//! be read out of the channel. In this case, all further attempts to send will
27//! result in an error.
28//!
29//! # Clean Shutdown
30//!
31//! When the [`Receiver`] is dropped, it is possible for unprocessed messages to
32//! remain in the channel. Instead, it is usually desirable to perform a "clean"
33//! shutdown. To do this, the receiver first calls `close`, which will prevent
34//! any further messages to be sent into the channel. Then, the receiver
35//! consumes the channel to completion, at which point the receiver can be
36//! dropped.
37//!
38//! This channel and its associated types were derived from <https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html>
39
40use core::cell::RefCell;
41use core::fmt;
42use core::pin::Pin;
43use core::task::Context;
44use core::task::Poll;
45use core::task::Waker;
46
47use futures::Future;
48use heapless::Deque;
49
50use crate::blocking_mutex::raw::RawMutex;
51use crate::blocking_mutex::Mutex;
52use crate::waitqueue::WakerRegistration;
53
54/// Send values to the associated `Receiver`.
55///
56/// Instances are created by the [`split`](split) function.
57pub struct Sender<'ch, M, T, const N: usize>
58where
59 M: RawMutex,
60{
61 channel: &'ch Channel<M, T, N>,
62}
63
64/// Receive values from the associated `Sender`.
65///
66/// Instances are created by the [`split`](split) function.
67pub struct Receiver<'ch, M, T, const N: usize>
68where
69 M: RawMutex,
70{
71 channel: &'ch Channel<M, T, N>,
72}
73
74/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
75///
76/// All data sent on `Sender` will become available on `Receiver` in the same
77/// order as it was sent.
78///
79/// The `Sender` can be cloned to `send` to the same channel from multiple code
80/// locations. Only one `Receiver` is valid.
81///
82/// If the `Receiver` is disconnected while trying to `send`, the `send` method
83/// will return a `SendError`. Similarly, if `Sender` is disconnected while
84/// trying to `recv`, the `recv` method will return a `RecvError`.
85///
86/// Note that when splitting the channel, the sender and receiver cannot outlive
87/// their channel. The following will therefore fail compilation:
88////
89/// ```compile_fail
90/// use embassy::channel::mpsc;
91/// use embassy::channel::mpsc::{Channel, WithThreadModeOnly};
92///
93/// let (sender, receiver) = {
94/// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only();
95/// mpsc::split(&mut channel)
96/// };
97/// ```
98pub fn split<M, T, const N: usize>(
99 channel: &mut Channel<M, T, N>,
100) -> (Sender<M, T, N>, Receiver<M, T, N>)
101where
102 M: RawMutex,
103{
104 let sender = Sender { channel };
105 let receiver = Receiver { channel };
106 channel.lock(|c| {
107 c.register_receiver();
108 c.register_sender();
109 });
110 (sender, receiver)
111}
112
113impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
114where
115 M: RawMutex,
116{
117 /// Receives the next value for this receiver.
118 ///
119 /// This method returns `None` if the channel has been closed and there are
120 /// no remaining messages in the channel's buffer. This indicates that no
121 /// further values can ever be received from this `Receiver`. The channel is
122 /// closed when all senders have been dropped, or when [`close`] is called.
123 ///
124 /// If there are no messages in the channel's buffer, but the channel has
125 /// not yet been closed, this method will sleep until a message is sent or
126 /// the channel is closed.
127 ///
128 /// Note that if [`close`] is called, but there are still outstanding
129 /// messages from before it was closed, the channel is not considered
130 /// closed by `recv` until they are all consumed.
131 ///
132 /// [`close`]: Self::close
133 pub fn recv(&mut self) -> RecvFuture<'_, M, T, N> {
134 RecvFuture {
135 channel: self.channel,
136 }
137 }
138
139 /// Attempts to immediately receive a message on this `Receiver`
140 ///
141 /// This method will either receive a message from the channel immediately or return an error
142 /// if the channel is empty.
143 pub fn try_recv(&self) -> Result<T, TryRecvError> {
144 self.channel.lock(|c| c.try_recv())
145 }
146
147 /// Closes the receiving half of a channel without dropping it.
148 ///
149 /// This prevents any further messages from being sent on the channel while
150 /// still enabling the receiver to drain messages that are buffered.
151 ///
152 /// To guarantee that no messages are dropped, after calling `close()`,
153 /// `recv()` must be called until `None` is returned. If there are
154 /// outstanding messages, the `recv` method will not return `None`
155 /// until those are released.
156 ///
157 pub fn close(&mut self) {
158 self.channel.lock(|c| c.close())
159 }
160}
161
162impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
163where
164 M: RawMutex,
165{
166 fn drop(&mut self) {
167 self.channel.lock(|c| c.deregister_receiver())
168 }
169}
170
171pub struct RecvFuture<'ch, M, T, const N: usize>
172where
173 M: RawMutex,
174{
175 channel: &'ch Channel<M, T, N>,
176}
177
178impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
179where
180 M: RawMutex,
181{
182 type Output = Option<T>;
183
184 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
185 self.channel
186 .lock(|c| match c.try_recv_with_context(Some(cx)) {
187 Ok(v) => Poll::Ready(Some(v)),
188 Err(TryRecvError::Closed) => Poll::Ready(None),
189 Err(TryRecvError::Empty) => Poll::Pending,
190 })
191 }
192}
193
194impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
195where
196 M: RawMutex,
197{
198 /// Sends a value, waiting until there is capacity.
199 ///
200 /// A successful send occurs when it is determined that the other end of the
201 /// channel has not hung up already. An unsuccessful send would be one where
202 /// the corresponding receiver has already been closed. Note that a return
203 /// value of `Err` means that the data will never be received, but a return
204 /// value of `Ok` does not mean that the data will be received. It is
205 /// possible for the corresponding receiver to hang up immediately after
206 /// this function returns `Ok`.
207 ///
208 /// # Errors
209 ///
210 /// If the receive half of the channel is closed, either due to [`close`]
211 /// being called or the [`Receiver`] handle dropping, the function returns
212 /// an error. The error includes the value passed to `send`.
213 ///
214 /// [`close`]: Receiver::close
215 /// [`Receiver`]: Receiver
216 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
217 SendFuture {
218 channel: self.channel,
219 message: Some(message),
220 }
221 }
222
223 /// Attempts to immediately send a message on this `Sender`
224 ///
225 /// This method differs from [`send`] by returning immediately if the channel's
226 /// buffer is full or no receiver is waiting to acquire some data. Compared
227 /// with [`send`], this function has two failure cases instead of one (one for
228 /// disconnection, one for a full buffer).
229 ///
230 /// # Errors
231 ///
232 /// If the channel capacity has been reached, i.e., the channel has `n`
233 /// buffered values where `n` is the argument passed to [`channel`], then an
234 /// error is returned.
235 ///
236 /// If the receive half of the channel is closed, either due to [`close`]
237 /// being called or the [`Receiver`] handle dropping, the function returns
238 /// an error. The error includes the value passed to `send`.
239 ///
240 /// [`send`]: Sender::send
241 /// [`channel`]: channel
242 /// [`close`]: Receiver::close
243 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
244 self.channel.lock(|c| c.try_send(message))
245 }
246
247 /// Completes when the receiver has dropped.
248 ///
249 /// This allows the producers to get notified when interest in the produced
250 /// values is canceled and immediately stop doing work.
251 pub async fn closed(&self) {
252 CloseFuture {
253 channel: self.channel,
254 }
255 .await
256 }
257
258 /// Checks if the channel has been closed. This happens when the
259 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
260 /// called.
261 ///
262 /// [`Receiver`]: Receiver
263 /// [`Receiver::close`]: Receiver::close
264 pub fn is_closed(&self) -> bool {
265 self.channel.lock(|c| c.is_closed())
266 }
267}
268
269pub struct SendFuture<'ch, M, T, const N: usize>
270where
271 M: RawMutex,
272{
273 channel: &'ch Channel<M, T, N>,
274 message: Option<T>,
275}
276
277impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
278where
279 M: RawMutex,
280{
281 type Output = Result<(), SendError<T>>;
282
283 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
284 match self.message.take() {
285 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
286 Ok(..) => Poll::Ready(Ok(())),
287 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
288 Err(TrySendError::Full(m)) => {
289 self.message = Some(m);
290 Poll::Pending
291 }
292 },
293 None => panic!("Message cannot be None"),
294 }
295 }
296}
297
298impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
299
300struct CloseFuture<'ch, M, T, const N: usize>
301where
302 M: RawMutex,
303{
304 channel: &'ch Channel<M, T, N>,
305}
306
307impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
308where
309 M: RawMutex,
310{
311 type Output = ();
312
313 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
314 if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) {
315 Poll::Ready(())
316 } else {
317 Poll::Pending
318 }
319 }
320}
321
322impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
323where
324 M: RawMutex,
325{
326 fn drop(&mut self) {
327 self.channel.lock(|c| c.deregister_sender())
328 }
329}
330
331impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
332where
333 M: RawMutex,
334{
335 fn clone(&self) -> Self {
336 self.channel.lock(|c| c.register_sender());
337 Sender {
338 channel: self.channel,
339 }
340 }
341}
342
343/// An error returned from the [`try_recv`] method.
344///
345/// [`try_recv`]: Receiver::try_recv
346#[derive(PartialEq, Eq, Clone, Copy, Debug)]
347#[cfg_attr(feature = "defmt", derive(defmt::Format))]
348pub enum TryRecvError {
349 /// A message could not be received because the channel is empty.
350 Empty,
351
352 /// The message could not be received because the channel is empty and closed.
353 Closed,
354}
355
356/// Error returned by the `Sender`.
357#[derive(Debug)]
358pub struct SendError<T>(pub T);
359
360impl<T> fmt::Display for SendError<T> {
361 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
362 write!(fmt, "channel closed")
363 }
364}
365
366#[cfg(feature = "defmt")]
367impl<T> defmt::Format for SendError<T> {
368 fn format(&self, fmt: defmt::Formatter<'_>) {
369 defmt::write!(fmt, "channel closed")
370 }
371}
372
373/// This enumeration is the list of the possible error outcomes for the
374/// [try_send](Sender::try_send) method.
375#[derive(Debug)]
376pub enum TrySendError<T> {
377 /// The data could not be sent on the channel because the channel is
378 /// currently full and sending would require blocking.
379 Full(T),
380
381 /// The receive half of the channel was explicitly closed or has been
382 /// dropped.
383 Closed(T),
384}
385
386impl<T> fmt::Display for TrySendError<T> {
387 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
388 write!(
389 fmt,
390 "{}",
391 match self {
392 TrySendError::Full(..) => "no available capacity",
393 TrySendError::Closed(..) => "channel closed",
394 }
395 )
396 }
397}
398
399#[cfg(feature = "defmt")]
400impl<T> defmt::Format for TrySendError<T> {
401 fn format(&self, fmt: defmt::Formatter<'_>) {
402 match self {
403 TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"),
404 TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"),
405 }
406 }
407}
408
409struct ChannelState<T, const N: usize> {
410 queue: Deque<T, N>,
411 closed: bool,
412 receiver_registered: bool,
413 senders_registered: u32,
414 receiver_waker: WakerRegistration,
415 senders_waker: WakerRegistration,
416}
417
418impl<T, const N: usize> ChannelState<T, N> {
419 const fn new() -> Self {
420 ChannelState {
421 queue: Deque::new(),
422 closed: false,
423 receiver_registered: false,
424 senders_registered: 0,
425 receiver_waker: WakerRegistration::new(),
426 senders_waker: WakerRegistration::new(),
427 }
428 }
429
430 fn try_recv(&mut self) -> Result<T, TryRecvError> {
431 self.try_recv_with_context(None)
432 }
433
434 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
435 if self.queue.is_full() {
436 self.senders_waker.wake();
437 }
438
439 if let Some(message) = self.queue.pop_front() {
440 Ok(message)
441 } else if !self.closed {
442 if let Some(cx) = cx {
443 self.set_receiver_waker(cx.waker());
444 }
445 Err(TryRecvError::Empty)
446 } else {
447 Err(TryRecvError::Closed)
448 }
449 }
450
451 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
452 self.try_send_with_context(message, None)
453 }
454
455 fn try_send_with_context(
456 &mut self,
457 message: T,
458 cx: Option<&mut Context<'_>>,
459 ) -> Result<(), TrySendError<T>> {
460 if self.closed {
461 return Err(TrySendError::Closed(message));
462 }
463
464 match self.queue.push_back(message) {
465 Ok(()) => {
466 self.receiver_waker.wake();
467
468 Ok(())
469 }
470 Err(message) => {
471 cx.into_iter()
472 .for_each(|cx| self.set_senders_waker(cx.waker()));
473 Err(TrySendError::Full(message))
474 }
475 }
476 }
477
478 fn close(&mut self) {
479 self.receiver_waker.wake();
480 self.closed = true;
481 }
482
483 fn is_closed(&mut self) -> bool {
484 self.is_closed_with_context(None)
485 }
486
487 fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool {
488 if self.closed {
489 cx.into_iter()
490 .for_each(|cx| self.set_senders_waker(cx.waker()));
491 true
492 } else {
493 false
494 }
495 }
496
497 fn register_receiver(&mut self) {
498 assert!(!self.receiver_registered);
499 self.receiver_registered = true;
500 }
501
502 fn deregister_receiver(&mut self) {
503 if self.receiver_registered {
504 self.closed = true;
505 self.senders_waker.wake();
506 }
507 self.receiver_registered = false;
508 }
509
510 fn register_sender(&mut self) {
511 self.senders_registered += 1;
512 }
513
514 fn deregister_sender(&mut self) {
515 assert!(self.senders_registered > 0);
516 self.senders_registered -= 1;
517 if self.senders_registered == 0 {
518 self.receiver_waker.wake();
519 self.closed = true;
520 }
521 }
522
523 fn set_receiver_waker(&mut self, receiver_waker: &Waker) {
524 self.receiver_waker.register(receiver_waker);
525 }
526
527 fn set_senders_waker(&mut self, senders_waker: &Waker) {
528 // Dispose of any existing sender causing them to be polled again.
529 // This could cause a spin given multiple concurrent senders, however given that
530 // most sends only block waiting for the receiver to become active, this should
531 // be a short-lived activity. The upside is a greatly simplified implementation
532 // that avoids the need for intrusive linked-lists and unsafe operations on pinned
533 // pointers.
534 self.senders_waker.wake();
535 self.senders_waker.register(senders_waker);
536 }
537}
538
539/// A a bounded mpsc channel for communicating between asynchronous tasks
540/// with backpressure.
541///
542/// The channel will buffer up to the provided number of messages. Once the
543/// buffer is full, attempts to `send` new messages will wait until a message is
544/// received from the channel.
545///
546/// All data sent will become available in the same order as it was sent.
547pub struct Channel<M, T, const N: usize>
548where
549 M: RawMutex,
550{
551 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
552}
553
554impl<M, T, const N: usize> Channel<M, T, N>
555where
556 M: RawMutex,
557{
558 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
559 ///
560 /// ```
561 /// use embassy::channel::mpsc;
562 /// use embassy::blocking_mutex::raw::NoopRawMutex;
563 /// use embassy::channel::mpsc::Channel;
564 ///
565 /// // Declare a bounded channel of 3 u32s.
566 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
567 /// // once we have a channel, obtain its sender and receiver
568 /// let (sender, receiver) = mpsc::split(&mut channel);
569 /// ```
570 #[cfg(feature = "nightly")]
571 pub const fn new() -> Self {
572 Self {
573 inner: Mutex::new(RefCell::new(ChannelState::new())),
574 }
575 }
576
577 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
578 ///
579 /// ```
580 /// use embassy::channel::mpsc;
581 /// use embassy::blocking_mutex::raw::NoopRawMutex;
582 /// use embassy::channel::mpsc::Channel;
583 ///
584 /// // Declare a bounded channel of 3 u32s.
585 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
586 /// // once we have a channel, obtain its sender and receiver
587 /// let (sender, receiver) = mpsc::split(&mut channel);
588 /// ```
589 #[cfg(not(feature = "nightly"))]
590 pub fn new() -> Self {
591 Self {
592 inner: Mutex::new(RefCell::new(ChannelState::new())),
593 }
594 }
595
596 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
597 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use core::time::Duration;
604
605 use futures::task::SpawnExt;
606 use futures_executor::ThreadPool;
607 use futures_timer::Delay;
608
609 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
610 use crate::util::Forever;
611
612 use super::*;
613
614 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
615 c.queue.capacity() - c.queue.len()
616 }
617
618 #[test]
619 fn sending_once() {
620 let mut c = ChannelState::<u32, 3>::new();
621 assert!(c.try_send(1).is_ok());
622 assert_eq!(capacity(&c), 2);
623 }
624
625 #[test]
626 fn sending_when_full() {
627 let mut c = ChannelState::<u32, 3>::new();
628 let _ = c.try_send(1);
629 let _ = c.try_send(1);
630 let _ = c.try_send(1);
631 match c.try_send(2) {
632 Err(TrySendError::Full(2)) => assert!(true),
633 _ => assert!(false),
634 }
635 assert_eq!(capacity(&c), 0);
636 }
637
638 #[test]
639 fn sending_when_closed() {
640 let mut c = ChannelState::<u32, 3>::new();
641 c.closed = true;
642 match c.try_send(2) {
643 Err(TrySendError::Closed(2)) => assert!(true),
644 _ => assert!(false),
645 }
646 }
647
648 #[test]
649 fn receiving_once_with_one_send() {
650 let mut c = ChannelState::<u32, 3>::new();
651 assert!(c.try_send(1).is_ok());
652 assert_eq!(c.try_recv().unwrap(), 1);
653 assert_eq!(capacity(&c), 3);
654 }
655
656 #[test]
657 fn receiving_when_empty() {
658 let mut c = ChannelState::<u32, 3>::new();
659 match c.try_recv() {
660 Err(TryRecvError::Empty) => assert!(true),
661 _ => assert!(false),
662 }
663 assert_eq!(capacity(&c), 3);
664 }
665
666 #[test]
667 fn receiving_when_closed() {
668 let mut c = ChannelState::<u32, 3>::new();
669 c.closed = true;
670 match c.try_recv() {
671 Err(TryRecvError::Closed) => assert!(true),
672 _ => assert!(false),
673 }
674 }
675
676 #[test]
677 fn simple_send_and_receive() {
678 let mut c = Channel::<NoopRawMutex, u32, 3>::new();
679 let (s, r) = split(&mut c);
680 assert!(s.clone().try_send(1).is_ok());
681 assert_eq!(r.try_recv().unwrap(), 1);
682 }
683
684 #[test]
685 fn should_close_without_sender() {
686 let mut c = Channel::<NoopRawMutex, u32, 3>::new();
687 let (s, r) = split(&mut c);
688 drop(s);
689 match r.try_recv() {
690 Err(TryRecvError::Closed) => assert!(true),
691 _ => assert!(false),
692 }
693 }
694
695 #[test]
696 fn should_close_once_drained() {
697 let mut c = Channel::<NoopRawMutex, u32, 3>::new();
698 let (s, r) = split(&mut c);
699 assert!(s.try_send(1).is_ok());
700 drop(s);
701 assert_eq!(r.try_recv().unwrap(), 1);
702 match r.try_recv() {
703 Err(TryRecvError::Closed) => assert!(true),
704 _ => assert!(false),
705 }
706 }
707
708 #[test]
709 fn should_reject_send_when_receiver_dropped() {
710 let mut c = Channel::<NoopRawMutex, u32, 3>::new();
711 let (s, r) = split(&mut c);
712 drop(r);
713 match s.try_send(1) {
714 Err(TrySendError::Closed(1)) => assert!(true),
715 _ => assert!(false),
716 }
717 }
718
719 #[test]
720 fn should_reject_send_when_channel_closed() {
721 let mut c = Channel::<NoopRawMutex, u32, 3>::new();
722 let (s, mut r) = split(&mut c);
723 assert!(s.try_send(1).is_ok());
724 r.close();
725 assert_eq!(r.try_recv().unwrap(), 1);
726 match r.try_recv() {
727 Err(TryRecvError::Closed) => assert!(true),
728 _ => assert!(false),
729 }
730 assert!(s.is_closed());
731 }
732
733 #[futures_test::test]
734 async fn receiver_closes_when_sender_dropped_async() {
735 let executor = ThreadPool::new().unwrap();
736
737 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
738 let c = CHANNEL.put(Channel::new());
739 let (s, mut r) = split(c);
740 assert!(executor
741 .spawn(async move {
742 drop(s);
743 })
744 .is_ok());
745 assert_eq!(r.recv().await, None);
746 }
747
748 #[futures_test::test]
749 async fn receiver_receives_given_try_send_async() {
750 let executor = ThreadPool::new().unwrap();
751
752 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
753 let c = CHANNEL.put(Channel::new());
754 let (s, mut r) = split(c);
755 assert!(executor
756 .spawn(async move {
757 assert!(s.try_send(1).is_ok());
758 })
759 .is_ok());
760 assert_eq!(r.recv().await, Some(1));
761 }
762
763 #[futures_test::test]
764 async fn sender_send_completes_if_capacity() {
765 let mut c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
766 let (s, mut r) = split(&mut c);
767 assert!(s.send(1).await.is_ok());
768 assert_eq!(r.recv().await, Some(1));
769 }
770
771 #[futures_test::test]
772 async fn sender_send_completes_if_closed() {
773 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
774 let c = CHANNEL.put(Channel::new());
775 let (s, r) = split(c);
776 drop(r);
777 match s.send(1).await {
778 Err(SendError(1)) => assert!(true),
779 _ => assert!(false),
780 }
781 }
782
783 #[futures_test::test]
784 async fn senders_sends_wait_until_capacity() {
785 let executor = ThreadPool::new().unwrap();
786
787 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
788 let c = CHANNEL.put(Channel::new());
789 let (s0, mut r) = split(c);
790 assert!(s0.try_send(1).is_ok());
791 let s1 = s0.clone();
792 let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await });
793 let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await });
794 // Wish I could think of a means of determining that the async send is waiting instead.
795 // However, I've used the debugger to observe that the send does indeed wait.
796 Delay::new(Duration::from_millis(500)).await;
797 assert_eq!(r.recv().await, Some(1));
798 assert!(executor
799 .spawn(async move { while let Some(_) = r.recv().await {} })
800 .is_ok());
801 assert!(send_task_1.unwrap().await.is_ok());
802 assert!(send_task_2.unwrap().await.is_ok());
803 }
804
805 #[futures_test::test]
806 async fn sender_close_completes_if_closing() {
807 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
808 let c = CHANNEL.put(Channel::new());
809 let (s, mut r) = split(c);
810 r.close();
811 s.closed().await;
812 }
813
814 #[futures_test::test]
815 async fn sender_close_completes_if_closed() {
816 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
817 let c = CHANNEL.put(Channel::new());
818 let (s, r) = split(c);
819 drop(r);
820 s.closed().await;
821 }
822}
diff --git a/embassy/src/channel/signal.rs b/embassy/src/channel/signal.rs
index 027f4f47c..e1f6c4b1d 100644
--- a/embassy/src/channel/signal.rs
+++ b/embassy/src/channel/signal.rs
@@ -5,7 +5,7 @@ use core::task::{Context, Poll, Waker};
5 5
6/// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. 6/// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks.
7/// For a simple use-case where the receiver is only ever interested in the latest value of 7/// For a simple use-case where the receiver is only ever interested in the latest value of
8/// something, Signals work well. For more advanced use cases, please consider [crate::channel::mpsc]. 8/// something, Signals work well. For more advanced use cases, you might want to use [`Channel`](crate::channel::channel::Channel) instead..
9/// 9///
10/// Signals are generally declared as being a static const and then borrowed as required. 10/// Signals are generally declared as being a static const and then borrowed as required.
11/// 11///
diff --git a/examples/nrf/src/bin/channel.rs b/examples/nrf/src/bin/channel.rs
new file mode 100644
index 000000000..476ec09a1
--- /dev/null
+++ b/examples/nrf/src/bin/channel.rs
@@ -0,0 +1,45 @@
1#![no_std]
2#![no_main]
3#![feature(type_alias_impl_trait)]
4
5use defmt::unwrap;
6use embassy::blocking_mutex::raw::ThreadModeRawMutex;
7use embassy::channel::channel::Channel;
8use embassy::executor::Spawner;
9use embassy::time::{Duration, Timer};
10use embassy_nrf::gpio::{Level, Output, OutputDrive};
11use embassy_nrf::Peripherals;
12
13use defmt_rtt as _; // global logger
14use panic_probe as _;
15
16enum LedState {
17 On,
18 Off,
19}
20
21static CHANNEL: Channel<ThreadModeRawMutex, LedState, 1> = Channel::new();
22
23#[embassy::task]
24async fn my_task() {
25 loop {
26 CHANNEL.send(LedState::On).await;
27 Timer::after(Duration::from_secs(1)).await;
28 CHANNEL.send(LedState::Off).await;
29 Timer::after(Duration::from_secs(1)).await;
30 }
31}
32
33#[embassy::main]
34async fn main(spawner: Spawner, p: Peripherals) {
35 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
36
37 unwrap!(spawner.spawn(my_task()));
38
39 loop {
40 match CHANNEL.recv().await {
41 LedState::On => led.set_high(),
42 LedState::Off => led.set_low(),
43 }
44 }
45}
diff --git a/examples/nrf/src/bin/channel_sender_receiver.rs b/examples/nrf/src/bin/channel_sender_receiver.rs
new file mode 100644
index 000000000..c79f2fd6b
--- /dev/null
+++ b/examples/nrf/src/bin/channel_sender_receiver.rs
@@ -0,0 +1,52 @@
1#![no_std]
2#![no_main]
3#![feature(type_alias_impl_trait)]
4
5use defmt::unwrap;
6use embassy::blocking_mutex::raw::NoopRawMutex;
7use embassy::channel::channel::{Channel, Receiver, Sender};
8use embassy::executor::Spawner;
9use embassy::time::{Duration, Timer};
10use embassy::util::Forever;
11use embassy_nrf::gpio::{AnyPin, Level, Output, OutputDrive, Pin};
12use embassy_nrf::Peripherals;
13
14use defmt_rtt as _; // global logger
15use panic_probe as _;
16
17enum LedState {
18 On,
19 Off,
20}
21
22static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new();
23
24#[embassy::task]
25async fn send_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) {
26 loop {
27 sender.send(LedState::On).await;
28 Timer::after(Duration::from_secs(1)).await;
29 sender.send(LedState::Off).await;
30 Timer::after(Duration::from_secs(1)).await;
31 }
32}
33
34#[embassy::task]
35async fn recv_task(led: AnyPin, receiver: Receiver<'static, NoopRawMutex, LedState, 1>) {
36 let mut led = Output::new(led, Level::Low, OutputDrive::Standard);
37
38 loop {
39 match receiver.recv().await {
40 LedState::On => led.set_high(),
41 LedState::Off => led.set_low(),
42 }
43 }
44}
45
46#[embassy::main]
47async fn main(spawner: Spawner, p: Peripherals) {
48 let channel = CHANNEL.put(Channel::new());
49
50 unwrap!(spawner.spawn(send_task(channel.sender())));
51 unwrap!(spawner.spawn(recv_task(p.P0_13.degrade(), channel.receiver())));
52}
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs
deleted file mode 100644
index 0cb182755..000000000
--- a/examples/nrf/src/bin/mpsc.rs
+++ /dev/null
@@ -1,60 +0,0 @@
1#![no_std]
2#![no_main]
3#![feature(type_alias_impl_trait)]
4
5use defmt::unwrap;
6use embassy::blocking_mutex::raw::NoopRawMutex;
7use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError};
8use embassy::executor::Spawner;
9use embassy::time::{Duration, Timer};
10use embassy::util::Forever;
11use embassy_nrf::gpio::{Level, Output, OutputDrive};
12use embassy_nrf::Peripherals;
13
14use defmt_rtt as _; // global logger
15use panic_probe as _;
16
17enum LedState {
18 On,
19 Off,
20}
21
22static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new();
23
24#[embassy::task(pool_size = 1)]
25async fn my_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) {
26 loop {
27 let _ = sender.send(LedState::On).await;
28 Timer::after(Duration::from_secs(1)).await;
29 let _ = sender.send(LedState::Off).await;
30 Timer::after(Duration::from_secs(1)).await;
31 }
32}
33
34#[embassy::main]
35async fn main(spawner: Spawner, p: Peripherals) {
36 let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard);
37
38 let channel = CHANNEL.put(Channel::new());
39 let (sender, mut receiver) = mpsc::split(channel);
40
41 unwrap!(spawner.spawn(my_task(sender)));
42
43 // We could just loop on `receiver.recv()` for simplicity. The code below
44 // is optimized to drain the queue as fast as possible in the spirit of
45 // handling events as fast as possible. This optimization is benign when in
46 // thread mode, but can be useful when interrupts are sending messages
47 // with the channel having been created via with_critical_sections.
48 loop {
49 let maybe_message = match receiver.try_recv() {
50 m @ Ok(..) => m.ok(),
51 Err(TryRecvError::Empty) => receiver.recv().await,
52 Err(TryRecvError::Closed) => break,
53 };
54 match maybe_message {
55 Some(LedState::On) => led.set_high(),
56 Some(LedState::Off) => led.set_low(),
57 _ => (),
58 }
59 }
60}
diff --git a/examples/nrf/src/bin/uart_split.rs b/examples/nrf/src/bin/uart_split.rs
index 909429b1a..3fde2f0d8 100644
--- a/examples/nrf/src/bin/uart_split.rs
+++ b/examples/nrf/src/bin/uart_split.rs
@@ -3,10 +3,9 @@
3#![feature(type_alias_impl_trait)] 3#![feature(type_alias_impl_trait)]
4 4
5use defmt::*; 5use defmt::*;
6use embassy::blocking_mutex::raw::NoopRawMutex; 6use embassy::blocking_mutex::raw::ThreadModeRawMutex;
7use embassy::channel::mpsc::{self, Channel, Sender}; 7use embassy::channel::channel::Channel;
8use embassy::executor::Spawner; 8use embassy::executor::Spawner;
9use embassy::util::Forever;
10use embassy_nrf::peripherals::UARTE0; 9use embassy_nrf::peripherals::UARTE0;
11use embassy_nrf::uarte::UarteRx; 10use embassy_nrf::uarte::UarteRx;
12use embassy_nrf::{interrupt, uarte, Peripherals}; 11use embassy_nrf::{interrupt, uarte, Peripherals};
@@ -14,7 +13,7 @@ use embassy_nrf::{interrupt, uarte, Peripherals};
14use defmt_rtt as _; // global logger 13use defmt_rtt as _; // global logger
15use panic_probe as _; 14use panic_probe as _;
16 15
17static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new(); 16static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new();
18 17
19#[embassy::main] 18#[embassy::main]
20async fn main(spawner: Spawner, p: Peripherals) { 19async fn main(spawner: Spawner, p: Peripherals) {
@@ -26,14 +25,11 @@ async fn main(spawner: Spawner, p: Peripherals) {
26 let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config); 25 let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config);
27 let (mut tx, rx) = uart.split(); 26 let (mut tx, rx) = uart.split();
28 27
29 let c = CHANNEL.put(Channel::new());
30 let (s, mut r) = mpsc::split(c);
31
32 info!("uarte initialized!"); 28 info!("uarte initialized!");
33 29
34 // Spawn a task responsible purely for reading 30 // Spawn a task responsible purely for reading
35 31
36 unwrap!(spawner.spawn(reader(rx, s))); 32 unwrap!(spawner.spawn(reader(rx)));
37 33
38 // Message must be in SRAM 34 // Message must be in SRAM
39 { 35 {
@@ -48,19 +44,18 @@ async fn main(spawner: Spawner, p: Peripherals) {
48 // back out the buffer we receive from the read 44 // back out the buffer we receive from the read
49 // task. 45 // task.
50 loop { 46 loop {
51 if let Some(buf) = r.recv().await { 47 let buf = CHANNEL.recv().await;
52 info!("writing..."); 48 info!("writing...");
53 unwrap!(tx.write(&buf).await); 49 unwrap!(tx.write(&buf).await);
54 }
55 } 50 }
56} 51}
57 52
58#[embassy::task] 53#[embassy::task]
59async fn reader(mut rx: UarteRx<'static, UARTE0>, s: Sender<'static, NoopRawMutex, [u8; 8], 1>) { 54async fn reader(mut rx: UarteRx<'static, UARTE0>) {
60 let mut buf = [0; 8]; 55 let mut buf = [0; 8];
61 loop { 56 loop {
62 info!("reading..."); 57 info!("reading...");
63 unwrap!(rx.read(&mut buf).await); 58 unwrap!(rx.read(&mut buf).await);
64 unwrap!(s.send(buf).await); 59 CHANNEL.send(buf).await;
65 } 60 }
66} 61}
diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs
index 99aab3027..06e8eec1f 100644
--- a/examples/stm32f3/src/bin/button_events.rs
+++ b/examples/stm32f3/src/bin/button_events.rs
@@ -11,11 +11,10 @@
11#![feature(type_alias_impl_trait)] 11#![feature(type_alias_impl_trait)]
12 12
13use defmt::*; 13use defmt::*;
14use embassy::blocking_mutex::raw::NoopRawMutex; 14use embassy::blocking_mutex::raw::ThreadModeRawMutex;
15use embassy::channel::mpsc::{self, Channel, Receiver, Sender}; 15use embassy::channel::channel::Channel;
16use embassy::executor::Spawner; 16use embassy::executor::Spawner;
17use embassy::time::{with_timeout, Duration, Timer}; 17use embassy::time::{with_timeout, Duration, Timer};
18use embassy::util::Forever;
19use embassy_stm32::exti::ExtiInput; 18use embassy_stm32::exti::ExtiInput;
20use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed}; 19use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed};
21use embassy_stm32::peripherals::PA0; 20use embassy_stm32::peripherals::PA0;
@@ -51,14 +50,15 @@ impl<'a> Leds<'a> {
51 } 50 }
52 } 51 }
53 52
54 async fn show(&mut self, queue: &mut Receiver<'static, NoopRawMutex, ButtonEvent, 4>) { 53 async fn show(&mut self) {
55 self.leds[self.current_led].set_high(); 54 self.leds[self.current_led].set_high();
56 if let Ok(new_message) = with_timeout(Duration::from_millis(500), queue.recv()).await { 55 if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.recv()).await {
57 self.leds[self.current_led].set_low(); 56 self.leds[self.current_led].set_low();
58 self.process_event(new_message).await; 57 self.process_event(new_message).await;
59 } else { 58 } else {
60 self.leds[self.current_led].set_low(); 59 self.leds[self.current_led].set_low();
61 if let Ok(new_message) = with_timeout(Duration::from_millis(200), queue.recv()).await { 60 if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.recv()).await
61 {
62 self.process_event(new_message).await; 62 self.process_event(new_message).await;
63 } 63 }
64 } 64 }
@@ -77,15 +77,18 @@ impl<'a> Leds<'a> {
77 } 77 }
78 } 78 }
79 79
80 async fn process_event(&mut self, event: Option<ButtonEvent>) { 80 async fn process_event(&mut self, event: ButtonEvent) {
81 match event { 81 match event {
82 Some(ButtonEvent::SingleClick) => self.move_next(), 82 ButtonEvent::SingleClick => {
83 Some(ButtonEvent::DoubleClick) => { 83 self.move_next();
84 }
85 ButtonEvent::DoubleClick => {
84 self.change_direction(); 86 self.change_direction();
85 self.move_next() 87 self.move_next();
88 }
89 ButtonEvent::Hold => {
90 self.flash().await;
86 } 91 }
87 Some(ButtonEvent::Hold) => self.flash().await,
88 _ => {}
89 } 92 }
90 } 93 }
91} 94}
@@ -97,7 +100,7 @@ enum ButtonEvent {
97 Hold, 100 Hold,
98} 101}
99 102
100static BUTTON_EVENTS_QUEUE: Forever<Channel<NoopRawMutex, ButtonEvent, 4>> = Forever::new(); 103static CHANNEL: Channel<ThreadModeRawMutex, ButtonEvent, 4> = Channel::new();
101 104
102#[embassy::main] 105#[embassy::main]
103async fn main(spawner: Spawner, p: Peripherals) { 106async fn main(spawner: Spawner, p: Peripherals) {
@@ -116,27 +119,19 @@ async fn main(spawner: Spawner, p: Peripherals) {
116 ]; 119 ];
117 let leds = Leds::new(leds); 120 let leds = Leds::new(leds);
118 121
119 let buttons_queue = BUTTON_EVENTS_QUEUE.put(Channel::new()); 122 spawner.spawn(button_waiter(button)).unwrap();
120 let (sender, receiver) = mpsc::split(buttons_queue); 123 spawner.spawn(led_blinker(leds)).unwrap();
121 spawner.spawn(button_waiter(button, sender)).unwrap();
122 spawner.spawn(led_blinker(leds, receiver)).unwrap();
123} 124}
124 125
125#[embassy::task] 126#[embassy::task]
126async fn led_blinker( 127async fn led_blinker(mut leds: Leds<'static>) {
127 mut leds: Leds<'static>,
128 mut queue: Receiver<'static, NoopRawMutex, ButtonEvent, 4>,
129) {
130 loop { 128 loop {
131 leds.show(&mut queue).await; 129 leds.show().await;
132 } 130 }
133} 131}
134 132
135#[embassy::task] 133#[embassy::task]
136async fn button_waiter( 134async fn button_waiter(mut button: ExtiInput<'static, PA0>) {
137 mut button: ExtiInput<'static, PA0>,
138 queue: Sender<'static, NoopRawMutex, ButtonEvent, 4>,
139) {
140 const DOUBLE_CLICK_DELAY: u64 = 250; 135 const DOUBLE_CLICK_DELAY: u64 = 250;
141 const HOLD_DELAY: u64 = 1000; 136 const HOLD_DELAY: u64 = 1000;
142 137
@@ -150,9 +145,7 @@ async fn button_waiter(
150 .is_err() 145 .is_err()
151 { 146 {
152 info!("Hold"); 147 info!("Hold");
153 if queue.send(ButtonEvent::Hold).await.is_err() { 148 CHANNEL.send(ButtonEvent::Hold).await;
154 break;
155 }
156 button.wait_for_falling_edge().await; 149 button.wait_for_falling_edge().await;
157 } else if with_timeout( 150 } else if with_timeout(
158 Duration::from_millis(DOUBLE_CLICK_DELAY), 151 Duration::from_millis(DOUBLE_CLICK_DELAY),
@@ -161,15 +154,11 @@ async fn button_waiter(
161 .await 154 .await
162 .is_err() 155 .is_err()
163 { 156 {
164 if queue.send(ButtonEvent::SingleClick).await.is_err() {
165 break;
166 }
167 info!("Single click"); 157 info!("Single click");
158 CHANNEL.send(ButtonEvent::SingleClick).await;
168 } else { 159 } else {
169 info!("Double click"); 160 info!("Double click");
170 if queue.send(ButtonEvent::DoubleClick).await.is_err() { 161 CHANNEL.send(ButtonEvent::DoubleClick).await;
171 break;
172 }
173 button.wait_for_falling_edge().await; 162 button.wait_for_falling_edge().await;
174 } 163 }
175 button.wait_for_rising_edge().await; 164 button.wait_for_rising_edge().await;
diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs
index ee1763aa4..40a7c3e44 100644
--- a/examples/stm32h7/src/bin/usart_split.rs
+++ b/examples/stm32h7/src/bin/usart_split.rs
@@ -4,10 +4,9 @@
4 4
5use defmt::*; 5use defmt::*;
6use defmt_rtt as _; // global logger 6use defmt_rtt as _; // global logger
7use embassy::blocking_mutex::raw::NoopRawMutex; 7use embassy::blocking_mutex::raw::ThreadModeRawMutex;
8use embassy::channel::mpsc::{self, Channel, Sender}; 8use embassy::channel::channel::Channel;
9use embassy::executor::Spawner; 9use embassy::executor::Spawner;
10use embassy::util::Forever;
11use embassy_stm32::dma::NoDma; 10use embassy_stm32::dma::NoDma;
12use embassy_stm32::{ 11use embassy_stm32::{
13 peripherals::{DMA1_CH1, UART7}, 12 peripherals::{DMA1_CH1, UART7},
@@ -28,7 +27,7 @@ async fn writer(mut usart: Uart<'static, UART7, NoDma, NoDma>) {
28 } 27 }
29} 28}
30 29
31static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new(); 30static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new();
32 31
33#[embassy::main] 32#[embassy::main]
34async fn main(spawner: Spawner, p: Peripherals) -> ! { 33async fn main(spawner: Spawner, p: Peripherals) -> ! {
@@ -40,28 +39,21 @@ async fn main(spawner: Spawner, p: Peripherals) -> ! {
40 39
41 let (mut tx, rx) = usart.split(); 40 let (mut tx, rx) = usart.split();
42 41
43 let c = CHANNEL.put(Channel::new()); 42 unwrap!(spawner.spawn(reader(rx)));
44 let (s, mut r) = mpsc::split(c);
45
46 unwrap!(spawner.spawn(reader(rx, s)));
47 43
48 loop { 44 loop {
49 if let Some(buf) = r.recv().await { 45 let buf = CHANNEL.recv().await;
50 info!("writing..."); 46 info!("writing...");
51 unwrap!(tx.write(&buf).await); 47 unwrap!(tx.write(&buf).await);
52 }
53 } 48 }
54} 49}
55 50
56#[embassy::task] 51#[embassy::task]
57async fn reader( 52async fn reader(mut rx: UartRx<'static, UART7, DMA1_CH1>) {
58 mut rx: UartRx<'static, UART7, DMA1_CH1>,
59 s: Sender<'static, NoopRawMutex, [u8; 8], 1>,
60) {
61 let mut buf = [0; 8]; 53 let mut buf = [0; 8];
62 loop { 54 loop {
63 info!("reading..."); 55 info!("reading...");
64 unwrap!(rx.read(&mut buf).await); 56 unwrap!(rx.read(&mut buf).await);
65 unwrap!(s.send(buf).await); 57 CHANNEL.send(buf).await;
66 } 58 }
67} 59}