diff options
| author | Dario Nieuwenhuis <[email protected]> | 2021-08-02 13:20:29 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-08-02 13:20:29 +0200 |
| commit | c458ad52e698ae0e9810e90d094394bd3ab35c77 (patch) | |
| tree | 1a26a2a753b9c65b18669d384ab929d2f2c5baab | |
| parent | 0a1da180d0fd93deaf9cc453566c3d66511c03a8 (diff) | |
| parent | f2c2ad06caa0b05c4c8a9b3b88741afe5ab9f836 (diff) | |
Merge pull request #337 from lulf/introduce-future-types
Introduce future types
| -rw-r--r-- | embassy/src/util/mpsc.rs | 55 |
1 files changed, 40 insertions, 15 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index cc9e2a5dd..4a934eb2f 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -156,18 +156,10 @@ where | |||
| 156 | /// closed by `recv` until they are all consumed. | 156 | /// closed by `recv` until they are all consumed. |
| 157 | /// | 157 | /// |
| 158 | /// [`close`]: Self::close | 158 | /// [`close`]: Self::close |
| 159 | pub async fn recv(&mut self) -> Option<T> { | 159 | pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { |
| 160 | futures::future::poll_fn(|cx| self.recv_poll(cx)).await | 160 | RecvFuture { |
| 161 | } | 161 | channel_cell: self.channel_cell, |
| 162 | 162 | } | |
| 163 | fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 164 | Channel::lock(self.channel_cell, |c| { | ||
| 165 | match c.try_recv_with_context(Some(cx)) { | ||
| 166 | Ok(v) => Poll::Ready(Some(v)), | ||
| 167 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 168 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 169 | } | ||
| 170 | }) | ||
| 171 | } | 163 | } |
| 172 | 164 | ||
| 173 | /// Attempts to immediately receive a message on this `Receiver` | 165 | /// Attempts to immediately receive a message on this `Receiver` |
| @@ -202,6 +194,40 @@ where | |||
| 202 | } | 194 | } |
| 203 | } | 195 | } |
| 204 | 196 | ||
| 197 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 198 | where | ||
| 199 | M: Mutex<Data = ()>, | ||
| 200 | { | ||
| 201 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 202 | } | ||
| 203 | |||
| 204 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 205 | where | ||
| 206 | M: Mutex<Data = ()>, | ||
| 207 | { | ||
| 208 | type Output = Option<T>; | ||
| 209 | |||
| 210 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 211 | Channel::lock(self.channel_cell, |c| { | ||
| 212 | match c.try_recv_with_context(Some(cx)) { | ||
| 213 | Ok(v) => Poll::Ready(Some(v)), | ||
| 214 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 215 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 216 | } | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | } | ||
| 220 | |||
| 221 | // Safe to pass the receive future around since it locks channel whenever polled | ||
| 222 | unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where | ||
| 223 | M: Mutex<Data = ()> + Sync | ||
| 224 | { | ||
| 225 | } | ||
| 226 | unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where | ||
| 227 | M: Mutex<Data = ()> + Sync | ||
| 228 | { | ||
| 229 | } | ||
| 230 | |||
| 205 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | 231 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> |
| 206 | where | 232 | where |
| 207 | M: Mutex<Data = ()>, | 233 | M: Mutex<Data = ()>, |
| @@ -224,12 +250,11 @@ where | |||
| 224 | /// | 250 | /// |
| 225 | /// [`close`]: Receiver::close | 251 | /// [`close`]: Receiver::close |
| 226 | /// [`Receiver`]: Receiver | 252 | /// [`Receiver`]: Receiver |
| 227 | pub async fn send(&self, message: T) -> Result<(), SendError<T>> { | 253 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { |
| 228 | SendFuture { | 254 | SendFuture { |
| 229 | sender: self.clone(), | 255 | sender: self.clone(), |
| 230 | message: Some(message), | 256 | message: Some(message), |
| 231 | } | 257 | } |
| 232 | .await | ||
| 233 | } | 258 | } |
| 234 | 259 | ||
| 235 | /// Attempts to immediately send a message on this `Sender` | 260 | /// Attempts to immediately send a message on this `Sender` |
| @@ -278,7 +303,7 @@ where | |||
| 278 | } | 303 | } |
| 279 | } | 304 | } |
| 280 | 305 | ||
| 281 | struct SendFuture<'ch, M, T, const N: usize> | 306 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 282 | where | 307 | where |
| 283 | M: Mutex<Data = ()>, | 308 | M: Mutex<Data = ()>, |
| 284 | { | 309 | { |
