diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2022-04-12 09:55:33 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-04-12 09:55:33 +0000 |
| commit | ac3986e40ef297b90de19812aebccfe2e7f9ceec (patch) | |
| tree | f7ffc0c8a2541edf8eb42d935a13a73097621d2f | |
| parent | e844893095ac186058495df5b049e6846781497a (diff) | |
| parent | cdf30e68eb8e91ef13ad6195bea42bf75c9d3018 (diff) | |
Merge #712
712: Add types for channel dynamic dispatch r=lulf a=lulf
* Add internal DynamicChannel trait implemented by Channel that allows
polling for internal state in a lock safe manner and does not require
knowing the channel size.
* Existing usage of Sender and Receiver is preserved and does not use
dynamic dispatch.
* Add DynamicSender and DynamicReceiver types that references the
channel using the DynamicChannel trait and does not require the const
generic channel size parameter.
Having the ability not know the channel size is very convenient when you don't want to change all of your channel using code when tuning the size. With this change, existing usage can be kept, and those willing to pay the price for dynamic dispatch may do so.
Co-authored-by: Ulf Lilleengen <[email protected]>
| -rw-r--r-- | embassy/src/channel/channel.rs | 184 |
1 files changed, 178 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs index d749e5971..a1d805c53 100644 --- a/embassy/src/channel/channel.rs +++ b/embassy/src/channel/channel.rs | |||
| @@ -66,6 +66,48 @@ where | |||
| 66 | } | 66 | } |
| 67 | } | 67 | } |
| 68 | 68 | ||
| 69 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 70 | #[derive(Copy)] | ||
| 71 | pub struct DynamicSender<'ch, T> { | ||
| 72 | channel: &'ch dyn DynamicChannel<T>, | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'ch, T> Clone for DynamicSender<'ch, T> { | ||
| 76 | fn clone(&self) -> Self { | ||
| 77 | DynamicSender { | ||
| 78 | channel: self.channel, | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 83 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T> | ||
| 84 | where | ||
| 85 | M: RawMutex, | ||
| 86 | { | ||
| 87 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 88 | Self { channel: s.channel } | ||
| 89 | } | ||
| 90 | } | ||
| 91 | |||
| 92 | impl<'ch, T> DynamicSender<'ch, T> { | ||
| 93 | /// Sends a value. | ||
| 94 | /// | ||
| 95 | /// See [`Channel::send()`] | ||
| 96 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 97 | DynamicSendFuture { | ||
| 98 | channel: self.channel, | ||
| 99 | message: Some(message), | ||
| 100 | } | ||
| 101 | } | ||
| 102 | |||
| 103 | /// Attempt to immediately send a message. | ||
| 104 | /// | ||
| 105 | /// See [`Channel::send()`] | ||
| 106 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 107 | self.channel.try_send_with_context(message, None) | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 69 | /// Receive-only access to a [`Channel`]. | 111 | /// Receive-only access to a [`Channel`]. |
| 70 | #[derive(Copy)] | 112 | #[derive(Copy)] |
| 71 | pub struct Receiver<'ch, M, T, const N: usize> | 113 | pub struct Receiver<'ch, M, T, const N: usize> |
| @@ -105,6 +147,47 @@ where | |||
| 105 | } | 147 | } |
| 106 | } | 148 | } |
| 107 | 149 | ||
| 150 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 151 | #[derive(Copy)] | ||
| 152 | pub struct DynamicReceiver<'ch, T> { | ||
| 153 | channel: &'ch dyn DynamicChannel<T>, | ||
| 154 | } | ||
| 155 | |||
| 156 | impl<'ch, T> Clone for DynamicReceiver<'ch, T> { | ||
| 157 | fn clone(&self) -> Self { | ||
| 158 | DynamicReceiver { | ||
| 159 | channel: self.channel, | ||
| 160 | } | ||
| 161 | } | ||
| 162 | } | ||
| 163 | |||
| 164 | impl<'ch, T> DynamicReceiver<'ch, T> { | ||
| 165 | /// Receive the next value. | ||
| 166 | /// | ||
| 167 | /// See [`Channel::recv()`]. | ||
| 168 | pub fn recv(&self) -> DynamicRecvFuture<'_, T> { | ||
| 169 | DynamicRecvFuture { | ||
| 170 | channel: self.channel, | ||
| 171 | } | ||
| 172 | } | ||
| 173 | |||
| 174 | /// Attempt to immediately receive the next value. | ||
| 175 | /// | ||
| 176 | /// See [`Channel::try_recv()`] | ||
| 177 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 178 | self.channel.try_recv_with_context(None) | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> | ||
| 183 | where | ||
| 184 | M: RawMutex, | ||
| 185 | { | ||
| 186 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 187 | Self { channel: s.channel } | ||
| 188 | } | ||
| 189 | } | ||
| 190 | |||
| 108 | pub struct RecvFuture<'ch, M, T, const N: usize> | 191 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 109 | where | 192 | where |
| 110 | M: RawMutex, | 193 | M: RawMutex, |
| @@ -119,11 +202,25 @@ where | |||
| 119 | type Output = T; | 202 | type Output = T; |
| 120 | 203 | ||
| 121 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | 204 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
| 122 | self.channel | 205 | match self.channel.try_recv_with_context(Some(cx)) { |
| 123 | .lock(|c| match c.try_recv_with_context(Some(cx)) { | 206 | Ok(v) => Poll::Ready(v), |
| 124 | Ok(v) => Poll::Ready(v), | 207 | Err(TryRecvError::Empty) => Poll::Pending, |
| 125 | Err(TryRecvError::Empty) => Poll::Pending, | 208 | } |
| 126 | }) | 209 | } |
| 210 | } | ||
| 211 | |||
| 212 | pub struct DynamicRecvFuture<'ch, T> { | ||
| 213 | channel: &'ch dyn DynamicChannel<T>, | ||
| 214 | } | ||
| 215 | |||
| 216 | impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | ||
| 217 | type Output = T; | ||
| 218 | |||
| 219 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 220 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 221 | Ok(v) => Poll::Ready(v), | ||
| 222 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 223 | } | ||
| 127 | } | 224 | } |
| 128 | } | 225 | } |
| 129 | 226 | ||
| @@ -143,7 +240,7 @@ where | |||
| 143 | 240 | ||
| 144 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 241 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 145 | match self.message.take() { | 242 | match self.message.take() { |
| 146 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { | 243 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { |
| 147 | Ok(..) => Poll::Ready(()), | 244 | Ok(..) => Poll::Ready(()), |
| 148 | Err(TrySendError::Full(m)) => { | 245 | Err(TrySendError::Full(m)) => { |
| 149 | self.message = Some(m); | 246 | self.message = Some(m); |
| @@ -157,6 +254,40 @@ where | |||
| 157 | 254 | ||
| 158 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | 255 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
| 159 | 256 | ||
| 257 | pub struct DynamicSendFuture<'ch, T> { | ||
| 258 | channel: &'ch dyn DynamicChannel<T>, | ||
| 259 | message: Option<T>, | ||
| 260 | } | ||
| 261 | |||
| 262 | impl<'ch, T> Future for DynamicSendFuture<'ch, T> { | ||
| 263 | type Output = (); | ||
| 264 | |||
| 265 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 266 | match self.message.take() { | ||
| 267 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 268 | Ok(..) => Poll::Ready(()), | ||
| 269 | Err(TrySendError::Full(m)) => { | ||
| 270 | self.message = Some(m); | ||
| 271 | Poll::Pending | ||
| 272 | } | ||
| 273 | }, | ||
| 274 | None => panic!("Message cannot be None"), | ||
| 275 | } | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | ||
| 280 | |||
| 281 | trait DynamicChannel<T> { | ||
| 282 | fn try_send_with_context( | ||
| 283 | &self, | ||
| 284 | message: T, | ||
| 285 | cx: Option<&mut Context<'_>>, | ||
| 286 | ) -> Result<(), TrySendError<T>>; | ||
| 287 | |||
| 288 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | ||
| 289 | } | ||
| 290 | |||
| 160 | /// Error returned by [`try_recv`](Channel::try_recv). | 291 | /// Error returned by [`try_recv`](Channel::try_recv). |
| 161 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | 292 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
| 162 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | 293 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| @@ -287,6 +418,18 @@ where | |||
| 287 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | 418 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 288 | } | 419 | } |
| 289 | 420 | ||
| 421 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 422 | self.lock(|c| c.try_recv_with_context(cx)) | ||
| 423 | } | ||
| 424 | |||
| 425 | fn try_send_with_context( | ||
| 426 | &self, | ||
| 427 | m: T, | ||
| 428 | cx: Option<&mut Context<'_>>, | ||
| 429 | ) -> Result<(), TrySendError<T>> { | ||
| 430 | self.lock(|c| c.try_send_with_context(m, cx)) | ||
| 431 | } | ||
| 432 | |||
| 290 | /// Get a sender for this channel. | 433 | /// Get a sender for this channel. |
| 291 | pub fn sender(&self) -> Sender<'_, M, T, N> { | 434 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
| 292 | Sender { channel: self } | 435 | Sender { channel: self } |
| @@ -339,6 +482,25 @@ where | |||
| 339 | } | 482 | } |
| 340 | } | 483 | } |
| 341 | 484 | ||
| 485 | /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the | ||
| 486 | /// tradeoff cost of dynamic dispatch. | ||
| 487 | impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N> | ||
| 488 | where | ||
| 489 | M: RawMutex, | ||
| 490 | { | ||
| 491 | fn try_send_with_context( | ||
| 492 | &self, | ||
| 493 | m: T, | ||
| 494 | cx: Option<&mut Context<'_>>, | ||
| 495 | ) -> Result<(), TrySendError<T>> { | ||
| 496 | Channel::try_send_with_context(self, m, cx) | ||
| 497 | } | ||
| 498 | |||
| 499 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 500 | Channel::try_recv_with_context(self, cx) | ||
| 501 | } | ||
| 502 | } | ||
| 503 | |||
| 342 | #[cfg(test)] | 504 | #[cfg(test)] |
| 343 | mod tests { | 505 | mod tests { |
| 344 | use core::time::Duration; | 506 | use core::time::Duration; |
| @@ -411,6 +573,16 @@ mod tests { | |||
| 411 | let _ = s1.clone(); | 573 | let _ = s1.clone(); |
| 412 | } | 574 | } |
| 413 | 575 | ||
| 576 | #[test] | ||
| 577 | fn dynamic_dispatch() { | ||
| 578 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 579 | let s: DynamicSender<'_, u32> = c.sender().into(); | ||
| 580 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | ||
| 581 | |||
| 582 | assert!(s.try_send(1).is_ok()); | ||
| 583 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 584 | } | ||
| 585 | |||
| 414 | #[futures_test::test] | 586 | #[futures_test::test] |
| 415 | async fn receiver_receives_given_try_send_async() { | 587 | async fn receiver_receives_given_try_send_async() { |
| 416 | let executor = ThreadPool::new().unwrap(); | 588 | let executor = ThreadPool::new().unwrap(); |
