aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorRuben De Smet <[email protected]>2023-08-11 13:50:12 +0200
committerDario Nieuwenhuis <[email protected]>2023-09-04 22:13:27 +0200
commit6e38b0764253ba07d3106ce3d57c2fd3509d7beb (patch)
tree47a78362b1050c2b713e884a82c4795448e794aa /embassy-sync
parent1eb03dc41a4a5fa8435f9a49d26e29ceea6d498e (diff)
Add docs to zero-copy-channel
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/zero_copy_channel.rs51
1 files changed, 51 insertions, 0 deletions
diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs
index cbb8cb526..f704cbd5d 100644
--- a/embassy-sync/src/zero_copy_channel.rs
+++ b/embassy-sync/src/zero_copy_channel.rs
@@ -1,3 +1,22 @@
1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19
1use core::cell::RefCell; 20use core::cell::RefCell;
2use core::future::poll_fn; 21use core::future::poll_fn;
3use core::marker::PhantomData; 22use core::marker::PhantomData;
@@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex;
7use crate::blocking_mutex::Mutex; 26use crate::blocking_mutex::Mutex;
8use crate::waitqueue::WakerRegistration; 27use crate::waitqueue::WakerRegistration;
9 28
29/// A bounded zero-copy channel for communicating between asynchronous tasks
30/// with backpressure.
31///
32/// The channel will buffer up to the provided number of messages. Once the
33/// buffer is full, attempts to `send` new messages will wait until a message is
34/// received from the channel.
35///
36/// All data sent will become available in the same order as it was sent.
37///
38/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
39/// an `&mut T`.
10pub struct Channel<'a, M: RawMutex, T> { 40pub struct Channel<'a, M: RawMutex, T> {
11 buf: *mut T, 41 buf: *mut T,
12 phantom: PhantomData<&'a mut T>, 42 phantom: PhantomData<&'a mut T>,
@@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> {
14} 44}
15 45
16impl<'a, M: RawMutex, T> Channel<'a, M, T> { 46impl<'a, M: RawMutex, T> Channel<'a, M, T> {
47 /// Initialize a new [`Channel`].
48 ///
49 /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
50 /// channel's capacity.
17 pub fn new(buf: &'a mut [T]) -> Self { 51 pub fn new(buf: &'a mut [T]) -> Self {
18 let len = buf.len(); 52 let len = buf.len();
19 assert!(len != 0); 53 assert!(len != 0);
@@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
32 } 66 }
33 } 67 }
34 68
69 /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
70 ///
71 /// Further Senders and Receivers can be created through [`Sender::borrow`] and
72 /// [`Receiver::borrow`] respectively.
35 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { 73 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
36 (Sender { channel: self }, Receiver { channel: self }) 74 (Sender { channel: self }, Receiver { channel: self })
37 } 75 }
38} 76}
39 77
78/// Send-only access to a [`Channel`].
40pub struct Sender<'a, M: RawMutex, T> { 79pub struct Sender<'a, M: RawMutex, T> {
41 channel: &'a Channel<'a, M, T>, 80 channel: &'a Channel<'a, M, T>,
42} 81}
43 82
44impl<'a, M: RawMutex, T> Sender<'a, M, T> { 83impl<'a, M: RawMutex, T> Sender<'a, M, T> {
84 /// Creates one further [`Sender`] over the same channel.
45 pub fn borrow(&mut self) -> Sender<'_, M, T> { 85 pub fn borrow(&mut self) -> Sender<'_, M, T> {
46 Sender { channel: self.channel } 86 Sender { channel: self.channel }
47 } 87 }
48 88
89 /// Attempts to send a value over the channel.
49 pub fn try_send(&mut self) -> Option<&mut T> { 90 pub fn try_send(&mut self) -> Option<&mut T> {
50 self.channel.state.lock(|s| { 91 self.channel.state.lock(|s| {
51 let s = &mut *s.borrow_mut(); 92 let s = &mut *s.borrow_mut();
@@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
56 }) 97 })
57 } 98 }
58 99
100 /// Attempts to send a value over the channel.
59 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { 101 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
60 self.channel.state.lock(|s| { 102 self.channel.state.lock(|s| {
61 let s = &mut *s.borrow_mut(); 103 let s = &mut *s.borrow_mut();
@@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
69 }) 111 })
70 } 112 }
71 113
114 /// Asynchronously send a value over the channel.
72 pub async fn send(&mut self) -> &mut T { 115 pub async fn send(&mut self) -> &mut T {
73 let i = poll_fn(|cx| { 116 let i = poll_fn(|cx| {
74 self.channel.state.lock(|s| { 117 self.channel.state.lock(|s| {
@@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
86 unsafe { &mut *self.channel.buf.add(i) } 129 unsafe { &mut *self.channel.buf.add(i) }
87 } 130 }
88 131
132 /// Notify the channel that the sending of the value has been finalized.
89 pub fn send_done(&mut self) { 133 pub fn send_done(&mut self) {
90 self.channel.state.lock(|s| s.borrow_mut().push_done()) 134 self.channel.state.lock(|s| s.borrow_mut().push_done())
91 } 135 }
92} 136}
137
138/// Receive-only access to a [`Channel`].
93pub struct Receiver<'a, M: RawMutex, T> { 139pub struct Receiver<'a, M: RawMutex, T> {
94 channel: &'a Channel<'a, M, T>, 140 channel: &'a Channel<'a, M, T>,
95} 141}
96 142
97impl<'a, M: RawMutex, T> Receiver<'a, M, T> { 143impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
144 /// Creates one further [`Sender`] over the same channel.
98 pub fn borrow(&mut self) -> Receiver<'_, M, T> { 145 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
99 Receiver { channel: self.channel } 146 Receiver { channel: self.channel }
100 } 147 }
101 148
149 /// Attempts to receive a value over the channel.
102 pub fn try_receive(&mut self) -> Option<&mut T> { 150 pub fn try_receive(&mut self) -> Option<&mut T> {
103 self.channel.state.lock(|s| { 151 self.channel.state.lock(|s| {
104 let s = &mut *s.borrow_mut(); 152 let s = &mut *s.borrow_mut();
@@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
109 }) 157 })
110 } 158 }
111 159
160 /// Attempts to asynchronously receive a value over the channel.
112 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { 161 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
113 self.channel.state.lock(|s| { 162 self.channel.state.lock(|s| {
114 let s = &mut *s.borrow_mut(); 163 let s = &mut *s.borrow_mut();
@@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
122 }) 171 })
123 } 172 }
124 173
174 /// Asynchronously receive a value over the channel.
125 pub async fn receive(&mut self) -> &mut T { 175 pub async fn receive(&mut self) -> &mut T {
126 let i = poll_fn(|cx| { 176 let i = poll_fn(|cx| {
127 self.channel.state.lock(|s| { 177 self.channel.state.lock(|s| {
@@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
139 unsafe { &mut *self.channel.buf.add(i) } 189 unsafe { &mut *self.channel.buf.add(i) }
140 } 190 }
141 191
192 /// Notify the channel that the receiving of the value has been finalized.
142 pub fn receive_done(&mut self) { 193 pub fn receive_done(&mut self) {
143 self.channel.state.lock(|s| s.borrow_mut().pop_done()) 194 self.channel.state.lock(|s| s.borrow_mut().pop_done())
144 } 195 }