aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorRuben De Smet <[email protected]>2023-08-11 11:30:29 +0200
committerRuben De Smet <[email protected]>2023-08-11 13:22:56 +0200
commitb1ec460b9af131ef80fcafd79a7f63aa326aaf94 (patch)
tree394ea85af2f26dd773ec0698d5590d93d283ba46 /embassy-sync
parentf9d251cd5cd9c84718cd66e7697a8502c4d61a0a (diff)
Implement Channel::poll_receive(..) -> Poll<T>
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/src/channel.rs43
1 files changed, 39 insertions, 4 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index e7224856c..dc727fb10 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -165,6 +165,13 @@ where
165 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { 165 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
166 self.channel.poll_ready_to_receive(cx) 166 self.channel.poll_ready_to_receive(cx)
167 } 167 }
168
169 /// Poll the channel for the next item
170 ///
171 /// See [`Channel::poll_receive()`]
172 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
173 self.channel.poll_receive(cx)
174 }
168} 175}
169 176
170/// Receive-only access to a [`Channel`] without knowing channel size. 177/// Receive-only access to a [`Channel`] without knowing channel size.
@@ -201,6 +208,13 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
201 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { 208 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
202 self.channel.poll_ready_to_receive(cx) 209 self.channel.poll_ready_to_receive(cx)
203 } 210 }
211
212 /// Poll the channel for the next item
213 ///
214 /// See [`Channel::poll_receive()`]
215 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
216 self.channel.poll_receive(cx)
217 }
204} 218}
205 219
206impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> 220impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
@@ -228,10 +242,7 @@ where
228 type Output = T; 242 type Output = T;
229 243
230 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 244 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
231 match self.channel.try_recv_with_context(Some(cx)) { 245 self.channel.poll_receive(cx)
232 Ok(v) => Poll::Ready(v),
233 Err(TryRecvError::Empty) => Poll::Pending,
234 }
235 } 246 }
236} 247}
237 248
@@ -317,6 +328,8 @@ trait DynamicChannel<T> {
317 328
318 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; 329 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
319 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; 330 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
331
332 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
320} 333}
321 334
322/// Error returned by [`try_recv`](Channel::try_recv). 335/// Error returned by [`try_recv`](Channel::try_recv).
@@ -370,6 +383,19 @@ impl<T, const N: usize> ChannelState<T, N> {
370 } 383 }
371 } 384 }
372 385
386 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
387 if self.queue.is_full() {
388 self.senders_waker.wake();
389 }
390
391 if let Some(message) = self.queue.pop_front() {
392 Poll::Ready(message)
393 } else {
394 self.receiver_waker.register(cx.waker());
395 Poll::Pending
396 }
397 }
398
373 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { 399 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
374 self.receiver_waker.register(cx.waker()); 400 self.receiver_waker.register(cx.waker());
375 401
@@ -452,6 +478,11 @@ where
452 self.lock(|c| c.try_recv_with_context(cx)) 478 self.lock(|c| c.try_recv_with_context(cx))
453 } 479 }
454 480
481 /// Poll the channel for the next message
482 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
483 self.lock(|c| c.poll_receive(cx))
484 }
485
455 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { 486 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
456 self.lock(|c| c.try_send_with_context(m, cx)) 487 self.lock(|c| c.try_send_with_context(m, cx))
457 } 488 }
@@ -539,6 +570,10 @@ where
539 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { 570 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
540 Channel::poll_ready_to_receive(self, cx) 571 Channel::poll_ready_to_receive(self, cx)
541 } 572 }
573
574 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
575 Channel::poll_receive(self, cx)
576 }
542} 577}
543 578
544#[cfg(test)] 579#[cfg(test)]