aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/channel.rs
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2022-08-22 22:00:06 +0200
committerDario Nieuwenhuis <[email protected]>2022-08-22 22:18:13 +0200
commit5677b13a86beca58aa57ecfd7cea0db7ceb189fa (patch)
tree0c7425dae57acb94cb6ddca27def7e77609369b3 /embassy-sync/src/channel.rs
parent21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 (diff)
sync: flatten module structure.
Diffstat (limited to 'embassy-sync/src/channel.rs')
-rw-r--r--embassy-sync/src/channel.rs596
1 files changed, 596 insertions, 0 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
new file mode 100644
index 000000000..76f42d0e7
--- /dev/null
+++ b/embassy-sync/src/channel.rs
@@ -0,0 +1,596 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33#[derive(Copy)]
34pub struct Sender<'ch, M, T, const N: usize>
35where
36 M: RawMutex,
37{
38 channel: &'ch Channel<M, T, N>,
39}
40
41impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
42where
43 M: RawMutex,
44{
45 fn clone(&self) -> Self {
46 Sender { channel: self.channel }
47 }
48}
49
50impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
51where
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`Channel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`Channel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67}
68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender { channel: self.channel }
78 }
79}
80
81impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
82where
83 M: RawMutex,
84{
85 fn from(s: Sender<'ch, M, T, N>) -> Self {
86 Self { channel: s.channel }
87 }
88}
89
90impl<'ch, T> DynamicSender<'ch, T> {
91 /// Sends a value.
92 ///
93 /// See [`Channel::send()`]
94 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
95 DynamicSendFuture {
96 channel: self.channel,
97 message: Some(message),
98 }
99 }
100
101 /// Attempt to immediately send a message.
102 ///
103 /// See [`Channel::send()`]
104 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
105 self.channel.try_send_with_context(message, None)
106 }
107}
108
109/// Receive-only access to a [`Channel`].
110#[derive(Copy)]
111pub struct Receiver<'ch, M, T, const N: usize>
112where
113 M: RawMutex,
114{
115 channel: &'ch Channel<M, T, N>,
116}
117
118impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
119where
120 M: RawMutex,
121{
122 fn clone(&self) -> Self {
123 Receiver { channel: self.channel }
124 }
125}
126
127impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
128where
129 M: RawMutex,
130{
131 /// Receive the next value.
132 ///
133 /// See [`Channel::recv()`].
134 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
135 self.channel.recv()
136 }
137
138 /// Attempt to immediately receive the next value.
139 ///
140 /// See [`Channel::try_recv()`]
141 pub fn try_recv(&self) -> Result<T, TryRecvError> {
142 self.channel.try_recv()
143 }
144}
145
146/// Receive-only access to a [`Channel`] without knowing channel size.
147#[derive(Copy)]
148pub struct DynamicReceiver<'ch, T> {
149 channel: &'ch dyn DynamicChannel<T>,
150}
151
152impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
153 fn clone(&self) -> Self {
154 DynamicReceiver { channel: self.channel }
155 }
156}
157
158impl<'ch, T> DynamicReceiver<'ch, T> {
159 /// Receive the next value.
160 ///
161 /// See [`Channel::recv()`].
162 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
163 DynamicRecvFuture { channel: self.channel }
164 }
165
166 /// Attempt to immediately receive the next value.
167 ///
168 /// See [`Channel::try_recv()`]
169 pub fn try_recv(&self) -> Result<T, TryRecvError> {
170 self.channel.try_recv_with_context(None)
171 }
172}
173
174impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
175where
176 M: RawMutex,
177{
178 fn from(s: Receiver<'ch, M, T, N>) -> Self {
179 Self { channel: s.channel }
180 }
181}
182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184pub struct RecvFuture<'ch, M, T, const N: usize>
185where
186 M: RawMutex,
187{
188 channel: &'ch Channel<M, T, N>,
189}
190
191impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
192where
193 M: RawMutex,
194{
195 type Output = T;
196
197 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
198 match self.channel.try_recv_with_context(Some(cx)) {
199 Ok(v) => Poll::Ready(v),
200 Err(TryRecvError::Empty) => Poll::Pending,
201 }
202 }
203}
204
205/// Future returned by [`DynamicReceiver::recv`].
206pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>,
208}
209
210impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
211 type Output = T;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
214 match self.channel.try_recv_with_context(Some(cx)) {
215 Ok(v) => Poll::Ready(v),
216 Err(TryRecvError::Empty) => Poll::Pending,
217 }
218 }
219}
220
221/// Future returned by [`Channel::send`] and [`Sender::send`].
222pub struct SendFuture<'ch, M, T, const N: usize>
223where
224 M: RawMutex,
225{
226 channel: &'ch Channel<M, T, N>,
227 message: Option<T>,
228}
229
230impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
231where
232 M: RawMutex,
233{
234 type Output = ();
235
236 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237 match self.message.take() {
238 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
239 Ok(..) => Poll::Ready(()),
240 Err(TrySendError::Full(m)) => {
241 self.message = Some(m);
242 Poll::Pending
243 }
244 },
245 None => panic!("Message cannot be None"),
246 }
247 }
248}
249
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251
252/// Future returned by [`DynamicSender::send`].
253pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>,
256}
257
258impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
259 type Output = ();
260
261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 match self.message.take() {
263 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
264 Ok(..) => Poll::Ready(()),
265 Err(TrySendError::Full(m)) => {
266 self.message = Some(m);
267 Poll::Pending
268 }
269 },
270 None => panic!("Message cannot be None"),
271 }
272 }
273}
274
275impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
276
277trait DynamicChannel<T> {
278 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
279
280 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
281}
282
283/// Error returned by [`try_recv`](Channel::try_recv).
284#[derive(PartialEq, Eq, Clone, Copy, Debug)]
285#[cfg_attr(feature = "defmt", derive(defmt::Format))]
286pub enum TryRecvError {
287 /// A message could not be received because the channel is empty.
288 Empty,
289}
290
291/// Error returned by [`try_send`](Channel::try_send).
292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
294pub enum TrySendError<T> {
295 /// The data could not be sent on the channel because the channel is
296 /// currently full and sending would require blocking.
297 Full(T),
298}
299
300struct ChannelState<T, const N: usize> {
301 queue: Deque<T, N>,
302 receiver_waker: WakerRegistration,
303 senders_waker: WakerRegistration,
304}
305
306impl<T, const N: usize> ChannelState<T, N> {
307 const fn new() -> Self {
308 ChannelState {
309 queue: Deque::new(),
310 receiver_waker: WakerRegistration::new(),
311 senders_waker: WakerRegistration::new(),
312 }
313 }
314
315 fn try_recv(&mut self) -> Result<T, TryRecvError> {
316 self.try_recv_with_context(None)
317 }
318
319 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
320 if self.queue.is_full() {
321 self.senders_waker.wake();
322 }
323
324 if let Some(message) = self.queue.pop_front() {
325 Ok(message)
326 } else {
327 if let Some(cx) = cx {
328 self.receiver_waker.register(cx.waker());
329 }
330 Err(TryRecvError::Empty)
331 }
332 }
333
334 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
335 self.try_send_with_context(message, None)
336 }
337
338 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
339 match self.queue.push_back(message) {
340 Ok(()) => {
341 self.receiver_waker.wake();
342 Ok(())
343 }
344 Err(message) => {
345 if let Some(cx) = cx {
346 self.senders_waker.register(cx.waker());
347 }
348 Err(TrySendError::Full(message))
349 }
350 }
351 }
352}
353
354/// A bounded channel for communicating between asynchronous tasks
355/// with backpressure.
356///
357/// The channel will buffer up to the provided number of messages. Once the
358/// buffer is full, attempts to `send` new messages will wait until a message is
359/// received from the channel.
360///
361/// All data sent will become available in the same order as it was sent.
362pub struct Channel<M, T, const N: usize>
363where
364 M: RawMutex,
365{
366 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
367}
368
369impl<M, T, const N: usize> Channel<M, T, N>
370where
371 M: RawMutex,
372{
373 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
374 ///
375 /// ```
376 /// use embassy_sync::channel::Channel;
377 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
378 ///
379 /// // Declare a bounded channel of 3 u32s.
380 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
381 /// ```
382 pub const fn new() -> Self {
383 Self {
384 inner: Mutex::new(RefCell::new(ChannelState::new())),
385 }
386 }
387
388 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
389 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
390 }
391
392 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
393 self.lock(|c| c.try_recv_with_context(cx))
394 }
395
396 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
397 self.lock(|c| c.try_send_with_context(m, cx))
398 }
399
400 /// Get a sender for this channel.
401 pub fn sender(&self) -> Sender<'_, M, T, N> {
402 Sender { channel: self }
403 }
404
405 /// Get a receiver for this channel.
406 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
407 Receiver { channel: self }
408 }
409
410 /// Send a value, waiting until there is capacity.
411 ///
412 /// Sending completes when the value has been pushed to the channel's queue.
413 /// This doesn't mean the value has been received yet.
414 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
415 SendFuture {
416 channel: self,
417 message: Some(message),
418 }
419 }
420
421 /// Attempt to immediately send a message.
422 ///
423 /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
424 /// buffer is full, instead of waiting.
425 ///
426 /// # Errors
427 ///
428 /// If the channel capacity has been reached, i.e., the channel has `n`
429 /// buffered values where `n` is the argument passed to [`Channel`], then an
430 /// error is returned.
431 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
432 self.lock(|c| c.try_send(message))
433 }
434
435 /// Receive the next value.
436 ///
437 /// If there are no messages in the channel's buffer, this method will
438 /// wait until a message is sent.
439 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
440 RecvFuture { channel: self }
441 }
442
443 /// Attempt to immediately receive a message.
444 ///
445 /// This method will either receive a message from the channel immediately or return an error
446 /// if the channel is empty.
447 pub fn try_recv(&self) -> Result<T, TryRecvError> {
448 self.lock(|c| c.try_recv())
449 }
450}
451
452/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
453/// tradeoff cost of dynamic dispatch.
454impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
455where
456 M: RawMutex,
457{
458 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
459 Channel::try_send_with_context(self, m, cx)
460 }
461
462 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
463 Channel::try_recv_with_context(self, cx)
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use core::time::Duration;
470
471 use futures_executor::ThreadPool;
472 use futures_timer::Delay;
473 use futures_util::task::SpawnExt;
474 use static_cell::StaticCell;
475
476 use super::*;
477 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
478
479 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
480 c.queue.capacity() - c.queue.len()
481 }
482
483 #[test]
484 fn sending_once() {
485 let mut c = ChannelState::<u32, 3>::new();
486 assert!(c.try_send(1).is_ok());
487 assert_eq!(capacity(&c), 2);
488 }
489
490 #[test]
491 fn sending_when_full() {
492 let mut c = ChannelState::<u32, 3>::new();
493 let _ = c.try_send(1);
494 let _ = c.try_send(1);
495 let _ = c.try_send(1);
496 match c.try_send(2) {
497 Err(TrySendError::Full(2)) => assert!(true),
498 _ => assert!(false),
499 }
500 assert_eq!(capacity(&c), 0);
501 }
502
503 #[test]
504 fn receiving_once_with_one_send() {
505 let mut c = ChannelState::<u32, 3>::new();
506 assert!(c.try_send(1).is_ok());
507 assert_eq!(c.try_recv().unwrap(), 1);
508 assert_eq!(capacity(&c), 3);
509 }
510
511 #[test]
512 fn receiving_when_empty() {
513 let mut c = ChannelState::<u32, 3>::new();
514 match c.try_recv() {
515 Err(TryRecvError::Empty) => assert!(true),
516 _ => assert!(false),
517 }
518 assert_eq!(capacity(&c), 3);
519 }
520
521 #[test]
522 fn simple_send_and_receive() {
523 let c = Channel::<NoopRawMutex, u32, 3>::new();
524 assert!(c.try_send(1).is_ok());
525 assert_eq!(c.try_recv().unwrap(), 1);
526 }
527
528 #[test]
529 fn cloning() {
530 let c = Channel::<NoopRawMutex, u32, 3>::new();
531 let r1 = c.receiver();
532 let s1 = c.sender();
533
534 let _ = r1.clone();
535 let _ = s1.clone();
536 }
537
538 #[test]
539 fn dynamic_dispatch() {
540 let c = Channel::<NoopRawMutex, u32, 3>::new();
541 let s: DynamicSender<'_, u32> = c.sender().into();
542 let r: DynamicReceiver<'_, u32> = c.receiver().into();
543
544 assert!(s.try_send(1).is_ok());
545 assert_eq!(r.try_recv().unwrap(), 1);
546 }
547
548 #[futures_test::test]
549 async fn receiver_receives_given_try_send_async() {
550 let executor = ThreadPool::new().unwrap();
551
552 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
553 let c = &*CHANNEL.init(Channel::new());
554 let c2 = c;
555 assert!(executor
556 .spawn(async move {
557 assert!(c2.try_send(1).is_ok());
558 })
559 .is_ok());
560 assert_eq!(c.recv().await, 1);
561 }
562
563 #[futures_test::test]
564 async fn sender_send_completes_if_capacity() {
565 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
566 c.send(1).await;
567 assert_eq!(c.recv().await, 1);
568 }
569
570 #[futures_test::test]
571 async fn senders_sends_wait_until_capacity() {
572 let executor = ThreadPool::new().unwrap();
573
574 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
575 let c = &*CHANNEL.init(Channel::new());
576 assert!(c.try_send(1).is_ok());
577
578 let c2 = c;
579 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
580 let c2 = c;
581 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
582 // Wish I could think of a means of determining that the async send is waiting instead.
583 // However, I've used the debugger to observe that the send does indeed wait.
584 Delay::new(Duration::from_millis(500)).await;
585 assert_eq!(c.recv().await, 1);
586 assert!(executor
587 .spawn(async move {
588 loop {
589 c.recv().await;
590 }
591 })
592 .is_ok());
593 send_task_1.unwrap().await;
594 send_task_2.unwrap().await;
595 }
596}