diff options
| -rw-r--r-- | embassy/src/channel/channel.rs | 184 |
1 files changed, 178 insertions, 6 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs index d749e5971..a1d805c53 100644 --- a/embassy/src/channel/channel.rs +++ b/embassy/src/channel/channel.rs | |||
| @@ -66,6 +66,48 @@ where | |||
| 66 | } | 66 | } |
| 67 | } | 67 | } |
| 68 | 68 | ||
| 69 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 70 | #[derive(Copy)] | ||
| 71 | pub struct DynamicSender<'ch, T> { | ||
| 72 | channel: &'ch dyn DynamicChannel<T>, | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'ch, T> Clone for DynamicSender<'ch, T> { | ||
| 76 | fn clone(&self) -> Self { | ||
| 77 | DynamicSender { | ||
| 78 | channel: self.channel, | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 83 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T> | ||
| 84 | where | ||
| 85 | M: RawMutex, | ||
| 86 | { | ||
| 87 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 88 | Self { channel: s.channel } | ||
| 89 | } | ||
| 90 | } | ||
| 91 | |||
| 92 | impl<'ch, T> DynamicSender<'ch, T> { | ||
| 93 | /// Sends a value. | ||
| 94 | /// | ||
| 95 | /// See [`Channel::send()`] | ||
| 96 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 97 | DynamicSendFuture { | ||
| 98 | channel: self.channel, | ||
| 99 | message: Some(message), | ||
| 100 | } | ||
| 101 | } | ||
| 102 | |||
| 103 | /// Attempt to immediately send a message. | ||
| 104 | /// | ||
| 105 | /// See [`Channel::send()`] | ||
| 106 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 107 | self.channel.try_send_with_context(message, None) | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 69 | /// Receive-only access to a [`Channel`]. | 111 | /// Receive-only access to a [`Channel`]. |
| 70 | #[derive(Copy)] | 112 | #[derive(Copy)] |
| 71 | pub struct Receiver<'ch, M, T, const N: usize> | 113 | pub struct Receiver<'ch, M, T, const N: usize> |
| @@ -105,6 +147,47 @@ where | |||
| 105 | } | 147 | } |
| 106 | } | 148 | } |
| 107 | 149 | ||
| 150 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 151 | #[derive(Copy)] | ||
| 152 | pub struct DynamicReceiver<'ch, T> { | ||
| 153 | channel: &'ch dyn DynamicChannel<T>, | ||
| 154 | } | ||
| 155 | |||
| 156 | impl<'ch, T> Clone for DynamicReceiver<'ch, T> { | ||
| 157 | fn clone(&self) -> Self { | ||
| 158 | DynamicReceiver { | ||
| 159 | channel: self.channel, | ||
| 160 | } | ||
| 161 | } | ||
| 162 | } | ||
| 163 | |||
| 164 | impl<'ch, T> DynamicReceiver<'ch, T> { | ||
| 165 | /// Receive the next value. | ||
| 166 | /// | ||
| 167 | /// See [`Channel::recv()`]. | ||
| 168 | pub fn recv(&self) -> DynamicRecvFuture<'_, T> { | ||
| 169 | DynamicRecvFuture { | ||
| 170 | channel: self.channel, | ||
| 171 | } | ||
| 172 | } | ||
| 173 | |||
| 174 | /// Attempt to immediately receive the next value. | ||
| 175 | /// | ||
| 176 | /// See [`Channel::try_recv()`] | ||
| 177 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 178 | self.channel.try_recv_with_context(None) | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> | ||
| 183 | where | ||
| 184 | M: RawMutex, | ||
| 185 | { | ||
| 186 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 187 | Self { channel: s.channel } | ||
| 188 | } | ||
| 189 | } | ||
| 190 | |||
| 108 | pub struct RecvFuture<'ch, M, T, const N: usize> | 191 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 109 | where | 192 | where |
| 110 | M: RawMutex, | 193 | M: RawMutex, |
| @@ -119,11 +202,25 @@ where | |||
| 119 | type Output = T; | 202 | type Output = T; |
| 120 | 203 | ||
| 121 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | 204 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
| 122 | self.channel | 205 | match self.channel.try_recv_with_context(Some(cx)) { |
| 123 | .lock(|c| match c.try_recv_with_context(Some(cx)) { | 206 | Ok(v) => Poll::Ready(v), |
| 124 | Ok(v) => Poll::Ready(v), | 207 | Err(TryRecvError::Empty) => Poll::Pending, |
| 125 | Err(TryRecvError::Empty) => Poll::Pending, | 208 | } |
| 126 | }) | 209 | } |
| 210 | } | ||
| 211 | |||
| 212 | pub struct DynamicRecvFuture<'ch, T> { | ||
| 213 | channel: &'ch dyn DynamicChannel<T>, | ||
| 214 | } | ||
| 215 | |||
| 216 | impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | ||
| 217 | type Output = T; | ||
| 218 | |||
| 219 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 220 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 221 | Ok(v) => Poll::Ready(v), | ||
| 222 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 223 | } | ||
| 127 | } | 224 | } |
| 128 | } | 225 | } |
| 129 | 226 | ||
| @@ -143,7 +240,7 @@ where | |||
| 143 | 240 | ||
| 144 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 241 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 145 | match self.message.take() { | 242 | match self.message.take() { |
| 146 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { | 243 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { |
| 147 | Ok(..) => Poll::Ready(()), | 244 | Ok(..) => Poll::Ready(()), |
| 148 | Err(TrySendError::Full(m)) => { | 245 | Err(TrySendError::Full(m)) => { |
| 149 | self.message = Some(m); | 246 | self.message = Some(m); |
| @@ -157,6 +254,40 @@ where | |||
| 157 | 254 | ||
| 158 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | 255 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
| 159 | 256 | ||
| 257 | pub struct DynamicSendFuture<'ch, T> { | ||
| 258 | channel: &'ch dyn DynamicChannel<T>, | ||
| 259 | message: Option<T>, | ||
| 260 | } | ||
| 261 | |||
| 262 | impl<'ch, T> Future for DynamicSendFuture<'ch, T> { | ||
| 263 | type Output = (); | ||
| 264 | |||
| 265 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 266 | match self.message.take() { | ||
| 267 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 268 | Ok(..) => Poll::Ready(()), | ||
| 269 | Err(TrySendError::Full(m)) => { | ||
| 270 | self.message = Some(m); | ||
| 271 | Poll::Pending | ||
| 272 | } | ||
| 273 | }, | ||
| 274 | None => panic!("Message cannot be None"), | ||
| 275 | } | ||
| 276 | } | ||
| 277 | } | ||
| 278 | |||
| 279 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | ||
| 280 | |||
| 281 | trait DynamicChannel<T> { | ||
| 282 | fn try_send_with_context( | ||
| 283 | &self, | ||
| 284 | message: T, | ||
| 285 | cx: Option<&mut Context<'_>>, | ||
| 286 | ) -> Result<(), TrySendError<T>>; | ||
| 287 | |||
| 288 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | ||
| 289 | } | ||
| 290 | |||
| 160 | /// Error returned by [`try_recv`](Channel::try_recv). | 291 | /// Error returned by [`try_recv`](Channel::try_recv). |
| 161 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | 292 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
| 162 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | 293 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| @@ -287,6 +418,18 @@ where | |||
| 287 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | 418 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 288 | } | 419 | } |
| 289 | 420 | ||
| 421 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 422 | self.lock(|c| c.try_recv_with_context(cx)) | ||
| 423 | } | ||
| 424 | |||
| 425 | fn try_send_with_context( | ||
| 426 | &self, | ||
| 427 | m: T, | ||
| 428 | cx: Option<&mut Context<'_>>, | ||
| 429 | ) -> Result<(), TrySendError<T>> { | ||
| 430 | self.lock(|c| c.try_send_with_context(m, cx)) | ||
| 431 | } | ||
| 432 | |||
| 290 | /// Get a sender for this channel. | 433 | /// Get a sender for this channel. |
| 291 | pub fn sender(&self) -> Sender<'_, M, T, N> { | 434 | pub fn sender(&self) -> Sender<'_, M, T, N> { |
| 292 | Sender { channel: self } | 435 | Sender { channel: self } |
| @@ -339,6 +482,25 @@ where | |||
| 339 | } | 482 | } |
| 340 | } | 483 | } |
| 341 | 484 | ||
| 485 | /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the | ||
| 486 | /// tradeoff cost of dynamic dispatch. | ||
| 487 | impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N> | ||
| 488 | where | ||
| 489 | M: RawMutex, | ||
| 490 | { | ||
| 491 | fn try_send_with_context( | ||
| 492 | &self, | ||
| 493 | m: T, | ||
| 494 | cx: Option<&mut Context<'_>>, | ||
| 495 | ) -> Result<(), TrySendError<T>> { | ||
| 496 | Channel::try_send_with_context(self, m, cx) | ||
| 497 | } | ||
| 498 | |||
| 499 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 500 | Channel::try_recv_with_context(self, cx) | ||
| 501 | } | ||
| 502 | } | ||
| 503 | |||
| 342 | #[cfg(test)] | 504 | #[cfg(test)] |
| 343 | mod tests { | 505 | mod tests { |
| 344 | use core::time::Duration; | 506 | use core::time::Duration; |
| @@ -411,6 +573,16 @@ mod tests { | |||
| 411 | let _ = s1.clone(); | 573 | let _ = s1.clone(); |
| 412 | } | 574 | } |
| 413 | 575 | ||
| 576 | #[test] | ||
| 577 | fn dynamic_dispatch() { | ||
| 578 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 579 | let s: DynamicSender<'_, u32> = c.sender().into(); | ||
| 580 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | ||
| 581 | |||
| 582 | assert!(s.try_send(1).is_ok()); | ||
| 583 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 584 | } | ||
| 585 | |||
| 414 | #[futures_test::test] | 586 | #[futures_test::test] |
| 415 | async fn receiver_receives_given_try_send_async() { | 587 | async fn receiver_receives_given_try_send_async() { |
| 416 | let executor = ThreadPool::new().unwrap(); | 588 | let executor = ThreadPool::new().unwrap(); |
