diff options
| -rw-r--r-- | embassy/src/util/mpsc.rs | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index cc9e2a5dd..d41c86291 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -156,18 +156,10 @@ where | |||
| 156 | /// closed by `recv` until they are all consumed. | 156 | /// closed by `recv` until they are all consumed. |
| 157 | /// | 157 | /// |
| 158 | /// [`close`]: Self::close | 158 | /// [`close`]: Self::close |
| 159 | pub async fn recv(&mut self) -> Option<T> { | 159 | pub fn recv(&mut self) -> RecvFuture<'ch, M, T, N> { |
| 160 | futures::future::poll_fn(|cx| self.recv_poll(cx)).await | 160 | RecvFuture { |
| 161 | } | 161 | channel_cell: self.channel_cell, |
| 162 | 162 | } | |
| 163 | fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 164 | Channel::lock(self.channel_cell, |c| { | ||
| 165 | match c.try_recv_with_context(Some(cx)) { | ||
| 166 | Ok(v) => Poll::Ready(Some(v)), | ||
| 167 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 168 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 169 | } | ||
| 170 | }) | ||
| 171 | } | 163 | } |
| 172 | 164 | ||
| 173 | /// Attempts to immediately receive a message on this `Receiver` | 165 | /// Attempts to immediately receive a message on this `Receiver` |
| @@ -202,6 +194,30 @@ where | |||
| 202 | } | 194 | } |
| 203 | } | 195 | } |
| 204 | 196 | ||
| 197 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 198 | where | ||
| 199 | M: Mutex<Data = ()>, | ||
| 200 | { | ||
| 201 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 202 | } | ||
| 203 | |||
| 204 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 205 | where | ||
| 206 | M: Mutex<Data = ()>, | ||
| 207 | { | ||
| 208 | type Output = Option<T>; | ||
| 209 | |||
| 210 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 211 | Channel::lock(self.channel_cell, |c| { | ||
| 212 | match c.try_recv_with_context(Some(cx)) { | ||
| 213 | Ok(v) => Poll::Ready(Some(v)), | ||
| 214 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 215 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 216 | } | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | } | ||
| 220 | |||
| 205 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | 221 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> |
| 206 | where | 222 | where |
| 207 | M: Mutex<Data = ()>, | 223 | M: Mutex<Data = ()>, |
| @@ -224,12 +240,11 @@ where | |||
| 224 | /// | 240 | /// |
| 225 | /// [`close`]: Receiver::close | 241 | /// [`close`]: Receiver::close |
| 226 | /// [`Receiver`]: Receiver | 242 | /// [`Receiver`]: Receiver |
| 227 | pub async fn send(&self, message: T) -> Result<(), SendError<T>> { | 243 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { |
| 228 | SendFuture { | 244 | SendFuture { |
| 229 | sender: self.clone(), | 245 | sender: self.clone(), |
| 230 | message: Some(message), | 246 | message: Some(message), |
| 231 | } | 247 | } |
| 232 | .await | ||
| 233 | } | 248 | } |
| 234 | 249 | ||
| 235 | /// Attempts to immediately send a message on this `Sender` | 250 | /// Attempts to immediately send a message on this `Sender` |
| @@ -278,7 +293,7 @@ where | |||
| 278 | } | 293 | } |
| 279 | } | 294 | } |
| 280 | 295 | ||
| 281 | struct SendFuture<'ch, M, T, const N: usize> | 296 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 282 | where | 297 | where |
| 283 | M: Mutex<Data = ()>, | 298 | M: Mutex<Data = ()>, |
| 284 | { | 299 | { |
