aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2022-04-11 08:57:15 +0200
committerUlf Lilleengen <[email protected]>2022-04-11 08:57:15 +0200
commitbc1dff34c0e5d44706c6387892d8f2c2f3539bd9 (patch)
tree633c85ba7aee24a269b879fee935569f321b3bcb
parente844893095ac186058495df5b049e6846781497a (diff)
Add types for channel dynamic dispatch
* Add internal DynamicChannel trait implemented by Channel that allows polling for internal state in a lock safe manner and does not require knowing the channel size. * Existing usage of Sender and Receiver is preserved and does not use dynamic dispatch. * Add DynamicSender and DynamicReceiver types that references the channel using the DynamicChannel trait and does not require the const generic channel size parameter.
-rw-r--r--embassy/src/channel/channel.rs217
1 files changed, 211 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs
index d749e5971..c7a89793a 100644
--- a/embassy/src/channel/channel.rs
+++ b/embassy/src/channel/channel.rs
@@ -66,6 +66,57 @@ where
66 } 66 }
67} 67}
68 68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, M, T>
72where
73 M: RawMutex,
74{
75 channel: &'ch dyn DynamicChannel<M, T>,
76}
77
78impl<'ch, M, T> Clone for DynamicSender<'ch, M, T>
79where
80 M: RawMutex,
81{
82 fn clone(&self) -> Self {
83 DynamicSender {
84 channel: self.channel,
85 }
86 }
87}
88
89impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, M, T>
90where
91 M: RawMutex,
92{
93 fn from(s: Sender<'ch, M, T, N>) -> Self {
94 Self { channel: s.channel }
95 }
96}
97
98impl<'ch, M, T> DynamicSender<'ch, M, T>
99where
100 M: RawMutex,
101{
102 /// Sends a value.
103 ///
104 /// See [`Channel::send()`]
105 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, M, T> {
106 DynamicSendFuture {
107 channel: self.channel,
108 message: Some(message),
109 }
110 }
111
112 /// Attempt to immediately send a message.
113 ///
114 /// See [`Channel::send()`]
115 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
116 self.channel.try_send_with_context(message, None)
117 }
118}
119
69/// Receive-only access to a [`Channel`]. 120/// Receive-only access to a [`Channel`].
70#[derive(Copy)] 121#[derive(Copy)]
71pub struct Receiver<'ch, M, T, const N: usize> 122pub struct Receiver<'ch, M, T, const N: usize>
@@ -105,6 +156,56 @@ where
105 } 156 }
106} 157}
107 158
159/// Receive-only access to a [`Channel`] without knowing channel size.
160#[derive(Copy)]
161pub struct DynamicReceiver<'ch, M, T>
162where
163 M: RawMutex,
164{
165 channel: &'ch dyn DynamicChannel<M, T>,
166}
167
168impl<'ch, M, T> Clone for DynamicReceiver<'ch, M, T>
169where
170 M: RawMutex,
171{
172 fn clone(&self) -> Self {
173 DynamicReceiver {
174 channel: self.channel,
175 }
176 }
177}
178
179impl<'ch, M, T> DynamicReceiver<'ch, M, T>
180where
181 M: RawMutex,
182{
183 /// Receive the next value.
184 ///
185 /// See [`Channel::recv()`].
186 pub fn recv(&self) -> DynamicRecvFuture<'_, M, T> {
187 DynamicRecvFuture {
188 channel: self.channel,
189 }
190 }
191
192 /// Attempt to immediately receive the next value.
193 ///
194 /// See [`Channel::try_recv()`]
195 pub fn try_recv(&self) -> Result<T, TryRecvError> {
196 self.channel.try_recv_with_context(None)
197 }
198}
199
200impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, M, T>
201where
202 M: RawMutex,
203{
204 fn from(s: Receiver<'ch, M, T, N>) -> Self {
205 Self { channel: s.channel }
206 }
207}
208
108pub struct RecvFuture<'ch, M, T, const N: usize> 209pub struct RecvFuture<'ch, M, T, const N: usize>
109where 210where
110 M: RawMutex, 211 M: RawMutex,
@@ -119,11 +220,31 @@ where
119 type Output = T; 220 type Output = T;
120 221
121 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 222 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
122 self.channel 223 match self.channel.try_recv_with_context(Some(cx)) {
123 .lock(|c| match c.try_recv_with_context(Some(cx)) { 224 Ok(v) => Poll::Ready(v),
124 Ok(v) => Poll::Ready(v), 225 Err(TryRecvError::Empty) => Poll::Pending,
125 Err(TryRecvError::Empty) => Poll::Pending, 226 }
126 }) 227 }
228}
229
230pub struct DynamicRecvFuture<'ch, M, T>
231where
232 M: RawMutex,
233{
234 channel: &'ch dyn DynamicChannel<M, T>,
235}
236
237impl<'ch, M, T> Future for DynamicRecvFuture<'ch, M, T>
238where
239 M: RawMutex,
240{
241 type Output = T;
242
243 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
244 match self.channel.try_recv_with_context(Some(cx)) {
245 Ok(v) => Poll::Ready(v),
246 Err(TryRecvError::Empty) => Poll::Pending,
247 }
127 } 248 }
128} 249}
129 250
@@ -143,7 +264,7 @@ where
143 264
144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 265 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 match self.message.take() { 266 match self.message.take() {
146 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { 267 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
147 Ok(..) => Poll::Ready(()), 268 Ok(..) => Poll::Ready(()),
148 Err(TrySendError::Full(m)) => { 269 Err(TrySendError::Full(m)) => {
149 self.message = Some(m); 270 self.message = Some(m);
@@ -157,6 +278,49 @@ where
157 278
158impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} 279impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
159 280
281pub struct DynamicSendFuture<'ch, M, T>
282where
283 M: RawMutex,
284{
285 channel: &'ch dyn DynamicChannel<M, T>,
286 message: Option<T>,
287}
288
289impl<'ch, M, T> Future for DynamicSendFuture<'ch, M, T>
290where
291 M: RawMutex,
292{
293 type Output = ();
294
295 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296 match self.message.take() {
297 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
298 Ok(..) => Poll::Ready(()),
299 Err(TrySendError::Full(m)) => {
300 self.message = Some(m);
301 Poll::Pending
302 }
303 },
304 None => panic!("Message cannot be None"),
305 }
306 }
307}
308
309impl<'ch, M, T> Unpin for DynamicSendFuture<'ch, M, T> where M: RawMutex {}
310
311trait DynamicChannel<M, T>
312where
313 M: RawMutex,
314{
315 fn try_send_with_context(
316 &self,
317 message: T,
318 cx: Option<&mut Context<'_>>,
319 ) -> Result<(), TrySendError<T>>;
320
321 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
322}
323
160/// Error returned by [`try_recv`](Channel::try_recv). 324/// Error returned by [`try_recv`](Channel::try_recv).
161#[derive(PartialEq, Eq, Clone, Copy, Debug)] 325#[derive(PartialEq, Eq, Clone, Copy, Debug)]
162#[cfg_attr(feature = "defmt", derive(defmt::Format))] 326#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -287,6 +451,18 @@ where
287 self.inner.lock(|rc| f(&mut *rc.borrow_mut())) 451 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
288 } 452 }
289 453
454 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
455 self.lock(|c| c.try_recv_with_context(cx))
456 }
457
458 fn try_send_with_context(
459 &self,
460 m: T,
461 cx: Option<&mut Context<'_>>,
462 ) -> Result<(), TrySendError<T>> {
463 self.lock(|c| c.try_send_with_context(m, cx))
464 }
465
290 /// Get a sender for this channel. 466 /// Get a sender for this channel.
291 pub fn sender(&self) -> Sender<'_, M, T, N> { 467 pub fn sender(&self) -> Sender<'_, M, T, N> {
292 Sender { channel: self } 468 Sender { channel: self }
@@ -339,6 +515,25 @@ where
339 } 515 }
340} 516}
341 517
518/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
519/// tradeoff cost of dynamic dispatch.
520impl<M, T, const N: usize> DynamicChannel<M, T> for Channel<M, T, N>
521where
522 M: RawMutex,
523{
524 fn try_send_with_context(
525 &self,
526 m: T,
527 cx: Option<&mut Context<'_>>,
528 ) -> Result<(), TrySendError<T>> {
529 Channel::try_send_with_context(self, m, cx)
530 }
531
532 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
533 Channel::try_recv_with_context(self, cx)
534 }
535}
536
342#[cfg(test)] 537#[cfg(test)]
343mod tests { 538mod tests {
344 use core::time::Duration; 539 use core::time::Duration;
@@ -411,6 +606,16 @@ mod tests {
411 let _ = s1.clone(); 606 let _ = s1.clone();
412 } 607 }
413 608
609 #[test]
610 fn dynamic_dispatch() {
611 let c = Channel::<NoopRawMutex, u32, 3>::new();
612 let s: DynamicSender<'_, NoopRawMutex, u32> = c.sender().into();
613 let r: DynamicReceiver<'_, NoopRawMutex, u32> = c.receiver().into();
614
615 assert!(s.try_send(1).is_ok());
616 assert_eq!(r.try_recv().unwrap(), 1);
617 }
618
414 #[futures_test::test] 619 #[futures_test::test]
415 async fn receiver_receives_given_try_send_async() { 620 async fn receiver_receives_given_try_send_async() {
416 let executor = ThreadPool::new().unwrap(); 621 let executor = ThreadPool::new().unwrap();