diff options
| author | Dario Nieuwenhuis <[email protected]> | 2023-08-22 21:25:29 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-22 21:25:29 +0000 |
| commit | b3212ae383e2cb4f167cbe9361b64bf83bf89049 (patch) | |
| tree | 7ee528ade33153b92e3b26d4255119742dbf8cb0 | |
| parent | b436aad2a0116b93ccb84b9ffd92aac082a0eeab (diff) | |
| parent | c39671266e21dd9e35e60cc680453cd5c38162db (diff) | |
Merge pull request #1763 from rubdos/sender-receiver-with-ctx
Refactor Channel/Sender/Receiver poll methods
| -rw-r--r-- | embassy-stm32-wpan/src/mac/driver.rs | 10 | ||||
| -rw-r--r-- | embassy-stm32-wpan/src/mac/runner.rs | 2 | ||||
| -rw-r--r-- | embassy-stm32/src/can/bxcan.rs | 13 | ||||
| -rw-r--r-- | embassy-sync/src/channel.rs | 190 | ||||
| -rw-r--r-- | examples/nrf52840/src/bin/channel.rs | 2 | ||||
| -rw-r--r-- | examples/nrf52840/src/bin/channel_sender_receiver.rs | 2 | ||||
| -rw-r--r-- | examples/nrf52840/src/bin/uart_split.rs | 2 | ||||
| -rw-r--r-- | examples/rp/src/bin/lora_p2p_send_multicore.rs | 2 | ||||
| -rw-r--r-- | examples/rp/src/bin/multicore.rs | 2 | ||||
| -rw-r--r-- | examples/stm32f3/src/bin/button_events.rs | 4 | ||||
| -rw-r--r-- | examples/stm32h5/src/bin/usart_split.rs | 2 | ||||
| -rw-r--r-- | examples/stm32h7/src/bin/usart_split.rs | 2 | ||||
| -rw-r--r-- | tests/rp/src/bin/gpio_multicore.rs | 6 | ||||
| -rw-r--r-- | tests/rp/src/bin/multicore.rs | 4 |
14 files changed, 160 insertions, 83 deletions
diff --git a/embassy-stm32-wpan/src/mac/driver.rs b/embassy-stm32-wpan/src/mac/driver.rs index f8e3a2b08..bfc4f1ee8 100644 --- a/embassy-stm32-wpan/src/mac/driver.rs +++ b/embassy-stm32-wpan/src/mac/driver.rs | |||
| @@ -28,7 +28,9 @@ impl<'d> embassy_net_driver::Driver for Driver<'d> { | |||
| 28 | type TxToken<'a> = TxToken<'d> where Self: 'a; | 28 | type TxToken<'a> = TxToken<'d> where Self: 'a; |
| 29 | 29 | ||
| 30 | fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { | 30 | fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { |
| 31 | if self.runner.rx_channel.poll_ready_to_receive(cx) && self.runner.tx_buf_channel.poll_ready_to_receive(cx) { | 31 | if self.runner.rx_channel.poll_ready_to_receive(cx).is_ready() |
| 32 | && self.runner.tx_buf_channel.poll_ready_to_receive(cx).is_ready() | ||
| 33 | { | ||
| 32 | Some(( | 34 | Some(( |
| 33 | RxToken { | 35 | RxToken { |
| 34 | rx: &self.runner.rx_channel, | 36 | rx: &self.runner.rx_channel, |
| @@ -44,7 +46,7 @@ impl<'d> embassy_net_driver::Driver for Driver<'d> { | |||
| 44 | } | 46 | } |
| 45 | 47 | ||
| 46 | fn transmit(&mut self, cx: &mut Context) -> Option<Self::TxToken<'_>> { | 48 | fn transmit(&mut self, cx: &mut Context) -> Option<Self::TxToken<'_>> { |
| 47 | if self.runner.tx_buf_channel.poll_ready_to_receive(cx) { | 49 | if self.runner.tx_buf_channel.poll_ready_to_receive(cx).is_ready() { |
| 48 | Some(TxToken { | 50 | Some(TxToken { |
| 49 | tx: &self.runner.tx_channel, | 51 | tx: &self.runner.tx_channel, |
| 50 | tx_buf: &self.runner.tx_buf_channel, | 52 | tx_buf: &self.runner.tx_buf_channel, |
| @@ -91,7 +93,7 @@ impl<'d> embassy_net_driver::RxToken for RxToken<'d> { | |||
| 91 | { | 93 | { |
| 92 | // Only valid data events should be put into the queue | 94 | // Only valid data events should be put into the queue |
| 93 | 95 | ||
| 94 | let data_event = match self.rx.try_recv().unwrap() { | 96 | let data_event = match self.rx.try_receive().unwrap() { |
| 95 | MacEvent::McpsDataInd(data_event) => data_event, | 97 | MacEvent::McpsDataInd(data_event) => data_event, |
| 96 | _ => unreachable!(), | 98 | _ => unreachable!(), |
| 97 | }; | 99 | }; |
| @@ -111,7 +113,7 @@ impl<'d> embassy_net_driver::TxToken for TxToken<'d> { | |||
| 111 | F: FnOnce(&mut [u8]) -> R, | 113 | F: FnOnce(&mut [u8]) -> R, |
| 112 | { | 114 | { |
| 113 | // Only valid tx buffers should be put into the queue | 115 | // Only valid tx buffers should be put into the queue |
| 114 | let buf = self.tx_buf.try_recv().unwrap(); | 116 | let buf = self.tx_buf.try_receive().unwrap(); |
| 115 | let r = f(&mut buf[..len]); | 117 | let r = f(&mut buf[..len]); |
| 116 | 118 | ||
| 117 | // The tx channel should always be of equal capacity to the tx_buf channel | 119 | // The tx channel should always be of equal capacity to the tx_buf channel |
diff --git a/embassy-stm32-wpan/src/mac/runner.rs b/embassy-stm32-wpan/src/mac/runner.rs index 1be6df8a4..d3099b6b7 100644 --- a/embassy-stm32-wpan/src/mac/runner.rs +++ b/embassy-stm32-wpan/src/mac/runner.rs | |||
| @@ -73,7 +73,7 @@ impl<'a> Runner<'a> { | |||
| 73 | let mut msdu_handle = 0x02; | 73 | let mut msdu_handle = 0x02; |
| 74 | 74 | ||
| 75 | loop { | 75 | loop { |
| 76 | let (buf, len) = self.tx_channel.recv().await; | 76 | let (buf, len) = self.tx_channel.receive().await; |
| 77 | let _wm = self.write_mutex.lock().await; | 77 | let _wm = self.write_mutex.lock().await; |
| 78 | 78 | ||
| 79 | // The mutex should be dropped on the next loop iteration | 79 | // The mutex should be dropped on the next loop iteration |
diff --git a/embassy-stm32/src/can/bxcan.rs b/embassy-stm32/src/can/bxcan.rs index fb223e4a9..7ad13cece 100644 --- a/embassy-stm32/src/can/bxcan.rs +++ b/embassy-stm32/src/can/bxcan.rs | |||
| @@ -478,7 +478,7 @@ impl<'c, 'd, T: Instance> CanRx<'c, 'd, T> { | |||
| 478 | pub async fn read(&mut self) -> Result<Envelope, BusError> { | 478 | pub async fn read(&mut self) -> Result<Envelope, BusError> { |
| 479 | poll_fn(|cx| { | 479 | poll_fn(|cx| { |
| 480 | T::state().err_waker.register(cx.waker()); | 480 | T::state().err_waker.register(cx.waker()); |
| 481 | if let Poll::Ready(envelope) = T::state().rx_queue.recv().poll_unpin(cx) { | 481 | if let Poll::Ready(envelope) = T::state().rx_queue.receive().poll_unpin(cx) { |
| 482 | return Poll::Ready(Ok(envelope)); | 482 | return Poll::Ready(Ok(envelope)); |
| 483 | } else if let Some(err) = self.curr_error() { | 483 | } else if let Some(err) = self.curr_error() { |
| 484 | return Poll::Ready(Err(err)); | 484 | return Poll::Ready(Err(err)); |
| @@ -493,7 +493,7 @@ impl<'c, 'd, T: Instance> CanRx<'c, 'd, T> { | |||
| 493 | /// | 493 | /// |
| 494 | /// Returns [Err(TryReadError::Empty)] if there are no frames in the rx queue. | 494 | /// Returns [Err(TryReadError::Empty)] if there are no frames in the rx queue. |
| 495 | pub fn try_read(&mut self) -> Result<Envelope, TryReadError> { | 495 | pub fn try_read(&mut self) -> Result<Envelope, TryReadError> { |
| 496 | if let Ok(envelope) = T::state().rx_queue.try_recv() { | 496 | if let Ok(envelope) = T::state().rx_queue.try_receive() { |
| 497 | return Ok(envelope); | 497 | return Ok(envelope); |
| 498 | } | 498 | } |
| 499 | 499 | ||
| @@ -506,14 +506,7 @@ impl<'c, 'd, T: Instance> CanRx<'c, 'd, T> { | |||
| 506 | 506 | ||
| 507 | /// Waits while receive queue is empty. | 507 | /// Waits while receive queue is empty. |
| 508 | pub async fn wait_not_empty(&mut self) { | 508 | pub async fn wait_not_empty(&mut self) { |
| 509 | poll_fn(|cx| { | 509 | poll_fn(|cx| T::state().rx_queue.poll_ready_to_receive(cx)).await |
| 510 | if T::state().rx_queue.poll_ready_to_receive(cx) { | ||
| 511 | Poll::Ready(()) | ||
| 512 | } else { | ||
| 513 | Poll::Pending | ||
| 514 | } | ||
| 515 | }) | ||
| 516 | .await | ||
| 517 | } | 510 | } |
| 518 | 511 | ||
| 519 | fn curr_error(&self) -> Option<BusError> { | 512 | fn curr_error(&self) -> Option<BusError> { |
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index d6f36f53d..62ea1307d 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -65,6 +65,13 @@ where | |||
| 65 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | 65 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
| 66 | self.channel.try_send(message) | 66 | self.channel.try_send(message) |
| 67 | } | 67 | } |
| 68 | |||
| 69 | /// Allows a poll_fn to poll until the channel is ready to send | ||
| 70 | /// | ||
| 71 | /// See [`Channel::poll_ready_to_send()`] | ||
| 72 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 73 | self.channel.poll_ready_to_send(cx) | ||
| 74 | } | ||
| 68 | } | 75 | } |
| 69 | 76 | ||
| 70 | /// Send-only access to a [`Channel`] without knowing channel size. | 77 | /// Send-only access to a [`Channel`] without knowing channel size. |
| @@ -106,6 +113,13 @@ impl<'ch, T> DynamicSender<'ch, T> { | |||
| 106 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | 113 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
| 107 | self.channel.try_send_with_context(message, None) | 114 | self.channel.try_send_with_context(message, None) |
| 108 | } | 115 | } |
| 116 | |||
| 117 | /// Allows a poll_fn to poll until the channel is ready to send | ||
| 118 | /// | ||
| 119 | /// See [`Channel::poll_ready_to_send()`] | ||
| 120 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 121 | self.channel.poll_ready_to_send(cx) | ||
| 122 | } | ||
| 109 | } | 123 | } |
| 110 | 124 | ||
| 111 | /// Receive-only access to a [`Channel`]. | 125 | /// Receive-only access to a [`Channel`]. |
| @@ -133,16 +147,30 @@ where | |||
| 133 | { | 147 | { |
| 134 | /// Receive the next value. | 148 | /// Receive the next value. |
| 135 | /// | 149 | /// |
| 136 | /// See [`Channel::recv()`]. | 150 | /// See [`Channel::receive()`]. |
| 137 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | 151 | pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { |
| 138 | self.channel.recv() | 152 | self.channel.receive() |
| 139 | } | 153 | } |
| 140 | 154 | ||
| 141 | /// Attempt to immediately receive the next value. | 155 | /// Attempt to immediately receive the next value. |
| 142 | /// | 156 | /// |
| 143 | /// See [`Channel::try_recv()`] | 157 | /// See [`Channel::try_receive()`] |
| 144 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 158 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
| 145 | self.channel.try_recv() | 159 | self.channel.try_receive() |
| 160 | } | ||
| 161 | |||
| 162 | /// Allows a poll_fn to poll until the channel is ready to receive | ||
| 163 | /// | ||
| 164 | /// See [`Channel::poll_ready_to_receive()`] | ||
| 165 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 166 | self.channel.poll_ready_to_receive(cx) | ||
| 167 | } | ||
| 168 | |||
| 169 | /// Poll the channel for the next item | ||
| 170 | /// | ||
| 171 | /// See [`Channel::poll_receive()`] | ||
| 172 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 173 | self.channel.poll_receive(cx) | ||
| 146 | } | 174 | } |
| 147 | } | 175 | } |
| 148 | 176 | ||
| @@ -162,16 +190,30 @@ impl<'ch, T> Copy for DynamicReceiver<'ch, T> {} | |||
| 162 | impl<'ch, T> DynamicReceiver<'ch, T> { | 190 | impl<'ch, T> DynamicReceiver<'ch, T> { |
| 163 | /// Receive the next value. | 191 | /// Receive the next value. |
| 164 | /// | 192 | /// |
| 165 | /// See [`Channel::recv()`]. | 193 | /// See [`Channel::receive()`]. |
| 166 | pub fn recv(&self) -> DynamicRecvFuture<'_, T> { | 194 | pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { |
| 167 | DynamicRecvFuture { channel: self.channel } | 195 | DynamicReceiveFuture { channel: self.channel } |
| 168 | } | 196 | } |
| 169 | 197 | ||
| 170 | /// Attempt to immediately receive the next value. | 198 | /// Attempt to immediately receive the next value. |
| 171 | /// | 199 | /// |
| 172 | /// See [`Channel::try_recv()`] | 200 | /// See [`Channel::try_receive()`] |
| 173 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 201 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
| 174 | self.channel.try_recv_with_context(None) | 202 | self.channel.try_receive_with_context(None) |
| 203 | } | ||
| 204 | |||
| 205 | /// Allows a poll_fn to poll until the channel is ready to receive | ||
| 206 | /// | ||
| 207 | /// See [`Channel::poll_ready_to_receive()`] | ||
| 208 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 209 | self.channel.poll_ready_to_receive(cx) | ||
| 210 | } | ||
| 211 | |||
| 212 | /// Poll the channel for the next item | ||
| 213 | /// | ||
| 214 | /// See [`Channel::poll_receive()`] | ||
| 215 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 216 | self.channel.poll_receive(cx) | ||
| 175 | } | 217 | } |
| 176 | } | 218 | } |
| 177 | 219 | ||
| @@ -184,42 +226,39 @@ where | |||
| 184 | } | 226 | } |
| 185 | } | 227 | } |
| 186 | 228 | ||
| 187 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. | 229 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. |
| 188 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 230 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 189 | pub struct RecvFuture<'ch, M, T, const N: usize> | 231 | pub struct ReceiveFuture<'ch, M, T, const N: usize> |
| 190 | where | 232 | where |
| 191 | M: RawMutex, | 233 | M: RawMutex, |
| 192 | { | 234 | { |
| 193 | channel: &'ch Channel<M, T, N>, | 235 | channel: &'ch Channel<M, T, N>, |
| 194 | } | 236 | } |
| 195 | 237 | ||
| 196 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | 238 | impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N> |
| 197 | where | 239 | where |
| 198 | M: RawMutex, | 240 | M: RawMutex, |
| 199 | { | 241 | { |
| 200 | type Output = T; | 242 | type Output = T; |
| 201 | 243 | ||
| 202 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | 244 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
| 203 | match self.channel.try_recv_with_context(Some(cx)) { | 245 | self.channel.poll_receive(cx) |
| 204 | Ok(v) => Poll::Ready(v), | ||
| 205 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 206 | } | ||
| 207 | } | 246 | } |
| 208 | } | 247 | } |
| 209 | 248 | ||
| 210 | /// Future returned by [`DynamicReceiver::recv`]. | 249 | /// Future returned by [`DynamicReceiver::receive`]. |
| 211 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 250 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 212 | pub struct DynamicRecvFuture<'ch, T> { | 251 | pub struct DynamicReceiveFuture<'ch, T> { |
| 213 | channel: &'ch dyn DynamicChannel<T>, | 252 | channel: &'ch dyn DynamicChannel<T>, |
| 214 | } | 253 | } |
| 215 | 254 | ||
| 216 | impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | 255 | impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { |
| 217 | type Output = T; | 256 | type Output = T; |
| 218 | 257 | ||
| 219 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | 258 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { |
| 220 | match self.channel.try_recv_with_context(Some(cx)) { | 259 | match self.channel.try_receive_with_context(Some(cx)) { |
| 221 | Ok(v) => Poll::Ready(v), | 260 | Ok(v) => Poll::Ready(v), |
| 222 | Err(TryRecvError::Empty) => Poll::Pending, | 261 | Err(TryReceiveError::Empty) => Poll::Pending, |
| 223 | } | 262 | } |
| 224 | } | 263 | } |
| 225 | } | 264 | } |
| @@ -285,13 +324,18 @@ impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | |||
| 285 | trait DynamicChannel<T> { | 324 | trait DynamicChannel<T> { |
| 286 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; | 325 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; |
| 287 | 326 | ||
| 288 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | 327 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; |
| 328 | |||
| 329 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; | ||
| 330 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; | ||
| 331 | |||
| 332 | fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>; | ||
| 289 | } | 333 | } |
| 290 | 334 | ||
| 291 | /// Error returned by [`try_recv`](Channel::try_recv). | 335 | /// Error returned by [`try_receive`](Channel::try_receive). |
| 292 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | 336 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
| 293 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | 337 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| 294 | pub enum TryRecvError { | 338 | pub enum TryReceiveError { |
| 295 | /// A message could not be received because the channel is empty. | 339 | /// A message could not be received because the channel is empty. |
| 296 | Empty, | 340 | Empty, |
| 297 | } | 341 | } |
| @@ -320,11 +364,11 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 320 | } | 364 | } |
| 321 | } | 365 | } |
| 322 | 366 | ||
| 323 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | 367 | fn try_receive(&mut self) -> Result<T, TryReceiveError> { |
| 324 | self.try_recv_with_context(None) | 368 | self.try_receive_with_context(None) |
| 325 | } | 369 | } |
| 326 | 370 | ||
| 327 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | 371 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 328 | if self.queue.is_full() { | 372 | if self.queue.is_full() { |
| 329 | self.senders_waker.wake(); | 373 | self.senders_waker.wake(); |
| 330 | } | 374 | } |
| @@ -335,14 +379,31 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 335 | if let Some(cx) = cx { | 379 | if let Some(cx) = cx { |
| 336 | self.receiver_waker.register(cx.waker()); | 380 | self.receiver_waker.register(cx.waker()); |
| 337 | } | 381 | } |
| 338 | Err(TryRecvError::Empty) | 382 | Err(TryReceiveError::Empty) |
| 383 | } | ||
| 384 | } | ||
| 385 | |||
| 386 | fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 387 | if self.queue.is_full() { | ||
| 388 | self.senders_waker.wake(); | ||
| 389 | } | ||
| 390 | |||
| 391 | if let Some(message) = self.queue.pop_front() { | ||
| 392 | Poll::Ready(message) | ||
| 393 | } else { | ||
| 394 | self.receiver_waker.register(cx.waker()); | ||
| 395 | Poll::Pending | ||
| 339 | } | 396 | } |
| 340 | } | 397 | } |
| 341 | 398 | ||
| 342 | fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> bool { | 399 | fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| 343 | self.receiver_waker.register(cx.waker()); | 400 | self.receiver_waker.register(cx.waker()); |
| 344 | 401 | ||
| 345 | !self.queue.is_empty() | 402 | if !self.queue.is_empty() { |
| 403 | Poll::Ready(()) | ||
| 404 | } else { | ||
| 405 | Poll::Pending | ||
| 406 | } | ||
| 346 | } | 407 | } |
| 347 | 408 | ||
| 348 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | 409 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { |
| @@ -364,10 +425,14 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 364 | } | 425 | } |
| 365 | } | 426 | } |
| 366 | 427 | ||
| 367 | fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> bool { | 428 | fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| 368 | self.senders_waker.register(cx.waker()); | 429 | self.senders_waker.register(cx.waker()); |
| 369 | 430 | ||
| 370 | !self.queue.is_full() | 431 | if !self.queue.is_full() { |
| 432 | Poll::Ready(()) | ||
| 433 | } else { | ||
| 434 | Poll::Pending | ||
| 435 | } | ||
| 371 | } | 436 | } |
| 372 | } | 437 | } |
| 373 | 438 | ||
| @@ -409,8 +474,13 @@ where | |||
| 409 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | 474 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 410 | } | 475 | } |
| 411 | 476 | ||
| 412 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | 477 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 413 | self.lock(|c| c.try_recv_with_context(cx)) | 478 | self.lock(|c| c.try_receive_with_context(cx)) |
| 479 | } | ||
| 480 | |||
| 481 | /// Poll the channel for the next message | ||
| 482 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 483 | self.lock(|c| c.poll_receive(cx)) | ||
| 414 | } | 484 | } |
| 415 | 485 | ||
| 416 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | 486 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { |
| @@ -418,12 +488,12 @@ where | |||
| 418 | } | 488 | } |
| 419 | 489 | ||
| 420 | /// Allows a poll_fn to poll until the channel is ready to receive | 490 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 421 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { | 491 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 422 | self.lock(|c| c.poll_ready_to_receive(cx)) | 492 | self.lock(|c| c.poll_ready_to_receive(cx)) |
| 423 | } | 493 | } |
| 424 | 494 | ||
| 425 | /// Allows a poll_fn to poll until the channel is ready to send | 495 | /// Allows a poll_fn to poll until the channel is ready to send |
| 426 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { | 496 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 427 | self.lock(|c| c.poll_ready_to_send(cx)) | 497 | self.lock(|c| c.poll_ready_to_send(cx)) |
| 428 | } | 498 | } |
| 429 | 499 | ||
| @@ -466,16 +536,16 @@ where | |||
| 466 | /// | 536 | /// |
| 467 | /// If there are no messages in the channel's buffer, this method will | 537 | /// If there are no messages in the channel's buffer, this method will |
| 468 | /// wait until a message is sent. | 538 | /// wait until a message is sent. |
| 469 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | 539 | pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> { |
| 470 | RecvFuture { channel: self } | 540 | ReceiveFuture { channel: self } |
| 471 | } | 541 | } |
| 472 | 542 | ||
| 473 | /// Attempt to immediately receive a message. | 543 | /// Attempt to immediately receive a message. |
| 474 | /// | 544 | /// |
| 475 | /// This method will either receive a message from the channel immediately or return an error | 545 | /// This method will either receive a message from the channel immediately or return an error |
| 476 | /// if the channel is empty. | 546 | /// if the channel is empty. |
| 477 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 547 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { |
| 478 | self.lock(|c| c.try_recv()) | 548 | self.lock(|c| c.try_receive()) |
| 479 | } | 549 | } |
| 480 | } | 550 | } |
| 481 | 551 | ||
| @@ -489,8 +559,20 @@ where | |||
| 489 | Channel::try_send_with_context(self, m, cx) | 559 | Channel::try_send_with_context(self, m, cx) |
| 490 | } | 560 | } |
| 491 | 561 | ||
| 492 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | 562 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 493 | Channel::try_recv_with_context(self, cx) | 563 | Channel::try_receive_with_context(self, cx) |
| 564 | } | ||
| 565 | |||
| 566 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 567 | Channel::poll_ready_to_send(self, cx) | ||
| 568 | } | ||
| 569 | |||
| 570 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 571 | Channel::poll_ready_to_receive(self, cx) | ||
| 572 | } | ||
| 573 | |||
| 574 | fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 575 | Channel::poll_receive(self, cx) | ||
| 494 | } | 576 | } |
| 495 | } | 577 | } |
| 496 | 578 | ||
| @@ -534,15 +616,15 @@ mod tests { | |||
| 534 | fn receiving_once_with_one_send() { | 616 | fn receiving_once_with_one_send() { |
| 535 | let mut c = ChannelState::<u32, 3>::new(); | 617 | let mut c = ChannelState::<u32, 3>::new(); |
| 536 | assert!(c.try_send(1).is_ok()); | 618 | assert!(c.try_send(1).is_ok()); |
| 537 | assert_eq!(c.try_recv().unwrap(), 1); | 619 | assert_eq!(c.try_receive().unwrap(), 1); |
| 538 | assert_eq!(capacity(&c), 3); | 620 | assert_eq!(capacity(&c), 3); |
| 539 | } | 621 | } |
| 540 | 622 | ||
| 541 | #[test] | 623 | #[test] |
| 542 | fn receiving_when_empty() { | 624 | fn receiving_when_empty() { |
| 543 | let mut c = ChannelState::<u32, 3>::new(); | 625 | let mut c = ChannelState::<u32, 3>::new(); |
| 544 | match c.try_recv() { | 626 | match c.try_receive() { |
| 545 | Err(TryRecvError::Empty) => assert!(true), | 627 | Err(TryReceiveError::Empty) => assert!(true), |
| 546 | _ => assert!(false), | 628 | _ => assert!(false), |
| 547 | } | 629 | } |
| 548 | assert_eq!(capacity(&c), 3); | 630 | assert_eq!(capacity(&c), 3); |
| @@ -552,7 +634,7 @@ mod tests { | |||
| 552 | fn simple_send_and_receive() { | 634 | fn simple_send_and_receive() { |
| 553 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | 635 | let c = Channel::<NoopRawMutex, u32, 3>::new(); |
| 554 | assert!(c.try_send(1).is_ok()); | 636 | assert!(c.try_send(1).is_ok()); |
| 555 | assert_eq!(c.try_recv().unwrap(), 1); | 637 | assert_eq!(c.try_receive().unwrap(), 1); |
| 556 | } | 638 | } |
| 557 | 639 | ||
| 558 | #[test] | 640 | #[test] |
| @@ -572,7 +654,7 @@ mod tests { | |||
| 572 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 654 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| 573 | 655 | ||
| 574 | assert!(s.try_send(1).is_ok()); | 656 | assert!(s.try_send(1).is_ok()); |
| 575 | assert_eq!(r.try_recv().unwrap(), 1); | 657 | assert_eq!(r.try_receive().unwrap(), 1); |
| 576 | } | 658 | } |
| 577 | 659 | ||
| 578 | #[futures_test::test] | 660 | #[futures_test::test] |
| @@ -587,14 +669,14 @@ mod tests { | |||
| 587 | assert!(c2.try_send(1).is_ok()); | 669 | assert!(c2.try_send(1).is_ok()); |
| 588 | }) | 670 | }) |
| 589 | .is_ok()); | 671 | .is_ok()); |
| 590 | assert_eq!(c.recv().await, 1); | 672 | assert_eq!(c.receive().await, 1); |
| 591 | } | 673 | } |
| 592 | 674 | ||
| 593 | #[futures_test::test] | 675 | #[futures_test::test] |
| 594 | async fn sender_send_completes_if_capacity() { | 676 | async fn sender_send_completes_if_capacity() { |
| 595 | let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); | 677 | let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); |
| 596 | c.send(1).await; | 678 | c.send(1).await; |
| 597 | assert_eq!(c.recv().await, 1); | 679 | assert_eq!(c.receive().await, 1); |
| 598 | } | 680 | } |
| 599 | 681 | ||
| 600 | #[futures_test::test] | 682 | #[futures_test::test] |
| @@ -612,11 +694,11 @@ mod tests { | |||
| 612 | // Wish I could think of a means of determining that the async send is waiting instead. | 694 | // Wish I could think of a means of determining that the async send is waiting instead. |
| 613 | // However, I've used the debugger to observe that the send does indeed wait. | 695 | // However, I've used the debugger to observe that the send does indeed wait. |
| 614 | Delay::new(Duration::from_millis(500)).await; | 696 | Delay::new(Duration::from_millis(500)).await; |
| 615 | assert_eq!(c.recv().await, 1); | 697 | assert_eq!(c.receive().await, 1); |
| 616 | assert!(executor | 698 | assert!(executor |
| 617 | .spawn(async move { | 699 | .spawn(async move { |
| 618 | loop { | 700 | loop { |
| 619 | c.recv().await; | 701 | c.receive().await; |
| 620 | } | 702 | } |
| 621 | }) | 703 | }) |
| 622 | .is_ok()); | 704 | .is_ok()); |
diff --git a/examples/nrf52840/src/bin/channel.rs b/examples/nrf52840/src/bin/channel.rs index d782a79e7..bd9c909da 100644 --- a/examples/nrf52840/src/bin/channel.rs +++ b/examples/nrf52840/src/bin/channel.rs | |||
| @@ -35,7 +35,7 @@ async fn main(spawner: Spawner) { | |||
| 35 | unwrap!(spawner.spawn(my_task())); | 35 | unwrap!(spawner.spawn(my_task())); |
| 36 | 36 | ||
| 37 | loop { | 37 | loop { |
| 38 | match CHANNEL.recv().await { | 38 | match CHANNEL.receive().await { |
| 39 | LedState::On => led.set_high(), | 39 | LedState::On => led.set_high(), |
| 40 | LedState::Off => led.set_low(), | 40 | LedState::Off => led.set_low(), |
| 41 | } | 41 | } |
diff --git a/examples/nrf52840/src/bin/channel_sender_receiver.rs b/examples/nrf52840/src/bin/channel_sender_receiver.rs index fcccdaed5..ec4f1d800 100644 --- a/examples/nrf52840/src/bin/channel_sender_receiver.rs +++ b/examples/nrf52840/src/bin/channel_sender_receiver.rs | |||
| @@ -33,7 +33,7 @@ async fn recv_task(led: AnyPin, receiver: Receiver<'static, NoopRawMutex, LedSta | |||
| 33 | let mut led = Output::new(led, Level::Low, OutputDrive::Standard); | 33 | let mut led = Output::new(led, Level::Low, OutputDrive::Standard); |
| 34 | 34 | ||
| 35 | loop { | 35 | loop { |
| 36 | match receiver.recv().await { | 36 | match receiver.receive().await { |
| 37 | LedState::On => led.set_high(), | 37 | LedState::On => led.set_high(), |
| 38 | LedState::Off => led.set_low(), | 38 | LedState::Off => led.set_low(), |
| 39 | } | 39 | } |
diff --git a/examples/nrf52840/src/bin/uart_split.rs b/examples/nrf52840/src/bin/uart_split.rs index 9979a1d53..b748bfcd8 100644 --- a/examples/nrf52840/src/bin/uart_split.rs +++ b/examples/nrf52840/src/bin/uart_split.rs | |||
| @@ -46,7 +46,7 @@ async fn main(spawner: Spawner) { | |||
| 46 | // back out the buffer we receive from the read | 46 | // back out the buffer we receive from the read |
| 47 | // task. | 47 | // task. |
| 48 | loop { | 48 | loop { |
| 49 | let buf = CHANNEL.recv().await; | 49 | let buf = CHANNEL.receive().await; |
| 50 | info!("writing..."); | 50 | info!("writing..."); |
| 51 | unwrap!(tx.write(&buf).await); | 51 | unwrap!(tx.write(&buf).await); |
| 52 | } | 52 | } |
diff --git a/examples/rp/src/bin/lora_p2p_send_multicore.rs b/examples/rp/src/bin/lora_p2p_send_multicore.rs index 89a62818d..b54cc92f6 100644 --- a/examples/rp/src/bin/lora_p2p_send_multicore.rs +++ b/examples/rp/src/bin/lora_p2p_send_multicore.rs | |||
| @@ -113,7 +113,7 @@ async fn core1_task( | |||
| 113 | }; | 113 | }; |
| 114 | 114 | ||
| 115 | loop { | 115 | loop { |
| 116 | let buffer: [u8; 3] = CHANNEL.recv().await; | 116 | let buffer: [u8; 3] = CHANNEL.receive().await; |
| 117 | match lora.prepare_for_tx(&mdltn_params, 20, false).await { | 117 | match lora.prepare_for_tx(&mdltn_params, 20, false).await { |
| 118 | Ok(()) => {} | 118 | Ok(()) => {} |
| 119 | Err(err) => { | 119 | Err(err) => { |
diff --git a/examples/rp/src/bin/multicore.rs b/examples/rp/src/bin/multicore.rs index 893b724bf..bf017f6a7 100644 --- a/examples/rp/src/bin/multicore.rs +++ b/examples/rp/src/bin/multicore.rs | |||
| @@ -56,7 +56,7 @@ async fn core0_task() { | |||
| 56 | async fn core1_task(mut led: Output<'static, PIN_25>) { | 56 | async fn core1_task(mut led: Output<'static, PIN_25>) { |
| 57 | info!("Hello from core 1"); | 57 | info!("Hello from core 1"); |
| 58 | loop { | 58 | loop { |
| 59 | match CHANNEL.recv().await { | 59 | match CHANNEL.receive().await { |
| 60 | LedState::On => led.set_high(), | 60 | LedState::On => led.set_high(), |
| 61 | LedState::Off => led.set_low(), | 61 | LedState::Off => led.set_low(), |
| 62 | } | 62 | } |
diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs index 02c475f66..8e97e85eb 100644 --- a/examples/stm32f3/src/bin/button_events.rs +++ b/examples/stm32f3/src/bin/button_events.rs | |||
| @@ -49,12 +49,12 @@ impl<'a> Leds<'a> { | |||
| 49 | 49 | ||
| 50 | async fn show(&mut self) { | 50 | async fn show(&mut self) { |
| 51 | self.leds[self.current_led].set_high(); | 51 | self.leds[self.current_led].set_high(); |
| 52 | if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.recv()).await { | 52 | if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.receive()).await { |
| 53 | self.leds[self.current_led].set_low(); | 53 | self.leds[self.current_led].set_low(); |
| 54 | self.process_event(new_message).await; | 54 | self.process_event(new_message).await; |
| 55 | } else { | 55 | } else { |
| 56 | self.leds[self.current_led].set_low(); | 56 | self.leds[self.current_led].set_low(); |
| 57 | if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.recv()).await { | 57 | if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.receive()).await { |
| 58 | self.process_event(new_message).await; | 58 | self.process_event(new_message).await; |
| 59 | } | 59 | } |
| 60 | } | 60 | } |
diff --git a/examples/stm32h5/src/bin/usart_split.rs b/examples/stm32h5/src/bin/usart_split.rs index debd6f454..a6b2e690b 100644 --- a/examples/stm32h5/src/bin/usart_split.rs +++ b/examples/stm32h5/src/bin/usart_split.rs | |||
| @@ -44,7 +44,7 @@ async fn main(spawner: Spawner) -> ! { | |||
| 44 | unwrap!(spawner.spawn(reader(rx))); | 44 | unwrap!(spawner.spawn(reader(rx))); |
| 45 | 45 | ||
| 46 | loop { | 46 | loop { |
| 47 | let buf = CHANNEL.recv().await; | 47 | let buf = CHANNEL.receive().await; |
| 48 | info!("writing..."); | 48 | info!("writing..."); |
| 49 | unwrap!(tx.write(&buf).await); | 49 | unwrap!(tx.write(&buf).await); |
| 50 | } | 50 | } |
diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs index 330d1ce09..aa0753450 100644 --- a/examples/stm32h7/src/bin/usart_split.rs +++ b/examples/stm32h7/src/bin/usart_split.rs | |||
| @@ -44,7 +44,7 @@ async fn main(spawner: Spawner) -> ! { | |||
| 44 | unwrap!(spawner.spawn(reader(rx))); | 44 | unwrap!(spawner.spawn(reader(rx))); |
| 45 | 45 | ||
| 46 | loop { | 46 | loop { |
| 47 | let buf = CHANNEL.recv().await; | 47 | let buf = CHANNEL.receive().await; |
| 48 | info!("writing..."); | 48 | info!("writing..."); |
| 49 | unwrap!(tx.write(&buf).await); | 49 | unwrap!(tx.write(&buf).await); |
| 50 | } | 50 | } |
diff --git a/tests/rp/src/bin/gpio_multicore.rs b/tests/rp/src/bin/gpio_multicore.rs index 22be78248..6ab7f6717 100644 --- a/tests/rp/src/bin/gpio_multicore.rs +++ b/tests/rp/src/bin/gpio_multicore.rs | |||
| @@ -37,11 +37,11 @@ async fn core0_task(p: PIN_0) { | |||
| 37 | let mut pin = Output::new(p, Level::Low); | 37 | let mut pin = Output::new(p, Level::Low); |
| 38 | 38 | ||
| 39 | CHANNEL0.send(()).await; | 39 | CHANNEL0.send(()).await; |
| 40 | CHANNEL1.recv().await; | 40 | CHANNEL1.receive().await; |
| 41 | 41 | ||
| 42 | pin.set_high(); | 42 | pin.set_high(); |
| 43 | 43 | ||
| 44 | CHANNEL1.recv().await; | 44 | CHANNEL1.receive().await; |
| 45 | 45 | ||
| 46 | info!("Test OK"); | 46 | info!("Test OK"); |
| 47 | cortex_m::asm::bkpt(); | 47 | cortex_m::asm::bkpt(); |
| @@ -51,7 +51,7 @@ async fn core0_task(p: PIN_0) { | |||
| 51 | async fn core1_task(p: PIN_1) { | 51 | async fn core1_task(p: PIN_1) { |
| 52 | info!("CORE1 is running"); | 52 | info!("CORE1 is running"); |
| 53 | 53 | ||
| 54 | CHANNEL0.recv().await; | 54 | CHANNEL0.receive().await; |
| 55 | 55 | ||
| 56 | let mut pin = Input::new(p, Pull::Down); | 56 | let mut pin = Input::new(p, Pull::Down); |
| 57 | let wait = pin.wait_for_rising_edge(); | 57 | let wait = pin.wait_for_rising_edge(); |
diff --git a/tests/rp/src/bin/multicore.rs b/tests/rp/src/bin/multicore.rs index ec794c48a..f4188135e 100644 --- a/tests/rp/src/bin/multicore.rs +++ b/tests/rp/src/bin/multicore.rs | |||
| @@ -33,7 +33,7 @@ async fn core0_task() { | |||
| 33 | info!("CORE0 is running"); | 33 | info!("CORE0 is running"); |
| 34 | let ping = true; | 34 | let ping = true; |
| 35 | CHANNEL0.send(ping).await; | 35 | CHANNEL0.send(ping).await; |
| 36 | let pong = CHANNEL1.recv().await; | 36 | let pong = CHANNEL1.receive().await; |
| 37 | assert_eq!(ping, pong); | 37 | assert_eq!(ping, pong); |
| 38 | 38 | ||
| 39 | info!("Test OK"); | 39 | info!("Test OK"); |
| @@ -43,6 +43,6 @@ async fn core0_task() { | |||
| 43 | #[embassy_executor::task] | 43 | #[embassy_executor::task] |
| 44 | async fn core1_task() { | 44 | async fn core1_task() { |
| 45 | info!("CORE1 is running"); | 45 | info!("CORE1 is running"); |
| 46 | let ping = CHANNEL0.recv().await; | 46 | let ping = CHANNEL0.receive().await; |
| 47 | CHANNEL1.send(ping).await; | 47 | CHANNEL1.send(ping).await; |
| 48 | } | 48 | } |
