aboutsummaryrefslogtreecommitdiff
path: root/embassy-util
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-util')
-rw-r--r--embassy-util/Cargo.toml28
-rw-r--r--embassy-util/build.rs29
-rw-r--r--embassy-util/src/blocking_mutex/mod.rs189
-rw-r--r--embassy-util/src/blocking_mutex/raw.rs149
-rw-r--r--embassy-util/src/channel/mod.rs5
-rw-r--r--embassy-util/src/channel/mpmc.rs596
-rw-r--r--embassy-util/src/channel/pubsub/mod.rs542
-rw-r--r--embassy-util/src/channel/pubsub/publisher.rs182
-rw-r--r--embassy-util/src/channel/pubsub/subscriber.rs152
-rw-r--r--embassy-util/src/channel/signal.rs99
-rw-r--r--embassy-util/src/fmt.rs228
-rw-r--r--embassy-util/src/forever.rs95
-rw-r--r--embassy-util/src/lib.rs22
-rw-r--r--embassy-util/src/mutex.rs167
-rw-r--r--embassy-util/src/select.rs230
-rw-r--r--embassy-util/src/waitqueue/mod.rs7
-rw-r--r--embassy-util/src/waitqueue/multi_waker.rs33
-rw-r--r--embassy-util/src/waitqueue/waker.rs92
-rw-r--r--embassy-util/src/yield_now.rs25
19 files changed, 2870 insertions, 0 deletions
diff --git a/embassy-util/Cargo.toml b/embassy-util/Cargo.toml
new file mode 100644
index 000000000..32b796c0a
--- /dev/null
+++ b/embassy-util/Cargo.toml
@@ -0,0 +1,28 @@
1[package]
2name = "embassy-util"
3version = "0.1.0"
4edition = "2021"
5
6[package.metadata.embassy_docs]
7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-util-v$VERSION/embassy-util/src/"
8src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-util/src/"
9features = ["nightly", "defmt", "unstable-traits", "time", "time-tick-1mhz"]
10flavors = [
11 { name = "default", target = "x86_64-unknown-linux-gnu" },
12]
13
14[dependencies]
15defmt = { version = "0.3", optional = true }
16log = { version = "0.4.14", optional = true }
17
18futures-util = { version = "0.3.17", default-features = false }
19atomic-polyfill = "0.1.5"
20critical-section = "0.2.5"
21heapless = "0.7.5"
22cfg-if = "1.0.0"
23
24[dev-dependencies]
25futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
26futures-test = "0.3.17"
27futures-timer = "3.0.2"
28futures-util = { version = "0.3.17", features = [ "channel" ] }
diff --git a/embassy-util/build.rs b/embassy-util/build.rs
new file mode 100644
index 000000000..6fe82b44f
--- /dev/null
+++ b/embassy-util/build.rs
@@ -0,0 +1,29 @@
1use std::env;
2
3fn main() {
4 let target = env::var("TARGET").unwrap();
5
6 if target.starts_with("thumbv6m-") {
7 println!("cargo:rustc-cfg=cortex_m");
8 println!("cargo:rustc-cfg=armv6m");
9 } else if target.starts_with("thumbv7m-") {
10 println!("cargo:rustc-cfg=cortex_m");
11 println!("cargo:rustc-cfg=armv7m");
12 } else if target.starts_with("thumbv7em-") {
13 println!("cargo:rustc-cfg=cortex_m");
14 println!("cargo:rustc-cfg=armv7m");
15 println!("cargo:rustc-cfg=armv7em"); // (not currently used)
16 } else if target.starts_with("thumbv8m.base") {
17 println!("cargo:rustc-cfg=cortex_m");
18 println!("cargo:rustc-cfg=armv8m");
19 println!("cargo:rustc-cfg=armv8m_base");
20 } else if target.starts_with("thumbv8m.main") {
21 println!("cargo:rustc-cfg=cortex_m");
22 println!("cargo:rustc-cfg=armv8m");
23 println!("cargo:rustc-cfg=armv8m_main");
24 }
25
26 if target.ends_with("-eabihf") {
27 println!("cargo:rustc-cfg=has_fpu");
28 }
29}
diff --git a/embassy-util/src/blocking_mutex/mod.rs b/embassy-util/src/blocking_mutex/mod.rs
new file mode 100644
index 000000000..8a4a4c642
--- /dev/null
+++ b/embassy-util/src/blocking_mutex/mod.rs
@@ -0,0 +1,189 @@
1//! Blocking mutex.
2//!
3//! This module provides a blocking mutex that can be used to synchronize data.
4pub mod raw;
5
6use core::cell::UnsafeCell;
7
8use self::raw::RawMutex;
9
10/// Blocking mutex (not async)
11///
12/// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`].
13///
14/// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind
15/// of interior mutability fits your use case.
16///
17/// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts.
18///
19/// Use [`NoopMutex`] when data is only shared between tasks running on the same executor.
20///
21/// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton.
22///
23/// In all cases, the blocking mutex is intended to be short lived and not held across await points.
24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points.
25pub struct Mutex<R, T: ?Sized> {
26 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets
27 // to run BEFORE dropping `data`.
28 raw: R,
29 data: UnsafeCell<T>,
30}
31
32unsafe impl<R: RawMutex + Send, T: ?Sized + Send> Send for Mutex<R, T> {}
33unsafe impl<R: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<R, T> {}
34
35impl<R: RawMutex, T> Mutex<R, T> {
36 /// Creates a new mutex in an unlocked state ready for use.
37 #[inline]
38 pub const fn new(val: T) -> Mutex<R, T> {
39 Mutex {
40 raw: R::INIT,
41 data: UnsafeCell::new(val),
42 }
43 }
44
45 /// Creates a critical section and grants temporary access to the protected data.
46 pub fn lock<U>(&self, f: impl FnOnce(&T) -> U) -> U {
47 self.raw.lock(|| {
48 let ptr = self.data.get() as *const T;
49 let inner = unsafe { &*ptr };
50 f(inner)
51 })
52 }
53}
54
55impl<R, T> Mutex<R, T> {
56 /// Creates a new mutex based on a pre-existing raw mutex.
57 ///
58 /// This allows creating a mutex in a constant context on stable Rust.
59 #[inline]
60 pub const fn const_new(raw_mutex: R, val: T) -> Mutex<R, T> {
61 Mutex {
62 raw: raw_mutex,
63 data: UnsafeCell::new(val),
64 }
65 }
66
67 /// Consumes this mutex, returning the underlying data.
68 #[inline]
69 pub fn into_inner(self) -> T {
70 self.data.into_inner()
71 }
72
73 /// Returns a mutable reference to the underlying data.
74 ///
75 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
76 /// take place---the mutable borrow statically guarantees no locks exist.
77 #[inline]
78 pub fn get_mut(&mut self) -> &mut T {
79 unsafe { &mut *self.data.get() }
80 }
81}
82
83/// A mutex that allows borrowing data across executors and interrupts.
84///
85/// # Safety
86///
87/// This mutex is safe to share between different executors and interrupts.
88pub type CriticalSectionMutex<T> = Mutex<raw::CriticalSectionRawMutex, T>;
89
90/// A mutex that allows borrowing data in the context of a single executor.
91///
92/// # Safety
93///
94/// **This Mutex is only safe within a single executor.**
95pub type NoopMutex<T> = Mutex<raw::NoopRawMutex, T>;
96
97impl<T> Mutex<raw::CriticalSectionRawMutex, T> {
98 /// Borrows the data for the duration of the critical section
99 pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T {
100 let ptr = self.data.get() as *const T;
101 unsafe { &*ptr }
102 }
103}
104
105impl<T> Mutex<raw::NoopRawMutex, T> {
106 /// Borrows the data
107 pub fn borrow(&self) -> &T {
108 let ptr = self.data.get() as *const T;
109 unsafe { &*ptr }
110 }
111}
112
113// ThreadModeMutex does NOT use the generic mutex from above because it's special:
114// it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?).
115//
116// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example),
117// but that will require T: Send even though it shouldn't be needed.
118
119#[cfg(any(cortex_m, feature = "std"))]
120pub use thread_mode_mutex::*;
121#[cfg(any(cortex_m, feature = "std"))]
122mod thread_mode_mutex {
123 use super::*;
124
125 /// A "mutex" that only allows borrowing from thread mode.
126 ///
127 /// # Safety
128 ///
129 /// **This Mutex is only safe on single-core systems.**
130 ///
131 /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access.
132 pub struct ThreadModeMutex<T: ?Sized> {
133 inner: UnsafeCell<T>,
134 }
135
136 // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode.
137 // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can
138 // be Send+Sync even if T is not Send (unlike CriticalSectionMutex)
139 unsafe impl<T: ?Sized> Sync for ThreadModeMutex<T> {}
140 unsafe impl<T: ?Sized> Send for ThreadModeMutex<T> {}
141
142 impl<T> ThreadModeMutex<T> {
143 /// Creates a new mutex
144 pub const fn new(value: T) -> Self {
145 ThreadModeMutex {
146 inner: UnsafeCell::new(value),
147 }
148 }
149 }
150
151 impl<T: ?Sized> ThreadModeMutex<T> {
152 /// Lock the `ThreadModeMutex`, granting access to the data.
153 ///
154 /// # Panics
155 ///
156 /// This will panic if not currently running in thread mode.
157 pub fn lock<R>(&self, f: impl FnOnce(&T) -> R) -> R {
158 f(self.borrow())
159 }
160
161 /// Borrows the data
162 ///
163 /// # Panics
164 ///
165 /// This will panic if not currently running in thread mode.
166 pub fn borrow(&self) -> &T {
167 assert!(
168 raw::in_thread_mode(),
169 "ThreadModeMutex can only be borrowed from thread mode."
170 );
171 unsafe { &*self.inner.get() }
172 }
173 }
174
175 impl<T: ?Sized> Drop for ThreadModeMutex<T> {
176 fn drop(&mut self) {
177 // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so
178 // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if
179 // T isn't, so without this check a user could create a ThreadModeMutex in thread mode,
180 // send it to interrupt context and drop it there, which would "send" a T even if T is not Send.
181 assert!(
182 raw::in_thread_mode(),
183 "ThreadModeMutex can only be dropped from thread mode."
184 );
185
186 // Drop of the inner `T` happens after this.
187 }
188 }
189}
diff --git a/embassy-util/src/blocking_mutex/raw.rs b/embassy-util/src/blocking_mutex/raw.rs
new file mode 100644
index 000000000..15796f1b2
--- /dev/null
+++ b/embassy-util/src/blocking_mutex/raw.rs
@@ -0,0 +1,149 @@
1//! Mutex primitives.
2//!
3//! This module provides a trait for mutexes that can be used in different contexts.
4use core::marker::PhantomData;
5
6/// Raw mutex trait.
7///
8/// This mutex is "raw", which means it does not actually contain the protected data, it
9/// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead,
10/// which is generic over a RawMutex and contains the protected data.
11///
12/// Note that, unlike other mutexes, implementations only guarantee no
13/// concurrent access from other threads: concurrent access from the current
14/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly.
15///
16/// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access
17/// to the data, it is not enough to guarantee exclusive (`&mut`) access.
18///
19/// # Safety
20///
21/// RawMutex implementations must ensure that, while locked, no other thread can lock
22/// the RawMutex concurrently.
23///
24/// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior.
25pub unsafe trait RawMutex {
26 /// Create a new `RawMutex` instance.
27 ///
28 /// This is a const instead of a method to allow creating instances in const context.
29 const INIT: Self;
30
31 /// Lock this `RawMutex`.
32 fn lock<R>(&self, f: impl FnOnce() -> R) -> R;
33}
34
35/// A mutex that allows borrowing data across executors and interrupts.
36///
37/// # Safety
38///
39/// This mutex is safe to share between different executors and interrupts.
40pub struct CriticalSectionRawMutex {
41 _phantom: PhantomData<()>,
42}
43unsafe impl Send for CriticalSectionRawMutex {}
44unsafe impl Sync for CriticalSectionRawMutex {}
45
46impl CriticalSectionRawMutex {
47 /// Create a new `CriticalSectionRawMutex`.
48 pub const fn new() -> Self {
49 Self { _phantom: PhantomData }
50 }
51}
52
53unsafe impl RawMutex for CriticalSectionRawMutex {
54 const INIT: Self = Self::new();
55
56 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
57 critical_section::with(|_| f())
58 }
59}
60
61// ================
62
63/// A mutex that allows borrowing data in the context of a single executor.
64///
65/// # Safety
66///
67/// **This Mutex is only safe within a single executor.**
68pub struct NoopRawMutex {
69 _phantom: PhantomData<*mut ()>,
70}
71
72unsafe impl Send for NoopRawMutex {}
73
74impl NoopRawMutex {
75 /// Create a new `NoopRawMutex`.
76 pub const fn new() -> Self {
77 Self { _phantom: PhantomData }
78 }
79}
80
81unsafe impl RawMutex for NoopRawMutex {
82 const INIT: Self = Self::new();
83 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
84 f()
85 }
86}
87
88// ================
89
90#[cfg(any(cortex_m, feature = "std"))]
91mod thread_mode {
92 use super::*;
93
94 /// A "mutex" that only allows borrowing from thread mode.
95 ///
96 /// # Safety
97 ///
98 /// **This Mutex is only safe on single-core systems.**
99 ///
100 /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access.
101 pub struct ThreadModeRawMutex {
102 _phantom: PhantomData<()>,
103 }
104
105 unsafe impl Send for ThreadModeRawMutex {}
106 unsafe impl Sync for ThreadModeRawMutex {}
107
108 impl ThreadModeRawMutex {
109 /// Create a new `ThreadModeRawMutex`.
110 pub const fn new() -> Self {
111 Self { _phantom: PhantomData }
112 }
113 }
114
115 unsafe impl RawMutex for ThreadModeRawMutex {
116 const INIT: Self = Self::new();
117 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
118 assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode.");
119
120 f()
121 }
122 }
123
124 impl Drop for ThreadModeRawMutex {
125 fn drop(&mut self) {
126 // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so
127 // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if
128 // T isn't, so without this check a user could create a ThreadModeMutex in thread mode,
129 // send it to interrupt context and drop it there, which would "send" a T even if T is not Send.
130 assert!(
131 in_thread_mode(),
132 "ThreadModeMutex can only be dropped from thread mode."
133 );
134
135 // Drop of the inner `T` happens after this.
136 }
137 }
138
139 pub(crate) fn in_thread_mode() -> bool {
140 #[cfg(feature = "std")]
141 return Some("main") == std::thread::current().name();
142
143 #[cfg(not(feature = "std"))]
144 // ICSR.VECTACTIVE == 0
145 return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0;
146 }
147}
148#[cfg(any(cortex_m, feature = "std"))]
149pub use thread_mode::*;
diff --git a/embassy-util/src/channel/mod.rs b/embassy-util/src/channel/mod.rs
new file mode 100644
index 000000000..5df1f5c5c
--- /dev/null
+++ b/embassy-util/src/channel/mod.rs
@@ -0,0 +1,5 @@
1//! Async channels
2
3pub mod mpmc;
4pub mod pubsub;
5pub mod signal;
diff --git a/embassy-util/src/channel/mpmc.rs b/embassy-util/src/channel/mpmc.rs
new file mode 100644
index 000000000..535f77e6f
--- /dev/null
+++ b/embassy-util/src/channel/mpmc.rs
@@ -0,0 +1,596 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33#[derive(Copy)]
34pub struct Sender<'ch, M, T, const N: usize>
35where
36 M: RawMutex,
37{
38 channel: &'ch Channel<M, T, N>,
39}
40
41impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
42where
43 M: RawMutex,
44{
45 fn clone(&self) -> Self {
46 Sender { channel: self.channel }
47 }
48}
49
50impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
51where
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`Channel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`Channel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67}
68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender { channel: self.channel }
78 }
79}
80
81impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
82where
83 M: RawMutex,
84{
85 fn from(s: Sender<'ch, M, T, N>) -> Self {
86 Self { channel: s.channel }
87 }
88}
89
90impl<'ch, T> DynamicSender<'ch, T> {
91 /// Sends a value.
92 ///
93 /// See [`Channel::send()`]
94 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
95 DynamicSendFuture {
96 channel: self.channel,
97 message: Some(message),
98 }
99 }
100
101 /// Attempt to immediately send a message.
102 ///
103 /// See [`Channel::send()`]
104 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
105 self.channel.try_send_with_context(message, None)
106 }
107}
108
109/// Receive-only access to a [`Channel`].
110#[derive(Copy)]
111pub struct Receiver<'ch, M, T, const N: usize>
112where
113 M: RawMutex,
114{
115 channel: &'ch Channel<M, T, N>,
116}
117
118impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
119where
120 M: RawMutex,
121{
122 fn clone(&self) -> Self {
123 Receiver { channel: self.channel }
124 }
125}
126
127impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
128where
129 M: RawMutex,
130{
131 /// Receive the next value.
132 ///
133 /// See [`Channel::recv()`].
134 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
135 self.channel.recv()
136 }
137
138 /// Attempt to immediately receive the next value.
139 ///
140 /// See [`Channel::try_recv()`]
141 pub fn try_recv(&self) -> Result<T, TryRecvError> {
142 self.channel.try_recv()
143 }
144}
145
146/// Receive-only access to a [`Channel`] without knowing channel size.
147#[derive(Copy)]
148pub struct DynamicReceiver<'ch, T> {
149 channel: &'ch dyn DynamicChannel<T>,
150}
151
152impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
153 fn clone(&self) -> Self {
154 DynamicReceiver { channel: self.channel }
155 }
156}
157
158impl<'ch, T> DynamicReceiver<'ch, T> {
159 /// Receive the next value.
160 ///
161 /// See [`Channel::recv()`].
162 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
163 DynamicRecvFuture { channel: self.channel }
164 }
165
166 /// Attempt to immediately receive the next value.
167 ///
168 /// See [`Channel::try_recv()`]
169 pub fn try_recv(&self) -> Result<T, TryRecvError> {
170 self.channel.try_recv_with_context(None)
171 }
172}
173
174impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
175where
176 M: RawMutex,
177{
178 fn from(s: Receiver<'ch, M, T, N>) -> Self {
179 Self { channel: s.channel }
180 }
181}
182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184pub struct RecvFuture<'ch, M, T, const N: usize>
185where
186 M: RawMutex,
187{
188 channel: &'ch Channel<M, T, N>,
189}
190
191impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
192where
193 M: RawMutex,
194{
195 type Output = T;
196
197 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
198 match self.channel.try_recv_with_context(Some(cx)) {
199 Ok(v) => Poll::Ready(v),
200 Err(TryRecvError::Empty) => Poll::Pending,
201 }
202 }
203}
204
205/// Future returned by [`DynamicReceiver::recv`].
206pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>,
208}
209
210impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
211 type Output = T;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
214 match self.channel.try_recv_with_context(Some(cx)) {
215 Ok(v) => Poll::Ready(v),
216 Err(TryRecvError::Empty) => Poll::Pending,
217 }
218 }
219}
220
221/// Future returned by [`Channel::send`] and [`Sender::send`].
222pub struct SendFuture<'ch, M, T, const N: usize>
223where
224 M: RawMutex,
225{
226 channel: &'ch Channel<M, T, N>,
227 message: Option<T>,
228}
229
230impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
231where
232 M: RawMutex,
233{
234 type Output = ();
235
236 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237 match self.message.take() {
238 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
239 Ok(..) => Poll::Ready(()),
240 Err(TrySendError::Full(m)) => {
241 self.message = Some(m);
242 Poll::Pending
243 }
244 },
245 None => panic!("Message cannot be None"),
246 }
247 }
248}
249
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251
252/// Future returned by [`DynamicSender::send`].
253pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>,
256}
257
258impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
259 type Output = ();
260
261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 match self.message.take() {
263 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
264 Ok(..) => Poll::Ready(()),
265 Err(TrySendError::Full(m)) => {
266 self.message = Some(m);
267 Poll::Pending
268 }
269 },
270 None => panic!("Message cannot be None"),
271 }
272 }
273}
274
275impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
276
277trait DynamicChannel<T> {
278 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
279
280 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
281}
282
283/// Error returned by [`try_recv`](Channel::try_recv).
284#[derive(PartialEq, Eq, Clone, Copy, Debug)]
285#[cfg_attr(feature = "defmt", derive(defmt::Format))]
286pub enum TryRecvError {
287 /// A message could not be received because the channel is empty.
288 Empty,
289}
290
291/// Error returned by [`try_send`](Channel::try_send).
292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
294pub enum TrySendError<T> {
295 /// The data could not be sent on the channel because the channel is
296 /// currently full and sending would require blocking.
297 Full(T),
298}
299
300struct ChannelState<T, const N: usize> {
301 queue: Deque<T, N>,
302 receiver_waker: WakerRegistration,
303 senders_waker: WakerRegistration,
304}
305
306impl<T, const N: usize> ChannelState<T, N> {
307 const fn new() -> Self {
308 ChannelState {
309 queue: Deque::new(),
310 receiver_waker: WakerRegistration::new(),
311 senders_waker: WakerRegistration::new(),
312 }
313 }
314
315 fn try_recv(&mut self) -> Result<T, TryRecvError> {
316 self.try_recv_with_context(None)
317 }
318
319 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
320 if self.queue.is_full() {
321 self.senders_waker.wake();
322 }
323
324 if let Some(message) = self.queue.pop_front() {
325 Ok(message)
326 } else {
327 if let Some(cx) = cx {
328 self.receiver_waker.register(cx.waker());
329 }
330 Err(TryRecvError::Empty)
331 }
332 }
333
334 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
335 self.try_send_with_context(message, None)
336 }
337
338 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
339 match self.queue.push_back(message) {
340 Ok(()) => {
341 self.receiver_waker.wake();
342 Ok(())
343 }
344 Err(message) => {
345 if let Some(cx) = cx {
346 self.senders_waker.register(cx.waker());
347 }
348 Err(TrySendError::Full(message))
349 }
350 }
351 }
352}
353
354/// A bounded channel for communicating between asynchronous tasks
355/// with backpressure.
356///
357/// The channel will buffer up to the provided number of messages. Once the
358/// buffer is full, attempts to `send` new messages will wait until a message is
359/// received from the channel.
360///
361/// All data sent will become available in the same order as it was sent.
362pub struct Channel<M, T, const N: usize>
363where
364 M: RawMutex,
365{
366 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
367}
368
369impl<M, T, const N: usize> Channel<M, T, N>
370where
371 M: RawMutex,
372{
373 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
374 ///
375 /// ```
376 /// use embassy_util::channel::mpmc::Channel;
377 /// use embassy_util::blocking_mutex::raw::NoopRawMutex;
378 ///
379 /// // Declare a bounded channel of 3 u32s.
380 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
381 /// ```
382 pub const fn new() -> Self {
383 Self {
384 inner: Mutex::new(RefCell::new(ChannelState::new())),
385 }
386 }
387
388 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
389 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
390 }
391
392 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
393 self.lock(|c| c.try_recv_with_context(cx))
394 }
395
396 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
397 self.lock(|c| c.try_send_with_context(m, cx))
398 }
399
400 /// Get a sender for this channel.
401 pub fn sender(&self) -> Sender<'_, M, T, N> {
402 Sender { channel: self }
403 }
404
405 /// Get a receiver for this channel.
406 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
407 Receiver { channel: self }
408 }
409
410 /// Send a value, waiting until there is capacity.
411 ///
412 /// Sending completes when the value has been pushed to the channel's queue.
413 /// This doesn't mean the value has been received yet.
414 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
415 SendFuture {
416 channel: self,
417 message: Some(message),
418 }
419 }
420
421 /// Attempt to immediately send a message.
422 ///
423 /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
424 /// buffer is full, instead of waiting.
425 ///
426 /// # Errors
427 ///
428 /// If the channel capacity has been reached, i.e., the channel has `n`
429 /// buffered values where `n` is the argument passed to [`Channel`], then an
430 /// error is returned.
431 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
432 self.lock(|c| c.try_send(message))
433 }
434
435 /// Receive the next value.
436 ///
437 /// If there are no messages in the channel's buffer, this method will
438 /// wait until a message is sent.
439 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
440 RecvFuture { channel: self }
441 }
442
443 /// Attempt to immediately receive a message.
444 ///
445 /// This method will either receive a message from the channel immediately or return an error
446 /// if the channel is empty.
447 pub fn try_recv(&self) -> Result<T, TryRecvError> {
448 self.lock(|c| c.try_recv())
449 }
450}
451
452/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
453/// tradeoff cost of dynamic dispatch.
454impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
455where
456 M: RawMutex,
457{
458 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
459 Channel::try_send_with_context(self, m, cx)
460 }
461
462 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
463 Channel::try_recv_with_context(self, cx)
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use core::time::Duration;
470
471 use futures_executor::ThreadPool;
472 use futures_timer::Delay;
473 use futures_util::task::SpawnExt;
474
475 use super::*;
476 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
477 use crate::Forever;
478
479 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
480 c.queue.capacity() - c.queue.len()
481 }
482
483 #[test]
484 fn sending_once() {
485 let mut c = ChannelState::<u32, 3>::new();
486 assert!(c.try_send(1).is_ok());
487 assert_eq!(capacity(&c), 2);
488 }
489
490 #[test]
491 fn sending_when_full() {
492 let mut c = ChannelState::<u32, 3>::new();
493 let _ = c.try_send(1);
494 let _ = c.try_send(1);
495 let _ = c.try_send(1);
496 match c.try_send(2) {
497 Err(TrySendError::Full(2)) => assert!(true),
498 _ => assert!(false),
499 }
500 assert_eq!(capacity(&c), 0);
501 }
502
503 #[test]
504 fn receiving_once_with_one_send() {
505 let mut c = ChannelState::<u32, 3>::new();
506 assert!(c.try_send(1).is_ok());
507 assert_eq!(c.try_recv().unwrap(), 1);
508 assert_eq!(capacity(&c), 3);
509 }
510
511 #[test]
512 fn receiving_when_empty() {
513 let mut c = ChannelState::<u32, 3>::new();
514 match c.try_recv() {
515 Err(TryRecvError::Empty) => assert!(true),
516 _ => assert!(false),
517 }
518 assert_eq!(capacity(&c), 3);
519 }
520
521 #[test]
522 fn simple_send_and_receive() {
523 let c = Channel::<NoopRawMutex, u32, 3>::new();
524 assert!(c.try_send(1).is_ok());
525 assert_eq!(c.try_recv().unwrap(), 1);
526 }
527
528 #[test]
529 fn cloning() {
530 let c = Channel::<NoopRawMutex, u32, 3>::new();
531 let r1 = c.receiver();
532 let s1 = c.sender();
533
534 let _ = r1.clone();
535 let _ = s1.clone();
536 }
537
538 #[test]
539 fn dynamic_dispatch() {
540 let c = Channel::<NoopRawMutex, u32, 3>::new();
541 let s: DynamicSender<'_, u32> = c.sender().into();
542 let r: DynamicReceiver<'_, u32> = c.receiver().into();
543
544 assert!(s.try_send(1).is_ok());
545 assert_eq!(r.try_recv().unwrap(), 1);
546 }
547
548 #[futures_test::test]
549 async fn receiver_receives_given_try_send_async() {
550 let executor = ThreadPool::new().unwrap();
551
552 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 3>> = Forever::new();
553 let c = &*CHANNEL.put(Channel::new());
554 let c2 = c;
555 assert!(executor
556 .spawn(async move {
557 assert!(c2.try_send(1).is_ok());
558 })
559 .is_ok());
560 assert_eq!(c.recv().await, 1);
561 }
562
563 #[futures_test::test]
564 async fn sender_send_completes_if_capacity() {
565 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
566 c.send(1).await;
567 assert_eq!(c.recv().await, 1);
568 }
569
570 #[futures_test::test]
571 async fn senders_sends_wait_until_capacity() {
572 let executor = ThreadPool::new().unwrap();
573
574 static CHANNEL: Forever<Channel<CriticalSectionRawMutex, u32, 1>> = Forever::new();
575 let c = &*CHANNEL.put(Channel::new());
576 assert!(c.try_send(1).is_ok());
577
578 let c2 = c;
579 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
580 let c2 = c;
581 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
582 // Wish I could think of a means of determining that the async send is waiting instead.
583 // However, I've used the debugger to observe that the send does indeed wait.
584 Delay::new(Duration::from_millis(500)).await;
585 assert_eq!(c.recv().await, 1);
586 assert!(executor
587 .spawn(async move {
588 loop {
589 c.recv().await;
590 }
591 })
592 .is_ok());
593 send_task_1.unwrap().await;
594 send_task_2.unwrap().await;
595 }
596}
diff --git a/embassy-util/src/channel/pubsub/mod.rs b/embassy-util/src/channel/pubsub/mod.rs
new file mode 100644
index 000000000..ecc8fbd8f
--- /dev/null
+++ b/embassy-util/src/channel/pubsub/mod.rs
@@ -0,0 +1,542 @@
1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_util::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_util::channel::pubsub::WaitResult;
38/// # use embassy_util::channel::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 /// Create a new channel
82 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 ///
90 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 /// Create a new subscriber. It will only receive messages that are published after its creation.
105 ///
106 /// If there are no subscriber slots left, an error will be returned.
107 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 /// Create a new publisher
121 ///
122 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 /// Create a new publisher
137 ///
138 /// If there are no publisher slots left, an error will be returned.
139 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163}
164
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
169 self.inner.lock(|s| {
170 let mut s = s.borrow_mut();
171
172 // Check if we can read a message
173 match s.get_message(*next_message_id) {
174 // Yes, so we are done polling
175 Some(WaitResult::Message(message)) => {
176 *next_message_id += 1;
177 Poll::Ready(WaitResult::Message(message))
178 }
179 // No, so we need to reregister our waker and sleep again
180 None => {
181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker());
183 }
184 Poll::Pending
185 }
186 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
187 Some(WaitResult::Lagged(amount)) => {
188 *next_message_id += amount;
189 Poll::Ready(WaitResult::Lagged(amount))
190 }
191 }
192 })
193 }
194
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| {
197 let mut s = s.borrow_mut();
198 // Try to publish the message
199 match s.try_publish(message) {
200 // We did it, we are ready
201 Ok(()) => Ok(()),
202 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => {
204 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker());
206 }
207 Err(message)
208 }
209 }
210 })
211 }
212
213 fn publish_immediate(&self, message: T) {
214 self.inner.lock(|s| {
215 let mut s = s.borrow_mut();
216 s.publish_immediate(message)
217 })
218 }
219
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| {
222 let mut s = s.borrow_mut();
223 s.unregister_subscriber(subscriber_next_message_id)
224 })
225 }
226
227 fn unregister_publisher(&self) {
228 self.inner.lock(|s| {
229 let mut s = s.borrow_mut();
230 s.unregister_publisher()
231 })
232 }
233}
234
235/// Internal state for the PubSub channel
236struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
237 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
238 queue: Deque<(T, usize), CAP>,
239 /// Every message has an id.
240 /// Don't worry, we won't run out.
241 /// If a million messages were published every second, then the ID's would run out in about 584942 years.
242 next_message_id: u64,
243 /// Collection of wakers for Subscribers that are waiting.
244 subscriber_wakers: MultiWakerRegistration<SUBS>,
245 /// Collection of wakers for Publishers that are waiting.
246 publisher_wakers: MultiWakerRegistration<PUBS>,
247 /// The amount of subscribers that are active
248 subscriber_count: usize,
249 /// The amount of publishers that are active
250 publisher_count: usize,
251}
252
253impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
254 /// Create a new internal channel state
255 const fn new() -> Self {
256 Self {
257 queue: Deque::new(),
258 next_message_id: 0,
259 subscriber_wakers: MultiWakerRegistration::new(),
260 publisher_wakers: MultiWakerRegistration::new(),
261 subscriber_count: 0,
262 publisher_count: 0,
263 }
264 }
265
266 fn try_publish(&mut self, message: T) -> Result<(), T> {
267 if self.subscriber_count == 0 {
268 // We don't need to publish anything because there is no one to receive it
269 return Ok(());
270 }
271
272 if self.queue.is_full() {
273 return Err(message);
274 }
275 // We just did a check for this
276 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
277
278 self.next_message_id += 1;
279
280 // Wake all of the subscribers
281 self.subscriber_wakers.wake();
282
283 Ok(())
284 }
285
286 fn publish_immediate(&mut self, message: T) {
287 // Make space in the queue if required
288 if self.queue.is_full() {
289 self.queue.pop_front();
290 }
291
292 // This will succeed because we made sure there is space
293 self.try_publish(message).ok().unwrap();
294 }
295
296 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
297 let start_id = self.next_message_id - self.queue.len() as u64;
298
299 if message_id < start_id {
300 return Some(WaitResult::Lagged(start_id - message_id));
301 }
302
303 let current_message_index = (message_id - start_id) as usize;
304
305 if current_message_index >= self.queue.len() {
306 return None;
307 }
308
309 // We've checked that the index is valid
310 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
311
312 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315
316 if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front();
318 self.publisher_wakers.wake();
319 }
320
321 Some(WaitResult::Message(message))
322 }
323
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1;
354
355 // All messages that haven't been read yet by this subscriber must have their counter decremented
356 let start_id = self.next_message_id - self.queue.len() as u64;
357 if subscriber_next_message_id >= start_id {
358 let current_message_index = (subscriber_next_message_id - start_id) as usize;
359 self.queue
360 .iter_mut()
361 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1);
363 }
364 }
365
366 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1;
368 }
369}
370
371/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Eq, Clone)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))]
374pub enum Error {
375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
376 /// the capacity of the channels must be increased.
377 MaximumSubscribersReached,
378 /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
379 /// the capacity of the channels must be increased.
380 MaximumPublishersReached,
381}
382
383/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id.
387 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390
391 /// Try to publish a message to the queue.
392 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
394 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
395
396 /// Publish a message immediately
397 fn publish_immediate(&self, message: T);
398
399 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401
402 /// Let the channel know that a publisher has dropped
403 fn unregister_publisher(&self);
404}
405
406/// The result of the subscriber wait procedure
407#[derive(Debug, Clone, PartialEq, Eq)]
408#[cfg_attr(feature = "defmt", derive(defmt::Format))]
409pub enum WaitResult<T> {
410 /// The subscriber did not receive all messages and lagged by the given amount of messages.
411 /// (This is the amount of messages that were missed)
412 Lagged(u64),
413 /// A message was received
414 Message(T),
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use crate::blocking_mutex::raw::NoopRawMutex;
421
422 #[futures_test::test]
423 async fn dyn_pub_sub_works() {
424 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
425
426 let mut sub0 = channel.dyn_subscriber().unwrap();
427 let mut sub1 = channel.dyn_subscriber().unwrap();
428 let pub0 = channel.dyn_publisher().unwrap();
429
430 pub0.publish(42).await;
431
432 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
433 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
434
435 assert_eq!(sub0.try_next_message(), None);
436 assert_eq!(sub1.try_next_message(), None);
437 }
438
439 #[futures_test::test]
440 async fn all_subscribers_receive() {
441 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
442
443 let mut sub0 = channel.subscriber().unwrap();
444 let mut sub1 = channel.subscriber().unwrap();
445 let pub0 = channel.publisher().unwrap();
446
447 pub0.publish(42).await;
448
449 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
450 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
451
452 assert_eq!(sub0.try_next_message(), None);
453 assert_eq!(sub1.try_next_message(), None);
454 }
455
456 #[futures_test::test]
457 async fn lag_when_queue_full_on_immediate_publish() {
458 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
459
460 let mut sub0 = channel.subscriber().unwrap();
461 let pub0 = channel.publisher().unwrap();
462
463 pub0.publish_immediate(42);
464 pub0.publish_immediate(43);
465 pub0.publish_immediate(44);
466 pub0.publish_immediate(45);
467 pub0.publish_immediate(46);
468 pub0.publish_immediate(47);
469
470 assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
471 assert_eq!(sub0.next_message().await, WaitResult::Message(44));
472 assert_eq!(sub0.next_message().await, WaitResult::Message(45));
473 assert_eq!(sub0.next_message().await, WaitResult::Message(46));
474 assert_eq!(sub0.next_message().await, WaitResult::Message(47));
475 assert_eq!(sub0.try_next_message(), None);
476 }
477
478 #[test]
479 fn limited_subs_and_pubs() {
480 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
481
482 let sub0 = channel.subscriber();
483 let sub1 = channel.subscriber();
484 let sub2 = channel.subscriber();
485 let sub3 = channel.subscriber();
486 let sub4 = channel.subscriber();
487
488 assert!(sub0.is_ok());
489 assert!(sub1.is_ok());
490 assert!(sub2.is_ok());
491 assert!(sub3.is_ok());
492 assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
493
494 drop(sub0);
495
496 let sub5 = channel.subscriber();
497 assert!(sub5.is_ok());
498
499 // publishers
500
501 let pub0 = channel.publisher();
502 let pub1 = channel.publisher();
503 let pub2 = channel.publisher();
504 let pub3 = channel.publisher();
505 let pub4 = channel.publisher();
506
507 assert!(pub0.is_ok());
508 assert!(pub1.is_ok());
509 assert!(pub2.is_ok());
510 assert!(pub3.is_ok());
511 assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
512
513 drop(pub0);
514
515 let pub5 = channel.publisher();
516 assert!(pub5.is_ok());
517 }
518
519 #[test]
520 fn publisher_wait_on_full_queue() {
521 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
522
523 let pub0 = channel.publisher().unwrap();
524
525 // There are no subscribers, so the queue will never be full
526 assert_eq!(pub0.try_publish(0), Ok(()));
527 assert_eq!(pub0.try_publish(0), Ok(()));
528 assert_eq!(pub0.try_publish(0), Ok(()));
529 assert_eq!(pub0.try_publish(0), Ok(()));
530 assert_eq!(pub0.try_publish(0), Ok(()));
531
532 let sub0 = channel.subscriber().unwrap();
533
534 assert_eq!(pub0.try_publish(0), Ok(()));
535 assert_eq!(pub0.try_publish(0), Ok(()));
536 assert_eq!(pub0.try_publish(0), Ok(()));
537 assert_eq!(pub0.try_publish(0), Ok(()));
538 assert_eq!(pub0.try_publish(0), Err(0));
539
540 drop(sub0);
541 }
542}
diff --git a/embassy-util/src/channel/pubsub/publisher.rs b/embassy-util/src/channel/pubsub/publisher.rs
new file mode 100644
index 000000000..705797f60
--- /dev/null
+++ b/embassy-util/src/channel/pubsub/publisher.rs
@@ -0,0 +1,182 @@
1//! Implementation of anything directly publisher related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A publisher to a channel
13pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The channel we are a publisher for
15 channel: &'a PSB,
16 _phantom: PhantomData<T>,
17}
18
19impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
20 pub(super) fn new(channel: &'a PSB) -> Self {
21 Self {
22 channel,
23 _phantom: Default::default(),
24 }
25 }
26
27 /// Publish a message right now even when the queue is full.
28 /// This may cause a subscriber to miss an older message.
29 pub fn publish_immediate(&self, message: T) {
30 self.channel.publish_immediate(message)
31 }
32
33 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
34 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
35 PublisherWaitFuture {
36 message: Some(message),
37 publisher: self,
38 }
39 }
40
41 /// Publish a message if there is space in the message queue
42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None)
44 }
45}
46
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
48 fn drop(&mut self) {
49 self.channel.unregister_publisher()
50 }
51}
52
53/// A publisher that holds a dynamic reference to the channel
54pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
55
56impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
57 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
58
59 fn deref(&self) -> &Self::Target {
60 &self.0
61 }
62}
63
64impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
65 fn deref_mut(&mut self) -> &mut Self::Target {
66 &mut self.0
67 }
68}
69
70/// A publisher that holds a generic reference to the channel
71pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
72 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
73);
74
75impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
76 for Publisher<'a, M, T, CAP, SUBS, PUBS>
77{
78 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
79
80 fn deref(&self) -> &Self::Target {
81 &self.0
82 }
83}
84
85impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
86 for Publisher<'a, M, T, CAP, SUBS, PUBS>
87{
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.0
90 }
91}
92
93/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
94/// (So an infinite amount is possible)
95pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
96 /// The channel we are a publisher for
97 channel: &'a PSB,
98 _phantom: PhantomData<T>,
99}
100
101impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
102 pub(super) fn new(channel: &'a PSB) -> Self {
103 Self {
104 channel,
105 _phantom: Default::default(),
106 }
107 }
108 /// Publish the message right now even when the queue is full.
109 /// This may cause a subscriber to miss an older message.
110 pub fn publish_immediate(&self, message: T) {
111 self.channel.publish_immediate(message)
112 }
113
114 /// Publish a message if there is space in the message queue
115 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None)
117 }
118}
119
120/// An immediate publisher that holds a dynamic reference to the channel
121pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
122
123impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
124 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
125
126 fn deref(&self) -> &Self::Target {
127 &self.0
128 }
129}
130
131impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// An immediate publisher that holds a generic reference to the channel
138pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
139 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
140);
141
142impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
143 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
144{
145 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
146
147 fn deref(&self) -> &Self::Target {
148 &self.0
149 }
150}
151
152impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
153 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
154{
155 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.0
157 }
158}
159
160/// Future for the publisher wait action
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish
163 message: Option<T>,
164 publisher: &'s Pub<'a, PSB, T>,
165}
166
167impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
168 type Output = ();
169
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let message = self.message.take().unwrap();
172 match self.publisher.channel.publish_with_context(message, Some(cx)) {
173 Ok(()) => Poll::Ready(()),
174 Err(message) => {
175 self.message = Some(message);
176 Poll::Pending
177 }
178 }
179 }
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-util/src/channel/pubsub/subscriber.rs b/embassy-util/src/channel/pubsub/subscriber.rs
new file mode 100644
index 000000000..b9a2cbe18
--- /dev/null
+++ b/embassy-util/src/channel/pubsub/subscriber.rs
@@ -0,0 +1,152 @@
1//! Implementation of anything directly subscriber related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive
15 next_message_id: u64,
16 /// The channel we are a subscriber to
17 channel: &'a PSB,
18 _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23 Self {
24 next_message_id,
25 channel,
26 _phantom: Default::default(),
27 }
28 }
29
30 /// Wait for a published message
31 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32 SubscriberWaitFuture { subscriber: self }
33 }
34
35 /// Wait for a published message (ignoring lag results)
36 pub async fn next_message_pure(&mut self) -> T {
37 loop {
38 match self.next_message().await {
39 WaitResult::Lagged(_) => continue,
40 WaitResult::Message(message) => break message,
41 }
42 }
43 }
44
45 /// Try to see if there's a published message we haven't received yet.
46 ///
47 /// This function does not peek. The message is received if there is one.
48 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50 Poll::Ready(result) => Some(result),
51 Poll::Pending => None,
52 }
53 }
54
55 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
56 ///
57 /// This function does not peek. The message is received if there is one.
58 pub fn try_next_message_pure(&mut self) -> Option<T> {
59 loop {
60 match self.try_next_message() {
61 Some(WaitResult::Lagged(_)) => continue,
62 Some(WaitResult::Message(message)) => break Some(message),
63 None => break None,
64 }
65 }
66 }
67}
68
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
70 fn drop(&mut self) {
71 self.channel.unregister_subscriber(self.next_message_id)
72 }
73}
74
75impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
76
77/// Warning: The stream implementation ignores lag results and returns all messages.
78/// This might miss some messages without you knowing it.
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
80 type Item = T;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 match self
84 .channel
85 .get_message_with_context(&mut self.next_message_id, Some(cx))
86 {
87 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
88 Poll::Ready(WaitResult::Lagged(_)) => {
89 cx.waker().wake_by_ref();
90 Poll::Pending
91 }
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
97/// A subscriber that holds a dynamic reference to the channel
98pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
99
100impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
101 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
109 fn deref_mut(&mut self) -> &mut Self::Target {
110 &mut self.0
111 }
112}
113
114/// A subscriber that holds a generic reference to the channel
115pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
116 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
117);
118
119impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
120 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
121{
122 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
123
124 fn deref(&self) -> &Self::Target {
125 &self.0
126 }
127}
128
129impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
130 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
131{
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// Future for the subscriber wait action
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>,
140}
141
142impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
143 type Output = WaitResult<T>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 self.subscriber
147 .channel
148 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
149 }
150}
151
152impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-util/src/channel/signal.rs b/embassy-util/src/channel/signal.rs
new file mode 100644
index 000000000..a58469c4f
--- /dev/null
+++ b/embassy-util/src/channel/signal.rs
@@ -0,0 +1,99 @@
1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::mem;
5use core::task::{Context, Poll, Waker};
6
7/// Single-slot signaling primitive.
8///
9/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except
10/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead
11/// of waiting for the receiver to pop the previous value.
12///
13/// It is useful for sending data between tasks when the receiver only cares about
14/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state"
15/// updates.
16///
17/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead.
18///
19/// Signals are generally declared as `static`s and then borrowed as required.
20///
21/// ```
22/// use embassy_util::channel::signal::Signal;
23///
24/// enum SomeCommand {
25/// On,
26/// Off,
27/// }
28///
29/// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new();
30/// ```
31pub struct Signal<T> {
32 state: UnsafeCell<State<T>>,
33}
34
35enum State<T> {
36 None,
37 Waiting(Waker),
38 Signaled(T),
39}
40
41unsafe impl<T: Send> Send for Signal<T> {}
42unsafe impl<T: Send> Sync for Signal<T> {}
43
44impl<T> Signal<T> {
45 /// Create a new `Signal`.
46 pub const fn new() -> Self {
47 Self {
48 state: UnsafeCell::new(State::None),
49 }
50 }
51}
52
53impl<T: Send> Signal<T> {
54 /// Mark this Signal as signaled.
55 pub fn signal(&self, val: T) {
56 critical_section::with(|_| unsafe {
57 let state = &mut *self.state.get();
58 if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) {
59 waker.wake();
60 }
61 })
62 }
63
64 /// Remove the queued value in this `Signal`, if any.
65 pub fn reset(&self) {
66 critical_section::with(|_| unsafe {
67 let state = &mut *self.state.get();
68 *state = State::None
69 })
70 }
71
72 fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
73 critical_section::with(|_| unsafe {
74 let state = &mut *self.state.get();
75 match state {
76 State::None => {
77 *state = State::Waiting(cx.waker().clone());
78 Poll::Pending
79 }
80 State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending,
81 State::Waiting(_) => panic!("waker overflow"),
82 State::Signaled(_) => match mem::replace(state, State::None) {
83 State::Signaled(res) => Poll::Ready(res),
84 _ => unreachable!(),
85 },
86 }
87 })
88 }
89
90 /// Future that completes when this Signal has been signaled.
91 pub fn wait(&self) -> impl Future<Output = T> + '_ {
92 futures_util::future::poll_fn(move |cx| self.poll_wait(cx))
93 }
94
95 /// non-blocking method to check whether this signal has been signaled.
96 pub fn signaled(&self) -> bool {
97 critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_)))
98 }
99}
diff --git a/embassy-util/src/fmt.rs b/embassy-util/src/fmt.rs
new file mode 100644
index 000000000..f8bb0a035
--- /dev/null
+++ b/embassy-util/src/fmt.rs
@@ -0,0 +1,228 @@
1#![macro_use]
2#![allow(unused_macros)]
3
4#[cfg(all(feature = "defmt", feature = "log"))]
5compile_error!("You may not enable both `defmt` and `log` features.");
6
7macro_rules! assert {
8 ($($x:tt)*) => {
9 {
10 #[cfg(not(feature = "defmt"))]
11 ::core::assert!($($x)*);
12 #[cfg(feature = "defmt")]
13 ::defmt::assert!($($x)*);
14 }
15 };
16}
17
18macro_rules! assert_eq {
19 ($($x:tt)*) => {
20 {
21 #[cfg(not(feature = "defmt"))]
22 ::core::assert_eq!($($x)*);
23 #[cfg(feature = "defmt")]
24 ::defmt::assert_eq!($($x)*);
25 }
26 };
27}
28
29macro_rules! assert_ne {
30 ($($x:tt)*) => {
31 {
32 #[cfg(not(feature = "defmt"))]
33 ::core::assert_ne!($($x)*);
34 #[cfg(feature = "defmt")]
35 ::defmt::assert_ne!($($x)*);
36 }
37 };
38}
39
40macro_rules! debug_assert {
41 ($($x:tt)*) => {
42 {
43 #[cfg(not(feature = "defmt"))]
44 ::core::debug_assert!($($x)*);
45 #[cfg(feature = "defmt")]
46 ::defmt::debug_assert!($($x)*);
47 }
48 };
49}
50
51macro_rules! debug_assert_eq {
52 ($($x:tt)*) => {
53 {
54 #[cfg(not(feature = "defmt"))]
55 ::core::debug_assert_eq!($($x)*);
56 #[cfg(feature = "defmt")]
57 ::defmt::debug_assert_eq!($($x)*);
58 }
59 };
60}
61
62macro_rules! debug_assert_ne {
63 ($($x:tt)*) => {
64 {
65 #[cfg(not(feature = "defmt"))]
66 ::core::debug_assert_ne!($($x)*);
67 #[cfg(feature = "defmt")]
68 ::defmt::debug_assert_ne!($($x)*);
69 }
70 };
71}
72
73macro_rules! todo {
74 ($($x:tt)*) => {
75 {
76 #[cfg(not(feature = "defmt"))]
77 ::core::todo!($($x)*);
78 #[cfg(feature = "defmt")]
79 ::defmt::todo!($($x)*);
80 }
81 };
82}
83
84macro_rules! unreachable {
85 ($($x:tt)*) => {
86 {
87 #[cfg(not(feature = "defmt"))]
88 ::core::unreachable!($($x)*);
89 #[cfg(feature = "defmt")]
90 ::defmt::unreachable!($($x)*);
91 }
92 };
93}
94
95macro_rules! panic {
96 ($($x:tt)*) => {
97 {
98 #[cfg(not(feature = "defmt"))]
99 ::core::panic!($($x)*);
100 #[cfg(feature = "defmt")]
101 ::defmt::panic!($($x)*);
102 }
103 };
104}
105
106macro_rules! trace {
107 ($s:literal $(, $x:expr)* $(,)?) => {
108 {
109 #[cfg(feature = "log")]
110 ::log::trace!($s $(, $x)*);
111 #[cfg(feature = "defmt")]
112 ::defmt::trace!($s $(, $x)*);
113 #[cfg(not(any(feature = "log", feature="defmt")))]
114 let _ = ($( & $x ),*);
115 }
116 };
117}
118
119macro_rules! debug {
120 ($s:literal $(, $x:expr)* $(,)?) => {
121 {
122 #[cfg(feature = "log")]
123 ::log::debug!($s $(, $x)*);
124 #[cfg(feature = "defmt")]
125 ::defmt::debug!($s $(, $x)*);
126 #[cfg(not(any(feature = "log", feature="defmt")))]
127 let _ = ($( & $x ),*);
128 }
129 };
130}
131
132macro_rules! info {
133 ($s:literal $(, $x:expr)* $(,)?) => {
134 {
135 #[cfg(feature = "log")]
136 ::log::info!($s $(, $x)*);
137 #[cfg(feature = "defmt")]
138 ::defmt::info!($s $(, $x)*);
139 #[cfg(not(any(feature = "log", feature="defmt")))]
140 let _ = ($( & $x ),*);
141 }
142 };
143}
144
145macro_rules! warn {
146 ($s:literal $(, $x:expr)* $(,)?) => {
147 {
148 #[cfg(feature = "log")]
149 ::log::warn!($s $(, $x)*);
150 #[cfg(feature = "defmt")]
151 ::defmt::warn!($s $(, $x)*);
152 #[cfg(not(any(feature = "log", feature="defmt")))]
153 let _ = ($( & $x ),*);
154 }
155 };
156}
157
158macro_rules! error {
159 ($s:literal $(, $x:expr)* $(,)?) => {
160 {
161 #[cfg(feature = "log")]
162 ::log::error!($s $(, $x)*);
163 #[cfg(feature = "defmt")]
164 ::defmt::error!($s $(, $x)*);
165 #[cfg(not(any(feature = "log", feature="defmt")))]
166 let _ = ($( & $x ),*);
167 }
168 };
169}
170
171#[cfg(feature = "defmt")]
172macro_rules! unwrap {
173 ($($x:tt)*) => {
174 ::defmt::unwrap!($($x)*)
175 };
176}
177
178#[cfg(not(feature = "defmt"))]
179macro_rules! unwrap {
180 ($arg:expr) => {
181 match $crate::fmt::Try::into_result($arg) {
182 ::core::result::Result::Ok(t) => t,
183 ::core::result::Result::Err(e) => {
184 ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
185 }
186 }
187 };
188 ($arg:expr, $($msg:expr),+ $(,)? ) => {
189 match $crate::fmt::Try::into_result($arg) {
190 ::core::result::Result::Ok(t) => t,
191 ::core::result::Result::Err(e) => {
192 ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
193 }
194 }
195 }
196}
197
198#[cfg(feature = "defmt-timestamp-uptime")]
199defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
200
201#[derive(Debug, Copy, Clone, Eq, PartialEq)]
202pub struct NoneError;
203
204pub trait Try {
205 type Ok;
206 type Error;
207 fn into_result(self) -> Result<Self::Ok, Self::Error>;
208}
209
210impl<T> Try for Option<T> {
211 type Ok = T;
212 type Error = NoneError;
213
214 #[inline]
215 fn into_result(self) -> Result<T, NoneError> {
216 self.ok_or(NoneError)
217 }
218}
219
220impl<T, E> Try for Result<T, E> {
221 type Ok = T;
222 type Error = E;
223
224 #[inline]
225 fn into_result(self) -> Self {
226 self
227 }
228}
diff --git a/embassy-util/src/forever.rs b/embassy-util/src/forever.rs
new file mode 100644
index 000000000..4f3698211
--- /dev/null
+++ b/embassy-util/src/forever.rs
@@ -0,0 +1,95 @@
1use core::cell::UnsafeCell;
2use core::mem::MaybeUninit;
3
4use atomic_polyfill::{AtomicBool, Ordering};
5
6/// Type with static lifetime that may be written to once at runtime.
7///
8/// This may be used to initialize static objects at runtime, typically in the init routine.
9/// This is useful for objects such as Embassy's RTC, which cannot be initialized in a const
10/// context.
11///
12/// Note: IF a global mutable variable is desired, use a CriticalSectionMutex or ThreadModeMutex instead.
13///
14/// ```
15/// use embassy_util::Forever;
16/// // Using an integer for the sake of keeping this example self-contained,
17/// // see https://github.com/embassy-rs/embassy/wiki/Getting-Started for a more "proper" example.
18/// static SOME_INT: Forever<u32> =Forever::new();
19///
20/// // put returns a mutable pointer to the object stored in the forever, which may then be passed
21/// // around.
22/// let mut x = SOME_INT.put(42);
23/// assert_eq!(*x, 42);
24/// ```
25pub struct Forever<T> {
26 used: AtomicBool,
27 t: UnsafeCell<MaybeUninit<T>>,
28}
29
30unsafe impl<T> Send for Forever<T> {}
31unsafe impl<T> Sync for Forever<T> {}
32
33impl<T> Forever<T> {
34 /// Create a new `Forever`.
35 #[inline(always)]
36 pub const fn new() -> Self {
37 Self {
38 used: AtomicBool::new(false),
39 t: UnsafeCell::new(MaybeUninit::uninit()),
40 }
41 }
42
43 /// Store a value in this `Forever`, returning a mutable reference to it.
44 ///
45 /// Using this method, the compiler usually constructs `val` in the stack and then moves
46 /// it into the `Forever`. If `T` is big, this is likely to cause stack overflows.
47 /// Considering using [`Signal::put_with`] instead, which will construct it in-place inside the `Forever`.
48 ///
49 /// # Panics
50 ///
51 /// Panics if this `Forever` already has a value stored in it.
52 #[inline(always)]
53 #[allow(clippy::mut_from_ref)]
54 pub fn put(&'static self, val: T) -> &'static mut T {
55 self.put_with(|| val)
56 }
57
58 /// Store the closure return value in this `Forever`, returning a mutable reference to it.
59 ///
60 /// The advantage over [`Forever::put`] is that this method allows the closure to construct
61 /// the `T` value in-place directly inside the `Forever`, saving stack space.
62 ///
63 /// # Panics
64 ///
65 /// Panics if this `Forever` already has a value stored in it.
66 #[inline(always)]
67 #[allow(clippy::mut_from_ref)]
68 pub fn put_with(&'static self, val: impl FnOnce() -> T) -> &'static mut T {
69 if self
70 .used
71 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
72 .is_err()
73 {
74 panic!("Forever.put() called multiple times");
75 }
76
77 let p: &'static mut MaybeUninit<T> = unsafe { &mut *self.t.get() };
78 p.write(val())
79 }
80
81 /// Unsafely get a mutable reference to the contents of this Forever.
82 ///
83 /// # Safety
84 ///
85 /// This is undefined behavior if:
86 ///
87 /// - The `Forever` has not been initialized yet (with `put' or `put_with`), or
88 /// - A reference to the contents (mutable or not) already exists.
89 #[inline(always)]
90 #[allow(clippy::mut_from_ref)]
91 pub unsafe fn steal(&self) -> &mut T {
92 let p: &mut MaybeUninit<T> = &mut *self.t.get();
93 p.assume_init_mut()
94 }
95}
diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs
new file mode 100644
index 000000000..07b1633ea
--- /dev/null
+++ b/embassy-util/src/lib.rs
@@ -0,0 +1,22 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
3#![cfg_attr(all(feature = "nightly", target_arch = "xtensa"), feature(asm_experimental_arch))]
4#![allow(clippy::new_without_default)]
5#![doc = include_str!("../../README.md")]
6#![warn(missing_docs)]
7
8// This mod MUST go first, so that the others see its macros.
9pub(crate) mod fmt;
10
11pub mod blocking_mutex;
12pub mod channel;
13pub mod mutex;
14pub mod waitqueue;
15
16mod forever;
17mod select;
18mod yield_now;
19
20pub use forever::*;
21pub use select::*;
22pub use yield_now::*;
diff --git a/embassy-util/src/mutex.rs b/embassy-util/src/mutex.rs
new file mode 100644
index 000000000..75a6e8dd3
--- /dev/null
+++ b/embassy-util/src/mutex.rs
@@ -0,0 +1,167 @@
1//! Async mutex.
2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell};
5use core::ops::{Deref, DerefMut};
6use core::task::Poll;
7
8use futures_util::future::poll_fn;
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex;
12use crate::waitqueue::WakerRegistration;
13
14/// Error returned by [`Mutex::try_lock`]
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError;
18
19struct State {
20 locked: bool,
21 waker: WakerRegistration,
22}
23
24/// Async mutex.
25///
26/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex).
27/// The raw mutex is used to guard access to the internal "is locked" flag. It
28/// is held for very short periods only, while locking and unlocking. It is *not* held
29/// for the entire time the async Mutex is locked.
30///
31/// Which implementation you select depends on the context in which you're using the mutex.
32///
33/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts.
34///
35/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor.
36///
37/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton.
38///
39pub struct Mutex<M, T>
40where
41 M: RawMutex,
42 T: ?Sized,
43{
44 state: BlockingMutex<M, RefCell<State>>,
45 inner: UnsafeCell<T>,
46}
47
48unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for Mutex<M, T> {}
49unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<M, T> {}
50
51/// Async mutex.
52impl<M, T> Mutex<M, T>
53where
54 M: RawMutex,
55{
56 /// Create a new mutex with the given value.
57 pub const fn new(value: T) -> Self {
58 Self {
59 inner: UnsafeCell::new(value),
60 state: BlockingMutex::new(RefCell::new(State {
61 locked: false,
62 waker: WakerRegistration::new(),
63 })),
64 }
65 }
66}
67
68impl<M, T> Mutex<M, T>
69where
70 M: RawMutex,
71 T: ?Sized,
72{
73 /// Lock the mutex.
74 ///
75 /// This will wait for the mutex to be unlocked if it's already locked.
76 pub async fn lock(&self) -> MutexGuard<'_, M, T> {
77 poll_fn(|cx| {
78 let ready = self.state.lock(|s| {
79 let mut s = s.borrow_mut();
80 if s.locked {
81 s.waker.register(cx.waker());
82 false
83 } else {
84 s.locked = true;
85 true
86 }
87 });
88
89 if ready {
90 Poll::Ready(MutexGuard { mutex: self })
91 } else {
92 Poll::Pending
93 }
94 })
95 .await
96 }
97
98 /// Attempt to immediately lock the mutex.
99 ///
100 /// If the mutex is already locked, this will return an error instead of waiting.
101 pub fn try_lock(&self) -> Result<MutexGuard<'_, M, T>, TryLockError> {
102 self.state.lock(|s| {
103 let mut s = s.borrow_mut();
104 if s.locked {
105 Err(TryLockError)
106 } else {
107 s.locked = true;
108 Ok(())
109 }
110 })?;
111
112 Ok(MutexGuard { mutex: self })
113 }
114}
115
116/// Async mutex guard.
117///
118/// Owning an instance of this type indicates having
119/// successfully locked the mutex, and grants access to the contents.
120///
121/// Dropping it unlocks the mutex.
122pub struct MutexGuard<'a, M, T>
123where
124 M: RawMutex,
125 T: ?Sized,
126{
127 mutex: &'a Mutex<M, T>,
128}
129
130impl<'a, M, T> Drop for MutexGuard<'a, M, T>
131where
132 M: RawMutex,
133 T: ?Sized,
134{
135 fn drop(&mut self) {
136 self.mutex.state.lock(|s| {
137 let mut s = s.borrow_mut();
138 s.locked = false;
139 s.waker.wake();
140 })
141 }
142}
143
144impl<'a, M, T> Deref for MutexGuard<'a, M, T>
145where
146 M: RawMutex,
147 T: ?Sized,
148{
149 type Target = T;
150 fn deref(&self) -> &Self::Target {
151 // Safety: the MutexGuard represents exclusive access to the contents
152 // of the mutex, so it's OK to get it.
153 unsafe { &*(self.mutex.inner.get() as *const T) }
154 }
155}
156
157impl<'a, M, T> DerefMut for MutexGuard<'a, M, T>
158where
159 M: RawMutex,
160 T: ?Sized,
161{
162 fn deref_mut(&mut self) -> &mut Self::Target {
163 // Safety: the MutexGuard represents exclusive access to the contents
164 // of the mutex, so it's OK to get it.
165 unsafe { &mut *(self.mutex.inner.get()) }
166 }
167}
diff --git a/embassy-util/src/select.rs b/embassy-util/src/select.rs
new file mode 100644
index 000000000..8cecb7fa0
--- /dev/null
+++ b/embassy-util/src/select.rs
@@ -0,0 +1,230 @@
1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5/// Result for [`select`].
6#[derive(Debug, Clone)]
7pub enum Either<A, B> {
8 /// First future finished first.
9 First(A),
10 /// Second future finished first.
11 Second(B),
12}
13
14/// Wait for one of two futures to complete.
15///
16/// This function returns a new future which polls all the futures.
17/// When one of them completes, it will complete with its result value.
18///
19/// The other future is dropped.
20pub fn select<A, B>(a: A, b: B) -> Select<A, B>
21where
22 A: Future,
23 B: Future,
24{
25 Select { a, b }
26}
27
28/// Future for the [`select`] function.
29#[derive(Debug)]
30#[must_use = "futures do nothing unless you `.await` or poll them"]
31pub struct Select<A, B> {
32 a: A,
33 b: B,
34}
35
36impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
37
38impl<A, B> Future for Select<A, B>
39where
40 A: Future,
41 B: Future,
42{
43 type Output = Either<A::Output, B::Output>;
44
45 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46 let this = unsafe { self.get_unchecked_mut() };
47 let a = unsafe { Pin::new_unchecked(&mut this.a) };
48 let b = unsafe { Pin::new_unchecked(&mut this.b) };
49 if let Poll::Ready(x) = a.poll(cx) {
50 return Poll::Ready(Either::First(x));
51 }
52 if let Poll::Ready(x) = b.poll(cx) {
53 return Poll::Ready(Either::Second(x));
54 }
55 Poll::Pending
56 }
57}
58
59// ====================================================================
60
61/// Result for [`select3`].
62#[derive(Debug, Clone)]
63pub enum Either3<A, B, C> {
64 /// First future finished first.
65 First(A),
66 /// Second future finished first.
67 Second(B),
68 /// Third future finished first.
69 Third(C),
70}
71
72/// Same as [`select`], but with more futures.
73pub fn select3<A, B, C>(a: A, b: B, c: C) -> Select3<A, B, C>
74where
75 A: Future,
76 B: Future,
77 C: Future,
78{
79 Select3 { a, b, c }
80}
81
82/// Future for the [`select3`] function.
83#[derive(Debug)]
84#[must_use = "futures do nothing unless you `.await` or poll them"]
85pub struct Select3<A, B, C> {
86 a: A,
87 b: B,
88 c: C,
89}
90
91impl<A, B, C> Future for Select3<A, B, C>
92where
93 A: Future,
94 B: Future,
95 C: Future,
96{
97 type Output = Either3<A::Output, B::Output, C::Output>;
98
99 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100 let this = unsafe { self.get_unchecked_mut() };
101 let a = unsafe { Pin::new_unchecked(&mut this.a) };
102 let b = unsafe { Pin::new_unchecked(&mut this.b) };
103 let c = unsafe { Pin::new_unchecked(&mut this.c) };
104 if let Poll::Ready(x) = a.poll(cx) {
105 return Poll::Ready(Either3::First(x));
106 }
107 if let Poll::Ready(x) = b.poll(cx) {
108 return Poll::Ready(Either3::Second(x));
109 }
110 if let Poll::Ready(x) = c.poll(cx) {
111 return Poll::Ready(Either3::Third(x));
112 }
113 Poll::Pending
114 }
115}
116
117// ====================================================================
118
119/// Result for [`select4`].
120#[derive(Debug, Clone)]
121pub enum Either4<A, B, C, D> {
122 /// First future finished first.
123 First(A),
124 /// Second future finished first.
125 Second(B),
126 /// Third future finished first.
127 Third(C),
128 /// Fourth future finished first.
129 Fourth(D),
130}
131
132/// Same as [`select`], but with more futures.
133pub fn select4<A, B, C, D>(a: A, b: B, c: C, d: D) -> Select4<A, B, C, D>
134where
135 A: Future,
136 B: Future,
137 C: Future,
138 D: Future,
139{
140 Select4 { a, b, c, d }
141}
142
143/// Future for the [`select4`] function.
144#[derive(Debug)]
145#[must_use = "futures do nothing unless you `.await` or poll them"]
146pub struct Select4<A, B, C, D> {
147 a: A,
148 b: B,
149 c: C,
150 d: D,
151}
152
153impl<A, B, C, D> Future for Select4<A, B, C, D>
154where
155 A: Future,
156 B: Future,
157 C: Future,
158 D: Future,
159{
160 type Output = Either4<A::Output, B::Output, C::Output, D::Output>;
161
162 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
163 let this = unsafe { self.get_unchecked_mut() };
164 let a = unsafe { Pin::new_unchecked(&mut this.a) };
165 let b = unsafe { Pin::new_unchecked(&mut this.b) };
166 let c = unsafe { Pin::new_unchecked(&mut this.c) };
167 let d = unsafe { Pin::new_unchecked(&mut this.d) };
168 if let Poll::Ready(x) = a.poll(cx) {
169 return Poll::Ready(Either4::First(x));
170 }
171 if let Poll::Ready(x) = b.poll(cx) {
172 return Poll::Ready(Either4::Second(x));
173 }
174 if let Poll::Ready(x) = c.poll(cx) {
175 return Poll::Ready(Either4::Third(x));
176 }
177 if let Poll::Ready(x) = d.poll(cx) {
178 return Poll::Ready(Either4::Fourth(x));
179 }
180 Poll::Pending
181 }
182}
183
184// ====================================================================
185
186/// Future for the [`select_all`] function.
187#[derive(Debug)]
188#[must_use = "futures do nothing unless you `.await` or poll them"]
189pub struct SelectAll<Fut, const N: usize> {
190 inner: [Fut; N],
191}
192
193/// Creates a new future which will select over a list of futures.
194///
195/// The returned future will wait for any future within `iter` to be ready. Upon
196/// completion the item resolved will be returned, along with the index of the
197/// future that was ready.
198///
199/// # Panics
200///
201/// This function will panic if the array specified contains no items.
202pub fn select_all<Fut: Future, const N: usize>(arr: [Fut; N]) -> SelectAll<Fut, N> {
203 assert!(N > 0);
204 SelectAll { inner: arr }
205}
206
207impl<Fut: Future, const N: usize> Future for SelectAll<Fut, N> {
208 type Output = (Fut::Output, usize);
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 // Safety: Since `self` is pinned, `inner` cannot move. Since `inner` cannot move,
212 // its elements also cannot move. Therefore it is safe to access `inner` and pin
213 // references to the contained futures.
214 let item = unsafe {
215 self.get_unchecked_mut()
216 .inner
217 .iter_mut()
218 .enumerate()
219 .find_map(|(i, f)| match Pin::new_unchecked(f).poll(cx) {
220 Poll::Pending => None,
221 Poll::Ready(e) => Some((i, e)),
222 })
223 };
224
225 match item {
226 Some((idx, res)) => Poll::Ready((res, idx)),
227 None => Poll::Pending,
228 }
229 }
230}
diff --git a/embassy-util/src/waitqueue/mod.rs b/embassy-util/src/waitqueue/mod.rs
new file mode 100644
index 000000000..6661a6b61
--- /dev/null
+++ b/embassy-util/src/waitqueue/mod.rs
@@ -0,0 +1,7 @@
1//! Async low-level wait queues
2
3mod waker;
4pub use waker::*;
5
6mod multi_waker;
7pub use multi_waker::*;
diff --git a/embassy-util/src/waitqueue/multi_waker.rs b/embassy-util/src/waitqueue/multi_waker.rs
new file mode 100644
index 000000000..325d2cb3a
--- /dev/null
+++ b/embassy-util/src/waitqueue/multi_waker.rs
@@ -0,0 +1,33 @@
1use core::task::Waker;
2
3use super::WakerRegistration;
4
5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N],
8}
9
10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance
12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new();
14 Self { wakers: [WAKER; N] }
15 }
16
17 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) {
20 waker_slot.register(w);
21 Ok(())
22 } else {
23 Err(w)
24 }
25 }
26
27 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() {
30 waker_slot.wake()
31 }
32 }
33}
diff --git a/embassy-util/src/waitqueue/waker.rs b/embassy-util/src/waitqueue/waker.rs
new file mode 100644
index 000000000..64e300eb8
--- /dev/null
+++ b/embassy-util/src/waitqueue/waker.rs
@@ -0,0 +1,92 @@
1use core::cell::Cell;
2use core::mem;
3use core::task::Waker;
4
5use crate::blocking_mutex::raw::CriticalSectionRawMutex;
6use crate::blocking_mutex::Mutex;
7
8/// Utility struct to register and wake a waker.
9#[derive(Debug)]
10pub struct WakerRegistration {
11 waker: Option<Waker>,
12}
13
14impl WakerRegistration {
15 /// Create a new `WakerRegistration`.
16 pub const fn new() -> Self {
17 Self { waker: None }
18 }
19
20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&mut self, w: &Waker) {
22 match self.waker {
23 // Optimization: If both the old and new Wakers wake the same task, we can simply
24 // keep the old waker, skipping the clone. (In most executor implementations,
25 // cloning a waker is somewhat expensive, comparable to cloning an Arc).
26 Some(ref w2) if (w2.will_wake(w)) => {}
27 _ => {
28 // clone the new waker and store it
29 if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) {
30 // We had a waker registered for another task. Wake it, so the other task can
31 // reregister itself if it's still interested.
32 //
33 // If two tasks are waiting on the same thing concurrently, this will cause them
34 // to wake each other in a loop fighting over this WakerRegistration. This wastes
35 // CPU but things will still work.
36 //
37 // If the user wants to have two tasks waiting on the same thing they should use
38 // a more appropriate primitive that can store multiple wakers.
39 old_waker.wake()
40 }
41 }
42 }
43 }
44
45 /// Wake the registered waker, if any.
46 pub fn wake(&mut self) {
47 if let Some(w) = self.waker.take() {
48 w.wake()
49 }
50 }
51
52 /// Returns true if a waker is currently registered
53 pub fn occupied(&self) -> bool {
54 self.waker.is_some()
55 }
56}
57
58/// Utility struct to register and wake a waker.
59pub struct AtomicWaker {
60 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
61}
62
63impl AtomicWaker {
64 /// Create a new `AtomicWaker`.
65 pub const fn new() -> Self {
66 Self {
67 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
68 }
69 }
70
71 /// Register a waker. Overwrites the previous waker, if any.
72 pub fn register(&self, w: &Waker) {
73 critical_section::with(|cs| {
74 let cell = self.waker.borrow(cs);
75 cell.set(match cell.replace(None) {
76 Some(w2) if (w2.will_wake(w)) => Some(w2),
77 _ => Some(w.clone()),
78 })
79 })
80 }
81
82 /// Wake the registered waker, if any.
83 pub fn wake(&self) {
84 critical_section::with(|cs| {
85 let cell = self.waker.borrow(cs);
86 if let Some(w) = cell.replace(None) {
87 w.wake_by_ref();
88 cell.set(Some(w));
89 }
90 })
91 }
92}
diff --git a/embassy-util/src/yield_now.rs b/embassy-util/src/yield_now.rs
new file mode 100644
index 000000000..1ebecb916
--- /dev/null
+++ b/embassy-util/src/yield_now.rs
@@ -0,0 +1,25 @@
1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5/// Yield from the current task once, allowing other tasks to run.
6pub fn yield_now() -> impl Future<Output = ()> {
7 YieldNowFuture { yielded: false }
8}
9
10struct YieldNowFuture {
11 yielded: bool,
12}
13
14impl Future for YieldNowFuture {
15 type Output = ();
16 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
17 if self.yielded {
18 Poll::Ready(())
19 } else {
20 self.yielded = true;
21 cx.waker().wake_by_ref();
22 Poll::Pending
23 }
24 }
25}