diff options
| author | Ruben De Smet <[email protected]> | 2023-08-11 13:50:12 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2023-09-04 22:13:27 +0200 |
| commit | 6e38b0764253ba07d3106ce3d57c2fd3509d7beb (patch) | |
| tree | 47a78362b1050c2b713e884a82c4795448e794aa /embassy-sync | |
| parent | 1eb03dc41a4a5fa8435f9a49d26e29ceea6d498e (diff) | |
Add docs to zero-copy-channel
Diffstat (limited to 'embassy-sync')
| -rw-r--r-- | embassy-sync/src/zero_copy_channel.rs | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index cbb8cb526..f704cbd5d 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs | |||
| @@ -1,3 +1,22 @@ | |||
| 1 | //! A zero-copy queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | ||
| 9 | //! This queue takes a Mutex type so that various | ||
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 11 | //! for single-core Cortex-M targets where messages are only passed | ||
| 12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 13 | //! can also be used for single-core targets where messages are to be | ||
| 14 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 15 | //! | ||
| 16 | //! This module provides a bounded channel that has a limit on the number of | ||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 18 | //! another message will result in an error being returned. | ||
| 19 | |||
| 1 | use core::cell::RefCell; | 20 | use core::cell::RefCell; |
| 2 | use core::future::poll_fn; | 21 | use core::future::poll_fn; |
| 3 | use core::marker::PhantomData; | 22 | use core::marker::PhantomData; |
| @@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex; | |||
| 7 | use crate::blocking_mutex::Mutex; | 26 | use crate::blocking_mutex::Mutex; |
| 8 | use crate::waitqueue::WakerRegistration; | 27 | use crate::waitqueue::WakerRegistration; |
| 9 | 28 | ||
| 29 | /// A bounded zero-copy channel for communicating between asynchronous tasks | ||
| 30 | /// with backpressure. | ||
| 31 | /// | ||
| 32 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 33 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 34 | /// received from the channel. | ||
| 35 | /// | ||
| 36 | /// All data sent will become available in the same order as it was sent. | ||
| 37 | /// | ||
| 38 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through | ||
| 39 | /// an `&mut T`. | ||
| 10 | pub struct Channel<'a, M: RawMutex, T> { | 40 | pub struct Channel<'a, M: RawMutex, T> { |
| 11 | buf: *mut T, | 41 | buf: *mut T, |
| 12 | phantom: PhantomData<&'a mut T>, | 42 | phantom: PhantomData<&'a mut T>, |
| @@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> { | |||
| 14 | } | 44 | } |
| 15 | 45 | ||
| 16 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { | 46 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { |
| 47 | /// Initialize a new [`Channel`]. | ||
| 48 | /// | ||
| 49 | /// The provided buffer will be used and reused by the channel's logic, and thus dictates the | ||
| 50 | /// channel's capacity. | ||
| 17 | pub fn new(buf: &'a mut [T]) -> Self { | 51 | pub fn new(buf: &'a mut [T]) -> Self { |
| 18 | let len = buf.len(); | 52 | let len = buf.len(); |
| 19 | assert!(len != 0); | 53 | assert!(len != 0); |
| @@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { | |||
| 32 | } | 66 | } |
| 33 | } | 67 | } |
| 34 | 68 | ||
| 69 | /// Creates a [`Sender`] and [`Receiver`] from an existing channel. | ||
| 70 | /// | ||
| 71 | /// Further Senders and Receivers can be created through [`Sender::borrow`] and | ||
| 72 | /// [`Receiver::borrow`] respectively. | ||
| 35 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | 73 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { |
| 36 | (Sender { channel: self }, Receiver { channel: self }) | 74 | (Sender { channel: self }, Receiver { channel: self }) |
| 37 | } | 75 | } |
| 38 | } | 76 | } |
| 39 | 77 | ||
| 78 | /// Send-only access to a [`Channel`]. | ||
| 40 | pub struct Sender<'a, M: RawMutex, T> { | 79 | pub struct Sender<'a, M: RawMutex, T> { |
| 41 | channel: &'a Channel<'a, M, T>, | 80 | channel: &'a Channel<'a, M, T>, |
| 42 | } | 81 | } |
| 43 | 82 | ||
| 44 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { | 83 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { |
| 84 | /// Creates one further [`Sender`] over the same channel. | ||
| 45 | pub fn borrow(&mut self) -> Sender<'_, M, T> { | 85 | pub fn borrow(&mut self) -> Sender<'_, M, T> { |
| 46 | Sender { channel: self.channel } | 86 | Sender { channel: self.channel } |
| 47 | } | 87 | } |
| 48 | 88 | ||
| 89 | /// Attempts to send a value over the channel. | ||
| 49 | pub fn try_send(&mut self) -> Option<&mut T> { | 90 | pub fn try_send(&mut self) -> Option<&mut T> { |
| 50 | self.channel.state.lock(|s| { | 91 | self.channel.state.lock(|s| { |
| 51 | let s = &mut *s.borrow_mut(); | 92 | let s = &mut *s.borrow_mut(); |
| @@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 56 | }) | 97 | }) |
| 57 | } | 98 | } |
| 58 | 99 | ||
| 100 | /// Attempts to send a value over the channel. | ||
| 59 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { | 101 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { |
| 60 | self.channel.state.lock(|s| { | 102 | self.channel.state.lock(|s| { |
| 61 | let s = &mut *s.borrow_mut(); | 103 | let s = &mut *s.borrow_mut(); |
| @@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 69 | }) | 111 | }) |
| 70 | } | 112 | } |
| 71 | 113 | ||
| 114 | /// Asynchronously send a value over the channel. | ||
| 72 | pub async fn send(&mut self) -> &mut T { | 115 | pub async fn send(&mut self) -> &mut T { |
| 73 | let i = poll_fn(|cx| { | 116 | let i = poll_fn(|cx| { |
| 74 | self.channel.state.lock(|s| { | 117 | self.channel.state.lock(|s| { |
| @@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 86 | unsafe { &mut *self.channel.buf.add(i) } | 129 | unsafe { &mut *self.channel.buf.add(i) } |
| 87 | } | 130 | } |
| 88 | 131 | ||
| 132 | /// Notify the channel that the sending of the value has been finalized. | ||
| 89 | pub fn send_done(&mut self) { | 133 | pub fn send_done(&mut self) { |
| 90 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | 134 | self.channel.state.lock(|s| s.borrow_mut().push_done()) |
| 91 | } | 135 | } |
| 92 | } | 136 | } |
| 137 | |||
| 138 | /// Receive-only access to a [`Channel`]. | ||
| 93 | pub struct Receiver<'a, M: RawMutex, T> { | 139 | pub struct Receiver<'a, M: RawMutex, T> { |
| 94 | channel: &'a Channel<'a, M, T>, | 140 | channel: &'a Channel<'a, M, T>, |
| 95 | } | 141 | } |
| 96 | 142 | ||
| 97 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | 143 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { |
| 144 | /// Creates one further [`Sender`] over the same channel. | ||
| 98 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | 145 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { |
| 99 | Receiver { channel: self.channel } | 146 | Receiver { channel: self.channel } |
| 100 | } | 147 | } |
| 101 | 148 | ||
| 149 | /// Attempts to receive a value over the channel. | ||
| 102 | pub fn try_receive(&mut self) -> Option<&mut T> { | 150 | pub fn try_receive(&mut self) -> Option<&mut T> { |
| 103 | self.channel.state.lock(|s| { | 151 | self.channel.state.lock(|s| { |
| 104 | let s = &mut *s.borrow_mut(); | 152 | let s = &mut *s.borrow_mut(); |
| @@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 109 | }) | 157 | }) |
| 110 | } | 158 | } |
| 111 | 159 | ||
| 160 | /// Attempts to asynchronously receive a value over the channel. | ||
| 112 | pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { | 161 | pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { |
| 113 | self.channel.state.lock(|s| { | 162 | self.channel.state.lock(|s| { |
| 114 | let s = &mut *s.borrow_mut(); | 163 | let s = &mut *s.borrow_mut(); |
| @@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 122 | }) | 171 | }) |
| 123 | } | 172 | } |
| 124 | 173 | ||
| 174 | /// Asynchronously receive a value over the channel. | ||
| 125 | pub async fn receive(&mut self) -> &mut T { | 175 | pub async fn receive(&mut self) -> &mut T { |
| 126 | let i = poll_fn(|cx| { | 176 | let i = poll_fn(|cx| { |
| 127 | self.channel.state.lock(|s| { | 177 | self.channel.state.lock(|s| { |
| @@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 139 | unsafe { &mut *self.channel.buf.add(i) } | 189 | unsafe { &mut *self.channel.buf.add(i) } |
| 140 | } | 190 | } |
| 141 | 191 | ||
| 192 | /// Notify the channel that the receiving of the value has been finalized. | ||
| 142 | pub fn receive_done(&mut self) { | 193 | pub fn receive_done(&mut self) { |
| 143 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | 194 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) |
| 144 | } | 195 | } |
