diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2022-04-05 23:53:59 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-04-05 23:53:59 +0000 |
| commit | c1b382296434e762d16a36d658d2f308358e3f87 (patch) | |
| tree | 6edd9fd7c2d69a5ad130dc13ae7c0bbed442e640 | |
| parent | aee19185b7cf34466f7941784b55e639c925fae4 (diff) | |
| parent | 27a1b0ea7316be4687e7173a73861d276974d502 (diff) | |
Merge #695
695: Simplify Channel. r=Dirbaio a=Dirbaio
- Allow initializing in a static, without Forever.
- Remove ability to close, since in embedded enviromnents channels usually live forever and don't get closed.
- Remove MPSC restriction, it's MPMC now. Rename "mpsc" to "channel".
- `Sender` and `Receiver` are still available if you want to enforce a piece of code only has send/receive access, but are optional: you can send/receive directly into the Channel if you want.
Co-authored-by: Dario Nieuwenhuis <[email protected]>
| -rw-r--r-- | embassy/src/channel/channel.rs | 430 | ||||
| -rw-r--r-- | embassy/src/channel/mod.rs | 2 | ||||
| -rw-r--r-- | embassy/src/channel/mpsc.rs | 822 | ||||
| -rw-r--r-- | embassy/src/channel/signal.rs | 2 | ||||
| -rw-r--r-- | examples/nrf/src/bin/channel.rs | 45 | ||||
| -rw-r--r-- | examples/nrf/src/bin/channel_sender_receiver.rs | 52 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 60 | ||||
| -rw-r--r-- | examples/nrf/src/bin/uart_split.rs | 23 | ||||
| -rw-r--r-- | examples/stm32f3/src/bin/button_events.rs | 59 | ||||
| -rw-r--r-- | examples/stm32h7/src/bin/usart_split.rs | 26 |
10 files changed, 571 insertions, 950 deletions
diff --git a/embassy/src/channel/channel.rs b/embassy/src/channel/channel.rs new file mode 100644 index 000000000..9084cd57b --- /dev/null +++ b/embassy/src/channel/channel.rs | |||
| @@ -0,0 +1,430 @@ | |||
| 1 | //! A queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! This queue takes a Mutex type so that various | ||
| 7 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 8 | //! for single-core Cortex-M targets where messages are only passed | ||
| 9 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 10 | //! can also be used for single-core targets where messages are to be | ||
| 11 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 12 | //! | ||
| 13 | //! This module provides a bounded channel that has a limit on the number of | ||
| 14 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 15 | //! another message will result in an error being returned. | ||
| 16 | //! | ||
| 17 | |||
| 18 | use core::cell::RefCell; | ||
| 19 | use core::pin::Pin; | ||
| 20 | use core::task::Context; | ||
| 21 | use core::task::Poll; | ||
| 22 | |||
| 23 | use futures::Future; | ||
| 24 | use heapless::Deque; | ||
| 25 | |||
| 26 | use crate::blocking_mutex::raw::RawMutex; | ||
| 27 | use crate::blocking_mutex::Mutex; | ||
| 28 | use crate::waitqueue::WakerRegistration; | ||
| 29 | |||
| 30 | /// Send-only access to a [`Channel`]. | ||
| 31 | #[derive(Copy, Clone)] | ||
| 32 | pub struct Sender<'ch, M, T, const N: usize> | ||
| 33 | where | ||
| 34 | M: RawMutex, | ||
| 35 | { | ||
| 36 | channel: &'ch Channel<M, T, N>, | ||
| 37 | } | ||
| 38 | |||
| 39 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | ||
| 40 | where | ||
| 41 | M: RawMutex, | ||
| 42 | { | ||
| 43 | /// Sends a value. | ||
| 44 | /// | ||
| 45 | /// See [`Channel::send()`] | ||
| 46 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | ||
| 47 | self.channel.send(message) | ||
| 48 | } | ||
| 49 | |||
| 50 | /// Attempt to immediately send a message. | ||
| 51 | /// | ||
| 52 | /// See [`Channel::send()`] | ||
| 53 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 54 | self.channel.try_send(message) | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | /// Receive-only access to a [`Channel`]. | ||
| 59 | #[derive(Copy, Clone)] | ||
| 60 | pub struct Receiver<'ch, M, T, const N: usize> | ||
| 61 | where | ||
| 62 | M: RawMutex, | ||
| 63 | { | ||
| 64 | channel: &'ch Channel<M, T, N>, | ||
| 65 | } | ||
| 66 | |||
| 67 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | ||
| 68 | where | ||
| 69 | M: RawMutex, | ||
| 70 | { | ||
| 71 | /// Receive the next value. | ||
| 72 | /// | ||
| 73 | /// See [`Channel::recv()`]. | ||
| 74 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 75 | self.channel.recv() | ||
| 76 | } | ||
| 77 | |||
| 78 | /// Attempt to immediately receive the next value. | ||
| 79 | /// | ||
| 80 | /// See [`Channel::try_recv()`] | ||
| 81 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 82 | self.channel.try_recv() | ||
| 83 | } | ||
| 84 | } | ||
| 85 | |||
| 86 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 87 | where | ||
| 88 | M: RawMutex, | ||
| 89 | { | ||
| 90 | channel: &'ch Channel<M, T, N>, | ||
| 91 | } | ||
| 92 | |||
| 93 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 94 | where | ||
| 95 | M: RawMutex, | ||
| 96 | { | ||
| 97 | type Output = T; | ||
| 98 | |||
| 99 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 100 | self.channel | ||
| 101 | .lock(|c| match c.try_recv_with_context(Some(cx)) { | ||
| 102 | Ok(v) => Poll::Ready(v), | ||
| 103 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 104 | }) | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | pub struct SendFuture<'ch, M, T, const N: usize> | ||
| 109 | where | ||
| 110 | M: RawMutex, | ||
| 111 | { | ||
| 112 | channel: &'ch Channel<M, T, N>, | ||
| 113 | message: Option<T>, | ||
| 114 | } | ||
| 115 | |||
| 116 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | ||
| 117 | where | ||
| 118 | M: RawMutex, | ||
| 119 | { | ||
| 120 | type Output = (); | ||
| 121 | |||
| 122 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 123 | match self.message.take() { | ||
| 124 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { | ||
| 125 | Ok(..) => Poll::Ready(()), | ||
| 126 | Err(TrySendError::Full(m)) => { | ||
| 127 | self.message = Some(m); | ||
| 128 | Poll::Pending | ||
| 129 | } | ||
| 130 | }, | ||
| 131 | None => panic!("Message cannot be None"), | ||
| 132 | } | ||
| 133 | } | ||
| 134 | } | ||
| 135 | |||
| 136 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | ||
| 137 | |||
| 138 | /// Error returned by [`try_recv`](Channel::try_recv). | ||
| 139 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 140 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 141 | pub enum TryRecvError { | ||
| 142 | /// A message could not be received because the channel is empty. | ||
| 143 | Empty, | ||
| 144 | } | ||
| 145 | |||
| 146 | /// Error returned by [`try_send`](Channel::try_send). | ||
| 147 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 148 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 149 | pub enum TrySendError<T> { | ||
| 150 | /// The data could not be sent on the channel because the channel is | ||
| 151 | /// currently full and sending would require blocking. | ||
| 152 | Full(T), | ||
| 153 | } | ||
| 154 | |||
| 155 | struct ChannelState<T, const N: usize> { | ||
| 156 | queue: Deque<T, N>, | ||
| 157 | receiver_waker: WakerRegistration, | ||
| 158 | senders_waker: WakerRegistration, | ||
| 159 | } | ||
| 160 | |||
| 161 | impl<T, const N: usize> ChannelState<T, N> { | ||
| 162 | const fn new() -> Self { | ||
| 163 | ChannelState { | ||
| 164 | queue: Deque::new(), | ||
| 165 | receiver_waker: WakerRegistration::new(), | ||
| 166 | senders_waker: WakerRegistration::new(), | ||
| 167 | } | ||
| 168 | } | ||
| 169 | |||
| 170 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | ||
| 171 | self.try_recv_with_context(None) | ||
| 172 | } | ||
| 173 | |||
| 174 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 175 | if self.queue.is_full() { | ||
| 176 | self.senders_waker.wake(); | ||
| 177 | } | ||
| 178 | |||
| 179 | if let Some(message) = self.queue.pop_front() { | ||
| 180 | Ok(message) | ||
| 181 | } else { | ||
| 182 | if let Some(cx) = cx { | ||
| 183 | self.receiver_waker.register(cx.waker()); | ||
| 184 | } | ||
| 185 | Err(TryRecvError::Empty) | ||
| 186 | } | ||
| 187 | } | ||
| 188 | |||
| 189 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | ||
| 190 | self.try_send_with_context(message, None) | ||
| 191 | } | ||
| 192 | |||
| 193 | fn try_send_with_context( | ||
| 194 | &mut self, | ||
| 195 | message: T, | ||
| 196 | cx: Option<&mut Context<'_>>, | ||
| 197 | ) -> Result<(), TrySendError<T>> { | ||
| 198 | match self.queue.push_back(message) { | ||
| 199 | Ok(()) => { | ||
| 200 | self.receiver_waker.wake(); | ||
| 201 | Ok(()) | ||
| 202 | } | ||
| 203 | Err(message) => { | ||
| 204 | if let Some(cx) = cx { | ||
| 205 | self.senders_waker.register(cx.waker()); | ||
| 206 | } | ||
| 207 | Err(TrySendError::Full(message)) | ||
| 208 | } | ||
| 209 | } | ||
| 210 | } | ||
| 211 | } | ||
| 212 | |||
| 213 | /// A bounded channel for communicating between asynchronous tasks | ||
| 214 | /// with backpressure. | ||
| 215 | /// | ||
| 216 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 217 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 218 | /// received from the channel. | ||
| 219 | /// | ||
| 220 | /// All data sent will become available in the same order as it was sent. | ||
| 221 | pub struct Channel<M, T, const N: usize> | ||
| 222 | where | ||
| 223 | M: RawMutex, | ||
| 224 | { | ||
| 225 | inner: Mutex<M, RefCell<ChannelState<T, N>>>, | ||
| 226 | } | ||
| 227 | |||
| 228 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 229 | where | ||
| 230 | M: RawMutex, | ||
| 231 | { | ||
| 232 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 233 | /// | ||
| 234 | /// ``` | ||
| 235 | /// use embassy::channel::channel::Channel; | ||
| 236 | /// use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 237 | /// | ||
| 238 | /// // Declare a bounded channel of 3 u32s. | ||
| 239 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 240 | /// ``` | ||
| 241 | #[cfg(feature = "nightly")] | ||
| 242 | pub const fn new() -> Self { | ||
| 243 | Self { | ||
| 244 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 245 | } | ||
| 246 | } | ||
| 247 | |||
| 248 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 249 | /// | ||
| 250 | /// ``` | ||
| 251 | /// use embassy::channel::channel::Channel; | ||
| 252 | /// use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 253 | /// | ||
| 254 | /// // Declare a bounded channel of 3 u32s. | ||
| 255 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 256 | /// ``` | ||
| 257 | #[cfg(not(feature = "nightly"))] | ||
| 258 | pub fn new() -> Self { | ||
| 259 | Self { | ||
| 260 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 261 | } | ||
| 262 | } | ||
| 263 | |||
| 264 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { | ||
| 265 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||
| 266 | } | ||
| 267 | |||
| 268 | /// Get a sender for this channel. | ||
| 269 | pub fn sender(&self) -> Sender<'_, M, T, N> { | ||
| 270 | Sender { channel: self } | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Get a receiver for this channel. | ||
| 274 | pub fn receiver(&self) -> Receiver<'_, M, T, N> { | ||
| 275 | Receiver { channel: self } | ||
| 276 | } | ||
| 277 | |||
| 278 | /// Send a value, waiting until there is capacity. | ||
| 279 | /// | ||
| 280 | /// Sending completes when the value has been pushed to the channel's queue. | ||
| 281 | /// This doesn't mean the value has been received yet. | ||
| 282 | pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { | ||
| 283 | SendFuture { | ||
| 284 | channel: self, | ||
| 285 | message: Some(message), | ||
| 286 | } | ||
| 287 | } | ||
| 288 | |||
| 289 | /// Attempt to immediately send a message. | ||
| 290 | /// | ||
| 291 | /// This method differs from [`send`] by returning immediately if the channel's | ||
| 292 | /// buffer is full, instead of waiting. | ||
| 293 | /// | ||
| 294 | /// # Errors | ||
| 295 | /// | ||
| 296 | /// If the channel capacity has been reached, i.e., the channel has `n` | ||
| 297 | /// buffered values where `n` is the argument passed to [`Channel`], then an | ||
| 298 | /// error is returned. | ||
| 299 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 300 | self.lock(|c| c.try_send(message)) | ||
| 301 | } | ||
| 302 | |||
| 303 | /// Receive the next value. | ||
| 304 | /// | ||
| 305 | /// If there are no messages in the channel's buffer, this method will | ||
| 306 | /// wait until a message is sent. | ||
| 307 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 308 | RecvFuture { channel: self } | ||
| 309 | } | ||
| 310 | |||
| 311 | /// Attempt to immediately receive a message. | ||
| 312 | /// | ||
| 313 | /// This method will either receive a message from the channel immediately or return an error | ||
| 314 | /// if the channel is empty. | ||
| 315 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 316 | self.lock(|c| c.try_recv()) | ||
| 317 | } | ||
| 318 | } | ||
| 319 | |||
| 320 | #[cfg(test)] | ||
| 321 | mod tests { | ||
| 322 | use core::time::Duration; | ||
| 323 | |||
| 324 | use futures::task::SpawnExt; | ||
| 325 | use futures_executor::ThreadPool; | ||
| 326 | use futures_timer::Delay; | ||
| 327 | |||
| 328 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||
| 329 | use crate::util::Forever; | ||
| 330 | |||
| 331 | use super::*; | ||
| 332 | |||
| 333 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | ||
| 334 | c.queue.capacity() - c.queue.len() | ||
| 335 | } | ||
| 336 | |||
| 337 | #[test] | ||
| 338 | fn sending_once() { | ||
| 339 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 340 | assert!(c.try_send(1).is_ok()); | ||
| 341 | assert_eq!(capacity(&c), 2); | ||
| 342 | } | ||
| 343 | |||
| 344 | #[test] | ||
| 345 | fn sending_when_full() { | ||
| 346 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 347 | let _ = c.try_send(1); | ||
| 348 | let _ = c.try_send(1); | ||
| 349 | let _ = c.try_send(1); | ||
| 350 | match c.try_send(2) { | ||
| 351 | Err(TrySendError::Full(2)) => assert!(true), | ||
| 352 | _ => assert!(false), | ||
| 353 | } | ||
| 354 | assert_eq!(capacity(&c), 0); | ||
| 355 | } | ||
| 356 | |||
| 357 | #[test] | ||
| 358 | fn receiving_once_with_one_send() { | ||
| 359 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 360 | assert!(c.try_send(1).is_ok()); | ||
| 361 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 362 | assert_eq!(capacity(&c), 3); | ||
| 363 | } | ||
| 364 | |||
| 365 | #[test] | ||
| 366 | fn receiving_when_empty() { | ||
| 367 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 368 | match c.try_recv() { | ||
| 369 | Err(TryRecvError::Empty) => assert!(true), | ||
| 370 | _ => assert!(false), | ||
| 371 | } | ||
| 372 | assert_eq!(capacity(&c), 3); | ||
| 373 | } | ||
| 374 | |||
| 375 | #[test] | ||
| 376 | fn simple_send_and_receive() { | ||
| 377 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 378 | assert!(c.try_send(1).is_ok()); | ||
| 379 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 380 | } | ||
| 381 | |||
| 382 | #[futures_test::test] | ||
| 383 | async fn receiver_receives_given_try_send_async() { | ||
| 384 | let executor = ThreadPool::new().unwrap(); | ||
| 385 | |||
| 386 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new(); | ||
| 387 | let c = &*CHANNEL.put(Channel::new()); | ||
| 388 | let c2 = c; | ||
| 389 | assert!(executor | ||
| 390 | .spawn(async move { | ||
| 391 | assert!(c2.try_send(1).is_ok()); | ||
| 392 | }) | ||
| 393 | .is_ok()); | ||
| 394 | assert_eq!(c.recv().await, 1); | ||
| 395 | } | ||
| 396 | |||
| 397 | #[futures_test::test] | ||
| 398 | async fn sender_send_completes_if_capacity() { | ||
| 399 | let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); | ||
| 400 | c.send(1).await; | ||
| 401 | assert_eq!(c.recv().await, 1); | ||
| 402 | } | ||
| 403 | |||
| 404 | #[futures_test::test] | ||
| 405 | async fn senders_sends_wait_until_capacity() { | ||
| 406 | let executor = ThreadPool::new().unwrap(); | ||
| 407 | |||
| 408 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new(); | ||
| 409 | let c = &*CHANNEL.put(Channel::new()); | ||
| 410 | assert!(c.try_send(1).is_ok()); | ||
| 411 | |||
| 412 | let c2 = c; | ||
| 413 | let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); | ||
| 414 | let c2 = c; | ||
| 415 | let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); | ||
| 416 | // Wish I could think of a means of determining that the async send is waiting instead. | ||
| 417 | // However, I've used the debugger to observe that the send does indeed wait. | ||
| 418 | Delay::new(Duration::from_millis(500)).await; | ||
| 419 | assert_eq!(c.recv().await, 1); | ||
| 420 | assert!(executor | ||
| 421 | .spawn(async move { | ||
| 422 | loop { | ||
| 423 | c.recv().await; | ||
| 424 | } | ||
| 425 | }) | ||
| 426 | .is_ok()); | ||
| 427 | send_task_1.unwrap().await; | ||
| 428 | send_task_2.unwrap().await; | ||
| 429 | } | ||
| 430 | } | ||
diff --git a/embassy/src/channel/mod.rs b/embassy/src/channel/mod.rs index 9e8c67ee9..e51a442df 100644 --- a/embassy/src/channel/mod.rs +++ b/embassy/src/channel/mod.rs | |||
| @@ -1,4 +1,4 @@ | |||
| 1 | //! Async channels | 1 | //! Async channels |
| 2 | 2 | ||
| 3 | pub mod mpsc; | 3 | pub mod channel; |
| 4 | pub mod signal; | 4 | pub mod signal; |
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs deleted file mode 100644 index 32787d810..000000000 --- a/embassy/src/channel/mpsc.rs +++ /dev/null | |||
| @@ -1,822 +0,0 @@ | |||
| 1 | //! A multi-producer, single-consumer queue for sending values between | ||
| 2 | //! asynchronous tasks. This queue takes a Mutex type so that various | ||
| 3 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 4 | //! for single-core Cortex-M targets where messages are only passed | ||
| 5 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 6 | //! can also be used for single-core targets where messages are to be | ||
| 7 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 8 | //! | ||
| 9 | //! This module provides a bounded channel that has a limit on the number of | ||
| 10 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 11 | //! another message will result in an error being returned. | ||
| 12 | //! | ||
| 13 | //! Similar to the `mpsc` channels provided by `std`, the channel constructor | ||
| 14 | //! functions provide separate send and receive handles, [`Sender`] and | ||
| 15 | //! [`Receiver`]. If there is no message to read, the current task will be | ||
| 16 | //! notified when a new value is sent. [`Sender`] allows sending values into | ||
| 17 | //! the channel. If the bounded channel is at capacity, the send is rejected. | ||
| 18 | //! | ||
| 19 | //! # Disconnection | ||
| 20 | //! | ||
| 21 | //! When all [`Sender`] handles have been dropped, it is no longer | ||
| 22 | //! possible to send values into the channel. This is considered the termination | ||
| 23 | //! event of the stream. | ||
| 24 | //! | ||
| 25 | //! If the [`Receiver`] handle is dropped, then messages can no longer | ||
| 26 | //! be read out of the channel. In this case, all further attempts to send will | ||
| 27 | //! result in an error. | ||
| 28 | //! | ||
| 29 | //! # Clean Shutdown | ||
| 30 | //! | ||
| 31 | //! When the [`Receiver`] is dropped, it is possible for unprocessed messages to | ||
| 32 | //! remain in the channel. Instead, it is usually desirable to perform a "clean" | ||
| 33 | //! shutdown. To do this, the receiver first calls `close`, which will prevent | ||
| 34 | //! any further messages to be sent into the channel. Then, the receiver | ||
| 35 | //! consumes the channel to completion, at which point the receiver can be | ||
| 36 | //! dropped. | ||
| 37 | //! | ||
| 38 | //! This channel and its associated types were derived from <https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html> | ||
| 39 | |||
| 40 | use core::cell::RefCell; | ||
| 41 | use core::fmt; | ||
| 42 | use core::pin::Pin; | ||
| 43 | use core::task::Context; | ||
| 44 | use core::task::Poll; | ||
| 45 | use core::task::Waker; | ||
| 46 | |||
| 47 | use futures::Future; | ||
| 48 | use heapless::Deque; | ||
| 49 | |||
| 50 | use crate::blocking_mutex::raw::RawMutex; | ||
| 51 | use crate::blocking_mutex::Mutex; | ||
| 52 | use crate::waitqueue::WakerRegistration; | ||
| 53 | |||
| 54 | /// Send values to the associated `Receiver`. | ||
| 55 | /// | ||
| 56 | /// Instances are created by the [`split`](split) function. | ||
| 57 | pub struct Sender<'ch, M, T, const N: usize> | ||
| 58 | where | ||
| 59 | M: RawMutex, | ||
| 60 | { | ||
| 61 | channel: &'ch Channel<M, T, N>, | ||
| 62 | } | ||
| 63 | |||
| 64 | /// Receive values from the associated `Sender`. | ||
| 65 | /// | ||
| 66 | /// Instances are created by the [`split`](split) function. | ||
| 67 | pub struct Receiver<'ch, M, T, const N: usize> | ||
| 68 | where | ||
| 69 | M: RawMutex, | ||
| 70 | { | ||
| 71 | channel: &'ch Channel<M, T, N>, | ||
| 72 | } | ||
| 73 | |||
| 74 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. | ||
| 75 | /// | ||
| 76 | /// All data sent on `Sender` will become available on `Receiver` in the same | ||
| 77 | /// order as it was sent. | ||
| 78 | /// | ||
| 79 | /// The `Sender` can be cloned to `send` to the same channel from multiple code | ||
| 80 | /// locations. Only one `Receiver` is valid. | ||
| 81 | /// | ||
| 82 | /// If the `Receiver` is disconnected while trying to `send`, the `send` method | ||
| 83 | /// will return a `SendError`. Similarly, if `Sender` is disconnected while | ||
| 84 | /// trying to `recv`, the `recv` method will return a `RecvError`. | ||
| 85 | /// | ||
| 86 | /// Note that when splitting the channel, the sender and receiver cannot outlive | ||
| 87 | /// their channel. The following will therefore fail compilation: | ||
| 88 | //// | ||
| 89 | /// ```compile_fail | ||
| 90 | /// use embassy::channel::mpsc; | ||
| 91 | /// use embassy::channel::mpsc::{Channel, WithThreadModeOnly}; | ||
| 92 | /// | ||
| 93 | /// let (sender, receiver) = { | ||
| 94 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); | ||
| 95 | /// mpsc::split(&mut channel) | ||
| 96 | /// }; | ||
| 97 | /// ``` | ||
| 98 | pub fn split<M, T, const N: usize>( | ||
| 99 | channel: &mut Channel<M, T, N>, | ||
| 100 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) | ||
| 101 | where | ||
| 102 | M: RawMutex, | ||
| 103 | { | ||
| 104 | let sender = Sender { channel }; | ||
| 105 | let receiver = Receiver { channel }; | ||
| 106 | channel.lock(|c| { | ||
| 107 | c.register_receiver(); | ||
| 108 | c.register_sender(); | ||
| 109 | }); | ||
| 110 | (sender, receiver) | ||
| 111 | } | ||
| 112 | |||
| 113 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | ||
| 114 | where | ||
| 115 | M: RawMutex, | ||
| 116 | { | ||
| 117 | /// Receives the next value for this receiver. | ||
| 118 | /// | ||
| 119 | /// This method returns `None` if the channel has been closed and there are | ||
| 120 | /// no remaining messages in the channel's buffer. This indicates that no | ||
| 121 | /// further values can ever be received from this `Receiver`. The channel is | ||
| 122 | /// closed when all senders have been dropped, or when [`close`] is called. | ||
| 123 | /// | ||
| 124 | /// If there are no messages in the channel's buffer, but the channel has | ||
| 125 | /// not yet been closed, this method will sleep until a message is sent or | ||
| 126 | /// the channel is closed. | ||
| 127 | /// | ||
| 128 | /// Note that if [`close`] is called, but there are still outstanding | ||
| 129 | /// messages from before it was closed, the channel is not considered | ||
| 130 | /// closed by `recv` until they are all consumed. | ||
| 131 | /// | ||
| 132 | /// [`close`]: Self::close | ||
| 133 | pub fn recv(&mut self) -> RecvFuture<'_, M, T, N> { | ||
| 134 | RecvFuture { | ||
| 135 | channel: self.channel, | ||
| 136 | } | ||
| 137 | } | ||
| 138 | |||
| 139 | /// Attempts to immediately receive a message on this `Receiver` | ||
| 140 | /// | ||
| 141 | /// This method will either receive a message from the channel immediately or return an error | ||
| 142 | /// if the channel is empty. | ||
| 143 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 144 | self.channel.lock(|c| c.try_recv()) | ||
| 145 | } | ||
| 146 | |||
| 147 | /// Closes the receiving half of a channel without dropping it. | ||
| 148 | /// | ||
| 149 | /// This prevents any further messages from being sent on the channel while | ||
| 150 | /// still enabling the receiver to drain messages that are buffered. | ||
| 151 | /// | ||
| 152 | /// To guarantee that no messages are dropped, after calling `close()`, | ||
| 153 | /// `recv()` must be called until `None` is returned. If there are | ||
| 154 | /// outstanding messages, the `recv` method will not return `None` | ||
| 155 | /// until those are released. | ||
| 156 | /// | ||
| 157 | pub fn close(&mut self) { | ||
| 158 | self.channel.lock(|c| c.close()) | ||
| 159 | } | ||
| 160 | } | ||
| 161 | |||
| 162 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> | ||
| 163 | where | ||
| 164 | M: RawMutex, | ||
| 165 | { | ||
| 166 | fn drop(&mut self) { | ||
| 167 | self.channel.lock(|c| c.deregister_receiver()) | ||
| 168 | } | ||
| 169 | } | ||
| 170 | |||
| 171 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 172 | where | ||
| 173 | M: RawMutex, | ||
| 174 | { | ||
| 175 | channel: &'ch Channel<M, T, N>, | ||
| 176 | } | ||
| 177 | |||
| 178 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 179 | where | ||
| 180 | M: RawMutex, | ||
| 181 | { | ||
| 182 | type Output = Option<T>; | ||
| 183 | |||
| 184 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 185 | self.channel | ||
| 186 | .lock(|c| match c.try_recv_with_context(Some(cx)) { | ||
| 187 | Ok(v) => Poll::Ready(Some(v)), | ||
| 188 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 189 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 190 | }) | ||
| 191 | } | ||
| 192 | } | ||
| 193 | |||
| 194 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | ||
| 195 | where | ||
| 196 | M: RawMutex, | ||
| 197 | { | ||
| 198 | /// Sends a value, waiting until there is capacity. | ||
| 199 | /// | ||
| 200 | /// A successful send occurs when it is determined that the other end of the | ||
| 201 | /// channel has not hung up already. An unsuccessful send would be one where | ||
| 202 | /// the corresponding receiver has already been closed. Note that a return | ||
| 203 | /// value of `Err` means that the data will never be received, but a return | ||
| 204 | /// value of `Ok` does not mean that the data will be received. It is | ||
| 205 | /// possible for the corresponding receiver to hang up immediately after | ||
| 206 | /// this function returns `Ok`. | ||
| 207 | /// | ||
| 208 | /// # Errors | ||
| 209 | /// | ||
| 210 | /// If the receive half of the channel is closed, either due to [`close`] | ||
| 211 | /// being called or the [`Receiver`] handle dropping, the function returns | ||
| 212 | /// an error. The error includes the value passed to `send`. | ||
| 213 | /// | ||
| 214 | /// [`close`]: Receiver::close | ||
| 215 | /// [`Receiver`]: Receiver | ||
| 216 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | ||
| 217 | SendFuture { | ||
| 218 | channel: self.channel, | ||
| 219 | message: Some(message), | ||
| 220 | } | ||
| 221 | } | ||
| 222 | |||
| 223 | /// Attempts to immediately send a message on this `Sender` | ||
| 224 | /// | ||
| 225 | /// This method differs from [`send`] by returning immediately if the channel's | ||
| 226 | /// buffer is full or no receiver is waiting to acquire some data. Compared | ||
| 227 | /// with [`send`], this function has two failure cases instead of one (one for | ||
| 228 | /// disconnection, one for a full buffer). | ||
| 229 | /// | ||
| 230 | /// # Errors | ||
| 231 | /// | ||
| 232 | /// If the channel capacity has been reached, i.e., the channel has `n` | ||
| 233 | /// buffered values where `n` is the argument passed to [`channel`], then an | ||
| 234 | /// error is returned. | ||
| 235 | /// | ||
| 236 | /// If the receive half of the channel is closed, either due to [`close`] | ||
| 237 | /// being called or the [`Receiver`] handle dropping, the function returns | ||
| 238 | /// an error. The error includes the value passed to `send`. | ||
| 239 | /// | ||
| 240 | /// [`send`]: Sender::send | ||
| 241 | /// [`channel`]: channel | ||
| 242 | /// [`close`]: Receiver::close | ||
| 243 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 244 | self.channel.lock(|c| c.try_send(message)) | ||
| 245 | } | ||
| 246 | |||
| 247 | /// Completes when the receiver has dropped. | ||
| 248 | /// | ||
| 249 | /// This allows the producers to get notified when interest in the produced | ||
| 250 | /// values is canceled and immediately stop doing work. | ||
| 251 | pub async fn closed(&self) { | ||
| 252 | CloseFuture { | ||
| 253 | channel: self.channel, | ||
| 254 | } | ||
| 255 | .await | ||
| 256 | } | ||
| 257 | |||
| 258 | /// Checks if the channel has been closed. This happens when the | ||
| 259 | /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is | ||
| 260 | /// called. | ||
| 261 | /// | ||
| 262 | /// [`Receiver`]: Receiver | ||
| 263 | /// [`Receiver::close`]: Receiver::close | ||
| 264 | pub fn is_closed(&self) -> bool { | ||
| 265 | self.channel.lock(|c| c.is_closed()) | ||
| 266 | } | ||
| 267 | } | ||
| 268 | |||
| 269 | pub struct SendFuture<'ch, M, T, const N: usize> | ||
| 270 | where | ||
| 271 | M: RawMutex, | ||
| 272 | { | ||
| 273 | channel: &'ch Channel<M, T, N>, | ||
| 274 | message: Option<T>, | ||
| 275 | } | ||
| 276 | |||
| 277 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | ||
| 278 | where | ||
| 279 | M: RawMutex, | ||
| 280 | { | ||
| 281 | type Output = Result<(), SendError<T>>; | ||
| 282 | |||
| 283 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 284 | match self.message.take() { | ||
| 285 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { | ||
| 286 | Ok(..) => Poll::Ready(Ok(())), | ||
| 287 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | ||
| 288 | Err(TrySendError::Full(m)) => { | ||
| 289 | self.message = Some(m); | ||
| 290 | Poll::Pending | ||
| 291 | } | ||
| 292 | }, | ||
| 293 | None => panic!("Message cannot be None"), | ||
| 294 | } | ||
| 295 | } | ||
| 296 | } | ||
| 297 | |||
| 298 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | ||
| 299 | |||
| 300 | struct CloseFuture<'ch, M, T, const N: usize> | ||
| 301 | where | ||
| 302 | M: RawMutex, | ||
| 303 | { | ||
| 304 | channel: &'ch Channel<M, T, N>, | ||
| 305 | } | ||
| 306 | |||
| 307 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> | ||
| 308 | where | ||
| 309 | M: RawMutex, | ||
| 310 | { | ||
| 311 | type Output = (); | ||
| 312 | |||
| 313 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 314 | if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) { | ||
| 315 | Poll::Ready(()) | ||
| 316 | } else { | ||
| 317 | Poll::Pending | ||
| 318 | } | ||
| 319 | } | ||
| 320 | } | ||
| 321 | |||
| 322 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> | ||
| 323 | where | ||
| 324 | M: RawMutex, | ||
| 325 | { | ||
| 326 | fn drop(&mut self) { | ||
| 327 | self.channel.lock(|c| c.deregister_sender()) | ||
| 328 | } | ||
| 329 | } | ||
| 330 | |||
| 331 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | ||
| 332 | where | ||
| 333 | M: RawMutex, | ||
| 334 | { | ||
| 335 | fn clone(&self) -> Self { | ||
| 336 | self.channel.lock(|c| c.register_sender()); | ||
| 337 | Sender { | ||
| 338 | channel: self.channel, | ||
| 339 | } | ||
| 340 | } | ||
| 341 | } | ||
| 342 | |||
| 343 | /// An error returned from the [`try_recv`] method. | ||
| 344 | /// | ||
| 345 | /// [`try_recv`]: Receiver::try_recv | ||
| 346 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 347 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 348 | pub enum TryRecvError { | ||
| 349 | /// A message could not be received because the channel is empty. | ||
| 350 | Empty, | ||
| 351 | |||
| 352 | /// The message could not be received because the channel is empty and closed. | ||
| 353 | Closed, | ||
| 354 | } | ||
| 355 | |||
| 356 | /// Error returned by the `Sender`. | ||
| 357 | #[derive(Debug)] | ||
| 358 | pub struct SendError<T>(pub T); | ||
| 359 | |||
| 360 | impl<T> fmt::Display for SendError<T> { | ||
| 361 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 362 | write!(fmt, "channel closed") | ||
| 363 | } | ||
| 364 | } | ||
| 365 | |||
| 366 | #[cfg(feature = "defmt")] | ||
| 367 | impl<T> defmt::Format for SendError<T> { | ||
| 368 | fn format(&self, fmt: defmt::Formatter<'_>) { | ||
| 369 | defmt::write!(fmt, "channel closed") | ||
| 370 | } | ||
| 371 | } | ||
| 372 | |||
| 373 | /// This enumeration is the list of the possible error outcomes for the | ||
| 374 | /// [try_send](Sender::try_send) method. | ||
| 375 | #[derive(Debug)] | ||
| 376 | pub enum TrySendError<T> { | ||
| 377 | /// The data could not be sent on the channel because the channel is | ||
| 378 | /// currently full and sending would require blocking. | ||
| 379 | Full(T), | ||
| 380 | |||
| 381 | /// The receive half of the channel was explicitly closed or has been | ||
| 382 | /// dropped. | ||
| 383 | Closed(T), | ||
| 384 | } | ||
| 385 | |||
| 386 | impl<T> fmt::Display for TrySendError<T> { | ||
| 387 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 388 | write!( | ||
| 389 | fmt, | ||
| 390 | "{}", | ||
| 391 | match self { | ||
| 392 | TrySendError::Full(..) => "no available capacity", | ||
| 393 | TrySendError::Closed(..) => "channel closed", | ||
| 394 | } | ||
| 395 | ) | ||
| 396 | } | ||
| 397 | } | ||
| 398 | |||
| 399 | #[cfg(feature = "defmt")] | ||
| 400 | impl<T> defmt::Format for TrySendError<T> { | ||
| 401 | fn format(&self, fmt: defmt::Formatter<'_>) { | ||
| 402 | match self { | ||
| 403 | TrySendError::Full(..) => defmt::write!(fmt, "no available capacity"), | ||
| 404 | TrySendError::Closed(..) => defmt::write!(fmt, "channel closed"), | ||
| 405 | } | ||
| 406 | } | ||
| 407 | } | ||
| 408 | |||
| 409 | struct ChannelState<T, const N: usize> { | ||
| 410 | queue: Deque<T, N>, | ||
| 411 | closed: bool, | ||
| 412 | receiver_registered: bool, | ||
| 413 | senders_registered: u32, | ||
| 414 | receiver_waker: WakerRegistration, | ||
| 415 | senders_waker: WakerRegistration, | ||
| 416 | } | ||
| 417 | |||
| 418 | impl<T, const N: usize> ChannelState<T, N> { | ||
| 419 | const fn new() -> Self { | ||
| 420 | ChannelState { | ||
| 421 | queue: Deque::new(), | ||
| 422 | closed: false, | ||
| 423 | receiver_registered: false, | ||
| 424 | senders_registered: 0, | ||
| 425 | receiver_waker: WakerRegistration::new(), | ||
| 426 | senders_waker: WakerRegistration::new(), | ||
| 427 | } | ||
| 428 | } | ||
| 429 | |||
| 430 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | ||
| 431 | self.try_recv_with_context(None) | ||
| 432 | } | ||
| 433 | |||
| 434 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 435 | if self.queue.is_full() { | ||
| 436 | self.senders_waker.wake(); | ||
| 437 | } | ||
| 438 | |||
| 439 | if let Some(message) = self.queue.pop_front() { | ||
| 440 | Ok(message) | ||
| 441 | } else if !self.closed { | ||
| 442 | if let Some(cx) = cx { | ||
| 443 | self.set_receiver_waker(cx.waker()); | ||
| 444 | } | ||
| 445 | Err(TryRecvError::Empty) | ||
| 446 | } else { | ||
| 447 | Err(TryRecvError::Closed) | ||
| 448 | } | ||
| 449 | } | ||
| 450 | |||
| 451 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | ||
| 452 | self.try_send_with_context(message, None) | ||
| 453 | } | ||
| 454 | |||
| 455 | fn try_send_with_context( | ||
| 456 | &mut self, | ||
| 457 | message: T, | ||
| 458 | cx: Option<&mut Context<'_>>, | ||
| 459 | ) -> Result<(), TrySendError<T>> { | ||
| 460 | if self.closed { | ||
| 461 | return Err(TrySendError::Closed(message)); | ||
| 462 | } | ||
| 463 | |||
| 464 | match self.queue.push_back(message) { | ||
| 465 | Ok(()) => { | ||
| 466 | self.receiver_waker.wake(); | ||
| 467 | |||
| 468 | Ok(()) | ||
| 469 | } | ||
| 470 | Err(message) => { | ||
| 471 | cx.into_iter() | ||
| 472 | .for_each(|cx| self.set_senders_waker(cx.waker())); | ||
| 473 | Err(TrySendError::Full(message)) | ||
| 474 | } | ||
| 475 | } | ||
| 476 | } | ||
| 477 | |||
| 478 | fn close(&mut self) { | ||
| 479 | self.receiver_waker.wake(); | ||
| 480 | self.closed = true; | ||
| 481 | } | ||
| 482 | |||
| 483 | fn is_closed(&mut self) -> bool { | ||
| 484 | self.is_closed_with_context(None) | ||
| 485 | } | ||
| 486 | |||
| 487 | fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { | ||
| 488 | if self.closed { | ||
| 489 | cx.into_iter() | ||
| 490 | .for_each(|cx| self.set_senders_waker(cx.waker())); | ||
| 491 | true | ||
| 492 | } else { | ||
| 493 | false | ||
| 494 | } | ||
| 495 | } | ||
| 496 | |||
| 497 | fn register_receiver(&mut self) { | ||
| 498 | assert!(!self.receiver_registered); | ||
| 499 | self.receiver_registered = true; | ||
| 500 | } | ||
| 501 | |||
| 502 | fn deregister_receiver(&mut self) { | ||
| 503 | if self.receiver_registered { | ||
| 504 | self.closed = true; | ||
| 505 | self.senders_waker.wake(); | ||
| 506 | } | ||
| 507 | self.receiver_registered = false; | ||
| 508 | } | ||
| 509 | |||
| 510 | fn register_sender(&mut self) { | ||
| 511 | self.senders_registered += 1; | ||
| 512 | } | ||
| 513 | |||
| 514 | fn deregister_sender(&mut self) { | ||
| 515 | assert!(self.senders_registered > 0); | ||
| 516 | self.senders_registered -= 1; | ||
| 517 | if self.senders_registered == 0 { | ||
| 518 | self.receiver_waker.wake(); | ||
| 519 | self.closed = true; | ||
| 520 | } | ||
| 521 | } | ||
| 522 | |||
| 523 | fn set_receiver_waker(&mut self, receiver_waker: &Waker) { | ||
| 524 | self.receiver_waker.register(receiver_waker); | ||
| 525 | } | ||
| 526 | |||
| 527 | fn set_senders_waker(&mut self, senders_waker: &Waker) { | ||
| 528 | // Dispose of any existing sender causing them to be polled again. | ||
| 529 | // This could cause a spin given multiple concurrent senders, however given that | ||
| 530 | // most sends only block waiting for the receiver to become active, this should | ||
| 531 | // be a short-lived activity. The upside is a greatly simplified implementation | ||
| 532 | // that avoids the need for intrusive linked-lists and unsafe operations on pinned | ||
| 533 | // pointers. | ||
| 534 | self.senders_waker.wake(); | ||
| 535 | self.senders_waker.register(senders_waker); | ||
| 536 | } | ||
| 537 | } | ||
| 538 | |||
| 539 | /// A a bounded mpsc channel for communicating between asynchronous tasks | ||
| 540 | /// with backpressure. | ||
| 541 | /// | ||
| 542 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 543 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 544 | /// received from the channel. | ||
| 545 | /// | ||
| 546 | /// All data sent will become available in the same order as it was sent. | ||
| 547 | pub struct Channel<M, T, const N: usize> | ||
| 548 | where | ||
| 549 | M: RawMutex, | ||
| 550 | { | ||
| 551 | inner: Mutex<M, RefCell<ChannelState<T, N>>>, | ||
| 552 | } | ||
| 553 | |||
| 554 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 555 | where | ||
| 556 | M: RawMutex, | ||
| 557 | { | ||
| 558 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 559 | /// | ||
| 560 | /// ``` | ||
| 561 | /// use embassy::channel::mpsc; | ||
| 562 | /// use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 563 | /// use embassy::channel::mpsc::Channel; | ||
| 564 | /// | ||
| 565 | /// // Declare a bounded channel of 3 u32s. | ||
| 566 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 567 | /// // once we have a channel, obtain its sender and receiver | ||
| 568 | /// let (sender, receiver) = mpsc::split(&mut channel); | ||
| 569 | /// ``` | ||
| 570 | #[cfg(feature = "nightly")] | ||
| 571 | pub const fn new() -> Self { | ||
| 572 | Self { | ||
| 573 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 574 | } | ||
| 575 | } | ||
| 576 | |||
| 577 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 578 | /// | ||
| 579 | /// ``` | ||
| 580 | /// use embassy::channel::mpsc; | ||
| 581 | /// use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 582 | /// use embassy::channel::mpsc::Channel; | ||
| 583 | /// | ||
| 584 | /// // Declare a bounded channel of 3 u32s. | ||
| 585 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 586 | /// // once we have a channel, obtain its sender and receiver | ||
| 587 | /// let (sender, receiver) = mpsc::split(&mut channel); | ||
| 588 | /// ``` | ||
| 589 | #[cfg(not(feature = "nightly"))] | ||
| 590 | pub fn new() -> Self { | ||
| 591 | Self { | ||
| 592 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 593 | } | ||
| 594 | } | ||
| 595 | |||
| 596 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { | ||
| 597 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||
| 598 | } | ||
| 599 | } | ||
| 600 | |||
| 601 | #[cfg(test)] | ||
| 602 | mod tests { | ||
| 603 | use core::time::Duration; | ||
| 604 | |||
| 605 | use futures::task::SpawnExt; | ||
| 606 | use futures_executor::ThreadPool; | ||
| 607 | use futures_timer::Delay; | ||
| 608 | |||
| 609 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||
| 610 | use crate::util::Forever; | ||
| 611 | |||
| 612 | use super::*; | ||
| 613 | |||
| 614 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | ||
| 615 | c.queue.capacity() - c.queue.len() | ||
| 616 | } | ||
| 617 | |||
| 618 | #[test] | ||
| 619 | fn sending_once() { | ||
| 620 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 621 | assert!(c.try_send(1).is_ok()); | ||
| 622 | assert_eq!(capacity(&c), 2); | ||
| 623 | } | ||
| 624 | |||
| 625 | #[test] | ||
| 626 | fn sending_when_full() { | ||
| 627 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 628 | let _ = c.try_send(1); | ||
| 629 | let _ = c.try_send(1); | ||
| 630 | let _ = c.try_send(1); | ||
| 631 | match c.try_send(2) { | ||
| 632 | Err(TrySendError::Full(2)) => assert!(true), | ||
| 633 | _ => assert!(false), | ||
| 634 | } | ||
| 635 | assert_eq!(capacity(&c), 0); | ||
| 636 | } | ||
| 637 | |||
| 638 | #[test] | ||
| 639 | fn sending_when_closed() { | ||
| 640 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 641 | c.closed = true; | ||
| 642 | match c.try_send(2) { | ||
| 643 | Err(TrySendError::Closed(2)) => assert!(true), | ||
| 644 | _ => assert!(false), | ||
| 645 | } | ||
| 646 | } | ||
| 647 | |||
| 648 | #[test] | ||
| 649 | fn receiving_once_with_one_send() { | ||
| 650 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 651 | assert!(c.try_send(1).is_ok()); | ||
| 652 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 653 | assert_eq!(capacity(&c), 3); | ||
| 654 | } | ||
| 655 | |||
| 656 | #[test] | ||
| 657 | fn receiving_when_empty() { | ||
| 658 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 659 | match c.try_recv() { | ||
| 660 | Err(TryRecvError::Empty) => assert!(true), | ||
| 661 | _ => assert!(false), | ||
| 662 | } | ||
| 663 | assert_eq!(capacity(&c), 3); | ||
| 664 | } | ||
| 665 | |||
| 666 | #[test] | ||
| 667 | fn receiving_when_closed() { | ||
| 668 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 669 | c.closed = true; | ||
| 670 | match c.try_recv() { | ||
| 671 | Err(TryRecvError::Closed) => assert!(true), | ||
| 672 | _ => assert!(false), | ||
| 673 | } | ||
| 674 | } | ||
| 675 | |||
| 676 | #[test] | ||
| 677 | fn simple_send_and_receive() { | ||
| 678 | let mut c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 679 | let (s, r) = split(&mut c); | ||
| 680 | assert!(s.clone().try_send(1).is_ok()); | ||
| 681 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 682 | } | ||
| 683 | |||
| 684 | #[test] | ||
| 685 | fn should_close_without_sender() { | ||
| 686 | let mut c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 687 | let (s, r) = split(&mut c); | ||
| 688 | drop(s); | ||
| 689 | match r.try_recv() { | ||
| 690 | Err(TryRecvError::Closed) => assert!(true), | ||
| 691 | _ => assert!(false), | ||
| 692 | } | ||
| 693 | } | ||
| 694 | |||
| 695 | #[test] | ||
| 696 | fn should_close_once_drained() { | ||
| 697 | let mut c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 698 | let (s, r) = split(&mut c); | ||
| 699 | assert!(s.try_send(1).is_ok()); | ||
| 700 | drop(s); | ||
| 701 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 702 | match r.try_recv() { | ||
| 703 | Err(TryRecvError::Closed) => assert!(true), | ||
| 704 | _ => assert!(false), | ||
| 705 | } | ||
| 706 | } | ||
| 707 | |||
| 708 | #[test] | ||
| 709 | fn should_reject_send_when_receiver_dropped() { | ||
| 710 | let mut c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 711 | let (s, r) = split(&mut c); | ||
| 712 | drop(r); | ||
| 713 | match s.try_send(1) { | ||
| 714 | Err(TrySendError::Closed(1)) => assert!(true), | ||
| 715 | _ => assert!(false), | ||
| 716 | } | ||
| 717 | } | ||
| 718 | |||
| 719 | #[test] | ||
| 720 | fn should_reject_send_when_channel_closed() { | ||
| 721 | let mut c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 722 | let (s, mut r) = split(&mut c); | ||
| 723 | assert!(s.try_send(1).is_ok()); | ||
| 724 | r.close(); | ||
| 725 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 726 | match r.try_recv() { | ||
| 727 | Err(TryRecvError::Closed) => assert!(true), | ||
| 728 | _ => assert!(false), | ||
| 729 | } | ||
| 730 | assert!(s.is_closed()); | ||
| 731 | } | ||
| 732 | |||
| 733 | #[futures_test::test] | ||
| 734 | async fn receiver_closes_when_sender_dropped_async() { | ||
| 735 | let executor = ThreadPool::new().unwrap(); | ||
| 736 | |||
| 737 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new(); | ||
| 738 | let c = CHANNEL.put(Channel::new()); | ||
| 739 | let (s, mut r) = split(c); | ||
| 740 | assert!(executor | ||
| 741 | .spawn(async move { | ||
| 742 | drop(s); | ||
| 743 | }) | ||
| 744 | .is_ok()); | ||
| 745 | assert_eq!(r.recv().await, None); | ||
| 746 | } | ||
| 747 | |||
| 748 | #[futures_test::test] | ||
| 749 | async fn receiver_receives_given_try_send_async() { | ||
| 750 | let executor = ThreadPool::new().unwrap(); | ||
| 751 | |||
| 752 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new(); | ||
| 753 | let c = CHANNEL.put(Channel::new()); | ||
| 754 | let (s, mut r) = split(c); | ||
| 755 | assert!(executor | ||
| 756 | .spawn(async move { | ||
| 757 | assert!(s.try_send(1).is_ok()); | ||
| 758 | }) | ||
| 759 | .is_ok()); | ||
| 760 | assert_eq!(r.recv().await, Some(1)); | ||
| 761 | } | ||
| 762 | |||
| 763 | #[futures_test::test] | ||
| 764 | async fn sender_send_completes_if_capacity() { | ||
| 765 | let mut c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); | ||
| 766 | let (s, mut r) = split(&mut c); | ||
| 767 | assert!(s.send(1).await.is_ok()); | ||
| 768 | assert_eq!(r.recv().await, Some(1)); | ||
| 769 | } | ||
| 770 | |||
| 771 | #[futures_test::test] | ||
| 772 | async fn sender_send_completes_if_closed() { | ||
| 773 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new(); | ||
| 774 | let c = CHANNEL.put(Channel::new()); | ||
| 775 | let (s, r) = split(c); | ||
| 776 | drop(r); | ||
| 777 | match s.send(1).await { | ||
| 778 | Err(SendError(1)) => assert!(true), | ||
| 779 | _ => assert!(false), | ||
| 780 | } | ||
| 781 | } | ||
| 782 | |||
| 783 | #[futures_test::test] | ||
| 784 | async fn senders_sends_wait_until_capacity() { | ||
| 785 | let executor = ThreadPool::new().unwrap(); | ||
| 786 | |||
| 787 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new(); | ||
| 788 | let c = CHANNEL.put(Channel::new()); | ||
| 789 | let (s0, mut r) = split(c); | ||
| 790 | assert!(s0.try_send(1).is_ok()); | ||
| 791 | let s1 = s0.clone(); | ||
| 792 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); | ||
| 793 | let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await }); | ||
| 794 | // Wish I could think of a means of determining that the async send is waiting instead. | ||
| 795 | // However, I've used the debugger to observe that the send does indeed wait. | ||
| 796 | Delay::new(Duration::from_millis(500)).await; | ||
| 797 | assert_eq!(r.recv().await, Some(1)); | ||
| 798 | assert!(executor | ||
| 799 | .spawn(async move { while let Some(_) = r.recv().await {} }) | ||
| 800 | .is_ok()); | ||
| 801 | assert!(send_task_1.unwrap().await.is_ok()); | ||
| 802 | assert!(send_task_2.unwrap().await.is_ok()); | ||
| 803 | } | ||
| 804 | |||
| 805 | #[futures_test::test] | ||
| 806 | async fn sender_close_completes_if_closing() { | ||
| 807 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new(); | ||
| 808 | let c = CHANNEL.put(Channel::new()); | ||
| 809 | let (s, mut r) = split(c); | ||
| 810 | r.close(); | ||
| 811 | s.closed().await; | ||
| 812 | } | ||
| 813 | |||
| 814 | #[futures_test::test] | ||
| 815 | async fn sender_close_completes_if_closed() { | ||
| 816 | static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new(); | ||
| 817 | let c = CHANNEL.put(Channel::new()); | ||
| 818 | let (s, r) = split(c); | ||
| 819 | drop(r); | ||
| 820 | s.closed().await; | ||
| 821 | } | ||
| 822 | } | ||
diff --git a/embassy/src/channel/signal.rs b/embassy/src/channel/signal.rs index 027f4f47c..e1f6c4b1d 100644 --- a/embassy/src/channel/signal.rs +++ b/embassy/src/channel/signal.rs | |||
| @@ -5,7 +5,7 @@ use core::task::{Context, Poll, Waker}; | |||
| 5 | 5 | ||
| 6 | /// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. | 6 | /// Synchronization primitive. Allows creating awaitable signals that may be passed between tasks. |
| 7 | /// For a simple use-case where the receiver is only ever interested in the latest value of | 7 | /// For a simple use-case where the receiver is only ever interested in the latest value of |
| 8 | /// something, Signals work well. For more advanced use cases, please consider [crate::channel::mpsc]. | 8 | /// something, Signals work well. For more advanced use cases, you might want to use [`Channel`](crate::channel::channel::Channel) instead.. |
| 9 | /// | 9 | /// |
| 10 | /// Signals are generally declared as being a static const and then borrowed as required. | 10 | /// Signals are generally declared as being a static const and then borrowed as required. |
| 11 | /// | 11 | /// |
diff --git a/examples/nrf/src/bin/channel.rs b/examples/nrf/src/bin/channel.rs new file mode 100644 index 000000000..476ec09a1 --- /dev/null +++ b/examples/nrf/src/bin/channel.rs | |||
| @@ -0,0 +1,45 @@ | |||
| 1 | #![no_std] | ||
| 2 | #![no_main] | ||
| 3 | #![feature(type_alias_impl_trait)] | ||
| 4 | |||
| 5 | use defmt::unwrap; | ||
| 6 | use embassy::blocking_mutex::raw::ThreadModeRawMutex; | ||
| 7 | use embassy::channel::channel::Channel; | ||
| 8 | use embassy::executor::Spawner; | ||
| 9 | use embassy::time::{Duration, Timer}; | ||
| 10 | use embassy_nrf::gpio::{Level, Output, OutputDrive}; | ||
| 11 | use embassy_nrf::Peripherals; | ||
| 12 | |||
| 13 | use defmt_rtt as _; // global logger | ||
| 14 | use panic_probe as _; | ||
| 15 | |||
| 16 | enum LedState { | ||
| 17 | On, | ||
| 18 | Off, | ||
| 19 | } | ||
| 20 | |||
| 21 | static CHANNEL: Channel<ThreadModeRawMutex, LedState, 1> = Channel::new(); | ||
| 22 | |||
| 23 | #[embassy::task] | ||
| 24 | async fn my_task() { | ||
| 25 | loop { | ||
| 26 | CHANNEL.send(LedState::On).await; | ||
| 27 | Timer::after(Duration::from_secs(1)).await; | ||
| 28 | CHANNEL.send(LedState::Off).await; | ||
| 29 | Timer::after(Duration::from_secs(1)).await; | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | #[embassy::main] | ||
| 34 | async fn main(spawner: Spawner, p: Peripherals) { | ||
| 35 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | ||
| 36 | |||
| 37 | unwrap!(spawner.spawn(my_task())); | ||
| 38 | |||
| 39 | loop { | ||
| 40 | match CHANNEL.recv().await { | ||
| 41 | LedState::On => led.set_high(), | ||
| 42 | LedState::Off => led.set_low(), | ||
| 43 | } | ||
| 44 | } | ||
| 45 | } | ||
diff --git a/examples/nrf/src/bin/channel_sender_receiver.rs b/examples/nrf/src/bin/channel_sender_receiver.rs new file mode 100644 index 000000000..c79f2fd6b --- /dev/null +++ b/examples/nrf/src/bin/channel_sender_receiver.rs | |||
| @@ -0,0 +1,52 @@ | |||
| 1 | #![no_std] | ||
| 2 | #![no_main] | ||
| 3 | #![feature(type_alias_impl_trait)] | ||
| 4 | |||
| 5 | use defmt::unwrap; | ||
| 6 | use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 7 | use embassy::channel::channel::{Channel, Receiver, Sender}; | ||
| 8 | use embassy::executor::Spawner; | ||
| 9 | use embassy::time::{Duration, Timer}; | ||
| 10 | use embassy::util::Forever; | ||
| 11 | use embassy_nrf::gpio::{AnyPin, Level, Output, OutputDrive, Pin}; | ||
| 12 | use embassy_nrf::Peripherals; | ||
| 13 | |||
| 14 | use defmt_rtt as _; // global logger | ||
| 15 | use panic_probe as _; | ||
| 16 | |||
| 17 | enum LedState { | ||
| 18 | On, | ||
| 19 | Off, | ||
| 20 | } | ||
| 21 | |||
| 22 | static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new(); | ||
| 23 | |||
| 24 | #[embassy::task] | ||
| 25 | async fn send_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) { | ||
| 26 | loop { | ||
| 27 | sender.send(LedState::On).await; | ||
| 28 | Timer::after(Duration::from_secs(1)).await; | ||
| 29 | sender.send(LedState::Off).await; | ||
| 30 | Timer::after(Duration::from_secs(1)).await; | ||
| 31 | } | ||
| 32 | } | ||
| 33 | |||
| 34 | #[embassy::task] | ||
| 35 | async fn recv_task(led: AnyPin, receiver: Receiver<'static, NoopRawMutex, LedState, 1>) { | ||
| 36 | let mut led = Output::new(led, Level::Low, OutputDrive::Standard); | ||
| 37 | |||
| 38 | loop { | ||
| 39 | match receiver.recv().await { | ||
| 40 | LedState::On => led.set_high(), | ||
| 41 | LedState::Off => led.set_low(), | ||
| 42 | } | ||
| 43 | } | ||
| 44 | } | ||
| 45 | |||
| 46 | #[embassy::main] | ||
| 47 | async fn main(spawner: Spawner, p: Peripherals) { | ||
| 48 | let channel = CHANNEL.put(Channel::new()); | ||
| 49 | |||
| 50 | unwrap!(spawner.spawn(send_task(channel.sender()))); | ||
| 51 | unwrap!(spawner.spawn(recv_task(p.P0_13.degrade(), channel.receiver()))); | ||
| 52 | } | ||
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs deleted file mode 100644 index 0cb182755..000000000 --- a/examples/nrf/src/bin/mpsc.rs +++ /dev/null | |||
| @@ -1,60 +0,0 @@ | |||
| 1 | #![no_std] | ||
| 2 | #![no_main] | ||
| 3 | #![feature(type_alias_impl_trait)] | ||
| 4 | |||
| 5 | use defmt::unwrap; | ||
| 6 | use embassy::blocking_mutex::raw::NoopRawMutex; | ||
| 7 | use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError}; | ||
| 8 | use embassy::executor::Spawner; | ||
| 9 | use embassy::time::{Duration, Timer}; | ||
| 10 | use embassy::util::Forever; | ||
| 11 | use embassy_nrf::gpio::{Level, Output, OutputDrive}; | ||
| 12 | use embassy_nrf::Peripherals; | ||
| 13 | |||
| 14 | use defmt_rtt as _; // global logger | ||
| 15 | use panic_probe as _; | ||
| 16 | |||
| 17 | enum LedState { | ||
| 18 | On, | ||
| 19 | Off, | ||
| 20 | } | ||
| 21 | |||
| 22 | static CHANNEL: Forever<Channel<NoopRawMutex, LedState, 1>> = Forever::new(); | ||
| 23 | |||
| 24 | #[embassy::task(pool_size = 1)] | ||
| 25 | async fn my_task(sender: Sender<'static, NoopRawMutex, LedState, 1>) { | ||
| 26 | loop { | ||
| 27 | let _ = sender.send(LedState::On).await; | ||
| 28 | Timer::after(Duration::from_secs(1)).await; | ||
| 29 | let _ = sender.send(LedState::Off).await; | ||
| 30 | Timer::after(Duration::from_secs(1)).await; | ||
| 31 | } | ||
| 32 | } | ||
| 33 | |||
| 34 | #[embassy::main] | ||
| 35 | async fn main(spawner: Spawner, p: Peripherals) { | ||
| 36 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | ||
| 37 | |||
| 38 | let channel = CHANNEL.put(Channel::new()); | ||
| 39 | let (sender, mut receiver) = mpsc::split(channel); | ||
| 40 | |||
| 41 | unwrap!(spawner.spawn(my_task(sender))); | ||
| 42 | |||
| 43 | // We could just loop on `receiver.recv()` for simplicity. The code below | ||
| 44 | // is optimized to drain the queue as fast as possible in the spirit of | ||
| 45 | // handling events as fast as possible. This optimization is benign when in | ||
| 46 | // thread mode, but can be useful when interrupts are sending messages | ||
| 47 | // with the channel having been created via with_critical_sections. | ||
| 48 | loop { | ||
| 49 | let maybe_message = match receiver.try_recv() { | ||
| 50 | m @ Ok(..) => m.ok(), | ||
| 51 | Err(TryRecvError::Empty) => receiver.recv().await, | ||
| 52 | Err(TryRecvError::Closed) => break, | ||
| 53 | }; | ||
| 54 | match maybe_message { | ||
| 55 | Some(LedState::On) => led.set_high(), | ||
| 56 | Some(LedState::Off) => led.set_low(), | ||
| 57 | _ => (), | ||
| 58 | } | ||
| 59 | } | ||
| 60 | } | ||
diff --git a/examples/nrf/src/bin/uart_split.rs b/examples/nrf/src/bin/uart_split.rs index 909429b1a..3fde2f0d8 100644 --- a/examples/nrf/src/bin/uart_split.rs +++ b/examples/nrf/src/bin/uart_split.rs | |||
| @@ -3,10 +3,9 @@ | |||
| 3 | #![feature(type_alias_impl_trait)] | 3 | #![feature(type_alias_impl_trait)] |
| 4 | 4 | ||
| 5 | use defmt::*; | 5 | use defmt::*; |
| 6 | use embassy::blocking_mutex::raw::NoopRawMutex; | 6 | use embassy::blocking_mutex::raw::ThreadModeRawMutex; |
| 7 | use embassy::channel::mpsc::{self, Channel, Sender}; | 7 | use embassy::channel::channel::Channel; |
| 8 | use embassy::executor::Spawner; | 8 | use embassy::executor::Spawner; |
| 9 | use embassy::util::Forever; | ||
| 10 | use embassy_nrf::peripherals::UARTE0; | 9 | use embassy_nrf::peripherals::UARTE0; |
| 11 | use embassy_nrf::uarte::UarteRx; | 10 | use embassy_nrf::uarte::UarteRx; |
| 12 | use embassy_nrf::{interrupt, uarte, Peripherals}; | 11 | use embassy_nrf::{interrupt, uarte, Peripherals}; |
| @@ -14,7 +13,7 @@ use embassy_nrf::{interrupt, uarte, Peripherals}; | |||
| 14 | use defmt_rtt as _; // global logger | 13 | use defmt_rtt as _; // global logger |
| 15 | use panic_probe as _; | 14 | use panic_probe as _; |
| 16 | 15 | ||
| 17 | static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new(); | 16 | static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new(); |
| 18 | 17 | ||
| 19 | #[embassy::main] | 18 | #[embassy::main] |
| 20 | async fn main(spawner: Spawner, p: Peripherals) { | 19 | async fn main(spawner: Spawner, p: Peripherals) { |
| @@ -26,14 +25,11 @@ async fn main(spawner: Spawner, p: Peripherals) { | |||
| 26 | let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config); | 25 | let uart = uarte::Uarte::new(p.UARTE0, irq, p.P0_08, p.P0_06, config); |
| 27 | let (mut tx, rx) = uart.split(); | 26 | let (mut tx, rx) = uart.split(); |
| 28 | 27 | ||
| 29 | let c = CHANNEL.put(Channel::new()); | ||
| 30 | let (s, mut r) = mpsc::split(c); | ||
| 31 | |||
| 32 | info!("uarte initialized!"); | 28 | info!("uarte initialized!"); |
| 33 | 29 | ||
| 34 | // Spawn a task responsible purely for reading | 30 | // Spawn a task responsible purely for reading |
| 35 | 31 | ||
| 36 | unwrap!(spawner.spawn(reader(rx, s))); | 32 | unwrap!(spawner.spawn(reader(rx))); |
| 37 | 33 | ||
| 38 | // Message must be in SRAM | 34 | // Message must be in SRAM |
| 39 | { | 35 | { |
| @@ -48,19 +44,18 @@ async fn main(spawner: Spawner, p: Peripherals) { | |||
| 48 | // back out the buffer we receive from the read | 44 | // back out the buffer we receive from the read |
| 49 | // task. | 45 | // task. |
| 50 | loop { | 46 | loop { |
| 51 | if let Some(buf) = r.recv().await { | 47 | let buf = CHANNEL.recv().await; |
| 52 | info!("writing..."); | 48 | info!("writing..."); |
| 53 | unwrap!(tx.write(&buf).await); | 49 | unwrap!(tx.write(&buf).await); |
| 54 | } | ||
| 55 | } | 50 | } |
| 56 | } | 51 | } |
| 57 | 52 | ||
| 58 | #[embassy::task] | 53 | #[embassy::task] |
| 59 | async fn reader(mut rx: UarteRx<'static, UARTE0>, s: Sender<'static, NoopRawMutex, [u8; 8], 1>) { | 54 | async fn reader(mut rx: UarteRx<'static, UARTE0>) { |
| 60 | let mut buf = [0; 8]; | 55 | let mut buf = [0; 8]; |
| 61 | loop { | 56 | loop { |
| 62 | info!("reading..."); | 57 | info!("reading..."); |
| 63 | unwrap!(rx.read(&mut buf).await); | 58 | unwrap!(rx.read(&mut buf).await); |
| 64 | unwrap!(s.send(buf).await); | 59 | CHANNEL.send(buf).await; |
| 65 | } | 60 | } |
| 66 | } | 61 | } |
diff --git a/examples/stm32f3/src/bin/button_events.rs b/examples/stm32f3/src/bin/button_events.rs index 99aab3027..06e8eec1f 100644 --- a/examples/stm32f3/src/bin/button_events.rs +++ b/examples/stm32f3/src/bin/button_events.rs | |||
| @@ -11,11 +11,10 @@ | |||
| 11 | #![feature(type_alias_impl_trait)] | 11 | #![feature(type_alias_impl_trait)] |
| 12 | 12 | ||
| 13 | use defmt::*; | 13 | use defmt::*; |
| 14 | use embassy::blocking_mutex::raw::NoopRawMutex; | 14 | use embassy::blocking_mutex::raw::ThreadModeRawMutex; |
| 15 | use embassy::channel::mpsc::{self, Channel, Receiver, Sender}; | 15 | use embassy::channel::channel::Channel; |
| 16 | use embassy::executor::Spawner; | 16 | use embassy::executor::Spawner; |
| 17 | use embassy::time::{with_timeout, Duration, Timer}; | 17 | use embassy::time::{with_timeout, Duration, Timer}; |
| 18 | use embassy::util::Forever; | ||
| 19 | use embassy_stm32::exti::ExtiInput; | 18 | use embassy_stm32::exti::ExtiInput; |
| 20 | use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed}; | 19 | use embassy_stm32::gpio::{AnyPin, Input, Level, Output, Pin, Pull, Speed}; |
| 21 | use embassy_stm32::peripherals::PA0; | 20 | use embassy_stm32::peripherals::PA0; |
| @@ -51,14 +50,15 @@ impl<'a> Leds<'a> { | |||
| 51 | } | 50 | } |
| 52 | } | 51 | } |
| 53 | 52 | ||
| 54 | async fn show(&mut self, queue: &mut Receiver<'static, NoopRawMutex, ButtonEvent, 4>) { | 53 | async fn show(&mut self) { |
| 55 | self.leds[self.current_led].set_high(); | 54 | self.leds[self.current_led].set_high(); |
| 56 | if let Ok(new_message) = with_timeout(Duration::from_millis(500), queue.recv()).await { | 55 | if let Ok(new_message) = with_timeout(Duration::from_millis(500), CHANNEL.recv()).await { |
| 57 | self.leds[self.current_led].set_low(); | 56 | self.leds[self.current_led].set_low(); |
| 58 | self.process_event(new_message).await; | 57 | self.process_event(new_message).await; |
| 59 | } else { | 58 | } else { |
| 60 | self.leds[self.current_led].set_low(); | 59 | self.leds[self.current_led].set_low(); |
| 61 | if let Ok(new_message) = with_timeout(Duration::from_millis(200), queue.recv()).await { | 60 | if let Ok(new_message) = with_timeout(Duration::from_millis(200), CHANNEL.recv()).await |
| 61 | { | ||
| 62 | self.process_event(new_message).await; | 62 | self.process_event(new_message).await; |
| 63 | } | 63 | } |
| 64 | } | 64 | } |
| @@ -77,15 +77,18 @@ impl<'a> Leds<'a> { | |||
| 77 | } | 77 | } |
| 78 | } | 78 | } |
| 79 | 79 | ||
| 80 | async fn process_event(&mut self, event: Option<ButtonEvent>) { | 80 | async fn process_event(&mut self, event: ButtonEvent) { |
| 81 | match event { | 81 | match event { |
| 82 | Some(ButtonEvent::SingleClick) => self.move_next(), | 82 | ButtonEvent::SingleClick => { |
| 83 | Some(ButtonEvent::DoubleClick) => { | 83 | self.move_next(); |
| 84 | } | ||
| 85 | ButtonEvent::DoubleClick => { | ||
| 84 | self.change_direction(); | 86 | self.change_direction(); |
| 85 | self.move_next() | 87 | self.move_next(); |
| 88 | } | ||
| 89 | ButtonEvent::Hold => { | ||
| 90 | self.flash().await; | ||
| 86 | } | 91 | } |
| 87 | Some(ButtonEvent::Hold) => self.flash().await, | ||
| 88 | _ => {} | ||
| 89 | } | 92 | } |
| 90 | } | 93 | } |
| 91 | } | 94 | } |
| @@ -97,7 +100,7 @@ enum ButtonEvent { | |||
| 97 | Hold, | 100 | Hold, |
| 98 | } | 101 | } |
| 99 | 102 | ||
| 100 | static BUTTON_EVENTS_QUEUE: Forever<Channel<NoopRawMutex, ButtonEvent, 4>> = Forever::new(); | 103 | static CHANNEL: Channel<ThreadModeRawMutex, ButtonEvent, 4> = Channel::new(); |
| 101 | 104 | ||
| 102 | #[embassy::main] | 105 | #[embassy::main] |
| 103 | async fn main(spawner: Spawner, p: Peripherals) { | 106 | async fn main(spawner: Spawner, p: Peripherals) { |
| @@ -116,27 +119,19 @@ async fn main(spawner: Spawner, p: Peripherals) { | |||
| 116 | ]; | 119 | ]; |
| 117 | let leds = Leds::new(leds); | 120 | let leds = Leds::new(leds); |
| 118 | 121 | ||
| 119 | let buttons_queue = BUTTON_EVENTS_QUEUE.put(Channel::new()); | 122 | spawner.spawn(button_waiter(button)).unwrap(); |
| 120 | let (sender, receiver) = mpsc::split(buttons_queue); | 123 | spawner.spawn(led_blinker(leds)).unwrap(); |
| 121 | spawner.spawn(button_waiter(button, sender)).unwrap(); | ||
| 122 | spawner.spawn(led_blinker(leds, receiver)).unwrap(); | ||
| 123 | } | 124 | } |
| 124 | 125 | ||
| 125 | #[embassy::task] | 126 | #[embassy::task] |
| 126 | async fn led_blinker( | 127 | async fn led_blinker(mut leds: Leds<'static>) { |
| 127 | mut leds: Leds<'static>, | ||
| 128 | mut queue: Receiver<'static, NoopRawMutex, ButtonEvent, 4>, | ||
| 129 | ) { | ||
| 130 | loop { | 128 | loop { |
| 131 | leds.show(&mut queue).await; | 129 | leds.show().await; |
| 132 | } | 130 | } |
| 133 | } | 131 | } |
| 134 | 132 | ||
| 135 | #[embassy::task] | 133 | #[embassy::task] |
| 136 | async fn button_waiter( | 134 | async fn button_waiter(mut button: ExtiInput<'static, PA0>) { |
| 137 | mut button: ExtiInput<'static, PA0>, | ||
| 138 | queue: Sender<'static, NoopRawMutex, ButtonEvent, 4>, | ||
| 139 | ) { | ||
| 140 | const DOUBLE_CLICK_DELAY: u64 = 250; | 135 | const DOUBLE_CLICK_DELAY: u64 = 250; |
| 141 | const HOLD_DELAY: u64 = 1000; | 136 | const HOLD_DELAY: u64 = 1000; |
| 142 | 137 | ||
| @@ -150,9 +145,7 @@ async fn button_waiter( | |||
| 150 | .is_err() | 145 | .is_err() |
| 151 | { | 146 | { |
| 152 | info!("Hold"); | 147 | info!("Hold"); |
| 153 | if queue.send(ButtonEvent::Hold).await.is_err() { | 148 | CHANNEL.send(ButtonEvent::Hold).await; |
| 154 | break; | ||
| 155 | } | ||
| 156 | button.wait_for_falling_edge().await; | 149 | button.wait_for_falling_edge().await; |
| 157 | } else if with_timeout( | 150 | } else if with_timeout( |
| 158 | Duration::from_millis(DOUBLE_CLICK_DELAY), | 151 | Duration::from_millis(DOUBLE_CLICK_DELAY), |
| @@ -161,15 +154,11 @@ async fn button_waiter( | |||
| 161 | .await | 154 | .await |
| 162 | .is_err() | 155 | .is_err() |
| 163 | { | 156 | { |
| 164 | if queue.send(ButtonEvent::SingleClick).await.is_err() { | ||
| 165 | break; | ||
| 166 | } | ||
| 167 | info!("Single click"); | 157 | info!("Single click"); |
| 158 | CHANNEL.send(ButtonEvent::SingleClick).await; | ||
| 168 | } else { | 159 | } else { |
| 169 | info!("Double click"); | 160 | info!("Double click"); |
| 170 | if queue.send(ButtonEvent::DoubleClick).await.is_err() { | 161 | CHANNEL.send(ButtonEvent::DoubleClick).await; |
| 171 | break; | ||
| 172 | } | ||
| 173 | button.wait_for_falling_edge().await; | 162 | button.wait_for_falling_edge().await; |
| 174 | } | 163 | } |
| 175 | button.wait_for_rising_edge().await; | 164 | button.wait_for_rising_edge().await; |
diff --git a/examples/stm32h7/src/bin/usart_split.rs b/examples/stm32h7/src/bin/usart_split.rs index ee1763aa4..40a7c3e44 100644 --- a/examples/stm32h7/src/bin/usart_split.rs +++ b/examples/stm32h7/src/bin/usart_split.rs | |||
| @@ -4,10 +4,9 @@ | |||
| 4 | 4 | ||
| 5 | use defmt::*; | 5 | use defmt::*; |
| 6 | use defmt_rtt as _; // global logger | 6 | use defmt_rtt as _; // global logger |
| 7 | use embassy::blocking_mutex::raw::NoopRawMutex; | 7 | use embassy::blocking_mutex::raw::ThreadModeRawMutex; |
| 8 | use embassy::channel::mpsc::{self, Channel, Sender}; | 8 | use embassy::channel::channel::Channel; |
| 9 | use embassy::executor::Spawner; | 9 | use embassy::executor::Spawner; |
| 10 | use embassy::util::Forever; | ||
| 11 | use embassy_stm32::dma::NoDma; | 10 | use embassy_stm32::dma::NoDma; |
| 12 | use embassy_stm32::{ | 11 | use embassy_stm32::{ |
| 13 | peripherals::{DMA1_CH1, UART7}, | 12 | peripherals::{DMA1_CH1, UART7}, |
| @@ -28,7 +27,7 @@ async fn writer(mut usart: Uart<'static, UART7, NoDma, NoDma>) { | |||
| 28 | } | 27 | } |
| 29 | } | 28 | } |
| 30 | 29 | ||
| 31 | static CHANNEL: Forever<Channel<NoopRawMutex, [u8; 8], 1>> = Forever::new(); | 30 | static CHANNEL: Channel<ThreadModeRawMutex, [u8; 8], 1> = Channel::new(); |
| 32 | 31 | ||
| 33 | #[embassy::main] | 32 | #[embassy::main] |
| 34 | async fn main(spawner: Spawner, p: Peripherals) -> ! { | 33 | async fn main(spawner: Spawner, p: Peripherals) -> ! { |
| @@ -40,28 +39,21 @@ async fn main(spawner: Spawner, p: Peripherals) -> ! { | |||
| 40 | 39 | ||
| 41 | let (mut tx, rx) = usart.split(); | 40 | let (mut tx, rx) = usart.split(); |
| 42 | 41 | ||
| 43 | let c = CHANNEL.put(Channel::new()); | 42 | unwrap!(spawner.spawn(reader(rx))); |
| 44 | let (s, mut r) = mpsc::split(c); | ||
| 45 | |||
| 46 | unwrap!(spawner.spawn(reader(rx, s))); | ||
| 47 | 43 | ||
| 48 | loop { | 44 | loop { |
| 49 | if let Some(buf) = r.recv().await { | 45 | let buf = CHANNEL.recv().await; |
| 50 | info!("writing..."); | 46 | info!("writing..."); |
| 51 | unwrap!(tx.write(&buf).await); | 47 | unwrap!(tx.write(&buf).await); |
| 52 | } | ||
| 53 | } | 48 | } |
| 54 | } | 49 | } |
| 55 | 50 | ||
| 56 | #[embassy::task] | 51 | #[embassy::task] |
| 57 | async fn reader( | 52 | async fn reader(mut rx: UartRx<'static, UART7, DMA1_CH1>) { |
| 58 | mut rx: UartRx<'static, UART7, DMA1_CH1>, | ||
| 59 | s: Sender<'static, NoopRawMutex, [u8; 8], 1>, | ||
| 60 | ) { | ||
| 61 | let mut buf = [0; 8]; | 53 | let mut buf = [0; 8]; |
| 62 | loop { | 54 | loop { |
| 63 | info!("reading..."); | 55 | info!("reading..."); |
| 64 | unwrap!(rx.read(&mut buf).await); | 56 | unwrap!(rx.read(&mut buf).await); |
| 65 | unwrap!(s.send(buf).await); | 57 | CHANNEL.send(buf).await; |
| 66 | } | 58 | } |
| 67 | } | 59 | } |
