diff options
| author | Dario Nieuwenhuis <[email protected]> | 2022-08-22 21:46:09 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2022-08-22 22:18:13 +0200 |
| commit | 21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 (patch) | |
| tree | b5b8c0f4b3571989b5fd15152be5639f4334c282 /embassy-sync/src | |
| parent | 61356181b223e95f289ca3af3a038a699cde2112 (diff) | |
split `embassy-util` into `embassy-futures`, `embassy-sync`.
Diffstat (limited to 'embassy-sync/src')
| -rw-r--r-- | embassy-sync/src/blocking_mutex/mod.rs | 189 | ||||
| -rw-r--r-- | embassy-sync/src/blocking_mutex/raw.rs | 149 | ||||
| -rw-r--r-- | embassy-sync/src/channel/mod.rs | 5 | ||||
| -rw-r--r-- | embassy-sync/src/channel/mpmc.rs | 596 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/mod.rs | 542 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/publisher.rs | 182 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/subscriber.rs | 152 | ||||
| -rw-r--r-- | embassy-sync/src/channel/signal.rs | 100 | ||||
| -rw-r--r-- | embassy-sync/src/fmt.rs | 228 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 17 | ||||
| -rw-r--r-- | embassy-sync/src/mutex.rs | 167 | ||||
| -rw-r--r-- | embassy-sync/src/pipe.rs | 551 | ||||
| -rw-r--r-- | embassy-sync/src/ring_buffer.rs | 146 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/mod.rs | 7 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/multi_waker.rs | 33 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/waker.rs | 92 |
16 files changed, 3156 insertions, 0 deletions
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs new file mode 100644 index 000000000..8a4a4c642 --- /dev/null +++ b/embassy-sync/src/blocking_mutex/mod.rs | |||
| @@ -0,0 +1,189 @@ | |||
| 1 | //! Blocking mutex. | ||
| 2 | //! | ||
| 3 | //! This module provides a blocking mutex that can be used to synchronize data. | ||
| 4 | pub mod raw; | ||
| 5 | |||
| 6 | use core::cell::UnsafeCell; | ||
| 7 | |||
| 8 | use self::raw::RawMutex; | ||
| 9 | |||
| 10 | /// Blocking mutex (not async) | ||
| 11 | /// | ||
| 12 | /// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`]. | ||
| 13 | /// | ||
| 14 | /// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind | ||
| 15 | /// of interior mutability fits your use case. | ||
| 16 | /// | ||
| 17 | /// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts. | ||
| 18 | /// | ||
| 19 | /// Use [`NoopMutex`] when data is only shared between tasks running on the same executor. | ||
| 20 | /// | ||
| 21 | /// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton. | ||
| 22 | /// | ||
| 23 | /// In all cases, the blocking mutex is intended to be short lived and not held across await points. | ||
| 24 | /// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. | ||
| 25 | pub struct Mutex<R, T: ?Sized> { | ||
| 26 | // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets | ||
| 27 | // to run BEFORE dropping `data`. | ||
| 28 | raw: R, | ||
| 29 | data: UnsafeCell<T>, | ||
| 30 | } | ||
| 31 | |||
| 32 | unsafe impl<R: RawMutex + Send, T: ?Sized + Send> Send for Mutex<R, T> {} | ||
| 33 | unsafe impl<R: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<R, T> {} | ||
| 34 | |||
| 35 | impl<R: RawMutex, T> Mutex<R, T> { | ||
| 36 | /// Creates a new mutex in an unlocked state ready for use. | ||
| 37 | #[inline] | ||
| 38 | pub const fn new(val: T) -> Mutex<R, T> { | ||
| 39 | Mutex { | ||
| 40 | raw: R::INIT, | ||
| 41 | data: UnsafeCell::new(val), | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Creates a critical section and grants temporary access to the protected data. | ||
| 46 | pub fn lock<U>(&self, f: impl FnOnce(&T) -> U) -> U { | ||
| 47 | self.raw.lock(|| { | ||
| 48 | let ptr = self.data.get() as *const T; | ||
| 49 | let inner = unsafe { &*ptr }; | ||
| 50 | f(inner) | ||
| 51 | }) | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | impl<R, T> Mutex<R, T> { | ||
| 56 | /// Creates a new mutex based on a pre-existing raw mutex. | ||
| 57 | /// | ||
| 58 | /// This allows creating a mutex in a constant context on stable Rust. | ||
| 59 | #[inline] | ||
| 60 | pub const fn const_new(raw_mutex: R, val: T) -> Mutex<R, T> { | ||
| 61 | Mutex { | ||
| 62 | raw: raw_mutex, | ||
| 63 | data: UnsafeCell::new(val), | ||
| 64 | } | ||
| 65 | } | ||
| 66 | |||
| 67 | /// Consumes this mutex, returning the underlying data. | ||
| 68 | #[inline] | ||
| 69 | pub fn into_inner(self) -> T { | ||
| 70 | self.data.into_inner() | ||
| 71 | } | ||
| 72 | |||
| 73 | /// Returns a mutable reference to the underlying data. | ||
| 74 | /// | ||
| 75 | /// Since this call borrows the `Mutex` mutably, no actual locking needs to | ||
| 76 | /// take place---the mutable borrow statically guarantees no locks exist. | ||
| 77 | #[inline] | ||
| 78 | pub fn get_mut(&mut self) -> &mut T { | ||
| 79 | unsafe { &mut *self.data.get() } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 83 | /// A mutex that allows borrowing data across executors and interrupts. | ||
| 84 | /// | ||
| 85 | /// # Safety | ||
| 86 | /// | ||
| 87 | /// This mutex is safe to share between different executors and interrupts. | ||
| 88 | pub type CriticalSectionMutex<T> = Mutex<raw::CriticalSectionRawMutex, T>; | ||
| 89 | |||
| 90 | /// A mutex that allows borrowing data in the context of a single executor. | ||
| 91 | /// | ||
| 92 | /// # Safety | ||
| 93 | /// | ||
| 94 | /// **This Mutex is only safe within a single executor.** | ||
| 95 | pub type NoopMutex<T> = Mutex<raw::NoopRawMutex, T>; | ||
| 96 | |||
| 97 | impl<T> Mutex<raw::CriticalSectionRawMutex, T> { | ||
| 98 | /// Borrows the data for the duration of the critical section | ||
| 99 | pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T { | ||
| 100 | let ptr = self.data.get() as *const T; | ||
| 101 | unsafe { &*ptr } | ||
| 102 | } | ||
| 103 | } | ||
| 104 | |||
| 105 | impl<T> Mutex<raw::NoopRawMutex, T> { | ||
| 106 | /// Borrows the data | ||
| 107 | pub fn borrow(&self) -> &T { | ||
| 108 | let ptr = self.data.get() as *const T; | ||
| 109 | unsafe { &*ptr } | ||
| 110 | } | ||
| 111 | } | ||
| 112 | |||
| 113 | // ThreadModeMutex does NOT use the generic mutex from above because it's special: | ||
| 114 | // it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?). | ||
| 115 | // | ||
| 116 | // There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), | ||
| 117 | // but that will require T: Send even though it shouldn't be needed. | ||
| 118 | |||
| 119 | #[cfg(any(cortex_m, feature = "std"))] | ||
| 120 | pub use thread_mode_mutex::*; | ||
| 121 | #[cfg(any(cortex_m, feature = "std"))] | ||
| 122 | mod thread_mode_mutex { | ||
| 123 | use super::*; | ||
| 124 | |||
| 125 | /// A "mutex" that only allows borrowing from thread mode. | ||
| 126 | /// | ||
| 127 | /// # Safety | ||
| 128 | /// | ||
| 129 | /// **This Mutex is only safe on single-core systems.** | ||
| 130 | /// | ||
| 131 | /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. | ||
| 132 | pub struct ThreadModeMutex<T: ?Sized> { | ||
| 133 | inner: UnsafeCell<T>, | ||
| 134 | } | ||
| 135 | |||
| 136 | // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. | ||
| 137 | // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can | ||
| 138 | // be Send+Sync even if T is not Send (unlike CriticalSectionMutex) | ||
| 139 | unsafe impl<T: ?Sized> Sync for ThreadModeMutex<T> {} | ||
| 140 | unsafe impl<T: ?Sized> Send for ThreadModeMutex<T> {} | ||
| 141 | |||
| 142 | impl<T> ThreadModeMutex<T> { | ||
| 143 | /// Creates a new mutex | ||
| 144 | pub const fn new(value: T) -> Self { | ||
| 145 | ThreadModeMutex { | ||
| 146 | inner: UnsafeCell::new(value), | ||
| 147 | } | ||
| 148 | } | ||
| 149 | } | ||
| 150 | |||
| 151 | impl<T: ?Sized> ThreadModeMutex<T> { | ||
| 152 | /// Lock the `ThreadModeMutex`, granting access to the data. | ||
| 153 | /// | ||
| 154 | /// # Panics | ||
| 155 | /// | ||
| 156 | /// This will panic if not currently running in thread mode. | ||
| 157 | pub fn lock<R>(&self, f: impl FnOnce(&T) -> R) -> R { | ||
| 158 | f(self.borrow()) | ||
| 159 | } | ||
| 160 | |||
| 161 | /// Borrows the data | ||
| 162 | /// | ||
| 163 | /// # Panics | ||
| 164 | /// | ||
| 165 | /// This will panic if not currently running in thread mode. | ||
| 166 | pub fn borrow(&self) -> &T { | ||
| 167 | assert!( | ||
| 168 | raw::in_thread_mode(), | ||
| 169 | "ThreadModeMutex can only be borrowed from thread mode." | ||
| 170 | ); | ||
| 171 | unsafe { &*self.inner.get() } | ||
| 172 | } | ||
| 173 | } | ||
| 174 | |||
| 175 | impl<T: ?Sized> Drop for ThreadModeMutex<T> { | ||
| 176 | fn drop(&mut self) { | ||
| 177 | // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so | ||
| 178 | // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if | ||
| 179 | // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, | ||
| 180 | // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. | ||
| 181 | assert!( | ||
| 182 | raw::in_thread_mode(), | ||
| 183 | "ThreadModeMutex can only be dropped from thread mode." | ||
| 184 | ); | ||
| 185 | |||
| 186 | // Drop of the inner `T` happens after this. | ||
| 187 | } | ||
| 188 | } | ||
| 189 | } | ||
diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs new file mode 100644 index 000000000..15796f1b2 --- /dev/null +++ b/embassy-sync/src/blocking_mutex/raw.rs | |||
| @@ -0,0 +1,149 @@ | |||
| 1 | //! Mutex primitives. | ||
| 2 | //! | ||
| 3 | //! This module provides a trait for mutexes that can be used in different contexts. | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | |||
| 6 | /// Raw mutex trait. | ||
| 7 | /// | ||
| 8 | /// This mutex is "raw", which means it does not actually contain the protected data, it | ||
| 9 | /// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead, | ||
| 10 | /// which is generic over a RawMutex and contains the protected data. | ||
| 11 | /// | ||
| 12 | /// Note that, unlike other mutexes, implementations only guarantee no | ||
| 13 | /// concurrent access from other threads: concurrent access from the current | ||
| 14 | /// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly. | ||
| 15 | /// | ||
| 16 | /// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access | ||
| 17 | /// to the data, it is not enough to guarantee exclusive (`&mut`) access. | ||
| 18 | /// | ||
| 19 | /// # Safety | ||
| 20 | /// | ||
| 21 | /// RawMutex implementations must ensure that, while locked, no other thread can lock | ||
| 22 | /// the RawMutex concurrently. | ||
| 23 | /// | ||
| 24 | /// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior. | ||
| 25 | pub unsafe trait RawMutex { | ||
| 26 | /// Create a new `RawMutex` instance. | ||
| 27 | /// | ||
| 28 | /// This is a const instead of a method to allow creating instances in const context. | ||
| 29 | const INIT: Self; | ||
| 30 | |||
| 31 | /// Lock this `RawMutex`. | ||
| 32 | fn lock<R>(&self, f: impl FnOnce() -> R) -> R; | ||
| 33 | } | ||
| 34 | |||
| 35 | /// A mutex that allows borrowing data across executors and interrupts. | ||
| 36 | /// | ||
| 37 | /// # Safety | ||
| 38 | /// | ||
| 39 | /// This mutex is safe to share between different executors and interrupts. | ||
| 40 | pub struct CriticalSectionRawMutex { | ||
| 41 | _phantom: PhantomData<()>, | ||
| 42 | } | ||
| 43 | unsafe impl Send for CriticalSectionRawMutex {} | ||
| 44 | unsafe impl Sync for CriticalSectionRawMutex {} | ||
| 45 | |||
| 46 | impl CriticalSectionRawMutex { | ||
| 47 | /// Create a new `CriticalSectionRawMutex`. | ||
| 48 | pub const fn new() -> Self { | ||
| 49 | Self { _phantom: PhantomData } | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | unsafe impl RawMutex for CriticalSectionRawMutex { | ||
| 54 | const INIT: Self = Self::new(); | ||
| 55 | |||
| 56 | fn lock<R>(&self, f: impl FnOnce() -> R) -> R { | ||
| 57 | critical_section::with(|_| f()) | ||
| 58 | } | ||
| 59 | } | ||
| 60 | |||
| 61 | // ================ | ||
| 62 | |||
| 63 | /// A mutex that allows borrowing data in the context of a single executor. | ||
| 64 | /// | ||
| 65 | /// # Safety | ||
| 66 | /// | ||
| 67 | /// **This Mutex is only safe within a single executor.** | ||
| 68 | pub struct NoopRawMutex { | ||
| 69 | _phantom: PhantomData<*mut ()>, | ||
| 70 | } | ||
| 71 | |||
| 72 | unsafe impl Send for NoopRawMutex {} | ||
| 73 | |||
| 74 | impl NoopRawMutex { | ||
| 75 | /// Create a new `NoopRawMutex`. | ||
| 76 | pub const fn new() -> Self { | ||
| 77 | Self { _phantom: PhantomData } | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | unsafe impl RawMutex for NoopRawMutex { | ||
| 82 | const INIT: Self = Self::new(); | ||
| 83 | fn lock<R>(&self, f: impl FnOnce() -> R) -> R { | ||
| 84 | f() | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | // ================ | ||
| 89 | |||
| 90 | #[cfg(any(cortex_m, feature = "std"))] | ||
| 91 | mod thread_mode { | ||
| 92 | use super::*; | ||
| 93 | |||
| 94 | /// A "mutex" that only allows borrowing from thread mode. | ||
| 95 | /// | ||
| 96 | /// # Safety | ||
| 97 | /// | ||
| 98 | /// **This Mutex is only safe on single-core systems.** | ||
| 99 | /// | ||
| 100 | /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access. | ||
| 101 | pub struct ThreadModeRawMutex { | ||
| 102 | _phantom: PhantomData<()>, | ||
| 103 | } | ||
| 104 | |||
| 105 | unsafe impl Send for ThreadModeRawMutex {} | ||
| 106 | unsafe impl Sync for ThreadModeRawMutex {} | ||
| 107 | |||
| 108 | impl ThreadModeRawMutex { | ||
| 109 | /// Create a new `ThreadModeRawMutex`. | ||
| 110 | pub const fn new() -> Self { | ||
| 111 | Self { _phantom: PhantomData } | ||
| 112 | } | ||
| 113 | } | ||
| 114 | |||
| 115 | unsafe impl RawMutex for ThreadModeRawMutex { | ||
| 116 | const INIT: Self = Self::new(); | ||
| 117 | fn lock<R>(&self, f: impl FnOnce() -> R) -> R { | ||
| 118 | assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode."); | ||
| 119 | |||
| 120 | f() | ||
| 121 | } | ||
| 122 | } | ||
| 123 | |||
| 124 | impl Drop for ThreadModeRawMutex { | ||
| 125 | fn drop(&mut self) { | ||
| 126 | // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so | ||
| 127 | // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if | ||
| 128 | // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, | ||
| 129 | // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. | ||
| 130 | assert!( | ||
| 131 | in_thread_mode(), | ||
| 132 | "ThreadModeMutex can only be dropped from thread mode." | ||
| 133 | ); | ||
| 134 | |||
| 135 | // Drop of the inner `T` happens after this. | ||
| 136 | } | ||
| 137 | } | ||
| 138 | |||
| 139 | pub(crate) fn in_thread_mode() -> bool { | ||
| 140 | #[cfg(feature = "std")] | ||
| 141 | return Some("main") == std::thread::current().name(); | ||
| 142 | |||
| 143 | #[cfg(not(feature = "std"))] | ||
| 144 | // ICSR.VECTACTIVE == 0 | ||
| 145 | return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; | ||
| 146 | } | ||
| 147 | } | ||
| 148 | #[cfg(any(cortex_m, feature = "std"))] | ||
| 149 | pub use thread_mode::*; | ||
diff --git a/embassy-sync/src/channel/mod.rs b/embassy-sync/src/channel/mod.rs new file mode 100644 index 000000000..5df1f5c5c --- /dev/null +++ b/embassy-sync/src/channel/mod.rs | |||
| @@ -0,0 +1,5 @@ | |||
| 1 | //! Async channels | ||
| 2 | |||
| 3 | pub mod mpmc; | ||
| 4 | pub mod pubsub; | ||
| 5 | pub mod signal; | ||
diff --git a/embassy-sync/src/channel/mpmc.rs b/embassy-sync/src/channel/mpmc.rs new file mode 100644 index 000000000..7bebd3412 --- /dev/null +++ b/embassy-sync/src/channel/mpmc.rs | |||
| @@ -0,0 +1,596 @@ | |||
| 1 | //! A queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | ||
| 9 | //! This queue takes a Mutex type so that various | ||
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 11 | //! for single-core Cortex-M targets where messages are only passed | ||
| 12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 13 | //! can also be used for single-core targets where messages are to be | ||
| 14 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 15 | //! | ||
| 16 | //! This module provides a bounded channel that has a limit on the number of | ||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 18 | //! another message will result in an error being returned. | ||
| 19 | //! | ||
| 20 | |||
| 21 | use core::cell::RefCell; | ||
| 22 | use core::future::Future; | ||
| 23 | use core::pin::Pin; | ||
| 24 | use core::task::{Context, Poll}; | ||
| 25 | |||
| 26 | use heapless::Deque; | ||
| 27 | |||
| 28 | use crate::blocking_mutex::raw::RawMutex; | ||
| 29 | use crate::blocking_mutex::Mutex; | ||
| 30 | use crate::waitqueue::WakerRegistration; | ||
| 31 | |||
| 32 | /// Send-only access to a [`Channel`]. | ||
| 33 | #[derive(Copy)] | ||
| 34 | pub struct Sender<'ch, M, T, const N: usize> | ||
| 35 | where | ||
| 36 | M: RawMutex, | ||
| 37 | { | ||
| 38 | channel: &'ch Channel<M, T, N>, | ||
| 39 | } | ||
| 40 | |||
| 41 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | ||
| 42 | where | ||
| 43 | M: RawMutex, | ||
| 44 | { | ||
| 45 | fn clone(&self) -> Self { | ||
| 46 | Sender { channel: self.channel } | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | ||
| 51 | where | ||
| 52 | M: RawMutex, | ||
| 53 | { | ||
| 54 | /// Sends a value. | ||
| 55 | /// | ||
| 56 | /// See [`Channel::send()`] | ||
| 57 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | ||
| 58 | self.channel.send(message) | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Attempt to immediately send a message. | ||
| 62 | /// | ||
| 63 | /// See [`Channel::send()`] | ||
| 64 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 65 | self.channel.try_send(message) | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 70 | #[derive(Copy)] | ||
| 71 | pub struct DynamicSender<'ch, T> { | ||
| 72 | channel: &'ch dyn DynamicChannel<T>, | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'ch, T> Clone for DynamicSender<'ch, T> { | ||
| 76 | fn clone(&self) -> Self { | ||
| 77 | DynamicSender { channel: self.channel } | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T> | ||
| 82 | where | ||
| 83 | M: RawMutex, | ||
| 84 | { | ||
| 85 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 86 | Self { channel: s.channel } | ||
| 87 | } | ||
| 88 | } | ||
| 89 | |||
| 90 | impl<'ch, T> DynamicSender<'ch, T> { | ||
| 91 | /// Sends a value. | ||
| 92 | /// | ||
| 93 | /// See [`Channel::send()`] | ||
| 94 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 95 | DynamicSendFuture { | ||
| 96 | channel: self.channel, | ||
| 97 | message: Some(message), | ||
| 98 | } | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Attempt to immediately send a message. | ||
| 102 | /// | ||
| 103 | /// See [`Channel::send()`] | ||
| 104 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 105 | self.channel.try_send_with_context(message, None) | ||
| 106 | } | ||
| 107 | } | ||
| 108 | |||
| 109 | /// Receive-only access to a [`Channel`]. | ||
| 110 | #[derive(Copy)] | ||
| 111 | pub struct Receiver<'ch, M, T, const N: usize> | ||
| 112 | where | ||
| 113 | M: RawMutex, | ||
| 114 | { | ||
| 115 | channel: &'ch Channel<M, T, N>, | ||
| 116 | } | ||
| 117 | |||
| 118 | impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> | ||
| 119 | where | ||
| 120 | M: RawMutex, | ||
| 121 | { | ||
| 122 | fn clone(&self) -> Self { | ||
| 123 | Receiver { channel: self.channel } | ||
| 124 | } | ||
| 125 | } | ||
| 126 | |||
| 127 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | ||
| 128 | where | ||
| 129 | M: RawMutex, | ||
| 130 | { | ||
| 131 | /// Receive the next value. | ||
| 132 | /// | ||
| 133 | /// See [`Channel::recv()`]. | ||
| 134 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 135 | self.channel.recv() | ||
| 136 | } | ||
| 137 | |||
| 138 | /// Attempt to immediately receive the next value. | ||
| 139 | /// | ||
| 140 | /// See [`Channel::try_recv()`] | ||
| 141 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 142 | self.channel.try_recv() | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 147 | #[derive(Copy)] | ||
| 148 | pub struct DynamicReceiver<'ch, T> { | ||
| 149 | channel: &'ch dyn DynamicChannel<T>, | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'ch, T> Clone for DynamicReceiver<'ch, T> { | ||
| 153 | fn clone(&self) -> Self { | ||
| 154 | DynamicReceiver { channel: self.channel } | ||
| 155 | } | ||
| 156 | } | ||
| 157 | |||
| 158 | impl<'ch, T> DynamicReceiver<'ch, T> { | ||
| 159 | /// Receive the next value. | ||
| 160 | /// | ||
| 161 | /// See [`Channel::recv()`]. | ||
| 162 | pub fn recv(&self) -> DynamicRecvFuture<'_, T> { | ||
| 163 | DynamicRecvFuture { channel: self.channel } | ||
| 164 | } | ||
| 165 | |||
| 166 | /// Attempt to immediately receive the next value. | ||
| 167 | /// | ||
| 168 | /// See [`Channel::try_recv()`] | ||
| 169 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 170 | self.channel.try_recv_with_context(None) | ||
| 171 | } | ||
| 172 | } | ||
| 173 | |||
| 174 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> | ||
| 175 | where | ||
| 176 | M: RawMutex, | ||
| 177 | { | ||
| 178 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 179 | Self { channel: s.channel } | ||
| 180 | } | ||
| 181 | } | ||
| 182 | |||
| 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. | ||
| 184 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 185 | where | ||
| 186 | M: RawMutex, | ||
| 187 | { | ||
| 188 | channel: &'ch Channel<M, T, N>, | ||
| 189 | } | ||
| 190 | |||
| 191 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 192 | where | ||
| 193 | M: RawMutex, | ||
| 194 | { | ||
| 195 | type Output = T; | ||
| 196 | |||
| 197 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 198 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 199 | Ok(v) => Poll::Ready(v), | ||
| 200 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 201 | } | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | /// Future returned by [`DynamicReceiver::recv`]. | ||
| 206 | pub struct DynamicRecvFuture<'ch, T> { | ||
| 207 | channel: &'ch dyn DynamicChannel<T>, | ||
| 208 | } | ||
| 209 | |||
| 210 | impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | ||
| 211 | type Output = T; | ||
| 212 | |||
| 213 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 214 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 215 | Ok(v) => Poll::Ready(v), | ||
| 216 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 217 | } | ||
| 218 | } | ||
| 219 | } | ||
| 220 | |||
| 221 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | ||
| 222 | pub struct SendFuture<'ch, M, T, const N: usize> | ||
| 223 | where | ||
| 224 | M: RawMutex, | ||
| 225 | { | ||
| 226 | channel: &'ch Channel<M, T, N>, | ||
| 227 | message: Option<T>, | ||
| 228 | } | ||
| 229 | |||
| 230 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | ||
| 231 | where | ||
| 232 | M: RawMutex, | ||
| 233 | { | ||
| 234 | type Output = (); | ||
| 235 | |||
| 236 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 237 | match self.message.take() { | ||
| 238 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 239 | Ok(..) => Poll::Ready(()), | ||
| 240 | Err(TrySendError::Full(m)) => { | ||
| 241 | self.message = Some(m); | ||
| 242 | Poll::Pending | ||
| 243 | } | ||
| 244 | }, | ||
| 245 | None => panic!("Message cannot be None"), | ||
| 246 | } | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | ||
| 251 | |||
| 252 | /// Future returned by [`DynamicSender::send`]. | ||
| 253 | pub struct DynamicSendFuture<'ch, T> { | ||
| 254 | channel: &'ch dyn DynamicChannel<T>, | ||
| 255 | message: Option<T>, | ||
| 256 | } | ||
| 257 | |||
| 258 | impl<'ch, T> Future for DynamicSendFuture<'ch, T> { | ||
| 259 | type Output = (); | ||
| 260 | |||
| 261 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 262 | match self.message.take() { | ||
| 263 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 264 | Ok(..) => Poll::Ready(()), | ||
| 265 | Err(TrySendError::Full(m)) => { | ||
| 266 | self.message = Some(m); | ||
| 267 | Poll::Pending | ||
| 268 | } | ||
| 269 | }, | ||
| 270 | None => panic!("Message cannot be None"), | ||
| 271 | } | ||
| 272 | } | ||
| 273 | } | ||
| 274 | |||
| 275 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | ||
| 276 | |||
| 277 | trait DynamicChannel<T> { | ||
| 278 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; | ||
| 279 | |||
| 280 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | ||
| 281 | } | ||
| 282 | |||
| 283 | /// Error returned by [`try_recv`](Channel::try_recv). | ||
| 284 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 285 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 286 | pub enum TryRecvError { | ||
| 287 | /// A message could not be received because the channel is empty. | ||
| 288 | Empty, | ||
| 289 | } | ||
| 290 | |||
| 291 | /// Error returned by [`try_send`](Channel::try_send). | ||
| 292 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 293 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 294 | pub enum TrySendError<T> { | ||
| 295 | /// The data could not be sent on the channel because the channel is | ||
| 296 | /// currently full and sending would require blocking. | ||
| 297 | Full(T), | ||
| 298 | } | ||
| 299 | |||
| 300 | struct ChannelState<T, const N: usize> { | ||
| 301 | queue: Deque<T, N>, | ||
| 302 | receiver_waker: WakerRegistration, | ||
| 303 | senders_waker: WakerRegistration, | ||
| 304 | } | ||
| 305 | |||
| 306 | impl<T, const N: usize> ChannelState<T, N> { | ||
| 307 | const fn new() -> Self { | ||
| 308 | ChannelState { | ||
| 309 | queue: Deque::new(), | ||
| 310 | receiver_waker: WakerRegistration::new(), | ||
| 311 | senders_waker: WakerRegistration::new(), | ||
| 312 | } | ||
| 313 | } | ||
| 314 | |||
| 315 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | ||
| 316 | self.try_recv_with_context(None) | ||
| 317 | } | ||
| 318 | |||
| 319 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 320 | if self.queue.is_full() { | ||
| 321 | self.senders_waker.wake(); | ||
| 322 | } | ||
| 323 | |||
| 324 | if let Some(message) = self.queue.pop_front() { | ||
| 325 | Ok(message) | ||
| 326 | } else { | ||
| 327 | if let Some(cx) = cx { | ||
| 328 | self.receiver_waker.register(cx.waker()); | ||
| 329 | } | ||
| 330 | Err(TryRecvError::Empty) | ||
| 331 | } | ||
| 332 | } | ||
| 333 | |||
| 334 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | ||
| 335 | self.try_send_with_context(message, None) | ||
| 336 | } | ||
| 337 | |||
| 338 | fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 339 | match self.queue.push_back(message) { | ||
| 340 | Ok(()) => { | ||
| 341 | self.receiver_waker.wake(); | ||
| 342 | Ok(()) | ||
| 343 | } | ||
| 344 | Err(message) => { | ||
| 345 | if let Some(cx) = cx { | ||
| 346 | self.senders_waker.register(cx.waker()); | ||
| 347 | } | ||
| 348 | Err(TrySendError::Full(message)) | ||
| 349 | } | ||
| 350 | } | ||
| 351 | } | ||
| 352 | } | ||
| 353 | |||
| 354 | /// A bounded channel for communicating between asynchronous tasks | ||
| 355 | /// with backpressure. | ||
| 356 | /// | ||
| 357 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 358 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 359 | /// received from the channel. | ||
| 360 | /// | ||
| 361 | /// All data sent will become available in the same order as it was sent. | ||
| 362 | pub struct Channel<M, T, const N: usize> | ||
| 363 | where | ||
| 364 | M: RawMutex, | ||
| 365 | { | ||
| 366 | inner: Mutex<M, RefCell<ChannelState<T, N>>>, | ||
| 367 | } | ||
| 368 | |||
| 369 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 370 | where | ||
| 371 | M: RawMutex, | ||
| 372 | { | ||
| 373 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 374 | /// | ||
| 375 | /// ``` | ||
| 376 | /// use embassy_sync::channel::mpmc::Channel; | ||
| 377 | /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 378 | /// | ||
| 379 | /// // Declare a bounded channel of 3 u32s. | ||
| 380 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 381 | /// ``` | ||
| 382 | pub const fn new() -> Self { | ||
| 383 | Self { | ||
| 384 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 385 | } | ||
| 386 | } | ||
| 387 | |||
| 388 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { | ||
| 389 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||
| 390 | } | ||
| 391 | |||
| 392 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 393 | self.lock(|c| c.try_recv_with_context(cx)) | ||
| 394 | } | ||
| 395 | |||
| 396 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 397 | self.lock(|c| c.try_send_with_context(m, cx)) | ||
| 398 | } | ||
| 399 | |||
| 400 | /// Get a sender for this channel. | ||
| 401 | pub fn sender(&self) -> Sender<'_, M, T, N> { | ||
| 402 | Sender { channel: self } | ||
| 403 | } | ||
| 404 | |||
| 405 | /// Get a receiver for this channel. | ||
| 406 | pub fn receiver(&self) -> Receiver<'_, M, T, N> { | ||
| 407 | Receiver { channel: self } | ||
| 408 | } | ||
| 409 | |||
| 410 | /// Send a value, waiting until there is capacity. | ||
| 411 | /// | ||
| 412 | /// Sending completes when the value has been pushed to the channel's queue. | ||
| 413 | /// This doesn't mean the value has been received yet. | ||
| 414 | pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { | ||
| 415 | SendFuture { | ||
| 416 | channel: self, | ||
| 417 | message: Some(message), | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | /// Attempt to immediately send a message. | ||
| 422 | /// | ||
| 423 | /// This method differs from [`send`](Channel::send) by returning immediately if the channel's | ||
| 424 | /// buffer is full, instead of waiting. | ||
| 425 | /// | ||
| 426 | /// # Errors | ||
| 427 | /// | ||
| 428 | /// If the channel capacity has been reached, i.e., the channel has `n` | ||
| 429 | /// buffered values where `n` is the argument passed to [`Channel`], then an | ||
| 430 | /// error is returned. | ||
| 431 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 432 | self.lock(|c| c.try_send(message)) | ||
| 433 | } | ||
| 434 | |||
| 435 | /// Receive the next value. | ||
| 436 | /// | ||
| 437 | /// If there are no messages in the channel's buffer, this method will | ||
| 438 | /// wait until a message is sent. | ||
| 439 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 440 | RecvFuture { channel: self } | ||
| 441 | } | ||
| 442 | |||
| 443 | /// Attempt to immediately receive a message. | ||
| 444 | /// | ||
| 445 | /// This method will either receive a message from the channel immediately or return an error | ||
| 446 | /// if the channel is empty. | ||
| 447 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 448 | self.lock(|c| c.try_recv()) | ||
| 449 | } | ||
| 450 | } | ||
| 451 | |||
| 452 | /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the | ||
| 453 | /// tradeoff cost of dynamic dispatch. | ||
| 454 | impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N> | ||
| 455 | where | ||
| 456 | M: RawMutex, | ||
| 457 | { | ||
| 458 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 459 | Channel::try_send_with_context(self, m, cx) | ||
| 460 | } | ||
| 461 | |||
| 462 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 463 | Channel::try_recv_with_context(self, cx) | ||
| 464 | } | ||
| 465 | } | ||
| 466 | |||
| 467 | #[cfg(test)] | ||
| 468 | mod tests { | ||
| 469 | use core::time::Duration; | ||
| 470 | |||
| 471 | use futures_executor::ThreadPool; | ||
| 472 | use futures_timer::Delay; | ||
| 473 | use futures_util::task::SpawnExt; | ||
| 474 | use static_cell::StaticCell; | ||
| 475 | |||
| 476 | use super::*; | ||
| 477 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||
| 478 | |||
| 479 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | ||
| 480 | c.queue.capacity() - c.queue.len() | ||
| 481 | } | ||
| 482 | |||
| 483 | #[test] | ||
| 484 | fn sending_once() { | ||
| 485 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 486 | assert!(c.try_send(1).is_ok()); | ||
| 487 | assert_eq!(capacity(&c), 2); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn sending_when_full() { | ||
| 492 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 493 | let _ = c.try_send(1); | ||
| 494 | let _ = c.try_send(1); | ||
| 495 | let _ = c.try_send(1); | ||
| 496 | match c.try_send(2) { | ||
| 497 | Err(TrySendError::Full(2)) => assert!(true), | ||
| 498 | _ => assert!(false), | ||
| 499 | } | ||
| 500 | assert_eq!(capacity(&c), 0); | ||
| 501 | } | ||
| 502 | |||
| 503 | #[test] | ||
| 504 | fn receiving_once_with_one_send() { | ||
| 505 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 506 | assert!(c.try_send(1).is_ok()); | ||
| 507 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 508 | assert_eq!(capacity(&c), 3); | ||
| 509 | } | ||
| 510 | |||
| 511 | #[test] | ||
| 512 | fn receiving_when_empty() { | ||
| 513 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 514 | match c.try_recv() { | ||
| 515 | Err(TryRecvError::Empty) => assert!(true), | ||
| 516 | _ => assert!(false), | ||
| 517 | } | ||
| 518 | assert_eq!(capacity(&c), 3); | ||
| 519 | } | ||
| 520 | |||
| 521 | #[test] | ||
| 522 | fn simple_send_and_receive() { | ||
| 523 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 524 | assert!(c.try_send(1).is_ok()); | ||
| 525 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 526 | } | ||
| 527 | |||
| 528 | #[test] | ||
| 529 | fn cloning() { | ||
| 530 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 531 | let r1 = c.receiver(); | ||
| 532 | let s1 = c.sender(); | ||
| 533 | |||
| 534 | let _ = r1.clone(); | ||
| 535 | let _ = s1.clone(); | ||
| 536 | } | ||
| 537 | |||
| 538 | #[test] | ||
| 539 | fn dynamic_dispatch() { | ||
| 540 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 541 | let s: DynamicSender<'_, u32> = c.sender().into(); | ||
| 542 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | ||
| 543 | |||
| 544 | assert!(s.try_send(1).is_ok()); | ||
| 545 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 546 | } | ||
| 547 | |||
| 548 | #[futures_test::test] | ||
| 549 | async fn receiver_receives_given_try_send_async() { | ||
| 550 | let executor = ThreadPool::new().unwrap(); | ||
| 551 | |||
| 552 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); | ||
| 553 | let c = &*CHANNEL.init(Channel::new()); | ||
| 554 | let c2 = c; | ||
| 555 | assert!(executor | ||
| 556 | .spawn(async move { | ||
| 557 | assert!(c2.try_send(1).is_ok()); | ||
| 558 | }) | ||
| 559 | .is_ok()); | ||
| 560 | assert_eq!(c.recv().await, 1); | ||
| 561 | } | ||
| 562 | |||
| 563 | #[futures_test::test] | ||
| 564 | async fn sender_send_completes_if_capacity() { | ||
| 565 | let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); | ||
| 566 | c.send(1).await; | ||
| 567 | assert_eq!(c.recv().await, 1); | ||
| 568 | } | ||
| 569 | |||
| 570 | #[futures_test::test] | ||
| 571 | async fn senders_sends_wait_until_capacity() { | ||
| 572 | let executor = ThreadPool::new().unwrap(); | ||
| 573 | |||
| 574 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new(); | ||
| 575 | let c = &*CHANNEL.init(Channel::new()); | ||
| 576 | assert!(c.try_send(1).is_ok()); | ||
| 577 | |||
| 578 | let c2 = c; | ||
| 579 | let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); | ||
| 580 | let c2 = c; | ||
| 581 | let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); | ||
| 582 | // Wish I could think of a means of determining that the async send is waiting instead. | ||
| 583 | // However, I've used the debugger to observe that the send does indeed wait. | ||
| 584 | Delay::new(Duration::from_millis(500)).await; | ||
| 585 | assert_eq!(c.recv().await, 1); | ||
| 586 | assert!(executor | ||
| 587 | .spawn(async move { | ||
| 588 | loop { | ||
| 589 | c.recv().await; | ||
| 590 | } | ||
| 591 | }) | ||
| 592 | .is_ok()); | ||
| 593 | send_task_1.unwrap().await; | ||
| 594 | send_task_2.unwrap().await; | ||
| 595 | } | ||
| 596 | } | ||
diff --git a/embassy-sync/src/channel/pubsub/mod.rs b/embassy-sync/src/channel/pubsub/mod.rs new file mode 100644 index 000000000..f62b4d118 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/mod.rs | |||
| @@ -0,0 +1,542 @@ | |||
| 1 | //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. | ||
| 2 | |||
| 3 | #![deny(missing_docs)] | ||
| 4 | |||
| 5 | use core::cell::RefCell; | ||
| 6 | use core::fmt::Debug; | ||
| 7 | use core::task::{Context, Poll, Waker}; | ||
| 8 | |||
| 9 | use heapless::Deque; | ||
| 10 | |||
| 11 | use self::publisher::{ImmediatePub, Pub}; | ||
| 12 | use self::subscriber::Sub; | ||
| 13 | use crate::blocking_mutex::raw::RawMutex; | ||
| 14 | use crate::blocking_mutex::Mutex; | ||
| 15 | use crate::waitqueue::MultiWakerRegistration; | ||
| 16 | |||
| 17 | pub mod publisher; | ||
| 18 | pub mod subscriber; | ||
| 19 | |||
| 20 | pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; | ||
| 21 | pub use subscriber::{DynSubscriber, Subscriber}; | ||
| 22 | |||
| 23 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers | ||
| 24 | /// | ||
| 25 | /// Any published message can be read by all subscribers. | ||
| 26 | /// A publisher can choose how it sends its message. | ||
| 27 | /// | ||
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. | ||
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message | ||
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | ||
| 31 | /// an error to indicate that it has lagged. | ||
| 32 | /// | ||
| 33 | /// ## Example | ||
| 34 | /// | ||
| 35 | /// ``` | ||
| 36 | /// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 37 | /// # use embassy_sync::channel::pubsub::WaitResult; | ||
| 38 | /// # use embassy_sync::channel::pubsub::PubSubChannel; | ||
| 39 | /// # use futures_executor::block_on; | ||
| 40 | /// # let test = async { | ||
| 41 | /// // Create the channel. This can be static as well | ||
| 42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 43 | /// | ||
| 44 | /// // This is a generic subscriber with a direct reference to the channel | ||
| 45 | /// let mut sub0 = channel.subscriber().unwrap(); | ||
| 46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel | ||
| 47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 48 | /// | ||
| 49 | /// let pub0 = channel.publisher().unwrap(); | ||
| 50 | /// | ||
| 51 | /// // Publish a message, but wait if the queue is full | ||
| 52 | /// pub0.publish(42).await; | ||
| 53 | /// | ||
| 54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. | ||
| 55 | /// // This may cause some subscribers to miss a message | ||
| 56 | /// pub0.publish_immediate(43); | ||
| 57 | /// | ||
| 58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result | ||
| 59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 61 | /// | ||
| 62 | /// // Wait again, but this time ignore any Lag results | ||
| 63 | /// assert_eq!(sub0.next_message_pure().await, 43); | ||
| 64 | /// assert_eq!(sub1.next_message_pure().await, 43); | ||
| 65 | /// | ||
| 66 | /// // There's also a polling interface | ||
| 67 | /// assert_eq!(sub0.try_next_message(), None); | ||
| 68 | /// assert_eq!(sub1.try_next_message(), None); | ||
| 69 | /// # }; | ||
| 70 | /// # | ||
| 71 | /// # block_on(test); | ||
| 72 | /// ``` | ||
| 73 | /// | ||
| 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, | ||
| 76 | } | ||
| 77 | |||
| 78 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> | ||
| 79 | PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 80 | { | ||
| 81 | /// Create a new channel | ||
| 82 | pub const fn new() -> Self { | ||
| 83 | Self { | ||
| 84 | inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 89 | /// | ||
| 90 | /// If there are no subscriber slots left, an error will be returned. | ||
| 91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 92 | self.inner.lock(|inner| { | ||
| 93 | let mut s = inner.borrow_mut(); | ||
| 94 | |||
| 95 | if s.subscriber_count >= SUBS { | ||
| 96 | Err(Error::MaximumSubscribersReached) | ||
| 97 | } else { | ||
| 98 | s.subscriber_count += 1; | ||
| 99 | Ok(Subscriber(Sub::new(s.next_message_id, self))) | ||
| 100 | } | ||
| 101 | }) | ||
| 102 | } | ||
| 103 | |||
| 104 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 105 | /// | ||
| 106 | /// If there are no subscriber slots left, an error will be returned. | ||
| 107 | pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> { | ||
| 108 | self.inner.lock(|inner| { | ||
| 109 | let mut s = inner.borrow_mut(); | ||
| 110 | |||
| 111 | if s.subscriber_count >= SUBS { | ||
| 112 | Err(Error::MaximumSubscribersReached) | ||
| 113 | } else { | ||
| 114 | s.subscriber_count += 1; | ||
| 115 | Ok(DynSubscriber(Sub::new(s.next_message_id, self))) | ||
| 116 | } | ||
| 117 | }) | ||
| 118 | } | ||
| 119 | |||
| 120 | /// Create a new publisher | ||
| 121 | /// | ||
| 122 | /// If there are no publisher slots left, an error will be returned. | ||
| 123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 124 | self.inner.lock(|inner| { | ||
| 125 | let mut s = inner.borrow_mut(); | ||
| 126 | |||
| 127 | if s.publisher_count >= PUBS { | ||
| 128 | Err(Error::MaximumPublishersReached) | ||
| 129 | } else { | ||
| 130 | s.publisher_count += 1; | ||
| 131 | Ok(Publisher(Pub::new(self))) | ||
| 132 | } | ||
| 133 | }) | ||
| 134 | } | ||
| 135 | |||
| 136 | /// Create a new publisher | ||
| 137 | /// | ||
| 138 | /// If there are no publisher slots left, an error will be returned. | ||
| 139 | pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> { | ||
| 140 | self.inner.lock(|inner| { | ||
| 141 | let mut s = inner.borrow_mut(); | ||
| 142 | |||
| 143 | if s.publisher_count >= PUBS { | ||
| 144 | Err(Error::MaximumPublishersReached) | ||
| 145 | } else { | ||
| 146 | s.publisher_count += 1; | ||
| 147 | Ok(DynPublisher(Pub::new(self))) | ||
| 148 | } | ||
| 149 | }) | ||
| 150 | } | ||
| 151 | |||
| 152 | /// Create a new publisher that can only send immediate messages. | ||
| 153 | /// This kind of publisher does not take up a publisher slot. | ||
| 154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { | ||
| 155 | ImmediatePublisher(ImmediatePub::new(self)) | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Create a new publisher that can only send immediate messages. | ||
| 159 | /// This kind of publisher does not take up a publisher slot. | ||
| 160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { | ||
| 161 | DynImmediatePublisher(ImmediatePub::new(self)) | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> | ||
| 166 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 167 | { | ||
| 168 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { | ||
| 169 | self.inner.lock(|s| { | ||
| 170 | let mut s = s.borrow_mut(); | ||
| 171 | |||
| 172 | // Check if we can read a message | ||
| 173 | match s.get_message(*next_message_id) { | ||
| 174 | // Yes, so we are done polling | ||
| 175 | Some(WaitResult::Message(message)) => { | ||
| 176 | *next_message_id += 1; | ||
| 177 | Poll::Ready(WaitResult::Message(message)) | ||
| 178 | } | ||
| 179 | // No, so we need to reregister our waker and sleep again | ||
| 180 | None => { | ||
| 181 | if let Some(cx) = cx { | ||
| 182 | s.register_subscriber_waker(cx.waker()); | ||
| 183 | } | ||
| 184 | Poll::Pending | ||
| 185 | } | ||
| 186 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | ||
| 187 | Some(WaitResult::Lagged(amount)) => { | ||
| 188 | *next_message_id += amount; | ||
| 189 | Poll::Ready(WaitResult::Lagged(amount)) | ||
| 190 | } | ||
| 191 | } | ||
| 192 | }) | ||
| 193 | } | ||
| 194 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | ||
| 196 | self.inner.lock(|s| { | ||
| 197 | let mut s = s.borrow_mut(); | ||
| 198 | // Try to publish the message | ||
| 199 | match s.try_publish(message) { | ||
| 200 | // We did it, we are ready | ||
| 201 | Ok(()) => Ok(()), | ||
| 202 | // The queue is full, so we need to reregister our waker and go to sleep | ||
| 203 | Err(message) => { | ||
| 204 | if let Some(cx) = cx { | ||
| 205 | s.register_publisher_waker(cx.waker()); | ||
| 206 | } | ||
| 207 | Err(message) | ||
| 208 | } | ||
| 209 | } | ||
| 210 | }) | ||
| 211 | } | ||
| 212 | |||
| 213 | fn publish_immediate(&self, message: T) { | ||
| 214 | self.inner.lock(|s| { | ||
| 215 | let mut s = s.borrow_mut(); | ||
| 216 | s.publish_immediate(message) | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | ||
| 221 | self.inner.lock(|s| { | ||
| 222 | let mut s = s.borrow_mut(); | ||
| 223 | s.unregister_subscriber(subscriber_next_message_id) | ||
| 224 | }) | ||
| 225 | } | ||
| 226 | |||
| 227 | fn unregister_publisher(&self) { | ||
| 228 | self.inner.lock(|s| { | ||
| 229 | let mut s = s.borrow_mut(); | ||
| 230 | s.unregister_publisher() | ||
| 231 | }) | ||
| 232 | } | ||
| 233 | } | ||
| 234 | |||
| 235 | /// Internal state for the PubSub channel | ||
| 236 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 237 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it | ||
| 238 | queue: Deque<(T, usize), CAP>, | ||
| 239 | /// Every message has an id. | ||
| 240 | /// Don't worry, we won't run out. | ||
| 241 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. | ||
| 242 | next_message_id: u64, | ||
| 243 | /// Collection of wakers for Subscribers that are waiting. | ||
| 244 | subscriber_wakers: MultiWakerRegistration<SUBS>, | ||
| 245 | /// Collection of wakers for Publishers that are waiting. | ||
| 246 | publisher_wakers: MultiWakerRegistration<PUBS>, | ||
| 247 | /// The amount of subscribers that are active | ||
| 248 | subscriber_count: usize, | ||
| 249 | /// The amount of publishers that are active | ||
| 250 | publisher_count: usize, | ||
| 251 | } | ||
| 252 | |||
| 253 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { | ||
| 254 | /// Create a new internal channel state | ||
| 255 | const fn new() -> Self { | ||
| 256 | Self { | ||
| 257 | queue: Deque::new(), | ||
| 258 | next_message_id: 0, | ||
| 259 | subscriber_wakers: MultiWakerRegistration::new(), | ||
| 260 | publisher_wakers: MultiWakerRegistration::new(), | ||
| 261 | subscriber_count: 0, | ||
| 262 | publisher_count: 0, | ||
| 263 | } | ||
| 264 | } | ||
| 265 | |||
| 266 | fn try_publish(&mut self, message: T) -> Result<(), T> { | ||
| 267 | if self.subscriber_count == 0 { | ||
| 268 | // We don't need to publish anything because there is no one to receive it | ||
| 269 | return Ok(()); | ||
| 270 | } | ||
| 271 | |||
| 272 | if self.queue.is_full() { | ||
| 273 | return Err(message); | ||
| 274 | } | ||
| 275 | // We just did a check for this | ||
| 276 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); | ||
| 277 | |||
| 278 | self.next_message_id += 1; | ||
| 279 | |||
| 280 | // Wake all of the subscribers | ||
| 281 | self.subscriber_wakers.wake(); | ||
| 282 | |||
| 283 | Ok(()) | ||
| 284 | } | ||
| 285 | |||
| 286 | fn publish_immediate(&mut self, message: T) { | ||
| 287 | // Make space in the queue if required | ||
| 288 | if self.queue.is_full() { | ||
| 289 | self.queue.pop_front(); | ||
| 290 | } | ||
| 291 | |||
| 292 | // This will succeed because we made sure there is space | ||
| 293 | self.try_publish(message).ok().unwrap(); | ||
| 294 | } | ||
| 295 | |||
| 296 | fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> { | ||
| 297 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 298 | |||
| 299 | if message_id < start_id { | ||
| 300 | return Some(WaitResult::Lagged(start_id - message_id)); | ||
| 301 | } | ||
| 302 | |||
| 303 | let current_message_index = (message_id - start_id) as usize; | ||
| 304 | |||
| 305 | if current_message_index >= self.queue.len() { | ||
| 306 | return None; | ||
| 307 | } | ||
| 308 | |||
| 309 | // We've checked that the index is valid | ||
| 310 | let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); | ||
| 311 | |||
| 312 | // We're reading this item, so decrement the counter | ||
| 313 | queue_item.1 -= 1; | ||
| 314 | let message = queue_item.0.clone(); | ||
| 315 | |||
| 316 | if current_message_index == 0 && queue_item.1 == 0 { | ||
| 317 | self.queue.pop_front(); | ||
| 318 | self.publisher_wakers.wake(); | ||
| 319 | } | ||
| 320 | |||
| 321 | Some(WaitResult::Message(message)) | ||
| 322 | } | ||
| 323 | |||
| 324 | fn register_subscriber_waker(&mut self, waker: &Waker) { | ||
| 325 | match self.subscriber_wakers.register(waker) { | ||
| 326 | Ok(()) => {} | ||
| 327 | Err(_) => { | ||
| 328 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. | ||
| 329 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 330 | // Any future that is still active will simply reregister. | ||
| 331 | // This won't happen a lot, so it's ok. | ||
| 332 | self.subscriber_wakers.wake(); | ||
| 333 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | fn register_publisher_waker(&mut self, waker: &Waker) { | ||
| 339 | match self.publisher_wakers.register(waker) { | ||
| 340 | Ok(()) => {} | ||
| 341 | Err(_) => { | ||
| 342 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 343 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 344 | // Any future that is still active will simply reregister. | ||
| 345 | // This won't happen a lot, so it's ok. | ||
| 346 | self.publisher_wakers.wake(); | ||
| 347 | self.publisher_wakers.register(waker).unwrap(); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | } | ||
| 351 | |||
| 352 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { | ||
| 353 | self.subscriber_count -= 1; | ||
| 354 | |||
| 355 | // All messages that haven't been read yet by this subscriber must have their counter decremented | ||
| 356 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 357 | if subscriber_next_message_id >= start_id { | ||
| 358 | let current_message_index = (subscriber_next_message_id - start_id) as usize; | ||
| 359 | self.queue | ||
| 360 | .iter_mut() | ||
| 361 | .skip(current_message_index) | ||
| 362 | .for_each(|(_, counter)| *counter -= 1); | ||
| 363 | } | ||
| 364 | } | ||
| 365 | |||
| 366 | fn unregister_publisher(&mut self) { | ||
| 367 | self.publisher_count -= 1; | ||
| 368 | } | ||
| 369 | } | ||
| 370 | |||
| 371 | /// Error type for the [PubSubChannel] | ||
| 372 | #[derive(Debug, PartialEq, Eq, Clone)] | ||
| 373 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 374 | pub enum Error { | ||
| 375 | /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or | ||
| 376 | /// the capacity of the channels must be increased. | ||
| 377 | MaximumSubscribersReached, | ||
| 378 | /// All publisher slots are used. To add another publisher, first another publisher must be dropped or | ||
| 379 | /// the capacity of the channels must be increased. | ||
| 380 | MaximumPublishersReached, | ||
| 381 | } | ||
| 382 | |||
| 383 | /// 'Middle level' behaviour of the pubsub channel. | ||
| 384 | /// This trait is used so that Sub and Pub can be generic over the channel. | ||
| 385 | pub trait PubSubBehavior<T> { | ||
| 386 | /// Try to get a message from the queue with the given message id. | ||
| 387 | /// | ||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | ||
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | ||
| 390 | |||
| 391 | /// Try to publish a message to the queue. | ||
| 392 | /// | ||
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | ||
| 394 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; | ||
| 395 | |||
| 396 | /// Publish a message immediately | ||
| 397 | fn publish_immediate(&self, message: T); | ||
| 398 | |||
| 399 | /// Let the channel know that a subscriber has dropped | ||
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | ||
| 401 | |||
| 402 | /// Let the channel know that a publisher has dropped | ||
| 403 | fn unregister_publisher(&self); | ||
| 404 | } | ||
| 405 | |||
| 406 | /// The result of the subscriber wait procedure | ||
| 407 | #[derive(Debug, Clone, PartialEq, Eq)] | ||
| 408 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 409 | pub enum WaitResult<T> { | ||
| 410 | /// The subscriber did not receive all messages and lagged by the given amount of messages. | ||
| 411 | /// (This is the amount of messages that were missed) | ||
| 412 | Lagged(u64), | ||
| 413 | /// A message was received | ||
| 414 | Message(T), | ||
| 415 | } | ||
| 416 | |||
| 417 | #[cfg(test)] | ||
| 418 | mod tests { | ||
| 419 | use super::*; | ||
| 420 | use crate::blocking_mutex::raw::NoopRawMutex; | ||
| 421 | |||
| 422 | #[futures_test::test] | ||
| 423 | async fn dyn_pub_sub_works() { | ||
| 424 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 425 | |||
| 426 | let mut sub0 = channel.dyn_subscriber().unwrap(); | ||
| 427 | let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 428 | let pub0 = channel.dyn_publisher().unwrap(); | ||
| 429 | |||
| 430 | pub0.publish(42).await; | ||
| 431 | |||
| 432 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 433 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 434 | |||
| 435 | assert_eq!(sub0.try_next_message(), None); | ||
| 436 | assert_eq!(sub1.try_next_message(), None); | ||
| 437 | } | ||
| 438 | |||
| 439 | #[futures_test::test] | ||
| 440 | async fn all_subscribers_receive() { | ||
| 441 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 442 | |||
| 443 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 444 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 445 | let pub0 = channel.publisher().unwrap(); | ||
| 446 | |||
| 447 | pub0.publish(42).await; | ||
| 448 | |||
| 449 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 450 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 451 | |||
| 452 | assert_eq!(sub0.try_next_message(), None); | ||
| 453 | assert_eq!(sub1.try_next_message(), None); | ||
| 454 | } | ||
| 455 | |||
| 456 | #[futures_test::test] | ||
| 457 | async fn lag_when_queue_full_on_immediate_publish() { | ||
| 458 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 459 | |||
| 460 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 461 | let pub0 = channel.publisher().unwrap(); | ||
| 462 | |||
| 463 | pub0.publish_immediate(42); | ||
| 464 | pub0.publish_immediate(43); | ||
| 465 | pub0.publish_immediate(44); | ||
| 466 | pub0.publish_immediate(45); | ||
| 467 | pub0.publish_immediate(46); | ||
| 468 | pub0.publish_immediate(47); | ||
| 469 | |||
| 470 | assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); | ||
| 471 | assert_eq!(sub0.next_message().await, WaitResult::Message(44)); | ||
| 472 | assert_eq!(sub0.next_message().await, WaitResult::Message(45)); | ||
| 473 | assert_eq!(sub0.next_message().await, WaitResult::Message(46)); | ||
| 474 | assert_eq!(sub0.next_message().await, WaitResult::Message(47)); | ||
| 475 | assert_eq!(sub0.try_next_message(), None); | ||
| 476 | } | ||
| 477 | |||
| 478 | #[test] | ||
| 479 | fn limited_subs_and_pubs() { | ||
| 480 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 481 | |||
| 482 | let sub0 = channel.subscriber(); | ||
| 483 | let sub1 = channel.subscriber(); | ||
| 484 | let sub2 = channel.subscriber(); | ||
| 485 | let sub3 = channel.subscriber(); | ||
| 486 | let sub4 = channel.subscriber(); | ||
| 487 | |||
| 488 | assert!(sub0.is_ok()); | ||
| 489 | assert!(sub1.is_ok()); | ||
| 490 | assert!(sub2.is_ok()); | ||
| 491 | assert!(sub3.is_ok()); | ||
| 492 | assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); | ||
| 493 | |||
| 494 | drop(sub0); | ||
| 495 | |||
| 496 | let sub5 = channel.subscriber(); | ||
| 497 | assert!(sub5.is_ok()); | ||
| 498 | |||
| 499 | // publishers | ||
| 500 | |||
| 501 | let pub0 = channel.publisher(); | ||
| 502 | let pub1 = channel.publisher(); | ||
| 503 | let pub2 = channel.publisher(); | ||
| 504 | let pub3 = channel.publisher(); | ||
| 505 | let pub4 = channel.publisher(); | ||
| 506 | |||
| 507 | assert!(pub0.is_ok()); | ||
| 508 | assert!(pub1.is_ok()); | ||
| 509 | assert!(pub2.is_ok()); | ||
| 510 | assert!(pub3.is_ok()); | ||
| 511 | assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); | ||
| 512 | |||
| 513 | drop(pub0); | ||
| 514 | |||
| 515 | let pub5 = channel.publisher(); | ||
| 516 | assert!(pub5.is_ok()); | ||
| 517 | } | ||
| 518 | |||
| 519 | #[test] | ||
| 520 | fn publisher_wait_on_full_queue() { | ||
| 521 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 522 | |||
| 523 | let pub0 = channel.publisher().unwrap(); | ||
| 524 | |||
| 525 | // There are no subscribers, so the queue will never be full | ||
| 526 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 527 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 528 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 529 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 530 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 531 | |||
| 532 | let sub0 = channel.subscriber().unwrap(); | ||
| 533 | |||
| 534 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 535 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 536 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 537 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 538 | assert_eq!(pub0.try_publish(0), Err(0)); | ||
| 539 | |||
| 540 | drop(sub0); | ||
| 541 | } | ||
| 542 | } | ||
diff --git a/embassy-sync/src/channel/pubsub/publisher.rs b/embassy-sync/src/channel/pubsub/publisher.rs new file mode 100644 index 000000000..705797f60 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/publisher.rs | |||
| @@ -0,0 +1,182 @@ | |||
| 1 | //! Implementation of anything directly publisher related | ||
| 2 | |||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::pin::Pin; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use super::{PubSubBehavior, PubSubChannel}; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | |||
| 12 | /// A publisher to a channel | ||
| 13 | pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 14 | /// The channel we are a publisher for | ||
| 15 | channel: &'a PSB, | ||
| 16 | _phantom: PhantomData<T>, | ||
| 17 | } | ||
| 18 | |||
| 19 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | ||
| 20 | pub(super) fn new(channel: &'a PSB) -> Self { | ||
| 21 | Self { | ||
| 22 | channel, | ||
| 23 | _phantom: Default::default(), | ||
| 24 | } | ||
| 25 | } | ||
| 26 | |||
| 27 | /// Publish a message right now even when the queue is full. | ||
| 28 | /// This may cause a subscriber to miss an older message. | ||
| 29 | pub fn publish_immediate(&self, message: T) { | ||
| 30 | self.channel.publish_immediate(message) | ||
| 31 | } | ||
| 32 | |||
| 33 | /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message | ||
| 34 | pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { | ||
| 35 | PublisherWaitFuture { | ||
| 36 | message: Some(message), | ||
| 37 | publisher: self, | ||
| 38 | } | ||
| 39 | } | ||
| 40 | |||
| 41 | /// Publish a message if there is space in the message queue | ||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | ||
| 43 | self.channel.publish_with_context(message, None) | ||
| 44 | } | ||
| 45 | } | ||
| 46 | |||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | ||
| 48 | fn drop(&mut self) { | ||
| 49 | self.channel.unregister_publisher() | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | /// A publisher that holds a dynamic reference to the channel | ||
| 54 | pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 55 | |||
| 56 | impl<'a, T: Clone> Deref for DynPublisher<'a, T> { | ||
| 57 | type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 58 | |||
| 59 | fn deref(&self) -> &Self::Target { | ||
| 60 | &self.0 | ||
| 61 | } | ||
| 62 | } | ||
| 63 | |||
| 64 | impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { | ||
| 65 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 66 | &mut self.0 | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | /// A publisher that holds a generic reference to the channel | ||
| 71 | pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 72 | pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 73 | ); | ||
| 74 | |||
| 75 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 76 | for Publisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 77 | { | ||
| 78 | type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 79 | |||
| 80 | fn deref(&self) -> &Self::Target { | ||
| 81 | &self.0 | ||
| 82 | } | ||
| 83 | } | ||
| 84 | |||
| 85 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 86 | for Publisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 87 | { | ||
| 88 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 89 | &mut self.0 | ||
| 90 | } | ||
| 91 | } | ||
| 92 | |||
| 93 | /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. | ||
| 94 | /// (So an infinite amount is possible) | ||
| 95 | pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 96 | /// The channel we are a publisher for | ||
| 97 | channel: &'a PSB, | ||
| 98 | _phantom: PhantomData<T>, | ||
| 99 | } | ||
| 100 | |||
| 101 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | ||
| 102 | pub(super) fn new(channel: &'a PSB) -> Self { | ||
| 103 | Self { | ||
| 104 | channel, | ||
| 105 | _phantom: Default::default(), | ||
| 106 | } | ||
| 107 | } | ||
| 108 | /// Publish the message right now even when the queue is full. | ||
| 109 | /// This may cause a subscriber to miss an older message. | ||
| 110 | pub fn publish_immediate(&self, message: T) { | ||
| 111 | self.channel.publish_immediate(message) | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Publish a message if there is space in the message queue | ||
| 115 | pub fn try_publish(&self, message: T) -> Result<(), T> { | ||
| 116 | self.channel.publish_with_context(message, None) | ||
| 117 | } | ||
| 118 | } | ||
| 119 | |||
| 120 | /// An immediate publisher that holds a dynamic reference to the channel | ||
| 121 | pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 122 | |||
| 123 | impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { | ||
| 124 | type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 125 | |||
| 126 | fn deref(&self) -> &Self::Target { | ||
| 127 | &self.0 | ||
| 128 | } | ||
| 129 | } | ||
| 130 | |||
| 131 | impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { | ||
| 132 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 133 | &mut self.0 | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | /// An immediate publisher that holds a generic reference to the channel | ||
| 138 | pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 139 | pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 140 | ); | ||
| 141 | |||
| 142 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 143 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 144 | { | ||
| 145 | type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 146 | |||
| 147 | fn deref(&self) -> &Self::Target { | ||
| 148 | &self.0 | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 153 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 154 | { | ||
| 155 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 156 | &mut self.0 | ||
| 157 | } | ||
| 158 | } | ||
| 159 | |||
| 160 | /// Future for the publisher wait action | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 162 | /// The message we need to publish | ||
| 163 | message: Option<T>, | ||
| 164 | publisher: &'s Pub<'a, PSB, T>, | ||
| 165 | } | ||
| 166 | |||
| 167 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { | ||
| 168 | type Output = (); | ||
| 169 | |||
| 170 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 171 | let message = self.message.take().unwrap(); | ||
| 172 | match self.publisher.channel.publish_with_context(message, Some(cx)) { | ||
| 173 | Ok(()) => Poll::Ready(()), | ||
| 174 | Err(message) => { | ||
| 175 | self.message = Some(message); | ||
| 176 | Poll::Pending | ||
| 177 | } | ||
| 178 | } | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} | ||
diff --git a/embassy-sync/src/channel/pubsub/subscriber.rs b/embassy-sync/src/channel/pubsub/subscriber.rs new file mode 100644 index 000000000..b9a2cbe18 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/subscriber.rs | |||
| @@ -0,0 +1,152 @@ | |||
| 1 | //! Implementation of anything directly subscriber related | ||
| 2 | |||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::pin::Pin; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use super::{PubSubBehavior, PubSubChannel, WaitResult}; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | |||
| 12 | /// A subscriber to a channel | ||
| 13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 14 | /// The message id of the next message we are yet to receive | ||
| 15 | next_message_id: u64, | ||
| 16 | /// The channel we are a subscriber to | ||
| 17 | channel: &'a PSB, | ||
| 18 | _phantom: PhantomData<T>, | ||
| 19 | } | ||
| 20 | |||
| 21 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | ||
| 22 | pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { | ||
| 23 | Self { | ||
| 24 | next_message_id, | ||
| 25 | channel, | ||
| 26 | _phantom: Default::default(), | ||
| 27 | } | ||
| 28 | } | ||
| 29 | |||
| 30 | /// Wait for a published message | ||
| 31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 32 | SubscriberWaitFuture { subscriber: self } | ||
| 33 | } | ||
| 34 | |||
| 35 | /// Wait for a published message (ignoring lag results) | ||
| 36 | pub async fn next_message_pure(&mut self) -> T { | ||
| 37 | loop { | ||
| 38 | match self.next_message().await { | ||
| 39 | WaitResult::Lagged(_) => continue, | ||
| 40 | WaitResult::Message(message) => break message, | ||
| 41 | } | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Try to see if there's a published message we haven't received yet. | ||
| 46 | /// | ||
| 47 | /// This function does not peek. The message is received if there is one. | ||
| 48 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { | ||
| 49 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { | ||
| 50 | Poll::Ready(result) => Some(result), | ||
| 51 | Poll::Pending => None, | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | /// Try to see if there's a published message we haven't received yet (ignoring lag results). | ||
| 56 | /// | ||
| 57 | /// This function does not peek. The message is received if there is one. | ||
| 58 | pub fn try_next_message_pure(&mut self) -> Option<T> { | ||
| 59 | loop { | ||
| 60 | match self.try_next_message() { | ||
| 61 | Some(WaitResult::Lagged(_)) => continue, | ||
| 62 | Some(WaitResult::Message(message)) => break Some(message), | ||
| 63 | None => break None, | ||
| 64 | } | ||
| 65 | } | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | ||
| 70 | fn drop(&mut self) { | ||
| 71 | self.channel.unregister_subscriber(self.next_message_id) | ||
| 72 | } | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} | ||
| 76 | |||
| 77 | /// Warning: The stream implementation ignores lag results and returns all messages. | ||
| 78 | /// This might miss some messages without you knowing it. | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { | ||
| 80 | type Item = T; | ||
| 81 | |||
| 82 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 83 | match self | ||
| 84 | .channel | ||
| 85 | .get_message_with_context(&mut self.next_message_id, Some(cx)) | ||
| 86 | { | ||
| 87 | Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), | ||
| 88 | Poll::Ready(WaitResult::Lagged(_)) => { | ||
| 89 | cx.waker().wake_by_ref(); | ||
| 90 | Poll::Pending | ||
| 91 | } | ||
| 92 | Poll::Pending => Poll::Pending, | ||
| 93 | } | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | /// A subscriber that holds a dynamic reference to the channel | ||
| 98 | pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 99 | |||
| 100 | impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { | ||
| 101 | type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 102 | |||
| 103 | fn deref(&self) -> &Self::Target { | ||
| 104 | &self.0 | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { | ||
| 109 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 110 | &mut self.0 | ||
| 111 | } | ||
| 112 | } | ||
| 113 | |||
| 114 | /// A subscriber that holds a generic reference to the channel | ||
| 115 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 116 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 117 | ); | ||
| 118 | |||
| 119 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 120 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 121 | { | ||
| 122 | type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 123 | |||
| 124 | fn deref(&self) -> &Self::Target { | ||
| 125 | &self.0 | ||
| 126 | } | ||
| 127 | } | ||
| 128 | |||
| 129 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 130 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 131 | { | ||
| 132 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 133 | &mut self.0 | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | /// Future for the subscriber wait action | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | ||
| 140 | } | ||
| 141 | |||
| 142 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 143 | type Output = WaitResult<T>; | ||
| 144 | |||
| 145 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 146 | self.subscriber | ||
| 147 | .channel | ||
| 148 | .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} | ||
diff --git a/embassy-sync/src/channel/signal.rs b/embassy-sync/src/channel/signal.rs new file mode 100644 index 000000000..9279266c1 --- /dev/null +++ b/embassy-sync/src/channel/signal.rs | |||
| @@ -0,0 +1,100 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to a task. | ||
| 2 | use core::cell::UnsafeCell; | ||
| 3 | use core::future::Future; | ||
| 4 | use core::mem; | ||
| 5 | use core::task::{Context, Poll, Waker}; | ||
| 6 | |||
| 7 | /// Single-slot signaling primitive. | ||
| 8 | /// | ||
| 9 | /// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except | ||
| 10 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead | ||
| 11 | /// of waiting for the receiver to pop the previous value. | ||
| 12 | /// | ||
| 13 | /// It is useful for sending data between tasks when the receiver only cares about | ||
| 14 | /// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" | ||
| 15 | /// updates. | ||
| 16 | /// | ||
| 17 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. | ||
| 18 | /// | ||
| 19 | /// Signals are generally declared as `static`s and then borrowed as required. | ||
| 20 | /// | ||
| 21 | /// ``` | ||
| 22 | /// use embassy_sync::channel::signal::Signal; | ||
| 23 | /// | ||
| 24 | /// enum SomeCommand { | ||
| 25 | /// On, | ||
| 26 | /// Off, | ||
| 27 | /// } | ||
| 28 | /// | ||
| 29 | /// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); | ||
| 30 | /// ``` | ||
| 31 | pub struct Signal<T> { | ||
| 32 | state: UnsafeCell<State<T>>, | ||
| 33 | } | ||
| 34 | |||
| 35 | enum State<T> { | ||
| 36 | None, | ||
| 37 | Waiting(Waker), | ||
| 38 | Signaled(T), | ||
| 39 | } | ||
| 40 | |||
| 41 | unsafe impl<T: Send> Send for Signal<T> {} | ||
| 42 | unsafe impl<T: Send> Sync for Signal<T> {} | ||
| 43 | |||
| 44 | impl<T> Signal<T> { | ||
| 45 | /// Create a new `Signal`. | ||
| 46 | pub const fn new() -> Self { | ||
| 47 | Self { | ||
| 48 | state: UnsafeCell::new(State::None), | ||
| 49 | } | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | impl<T: Send> Signal<T> { | ||
| 54 | /// Mark this Signal as signaled. | ||
| 55 | pub fn signal(&self, val: T) { | ||
| 56 | critical_section::with(|_| unsafe { | ||
| 57 | let state = &mut *self.state.get(); | ||
| 58 | if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { | ||
| 59 | waker.wake(); | ||
| 60 | } | ||
| 61 | }) | ||
| 62 | } | ||
| 63 | |||
| 64 | /// Remove the queued value in this `Signal`, if any. | ||
| 65 | pub fn reset(&self) { | ||
| 66 | critical_section::with(|_| unsafe { | ||
| 67 | let state = &mut *self.state.get(); | ||
| 68 | *state = State::None | ||
| 69 | }) | ||
| 70 | } | ||
| 71 | |||
| 72 | /// Manually poll the Signal future. | ||
| 73 | pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 74 | critical_section::with(|_| unsafe { | ||
| 75 | let state = &mut *self.state.get(); | ||
| 76 | match state { | ||
| 77 | State::None => { | ||
| 78 | *state = State::Waiting(cx.waker().clone()); | ||
| 79 | Poll::Pending | ||
| 80 | } | ||
| 81 | State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, | ||
| 82 | State::Waiting(_) => panic!("waker overflow"), | ||
| 83 | State::Signaled(_) => match mem::replace(state, State::None) { | ||
| 84 | State::Signaled(res) => Poll::Ready(res), | ||
| 85 | _ => unreachable!(), | ||
| 86 | }, | ||
| 87 | } | ||
| 88 | }) | ||
| 89 | } | ||
| 90 | |||
| 91 | /// Future that completes when this Signal has been signaled. | ||
| 92 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | ||
| 93 | futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) | ||
| 94 | } | ||
| 95 | |||
| 96 | /// non-blocking method to check whether this signal has been signaled. | ||
| 97 | pub fn signaled(&self) -> bool { | ||
| 98 | critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) | ||
| 99 | } | ||
| 100 | } | ||
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs new file mode 100644 index 000000000..f8bb0a035 --- /dev/null +++ b/embassy-sync/src/fmt.rs | |||
| @@ -0,0 +1,228 @@ | |||
| 1 | #![macro_use] | ||
| 2 | #![allow(unused_macros)] | ||
| 3 | |||
| 4 | #[cfg(all(feature = "defmt", feature = "log"))] | ||
| 5 | compile_error!("You may not enable both `defmt` and `log` features."); | ||
| 6 | |||
| 7 | macro_rules! assert { | ||
| 8 | ($($x:tt)*) => { | ||
| 9 | { | ||
| 10 | #[cfg(not(feature = "defmt"))] | ||
| 11 | ::core::assert!($($x)*); | ||
| 12 | #[cfg(feature = "defmt")] | ||
| 13 | ::defmt::assert!($($x)*); | ||
| 14 | } | ||
| 15 | }; | ||
| 16 | } | ||
| 17 | |||
| 18 | macro_rules! assert_eq { | ||
| 19 | ($($x:tt)*) => { | ||
| 20 | { | ||
| 21 | #[cfg(not(feature = "defmt"))] | ||
| 22 | ::core::assert_eq!($($x)*); | ||
| 23 | #[cfg(feature = "defmt")] | ||
| 24 | ::defmt::assert_eq!($($x)*); | ||
| 25 | } | ||
| 26 | }; | ||
| 27 | } | ||
| 28 | |||
| 29 | macro_rules! assert_ne { | ||
| 30 | ($($x:tt)*) => { | ||
| 31 | { | ||
| 32 | #[cfg(not(feature = "defmt"))] | ||
| 33 | ::core::assert_ne!($($x)*); | ||
| 34 | #[cfg(feature = "defmt")] | ||
| 35 | ::defmt::assert_ne!($($x)*); | ||
| 36 | } | ||
| 37 | }; | ||
| 38 | } | ||
| 39 | |||
| 40 | macro_rules! debug_assert { | ||
| 41 | ($($x:tt)*) => { | ||
| 42 | { | ||
| 43 | #[cfg(not(feature = "defmt"))] | ||
| 44 | ::core::debug_assert!($($x)*); | ||
| 45 | #[cfg(feature = "defmt")] | ||
| 46 | ::defmt::debug_assert!($($x)*); | ||
| 47 | } | ||
| 48 | }; | ||
| 49 | } | ||
| 50 | |||
| 51 | macro_rules! debug_assert_eq { | ||
| 52 | ($($x:tt)*) => { | ||
| 53 | { | ||
| 54 | #[cfg(not(feature = "defmt"))] | ||
| 55 | ::core::debug_assert_eq!($($x)*); | ||
| 56 | #[cfg(feature = "defmt")] | ||
| 57 | ::defmt::debug_assert_eq!($($x)*); | ||
| 58 | } | ||
| 59 | }; | ||
| 60 | } | ||
| 61 | |||
| 62 | macro_rules! debug_assert_ne { | ||
| 63 | ($($x:tt)*) => { | ||
| 64 | { | ||
| 65 | #[cfg(not(feature = "defmt"))] | ||
| 66 | ::core::debug_assert_ne!($($x)*); | ||
| 67 | #[cfg(feature = "defmt")] | ||
| 68 | ::defmt::debug_assert_ne!($($x)*); | ||
| 69 | } | ||
| 70 | }; | ||
| 71 | } | ||
| 72 | |||
| 73 | macro_rules! todo { | ||
| 74 | ($($x:tt)*) => { | ||
| 75 | { | ||
| 76 | #[cfg(not(feature = "defmt"))] | ||
| 77 | ::core::todo!($($x)*); | ||
| 78 | #[cfg(feature = "defmt")] | ||
| 79 | ::defmt::todo!($($x)*); | ||
| 80 | } | ||
| 81 | }; | ||
| 82 | } | ||
| 83 | |||
| 84 | macro_rules! unreachable { | ||
| 85 | ($($x:tt)*) => { | ||
| 86 | { | ||
| 87 | #[cfg(not(feature = "defmt"))] | ||
| 88 | ::core::unreachable!($($x)*); | ||
| 89 | #[cfg(feature = "defmt")] | ||
| 90 | ::defmt::unreachable!($($x)*); | ||
| 91 | } | ||
| 92 | }; | ||
| 93 | } | ||
| 94 | |||
| 95 | macro_rules! panic { | ||
| 96 | ($($x:tt)*) => { | ||
| 97 | { | ||
| 98 | #[cfg(not(feature = "defmt"))] | ||
| 99 | ::core::panic!($($x)*); | ||
| 100 | #[cfg(feature = "defmt")] | ||
| 101 | ::defmt::panic!($($x)*); | ||
| 102 | } | ||
| 103 | }; | ||
| 104 | } | ||
| 105 | |||
| 106 | macro_rules! trace { | ||
| 107 | ($s:literal $(, $x:expr)* $(,)?) => { | ||
| 108 | { | ||
| 109 | #[cfg(feature = "log")] | ||
| 110 | ::log::trace!($s $(, $x)*); | ||
| 111 | #[cfg(feature = "defmt")] | ||
| 112 | ::defmt::trace!($s $(, $x)*); | ||
| 113 | #[cfg(not(any(feature = "log", feature="defmt")))] | ||
| 114 | let _ = ($( & $x ),*); | ||
| 115 | } | ||
| 116 | }; | ||
| 117 | } | ||
| 118 | |||
| 119 | macro_rules! debug { | ||
| 120 | ($s:literal $(, $x:expr)* $(,)?) => { | ||
| 121 | { | ||
| 122 | #[cfg(feature = "log")] | ||
| 123 | ::log::debug!($s $(, $x)*); | ||
| 124 | #[cfg(feature = "defmt")] | ||
| 125 | ::defmt::debug!($s $(, $x)*); | ||
| 126 | #[cfg(not(any(feature = "log", feature="defmt")))] | ||
| 127 | let _ = ($( & $x ),*); | ||
| 128 | } | ||
| 129 | }; | ||
| 130 | } | ||
| 131 | |||
| 132 | macro_rules! info { | ||
| 133 | ($s:literal $(, $x:expr)* $(,)?) => { | ||
| 134 | { | ||
| 135 | #[cfg(feature = "log")] | ||
| 136 | ::log::info!($s $(, $x)*); | ||
| 137 | #[cfg(feature = "defmt")] | ||
| 138 | ::defmt::info!($s $(, $x)*); | ||
| 139 | #[cfg(not(any(feature = "log", feature="defmt")))] | ||
| 140 | let _ = ($( & $x ),*); | ||
| 141 | } | ||
| 142 | }; | ||
| 143 | } | ||
| 144 | |||
| 145 | macro_rules! warn { | ||
| 146 | ($s:literal $(, $x:expr)* $(,)?) => { | ||
| 147 | { | ||
| 148 | #[cfg(feature = "log")] | ||
| 149 | ::log::warn!($s $(, $x)*); | ||
| 150 | #[cfg(feature = "defmt")] | ||
| 151 | ::defmt::warn!($s $(, $x)*); | ||
| 152 | #[cfg(not(any(feature = "log", feature="defmt")))] | ||
| 153 | let _ = ($( & $x ),*); | ||
| 154 | } | ||
| 155 | }; | ||
| 156 | } | ||
| 157 | |||
| 158 | macro_rules! error { | ||
| 159 | ($s:literal $(, $x:expr)* $(,)?) => { | ||
| 160 | { | ||
| 161 | #[cfg(feature = "log")] | ||
| 162 | ::log::error!($s $(, $x)*); | ||
| 163 | #[cfg(feature = "defmt")] | ||
| 164 | ::defmt::error!($s $(, $x)*); | ||
| 165 | #[cfg(not(any(feature = "log", feature="defmt")))] | ||
| 166 | let _ = ($( & $x ),*); | ||
| 167 | } | ||
| 168 | }; | ||
| 169 | } | ||
| 170 | |||
| 171 | #[cfg(feature = "defmt")] | ||
| 172 | macro_rules! unwrap { | ||
| 173 | ($($x:tt)*) => { | ||
| 174 | ::defmt::unwrap!($($x)*) | ||
| 175 | }; | ||
| 176 | } | ||
| 177 | |||
| 178 | #[cfg(not(feature = "defmt"))] | ||
| 179 | macro_rules! unwrap { | ||
| 180 | ($arg:expr) => { | ||
| 181 | match $crate::fmt::Try::into_result($arg) { | ||
| 182 | ::core::result::Result::Ok(t) => t, | ||
| 183 | ::core::result::Result::Err(e) => { | ||
| 184 | ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); | ||
| 185 | } | ||
| 186 | } | ||
| 187 | }; | ||
| 188 | ($arg:expr, $($msg:expr),+ $(,)? ) => { | ||
| 189 | match $crate::fmt::Try::into_result($arg) { | ||
| 190 | ::core::result::Result::Ok(t) => t, | ||
| 191 | ::core::result::Result::Err(e) => { | ||
| 192 | ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); | ||
| 193 | } | ||
| 194 | } | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | #[cfg(feature = "defmt-timestamp-uptime")] | ||
| 199 | defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } | ||
| 200 | |||
| 201 | #[derive(Debug, Copy, Clone, Eq, PartialEq)] | ||
| 202 | pub struct NoneError; | ||
| 203 | |||
| 204 | pub trait Try { | ||
| 205 | type Ok; | ||
| 206 | type Error; | ||
| 207 | fn into_result(self) -> Result<Self::Ok, Self::Error>; | ||
| 208 | } | ||
| 209 | |||
| 210 | impl<T> Try for Option<T> { | ||
| 211 | type Ok = T; | ||
| 212 | type Error = NoneError; | ||
| 213 | |||
| 214 | #[inline] | ||
| 215 | fn into_result(self) -> Result<T, NoneError> { | ||
| 216 | self.ok_or(NoneError) | ||
| 217 | } | ||
| 218 | } | ||
| 219 | |||
| 220 | impl<T, E> Try for Result<T, E> { | ||
| 221 | type Ok = T; | ||
| 222 | type Error = E; | ||
| 223 | |||
| 224 | #[inline] | ||
| 225 | fn into_result(self) -> Self { | ||
| 226 | self | ||
| 227 | } | ||
| 228 | } | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs new file mode 100644 index 000000000..7d8815903 --- /dev/null +++ b/embassy-sync/src/lib.rs | |||
| @@ -0,0 +1,17 @@ | |||
| 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] | ||
| 2 | #![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] | ||
| 3 | #![allow(clippy::new_without_default)] | ||
| 4 | #![doc = include_str!("../../README.md")] | ||
| 5 | #![warn(missing_docs)] | ||
| 6 | |||
| 7 | // This mod MUST go first, so that the others see its macros. | ||
| 8 | pub(crate) mod fmt; | ||
| 9 | |||
| 10 | // internal use | ||
| 11 | mod ring_buffer; | ||
| 12 | |||
| 13 | pub mod blocking_mutex; | ||
| 14 | pub mod channel; | ||
| 15 | pub mod mutex; | ||
| 16 | pub mod pipe; | ||
| 17 | pub mod waitqueue; | ||
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs new file mode 100644 index 000000000..75a6e8dd3 --- /dev/null +++ b/embassy-sync/src/mutex.rs | |||
| @@ -0,0 +1,167 @@ | |||
| 1 | //! Async mutex. | ||
| 2 | //! | ||
| 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. | ||
| 4 | use core::cell::{RefCell, UnsafeCell}; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::task::Poll; | ||
| 7 | |||
| 8 | use futures_util::future::poll_fn; | ||
| 9 | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | use crate::blocking_mutex::Mutex as BlockingMutex; | ||
| 12 | use crate::waitqueue::WakerRegistration; | ||
| 13 | |||
| 14 | /// Error returned by [`Mutex::try_lock`] | ||
| 15 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 16 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 17 | pub struct TryLockError; | ||
| 18 | |||
| 19 | struct State { | ||
| 20 | locked: bool, | ||
| 21 | waker: WakerRegistration, | ||
| 22 | } | ||
| 23 | |||
| 24 | /// Async mutex. | ||
| 25 | /// | ||
| 26 | /// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). | ||
| 27 | /// The raw mutex is used to guard access to the internal "is locked" flag. It | ||
| 28 | /// is held for very short periods only, while locking and unlocking. It is *not* held | ||
| 29 | /// for the entire time the async Mutex is locked. | ||
| 30 | /// | ||
| 31 | /// Which implementation you select depends on the context in which you're using the mutex. | ||
| 32 | /// | ||
| 33 | /// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. | ||
| 34 | /// | ||
| 35 | /// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. | ||
| 36 | /// | ||
| 37 | /// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. | ||
| 38 | /// | ||
| 39 | pub struct Mutex<M, T> | ||
| 40 | where | ||
| 41 | M: RawMutex, | ||
| 42 | T: ?Sized, | ||
| 43 | { | ||
| 44 | state: BlockingMutex<M, RefCell<State>>, | ||
| 45 | inner: UnsafeCell<T>, | ||
| 46 | } | ||
| 47 | |||
| 48 | unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for Mutex<M, T> {} | ||
| 49 | unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<M, T> {} | ||
| 50 | |||
| 51 | /// Async mutex. | ||
| 52 | impl<M, T> Mutex<M, T> | ||
| 53 | where | ||
| 54 | M: RawMutex, | ||
| 55 | { | ||
| 56 | /// Create a new mutex with the given value. | ||
| 57 | pub const fn new(value: T) -> Self { | ||
| 58 | Self { | ||
| 59 | inner: UnsafeCell::new(value), | ||
| 60 | state: BlockingMutex::new(RefCell::new(State { | ||
| 61 | locked: false, | ||
| 62 | waker: WakerRegistration::new(), | ||
| 63 | })), | ||
| 64 | } | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl<M, T> Mutex<M, T> | ||
| 69 | where | ||
| 70 | M: RawMutex, | ||
| 71 | T: ?Sized, | ||
| 72 | { | ||
| 73 | /// Lock the mutex. | ||
| 74 | /// | ||
| 75 | /// This will wait for the mutex to be unlocked if it's already locked. | ||
| 76 | pub async fn lock(&self) -> MutexGuard<'_, M, T> { | ||
| 77 | poll_fn(|cx| { | ||
| 78 | let ready = self.state.lock(|s| { | ||
| 79 | let mut s = s.borrow_mut(); | ||
| 80 | if s.locked { | ||
| 81 | s.waker.register(cx.waker()); | ||
| 82 | false | ||
| 83 | } else { | ||
| 84 | s.locked = true; | ||
| 85 | true | ||
| 86 | } | ||
| 87 | }); | ||
| 88 | |||
| 89 | if ready { | ||
| 90 | Poll::Ready(MutexGuard { mutex: self }) | ||
| 91 | } else { | ||
| 92 | Poll::Pending | ||
| 93 | } | ||
| 94 | }) | ||
| 95 | .await | ||
| 96 | } | ||
| 97 | |||
| 98 | /// Attempt to immediately lock the mutex. | ||
| 99 | /// | ||
| 100 | /// If the mutex is already locked, this will return an error instead of waiting. | ||
| 101 | pub fn try_lock(&self) -> Result<MutexGuard<'_, M, T>, TryLockError> { | ||
| 102 | self.state.lock(|s| { | ||
| 103 | let mut s = s.borrow_mut(); | ||
| 104 | if s.locked { | ||
| 105 | Err(TryLockError) | ||
| 106 | } else { | ||
| 107 | s.locked = true; | ||
| 108 | Ok(()) | ||
| 109 | } | ||
| 110 | })?; | ||
| 111 | |||
| 112 | Ok(MutexGuard { mutex: self }) | ||
| 113 | } | ||
| 114 | } | ||
| 115 | |||
| 116 | /// Async mutex guard. | ||
| 117 | /// | ||
| 118 | /// Owning an instance of this type indicates having | ||
| 119 | /// successfully locked the mutex, and grants access to the contents. | ||
| 120 | /// | ||
| 121 | /// Dropping it unlocks the mutex. | ||
| 122 | pub struct MutexGuard<'a, M, T> | ||
| 123 | where | ||
| 124 | M: RawMutex, | ||
| 125 | T: ?Sized, | ||
| 126 | { | ||
| 127 | mutex: &'a Mutex<M, T>, | ||
| 128 | } | ||
| 129 | |||
| 130 | impl<'a, M, T> Drop for MutexGuard<'a, M, T> | ||
| 131 | where | ||
| 132 | M: RawMutex, | ||
| 133 | T: ?Sized, | ||
| 134 | { | ||
| 135 | fn drop(&mut self) { | ||
| 136 | self.mutex.state.lock(|s| { | ||
| 137 | let mut s = s.borrow_mut(); | ||
| 138 | s.locked = false; | ||
| 139 | s.waker.wake(); | ||
| 140 | }) | ||
| 141 | } | ||
| 142 | } | ||
| 143 | |||
| 144 | impl<'a, M, T> Deref for MutexGuard<'a, M, T> | ||
| 145 | where | ||
| 146 | M: RawMutex, | ||
| 147 | T: ?Sized, | ||
| 148 | { | ||
| 149 | type Target = T; | ||
| 150 | fn deref(&self) -> &Self::Target { | ||
| 151 | // Safety: the MutexGuard represents exclusive access to the contents | ||
| 152 | // of the mutex, so it's OK to get it. | ||
| 153 | unsafe { &*(self.mutex.inner.get() as *const T) } | ||
| 154 | } | ||
| 155 | } | ||
| 156 | |||
| 157 | impl<'a, M, T> DerefMut for MutexGuard<'a, M, T> | ||
| 158 | where | ||
| 159 | M: RawMutex, | ||
| 160 | T: ?Sized, | ||
| 161 | { | ||
| 162 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 163 | // Safety: the MutexGuard represents exclusive access to the contents | ||
| 164 | // of the mutex, so it's OK to get it. | ||
| 165 | unsafe { &mut *(self.mutex.inner.get()) } | ||
| 166 | } | ||
| 167 | } | ||
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs new file mode 100644 index 000000000..7d64b648e --- /dev/null +++ b/embassy-sync/src/pipe.rs | |||
| @@ -0,0 +1,551 @@ | |||
| 1 | //! Async byte stream pipe. | ||
| 2 | |||
| 3 | use core::cell::RefCell; | ||
| 4 | use core::future::Future; | ||
| 5 | use core::pin::Pin; | ||
| 6 | use core::task::{Context, Poll}; | ||
| 7 | |||
| 8 | use crate::blocking_mutex::raw::RawMutex; | ||
| 9 | use crate::blocking_mutex::Mutex; | ||
| 10 | use crate::ring_buffer::RingBuffer; | ||
| 11 | use crate::waitqueue::WakerRegistration; | ||
| 12 | |||
| 13 | /// Write-only access to a [`Pipe`]. | ||
| 14 | #[derive(Copy)] | ||
| 15 | pub struct Writer<'p, M, const N: usize> | ||
| 16 | where | ||
| 17 | M: RawMutex, | ||
| 18 | { | ||
| 19 | pipe: &'p Pipe<M, N>, | ||
| 20 | } | ||
| 21 | |||
| 22 | impl<'p, M, const N: usize> Clone for Writer<'p, M, N> | ||
| 23 | where | ||
| 24 | M: RawMutex, | ||
| 25 | { | ||
| 26 | fn clone(&self) -> Self { | ||
| 27 | Writer { pipe: self.pipe } | ||
| 28 | } | ||
| 29 | } | ||
| 30 | |||
| 31 | impl<'p, M, const N: usize> Writer<'p, M, N> | ||
| 32 | where | ||
| 33 | M: RawMutex, | ||
| 34 | { | ||
| 35 | /// Writes a value. | ||
| 36 | /// | ||
| 37 | /// See [`Pipe::write()`] | ||
| 38 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | ||
| 39 | self.pipe.write(buf) | ||
| 40 | } | ||
| 41 | |||
| 42 | /// Attempt to immediately write a message. | ||
| 43 | /// | ||
| 44 | /// See [`Pipe::write()`] | ||
| 45 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 46 | self.pipe.try_write(buf) | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. | ||
| 51 | pub struct WriteFuture<'p, M, const N: usize> | ||
| 52 | where | ||
| 53 | M: RawMutex, | ||
| 54 | { | ||
| 55 | pipe: &'p Pipe<M, N>, | ||
| 56 | buf: &'p [u8], | ||
| 57 | } | ||
| 58 | |||
| 59 | impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> | ||
| 60 | where | ||
| 61 | M: RawMutex, | ||
| 62 | { | ||
| 63 | type Output = usize; | ||
| 64 | |||
| 65 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 66 | match self.pipe.try_write_with_context(Some(cx), self.buf) { | ||
| 67 | Ok(n) => Poll::Ready(n), | ||
| 68 | Err(TryWriteError::Full) => Poll::Pending, | ||
| 69 | } | ||
| 70 | } | ||
| 71 | } | ||
| 72 | |||
| 73 | impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} | ||
| 74 | |||
| 75 | /// Read-only access to a [`Pipe`]. | ||
| 76 | #[derive(Copy)] | ||
| 77 | pub struct Reader<'p, M, const N: usize> | ||
| 78 | where | ||
| 79 | M: RawMutex, | ||
| 80 | { | ||
| 81 | pipe: &'p Pipe<M, N>, | ||
| 82 | } | ||
| 83 | |||
| 84 | impl<'p, M, const N: usize> Clone for Reader<'p, M, N> | ||
| 85 | where | ||
| 86 | M: RawMutex, | ||
| 87 | { | ||
| 88 | fn clone(&self) -> Self { | ||
| 89 | Reader { pipe: self.pipe } | ||
| 90 | } | ||
| 91 | } | ||
| 92 | |||
| 93 | impl<'p, M, const N: usize> Reader<'p, M, N> | ||
| 94 | where | ||
| 95 | M: RawMutex, | ||
| 96 | { | ||
| 97 | /// Reads a value. | ||
| 98 | /// | ||
| 99 | /// See [`Pipe::read()`] | ||
| 100 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | ||
| 101 | self.pipe.read(buf) | ||
| 102 | } | ||
| 103 | |||
| 104 | /// Attempt to immediately read a message. | ||
| 105 | /// | ||
| 106 | /// See [`Pipe::read()`] | ||
| 107 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 108 | self.pipe.try_read(buf) | ||
| 109 | } | ||
| 110 | } | ||
| 111 | |||
| 112 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | ||
| 113 | pub struct ReadFuture<'p, M, const N: usize> | ||
| 114 | where | ||
| 115 | M: RawMutex, | ||
| 116 | { | ||
| 117 | pipe: &'p Pipe<M, N>, | ||
| 118 | buf: &'p mut [u8], | ||
| 119 | } | ||
| 120 | |||
| 121 | impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> | ||
| 122 | where | ||
| 123 | M: RawMutex, | ||
| 124 | { | ||
| 125 | type Output = usize; | ||
| 126 | |||
| 127 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 128 | match self.pipe.try_read_with_context(Some(cx), self.buf) { | ||
| 129 | Ok(n) => Poll::Ready(n), | ||
| 130 | Err(TryReadError::Empty) => Poll::Pending, | ||
| 131 | } | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} | ||
| 136 | |||
| 137 | /// Error returned by [`try_read`](Pipe::try_read). | ||
| 138 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 139 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 140 | pub enum TryReadError { | ||
| 141 | /// No data could be read from the pipe because it is currently | ||
| 142 | /// empty, and reading would require blocking. | ||
| 143 | Empty, | ||
| 144 | } | ||
| 145 | |||
| 146 | /// Error returned by [`try_write`](Pipe::try_write). | ||
| 147 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 148 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 149 | pub enum TryWriteError { | ||
| 150 | /// No data could be written to the pipe because it is | ||
| 151 | /// currently full, and writing would require blocking. | ||
| 152 | Full, | ||
| 153 | } | ||
| 154 | |||
| 155 | struct PipeState<const N: usize> { | ||
| 156 | buffer: RingBuffer<N>, | ||
| 157 | read_waker: WakerRegistration, | ||
| 158 | write_waker: WakerRegistration, | ||
| 159 | } | ||
| 160 | |||
| 161 | impl<const N: usize> PipeState<N> { | ||
| 162 | const fn new() -> Self { | ||
| 163 | PipeState { | ||
| 164 | buffer: RingBuffer::new(), | ||
| 165 | read_waker: WakerRegistration::new(), | ||
| 166 | write_waker: WakerRegistration::new(), | ||
| 167 | } | ||
| 168 | } | ||
| 169 | |||
| 170 | fn clear(&mut self) { | ||
| 171 | self.buffer.clear(); | ||
| 172 | self.write_waker.wake(); | ||
| 173 | } | ||
| 174 | |||
| 175 | fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 176 | self.try_read_with_context(None, buf) | ||
| 177 | } | ||
| 178 | |||
| 179 | fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 180 | if self.buffer.is_full() { | ||
| 181 | self.write_waker.wake(); | ||
| 182 | } | ||
| 183 | |||
| 184 | let available = self.buffer.pop_buf(); | ||
| 185 | if available.is_empty() { | ||
| 186 | if let Some(cx) = cx { | ||
| 187 | self.read_waker.register(cx.waker()); | ||
| 188 | } | ||
| 189 | return Err(TryReadError::Empty); | ||
| 190 | } | ||
| 191 | |||
| 192 | let n = available.len().min(buf.len()); | ||
| 193 | buf[..n].copy_from_slice(&available[..n]); | ||
| 194 | self.buffer.pop(n); | ||
| 195 | Ok(n) | ||
| 196 | } | ||
| 197 | |||
| 198 | fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 199 | self.try_write_with_context(None, buf) | ||
| 200 | } | ||
| 201 | |||
| 202 | fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 203 | if self.buffer.is_empty() { | ||
| 204 | self.read_waker.wake(); | ||
| 205 | } | ||
| 206 | |||
| 207 | let available = self.buffer.push_buf(); | ||
| 208 | if available.is_empty() { | ||
| 209 | if let Some(cx) = cx { | ||
| 210 | self.write_waker.register(cx.waker()); | ||
| 211 | } | ||
| 212 | return Err(TryWriteError::Full); | ||
| 213 | } | ||
| 214 | |||
| 215 | let n = available.len().min(buf.len()); | ||
| 216 | available[..n].copy_from_slice(&buf[..n]); | ||
| 217 | self.buffer.push(n); | ||
| 218 | Ok(n) | ||
| 219 | } | ||
| 220 | } | ||
| 221 | |||
| 222 | /// A bounded pipe for communicating between asynchronous tasks | ||
| 223 | /// with backpressure. | ||
| 224 | /// | ||
| 225 | /// The pipe will buffer up to the provided number of messages. Once the | ||
| 226 | /// buffer is full, attempts to `write` new messages will wait until a message is | ||
| 227 | /// read from the pipe. | ||
| 228 | /// | ||
| 229 | /// All data written will become available in the same order as it was written. | ||
| 230 | pub struct Pipe<M, const N: usize> | ||
| 231 | where | ||
| 232 | M: RawMutex, | ||
| 233 | { | ||
| 234 | inner: Mutex<M, RefCell<PipeState<N>>>, | ||
| 235 | } | ||
| 236 | |||
| 237 | impl<M, const N: usize> Pipe<M, N> | ||
| 238 | where | ||
| 239 | M: RawMutex, | ||
| 240 | { | ||
| 241 | /// Establish a new bounded pipe. For example, to create one with a NoopMutex: | ||
| 242 | /// | ||
| 243 | /// ``` | ||
| 244 | /// use embassy_sync::pipe::Pipe; | ||
| 245 | /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 246 | /// | ||
| 247 | /// // Declare a bounded pipe, with a buffer of 256 bytes. | ||
| 248 | /// let mut pipe = Pipe::<NoopRawMutex, 256>::new(); | ||
| 249 | /// ``` | ||
| 250 | pub const fn new() -> Self { | ||
| 251 | Self { | ||
| 252 | inner: Mutex::new(RefCell::new(PipeState::new())), | ||
| 253 | } | ||
| 254 | } | ||
| 255 | |||
| 256 | fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R { | ||
| 257 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||
| 258 | } | ||
| 259 | |||
| 260 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 261 | self.lock(|c| c.try_read_with_context(cx, buf)) | ||
| 262 | } | ||
| 263 | |||
| 264 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 265 | self.lock(|c| c.try_write_with_context(cx, buf)) | ||
| 266 | } | ||
| 267 | |||
| 268 | /// Get a writer for this pipe. | ||
| 269 | pub fn writer(&self) -> Writer<'_, M, N> { | ||
| 270 | Writer { pipe: self } | ||
| 271 | } | ||
| 272 | |||
| 273 | /// Get a reader for this pipe. | ||
| 274 | pub fn reader(&self) -> Reader<'_, M, N> { | ||
| 275 | Reader { pipe: self } | ||
| 276 | } | ||
| 277 | |||
| 278 | /// Write a value, waiting until there is capacity. | ||
| 279 | /// | ||
| 280 | /// Writeing completes when the value has been pushed to the pipe's queue. | ||
| 281 | /// This doesn't mean the value has been read yet. | ||
| 282 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | ||
| 283 | WriteFuture { pipe: self, buf } | ||
| 284 | } | ||
| 285 | |||
| 286 | /// Attempt to immediately write a message. | ||
| 287 | /// | ||
| 288 | /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's | ||
| 289 | /// buffer is full, instead of waiting. | ||
| 290 | /// | ||
| 291 | /// # Errors | ||
| 292 | /// | ||
| 293 | /// If the pipe capacity has been reached, i.e., the pipe has `n` | ||
| 294 | /// buffered values where `n` is the argument passed to [`Pipe`], then an | ||
| 295 | /// error is returned. | ||
| 296 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 297 | self.lock(|c| c.try_write(buf)) | ||
| 298 | } | ||
| 299 | |||
| 300 | /// Receive the next value. | ||
| 301 | /// | ||
| 302 | /// If there are no messages in the pipe's buffer, this method will | ||
| 303 | /// wait until a message is written. | ||
| 304 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | ||
| 305 | ReadFuture { pipe: self, buf } | ||
| 306 | } | ||
| 307 | |||
| 308 | /// Attempt to immediately read a message. | ||
| 309 | /// | ||
| 310 | /// This method will either read a message from the pipe immediately or return an error | ||
| 311 | /// if the pipe is empty. | ||
| 312 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 313 | self.lock(|c| c.try_read(buf)) | ||
| 314 | } | ||
| 315 | |||
| 316 | /// Clear the data in the pipe's buffer. | ||
| 317 | pub fn clear(&self) { | ||
| 318 | self.lock(|c| c.clear()) | ||
| 319 | } | ||
| 320 | |||
| 321 | /// Return whether the pipe is full (no free space in the buffer) | ||
| 322 | pub fn is_full(&self) -> bool { | ||
| 323 | self.len() == N | ||
| 324 | } | ||
| 325 | |||
| 326 | /// Return whether the pipe is empty (no data buffered) | ||
| 327 | pub fn is_empty(&self) -> bool { | ||
| 328 | self.len() == 0 | ||
| 329 | } | ||
| 330 | |||
| 331 | /// Total byte capacity. | ||
| 332 | /// | ||
| 333 | /// This is the same as the `N` generic param. | ||
| 334 | pub fn capacity(&self) -> usize { | ||
| 335 | N | ||
| 336 | } | ||
| 337 | |||
| 338 | /// Used byte capacity. | ||
| 339 | pub fn len(&self) -> usize { | ||
| 340 | self.lock(|c| c.buffer.len()) | ||
| 341 | } | ||
| 342 | |||
| 343 | /// Free byte capacity. | ||
| 344 | /// | ||
| 345 | /// This is equivalent to `capacity() - len()` | ||
| 346 | pub fn free_capacity(&self) -> usize { | ||
| 347 | N - self.len() | ||
| 348 | } | ||
| 349 | } | ||
| 350 | |||
| 351 | #[cfg(feature = "nightly")] | ||
| 352 | mod io_impls { | ||
| 353 | use core::convert::Infallible; | ||
| 354 | |||
| 355 | use futures_util::FutureExt; | ||
| 356 | |||
| 357 | use super::*; | ||
| 358 | |||
| 359 | impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { | ||
| 360 | type Error = Infallible; | ||
| 361 | } | ||
| 362 | |||
| 363 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { | ||
| 364 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 365 | where | ||
| 366 | Self: 'a; | ||
| 367 | |||
| 368 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 369 | Pipe::read(self, buf).map(Ok) | ||
| 370 | } | ||
| 371 | } | ||
| 372 | |||
| 373 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { | ||
| 374 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 375 | where | ||
| 376 | Self: 'a; | ||
| 377 | |||
| 378 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 379 | Pipe::write(self, buf).map(Ok) | ||
| 380 | } | ||
| 381 | |||
| 382 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | ||
| 383 | where | ||
| 384 | Self: 'a; | ||
| 385 | |||
| 386 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 387 | futures_util::future::ready(Ok(())) | ||
| 388 | } | ||
| 389 | } | ||
| 390 | |||
| 391 | impl<M: RawMutex, const N: usize> embedded_io::Io for &Pipe<M, N> { | ||
| 392 | type Error = Infallible; | ||
| 393 | } | ||
| 394 | |||
| 395 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { | ||
| 396 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 397 | where | ||
| 398 | Self: 'a; | ||
| 399 | |||
| 400 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 401 | Pipe::read(self, buf).map(Ok) | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { | ||
| 406 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 407 | where | ||
| 408 | Self: 'a; | ||
| 409 | |||
| 410 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 411 | Pipe::write(self, buf).map(Ok) | ||
| 412 | } | ||
| 413 | |||
| 414 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | ||
| 415 | where | ||
| 416 | Self: 'a; | ||
| 417 | |||
| 418 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 419 | futures_util::future::ready(Ok(())) | ||
| 420 | } | ||
| 421 | } | ||
| 422 | |||
| 423 | impl<M: RawMutex, const N: usize> embedded_io::Io for Reader<'_, M, N> { | ||
| 424 | type Error = Infallible; | ||
| 425 | } | ||
| 426 | |||
| 427 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { | ||
| 428 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 429 | where | ||
| 430 | Self: 'a; | ||
| 431 | |||
| 432 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 433 | Reader::read(self, buf).map(Ok) | ||
| 434 | } | ||
| 435 | } | ||
| 436 | |||
| 437 | impl<M: RawMutex, const N: usize> embedded_io::Io for Writer<'_, M, N> { | ||
| 438 | type Error = Infallible; | ||
| 439 | } | ||
| 440 | |||
| 441 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { | ||
| 442 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||
| 443 | where | ||
| 444 | Self: 'a; | ||
| 445 | |||
| 446 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 447 | Writer::write(self, buf).map(Ok) | ||
| 448 | } | ||
| 449 | |||
| 450 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | ||
| 451 | where | ||
| 452 | Self: 'a; | ||
| 453 | |||
| 454 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 455 | futures_util::future::ready(Ok(())) | ||
| 456 | } | ||
| 457 | } | ||
| 458 | } | ||
| 459 | |||
| 460 | #[cfg(test)] | ||
| 461 | mod tests { | ||
| 462 | use futures_executor::ThreadPool; | ||
| 463 | use futures_util::task::SpawnExt; | ||
| 464 | use static_cell::StaticCell; | ||
| 465 | |||
| 466 | use super::*; | ||
| 467 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||
| 468 | |||
| 469 | fn capacity<const N: usize>(c: &PipeState<N>) -> usize { | ||
| 470 | N - c.buffer.len() | ||
| 471 | } | ||
| 472 | |||
| 473 | #[test] | ||
| 474 | fn writing_once() { | ||
| 475 | let mut c = PipeState::<3>::new(); | ||
| 476 | assert!(c.try_write(&[1]).is_ok()); | ||
| 477 | assert_eq!(capacity(&c), 2); | ||
| 478 | } | ||
| 479 | |||
| 480 | #[test] | ||
| 481 | fn writing_when_full() { | ||
| 482 | let mut c = PipeState::<3>::new(); | ||
| 483 | assert_eq!(c.try_write(&[42]), Ok(1)); | ||
| 484 | assert_eq!(c.try_write(&[43]), Ok(1)); | ||
| 485 | assert_eq!(c.try_write(&[44]), Ok(1)); | ||
| 486 | assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); | ||
| 487 | assert_eq!(capacity(&c), 0); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn receiving_once_with_one_send() { | ||
| 492 | let mut c = PipeState::<3>::new(); | ||
| 493 | assert!(c.try_write(&[42]).is_ok()); | ||
| 494 | let mut buf = [0; 16]; | ||
| 495 | assert_eq!(c.try_read(&mut buf), Ok(1)); | ||
| 496 | assert_eq!(buf[0], 42); | ||
| 497 | assert_eq!(capacity(&c), 3); | ||
| 498 | } | ||
| 499 | |||
| 500 | #[test] | ||
| 501 | fn receiving_when_empty() { | ||
| 502 | let mut c = PipeState::<3>::new(); | ||
| 503 | let mut buf = [0; 16]; | ||
| 504 | assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); | ||
| 505 | assert_eq!(capacity(&c), 3); | ||
| 506 | } | ||
| 507 | |||
| 508 | #[test] | ||
| 509 | fn simple_send_and_receive() { | ||
| 510 | let c = Pipe::<NoopRawMutex, 3>::new(); | ||
| 511 | assert!(c.try_write(&[42]).is_ok()); | ||
| 512 | let mut buf = [0; 16]; | ||
| 513 | assert_eq!(c.try_read(&mut buf), Ok(1)); | ||
| 514 | assert_eq!(buf[0], 42); | ||
| 515 | } | ||
| 516 | |||
| 517 | #[test] | ||
| 518 | fn cloning() { | ||
| 519 | let c = Pipe::<NoopRawMutex, 3>::new(); | ||
| 520 | let r1 = c.reader(); | ||
| 521 | let w1 = c.writer(); | ||
| 522 | |||
| 523 | let _ = r1.clone(); | ||
| 524 | let _ = w1.clone(); | ||
| 525 | } | ||
| 526 | |||
| 527 | #[futures_test::test] | ||
| 528 | async fn receiver_receives_given_try_write_async() { | ||
| 529 | let executor = ThreadPool::new().unwrap(); | ||
| 530 | |||
| 531 | static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new(); | ||
| 532 | let c = &*CHANNEL.init(Pipe::new()); | ||
| 533 | let c2 = c; | ||
| 534 | let f = async move { | ||
| 535 | assert_eq!(c2.try_write(&[42]), Ok(1)); | ||
| 536 | }; | ||
| 537 | executor.spawn(f).unwrap(); | ||
| 538 | let mut buf = [0; 16]; | ||
| 539 | assert_eq!(c.read(&mut buf).await, 1); | ||
| 540 | assert_eq!(buf[0], 42); | ||
| 541 | } | ||
| 542 | |||
| 543 | #[futures_test::test] | ||
| 544 | async fn sender_send_completes_if_capacity() { | ||
| 545 | let c = Pipe::<CriticalSectionRawMutex, 1>::new(); | ||
| 546 | c.write(&[42]).await; | ||
| 547 | let mut buf = [0; 16]; | ||
| 548 | assert_eq!(c.read(&mut buf).await, 1); | ||
| 549 | assert_eq!(buf[0], 42); | ||
| 550 | } | ||
| 551 | } | ||
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs new file mode 100644 index 000000000..521084024 --- /dev/null +++ b/embassy-sync/src/ring_buffer.rs | |||
| @@ -0,0 +1,146 @@ | |||
| 1 | pub struct RingBuffer<const N: usize> { | ||
| 2 | buf: [u8; N], | ||
| 3 | start: usize, | ||
| 4 | end: usize, | ||
| 5 | empty: bool, | ||
| 6 | } | ||
| 7 | |||
| 8 | impl<const N: usize> RingBuffer<N> { | ||
| 9 | pub const fn new() -> Self { | ||
| 10 | Self { | ||
| 11 | buf: [0; N], | ||
| 12 | start: 0, | ||
| 13 | end: 0, | ||
| 14 | empty: true, | ||
| 15 | } | ||
| 16 | } | ||
| 17 | |||
| 18 | pub fn push_buf(&mut self) -> &mut [u8] { | ||
| 19 | if self.start == self.end && !self.empty { | ||
| 20 | trace!(" ringbuf: push_buf empty"); | ||
| 21 | return &mut self.buf[..0]; | ||
| 22 | } | ||
| 23 | |||
| 24 | let n = if self.start <= self.end { | ||
| 25 | self.buf.len() - self.end | ||
| 26 | } else { | ||
| 27 | self.start - self.end | ||
| 28 | }; | ||
| 29 | |||
| 30 | trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); | ||
| 31 | &mut self.buf[self.end..self.end + n] | ||
| 32 | } | ||
| 33 | |||
| 34 | pub fn push(&mut self, n: usize) { | ||
| 35 | trace!(" ringbuf: push {:?}", n); | ||
| 36 | if n == 0 { | ||
| 37 | return; | ||
| 38 | } | ||
| 39 | |||
| 40 | self.end = self.wrap(self.end + n); | ||
| 41 | self.empty = false; | ||
| 42 | } | ||
| 43 | |||
| 44 | pub fn pop_buf(&mut self) -> &mut [u8] { | ||
| 45 | if self.empty { | ||
| 46 | trace!(" ringbuf: pop_buf empty"); | ||
| 47 | return &mut self.buf[..0]; | ||
| 48 | } | ||
| 49 | |||
| 50 | let n = if self.end <= self.start { | ||
| 51 | self.buf.len() - self.start | ||
| 52 | } else { | ||
| 53 | self.end - self.start | ||
| 54 | }; | ||
| 55 | |||
| 56 | trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); | ||
| 57 | &mut self.buf[self.start..self.start + n] | ||
| 58 | } | ||
| 59 | |||
| 60 | pub fn pop(&mut self, n: usize) { | ||
| 61 | trace!(" ringbuf: pop {:?}", n); | ||
| 62 | if n == 0 { | ||
| 63 | return; | ||
| 64 | } | ||
| 65 | |||
| 66 | self.start = self.wrap(self.start + n); | ||
| 67 | self.empty = self.start == self.end; | ||
| 68 | } | ||
| 69 | |||
| 70 | pub fn is_full(&self) -> bool { | ||
| 71 | self.start == self.end && !self.empty | ||
| 72 | } | ||
| 73 | |||
| 74 | pub fn is_empty(&self) -> bool { | ||
| 75 | self.empty | ||
| 76 | } | ||
| 77 | |||
| 78 | #[allow(unused)] | ||
| 79 | pub fn len(&self) -> usize { | ||
| 80 | if self.empty { | ||
| 81 | 0 | ||
| 82 | } else if self.start < self.end { | ||
| 83 | self.end - self.start | ||
| 84 | } else { | ||
| 85 | N + self.end - self.start | ||
| 86 | } | ||
| 87 | } | ||
| 88 | |||
| 89 | pub fn clear(&mut self) { | ||
| 90 | self.start = 0; | ||
| 91 | self.end = 0; | ||
| 92 | self.empty = true; | ||
| 93 | } | ||
| 94 | |||
| 95 | fn wrap(&self, n: usize) -> usize { | ||
| 96 | assert!(n <= self.buf.len()); | ||
| 97 | if n == self.buf.len() { | ||
| 98 | 0 | ||
| 99 | } else { | ||
| 100 | n | ||
| 101 | } | ||
| 102 | } | ||
| 103 | } | ||
| 104 | |||
| 105 | #[cfg(test)] | ||
| 106 | mod tests { | ||
| 107 | use super::*; | ||
| 108 | |||
| 109 | #[test] | ||
| 110 | fn push_pop() { | ||
| 111 | let mut rb: RingBuffer<4> = RingBuffer::new(); | ||
| 112 | let buf = rb.push_buf(); | ||
| 113 | assert_eq!(4, buf.len()); | ||
| 114 | buf[0] = 1; | ||
| 115 | buf[1] = 2; | ||
| 116 | buf[2] = 3; | ||
| 117 | buf[3] = 4; | ||
| 118 | rb.push(4); | ||
| 119 | |||
| 120 | let buf = rb.pop_buf(); | ||
| 121 | assert_eq!(4, buf.len()); | ||
| 122 | assert_eq!(1, buf[0]); | ||
| 123 | rb.pop(1); | ||
| 124 | |||
| 125 | let buf = rb.pop_buf(); | ||
| 126 | assert_eq!(3, buf.len()); | ||
| 127 | assert_eq!(2, buf[0]); | ||
| 128 | rb.pop(1); | ||
| 129 | |||
| 130 | let buf = rb.pop_buf(); | ||
| 131 | assert_eq!(2, buf.len()); | ||
| 132 | assert_eq!(3, buf[0]); | ||
| 133 | rb.pop(1); | ||
| 134 | |||
| 135 | let buf = rb.pop_buf(); | ||
| 136 | assert_eq!(1, buf.len()); | ||
| 137 | assert_eq!(4, buf[0]); | ||
| 138 | rb.pop(1); | ||
| 139 | |||
| 140 | let buf = rb.pop_buf(); | ||
| 141 | assert_eq!(0, buf.len()); | ||
| 142 | |||
| 143 | let buf = rb.push_buf(); | ||
| 144 | assert_eq!(4, buf.len()); | ||
| 145 | } | ||
| 146 | } | ||
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs new file mode 100644 index 000000000..6661a6b61 --- /dev/null +++ b/embassy-sync/src/waitqueue/mod.rs | |||
| @@ -0,0 +1,7 @@ | |||
| 1 | //! Async low-level wait queues | ||
| 2 | |||
| 3 | mod waker; | ||
| 4 | pub use waker::*; | ||
| 5 | |||
| 6 | mod multi_waker; | ||
| 7 | pub use multi_waker::*; | ||
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs new file mode 100644 index 000000000..325d2cb3a --- /dev/null +++ b/embassy-sync/src/waitqueue/multi_waker.rs | |||
| @@ -0,0 +1,33 @@ | |||
| 1 | use core::task::Waker; | ||
| 2 | |||
| 3 | use super::WakerRegistration; | ||
| 4 | |||
| 5 | /// Utility struct to register and wake multiple wakers. | ||
| 6 | pub struct MultiWakerRegistration<const N: usize> { | ||
| 7 | wakers: [WakerRegistration; N], | ||
| 8 | } | ||
| 9 | |||
| 10 | impl<const N: usize> MultiWakerRegistration<N> { | ||
| 11 | /// Create a new empty instance | ||
| 12 | pub const fn new() -> Self { | ||
| 13 | const WAKER: WakerRegistration = WakerRegistration::new(); | ||
| 14 | Self { wakers: [WAKER; N] } | ||
| 15 | } | ||
| 16 | |||
| 17 | /// Register a waker. If the buffer is full the function returns it in the error | ||
| 18 | pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { | ||
| 19 | if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { | ||
| 20 | waker_slot.register(w); | ||
| 21 | Ok(()) | ||
| 22 | } else { | ||
| 23 | Err(w) | ||
| 24 | } | ||
| 25 | } | ||
| 26 | |||
| 27 | /// Wake all registered wakers. This clears the buffer | ||
| 28 | pub fn wake(&mut self) { | ||
| 29 | for waker_slot in self.wakers.iter_mut() { | ||
| 30 | waker_slot.wake() | ||
| 31 | } | ||
| 32 | } | ||
| 33 | } | ||
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs new file mode 100644 index 000000000..64e300eb8 --- /dev/null +++ b/embassy-sync/src/waitqueue/waker.rs | |||
| @@ -0,0 +1,92 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::mem; | ||
| 3 | use core::task::Waker; | ||
| 4 | |||
| 5 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 6 | use crate::blocking_mutex::Mutex; | ||
| 7 | |||
| 8 | /// Utility struct to register and wake a waker. | ||
| 9 | #[derive(Debug)] | ||
| 10 | pub struct WakerRegistration { | ||
| 11 | waker: Option<Waker>, | ||
| 12 | } | ||
| 13 | |||
| 14 | impl WakerRegistration { | ||
| 15 | /// Create a new `WakerRegistration`. | ||
| 16 | pub const fn new() -> Self { | ||
| 17 | Self { waker: None } | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 21 | pub fn register(&mut self, w: &Waker) { | ||
| 22 | match self.waker { | ||
| 23 | // Optimization: If both the old and new Wakers wake the same task, we can simply | ||
| 24 | // keep the old waker, skipping the clone. (In most executor implementations, | ||
| 25 | // cloning a waker is somewhat expensive, comparable to cloning an Arc). | ||
| 26 | Some(ref w2) if (w2.will_wake(w)) => {} | ||
| 27 | _ => { | ||
| 28 | // clone the new waker and store it | ||
| 29 | if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { | ||
| 30 | // We had a waker registered for another task. Wake it, so the other task can | ||
| 31 | // reregister itself if it's still interested. | ||
| 32 | // | ||
| 33 | // If two tasks are waiting on the same thing concurrently, this will cause them | ||
| 34 | // to wake each other in a loop fighting over this WakerRegistration. This wastes | ||
| 35 | // CPU but things will still work. | ||
| 36 | // | ||
| 37 | // If the user wants to have two tasks waiting on the same thing they should use | ||
| 38 | // a more appropriate primitive that can store multiple wakers. | ||
| 39 | old_waker.wake() | ||
| 40 | } | ||
| 41 | } | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Wake the registered waker, if any. | ||
| 46 | pub fn wake(&mut self) { | ||
| 47 | if let Some(w) = self.waker.take() { | ||
| 48 | w.wake() | ||
| 49 | } | ||
| 50 | } | ||
| 51 | |||
| 52 | /// Returns true if a waker is currently registered | ||
| 53 | pub fn occupied(&self) -> bool { | ||
| 54 | self.waker.is_some() | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | /// Utility struct to register and wake a waker. | ||
| 59 | pub struct AtomicWaker { | ||
| 60 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | ||
| 61 | } | ||
| 62 | |||
| 63 | impl AtomicWaker { | ||
| 64 | /// Create a new `AtomicWaker`. | ||
| 65 | pub const fn new() -> Self { | ||
| 66 | Self { | ||
| 67 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | ||
| 68 | } | ||
| 69 | } | ||
| 70 | |||
| 71 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 72 | pub fn register(&self, w: &Waker) { | ||
| 73 | critical_section::with(|cs| { | ||
| 74 | let cell = self.waker.borrow(cs); | ||
| 75 | cell.set(match cell.replace(None) { | ||
| 76 | Some(w2) if (w2.will_wake(w)) => Some(w2), | ||
| 77 | _ => Some(w.clone()), | ||
| 78 | }) | ||
| 79 | }) | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Wake the registered waker, if any. | ||
| 83 | pub fn wake(&self) { | ||
| 84 | critical_section::with(|cs| { | ||
| 85 | let cell = self.waker.borrow(cs); | ||
| 86 | if let Some(w) = cell.replace(None) { | ||
| 87 | w.wake_by_ref(); | ||
| 88 | cell.set(Some(w)); | ||
| 89 | } | ||
| 90 | }) | ||
| 91 | } | ||
| 92 | } | ||
