aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/channel.rs184
1 files changed, 178 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs
index d749e5971..a1d805c53 100644
--- a/embassy/src/channel/channel.rs
+++ b/embassy/src/channel/channel.rs
@@ -66,6 +66,48 @@ where
66 } 66 }
67} 67}
68 68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender {
78 channel: self.channel,
79 }
80 }
81}
82
83impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
84where
85 M: RawMutex,
86{
87 fn from(s: Sender<'ch, M, T, N>) -> Self {
88 Self { channel: s.channel }
89 }
90}
91
92impl<'ch, T> DynamicSender<'ch, T> {
93 /// Sends a value.
94 ///
95 /// See [`Channel::send()`]
96 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
97 DynamicSendFuture {
98 channel: self.channel,
99 message: Some(message),
100 }
101 }
102
103 /// Attempt to immediately send a message.
104 ///
105 /// See [`Channel::send()`]
106 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
107 self.channel.try_send_with_context(message, None)
108 }
109}
110
69/// Receive-only access to a [`Channel`]. 111/// Receive-only access to a [`Channel`].
70#[derive(Copy)] 112#[derive(Copy)]
71pub struct Receiver<'ch, M, T, const N: usize> 113pub struct Receiver<'ch, M, T, const N: usize>
@@ -105,6 +147,47 @@ where
105 } 147 }
106} 148}
107 149
150/// Receive-only access to a [`Channel`] without knowing channel size.
151#[derive(Copy)]
152pub struct DynamicReceiver<'ch, T> {
153 channel: &'ch dyn DynamicChannel<T>,
154}
155
156impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
157 fn clone(&self) -> Self {
158 DynamicReceiver {
159 channel: self.channel,
160 }
161 }
162}
163
164impl<'ch, T> DynamicReceiver<'ch, T> {
165 /// Receive the next value.
166 ///
167 /// See [`Channel::recv()`].
168 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
169 DynamicRecvFuture {
170 channel: self.channel,
171 }
172 }
173
174 /// Attempt to immediately receive the next value.
175 ///
176 /// See [`Channel::try_recv()`]
177 pub fn try_recv(&self) -> Result<T, TryRecvError> {
178 self.channel.try_recv_with_context(None)
179 }
180}
181
182impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
183where
184 M: RawMutex,
185{
186 fn from(s: Receiver<'ch, M, T, N>) -> Self {
187 Self { channel: s.channel }
188 }
189}
190
108pub struct RecvFuture<'ch, M, T, const N: usize> 191pub struct RecvFuture<'ch, M, T, const N: usize>
109where 192where
110 M: RawMutex, 193 M: RawMutex,
@@ -119,11 +202,25 @@ where
119 type Output = T; 202 type Output = T;
120 203
121 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 204 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
122 self.channel 205 match self.channel.try_recv_with_context(Some(cx)) {
123 .lock(|c| match c.try_recv_with_context(Some(cx)) { 206 Ok(v) => Poll::Ready(v),
124 Ok(v) => Poll::Ready(v), 207 Err(TryRecvError::Empty) => Poll::Pending,
125 Err(TryRecvError::Empty) => Poll::Pending, 208 }
126 }) 209 }
210}
211
212pub struct DynamicRecvFuture<'ch, T> {
213 channel: &'ch dyn DynamicChannel<T>,
214}
215
216impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
217 type Output = T;
218
219 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
220 match self.channel.try_recv_with_context(Some(cx)) {
221 Ok(v) => Poll::Ready(v),
222 Err(TryRecvError::Empty) => Poll::Pending,
223 }
127 } 224 }
128} 225}
129 226
@@ -143,7 +240,7 @@ where
143 240
144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 241 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 match self.message.take() { 242 match self.message.take() {
146 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { 243 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
147 Ok(..) => Poll::Ready(()), 244 Ok(..) => Poll::Ready(()),
148 Err(TrySendError::Full(m)) => { 245 Err(TrySendError::Full(m)) => {
149 self.message = Some(m); 246 self.message = Some(m);
@@ -157,6 +254,40 @@ where
157 254
158impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} 255impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
159 256
257pub struct DynamicSendFuture<'ch, T> {
258 channel: &'ch dyn DynamicChannel<T>,
259 message: Option<T>,
260}
261
262impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
263 type Output = ();
264
265 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266 match self.message.take() {
267 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
268 Ok(..) => Poll::Ready(()),
269 Err(TrySendError::Full(m)) => {
270 self.message = Some(m);
271 Poll::Pending
272 }
273 },
274 None => panic!("Message cannot be None"),
275 }
276 }
277}
278
279impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
280
281trait DynamicChannel<T> {
282 fn try_send_with_context(
283 &self,
284 message: T,
285 cx: Option<&mut Context<'_>>,
286 ) -> Result<(), TrySendError<T>>;
287
288 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
289}
290
160/// Error returned by [`try_recv`](Channel::try_recv). 291/// Error returned by [`try_recv`](Channel::try_recv).
161#[derive(PartialEq, Eq, Clone, Copy, Debug)] 292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
162#[cfg_attr(feature = "defmt", derive(defmt::Format))] 293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -287,6 +418,18 @@ where
287 self.inner.lock(|rc| f(&mut *rc.borrow_mut())) 418 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
288 } 419 }
289 420
421 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
422 self.lock(|c| c.try_recv_with_context(cx))
423 }
424
425 fn try_send_with_context(
426 &self,
427 m: T,
428 cx: Option<&mut Context<'_>>,
429 ) -> Result<(), TrySendError<T>> {
430 self.lock(|c| c.try_send_with_context(m, cx))
431 }
432
290 /// Get a sender for this channel. 433 /// Get a sender for this channel.
291 pub fn sender(&self) -> Sender<'_, M, T, N> { 434 pub fn sender(&self) -> Sender<'_, M, T, N> {
292 Sender { channel: self } 435 Sender { channel: self }
@@ -339,6 +482,25 @@ where
339 } 482 }
340} 483}
341 484
485/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
486/// tradeoff cost of dynamic dispatch.
487impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
488where
489 M: RawMutex,
490{
491 fn try_send_with_context(
492 &self,
493 m: T,
494 cx: Option<&mut Context<'_>>,
495 ) -> Result<(), TrySendError<T>> {
496 Channel::try_send_with_context(self, m, cx)
497 }
498
499 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
500 Channel::try_recv_with_context(self, cx)
501 }
502}
503
342#[cfg(test)] 504#[cfg(test)]
343mod tests { 505mod tests {
344 use core::time::Duration; 506 use core::time::Duration;
@@ -411,6 +573,16 @@ mod tests {
411 let _ = s1.clone(); 573 let _ = s1.clone();
412 } 574 }
413 575
576 #[test]
577 fn dynamic_dispatch() {
578 let c = Channel::<NoopRawMutex, u32, 3>::new();
579 let s: DynamicSender<'_, u32> = c.sender().into();
580 let r: DynamicReceiver<'_, u32> = c.receiver().into();
581
582 assert!(s.try_send(1).is_ok());
583 assert_eq!(r.try_recv().unwrap(), 1);
584 }
585
414 #[futures_test::test] 586 #[futures_test::test]
415 async fn receiver_receives_given_try_send_async() { 587 async fn receiver_receives_given_try_send_async() {
416 let executor = ThreadPool::new().unwrap(); 588 let executor = ThreadPool::new().unwrap();