diff options
| author | Dario Nieuwenhuis <[email protected]> | 2021-09-12 23:36:52 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2021-09-13 00:08:41 +0200 |
| commit | 70e5877d6823ba0894241e8bedc5cefa7e21bceb (patch) | |
| tree | c8f8671800eff4f2497ea8e4c7a96ba752db4aaa | |
| parent | 5be5bdfd20900ee1973097cf46ec946cad547e30 (diff) | |
embassy/channel: switch to use MutexKind
| -rw-r--r-- | embassy/src/channel/mpsc.rs | 188 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 7 |
2 files changed, 69 insertions, 126 deletions
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs index 3585b8039..9a57c0b19 100644 --- a/embassy/src/channel/mpsc.rs +++ b/embassy/src/channel/mpsc.rs | |||
| @@ -37,7 +37,7 @@ | |||
| 37 | //! | 37 | //! |
| 38 | //! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html | 38 | //! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html |
| 39 | 39 | ||
| 40 | use core::cell::UnsafeCell; | 40 | use core::cell::RefCell; |
| 41 | use core::fmt; | 41 | use core::fmt; |
| 42 | use core::pin::Pin; | 42 | use core::pin::Pin; |
| 43 | use core::task::Context; | 43 | use core::task::Context; |
| @@ -47,7 +47,8 @@ use core::task::Waker; | |||
| 47 | use futures::Future; | 47 | use futures::Future; |
| 48 | use heapless::Deque; | 48 | use heapless::Deque; |
| 49 | 49 | ||
| 50 | use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; | 50 | use crate::blocking_mutex::kind::MutexKind; |
| 51 | use crate::blocking_mutex::Mutex; | ||
| 51 | use crate::waitqueue::WakerRegistration; | 52 | use crate::waitqueue::WakerRegistration; |
| 52 | 53 | ||
| 53 | /// Send values to the associated `Receiver`. | 54 | /// Send values to the associated `Receiver`. |
| @@ -55,35 +56,19 @@ use crate::waitqueue::WakerRegistration; | |||
| 55 | /// Instances are created by the [`split`](split) function. | 56 | /// Instances are created by the [`split`](split) function. |
| 56 | pub struct Sender<'ch, M, T, const N: usize> | 57 | pub struct Sender<'ch, M, T, const N: usize> |
| 57 | where | 58 | where |
| 58 | M: Mutex<Data = ()>, | 59 | M: MutexKind, |
| 59 | { | 60 | { |
| 60 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | 61 | channel: &'ch Channel<M, T, N>, |
| 61 | } | 62 | } |
| 62 | 63 | ||
| 63 | // Safe to pass the sender around | ||
| 64 | unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 65 | {} | ||
| 66 | unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 67 | {} | ||
| 68 | |||
| 69 | /// Receive values from the associated `Sender`. | 64 | /// Receive values from the associated `Sender`. |
| 70 | /// | 65 | /// |
| 71 | /// Instances are created by the [`split`](split) function. | 66 | /// Instances are created by the [`split`](split) function. |
| 72 | pub struct Receiver<'ch, M, T, const N: usize> | 67 | pub struct Receiver<'ch, M, T, const N: usize> |
| 73 | where | 68 | where |
| 74 | M: Mutex<Data = ()>, | 69 | M: MutexKind, |
| 75 | { | ||
| 76 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 77 | } | ||
| 78 | |||
| 79 | // Safe to pass the receiver around | ||
| 80 | unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where | ||
| 81 | M: Mutex<Data = ()> + Sync | ||
| 82 | { | ||
| 83 | } | ||
| 84 | unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where | ||
| 85 | M: Mutex<Data = ()> + Sync | ||
| 86 | { | 70 | { |
| 71 | channel: &'ch Channel<M, T, N>, | ||
| 87 | } | 72 | } |
| 88 | 73 | ||
| 89 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. | 74 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. |
| @@ -114,15 +99,11 @@ pub fn split<M, T, const N: usize>( | |||
| 114 | channel: &mut Channel<M, T, N>, | 99 | channel: &mut Channel<M, T, N>, |
| 115 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) | 100 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) |
| 116 | where | 101 | where |
| 117 | M: Mutex<Data = ()>, | 102 | M: MutexKind, |
| 118 | { | 103 | { |
| 119 | let sender = Sender { | 104 | let sender = Sender { channel }; |
| 120 | channel_cell: &channel.channel_cell, | 105 | let receiver = Receiver { channel }; |
| 121 | }; | 106 | channel.lock(|c| { |
| 122 | let receiver = Receiver { | ||
| 123 | channel_cell: &channel.channel_cell, | ||
| 124 | }; | ||
| 125 | Channel::lock(&channel.channel_cell, |c| { | ||
| 126 | c.register_receiver(); | 107 | c.register_receiver(); |
| 127 | c.register_sender(); | 108 | c.register_sender(); |
| 128 | }); | 109 | }); |
| @@ -131,7 +112,7 @@ where | |||
| 131 | 112 | ||
| 132 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | 113 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> |
| 133 | where | 114 | where |
| 134 | M: Mutex<Data = ()>, | 115 | M: MutexKind, |
| 135 | { | 116 | { |
| 136 | /// Receives the next value for this receiver. | 117 | /// Receives the next value for this receiver. |
| 137 | /// | 118 | /// |
| @@ -151,7 +132,7 @@ where | |||
| 151 | /// [`close`]: Self::close | 132 | /// [`close`]: Self::close |
| 152 | pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { | 133 | pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { |
| 153 | RecvFuture { | 134 | RecvFuture { |
| 154 | channel_cell: self.channel_cell, | 135 | channel: self.channel, |
| 155 | } | 136 | } |
| 156 | } | 137 | } |
| 157 | 138 | ||
| @@ -160,7 +141,7 @@ where | |||
| 160 | /// This method will either receive a message from the channel immediately or return an error | 141 | /// This method will either receive a message from the channel immediately or return an error |
| 161 | /// if the channel is empty. | 142 | /// if the channel is empty. |
| 162 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 143 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
| 163 | Channel::lock(self.channel_cell, |c| c.try_recv()) | 144 | self.channel.lock(|c| c.try_recv()) |
| 164 | } | 145 | } |
| 165 | 146 | ||
| 166 | /// Closes the receiving half of a channel without dropping it. | 147 | /// Closes the receiving half of a channel without dropping it. |
| @@ -174,56 +155,45 @@ where | |||
| 174 | /// until those are released. | 155 | /// until those are released. |
| 175 | /// | 156 | /// |
| 176 | pub fn close(&mut self) { | 157 | pub fn close(&mut self) { |
| 177 | Channel::lock(self.channel_cell, |c| c.close()) | 158 | self.channel.lock(|c| c.close()) |
| 178 | } | 159 | } |
| 179 | } | 160 | } |
| 180 | 161 | ||
| 181 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> | 162 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> |
| 182 | where | 163 | where |
| 183 | M: Mutex<Data = ()>, | 164 | M: MutexKind, |
| 184 | { | 165 | { |
| 185 | fn drop(&mut self) { | 166 | fn drop(&mut self) { |
| 186 | Channel::lock(self.channel_cell, |c| c.deregister_receiver()) | 167 | self.channel.lock(|c| c.deregister_receiver()) |
| 187 | } | 168 | } |
| 188 | } | 169 | } |
| 189 | 170 | ||
| 190 | pub struct RecvFuture<'ch, M, T, const N: usize> | 171 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 191 | where | 172 | where |
| 192 | M: Mutex<Data = ()>, | 173 | M: MutexKind, |
| 193 | { | 174 | { |
| 194 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | 175 | channel: &'ch Channel<M, T, N>, |
| 195 | } | 176 | } |
| 196 | 177 | ||
| 197 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | 178 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> |
| 198 | where | 179 | where |
| 199 | M: Mutex<Data = ()>, | 180 | M: MutexKind, |
| 200 | { | 181 | { |
| 201 | type Output = Option<T>; | 182 | type Output = Option<T>; |
| 202 | 183 | ||
| 203 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | 184 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
| 204 | Channel::lock(self.channel_cell, |c| { | 185 | self.channel |
| 205 | match c.try_recv_with_context(Some(cx)) { | 186 | .lock(|c| match c.try_recv_with_context(Some(cx)) { |
| 206 | Ok(v) => Poll::Ready(Some(v)), | 187 | Ok(v) => Poll::Ready(Some(v)), |
| 207 | Err(TryRecvError::Closed) => Poll::Ready(None), | 188 | Err(TryRecvError::Closed) => Poll::Ready(None), |
| 208 | Err(TryRecvError::Empty) => Poll::Pending, | 189 | Err(TryRecvError::Empty) => Poll::Pending, |
| 209 | } | 190 | }) |
| 210 | }) | ||
| 211 | } | 191 | } |
| 212 | } | 192 | } |
| 213 | 193 | ||
| 214 | // Safe to pass the receive future around since it locks channel whenever polled | ||
| 215 | unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where | ||
| 216 | M: Mutex<Data = ()> + Sync | ||
| 217 | { | ||
| 218 | } | ||
| 219 | unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where | ||
| 220 | M: Mutex<Data = ()> + Sync | ||
| 221 | { | ||
| 222 | } | ||
| 223 | |||
| 224 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | 194 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> |
| 225 | where | 195 | where |
| 226 | M: Mutex<Data = ()>, | 196 | M: MutexKind, |
| 227 | { | 197 | { |
| 228 | /// Sends a value, waiting until there is capacity. | 198 | /// Sends a value, waiting until there is capacity. |
| 229 | /// | 199 | /// |
| @@ -245,7 +215,7 @@ where | |||
| 245 | /// [`Receiver`]: Receiver | 215 | /// [`Receiver`]: Receiver |
| 246 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | 216 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { |
| 247 | SendFuture { | 217 | SendFuture { |
| 248 | sender: self.clone(), | 218 | channel: self.channel, |
| 249 | message: Some(message), | 219 | message: Some(message), |
| 250 | } | 220 | } |
| 251 | } | 221 | } |
| @@ -271,7 +241,7 @@ where | |||
| 271 | /// [`channel`]: channel | 241 | /// [`channel`]: channel |
| 272 | /// [`close`]: Receiver::close | 242 | /// [`close`]: Receiver::close |
| 273 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | 243 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
| 274 | Channel::lock(self.channel_cell, |c| c.try_send(message)) | 244 | self.channel.lock(|c| c.try_send(message)) |
| 275 | } | 245 | } |
| 276 | 246 | ||
| 277 | /// Completes when the receiver has dropped. | 247 | /// Completes when the receiver has dropped. |
| @@ -280,7 +250,7 @@ where | |||
| 280 | /// values is canceled and immediately stop doing work. | 250 | /// values is canceled and immediately stop doing work. |
| 281 | pub async fn closed(&self) { | 251 | pub async fn closed(&self) { |
| 282 | CloseFuture { | 252 | CloseFuture { |
| 283 | sender: self.clone(), | 253 | channel: self.channel, |
| 284 | } | 254 | } |
| 285 | .await | 255 | .await |
| 286 | } | 256 | } |
| @@ -292,29 +262,27 @@ where | |||
| 292 | /// [`Receiver`]: crate::sync::mpsc::Receiver | 262 | /// [`Receiver`]: crate::sync::mpsc::Receiver |
| 293 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close | 263 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close |
| 294 | pub fn is_closed(&self) -> bool { | 264 | pub fn is_closed(&self) -> bool { |
| 295 | Channel::lock(self.channel_cell, |c| c.is_closed()) | 265 | self.channel.lock(|c| c.is_closed()) |
| 296 | } | 266 | } |
| 297 | } | 267 | } |
| 298 | 268 | ||
| 299 | pub struct SendFuture<'ch, M, T, const N: usize> | 269 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 300 | where | 270 | where |
| 301 | M: Mutex<Data = ()>, | 271 | M: MutexKind, |
| 302 | { | 272 | { |
| 303 | sender: Sender<'ch, M, T, N>, | 273 | channel: &'ch Channel<M, T, N>, |
| 304 | message: Option<T>, | 274 | message: Option<T>, |
| 305 | } | 275 | } |
| 306 | 276 | ||
| 307 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | 277 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> |
| 308 | where | 278 | where |
| 309 | M: Mutex<Data = ()>, | 279 | M: MutexKind, |
| 310 | { | 280 | { |
| 311 | type Output = Result<(), SendError<T>>; | 281 | type Output = Result<(), SendError<T>>; |
| 312 | 282 | ||
| 313 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 283 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 314 | match self.message.take() { | 284 | match self.message.take() { |
| 315 | Some(m) => match Channel::lock(self.sender.channel_cell, |c| { | 285 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { |
| 316 | c.try_send_with_context(m, Some(cx)) | ||
| 317 | }) { | ||
| 318 | Ok(..) => Poll::Ready(Ok(())), | 286 | Ok(..) => Poll::Ready(Ok(())), |
| 319 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | 287 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), |
| 320 | Err(TrySendError::Full(m)) => { | 288 | Err(TrySendError::Full(m)) => { |
| @@ -327,25 +295,23 @@ where | |||
| 327 | } | 295 | } |
| 328 | } | 296 | } |
| 329 | 297 | ||
| 330 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {} | 298 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: MutexKind {} |
| 331 | 299 | ||
| 332 | struct CloseFuture<'ch, M, T, const N: usize> | 300 | struct CloseFuture<'ch, M, T, const N: usize> |
| 333 | where | 301 | where |
| 334 | M: Mutex<Data = ()>, | 302 | M: MutexKind, |
| 335 | { | 303 | { |
| 336 | sender: Sender<'ch, M, T, N>, | 304 | channel: &'ch Channel<M, T, N>, |
| 337 | } | 305 | } |
| 338 | 306 | ||
| 339 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> | 307 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> |
| 340 | where | 308 | where |
| 341 | M: Mutex<Data = ()>, | 309 | M: MutexKind, |
| 342 | { | 310 | { |
| 343 | type Output = (); | 311 | type Output = (); |
| 344 | 312 | ||
| 345 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 313 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 346 | if Channel::lock(self.sender.channel_cell, |c| { | 314 | if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) { |
| 347 | c.is_closed_with_context(Some(cx)) | ||
| 348 | }) { | ||
| 349 | Poll::Ready(()) | 315 | Poll::Ready(()) |
| 350 | } else { | 316 | } else { |
| 351 | Poll::Pending | 317 | Poll::Pending |
| @@ -355,22 +321,21 @@ where | |||
| 355 | 321 | ||
| 356 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> | 322 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> |
| 357 | where | 323 | where |
| 358 | M: Mutex<Data = ()>, | 324 | M: MutexKind, |
| 359 | { | 325 | { |
| 360 | fn drop(&mut self) { | 326 | fn drop(&mut self) { |
| 361 | Channel::lock(self.channel_cell, |c| c.deregister_sender()) | 327 | self.channel.lock(|c| c.deregister_sender()) |
| 362 | } | 328 | } |
| 363 | } | 329 | } |
| 364 | 330 | ||
| 365 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | 331 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> |
| 366 | where | 332 | where |
| 367 | M: Mutex<Data = ()>, | 333 | M: MutexKind, |
| 368 | { | 334 | { |
| 369 | #[allow(clippy::clone_double_ref)] | ||
| 370 | fn clone(&self) -> Self { | 335 | fn clone(&self) -> Self { |
| 371 | Channel::lock(self.channel_cell, |c| c.register_sender()); | 336 | self.channel.lock(|c| c.register_sender()); |
| 372 | Sender { | 337 | Sender { |
| 373 | channel_cell: self.channel_cell.clone(), | 338 | channel: self.channel, |
| 374 | } | 339 | } |
| 375 | } | 340 | } |
| 376 | } | 341 | } |
| @@ -581,59 +546,35 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 581 | /// All data sent will become available in the same order as it was sent. | 546 | /// All data sent will become available in the same order as it was sent. |
| 582 | pub struct Channel<M, T, const N: usize> | 547 | pub struct Channel<M, T, const N: usize> |
| 583 | where | 548 | where |
| 584 | M: Mutex<Data = ()>, | 549 | M: MutexKind, |
| 585 | { | ||
| 586 | channel_cell: UnsafeCell<ChannelCell<M, T, N>>, | ||
| 587 | } | ||
| 588 | |||
| 589 | struct ChannelCell<M, T, const N: usize> | ||
| 590 | where | ||
| 591 | M: Mutex<Data = ()>, | ||
| 592 | { | 550 | { |
| 593 | mutex: M, | 551 | inner: M::Mutex<RefCell<ChannelState<T, N>>>, |
| 594 | state: ChannelState<T, N>, | ||
| 595 | } | 552 | } |
| 596 | 553 | ||
| 597 | pub type WithCriticalSections = CriticalSectionMutex<()>; | ||
| 598 | |||
| 599 | pub type WithThreadModeOnly = ThreadModeMutex<()>; | ||
| 600 | |||
| 601 | pub type WithNoThreads = NoopMutex<()>; | ||
| 602 | |||
| 603 | impl<M, T, const N: usize> Channel<M, T, N> | 554 | impl<M, T, const N: usize> Channel<M, T, N> |
| 604 | where | 555 | where |
| 605 | M: Mutex<Data = ()>, | 556 | M: MutexKind, |
| 606 | { | 557 | { |
| 607 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | 558 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: |
| 608 | /// | 559 | /// |
| 609 | /// ``` | 560 | /// ``` |
| 610 | /// use embassy::channel::mpsc; | 561 | /// use embassy::channel::mpsc; |
| 611 | /// use embassy::channel::mpsc::{Channel, WithNoThreads}; | 562 | /// use embassy::blocking_mutex::kind::Noop; |
| 563 | /// use embassy::channel::mpsc::Channel; | ||
| 612 | /// | 564 | /// |
| 613 | /// // Declare a bounded channel of 3 u32s. | 565 | /// // Declare a bounded channel of 3 u32s. |
| 614 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); | 566 | /// let mut channel = Channel::<Noop, u32, 3>::new(); |
| 615 | /// // once we have a channel, obtain its sender and receiver | 567 | /// // once we have a channel, obtain its sender and receiver |
| 616 | /// let (sender, receiver) = mpsc::split(&mut channel); | 568 | /// let (sender, receiver) = mpsc::split(&mut channel); |
| 617 | /// ``` | 569 | /// ``` |
| 618 | pub fn new() -> Self { | 570 | pub fn new() -> Self { |
| 619 | let mutex = M::new(()); | 571 | Self { |
| 620 | let state = ChannelState::new(); | 572 | inner: M::Mutex::new(RefCell::new(ChannelState::new())), |
| 621 | let channel_cell = ChannelCell { mutex, state }; | ||
| 622 | Channel { | ||
| 623 | channel_cell: UnsafeCell::new(channel_cell), | ||
| 624 | } | 573 | } |
| 625 | } | 574 | } |
| 626 | 575 | ||
| 627 | fn lock<R>( | 576 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { |
| 628 | channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, | 577 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 629 | f: impl FnOnce(&mut ChannelState<T, N>) -> R, | ||
| 630 | ) -> R { | ||
| 631 | unsafe { | ||
| 632 | let channel_cell = &mut *(channel_cell.get()); | ||
| 633 | let mutex = &mut channel_cell.mutex; | ||
| 634 | let mut state = &mut channel_cell.state; | ||
| 635 | mutex.lock(|_| f(&mut state)) | ||
| 636 | } | ||
| 637 | } | 578 | } |
| 638 | } | 579 | } |
| 639 | 580 | ||
| @@ -645,6 +586,7 @@ mod tests { | |||
| 645 | use futures_executor::ThreadPool; | 586 | use futures_executor::ThreadPool; |
| 646 | use futures_timer::Delay; | 587 | use futures_timer::Delay; |
| 647 | 588 | ||
| 589 | use crate::blocking_mutex::kind::{CriticalSection, Noop}; | ||
| 648 | use crate::util::Forever; | 590 | use crate::util::Forever; |
| 649 | 591 | ||
| 650 | use super::*; | 592 | use super::*; |
| @@ -713,7 +655,7 @@ mod tests { | |||
| 713 | 655 | ||
| 714 | #[test] | 656 | #[test] |
| 715 | fn simple_send_and_receive() { | 657 | fn simple_send_and_receive() { |
| 716 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 658 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 717 | let (s, r) = split(&mut c); | 659 | let (s, r) = split(&mut c); |
| 718 | assert!(s.clone().try_send(1).is_ok()); | 660 | assert!(s.clone().try_send(1).is_ok()); |
| 719 | assert_eq!(r.try_recv().unwrap(), 1); | 661 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -721,7 +663,7 @@ mod tests { | |||
| 721 | 663 | ||
| 722 | #[test] | 664 | #[test] |
| 723 | fn should_close_without_sender() { | 665 | fn should_close_without_sender() { |
| 724 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 666 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 725 | let (s, r) = split(&mut c); | 667 | let (s, r) = split(&mut c); |
| 726 | drop(s); | 668 | drop(s); |
| 727 | match r.try_recv() { | 669 | match r.try_recv() { |
| @@ -732,7 +674,7 @@ mod tests { | |||
| 732 | 674 | ||
| 733 | #[test] | 675 | #[test] |
| 734 | fn should_close_once_drained() { | 676 | fn should_close_once_drained() { |
| 735 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 677 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 736 | let (s, r) = split(&mut c); | 678 | let (s, r) = split(&mut c); |
| 737 | assert!(s.try_send(1).is_ok()); | 679 | assert!(s.try_send(1).is_ok()); |
| 738 | drop(s); | 680 | drop(s); |
| @@ -745,7 +687,7 @@ mod tests { | |||
| 745 | 687 | ||
| 746 | #[test] | 688 | #[test] |
| 747 | fn should_reject_send_when_receiver_dropped() { | 689 | fn should_reject_send_when_receiver_dropped() { |
| 748 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 690 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 749 | let (s, r) = split(&mut c); | 691 | let (s, r) = split(&mut c); |
| 750 | drop(r); | 692 | drop(r); |
| 751 | match s.try_send(1) { | 693 | match s.try_send(1) { |
| @@ -756,7 +698,7 @@ mod tests { | |||
| 756 | 698 | ||
| 757 | #[test] | 699 | #[test] |
| 758 | fn should_reject_send_when_channel_closed() { | 700 | fn should_reject_send_when_channel_closed() { |
| 759 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 701 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 760 | let (s, mut r) = split(&mut c); | 702 | let (s, mut r) = split(&mut c); |
| 761 | assert!(s.try_send(1).is_ok()); | 703 | assert!(s.try_send(1).is_ok()); |
| 762 | r.close(); | 704 | r.close(); |
| @@ -772,7 +714,7 @@ mod tests { | |||
| 772 | async fn receiver_closes_when_sender_dropped_async() { | 714 | async fn receiver_closes_when_sender_dropped_async() { |
| 773 | let executor = ThreadPool::new().unwrap(); | 715 | let executor = ThreadPool::new().unwrap(); |
| 774 | 716 | ||
| 775 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | 717 | static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new(); |
| 776 | let c = CHANNEL.put(Channel::new()); | 718 | let c = CHANNEL.put(Channel::new()); |
| 777 | let (s, mut r) = split(c); | 719 | let (s, mut r) = split(c); |
| 778 | assert!(executor | 720 | assert!(executor |
| @@ -787,7 +729,7 @@ mod tests { | |||
| 787 | async fn receiver_receives_given_try_send_async() { | 729 | async fn receiver_receives_given_try_send_async() { |
| 788 | let executor = ThreadPool::new().unwrap(); | 730 | let executor = ThreadPool::new().unwrap(); |
| 789 | 731 | ||
| 790 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | 732 | static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new(); |
| 791 | let c = CHANNEL.put(Channel::new()); | 733 | let c = CHANNEL.put(Channel::new()); |
| 792 | let (s, mut r) = split(c); | 734 | let (s, mut r) = split(c); |
| 793 | assert!(executor | 735 | assert!(executor |
| @@ -800,7 +742,7 @@ mod tests { | |||
| 800 | 742 | ||
| 801 | #[futures_test::test] | 743 | #[futures_test::test] |
| 802 | async fn sender_send_completes_if_capacity() { | 744 | async fn sender_send_completes_if_capacity() { |
| 803 | let mut c = Channel::<WithCriticalSections, u32, 1>::new(); | 745 | let mut c = Channel::<CriticalSection, u32, 1>::new(); |
| 804 | let (s, mut r) = split(&mut c); | 746 | let (s, mut r) = split(&mut c); |
| 805 | assert!(s.send(1).await.is_ok()); | 747 | assert!(s.send(1).await.is_ok()); |
| 806 | assert_eq!(r.recv().await, Some(1)); | 748 | assert_eq!(r.recv().await, Some(1)); |
| @@ -808,7 +750,7 @@ mod tests { | |||
| 808 | 750 | ||
| 809 | #[futures_test::test] | 751 | #[futures_test::test] |
| 810 | async fn sender_send_completes_if_closed() { | 752 | async fn sender_send_completes_if_closed() { |
| 811 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 753 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 812 | let c = CHANNEL.put(Channel::new()); | 754 | let c = CHANNEL.put(Channel::new()); |
| 813 | let (s, r) = split(c); | 755 | let (s, r) = split(c); |
| 814 | drop(r); | 756 | drop(r); |
| @@ -822,7 +764,7 @@ mod tests { | |||
| 822 | async fn senders_sends_wait_until_capacity() { | 764 | async fn senders_sends_wait_until_capacity() { |
| 823 | let executor = ThreadPool::new().unwrap(); | 765 | let executor = ThreadPool::new().unwrap(); |
| 824 | 766 | ||
| 825 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 767 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 826 | let c = CHANNEL.put(Channel::new()); | 768 | let c = CHANNEL.put(Channel::new()); |
| 827 | let (s0, mut r) = split(c); | 769 | let (s0, mut r) = split(c); |
| 828 | assert!(s0.try_send(1).is_ok()); | 770 | assert!(s0.try_send(1).is_ok()); |
| @@ -842,7 +784,7 @@ mod tests { | |||
| 842 | 784 | ||
| 843 | #[futures_test::test] | 785 | #[futures_test::test] |
| 844 | async fn sender_close_completes_if_closing() { | 786 | async fn sender_close_completes_if_closing() { |
| 845 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 787 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 846 | let c = CHANNEL.put(Channel::new()); | 788 | let c = CHANNEL.put(Channel::new()); |
| 847 | let (s, mut r) = split(c); | 789 | let (s, mut r) = split(c); |
| 848 | r.close(); | 790 | r.close(); |
| @@ -851,7 +793,7 @@ mod tests { | |||
| 851 | 793 | ||
| 852 | #[futures_test::test] | 794 | #[futures_test::test] |
| 853 | async fn sender_close_completes_if_closed() { | 795 | async fn sender_close_completes_if_closed() { |
| 854 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 796 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 855 | let c = CHANNEL.put(Channel::new()); | 797 | let c = CHANNEL.put(Channel::new()); |
| 856 | let (s, r) = split(c); | 798 | let (s, r) = split(c); |
| 857 | drop(r); | 799 | drop(r); |
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 79fa3dfb9..c85b7c282 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -6,7 +6,8 @@ | |||
| 6 | mod example_common; | 6 | mod example_common; |
| 7 | 7 | ||
| 8 | use defmt::unwrap; | 8 | use defmt::unwrap; |
| 9 | use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads}; | 9 | use embassy::blocking_mutex::kind::Noop; |
| 10 | use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError}; | ||
| 10 | use embassy::executor::Spawner; | 11 | use embassy::executor::Spawner; |
| 11 | use embassy::time::{Duration, Timer}; | 12 | use embassy::time::{Duration, Timer}; |
| 12 | use embassy::util::Forever; | 13 | use embassy::util::Forever; |
| @@ -19,10 +20,10 @@ enum LedState { | |||
| 19 | Off, | 20 | Off, |
| 20 | } | 21 | } |
| 21 | 22 | ||
| 22 | static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); | 23 | static CHANNEL: Forever<Channel<Noop, LedState, 1>> = Forever::new(); |
| 23 | 24 | ||
| 24 | #[embassy::task(pool_size = 1)] | 25 | #[embassy::task(pool_size = 1)] |
| 25 | async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { | 26 | async fn my_task(sender: Sender<'static, Noop, LedState, 1>) { |
| 26 | loop { | 27 | loop { |
| 27 | let _ = sender.send(LedState::On).await; | 28 | let _ = sender.send(LedState::On).await; |
| 28 | Timer::after(Duration::from_secs(1)).await; | 29 | Timer::after(Duration::from_secs(1)).await; |
