aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-04-12 09:55:33 +0000
committerGitHub <[email protected]>2022-04-12 09:55:33 +0000
commitac3986e40ef297b90de19812aebccfe2e7f9ceec (patch)
treef7ffc0c8a2541edf8eb42d935a13a73097621d2f
parente844893095ac186058495df5b049e6846781497a (diff)
parentcdf30e68eb8e91ef13ad6195bea42bf75c9d3018 (diff)
Merge #712
712: Add types for channel dynamic dispatch r=lulf a=lulf * Add internal DynamicChannel trait implemented by Channel that allows polling for internal state in a lock safe manner and does not require knowing the channel size. * Existing usage of Sender and Receiver is preserved and does not use dynamic dispatch. * Add DynamicSender and DynamicReceiver types that references the channel using the DynamicChannel trait and does not require the const generic channel size parameter. Having the ability not know the channel size is very convenient when you don't want to change all of your channel using code when tuning the size. With this change, existing usage can be kept, and those willing to pay the price for dynamic dispatch may do so. Co-authored-by: Ulf Lilleengen <[email protected]>
-rw-r--r--embassy/src/channel/channel.rs184
1 files changed, 178 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs
index d749e5971..a1d805c53 100644
--- a/embassy/src/channel/channel.rs
+++ b/embassy/src/channel/channel.rs
@@ -66,6 +66,48 @@ where
66 } 66 }
67} 67}
68 68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender {
78 channel: self.channel,
79 }
80 }
81}
82
83impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
84where
85 M: RawMutex,
86{
87 fn from(s: Sender<'ch, M, T, N>) -> Self {
88 Self { channel: s.channel }
89 }
90}
91
92impl<'ch, T> DynamicSender<'ch, T> {
93 /// Sends a value.
94 ///
95 /// See [`Channel::send()`]
96 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
97 DynamicSendFuture {
98 channel: self.channel,
99 message: Some(message),
100 }
101 }
102
103 /// Attempt to immediately send a message.
104 ///
105 /// See [`Channel::send()`]
106 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
107 self.channel.try_send_with_context(message, None)
108 }
109}
110
69/// Receive-only access to a [`Channel`]. 111/// Receive-only access to a [`Channel`].
70#[derive(Copy)] 112#[derive(Copy)]
71pub struct Receiver<'ch, M, T, const N: usize> 113pub struct Receiver<'ch, M, T, const N: usize>
@@ -105,6 +147,47 @@ where
105 } 147 }
106} 148}
107 149
150/// Receive-only access to a [`Channel`] without knowing channel size.
151#[derive(Copy)]
152pub struct DynamicReceiver<'ch, T> {
153 channel: &'ch dyn DynamicChannel<T>,
154}
155
156impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
157 fn clone(&self) -> Self {
158 DynamicReceiver {
159 channel: self.channel,
160 }
161 }
162}
163
164impl<'ch, T> DynamicReceiver<'ch, T> {
165 /// Receive the next value.
166 ///
167 /// See [`Channel::recv()`].
168 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
169 DynamicRecvFuture {
170 channel: self.channel,
171 }
172 }
173
174 /// Attempt to immediately receive the next value.
175 ///
176 /// See [`Channel::try_recv()`]
177 pub fn try_recv(&self) -> Result<T, TryRecvError> {
178 self.channel.try_recv_with_context(None)
179 }
180}
181
182impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
183where
184 M: RawMutex,
185{
186 fn from(s: Receiver<'ch, M, T, N>) -> Self {
187 Self { channel: s.channel }
188 }
189}
190
108pub struct RecvFuture<'ch, M, T, const N: usize> 191pub struct RecvFuture<'ch, M, T, const N: usize>
109where 192where
110 M: RawMutex, 193 M: RawMutex,
@@ -119,11 +202,25 @@ where
119 type Output = T; 202 type Output = T;
120 203
121 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 204 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
122 self.channel 205 match self.channel.try_recv_with_context(Some(cx)) {
123 .lock(|c| match c.try_recv_with_context(Some(cx)) { 206 Ok(v) => Poll::Ready(v),
124 Ok(v) => Poll::Ready(v), 207 Err(TryRecvError::Empty) => Poll::Pending,
125 Err(TryRecvError::Empty) => Poll::Pending, 208 }
126 }) 209 }
210}
211
212pub struct DynamicRecvFuture<'ch, T> {
213 channel: &'ch dyn DynamicChannel<T>,
214}
215
216impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
217 type Output = T;
218
219 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
220 match self.channel.try_recv_with_context(Some(cx)) {
221 Ok(v) => Poll::Ready(v),
222 Err(TryRecvError::Empty) => Poll::Pending,
223 }
127 } 224 }
128} 225}
129 226
@@ -143,7 +240,7 @@ where
143 240
144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 241 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 match self.message.take() { 242 match self.message.take() {
146 Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { 243 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
147 Ok(..) => Poll::Ready(()), 244 Ok(..) => Poll::Ready(()),
148 Err(TrySendError::Full(m)) => { 245 Err(TrySendError::Full(m)) => {
149 self.message = Some(m); 246 self.message = Some(m);
@@ -157,6 +254,40 @@ where
157 254
158impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} 255impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
159 256
257pub struct DynamicSendFuture<'ch, T> {
258 channel: &'ch dyn DynamicChannel<T>,
259 message: Option<T>,
260}
261
262impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
263 type Output = ();
264
265 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266 match self.message.take() {
267 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
268 Ok(..) => Poll::Ready(()),
269 Err(TrySendError::Full(m)) => {
270 self.message = Some(m);
271 Poll::Pending
272 }
273 },
274 None => panic!("Message cannot be None"),
275 }
276 }
277}
278
279impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
280
281trait DynamicChannel<T> {
282 fn try_send_with_context(
283 &self,
284 message: T,
285 cx: Option<&mut Context<'_>>,
286 ) -> Result<(), TrySendError<T>>;
287
288 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
289}
290
160/// Error returned by [`try_recv`](Channel::try_recv). 291/// Error returned by [`try_recv`](Channel::try_recv).
161#[derive(PartialEq, Eq, Clone, Copy, Debug)] 292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
162#[cfg_attr(feature = "defmt", derive(defmt::Format))] 293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -287,6 +418,18 @@ where
287 self.inner.lock(|rc| f(&mut *rc.borrow_mut())) 418 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
288 } 419 }
289 420
421 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
422 self.lock(|c| c.try_recv_with_context(cx))
423 }
424
425 fn try_send_with_context(
426 &self,
427 m: T,
428 cx: Option<&mut Context<'_>>,
429 ) -> Result<(), TrySendError<T>> {
430 self.lock(|c| c.try_send_with_context(m, cx))
431 }
432
290 /// Get a sender for this channel. 433 /// Get a sender for this channel.
291 pub fn sender(&self) -> Sender<'_, M, T, N> { 434 pub fn sender(&self) -> Sender<'_, M, T, N> {
292 Sender { channel: self } 435 Sender { channel: self }
@@ -339,6 +482,25 @@ where
339 } 482 }
340} 483}
341 484
485/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
486/// tradeoff cost of dynamic dispatch.
487impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
488where
489 M: RawMutex,
490{
491 fn try_send_with_context(
492 &self,
493 m: T,
494 cx: Option<&mut Context<'_>>,
495 ) -> Result<(), TrySendError<T>> {
496 Channel::try_send_with_context(self, m, cx)
497 }
498
499 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
500 Channel::try_recv_with_context(self, cx)
501 }
502}
503
342#[cfg(test)] 504#[cfg(test)]
343mod tests { 505mod tests {
344 use core::time::Duration; 506 use core::time::Duration;
@@ -411,6 +573,16 @@ mod tests {
411 let _ = s1.clone(); 573 let _ = s1.clone();
412 } 574 }
413 575
576 #[test]
577 fn dynamic_dispatch() {
578 let c = Channel::<NoopRawMutex, u32, 3>::new();
579 let s: DynamicSender<'_, u32> = c.sender().into();
580 let r: DynamicReceiver<'_, u32> = c.receiver().into();
581
582 assert!(s.try_send(1).is_ok());
583 assert_eq!(r.try_recv().unwrap(), 1);
584 }
585
414 #[futures_test::test] 586 #[futures_test::test]
415 async fn receiver_receives_given_try_send_async() { 587 async fn receiver_receives_given_try_send_async() {
416 let executor = ThreadPool::new().unwrap(); 588 let executor = ThreadPool::new().unwrap();