aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/priority_channel.rs
diff options
context:
space:
mode:
authorScott Mabin <[email protected]>2023-11-20 11:28:31 +0000
committerScott Mabin <[email protected]>2023-11-20 11:28:31 +0000
commit454828accbfa3eecfbe782a6a23435c7a01ee29b (patch)
tree83361bd7931bed85979d4d75ca27295187fe8a6d /embassy-sync/src/priority_channel.rs
parent5a60024af71b70c059d4a2a2eacdfd7f73a3398d (diff)
revert module changes, reexport heapless relevant items
Diffstat (limited to 'embassy-sync/src/priority_channel.rs')
-rw-r--r--embassy-sync/src/priority_channel.rs613
1 files changed, 613 insertions, 0 deletions
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
new file mode 100644
index 000000000..bd75c0135
--- /dev/null
+++ b/embassy-sync/src/priority_channel.rs
@@ -0,0 +1,613 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel.
5
6use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
19/// Send-only access to a [`PriorityChannel`].
20pub struct Sender<'ch, M, T, K, const N: usize>
21where
22 T: Ord,
23 K: Kind,
24 M: RawMutex,
25{
26 channel: &'ch PriorityChannel<M, T, K, N>,
27}
28
29impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
30where
31 T: Ord,
32 K: Kind,
33 M: RawMutex,
34{
35 fn clone(&self) -> Self {
36 Sender { channel: self.channel }
37 }
38}
39
40impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
41where
42 T: Ord,
43 K: Kind,
44 M: RawMutex,
45{
46}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50 T: Ord,
51 K: Kind,
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`PriorityChannel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`PriorityChannel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67
68 /// Allows a poll_fn to poll until the channel is ready to send
69 ///
70 /// See [`PriorityChannel::poll_ready_to_send()`]
71 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx)
73 }
74}
75
76impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
77where
78 T: Ord,
79 K: Kind,
80 M: RawMutex,
81{
82 fn from(s: Sender<'ch, M, T, K, N>) -> Self {
83 Self { channel: s.channel }
84 }
85}
86
87/// Receive-only access to a [`PriorityChannel`].
88pub struct Receiver<'ch, M, T, K, const N: usize>
89where
90 T: Ord,
91 K: Kind,
92 M: RawMutex,
93{
94 channel: &'ch PriorityChannel<M, T, K, N>,
95}
96
97impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
98where
99 T: Ord,
100 K: Kind,
101 M: RawMutex,
102{
103 fn clone(&self) -> Self {
104 Receiver { channel: self.channel }
105 }
106}
107
108impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
109where
110 T: Ord,
111 K: Kind,
112 M: RawMutex,
113{
114}
115
116impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
117where
118 T: Ord,
119 K: Kind,
120 M: RawMutex,
121{
122 /// Receive the next value.
123 ///
124 /// See [`PriorityChannel::receive()`].
125 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
126 self.channel.receive()
127 }
128
129 /// Attempt to immediately receive the next value.
130 ///
131 /// See [`PriorityChannel::try_receive()`]
132 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
133 self.channel.try_receive()
134 }
135
136 /// Allows a poll_fn to poll until the channel is ready to receive
137 ///
138 /// See [`PriorityChannel::poll_ready_to_receive()`]
139 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
140 self.channel.poll_ready_to_receive(cx)
141 }
142
143 /// Poll the channel for the next item
144 ///
145 /// See [`PriorityChannel::poll_receive()`]
146 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
147 self.channel.poll_receive(cx)
148 }
149}
150
151impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
152where
153 T: Ord,
154 K: Kind,
155 M: RawMutex,
156{
157 fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
158 Self { channel: s.channel }
159 }
160}
161
162/// Future returned by [`PriorityChannel::receive`] and [`Receiver::receive`].
163#[must_use = "futures do nothing unless you `.await` or poll them"]
164pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
165where
166 T: Ord,
167 K: Kind,
168 M: RawMutex,
169{
170 channel: &'ch PriorityChannel<M, T, K, N>,
171}
172
173impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
174where
175 T: Ord,
176 K: Kind,
177 M: RawMutex,
178{
179 type Output = T;
180
181 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
182 self.channel.poll_receive(cx)
183 }
184}
185
186/// Future returned by [`PriorityChannel::send`] and [`Sender::send`].
187#[must_use = "futures do nothing unless you `.await` or poll them"]
188pub struct SendFuture<'ch, M, T, K, const N: usize>
189where
190 T: Ord,
191 K: Kind,
192 M: RawMutex,
193{
194 channel: &'ch PriorityChannel<M, T, K, N>,
195 message: Option<T>,
196}
197
198impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
199where
200 T: Ord,
201 K: Kind,
202 M: RawMutex,
203{
204 type Output = ();
205
206 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207 match self.message.take() {
208 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
209 Ok(..) => Poll::Ready(()),
210 Err(TrySendError::Full(m)) => {
211 self.message = Some(m);
212 Poll::Pending
213 }
214 },
215 None => panic!("Message cannot be None"),
216 }
217 }
218}
219
220impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
221where
222 T: Ord,
223 K: Kind,
224 M: RawMutex,
225{
226}
227
228struct ChannelState<T, K, const N: usize> {
229 queue: BinaryHeap<T, K, N>,
230 receiver_waker: WakerRegistration,
231 senders_waker: WakerRegistration,
232}
233
234impl<T, K, const N: usize> ChannelState<T, K, N>
235where
236 T: Ord,
237 K: Kind,
238{
239 const fn new() -> Self {
240 ChannelState {
241 queue: BinaryHeap::new(),
242 receiver_waker: WakerRegistration::new(),
243 senders_waker: WakerRegistration::new(),
244 }
245 }
246
247 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
248 self.try_receive_with_context(None)
249 }
250
251 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
252 if self.queue.len() == self.queue.capacity() {
253 self.senders_waker.wake();
254 }
255
256 if let Some(message) = self.queue.pop() {
257 Ok(message)
258 } else {
259 if let Some(cx) = cx {
260 self.receiver_waker.register(cx.waker());
261 }
262 Err(TryReceiveError::Empty)
263 }
264 }
265
266 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
267 if self.queue.len() == self.queue.capacity() {
268 self.senders_waker.wake();
269 }
270
271 if let Some(message) = self.queue.pop() {
272 Poll::Ready(message)
273 } else {
274 self.receiver_waker.register(cx.waker());
275 Poll::Pending
276 }
277 }
278
279 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
280 self.receiver_waker.register(cx.waker());
281
282 if !self.queue.is_empty() {
283 Poll::Ready(())
284 } else {
285 Poll::Pending
286 }
287 }
288
289 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
290 self.try_send_with_context(message, None)
291 }
292
293 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
294 match self.queue.push(message) {
295 Ok(()) => {
296 self.receiver_waker.wake();
297 Ok(())
298 }
299 Err(message) => {
300 if let Some(cx) = cx {
301 self.senders_waker.register(cx.waker());
302 }
303 Err(TrySendError::Full(message))
304 }
305 }
306 }
307
308 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
309 self.senders_waker.register(cx.waker());
310
311 if !self.queue.len() == self.queue.capacity() {
312 Poll::Ready(())
313 } else {
314 Poll::Pending
315 }
316 }
317}
318
319/// A bounded channel for communicating between asynchronous tasks
320/// with backpressure.
321///
322/// The channel will buffer up to the provided number of messages. Once the
323/// buffer is full, attempts to `send` new messages will wait until a message is
324/// received from the channel.
325///
326/// Sent data may be reordered based on their priorty within the channel.
327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be recieved as `[3, 2, 1]`.
329pub struct PriorityChannel<M, T, K, const N: usize>
330where
331 T: Ord,
332 K: Kind,
333 M: RawMutex,
334{
335 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
336}
337
338impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
339where
340 T: Ord,
341 K: Kind,
342 M: RawMutex,
343{
344 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
345 ///
346 /// ```
347 /// use embassy_sync::priority_channel::{PriorityChannel, Max};
348 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
349 ///
350 /// // Declare a bounded channel of 3 u32s.
351 /// let mut channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
352 /// ```
353 pub const fn new() -> Self {
354 Self {
355 inner: Mutex::new(RefCell::new(ChannelState::new())),
356 }
357 }
358
359 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
360 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
361 }
362
363 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
364 self.lock(|c| c.try_receive_with_context(cx))
365 }
366
367 /// Poll the channel for the next message
368 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
369 self.lock(|c| c.poll_receive(cx))
370 }
371
372 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
373 self.lock(|c| c.try_send_with_context(m, cx))
374 }
375
376 /// Allows a poll_fn to poll until the channel is ready to receive
377 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
378 self.lock(|c| c.poll_ready_to_receive(cx))
379 }
380
381 /// Allows a poll_fn to poll until the channel is ready to send
382 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
383 self.lock(|c| c.poll_ready_to_send(cx))
384 }
385
386 /// Get a sender for this channel.
387 pub fn sender(&self) -> Sender<'_, M, T, K, N> {
388 Sender { channel: self }
389 }
390
391 /// Get a receiver for this channel.
392 pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
393 Receiver { channel: self }
394 }
395
396 /// Send a value, waiting until there is capacity.
397 ///
398 /// Sending completes when the value has been pushed to the channel's queue.
399 /// This doesn't mean the value has been received yet.
400 pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
401 SendFuture {
402 channel: self,
403 message: Some(message),
404 }
405 }
406
407 /// Attempt to immediately send a message.
408 ///
409 /// This method differs from [`send`](PriorityChannel::send) by returning immediately if the channel's
410 /// buffer is full, instead of waiting.
411 ///
412 /// # Errors
413 ///
414 /// If the channel capacity has been reached, i.e., the channel has `n`
415 /// buffered values where `n` is the argument passed to [`PriorityChannel`], then an
416 /// error is returned.
417 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
418 self.lock(|c| c.try_send(message))
419 }
420
421 /// Receive the next value.
422 ///
423 /// If there are no messages in the channel's buffer, this method will
424 /// wait until a message is sent.
425 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
426 ReceiveFuture { channel: self }
427 }
428
429 /// Attempt to immediately receive a message.
430 ///
431 /// This method will either receive a message from the channel immediately or return an error
432 /// if the channel is empty.
433 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
434 self.lock(|c| c.try_receive())
435 }
436}
437
438/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
439/// tradeoff cost of dynamic dispatch.
440impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
441where
442 T: Ord,
443 K: Kind,
444 M: RawMutex,
445{
446 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
447 PriorityChannel::try_send_with_context(self, m, cx)
448 }
449
450 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
451 PriorityChannel::try_receive_with_context(self, cx)
452 }
453
454 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
455 PriorityChannel::poll_ready_to_send(self, cx)
456 }
457
458 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
459 PriorityChannel::poll_ready_to_receive(self, cx)
460 }
461
462 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
463 PriorityChannel::poll_receive(self, cx)
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use core::time::Duration;
470
471 use futures_executor::ThreadPool;
472 use futures_timer::Delay;
473 use futures_util::task::SpawnExt;
474 use heapless::binary_heap::{Kind, Max};
475 use static_cell::StaticCell;
476
477 use super::*;
478 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
479
480 fn capacity<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
481 where
482 T: Ord,
483 K: Kind,
484 {
485 c.queue.capacity() - c.queue.len()
486 }
487
488 #[test]
489 fn sending_once() {
490 let mut c = ChannelState::<u32, Max, 3>::new();
491 assert!(c.try_send(1).is_ok());
492 assert_eq!(capacity(&c), 2);
493 }
494
495 #[test]
496 fn sending_when_full() {
497 let mut c = ChannelState::<u32, Max, 3>::new();
498 let _ = c.try_send(1);
499 let _ = c.try_send(1);
500 let _ = c.try_send(1);
501 match c.try_send(2) {
502 Err(TrySendError::Full(2)) => assert!(true),
503 _ => assert!(false),
504 }
505 assert_eq!(capacity(&c), 0);
506 }
507
508 #[test]
509 fn send_priority() {
510 // Prio channel with kind `Max` sifts larger numbers to the front of the queue
511 let mut c = ChannelState::<u32, Max, 3>::new();
512 assert!(c.try_send(1).is_ok());
513 assert!(c.try_send(2).is_ok());
514 assert!(c.try_send(3).is_ok());
515 assert_eq!(c.try_receive().unwrap(), 3);
516 assert_eq!(c.try_receive().unwrap(), 2);
517 assert_eq!(c.try_receive().unwrap(), 1);
518 }
519
520 #[test]
521 fn receiving_once_with_one_send() {
522 let mut c = ChannelState::<u32, Max, 3>::new();
523 assert!(c.try_send(1).is_ok());
524 assert_eq!(c.try_receive().unwrap(), 1);
525 assert_eq!(capacity(&c), 3);
526 }
527
528 #[test]
529 fn receiving_when_empty() {
530 let mut c = ChannelState::<u32, Max, 3>::new();
531 match c.try_receive() {
532 Err(TryReceiveError::Empty) => assert!(true),
533 _ => assert!(false),
534 }
535 assert_eq!(capacity(&c), 3);
536 }
537
538 #[test]
539 fn simple_send_and_receive() {
540 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
541 assert!(c.try_send(1).is_ok());
542 assert_eq!(c.try_receive().unwrap(), 1);
543 }
544
545 #[test]
546 fn cloning() {
547 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
548 let r1 = c.receiver();
549 let s1 = c.sender();
550
551 let _ = r1.clone();
552 let _ = s1.clone();
553 }
554
555 #[test]
556 fn dynamic_dispatch() {
557 let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
558 let s: DynamicSender<'_, u32> = c.sender().into();
559 let r: DynamicReceiver<'_, u32> = c.receiver().into();
560
561 assert!(s.try_send(1).is_ok());
562 assert_eq!(r.try_receive().unwrap(), 1);
563 }
564
565 #[futures_test::test]
566 async fn receiver_receives_given_try_send_async() {
567 let executor = ThreadPool::new().unwrap();
568
569 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
570 let c = &*CHANNEL.init(PriorityChannel::new());
571 let c2 = c;
572 assert!(executor
573 .spawn(async move {
574 assert!(c2.try_send(1).is_ok());
575 })
576 .is_ok());
577 assert_eq!(c.receive().await, 1);
578 }
579
580 #[futures_test::test]
581 async fn sender_send_completes_if_capacity() {
582 let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
583 c.send(1).await;
584 assert_eq!(c.receive().await, 1);
585 }
586
587 #[futures_test::test]
588 async fn senders_sends_wait_until_capacity() {
589 let executor = ThreadPool::new().unwrap();
590
591 static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
592 let c = &*CHANNEL.init(PriorityChannel::new());
593 assert!(c.try_send(1).is_ok());
594
595 let c2 = c;
596 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
597 let c2 = c;
598 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
599 // Wish I could think of a means of determining that the async send is waiting instead.
600 // However, I've used the debugger to observe that the send does indeed wait.
601 Delay::new(Duration::from_millis(500)).await;
602 assert_eq!(c.receive().await, 1);
603 assert!(executor
604 .spawn(async move {
605 loop {
606 c.receive().await;
607 }
608 })
609 .is_ok());
610 send_task_1.unwrap().await;
611 send_task_2.unwrap().await;
612 }
613}