diff options
| author | huntc <[email protected]> | 2021-07-06 23:20:47 +1000 |
|---|---|---|
| committer | huntc <[email protected]> | 2021-07-15 12:31:52 +1000 |
| commit | 816b78c0d9733362d8653eb2032f126e6a710030 (patch) | |
| tree | 5748746b9f292d8f383e30ecf48a82bc916862e7 | |
| parent | 1b9d5e50710cefde4bd1e234695783d62e824c68 (diff) | |
Reduces the types on sender and receiver
In exchange for an UnsafeCell being passed into split
| -rw-r--r-- | embassy/src/util/mpsc.rs | 248 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 8 |
2 files changed, 110 insertions, 146 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index d24eb00bf..d8a010d7d 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -39,7 +39,6 @@ | |||
| 39 | 39 | ||
| 40 | use core::cell::UnsafeCell; | 40 | use core::cell::UnsafeCell; |
| 41 | use core::fmt; | 41 | use core::fmt; |
| 42 | use core::marker::PhantomData; | ||
| 43 | use core::mem::MaybeUninit; | 42 | use core::mem::MaybeUninit; |
| 44 | use core::pin::Pin; | 43 | use core::pin::Pin; |
| 45 | use core::task::Context; | 44 | use core::task::Context; |
| @@ -55,32 +54,24 @@ use super::ThreadModeMutex; | |||
| 55 | /// Send values to the associated `Receiver`. | 54 | /// Send values to the associated `Receiver`. |
| 56 | /// | 55 | /// |
| 57 | /// Instances are created by the [`split`](split) function. | 56 | /// Instances are created by the [`split`](split) function. |
| 58 | pub struct Sender<'ch, M, T, const N: usize> | 57 | pub struct Sender<'ch, T> { |
| 59 | where | 58 | channel: &'ch UnsafeCell<dyn ChannelLike<T>>, |
| 60 | M: Mutex<Data = ()>, | ||
| 61 | { | ||
| 62 | channel: *mut Channel<M, T, N>, | ||
| 63 | phantom_data: &'ch PhantomData<T>, | ||
| 64 | } | 59 | } |
| 65 | 60 | ||
| 66 | // Safe to pass the sender around | 61 | // Safe to pass the sender around |
| 67 | unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {} | 62 | unsafe impl<'ch, T> Send for Sender<'ch, T> {} |
| 68 | unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> {} | 63 | unsafe impl<'ch, T> Sync for Sender<'ch, T> {} |
| 69 | 64 | ||
| 70 | /// Receive values from the associated `Sender`. | 65 | /// Receive values from the associated `Sender`. |
| 71 | /// | 66 | /// |
| 72 | /// Instances are created by the [`split`](split) function. | 67 | /// Instances are created by the [`split`](split) function. |
| 73 | pub struct Receiver<'ch, M, T, const N: usize> | 68 | pub struct Receiver<'ch, T> { |
| 74 | where | 69 | channel: &'ch UnsafeCell<dyn ChannelLike<T>>, |
| 75 | M: Mutex<Data = ()>, | ||
| 76 | { | ||
| 77 | channel: *mut Channel<M, T, N>, | ||
| 78 | _phantom_data: &'ch PhantomData<T>, | ||
| 79 | } | 70 | } |
| 80 | 71 | ||
| 81 | // Safe to pass the receiver around | 72 | // Safe to pass the receiver around |
| 82 | unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {} | 73 | unsafe impl<'ch, T> Send for Receiver<'ch, T> {} |
| 83 | unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: Mutex<Data = ()> {} | 74 | unsafe impl<'ch, T> Sync for Receiver<'ch, T> {} |
| 84 | 75 | ||
| 85 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. | 76 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. |
| 86 | /// | 77 | /// |
| @@ -98,37 +89,29 @@ unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where M: | |||
| 98 | /// their channel. The following will therefore fail compilation: | 89 | /// their channel. The following will therefore fail compilation: |
| 99 | //// | 90 | //// |
| 100 | /// ```compile_fail | 91 | /// ```compile_fail |
| 92 | /// use core::cell::UnsafeCell; | ||
| 101 | /// use embassy::util::mpsc; | 93 | /// use embassy::util::mpsc; |
| 102 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; | 94 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; |
| 103 | /// | 95 | /// |
| 104 | /// let (sender, receiver) = { | 96 | /// let (sender, receiver) = { |
| 105 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); | 97 | /// let mut channel = UnsafeCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); |
| 106 | /// mpsc::split(&mut channel) | 98 | /// mpsc::split(&channel) |
| 107 | /// }; | 99 | /// }; |
| 108 | /// ``` | 100 | /// ``` |
| 109 | pub fn split<'ch, M, T, const N: usize>( | 101 | pub fn split<'ch, T>( |
| 110 | channel: &'ch mut Channel<M, T, N>, | 102 | channel: &'ch UnsafeCell<dyn ChannelLike<T>>, |
| 111 | ) -> (Sender<'ch, M, T, N>, Receiver<'ch, M, T, N>) | 103 | ) -> (Sender<'ch, T>, Receiver<'ch, T>) { |
| 112 | where | 104 | let sender = Sender { channel: &channel }; |
| 113 | M: Mutex<Data = ()>, | 105 | let receiver = Receiver { channel: &channel }; |
| 114 | { | 106 | { |
| 115 | let sender = Sender { | 107 | let c = unsafe { &mut *channel.get() }; |
| 116 | channel, | 108 | c.register_receiver(); |
| 117 | phantom_data: &PhantomData, | 109 | c.register_sender(); |
| 118 | }; | 110 | } |
| 119 | let receiver = Receiver { | ||
| 120 | channel, | ||
| 121 | _phantom_data: &PhantomData, | ||
| 122 | }; | ||
| 123 | channel.register_receiver(); | ||
| 124 | channel.register_sender(); | ||
| 125 | (sender, receiver) | 111 | (sender, receiver) |
| 126 | } | 112 | } |
| 127 | 113 | ||
| 128 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | 114 | impl<'ch, T> Receiver<'ch, T> { |
| 129 | where | ||
| 130 | M: Mutex<Data = ()>, | ||
| 131 | { | ||
| 132 | /// Receives the next value for this receiver. | 115 | /// Receives the next value for this receiver. |
| 133 | /// | 116 | /// |
| 134 | /// This method returns `None` if the channel has been closed and there are | 117 | /// This method returns `None` if the channel has been closed and there are |
| @@ -154,7 +137,7 @@ where | |||
| 154 | /// This method will either receive a message from the channel immediately or return an error | 137 | /// This method will either receive a message from the channel immediately or return an error |
| 155 | /// if the channel is empty. | 138 | /// if the channel is empty. |
| 156 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 139 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
| 157 | unsafe { self.channel.as_mut().unwrap().try_recv() } | 140 | unsafe { &mut *self.channel.get() }.try_recv() |
| 158 | } | 141 | } |
| 159 | 142 | ||
| 160 | /// Closes the receiving half of a channel without dropping it. | 143 | /// Closes the receiving half of a channel without dropping it. |
| @@ -168,14 +151,11 @@ where | |||
| 168 | /// until those are released. | 151 | /// until those are released. |
| 169 | /// | 152 | /// |
| 170 | pub fn close(&mut self) { | 153 | pub fn close(&mut self) { |
| 171 | unsafe { self.channel.as_mut().unwrap().close() } | 154 | unsafe { &mut *self.channel.get() }.close() |
| 172 | } | 155 | } |
| 173 | } | 156 | } |
| 174 | 157 | ||
| 175 | impl<'ch, M, T, const N: usize> Future for Receiver<'ch, M, T, N> | 158 | impl<'ch, T> Future for Receiver<'ch, T> { |
| 176 | where | ||
| 177 | M: Mutex<Data = ()>, | ||
| 178 | { | ||
| 179 | type Output = Option<T>; | 159 | type Output = Option<T>; |
| 180 | 160 | ||
| 181 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 161 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| @@ -183,31 +163,20 @@ where | |||
| 183 | Ok(v) => Poll::Ready(Some(v)), | 163 | Ok(v) => Poll::Ready(Some(v)), |
| 184 | Err(TryRecvError::Closed) => Poll::Ready(None), | 164 | Err(TryRecvError::Closed) => Poll::Ready(None), |
| 185 | Err(TryRecvError::Empty) => { | 165 | Err(TryRecvError::Empty) => { |
| 186 | unsafe { | 166 | unsafe { &mut *self.channel.get() }.set_receiver_waker(cx.waker().clone()); |
| 187 | self.channel | ||
| 188 | .as_mut() | ||
| 189 | .unwrap() | ||
| 190 | .set_receiver_waker(cx.waker().clone()); | ||
| 191 | }; | ||
| 192 | Poll::Pending | 167 | Poll::Pending |
| 193 | } | 168 | } |
| 194 | } | 169 | } |
| 195 | } | 170 | } |
| 196 | } | 171 | } |
| 197 | 172 | ||
| 198 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> | 173 | impl<'ch, T> Drop for Receiver<'ch, T> { |
| 199 | where | ||
| 200 | M: Mutex<Data = ()>, | ||
| 201 | { | ||
| 202 | fn drop(&mut self) { | 174 | fn drop(&mut self) { |
| 203 | unsafe { self.channel.as_mut().unwrap().deregister_receiver() } | 175 | unsafe { &mut *self.channel.get() }.deregister_receiver() |
| 204 | } | 176 | } |
| 205 | } | 177 | } |
| 206 | 178 | ||
| 207 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | 179 | impl<'ch, T> Sender<'ch, T> { |
| 208 | where | ||
| 209 | M: Mutex<Data = ()>, | ||
| 210 | { | ||
| 211 | /// Sends a value, waiting until there is capacity. | 180 | /// Sends a value, waiting until there is capacity. |
| 212 | /// | 181 | /// |
| 213 | /// A successful send occurs when it is determined that the other end of the | 182 | /// A successful send occurs when it is determined that the other end of the |
| @@ -255,7 +224,7 @@ where | |||
| 255 | /// [`channel`]: channel | 224 | /// [`channel`]: channel |
| 256 | /// [`close`]: Receiver::close | 225 | /// [`close`]: Receiver::close |
| 257 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | 226 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
| 258 | unsafe { self.channel.as_mut().unwrap().try_send(message) } | 227 | unsafe { &mut *self.channel.get() }.try_send(message) |
| 259 | } | 228 | } |
| 260 | 229 | ||
| 261 | /// Completes when the receiver has dropped. | 230 | /// Completes when the receiver has dropped. |
| @@ -276,22 +245,16 @@ where | |||
| 276 | /// [`Receiver`]: crate::sync::mpsc::Receiver | 245 | /// [`Receiver`]: crate::sync::mpsc::Receiver |
| 277 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close | 246 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close |
| 278 | pub fn is_closed(&self) -> bool { | 247 | pub fn is_closed(&self) -> bool { |
| 279 | unsafe { self.channel.as_mut().unwrap().is_closed() } | 248 | unsafe { &mut *self.channel.get() }.is_closed() |
| 280 | } | 249 | } |
| 281 | } | 250 | } |
| 282 | 251 | ||
| 283 | struct SendFuture<'ch, M, T, const N: usize> | 252 | struct SendFuture<'ch, T> { |
| 284 | where | 253 | sender: Sender<'ch, T>, |
| 285 | M: Mutex<Data = ()>, | ||
| 286 | { | ||
| 287 | sender: Sender<'ch, M, T, N>, | ||
| 288 | message: UnsafeCell<T>, | 254 | message: UnsafeCell<T>, |
| 289 | } | 255 | } |
| 290 | 256 | ||
| 291 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | 257 | impl<'ch, T> Future for SendFuture<'ch, T> { |
| 292 | where | ||
| 293 | M: Mutex<Data = ()>, | ||
| 294 | { | ||
| 295 | type Output = Result<(), SendError<T>>; | 258 | type Output = Result<(), SendError<T>>; |
| 296 | 259 | ||
| 297 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 260 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| @@ -299,13 +262,7 @@ where | |||
| 299 | Ok(..) => Poll::Ready(Ok(())), | 262 | Ok(..) => Poll::Ready(Ok(())), |
| 300 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | 263 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), |
| 301 | Err(TrySendError::Full(..)) => { | 264 | Err(TrySendError::Full(..)) => { |
| 302 | unsafe { | 265 | unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); |
| 303 | self.sender | ||
| 304 | .channel | ||
| 305 | .as_mut() | ||
| 306 | .unwrap() | ||
| 307 | .set_senders_waker(cx.waker().clone()); | ||
| 308 | }; | ||
| 309 | Poll::Pending | 266 | Poll::Pending |
| 310 | // Note we leave the existing UnsafeCell contents - they still | 267 | // Note we leave the existing UnsafeCell contents - they still |
| 311 | // contain the original message. We could create another UnsafeCell | 268 | // contain the original message. We could create another UnsafeCell |
| @@ -315,53 +272,34 @@ where | |||
| 315 | } | 272 | } |
| 316 | } | 273 | } |
| 317 | 274 | ||
| 318 | struct CloseFuture<'ch, M, T, const N: usize> | 275 | struct CloseFuture<'ch, T> { |
| 319 | where | 276 | sender: Sender<'ch, T>, |
| 320 | M: Mutex<Data = ()>, | ||
| 321 | { | ||
| 322 | sender: Sender<'ch, M, T, N>, | ||
| 323 | } | 277 | } |
| 324 | 278 | ||
| 325 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> | 279 | impl<'ch, T> Future for CloseFuture<'ch, T> { |
| 326 | where | ||
| 327 | M: Mutex<Data = ()>, | ||
| 328 | { | ||
| 329 | type Output = (); | 280 | type Output = (); |
| 330 | 281 | ||
| 331 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 282 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 332 | if self.sender.is_closed() { | 283 | if self.sender.is_closed() { |
| 333 | Poll::Ready(()) | 284 | Poll::Ready(()) |
| 334 | } else { | 285 | } else { |
| 335 | unsafe { | 286 | unsafe { &mut *self.sender.channel.get() }.set_senders_waker(cx.waker().clone()); |
| 336 | self.sender | ||
| 337 | .channel | ||
| 338 | .as_mut() | ||
| 339 | .unwrap() | ||
| 340 | .set_senders_waker(cx.waker().clone()); | ||
| 341 | }; | ||
| 342 | Poll::Pending | 287 | Poll::Pending |
| 343 | } | 288 | } |
| 344 | } | 289 | } |
| 345 | } | 290 | } |
| 346 | 291 | ||
| 347 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> | 292 | impl<'ch, T> Drop for Sender<'ch, T> { |
| 348 | where | ||
| 349 | M: Mutex<Data = ()>, | ||
| 350 | { | ||
| 351 | fn drop(&mut self) { | 293 | fn drop(&mut self) { |
| 352 | unsafe { self.channel.as_mut().unwrap().deregister_sender() } | 294 | unsafe { &mut *self.channel.get() }.deregister_sender() |
| 353 | } | 295 | } |
| 354 | } | 296 | } |
| 355 | 297 | ||
| 356 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | 298 | impl<'ch, T> Clone for Sender<'ch, T> { |
| 357 | where | ||
| 358 | M: Mutex<Data = ()>, | ||
| 359 | { | ||
| 360 | fn clone(&self) -> Self { | 299 | fn clone(&self) -> Self { |
| 361 | unsafe { self.channel.as_mut().unwrap().register_sender() }; | 300 | unsafe { &mut *self.channel.get() }.register_sender(); |
| 362 | Sender { | 301 | Sender { |
| 363 | channel: self.channel, | 302 | channel: self.channel.clone(), |
| 364 | phantom_data: self.phantom_data, | ||
| 365 | } | 303 | } |
| 366 | } | 304 | } |
| 367 | } | 305 | } |
| @@ -414,6 +352,28 @@ impl<T> fmt::Display for TrySendError<T> { | |||
| 414 | } | 352 | } |
| 415 | } | 353 | } |
| 416 | 354 | ||
| 355 | pub trait ChannelLike<T> { | ||
| 356 | fn try_recv(&mut self) -> Result<T, TryRecvError>; | ||
| 357 | |||
| 358 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>>; | ||
| 359 | |||
| 360 | fn close(&mut self); | ||
| 361 | |||
| 362 | fn is_closed(&mut self) -> bool; | ||
| 363 | |||
| 364 | fn register_receiver(&mut self); | ||
| 365 | |||
| 366 | fn deregister_receiver(&mut self); | ||
| 367 | |||
| 368 | fn register_sender(&mut self); | ||
| 369 | |||
| 370 | fn deregister_sender(&mut self); | ||
| 371 | |||
| 372 | fn set_receiver_waker(&mut self, receiver_waker: Waker); | ||
| 373 | |||
| 374 | fn set_senders_waker(&mut self, senders_waker: Waker); | ||
| 375 | } | ||
| 376 | |||
| 417 | pub struct ChannelState<T, const N: usize> { | 377 | pub struct ChannelState<T, const N: usize> { |
| 418 | buf: [MaybeUninit<UnsafeCell<T>>; N], | 378 | buf: [MaybeUninit<UnsafeCell<T>>; N], |
| 419 | read_pos: usize, | 379 | read_pos: usize, |
| @@ -480,13 +440,14 @@ impl<T, const N: usize> Channel<WithCriticalSections, T, N> { | |||
| 480 | /// from exception mode e.g. interrupt handlers. To create one: | 440 | /// from exception mode e.g. interrupt handlers. To create one: |
| 481 | /// | 441 | /// |
| 482 | /// ``` | 442 | /// ``` |
| 443 | /// use core::cell::UnsafeCell; | ||
| 483 | /// use embassy::util::mpsc; | 444 | /// use embassy::util::mpsc; |
| 484 | /// use embassy::util::mpsc::{Channel, WithCriticalSections}; | 445 | /// use embassy::util::mpsc::{Channel, WithCriticalSections}; |
| 485 | /// | 446 | /// |
| 486 | /// // Declare a bounded channel of 3 u32s. | 447 | /// // Declare a bounded channel of 3 u32s. |
| 487 | /// let mut channel = mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections(); | 448 | /// let mut channel = UnsafeCell::new(mpsc::Channel::<WithCriticalSections, u32, 3>::with_critical_sections()); |
| 488 | /// // once we have a channel, obtain its sender and receiver | 449 | /// // once we have a channel, obtain its sender and receiver |
| 489 | /// let (sender, receiver) = mpsc::split(&mut channel); | 450 | /// let (sender, receiver) = mpsc::split(&channel); |
| 490 | /// ``` | 451 | /// ``` |
| 491 | pub const fn with_critical_sections() -> Self { | 452 | pub const fn with_critical_sections() -> Self { |
| 492 | let mutex = CriticalSectionMutex::new(()); | 453 | let mutex = CriticalSectionMutex::new(()); |
| @@ -504,13 +465,14 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> { | |||
| 504 | /// channel avoids all locks. To create one: | 465 | /// channel avoids all locks. To create one: |
| 505 | /// | 466 | /// |
| 506 | /// ``` no_run | 467 | /// ``` no_run |
| 468 | /// use core::cell::UnsafeCell; | ||
| 507 | /// use embassy::util::mpsc; | 469 | /// use embassy::util::mpsc; |
| 508 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; | 470 | /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; |
| 509 | /// | 471 | /// |
| 510 | /// // Declare a bounded channel of 3 u32s. | 472 | /// // Declare a bounded channel of 3 u32s. |
| 511 | /// let mut channel = Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only(); | 473 | /// let mut channel = UnsafeCell::new(Channel::<WithThreadModeOnly, u32, 3>::with_thread_mode_only()); |
| 512 | /// // once we have a channel, obtain its sender and receiver | 474 | /// // once we have a channel, obtain its sender and receiver |
| 513 | /// let (sender, receiver) = mpsc::split(&mut channel); | 475 | /// let (sender, receiver) = mpsc::split(&channel); |
| 514 | /// ``` | 476 | /// ``` |
| 515 | pub const fn with_thread_mode_only() -> Self { | 477 | pub const fn with_thread_mode_only() -> Self { |
| 516 | let mutex = ThreadModeMutex::new(()); | 478 | let mutex = ThreadModeMutex::new(()); |
| @@ -519,7 +481,7 @@ impl<T, const N: usize> Channel<WithThreadModeOnly, T, N> { | |||
| 519 | } | 481 | } |
| 520 | } | 482 | } |
| 521 | 483 | ||
| 522 | impl<M, T, const N: usize> Channel<M, T, N> | 484 | impl<M, T, const N: usize> ChannelLike<T> for Channel<M, T, N> |
| 523 | where | 485 | where |
| 524 | M: Mutex<Data = ()>, | 486 | M: Mutex<Data = ()>, |
| 525 | { | 487 | { |
| @@ -771,16 +733,16 @@ mod tests { | |||
| 771 | 733 | ||
| 772 | #[test] | 734 | #[test] |
| 773 | fn simple_send_and_receive() { | 735 | fn simple_send_and_receive() { |
| 774 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 736 | let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); |
| 775 | let (s, r) = split(&mut c); | 737 | let (s, r) = split(&c); |
| 776 | assert!(s.clone().try_send(1).is_ok()); | 738 | assert!(s.clone().try_send(1).is_ok()); |
| 777 | assert_eq!(r.try_recv().unwrap(), 1); | 739 | assert_eq!(r.try_recv().unwrap(), 1); |
| 778 | } | 740 | } |
| 779 | 741 | ||
| 780 | #[test] | 742 | #[test] |
| 781 | fn should_close_without_sender() { | 743 | fn should_close_without_sender() { |
| 782 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 744 | let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); |
| 783 | let (s, r) = split(&mut c); | 745 | let (s, r) = split(&c); |
| 784 | drop(s); | 746 | drop(s); |
| 785 | match r.try_recv() { | 747 | match r.try_recv() { |
| 786 | Err(TryRecvError::Closed) => assert!(true), | 748 | Err(TryRecvError::Closed) => assert!(true), |
| @@ -790,8 +752,8 @@ mod tests { | |||
| 790 | 752 | ||
| 791 | #[test] | 753 | #[test] |
| 792 | fn should_close_once_drained() { | 754 | fn should_close_once_drained() { |
| 793 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 755 | let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); |
| 794 | let (s, r) = split(&mut c); | 756 | let (s, r) = split(&c); |
| 795 | assert!(s.try_send(1).is_ok()); | 757 | assert!(s.try_send(1).is_ok()); |
| 796 | drop(s); | 758 | drop(s); |
| 797 | assert_eq!(r.try_recv().unwrap(), 1); | 759 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -803,8 +765,8 @@ mod tests { | |||
| 803 | 765 | ||
| 804 | #[test] | 766 | #[test] |
| 805 | fn should_reject_send_when_receiver_dropped() { | 767 | fn should_reject_send_when_receiver_dropped() { |
| 806 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 768 | let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); |
| 807 | let (s, r) = split(&mut c); | 769 | let (s, r) = split(&c); |
| 808 | drop(r); | 770 | drop(r); |
| 809 | match s.try_send(1) { | 771 | match s.try_send(1) { |
| 810 | Err(TrySendError::Closed(1)) => assert!(true), | 772 | Err(TrySendError::Closed(1)) => assert!(true), |
| @@ -814,8 +776,8 @@ mod tests { | |||
| 814 | 776 | ||
| 815 | #[test] | 777 | #[test] |
| 816 | fn should_reject_send_when_channel_closed() { | 778 | fn should_reject_send_when_channel_closed() { |
| 817 | let mut c = Channel::<WithNoThreads, u32, 3>::with_no_threads(); | 779 | let c = UnsafeCell::new(Channel::<WithNoThreads, u32, 3>::with_no_threads()); |
| 818 | let (s, mut r) = split(&mut c); | 780 | let (s, mut r) = split(&c); |
| 819 | assert!(s.try_send(1).is_ok()); | 781 | assert!(s.try_send(1).is_ok()); |
| 820 | r.close(); | 782 | r.close(); |
| 821 | assert_eq!(r.try_recv().unwrap(), 1); | 783 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -830,9 +792,9 @@ mod tests { | |||
| 830 | async fn receiver_closes_when_sender_dropped_async() { | 792 | async fn receiver_closes_when_sender_dropped_async() { |
| 831 | let executor = ThreadPool::new().unwrap(); | 793 | let executor = ThreadPool::new().unwrap(); |
| 832 | 794 | ||
| 833 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = | 795 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 3>> = |
| 834 | Channel::with_critical_sections(); | 796 | UnsafeCell::new(Channel::with_critical_sections()); |
| 835 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 797 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 836 | assert!(executor | 798 | assert!(executor |
| 837 | .spawn(async move { | 799 | .spawn(async move { |
| 838 | drop(s); | 800 | drop(s); |
| @@ -845,12 +807,12 @@ mod tests { | |||
| 845 | async fn receiver_receives_given_try_send_async() { | 807 | async fn receiver_receives_given_try_send_async() { |
| 846 | let executor = ThreadPool::new().unwrap(); | 808 | let executor = ThreadPool::new().unwrap(); |
| 847 | 809 | ||
| 848 | static mut CHANNEL: Channel<WithCriticalSections, u32, 3> = | 810 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 3>> = |
| 849 | Channel::with_critical_sections(); | 811 | UnsafeCell::new(Channel::with_critical_sections()); |
| 850 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 812 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 851 | assert!(executor | 813 | assert!(executor |
| 852 | .spawn(async move { | 814 | .spawn(async move { |
| 853 | let _ = s.try_send(1); | 815 | assert!(s.try_send(1).is_ok()); |
| 854 | }) | 816 | }) |
| 855 | .is_ok()); | 817 | .is_ok()); |
| 856 | assert_eq!(r.recv().await, Some(1)); | 818 | assert_eq!(r.recv().await, Some(1)); |
| @@ -858,18 +820,18 @@ mod tests { | |||
| 858 | 820 | ||
| 859 | #[futures_test::test] | 821 | #[futures_test::test] |
| 860 | async fn sender_send_completes_if_capacity() { | 822 | async fn sender_send_completes_if_capacity() { |
| 861 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 823 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> = |
| 862 | Channel::with_critical_sections(); | 824 | UnsafeCell::new(Channel::with_critical_sections()); |
| 863 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 825 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 864 | assert!(s.send(1).await.is_ok()); | 826 | assert!(s.send(1).await.is_ok()); |
| 865 | assert_eq!(r.recv().await, Some(1)); | 827 | assert_eq!(r.recv().await, Some(1)); |
| 866 | } | 828 | } |
| 867 | 829 | ||
| 868 | #[futures_test::test] | 830 | #[futures_test::test] |
| 869 | async fn sender_send_completes_if_closed() { | 831 | async fn sender_send_completes_if_closed() { |
| 870 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 832 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> = |
| 871 | Channel::with_critical_sections(); | 833 | UnsafeCell::new(Channel::with_critical_sections()); |
| 872 | let (s, r) = split(unsafe { &mut CHANNEL }); | 834 | let (s, r) = split(unsafe { &CHANNEL }); |
| 873 | drop(r); | 835 | drop(r); |
| 874 | match s.send(1).await { | 836 | match s.send(1).await { |
| 875 | Err(SendError(1)) => assert!(true), | 837 | Err(SendError(1)) => assert!(true), |
| @@ -881,9 +843,9 @@ mod tests { | |||
| 881 | async fn senders_sends_wait_until_capacity() { | 843 | async fn senders_sends_wait_until_capacity() { |
| 882 | let executor = ThreadPool::new().unwrap(); | 844 | let executor = ThreadPool::new().unwrap(); |
| 883 | 845 | ||
| 884 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 846 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> = |
| 885 | Channel::with_critical_sections(); | 847 | UnsafeCell::new(Channel::with_critical_sections()); |
| 886 | let (s0, mut r) = split(unsafe { &mut CHANNEL }); | 848 | let (s0, mut r) = split(unsafe { &CHANNEL }); |
| 887 | assert!(s0.try_send(1).is_ok()); | 849 | assert!(s0.try_send(1).is_ok()); |
| 888 | let s1 = s0.clone(); | 850 | let s1 = s0.clone(); |
| 889 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); | 851 | let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); |
| @@ -901,18 +863,18 @@ mod tests { | |||
| 901 | 863 | ||
| 902 | #[futures_test::test] | 864 | #[futures_test::test] |
| 903 | async fn sender_close_completes_if_closing() { | 865 | async fn sender_close_completes_if_closing() { |
| 904 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 866 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> = |
| 905 | Channel::with_critical_sections(); | 867 | UnsafeCell::new(Channel::with_critical_sections()); |
| 906 | let (s, mut r) = split(unsafe { &mut CHANNEL }); | 868 | let (s, mut r) = split(unsafe { &CHANNEL }); |
| 907 | r.close(); | 869 | r.close(); |
| 908 | s.closed().await; | 870 | s.closed().await; |
| 909 | } | 871 | } |
| 910 | 872 | ||
| 911 | #[futures_test::test] | 873 | #[futures_test::test] |
| 912 | async fn sender_close_completes_if_closed() { | 874 | async fn sender_close_completes_if_closed() { |
| 913 | static mut CHANNEL: Channel<WithCriticalSections, u32, 1> = | 875 | static mut CHANNEL: UnsafeCell<Channel<WithCriticalSections, u32, 1>> = |
| 914 | Channel::with_critical_sections(); | 876 | UnsafeCell::new(Channel::with_critical_sections()); |
| 915 | let (s, r) = split(unsafe { &mut CHANNEL }); | 877 | let (s, r) = split(unsafe { &CHANNEL }); |
| 916 | drop(r); | 878 | drop(r); |
| 917 | s.closed().await; | 879 | s.closed().await; |
| 918 | } | 880 | } |
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 6a0f8f471..d692abee2 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -8,6 +8,8 @@ | |||
| 8 | #[path = "../example_common.rs"] | 8 | #[path = "../example_common.rs"] |
| 9 | mod example_common; | 9 | mod example_common; |
| 10 | 10 | ||
| 11 | use core::cell::UnsafeCell; | ||
| 12 | |||
| 11 | use defmt::panic; | 13 | use defmt::panic; |
| 12 | use embassy::executor::Spawner; | 14 | use embassy::executor::Spawner; |
| 13 | use embassy::time::{Duration, Timer}; | 15 | use embassy::time::{Duration, Timer}; |
| @@ -23,10 +25,10 @@ enum LedState { | |||
| 23 | Off, | 25 | Off, |
| 24 | } | 26 | } |
| 25 | 27 | ||
| 26 | static CHANNEL: Forever<Channel<WithThreadModeOnly, LedState, 1>> = Forever::new(); | 28 | static CHANNEL: Forever<UnsafeCell<Channel<WithThreadModeOnly, LedState, 1>>> = Forever::new(); |
| 27 | 29 | ||
| 28 | #[embassy::task(pool_size = 1)] | 30 | #[embassy::task(pool_size = 1)] |
| 29 | async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { | 31 | async fn my_task(sender: Sender<'static, LedState>) { |
| 30 | loop { | 32 | loop { |
| 31 | let _ = sender.send(LedState::On).await; | 33 | let _ = sender.send(LedState::On).await; |
| 32 | Timer::after(Duration::from_secs(1)).await; | 34 | Timer::after(Duration::from_secs(1)).await; |
| @@ -39,7 +41,7 @@ async fn my_task(sender: Sender<'static, WithThreadModeOnly, LedState, 1>) { | |||
| 39 | async fn main(spawner: Spawner, p: Peripherals) { | 41 | async fn main(spawner: Spawner, p: Peripherals) { |
| 40 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); | 42 | let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); |
| 41 | 43 | ||
| 42 | let channel = CHANNEL.put(Channel::with_thread_mode_only()); | 44 | let channel = CHANNEL.put(UnsafeCell::new(Channel::with_thread_mode_only())); |
| 43 | let (sender, mut receiver) = mpsc::split(channel); | 45 | let (sender, mut receiver) = mpsc::split(channel); |
| 44 | 46 | ||
| 45 | spawner.spawn(my_task(sender)).unwrap(); | 47 | spawner.spawn(my_task(sender)).unwrap(); |
