diff options
| author | Dario Nieuwenhuis <[email protected]> | 2021-07-20 08:01:42 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-07-20 08:01:42 +0200 |
| commit | b04dc7e7837161bc8d9c6cf3cf40d876f2f82c9a (patch) | |
| tree | 1bb10d83863fe53961fb43d6c0b4e9b01201a234 | |
| parent | 17999381877714ccc6bfbb81320a0e51821cdd58 (diff) | |
| parent | 72d6f79ec731980bf776d35bfc0af848a1d994da (diff) | |
Merge pull request #226 from huntc/mpsc
Multi Producer Single Consumer channel
| -rw-r--r-- | embassy/Cargo.toml | 8 | ||||
| -rw-r--r-- | embassy/src/util/mod.rs | 1 | ||||
| -rw-r--r-- | embassy/src/util/mpsc.rs | 854 | ||||
| -rw-r--r-- | embassy/src/util/mutex.rs | 66 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 65 |
5 files changed, 994 insertions, 0 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index b2ad80495..c03fc0df5 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml | |||
| @@ -3,6 +3,7 @@ name = "embassy" | |||
| 3 | version = "0.1.0" | 3 | version = "0.1.0" |
| 4 | authors = ["Dario Nieuwenhuis <[email protected]>"] | 4 | authors = ["Dario Nieuwenhuis <[email protected]>"] |
| 5 | edition = "2018" | 5 | edition = "2018" |
| 6 | resolver = "2" | ||
| 6 | 7 | ||
| 7 | [features] | 8 | [features] |
| 8 | default = [] | 9 | default = [] |
| @@ -36,3 +37,10 @@ embedded-hal = "0.2.5" | |||
| 36 | 37 | ||
| 37 | # Workaround https://github.com/japaric/cast.rs/pull/27 | 38 | # Workaround https://github.com/japaric/cast.rs/pull/27 |
| 38 | cast = { version = "=0.2.3", default-features = false } | 39 | cast = { version = "=0.2.3", default-features = false } |
| 40 | |||
| 41 | [dev-dependencies] | ||
| 42 | embassy = { path = ".", features = ["executor-agnostic"] } | ||
| 43 | futures-executor = { version = "0.3", features = [ "thread-pool" ] } | ||
| 44 | futures-test = "0.3" | ||
| 45 | futures-timer = "0.3" | ||
| 46 | futures-util = { version = "0.3", features = [ "channel" ] } | ||
diff --git a/embassy/src/util/mod.rs b/embassy/src/util/mod.rs index 88ae5c285..87d313e28 100644 --- a/embassy/src/util/mod.rs +++ b/embassy/src/util/mod.rs | |||
| @@ -11,6 +11,7 @@ mod waker; | |||
| 11 | 11 | ||
| 12 | pub use drop_bomb::*; | 12 | pub use drop_bomb::*; |
| 13 | pub use forever::*; | 13 | pub use forever::*; |
| 14 | pub mod mpsc; | ||
| 14 | pub use mutex::*; | 15 | pub use mutex::*; |
| 15 | pub use on_drop::*; | 16 | pub use on_drop::*; |
| 16 | pub use portal::*; | 17 | pub use portal::*; |
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs new file mode 100644 index 000000000..cc9e2a5dd --- /dev/null +++ b/embassy/src/util/mpsc.rs | |||
| @@ -0,0 +1,854 @@ | |||
| 1 | //! A multi-producer, single-consumer queue for sending values between | ||
| 2 | //! asynchronous tasks. This queue takes a Mutex type so that various | ||
| 3 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 4 | //! for single-core Cortex-M targets where messages are only passed | ||
| 5 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 6 | //! can also be used for single-core targets where messages are to be | ||
| 7 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 8 | //! | ||
| 9 | //! This module provides a bounded channel that has a limit on the number of | ||
| 10 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 11 | //! another message will result in an error being returned. | ||
| 12 | //! | ||
| 13 | //! Similar to the `mpsc` channels provided by `std`, the channel constructor | ||
| 14 | //! functions provide separate send and receive handles, [`Sender`] and | ||
| 15 | //! [`Receiver`]. If there is no message to read, the current task will be | ||
| 16 | //! notified when a new value is sent. [`Sender`] allows sending values into | ||
| 17 | //! the channel. If the bounded channel is at capacity, the send is rejected. | ||
| 18 | //! | ||
| 19 | //! # Disconnection | ||
| 20 | //! | ||
| 21 | //! When all [`Sender`] handles have been dropped, it is no longer | ||
| 22 | //! possible to send values into the channel. This is considered the termination | ||
| 23 | //! event of the stream. | ||
| 24 | //! | ||
| 25 | //! If the [`Receiver`] handle is dropped, then messages can no longer | ||
| 26 | //! be read out of the channel. In this case, all further attempts to send will | ||
| 27 | //! result in an error. | ||
| 28 | //! | ||
| 29 | //! # Clean Shutdown | ||
| 30 | //! | ||
| 31 | //! When the [`Receiver`] is dropped, it is possible for unprocessed messages to | ||
| 32 | //! remain in the channel. Instead, it is usually desirable to perform a "clean" | ||
| 33 | //! shutdown. To do this, the receiver first calls `close`, which will prevent | ||
| 34 | //! any further messages to be sent into the channel. Then, the receiver | ||
| 35 | //! consumes the channel to completion, at which point the receiver can be | ||
| 36 | //! dropped. | ||
| 37 | //! | ||
| 38 | //! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html | ||
| 39 | |||
| 40 | use core::cell::UnsafeCell; | ||
| 41 | use core::fmt; | ||
| 42 | use core::marker::PhantomData; | ||
| 43 | use core::mem::MaybeUninit; | ||
| 44 | use core::pin::Pin; | ||
| 45 | use core::ptr; | ||
| 46 | use core::task::Context; | ||
| 47 | use core::task::Poll; | ||
| 48 | use core::task::Waker; | ||
| 49 | |||
| 50 | use futures::Future; | ||
| 51 | |||
| 52 | use super::CriticalSectionMutex; | ||
| 53 | use super::Mutex; | ||
| 54 | use super::NoopMutex; | ||
| 55 | use super::ThreadModeMutex; | ||
| 56 | use super::WakerRegistration; | ||
| 57 | |||
| 58 | /// Send values to the associated `Receiver`. | ||
| 59 | /// | ||
| 60 | /// Instances are created by the [`split`](split) function. | ||
| 61 | pub struct Sender<'ch, M, T, const N: usize> | ||
| 62 | where | ||
| 63 | M: Mutex<Data = ()>, | ||
| 64 | { | ||
| 65 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 66 | } | ||
| 67 | |||
| 68 | // Safe to pass the sender around | ||
| 69 | unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 70 | {} | ||
| 71 | unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 72 | {} | ||
| 73 | |||
| 74 | /// Receive values from the associated `Sender`. | ||
| 75 | /// | ||
| 76 | /// Instances are created by the [`split`](split) function. | ||
| 77 | pub struct Receiver<'ch, M, T, const N: usize> | ||
| 78 | where | ||
| 79 | M: Mutex<Data = ()>, | ||
| 80 | { | ||
| 81 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 82 | _receiver_consumed: &'ch mut PhantomData<()>, | ||
| 83 | } | ||
| 84 | |||
| 85 | // Safe to pass the receiver around | ||
| 86 | unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where | ||
| 87 | M: Mutex<Data = ()> + Sync | ||
| 88 | { | ||
| 89 | } | ||
| 90 | unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where | ||
| 91 | M: Mutex<Data = ()> + Sync | ||
| 92 | { | ||
| 93 | } | ||
| 94 | |||
| 95 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. | ||
| 96 | /// | ||
| 97 | /// All data sent on `Sender` will become available on `Receiver` in the same | ||
| 98 | /// order as it was sent. | ||
| 99 | /// | ||
| 100 | /// The `Sender` can be cloned to `send` to the same channel from multiple code | ||
| 101 | /// locations. Only one `Receiver` is valid. | ||
| 102 | /// | ||
| 103 | /// If the `Receiver` is disconnected while trying to `send`, the `send` method | ||
| 104 | /// will return a `SendError`. Similarly, if `Sender` is disconnected while | ||
| 105 | /// trying to `recv`, the `recv` method will return a `RecvError`. | ||
| 106 | /// | ||
| 107 | /// Note that when splitting the channel, the sender and receiver cannot outlive | ||
| 108 | /// their channel. The following will therefore fail compilation: | ||
| 109 | //// | ||
| 110 | /// ```compile_fail | ||
| 111 | /// use embassy::util::mpsc; | ||
| 112 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; | ||
| 113 | /// | ||
| 114 | /// let (sender, receiver) = { | ||
| 115 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); | ||
| 116 | /// mpsc::split(&mut channel) | ||
| 117 | /// }; | ||
| 118 | /// ``` | ||
| 119 | pub fn split<M, T, const N: usize>( | ||
| 120 | channel: &mut Channel<M, T, N>, | ||
| 121 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) | ||
| 122 | where | ||
| 123 | M: Mutex<Data = ()>, | ||
| 124 | { | ||
| 125 | let sender = Sender { | ||
| 126 | channel_cell: &channel.channel_cell, | ||
| 127 | }; | ||
| 128 | let receiver = Receiver { | ||
| 129 | channel_cell: &channel.channel_cell, | ||
| 130 | _receiver_consumed: &mut channel.receiver_consumed, | ||
| 131 | }; | ||
| 132 | Channel::lock(&channel.channel_cell, |c| { | ||
| 133 | c.register_receiver(); | ||
| 134 | c.register_sender(); | ||
| 135 | }); | ||
| 136 | (sender, receiver) | ||
| 137 | } | ||
| 138 | |||
| 139 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | ||
| 140 | where | ||
| 141 | M: Mutex<Data = ()>, | ||
| 142 | { | ||
| 143 | /// Receives the next value for this receiver. | ||
| 144 | /// | ||
| 145 | /// This method returns `None` if the channel has been closed and there are | ||
| 146 | /// no remaining messages in the channel's buffer. This indicates that no | ||
| 147 | /// further values can ever be received from this `Receiver`. The channel is | ||
| 148 | /// closed when all senders have been dropped, or when [`close`] is called. | ||
| 149 | /// | ||
| 150 | /// If there are no messages in the channel's buffer, but the channel has | ||
| 151 | /// not yet been closed, this method will sleep until a message is sent or | ||
| 152 | /// the channel is closed. | ||
| 153 | /// | ||
| 154 | /// Note that if [`close`] is called, but there are still outstanding | ||
| 155 | /// messages from before it was closed, the channel is not considered | ||
| 156 | /// closed by `recv` until they are all consumed. | ||
| 157 | /// | ||
| 158 | /// [`close`]: Self::close | ||
| 159 | pub async fn recv(&mut self) -> Option<T> { | ||
| 160 | futures::future::poll_fn(|cx| self.recv_poll(cx)).await | ||
| 161 | } | ||
| 162 | |||
| 163 | fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { | ||
| 164 | Channel::lock(self.channel_cell, |c| { | ||
| 165 | match c.try_recv_with_context(Some(cx)) { | ||
| 166 | Ok(v) => Poll::Ready(Some(v)), | ||
| 167 | Err(TryRecvError::Closed) => Poll::Ready(None), | ||
| 168 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 169 | } | ||
| 170 | }) | ||
| 171 | } | ||
| 172 | |||
| 173 | /// Attempts to immediately receive a message on this `Receiver` | ||
| 174 | /// | ||
| 175 | /// This method will either receive a message from the channel immediately or return an error | ||
| 176 | /// if the channel is empty. | ||
| 177 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 178 | Channel::lock(self.channel_cell, |c| c.try_recv()) | ||
| 179 | } | ||
| 180 | |||
| 181 | /// Closes the receiving half of a channel without dropping it. | ||
| 182 | /// | ||
| 183 | /// This prevents any further messages from being sent on the channel while | ||
| 184 | /// still enabling the receiver to drain messages that are buffered. | ||
| 185 | /// | ||
| 186 | /// To guarantee that no messages are dropped, after calling `close()`, | ||
| 187 | /// `recv()` must be called until `None` is returned. If there are | ||
| 188 | /// outstanding messages, the `recv` method will not return `None` | ||
| 189 | /// until those are released. | ||
| 190 | /// | ||
| 191 | pub fn close(&mut self) { | ||
| 192 | Channel::lock(self.channel_cell, |c| c.close()) | ||
| 193 | } | ||
| 194 | } | ||
| 195 | |||
| 196 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> | ||
| 197 | where | ||
| 198 | M: Mutex<Data = ()>, | ||
| 199 | { | ||
| 200 | fn drop(&mut self) { | ||
| 201 | Channel::lock(self.channel_cell, |c| c.deregister_receiver()) | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | ||
| 206 | where | ||
| 207 | M: Mutex<Data = ()>, | ||
| 208 | { | ||
| 209 | /// Sends a value, waiting until there is capacity. | ||
| 210 | /// | ||
| 211 | /// A successful send occurs when it is determined that the other end of the | ||
| 212 | /// channel has not hung up already. An unsuccessful send would be one where | ||
| 213 | /// the corresponding receiver has already been closed. Note that a return | ||
| 214 | /// value of `Err` means that the data will never be received, but a return | ||
| 215 | /// value of `Ok` does not mean that the data will be received. It is | ||
| 216 | /// possible for the corresponding receiver to hang up immediately after | ||
| 217 | /// this function returns `Ok`. | ||
| 218 | /// | ||
| 219 | /// # Errors | ||
| 220 | /// | ||
| 221 | /// If the receive half of the channel is closed, either due to [`close`] | ||
| 222 | /// being called or the [`Receiver`] handle dropping, the function returns | ||
| 223 | /// an error. The error includes the value passed to `send`. | ||
| 224 | /// | ||
| 225 | /// [`close`]: Receiver::close | ||
| 226 | /// [`Receiver`]: Receiver | ||
| 227 | pub async fn send(&self, message: T) -> Result<(), SendError<T>> { | ||
| 228 | SendFuture { | ||
| 229 | sender: self.clone(), | ||
| 230 | message: Some(message), | ||
| 231 | } | ||
| 232 | .await | ||
| 233 | } | ||
| 234 | |||
| 235 | /// Attempts to immediately send a message on this `Sender` | ||
| 236 | /// | ||
| 237 | /// This method differs from [`send`] by returning immediately if the channel's | ||
| 238 | /// buffer is full or no receiver is waiting to acquire some data. Compared | ||
| 239 | /// with [`send`], this function has two failure cases instead of one (one for | ||
| 240 | /// disconnection, one for a full buffer). | ||
| 241 | /// | ||
| 242 | /// # Errors | ||
| 243 | /// | ||
| 244 | /// If the channel capacity has been reached, i.e., the channel has `n` | ||
| 245 | /// buffered values where `n` is the argument passed to [`channel`], then an | ||
| 246 | /// error is returned. | ||
| 247 | /// | ||
| 248 | /// If the receive half of the channel is closed, either due to [`close`] | ||
| 249 | /// being called or the [`Receiver`] handle dropping, the function returns | ||
| 250 | /// an error. The error includes the value passed to `send`. | ||
| 251 | /// | ||
| 252 | /// [`send`]: Sender::send | ||
| 253 | /// [`channel`]: channel | ||
| 254 | /// [`close`]: Receiver::close | ||
| 255 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 256 | Channel::lock(self.channel_cell, |c| c.try_send(message)) | ||
| 257 | } | ||
| 258 | |||
| 259 | /// Completes when the receiver has dropped. | ||
| 260 | /// | ||
| 261 | /// This allows the producers to get notified when interest in the produced | ||
| 262 | /// values is canceled and immediately stop doing work. | ||
| 263 | pub async fn closed(&self) { | ||
| 264 | CloseFuture { | ||
| 265 | sender: self.clone(), | ||
| 266 | } | ||
| 267 | .await | ||
| 268 | } | ||
| 269 | |||
| 270 | /// Checks if the channel has been closed. This happens when the | ||
| 271 | /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is | ||
| 272 | /// called. | ||
| 273 | /// | ||
| 274 | /// [`Receiver`]: crate::sync::mpsc::Receiver | ||
| 275 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close | ||
| 276 | pub fn is_closed(&self) -> bool { | ||
| 277 | Channel::lock(self.channel_cell, |c| c.is_closed()) | ||
| 278 | } | ||
| 279 | } | ||
| 280 | |||
| 281 | struct SendFuture<'ch, M, T, const N: usize> | ||
| 282 | where | ||
| 283 | M: Mutex<Data = ()>, | ||
| 284 | { | ||
| 285 | sender: Sender<'ch, M, T, N>, | ||
| 286 | message: Option<T>, | ||
| 287 | } | ||
| 288 | |||
| 289 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | ||
| 290 | where | ||
| 291 | M: Mutex<Data = ()>, | ||
| 292 | { | ||
| 293 | type Output = Result<(), SendError<T>>; | ||
| 294 | |||
| 295 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 296 | match self.message.take() { | ||
| 297 | Some(m) => match Channel::lock(self.sender.channel_cell, |c| { | ||
| 298 | c.try_send_with_context(m, Some(cx)) | ||
| 299 | }) { | ||
| 300 | Ok(..) => Poll::Ready(Ok(())), | ||
| 301 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | ||
| 302 | Err(TrySendError::Full(m)) => { | ||
| 303 | self.message.insert(m); | ||
| 304 | Poll::Pending | ||
| 305 | } | ||
| 306 | }, | ||
| 307 | None => panic!("Message cannot be None"), | ||
| 308 | } | ||
| 309 | } | ||
| 310 | } | ||
| 311 | |||
| 312 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {} | ||
| 313 | |||
| 314 | struct CloseFuture<'ch, M, T, const N: usize> | ||
| 315 | where | ||
| 316 | M: Mutex<Data = ()>, | ||
| 317 | { | ||
| 318 | sender: Sender<'ch, M, T, N>, | ||
| 319 | } | ||
| 320 | |||
| 321 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> | ||
| 322 | where | ||
| 323 | M: Mutex<Data = ()>, | ||
| 324 | { | ||
| 325 | type Output = (); | ||
| 326 | |||
| 327 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 328 | if Channel::lock(self.sender.channel_cell, |c| { | ||
| 329 | c.is_closed_with_context(Some(cx)) | ||
| 330 | }) { | ||
| 331 | Poll::Ready(()) | ||
| 332 | } else { | ||
| 333 | Poll::Pending | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> | ||
| 339 | where | ||
| 340 | M: Mutex<Data = ()>, | ||
| 341 | { | ||
| 342 | fn drop(&mut self) { | ||
| 343 | Channel::lock(self.channel_cell, |c| c.deregister_sender()) | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | ||
| 348 | where | ||
| 349 | M: Mutex<Data = ()>, | ||
| 350 | { | ||
| 351 | #[allow(clippy::clone_double_ref)] | ||
| 352 | fn clone(&self) -> Self { | ||
| 353 | Channel::lock(self.channel_cell, |c| c.register_sender()); | ||
| 354 | Sender { | ||
| 355 | channel_cell: self.channel_cell.clone(), | ||
| 356 | } | ||
| 357 | } | ||
| 358 | } | ||
| 359 | |||
| 360 | /// An error returned from the [`try_recv`] method. | ||
| 361 | /// | ||
| 362 | /// [`try_recv`]: super::Receiver::try_recv | ||
| 363 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 364 | pub enum TryRecvError { | ||
| 365 | /// A message could not be received because the channel is empty. | ||
| 366 | Empty, | ||
| 367 | |||
| 368 | /// The message could not be received because the channel is empty and closed. | ||
| 369 | Closed, | ||
| 370 | } | ||
| 371 | |||
| 372 | /// Error returned by the `Sender`. | ||
| 373 | #[derive(Debug)] | ||
| 374 | pub struct SendError<T>(pub T); | ||
| 375 | |||
| 376 | impl<T> fmt::Display for SendError<T> { | ||
| 377 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 378 | write!(fmt, "channel closed") | ||
| 379 | } | ||
| 380 | } | ||
| 381 | |||
| 382 | /// This enumeration is the list of the possible error outcomes for the | ||
| 383 | /// [try_send](super::Sender::try_send) method. | ||
| 384 | #[derive(Debug)] | ||
| 385 | pub enum TrySendError<T> { | ||
| 386 | /// The data could not be sent on the channel because the channel is | ||
| 387 | /// currently full and sending would require blocking. | ||
| 388 | Full(T), | ||
| 389 | |||
| 390 | /// The receive half of the channel was explicitly closed or has been | ||
| 391 | /// dropped. | ||
| 392 | Closed(T), | ||
| 393 | } | ||
| 394 | |||
| 395 | impl<T> fmt::Display for TrySendError<T> { | ||
| 396 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 397 | write!( | ||
| 398 | fmt, | ||
| 399 | "{}", | ||
| 400 | match self { | ||
| 401 | TrySendError::Full(..) => "no available capacity", | ||
| 402 | TrySendError::Closed(..) => "channel closed", | ||
| 403 | } | ||
| 404 | ) | ||
| 405 | } | ||
| 406 | } | ||
| 407 | |||
| 408 | struct ChannelState<T, const N: usize> { | ||
| 409 | buf: [MaybeUninit<UnsafeCell<T>>; N], | ||
| 410 | read_pos: usize, | ||
| 411 | write_pos: usize, | ||
| 412 | full: bool, | ||
| 413 | closed: bool, | ||
| 414 | receiver_registered: bool, | ||
| 415 | senders_registered: u32, | ||
| 416 | receiver_waker: WakerRegistration, | ||
| 417 | senders_waker: WakerRegistration, | ||
| 418 | } | ||
| 419 | |||
| 420 | impl<T, const N: usize> ChannelState<T, N> { | ||
| 421 | const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit(); | ||
| 422 | |||
| 423 | const fn new() -> Self { | ||
| 424 | ChannelState { | ||
| 425 | buf: [Self::INIT; N], | ||
| 426 | read_pos: 0, | ||
| 427 | write_pos: 0, | ||
| 428 | full: false, | ||
| 429 | closed: false, | ||
| 430 | receiver_registered: false, | ||
| 431 | senders_registered: 0, | ||
| 432 | receiver_waker: WakerRegistration::new(), | ||
| 433 | senders_waker: WakerRegistration::new(), | ||
| 434 | } | ||
| 435 | } | ||
| 436 | |||
| 437 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | ||
| 438 | self.try_recv_with_context(None) | ||
| 439 | } | ||
| 440 | |||
| 441 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 442 | if self.read_pos != self.write_pos || self.full { | ||
| 443 | if self.full { | ||
| 444 | self.full = false; | ||
| 445 | self.senders_waker.wake(); | ||
| 446 | } | ||
| 447 | let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() }; | ||
| 448 | self.read_pos = (self.read_pos + 1) % self.buf.len(); | ||
| 449 | Ok(message) | ||
| 450 | } else if !self.closed { | ||
| 451 | cx.into_iter() | ||
| 452 | .for_each(|cx| self.set_receiver_waker(&cx.waker())); | ||
| 453 | Err(TryRecvError::Empty) | ||
| 454 | } else { | ||
| 455 | Err(TryRecvError::Closed) | ||
| 456 | } | ||
| 457 | } | ||
| 458 | |||
| 459 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | ||
| 460 | self.try_send_with_context(message, None) | ||
| 461 | } | ||
| 462 | |||
| 463 | fn try_send_with_context( | ||
| 464 | &mut self, | ||
| 465 | message: T, | ||
| 466 | cx: Option<&mut Context<'_>>, | ||
| 467 | ) -> Result<(), TrySendError<T>> { | ||
| 468 | if !self.closed { | ||
| 469 | if !self.full { | ||
| 470 | self.buf[self.write_pos] = MaybeUninit::new(message.into()); | ||
| 471 | self.write_pos = (self.write_pos + 1) % self.buf.len(); | ||
| 472 | if self.write_pos == self.read_pos { | ||
| 473 | self.full = true; | ||
| 474 | } | ||
| 475 | self.receiver_waker.wake(); | ||
| 476 | Ok(()) | ||
| 477 | } else { | ||
| 478 | cx.into_iter() | ||
| 479 | .for_each(|cx| self.set_senders_waker(&cx.waker())); | ||
| 480 | Err(TrySendError::Full(message)) | ||
| 481 | } | ||
| 482 | } else { | ||
| 483 | Err(TrySendError::Closed(message)) | ||
| 484 | } | ||
| 485 | } | ||
| 486 | |||
| 487 | fn close(&mut self) { | ||
| 488 | self.receiver_waker.wake(); | ||
| 489 | self.closed = true; | ||
| 490 | } | ||
| 491 | |||
| 492 | fn is_closed(&mut self) -> bool { | ||
| 493 | self.is_closed_with_context(None) | ||
| 494 | } | ||
| 495 | |||
| 496 | fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { | ||
| 497 | if self.closed { | ||
| 498 | cx.into_iter() | ||
| 499 | .for_each(|cx| self.set_senders_waker(&cx.waker())); | ||
| 500 | true | ||
| 501 | } else { | ||
| 502 | false | ||
| 503 | } | ||
| 504 | } | ||
| 505 | |||
| 506 | fn register_receiver(&mut self) { | ||
| 507 | assert!(!self.receiver_registered); | ||
| 508 | self.receiver_registered = true; | ||
| 509 | } | ||
| 510 | |||
| 511 | fn deregister_receiver(&mut self) { | ||
| 512 | if self.receiver_registered { | ||
| 513 | self.closed = true; | ||
| 514 | self.senders_waker.wake(); | ||
| 515 | } | ||
| 516 | self.receiver_registered = false; | ||
| 517 | } | ||
| 518 | |||
| 519 | fn register_sender(&mut self) { | ||
| 520 | self.senders_registered += 1; | ||
| 521 | } | ||
| 522 | |||
| 523 | fn deregister_sender(&mut self) { | ||
| 524 | assert!(self.senders_registered > 0); | ||
| 525 | self.senders_registered -= 1; | ||
| 526 | if self.senders_registered == 0 { | ||
| 527 | self.receiver_waker.wake(); | ||
| 528 | self.closed = true; | ||
| 529 | } | ||
| 530 | } | ||
| 531 | |||
| 532 | fn set_receiver_waker(&mut self, receiver_waker: &Waker) { | ||
| 533 | self.receiver_waker.register(receiver_waker); | ||
| 534 | } | ||
| 535 | |||
| 536 | fn set_senders_waker(&mut self, senders_waker: &Waker) { | ||
| 537 | // Dispose of any existing sender causing them to be polled again. | ||
| 538 | // This could cause a spin given multiple concurrent senders, however given that | ||
| 539 | // most sends only block waiting for the receiver to become active, this should | ||
| 540 | // be a short-lived activity. The upside is a greatly simplified implementation | ||
| 541 | // that avoids the need for intrusive linked-lists and unsafe operations on pinned | ||
| 542 | // pointers. | ||
| 543 | self.senders_waker.wake(); | ||
| 544 | self.senders_waker.register(senders_waker); | ||
| 545 | } | ||
| 546 | } | ||
| 547 | |||
| 548 | impl<T, const N: usize> Drop for ChannelState<T, N> { | ||
| 549 | fn drop(&mut self) { | ||
| 550 | while self.read_pos != self.write_pos || self.full { | ||
| 551 | self.full = false; | ||
| 552 | unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) }; | ||
| 553 | self.read_pos = (self.read_pos + 1) % N; | ||
| 554 | } | ||
| 555 | } | ||
| 556 | } | ||
| 557 | |||
| 558 | /// A a bounded mpsc channel for communicating between asynchronous tasks | ||
| 559 | /// with backpressure. | ||
| 560 | /// | ||
| 561 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 562 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 563 | /// received from the channel. | ||
| 564 | /// | ||
| 565 | /// All data sent will become available in the same order as it was sent. | ||
| 566 | pub struct Channel<M, T, const N: usize> | ||
| 567 | where | ||
| 568 | M: Mutex<Data = ()>, | ||
| 569 | { | ||
| 570 | channel_cell: UnsafeCell<ChannelCell<M, T, N>>, | ||
| 571 | receiver_consumed: PhantomData<()>, | ||
| 572 | } | ||
| 573 | |||
| 574 | struct ChannelCell<M, T, const N: usize> | ||
| 575 | where | ||
| 576 | M: Mutex<Data = ()>, | ||
| 577 | { | ||
| 578 | mutex: M, | ||
| 579 | state: ChannelState<T, N>, | ||
| 580 | } | ||
| 581 | |||
| 582 | pub type WithCriticalSections = CriticalSectionMutex<()>; | ||
| 583 | |||
| 584 | pub type WithThreadModeOnly = ThreadModeMutex<()>; | ||
| 585 | |||
| 586 | pub type WithNoThreads = NoopMutex<()>; | ||
| 587 | |||
| 588 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 589 | where | ||
| 590 | M: Mutex<Data = ()>, | ||
| 591 | { | ||
| 592 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 593 | /// | ||
| 594 | /// ``` | ||
| 595 | /// use embassy::util::mpsc; | ||
| 596 | /// use embassy::util::mpsc::{Channel, WithNoThreads}; | ||
| 597 | /// | ||
| 598 | /// // Declare a bounded channel of 3 u32s. | ||
| 599 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 600 | /// // once we have a channel, obtain its sender and receiver | ||
| 601 | /// let (sender, receiver) = mpsc::split(&mut channel); | ||
| 602 | /// ``` | ||
| 603 | pub fn new() -> Self { | ||
| 604 | let mutex = M::new(()); | ||
| 605 | let state = ChannelState::new(); | ||
| 606 | let channel_cell = ChannelCell { mutex, state }; | ||
| 607 | Channel { | ||
| 608 | channel_cell: UnsafeCell::new(channel_cell), | ||
| 609 | receiver_consumed: PhantomData, | ||
| 610 | } | ||
| 611 | } | ||
| 612 | |||
| 613 | fn lock<R>( | ||
| 614 | channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, | ||
| 615 | f: impl FnOnce(&mut ChannelState<T, N>) -> R, | ||
| 616 | ) -> R { | ||
| 617 | unsafe { | ||
| 618 | let channel_cell = &mut *(channel_cell.get()); | ||
| 619 | let mutex = &mut channel_cell.mutex; | ||
| 620 | let mut state = &mut channel_cell.state; | ||
| 621 | mutex.lock(|_| f(&mut state)) | ||
| 622 | } | ||
| 623 | } | ||
| 624 | } | ||
| 625 | |||
| 626 | #[cfg(test)] | ||
| 627 | mod tests { | ||
| 628 | use core::time::Duration; | ||
| 629 | |||
| 630 | use futures::task::SpawnExt; | ||
| 631 | use futures_executor::ThreadPool; | ||
| 632 | use futures_timer::Delay; | ||
| 633 | |||
| 634 | use crate::util::Forever; | ||
| 635 | |||
| 636 | use super::*; | ||
| 637 | |||
| 638 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | ||
| 639 | if !c.full { | ||
| 640 | if c.write_pos > c.read_pos { | ||
| 641 | (c.buf.len() - c.write_pos) + c.read_pos | ||
| 642 | } else { | ||
| 643 | (c.buf.len() - c.read_pos) + c.write_pos | ||
| 644 | } | ||
| 645 | } else { | ||
| 646 | 0 | ||
| 647 | } | ||
| 648 | } | ||
| 649 | |||
| 650 | #[test] | ||
| 651 | fn sending_once() { | ||
| 652 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 653 | assert!(c.try_send(1).is_ok()); | ||
| 654 | assert_eq!(capacity(&c), 2); | ||
| 655 | } | ||
| 656 | |||
| 657 | #[test] | ||
| 658 | fn sending_when_full() { | ||
| 659 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 660 | let _ = c.try_send(1); | ||
| 661 | let _ = c.try_send(1); | ||
| 662 | let _ = c.try_send(1); | ||
| 663 | match c.try_send(2) { | ||
| 664 | Err(TrySendError::Full(2)) => assert!(true), | ||
| 665 | _ => assert!(false), | ||
| 666 | } | ||
| 667 | assert_eq!(capacity(&c), 0); | ||
| 668 | } | ||
| 669 | |||
| 670 | #[test] | ||
| 671 | fn sending_when_closed() { | ||
| 672 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 673 | c.closed = true; | ||
| 674 | match c.try_send(2) { | ||
| 675 | Err(TrySendError::Closed(2)) => assert!(true), | ||
| 676 | _ => assert!(false), | ||
| 677 | } | ||
| 678 | } | ||
| 679 | |||
| 680 | #[test] | ||
| 681 | fn receiving_once_with_one_send() { | ||
| 682 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 683 | assert!(c.try_send(1).is_ok()); | ||
| 684 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 685 | assert_eq!(capacity(&c), 3); | ||
| 686 | } | ||
| 687 | |||
| 688 | #[test] | ||
| 689 | fn receiving_when_empty() { | ||
| 690 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 691 | match c.try_recv() { | ||
| 692 | Err(TryRecvError::Empty) => assert!(true), | ||
| 693 | _ => assert!(false), | ||
| 694 | } | ||
| 695 | assert_eq!(capacity(&c), 3); | ||
| 696 | } | ||
| 697 | |||
| 698 | #[test] | ||
| 699 | fn receiving_when_closed() { | ||
| 700 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 701 | c.closed = true; | ||
| 702 | match c.try_recv() { | ||
| 703 | Err(TryRecvError::Closed) => assert!(true), | ||
| 704 | _ => assert!(false), | ||
| 705 | } | ||
| 706 | } | ||
| 707 | |||
| 708 | #[test] | ||
| 709 | fn simple_send_and_receive() { | ||
| 710 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 711 | let (s, r) = split(&mut c); | ||
| 712 | assert!(s.clone().try_send(1).is_ok()); | ||
| 713 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 714 | } | ||
| 715 | |||
| 716 | #[test] | ||
| 717 | fn should_close_without_sender() { | ||
| 718 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 719 | let (s, r) = split(&mut c); | ||
| 720 | drop(s); | ||
| 721 | match r.try_recv() { | ||
| 722 | Err(TryRecvError::Closed) => assert!(true), | ||
| 723 | _ => assert!(false), | ||
| 724 | } | ||
| 725 | } | ||
| 726 | |||
| 727 | #[test] | ||
| 728 | fn should_close_once_drained() { | ||
| 729 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 730 | let (s, r) = split(&mut c); | ||
| 731 | assert!(s.try_send(1).is_ok()); | ||
| 732 | drop(s); | ||
| 733 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 734 | match r.try_recv() { | ||
| 735 | Err(TryRecvError::Closed) => assert!(true), | ||
| 736 | _ => assert!(false), | ||
| 737 | } | ||
| 738 | } | ||
| 739 | |||
| 740 | #[test] | ||
| 741 | fn should_reject_send_when_receiver_dropped() { | ||
| 742 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 743 | let (s, r) = split(&mut c); | ||
| 744 | drop(r); | ||
| 745 | match s.try_send(1) { | ||
| 746 | Err(TrySendError::Closed(1)) => assert!(true), | ||
| 747 | _ => assert!(false), | ||
| 748 | } | ||
| 749 | } | ||
| 750 | |||
| 751 | #[test] | ||
| 752 | fn should_reject_send_when_channel_closed() { | ||
| 753 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | ||
| 754 | let (s, mut r) = split(&mut c); | ||
| 755 | assert!(s.try_send(1).is_ok()); | ||
| 756 | r.close(); | ||
| 757 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 758 | match r.try_recv() { | ||
| 759 | Err(TryRecvError::Closed) => assert!(true), | ||
| 760 | _ => assert!(false), | ||
| 761 | } | ||
| 762 | assert!(s.is_closed()); | ||
| 763 | } | ||
| 764 | |||
| 765 | #[futures_test::test] | ||
| 766 | async fn receiver_closes_when_sender_dropped_async() { | ||
| 767 | let executor = ThreadPool::new().unwrap(); | ||
| 768 | |||
| 769 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | ||
| 770 | let c = CHANNEL.put(Channel::new()); | ||
| 771 | let (s, mut r) = split(c); | ||
| 772 | assert!(executor | ||
| 773 | .spawn(async move { | ||
| 774 | drop(s); | ||
| 775 | }) | ||
| 776 | .is_ok()); | ||
| 777 | assert_eq!(r.recv().await, None); | ||
| 778 | } | ||
| 779 | |||
| 780 | #[futures_test::test] | ||
| 781 | async fn receiver_receives_given_try_send_async() { | ||
| 782 | let executor = ThreadPool::new().unwrap(); | ||
| 783 | |||
| 784 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | ||
| 785 | let c = CHANNEL.put(Channel::new()); | ||
| 786 | let (s, mut r) = split(c); | ||
| 787 | assert!(executor | ||
| 788 | .spawn(async move { | ||
| 789 | assert!(s.try_send(1).is_ok()); | ||
| 790 | }) | ||
| 791 | .is_ok()); | ||
| 792 | assert_eq!(r.recv().await, Some(1)); | ||
| 793 | } | ||
| 794 | |||
| 795 | #[futures_test::test] | ||
| 796 | async fn sender_send_completes_if_capacity() { | ||
| 797 | let mut c = Channel::<WithCriticalSections, u32, 1>::new(); | ||
| 798 | let (s, mut r) = split(&mut c); | ||
| 799 | assert!(s.send(1).await.is_ok()); | ||
| 800 | assert_eq!(r.recv().await, Some(1)); | ||
| 801 | } | ||
| 802 | |||
| 803 | #[futures_test::test] | ||
| 804 | async fn sender_send_completes_if_closed() { | ||
| 805 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | ||
| 806 | let c = CHANNEL.put(Channel::new()); | ||
| 807 | let (s, r) = split(c); | ||
| 808 | drop(r); | ||
| 809 | match s.send(1).await { | ||
| 810 | Err(SendError(1)) => assert!(true), | ||
| 811 | _ => assert!(false), | ||
| 812 | } | ||
| 813 | } | ||
| 814 | |||
| 815 | #[futures_test::test] | ||
| 816 | async fn senders_sends_wait_until_capacity() { | ||
| 817 | let executor = ThreadPool::new().unwrap(); | ||
| 818 | |||
| 819 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | ||
| 820 | let c = CHANNEL.put(Channel::new()); | ||
| 821 | let (s0, mut r) = split(c); | ||
| 822 | assert!(s0.try_send(1).is_ok()); | ||
| 823 | let s1 = s0.clone(); | ||
| 824 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); | ||
| 825 | let send_task_2 = executor.spawn_with_handle(async move { s1.send(3).await }); | ||
| 826 | // Wish I could think of a means of determining that the async send is waiting instead. | ||
| 827 | // However, I've used the debugger to observe that the send does indeed wait. | ||
| 828 | assert!(Delay::new(Duration::from_millis(500)).await.is_ok()); | ||
| 829 | assert_eq!(r.recv().await, Some(1)); | ||
| 830 | assert!(executor | ||
| 831 | .spawn(async move { while let Some(_) = r.recv().await {} }) | ||
| 832 | .is_ok()); | ||
| 833 | assert!(send_task_1.unwrap().await.is_ok()); | ||
| 834 | assert!(send_task_2.unwrap().await.is_ok()); | ||
| 835 | } | ||
| 836 | |||
| 837 | #[futures_test::test] | ||
| 838 | async fn sender_close_completes_if_closing() { | ||
| 839 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | ||
| 840 | let c = CHANNEL.put(Channel::new()); | ||
| 841 | let (s, mut r) = split(c); | ||
| 842 | r.close(); | ||
| 843 | s.closed().await; | ||
| 844 | } | ||
| 845 | |||
| 846 | #[futures_test::test] | ||
| 847 | async fn sender_close_completes_if_closed() { | ||
| 848 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | ||
| 849 | let c = CHANNEL.put(Channel::new()); | ||
| 850 | let (s, r) = split(c); | ||
| 851 | drop(r); | ||
| 852 | s.closed().await; | ||
| 853 | } | ||
| 854 | } | ||
diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs index e4b7764ce..0506ffe6f 100644 --- a/embassy/src/util/mutex.rs +++ b/embassy/src/util/mutex.rs | |||
| @@ -1,6 +1,19 @@ | |||
| 1 | use core::cell::UnsafeCell; | 1 | use core::cell::UnsafeCell; |
| 2 | use critical_section::CriticalSection; | 2 | use critical_section::CriticalSection; |
| 3 | 3 | ||
| 4 | /// Any object implementing this trait guarantees exclusive access to the data contained | ||
| 5 | /// within the mutex for the duration of the lock. | ||
| 6 | /// Adapted from https://github.com/rust-embedded/mutex-trait. | ||
| 7 | pub trait Mutex { | ||
| 8 | /// Data protected by the mutex. | ||
| 9 | type Data; | ||
| 10 | |||
| 11 | fn new(data: Self::Data) -> Self; | ||
| 12 | |||
| 13 | /// Creates a critical section and grants temporary access to the protected data. | ||
| 14 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; | ||
| 15 | } | ||
| 16 | |||
| 4 | /// A "mutex" based on critical sections | 17 | /// A "mutex" based on critical sections |
| 5 | /// | 18 | /// |
| 6 | /// # Safety | 19 | /// # Safety |
| @@ -33,6 +46,18 @@ impl<T> CriticalSectionMutex<T> { | |||
| 33 | } | 46 | } |
| 34 | } | 47 | } |
| 35 | 48 | ||
| 49 | impl<T> Mutex for CriticalSectionMutex<T> { | ||
| 50 | type Data = T; | ||
| 51 | |||
| 52 | fn new(data: T) -> Self { | ||
| 53 | Self::new(data) | ||
| 54 | } | ||
| 55 | |||
| 56 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | ||
| 57 | critical_section::with(|cs| f(self.borrow(cs))) | ||
| 58 | } | ||
| 59 | } | ||
| 60 | |||
| 36 | /// A "mutex" that only allows borrowing from thread mode. | 61 | /// A "mutex" that only allows borrowing from thread mode. |
| 37 | /// | 62 | /// |
| 38 | /// # Safety | 63 | /// # Safety |
| @@ -70,6 +95,18 @@ impl<T> ThreadModeMutex<T> { | |||
| 70 | } | 95 | } |
| 71 | } | 96 | } |
| 72 | 97 | ||
| 98 | impl<T> Mutex for ThreadModeMutex<T> { | ||
| 99 | type Data = T; | ||
| 100 | |||
| 101 | fn new(data: T) -> Self { | ||
| 102 | Self::new(data) | ||
| 103 | } | ||
| 104 | |||
| 105 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | ||
| 106 | f(self.borrow()) | ||
| 107 | } | ||
| 108 | } | ||
| 109 | |||
| 73 | pub fn in_thread_mode() -> bool { | 110 | pub fn in_thread_mode() -> bool { |
| 74 | #[cfg(feature = "std")] | 111 | #[cfg(feature = "std")] |
| 75 | return Some("main") == std::thread::current().name(); | 112 | return Some("main") == std::thread::current().name(); |
| @@ -78,3 +115,32 @@ pub fn in_thread_mode() -> bool { | |||
| 78 | return cortex_m::peripheral::SCB::vect_active() | 115 | return cortex_m::peripheral::SCB::vect_active() |
| 79 | == cortex_m::peripheral::scb::VectActive::ThreadMode; | 116 | == cortex_m::peripheral::scb::VectActive::ThreadMode; |
| 80 | } | 117 | } |
| 118 | |||
| 119 | /// A "mutex" that does nothing and cannot be shared between threads. | ||
| 120 | pub struct NoopMutex<T> { | ||
| 121 | inner: T, | ||
| 122 | } | ||
| 123 | |||
| 124 | impl<T> NoopMutex<T> { | ||
| 125 | pub const fn new(value: T) -> Self { | ||
| 126 | NoopMutex { inner: value } | ||
| 127 | } | ||
| 128 | } | ||
| 129 | |||
| 130 | impl<T> NoopMutex<T> { | ||
| 131 | pub fn borrow(&self) -> &T { | ||
| 132 | &self.inner | ||
| 133 | } | ||
| 134 | } | ||
| 135 | |||
| 136 | impl<T> Mutex for NoopMutex<T> { | ||
| 137 | type Data = T; | ||
| 138 | |||
| 139 | fn new(data: T) -> Self { | ||
| 140 | Self::new(data) | ||
| 141 | } | ||
| 142 | |||
| 143 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | ||
| 144 | f(self.borrow()) | ||
| 145 | } | ||
| 146 | } | ||
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs new file mode 100644 index 000000000..443955239 --- /dev/null +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -0,0 +1,65 @@ | |||
| 1 | #![no_std] | ||
| 2 | #![no_main] | ||
| 3 | #![feature(min_type_alias_impl_trait)] | ||
| 4 | #![feature(impl_trait_in_bindings)] | ||
| 5 | #![feature(type_alias_impl_trait)] | ||
| 6 | #![allow(incomplete_features)] | ||
| 7 | |||
| 8 | #[path = "../example_common.rs"] | ||
| 9 | mod example_common; | ||
| 10 | |||
| 11 | use defmt::panic; | ||
| 12 | use embassy::executor::Spawner; | ||
| 13 | use embassy::time::{Duration, Timer}; | ||
| 14 | use embassy::util::mpsc::TryRecvError; | ||
| 15 | use embassy::util::{mpsc, Forever}; | ||
| 16 | use embassy_nrf::gpio::{Level, Output, OutputDrive}; | ||
| 17 | use embassy_nrf::Peripherals; | ||
| 18 | use embedded_hal::digital::v2::OutputPin; | ||
| 19 | use mpsc::{Channel, Sender, WithNoThreads}; | ||
| 20 | |||
| 21 | enum LedState { | ||
| 22 | On, | ||
| 23 | Off, | ||
| 24 | } | ||
| 25 | |||
| 26 | static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); | ||
| 27 | |||
| 28 | #[embassy::task(pool_size = 1)] | ||
| 29 | async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { | ||
| 30 | loop { | ||
| 31 | let _ = sender.send(LedState::On).await; | ||
| 32 | Timer::after(Duration::from_secs(1)).await; | ||
| 33 | let _ = sender.send(LedState::Off).await; | ||
| 34 | Timer::after(Duration::from_secs(1)).await; | ||
| 35 | } | ||
| 36 | } | ||
| 37 | |||
| 38 | #[embassy::main] | ||
| 39 | async fn main(spawner: Spawner, p: Peripherals) { | ||
| 40 | |||
| 41 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | ||
| 42 | |||
| 43 | let channel = CHANNEL.put(Channel::new()); | ||
| 44 | let (sender, mut receiver) = mpsc::split(channel); | ||
| 45 | |||
| 46 | spawner.spawn(my_task(sender)).unwrap(); | ||
| 47 | |||
| 48 | // We could just loop on `receiver.recv()` for simplicity. The code below | ||
| 49 | // is optimized to drain the queue as fast as possible in the spirit of | ||
| 50 | // handling events as fast as possible. This optimization is benign when in | ||
| 51 | // thread mode, but can be useful when interrupts are sending messages | ||
| 52 | // with the channel having been created via with_critical_sections. | ||
| 53 | loop { | ||
| 54 | let maybe_message = match receiver.try_recv() { | ||
| 55 | m @ Ok(..) => m.ok(), | ||
| 56 | Err(TryRecvError::Empty) => receiver.recv().await, | ||
| 57 | Err(TryRecvError::Closed) => break, | ||
| 58 | }; | ||
| 59 | match maybe_message { | ||
| 60 | Some(LedState::On) => led.set_high().unwrap(), | ||
| 61 | Some(LedState::Off) => led.set_low().unwrap(), | ||
| 62 | _ => (), | ||
| 63 | } | ||
| 64 | } | ||
| 65 | } | ||
