diff options
| author | Dion Dokter <[email protected]> | 2025-11-20 13:22:38 +0100 |
|---|---|---|
| committer | Dion Dokter <[email protected]> | 2025-11-20 13:22:38 +0100 |
| commit | 4f2c36e447455e8d33607d586859d3d075cabf1d (patch) | |
| tree | 003cd822d688acd7c074dd229663b4648d100f71 /embassy-sync/src | |
| parent | 663732d85abbae400f2dbab2c411802a5b60e9b1 (diff) | |
| parent | 661874d11de7d93ed52e08e020a9d4c7ee11122d (diff) | |
Merge branch 'main' into u0-lcd
Diffstat (limited to 'embassy-sync/src')
23 files changed, 2924 insertions, 135 deletions
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs index 8a4a4c642..62bfc26fb 100644 --- a/embassy-sync/src/blocking_mutex/mod.rs +++ b/embassy-sync/src/blocking_mutex/mod.rs | |||
| @@ -22,6 +22,7 @@ use self::raw::RawMutex; | |||
| 22 | /// | 22 | /// |
| 23 | /// In all cases, the blocking mutex is intended to be short lived and not held across await points. | 23 | /// In all cases, the blocking mutex is intended to be short lived and not held across await points. |
| 24 | /// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. | 24 | /// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. |
| 25 | #[derive(Debug)] | ||
| 25 | pub struct Mutex<R, T: ?Sized> { | 26 | pub struct Mutex<R, T: ?Sized> { |
| 26 | // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets | 27 | // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets |
| 27 | // to run BEFORE dropping `data`. | 28 | // to run BEFORE dropping `data`. |
| @@ -50,6 +51,23 @@ impl<R: RawMutex, T> Mutex<R, T> { | |||
| 50 | f(inner) | 51 | f(inner) |
| 51 | }) | 52 | }) |
| 52 | } | 53 | } |
| 54 | |||
| 55 | /// Creates a critical section and grants temporary mutable access to the protected data. | ||
| 56 | /// | ||
| 57 | /// # Safety | ||
| 58 | /// | ||
| 59 | /// This method is marked unsafe because calling this method re-entrantly, i.e. within | ||
| 60 | /// another `lock_mut` or `lock` closure, violates Rust's aliasing rules. Calling this | ||
| 61 | /// method at the same time from different tasks is safe. For a safe alternative with | ||
| 62 | /// mutable access that never causes UB, use a `RefCell` in a `Mutex`. | ||
| 63 | pub unsafe fn lock_mut<U>(&self, f: impl FnOnce(&mut T) -> U) -> U { | ||
| 64 | self.raw.lock(|| { | ||
| 65 | let ptr = self.data.get() as *mut T; | ||
| 66 | // Safety: we have exclusive access to the data, as long as this mutex is not locked re-entrantly | ||
| 67 | let inner = unsafe { &mut *ptr }; | ||
| 68 | f(inner) | ||
| 69 | }) | ||
| 70 | } | ||
| 53 | } | 71 | } |
| 54 | 72 | ||
| 55 | impl<R, T> Mutex<R, T> { | 73 | impl<R, T> Mutex<R, T> { |
| @@ -104,6 +122,7 @@ impl<T> Mutex<raw::CriticalSectionRawMutex, T> { | |||
| 104 | 122 | ||
| 105 | impl<T> Mutex<raw::NoopRawMutex, T> { | 123 | impl<T> Mutex<raw::NoopRawMutex, T> { |
| 106 | /// Borrows the data | 124 | /// Borrows the data |
| 125 | #[allow(clippy::should_implement_trait)] | ||
| 107 | pub fn borrow(&self) -> &T { | 126 | pub fn borrow(&self) -> &T { |
| 108 | let ptr = self.data.get() as *const T; | 127 | let ptr = self.data.get() as *const T; |
| 109 | unsafe { &*ptr } | 128 | unsafe { &*ptr } |
| @@ -116,9 +135,9 @@ impl<T> Mutex<raw::NoopRawMutex, T> { | |||
| 116 | // There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), | 135 | // There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), |
| 117 | // but that will require T: Send even though it shouldn't be needed. | 136 | // but that will require T: Send even though it shouldn't be needed. |
| 118 | 137 | ||
| 119 | #[cfg(any(cortex_m, feature = "std"))] | 138 | #[cfg(any(cortex_m, doc, feature = "std"))] |
| 120 | pub use thread_mode_mutex::*; | 139 | pub use thread_mode_mutex::*; |
| 121 | #[cfg(any(cortex_m, feature = "std"))] | 140 | #[cfg(any(cortex_m, doc, feature = "std"))] |
| 122 | mod thread_mode_mutex { | 141 | mod thread_mode_mutex { |
| 123 | use super::*; | 142 | use super::*; |
| 124 | 143 | ||
diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs index a8afcad34..fbb9ece15 100644 --- a/embassy-sync/src/blocking_mutex/raw.rs +++ b/embassy-sync/src/blocking_mutex/raw.rs | |||
| @@ -37,6 +37,7 @@ pub unsafe trait RawMutex { | |||
| 37 | /// # Safety | 37 | /// # Safety |
| 38 | /// | 38 | /// |
| 39 | /// This mutex is safe to share between different executors and interrupts. | 39 | /// This mutex is safe to share between different executors and interrupts. |
| 40 | #[derive(Debug)] | ||
| 40 | pub struct CriticalSectionRawMutex { | 41 | pub struct CriticalSectionRawMutex { |
| 41 | _phantom: PhantomData<()>, | 42 | _phantom: PhantomData<()>, |
| 42 | } | 43 | } |
| @@ -65,6 +66,7 @@ unsafe impl RawMutex for CriticalSectionRawMutex { | |||
| 65 | /// # Safety | 66 | /// # Safety |
| 66 | /// | 67 | /// |
| 67 | /// **This Mutex is only safe within a single executor.** | 68 | /// **This Mutex is only safe within a single executor.** |
| 69 | #[derive(Debug)] | ||
| 68 | pub struct NoopRawMutex { | 70 | pub struct NoopRawMutex { |
| 69 | _phantom: PhantomData<*mut ()>, | 71 | _phantom: PhantomData<*mut ()>, |
| 70 | } | 72 | } |
| @@ -87,7 +89,7 @@ unsafe impl RawMutex for NoopRawMutex { | |||
| 87 | 89 | ||
| 88 | // ================ | 90 | // ================ |
| 89 | 91 | ||
| 90 | #[cfg(any(cortex_m, feature = "std"))] | 92 | #[cfg(any(cortex_m, doc, feature = "std"))] |
| 91 | mod thread_mode { | 93 | mod thread_mode { |
| 92 | use super::*; | 94 | use super::*; |
| 93 | 95 | ||
| @@ -145,5 +147,5 @@ mod thread_mode { | |||
| 145 | return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; | 147 | return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; |
| 146 | } | 148 | } |
| 147 | } | 149 | } |
| 148 | #[cfg(any(cortex_m, feature = "std"))] | 150 | #[cfg(any(cortex_m, doc, feature = "std"))] |
| 149 | pub use thread_mode::*; | 151 | pub use thread_mode::*; |
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 55ac5fb66..dbd24a6c7 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -17,6 +17,31 @@ | |||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | 17 | //! messages that it can store, and if this limit is reached, trying to send |
| 18 | //! another message will result in an error being returned. | 18 | //! another message will result in an error being returned. |
| 19 | //! | 19 | //! |
| 20 | //! # Example: Message passing between task and interrupt handler | ||
| 21 | //! | ||
| 22 | //! ```rust | ||
| 23 | //! use embassy_sync::channel::Channel; | ||
| 24 | //! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 25 | //! | ||
| 26 | //! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new(); | ||
| 27 | //! | ||
| 28 | //! fn my_interrupt_handler() { | ||
| 29 | //! // Do some work.. | ||
| 30 | //! // ... | ||
| 31 | //! if let Err(e) = SHARED_CHANNEL.sender().try_send(42) { | ||
| 32 | //! // Channel is full.. | ||
| 33 | //! } | ||
| 34 | //! } | ||
| 35 | //! | ||
| 36 | //! async fn my_async_task() { | ||
| 37 | //! // ... | ||
| 38 | //! let receiver = SHARED_CHANNEL.receiver(); | ||
| 39 | //! loop { | ||
| 40 | //! let data_from_interrupt = receiver.receive().await; | ||
| 41 | //! // Do something with the data. | ||
| 42 | //! } | ||
| 43 | //! } | ||
| 44 | //! ``` | ||
| 20 | 45 | ||
| 21 | use core::cell::RefCell; | 46 | use core::cell::RefCell; |
| 22 | use core::future::Future; | 47 | use core::future::Future; |
| @@ -25,11 +50,12 @@ use core::task::{Context, Poll}; | |||
| 25 | 50 | ||
| 26 | use heapless::Deque; | 51 | use heapless::Deque; |
| 27 | 52 | ||
| 28 | use crate::blocking_mutex::raw::RawMutex; | ||
| 29 | use crate::blocking_mutex::Mutex; | 53 | use crate::blocking_mutex::Mutex; |
| 54 | use crate::blocking_mutex::raw::RawMutex; | ||
| 30 | use crate::waitqueue::WakerRegistration; | 55 | use crate::waitqueue::WakerRegistration; |
| 31 | 56 | ||
| 32 | /// Send-only access to a [`Channel`]. | 57 | /// Send-only access to a [`Channel`]. |
| 58 | #[derive(Debug)] | ||
| 33 | pub struct Sender<'ch, M, T, const N: usize> | 59 | pub struct Sender<'ch, M, T, const N: usize> |
| 34 | where | 60 | where |
| 35 | M: RawMutex, | 61 | M: RawMutex, |
| @@ -72,6 +98,48 @@ where | |||
| 72 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 98 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 73 | self.channel.poll_ready_to_send(cx) | 99 | self.channel.poll_ready_to_send(cx) |
| 74 | } | 100 | } |
| 101 | |||
| 102 | /// Returns the maximum number of elements the channel can hold. | ||
| 103 | /// | ||
| 104 | /// See [`Channel::capacity()`] | ||
| 105 | pub const fn capacity(&self) -> usize { | ||
| 106 | self.channel.capacity() | ||
| 107 | } | ||
| 108 | |||
| 109 | /// Returns the free capacity of the channel. | ||
| 110 | /// | ||
| 111 | /// See [`Channel::free_capacity()`] | ||
| 112 | pub fn free_capacity(&self) -> usize { | ||
| 113 | self.channel.free_capacity() | ||
| 114 | } | ||
| 115 | |||
| 116 | /// Clears all elements in the channel. | ||
| 117 | /// | ||
| 118 | /// See [`Channel::clear()`] | ||
| 119 | pub fn clear(&self) { | ||
| 120 | self.channel.clear(); | ||
| 121 | } | ||
| 122 | |||
| 123 | /// Returns the number of elements currently in the channel. | ||
| 124 | /// | ||
| 125 | /// See [`Channel::len()`] | ||
| 126 | pub fn len(&self) -> usize { | ||
| 127 | self.channel.len() | ||
| 128 | } | ||
| 129 | |||
| 130 | /// Returns whether the channel is empty. | ||
| 131 | /// | ||
| 132 | /// See [`Channel::is_empty()`] | ||
| 133 | pub fn is_empty(&self) -> bool { | ||
| 134 | self.channel.is_empty() | ||
| 135 | } | ||
| 136 | |||
| 137 | /// Returns whether the channel is full. | ||
| 138 | /// | ||
| 139 | /// See [`Channel::is_full()`] | ||
| 140 | pub fn is_full(&self) -> bool { | ||
| 141 | self.channel.is_full() | ||
| 142 | } | ||
| 75 | } | 143 | } |
| 76 | 144 | ||
| 77 | /// Send-only access to a [`Channel`] without knowing channel size. | 145 | /// Send-only access to a [`Channel`] without knowing channel size. |
| @@ -122,7 +190,59 @@ impl<'ch, T> DynamicSender<'ch, T> { | |||
| 122 | } | 190 | } |
| 123 | } | 191 | } |
| 124 | 192 | ||
| 193 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 194 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 195 | pub struct SendDynamicSender<'ch, T> { | ||
| 196 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 197 | } | ||
| 198 | |||
| 199 | impl<'ch, T> Clone for SendDynamicSender<'ch, T> { | ||
| 200 | fn clone(&self) -> Self { | ||
| 201 | *self | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | impl<'ch, T> Copy for SendDynamicSender<'ch, T> {} | ||
| 206 | unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {} | ||
| 207 | unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {} | ||
| 208 | |||
| 209 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T> | ||
| 210 | where | ||
| 211 | M: RawMutex + Sync + Send, | ||
| 212 | { | ||
| 213 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 214 | Self { channel: s.channel } | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | impl<'ch, T> SendDynamicSender<'ch, T> { | ||
| 219 | /// Sends a value. | ||
| 220 | /// | ||
| 221 | /// See [`Channel::send()`] | ||
| 222 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 223 | DynamicSendFuture { | ||
| 224 | channel: self.channel, | ||
| 225 | message: Some(message), | ||
| 226 | } | ||
| 227 | } | ||
| 228 | |||
| 229 | /// Attempt to immediately send a message. | ||
| 230 | /// | ||
| 231 | /// See [`Channel::send()`] | ||
| 232 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 233 | self.channel.try_send_with_context(message, None) | ||
| 234 | } | ||
| 235 | |||
| 236 | /// Allows a poll_fn to poll until the channel is ready to send | ||
| 237 | /// | ||
| 238 | /// See [`Channel::poll_ready_to_send()`] | ||
| 239 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 240 | self.channel.poll_ready_to_send(cx) | ||
| 241 | } | ||
| 242 | } | ||
| 243 | |||
| 125 | /// Receive-only access to a [`Channel`]. | 244 | /// Receive-only access to a [`Channel`]. |
| 245 | #[derive(Debug)] | ||
| 126 | pub struct Receiver<'ch, M, T, const N: usize> | 246 | pub struct Receiver<'ch, M, T, const N: usize> |
| 127 | where | 247 | where |
| 128 | M: RawMutex, | 248 | M: RawMutex, |
| @@ -166,6 +286,16 @@ where | |||
| 166 | self.channel.try_receive() | 286 | self.channel.try_receive() |
| 167 | } | 287 | } |
| 168 | 288 | ||
| 289 | /// Peek at the next value without removing it from the queue. | ||
| 290 | /// | ||
| 291 | /// See [`Channel::try_peek()`] | ||
| 292 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 293 | where | ||
| 294 | T: Clone, | ||
| 295 | { | ||
| 296 | self.channel.try_peek() | ||
| 297 | } | ||
| 298 | |||
| 169 | /// Allows a poll_fn to poll until the channel is ready to receive | 299 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 170 | /// | 300 | /// |
| 171 | /// See [`Channel::poll_ready_to_receive()`] | 301 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -179,6 +309,48 @@ where | |||
| 179 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 309 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 180 | self.channel.poll_receive(cx) | 310 | self.channel.poll_receive(cx) |
| 181 | } | 311 | } |
| 312 | |||
| 313 | /// Returns the maximum number of elements the channel can hold. | ||
| 314 | /// | ||
| 315 | /// See [`Channel::capacity()`] | ||
| 316 | pub const fn capacity(&self) -> usize { | ||
| 317 | self.channel.capacity() | ||
| 318 | } | ||
| 319 | |||
| 320 | /// Returns the free capacity of the channel. | ||
| 321 | /// | ||
| 322 | /// See [`Channel::free_capacity()`] | ||
| 323 | pub fn free_capacity(&self) -> usize { | ||
| 324 | self.channel.free_capacity() | ||
| 325 | } | ||
| 326 | |||
| 327 | /// Clears all elements in the channel. | ||
| 328 | /// | ||
| 329 | /// See [`Channel::clear()`] | ||
| 330 | pub fn clear(&self) { | ||
| 331 | self.channel.clear(); | ||
| 332 | } | ||
| 333 | |||
| 334 | /// Returns the number of elements currently in the channel. | ||
| 335 | /// | ||
| 336 | /// See [`Channel::len()`] | ||
| 337 | pub fn len(&self) -> usize { | ||
| 338 | self.channel.len() | ||
| 339 | } | ||
| 340 | |||
| 341 | /// Returns whether the channel is empty. | ||
| 342 | /// | ||
| 343 | /// See [`Channel::is_empty()`] | ||
| 344 | pub fn is_empty(&self) -> bool { | ||
| 345 | self.channel.is_empty() | ||
| 346 | } | ||
| 347 | |||
| 348 | /// Returns whether the channel is full. | ||
| 349 | /// | ||
| 350 | /// See [`Channel::is_full()`] | ||
| 351 | pub fn is_full(&self) -> bool { | ||
| 352 | self.channel.is_full() | ||
| 353 | } | ||
| 182 | } | 354 | } |
| 183 | 355 | ||
| 184 | /// Receive-only access to a [`Channel`] without knowing channel size. | 356 | /// Receive-only access to a [`Channel`] without knowing channel size. |
| @@ -209,6 +381,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { | |||
| 209 | self.channel.try_receive_with_context(None) | 381 | self.channel.try_receive_with_context(None) |
| 210 | } | 382 | } |
| 211 | 383 | ||
| 384 | /// Peek at the next value without removing it from the queue. | ||
| 385 | /// | ||
| 386 | /// See [`Channel::try_peek()`] | ||
| 387 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 388 | where | ||
| 389 | T: Clone, | ||
| 390 | { | ||
| 391 | self.channel.try_peek_with_context(None) | ||
| 392 | } | ||
| 393 | |||
| 212 | /// Allows a poll_fn to poll until the channel is ready to receive | 394 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 213 | /// | 395 | /// |
| 214 | /// See [`Channel::poll_ready_to_receive()`] | 396 | /// See [`Channel::poll_ready_to_receive()`] |
| @@ -233,8 +415,80 @@ where | |||
| 233 | } | 415 | } |
| 234 | } | 416 | } |
| 235 | 417 | ||
| 418 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 419 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 420 | pub struct SendDynamicReceiver<'ch, T> { | ||
| 421 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 422 | } | ||
| 423 | |||
| 424 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 425 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 426 | #[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")] | ||
| 427 | pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>; | ||
| 428 | |||
| 429 | impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> { | ||
| 430 | fn clone(&self) -> Self { | ||
| 431 | *self | ||
| 432 | } | ||
| 433 | } | ||
| 434 | |||
| 435 | impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {} | ||
| 436 | unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {} | ||
| 437 | unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {} | ||
| 438 | |||
| 439 | impl<'ch, T> SendDynamicReceiver<'ch, T> { | ||
| 440 | /// Receive the next value. | ||
| 441 | /// | ||
| 442 | /// See [`Channel::receive()`]. | ||
| 443 | pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { | ||
| 444 | DynamicReceiveFuture { channel: self.channel } | ||
| 445 | } | ||
| 446 | |||
| 447 | /// Attempt to immediately receive the next value. | ||
| 448 | /// | ||
| 449 | /// See [`Channel::try_receive()`] | ||
| 450 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { | ||
| 451 | self.channel.try_receive_with_context(None) | ||
| 452 | } | ||
| 453 | |||
| 454 | /// Allows a poll_fn to poll until the channel is ready to receive | ||
| 455 | /// | ||
| 456 | /// See [`Channel::poll_ready_to_receive()`] | ||
| 457 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 458 | self.channel.poll_ready_to_receive(cx) | ||
| 459 | } | ||
| 460 | |||
| 461 | /// Poll the channel for the next item | ||
| 462 | /// | ||
| 463 | /// See [`Channel::poll_receive()`] | ||
| 464 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 465 | self.channel.poll_receive(cx) | ||
| 466 | } | ||
| 467 | } | ||
| 468 | |||
| 469 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T> | ||
| 470 | where | ||
| 471 | M: RawMutex + Sync + Send, | ||
| 472 | { | ||
| 473 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 474 | Self { channel: s.channel } | ||
| 475 | } | ||
| 476 | } | ||
| 477 | |||
| 478 | impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N> | ||
| 479 | where | ||
| 480 | M: RawMutex, | ||
| 481 | { | ||
| 482 | type Item = T; | ||
| 483 | |||
| 484 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 485 | self.channel.poll_receive(cx).map(Some) | ||
| 486 | } | ||
| 487 | } | ||
| 488 | |||
| 236 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. | 489 | /// Future returned by [`Channel::receive`] and [`Receiver::receive`]. |
| 237 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 490 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 491 | #[derive(Debug)] | ||
| 238 | pub struct ReceiveFuture<'ch, M, T, const N: usize> | 492 | pub struct ReceiveFuture<'ch, M, T, const N: usize> |
| 239 | where | 493 | where |
| 240 | M: RawMutex, | 494 | M: RawMutex, |
| @@ -255,6 +509,7 @@ where | |||
| 255 | 509 | ||
| 256 | /// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. | 510 | /// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`]. |
| 257 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 511 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 512 | #[derive(Debug)] | ||
| 258 | pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> | 513 | pub struct ReceiveReadyFuture<'ch, M, T, const N: usize> |
| 259 | where | 514 | where |
| 260 | M: RawMutex, | 515 | M: RawMutex, |
| @@ -298,6 +553,7 @@ impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for | |||
| 298 | 553 | ||
| 299 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | 554 | /// Future returned by [`Channel::send`] and [`Sender::send`]. |
| 300 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 555 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 556 | #[derive(Debug)] | ||
| 301 | pub struct SendFuture<'ch, M, T, const N: usize> | 557 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 302 | where | 558 | where |
| 303 | M: RawMutex, | 559 | M: RawMutex, |
| @@ -368,6 +624,10 @@ pub(crate) trait DynamicChannel<T> { | |||
| 368 | 624 | ||
| 369 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; | 625 | fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>; |
| 370 | 626 | ||
| 627 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 628 | where | ||
| 629 | T: Clone; | ||
| 630 | |||
| 371 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; | 631 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 372 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; | 632 | fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; |
| 373 | 633 | ||
| @@ -391,6 +651,7 @@ pub enum TrySendError<T> { | |||
| 391 | Full(T), | 651 | Full(T), |
| 392 | } | 652 | } |
| 393 | 653 | ||
| 654 | #[derive(Debug)] | ||
| 394 | struct ChannelState<T, const N: usize> { | 655 | struct ChannelState<T, const N: usize> { |
| 395 | queue: Deque<T, N>, | 656 | queue: Deque<T, N>, |
| 396 | receiver_waker: WakerRegistration, | 657 | receiver_waker: WakerRegistration, |
| @@ -410,6 +671,31 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 410 | self.try_receive_with_context(None) | 671 | self.try_receive_with_context(None) |
| 411 | } | 672 | } |
| 412 | 673 | ||
| 674 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 675 | where | ||
| 676 | T: Clone, | ||
| 677 | { | ||
| 678 | self.try_peek_with_context(None) | ||
| 679 | } | ||
| 680 | |||
| 681 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 682 | where | ||
| 683 | T: Clone, | ||
| 684 | { | ||
| 685 | if self.queue.is_full() { | ||
| 686 | self.senders_waker.wake(); | ||
| 687 | } | ||
| 688 | |||
| 689 | if let Some(message) = self.queue.front() { | ||
| 690 | Ok(message.clone()) | ||
| 691 | } else { | ||
| 692 | if let Some(cx) = cx { | ||
| 693 | self.receiver_waker.register(cx.waker()); | ||
| 694 | } | ||
| 695 | Err(TryReceiveError::Empty) | ||
| 696 | } | ||
| 697 | } | ||
| 698 | |||
| 413 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 699 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 414 | if self.queue.is_full() { | 700 | if self.queue.is_full() { |
| 415 | self.senders_waker.wake(); | 701 | self.senders_waker.wake(); |
| @@ -478,6 +764,9 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 478 | } | 764 | } |
| 479 | 765 | ||
| 480 | fn clear(&mut self) { | 766 | fn clear(&mut self) { |
| 767 | if self.queue.is_full() { | ||
| 768 | self.senders_waker.wake(); | ||
| 769 | } | ||
| 481 | self.queue.clear(); | 770 | self.queue.clear(); |
| 482 | } | 771 | } |
| 483 | 772 | ||
| @@ -502,6 +791,7 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 502 | /// received from the channel. | 791 | /// received from the channel. |
| 503 | /// | 792 | /// |
| 504 | /// All data sent will become available in the same order as it was sent. | 793 | /// All data sent will become available in the same order as it was sent. |
| 794 | #[derive(Debug)] | ||
| 505 | pub struct Channel<M, T, const N: usize> | 795 | pub struct Channel<M, T, const N: usize> |
| 506 | where | 796 | where |
| 507 | M: RawMutex, | 797 | M: RawMutex, |
| @@ -536,6 +826,13 @@ where | |||
| 536 | self.lock(|c| c.try_receive_with_context(cx)) | 826 | self.lock(|c| c.try_receive_with_context(cx)) |
| 537 | } | 827 | } |
| 538 | 828 | ||
| 829 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 830 | where | ||
| 831 | T: Clone, | ||
| 832 | { | ||
| 833 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 834 | } | ||
| 835 | |||
| 539 | /// Poll the channel for the next message | 836 | /// Poll the channel for the next message |
| 540 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 837 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 541 | self.lock(|c| c.poll_receive(cx)) | 838 | self.lock(|c| c.poll_receive(cx)) |
| @@ -624,6 +921,17 @@ where | |||
| 624 | self.lock(|c| c.try_receive()) | 921 | self.lock(|c| c.try_receive()) |
| 625 | } | 922 | } |
| 626 | 923 | ||
| 924 | /// Peek at the next value without removing it from the queue. | ||
| 925 | /// | ||
| 926 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 927 | /// an error if the channel is empty. | ||
| 928 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 929 | where | ||
| 930 | T: Clone, | ||
| 931 | { | ||
| 932 | self.lock(|c| c.try_peek()) | ||
| 933 | } | ||
| 934 | |||
| 627 | /// Returns the maximum number of elements the channel can hold. | 935 | /// Returns the maximum number of elements the channel can hold. |
| 628 | pub const fn capacity(&self) -> usize { | 936 | pub const fn capacity(&self) -> usize { |
| 629 | N | 937 | N |
| @@ -671,6 +979,13 @@ where | |||
| 671 | Channel::try_receive_with_context(self, cx) | 979 | Channel::try_receive_with_context(self, cx) |
| 672 | } | 980 | } |
| 673 | 981 | ||
| 982 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 983 | where | ||
| 984 | T: Clone, | ||
| 985 | { | ||
| 986 | Channel::try_peek_with_context(self, cx) | ||
| 987 | } | ||
| 988 | |||
| 674 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 989 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 675 | Channel::poll_ready_to_send(self, cx) | 990 | Channel::poll_ready_to_send(self, cx) |
| 676 | } | 991 | } |
| @@ -684,6 +999,17 @@ where | |||
| 684 | } | 999 | } |
| 685 | } | 1000 | } |
| 686 | 1001 | ||
| 1002 | impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N> | ||
| 1003 | where | ||
| 1004 | M: RawMutex, | ||
| 1005 | { | ||
| 1006 | type Item = T; | ||
| 1007 | |||
| 1008 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 1009 | self.poll_receive(cx).map(Some) | ||
| 1010 | } | ||
| 1011 | } | ||
| 1012 | |||
| 687 | #[cfg(test)] | 1013 | #[cfg(test)] |
| 688 | mod tests { | 1014 | mod tests { |
| 689 | use core::time::Duration; | 1015 | use core::time::Duration; |
| @@ -742,6 +1068,8 @@ mod tests { | |||
| 742 | fn simple_send_and_receive() { | 1068 | fn simple_send_and_receive() { |
| 743 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | 1069 | let c = Channel::<NoopRawMutex, u32, 3>::new(); |
| 744 | assert!(c.try_send(1).is_ok()); | 1070 | assert!(c.try_send(1).is_ok()); |
| 1071 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 1072 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 745 | assert_eq!(c.try_receive().unwrap(), 1); | 1073 | assert_eq!(c.try_receive().unwrap(), 1); |
| 746 | } | 1074 | } |
| 747 | 1075 | ||
| @@ -772,6 +1100,8 @@ mod tests { | |||
| 772 | let r = c.dyn_receiver(); | 1100 | let r = c.dyn_receiver(); |
| 773 | 1101 | ||
| 774 | assert!(s.try_send(1).is_ok()); | 1102 | assert!(s.try_send(1).is_ok()); |
| 1103 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 1104 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 775 | assert_eq!(r.try_receive().unwrap(), 1); | 1105 | assert_eq!(r.try_receive().unwrap(), 1); |
| 776 | } | 1106 | } |
| 777 | 1107 | ||
| @@ -782,11 +1112,13 @@ mod tests { | |||
| 782 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); | 1112 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); |
| 783 | let c = &*CHANNEL.init(Channel::new()); | 1113 | let c = &*CHANNEL.init(Channel::new()); |
| 784 | let c2 = c; | 1114 | let c2 = c; |
| 785 | assert!(executor | 1115 | assert!( |
| 786 | .spawn(async move { | 1116 | executor |
| 787 | assert!(c2.try_send(1).is_ok()); | 1117 | .spawn(async move { |
| 788 | }) | 1118 | assert!(c2.try_send(1).is_ok()); |
| 789 | .is_ok()); | 1119 | }) |
| 1120 | .is_ok() | ||
| 1121 | ); | ||
| 790 | assert_eq!(c.receive().await, 1); | 1122 | assert_eq!(c.receive().await, 1); |
| 791 | } | 1123 | } |
| 792 | 1124 | ||
| @@ -813,13 +1145,15 @@ mod tests { | |||
| 813 | // However, I've used the debugger to observe that the send does indeed wait. | 1145 | // However, I've used the debugger to observe that the send does indeed wait. |
| 814 | Delay::new(Duration::from_millis(500)).await; | 1146 | Delay::new(Duration::from_millis(500)).await; |
| 815 | assert_eq!(c.receive().await, 1); | 1147 | assert_eq!(c.receive().await, 1); |
| 816 | assert!(executor | 1148 | assert!( |
| 817 | .spawn(async move { | 1149 | executor |
| 818 | loop { | 1150 | .spawn(async move { |
| 819 | c.receive().await; | 1151 | loop { |
| 820 | } | 1152 | c.receive().await; |
| 821 | }) | 1153 | } |
| 822 | .is_ok()); | 1154 | }) |
| 1155 | .is_ok() | ||
| 1156 | ); | ||
| 823 | send_task_1.unwrap().await; | 1157 | send_task_1.unwrap().await; |
| 824 | send_task_2.unwrap().await; | 1158 | send_task_2.unwrap().await; |
| 825 | } | 1159 | } |
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 2ac42c557..8ca61bc39 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs | |||
| @@ -6,6 +6,7 @@ use core::fmt::{Debug, Display, LowerHex}; | |||
| 6 | #[cfg(all(feature = "defmt", feature = "log"))] | 6 | #[cfg(all(feature = "defmt", feature = "log"))] |
| 7 | compile_error!("You may not enable both `defmt` and `log` features."); | 7 | compile_error!("You may not enable both `defmt` and `log` features."); |
| 8 | 8 | ||
| 9 | #[collapse_debuginfo(yes)] | ||
| 9 | macro_rules! assert { | 10 | macro_rules! assert { |
| 10 | ($($x:tt)*) => { | 11 | ($($x:tt)*) => { |
| 11 | { | 12 | { |
| @@ -17,6 +18,7 @@ macro_rules! assert { | |||
| 17 | }; | 18 | }; |
| 18 | } | 19 | } |
| 19 | 20 | ||
| 21 | #[collapse_debuginfo(yes)] | ||
| 20 | macro_rules! assert_eq { | 22 | macro_rules! assert_eq { |
| 21 | ($($x:tt)*) => { | 23 | ($($x:tt)*) => { |
| 22 | { | 24 | { |
| @@ -28,6 +30,7 @@ macro_rules! assert_eq { | |||
| 28 | }; | 30 | }; |
| 29 | } | 31 | } |
| 30 | 32 | ||
| 33 | #[collapse_debuginfo(yes)] | ||
| 31 | macro_rules! assert_ne { | 34 | macro_rules! assert_ne { |
| 32 | ($($x:tt)*) => { | 35 | ($($x:tt)*) => { |
| 33 | { | 36 | { |
| @@ -39,6 +42,7 @@ macro_rules! assert_ne { | |||
| 39 | }; | 42 | }; |
| 40 | } | 43 | } |
| 41 | 44 | ||
| 45 | #[collapse_debuginfo(yes)] | ||
| 42 | macro_rules! debug_assert { | 46 | macro_rules! debug_assert { |
| 43 | ($($x:tt)*) => { | 47 | ($($x:tt)*) => { |
| 44 | { | 48 | { |
| @@ -50,6 +54,7 @@ macro_rules! debug_assert { | |||
| 50 | }; | 54 | }; |
| 51 | } | 55 | } |
| 52 | 56 | ||
| 57 | #[collapse_debuginfo(yes)] | ||
| 53 | macro_rules! debug_assert_eq { | 58 | macro_rules! debug_assert_eq { |
| 54 | ($($x:tt)*) => { | 59 | ($($x:tt)*) => { |
| 55 | { | 60 | { |
| @@ -61,6 +66,7 @@ macro_rules! debug_assert_eq { | |||
| 61 | }; | 66 | }; |
| 62 | } | 67 | } |
| 63 | 68 | ||
| 69 | #[collapse_debuginfo(yes)] | ||
| 64 | macro_rules! debug_assert_ne { | 70 | macro_rules! debug_assert_ne { |
| 65 | ($($x:tt)*) => { | 71 | ($($x:tt)*) => { |
| 66 | { | 72 | { |
| @@ -72,6 +78,7 @@ macro_rules! debug_assert_ne { | |||
| 72 | }; | 78 | }; |
| 73 | } | 79 | } |
| 74 | 80 | ||
| 81 | #[collapse_debuginfo(yes)] | ||
| 75 | macro_rules! todo { | 82 | macro_rules! todo { |
| 76 | ($($x:tt)*) => { | 83 | ($($x:tt)*) => { |
| 77 | { | 84 | { |
| @@ -83,20 +90,19 @@ macro_rules! todo { | |||
| 83 | }; | 90 | }; |
| 84 | } | 91 | } |
| 85 | 92 | ||
| 86 | #[cfg(not(feature = "defmt"))] | 93 | #[collapse_debuginfo(yes)] |
| 87 | macro_rules! unreachable { | 94 | macro_rules! unreachable { |
| 88 | ($($x:tt)*) => { | 95 | ($($x:tt)*) => { |
| 89 | ::core::unreachable!($($x)*) | 96 | { |
| 90 | }; | 97 | #[cfg(not(feature = "defmt"))] |
| 91 | } | 98 | ::core::unreachable!($($x)*); |
| 92 | 99 | #[cfg(feature = "defmt")] | |
| 93 | #[cfg(feature = "defmt")] | 100 | ::defmt::unreachable!($($x)*); |
| 94 | macro_rules! unreachable { | 101 | } |
| 95 | ($($x:tt)*) => { | ||
| 96 | ::defmt::unreachable!($($x)*) | ||
| 97 | }; | 102 | }; |
| 98 | } | 103 | } |
| 99 | 104 | ||
| 105 | #[collapse_debuginfo(yes)] | ||
| 100 | macro_rules! panic { | 106 | macro_rules! panic { |
| 101 | ($($x:tt)*) => { | 107 | ($($x:tt)*) => { |
| 102 | { | 108 | { |
| @@ -108,6 +114,7 @@ macro_rules! panic { | |||
| 108 | }; | 114 | }; |
| 109 | } | 115 | } |
| 110 | 116 | ||
| 117 | #[collapse_debuginfo(yes)] | ||
| 111 | macro_rules! trace { | 118 | macro_rules! trace { |
| 112 | ($s:literal $(, $x:expr)* $(,)?) => { | 119 | ($s:literal $(, $x:expr)* $(,)?) => { |
| 113 | { | 120 | { |
| @@ -121,6 +128,7 @@ macro_rules! trace { | |||
| 121 | }; | 128 | }; |
| 122 | } | 129 | } |
| 123 | 130 | ||
| 131 | #[collapse_debuginfo(yes)] | ||
| 124 | macro_rules! debug { | 132 | macro_rules! debug { |
| 125 | ($s:literal $(, $x:expr)* $(,)?) => { | 133 | ($s:literal $(, $x:expr)* $(,)?) => { |
| 126 | { | 134 | { |
| @@ -134,6 +142,7 @@ macro_rules! debug { | |||
| 134 | }; | 142 | }; |
| 135 | } | 143 | } |
| 136 | 144 | ||
| 145 | #[collapse_debuginfo(yes)] | ||
| 137 | macro_rules! info { | 146 | macro_rules! info { |
| 138 | ($s:literal $(, $x:expr)* $(,)?) => { | 147 | ($s:literal $(, $x:expr)* $(,)?) => { |
| 139 | { | 148 | { |
| @@ -147,6 +156,7 @@ macro_rules! info { | |||
| 147 | }; | 156 | }; |
| 148 | } | 157 | } |
| 149 | 158 | ||
| 159 | #[collapse_debuginfo(yes)] | ||
| 150 | macro_rules! warn { | 160 | macro_rules! warn { |
| 151 | ($s:literal $(, $x:expr)* $(,)?) => { | 161 | ($s:literal $(, $x:expr)* $(,)?) => { |
| 152 | { | 162 | { |
| @@ -160,6 +170,7 @@ macro_rules! warn { | |||
| 160 | }; | 170 | }; |
| 161 | } | 171 | } |
| 162 | 172 | ||
| 173 | #[collapse_debuginfo(yes)] | ||
| 163 | macro_rules! error { | 174 | macro_rules! error { |
| 164 | ($s:literal $(, $x:expr)* $(,)?) => { | 175 | ($s:literal $(, $x:expr)* $(,)?) => { |
| 165 | { | 176 | { |
| @@ -174,6 +185,7 @@ macro_rules! error { | |||
| 174 | } | 185 | } |
| 175 | 186 | ||
| 176 | #[cfg(feature = "defmt")] | 187 | #[cfg(feature = "defmt")] |
| 188 | #[collapse_debuginfo(yes)] | ||
| 177 | macro_rules! unwrap { | 189 | macro_rules! unwrap { |
| 178 | ($($x:tt)*) => { | 190 | ($($x:tt)*) => { |
| 179 | ::defmt::unwrap!($($x)*) | 191 | ::defmt::unwrap!($($x)*) |
| @@ -181,6 +193,7 @@ macro_rules! unwrap { | |||
| 181 | } | 193 | } |
| 182 | 194 | ||
| 183 | #[cfg(not(feature = "defmt"))] | 195 | #[cfg(not(feature = "defmt"))] |
| 196 | #[collapse_debuginfo(yes)] | ||
| 184 | macro_rules! unwrap { | 197 | macro_rules! unwrap { |
| 185 | ($arg:expr) => { | 198 | ($arg:expr) => { |
| 186 | match $crate::fmt::Try::into_result($arg) { | 199 | match $crate::fmt::Try::into_result($arg) { |
diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs new file mode 100644 index 000000000..945560a80 --- /dev/null +++ b/embassy-sync/src/lazy_lock.rs | |||
| @@ -0,0 +1,174 @@ | |||
| 1 | //! Synchronization primitive for initializing a value once, allowing others to get a reference to the value. | ||
| 2 | |||
| 3 | use core::cell::UnsafeCell; | ||
| 4 | use core::mem::ManuallyDrop; | ||
| 5 | use core::sync::atomic::{AtomicBool, Ordering}; | ||
| 6 | |||
| 7 | /// The `LazyLock` is a synchronization primitive that allows for | ||
| 8 | /// initializing a value once, and allowing others to obtain a | ||
| 9 | /// reference to the value. This is useful for lazy initialization of | ||
| 10 | /// a static value. | ||
| 11 | /// | ||
| 12 | /// # Example | ||
| 13 | /// ``` | ||
| 14 | /// use futures_executor::block_on; | ||
| 15 | /// use embassy_sync::lazy_lock::LazyLock; | ||
| 16 | /// | ||
| 17 | /// // Define a static value that will be lazily initialized | ||
| 18 | /// // at runtime at the first access. | ||
| 19 | /// static VALUE: LazyLock<u32> = LazyLock::new(|| 20); | ||
| 20 | /// | ||
| 21 | /// let reference = VALUE.get(); | ||
| 22 | /// assert_eq!(reference, &20); | ||
| 23 | /// ``` | ||
| 24 | #[derive(Debug)] | ||
| 25 | pub struct LazyLock<T, F = fn() -> T> { | ||
| 26 | init: AtomicBool, | ||
| 27 | data: UnsafeCell<Data<T, F>>, | ||
| 28 | } | ||
| 29 | |||
| 30 | union Data<T, F> { | ||
| 31 | value: ManuallyDrop<T>, | ||
| 32 | f: ManuallyDrop<F>, | ||
| 33 | } | ||
| 34 | |||
| 35 | unsafe impl<T, F> Sync for LazyLock<T, F> | ||
| 36 | where | ||
| 37 | T: Sync, | ||
| 38 | F: Sync, | ||
| 39 | { | ||
| 40 | } | ||
| 41 | |||
| 42 | impl<T, F: FnOnce() -> T> LazyLock<T, F> { | ||
| 43 | /// Create a new uninitialized `StaticLock`. | ||
| 44 | pub const fn new(init_fn: F) -> Self { | ||
| 45 | Self { | ||
| 46 | init: AtomicBool::new(false), | ||
| 47 | data: UnsafeCell::new(Data { | ||
| 48 | f: ManuallyDrop::new(init_fn), | ||
| 49 | }), | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | /// Get a reference to the underlying value, initializing it if it | ||
| 54 | /// has not been done already. | ||
| 55 | #[inline] | ||
| 56 | pub fn get(&self) -> &T { | ||
| 57 | self.ensure_init_fast(); | ||
| 58 | unsafe { &(*self.data.get()).value } | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Get a mutable reference to the underlying value, initializing it if it | ||
| 62 | /// has not been done already. | ||
| 63 | #[inline] | ||
| 64 | pub fn get_mut(&mut self) -> &mut T { | ||
| 65 | self.ensure_init_fast(); | ||
| 66 | unsafe { &mut (*self.data.get()).value } | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Consume the `LazyLock`, returning the underlying value. The | ||
| 70 | /// initialization function will be called if it has not been | ||
| 71 | /// already. | ||
| 72 | #[inline] | ||
| 73 | pub fn into_inner(self) -> T { | ||
| 74 | self.ensure_init_fast(); | ||
| 75 | let this = ManuallyDrop::new(self); | ||
| 76 | let data = unsafe { core::ptr::read(&this.data) }.into_inner(); | ||
| 77 | |||
| 78 | ManuallyDrop::into_inner(unsafe { data.value }) | ||
| 79 | } | ||
| 80 | |||
| 81 | /// Initialize the `LazyLock` if it has not been initialized yet. | ||
| 82 | /// This function is a fast track to [`Self::ensure_init`] | ||
| 83 | /// which does not require a critical section in most cases when | ||
| 84 | /// the value has been initialized already. | ||
| 85 | /// When this function returns, `self.data` is guaranteed to be | ||
| 86 | /// initialized and visible on the current core. | ||
| 87 | #[inline] | ||
| 88 | fn ensure_init_fast(&self) { | ||
| 89 | if !self.init.load(Ordering::Acquire) { | ||
| 90 | self.ensure_init(); | ||
| 91 | } | ||
| 92 | } | ||
| 93 | |||
| 94 | /// Initialize the `LazyLock` if it has not been initialized yet. | ||
| 95 | /// When this function returns, `self.data` is guaranteed to be | ||
| 96 | /// initialized and visible on the current core. | ||
| 97 | fn ensure_init(&self) { | ||
| 98 | critical_section::with(|_| { | ||
| 99 | if !self.init.load(Ordering::Acquire) { | ||
| 100 | let data = unsafe { &mut *self.data.get() }; | ||
| 101 | let f = unsafe { ManuallyDrop::take(&mut data.f) }; | ||
| 102 | let value = f(); | ||
| 103 | data.value = ManuallyDrop::new(value); | ||
| 104 | |||
| 105 | self.init.store(true, Ordering::Release); | ||
| 106 | } | ||
| 107 | }); | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 111 | impl<T, F> Drop for LazyLock<T, F> { | ||
| 112 | fn drop(&mut self) { | ||
| 113 | if self.init.load(Ordering::Acquire) { | ||
| 114 | unsafe { ManuallyDrop::drop(&mut self.data.get_mut().value) }; | ||
| 115 | } else { | ||
| 116 | unsafe { ManuallyDrop::drop(&mut self.data.get_mut().f) }; | ||
| 117 | } | ||
| 118 | } | ||
| 119 | } | ||
| 120 | |||
| 121 | #[cfg(test)] | ||
| 122 | mod tests { | ||
| 123 | use core::sync::atomic::{AtomicU32, Ordering}; | ||
| 124 | |||
| 125 | use super::*; | ||
| 126 | |||
| 127 | #[test] | ||
| 128 | fn test_lazy_lock() { | ||
| 129 | static VALUE: LazyLock<u32> = LazyLock::new(|| 20); | ||
| 130 | let reference = VALUE.get(); | ||
| 131 | assert_eq!(reference, &20); | ||
| 132 | } | ||
| 133 | #[test] | ||
| 134 | fn test_lazy_lock_mutation() { | ||
| 135 | let mut value: LazyLock<u32> = LazyLock::new(|| 20); | ||
| 136 | *value.get_mut() = 21; | ||
| 137 | let reference = value.get(); | ||
| 138 | assert_eq!(reference, &21); | ||
| 139 | } | ||
| 140 | #[test] | ||
| 141 | fn test_lazy_lock_into_inner() { | ||
| 142 | let lazy: LazyLock<u32> = LazyLock::new(|| 20); | ||
| 143 | let value = lazy.into_inner(); | ||
| 144 | assert_eq!(value, 20); | ||
| 145 | } | ||
| 146 | |||
| 147 | static DROP_CHECKER: AtomicU32 = AtomicU32::new(0); | ||
| 148 | #[derive(Debug)] | ||
| 149 | struct DropCheck; | ||
| 150 | |||
| 151 | impl Drop for DropCheck { | ||
| 152 | fn drop(&mut self) { | ||
| 153 | DROP_CHECKER.fetch_add(1, Ordering::Acquire); | ||
| 154 | } | ||
| 155 | } | ||
| 156 | |||
| 157 | #[test] | ||
| 158 | fn test_lazy_drop() { | ||
| 159 | let lazy: LazyLock<DropCheck> = LazyLock::new(|| DropCheck); | ||
| 160 | assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 0); | ||
| 161 | lazy.get(); | ||
| 162 | drop(lazy); | ||
| 163 | assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1); | ||
| 164 | |||
| 165 | let dropper = DropCheck; | ||
| 166 | let lazy_fn: LazyLock<u32, _> = LazyLock::new(move || { | ||
| 167 | let _a = dropper; | ||
| 168 | 20 | ||
| 169 | }); | ||
| 170 | assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1); | ||
| 171 | drop(lazy_fn); | ||
| 172 | assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 2); | ||
| 173 | } | ||
| 174 | } | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index a5eee8d02..1cfde8b10 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -1,6 +1,7 @@ | |||
| 1 | #![cfg_attr(not(feature = "std"), no_std)] | 1 | #![cfg_attr(not(feature = "std"), no_std)] |
| 2 | #![allow(async_fn_in_trait)] | 2 | #![allow(async_fn_in_trait)] |
| 3 | #![allow(clippy::new_without_default)] | 3 | #![allow(clippy::new_without_default)] |
| 4 | #![allow(unsafe_op_in_unsafe_fn)] | ||
| 4 | #![doc = include_str!("../README.md")] | 5 | #![doc = include_str!("../README.md")] |
| 5 | #![warn(missing_docs)] | 6 | #![warn(missing_docs)] |
| 6 | 7 | ||
| @@ -12,12 +13,15 @@ mod ring_buffer; | |||
| 12 | 13 | ||
| 13 | pub mod blocking_mutex; | 14 | pub mod blocking_mutex; |
| 14 | pub mod channel; | 15 | pub mod channel; |
| 16 | pub mod lazy_lock; | ||
| 15 | pub mod mutex; | 17 | pub mod mutex; |
| 16 | pub mod once_lock; | 18 | pub mod once_lock; |
| 17 | pub mod pipe; | 19 | pub mod pipe; |
| 18 | pub mod priority_channel; | 20 | pub mod priority_channel; |
| 19 | pub mod pubsub; | 21 | pub mod pubsub; |
| 22 | pub mod rwlock; | ||
| 20 | pub mod semaphore; | 23 | pub mod semaphore; |
| 21 | pub mod signal; | 24 | pub mod signal; |
| 22 | pub mod waitqueue; | 25 | pub mod waitqueue; |
| 26 | pub mod watch; | ||
| 23 | pub mod zerocopy_channel; | 27 | pub mod zerocopy_channel; |
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 8c3a3af9f..96b834f02 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs | |||
| @@ -2,13 +2,13 @@ | |||
| 2 | //! | 2 | //! |
| 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. | 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. |
| 4 | use core::cell::{RefCell, UnsafeCell}; | 4 | use core::cell::{RefCell, UnsafeCell}; |
| 5 | use core::future::poll_fn; | 5 | use core::future::{Future, poll_fn}; |
| 6 | use core::ops::{Deref, DerefMut}; | 6 | use core::ops::{Deref, DerefMut}; |
| 7 | use core::task::Poll; | 7 | use core::task::Poll; |
| 8 | use core::{fmt, mem}; | 8 | use core::{fmt, mem}; |
| 9 | 9 | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::blocking_mutex::Mutex as BlockingMutex; | 10 | use crate::blocking_mutex::Mutex as BlockingMutex; |
| 11 | use crate::blocking_mutex::raw::RawMutex; | ||
| 12 | use crate::waitqueue::WakerRegistration; | 12 | use crate::waitqueue::WakerRegistration; |
| 13 | 13 | ||
| 14 | /// Error returned by [`Mutex::try_lock`] | 14 | /// Error returned by [`Mutex::try_lock`] |
| @@ -16,6 +16,7 @@ use crate::waitqueue::WakerRegistration; | |||
| 16 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | 16 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] |
| 17 | pub struct TryLockError; | 17 | pub struct TryLockError; |
| 18 | 18 | ||
| 19 | #[derive(Debug)] | ||
| 19 | struct State { | 20 | struct State { |
| 20 | locked: bool, | 21 | locked: bool, |
| 21 | waker: WakerRegistration, | 22 | waker: WakerRegistration, |
| @@ -23,7 +24,7 @@ struct State { | |||
| 23 | 24 | ||
| 24 | /// Async mutex. | 25 | /// Async mutex. |
| 25 | /// | 26 | /// |
| 26 | /// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). | 27 | /// The mutex is generic over a blocking [`RawMutex`]. |
| 27 | /// The raw mutex is used to guard access to the internal "is locked" flag. It | 28 | /// The raw mutex is used to guard access to the internal "is locked" flag. It |
| 28 | /// is held for very short periods only, while locking and unlocking. It is *not* held | 29 | /// is held for very short periods only, while locking and unlocking. It is *not* held |
| 29 | /// for the entire time the async Mutex is locked. | 30 | /// for the entire time the async Mutex is locked. |
| @@ -73,7 +74,7 @@ where | |||
| 73 | /// Lock the mutex. | 74 | /// Lock the mutex. |
| 74 | /// | 75 | /// |
| 75 | /// This will wait for the mutex to be unlocked if it's already locked. | 76 | /// This will wait for the mutex to be unlocked if it's already locked. |
| 76 | pub async fn lock(&self) -> MutexGuard<'_, M, T> { | 77 | pub fn lock(&self) -> impl Future<Output = MutexGuard<'_, M, T>> { |
| 77 | poll_fn(|cx| { | 78 | poll_fn(|cx| { |
| 78 | let ready = self.state.lock(|s| { | 79 | let ready = self.state.lock(|s| { |
| 79 | let mut s = s.borrow_mut(); | 80 | let mut s = s.borrow_mut(); |
| @@ -92,7 +93,6 @@ where | |||
| 92 | Poll::Pending | 93 | Poll::Pending |
| 93 | } | 94 | } |
| 94 | }) | 95 | }) |
| 95 | .await | ||
| 96 | } | 96 | } |
| 97 | 97 | ||
| 98 | /// Attempt to immediately lock the mutex. | 98 | /// Attempt to immediately lock the mutex. |
| @@ -138,7 +138,7 @@ impl<M: RawMutex, T> From<T> for Mutex<M, T> { | |||
| 138 | impl<M, T> Default for Mutex<M, T> | 138 | impl<M, T> Default for Mutex<M, T> |
| 139 | where | 139 | where |
| 140 | M: RawMutex, | 140 | M: RawMutex, |
| 141 | T: ?Sized + Default, | 141 | T: Default, |
| 142 | { | 142 | { |
| 143 | fn default() -> Self { | 143 | fn default() -> Self { |
| 144 | Self::new(Default::default()) | 144 | Self::new(Default::default()) |
| @@ -172,6 +172,7 @@ where | |||
| 172 | /// | 172 | /// |
| 173 | /// Dropping it unlocks the mutex. | 173 | /// Dropping it unlocks the mutex. |
| 174 | #[clippy::has_significant_drop] | 174 | #[clippy::has_significant_drop] |
| 175 | #[must_use = "if unused the Mutex will immediately unlock"] | ||
| 175 | pub struct MutexGuard<'a, M, T> | 176 | pub struct MutexGuard<'a, M, T> |
| 176 | where | 177 | where |
| 177 | M: RawMutex, | 178 | M: RawMutex, |
| @@ -186,7 +187,7 @@ where | |||
| 186 | T: ?Sized, | 187 | T: ?Sized, |
| 187 | { | 188 | { |
| 188 | /// Returns a locked view over a portion of the locked data. | 189 | /// Returns a locked view over a portion of the locked data. |
| 189 | pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { | 190 | pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { |
| 190 | let mutex = this.mutex; | 191 | let mutex = this.mutex; |
| 191 | let value = fun(unsafe { &mut *this.mutex.inner.get() }); | 192 | let value = fun(unsafe { &mut *this.mutex.inner.get() }); |
| 192 | // Don't run the `drop` method for MutexGuard. The ownership of the underlying | 193 | // Don't run the `drop` method for MutexGuard. The ownership of the underlying |
| @@ -278,7 +279,7 @@ where | |||
| 278 | T: ?Sized, | 279 | T: ?Sized, |
| 279 | { | 280 | { |
| 280 | /// Returns a locked view over a portion of the locked data. | 281 | /// Returns a locked view over a portion of the locked data. |
| 281 | pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { | 282 | pub fn map<U: ?Sized>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> { |
| 282 | let state = this.state; | 283 | let state = this.state; |
| 283 | let value = fun(unsafe { &mut *this.value }); | 284 | let value = fun(unsafe { &mut *this.value }); |
| 284 | // Don't run the `drop` method for MutexGuard. The ownership of the underlying | 285 | // Don't run the `drop` method for MutexGuard. The ownership of the underlying |
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs index 55608ba32..2af19ca20 100644 --- a/embassy-sync/src/once_lock.rs +++ b/embassy-sync/src/once_lock.rs | |||
| @@ -1,7 +1,8 @@ | |||
| 1 | //! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. | 1 | //! Synchronization primitive for initializing a value once, allowing others to await a reference to the value. |
| 2 | 2 | ||
| 3 | use core::cell::Cell; | 3 | use core::cell::Cell; |
| 4 | use core::future::poll_fn; | 4 | use core::fmt::{Debug, Formatter}; |
| 5 | use core::future::{Future, poll_fn}; | ||
| 5 | use core::mem::MaybeUninit; | 6 | use core::mem::MaybeUninit; |
| 6 | use core::sync::atomic::{AtomicBool, Ordering}; | 7 | use core::sync::atomic::{AtomicBool, Ordering}; |
| 7 | use core::task::Poll; | 8 | use core::task::Poll; |
| @@ -42,7 +43,16 @@ pub struct OnceLock<T> { | |||
| 42 | data: Cell<MaybeUninit<T>>, | 43 | data: Cell<MaybeUninit<T>>, |
| 43 | } | 44 | } |
| 44 | 45 | ||
| 45 | unsafe impl<T> Sync for OnceLock<T> {} | 46 | impl<T> Debug for OnceLock<T> { |
| 47 | fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { | ||
| 48 | f.debug_struct("OnceLock") | ||
| 49 | .field("init", &self.init) | ||
| 50 | .field("data", &"Cell<MaybeUninit<{unprintable}>>") | ||
| 51 | .finish() | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | unsafe impl<T> Sync for OnceLock<T> where T: Sync {} | ||
| 46 | 56 | ||
| 47 | impl<T> OnceLock<T> { | 57 | impl<T> OnceLock<T> { |
| 48 | /// Create a new uninitialized `OnceLock`. | 58 | /// Create a new uninitialized `OnceLock`. |
| @@ -55,7 +65,7 @@ impl<T> OnceLock<T> { | |||
| 55 | 65 | ||
| 56 | /// Get a reference to the underlying value, waiting for it to be set. | 66 | /// Get a reference to the underlying value, waiting for it to be set. |
| 57 | /// If the value is already set, this will return immediately. | 67 | /// If the value is already set, this will return immediately. |
| 58 | pub async fn get(&self) -> &T { | 68 | pub fn get(&self) -> impl Future<Output = &T> { |
| 59 | poll_fn(|cx| match self.try_get() { | 69 | poll_fn(|cx| match self.try_get() { |
| 60 | Some(data) => Poll::Ready(data), | 70 | Some(data) => Poll::Ready(data), |
| 61 | None => { | 71 | None => { |
| @@ -63,7 +73,6 @@ impl<T> OnceLock<T> { | |||
| 63 | Poll::Pending | 73 | Poll::Pending |
| 64 | } | 74 | } |
| 65 | }) | 75 | }) |
| 66 | .await | ||
| 67 | } | 76 | } |
| 68 | 77 | ||
| 69 | /// Try to get a reference to the underlying value if it exists. | 78 | /// Try to get a reference to the underlying value if it exists. |
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index cd5b8ed75..215a556d9 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs | |||
| @@ -7,12 +7,13 @@ use core::ops::Range; | |||
| 7 | use core::pin::Pin; | 7 | use core::pin::Pin; |
| 8 | use core::task::{Context, Poll}; | 8 | use core::task::{Context, Poll}; |
| 9 | 9 | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::blocking_mutex::Mutex; | 10 | use crate::blocking_mutex::Mutex; |
| 11 | use crate::blocking_mutex::raw::RawMutex; | ||
| 12 | use crate::ring_buffer::RingBuffer; | 12 | use crate::ring_buffer::RingBuffer; |
| 13 | use crate::waitqueue::WakerRegistration; | 13 | use crate::waitqueue::WakerRegistration; |
| 14 | 14 | ||
| 15 | /// Write-only access to a [`Pipe`]. | 15 | /// Write-only access to a [`Pipe`]. |
| 16 | #[derive(Debug)] | ||
| 16 | pub struct Writer<'p, M, const N: usize> | 17 | pub struct Writer<'p, M, const N: usize> |
| 17 | where | 18 | where |
| 18 | M: RawMutex, | 19 | M: RawMutex, |
| @@ -52,6 +53,7 @@ where | |||
| 52 | 53 | ||
| 53 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. | 54 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. |
| 54 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 55 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 56 | #[derive(Debug)] | ||
| 55 | pub struct WriteFuture<'p, M, const N: usize> | 57 | pub struct WriteFuture<'p, M, const N: usize> |
| 56 | where | 58 | where |
| 57 | M: RawMutex, | 59 | M: RawMutex, |
| @@ -77,6 +79,7 @@ where | |||
| 77 | impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} | 79 | impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} |
| 78 | 80 | ||
| 79 | /// Read-only access to a [`Pipe`]. | 81 | /// Read-only access to a [`Pipe`]. |
| 82 | #[derive(Debug)] | ||
| 80 | pub struct Reader<'p, M, const N: usize> | 83 | pub struct Reader<'p, M, const N: usize> |
| 81 | where | 84 | where |
| 82 | M: RawMutex, | 85 | M: RawMutex, |
| @@ -128,6 +131,7 @@ where | |||
| 128 | 131 | ||
| 129 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | 132 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. |
| 130 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 133 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 134 | #[derive(Debug)] | ||
| 131 | pub struct ReadFuture<'p, M, const N: usize> | 135 | pub struct ReadFuture<'p, M, const N: usize> |
| 132 | where | 136 | where |
| 133 | M: RawMutex, | 137 | M: RawMutex, |
| @@ -152,8 +156,9 @@ where | |||
| 152 | 156 | ||
| 153 | impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} | 157 | impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} |
| 154 | 158 | ||
| 155 | /// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. | 159 | /// Future returned by [`Reader::fill_buf`]. |
| 156 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 160 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 161 | #[derive(Debug)] | ||
| 157 | pub struct FillBufFuture<'p, M, const N: usize> | 162 | pub struct FillBufFuture<'p, M, const N: usize> |
| 158 | where | 163 | where |
| 159 | M: RawMutex, | 164 | M: RawMutex, |
| @@ -199,6 +204,7 @@ pub enum TryWriteError { | |||
| 199 | Full, | 204 | Full, |
| 200 | } | 205 | } |
| 201 | 206 | ||
| 207 | #[derive(Debug)] | ||
| 202 | struct PipeState<const N: usize> { | 208 | struct PipeState<const N: usize> { |
| 203 | buffer: RingBuffer<N>, | 209 | buffer: RingBuffer<N>, |
| 204 | read_waker: WakerRegistration, | 210 | read_waker: WakerRegistration, |
| @@ -206,6 +212,7 @@ struct PipeState<const N: usize> { | |||
| 206 | } | 212 | } |
| 207 | 213 | ||
| 208 | #[repr(transparent)] | 214 | #[repr(transparent)] |
| 215 | #[derive(Debug)] | ||
| 209 | struct Buffer<const N: usize>(UnsafeCell<[u8; N]>); | 216 | struct Buffer<const N: usize>(UnsafeCell<[u8; N]>); |
| 210 | 217 | ||
| 211 | impl<const N: usize> Buffer<N> { | 218 | impl<const N: usize> Buffer<N> { |
| @@ -230,6 +237,7 @@ unsafe impl<const N: usize> Sync for Buffer<N> {} | |||
| 230 | /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. | 237 | /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. |
| 231 | /// | 238 | /// |
| 232 | /// All data written will become available in the same order as it was written. | 239 | /// All data written will become available in the same order as it was written. |
| 240 | #[derive(Debug)] | ||
| 233 | pub struct Pipe<M, const N: usize> | 241 | pub struct Pipe<M, const N: usize> |
| 234 | where | 242 | where |
| 235 | M: RawMutex, | 243 | M: RawMutex, |
| @@ -532,6 +540,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> | |||
| 532 | } | 540 | } |
| 533 | } | 541 | } |
| 534 | 542 | ||
| 543 | // | ||
| 544 | // Type-erased variants | ||
| 545 | // | ||
| 546 | |||
| 547 | pub(crate) trait DynamicPipe { | ||
| 548 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>; | ||
| 549 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>; | ||
| 550 | |||
| 551 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 552 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 553 | |||
| 554 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 555 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 556 | |||
| 557 | fn consume(&self, amt: usize); | ||
| 558 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>; | ||
| 559 | } | ||
| 560 | |||
| 561 | impl<M, const N: usize> DynamicPipe for Pipe<M, N> | ||
| 562 | where | ||
| 563 | M: RawMutex, | ||
| 564 | { | ||
| 565 | fn consume(&self, amt: usize) { | ||
| 566 | Pipe::consume(self, amt) | ||
| 567 | } | ||
| 568 | |||
| 569 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { | ||
| 570 | Pipe::try_fill_buf_with_context(self, cx) | ||
| 571 | } | ||
| 572 | |||
| 573 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 574 | Pipe::write(self, buf).into() | ||
| 575 | } | ||
| 576 | |||
| 577 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 578 | Pipe::read(self, buf).into() | ||
| 579 | } | ||
| 580 | |||
| 581 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 582 | Pipe::try_read(self, buf) | ||
| 583 | } | ||
| 584 | |||
| 585 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 586 | Pipe::try_write(self, buf) | ||
| 587 | } | ||
| 588 | |||
| 589 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 590 | Pipe::try_write_with_context(self, cx, buf) | ||
| 591 | } | ||
| 592 | |||
| 593 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 594 | Pipe::try_read_with_context(self, cx, buf) | ||
| 595 | } | ||
| 596 | } | ||
| 597 | |||
| 598 | /// Write-only access to the dynamic pipe. | ||
| 599 | pub struct DynamicWriter<'p> { | ||
| 600 | pipe: &'p dyn DynamicPipe, | ||
| 601 | } | ||
| 602 | |||
| 603 | impl<'p> Clone for DynamicWriter<'p> { | ||
| 604 | fn clone(&self) -> Self { | ||
| 605 | *self | ||
| 606 | } | ||
| 607 | } | ||
| 608 | |||
| 609 | impl<'p> Copy for DynamicWriter<'p> {} | ||
| 610 | |||
| 611 | impl<'p> DynamicWriter<'p> { | ||
| 612 | /// Write some bytes to the pipe. | ||
| 613 | /// | ||
| 614 | /// See [`Pipe::write()`] | ||
| 615 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 616 | self.pipe.write(buf) | ||
| 617 | } | ||
| 618 | |||
| 619 | /// Attempt to immediately write some bytes to the pipe. | ||
| 620 | /// | ||
| 621 | /// See [`Pipe::try_write()`] | ||
| 622 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 623 | self.pipe.try_write(buf) | ||
| 624 | } | ||
| 625 | } | ||
| 626 | |||
| 627 | impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p> | ||
| 628 | where | ||
| 629 | M: RawMutex, | ||
| 630 | { | ||
| 631 | fn from(value: Writer<'p, M, N>) -> Self { | ||
| 632 | Self { pipe: value.pipe } | ||
| 633 | } | ||
| 634 | } | ||
| 635 | |||
| 636 | /// Future returned by [`DynamicWriter::write`]. | ||
| 637 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 638 | pub struct DynamicWriteFuture<'p> { | ||
| 639 | pipe: &'p dyn DynamicPipe, | ||
| 640 | buf: &'p [u8], | ||
| 641 | } | ||
| 642 | |||
| 643 | impl<'p> Future for DynamicWriteFuture<'p> { | ||
| 644 | type Output = usize; | ||
| 645 | |||
| 646 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 647 | match self.pipe.try_write_with_context(Some(cx), self.buf) { | ||
| 648 | Ok(n) => Poll::Ready(n), | ||
| 649 | Err(TryWriteError::Full) => Poll::Pending, | ||
| 650 | } | ||
| 651 | } | ||
| 652 | } | ||
| 653 | |||
| 654 | impl<'p> Unpin for DynamicWriteFuture<'p> {} | ||
| 655 | |||
| 656 | impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p> | ||
| 657 | where | ||
| 658 | M: RawMutex, | ||
| 659 | { | ||
| 660 | fn from(value: WriteFuture<'p, M, N>) -> Self { | ||
| 661 | Self { | ||
| 662 | pipe: value.pipe, | ||
| 663 | buf: value.buf, | ||
| 664 | } | ||
| 665 | } | ||
| 666 | } | ||
| 667 | |||
| 668 | /// Read-only access to a dynamic pipe. | ||
| 669 | pub struct DynamicReader<'p> { | ||
| 670 | pipe: &'p dyn DynamicPipe, | ||
| 671 | } | ||
| 672 | |||
| 673 | impl<'p> DynamicReader<'p> { | ||
| 674 | /// Read some bytes from the pipe. | ||
| 675 | /// | ||
| 676 | /// See [`Pipe::read()`] | ||
| 677 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 678 | self.pipe.read(buf) | ||
| 679 | } | ||
| 680 | |||
| 681 | /// Attempt to immediately read some bytes from the pipe. | ||
| 682 | /// | ||
| 683 | /// See [`Pipe::try_read()`] | ||
| 684 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 685 | self.pipe.try_read(buf) | ||
| 686 | } | ||
| 687 | |||
| 688 | /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. | ||
| 689 | /// | ||
| 690 | /// If no bytes are currently available to read, this function waits until at least one byte is available. | ||
| 691 | /// | ||
| 692 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 693 | pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> { | ||
| 694 | DynamicFillBufFuture { pipe: Some(self.pipe) } | ||
| 695 | } | ||
| 696 | |||
| 697 | /// Try returning contents of the internal buffer. | ||
| 698 | /// | ||
| 699 | /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. | ||
| 700 | /// | ||
| 701 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 702 | pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { | ||
| 703 | unsafe { self.pipe.try_fill_buf_with_context(None) } | ||
| 704 | } | ||
| 705 | |||
| 706 | /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. | ||
| 707 | pub fn consume(&mut self, amt: usize) { | ||
| 708 | self.pipe.consume(amt) | ||
| 709 | } | ||
| 710 | } | ||
| 711 | |||
| 712 | impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p> | ||
| 713 | where | ||
| 714 | M: RawMutex, | ||
| 715 | { | ||
| 716 | fn from(value: Reader<'p, M, N>) -> Self { | ||
| 717 | Self { pipe: value.pipe } | ||
| 718 | } | ||
| 719 | } | ||
| 720 | |||
| 721 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | ||
| 722 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 723 | pub struct DynamicReadFuture<'p> { | ||
| 724 | pipe: &'p dyn DynamicPipe, | ||
| 725 | buf: &'p mut [u8], | ||
| 726 | } | ||
| 727 | |||
| 728 | impl<'p> Future for DynamicReadFuture<'p> { | ||
| 729 | type Output = usize; | ||
| 730 | |||
| 731 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 732 | match self.pipe.try_read_with_context(Some(cx), self.buf) { | ||
| 733 | Ok(n) => Poll::Ready(n), | ||
| 734 | Err(TryReadError::Empty) => Poll::Pending, | ||
| 735 | } | ||
| 736 | } | ||
| 737 | } | ||
| 738 | |||
| 739 | impl<'p> Unpin for DynamicReadFuture<'p> {} | ||
| 740 | |||
| 741 | impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p> | ||
| 742 | where | ||
| 743 | M: RawMutex, | ||
| 744 | { | ||
| 745 | fn from(value: ReadFuture<'p, M, N>) -> Self { | ||
| 746 | Self { | ||
| 747 | pipe: value.pipe, | ||
| 748 | buf: value.buf, | ||
| 749 | } | ||
| 750 | } | ||
| 751 | } | ||
| 752 | |||
| 753 | /// Future returned by [`DynamicReader::fill_buf`]. | ||
| 754 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 755 | pub struct DynamicFillBufFuture<'p> { | ||
| 756 | pipe: Option<&'p dyn DynamicPipe>, | ||
| 757 | } | ||
| 758 | |||
| 759 | impl<'p> Future for DynamicFillBufFuture<'p> { | ||
| 760 | type Output = &'p [u8]; | ||
| 761 | |||
| 762 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 763 | let pipe = self.pipe.take().unwrap(); | ||
| 764 | match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { | ||
| 765 | Ok(buf) => Poll::Ready(buf), | ||
| 766 | Err(TryReadError::Empty) => { | ||
| 767 | self.pipe = Some(pipe); | ||
| 768 | Poll::Pending | ||
| 769 | } | ||
| 770 | } | ||
| 771 | } | ||
| 772 | } | ||
| 773 | |||
| 774 | impl<'p> Unpin for DynamicFillBufFuture<'p> {} | ||
| 775 | |||
| 776 | impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p> | ||
| 777 | where | ||
| 778 | M: RawMutex, | ||
| 779 | { | ||
| 780 | fn from(value: FillBufFuture<'p, M, N>) -> Self { | ||
| 781 | Self { | ||
| 782 | pipe: value.pipe.map(|p| p as &dyn DynamicPipe), | ||
| 783 | } | ||
| 784 | } | ||
| 785 | } | ||
| 786 | |||
| 535 | #[cfg(test)] | 787 | #[cfg(test)] |
| 536 | mod tests { | 788 | mod tests { |
| 537 | use futures_executor::ThreadPool; | 789 | use futures_executor::ThreadPool; |
| @@ -619,6 +871,35 @@ mod tests { | |||
| 619 | let _ = w.clone(); | 871 | let _ = w.clone(); |
| 620 | } | 872 | } |
| 621 | 873 | ||
| 874 | #[test] | ||
| 875 | fn dynamic_dispatch_pipe() { | ||
| 876 | let mut c = Pipe::<NoopRawMutex, 3>::new(); | ||
| 877 | let (r, w) = c.split(); | ||
| 878 | let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into()); | ||
| 879 | |||
| 880 | assert!(w.try_write(&[42, 43]).is_ok()); | ||
| 881 | let buf = r.try_fill_buf().unwrap(); | ||
| 882 | assert_eq!(buf, &[42, 43]); | ||
| 883 | let buf = r.try_fill_buf().unwrap(); | ||
| 884 | assert_eq!(buf, &[42, 43]); | ||
| 885 | r.consume(1); | ||
| 886 | let buf = r.try_fill_buf().unwrap(); | ||
| 887 | assert_eq!(buf, &[43]); | ||
| 888 | r.consume(1); | ||
| 889 | assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); | ||
| 890 | assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); | ||
| 891 | assert_eq!(w.try_write(&[45, 46]), Ok(2)); | ||
| 892 | let buf = r.try_fill_buf().unwrap(); | ||
| 893 | assert_eq!(buf, &[44]); // only one byte due to wraparound. | ||
| 894 | r.consume(1); | ||
| 895 | let buf = r.try_fill_buf().unwrap(); | ||
| 896 | assert_eq!(buf, &[45, 46]); | ||
| 897 | assert!(w.try_write(&[47]).is_ok()); | ||
| 898 | let buf = r.try_fill_buf().unwrap(); | ||
| 899 | assert_eq!(buf, &[45, 46, 47]); | ||
| 900 | r.consume(3); | ||
| 901 | } | ||
| 902 | |||
| 622 | #[futures_test::test] | 903 | #[futures_test::test] |
| 623 | async fn receiver_receives_given_try_write_async() { | 904 | async fn receiver_receives_given_try_write_async() { |
| 624 | let executor = ThreadPool::new().unwrap(); | 905 | let executor = ThreadPool::new().unwrap(); |
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 24c6c5a7f..1af7d9221 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs | |||
| @@ -1,18 +1,18 @@ | |||
| 1 | //! A queue for sending values between asynchronous tasks. | 1 | //! A queue for sending values between asynchronous tasks. |
| 2 | //! | 2 | //! |
| 3 | //! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. | 3 | //! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue. |
| 4 | //! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel. | 4 | //! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`] parameter of the channel. |
| 5 | 5 | ||
| 6 | use core::cell::RefCell; | 6 | use core::cell::RefCell; |
| 7 | use core::future::Future; | 7 | use core::future::Future; |
| 8 | use core::pin::Pin; | 8 | use core::pin::Pin; |
| 9 | use core::task::{Context, Poll}; | 9 | use core::task::{Context, Poll}; |
| 10 | 10 | ||
| 11 | pub use heapless::binary_heap::{Kind, Max, Min}; | ||
| 12 | use heapless::BinaryHeap; | 11 | use heapless::BinaryHeap; |
| 12 | pub use heapless::binary_heap::{Kind, Max, Min}; | ||
| 13 | 13 | ||
| 14 | use crate::blocking_mutex::raw::RawMutex; | ||
| 15 | use crate::blocking_mutex::Mutex; | 14 | use crate::blocking_mutex::Mutex; |
| 15 | use crate::blocking_mutex::raw::RawMutex; | ||
| 16 | use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError}; | 16 | use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError}; |
| 17 | use crate::waitqueue::WakerRegistration; | 17 | use crate::waitqueue::WakerRegistration; |
| 18 | 18 | ||
| @@ -71,6 +71,48 @@ where | |||
| 71 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 71 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 72 | self.channel.poll_ready_to_send(cx) | 72 | self.channel.poll_ready_to_send(cx) |
| 73 | } | 73 | } |
| 74 | |||
| 75 | /// Returns the maximum number of elements the channel can hold. | ||
| 76 | /// | ||
| 77 | /// See [`PriorityChannel::capacity()`] | ||
| 78 | pub const fn capacity(&self) -> usize { | ||
| 79 | self.channel.capacity() | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Returns the free capacity of the channel. | ||
| 83 | /// | ||
| 84 | /// See [`PriorityChannel::free_capacity()`] | ||
| 85 | pub fn free_capacity(&self) -> usize { | ||
| 86 | self.channel.free_capacity() | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Clears all elements in the channel. | ||
| 90 | /// | ||
| 91 | /// See [`PriorityChannel::clear()`] | ||
| 92 | pub fn clear(&self) { | ||
| 93 | self.channel.clear(); | ||
| 94 | } | ||
| 95 | |||
| 96 | /// Returns the number of elements currently in the channel. | ||
| 97 | /// | ||
| 98 | /// See [`PriorityChannel::len()`] | ||
| 99 | pub fn len(&self) -> usize { | ||
| 100 | self.channel.len() | ||
| 101 | } | ||
| 102 | |||
| 103 | /// Returns whether the channel is empty. | ||
| 104 | /// | ||
| 105 | /// See [`PriorityChannel::is_empty()`] | ||
| 106 | pub fn is_empty(&self) -> bool { | ||
| 107 | self.channel.is_empty() | ||
| 108 | } | ||
| 109 | |||
| 110 | /// Returns whether the channel is full. | ||
| 111 | /// | ||
| 112 | /// See [`PriorityChannel::is_full()`] | ||
| 113 | pub fn is_full(&self) -> bool { | ||
| 114 | self.channel.is_full() | ||
| 115 | } | ||
| 74 | } | 116 | } |
| 75 | 117 | ||
| 76 | impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> | 118 | impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T> |
| @@ -133,6 +175,16 @@ where | |||
| 133 | self.channel.try_receive() | 175 | self.channel.try_receive() |
| 134 | } | 176 | } |
| 135 | 177 | ||
| 178 | /// Peek at the next value without removing it from the queue. | ||
| 179 | /// | ||
| 180 | /// See [`PriorityChannel::try_peek()`] | ||
| 181 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 182 | where | ||
| 183 | T: Clone, | ||
| 184 | { | ||
| 185 | self.channel.try_peek_with_context(None) | ||
| 186 | } | ||
| 187 | |||
| 136 | /// Allows a poll_fn to poll until the channel is ready to receive | 188 | /// Allows a poll_fn to poll until the channel is ready to receive |
| 137 | /// | 189 | /// |
| 138 | /// See [`PriorityChannel::poll_ready_to_receive()`] | 190 | /// See [`PriorityChannel::poll_ready_to_receive()`] |
| @@ -146,6 +198,59 @@ where | |||
| 146 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 198 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 147 | self.channel.poll_receive(cx) | 199 | self.channel.poll_receive(cx) |
| 148 | } | 200 | } |
| 201 | |||
| 202 | /// Removes the elements from the channel that satisfy the predicate. | ||
| 203 | /// | ||
| 204 | /// See [`PriorityChannel::remove_if()`] | ||
| 205 | pub fn remove_if<F>(&self, predicate: F) | ||
| 206 | where | ||
| 207 | F: Fn(&T) -> bool, | ||
| 208 | T: Clone, | ||
| 209 | { | ||
| 210 | self.channel.remove_if(predicate) | ||
| 211 | } | ||
| 212 | |||
| 213 | /// Returns the maximum number of elements the channel can hold. | ||
| 214 | /// | ||
| 215 | /// See [`PriorityChannel::capacity()`] | ||
| 216 | pub const fn capacity(&self) -> usize { | ||
| 217 | self.channel.capacity() | ||
| 218 | } | ||
| 219 | |||
| 220 | /// Returns the free capacity of the channel. | ||
| 221 | /// | ||
| 222 | /// See [`PriorityChannel::free_capacity()`] | ||
| 223 | pub fn free_capacity(&self) -> usize { | ||
| 224 | self.channel.free_capacity() | ||
| 225 | } | ||
| 226 | |||
| 227 | /// Clears all elements in the channel. | ||
| 228 | /// | ||
| 229 | /// See [`PriorityChannel::clear()`] | ||
| 230 | pub fn clear(&self) { | ||
| 231 | self.channel.clear(); | ||
| 232 | } | ||
| 233 | |||
| 234 | /// Returns the number of elements currently in the channel. | ||
| 235 | /// | ||
| 236 | /// See [`PriorityChannel::len()`] | ||
| 237 | pub fn len(&self) -> usize { | ||
| 238 | self.channel.len() | ||
| 239 | } | ||
| 240 | |||
| 241 | /// Returns whether the channel is empty. | ||
| 242 | /// | ||
| 243 | /// See [`PriorityChannel::is_empty()`] | ||
| 244 | pub fn is_empty(&self) -> bool { | ||
| 245 | self.channel.is_empty() | ||
| 246 | } | ||
| 247 | |||
| 248 | /// Returns whether the channel is full. | ||
| 249 | /// | ||
| 250 | /// See [`PriorityChannel::is_full()`] | ||
| 251 | pub fn is_full(&self) -> bool { | ||
| 252 | self.channel.is_full() | ||
| 253 | } | ||
| 149 | } | 254 | } |
| 150 | 255 | ||
| 151 | impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> | 256 | impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T> |
| @@ -248,6 +353,31 @@ where | |||
| 248 | self.try_receive_with_context(None) | 353 | self.try_receive_with_context(None) |
| 249 | } | 354 | } |
| 250 | 355 | ||
| 356 | fn try_peek(&mut self) -> Result<T, TryReceiveError> | ||
| 357 | where | ||
| 358 | T: Clone, | ||
| 359 | { | ||
| 360 | self.try_peek_with_context(None) | ||
| 361 | } | ||
| 362 | |||
| 363 | fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 364 | where | ||
| 365 | T: Clone, | ||
| 366 | { | ||
| 367 | if self.queue.len() == self.queue.capacity() { | ||
| 368 | self.senders_waker.wake(); | ||
| 369 | } | ||
| 370 | |||
| 371 | if let Some(message) = self.queue.peek() { | ||
| 372 | Ok(message.clone()) | ||
| 373 | } else { | ||
| 374 | if let Some(cx) = cx { | ||
| 375 | self.receiver_waker.register(cx.waker()); | ||
| 376 | } | ||
| 377 | Err(TryReceiveError::Empty) | ||
| 378 | } | ||
| 379 | } | ||
| 380 | |||
| 251 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { | 381 | fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> { |
| 252 | if self.queue.len() == self.queue.capacity() { | 382 | if self.queue.len() == self.queue.capacity() { |
| 253 | self.senders_waker.wake(); | 383 | self.senders_waker.wake(); |
| @@ -316,6 +446,9 @@ where | |||
| 316 | } | 446 | } |
| 317 | 447 | ||
| 318 | fn clear(&mut self) { | 448 | fn clear(&mut self) { |
| 449 | if self.queue.len() == self.queue.capacity() { | ||
| 450 | self.senders_waker.wake(); | ||
| 451 | } | ||
| 319 | self.queue.clear(); | 452 | self.queue.clear(); |
| 320 | } | 453 | } |
| 321 | 454 | ||
| @@ -340,7 +473,7 @@ where | |||
| 340 | /// received from the channel. | 473 | /// received from the channel. |
| 341 | /// | 474 | /// |
| 342 | /// Sent data may be reordered based on their priority within the channel. | 475 | /// Sent data may be reordered based on their priority within the channel. |
| 343 | /// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] | 476 | /// For example, in a [`Max`] [`PriorityChannel`] |
| 344 | /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. | 477 | /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. |
| 345 | pub struct PriorityChannel<M, T, K, const N: usize> | 478 | pub struct PriorityChannel<M, T, K, const N: usize> |
| 346 | where | 479 | where |
| @@ -380,6 +513,13 @@ where | |||
| 380 | self.lock(|c| c.try_receive_with_context(cx)) | 513 | self.lock(|c| c.try_receive_with_context(cx)) |
| 381 | } | 514 | } |
| 382 | 515 | ||
| 516 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 517 | where | ||
| 518 | T: Clone, | ||
| 519 | { | ||
| 520 | self.lock(|c| c.try_peek_with_context(cx)) | ||
| 521 | } | ||
| 522 | |||
| 383 | /// Poll the channel for the next message | 523 | /// Poll the channel for the next message |
| 384 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | 524 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 385 | self.lock(|c| c.poll_receive(cx)) | 525 | self.lock(|c| c.poll_receive(cx)) |
| @@ -450,6 +590,37 @@ where | |||
| 450 | self.lock(|c| c.try_receive()) | 590 | self.lock(|c| c.try_receive()) |
| 451 | } | 591 | } |
| 452 | 592 | ||
| 593 | /// Peek at the next value without removing it from the queue. | ||
| 594 | /// | ||
| 595 | /// This method will either receive a copy of the message from the channel immediately or return | ||
| 596 | /// an error if the channel is empty. | ||
| 597 | pub fn try_peek(&self) -> Result<T, TryReceiveError> | ||
| 598 | where | ||
| 599 | T: Clone, | ||
| 600 | { | ||
| 601 | self.lock(|c| c.try_peek()) | ||
| 602 | } | ||
| 603 | |||
| 604 | /// Removes elements from the channel based on the given predicate. | ||
| 605 | pub fn remove_if<F>(&self, predicate: F) | ||
| 606 | where | ||
| 607 | F: Fn(&T) -> bool, | ||
| 608 | T: Clone, | ||
| 609 | { | ||
| 610 | self.lock(|c| { | ||
| 611 | let mut new_heap = BinaryHeap::<T, K, N>::new(); | ||
| 612 | for item in c.queue.iter() { | ||
| 613 | if !predicate(item) { | ||
| 614 | match new_heap.push(item.clone()) { | ||
| 615 | Ok(_) => (), | ||
| 616 | Err(_) => panic!("Error pushing item to heap"), | ||
| 617 | } | ||
| 618 | } | ||
| 619 | } | ||
| 620 | c.queue = new_heap; | ||
| 621 | }); | ||
| 622 | } | ||
| 623 | |||
| 453 | /// Returns the maximum number of elements the channel can hold. | 624 | /// Returns the maximum number of elements the channel can hold. |
| 454 | pub const fn capacity(&self) -> usize { | 625 | pub const fn capacity(&self) -> usize { |
| 455 | N | 626 | N |
| @@ -499,6 +670,13 @@ where | |||
| 499 | PriorityChannel::try_receive_with_context(self, cx) | 670 | PriorityChannel::try_receive_with_context(self, cx) |
| 500 | } | 671 | } |
| 501 | 672 | ||
| 673 | fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> | ||
| 674 | where | ||
| 675 | T: Clone, | ||
| 676 | { | ||
| 677 | PriorityChannel::try_peek_with_context(self, cx) | ||
| 678 | } | ||
| 679 | |||
| 502 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | 680 | fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { |
| 503 | PriorityChannel::poll_ready_to_send(self, cx) | 681 | PriorityChannel::poll_ready_to_send(self, cx) |
| 504 | } | 682 | } |
| @@ -587,6 +765,8 @@ mod tests { | |||
| 587 | fn simple_send_and_receive() { | 765 | fn simple_send_and_receive() { |
| 588 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); | 766 | let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new(); |
| 589 | assert!(c.try_send(1).is_ok()); | 767 | assert!(c.try_send(1).is_ok()); |
| 768 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 769 | assert_eq!(c.try_peek().unwrap(), 1); | ||
| 590 | assert_eq!(c.try_receive().unwrap(), 1); | 770 | assert_eq!(c.try_receive().unwrap(), 1); |
| 591 | } | 771 | } |
| 592 | 772 | ||
| @@ -607,6 +787,8 @@ mod tests { | |||
| 607 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 787 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| 608 | 788 | ||
| 609 | assert!(s.try_send(1).is_ok()); | 789 | assert!(s.try_send(1).is_ok()); |
| 790 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 791 | assert_eq!(r.try_peek().unwrap(), 1); | ||
| 610 | assert_eq!(r.try_receive().unwrap(), 1); | 792 | assert_eq!(r.try_receive().unwrap(), 1); |
| 611 | } | 793 | } |
| 612 | 794 | ||
| @@ -617,11 +799,13 @@ mod tests { | |||
| 617 | static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new(); | 799 | static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new(); |
| 618 | let c = &*CHANNEL.init(PriorityChannel::new()); | 800 | let c = &*CHANNEL.init(PriorityChannel::new()); |
| 619 | let c2 = c; | 801 | let c2 = c; |
| 620 | assert!(executor | 802 | assert!( |
| 621 | .spawn(async move { | 803 | executor |
| 622 | assert!(c2.try_send(1).is_ok()); | 804 | .spawn(async move { |
| 623 | }) | 805 | assert!(c2.try_send(1).is_ok()); |
| 624 | .is_ok()); | 806 | }) |
| 807 | .is_ok() | ||
| 808 | ); | ||
| 625 | assert_eq!(c.receive().await, 1); | 809 | assert_eq!(c.receive().await, 1); |
| 626 | } | 810 | } |
| 627 | 811 | ||
| @@ -648,13 +832,15 @@ mod tests { | |||
| 648 | // However, I've used the debugger to observe that the send does indeed wait. | 832 | // However, I've used the debugger to observe that the send does indeed wait. |
| 649 | Delay::new(Duration::from_millis(500)).await; | 833 | Delay::new(Duration::from_millis(500)).await; |
| 650 | assert_eq!(c.receive().await, 1); | 834 | assert_eq!(c.receive().await, 1); |
| 651 | assert!(executor | 835 | assert!( |
| 652 | .spawn(async move { | 836 | executor |
| 653 | loop { | 837 | .spawn(async move { |
| 654 | c.receive().await; | 838 | loop { |
| 655 | } | 839 | c.receive().await; |
| 656 | }) | 840 | } |
| 657 | .is_ok()); | 841 | }) |
| 842 | .is_ok() | ||
| 843 | ); | ||
| 658 | send_task_1.unwrap().await; | 844 | send_task_1.unwrap().await; |
| 659 | send_task_2.unwrap().await; | 845 | send_task_2.unwrap().await; |
| 660 | } | 846 | } |
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index a97eb7d5b..127a208f1 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -10,8 +10,8 @@ use heapless::Deque; | |||
| 10 | 10 | ||
| 11 | use self::publisher::{ImmediatePub, Pub}; | 11 | use self::publisher::{ImmediatePub, Pub}; |
| 12 | use self::subscriber::Sub; | 12 | use self::subscriber::Sub; |
| 13 | use crate::blocking_mutex::raw::RawMutex; | ||
| 14 | use crate::blocking_mutex::Mutex; | 13 | use crate::blocking_mutex::Mutex; |
| 14 | use crate::blocking_mutex::raw::RawMutex; | ||
| 15 | use crate::waitqueue::MultiWakerRegistration; | 15 | use crate::waitqueue::MultiWakerRegistration; |
| 16 | 16 | ||
| 17 | pub mod publisher; | 17 | pub mod publisher; |
| @@ -27,8 +27,8 @@ pub use subscriber::{DynSubscriber, Subscriber}; | |||
| 27 | /// | 27 | /// |
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. | 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. |
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message | 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message |
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive |
| 31 | /// an error to indicate that it has lagged. | 31 | /// an error to indicate that it has lagged. |
| 32 | /// | 32 | /// |
| 33 | /// ## Example | 33 | /// ## Example |
| 34 | /// | 34 | /// |
| @@ -71,6 +71,7 @@ pub use subscriber::{DynSubscriber, Subscriber}; | |||
| 71 | /// # block_on(test); | 71 | /// # block_on(test); |
| 72 | /// ``` | 72 | /// ``` |
| 73 | /// | 73 | /// |
| 74 | #[derive(Debug)] | ||
| 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | 75 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
| 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, | 76 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, |
| 76 | } | 77 | } |
| @@ -88,7 +89,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 88 | /// Create a new subscriber. It will only receive messages that are published after its creation. | 89 | /// Create a new subscriber. It will only receive messages that are published after its creation. |
| 89 | /// | 90 | /// |
| 90 | /// If there are no subscriber slots left, an error will be returned. | 91 | /// If there are no subscriber slots left, an error will be returned. |
| 91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { | 92 | pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> { |
| 92 | self.inner.lock(|inner| { | 93 | self.inner.lock(|inner| { |
| 93 | let mut s = inner.borrow_mut(); | 94 | let mut s = inner.borrow_mut(); |
| 94 | 95 | ||
| @@ -120,7 +121,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 120 | /// Create a new publisher | 121 | /// Create a new publisher |
| 121 | /// | 122 | /// |
| 122 | /// If there are no publisher slots left, an error will be returned. | 123 | /// If there are no publisher slots left, an error will be returned. |
| 123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { | 124 | pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> { |
| 124 | self.inner.lock(|inner| { | 125 | self.inner.lock(|inner| { |
| 125 | let mut s = inner.borrow_mut(); | 126 | let mut s = inner.borrow_mut(); |
| 126 | 127 | ||
| @@ -151,13 +152,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 151 | 152 | ||
| 152 | /// Create a new publisher that can only send immediate messages. | 153 | /// Create a new publisher that can only send immediate messages. |
| 153 | /// This kind of publisher does not take up a publisher slot. | 154 | /// This kind of publisher does not take up a publisher slot. |
| 154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { | 155 | pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> { |
| 155 | ImmediatePublisher(ImmediatePub::new(self)) | 156 | ImmediatePublisher(ImmediatePub::new(self)) |
| 156 | } | 157 | } |
| 157 | 158 | ||
| 158 | /// Create a new publisher that can only send immediate messages. | 159 | /// Create a new publisher that can only send immediate messages. |
| 159 | /// This kind of publisher does not take up a publisher slot. | 160 | /// This kind of publisher does not take up a publisher slot. |
| 160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { | 161 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> { |
| 161 | DynImmediatePublisher(ImmediatePub::new(self)) | 162 | DynImmediatePublisher(ImmediatePub::new(self)) |
| 162 | } | 163 | } |
| 163 | 164 | ||
| @@ -194,6 +195,25 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 194 | } | 195 | } |
| 195 | } | 196 | } |
| 196 | 197 | ||
| 198 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T> | ||
| 199 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 200 | { | ||
| 201 | fn publish_immediate(&self, message: T) { | ||
| 202 | self.inner.lock(|s| { | ||
| 203 | let mut s = s.borrow_mut(); | ||
| 204 | s.publish_immediate(message) | ||
| 205 | }) | ||
| 206 | } | ||
| 207 | |||
| 208 | fn capacity(&self) -> usize { | ||
| 209 | self.capacity() | ||
| 210 | } | ||
| 211 | |||
| 212 | fn is_full(&self) -> bool { | ||
| 213 | self.is_full() | ||
| 214 | } | ||
| 215 | } | ||
| 216 | |||
| 197 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> | 217 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T> |
| 198 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | 218 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 199 | { | 219 | { |
| @@ -246,13 +266,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 246 | }) | 266 | }) |
| 247 | } | 267 | } |
| 248 | 268 | ||
| 249 | fn publish_immediate(&self, message: T) { | ||
| 250 | self.inner.lock(|s| { | ||
| 251 | let mut s = s.borrow_mut(); | ||
| 252 | s.publish_immediate(message) | ||
| 253 | }) | ||
| 254 | } | ||
| 255 | |||
| 256 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 269 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 257 | self.inner.lock(|s| { | 270 | self.inner.lock(|s| { |
| 258 | let mut s = s.borrow_mut(); | 271 | let mut s = s.borrow_mut(); |
| @@ -267,10 +280,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 267 | }) | 280 | }) |
| 268 | } | 281 | } |
| 269 | 282 | ||
| 270 | fn capacity(&self) -> usize { | ||
| 271 | self.capacity() | ||
| 272 | } | ||
| 273 | |||
| 274 | fn free_capacity(&self) -> usize { | 283 | fn free_capacity(&self) -> usize { |
| 275 | self.free_capacity() | 284 | self.free_capacity() |
| 276 | } | 285 | } |
| @@ -286,13 +295,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 286 | fn is_empty(&self) -> bool { | 295 | fn is_empty(&self) -> bool { |
| 287 | self.is_empty() | 296 | self.is_empty() |
| 288 | } | 297 | } |
| 289 | |||
| 290 | fn is_full(&self) -> bool { | ||
| 291 | self.is_full() | ||
| 292 | } | ||
| 293 | } | 298 | } |
| 294 | 299 | ||
| 295 | /// Internal state for the PubSub channel | 300 | /// Internal state for the PubSub channel |
| 301 | #[derive(Debug)] | ||
| 296 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | 302 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { |
| 297 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it | 303 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it |
| 298 | queue: Deque<(T, usize), CAP>, | 304 | queue: Deque<(T, usize), CAP>, |
| @@ -417,6 +423,9 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 417 | } | 423 | } |
| 418 | 424 | ||
| 419 | fn clear(&mut self) { | 425 | fn clear(&mut self) { |
| 426 | if self.is_full() { | ||
| 427 | self.publisher_wakers.wake(); | ||
| 428 | } | ||
| 420 | self.queue.clear(); | 429 | self.queue.clear(); |
| 421 | } | 430 | } |
| 422 | 431 | ||
| @@ -445,8 +454,6 @@ pub enum Error { | |||
| 445 | MaximumPublishersReached, | 454 | MaximumPublishersReached, |
| 446 | } | 455 | } |
| 447 | 456 | ||
| 448 | /// 'Middle level' behaviour of the pubsub channel. | ||
| 449 | /// This trait is used so that Sub and Pub can be generic over the channel. | ||
| 450 | trait SealedPubSubBehavior<T> { | 457 | trait SealedPubSubBehavior<T> { |
| 451 | /// Try to get a message from the queue with the given message id. | 458 | /// Try to get a message from the queue with the given message id. |
| 452 | /// | 459 | /// |
| @@ -462,12 +469,6 @@ trait SealedPubSubBehavior<T> { | |||
| 462 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 469 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| 463 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; | 470 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; |
| 464 | 471 | ||
| 465 | /// Publish a message immediately | ||
| 466 | fn publish_immediate(&self, message: T); | ||
| 467 | |||
| 468 | /// Returns the maximum number of elements the channel can hold. | ||
| 469 | fn capacity(&self) -> usize; | ||
| 470 | |||
| 471 | /// Returns the free capacity of the channel. | 472 | /// Returns the free capacity of the channel. |
| 472 | /// | 473 | /// |
| 473 | /// This is equivalent to `capacity() - len()` | 474 | /// This is equivalent to `capacity() - len()` |
| @@ -482,9 +483,6 @@ trait SealedPubSubBehavior<T> { | |||
| 482 | /// Returns whether the channel is empty. | 483 | /// Returns whether the channel is empty. |
| 483 | fn is_empty(&self) -> bool; | 484 | fn is_empty(&self) -> bool; |
| 484 | 485 | ||
| 485 | /// Returns whether the channel is full. | ||
| 486 | fn is_full(&self) -> bool; | ||
| 487 | |||
| 488 | /// Let the channel know that a subscriber has dropped | 486 | /// Let the channel know that a subscriber has dropped |
| 489 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 487 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 490 | 488 | ||
| @@ -495,9 +493,16 @@ trait SealedPubSubBehavior<T> { | |||
| 495 | /// 'Middle level' behaviour of the pubsub channel. | 493 | /// 'Middle level' behaviour of the pubsub channel. |
| 496 | /// This trait is used so that Sub and Pub can be generic over the channel. | 494 | /// This trait is used so that Sub and Pub can be generic over the channel. |
| 497 | #[allow(private_bounds)] | 495 | #[allow(private_bounds)] |
| 498 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {} | 496 | pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> { |
| 497 | /// Publish a message immediately | ||
| 498 | fn publish_immediate(&self, message: T); | ||
| 499 | 499 | ||
| 500 | impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {} | 500 | /// Returns the maximum number of elements the channel can hold. |
| 501 | fn capacity(&self) -> usize; | ||
| 502 | |||
| 503 | /// Returns whether the channel is full. | ||
| 504 | fn is_full(&self) -> bool; | ||
| 505 | } | ||
| 501 | 506 | ||
| 502 | /// The result of the subscriber wait procedure | 507 | /// The result of the subscriber wait procedure |
| 503 | #[derive(Debug, Clone, PartialEq, Eq)] | 508 | #[derive(Debug, Clone, PartialEq, Eq)] |
| @@ -755,4 +760,30 @@ mod tests { | |||
| 755 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); | 760 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); |
| 756 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); | 761 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); |
| 757 | } | 762 | } |
| 763 | |||
| 764 | #[futures_test::test] | ||
| 765 | async fn publisher_sink() { | ||
| 766 | use futures_util::{SinkExt, StreamExt}; | ||
| 767 | |||
| 768 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 769 | |||
| 770 | let mut sub = channel.subscriber().unwrap(); | ||
| 771 | |||
| 772 | let publ = channel.publisher().unwrap(); | ||
| 773 | let mut sink = publ.sink(); | ||
| 774 | |||
| 775 | sink.send(0).await.unwrap(); | ||
| 776 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 777 | |||
| 778 | sink.send(1).await.unwrap(); | ||
| 779 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 780 | |||
| 781 | sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok)) | ||
| 782 | .await | ||
| 783 | .unwrap(); | ||
| 784 | assert_eq!(0, sub.try_next_message_pure().unwrap()); | ||
| 785 | assert_eq!(1, sub.try_next_message_pure().unwrap()); | ||
| 786 | assert_eq!(2, sub.try_next_message_pure().unwrap()); | ||
| 787 | assert_eq!(3, sub.try_next_message_pure().unwrap()); | ||
| 788 | } | ||
| 758 | } | 789 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e66b3b1db..2a67a0002 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel}; | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | 10 | use crate::blocking_mutex::raw::RawMutex; |
| 11 | 11 | ||
| 12 | /// A publisher to a channel | 12 | /// A publisher to a channel |
| 13 | #[derive(Debug)] | ||
| 13 | pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 14 | pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 14 | /// The channel we are a publisher for | 15 | /// The channel we are a publisher for |
| 15 | channel: &'a PSB, | 16 | channel: &'a PSB, |
| @@ -74,6 +75,12 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 74 | pub fn is_full(&self) -> bool { | 75 | pub fn is_full(&self) -> bool { |
| 75 | self.channel.is_full() | 76 | self.channel.is_full() |
| 76 | } | 77 | } |
| 78 | |||
| 79 | /// Create a [`futures_sink::Sink`] adapter for this publisher. | ||
| 80 | #[inline] | ||
| 81 | pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> { | ||
| 82 | PubSink { publ: self, fut: None } | ||
| 83 | } | ||
| 77 | } | 84 | } |
| 78 | 85 | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 86 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -100,6 +107,7 @@ impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { | |||
| 100 | } | 107 | } |
| 101 | 108 | ||
| 102 | /// A publisher that holds a generic reference to the channel | 109 | /// A publisher that holds a generic reference to the channel |
| 110 | #[derive(Debug)] | ||
| 103 | pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | 111 | pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
| 104 | pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | 112 | pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
| 105 | ); | 113 | ); |
| @@ -124,6 +132,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 124 | 132 | ||
| 125 | /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. | 133 | /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. |
| 126 | /// (So an infinite amount is possible) | 134 | /// (So an infinite amount is possible) |
| 135 | #[derive(Debug)] | ||
| 127 | pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 136 | pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 128 | /// The channel we are a publisher for | 137 | /// The channel we are a publisher for |
| 129 | channel: &'a PSB, | 138 | channel: &'a PSB, |
| @@ -199,6 +208,7 @@ impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { | |||
| 199 | } | 208 | } |
| 200 | 209 | ||
| 201 | /// An immediate publisher that holds a generic reference to the channel | 210 | /// An immediate publisher that holds a generic reference to the channel |
| 211 | #[derive(Debug)] | ||
| 202 | pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | 212 | pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
| 203 | pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | 213 | pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
| 204 | ); | 214 | ); |
| @@ -221,8 +231,71 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 221 | } | 231 | } |
| 222 | } | 232 | } |
| 223 | 233 | ||
| 234 | #[must_use = "Sinks do nothing unless polled"] | ||
| 235 | /// [`futures_sink::Sink`] adapter for [`Pub`]. | ||
| 236 | #[derive(Debug)] | ||
| 237 | pub struct PubSink<'a, 'p, PSB, T> | ||
| 238 | where | ||
| 239 | T: Clone, | ||
| 240 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 241 | { | ||
| 242 | publ: &'p Pub<'a, PSB, T>, | ||
| 243 | fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>, | ||
| 244 | } | ||
| 245 | |||
| 246 | impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T> | ||
| 247 | where | ||
| 248 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 249 | T: Clone, | ||
| 250 | { | ||
| 251 | /// Try to make progress on the pending future if we have one. | ||
| 252 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | ||
| 253 | let Some(mut fut) = self.fut.take() else { | ||
| 254 | return Poll::Ready(()); | ||
| 255 | }; | ||
| 256 | |||
| 257 | if Pin::new(&mut fut).poll(cx).is_pending() { | ||
| 258 | self.fut = Some(fut); | ||
| 259 | return Poll::Pending; | ||
| 260 | } | ||
| 261 | |||
| 262 | Poll::Ready(()) | ||
| 263 | } | ||
| 264 | } | ||
| 265 | |||
| 266 | impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T> | ||
| 267 | where | ||
| 268 | PSB: PubSubBehavior<T> + ?Sized, | ||
| 269 | T: Clone, | ||
| 270 | { | ||
| 271 | type Error = core::convert::Infallible; | ||
| 272 | |||
| 273 | #[inline] | ||
| 274 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 275 | self.poll(cx).map(Ok) | ||
| 276 | } | ||
| 277 | |||
| 278 | #[inline] | ||
| 279 | fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
| 280 | self.fut = Some(self.publ.publish(item)); | ||
| 281 | |||
| 282 | Ok(()) | ||
| 283 | } | ||
| 284 | |||
| 285 | #[inline] | ||
| 286 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 287 | self.poll(cx).map(Ok) | ||
| 288 | } | ||
| 289 | |||
| 290 | #[inline] | ||
| 291 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| 292 | self.poll(cx).map(Ok) | ||
| 293 | } | ||
| 294 | } | ||
| 295 | |||
| 224 | /// Future for the publisher wait action | 296 | /// Future for the publisher wait action |
| 225 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 297 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 298 | #[derive(Debug)] | ||
| 226 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 299 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 227 | /// The message we need to publish | 300 | /// The message we need to publish |
| 228 | message: Option<T>, | 301 | message: Option<T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index 6ad660cb3..356de23f6 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -10,6 +10,7 @@ use super::{PubSubBehavior, PubSubChannel, WaitResult}; | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | 10 | use crate::blocking_mutex::raw::RawMutex; |
| 11 | 11 | ||
| 12 | /// A subscriber to a channel | 12 | /// A subscriber to a channel |
| 13 | #[derive(Debug)] | ||
| 13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 14 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 14 | /// The message id of the next message we are yet to receive | 15 | /// The message id of the next message we are yet to receive |
| 15 | next_message_id: u64, | 16 | next_message_id: u64, |
| @@ -115,7 +116,7 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} | |||
| 115 | 116 | ||
| 116 | /// Warning: The stream implementation ignores lag results and returns all messages. | 117 | /// Warning: The stream implementation ignores lag results and returns all messages. |
| 117 | /// This might miss some messages without you knowing it. | 118 | /// This might miss some messages without you knowing it. |
| 118 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { | 119 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> { |
| 119 | type Item = T; | 120 | type Item = T; |
| 120 | 121 | ||
| 121 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | 122 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| @@ -151,6 +152,7 @@ impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { | |||
| 151 | } | 152 | } |
| 152 | 153 | ||
| 153 | /// A subscriber that holds a generic reference to the channel | 154 | /// A subscriber that holds a generic reference to the channel |
| 155 | #[derive(Debug)] | ||
| 154 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | 156 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( |
| 155 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | 157 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, |
| 156 | ); | 158 | ); |
| @@ -175,6 +177,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 175 | 177 | ||
| 176 | /// Future for the subscriber wait action | 178 | /// Future for the subscriber wait action |
| 177 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 179 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 180 | #[derive(Debug)] | ||
| 178 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 181 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 179 | subscriber: &'s mut Sub<'a, PSB, T>, | 182 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 180 | } | 183 | } |
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index 81e60c42b..608447cd6 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs | |||
| @@ -1,5 +1,6 @@ | |||
| 1 | use core::ops::Range; | 1 | use core::ops::Range; |
| 2 | 2 | ||
| 3 | #[derive(Debug)] | ||
| 3 | pub struct RingBuffer<const N: usize> { | 4 | pub struct RingBuffer<const N: usize> { |
| 4 | start: usize, | 5 | start: usize, |
| 5 | end: usize, | 6 | end: usize, |
| @@ -94,11 +95,7 @@ impl<const N: usize> RingBuffer<N> { | |||
| 94 | 95 | ||
| 95 | fn wrap(&self, n: usize) -> usize { | 96 | fn wrap(&self, n: usize) -> usize { |
| 96 | assert!(n <= N); | 97 | assert!(n <= N); |
| 97 | if n == N { | 98 | if n == N { 0 } else { n } |
| 98 | 0 | ||
| 99 | } else { | ||
| 100 | n | ||
| 101 | } | ||
| 102 | } | 99 | } |
| 103 | } | 100 | } |
| 104 | 101 | ||
diff --git a/embassy-sync/src/rwlock.rs b/embassy-sync/src/rwlock.rs new file mode 100644 index 000000000..918a6aa41 --- /dev/null +++ b/embassy-sync/src/rwlock.rs | |||
| @@ -0,0 +1,386 @@ | |||
| 1 | //! Async read-write lock. | ||
| 2 | //! | ||
| 3 | //! This module provides a read-write lock that can be used to synchronize data between asynchronous tasks. | ||
| 4 | use core::cell::{RefCell, UnsafeCell}; | ||
| 5 | use core::fmt; | ||
| 6 | use core::future::{Future, poll_fn}; | ||
| 7 | use core::ops::{Deref, DerefMut}; | ||
| 8 | use core::task::Poll; | ||
| 9 | |||
| 10 | use crate::blocking_mutex::Mutex as BlockingMutex; | ||
| 11 | use crate::blocking_mutex::raw::RawMutex; | ||
| 12 | use crate::waitqueue::WakerRegistration; | ||
| 13 | |||
| 14 | /// Error returned by [`RwLock::try_read`] and [`RwLock::try_write`] when the lock is already held. | ||
| 15 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 16 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 17 | pub struct TryLockError; | ||
| 18 | |||
| 19 | #[derive(Debug)] | ||
| 20 | struct State { | ||
| 21 | readers: usize, | ||
| 22 | writer: bool, | ||
| 23 | waker: WakerRegistration, | ||
| 24 | } | ||
| 25 | |||
| 26 | /// Async read-write lock. | ||
| 27 | /// | ||
| 28 | /// The read-write lock is generic over the raw mutex implementation `M` and the data `T` it protects. | ||
| 29 | /// The raw read-write lock is used to guard access to the internal state. It | ||
| 30 | /// is held for very short periods only, while locking and unlocking. It is *not* held | ||
| 31 | /// for the entire time the async RwLock is locked. | ||
| 32 | /// | ||
| 33 | /// Which implementation you select depends on the context in which you're using the read-write lock. | ||
| 34 | /// | ||
| 35 | /// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. | ||
| 36 | /// | ||
| 37 | /// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. | ||
| 38 | /// | ||
| 39 | /// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. | ||
| 40 | pub struct RwLock<M, T> | ||
| 41 | where | ||
| 42 | M: RawMutex, | ||
| 43 | T: ?Sized, | ||
| 44 | { | ||
| 45 | state: BlockingMutex<M, RefCell<State>>, | ||
| 46 | inner: UnsafeCell<T>, | ||
| 47 | } | ||
| 48 | |||
| 49 | unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for RwLock<M, T> {} | ||
| 50 | unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for RwLock<M, T> {} | ||
| 51 | |||
| 52 | /// Async read-write lock. | ||
| 53 | impl<M, T> RwLock<M, T> | ||
| 54 | where | ||
| 55 | M: RawMutex, | ||
| 56 | { | ||
| 57 | /// Create a new read-write lock with the given value. | ||
| 58 | pub const fn new(value: T) -> Self { | ||
| 59 | Self { | ||
| 60 | inner: UnsafeCell::new(value), | ||
| 61 | state: BlockingMutex::new(RefCell::new(State { | ||
| 62 | readers: 0, | ||
| 63 | writer: false, | ||
| 64 | waker: WakerRegistration::new(), | ||
| 65 | })), | ||
| 66 | } | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | impl<M, T> RwLock<M, T> | ||
| 71 | where | ||
| 72 | M: RawMutex, | ||
| 73 | T: ?Sized, | ||
| 74 | { | ||
| 75 | /// Lock the read-write lock for reading. | ||
| 76 | /// | ||
| 77 | /// This will wait for the lock to be available if it's already locked for writing. | ||
| 78 | pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, M, T>> { | ||
| 79 | poll_fn(|cx| { | ||
| 80 | let ready = self.state.lock(|s| { | ||
| 81 | let mut s = s.borrow_mut(); | ||
| 82 | if s.writer { | ||
| 83 | s.waker.register(cx.waker()); | ||
| 84 | false | ||
| 85 | } else { | ||
| 86 | s.readers += 1; | ||
| 87 | true | ||
| 88 | } | ||
| 89 | }); | ||
| 90 | |||
| 91 | if ready { | ||
| 92 | Poll::Ready(RwLockReadGuard { rwlock: self }) | ||
| 93 | } else { | ||
| 94 | Poll::Pending | ||
| 95 | } | ||
| 96 | }) | ||
| 97 | } | ||
| 98 | |||
| 99 | /// Lock the read-write lock for writing. | ||
| 100 | /// | ||
| 101 | /// This will wait for the lock to be available if it's already locked for reading or writing. | ||
| 102 | pub fn write(&self) -> impl Future<Output = RwLockWriteGuard<'_, M, T>> { | ||
| 103 | poll_fn(|cx| { | ||
| 104 | let ready = self.state.lock(|s| { | ||
| 105 | let mut s = s.borrow_mut(); | ||
| 106 | if s.writer || s.readers > 0 { | ||
| 107 | s.waker.register(cx.waker()); | ||
| 108 | false | ||
| 109 | } else { | ||
| 110 | s.writer = true; | ||
| 111 | true | ||
| 112 | } | ||
| 113 | }); | ||
| 114 | |||
| 115 | if ready { | ||
| 116 | Poll::Ready(RwLockWriteGuard { rwlock: self }) | ||
| 117 | } else { | ||
| 118 | Poll::Pending | ||
| 119 | } | ||
| 120 | }) | ||
| 121 | } | ||
| 122 | |||
| 123 | /// Attempt to immediately lock the rwlock. | ||
| 124 | /// | ||
| 125 | /// If the rwlock is already locked, this will return an error instead of waiting. | ||
| 126 | pub fn try_read(&self) -> Result<RwLockReadGuard<'_, M, T>, TryLockError> { | ||
| 127 | self.state | ||
| 128 | .lock(|s| { | ||
| 129 | let mut s = s.borrow_mut(); | ||
| 130 | if s.writer { | ||
| 131 | return Err(()); | ||
| 132 | } | ||
| 133 | s.readers += 1; | ||
| 134 | Ok(()) | ||
| 135 | }) | ||
| 136 | .map_err(|_| TryLockError)?; | ||
| 137 | |||
| 138 | Ok(RwLockReadGuard { rwlock: self }) | ||
| 139 | } | ||
| 140 | |||
| 141 | /// Attempt to immediately lock the rwlock. | ||
| 142 | /// | ||
| 143 | /// If the rwlock is already locked, this will return an error instead of waiting. | ||
| 144 | pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, M, T>, TryLockError> { | ||
| 145 | self.state | ||
| 146 | .lock(|s| { | ||
| 147 | let mut s = s.borrow_mut(); | ||
| 148 | if s.writer || s.readers > 0 { | ||
| 149 | return Err(()); | ||
| 150 | } | ||
| 151 | s.writer = true; | ||
| 152 | Ok(()) | ||
| 153 | }) | ||
| 154 | .map_err(|_| TryLockError)?; | ||
| 155 | |||
| 156 | Ok(RwLockWriteGuard { rwlock: self }) | ||
| 157 | } | ||
| 158 | |||
| 159 | /// Consumes this read-write lock, returning the underlying data. | ||
| 160 | pub fn into_inner(self) -> T | ||
| 161 | where | ||
| 162 | T: Sized, | ||
| 163 | { | ||
| 164 | self.inner.into_inner() | ||
| 165 | } | ||
| 166 | |||
| 167 | /// Returns a mutable reference to the underlying data. | ||
| 168 | /// | ||
| 169 | /// Since this call borrows the RwLock mutably, no actual locking needs to | ||
| 170 | /// take place -- the mutable borrow statically guarantees no locks exist. | ||
| 171 | pub fn get_mut(&mut self) -> &mut T { | ||
| 172 | self.inner.get_mut() | ||
| 173 | } | ||
| 174 | } | ||
| 175 | |||
| 176 | impl<M: RawMutex, T> From<T> for RwLock<M, T> { | ||
| 177 | fn from(from: T) -> Self { | ||
| 178 | Self::new(from) | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | impl<M, T> Default for RwLock<M, T> | ||
| 183 | where | ||
| 184 | M: RawMutex, | ||
| 185 | T: Default, | ||
| 186 | { | ||
| 187 | fn default() -> Self { | ||
| 188 | Self::new(Default::default()) | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | impl<M, T> fmt::Debug for RwLock<M, T> | ||
| 193 | where | ||
| 194 | M: RawMutex, | ||
| 195 | T: ?Sized + fmt::Debug, | ||
| 196 | { | ||
| 197 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 198 | let mut d = f.debug_struct("RwLock"); | ||
| 199 | match self.try_read() { | ||
| 200 | Ok(guard) => d.field("inner", &&*guard), | ||
| 201 | Err(TryLockError) => d.field("inner", &"Locked"), | ||
| 202 | } | ||
| 203 | .finish_non_exhaustive() | ||
| 204 | } | ||
| 205 | } | ||
| 206 | |||
| 207 | /// Async read lock guard. | ||
| 208 | /// | ||
| 209 | /// Owning an instance of this type indicates having | ||
| 210 | /// successfully locked the read-write lock for reading, and grants access to the contents. | ||
| 211 | /// | ||
| 212 | /// Dropping it unlocks the read-write lock. | ||
| 213 | #[clippy::has_significant_drop] | ||
| 214 | #[must_use = "if unused the RwLock will immediately unlock"] | ||
| 215 | pub struct RwLockReadGuard<'a, R, T> | ||
| 216 | where | ||
| 217 | R: RawMutex, | ||
| 218 | T: ?Sized, | ||
| 219 | { | ||
| 220 | rwlock: &'a RwLock<R, T>, | ||
| 221 | } | ||
| 222 | |||
| 223 | impl<'a, M, T> Drop for RwLockReadGuard<'a, M, T> | ||
| 224 | where | ||
| 225 | M: RawMutex, | ||
| 226 | T: ?Sized, | ||
| 227 | { | ||
| 228 | fn drop(&mut self) { | ||
| 229 | self.rwlock.state.lock(|s| { | ||
| 230 | let mut s = unwrap!(s.try_borrow_mut()); | ||
| 231 | s.readers -= 1; | ||
| 232 | if s.readers == 0 { | ||
| 233 | s.waker.wake(); | ||
| 234 | } | ||
| 235 | }) | ||
| 236 | } | ||
| 237 | } | ||
| 238 | |||
| 239 | impl<'a, M, T> Deref for RwLockReadGuard<'a, M, T> | ||
| 240 | where | ||
| 241 | M: RawMutex, | ||
| 242 | T: ?Sized, | ||
| 243 | { | ||
| 244 | type Target = T; | ||
| 245 | fn deref(&self) -> &Self::Target { | ||
| 246 | // Safety: the RwLockReadGuard represents shared access to the contents | ||
| 247 | // of the read-write lock, so it's OK to get it. | ||
| 248 | unsafe { &*(self.rwlock.inner.get() as *const T) } | ||
| 249 | } | ||
| 250 | } | ||
| 251 | |||
| 252 | impl<'a, M, T> fmt::Debug for RwLockReadGuard<'a, M, T> | ||
| 253 | where | ||
| 254 | M: RawMutex, | ||
| 255 | T: ?Sized + fmt::Debug, | ||
| 256 | { | ||
| 257 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 258 | fmt::Debug::fmt(&**self, f) | ||
| 259 | } | ||
| 260 | } | ||
| 261 | |||
| 262 | impl<'a, M, T> fmt::Display for RwLockReadGuard<'a, M, T> | ||
| 263 | where | ||
| 264 | M: RawMutex, | ||
| 265 | T: ?Sized + fmt::Display, | ||
| 266 | { | ||
| 267 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 268 | fmt::Display::fmt(&**self, f) | ||
| 269 | } | ||
| 270 | } | ||
| 271 | |||
| 272 | /// Async write lock guard. | ||
| 273 | /// | ||
| 274 | /// Owning an instance of this type indicates having | ||
| 275 | /// successfully locked the read-write lock for writing, and grants access to the contents. | ||
| 276 | /// | ||
| 277 | /// Dropping it unlocks the read-write lock. | ||
| 278 | #[clippy::has_significant_drop] | ||
| 279 | #[must_use = "if unused the RwLock will immediately unlock"] | ||
| 280 | pub struct RwLockWriteGuard<'a, R, T> | ||
| 281 | where | ||
| 282 | R: RawMutex, | ||
| 283 | T: ?Sized, | ||
| 284 | { | ||
| 285 | rwlock: &'a RwLock<R, T>, | ||
| 286 | } | ||
| 287 | |||
| 288 | impl<'a, R, T> Drop for RwLockWriteGuard<'a, R, T> | ||
| 289 | where | ||
| 290 | R: RawMutex, | ||
| 291 | T: ?Sized, | ||
| 292 | { | ||
| 293 | fn drop(&mut self) { | ||
| 294 | self.rwlock.state.lock(|s| { | ||
| 295 | let mut s = unwrap!(s.try_borrow_mut()); | ||
| 296 | s.writer = false; | ||
| 297 | s.waker.wake(); | ||
| 298 | }) | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | impl<'a, R, T> Deref for RwLockWriteGuard<'a, R, T> | ||
| 303 | where | ||
| 304 | R: RawMutex, | ||
| 305 | T: ?Sized, | ||
| 306 | { | ||
| 307 | type Target = T; | ||
| 308 | fn deref(&self) -> &Self::Target { | ||
| 309 | // Safety: the RwLockWriteGuard represents exclusive access to the contents | ||
| 310 | // of the read-write lock, so it's OK to get it. | ||
| 311 | unsafe { &*(self.rwlock.inner.get() as *mut T) } | ||
| 312 | } | ||
| 313 | } | ||
| 314 | |||
| 315 | impl<'a, R, T> DerefMut for RwLockWriteGuard<'a, R, T> | ||
| 316 | where | ||
| 317 | R: RawMutex, | ||
| 318 | T: ?Sized, | ||
| 319 | { | ||
| 320 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 321 | // Safety: the RwLockWriteGuard represents exclusive access to the contents | ||
| 322 | // of the read-write lock, so it's OK to get it. | ||
| 323 | unsafe { &mut *(self.rwlock.inner.get()) } | ||
| 324 | } | ||
| 325 | } | ||
| 326 | |||
| 327 | impl<'a, R, T> fmt::Debug for RwLockWriteGuard<'a, R, T> | ||
| 328 | where | ||
| 329 | R: RawMutex, | ||
| 330 | T: ?Sized + fmt::Debug, | ||
| 331 | { | ||
| 332 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 333 | fmt::Debug::fmt(&**self, f) | ||
| 334 | } | ||
| 335 | } | ||
| 336 | |||
| 337 | impl<'a, R, T> fmt::Display for RwLockWriteGuard<'a, R, T> | ||
| 338 | where | ||
| 339 | R: RawMutex, | ||
| 340 | T: ?Sized + fmt::Display, | ||
| 341 | { | ||
| 342 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 343 | fmt::Display::fmt(&**self, f) | ||
| 344 | } | ||
| 345 | } | ||
| 346 | |||
| 347 | #[cfg(test)] | ||
| 348 | mod tests { | ||
| 349 | use crate::blocking_mutex::raw::NoopRawMutex; | ||
| 350 | use crate::rwlock::RwLock; | ||
| 351 | |||
| 352 | #[futures_test::test] | ||
| 353 | async fn read_guard_releases_lock_when_dropped() { | ||
| 354 | let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]); | ||
| 355 | |||
| 356 | { | ||
| 357 | let guard = rwlock.read().await; | ||
| 358 | assert_eq!(*guard, [0, 1]); | ||
| 359 | } | ||
| 360 | |||
| 361 | { | ||
| 362 | let guard = rwlock.read().await; | ||
| 363 | assert_eq!(*guard, [0, 1]); | ||
| 364 | } | ||
| 365 | |||
| 366 | assert_eq!(*rwlock.read().await, [0, 1]); | ||
| 367 | } | ||
| 368 | |||
| 369 | #[futures_test::test] | ||
| 370 | async fn write_guard_releases_lock_when_dropped() { | ||
| 371 | let rwlock: RwLock<NoopRawMutex, [i32; 2]> = RwLock::new([0, 1]); | ||
| 372 | |||
| 373 | { | ||
| 374 | let mut guard = rwlock.write().await; | ||
| 375 | assert_eq!(*guard, [0, 1]); | ||
| 376 | guard[1] = 2; | ||
| 377 | } | ||
| 378 | |||
| 379 | { | ||
| 380 | let guard = rwlock.read().await; | ||
| 381 | assert_eq!(*guard, [0, 2]); | ||
| 382 | } | ||
| 383 | |||
| 384 | assert_eq!(*rwlock.read().await, [0, 2]); | ||
| 385 | } | ||
| 386 | } | ||
diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs index d30eee30b..8d2413931 100644 --- a/embassy-sync/src/semaphore.rs +++ b/embassy-sync/src/semaphore.rs | |||
| @@ -1,13 +1,13 @@ | |||
| 1 | //! A synchronization primitive for controlling access to a pool of resources. | 1 | //! A synchronization primitive for controlling access to a pool of resources. |
| 2 | use core::cell::{Cell, RefCell}; | 2 | use core::cell::{Cell, RefCell}; |
| 3 | use core::convert::Infallible; | 3 | use core::convert::Infallible; |
| 4 | use core::future::{poll_fn, Future}; | 4 | use core::future::{Future, poll_fn}; |
| 5 | use core::task::{Poll, Waker}; | 5 | use core::task::{Poll, Waker}; |
| 6 | 6 | ||
| 7 | use heapless::Deque; | 7 | use heapless::Deque; |
| 8 | 8 | ||
| 9 | use crate::blocking_mutex::raw::RawMutex; | ||
| 10 | use crate::blocking_mutex::Mutex; | 9 | use crate::blocking_mutex::Mutex; |
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::waitqueue::WakerRegistration; | 11 | use crate::waitqueue::WakerRegistration; |
| 12 | 12 | ||
| 13 | /// An asynchronous semaphore. | 13 | /// An asynchronous semaphore. |
| @@ -46,6 +46,7 @@ pub trait Semaphore: Sized { | |||
| 46 | /// A representation of a number of acquired permits. | 46 | /// A representation of a number of acquired permits. |
| 47 | /// | 47 | /// |
| 48 | /// The acquired permits will be released back to the [`Semaphore`] when this is dropped. | 48 | /// The acquired permits will be released back to the [`Semaphore`] when this is dropped. |
| 49 | #[derive(Debug)] | ||
| 49 | pub struct SemaphoreReleaser<'a, S: Semaphore> { | 50 | pub struct SemaphoreReleaser<'a, S: Semaphore> { |
| 50 | semaphore: &'a S, | 51 | semaphore: &'a S, |
| 51 | permits: usize, | 52 | permits: usize, |
| @@ -181,6 +182,7 @@ impl<M: RawMutex> Semaphore for GreedySemaphore<M> { | |||
| 181 | } | 182 | } |
| 182 | } | 183 | } |
| 183 | 184 | ||
| 185 | #[derive(Debug)] | ||
| 184 | struct SemaphoreState { | 186 | struct SemaphoreState { |
| 185 | permits: usize, | 187 | permits: usize, |
| 186 | waker: WakerRegistration, | 188 | waker: WakerRegistration, |
| @@ -221,6 +223,7 @@ impl SemaphoreState { | |||
| 221 | /// | 223 | /// |
| 222 | /// Up to `N` tasks may attempt to acquire permits concurrently. If additional | 224 | /// Up to `N` tasks may attempt to acquire permits concurrently. If additional |
| 223 | /// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. | 225 | /// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned. |
| 226 | #[derive(Debug)] | ||
| 224 | pub struct FairSemaphore<M, const N: usize> | 227 | pub struct FairSemaphore<M, const N: usize> |
| 225 | where | 228 | where |
| 226 | M: RawMutex, | 229 | M: RawMutex, |
| @@ -341,6 +344,7 @@ impl<M: RawMutex, const N: usize> Semaphore for FairSemaphore<M, N> { | |||
| 341 | } | 344 | } |
| 342 | } | 345 | } |
| 343 | 346 | ||
| 347 | #[derive(Debug)] | ||
| 344 | struct FairAcquire<'a, M: RawMutex, const N: usize> { | 348 | struct FairAcquire<'a, M: RawMutex, const N: usize> { |
| 345 | sema: &'a FairSemaphore<M, N>, | 349 | sema: &'a FairSemaphore<M, N>, |
| 346 | permits: usize, | 350 | permits: usize, |
| @@ -364,6 +368,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M | |||
| 364 | } | 368 | } |
| 365 | } | 369 | } |
| 366 | 370 | ||
| 371 | #[derive(Debug)] | ||
| 367 | struct FairAcquireAll<'a, M: RawMutex, const N: usize> { | 372 | struct FairAcquireAll<'a, M: RawMutex, const N: usize> { |
| 368 | sema: &'a FairSemaphore<M, N>, | 373 | sema: &'a FairSemaphore<M, N>, |
| 369 | min: usize, | 374 | min: usize, |
| @@ -387,6 +392,7 @@ impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a | |||
| 387 | } | 392 | } |
| 388 | } | 393 | } |
| 389 | 394 | ||
| 395 | #[derive(Debug)] | ||
| 390 | struct FairSemaphoreState<const N: usize> { | 396 | struct FairSemaphoreState<const N: usize> { |
| 391 | permits: usize, | 397 | permits: usize, |
| 392 | next_ticket: usize, | 398 | next_ticket: usize, |
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index a0f4b5a74..cc02228cf 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs | |||
| @@ -1,12 +1,12 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to a task. | 1 | //! A synchronization primitive for passing the latest value to a task. |
| 2 | use core::cell::Cell; | 2 | use core::cell::Cell; |
| 3 | use core::future::{poll_fn, Future}; | 3 | use core::future::{Future, poll_fn}; |
| 4 | use core::task::{Context, Poll, Waker}; | 4 | use core::task::{Context, Poll, Waker}; |
| 5 | 5 | ||
| 6 | use crate::blocking_mutex::raw::RawMutex; | ||
| 7 | use crate::blocking_mutex::Mutex; | 6 | use crate::blocking_mutex::Mutex; |
| 7 | use crate::blocking_mutex::raw::RawMutex; | ||
| 8 | 8 | ||
| 9 | /// Single-slot signaling primitive. | 9 | /// Single-slot signaling primitive for a _single_ consumer. |
| 10 | /// | 10 | /// |
| 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except | 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except |
| 12 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead | 12 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead |
| @@ -17,6 +17,7 @@ use crate::blocking_mutex::Mutex; | |||
| 17 | /// updates. | 17 | /// updates. |
| 18 | /// | 18 | /// |
| 19 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. | 19 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::Channel) instead. |
| 20 | /// For multiple consumers, use [`Watch`](crate::watch::Watch) instead. | ||
| 20 | /// | 21 | /// |
| 21 | /// Signals are generally declared as `static`s and then borrowed as required. | 22 | /// Signals are generally declared as `static`s and then borrowed as required. |
| 22 | /// | 23 | /// |
| @@ -38,6 +39,7 @@ where | |||
| 38 | state: Mutex<M, Cell<State<T>>>, | 39 | state: Mutex<M, Cell<State<T>>>, |
| 39 | } | 40 | } |
| 40 | 41 | ||
| 42 | #[derive(Debug)] | ||
| 41 | enum State<T> { | 43 | enum State<T> { |
| 42 | None, | 44 | None, |
| 43 | Waiting(Waker), | 45 | Waiting(Waker), |
| @@ -81,7 +83,7 @@ where | |||
| 81 | 83 | ||
| 82 | /// Remove the queued value in this `Signal`, if any. | 84 | /// Remove the queued value in this `Signal`, if any. |
| 83 | pub fn reset(&self) { | 85 | pub fn reset(&self) { |
| 84 | self.state.lock(|cell| cell.set(State::None)); | 86 | self.try_take(); |
| 85 | } | 87 | } |
| 86 | 88 | ||
| 87 | fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { | 89 | fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { |
| @@ -106,7 +108,7 @@ where | |||
| 106 | }) | 108 | }) |
| 107 | } | 109 | } |
| 108 | 110 | ||
| 109 | /// Future that completes when this Signal has been signaled. | 111 | /// Future that completes when this Signal has been signaled, taking the value out of the signal. |
| 110 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | 112 | pub fn wait(&self) -> impl Future<Output = T> + '_ { |
| 111 | poll_fn(move |cx| self.poll_wait(cx)) | 113 | poll_fn(move |cx| self.poll_wait(cx)) |
| 112 | } | 114 | } |
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs index 63fe04a6e..d2bf890e5 100644 --- a/embassy-sync/src/waitqueue/atomic_waker.rs +++ b/embassy-sync/src/waitqueue/atomic_waker.rs | |||
| @@ -1,26 +1,28 @@ | |||
| 1 | use core::cell::Cell; | 1 | use core::cell::Cell; |
| 2 | use core::task::Waker; | 2 | use core::task::Waker; |
| 3 | 3 | ||
| 4 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 5 | use crate::blocking_mutex::Mutex; | 4 | use crate::blocking_mutex::Mutex; |
| 5 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; | ||
| 6 | 6 | ||
| 7 | /// Utility struct to register and wake a waker. | 7 | /// Utility struct to register and wake a waker. |
| 8 | pub struct AtomicWaker { | 8 | /// If a waker is registered, registering another waker will replace the previous one without waking it. |
| 9 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | 9 | /// Intended to wake a task from an interrupt. Therefore, it is generally not expected, |
| 10 | /// that multiple tasks register try to register a waker simultaneously. | ||
| 11 | pub struct GenericAtomicWaker<M: RawMutex> { | ||
| 12 | waker: Mutex<M, Cell<Option<Waker>>>, | ||
| 10 | } | 13 | } |
| 11 | 14 | ||
| 12 | impl AtomicWaker { | 15 | impl<M: RawMutex> GenericAtomicWaker<M> { |
| 13 | /// Create a new `AtomicWaker`. | 16 | /// Create a new `AtomicWaker`. |
| 14 | pub const fn new() -> Self { | 17 | pub const fn new(mutex: M) -> Self { |
| 15 | Self { | 18 | Self { |
| 16 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | 19 | waker: Mutex::const_new(mutex, Cell::new(None)), |
| 17 | } | 20 | } |
| 18 | } | 21 | } |
| 19 | 22 | ||
| 20 | /// Register a waker. Overwrites the previous waker, if any. | 23 | /// Register a waker. Overwrites the previous waker, if any. |
| 21 | pub fn register(&self, w: &Waker) { | 24 | pub fn register(&self, w: &Waker) { |
| 22 | critical_section::with(|cs| { | 25 | self.waker.lock(|cell| { |
| 23 | let cell = self.waker.borrow(cs); | ||
| 24 | cell.set(match cell.replace(None) { | 26 | cell.set(match cell.replace(None) { |
| 25 | Some(w2) if (w2.will_wake(w)) => Some(w2), | 27 | Some(w2) if (w2.will_wake(w)) => Some(w2), |
| 26 | _ => Some(w.clone()), | 28 | _ => Some(w.clone()), |
| @@ -30,8 +32,7 @@ impl AtomicWaker { | |||
| 30 | 32 | ||
| 31 | /// Wake the registered waker, if any. | 33 | /// Wake the registered waker, if any. |
| 32 | pub fn wake(&self) { | 34 | pub fn wake(&self) { |
| 33 | critical_section::with(|cs| { | 35 | self.waker.lock(|cell| { |
| 34 | let cell = self.waker.borrow(cs); | ||
| 35 | if let Some(w) = cell.replace(None) { | 36 | if let Some(w) = cell.replace(None) { |
| 36 | w.wake_by_ref(); | 37 | w.wake_by_ref(); |
| 37 | cell.set(Some(w)); | 38 | cell.set(Some(w)); |
| @@ -39,3 +40,27 @@ impl AtomicWaker { | |||
| 39 | }) | 40 | }) |
| 40 | } | 41 | } |
| 41 | } | 42 | } |
| 43 | |||
| 44 | /// Utility struct to register and wake a waker. | ||
| 45 | pub struct AtomicWaker { | ||
| 46 | waker: GenericAtomicWaker<CriticalSectionRawMutex>, | ||
| 47 | } | ||
| 48 | |||
| 49 | impl AtomicWaker { | ||
| 50 | /// Create a new `AtomicWaker`. | ||
| 51 | pub const fn new() -> Self { | ||
| 52 | Self { | ||
| 53 | waker: GenericAtomicWaker::new(CriticalSectionRawMutex::new()), | ||
| 54 | } | ||
| 55 | } | ||
| 56 | |||
| 57 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 58 | pub fn register(&self, w: &Waker) { | ||
| 59 | self.waker.register(w); | ||
| 60 | } | ||
| 61 | |||
| 62 | /// Wake the registered waker, if any. | ||
| 63 | pub fn wake(&self) { | ||
| 64 | self.waker.wake(); | ||
| 65 | } | ||
| 66 | } | ||
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs index 5c6a96ec8..a45adeab8 100644 --- a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs | |||
| @@ -4,6 +4,10 @@ use core::sync::atomic::{AtomicPtr, Ordering}; | |||
| 4 | use core::task::Waker; | 4 | use core::task::Waker; |
| 5 | 5 | ||
| 6 | /// Utility struct to register and wake a waker. | 6 | /// Utility struct to register and wake a waker. |
| 7 | /// If a waker is registered, registering another waker will replace the previous one without waking it. | ||
| 8 | /// The intended use case is to wake tasks from interrupts. Therefore, it is generally not expected, | ||
| 9 | /// that multiple tasks register try to register a waker simultaneously. | ||
| 10 | #[derive(Debug)] | ||
| 7 | pub struct AtomicWaker { | 11 | pub struct AtomicWaker { |
| 8 | waker: AtomicPtr<()>, | 12 | waker: AtomicPtr<()>, |
| 9 | } | 13 | } |
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 0e520bf40..56c0cd1b2 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs | |||
| @@ -3,6 +3,9 @@ use core::task::Waker; | |||
| 3 | use heapless::Vec; | 3 | use heapless::Vec; |
| 4 | 4 | ||
| 5 | /// Utility struct to register and wake multiple wakers. | 5 | /// Utility struct to register and wake multiple wakers. |
| 6 | /// Queue of wakers with a maximum length of `N`. | ||
| 7 | /// Intended for waking multiple tasks. | ||
| 8 | #[derive(Debug)] | ||
| 6 | pub struct MultiWakerRegistration<const N: usize> { | 9 | pub struct MultiWakerRegistration<const N: usize> { |
| 7 | wakers: Vec<Waker, N>, | 10 | wakers: Vec<Waker, N>, |
| 8 | } | 11 | } |
| @@ -13,7 +16,9 @@ impl<const N: usize> MultiWakerRegistration<N> { | |||
| 13 | Self { wakers: Vec::new() } | 16 | Self { wakers: Vec::new() } |
| 14 | } | 17 | } |
| 15 | 18 | ||
| 16 | /// Register a waker. If the buffer is full the function returns it in the error | 19 | /// Register a waker. |
| 20 | /// | ||
| 21 | /// If the buffer is full, [wakes all the wakers](Self::wake), clears its buffer and registers the waker. | ||
| 17 | pub fn register(&mut self, w: &Waker) { | 22 | pub fn register(&mut self, w: &Waker) { |
| 18 | // If we already have some waker that wakes the same task as `w`, do nothing. | 23 | // If we already have some waker that wakes the same task as `w`, do nothing. |
| 19 | // This avoids cloning wakers, and avoids unnecessary mass-wakes. | 24 | // This avoids cloning wakers, and avoids unnecessary mass-wakes. |
diff --git a/embassy-sync/src/waitqueue/waker_registration.rs b/embassy-sync/src/waitqueue/waker_registration.rs index 9b666e7c4..7f24f8fb6 100644 --- a/embassy-sync/src/waitqueue/waker_registration.rs +++ b/embassy-sync/src/waitqueue/waker_registration.rs | |||
| @@ -2,6 +2,10 @@ use core::mem; | |||
| 2 | use core::task::Waker; | 2 | use core::task::Waker; |
| 3 | 3 | ||
| 4 | /// Utility struct to register and wake a waker. | 4 | /// Utility struct to register and wake a waker. |
| 5 | /// If a waker is registered, registering another waker will replace the previous one. | ||
| 6 | /// The previous waker will be woken in this case, giving it a chance to reregister itself. | ||
| 7 | /// Although it is possible to wake multiple tasks this way, | ||
| 8 | /// this will cause them to wake each other in a loop registering themselves. | ||
| 5 | #[derive(Debug, Default)] | 9 | #[derive(Debug, Default)] |
| 6 | pub struct WakerRegistration { | 10 | pub struct WakerRegistration { |
| 7 | waker: Option<Waker>, | 11 | waker: Option<Waker>, |
diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 000000000..0f8a8d679 --- /dev/null +++ b/embassy-sync/src/watch.rs | |||
| @@ -0,0 +1,1127 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to **multiple** receivers. | ||
| 2 | |||
| 3 | use core::cell::RefCell; | ||
| 4 | use core::future::{Future, poll_fn}; | ||
| 5 | use core::marker::PhantomData; | ||
| 6 | use core::ops::{Deref, DerefMut}; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use crate::blocking_mutex::Mutex; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::waitqueue::MultiWakerRegistration; | ||
| 12 | |||
| 13 | /// The `Watch` is a single-slot signaling primitive that allows _multiple_ (`N`) receivers to concurrently await | ||
| 14 | /// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, | ||
| 15 | /// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous | ||
| 16 | /// value when a new one is sent, without waiting for all receivers to read the previous value. | ||
| 17 | /// | ||
| 18 | /// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks | ||
| 19 | /// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are | ||
| 20 | /// always provided with the latest value. | ||
| 21 | /// | ||
| 22 | /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] | ||
| 23 | /// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] | ||
| 24 | /// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the | ||
| 25 | /// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. | ||
| 26 | /// ``` | ||
| 27 | /// | ||
| 28 | /// use futures_executor::block_on; | ||
| 29 | /// use embassy_sync::watch::Watch; | ||
| 30 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 31 | /// | ||
| 32 | /// let f = async { | ||
| 33 | /// | ||
| 34 | /// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 35 | /// | ||
| 36 | /// // Obtain receivers and sender | ||
| 37 | /// let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 38 | /// let mut rcv1 = WATCH.dyn_receiver().unwrap(); | ||
| 39 | /// let mut snd = WATCH.sender(); | ||
| 40 | /// | ||
| 41 | /// // No more receivers, and no update | ||
| 42 | /// assert!(WATCH.receiver().is_none()); | ||
| 43 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 44 | /// | ||
| 45 | /// snd.send(10); | ||
| 46 | /// | ||
| 47 | /// // Receive the new value (async or try) | ||
| 48 | /// assert_eq!(rcv0.changed().await, 10); | ||
| 49 | /// assert_eq!(rcv1.try_changed(), Some(10)); | ||
| 50 | /// | ||
| 51 | /// // No update | ||
| 52 | /// assert_eq!(rcv0.try_changed(), None); | ||
| 53 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 54 | /// | ||
| 55 | /// snd.send(20); | ||
| 56 | /// | ||
| 57 | /// // Using `get` marks the value as seen | ||
| 58 | /// assert_eq!(rcv1.get().await, 20); | ||
| 59 | /// assert_eq!(rcv1.try_changed(), None); | ||
| 60 | /// | ||
| 61 | /// // But `get` also returns when unchanged | ||
| 62 | /// assert_eq!(rcv1.get().await, 20); | ||
| 63 | /// assert_eq!(rcv1.get().await, 20); | ||
| 64 | /// | ||
| 65 | /// }; | ||
| 66 | /// block_on(f); | ||
| 67 | /// ``` | ||
| 68 | #[derive(Debug)] | ||
| 69 | pub struct Watch<M: RawMutex, T: Clone, const N: usize> { | ||
| 70 | mutex: Mutex<M, RefCell<WatchState<T, N>>>, | ||
| 71 | } | ||
| 72 | |||
| 73 | #[derive(Debug)] | ||
| 74 | struct WatchState<T: Clone, const N: usize> { | ||
| 75 | data: Option<T>, | ||
| 76 | current_id: u64, | ||
| 77 | wakers: MultiWakerRegistration<N>, | ||
| 78 | receiver_count: usize, | ||
| 79 | } | ||
| 80 | |||
| 81 | trait SealedWatchBehavior<T> { | ||
| 82 | /// Poll the `Watch` for the current value, making it as seen. | ||
| 83 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 84 | |||
| 85 | /// Poll the `Watch` for the value if it matches the predicate function | ||
| 86 | /// `f`, making it as seen. | ||
| 87 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 88 | |||
| 89 | /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. | ||
| 90 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>; | ||
| 91 | |||
| 92 | /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. | ||
| 93 | fn try_changed(&self, id: &mut u64) -> Option<T>; | ||
| 94 | |||
| 95 | /// Poll the `Watch` for a changed value that matches the predicate function | ||
| 96 | /// `f`, marking it as seen. | ||
| 97 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>; | ||
| 98 | |||
| 99 | /// Tries to retrieve the value of the `Watch` if it has changed and matches the | ||
| 100 | /// predicate function `f`, marking it as seen. | ||
| 101 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 102 | |||
| 103 | /// Used when a receiver is dropped to decrement the receiver count. | ||
| 104 | /// | ||
| 105 | /// ## This method should not be called by the user. | ||
| 106 | fn drop_receiver(&self); | ||
| 107 | |||
| 108 | /// Clears the value of the `Watch`. | ||
| 109 | fn clear(&self); | ||
| 110 | |||
| 111 | /// Sends a new value to the `Watch`. | ||
| 112 | fn send(&self, val: T); | ||
| 113 | |||
| 114 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 115 | /// `Watch` does not already contain a value. | ||
| 116 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)); | ||
| 117 | |||
| 118 | /// Modify the value of the `Watch` using a closure. Returns `false` if the | ||
| 119 | /// `Watch` does not already contain a value. | ||
| 120 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool); | ||
| 121 | } | ||
| 122 | |||
| 123 | /// A trait representing the 'inner' behavior of the `Watch`. | ||
| 124 | #[allow(private_bounds)] | ||
| 125 | pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> { | ||
| 126 | /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. | ||
| 127 | fn try_get(&self, id: Option<&mut u64>) -> Option<T>; | ||
| 128 | |||
| 129 | /// Tries to get the value of the `Watch` if it matches the predicate function | ||
| 130 | /// `f`, marking it as seen. | ||
| 131 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>; | ||
| 132 | |||
| 133 | /// Checks if the `Watch` is been initialized with a value. | ||
| 134 | fn contains_value(&self) -> bool; | ||
| 135 | } | ||
| 136 | |||
| 137 | impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> { | ||
| 138 | fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 139 | self.mutex.lock(|state| { | ||
| 140 | let mut s = state.borrow_mut(); | ||
| 141 | match &s.data { | ||
| 142 | Some(data) => { | ||
| 143 | *id = s.current_id; | ||
| 144 | Poll::Ready(data.clone()) | ||
| 145 | } | ||
| 146 | None => { | ||
| 147 | s.wakers.register(cx.waker()); | ||
| 148 | Poll::Pending | ||
| 149 | } | ||
| 150 | } | ||
| 151 | }) | ||
| 152 | } | ||
| 153 | |||
| 154 | fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 155 | self.mutex.lock(|state| { | ||
| 156 | let mut s = state.borrow_mut(); | ||
| 157 | match s.data { | ||
| 158 | Some(ref data) if f(data) => { | ||
| 159 | *id = s.current_id; | ||
| 160 | Poll::Ready(data.clone()) | ||
| 161 | } | ||
| 162 | _ => { | ||
| 163 | s.wakers.register(cx.waker()); | ||
| 164 | Poll::Pending | ||
| 165 | } | ||
| 166 | } | ||
| 167 | }) | ||
| 168 | } | ||
| 169 | |||
| 170 | fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> { | ||
| 171 | self.mutex.lock(|state| { | ||
| 172 | let mut s = state.borrow_mut(); | ||
| 173 | match (&s.data, s.current_id > *id) { | ||
| 174 | (Some(data), true) => { | ||
| 175 | *id = s.current_id; | ||
| 176 | Poll::Ready(data.clone()) | ||
| 177 | } | ||
| 178 | _ => { | ||
| 179 | s.wakers.register(cx.waker()); | ||
| 180 | Poll::Pending | ||
| 181 | } | ||
| 182 | } | ||
| 183 | }) | ||
| 184 | } | ||
| 185 | |||
| 186 | fn try_changed(&self, id: &mut u64) -> Option<T> { | ||
| 187 | self.mutex.lock(|state| { | ||
| 188 | let s = state.borrow(); | ||
| 189 | match s.current_id > *id { | ||
| 190 | true => { | ||
| 191 | *id = s.current_id; | ||
| 192 | s.data.clone() | ||
| 193 | } | ||
| 194 | false => None, | ||
| 195 | } | ||
| 196 | }) | ||
| 197 | } | ||
| 198 | |||
| 199 | fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> { | ||
| 200 | self.mutex.lock(|state| { | ||
| 201 | let mut s = state.borrow_mut(); | ||
| 202 | match (&s.data, s.current_id > *id) { | ||
| 203 | (Some(data), true) if f(data) => { | ||
| 204 | *id = s.current_id; | ||
| 205 | Poll::Ready(data.clone()) | ||
| 206 | } | ||
| 207 | _ => { | ||
| 208 | s.wakers.register(cx.waker()); | ||
| 209 | Poll::Pending | ||
| 210 | } | ||
| 211 | } | ||
| 212 | }) | ||
| 213 | } | ||
| 214 | |||
| 215 | fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 216 | self.mutex.lock(|state| { | ||
| 217 | let s = state.borrow(); | ||
| 218 | match (&s.data, s.current_id > *id) { | ||
| 219 | (Some(data), true) if f(data) => { | ||
| 220 | *id = s.current_id; | ||
| 221 | s.data.clone() | ||
| 222 | } | ||
| 223 | _ => None, | ||
| 224 | } | ||
| 225 | }) | ||
| 226 | } | ||
| 227 | |||
| 228 | fn drop_receiver(&self) { | ||
| 229 | self.mutex.lock(|state| { | ||
| 230 | let mut s = state.borrow_mut(); | ||
| 231 | s.receiver_count -= 1; | ||
| 232 | }) | ||
| 233 | } | ||
| 234 | |||
| 235 | fn clear(&self) { | ||
| 236 | self.mutex.lock(|state| { | ||
| 237 | let mut s = state.borrow_mut(); | ||
| 238 | s.data = None; | ||
| 239 | }) | ||
| 240 | } | ||
| 241 | |||
| 242 | fn send(&self, val: T) { | ||
| 243 | self.mutex.lock(|state| { | ||
| 244 | let mut s = state.borrow_mut(); | ||
| 245 | s.data = Some(val); | ||
| 246 | s.current_id += 1; | ||
| 247 | s.wakers.wake(); | ||
| 248 | }) | ||
| 249 | } | ||
| 250 | |||
| 251 | fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) { | ||
| 252 | self.mutex.lock(|state| { | ||
| 253 | let mut s = state.borrow_mut(); | ||
| 254 | f(&mut s.data); | ||
| 255 | s.current_id += 1; | ||
| 256 | s.wakers.wake(); | ||
| 257 | }) | ||
| 258 | } | ||
| 259 | |||
| 260 | fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) { | ||
| 261 | self.mutex.lock(|state| { | ||
| 262 | let mut s = state.borrow_mut(); | ||
| 263 | if f(&mut s.data) { | ||
| 264 | s.current_id += 1; | ||
| 265 | s.wakers.wake(); | ||
| 266 | } | ||
| 267 | }) | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> { | ||
| 272 | fn try_get(&self, id: Option<&mut u64>) -> Option<T> { | ||
| 273 | self.mutex.lock(|state| { | ||
| 274 | let s = state.borrow(); | ||
| 275 | if let Some(id) = id { | ||
| 276 | *id = s.current_id; | ||
| 277 | } | ||
| 278 | s.data.clone() | ||
| 279 | }) | ||
| 280 | } | ||
| 281 | |||
| 282 | fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> { | ||
| 283 | self.mutex.lock(|state| { | ||
| 284 | let s = state.borrow(); | ||
| 285 | match s.data { | ||
| 286 | Some(ref data) if f(data) => { | ||
| 287 | if let Some(id) = id { | ||
| 288 | *id = s.current_id; | ||
| 289 | } | ||
| 290 | Some(data.clone()) | ||
| 291 | } | ||
| 292 | _ => None, | ||
| 293 | } | ||
| 294 | }) | ||
| 295 | } | ||
| 296 | |||
| 297 | fn contains_value(&self) -> bool { | ||
| 298 | self.mutex.lock(|state| state.borrow().data.is_some()) | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> { | ||
| 303 | /// Create a new `Watch` channel for `N` receivers. | ||
| 304 | pub const fn new() -> Self { | ||
| 305 | Self { | ||
| 306 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 307 | data: None, | ||
| 308 | current_id: 0, | ||
| 309 | wakers: MultiWakerRegistration::new(), | ||
| 310 | receiver_count: 0, | ||
| 311 | })), | ||
| 312 | } | ||
| 313 | } | ||
| 314 | |||
| 315 | /// Create a new `Watch` channel with default data. | ||
| 316 | pub const fn new_with(data: T) -> Self { | ||
| 317 | Self { | ||
| 318 | mutex: Mutex::new(RefCell::new(WatchState { | ||
| 319 | data: Some(data), | ||
| 320 | current_id: 0, | ||
| 321 | wakers: MultiWakerRegistration::new(), | ||
| 322 | receiver_count: 0, | ||
| 323 | })), | ||
| 324 | } | ||
| 325 | } | ||
| 326 | |||
| 327 | /// Create a new [`Sender`] for the `Watch`. | ||
| 328 | pub fn sender(&self) -> Sender<'_, M, T, N> { | ||
| 329 | Sender(Snd::new(self)) | ||
| 330 | } | ||
| 331 | |||
| 332 | /// Create a new [`DynSender`] for the `Watch`. | ||
| 333 | pub fn dyn_sender(&self) -> DynSender<'_, T> { | ||
| 334 | DynSender(Snd::new(self)) | ||
| 335 | } | ||
| 336 | |||
| 337 | /// Try to create a new [`Receiver`] for the `Watch`. If the | ||
| 338 | /// maximum number of receivers has been reached, `None` is returned. | ||
| 339 | pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> { | ||
| 340 | self.mutex.lock(|state| { | ||
| 341 | let mut s = state.borrow_mut(); | ||
| 342 | if s.receiver_count < N { | ||
| 343 | s.receiver_count += 1; | ||
| 344 | Some(Receiver(Rcv::new(self, 0))) | ||
| 345 | } else { | ||
| 346 | None | ||
| 347 | } | ||
| 348 | }) | ||
| 349 | } | ||
| 350 | |||
| 351 | /// Try to create a new [`DynReceiver`] for the `Watch`. If the | ||
| 352 | /// maximum number of receivers has been reached, `None` is returned. | ||
| 353 | pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> { | ||
| 354 | self.mutex.lock(|state| { | ||
| 355 | let mut s = state.borrow_mut(); | ||
| 356 | if s.receiver_count < N { | ||
| 357 | s.receiver_count += 1; | ||
| 358 | Some(DynReceiver(Rcv::new(self, 0))) | ||
| 359 | } else { | ||
| 360 | None | ||
| 361 | } | ||
| 362 | }) | ||
| 363 | } | ||
| 364 | |||
| 365 | /// Try to create a new [`AnonReceiver`] for the `Watch`. | ||
| 366 | pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { | ||
| 367 | AnonReceiver(AnonRcv::new(self, 0)) | ||
| 368 | } | ||
| 369 | |||
| 370 | /// Try to create a new [`DynAnonReceiver`] for the `Watch`. | ||
| 371 | pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { | ||
| 372 | DynAnonReceiver(AnonRcv::new(self, 0)) | ||
| 373 | } | ||
| 374 | |||
| 375 | /// Returns the message ID of the latest message sent to the `Watch`. | ||
| 376 | /// | ||
| 377 | /// This counter is monotonic, and is incremented every time a new message is sent. | ||
| 378 | pub fn get_msg_id(&self) -> u64 { | ||
| 379 | self.mutex.lock(|state| state.borrow().current_id) | ||
| 380 | } | ||
| 381 | |||
| 382 | /// Tries to get the value of the `Watch`. | ||
| 383 | pub fn try_get(&self) -> Option<T> { | ||
| 384 | WatchBehavior::try_get(self, None) | ||
| 385 | } | ||
| 386 | |||
| 387 | /// Tries to get the value of the `Watch` if it matches the predicate function `f`. | ||
| 388 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> | ||
| 389 | where | ||
| 390 | F: Fn(&T) -> bool, | ||
| 391 | { | ||
| 392 | WatchBehavior::try_get_and(self, None, &mut f) | ||
| 393 | } | ||
| 394 | } | ||
| 395 | |||
| 396 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 397 | #[derive(Debug)] | ||
| 398 | pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 399 | watch: &'a W, | ||
| 400 | _phantom: PhantomData<T>, | ||
| 401 | } | ||
| 402 | |||
| 403 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> { | ||
| 404 | fn clone(&self) -> Self { | ||
| 405 | Self { | ||
| 406 | watch: self.watch, | ||
| 407 | _phantom: PhantomData, | ||
| 408 | } | ||
| 409 | } | ||
| 410 | } | ||
| 411 | |||
| 412 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> { | ||
| 413 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 414 | fn new(watch: &'a W) -> Self { | ||
| 415 | Self { | ||
| 416 | watch, | ||
| 417 | _phantom: PhantomData, | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | /// Sends a new value to the `Watch`. | ||
| 422 | pub fn send(&self, val: T) { | ||
| 423 | self.watch.send(val) | ||
| 424 | } | ||
| 425 | |||
| 426 | /// Clears the value of the `Watch`. | ||
| 427 | /// This will cause calls to [`Rcv::get`] to be pending. | ||
| 428 | pub fn clear(&self) { | ||
| 429 | self.watch.clear() | ||
| 430 | } | ||
| 431 | |||
| 432 | /// Tries to retrieve the value of the `Watch`. | ||
| 433 | pub fn try_get(&self) -> Option<T> { | ||
| 434 | self.watch.try_get(None) | ||
| 435 | } | ||
| 436 | |||
| 437 | /// Tries to peek the current value of the `Watch` if it matches the predicate | ||
| 438 | /// function `f`. | ||
| 439 | pub fn try_get_and<F>(&self, mut f: F) -> Option<T> | ||
| 440 | where | ||
| 441 | F: Fn(&T) -> bool, | ||
| 442 | { | ||
| 443 | self.watch.try_get_and(None, &mut f) | ||
| 444 | } | ||
| 445 | |||
| 446 | /// Returns true if the `Watch` contains a value. | ||
| 447 | pub fn contains_value(&self) -> bool { | ||
| 448 | self.watch.contains_value() | ||
| 449 | } | ||
| 450 | |||
| 451 | /// Modify the value of the `Watch` using a closure. | ||
| 452 | pub fn send_modify<F>(&self, mut f: F) | ||
| 453 | where | ||
| 454 | F: Fn(&mut Option<T>), | ||
| 455 | { | ||
| 456 | self.watch.send_modify(&mut f) | ||
| 457 | } | ||
| 458 | |||
| 459 | /// Modify the value of the `Watch` using a closure. The closure must return | ||
| 460 | /// `true` if the value was modified, which notifies all receivers. | ||
| 461 | pub fn send_if_modified<F>(&self, mut f: F) | ||
| 462 | where | ||
| 463 | F: Fn(&mut Option<T>) -> bool, | ||
| 464 | { | ||
| 465 | self.watch.send_if_modified(&mut f) | ||
| 466 | } | ||
| 467 | } | ||
| 468 | |||
| 469 | /// A sender of a `Watch` channel. | ||
| 470 | /// | ||
| 471 | /// For a simpler type definition, consider [`DynSender`] at the expense of | ||
| 472 | /// some runtime performance due to dynamic dispatch. | ||
| 473 | #[derive(Debug)] | ||
| 474 | pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>); | ||
| 475 | |||
| 476 | impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { | ||
| 477 | fn clone(&self) -> Self { | ||
| 478 | Self(self.0.clone()) | ||
| 479 | } | ||
| 480 | } | ||
| 481 | |||
| 482 | impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { | ||
| 483 | /// Converts the `Sender` into a [`DynSender`]. | ||
| 484 | pub fn as_dyn(self) -> DynSender<'a, T> { | ||
| 485 | DynSender(Snd::new(self.watch)) | ||
| 486 | } | ||
| 487 | } | ||
| 488 | |||
| 489 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> { | ||
| 490 | fn into(self) -> DynSender<'a, T> { | ||
| 491 | self.as_dyn() | ||
| 492 | } | ||
| 493 | } | ||
| 494 | |||
| 495 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { | ||
| 496 | type Target = Snd<'a, T, Watch<M, T, N>>; | ||
| 497 | |||
| 498 | fn deref(&self) -> &Self::Target { | ||
| 499 | &self.0 | ||
| 500 | } | ||
| 501 | } | ||
| 502 | |||
| 503 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { | ||
| 504 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 505 | &mut self.0 | ||
| 506 | } | ||
| 507 | } | ||
| 508 | |||
| 509 | /// A sender which holds a **dynamic** reference to a `Watch` channel. | ||
| 510 | /// | ||
| 511 | /// This is an alternative to [`Sender`] with a simpler type definition, | ||
| 512 | pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 513 | |||
| 514 | impl<'a, T: Clone> Clone for DynSender<'a, T> { | ||
| 515 | fn clone(&self) -> Self { | ||
| 516 | Self(self.0.clone()) | ||
| 517 | } | ||
| 518 | } | ||
| 519 | |||
| 520 | impl<'a, T: Clone> Deref for DynSender<'a, T> { | ||
| 521 | type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 522 | |||
| 523 | fn deref(&self) -> &Self::Target { | ||
| 524 | &self.0 | ||
| 525 | } | ||
| 526 | } | ||
| 527 | |||
| 528 | impl<'a, T: Clone> DerefMut for DynSender<'a, T> { | ||
| 529 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 530 | &mut self.0 | ||
| 531 | } | ||
| 532 | } | ||
| 533 | |||
| 534 | /// A receiver can `.await` a change in the `Watch` value. | ||
| 535 | pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 536 | watch: &'a W, | ||
| 537 | at_id: u64, | ||
| 538 | _phantom: PhantomData<T>, | ||
| 539 | } | ||
| 540 | |||
| 541 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> { | ||
| 542 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 543 | fn new(watch: &'a W, at_id: u64) -> Self { | ||
| 544 | Self { | ||
| 545 | watch, | ||
| 546 | at_id, | ||
| 547 | _phantom: PhantomData, | ||
| 548 | } | ||
| 549 | } | ||
| 550 | |||
| 551 | /// Returns the current value of the `Watch` once it is initialized, marking it as seen. | ||
| 552 | /// | ||
| 553 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 554 | pub fn get(&mut self) -> impl Future<Output = T> + '_ { | ||
| 555 | poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)) | ||
| 556 | } | ||
| 557 | |||
| 558 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 559 | pub fn try_get(&mut self) -> Option<T> { | ||
| 560 | self.watch.try_get(Some(&mut self.at_id)) | ||
| 561 | } | ||
| 562 | |||
| 563 | /// Returns the value of the `Watch` if it matches the predicate function `f`, | ||
| 564 | /// or waits for it to match, marking it as seen. | ||
| 565 | /// | ||
| 566 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 567 | pub async fn get_and<F>(&mut self, mut f: F) -> T | ||
| 568 | where | ||
| 569 | F: Fn(&T) -> bool, | ||
| 570 | { | ||
| 571 | poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await | ||
| 572 | } | ||
| 573 | |||
| 574 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 575 | /// function `f` without waiting, marking it as seen. | ||
| 576 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 577 | where | ||
| 578 | F: Fn(&T) -> bool, | ||
| 579 | { | ||
| 580 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) | ||
| 581 | } | ||
| 582 | |||
| 583 | /// Waits for the `Watch` to change and returns the new value, marking it as seen. | ||
| 584 | /// | ||
| 585 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 586 | pub async fn changed(&mut self) -> T { | ||
| 587 | poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await | ||
| 588 | } | ||
| 589 | |||
| 590 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 591 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 592 | self.watch.try_changed(&mut self.at_id) | ||
| 593 | } | ||
| 594 | |||
| 595 | /// Waits for the `Watch` to change to a value which satisfies the predicate | ||
| 596 | /// function `f` and returns the new value, marking it as seen. | ||
| 597 | /// | ||
| 598 | /// **Note**: Futures do nothing unless you `.await` or poll them. | ||
| 599 | pub async fn changed_and<F>(&mut self, mut f: F) -> T | ||
| 600 | where | ||
| 601 | F: Fn(&T) -> bool, | ||
| 602 | { | ||
| 603 | poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await | ||
| 604 | } | ||
| 605 | |||
| 606 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 607 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 608 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 609 | where | ||
| 610 | F: Fn(&T) -> bool, | ||
| 611 | { | ||
| 612 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 613 | } | ||
| 614 | |||
| 615 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 616 | /// then awaiting [`Rcv::get`] will return immediately. | ||
| 617 | pub fn contains_value(&self) -> bool { | ||
| 618 | self.watch.contains_value() | ||
| 619 | } | ||
| 620 | } | ||
| 621 | |||
| 622 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> { | ||
| 623 | fn drop(&mut self) { | ||
| 624 | self.watch.drop_receiver(); | ||
| 625 | } | ||
| 626 | } | ||
| 627 | |||
| 628 | /// A anonymous receiver can NOT `.await` a change in the `Watch` value. | ||
| 629 | #[derive(Debug)] | ||
| 630 | pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> { | ||
| 631 | watch: &'a W, | ||
| 632 | at_id: u64, | ||
| 633 | _phantom: PhantomData<T>, | ||
| 634 | } | ||
| 635 | |||
| 636 | impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> { | ||
| 637 | /// Creates a new `Receiver` with a reference to the `Watch`. | ||
| 638 | fn new(watch: &'a W, at_id: u64) -> Self { | ||
| 639 | Self { | ||
| 640 | watch, | ||
| 641 | at_id, | ||
| 642 | _phantom: PhantomData, | ||
| 643 | } | ||
| 644 | } | ||
| 645 | |||
| 646 | /// Tries to get the current value of the `Watch` without waiting, marking it as seen. | ||
| 647 | pub fn try_get(&mut self) -> Option<T> { | ||
| 648 | self.watch.try_get(Some(&mut self.at_id)) | ||
| 649 | } | ||
| 650 | |||
| 651 | /// Tries to get the current value of the `Watch` if it matches the predicate | ||
| 652 | /// function `f` without waiting, marking it as seen. | ||
| 653 | pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 654 | where | ||
| 655 | F: Fn(&T) -> bool, | ||
| 656 | { | ||
| 657 | self.watch.try_get_and(Some(&mut self.at_id), &mut f) | ||
| 658 | } | ||
| 659 | |||
| 660 | /// Tries to get the new value of the watch without waiting, marking it as seen. | ||
| 661 | pub fn try_changed(&mut self) -> Option<T> { | ||
| 662 | self.watch.try_changed(&mut self.at_id) | ||
| 663 | } | ||
| 664 | |||
| 665 | /// Tries to get the new value of the watch which satisfies the predicate | ||
| 666 | /// function `f` and returns the new value without waiting, marking it as seen. | ||
| 667 | pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T> | ||
| 668 | where | ||
| 669 | F: Fn(&T) -> bool, | ||
| 670 | { | ||
| 671 | self.watch.try_changed_and(&mut self.at_id, &mut f) | ||
| 672 | } | ||
| 673 | |||
| 674 | /// Checks if the `Watch` contains a value. If this returns true, | ||
| 675 | /// then awaiting [`Rcv::get`] will return immediately. | ||
| 676 | pub fn contains_value(&self) -> bool { | ||
| 677 | self.watch.contains_value() | ||
| 678 | } | ||
| 679 | } | ||
| 680 | |||
| 681 | /// A receiver of a `Watch` channel. | ||
| 682 | pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>); | ||
| 683 | |||
| 684 | impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { | ||
| 685 | /// Converts the `Receiver` into a [`DynReceiver`]. | ||
| 686 | pub fn as_dyn(self) -> DynReceiver<'a, T> { | ||
| 687 | let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id)); | ||
| 688 | core::mem::forget(self); // Ensures the destructor is not called | ||
| 689 | rcv | ||
| 690 | } | ||
| 691 | } | ||
| 692 | |||
| 693 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> { | ||
| 694 | fn into(self) -> DynReceiver<'a, T> { | ||
| 695 | self.as_dyn() | ||
| 696 | } | ||
| 697 | } | ||
| 698 | |||
| 699 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { | ||
| 700 | type Target = Rcv<'a, T, Watch<M, T, N>>; | ||
| 701 | |||
| 702 | fn deref(&self) -> &Self::Target { | ||
| 703 | &self.0 | ||
| 704 | } | ||
| 705 | } | ||
| 706 | |||
| 707 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { | ||
| 708 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 709 | &mut self.0 | ||
| 710 | } | ||
| 711 | } | ||
| 712 | |||
| 713 | /// A receiver which holds a **dynamic** reference to a `Watch` channel. | ||
| 714 | /// | ||
| 715 | /// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of | ||
| 716 | /// some runtime performance due to dynamic dispatch. | ||
| 717 | pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 718 | |||
| 719 | impl<'a, T: Clone> Deref for DynReceiver<'a, T> { | ||
| 720 | type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 721 | |||
| 722 | fn deref(&self) -> &Self::Target { | ||
| 723 | &self.0 | ||
| 724 | } | ||
| 725 | } | ||
| 726 | |||
| 727 | impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { | ||
| 728 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 729 | &mut self.0 | ||
| 730 | } | ||
| 731 | } | ||
| 732 | |||
| 733 | /// A receiver of a `Watch` channel that cannot `.await` values. | ||
| 734 | #[derive(Debug)] | ||
| 735 | pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>); | ||
| 736 | |||
| 737 | impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { | ||
| 738 | /// Converts the `Receiver` into a [`DynReceiver`]. | ||
| 739 | pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { | ||
| 740 | let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); | ||
| 741 | core::mem::forget(self); // Ensures the destructor is not called | ||
| 742 | rcv | ||
| 743 | } | ||
| 744 | } | ||
| 745 | |||
| 746 | impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> { | ||
| 747 | fn into(self) -> DynAnonReceiver<'a, T> { | ||
| 748 | self.as_dyn() | ||
| 749 | } | ||
| 750 | } | ||
| 751 | |||
| 752 | impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { | ||
| 753 | type Target = AnonRcv<'a, T, Watch<M, T, N>>; | ||
| 754 | |||
| 755 | fn deref(&self) -> &Self::Target { | ||
| 756 | &self.0 | ||
| 757 | } | ||
| 758 | } | ||
| 759 | |||
| 760 | impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { | ||
| 761 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 762 | &mut self.0 | ||
| 763 | } | ||
| 764 | } | ||
| 765 | |||
| 766 | /// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel. | ||
| 767 | /// | ||
| 768 | /// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of | ||
| 769 | /// some runtime performance due to dynamic dispatch. | ||
| 770 | pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>); | ||
| 771 | |||
| 772 | impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { | ||
| 773 | type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>; | ||
| 774 | |||
| 775 | fn deref(&self) -> &Self::Target { | ||
| 776 | &self.0 | ||
| 777 | } | ||
| 778 | } | ||
| 779 | |||
| 780 | impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { | ||
| 781 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 782 | &mut self.0 | ||
| 783 | } | ||
| 784 | } | ||
| 785 | |||
| 786 | #[cfg(test)] | ||
| 787 | mod tests { | ||
| 788 | use futures_executor::block_on; | ||
| 789 | |||
| 790 | use super::Watch; | ||
| 791 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 792 | |||
| 793 | #[test] | ||
| 794 | fn multiple_sends() { | ||
| 795 | let f = async { | ||
| 796 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 797 | |||
| 798 | // Obtain receiver and sender | ||
| 799 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 800 | let snd = WATCH.sender(); | ||
| 801 | |||
| 802 | // Not initialized | ||
| 803 | assert_eq!(rcv.try_changed(), None); | ||
| 804 | |||
| 805 | // Receive the new value | ||
| 806 | snd.send(10); | ||
| 807 | assert_eq!(rcv.changed().await, 10); | ||
| 808 | |||
| 809 | // Receive another value | ||
| 810 | snd.send(20); | ||
| 811 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 812 | |||
| 813 | // No update | ||
| 814 | assert_eq!(rcv.try_changed(), None); | ||
| 815 | }; | ||
| 816 | block_on(f); | ||
| 817 | } | ||
| 818 | |||
| 819 | #[test] | ||
| 820 | fn all_try_get() { | ||
| 821 | let f = async { | ||
| 822 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 823 | |||
| 824 | // Obtain receiver and sender | ||
| 825 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 826 | let snd = WATCH.sender(); | ||
| 827 | |||
| 828 | // Not initialized | ||
| 829 | assert_eq!(WATCH.try_get(), None); | ||
| 830 | assert_eq!(rcv.try_get(), None); | ||
| 831 | assert_eq!(snd.try_get(), None); | ||
| 832 | |||
| 833 | // Receive the new value | ||
| 834 | snd.send(10); | ||
| 835 | assert_eq!(WATCH.try_get(), Some(10)); | ||
| 836 | assert_eq!(rcv.try_get(), Some(10)); | ||
| 837 | assert_eq!(snd.try_get(), Some(10)); | ||
| 838 | |||
| 839 | assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10)); | ||
| 840 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(10)); | ||
| 841 | assert_eq!(snd.try_get_and(|x| x > &5), Some(10)); | ||
| 842 | |||
| 843 | assert_eq!(WATCH.try_get_and(|x| x < &5), None); | ||
| 844 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 845 | assert_eq!(snd.try_get_and(|x| x < &5), None); | ||
| 846 | }; | ||
| 847 | block_on(f); | ||
| 848 | } | ||
| 849 | |||
| 850 | #[test] | ||
| 851 | fn once_lock_like() { | ||
| 852 | let f = async { | ||
| 853 | static CONFIG0: u8 = 10; | ||
| 854 | static CONFIG1: u8 = 20; | ||
| 855 | |||
| 856 | static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new(); | ||
| 857 | |||
| 858 | // Obtain receiver and sender | ||
| 859 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 860 | let snd = WATCH.sender(); | ||
| 861 | |||
| 862 | // Not initialized | ||
| 863 | assert_eq!(rcv.try_changed(), None); | ||
| 864 | |||
| 865 | // Receive the new value | ||
| 866 | snd.send(&CONFIG0); | ||
| 867 | let rcv0 = rcv.changed().await; | ||
| 868 | assert_eq!(rcv0, &10); | ||
| 869 | |||
| 870 | // Receive another value | ||
| 871 | snd.send(&CONFIG1); | ||
| 872 | let rcv1 = rcv.try_changed(); | ||
| 873 | assert_eq!(rcv1, Some(&20)); | ||
| 874 | |||
| 875 | // No update | ||
| 876 | assert_eq!(rcv.try_changed(), None); | ||
| 877 | |||
| 878 | // Ensure similarity with original static | ||
| 879 | assert_eq!(rcv0, &CONFIG0); | ||
| 880 | assert_eq!(rcv1, Some(&CONFIG1)); | ||
| 881 | }; | ||
| 882 | block_on(f); | ||
| 883 | } | ||
| 884 | |||
| 885 | #[test] | ||
| 886 | fn sender_modify() { | ||
| 887 | let f = async { | ||
| 888 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 889 | |||
| 890 | // Obtain receiver and sender | ||
| 891 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 892 | let snd = WATCH.sender(); | ||
| 893 | |||
| 894 | // Receive the new value | ||
| 895 | snd.send(10); | ||
| 896 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 897 | |||
| 898 | // Modify the value inplace | ||
| 899 | snd.send_modify(|opt| { | ||
| 900 | if let Some(inner) = opt { | ||
| 901 | *inner += 5; | ||
| 902 | } | ||
| 903 | }); | ||
| 904 | |||
| 905 | // Get the modified value | ||
| 906 | assert_eq!(rcv.try_changed(), Some(15)); | ||
| 907 | assert_eq!(rcv.try_changed(), None); | ||
| 908 | }; | ||
| 909 | block_on(f); | ||
| 910 | } | ||
| 911 | |||
| 912 | #[test] | ||
| 913 | fn predicate_fn() { | ||
| 914 | let f = async { | ||
| 915 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 916 | |||
| 917 | // Obtain receiver and sender | ||
| 918 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 919 | let snd = WATCH.sender(); | ||
| 920 | |||
| 921 | snd.send(15); | ||
| 922 | assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); | ||
| 923 | assert_eq!(rcv.try_get_and(|x| x < &5), None); | ||
| 924 | assert!(rcv.try_changed().is_none()); | ||
| 925 | |||
| 926 | snd.send(20); | ||
| 927 | assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20)); | ||
| 928 | assert_eq!(rcv.try_changed_and(|x| x > &5), None); | ||
| 929 | |||
| 930 | snd.send(25); | ||
| 931 | assert_eq!(rcv.try_changed_and(|x| x < &5), None); | ||
| 932 | assert_eq!(rcv.try_changed(), Some(25)); | ||
| 933 | |||
| 934 | snd.send(30); | ||
| 935 | assert_eq!(rcv.changed_and(|x| x > &5).await, 30); | ||
| 936 | assert_eq!(rcv.get_and(|x| x > &5).await, 30); | ||
| 937 | }; | ||
| 938 | block_on(f); | ||
| 939 | } | ||
| 940 | |||
| 941 | #[test] | ||
| 942 | fn receive_after_create() { | ||
| 943 | let f = async { | ||
| 944 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 945 | |||
| 946 | // Obtain sender and send value | ||
| 947 | let snd = WATCH.sender(); | ||
| 948 | snd.send(10); | ||
| 949 | |||
| 950 | // Obtain receiver and receive value | ||
| 951 | let mut rcv = WATCH.receiver().unwrap(); | ||
| 952 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 953 | }; | ||
| 954 | block_on(f); | ||
| 955 | } | ||
| 956 | |||
| 957 | #[test] | ||
| 958 | fn max_receivers_drop() { | ||
| 959 | let f = async { | ||
| 960 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 961 | |||
| 962 | // Try to create 3 receivers (only 2 can exist at once) | ||
| 963 | let rcv0 = WATCH.receiver(); | ||
| 964 | let rcv1 = WATCH.receiver(); | ||
| 965 | let rcv2 = WATCH.receiver(); | ||
| 966 | |||
| 967 | // Ensure the first two are successful and the third is not | ||
| 968 | assert!(rcv0.is_some()); | ||
| 969 | assert!(rcv1.is_some()); | ||
| 970 | assert!(rcv2.is_none()); | ||
| 971 | |||
| 972 | // Drop the first receiver | ||
| 973 | drop(rcv0); | ||
| 974 | |||
| 975 | // Create another receiver and ensure it is successful | ||
| 976 | let rcv3 = WATCH.receiver(); | ||
| 977 | assert!(rcv3.is_some()); | ||
| 978 | }; | ||
| 979 | block_on(f); | ||
| 980 | } | ||
| 981 | |||
| 982 | #[test] | ||
| 983 | fn multiple_receivers() { | ||
| 984 | let f = async { | ||
| 985 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 986 | |||
| 987 | // Obtain receivers and sender | ||
| 988 | let mut rcv0 = WATCH.receiver().unwrap(); | ||
| 989 | let mut rcv1 = WATCH.anon_receiver(); | ||
| 990 | let snd = WATCH.sender(); | ||
| 991 | |||
| 992 | // No update for both | ||
| 993 | assert_eq!(rcv0.try_changed(), None); | ||
| 994 | assert_eq!(rcv1.try_changed(), None); | ||
| 995 | |||
| 996 | // Send a new value | ||
| 997 | snd.send(0); | ||
| 998 | |||
| 999 | // Both receivers receive the new value | ||
| 1000 | assert_eq!(rcv0.try_changed(), Some(0)); | ||
| 1001 | assert_eq!(rcv1.try_changed(), Some(0)); | ||
| 1002 | }; | ||
| 1003 | block_on(f); | ||
| 1004 | } | ||
| 1005 | |||
| 1006 | #[test] | ||
| 1007 | fn clone_senders() { | ||
| 1008 | let f = async { | ||
| 1009 | // Obtain different ways to send | ||
| 1010 | static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new(); | ||
| 1011 | let snd0 = WATCH.sender(); | ||
| 1012 | let snd1 = snd0.clone(); | ||
| 1013 | |||
| 1014 | // Obtain Receiver | ||
| 1015 | let mut rcv = WATCH.receiver().unwrap().as_dyn(); | ||
| 1016 | |||
| 1017 | // Send a value from first sender | ||
| 1018 | snd0.send(10); | ||
| 1019 | assert_eq!(rcv.try_changed(), Some(10)); | ||
| 1020 | |||
| 1021 | // Send a value from second sender | ||
| 1022 | snd1.send(20); | ||
| 1023 | assert_eq!(rcv.try_changed(), Some(20)); | ||
| 1024 | }; | ||
| 1025 | block_on(f); | ||
| 1026 | } | ||
| 1027 | |||
| 1028 | #[test] | ||
| 1029 | fn use_dynamics() { | ||
| 1030 | let f = async { | ||
| 1031 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1032 | |||
| 1033 | // Obtain receiver and sender | ||
| 1034 | let mut anon_rcv = WATCH.dyn_anon_receiver(); | ||
| 1035 | let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); | ||
| 1036 | let dyn_snd = WATCH.dyn_sender(); | ||
| 1037 | |||
| 1038 | // Send a value | ||
| 1039 | dyn_snd.send(10); | ||
| 1040 | |||
| 1041 | // Ensure the dynamic receiver receives the value | ||
| 1042 | assert_eq!(anon_rcv.try_changed(), Some(10)); | ||
| 1043 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | ||
| 1044 | assert_eq!(dyn_rcv.try_changed(), None); | ||
| 1045 | }; | ||
| 1046 | block_on(f); | ||
| 1047 | } | ||
| 1048 | |||
| 1049 | #[test] | ||
| 1050 | fn convert_to_dyn() { | ||
| 1051 | let f = async { | ||
| 1052 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1053 | |||
| 1054 | // Obtain receiver and sender | ||
| 1055 | let anon_rcv = WATCH.anon_receiver(); | ||
| 1056 | let rcv = WATCH.receiver().unwrap(); | ||
| 1057 | let snd = WATCH.sender(); | ||
| 1058 | |||
| 1059 | // Convert to dynamic | ||
| 1060 | let mut dyn_anon_rcv = anon_rcv.as_dyn(); | ||
| 1061 | let mut dyn_rcv = rcv.as_dyn(); | ||
| 1062 | let dyn_snd = snd.as_dyn(); | ||
| 1063 | |||
| 1064 | // Send a value | ||
| 1065 | dyn_snd.send(10); | ||
| 1066 | |||
| 1067 | // Ensure the dynamic receiver receives the value | ||
| 1068 | assert_eq!(dyn_anon_rcv.try_changed(), Some(10)); | ||
| 1069 | assert_eq!(dyn_rcv.try_changed(), Some(10)); | ||
| 1070 | assert_eq!(dyn_rcv.try_changed(), None); | ||
| 1071 | }; | ||
| 1072 | block_on(f); | ||
| 1073 | } | ||
| 1074 | |||
| 1075 | #[test] | ||
| 1076 | fn dynamic_receiver_count() { | ||
| 1077 | let f = async { | ||
| 1078 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1079 | |||
| 1080 | // Obtain receiver and sender | ||
| 1081 | let rcv0 = WATCH.receiver(); | ||
| 1082 | let rcv1 = WATCH.receiver(); | ||
| 1083 | let rcv2 = WATCH.receiver(); | ||
| 1084 | |||
| 1085 | // Ensure the first two are successful and the third is not | ||
| 1086 | assert!(rcv0.is_some()); | ||
| 1087 | assert!(rcv1.is_some()); | ||
| 1088 | assert!(rcv2.is_none()); | ||
| 1089 | |||
| 1090 | // Convert to dynamic | ||
| 1091 | let dyn_rcv0 = rcv0.unwrap().as_dyn(); | ||
| 1092 | |||
| 1093 | // Drop the (now dynamic) receiver | ||
| 1094 | drop(dyn_rcv0); | ||
| 1095 | |||
| 1096 | // Create another receiver and ensure it is successful | ||
| 1097 | let rcv3 = WATCH.receiver(); | ||
| 1098 | let rcv4 = WATCH.receiver(); | ||
| 1099 | assert!(rcv3.is_some()); | ||
| 1100 | assert!(rcv4.is_none()); | ||
| 1101 | }; | ||
| 1102 | block_on(f); | ||
| 1103 | } | ||
| 1104 | |||
| 1105 | #[test] | ||
| 1106 | fn contains_value() { | ||
| 1107 | let f = async { | ||
| 1108 | static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new(); | ||
| 1109 | |||
| 1110 | // Obtain receiver and sender | ||
| 1111 | let rcv = WATCH.receiver().unwrap(); | ||
| 1112 | let snd = WATCH.sender(); | ||
| 1113 | |||
| 1114 | // check if the watch contains a value | ||
| 1115 | assert_eq!(rcv.contains_value(), false); | ||
| 1116 | assert_eq!(snd.contains_value(), false); | ||
| 1117 | |||
| 1118 | // Send a value | ||
| 1119 | snd.send(10); | ||
| 1120 | |||
| 1121 | // check if the watch contains a value | ||
| 1122 | assert_eq!(rcv.contains_value(), true); | ||
| 1123 | assert_eq!(snd.contains_value(), true); | ||
| 1124 | }; | ||
| 1125 | block_on(f); | ||
| 1126 | } | ||
| 1127 | } | ||
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index cfce9a571..c572592b8 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs | |||
| @@ -15,12 +15,12 @@ | |||
| 15 | //! another message will result in an error being returned. | 15 | //! another message will result in an error being returned. |
| 16 | 16 | ||
| 17 | use core::cell::RefCell; | 17 | use core::cell::RefCell; |
| 18 | use core::future::poll_fn; | 18 | use core::future::{Future, poll_fn}; |
| 19 | use core::marker::PhantomData; | 19 | use core::marker::PhantomData; |
| 20 | use core::task::{Context, Poll}; | 20 | use core::task::{Context, Poll}; |
| 21 | 21 | ||
| 22 | use crate::blocking_mutex::raw::RawMutex; | ||
| 23 | use crate::blocking_mutex::Mutex; | 22 | use crate::blocking_mutex::Mutex; |
| 23 | use crate::blocking_mutex::raw::RawMutex; | ||
| 24 | use crate::waitqueue::WakerRegistration; | 24 | use crate::waitqueue::WakerRegistration; |
| 25 | 25 | ||
| 26 | /// A bounded zero-copy channel for communicating between asynchronous tasks | 26 | /// A bounded zero-copy channel for communicating between asynchronous tasks |
| @@ -34,8 +34,9 @@ use crate::waitqueue::WakerRegistration; | |||
| 34 | /// | 34 | /// |
| 35 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through | 35 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through |
| 36 | /// an `&mut T`. | 36 | /// an `&mut T`. |
| 37 | #[derive(Debug)] | ||
| 37 | pub struct Channel<'a, M: RawMutex, T> { | 38 | pub struct Channel<'a, M: RawMutex, T> { |
| 38 | buf: *mut T, | 39 | buf: BufferPtr<T>, |
| 39 | phantom: PhantomData<&'a mut T>, | 40 | phantom: PhantomData<&'a mut T>, |
| 40 | state: Mutex<M, RefCell<State>>, | 41 | state: Mutex<M, RefCell<State>>, |
| 41 | } | 42 | } |
| @@ -50,10 +51,10 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { | |||
| 50 | assert!(len != 0); | 51 | assert!(len != 0); |
| 51 | 52 | ||
| 52 | Self { | 53 | Self { |
| 53 | buf: buf.as_mut_ptr(), | 54 | buf: BufferPtr(buf.as_mut_ptr()), |
| 54 | phantom: PhantomData, | 55 | phantom: PhantomData, |
| 55 | state: Mutex::new(RefCell::new(State { | 56 | state: Mutex::new(RefCell::new(State { |
| 56 | len, | 57 | capacity: len, |
| 57 | front: 0, | 58 | front: 0, |
| 58 | back: 0, | 59 | back: 0, |
| 59 | full: false, | 60 | full: false, |
| @@ -70,9 +71,45 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { | |||
| 70 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | 71 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { |
| 71 | (Sender { channel: self }, Receiver { channel: self }) | 72 | (Sender { channel: self }, Receiver { channel: self }) |
| 72 | } | 73 | } |
| 74 | |||
| 75 | /// Clears all elements in the channel. | ||
| 76 | pub fn clear(&mut self) { | ||
| 77 | self.state.lock(|s| { | ||
| 78 | s.borrow_mut().clear(); | ||
| 79 | }); | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Returns the number of elements currently in the channel. | ||
| 83 | pub fn len(&self) -> usize { | ||
| 84 | self.state.lock(|s| s.borrow().len()) | ||
| 85 | } | ||
| 86 | |||
| 87 | /// Returns whether the channel is empty. | ||
| 88 | pub fn is_empty(&self) -> bool { | ||
| 89 | self.state.lock(|s| s.borrow().is_empty()) | ||
| 90 | } | ||
| 91 | |||
| 92 | /// Returns whether the channel is full. | ||
| 93 | pub fn is_full(&self) -> bool { | ||
| 94 | self.state.lock(|s| s.borrow().is_full()) | ||
| 95 | } | ||
| 96 | } | ||
| 97 | |||
| 98 | #[repr(transparent)] | ||
| 99 | #[derive(Debug)] | ||
| 100 | struct BufferPtr<T>(*mut T); | ||
| 101 | |||
| 102 | impl<T> BufferPtr<T> { | ||
| 103 | unsafe fn add(&self, count: usize) -> *mut T { | ||
| 104 | self.0.add(count) | ||
| 105 | } | ||
| 73 | } | 106 | } |
| 74 | 107 | ||
| 108 | unsafe impl<T> Send for BufferPtr<T> {} | ||
| 109 | unsafe impl<T> Sync for BufferPtr<T> {} | ||
| 110 | |||
| 75 | /// Send-only access to a [`Channel`]. | 111 | /// Send-only access to a [`Channel`]. |
| 112 | #[derive(Debug)] | ||
| 76 | pub struct Sender<'a, M: RawMutex, T> { | 113 | pub struct Sender<'a, M: RawMutex, T> { |
| 77 | channel: &'a Channel<'a, M, T>, | 114 | channel: &'a Channel<'a, M, T>, |
| 78 | } | 115 | } |
| @@ -109,12 +146,15 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 109 | } | 146 | } |
| 110 | 147 | ||
| 111 | /// Asynchronously send a value over the channel. | 148 | /// Asynchronously send a value over the channel. |
| 112 | pub async fn send(&mut self) -> &mut T { | 149 | pub fn send(&mut self) -> impl Future<Output = &mut T> { |
| 113 | let i = poll_fn(|cx| { | 150 | poll_fn(|cx| { |
| 114 | self.channel.state.lock(|s| { | 151 | self.channel.state.lock(|s| { |
| 115 | let s = &mut *s.borrow_mut(); | 152 | let s = &mut *s.borrow_mut(); |
| 116 | match s.push_index() { | 153 | match s.push_index() { |
| 117 | Some(i) => Poll::Ready(i), | 154 | Some(i) => { |
| 155 | let r = unsafe { &mut *self.channel.buf.add(i) }; | ||
| 156 | Poll::Ready(r) | ||
| 157 | } | ||
| 118 | None => { | 158 | None => { |
| 119 | s.receive_waker.register(cx.waker()); | 159 | s.receive_waker.register(cx.waker()); |
| 120 | Poll::Pending | 160 | Poll::Pending |
| @@ -122,23 +162,44 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { | |||
| 122 | } | 162 | } |
| 123 | }) | 163 | }) |
| 124 | }) | 164 | }) |
| 125 | .await; | ||
| 126 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 127 | } | 165 | } |
| 128 | 166 | ||
| 129 | /// Notify the channel that the sending of the value has been finalized. | 167 | /// Notify the channel that the sending of the value has been finalized. |
| 130 | pub fn send_done(&mut self) { | 168 | pub fn send_done(&mut self) { |
| 131 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | 169 | self.channel.state.lock(|s| s.borrow_mut().push_done()) |
| 132 | } | 170 | } |
| 171 | |||
| 172 | /// Clears all elements in the channel. | ||
| 173 | pub fn clear(&mut self) { | ||
| 174 | self.channel.state.lock(|s| { | ||
| 175 | s.borrow_mut().clear(); | ||
| 176 | }); | ||
| 177 | } | ||
| 178 | |||
| 179 | /// Returns the number of elements currently in the channel. | ||
| 180 | pub fn len(&self) -> usize { | ||
| 181 | self.channel.state.lock(|s| s.borrow().len()) | ||
| 182 | } | ||
| 183 | |||
| 184 | /// Returns whether the channel is empty. | ||
| 185 | pub fn is_empty(&self) -> bool { | ||
| 186 | self.channel.state.lock(|s| s.borrow().is_empty()) | ||
| 187 | } | ||
| 188 | |||
| 189 | /// Returns whether the channel is full. | ||
| 190 | pub fn is_full(&self) -> bool { | ||
| 191 | self.channel.state.lock(|s| s.borrow().is_full()) | ||
| 192 | } | ||
| 133 | } | 193 | } |
| 134 | 194 | ||
| 135 | /// Receive-only access to a [`Channel`]. | 195 | /// Receive-only access to a [`Channel`]. |
| 196 | #[derive(Debug)] | ||
| 136 | pub struct Receiver<'a, M: RawMutex, T> { | 197 | pub struct Receiver<'a, M: RawMutex, T> { |
| 137 | channel: &'a Channel<'a, M, T>, | 198 | channel: &'a Channel<'a, M, T>, |
| 138 | } | 199 | } |
| 139 | 200 | ||
| 140 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | 201 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { |
| 141 | /// Creates one further [`Sender`] over the same channel. | 202 | /// Creates one further [`Receiver`] over the same channel. |
| 142 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | 203 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { |
| 143 | Receiver { channel: self.channel } | 204 | Receiver { channel: self.channel } |
| 144 | } | 205 | } |
| @@ -169,12 +230,15 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 169 | } | 230 | } |
| 170 | 231 | ||
| 171 | /// Asynchronously receive a value over the channel. | 232 | /// Asynchronously receive a value over the channel. |
| 172 | pub async fn receive(&mut self) -> &mut T { | 233 | pub fn receive(&mut self) -> impl Future<Output = &mut T> { |
| 173 | let i = poll_fn(|cx| { | 234 | poll_fn(|cx| { |
| 174 | self.channel.state.lock(|s| { | 235 | self.channel.state.lock(|s| { |
| 175 | let s = &mut *s.borrow_mut(); | 236 | let s = &mut *s.borrow_mut(); |
| 176 | match s.pop_index() { | 237 | match s.pop_index() { |
| 177 | Some(i) => Poll::Ready(i), | 238 | Some(i) => { |
| 239 | let r = unsafe { &mut *self.channel.buf.add(i) }; | ||
| 240 | Poll::Ready(r) | ||
| 241 | } | ||
| 178 | None => { | 242 | None => { |
| 179 | s.send_waker.register(cx.waker()); | 243 | s.send_waker.register(cx.waker()); |
| 180 | Poll::Pending | 244 | Poll::Pending |
| @@ -182,18 +246,40 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | |||
| 182 | } | 246 | } |
| 183 | }) | 247 | }) |
| 184 | }) | 248 | }) |
| 185 | .await; | ||
| 186 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 187 | } | 249 | } |
| 188 | 250 | ||
| 189 | /// Notify the channel that the receiving of the value has been finalized. | 251 | /// Notify the channel that the receiving of the value has been finalized. |
| 190 | pub fn receive_done(&mut self) { | 252 | pub fn receive_done(&mut self) { |
| 191 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | 253 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) |
| 192 | } | 254 | } |
| 255 | |||
| 256 | /// Clears all elements in the channel. | ||
| 257 | pub fn clear(&mut self) { | ||
| 258 | self.channel.state.lock(|s| { | ||
| 259 | s.borrow_mut().clear(); | ||
| 260 | }); | ||
| 261 | } | ||
| 262 | |||
| 263 | /// Returns the number of elements currently in the channel. | ||
| 264 | pub fn len(&self) -> usize { | ||
| 265 | self.channel.state.lock(|s| s.borrow().len()) | ||
| 266 | } | ||
| 267 | |||
| 268 | /// Returns whether the channel is empty. | ||
| 269 | pub fn is_empty(&self) -> bool { | ||
| 270 | self.channel.state.lock(|s| s.borrow().is_empty()) | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Returns whether the channel is full. | ||
| 274 | pub fn is_full(&self) -> bool { | ||
| 275 | self.channel.state.lock(|s| s.borrow().is_full()) | ||
| 276 | } | ||
| 193 | } | 277 | } |
| 194 | 278 | ||
| 279 | #[derive(Debug)] | ||
| 195 | struct State { | 280 | struct State { |
| 196 | len: usize, | 281 | /// Maximum number of elements the channel can hold. |
| 282 | capacity: usize, | ||
| 197 | 283 | ||
| 198 | /// Front index. Always 0..=(N-1) | 284 | /// Front index. Always 0..=(N-1) |
| 199 | front: usize, | 285 | front: usize, |
| @@ -210,10 +296,27 @@ struct State { | |||
| 210 | 296 | ||
| 211 | impl State { | 297 | impl State { |
| 212 | fn increment(&self, i: usize) -> usize { | 298 | fn increment(&self, i: usize) -> usize { |
| 213 | if i + 1 == self.len { | 299 | if i + 1 == self.capacity { 0 } else { i + 1 } |
| 214 | 0 | 300 | } |
| 301 | |||
| 302 | fn clear(&mut self) { | ||
| 303 | if self.full { | ||
| 304 | self.receive_waker.wake(); | ||
| 305 | } | ||
| 306 | self.front = 0; | ||
| 307 | self.back = 0; | ||
| 308 | self.full = false; | ||
| 309 | } | ||
| 310 | |||
| 311 | fn len(&self) -> usize { | ||
| 312 | if !self.full { | ||
| 313 | if self.back >= self.front { | ||
| 314 | self.back - self.front | ||
| 315 | } else { | ||
| 316 | self.capacity + self.back - self.front | ||
| 317 | } | ||
| 215 | } else { | 318 | } else { |
| 216 | i + 1 | 319 | self.capacity |
| 217 | } | 320 | } |
| 218 | } | 321 | } |
| 219 | 322 | ||
