From b658f10db9a963d85b8465759692b0aa7973a1d1 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Wed, 9 Aug 2023 11:50:26 +0200 Subject: Expose poll_ready_to_{send,receive} in Sender/Receiver --- embassy-sync/src/channel.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index d6f36f53d..3896f70ac 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -65,6 +65,13 @@ where pub fn try_send(&self, message: T) -> Result<(), TrySendError> { self.channel.try_send(message) } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`Channel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + self.channel.poll_ready_to_send(cx) + } } /// Send-only access to a [`Channel`] without knowing channel size. @@ -106,6 +113,13 @@ impl<'ch, T> DynamicSender<'ch, T> { pub fn try_send(&self, message: T) -> Result<(), TrySendError> { self.channel.try_send_with_context(message, None) } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`Channel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + self.channel.poll_ready_to_send(cx) + } } /// Receive-only access to a [`Channel`]. @@ -144,6 +158,13 @@ where pub fn try_recv(&self) -> Result { self.channel.try_recv() } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`Channel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + self.channel.poll_ready_to_receive(cx) + } } /// Receive-only access to a [`Channel`] without knowing channel size. @@ -173,6 +194,13 @@ impl<'ch, T> DynamicReceiver<'ch, T> { pub fn try_recv(&self) -> Result { self.channel.try_recv_with_context(None) } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`Channel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + self.channel.poll_ready_to_receive(cx) + } } impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> @@ -286,6 +314,9 @@ trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; + + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool; + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool; } /// Error returned by [`try_recv`](Channel::try_recv). @@ -492,6 +523,14 @@ where fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { Channel::try_recv_with_context(self, cx) } + + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + Channel::poll_ready_to_send(self, cx) + } + + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + Channel::poll_ready_to_receive(self, cx) + } } #[cfg(test)] -- cgit From f9d251cd5cd9c84718cd66e7697a8502c4d61a0a Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 11:15:17 +0200 Subject: Channel poll methods return Poll instead of bool --- embassy-sync/src/channel.rs | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 3896f70ac..e7224856c 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -69,7 +69,7 @@ where /// Allows a poll_fn to poll until the channel is ready to send /// /// See [`Channel::poll_ready_to_send()`] - pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_send(cx) } } @@ -117,7 +117,7 @@ impl<'ch, T> DynamicSender<'ch, T> { /// Allows a poll_fn to poll until the channel is ready to send /// /// See [`Channel::poll_ready_to_send()`] - pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_send(cx) } } @@ -162,7 +162,7 @@ where /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] - pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } } @@ -198,7 +198,7 @@ impl<'ch, T> DynamicReceiver<'ch, T> { /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] - pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } } @@ -315,8 +315,8 @@ trait DynamicChannel { fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; - fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool; - fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool; + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; } /// Error returned by [`try_recv`](Channel::try_recv). @@ -370,10 +370,14 @@ impl ChannelState { } } - fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> bool { + fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { self.receiver_waker.register(cx.waker()); - !self.queue.is_empty() + if !self.queue.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } } fn try_send(&mut self, message: T) -> Result<(), TrySendError> { @@ -395,10 +399,14 @@ impl ChannelState { } } - fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> bool { + fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> { self.senders_waker.register(cx.waker()); - !self.queue.is_full() + if !self.queue.is_full() { + Poll::Ready(()) + } else { + Poll::Pending + } } } @@ -449,12 +457,12 @@ where } /// Allows a poll_fn to poll until the channel is ready to receive - pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.lock(|c| c.poll_ready_to_receive(cx)) } /// Allows a poll_fn to poll until the channel is ready to send - pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { self.lock(|c| c.poll_ready_to_send(cx)) } @@ -524,11 +532,11 @@ where Channel::try_recv_with_context(self, cx) } - fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_send(self, cx) } - fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_receive(self, cx) } } -- cgit From b1ec460b9af131ef80fcafd79a7f63aa326aaf94 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 11:30:29 +0200 Subject: Implement Channel::poll_receive(..) -> Poll --- embassy-sync/src/channel.rs | 43 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index e7224856c..dc727fb10 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -165,6 +165,13 @@ where pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } } /// Receive-only access to a [`Channel`] without knowing channel size. @@ -201,6 +208,13 @@ impl<'ch, T> DynamicReceiver<'ch, T> { pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } } impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> @@ -228,10 +242,7 @@ where type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } + self.channel.poll_receive(cx) } } @@ -317,6 +328,8 @@ trait DynamicChannel { fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; } /// Error returned by [`try_recv`](Channel::try_recv). @@ -370,6 +383,19 @@ impl ChannelState { } } + fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop_front() { + Poll::Ready(message) + } else { + self.receiver_waker.register(cx.waker()); + Poll::Pending + } + } + fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { self.receiver_waker.register(cx.waker()); @@ -452,6 +478,11 @@ where self.lock(|c| c.try_recv_with_context(cx)) } + /// Poll the channel for the next message + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.lock(|c| c.poll_receive(cx)) + } + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { self.lock(|c| c.try_send_with_context(m, cx)) } @@ -539,6 +570,10 @@ where fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_receive(self, cx) } + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + Channel::poll_receive(self, cx) + } } #[cfg(test)] -- cgit From 8655ba110c95777c3edab3aabef34ae71d64957a Mon Sep 17 00:00:00 2001 From: Dániel Buga Date: Tue, 15 Aug 2023 19:13:36 +0200 Subject: Fix typo --- embassy-sync/src/blocking_mutex/raw.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs index 15796f1b2..a8afcad34 100644 --- a/embassy-sync/src/blocking_mutex/raw.rs +++ b/embassy-sync/src/blocking_mutex/raw.rs @@ -11,7 +11,7 @@ use core::marker::PhantomData; /// /// Note that, unlike other mutexes, implementations only guarantee no /// concurrent access from other threads: concurrent access from the current -/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly. +/// thread is allowed. For example, it's possible to lock the same mutex multiple times reentrantly. /// /// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access /// to the data, it is not enough to guarantee exclusive (`&mut`) access. -- cgit From c39671266e21dd9e35e60cc680453cd5c38162db Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 11:58:22 +0200 Subject: Deprecate *recv* in favor of *receive* --- embassy-sync/src/channel.rs | 88 ++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 44 deletions(-) (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index dc727fb10..62ea1307d 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -147,16 +147,16 @@ where { /// Receive the next value. /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - self.channel.recv() + /// See [`Channel::receive()`]. + pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { + self.channel.receive() } /// Attempt to immediately receive the next value. /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv() + /// See [`Channel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive() } /// Allows a poll_fn to poll until the channel is ready to receive @@ -190,16 +190,16 @@ impl<'ch, T> Copy for DynamicReceiver<'ch, T> {} impl<'ch, T> DynamicReceiver<'ch, T> { /// Receive the next value. /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> DynamicRecvFuture<'_, T> { - DynamicRecvFuture { channel: self.channel } + /// See [`Channel::receive()`]. + pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { + DynamicReceiveFuture { channel: self.channel } } /// Attempt to immediately receive the next value. /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv_with_context(None) + /// See [`Channel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive_with_context(None) } /// Allows a poll_fn to poll until the channel is ready to receive @@ -226,16 +226,16 @@ where } } -/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. +/// Future returned by [`Channel::receive`] and [`Receiver::receive`]. #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct RecvFuture<'ch, M, T, const N: usize> +pub struct ReceiveFuture<'ch, M, T, const N: usize> where M: RawMutex, { channel: &'ch Channel, } -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N> where M: RawMutex, { @@ -246,19 +246,19 @@ where } } -/// Future returned by [`DynamicReceiver::recv`]. +/// Future returned by [`DynamicReceiver::receive`]. #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct DynamicRecvFuture<'ch, T> { +pub struct DynamicReceiveFuture<'ch, T> { channel: &'ch dyn DynamicChannel, } -impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { +impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { + match self.channel.try_receive_with_context(Some(cx)) { Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, + Err(TryReceiveError::Empty) => Poll::Pending, } } } @@ -324,7 +324,7 @@ impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} trait DynamicChannel { fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; @@ -332,10 +332,10 @@ trait DynamicChannel { fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; } -/// Error returned by [`try_recv`](Channel::try_recv). +/// Error returned by [`try_receive`](Channel::try_receive). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { +pub enum TryReceiveError { /// A message could not be received because the channel is empty. Empty, } @@ -364,11 +364,11 @@ impl ChannelState { } } - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) + fn try_receive(&mut self) -> Result { + self.try_receive_with_context(None) } - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { if self.queue.is_full() { self.senders_waker.wake(); } @@ -379,7 +379,7 @@ impl ChannelState { if let Some(cx) = cx { self.receiver_waker.register(cx.waker()); } - Err(TryRecvError::Empty) + Err(TryReceiveError::Empty) } } @@ -474,8 +474,8 @@ where self.inner.lock(|rc| f(&mut *rc.borrow_mut())) } - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - self.lock(|c| c.try_recv_with_context(cx)) + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + self.lock(|c| c.try_receive_with_context(cx)) } /// Poll the channel for the next message @@ -536,16 +536,16 @@ where /// /// If there are no messages in the channel's buffer, this method will /// wait until a message is sent. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - RecvFuture { channel: self } + pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { + ReceiveFuture { channel: self } } /// Attempt to immediately receive a message. /// /// This method will either receive a message from the channel immediately or return an error /// if the channel is empty. - pub fn try_recv(&self) -> Result { - self.lock(|c| c.try_recv()) + pub fn try_receive(&self) -> Result { + self.lock(|c| c.try_receive()) } } @@ -559,8 +559,8 @@ where Channel::try_send_with_context(self, m, cx) } - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - Channel::try_recv_with_context(self, cx) + fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + Channel::try_receive_with_context(self, cx) } fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { @@ -616,15 +616,15 @@ mod tests { fn receiving_once_with_one_send() { let mut c = ChannelState::::new(); assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(c.try_receive().unwrap(), 1); assert_eq!(capacity(&c), 3); } #[test] fn receiving_when_empty() { let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), + match c.try_receive() { + Err(TryReceiveError::Empty) => assert!(true), _ => assert!(false), } assert_eq!(capacity(&c), 3); @@ -634,7 +634,7 @@ mod tests { fn simple_send_and_receive() { let c = Channel::::new(); assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(c.try_receive().unwrap(), 1); } #[test] @@ -654,7 +654,7 @@ mod tests { let r: DynamicReceiver<'_, u32> = c.receiver().into(); assert!(s.try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); + assert_eq!(r.try_receive().unwrap(), 1); } #[futures_test::test] @@ -669,14 +669,14 @@ mod tests { assert!(c2.try_send(1).is_ok()); }) .is_ok()); - assert_eq!(c.recv().await, 1); + assert_eq!(c.receive().await, 1); } #[futures_test::test] async fn sender_send_completes_if_capacity() { let c = Channel::::new(); c.send(1).await; - assert_eq!(c.recv().await, 1); + assert_eq!(c.receive().await, 1); } #[futures_test::test] @@ -694,11 +694,11 @@ mod tests { // Wish I could think of a means of determining that the async send is waiting instead. // However, I've used the debugger to observe that the send does indeed wait. Delay::new(Duration::from_millis(500)).await; - assert_eq!(c.recv().await, 1); + assert_eq!(c.receive().await, 1); assert!(executor .spawn(async move { loop { - c.recv().await; + c.receive().await; } }) .is_ok()); -- cgit