aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/util/mpsc.rs45
1 files changed, 30 insertions, 15 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index cc9e2a5dd..d41c86291 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -156,18 +156,10 @@ where
156 /// closed by `recv` until they are all consumed. 156 /// closed by `recv` until they are all consumed.
157 /// 157 ///
158 /// [`close`]: Self::close 158 /// [`close`]: Self::close
159 pub async fn recv(&mut self) -> Option<T> { 159 pub fn recv(&mut self) -> RecvFuture<'ch, M, T, N> {
160 futures::future::poll_fn(|cx| self.recv_poll(cx)).await 160 RecvFuture {
161 } 161 channel_cell: self.channel_cell,
162 162 }
163 fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
164 Channel::lock(self.channel_cell, |c| {
165 match c.try_recv_with_context(Some(cx)) {
166 Ok(v) => Poll::Ready(Some(v)),
167 Err(TryRecvError::Closed) => Poll::Ready(None),
168 Err(TryRecvError::Empty) => Poll::Pending,
169 }
170 })
171 } 163 }
172 164
173 /// Attempts to immediately receive a message on this `Receiver` 165 /// Attempts to immediately receive a message on this `Receiver`
@@ -202,6 +194,30 @@ where
202 } 194 }
203} 195}
204 196
197pub struct RecvFuture<'ch, M, T, const N: usize>
198where
199 M: Mutex<Data = ()>,
200{
201 channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
202}
203
204impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
205where
206 M: Mutex<Data = ()>,
207{
208 type Output = Option<T>;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
211 Channel::lock(self.channel_cell, |c| {
212 match c.try_recv_with_context(Some(cx)) {
213 Ok(v) => Poll::Ready(Some(v)),
214 Err(TryRecvError::Closed) => Poll::Ready(None),
215 Err(TryRecvError::Empty) => Poll::Pending,
216 }
217 })
218 }
219}
220
205impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> 221impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
206where 222where
207 M: Mutex<Data = ()>, 223 M: Mutex<Data = ()>,
@@ -224,12 +240,11 @@ where
224 /// 240 ///
225 /// [`close`]: Receiver::close 241 /// [`close`]: Receiver::close
226 /// [`Receiver`]: Receiver 242 /// [`Receiver`]: Receiver
227 pub async fn send(&self, message: T) -> Result<(), SendError<T>> { 243 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
228 SendFuture { 244 SendFuture {
229 sender: self.clone(), 245 sender: self.clone(),
230 message: Some(message), 246 message: Some(message),
231 } 247 }
232 .await
233 } 248 }
234 249
235 /// Attempts to immediately send a message on this `Sender` 250 /// Attempts to immediately send a message on this `Sender`
@@ -278,7 +293,7 @@ where
278 } 293 }
279} 294}
280 295
281struct SendFuture<'ch, M, T, const N: usize> 296pub struct SendFuture<'ch, M, T, const N: usize>
282where 297where
283 M: Mutex<Data = ()>, 298 M: Mutex<Data = ()>,
284{ 299{