diff options
| author | Ulf Lilleengen <[email protected]> | 2022-04-11 08:57:15 +0200 |
|---|---|---|
| committer | Ulf Lilleengen <[email protected]> | 2022-04-11 08:57:15 +0200 |
| commit | bc1dff34c0e5d44706c6387892d8f2c2f3539bd9 (patch) | |
| tree | 633c85ba7aee24a269b879fee935569f321b3bcb | |
| parent | e844893095ac186058495df5b049e6846781497a (diff) | |
Add types for channel dynamic dispatch
* Add internal DynamicChannel trait implemented by Channel that allows
polling for internal state in a lock safe manner and does not require
knowing the channel size.
* Existing usage of Sender and Receiver is preserved and does not use
dynamic dispatch.
* Add DynamicSender and DynamicReceiver types that references the
channel using the DynamicChannel trait and does not require the const
generic channel size parameter.
| -rw-r--r-- | embassy/src/channel/channel.rs | 217 |
1 files changed, 211 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs index d749e5971..c7a89793a 100644 --- a/embassy/src/channel/channel.rs +++ b/embassy/src/channel/channel.rs | |||
| @@ -66,6 +66,57 @@ where | |||
| 66 | } | 66 | } |
| 67 | } | 67 | } |
| 68 | 68 | ||
| 69 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 70 | #[derive(Copy)] | ||
| 71 | pub struct DynamicSender<'ch, M, T> | ||
| 72 | where | ||
| 73 | M: RawMutex, | ||
| 74 | { | ||
| 75 | channel: &'ch dyn DynamicChannel<M, T>, | ||
| 76 | } | ||
| 77 | |||
| 78 | impl<'ch, M, T> Clone for DynamicSender<'ch, M, T> | ||
| 79 | where | ||
| 80 | M: RawMutex, | ||
| 81 | { | ||
| 82 | fn clone(&self) -> Self { | ||
| 83 | DynamicSender { | ||
| 84 | channel: self.channel, | ||
| 85 | } | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, M, T> | ||
| 90 | where | ||
| 91 | M: RawMutex, | ||
| 92 | { | ||
| 93 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 94 | Self { channel: s.channel } | ||
| 95 | } | ||
| 96 | } | ||
| 97 | |||
| 98 | impl<'ch, M, T> DynamicSender<'ch, M, T> | ||
| 99 | where | ||
| 100 | M: RawMutex, | ||
| 101 | { | ||
| 102 | /// Sends a value. | ||
| 103 | /// | ||
| 104 | /// See [`Channel::send()`] | ||
| 105 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, M, T> { | ||
| 106 | DynamicSendFuture { | ||
| 107 | channel: self.channel, | ||
| 108 | message: Some(message), | ||
| 109 | } | ||
| 110 | } | ||
| 111 | |||
| 112 | /// Attempt to immediately send a message. | ||
| 113 | /// | ||
| 114 | /// See [`Channel::send()`] | ||
| 115 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 116 | self.channel.try_send_with_context(message, None) | ||
| 117 | } | ||
| 118 | } | ||
| 119 | |||
| 69 | /// Receive-only access to a [`Channel`]. | 120 | /// Receive-only access to a [`Channel`]. |
| 70 | #[derive(Copy)] | 121 | #[derive(Copy)] |
| 71 | pub struct Receiver<'ch, M, T, const N: usize> | 122 | pub struct Receiver<'ch, M, T, const N: usize> |
| @@ -105,6 +156,56 @@ where | |||
| 105 | } | 156 | } |
| 106 | } | 157 | } |
| 107 | 158 | ||
| 159 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 160 | #[derive(Copy)] | ||
| 161 | pub struct DynamicReceiver<'ch, M, T> | ||
| 162 | where | ||
| 163 | M: RawMutex, | ||
| 164 | { | ||
| 165 | channel: &'ch dyn DynamicChannel<M, T>, | ||
| 166 | } | ||
| 167 | |||
| 168 | impl<'ch, M, T> Clone for DynamicReceiver<'ch, M, T> | ||
| 169 | where | ||
| 170 | M: RawMutex, | ||
| 171 | { | ||
| 172 | fn clone(&self) -> Self { | ||
| 173 | DynamicReceiver { | ||
| 174 | channel: self.channel, | ||
| 175 | } | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | impl<'ch, M, T> DynamicReceiver<'ch, M, T> | ||
| 180 | where | ||
| 181 | M: RawMutex, | ||
| 182 | { | ||
| 183 | /// Receive the next value. | ||
| 184 | /// | ||
| 185 | /// See [`Channel::recv()`]. | ||
| 186 | pub fn recv(&self) -> DynamicRecvFuture<'_, M, T> { | ||
| 187 | DynamicRecvFuture { | ||
| 188 | channel: self.channel, | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | /// Attempt to immediately receive the next value. | ||
| 193 | /// | ||
| 194 | /// See [`Channel::try_recv()`] | ||
| 195 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 196 | self.channel.try_recv_with_context(None) | ||
| 197 | } | ||
| 198 | } | ||
| 199 | |||
| 200 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, M, T> | ||
| 201 | where | ||
| 202 | M: RawMutex, | ||
| 203 | { | ||
| 204 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 205 | Self { channel: s.channel } | ||
| 206 | } | ||
| 207 | } | ||
| 208 | |||
| 108 | pub struct RecvFuture<'ch, M, T, const N: usize> | 209 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 109 | where | 210 | where |
| 110 | M: RawMutex, | 211 | M: RawMutex, |
| @@ -119,11 +220,31 @@ where | |||
| 119 | type Output = T; | 220 | type Output = T; |
| 120 | 221 | ||
| 121 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | 222 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
| 122 | self.channel | 223 | match self.channel.try_recv_with_context(Some(cx)) { |
| 123 | .lock(|c| match c.try_recv_with_context(Some(cx)) { | 224 | Ok(v) => Poll::Ready(v), |
| 124 | Ok(v) => Poll::Ready(v), | 225 | Err(TryRecvError::Empty) => Poll::Pending, |
| 125 | Err(TryRecvError::Empty) => Poll::Pending, | 226 | } |
| 126 | }) | 227 | } |
| 228 | } | ||
| 229 | |||
| 230 | pub struct DynamicRecvFuture<'ch, M, T> | ||
| 231 | where | ||
| 232 | M: RawMutex, | ||
| 233 | { | ||
| 234 | channel: &'ch dyn DynamicChannel<M, T>, | ||
| 235 | } | ||
| 236 | |||
| 237 | impl<'ch, M, T> Future for DynamicRecvFuture<'ch, M, T> | ||
| 238 | where | ||
| 239 | M: RawMutex, | ||
| 240 | { | ||
| 241 | type Output = T; | ||
| 242 | |||
| 243 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 244 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 245 | Ok(v) => Poll::Ready(v), | ||
| 246 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 247 | } | ||
| 127 | } | 248 | } |
| 128 | } | 249 | } |
| 129 | 250 | ||
| @@ -143,7 +264,7 @@ where | |||
| 143 | 264 | ||
| 144 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 265 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 145 | match self.message.take() { | 266 | match self.message.take() { |
| 146 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { | 267 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { |
| 147 | Ok(..) => Poll::Ready(()), | 268 | Ok(..) => Poll::Ready(()), |
| 148 | Err(TrySendError::Full(m)) => { | 269 | Err(TrySendError::Full(m)) => { |
| 149 | self.message = Some(m); | 270 | self.message = Some(m); |
| @@ -157,6 +278,49 @@ where | |||
| 157 | 278 | ||
| 158 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | 279 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
| 159 | 280 | ||
| 281 | pub struct DynamicSendFuture<'ch, M, T> | ||
| 282 | where | ||
| 283 | M: RawMutex, | ||
| 284 | { | ||
| 285 | channel: &'ch dyn DynamicChannel<M, T>, | ||
| 286 | message: Option<T>, | ||
| 287 | } | ||
| 288 | |||
| 289 | impl<'ch, M, T> Future for DynamicSendFuture<'ch, M, T> | ||
| 290 | where | ||
| 291 | M: RawMutex, | ||
| 292 | { | ||
| 293 | type Output = (); | ||
| 294 | |||
| 295 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 296 | match self.message.take() { | ||
| 297 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 298 | Ok(..) => Poll::Ready(()), | ||
| 299 | Err(TrySendError::Full(m)) => { | ||
| 300 | self.message = Some(m); | ||
| 301 | Poll::Pending | ||
| 302 | } | ||
| 303 | }, | ||
| 304 | None => panic!("Message cannot be None"), | ||
| 305 | } | ||
| 306 | } | ||
| 307 | } | ||
| 308 | |||
| 309 | impl<'ch, M, T> Unpin for DynamicSendFuture<'ch, M, T> where M: RawMutex {} | ||
| 310 | |||
| 311 | trait DynamicChannel<M, T> | ||
| 312 | where | ||
| 313 | M: RawMutex, | ||
| 314 | { | ||
| 315 | fn try_send_with_context( | ||
| 316 | &self, | ||
| 317 | message: T, | ||
| 318 | cx: Option<&mut Context<'_>>, | ||
| 319 | ) -> Result<(), TrySendError<T>>; | ||
| 320 | |||
| 321 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | ||
| 322 | } | ||
| 323 | |||
| 160 | /// Error returned by [`try_recv`](Channel::try_recv). | 324 | /// Error returned by [`try_recv`](Channel::try_recv). |
| 161 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | 325 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
| 162 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | 326 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| @@ -287,6 +451,18 @@ where | |||
| 287 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | 451 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 288 | } | 452 | } |
| 289 | 453 | ||
| 454 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 455 | self.lock(|c| c.try_recv_with_context(cx)) | ||
| 456 | } | ||
| 457 | |||
| 458 | fn try_send_with_context( | ||
| 459 | &self, | ||
| 460 | m: T, | ||
| 461 | cx: Option<&mut Context<'_>>, | ||
| 462 | ) -> Result<(), TrySendError<T>> { | ||
| 463 | self.lock(|c| c.try_send_with_context(m, cx)) | ||
| 464 | } | ||
| 465 | |||
| 290 | /// Get a sender for this channel. | 466 | /// Get a sender for this channel. |
| 291 | pub fn sender(&self) -> Sender<'_, M, T, N> { | 467 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
| 292 | Sender { channel: self } | 468 | Sender { channel: self } |
| @@ -339,6 +515,25 @@ where | |||
| 339 | } | 515 | } |
| 340 | } | 516 | } |
| 341 | 517 | ||
| 518 | /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the | ||
| 519 | /// tradeoff cost of dynamic dispatch. | ||
| 520 | impl<M, T, const N: usize> DynamicChannel<M, T> for Channel<M, T, N> | ||
| 521 | where | ||
| 522 | M: RawMutex, | ||
| 523 | { | ||
| 524 | fn try_send_with_context( | ||
| 525 | &self, | ||
| 526 | m: T, | ||
| 527 | cx: Option<&mut Context<'_>>, | ||
| 528 | ) -> Result<(), TrySendError<T>> { | ||
| 529 | Channel::try_send_with_context(self, m, cx) | ||
| 530 | } | ||
| 531 | |||
| 532 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 533 | Channel::try_recv_with_context(self, cx) | ||
| 534 | } | ||
| 535 | } | ||
| 536 | |||
| 342 | #[cfg(test)] | 537 | #[cfg(test)] |
| 343 | mod tests { | 538 | mod tests { |
| 344 | use core::time::Duration; | 539 | use core::time::Duration; |
| @@ -411,6 +606,16 @@ mod tests { | |||
| 411 | let _ = s1.clone(); | 606 | let _ = s1.clone(); |
| 412 | } | 607 | } |
| 413 | 608 | ||
| 609 | #[test] | ||
| 610 | fn dynamic_dispatch() { | ||
| 611 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 612 | let s: DynamicSender<'_, NoopRawMutex, u32> = c.sender().into(); | ||
| 613 | let r: DynamicReceiver<'_, NoopRawMutex, u32> = c.receiver().into(); | ||
| 614 | |||
| 615 | assert!(s.try_send(1).is_ok()); | ||
| 616 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 617 | } | ||
| 618 | |||
| 414 | #[futures_test::test] | 619 | #[futures_test::test] |
| 415 | async fn receiver_receives_given_try_send_async() { | 620 | async fn receiver_receives_given_try_send_async() { |
| 416 | let executor = ThreadPool::new().unwrap(); | 621 | let executor = ThreadPool::new().unwrap(); |
