aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2021-08-02 13:20:29 +0200
committerGitHub <[email protected]>2021-08-02 13:20:29 +0200
commitc458ad52e698ae0e9810e90d094394bd3ab35c77 (patch)
tree1a26a2a753b9c65b18669d384ab929d2f2c5baab
parent0a1da180d0fd93deaf9cc453566c3d66511c03a8 (diff)
parentf2c2ad06caa0b05c4c8a9b3b88741afe5ab9f836 (diff)
Merge pull request #337 from lulf/introduce-future-types
Introduce future types
-rw-r--r--embassy/src/util/mpsc.rs55
1 files changed, 40 insertions, 15 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index cc9e2a5dd..4a934eb2f 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -156,18 +156,10 @@ where
156 /// closed by `recv` until they are all consumed. 156 /// closed by `recv` until they are all consumed.
157 /// 157 ///
158 /// [`close`]: Self::close 158 /// [`close`]: Self::close
159 pub async fn recv(&mut self) -> Option<T> { 159 pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
160 futures::future::poll_fn(|cx| self.recv_poll(cx)).await 160 RecvFuture {
161 } 161 channel_cell: self.channel_cell,
162 162 }
163 fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
164 Channel::lock(self.channel_cell, |c| {
165 match c.try_recv_with_context(Some(cx)) {
166 Ok(v) => Poll::Ready(Some(v)),
167 Err(TryRecvError::Closed) => Poll::Ready(None),
168 Err(TryRecvError::Empty) => Poll::Pending,
169 }
170 })
171 } 163 }
172 164
173 /// Attempts to immediately receive a message on this `Receiver` 165 /// Attempts to immediately receive a message on this `Receiver`
@@ -202,6 +194,40 @@ where
202 } 194 }
203} 195}
204 196
197pub struct RecvFuture<'ch, M, T, const N: usize>
198where
199 M: Mutex<Data = ()>,
200{
201 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
202}
203
204impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
205where
206 M: Mutex<Data = ()>,
207{
208 type Output = Option<T>;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
211 Channel::lock(self.channel_cell, |c| {
212 match c.try_recv_with_context(Some(cx)) {
213 Ok(v) => Poll::Ready(Some(v)),
214 Err(TryRecvError::Closed) => Poll::Ready(None),
215 Err(TryRecvError::Empty) => Poll::Pending,
216 }
217 })
218 }
219}
220
221// Safe to pass the receive future around since it locks channel whenever polled
222unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where
223 M: Mutex<Data = ()> + Sync
224{
225}
226unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where
227 M: Mutex<Data = ()> + Sync
228{
229}
230
205impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> 231impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
206where 232where
207 M: Mutex<Data = ()>, 233 M: Mutex<Data = ()>,
@@ -224,12 +250,11 @@ where
224 /// 250 ///
225 /// [`close`]: Receiver::close 251 /// [`close`]: Receiver::close
226 /// [`Receiver`]: Receiver 252 /// [`Receiver`]: Receiver
227 pub async fn send(&self, message: T) -> Result<(), SendError<T>> { 253 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
228 SendFuture { 254 SendFuture {
229 sender: self.clone(), 255 sender: self.clone(),
230 message: Some(message), 256 message: Some(message),
231 } 257 }
232 .await
233 } 258 }
234 259
235 /// Attempts to immediately send a message on this `Sender` 260 /// Attempts to immediately send a message on this `Sender`
@@ -278,7 +303,7 @@ where
278 } 303 }
279} 304}
280 305
281struct SendFuture<'ch, M, T, const N: usize> 306pub struct SendFuture<'ch, M, T, const N: usize>
282where 307where
283 M: Mutex<Data = ()>, 308 M: Mutex<Data = ()>,
284{ 309{