aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/priority_channel.rs
diff options
context:
space:
mode:
authorScott Mabin <[email protected]>2023-11-18 15:01:12 +0000
committerScott Mabin <[email protected]>2023-11-18 15:01:12 +0000
commitf482a105b8491f3c21d41cb7e6f52fe6d778258f (patch)
treeafd4e5a46641f7111186eb578b2c56b3f1645ff7 /embassy-sync/src/priority_channel.rs
parent7589b5e13e0e01922804e603918fea0aa4dac30a (diff)
more clean up, refactor channel into module to share code
Diffstat (limited to 'embassy-sync/src/priority_channel.rs')
-rw-r--r--embassy-sync/src/priority_channel.rs653
1 files changed, 0 insertions, 653 deletions
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
deleted file mode 100644
index 143662814..000000000
--- a/embassy-sync/src/priority_channel.rs
+++ /dev/null
@@ -1,653 +0,0 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel.
5
6use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use heapless::binary_heap::Kind;
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
19/// Send-only access to a [`PriorityChannel`].
20pub struct Sender<'ch, M, T, K, const N: usize>
21where
22 T: Ord,
23 K: Kind,
24 M: RawMutex,
25{
26 channel: &'ch PriorityChannel<M, T, K, N>,
27}
28
29impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
30where
31 T: Ord,
32 K: Kind,
33 M: RawMutex,
34{
35 fn clone(&self) -> Self {
36 Sender { channel: self.channel }
37 }
38}
39
40impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
41where
42 T: Ord,
43 K: Kind,
44 M: RawMutex,
45{
46}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50 T: Ord,
51 K: Kind,
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`PriorityChannel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`PriorityChannel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67
68 /// Allows a poll_fn to poll until the channel is ready to send
69 ///
70 /// See [`PriorityChannel::poll_ready_to_send()`]
71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx)
73 }
74}
75
76impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
77where
78 T: Ord,
79 K: Kind,
80 M: RawMutex,
81{
82 fn from(s: Sender<'ch, M, T, K, N>) -> Self {
83 Self { channel: s.channel }
84 }
85}
86
87/// Receive-only access to a [`PriorityChannel`].
88pub struct Receiver<'ch, M, T, K, const N: usize>
89where
90 T: Ord,
91 K: Kind,
92 M: RawMutex,
93{
94 channel: &'ch PriorityChannel<M, T, K, N>,
95}
96
97impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
98where
99 T: Ord,
100 K: Kind,
101 M: RawMutex,
102{
103 fn clone(&self) -> Self {
104 Receiver { channel: self.channel }
105 }
106}
107
108impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
109where
110 T: Ord,
111 K: Kind,
112 M: RawMutex,
113{
114}
115
116impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
117where
118 T: Ord,
119 K: Kind,
120 M: RawMutex,
121{
122 /// Receive the next value.
123 ///
124 /// See [`PriorityChannel::receive()`].
125 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
126 self.channel.receive()
127 }
128
129 /// Attempt to immediately receive the next value.
130 ///
131 /// See [`PriorityChannel::try_receive()`]
132 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
133 self.channel.try_receive()
134 }
135
136 /// Allows a poll_fn to poll until the channel is ready to receive
137 ///
138 /// See [`PriorityChannel::poll_ready_to_receive()`]
139 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
140 self.channel.poll_ready_to_receive(cx)
141 }
142
143 /// Poll the channel for the next item
144 ///
145 /// See [`PriorityChannel::poll_receive()`]
146 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
147 self.channel.poll_receive(cx)
148 }
149}
150
151impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
152where
153 T: Ord,
154 K: Kind,
155 M: RawMutex,
156{
157 fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
158 Self { channel: s.channel }
159 }
160}
161
162/// Future returned by [`PriorityChannel::receive`] and [`Receiver::receive`].
163#[must_use = "futures do nothing unless you `.await` or poll them"]
164pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
165where
166 T: Ord,
167 K: Kind,
168 M: RawMutex,
169{
170 channel: &'ch PriorityChannel<M, T, K, N>,
171}
172
173impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
174where
175 T: Ord,
176 K: Kind,
177 M: RawMutex,
178{
179 type Output = T;
180
181 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
182 self.channel.poll_receive(cx)
183 }
184}
185
186/// Future returned by [`DynamicReceiver::receive`].
187#[must_use = "futures do nothing unless you `.await` or poll them"]
188pub struct DynamicReceiveFuture<'ch, T> {
189 channel: &'ch dyn DynamicChannel<T>,
190}
191
192impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
193 type Output = T;
194
195 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
196 match self.channel.try_receive_with_context(Some(cx)) {
197 Ok(v) => Poll::Ready(v),
198 Err(TryReceiveError::Empty) => Poll::Pending,
199 }
200 }
201}
202
203/// Future returned by [`PriorityChannel::send`] and [`Sender::send`].
204#[must_use = "futures do nothing unless you `.await` or poll them"]
205pub struct SendFuture<'ch, M, T, K, const N: usize>
206where
207 T: Ord,
208 K: Kind,
209 M: RawMutex,
210{
211 channel: &'ch PriorityChannel<M, T, K, N>,
212 message: Option<T>,
213}
214
215impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
216where
217 T: Ord,
218 K: Kind,
219 M: RawMutex,
220{
221 type Output = ();
222
223 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
224 match self.message.take() {
225 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
226 Ok(..) => Poll::Ready(()),
227 Err(TrySendError::Full(m)) => {
228 self.message = Some(m);
229 Poll::Pending
230 }
231 },
232 None => panic!("Message cannot be None"),
233 }
234 }
235}
236
237impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
238where
239 T: Ord,
240 K: Kind,
241 M: RawMutex,
242{
243}
244
245/// Future returned by [`DynamicSender::send`].
246#[must_use = "futures do nothing unless you `.await` or poll them"]
247pub struct DynamicSendFuture<'ch, T> {
248 channel: &'ch dyn DynamicChannel<T>,
249 message: Option<T>,
250}
251
252impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
253 type Output = ();
254
255 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 match self.message.take() {
257 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
258 Ok(..) => Poll::Ready(()),
259 Err(TrySendError::Full(m)) => {
260 self.message = Some(m);
261 Poll::Pending
262 }
263 },
264 None => panic!("Message cannot be None"),
265 }
266 }
267}
268
269impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
270
271struct ChannelState<T, K, const N: usize> {
272 queue: BinaryHeap<T, K, N>,
273 receiver_waker: WakerRegistration,
274 senders_waker: WakerRegistration,
275}
276
277impl<T, K, const N: usize> ChannelState<T, K, N>
278where
279 T: Ord,
280 K: Kind,
281{
282 const fn new() -> Self {
283 ChannelState {
284 queue: BinaryHeap::new(),
285 receiver_waker: WakerRegistration::new(),
286 senders_waker: WakerRegistration::new(),
287 }
288 }
289
290 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
291 self.try_receive_with_context(None)
292 }
293
294 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
295 if self.queue.len() == self.queue.capacity() {
296 self.senders_waker.wake();
297 }
298
299 if let Some(message) = self.queue.pop() {
300 Ok(message)
301 } else {
302 if let Some(cx) = cx {
303 self.receiver_waker.register(cx.waker());
304 }
305 Err(TryReceiveError::Empty)
306 }
307 }
308
309 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
310 if self.queue.len() == self.queue.capacity() {
311 self.senders_waker.wake();
312 }
313
314 if let Some(message) = self.queue.pop() {
315 Poll::Ready(message)
316 } else {
317 self.receiver_waker.register(cx.waker());
318 Poll::Pending
319 }
320 }
321
322 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
323 self.receiver_waker.register(cx.waker());
324
325 if !self.queue.is_empty() {
326 Poll::Ready(())
327 } else {
328 Poll::Pending
329 }
330 }
331
332 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
333 self.try_send_with_context(message, None)
334 }
335
336 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
337 match self.queue.push(message) {
338 Ok(()) => {
339 self.receiver_waker.wake();
340 Ok(())
341 }
342 Err(message) => {
343 if let Some(cx) = cx {
344 self.senders_waker.register(cx.waker());
345 }
346 Err(TrySendError::Full(message))
347 }
348 }
349 }
350
351 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
352 self.senders_waker.register(cx.waker());
353
354 if !self.queue.len() == self.queue.capacity() {
355 Poll::Ready(())
356 } else {
357 Poll::Pending
358 }
359 }
360}
361
362/// A bounded channel for communicating between asynchronous tasks
363/// with backpressure.
364///
365/// The channel will buffer up to the provided number of messages. Once the
366/// buffer is full, attempts to `send` new messages will wait until a message is
367/// received from the channel.
368///
369/// All data sent will become available in the same order as it was sent.
370pub struct PriorityChannel<M, T, K, const N: usize>
371where
372 T: Ord,
373 K: Kind,
374 M: RawMutex,
375{
376 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
377}
378
379impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
380where
381 T: Ord,
382 K: Kind,
383 M: RawMutex,
384{
385 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
386 ///
387 /// ```
388 /// # use heapless::binary_heap::Max;
389 /// use embassy_sync::priority_channel::PriorityChannel;
390 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
391 ///
392 /// // Declare a bounded channel of 3 u32s.
393 /// let mut channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
394 /// ```
395 pub const fn new() -> Self {
396 Self {
397 inner: Mutex::new(RefCell::new(ChannelState::new())),
398 }
399 }
400
401 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
402 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
403 }
404
405 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
406 self.lock(|c| c.try_receive_with_context(cx))
407 }
408
409 /// Poll the channel for the next message
410 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
411 self.lock(|c| c.poll_receive(cx))
412 }
413
414 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
415 self.lock(|c| c.try_send_with_context(m, cx))
416 }
417
418 /// Allows a poll_fn to poll until the channel is ready to receive
419 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
420 self.lock(|c| c.poll_ready_to_receive(cx))
421 }
422
423 /// Allows a poll_fn to poll until the channel is ready to send
424 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
425 self.lock(|c| c.poll_ready_to_send(cx))
426 }
427
428 /// Get a sender for this channel.
429 pub fn sender(&self) -> Sender<'_, M, T, K, N> {
430 Sender { channel: self }
431 }
432
433 /// Get a receiver for this channel.
434 pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
435 Receiver { channel: self }
436 }
437
438 /// Send a value, waiting until there is capacity.
439 ///
440 /// Sending completes when the value has been pushed to the channel's queue.
441 /// This doesn't mean the value has been received yet.
442 pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
443 SendFuture {
444 channel: self,
445 message: Some(message),
446 }
447 }
448
449 /// Attempt to immediately send a message.
450 ///
451 /// This method differs from [`send`](PriorityChannel::send) by returning immediately if the channel's
452 /// buffer is full, instead of waiting.
453 ///
454 /// # Errors
455 ///
456 /// If the channel capacity has been reached, i.e., the channel has `n`
457 /// buffered values where `n` is the argument passed to [`PriorityChannel`], then an
458 /// error is returned.
459 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
460 self.lock(|c| c.try_send(message))
461 }
462
463 /// Receive the next value.
464 ///
465 /// If there are no messages in the channel's buffer, this method will
466 /// wait until a message is sent.
467 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
468 ReceiveFuture { channel: self }
469 }
470
471 /// Attempt to immediately receive a message.
472 ///
473 /// This method will either receive a message from the channel immediately or return an error
474 /// if the channel is empty.
475 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
476 self.lock(|c| c.try_receive())
477 }
478}
479
480/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
481/// tradeoff cost of dynamic dispatch.
482impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
483where
484 T: Ord,
485 K: Kind,
486 M: RawMutex,
487{
488 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
489 PriorityChannel::try_send_with_context(self, m, cx)
490 }
491
492 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
493 PriorityChannel::try_receive_with_context(self, cx)
494 }
495
496 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
497 PriorityChannel::poll_ready_to_send(self, cx)
498 }
499
500 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
501 PriorityChannel::poll_ready_to_receive(self, cx)
502 }
503
504 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
505 PriorityChannel::poll_receive(self, cx)
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use core::time::Duration;
512
513 use futures_executor::ThreadPool;
514 use futures_timer::Delay;
515 use futures_util::task::SpawnExt;
516 use heapless::binary_heap::{Kind, Max};
517 use static_cell::StaticCell;
518
519 use super::*;
520 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
521
522 fn capacity<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
523 where
524 T: Ord,
525 K: Kind,
526 {
527 c.queue.capacity() - c.queue.len()
528 }
529
530 #[test]
531 fn sending_once() {
532 let mut c = ChannelState::<u32, Max, 3>::new();
533 assert!(c.try_send(1).is_ok());
534 assert_eq!(capacity(&c), 2);
535 }
536
537 #[test]
538 fn sending_when_full() {
539 let mut c = ChannelState::<u32, Max, 3>::new();
540 let _ = c.try_send(1);
541 let _ = c.try_send(1);
542 let _ = c.try_send(1);
543 match c.try_send(2) {
544 Err(TrySendError::Full(2)) => assert!(true),
545 _ => assert!(false),
546 }
547 assert_eq!(capacity(&c), 0);
548 }
549
550 #[test]
551 fn send_priority() {
552 // Prio channel with kind `Max` sifts larger numbers to the front of the queue
553 let mut c = ChannelState::<u32, Max, 3>::new();
554 assert!(c.try_send(1).is_ok());
555 assert!(c.try_send(3).is_ok());
556 assert_eq!(c.try_receive().unwrap(), 3);
557 assert_eq!(c.try_receive().unwrap(), 1);
558 }
559
560 #[test]
561 fn receiving_once_with_one_send() {
562 let mut c = ChannelState::<u32, Max, 3>::new();
563 assert!(c.try_send(1).is_ok());
564 assert_eq!(c.try_receive().unwrap(), 1);
565 assert_eq!(capacity(&c), 3);
566 }
567
568 #[test]
569 fn receiving_when_empty() {
570 let mut c = ChannelState::<u32, Max, 3>::new();
571 match c.try_receive() {
572 Err(TryReceiveError::Empty) => assert!(true),
573 _ => assert!(false),
574 }
575 assert_eq!(capacity(&c), 3);
576 }
577
578 #[test]
579 fn simple_send_and_receive() {
580 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
581 assert!(c.try_send(1).is_ok());
582 assert_eq!(c.try_receive().unwrap(), 1);
583 }
584
585 #[test]
586 fn cloning() {
587 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
588 let r1 = c.receiver();
589 let s1 = c.sender();
590
591 let _ = r1.clone();
592 let _ = s1.clone();
593 }
594
595 #[test]
596 fn dynamic_dispatch() {
597 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
598 let s: DynamicSender<'_, u32> = c.sender().into();
599 let r: DynamicReceiver<'_, u32> = c.receiver().into();
600
601 assert!(s.try_send(1).is_ok());
602 assert_eq!(r.try_receive().unwrap(), 1);
603 }
604
605 #[futures_test::test]
606 async fn receiver_receives_given_try_send_async() {
607 let executor = ThreadPool::new().unwrap();
608
609 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
610 let c = &*CHANNEL.init(PriorityChannel::new());
611 let c2 = c;
612 assert!(executor
613 .spawn(async move {
614 assert!(c2.try_send(1).is_ok());
615 })
616 .is_ok());
617 assert_eq!(c.receive().await, 1);
618 }
619
620 #[futures_test::test]
621 async fn sender_send_completes_if_capacity() {
622 let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
623 c.send(1).await;
624 assert_eq!(c.receive().await, 1);
625 }
626
627 #[futures_test::test]
628 async fn senders_sends_wait_until_capacity() {
629 let executor = ThreadPool::new().unwrap();
630
631 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
632 let c = &*CHANNEL.init(PriorityChannel::new());
633 assert!(c.try_send(1).is_ok());
634
635 let c2 = c;
636 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
637 let c2 = c;
638 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
639 // Wish I could think of a means of determining that the async send is waiting instead.
640 // However, I've used the debugger to observe that the send does indeed wait.
641 Delay::new(Duration::from_millis(500)).await;
642 assert_eq!(c.receive().await, 1);
643 assert!(executor
644 .spawn(async move {
645 loop {
646 c.receive().await;
647 }
648 })
649 .is_ok());
650 send_task_1.unwrap().await;
651 send_task_2.unwrap().await;
652 }
653}