aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2022-08-22 20:18:40 +0000
committerGitHub <[email protected]>2022-08-22 20:18:40 +0000
commitcb9f0ef5b800ce4a22cde1805e0eb88425f1e07b (patch)
tree0c7425dae57acb94cb6ddca27def7e77609369b3 /embassy-sync
parent61356181b223e95f289ca3af3a038a699cde2112 (diff)
parent5677b13a86beca58aa57ecfd7cea0db7ceb189fa (diff)
Merge #922
922: split `embassy-util` into `embassy-futures`, `embassy-sync`. r=Dirbaio a=Dirbaio Co-authored-by: Dario Nieuwenhuis <[email protected]>
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/Cargo.toml34
-rw-r--r--embassy-sync/build.rs29
-rw-r--r--embassy-sync/src/blocking_mutex/mod.rs189
-rw-r--r--embassy-sync/src/blocking_mutex/raw.rs149
-rw-r--r--embassy-sync/src/channel.rs596
-rw-r--r--embassy-sync/src/fmt.rs228
-rw-r--r--embassy-sync/src/lib.rs19
-rw-r--r--embassy-sync/src/mutex.rs167
-rw-r--r--embassy-sync/src/pipe.rs551
-rw-r--r--embassy-sync/src/pubsub/mod.rs542
-rw-r--r--embassy-sync/src/pubsub/publisher.rs182
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs152
-rw-r--r--embassy-sync/src/ring_buffer.rs146
-rw-r--r--embassy-sync/src/signal.rs100
-rw-r--r--embassy-sync/src/waitqueue/mod.rs7
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs33
-rw-r--r--embassy-sync/src/waitqueue/waker.rs92
17 files changed, 3216 insertions, 0 deletions
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
new file mode 100644
index 000000000..0d14bba55
--- /dev/null
+++ b/embassy-sync/Cargo.toml
@@ -0,0 +1,34 @@
1[package]
2name = "embassy-sync"
3version = "0.1.0"
4edition = "2021"
5
6[package.metadata.embassy_docs]
7src_base = "https://github.com/embassy-rs/embassy/blob/embassy-sync-v$VERSION/embassy-sync/src/"
8src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/src/"
9features = ["nightly"]
10target = "thumbv7em-none-eabi"
11
12[features]
13nightly = ["embedded-io/async"]
14
15[dependencies]
16defmt = { version = "0.3", optional = true }
17log = { version = "0.4.14", optional = true }
18
19futures-util = { version = "0.3.17", default-features = false }
20atomic-polyfill = "1.0.1"
21critical-section = "1.1"
22heapless = "0.7.5"
23cfg-if = "1.0.0"
24embedded-io = "0.3.0"
25
26[dev-dependencies]
27futures-executor = { version = "0.3.17", features = [ "thread-pool" ] }
28futures-test = "0.3.17"
29futures-timer = "3.0.2"
30futures-util = { version = "0.3.17", features = [ "channel" ] }
31
32# Enable critical-section implementation for std, for tests
33critical-section = { version = "1.1", features = ["std"] }
34static_cell = "1.0"
diff --git a/embassy-sync/build.rs b/embassy-sync/build.rs
new file mode 100644
index 000000000..6fe82b44f
--- /dev/null
+++ b/embassy-sync/build.rs
@@ -0,0 +1,29 @@
1use std::env;
2
3fn main() {
4 let target = env::var("TARGET").unwrap();
5
6 if target.starts_with("thumbv6m-") {
7 println!("cargo:rustc-cfg=cortex_m");
8 println!("cargo:rustc-cfg=armv6m");
9 } else if target.starts_with("thumbv7m-") {
10 println!("cargo:rustc-cfg=cortex_m");
11 println!("cargo:rustc-cfg=armv7m");
12 } else if target.starts_with("thumbv7em-") {
13 println!("cargo:rustc-cfg=cortex_m");
14 println!("cargo:rustc-cfg=armv7m");
15 println!("cargo:rustc-cfg=armv7em"); // (not currently used)
16 } else if target.starts_with("thumbv8m.base") {
17 println!("cargo:rustc-cfg=cortex_m");
18 println!("cargo:rustc-cfg=armv8m");
19 println!("cargo:rustc-cfg=armv8m_base");
20 } else if target.starts_with("thumbv8m.main") {
21 println!("cargo:rustc-cfg=cortex_m");
22 println!("cargo:rustc-cfg=armv8m");
23 println!("cargo:rustc-cfg=armv8m_main");
24 }
25
26 if target.ends_with("-eabihf") {
27 println!("cargo:rustc-cfg=has_fpu");
28 }
29}
diff --git a/embassy-sync/src/blocking_mutex/mod.rs b/embassy-sync/src/blocking_mutex/mod.rs
new file mode 100644
index 000000000..8a4a4c642
--- /dev/null
+++ b/embassy-sync/src/blocking_mutex/mod.rs
@@ -0,0 +1,189 @@
1//! Blocking mutex.
2//!
3//! This module provides a blocking mutex that can be used to synchronize data.
4pub mod raw;
5
6use core::cell::UnsafeCell;
7
8use self::raw::RawMutex;
9
10/// Blocking mutex (not async)
11///
12/// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`].
13///
14/// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind
15/// of interior mutability fits your use case.
16///
17/// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts.
18///
19/// Use [`NoopMutex`] when data is only shared between tasks running on the same executor.
20///
21/// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton.
22///
23/// In all cases, the blocking mutex is intended to be short lived and not held across await points.
24/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points.
25pub struct Mutex<R, T: ?Sized> {
26 // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets
27 // to run BEFORE dropping `data`.
28 raw: R,
29 data: UnsafeCell<T>,
30}
31
32unsafe impl<R: RawMutex + Send, T: ?Sized + Send> Send for Mutex<R, T> {}
33unsafe impl<R: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<R, T> {}
34
35impl<R: RawMutex, T> Mutex<R, T> {
36 /// Creates a new mutex in an unlocked state ready for use.
37 #[inline]
38 pub const fn new(val: T) -> Mutex<R, T> {
39 Mutex {
40 raw: R::INIT,
41 data: UnsafeCell::new(val),
42 }
43 }
44
45 /// Creates a critical section and grants temporary access to the protected data.
46 pub fn lock<U>(&self, f: impl FnOnce(&T) -> U) -> U {
47 self.raw.lock(|| {
48 let ptr = self.data.get() as *const T;
49 let inner = unsafe { &*ptr };
50 f(inner)
51 })
52 }
53}
54
55impl<R, T> Mutex<R, T> {
56 /// Creates a new mutex based on a pre-existing raw mutex.
57 ///
58 /// This allows creating a mutex in a constant context on stable Rust.
59 #[inline]
60 pub const fn const_new(raw_mutex: R, val: T) -> Mutex<R, T> {
61 Mutex {
62 raw: raw_mutex,
63 data: UnsafeCell::new(val),
64 }
65 }
66
67 /// Consumes this mutex, returning the underlying data.
68 #[inline]
69 pub fn into_inner(self) -> T {
70 self.data.into_inner()
71 }
72
73 /// Returns a mutable reference to the underlying data.
74 ///
75 /// Since this call borrows the `Mutex` mutably, no actual locking needs to
76 /// take place---the mutable borrow statically guarantees no locks exist.
77 #[inline]
78 pub fn get_mut(&mut self) -> &mut T {
79 unsafe { &mut *self.data.get() }
80 }
81}
82
83/// A mutex that allows borrowing data across executors and interrupts.
84///
85/// # Safety
86///
87/// This mutex is safe to share between different executors and interrupts.
88pub type CriticalSectionMutex<T> = Mutex<raw::CriticalSectionRawMutex, T>;
89
90/// A mutex that allows borrowing data in the context of a single executor.
91///
92/// # Safety
93///
94/// **This Mutex is only safe within a single executor.**
95pub type NoopMutex<T> = Mutex<raw::NoopRawMutex, T>;
96
97impl<T> Mutex<raw::CriticalSectionRawMutex, T> {
98 /// Borrows the data for the duration of the critical section
99 pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T {
100 let ptr = self.data.get() as *const T;
101 unsafe { &*ptr }
102 }
103}
104
105impl<T> Mutex<raw::NoopRawMutex, T> {
106 /// Borrows the data
107 pub fn borrow(&self) -> &T {
108 let ptr = self.data.get() as *const T;
109 unsafe { &*ptr }
110 }
111}
112
113// ThreadModeMutex does NOT use the generic mutex from above because it's special:
114// it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?).
115//
116// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example),
117// but that will require T: Send even though it shouldn't be needed.
118
119#[cfg(any(cortex_m, feature = "std"))]
120pub use thread_mode_mutex::*;
121#[cfg(any(cortex_m, feature = "std"))]
122mod thread_mode_mutex {
123 use super::*;
124
125 /// A "mutex" that only allows borrowing from thread mode.
126 ///
127 /// # Safety
128 ///
129 /// **This Mutex is only safe on single-core systems.**
130 ///
131 /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access.
132 pub struct ThreadModeMutex<T: ?Sized> {
133 inner: UnsafeCell<T>,
134 }
135
136 // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode.
137 // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can
138 // be Send+Sync even if T is not Send (unlike CriticalSectionMutex)
139 unsafe impl<T: ?Sized> Sync for ThreadModeMutex<T> {}
140 unsafe impl<T: ?Sized> Send for ThreadModeMutex<T> {}
141
142 impl<T> ThreadModeMutex<T> {
143 /// Creates a new mutex
144 pub const fn new(value: T) -> Self {
145 ThreadModeMutex {
146 inner: UnsafeCell::new(value),
147 }
148 }
149 }
150
151 impl<T: ?Sized> ThreadModeMutex<T> {
152 /// Lock the `ThreadModeMutex`, granting access to the data.
153 ///
154 /// # Panics
155 ///
156 /// This will panic if not currently running in thread mode.
157 pub fn lock<R>(&self, f: impl FnOnce(&T) -> R) -> R {
158 f(self.borrow())
159 }
160
161 /// Borrows the data
162 ///
163 /// # Panics
164 ///
165 /// This will panic if not currently running in thread mode.
166 pub fn borrow(&self) -> &T {
167 assert!(
168 raw::in_thread_mode(),
169 "ThreadModeMutex can only be borrowed from thread mode."
170 );
171 unsafe { &*self.inner.get() }
172 }
173 }
174
175 impl<T: ?Sized> Drop for ThreadModeMutex<T> {
176 fn drop(&mut self) {
177 // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so
178 // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if
179 // T isn't, so without this check a user could create a ThreadModeMutex in thread mode,
180 // send it to interrupt context and drop it there, which would "send" a T even if T is not Send.
181 assert!(
182 raw::in_thread_mode(),
183 "ThreadModeMutex can only be dropped from thread mode."
184 );
185
186 // Drop of the inner `T` happens after this.
187 }
188 }
189}
diff --git a/embassy-sync/src/blocking_mutex/raw.rs b/embassy-sync/src/blocking_mutex/raw.rs
new file mode 100644
index 000000000..15796f1b2
--- /dev/null
+++ b/embassy-sync/src/blocking_mutex/raw.rs
@@ -0,0 +1,149 @@
1//! Mutex primitives.
2//!
3//! This module provides a trait for mutexes that can be used in different contexts.
4use core::marker::PhantomData;
5
6/// Raw mutex trait.
7///
8/// This mutex is "raw", which means it does not actually contain the protected data, it
9/// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead,
10/// which is generic over a RawMutex and contains the protected data.
11///
12/// Note that, unlike other mutexes, implementations only guarantee no
13/// concurrent access from other threads: concurrent access from the current
14/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly.
15///
16/// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access
17/// to the data, it is not enough to guarantee exclusive (`&mut`) access.
18///
19/// # Safety
20///
21/// RawMutex implementations must ensure that, while locked, no other thread can lock
22/// the RawMutex concurrently.
23///
24/// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior.
25pub unsafe trait RawMutex {
26 /// Create a new `RawMutex` instance.
27 ///
28 /// This is a const instead of a method to allow creating instances in const context.
29 const INIT: Self;
30
31 /// Lock this `RawMutex`.
32 fn lock<R>(&self, f: impl FnOnce() -> R) -> R;
33}
34
35/// A mutex that allows borrowing data across executors and interrupts.
36///
37/// # Safety
38///
39/// This mutex is safe to share between different executors and interrupts.
40pub struct CriticalSectionRawMutex {
41 _phantom: PhantomData<()>,
42}
43unsafe impl Send for CriticalSectionRawMutex {}
44unsafe impl Sync for CriticalSectionRawMutex {}
45
46impl CriticalSectionRawMutex {
47 /// Create a new `CriticalSectionRawMutex`.
48 pub const fn new() -> Self {
49 Self { _phantom: PhantomData }
50 }
51}
52
53unsafe impl RawMutex for CriticalSectionRawMutex {
54 const INIT: Self = Self::new();
55
56 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
57 critical_section::with(|_| f())
58 }
59}
60
61// ================
62
63/// A mutex that allows borrowing data in the context of a single executor.
64///
65/// # Safety
66///
67/// **This Mutex is only safe within a single executor.**
68pub struct NoopRawMutex {
69 _phantom: PhantomData<*mut ()>,
70}
71
72unsafe impl Send for NoopRawMutex {}
73
74impl NoopRawMutex {
75 /// Create a new `NoopRawMutex`.
76 pub const fn new() -> Self {
77 Self { _phantom: PhantomData }
78 }
79}
80
81unsafe impl RawMutex for NoopRawMutex {
82 const INIT: Self = Self::new();
83 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
84 f()
85 }
86}
87
88// ================
89
90#[cfg(any(cortex_m, feature = "std"))]
91mod thread_mode {
92 use super::*;
93
94 /// A "mutex" that only allows borrowing from thread mode.
95 ///
96 /// # Safety
97 ///
98 /// **This Mutex is only safe on single-core systems.**
99 ///
100 /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access.
101 pub struct ThreadModeRawMutex {
102 _phantom: PhantomData<()>,
103 }
104
105 unsafe impl Send for ThreadModeRawMutex {}
106 unsafe impl Sync for ThreadModeRawMutex {}
107
108 impl ThreadModeRawMutex {
109 /// Create a new `ThreadModeRawMutex`.
110 pub const fn new() -> Self {
111 Self { _phantom: PhantomData }
112 }
113 }
114
115 unsafe impl RawMutex for ThreadModeRawMutex {
116 const INIT: Self = Self::new();
117 fn lock<R>(&self, f: impl FnOnce() -> R) -> R {
118 assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode.");
119
120 f()
121 }
122 }
123
124 impl Drop for ThreadModeRawMutex {
125 fn drop(&mut self) {
126 // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so
127 // `drop` needs the same guarantees as `lock`. `ThreadModeMutex<T>` is Send even if
128 // T isn't, so without this check a user could create a ThreadModeMutex in thread mode,
129 // send it to interrupt context and drop it there, which would "send" a T even if T is not Send.
130 assert!(
131 in_thread_mode(),
132 "ThreadModeMutex can only be dropped from thread mode."
133 );
134
135 // Drop of the inner `T` happens after this.
136 }
137 }
138
139 pub(crate) fn in_thread_mode() -> bool {
140 #[cfg(feature = "std")]
141 return Some("main") == std::thread::current().name();
142
143 #[cfg(not(feature = "std"))]
144 // ICSR.VECTACTIVE == 0
145 return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0;
146 }
147}
148#[cfg(any(cortex_m, feature = "std"))]
149pub use thread_mode::*;
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
new file mode 100644
index 000000000..76f42d0e7
--- /dev/null
+++ b/embassy-sync/src/channel.rs
@@ -0,0 +1,596 @@
1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33#[derive(Copy)]
34pub struct Sender<'ch, M, T, const N: usize>
35where
36 M: RawMutex,
37{
38 channel: &'ch Channel<M, T, N>,
39}
40
41impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
42where
43 M: RawMutex,
44{
45 fn clone(&self) -> Self {
46 Sender { channel: self.channel }
47 }
48}
49
50impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
51where
52 M: RawMutex,
53{
54 /// Sends a value.
55 ///
56 /// See [`Channel::send()`]
57 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
58 self.channel.send(message)
59 }
60
61 /// Attempt to immediately send a message.
62 ///
63 /// See [`Channel::send()`]
64 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67}
68
69/// Send-only access to a [`Channel`] without knowing channel size.
70#[derive(Copy)]
71pub struct DynamicSender<'ch, T> {
72 channel: &'ch dyn DynamicChannel<T>,
73}
74
75impl<'ch, T> Clone for DynamicSender<'ch, T> {
76 fn clone(&self) -> Self {
77 DynamicSender { channel: self.channel }
78 }
79}
80
81impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
82where
83 M: RawMutex,
84{
85 fn from(s: Sender<'ch, M, T, N>) -> Self {
86 Self { channel: s.channel }
87 }
88}
89
90impl<'ch, T> DynamicSender<'ch, T> {
91 /// Sends a value.
92 ///
93 /// See [`Channel::send()`]
94 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
95 DynamicSendFuture {
96 channel: self.channel,
97 message: Some(message),
98 }
99 }
100
101 /// Attempt to immediately send a message.
102 ///
103 /// See [`Channel::send()`]
104 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
105 self.channel.try_send_with_context(message, None)
106 }
107}
108
109/// Receive-only access to a [`Channel`].
110#[derive(Copy)]
111pub struct Receiver<'ch, M, T, const N: usize>
112where
113 M: RawMutex,
114{
115 channel: &'ch Channel<M, T, N>,
116}
117
118impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
119where
120 M: RawMutex,
121{
122 fn clone(&self) -> Self {
123 Receiver { channel: self.channel }
124 }
125}
126
127impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
128where
129 M: RawMutex,
130{
131 /// Receive the next value.
132 ///
133 /// See [`Channel::recv()`].
134 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
135 self.channel.recv()
136 }
137
138 /// Attempt to immediately receive the next value.
139 ///
140 /// See [`Channel::try_recv()`]
141 pub fn try_recv(&self) -> Result<T, TryRecvError> {
142 self.channel.try_recv()
143 }
144}
145
146/// Receive-only access to a [`Channel`] without knowing channel size.
147#[derive(Copy)]
148pub struct DynamicReceiver<'ch, T> {
149 channel: &'ch dyn DynamicChannel<T>,
150}
151
152impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
153 fn clone(&self) -> Self {
154 DynamicReceiver { channel: self.channel }
155 }
156}
157
158impl<'ch, T> DynamicReceiver<'ch, T> {
159 /// Receive the next value.
160 ///
161 /// See [`Channel::recv()`].
162 pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
163 DynamicRecvFuture { channel: self.channel }
164 }
165
166 /// Attempt to immediately receive the next value.
167 ///
168 /// See [`Channel::try_recv()`]
169 pub fn try_recv(&self) -> Result<T, TryRecvError> {
170 self.channel.try_recv_with_context(None)
171 }
172}
173
174impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
175where
176 M: RawMutex,
177{
178 fn from(s: Receiver<'ch, M, T, N>) -> Self {
179 Self { channel: s.channel }
180 }
181}
182
183/// Future returned by [`Channel::recv`] and [`Receiver::recv`].
184pub struct RecvFuture<'ch, M, T, const N: usize>
185where
186 M: RawMutex,
187{
188 channel: &'ch Channel<M, T, N>,
189}
190
191impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
192where
193 M: RawMutex,
194{
195 type Output = T;
196
197 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
198 match self.channel.try_recv_with_context(Some(cx)) {
199 Ok(v) => Poll::Ready(v),
200 Err(TryRecvError::Empty) => Poll::Pending,
201 }
202 }
203}
204
205/// Future returned by [`DynamicReceiver::recv`].
206pub struct DynamicRecvFuture<'ch, T> {
207 channel: &'ch dyn DynamicChannel<T>,
208}
209
210impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
211 type Output = T;
212
213 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
214 match self.channel.try_recv_with_context(Some(cx)) {
215 Ok(v) => Poll::Ready(v),
216 Err(TryRecvError::Empty) => Poll::Pending,
217 }
218 }
219}
220
221/// Future returned by [`Channel::send`] and [`Sender::send`].
222pub struct SendFuture<'ch, M, T, const N: usize>
223where
224 M: RawMutex,
225{
226 channel: &'ch Channel<M, T, N>,
227 message: Option<T>,
228}
229
230impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
231where
232 M: RawMutex,
233{
234 type Output = ();
235
236 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
237 match self.message.take() {
238 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
239 Ok(..) => Poll::Ready(()),
240 Err(TrySendError::Full(m)) => {
241 self.message = Some(m);
242 Poll::Pending
243 }
244 },
245 None => panic!("Message cannot be None"),
246 }
247 }
248}
249
250impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
251
252/// Future returned by [`DynamicSender::send`].
253pub struct DynamicSendFuture<'ch, T> {
254 channel: &'ch dyn DynamicChannel<T>,
255 message: Option<T>,
256}
257
258impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
259 type Output = ();
260
261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 match self.message.take() {
263 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
264 Ok(..) => Poll::Ready(()),
265 Err(TrySendError::Full(m)) => {
266 self.message = Some(m);
267 Poll::Pending
268 }
269 },
270 None => panic!("Message cannot be None"),
271 }
272 }
273}
274
275impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
276
277trait DynamicChannel<T> {
278 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
279
280 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
281}
282
283/// Error returned by [`try_recv`](Channel::try_recv).
284#[derive(PartialEq, Eq, Clone, Copy, Debug)]
285#[cfg_attr(feature = "defmt", derive(defmt::Format))]
286pub enum TryRecvError {
287 /// A message could not be received because the channel is empty.
288 Empty,
289}
290
291/// Error returned by [`try_send`](Channel::try_send).
292#[derive(PartialEq, Eq, Clone, Copy, Debug)]
293#[cfg_attr(feature = "defmt", derive(defmt::Format))]
294pub enum TrySendError<T> {
295 /// The data could not be sent on the channel because the channel is
296 /// currently full and sending would require blocking.
297 Full(T),
298}
299
300struct ChannelState<T, const N: usize> {
301 queue: Deque<T, N>,
302 receiver_waker: WakerRegistration,
303 senders_waker: WakerRegistration,
304}
305
306impl<T, const N: usize> ChannelState<T, N> {
307 const fn new() -> Self {
308 ChannelState {
309 queue: Deque::new(),
310 receiver_waker: WakerRegistration::new(),
311 senders_waker: WakerRegistration::new(),
312 }
313 }
314
315 fn try_recv(&mut self) -> Result<T, TryRecvError> {
316 self.try_recv_with_context(None)
317 }
318
319 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
320 if self.queue.is_full() {
321 self.senders_waker.wake();
322 }
323
324 if let Some(message) = self.queue.pop_front() {
325 Ok(message)
326 } else {
327 if let Some(cx) = cx {
328 self.receiver_waker.register(cx.waker());
329 }
330 Err(TryRecvError::Empty)
331 }
332 }
333
334 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
335 self.try_send_with_context(message, None)
336 }
337
338 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
339 match self.queue.push_back(message) {
340 Ok(()) => {
341 self.receiver_waker.wake();
342 Ok(())
343 }
344 Err(message) => {
345 if let Some(cx) = cx {
346 self.senders_waker.register(cx.waker());
347 }
348 Err(TrySendError::Full(message))
349 }
350 }
351 }
352}
353
354/// A bounded channel for communicating between asynchronous tasks
355/// with backpressure.
356///
357/// The channel will buffer up to the provided number of messages. Once the
358/// buffer is full, attempts to `send` new messages will wait until a message is
359/// received from the channel.
360///
361/// All data sent will become available in the same order as it was sent.
362pub struct Channel<M, T, const N: usize>
363where
364 M: RawMutex,
365{
366 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
367}
368
369impl<M, T, const N: usize> Channel<M, T, N>
370where
371 M: RawMutex,
372{
373 /// Establish a new bounded channel. For example, to create one with a NoopMutex:
374 ///
375 /// ```
376 /// use embassy_sync::channel::Channel;
377 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
378 ///
379 /// // Declare a bounded channel of 3 u32s.
380 /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
381 /// ```
382 pub const fn new() -> Self {
383 Self {
384 inner: Mutex::new(RefCell::new(ChannelState::new())),
385 }
386 }
387
388 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
389 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
390 }
391
392 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
393 self.lock(|c| c.try_recv_with_context(cx))
394 }
395
396 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
397 self.lock(|c| c.try_send_with_context(m, cx))
398 }
399
400 /// Get a sender for this channel.
401 pub fn sender(&self) -> Sender<'_, M, T, N> {
402 Sender { channel: self }
403 }
404
405 /// Get a receiver for this channel.
406 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
407 Receiver { channel: self }
408 }
409
410 /// Send a value, waiting until there is capacity.
411 ///
412 /// Sending completes when the value has been pushed to the channel's queue.
413 /// This doesn't mean the value has been received yet.
414 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
415 SendFuture {
416 channel: self,
417 message: Some(message),
418 }
419 }
420
421 /// Attempt to immediately send a message.
422 ///
423 /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
424 /// buffer is full, instead of waiting.
425 ///
426 /// # Errors
427 ///
428 /// If the channel capacity has been reached, i.e., the channel has `n`
429 /// buffered values where `n` is the argument passed to [`Channel`], then an
430 /// error is returned.
431 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
432 self.lock(|c| c.try_send(message))
433 }
434
435 /// Receive the next value.
436 ///
437 /// If there are no messages in the channel's buffer, this method will
438 /// wait until a message is sent.
439 pub fn recv(&self) -> RecvFuture<'_, M, T, N> {
440 RecvFuture { channel: self }
441 }
442
443 /// Attempt to immediately receive a message.
444 ///
445 /// This method will either receive a message from the channel immediately or return an error
446 /// if the channel is empty.
447 pub fn try_recv(&self) -> Result<T, TryRecvError> {
448 self.lock(|c| c.try_recv())
449 }
450}
451
452/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
453/// tradeoff cost of dynamic dispatch.
454impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
455where
456 M: RawMutex,
457{
458 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
459 Channel::try_send_with_context(self, m, cx)
460 }
461
462 fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
463 Channel::try_recv_with_context(self, cx)
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use core::time::Duration;
470
471 use futures_executor::ThreadPool;
472 use futures_timer::Delay;
473 use futures_util::task::SpawnExt;
474 use static_cell::StaticCell;
475
476 use super::*;
477 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
478
479 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
480 c.queue.capacity() - c.queue.len()
481 }
482
483 #[test]
484 fn sending_once() {
485 let mut c = ChannelState::<u32, 3>::new();
486 assert!(c.try_send(1).is_ok());
487 assert_eq!(capacity(&c), 2);
488 }
489
490 #[test]
491 fn sending_when_full() {
492 let mut c = ChannelState::<u32, 3>::new();
493 let _ = c.try_send(1);
494 let _ = c.try_send(1);
495 let _ = c.try_send(1);
496 match c.try_send(2) {
497 Err(TrySendError::Full(2)) => assert!(true),
498 _ => assert!(false),
499 }
500 assert_eq!(capacity(&c), 0);
501 }
502
503 #[test]
504 fn receiving_once_with_one_send() {
505 let mut c = ChannelState::<u32, 3>::new();
506 assert!(c.try_send(1).is_ok());
507 assert_eq!(c.try_recv().unwrap(), 1);
508 assert_eq!(capacity(&c), 3);
509 }
510
511 #[test]
512 fn receiving_when_empty() {
513 let mut c = ChannelState::<u32, 3>::new();
514 match c.try_recv() {
515 Err(TryRecvError::Empty) => assert!(true),
516 _ => assert!(false),
517 }
518 assert_eq!(capacity(&c), 3);
519 }
520
521 #[test]
522 fn simple_send_and_receive() {
523 let c = Channel::<NoopRawMutex, u32, 3>::new();
524 assert!(c.try_send(1).is_ok());
525 assert_eq!(c.try_recv().unwrap(), 1);
526 }
527
528 #[test]
529 fn cloning() {
530 let c = Channel::<NoopRawMutex, u32, 3>::new();
531 let r1 = c.receiver();
532 let s1 = c.sender();
533
534 let _ = r1.clone();
535 let _ = s1.clone();
536 }
537
538 #[test]
539 fn dynamic_dispatch() {
540 let c = Channel::<NoopRawMutex, u32, 3>::new();
541 let s: DynamicSender<'_, u32> = c.sender().into();
542 let r: DynamicReceiver<'_, u32> = c.receiver().into();
543
544 assert!(s.try_send(1).is_ok());
545 assert_eq!(r.try_recv().unwrap(), 1);
546 }
547
548 #[futures_test::test]
549 async fn receiver_receives_given_try_send_async() {
550 let executor = ThreadPool::new().unwrap();
551
552 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
553 let c = &*CHANNEL.init(Channel::new());
554 let c2 = c;
555 assert!(executor
556 .spawn(async move {
557 assert!(c2.try_send(1).is_ok());
558 })
559 .is_ok());
560 assert_eq!(c.recv().await, 1);
561 }
562
563 #[futures_test::test]
564 async fn sender_send_completes_if_capacity() {
565 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
566 c.send(1).await;
567 assert_eq!(c.recv().await, 1);
568 }
569
570 #[futures_test::test]
571 async fn senders_sends_wait_until_capacity() {
572 let executor = ThreadPool::new().unwrap();
573
574 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
575 let c = &*CHANNEL.init(Channel::new());
576 assert!(c.try_send(1).is_ok());
577
578 let c2 = c;
579 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
580 let c2 = c;
581 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
582 // Wish I could think of a means of determining that the async send is waiting instead.
583 // However, I've used the debugger to observe that the send does indeed wait.
584 Delay::new(Duration::from_millis(500)).await;
585 assert_eq!(c.recv().await, 1);
586 assert!(executor
587 .spawn(async move {
588 loop {
589 c.recv().await;
590 }
591 })
592 .is_ok());
593 send_task_1.unwrap().await;
594 send_task_2.unwrap().await;
595 }
596}
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
new file mode 100644
index 000000000..f8bb0a035
--- /dev/null
+++ b/embassy-sync/src/fmt.rs
@@ -0,0 +1,228 @@
1#![macro_use]
2#![allow(unused_macros)]
3
4#[cfg(all(feature = "defmt", feature = "log"))]
5compile_error!("You may not enable both `defmt` and `log` features.");
6
7macro_rules! assert {
8 ($($x:tt)*) => {
9 {
10 #[cfg(not(feature = "defmt"))]
11 ::core::assert!($($x)*);
12 #[cfg(feature = "defmt")]
13 ::defmt::assert!($($x)*);
14 }
15 };
16}
17
18macro_rules! assert_eq {
19 ($($x:tt)*) => {
20 {
21 #[cfg(not(feature = "defmt"))]
22 ::core::assert_eq!($($x)*);
23 #[cfg(feature = "defmt")]
24 ::defmt::assert_eq!($($x)*);
25 }
26 };
27}
28
29macro_rules! assert_ne {
30 ($($x:tt)*) => {
31 {
32 #[cfg(not(feature = "defmt"))]
33 ::core::assert_ne!($($x)*);
34 #[cfg(feature = "defmt")]
35 ::defmt::assert_ne!($($x)*);
36 }
37 };
38}
39
40macro_rules! debug_assert {
41 ($($x:tt)*) => {
42 {
43 #[cfg(not(feature = "defmt"))]
44 ::core::debug_assert!($($x)*);
45 #[cfg(feature = "defmt")]
46 ::defmt::debug_assert!($($x)*);
47 }
48 };
49}
50
51macro_rules! debug_assert_eq {
52 ($($x:tt)*) => {
53 {
54 #[cfg(not(feature = "defmt"))]
55 ::core::debug_assert_eq!($($x)*);
56 #[cfg(feature = "defmt")]
57 ::defmt::debug_assert_eq!($($x)*);
58 }
59 };
60}
61
62macro_rules! debug_assert_ne {
63 ($($x:tt)*) => {
64 {
65 #[cfg(not(feature = "defmt"))]
66 ::core::debug_assert_ne!($($x)*);
67 #[cfg(feature = "defmt")]
68 ::defmt::debug_assert_ne!($($x)*);
69 }
70 };
71}
72
73macro_rules! todo {
74 ($($x:tt)*) => {
75 {
76 #[cfg(not(feature = "defmt"))]
77 ::core::todo!($($x)*);
78 #[cfg(feature = "defmt")]
79 ::defmt::todo!($($x)*);
80 }
81 };
82}
83
84macro_rules! unreachable {
85 ($($x:tt)*) => {
86 {
87 #[cfg(not(feature = "defmt"))]
88 ::core::unreachable!($($x)*);
89 #[cfg(feature = "defmt")]
90 ::defmt::unreachable!($($x)*);
91 }
92 };
93}
94
95macro_rules! panic {
96 ($($x:tt)*) => {
97 {
98 #[cfg(not(feature = "defmt"))]
99 ::core::panic!($($x)*);
100 #[cfg(feature = "defmt")]
101 ::defmt::panic!($($x)*);
102 }
103 };
104}
105
106macro_rules! trace {
107 ($s:literal $(, $x:expr)* $(,)?) => {
108 {
109 #[cfg(feature = "log")]
110 ::log::trace!($s $(, $x)*);
111 #[cfg(feature = "defmt")]
112 ::defmt::trace!($s $(, $x)*);
113 #[cfg(not(any(feature = "log", feature="defmt")))]
114 let _ = ($( & $x ),*);
115 }
116 };
117}
118
119macro_rules! debug {
120 ($s:literal $(, $x:expr)* $(,)?) => {
121 {
122 #[cfg(feature = "log")]
123 ::log::debug!($s $(, $x)*);
124 #[cfg(feature = "defmt")]
125 ::defmt::debug!($s $(, $x)*);
126 #[cfg(not(any(feature = "log", feature="defmt")))]
127 let _ = ($( & $x ),*);
128 }
129 };
130}
131
132macro_rules! info {
133 ($s:literal $(, $x:expr)* $(,)?) => {
134 {
135 #[cfg(feature = "log")]
136 ::log::info!($s $(, $x)*);
137 #[cfg(feature = "defmt")]
138 ::defmt::info!($s $(, $x)*);
139 #[cfg(not(any(feature = "log", feature="defmt")))]
140 let _ = ($( & $x ),*);
141 }
142 };
143}
144
145macro_rules! warn {
146 ($s:literal $(, $x:expr)* $(,)?) => {
147 {
148 #[cfg(feature = "log")]
149 ::log::warn!($s $(, $x)*);
150 #[cfg(feature = "defmt")]
151 ::defmt::warn!($s $(, $x)*);
152 #[cfg(not(any(feature = "log", feature="defmt")))]
153 let _ = ($( & $x ),*);
154 }
155 };
156}
157
158macro_rules! error {
159 ($s:literal $(, $x:expr)* $(,)?) => {
160 {
161 #[cfg(feature = "log")]
162 ::log::error!($s $(, $x)*);
163 #[cfg(feature = "defmt")]
164 ::defmt::error!($s $(, $x)*);
165 #[cfg(not(any(feature = "log", feature="defmt")))]
166 let _ = ($( & $x ),*);
167 }
168 };
169}
170
171#[cfg(feature = "defmt")]
172macro_rules! unwrap {
173 ($($x:tt)*) => {
174 ::defmt::unwrap!($($x)*)
175 };
176}
177
178#[cfg(not(feature = "defmt"))]
179macro_rules! unwrap {
180 ($arg:expr) => {
181 match $crate::fmt::Try::into_result($arg) {
182 ::core::result::Result::Ok(t) => t,
183 ::core::result::Result::Err(e) => {
184 ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
185 }
186 }
187 };
188 ($arg:expr, $($msg:expr),+ $(,)? ) => {
189 match $crate::fmt::Try::into_result($arg) {
190 ::core::result::Result::Ok(t) => t,
191 ::core::result::Result::Err(e) => {
192 ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
193 }
194 }
195 }
196}
197
198#[cfg(feature = "defmt-timestamp-uptime")]
199defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
200
201#[derive(Debug, Copy, Clone, Eq, PartialEq)]
202pub struct NoneError;
203
204pub trait Try {
205 type Ok;
206 type Error;
207 fn into_result(self) -> Result<Self::Ok, Self::Error>;
208}
209
210impl<T> Try for Option<T> {
211 type Ok = T;
212 type Error = NoneError;
213
214 #[inline]
215 fn into_result(self) -> Result<T, NoneError> {
216 self.ok_or(NoneError)
217 }
218}
219
220impl<T, E> Try for Result<T, E> {
221 type Ok = T;
222 type Error = E;
223
224 #[inline]
225 fn into_result(self) -> Self {
226 self
227 }
228}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
new file mode 100644
index 000000000..8e81e5cbe
--- /dev/null
+++ b/embassy-sync/src/lib.rs
@@ -0,0 +1,19 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))]
3#![allow(clippy::new_without_default)]
4#![doc = include_str!("../../README.md")]
5#![warn(missing_docs)]
6
7// This mod MUST go first, so that the others see its macros.
8pub(crate) mod fmt;
9
10// internal use
11mod ring_buffer;
12
13pub mod blocking_mutex;
14pub mod channel;
15pub mod mutex;
16pub mod pipe;
17pub mod pubsub;
18pub mod signal;
19pub mod waitqueue;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
new file mode 100644
index 000000000..75a6e8dd3
--- /dev/null
+++ b/embassy-sync/src/mutex.rs
@@ -0,0 +1,167 @@
1//! Async mutex.
2//!
3//! This module provides a mutex that can be used to synchronize data between asynchronous tasks.
4use core::cell::{RefCell, UnsafeCell};
5use core::ops::{Deref, DerefMut};
6use core::task::Poll;
7
8use futures_util::future::poll_fn;
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex as BlockingMutex;
12use crate::waitqueue::WakerRegistration;
13
14/// Error returned by [`Mutex::try_lock`]
15#[derive(PartialEq, Eq, Clone, Copy, Debug)]
16#[cfg_attr(feature = "defmt", derive(defmt::Format))]
17pub struct TryLockError;
18
19struct State {
20 locked: bool,
21 waker: WakerRegistration,
22}
23
24/// Async mutex.
25///
26/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex).
27/// The raw mutex is used to guard access to the internal "is locked" flag. It
28/// is held for very short periods only, while locking and unlocking. It is *not* held
29/// for the entire time the async Mutex is locked.
30///
31/// Which implementation you select depends on the context in which you're using the mutex.
32///
33/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts.
34///
35/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor.
36///
37/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton.
38///
39pub struct Mutex<M, T>
40where
41 M: RawMutex,
42 T: ?Sized,
43{
44 state: BlockingMutex<M, RefCell<State>>,
45 inner: UnsafeCell<T>,
46}
47
48unsafe impl<M: RawMutex + Send, T: ?Sized + Send> Send for Mutex<M, T> {}
49unsafe impl<M: RawMutex + Sync, T: ?Sized + Send> Sync for Mutex<M, T> {}
50
51/// Async mutex.
52impl<M, T> Mutex<M, T>
53where
54 M: RawMutex,
55{
56 /// Create a new mutex with the given value.
57 pub const fn new(value: T) -> Self {
58 Self {
59 inner: UnsafeCell::new(value),
60 state: BlockingMutex::new(RefCell::new(State {
61 locked: false,
62 waker: WakerRegistration::new(),
63 })),
64 }
65 }
66}
67
68impl<M, T> Mutex<M, T>
69where
70 M: RawMutex,
71 T: ?Sized,
72{
73 /// Lock the mutex.
74 ///
75 /// This will wait for the mutex to be unlocked if it's already locked.
76 pub async fn lock(&self) -> MutexGuard<'_, M, T> {
77 poll_fn(|cx| {
78 let ready = self.state.lock(|s| {
79 let mut s = s.borrow_mut();
80 if s.locked {
81 s.waker.register(cx.waker());
82 false
83 } else {
84 s.locked = true;
85 true
86 }
87 });
88
89 if ready {
90 Poll::Ready(MutexGuard { mutex: self })
91 } else {
92 Poll::Pending
93 }
94 })
95 .await
96 }
97
98 /// Attempt to immediately lock the mutex.
99 ///
100 /// If the mutex is already locked, this will return an error instead of waiting.
101 pub fn try_lock(&self) -> Result<MutexGuard<'_, M, T>, TryLockError> {
102 self.state.lock(|s| {
103 let mut s = s.borrow_mut();
104 if s.locked {
105 Err(TryLockError)
106 } else {
107 s.locked = true;
108 Ok(())
109 }
110 })?;
111
112 Ok(MutexGuard { mutex: self })
113 }
114}
115
116/// Async mutex guard.
117///
118/// Owning an instance of this type indicates having
119/// successfully locked the mutex, and grants access to the contents.
120///
121/// Dropping it unlocks the mutex.
122pub struct MutexGuard<'a, M, T>
123where
124 M: RawMutex,
125 T: ?Sized,
126{
127 mutex: &'a Mutex<M, T>,
128}
129
130impl<'a, M, T> Drop for MutexGuard<'a, M, T>
131where
132 M: RawMutex,
133 T: ?Sized,
134{
135 fn drop(&mut self) {
136 self.mutex.state.lock(|s| {
137 let mut s = s.borrow_mut();
138 s.locked = false;
139 s.waker.wake();
140 })
141 }
142}
143
144impl<'a, M, T> Deref for MutexGuard<'a, M, T>
145where
146 M: RawMutex,
147 T: ?Sized,
148{
149 type Target = T;
150 fn deref(&self) -> &Self::Target {
151 // Safety: the MutexGuard represents exclusive access to the contents
152 // of the mutex, so it's OK to get it.
153 unsafe { &*(self.mutex.inner.get() as *const T) }
154 }
155}
156
157impl<'a, M, T> DerefMut for MutexGuard<'a, M, T>
158where
159 M: RawMutex,
160 T: ?Sized,
161{
162 fn deref_mut(&mut self) -> &mut Self::Target {
163 // Safety: the MutexGuard represents exclusive access to the contents
164 // of the mutex, so it's OK to get it.
165 unsafe { &mut *(self.mutex.inner.get()) }
166 }
167}
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
new file mode 100644
index 000000000..7d64b648e
--- /dev/null
+++ b/embassy-sync/src/pipe.rs
@@ -0,0 +1,551 @@
1//! Async byte stream pipe.
2
3use core::cell::RefCell;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use crate::blocking_mutex::raw::RawMutex;
9use crate::blocking_mutex::Mutex;
10use crate::ring_buffer::RingBuffer;
11use crate::waitqueue::WakerRegistration;
12
13/// Write-only access to a [`Pipe`].
14#[derive(Copy)]
15pub struct Writer<'p, M, const N: usize>
16where
17 M: RawMutex,
18{
19 pipe: &'p Pipe<M, N>,
20}
21
22impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
23where
24 M: RawMutex,
25{
26 fn clone(&self) -> Self {
27 Writer { pipe: self.pipe }
28 }
29}
30
31impl<'p, M, const N: usize> Writer<'p, M, N>
32where
33 M: RawMutex,
34{
35 /// Writes a value.
36 ///
37 /// See [`Pipe::write()`]
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
39 self.pipe.write(buf)
40 }
41
42 /// Attempt to immediately write a message.
43 ///
44 /// See [`Pipe::write()`]
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
46 self.pipe.try_write(buf)
47 }
48}
49
50/// Future returned by [`Pipe::write`] and [`Writer::write`].
51pub struct WriteFuture<'p, M, const N: usize>
52where
53 M: RawMutex,
54{
55 pipe: &'p Pipe<M, N>,
56 buf: &'p [u8],
57}
58
59impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
60where
61 M: RawMutex,
62{
63 type Output = usize;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 match self.pipe.try_write_with_context(Some(cx), self.buf) {
67 Ok(n) => Poll::Ready(n),
68 Err(TryWriteError::Full) => Poll::Pending,
69 }
70 }
71}
72
73impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
74
75/// Read-only access to a [`Pipe`].
76#[derive(Copy)]
77pub struct Reader<'p, M, const N: usize>
78where
79 M: RawMutex,
80{
81 pipe: &'p Pipe<M, N>,
82}
83
84impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
85where
86 M: RawMutex,
87{
88 fn clone(&self) -> Self {
89 Reader { pipe: self.pipe }
90 }
91}
92
93impl<'p, M, const N: usize> Reader<'p, M, N>
94where
95 M: RawMutex,
96{
97 /// Reads a value.
98 ///
99 /// See [`Pipe::read()`]
100 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
101 self.pipe.read(buf)
102 }
103
104 /// Attempt to immediately read a message.
105 ///
106 /// See [`Pipe::read()`]
107 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
108 self.pipe.try_read(buf)
109 }
110}
111
112/// Future returned by [`Pipe::read`] and [`Reader::read`].
113pub struct ReadFuture<'p, M, const N: usize>
114where
115 M: RawMutex,
116{
117 pipe: &'p Pipe<M, N>,
118 buf: &'p mut [u8],
119}
120
121impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
122where
123 M: RawMutex,
124{
125 type Output = usize;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 match self.pipe.try_read_with_context(Some(cx), self.buf) {
129 Ok(n) => Poll::Ready(n),
130 Err(TryReadError::Empty) => Poll::Pending,
131 }
132 }
133}
134
135impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
136
137/// Error returned by [`try_read`](Pipe::try_read).
138#[derive(PartialEq, Eq, Clone, Copy, Debug)]
139#[cfg_attr(feature = "defmt", derive(defmt::Format))]
140pub enum TryReadError {
141 /// No data could be read from the pipe because it is currently
142 /// empty, and reading would require blocking.
143 Empty,
144}
145
146/// Error returned by [`try_write`](Pipe::try_write).
147#[derive(PartialEq, Eq, Clone, Copy, Debug)]
148#[cfg_attr(feature = "defmt", derive(defmt::Format))]
149pub enum TryWriteError {
150 /// No data could be written to the pipe because it is
151 /// currently full, and writing would require blocking.
152 Full,
153}
154
155struct PipeState<const N: usize> {
156 buffer: RingBuffer<N>,
157 read_waker: WakerRegistration,
158 write_waker: WakerRegistration,
159}
160
161impl<const N: usize> PipeState<N> {
162 const fn new() -> Self {
163 PipeState {
164 buffer: RingBuffer::new(),
165 read_waker: WakerRegistration::new(),
166 write_waker: WakerRegistration::new(),
167 }
168 }
169
170 fn clear(&mut self) {
171 self.buffer.clear();
172 self.write_waker.wake();
173 }
174
175 fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
176 self.try_read_with_context(None, buf)
177 }
178
179 fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
180 if self.buffer.is_full() {
181 self.write_waker.wake();
182 }
183
184 let available = self.buffer.pop_buf();
185 if available.is_empty() {
186 if let Some(cx) = cx {
187 self.read_waker.register(cx.waker());
188 }
189 return Err(TryReadError::Empty);
190 }
191
192 let n = available.len().min(buf.len());
193 buf[..n].copy_from_slice(&available[..n]);
194 self.buffer.pop(n);
195 Ok(n)
196 }
197
198 fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
199 self.try_write_with_context(None, buf)
200 }
201
202 fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
203 if self.buffer.is_empty() {
204 self.read_waker.wake();
205 }
206
207 let available = self.buffer.push_buf();
208 if available.is_empty() {
209 if let Some(cx) = cx {
210 self.write_waker.register(cx.waker());
211 }
212 return Err(TryWriteError::Full);
213 }
214
215 let n = available.len().min(buf.len());
216 available[..n].copy_from_slice(&buf[..n]);
217 self.buffer.push(n);
218 Ok(n)
219 }
220}
221
222/// A bounded pipe for communicating between asynchronous tasks
223/// with backpressure.
224///
225/// The pipe will buffer up to the provided number of messages. Once the
226/// buffer is full, attempts to `write` new messages will wait until a message is
227/// read from the pipe.
228///
229/// All data written will become available in the same order as it was written.
230pub struct Pipe<M, const N: usize>
231where
232 M: RawMutex,
233{
234 inner: Mutex<M, RefCell<PipeState<N>>>,
235}
236
237impl<M, const N: usize> Pipe<M, N>
238where
239 M: RawMutex,
240{
241 /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
242 ///
243 /// ```
244 /// use embassy_sync::pipe::Pipe;
245 /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
246 ///
247 /// // Declare a bounded pipe, with a buffer of 256 bytes.
248 /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
249 /// ```
250 pub const fn new() -> Self {
251 Self {
252 inner: Mutex::new(RefCell::new(PipeState::new())),
253 }
254 }
255
256 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
257 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
258 }
259
260 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
261 self.lock(|c| c.try_read_with_context(cx, buf))
262 }
263
264 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
265 self.lock(|c| c.try_write_with_context(cx, buf))
266 }
267
268 /// Get a writer for this pipe.
269 pub fn writer(&self) -> Writer<'_, M, N> {
270 Writer { pipe: self }
271 }
272
273 /// Get a reader for this pipe.
274 pub fn reader(&self) -> Reader<'_, M, N> {
275 Reader { pipe: self }
276 }
277
278 /// Write a value, waiting until there is capacity.
279 ///
280 /// Writeing completes when the value has been pushed to the pipe's queue.
281 /// This doesn't mean the value has been read yet.
282 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
283 WriteFuture { pipe: self, buf }
284 }
285
286 /// Attempt to immediately write a message.
287 ///
288 /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's
289 /// buffer is full, instead of waiting.
290 ///
291 /// # Errors
292 ///
293 /// If the pipe capacity has been reached, i.e., the pipe has `n`
294 /// buffered values where `n` is the argument passed to [`Pipe`], then an
295 /// error is returned.
296 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
297 self.lock(|c| c.try_write(buf))
298 }
299
300 /// Receive the next value.
301 ///
302 /// If there are no messages in the pipe's buffer, this method will
303 /// wait until a message is written.
304 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
305 ReadFuture { pipe: self, buf }
306 }
307
308 /// Attempt to immediately read a message.
309 ///
310 /// This method will either read a message from the pipe immediately or return an error
311 /// if the pipe is empty.
312 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
313 self.lock(|c| c.try_read(buf))
314 }
315
316 /// Clear the data in the pipe's buffer.
317 pub fn clear(&self) {
318 self.lock(|c| c.clear())
319 }
320
321 /// Return whether the pipe is full (no free space in the buffer)
322 pub fn is_full(&self) -> bool {
323 self.len() == N
324 }
325
326 /// Return whether the pipe is empty (no data buffered)
327 pub fn is_empty(&self) -> bool {
328 self.len() == 0
329 }
330
331 /// Total byte capacity.
332 ///
333 /// This is the same as the `N` generic param.
334 pub fn capacity(&self) -> usize {
335 N
336 }
337
338 /// Used byte capacity.
339 pub fn len(&self) -> usize {
340 self.lock(|c| c.buffer.len())
341 }
342
343 /// Free byte capacity.
344 ///
345 /// This is equivalent to `capacity() - len()`
346 pub fn free_capacity(&self) -> usize {
347 N - self.len()
348 }
349}
350
351#[cfg(feature = "nightly")]
352mod io_impls {
353 use core::convert::Infallible;
354
355 use futures_util::FutureExt;
356
357 use super::*;
358
359 impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> {
360 type Error = Infallible;
361 }
362
363 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> {
364 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
365 where
366 Self: 'a;
367
368 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
369 Pipe::read(self, buf).map(Ok)
370 }
371 }
372
373 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> {
374 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
375 where
376 Self: 'a;
377
378 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
379 Pipe::write(self, buf).map(Ok)
380 }
381
382 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
383 where
384 Self: 'a;
385
386 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
387 futures_util::future::ready(Ok(()))
388 }
389 }
390
391 impl<M: RawMutex, const N: usize> embedded_io::Io for &Pipe<M, N> {
392 type Error = Infallible;
393 }
394
395 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> {
396 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
397 where
398 Self: 'a;
399
400 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
401 Pipe::read(self, buf).map(Ok)
402 }
403 }
404
405 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> {
406 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
407 where
408 Self: 'a;
409
410 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
411 Pipe::write(self, buf).map(Ok)
412 }
413
414 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
415 where
416 Self: 'a;
417
418 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
419 futures_util::future::ready(Ok(()))
420 }
421 }
422
423 impl<M: RawMutex, const N: usize> embedded_io::Io for Reader<'_, M, N> {
424 type Error = Infallible;
425 }
426
427 impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> {
428 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
429 where
430 Self: 'a;
431
432 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
433 Reader::read(self, buf).map(Ok)
434 }
435 }
436
437 impl<M: RawMutex, const N: usize> embedded_io::Io for Writer<'_, M, N> {
438 type Error = Infallible;
439 }
440
441 impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> {
442 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
443 where
444 Self: 'a;
445
446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
447 Writer::write(self, buf).map(Ok)
448 }
449
450 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
451 where
452 Self: 'a;
453
454 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
455 futures_util::future::ready(Ok(()))
456 }
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use futures_executor::ThreadPool;
463 use futures_util::task::SpawnExt;
464 use static_cell::StaticCell;
465
466 use super::*;
467 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
468
469 fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
470 N - c.buffer.len()
471 }
472
473 #[test]
474 fn writing_once() {
475 let mut c = PipeState::<3>::new();
476 assert!(c.try_write(&[1]).is_ok());
477 assert_eq!(capacity(&c), 2);
478 }
479
480 #[test]
481 fn writing_when_full() {
482 let mut c = PipeState::<3>::new();
483 assert_eq!(c.try_write(&[42]), Ok(1));
484 assert_eq!(c.try_write(&[43]), Ok(1));
485 assert_eq!(c.try_write(&[44]), Ok(1));
486 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
487 assert_eq!(capacity(&c), 0);
488 }
489
490 #[test]
491 fn receiving_once_with_one_send() {
492 let mut c = PipeState::<3>::new();
493 assert!(c.try_write(&[42]).is_ok());
494 let mut buf = [0; 16];
495 assert_eq!(c.try_read(&mut buf), Ok(1));
496 assert_eq!(buf[0], 42);
497 assert_eq!(capacity(&c), 3);
498 }
499
500 #[test]
501 fn receiving_when_empty() {
502 let mut c = PipeState::<3>::new();
503 let mut buf = [0; 16];
504 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
505 assert_eq!(capacity(&c), 3);
506 }
507
508 #[test]
509 fn simple_send_and_receive() {
510 let c = Pipe::<NoopRawMutex, 3>::new();
511 assert!(c.try_write(&[42]).is_ok());
512 let mut buf = [0; 16];
513 assert_eq!(c.try_read(&mut buf), Ok(1));
514 assert_eq!(buf[0], 42);
515 }
516
517 #[test]
518 fn cloning() {
519 let c = Pipe::<NoopRawMutex, 3>::new();
520 let r1 = c.reader();
521 let w1 = c.writer();
522
523 let _ = r1.clone();
524 let _ = w1.clone();
525 }
526
527 #[futures_test::test]
528 async fn receiver_receives_given_try_write_async() {
529 let executor = ThreadPool::new().unwrap();
530
531 static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
532 let c = &*CHANNEL.init(Pipe::new());
533 let c2 = c;
534 let f = async move {
535 assert_eq!(c2.try_write(&[42]), Ok(1));
536 };
537 executor.spawn(f).unwrap();
538 let mut buf = [0; 16];
539 assert_eq!(c.read(&mut buf).await, 1);
540 assert_eq!(buf[0], 42);
541 }
542
543 #[futures_test::test]
544 async fn sender_send_completes_if_capacity() {
545 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
546 c.write(&[42]).await;
547 let mut buf = [0; 16];
548 assert_eq!(c.read(&mut buf).await, 1);
549 assert_eq!(buf[0], 42);
550 }
551}
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
new file mode 100644
index 000000000..62a9e4763
--- /dev/null
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -0,0 +1,542 @@
1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_sync::pubsub::WaitResult;
38/// # use embassy_sync::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 /// Create a new channel
82 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 ///
90 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 /// Create a new subscriber. It will only receive messages that are published after its creation.
105 ///
106 /// If there are no subscriber slots left, an error will be returned.
107 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 /// Create a new publisher
121 ///
122 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 /// Create a new publisher
137 ///
138 /// If there are no publisher slots left, an error will be returned.
139 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163}
164
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
169 self.inner.lock(|s| {
170 let mut s = s.borrow_mut();
171
172 // Check if we can read a message
173 match s.get_message(*next_message_id) {
174 // Yes, so we are done polling
175 Some(WaitResult::Message(message)) => {
176 *next_message_id += 1;
177 Poll::Ready(WaitResult::Message(message))
178 }
179 // No, so we need to reregister our waker and sleep again
180 None => {
181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker());
183 }
184 Poll::Pending
185 }
186 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
187 Some(WaitResult::Lagged(amount)) => {
188 *next_message_id += amount;
189 Poll::Ready(WaitResult::Lagged(amount))
190 }
191 }
192 })
193 }
194
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| {
197 let mut s = s.borrow_mut();
198 // Try to publish the message
199 match s.try_publish(message) {
200 // We did it, we are ready
201 Ok(()) => Ok(()),
202 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => {
204 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker());
206 }
207 Err(message)
208 }
209 }
210 })
211 }
212
213 fn publish_immediate(&self, message: T) {
214 self.inner.lock(|s| {
215 let mut s = s.borrow_mut();
216 s.publish_immediate(message)
217 })
218 }
219
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| {
222 let mut s = s.borrow_mut();
223 s.unregister_subscriber(subscriber_next_message_id)
224 })
225 }
226
227 fn unregister_publisher(&self) {
228 self.inner.lock(|s| {
229 let mut s = s.borrow_mut();
230 s.unregister_publisher()
231 })
232 }
233}
234
235/// Internal state for the PubSub channel
236struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
237 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
238 queue: Deque<(T, usize), CAP>,
239 /// Every message has an id.
240 /// Don't worry, we won't run out.
241 /// If a million messages were published every second, then the ID's would run out in about 584942 years.
242 next_message_id: u64,
243 /// Collection of wakers for Subscribers that are waiting.
244 subscriber_wakers: MultiWakerRegistration<SUBS>,
245 /// Collection of wakers for Publishers that are waiting.
246 publisher_wakers: MultiWakerRegistration<PUBS>,
247 /// The amount of subscribers that are active
248 subscriber_count: usize,
249 /// The amount of publishers that are active
250 publisher_count: usize,
251}
252
253impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
254 /// Create a new internal channel state
255 const fn new() -> Self {
256 Self {
257 queue: Deque::new(),
258 next_message_id: 0,
259 subscriber_wakers: MultiWakerRegistration::new(),
260 publisher_wakers: MultiWakerRegistration::new(),
261 subscriber_count: 0,
262 publisher_count: 0,
263 }
264 }
265
266 fn try_publish(&mut self, message: T) -> Result<(), T> {
267 if self.subscriber_count == 0 {
268 // We don't need to publish anything because there is no one to receive it
269 return Ok(());
270 }
271
272 if self.queue.is_full() {
273 return Err(message);
274 }
275 // We just did a check for this
276 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
277
278 self.next_message_id += 1;
279
280 // Wake all of the subscribers
281 self.subscriber_wakers.wake();
282
283 Ok(())
284 }
285
286 fn publish_immediate(&mut self, message: T) {
287 // Make space in the queue if required
288 if self.queue.is_full() {
289 self.queue.pop_front();
290 }
291
292 // This will succeed because we made sure there is space
293 self.try_publish(message).ok().unwrap();
294 }
295
296 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
297 let start_id = self.next_message_id - self.queue.len() as u64;
298
299 if message_id < start_id {
300 return Some(WaitResult::Lagged(start_id - message_id));
301 }
302
303 let current_message_index = (message_id - start_id) as usize;
304
305 if current_message_index >= self.queue.len() {
306 return None;
307 }
308
309 // We've checked that the index is valid
310 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
311
312 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315
316 if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front();
318 self.publisher_wakers.wake();
319 }
320
321 Some(WaitResult::Message(message))
322 }
323
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1;
354
355 // All messages that haven't been read yet by this subscriber must have their counter decremented
356 let start_id = self.next_message_id - self.queue.len() as u64;
357 if subscriber_next_message_id >= start_id {
358 let current_message_index = (subscriber_next_message_id - start_id) as usize;
359 self.queue
360 .iter_mut()
361 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1);
363 }
364 }
365
366 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1;
368 }
369}
370
371/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Eq, Clone)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))]
374pub enum Error {
375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
376 /// the capacity of the channels must be increased.
377 MaximumSubscribersReached,
378 /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
379 /// the capacity of the channels must be increased.
380 MaximumPublishersReached,
381}
382
383/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id.
387 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390
391 /// Try to publish a message to the queue.
392 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
394 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
395
396 /// Publish a message immediately
397 fn publish_immediate(&self, message: T);
398
399 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401
402 /// Let the channel know that a publisher has dropped
403 fn unregister_publisher(&self);
404}
405
406/// The result of the subscriber wait procedure
407#[derive(Debug, Clone, PartialEq, Eq)]
408#[cfg_attr(feature = "defmt", derive(defmt::Format))]
409pub enum WaitResult<T> {
410 /// The subscriber did not receive all messages and lagged by the given amount of messages.
411 /// (This is the amount of messages that were missed)
412 Lagged(u64),
413 /// A message was received
414 Message(T),
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use crate::blocking_mutex::raw::NoopRawMutex;
421
422 #[futures_test::test]
423 async fn dyn_pub_sub_works() {
424 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
425
426 let mut sub0 = channel.dyn_subscriber().unwrap();
427 let mut sub1 = channel.dyn_subscriber().unwrap();
428 let pub0 = channel.dyn_publisher().unwrap();
429
430 pub0.publish(42).await;
431
432 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
433 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
434
435 assert_eq!(sub0.try_next_message(), None);
436 assert_eq!(sub1.try_next_message(), None);
437 }
438
439 #[futures_test::test]
440 async fn all_subscribers_receive() {
441 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
442
443 let mut sub0 = channel.subscriber().unwrap();
444 let mut sub1 = channel.subscriber().unwrap();
445 let pub0 = channel.publisher().unwrap();
446
447 pub0.publish(42).await;
448
449 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
450 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
451
452 assert_eq!(sub0.try_next_message(), None);
453 assert_eq!(sub1.try_next_message(), None);
454 }
455
456 #[futures_test::test]
457 async fn lag_when_queue_full_on_immediate_publish() {
458 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
459
460 let mut sub0 = channel.subscriber().unwrap();
461 let pub0 = channel.publisher().unwrap();
462
463 pub0.publish_immediate(42);
464 pub0.publish_immediate(43);
465 pub0.publish_immediate(44);
466 pub0.publish_immediate(45);
467 pub0.publish_immediate(46);
468 pub0.publish_immediate(47);
469
470 assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
471 assert_eq!(sub0.next_message().await, WaitResult::Message(44));
472 assert_eq!(sub0.next_message().await, WaitResult::Message(45));
473 assert_eq!(sub0.next_message().await, WaitResult::Message(46));
474 assert_eq!(sub0.next_message().await, WaitResult::Message(47));
475 assert_eq!(sub0.try_next_message(), None);
476 }
477
478 #[test]
479 fn limited_subs_and_pubs() {
480 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
481
482 let sub0 = channel.subscriber();
483 let sub1 = channel.subscriber();
484 let sub2 = channel.subscriber();
485 let sub3 = channel.subscriber();
486 let sub4 = channel.subscriber();
487
488 assert!(sub0.is_ok());
489 assert!(sub1.is_ok());
490 assert!(sub2.is_ok());
491 assert!(sub3.is_ok());
492 assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
493
494 drop(sub0);
495
496 let sub5 = channel.subscriber();
497 assert!(sub5.is_ok());
498
499 // publishers
500
501 let pub0 = channel.publisher();
502 let pub1 = channel.publisher();
503 let pub2 = channel.publisher();
504 let pub3 = channel.publisher();
505 let pub4 = channel.publisher();
506
507 assert!(pub0.is_ok());
508 assert!(pub1.is_ok());
509 assert!(pub2.is_ok());
510 assert!(pub3.is_ok());
511 assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
512
513 drop(pub0);
514
515 let pub5 = channel.publisher();
516 assert!(pub5.is_ok());
517 }
518
519 #[test]
520 fn publisher_wait_on_full_queue() {
521 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
522
523 let pub0 = channel.publisher().unwrap();
524
525 // There are no subscribers, so the queue will never be full
526 assert_eq!(pub0.try_publish(0), Ok(()));
527 assert_eq!(pub0.try_publish(0), Ok(()));
528 assert_eq!(pub0.try_publish(0), Ok(()));
529 assert_eq!(pub0.try_publish(0), Ok(()));
530 assert_eq!(pub0.try_publish(0), Ok(()));
531
532 let sub0 = channel.subscriber().unwrap();
533
534 assert_eq!(pub0.try_publish(0), Ok(()));
535 assert_eq!(pub0.try_publish(0), Ok(()));
536 assert_eq!(pub0.try_publish(0), Ok(()));
537 assert_eq!(pub0.try_publish(0), Ok(()));
538 assert_eq!(pub0.try_publish(0), Err(0));
539
540 drop(sub0);
541 }
542}
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
new file mode 100644
index 000000000..705797f60
--- /dev/null
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -0,0 +1,182 @@
1//! Implementation of anything directly publisher related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A publisher to a channel
13pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The channel we are a publisher for
15 channel: &'a PSB,
16 _phantom: PhantomData<T>,
17}
18
19impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
20 pub(super) fn new(channel: &'a PSB) -> Self {
21 Self {
22 channel,
23 _phantom: Default::default(),
24 }
25 }
26
27 /// Publish a message right now even when the queue is full.
28 /// This may cause a subscriber to miss an older message.
29 pub fn publish_immediate(&self, message: T) {
30 self.channel.publish_immediate(message)
31 }
32
33 /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
34 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
35 PublisherWaitFuture {
36 message: Some(message),
37 publisher: self,
38 }
39 }
40
41 /// Publish a message if there is space in the message queue
42 pub fn try_publish(&self, message: T) -> Result<(), T> {
43 self.channel.publish_with_context(message, None)
44 }
45}
46
47impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
48 fn drop(&mut self) {
49 self.channel.unregister_publisher()
50 }
51}
52
53/// A publisher that holds a dynamic reference to the channel
54pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
55
56impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
57 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
58
59 fn deref(&self) -> &Self::Target {
60 &self.0
61 }
62}
63
64impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
65 fn deref_mut(&mut self) -> &mut Self::Target {
66 &mut self.0
67 }
68}
69
70/// A publisher that holds a generic reference to the channel
71pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
72 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
73);
74
75impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
76 for Publisher<'a, M, T, CAP, SUBS, PUBS>
77{
78 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
79
80 fn deref(&self) -> &Self::Target {
81 &self.0
82 }
83}
84
85impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
86 for Publisher<'a, M, T, CAP, SUBS, PUBS>
87{
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.0
90 }
91}
92
93/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
94/// (So an infinite amount is possible)
95pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
96 /// The channel we are a publisher for
97 channel: &'a PSB,
98 _phantom: PhantomData<T>,
99}
100
101impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
102 pub(super) fn new(channel: &'a PSB) -> Self {
103 Self {
104 channel,
105 _phantom: Default::default(),
106 }
107 }
108 /// Publish the message right now even when the queue is full.
109 /// This may cause a subscriber to miss an older message.
110 pub fn publish_immediate(&self, message: T) {
111 self.channel.publish_immediate(message)
112 }
113
114 /// Publish a message if there is space in the message queue
115 pub fn try_publish(&self, message: T) -> Result<(), T> {
116 self.channel.publish_with_context(message, None)
117 }
118}
119
120/// An immediate publisher that holds a dynamic reference to the channel
121pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
122
123impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
124 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
125
126 fn deref(&self) -> &Self::Target {
127 &self.0
128 }
129}
130
131impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// An immediate publisher that holds a generic reference to the channel
138pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
139 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
140);
141
142impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
143 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
144{
145 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
146
147 fn deref(&self) -> &Self::Target {
148 &self.0
149 }
150}
151
152impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
153 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
154{
155 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.0
157 }
158}
159
160/// Future for the publisher wait action
161pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
162 /// The message we need to publish
163 message: Option<T>,
164 publisher: &'s Pub<'a, PSB, T>,
165}
166
167impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
168 type Output = ();
169
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let message = self.message.take().unwrap();
172 match self.publisher.channel.publish_with_context(message, Some(cx)) {
173 Ok(()) => Poll::Ready(()),
174 Err(message) => {
175 self.message = Some(message);
176 Poll::Pending
177 }
178 }
179 }
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
new file mode 100644
index 000000000..b9a2cbe18
--- /dev/null
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -0,0 +1,152 @@
1//! Implementation of anything directly subscriber related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 /// The message id of the next message we are yet to receive
15 next_message_id: u64,
16 /// The channel we are a subscriber to
17 channel: &'a PSB,
18 _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23 Self {
24 next_message_id,
25 channel,
26 _phantom: Default::default(),
27 }
28 }
29
30 /// Wait for a published message
31 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32 SubscriberWaitFuture { subscriber: self }
33 }
34
35 /// Wait for a published message (ignoring lag results)
36 pub async fn next_message_pure(&mut self) -> T {
37 loop {
38 match self.next_message().await {
39 WaitResult::Lagged(_) => continue,
40 WaitResult::Message(message) => break message,
41 }
42 }
43 }
44
45 /// Try to see if there's a published message we haven't received yet.
46 ///
47 /// This function does not peek. The message is received if there is one.
48 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50 Poll::Ready(result) => Some(result),
51 Poll::Pending => None,
52 }
53 }
54
55 /// Try to see if there's a published message we haven't received yet (ignoring lag results).
56 ///
57 /// This function does not peek. The message is received if there is one.
58 pub fn try_next_message_pure(&mut self) -> Option<T> {
59 loop {
60 match self.try_next_message() {
61 Some(WaitResult::Lagged(_)) => continue,
62 Some(WaitResult::Message(message)) => break Some(message),
63 None => break None,
64 }
65 }
66 }
67}
68
69impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
70 fn drop(&mut self) {
71 self.channel.unregister_subscriber(self.next_message_id)
72 }
73}
74
75impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
76
77/// Warning: The stream implementation ignores lag results and returns all messages.
78/// This might miss some messages without you knowing it.
79impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
80 type Item = T;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 match self
84 .channel
85 .get_message_with_context(&mut self.next_message_id, Some(cx))
86 {
87 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
88 Poll::Ready(WaitResult::Lagged(_)) => {
89 cx.waker().wake_by_ref();
90 Poll::Pending
91 }
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
97/// A subscriber that holds a dynamic reference to the channel
98pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
99
100impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
101 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
109 fn deref_mut(&mut self) -> &mut Self::Target {
110 &mut self.0
111 }
112}
113
114/// A subscriber that holds a generic reference to the channel
115pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
116 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
117);
118
119impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
120 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
121{
122 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
123
124 fn deref(&self) -> &Self::Target {
125 &self.0
126 }
127}
128
129impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
130 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
131{
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.0
134 }
135}
136
137/// Future for the subscriber wait action
138pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
139 subscriber: &'s mut Sub<'a, PSB, T>,
140}
141
142impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
143 type Output = WaitResult<T>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 self.subscriber
147 .channel
148 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
149 }
150}
151
152impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs
new file mode 100644
index 000000000..521084024
--- /dev/null
+++ b/embassy-sync/src/ring_buffer.rs
@@ -0,0 +1,146 @@
1pub struct RingBuffer<const N: usize> {
2 buf: [u8; N],
3 start: usize,
4 end: usize,
5 empty: bool,
6}
7
8impl<const N: usize> RingBuffer<N> {
9 pub const fn new() -> Self {
10 Self {
11 buf: [0; N],
12 start: 0,
13 end: 0,
14 empty: true,
15 }
16 }
17
18 pub fn push_buf(&mut self) -> &mut [u8] {
19 if self.start == self.end && !self.empty {
20 trace!(" ringbuf: push_buf empty");
21 return &mut self.buf[..0];
22 }
23
24 let n = if self.start <= self.end {
25 self.buf.len() - self.end
26 } else {
27 self.start - self.end
28 };
29
30 trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
31 &mut self.buf[self.end..self.end + n]
32 }
33
34 pub fn push(&mut self, n: usize) {
35 trace!(" ringbuf: push {:?}", n);
36 if n == 0 {
37 return;
38 }
39
40 self.end = self.wrap(self.end + n);
41 self.empty = false;
42 }
43
44 pub fn pop_buf(&mut self) -> &mut [u8] {
45 if self.empty {
46 trace!(" ringbuf: pop_buf empty");
47 return &mut self.buf[..0];
48 }
49
50 let n = if self.end <= self.start {
51 self.buf.len() - self.start
52 } else {
53 self.end - self.start
54 };
55
56 trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
57 &mut self.buf[self.start..self.start + n]
58 }
59
60 pub fn pop(&mut self, n: usize) {
61 trace!(" ringbuf: pop {:?}", n);
62 if n == 0 {
63 return;
64 }
65
66 self.start = self.wrap(self.start + n);
67 self.empty = self.start == self.end;
68 }
69
70 pub fn is_full(&self) -> bool {
71 self.start == self.end && !self.empty
72 }
73
74 pub fn is_empty(&self) -> bool {
75 self.empty
76 }
77
78 #[allow(unused)]
79 pub fn len(&self) -> usize {
80 if self.empty {
81 0
82 } else if self.start < self.end {
83 self.end - self.start
84 } else {
85 N + self.end - self.start
86 }
87 }
88
89 pub fn clear(&mut self) {
90 self.start = 0;
91 self.end = 0;
92 self.empty = true;
93 }
94
95 fn wrap(&self, n: usize) -> usize {
96 assert!(n <= self.buf.len());
97 if n == self.buf.len() {
98 0
99 } else {
100 n
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[test]
110 fn push_pop() {
111 let mut rb: RingBuffer<4> = RingBuffer::new();
112 let buf = rb.push_buf();
113 assert_eq!(4, buf.len());
114 buf[0] = 1;
115 buf[1] = 2;
116 buf[2] = 3;
117 buf[3] = 4;
118 rb.push(4);
119
120 let buf = rb.pop_buf();
121 assert_eq!(4, buf.len());
122 assert_eq!(1, buf[0]);
123 rb.pop(1);
124
125 let buf = rb.pop_buf();
126 assert_eq!(3, buf.len());
127 assert_eq!(2, buf[0]);
128 rb.pop(1);
129
130 let buf = rb.pop_buf();
131 assert_eq!(2, buf.len());
132 assert_eq!(3, buf[0]);
133 rb.pop(1);
134
135 let buf = rb.pop_buf();
136 assert_eq!(1, buf.len());
137 assert_eq!(4, buf[0]);
138 rb.pop(1);
139
140 let buf = rb.pop_buf();
141 assert_eq!(0, buf.len());
142
143 let buf = rb.push_buf();
144 assert_eq!(4, buf.len());
145 }
146}
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
new file mode 100644
index 000000000..3f665e388
--- /dev/null
+++ b/embassy-sync/src/signal.rs
@@ -0,0 +1,100 @@
1//! A synchronization primitive for passing the latest value to a task.
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::mem;
5use core::task::{Context, Poll, Waker};
6
7/// Single-slot signaling primitive.
8///
9/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except
10/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead
11/// of waiting for the receiver to pop the previous value.
12///
13/// It is useful for sending data between tasks when the receiver only cares about
14/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state"
15/// updates.
16///
17/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead.
18///
19/// Signals are generally declared as `static`s and then borrowed as required.
20///
21/// ```
22/// use embassy_sync::signal::Signal;
23///
24/// enum SomeCommand {
25/// On,
26/// Off,
27/// }
28///
29/// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new();
30/// ```
31pub struct Signal<T> {
32 state: UnsafeCell<State<T>>,
33}
34
35enum State<T> {
36 None,
37 Waiting(Waker),
38 Signaled(T),
39}
40
41unsafe impl<T: Send> Send for Signal<T> {}
42unsafe impl<T: Send> Sync for Signal<T> {}
43
44impl<T> Signal<T> {
45 /// Create a new `Signal`.
46 pub const fn new() -> Self {
47 Self {
48 state: UnsafeCell::new(State::None),
49 }
50 }
51}
52
53impl<T: Send> Signal<T> {
54 /// Mark this Signal as signaled.
55 pub fn signal(&self, val: T) {
56 critical_section::with(|_| unsafe {
57 let state = &mut *self.state.get();
58 if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) {
59 waker.wake();
60 }
61 })
62 }
63
64 /// Remove the queued value in this `Signal`, if any.
65 pub fn reset(&self) {
66 critical_section::with(|_| unsafe {
67 let state = &mut *self.state.get();
68 *state = State::None
69 })
70 }
71
72 /// Manually poll the Signal future.
73 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> {
74 critical_section::with(|_| unsafe {
75 let state = &mut *self.state.get();
76 match state {
77 State::None => {
78 *state = State::Waiting(cx.waker().clone());
79 Poll::Pending
80 }
81 State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending,
82 State::Waiting(_) => panic!("waker overflow"),
83 State::Signaled(_) => match mem::replace(state, State::None) {
84 State::Signaled(res) => Poll::Ready(res),
85 _ => unreachable!(),
86 },
87 }
88 })
89 }
90
91 /// Future that completes when this Signal has been signaled.
92 pub fn wait(&self) -> impl Future<Output = T> + '_ {
93 futures_util::future::poll_fn(move |cx| self.poll_wait(cx))
94 }
95
96 /// non-blocking method to check whether this signal has been signaled.
97 pub fn signaled(&self) -> bool {
98 critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_)))
99 }
100}
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs
new file mode 100644
index 000000000..6661a6b61
--- /dev/null
+++ b/embassy-sync/src/waitqueue/mod.rs
@@ -0,0 +1,7 @@
1//! Async low-level wait queues
2
3mod waker;
4pub use waker::*;
5
6mod multi_waker;
7pub use multi_waker::*;
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
new file mode 100644
index 000000000..325d2cb3a
--- /dev/null
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -0,0 +1,33 @@
1use core::task::Waker;
2
3use super::WakerRegistration;
4
5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: [WakerRegistration; N],
8}
9
10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance
12 pub const fn new() -> Self {
13 const WAKER: WakerRegistration = WakerRegistration::new();
14 Self { wakers: [WAKER; N] }
15 }
16
17 /// Register a waker. If the buffer is full the function returns it in the error
18 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> {
19 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) {
20 waker_slot.register(w);
21 Ok(())
22 } else {
23 Err(w)
24 }
25 }
26
27 /// Wake all registered wakers. This clears the buffer
28 pub fn wake(&mut self) {
29 for waker_slot in self.wakers.iter_mut() {
30 waker_slot.wake()
31 }
32 }
33}
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker.rs
new file mode 100644
index 000000000..64e300eb8
--- /dev/null
+++ b/embassy-sync/src/waitqueue/waker.rs
@@ -0,0 +1,92 @@
1use core::cell::Cell;
2use core::mem;
3use core::task::Waker;
4
5use crate::blocking_mutex::raw::CriticalSectionRawMutex;
6use crate::blocking_mutex::Mutex;
7
8/// Utility struct to register and wake a waker.
9#[derive(Debug)]
10pub struct WakerRegistration {
11 waker: Option<Waker>,
12}
13
14impl WakerRegistration {
15 /// Create a new `WakerRegistration`.
16 pub const fn new() -> Self {
17 Self { waker: None }
18 }
19
20 /// Register a waker. Overwrites the previous waker, if any.
21 pub fn register(&mut self, w: &Waker) {
22 match self.waker {
23 // Optimization: If both the old and new Wakers wake the same task, we can simply
24 // keep the old waker, skipping the clone. (In most executor implementations,
25 // cloning a waker is somewhat expensive, comparable to cloning an Arc).
26 Some(ref w2) if (w2.will_wake(w)) => {}
27 _ => {
28 // clone the new waker and store it
29 if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) {
30 // We had a waker registered for another task. Wake it, so the other task can
31 // reregister itself if it's still interested.
32 //
33 // If two tasks are waiting on the same thing concurrently, this will cause them
34 // to wake each other in a loop fighting over this WakerRegistration. This wastes
35 // CPU but things will still work.
36 //
37 // If the user wants to have two tasks waiting on the same thing they should use
38 // a more appropriate primitive that can store multiple wakers.
39 old_waker.wake()
40 }
41 }
42 }
43 }
44
45 /// Wake the registered waker, if any.
46 pub fn wake(&mut self) {
47 if let Some(w) = self.waker.take() {
48 w.wake()
49 }
50 }
51
52 /// Returns true if a waker is currently registered
53 pub fn occupied(&self) -> bool {
54 self.waker.is_some()
55 }
56}
57
58/// Utility struct to register and wake a waker.
59pub struct AtomicWaker {
60 waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>,
61}
62
63impl AtomicWaker {
64 /// Create a new `AtomicWaker`.
65 pub const fn new() -> Self {
66 Self {
67 waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
68 }
69 }
70
71 /// Register a waker. Overwrites the previous waker, if any.
72 pub fn register(&self, w: &Waker) {
73 critical_section::with(|cs| {
74 let cell = self.waker.borrow(cs);
75 cell.set(match cell.replace(None) {
76 Some(w2) if (w2.will_wake(w)) => Some(w2),
77 _ => Some(w.clone()),
78 })
79 })
80 }
81
82 /// Wake the registered waker, if any.
83 pub fn wake(&self) {
84 critical_section::with(|cs| {
85 let cell = self.waker.borrow(cs);
86 if let Some(w) = cell.replace(None) {
87 w.wake_by_ref();
88 cell.set(Some(w));
89 }
90 })
91 }
92}