aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2023-08-22 21:25:29 +0000
committerGitHub <[email protected]>2023-08-22 21:25:29 +0000
commitb3212ae383e2cb4f167cbe9361b64bf83bf89049 (patch)
tree7ee528ade33153b92e3b26d4255119742dbf8cb0 /embassy-sync/src
parentb436aad2a0116b93ccb84b9ffd92aac082a0eeab (diff)
parentc39671266e21dd9e35e60cc680453cd5c38162db (diff)
Merge pull request #1763 from rubdos/sender-receiver-with-ctx
Refactor Channel/Sender/Receiver poll methods
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/channel.rs190
1 files changed, 136 insertions, 54 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index d6f36f53d..62ea1307d 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -65,6 +65,13 @@ where
65 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 65 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
66 self.channel.try_send(message) 66 self.channel.try_send(message)
67 } 67 }
68
69 /// Allows a poll_fn to poll until the channel is ready to send
70 ///
71 /// See [`Channel::poll_ready_to_send()`]
72 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx)
74 }
68} 75}
69 76
70/// Send-only access to a [`Channel`] without knowing channel size. 77/// Send-only access to a [`Channel`] without knowing channel size.
@@ -106,6 +113,13 @@ impl<'ch, T> DynamicSender<'ch, T> {
106 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { 113 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
107 self.channel.try_send_with_context(message, None) 114 self.channel.try_send_with_context(message, None)
108 } 115 }
116
117 /// Allows a poll_fn to poll until the channel is ready to send
118 ///
119 /// See [`Channel::poll_ready_to_send()`]
120 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
121 self.channel.poll_ready_to_send(cx)
122 }
109} 123}
110 124
111/// Receive-only access to a [`Channel`]. 125/// Receive-only access to a [`Channel`].
@@ -133,16 +147,30 @@ where
133{ 147{
134 /// Receive the next value. 148 /// Receive the next value.
135 /// 149 ///
136 /// See [`Channel::recv()`]. 150 /// See [`Channel::receive()`].
137 pub fn recv(&self) -> RecvFuture<'_, M, T, N> { 151 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
138 self.channel.recv() 152 self.channel.receive()
139 } 153 }
140 154
141 /// Attempt to immediately receive the next value. 155 /// Attempt to immediately receive the next value.
142 /// 156 ///
143 /// See [`Channel::try_recv()`] 157 /// See [`Channel::try_receive()`]
144 pub fn try_recv(&self) -> Result<T, TryRecvError> { 158 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
145 self.channel.try_recv() 159 self.channel.try_receive()
160 }
161
162 /// Allows a poll_fn to poll until the channel is ready to receive
163 ///
164 /// See [`Channel::poll_ready_to_receive()`]
165 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
166 self.channel.poll_ready_to_receive(cx)
167 }
168
169 /// Poll the channel for the next item
170 ///
171 /// See [`Channel::poll_receive()`]
172 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
173 self.channel.poll_receive(cx)
146 } 174 }
147} 175}
148 176
@@ -162,16 +190,30 @@ impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
162impl<'ch, T> DynamicReceiver<'ch, T> { 190impl<'ch, T> DynamicReceiver<'ch, T> {
163 /// Receive the next value. 191 /// Receive the next value.
164 /// 192 ///
165 /// See [`Channel::recv()`]. 193 /// See [`Channel::receive()`].
166 pub fn recv(&self) -> DynamicRecvFuture<'_, T> { 194 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
167 DynamicRecvFuture { channel: self.channel } 195 DynamicReceiveFuture { channel: self.channel }
168 } 196 }
169 197
170 /// Attempt to immediately receive the next value. 198 /// Attempt to immediately receive the next value.
171 /// 199 ///
172 /// See [`Channel::try_recv()`] 200 /// See [`Channel::try_receive()`]
173 pub fn try_recv(&self) -> Result<T, TryRecvError> { 201 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
174 self.channel.try_recv_with_context(None) 202 self.channel.try_receive_with_context(None)
203 }
204
205 /// Allows a poll_fn to poll until the channel is ready to receive
206 ///
207 /// See [`Channel::poll_ready_to_receive()`]
208 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
209 self.channel.poll_ready_to_receive(cx)
210 }
211
212 /// Poll the channel for the next item
213 ///
214 /// See [`Channel::poll_receive()`]
215 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
216 self.channel.poll_receive(cx)
175 } 217 }
176} 218}
177 219
@@ -184,42 +226,39 @@ where
184 } 226 }
185} 227}
186 228
187/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. 229/// Future returned by [`Channel::receive`] and [`Receiver::receive`].
188#[must_use = "futures do nothing unless you `.await` or poll them"] 230#[must_use = "futures do nothing unless you `.await` or poll them"]
189pub struct RecvFuture<'ch, M, T, const N: usize> 231pub struct ReceiveFuture<'ch, M, T, const N: usize>
190where 232where
191 M: RawMutex, 233 M: RawMutex,
192{ 234{
193 channel: &'ch Channel<M, T, N>, 235 channel: &'ch Channel<M, T, N>,
194} 236}
195 237
196impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> 238impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
197where 239where
198 M: RawMutex, 240 M: RawMutex,
199{ 241{
200 type Output = T; 242 type Output = T;
201 243
202 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 244 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
203 match self.channel.try_recv_with_context(Some(cx)) { 245 self.channel.poll_receive(cx)
204 Ok(v) => Poll::Ready(v),
205 Err(TryRecvError::Empty) => Poll::Pending,
206 }
207 } 246 }
208} 247}
209 248
210/// Future returned by [`DynamicReceiver::recv`]. 249/// Future returned by [`DynamicReceiver::receive`].
211#[must_use = "futures do nothing unless you `.await` or poll them"] 250#[must_use = "futures do nothing unless you `.await` or poll them"]
212pub struct DynamicRecvFuture<'ch, T> { 251pub struct DynamicReceiveFuture<'ch, T> {
213 channel: &'ch dyn DynamicChannel<T>, 252 channel: &'ch dyn DynamicChannel<T>,
214} 253}
215 254
216impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { 255impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
217 type Output = T; 256 type Output = T;
218 257
219 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 258 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
220 match self.channel.try_recv_with_context(Some(cx)) { 259 match self.channel.try_receive_with_context(Some(cx)) {
221 Ok(v) => Poll::Ready(v), 260 Ok(v) => Poll::Ready(v),
222 Err(TryRecvError::Empty) => Poll::Pending, 261 Err(TryReceiveError::Empty) => Poll::Pending,
223 } 262 }
224 } 263 }
225} 264}
@@ -285,13 +324,18 @@ impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
285trait DynamicChannel<T> { 324trait DynamicChannel<T> {
286 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; 325 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
287 326
288 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; 327 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
328
329 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
330 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
331
332 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
289} 333}
290 334
291/// Error returned by [`try_recv`](Channel::try_recv). 335/// Error returned by [`try_receive`](Channel::try_receive).
292#[derive(PartialEq, Eq, Clone, Copy, Debug)] 336#[derive(PartialEq, Eq, Clone, Copy, Debug)]
293#[cfg_attr(feature = "defmt", derive(defmt::Format))] 337#[cfg_attr(feature = "defmt", derive(defmt::Format))]
294pub enum TryRecvError { 338pub enum TryReceiveError {
295 /// A message could not be received because the channel is empty. 339 /// A message could not be received because the channel is empty.
296 Empty, 340 Empty,
297} 341}
@@ -320,11 +364,11 @@ impl<T, const N: usize> ChannelState<T, N> {
320 } 364 }
321 } 365 }
322 366
323 fn try_recv(&mut self) -> Result<T, TryRecvError> { 367 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
324 self.try_recv_with_context(None) 368 self.try_receive_with_context(None)
325 } 369 }
326 370
327 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { 371 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
328 if self.queue.is_full() { 372 if self.queue.is_full() {
329 self.senders_waker.wake(); 373 self.senders_waker.wake();
330 } 374 }
@@ -335,14 +379,31 @@ impl<T, const N: usize> ChannelState<T, N> {
335 if let Some(cx) = cx { 379 if let Some(cx) = cx {
336 self.receiver_waker.register(cx.waker()); 380 self.receiver_waker.register(cx.waker());
337 } 381 }
338 Err(TryRecvError::Empty) 382 Err(TryReceiveError::Empty)
383 }
384 }
385
386 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
387 if self.queue.is_full() {
388 self.senders_waker.wake();
389 }
390
391 if let Some(message) = self.queue.pop_front() {
392 Poll::Ready(message)
393 } else {
394 self.receiver_waker.register(cx.waker());
395 Poll::Pending
339 } 396 }
340 } 397 }
341 398
342 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> bool { 399 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
343 self.receiver_waker.register(cx.waker()); 400 self.receiver_waker.register(cx.waker());
344 401
345 !self.queue.is_empty() 402 if !self.queue.is_empty() {
403 Poll::Ready(())
404 } else {
405 Poll::Pending
406 }
346 } 407 }
347 408
348 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { 409 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
@@ -364,10 +425,14 @@ impl<T, const N: usize> ChannelState<T, N> {
364 } 425 }
365 } 426 }
366 427
367 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> bool { 428 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
368 self.senders_waker.register(cx.waker()); 429 self.senders_waker.register(cx.waker());
369 430
370 !self.queue.is_full() 431 if !self.queue.is_full() {
432 Poll::Ready(())
433 } else {
434 Poll::Pending
435 }
371 } 436 }
372} 437}
373 438
@@ -409,8 +474,13 @@ where
409 self.inner.lock(|rc| f(&mut *rc.borrow_mut())) 474 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
410 } 475 }
411 476
412 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { 477 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
413 self.lock(|c| c.try_recv_with_context(cx)) 478 self.lock(|c| c.try_receive_with_context(cx))
479 }
480
481 /// Poll the channel for the next message
482 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
483 self.lock(|c| c.poll_receive(cx))
414 } 484 }
415 485
416 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { 486 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
@@ -418,12 +488,12 @@ where
418 } 488 }
419 489
420 /// Allows a poll_fn to poll until the channel is ready to receive 490 /// Allows a poll_fn to poll until the channel is ready to receive
421 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { 491 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
422 self.lock(|c| c.poll_ready_to_receive(cx)) 492 self.lock(|c| c.poll_ready_to_receive(cx))
423 } 493 }
424 494
425 /// Allows a poll_fn to poll until the channel is ready to send 495 /// Allows a poll_fn to poll until the channel is ready to send
426 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { 496 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
427 self.lock(|c| c.poll_ready_to_send(cx)) 497 self.lock(|c| c.poll_ready_to_send(cx))
428 } 498 }
429 499
@@ -466,16 +536,16 @@ where
466 /// 536 ///
467 /// If there are no messages in the channel's buffer, this method will 537 /// If there are no messages in the channel's buffer, this method will
468 /// wait until a message is sent. 538 /// wait until a message is sent.
469 pub fn recv(&self) -> RecvFuture<'_, M, T, N> { 539 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
470 RecvFuture { channel: self } 540 ReceiveFuture { channel: self }
471 } 541 }
472 542
473 /// Attempt to immediately receive a message. 543 /// Attempt to immediately receive a message.
474 /// 544 ///
475 /// This method will either receive a message from the channel immediately or return an error 545 /// This method will either receive a message from the channel immediately or return an error
476 /// if the channel is empty. 546 /// if the channel is empty.
477 pub fn try_recv(&self) -> Result<T, TryRecvError> { 547 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
478 self.lock(|c| c.try_recv()) 548 self.lock(|c| c.try_receive())
479 } 549 }
480} 550}
481 551
@@ -489,8 +559,20 @@ where
489 Channel::try_send_with_context(self, m, cx) 559 Channel::try_send_with_context(self, m, cx)
490 } 560 }
491 561
492 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { 562 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
493 Channel::try_recv_with_context(self, cx) 563 Channel::try_receive_with_context(self, cx)
564 }
565
566 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
567 Channel::poll_ready_to_send(self, cx)
568 }
569
570 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
571 Channel::poll_ready_to_receive(self, cx)
572 }
573
574 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
575 Channel::poll_receive(self, cx)
494 } 576 }
495} 577}
496 578
@@ -534,15 +616,15 @@ mod tests {
534 fn receiving_once_with_one_send() { 616 fn receiving_once_with_one_send() {
535 let mut c = ChannelState::<u32, 3>::new(); 617 let mut c = ChannelState::<u32, 3>::new();
536 assert!(c.try_send(1).is_ok()); 618 assert!(c.try_send(1).is_ok());
537 assert_eq!(c.try_recv().unwrap(), 1); 619 assert_eq!(c.try_receive().unwrap(), 1);
538 assert_eq!(capacity(&c), 3); 620 assert_eq!(capacity(&c), 3);
539 } 621 }
540 622
541 #[test] 623 #[test]
542 fn receiving_when_empty() { 624 fn receiving_when_empty() {
543 let mut c = ChannelState::<u32, 3>::new(); 625 let mut c = ChannelState::<u32, 3>::new();
544 match c.try_recv() { 626 match c.try_receive() {
545 Err(TryRecvError::Empty) => assert!(true), 627 Err(TryReceiveError::Empty) => assert!(true),
546 _ => assert!(false), 628 _ => assert!(false),
547 } 629 }
548 assert_eq!(capacity(&c), 3); 630 assert_eq!(capacity(&c), 3);
@@ -552,7 +634,7 @@ mod tests {
552 fn simple_send_and_receive() { 634 fn simple_send_and_receive() {
553 let c = Channel::<NoopRawMutex, u32, 3>::new(); 635 let c = Channel::<NoopRawMutex, u32, 3>::new();
554 assert!(c.try_send(1).is_ok()); 636 assert!(c.try_send(1).is_ok());
555 assert_eq!(c.try_recv().unwrap(), 1); 637 assert_eq!(c.try_receive().unwrap(), 1);
556 } 638 }
557 639
558 #[test] 640 #[test]
@@ -572,7 +654,7 @@ mod tests {
572 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 654 let r: DynamicReceiver<'_, u32> = c.receiver().into();
573 655
574 assert!(s.try_send(1).is_ok()); 656 assert!(s.try_send(1).is_ok());
575 assert_eq!(r.try_recv().unwrap(), 1); 657 assert_eq!(r.try_receive().unwrap(), 1);
576 } 658 }
577 659
578 #[futures_test::test] 660 #[futures_test::test]
@@ -587,14 +669,14 @@ mod tests {
587 assert!(c2.try_send(1).is_ok()); 669 assert!(c2.try_send(1).is_ok());
588 }) 670 })
589 .is_ok()); 671 .is_ok());
590 assert_eq!(c.recv().await, 1); 672 assert_eq!(c.receive().await, 1);
591 } 673 }
592 674
593 #[futures_test::test] 675 #[futures_test::test]
594 async fn sender_send_completes_if_capacity() { 676 async fn sender_send_completes_if_capacity() {
595 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); 677 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
596 c.send(1).await; 678 c.send(1).await;
597 assert_eq!(c.recv().await, 1); 679 assert_eq!(c.receive().await, 1);
598 } 680 }
599 681
600 #[futures_test::test] 682 #[futures_test::test]
@@ -612,11 +694,11 @@ mod tests {
612 // Wish I could think of a means of determining that the async send is waiting instead. 694 // Wish I could think of a means of determining that the async send is waiting instead.
613 // However, I've used the debugger to observe that the send does indeed wait. 695 // However, I've used the debugger to observe that the send does indeed wait.
614 Delay::new(Duration::from_millis(500)).await; 696 Delay::new(Duration::from_millis(500)).await;
615 assert_eq!(c.recv().await, 1); 697 assert_eq!(c.receive().await, 1);
616 assert!(executor 698 assert!(executor
617 .spawn(async move { 699 .spawn(async move {
618 loop { 700 loop {
619 c.recv().await; 701 c.receive().await;
620 } 702 }
621 }) 703 })
622 .is_ok()); 704 .is_ok());