aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src
diff options
context:
space:
mode:
Diffstat (limited to 'embassy-sync/src')
-rw-r--r--embassy-sync/src/channel.rs37
-rw-r--r--embassy-sync/src/fmt.rs3
-rw-r--r--embassy-sync/src/lib.rs3
-rw-r--r--embassy-sync/src/once_lock.rs236
-rw-r--r--embassy-sync/src/priority_channel.rs2
-rw-r--r--embassy-sync/src/ring_buffer.rs22
-rw-r--r--embassy-sync/src/signal.rs16
-rw-r--r--embassy-sync/src/zerocopy_channel.rs7
8 files changed, 303 insertions, 23 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index ff7129303..48f4dafd6 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -263,6 +263,12 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
263 } 263 }
264} 264}
265 265
266impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
267 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
268 Self { channel: value.channel }
269 }
270}
271
266/// Future returned by [`Channel::send`] and [`Sender::send`]. 272/// Future returned by [`Channel::send`] and [`Sender::send`].
267#[must_use = "futures do nothing unless you `.await` or poll them"] 273#[must_use = "futures do nothing unless you `.await` or poll them"]
268pub struct SendFuture<'ch, M, T, const N: usize> 274pub struct SendFuture<'ch, M, T, const N: usize>
@@ -321,6 +327,15 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
321 327
322impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} 328impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
323 329
330impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
331 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
332 Self {
333 channel: value.channel,
334 message: value.message,
335 }
336 }
337}
338
324pub(crate) trait DynamicChannel<T> { 339pub(crate) trait DynamicChannel<T> {
325 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; 340 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
326 341
@@ -507,6 +522,16 @@ where
507 Receiver { channel: self } 522 Receiver { channel: self }
508 } 523 }
509 524
525 /// Get a sender for this channel using dynamic dispatch.
526 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
527 DynamicSender { channel: self }
528 }
529
530 /// Get a receiver for this channel using dynamic dispatch.
531 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
532 DynamicReceiver { channel: self }
533 }
534
510 /// Send a value, waiting until there is capacity. 535 /// Send a value, waiting until there is capacity.
511 /// 536 ///
512 /// Sending completes when the value has been pushed to the channel's queue. 537 /// Sending completes when the value has been pushed to the channel's queue.
@@ -648,7 +673,7 @@ mod tests {
648 } 673 }
649 674
650 #[test] 675 #[test]
651 fn dynamic_dispatch() { 676 fn dynamic_dispatch_into() {
652 let c = Channel::<NoopRawMutex, u32, 3>::new(); 677 let c = Channel::<NoopRawMutex, u32, 3>::new();
653 let s: DynamicSender<'_, u32> = c.sender().into(); 678 let s: DynamicSender<'_, u32> = c.sender().into();
654 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 679 let r: DynamicReceiver<'_, u32> = c.receiver().into();
@@ -657,6 +682,16 @@ mod tests {
657 assert_eq!(r.try_receive().unwrap(), 1); 682 assert_eq!(r.try_receive().unwrap(), 1);
658 } 683 }
659 684
685 #[test]
686 fn dynamic_dispatch_constructor() {
687 let c = Channel::<NoopRawMutex, u32, 3>::new();
688 let s = c.dyn_sender();
689 let r = c.dyn_receiver();
690
691 assert!(s.try_send(1).is_ok());
692 assert_eq!(r.try_receive().unwrap(), 1);
693 }
694
660 #[futures_test::test] 695 #[futures_test::test]
661 async fn receiver_receives_given_try_send_async() { 696 async fn receiver_receives_given_try_send_async() {
662 let executor = ThreadPool::new().unwrap(); 697 let executor = ThreadPool::new().unwrap();
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index 78e583c1c..2ac42c557 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -1,5 +1,5 @@
1#![macro_use] 1#![macro_use]
2#![allow(unused_macros)] 2#![allow(unused)]
3 3
4use core::fmt::{Debug, Display, LowerHex}; 4use core::fmt::{Debug, Display, LowerHex};
5 5
@@ -229,7 +229,6 @@ impl<T, E> Try for Result<T, E> {
229 } 229 }
230} 230}
231 231
232#[allow(unused)]
233pub(crate) struct Bytes<'a>(pub &'a [u8]); 232pub(crate) struct Bytes<'a>(pub &'a [u8]);
234 233
235impl<'a> Debug for Bytes<'a> { 234impl<'a> Debug for Bytes<'a> {
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index b0ccfde57..61b173e80 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -1,6 +1,4 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)]
2#![cfg_attr(nightly, feature(async_fn_in_trait, impl_trait_projections))]
3#![cfg_attr(nightly, allow(stable_features, unknown_lints))]
4#![allow(async_fn_in_trait)] 2#![allow(async_fn_in_trait)]
5#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
6#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
@@ -15,6 +13,7 @@ mod ring_buffer;
15pub mod blocking_mutex; 13pub mod blocking_mutex;
16pub mod channel; 14pub mod channel;
17pub mod mutex; 15pub mod mutex;
16pub mod once_lock;
18pub mod pipe; 17pub mod pipe;
19pub mod priority_channel; 18pub mod priority_channel;
20pub mod pubsub; 19pub mod pubsub;
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
new file mode 100644
index 000000000..31cc99711
--- /dev/null
+++ b/embassy-sync/src/once_lock.rs
@@ -0,0 +1,236 @@
1//! Syncronization primitive for initializing a value once, allowing others to await a reference to the value.
2
3use core::cell::Cell;
4use core::future::poll_fn;
5use core::mem::MaybeUninit;
6use core::sync::atomic::{AtomicBool, Ordering};
7use core::task::Poll;
8
9/// The `OnceLock` is a synchronization primitive that allows for
10/// initializing a value once, and allowing others to `.await` a
11/// reference to the value. This is useful for lazy initialization of
12/// a static value.
13///
14/// **Note**: this implementation uses a busy loop to poll the value,
15/// which is not as efficient as registering a dedicated `Waker`.
16/// However, the if the usecase for is to initialize a static variable
17/// relatively early in the program life cycle, it should be fine.
18///
19/// # Example
20/// ```
21/// use futures_executor::block_on;
22/// use embassy_sync::once_lock::OnceLock;
23///
24/// // Define a static value that will be lazily initialized
25/// static VALUE: OnceLock<u32> = OnceLock::new();
26///
27/// let f = async {
28///
29/// // Initialize the value
30/// let reference = VALUE.get_or_init(|| 20);
31/// assert_eq!(reference, &20);
32///
33/// // Wait for the value to be initialized
34/// // and get a static reference it
35/// assert_eq!(VALUE.get().await, &20);
36///
37/// };
38/// block_on(f)
39/// ```
40pub struct OnceLock<T> {
41 init: AtomicBool,
42 data: Cell<MaybeUninit<T>>,
43}
44
45unsafe impl<T> Sync for OnceLock<T> {}
46
47impl<T> OnceLock<T> {
48 /// Create a new uninitialized `OnceLock`.
49 pub const fn new() -> Self {
50 Self {
51 init: AtomicBool::new(false),
52 data: Cell::new(MaybeUninit::zeroed()),
53 }
54 }
55
56 /// Get a reference to the underlying value, waiting for it to be set.
57 /// If the value is already set, this will return immediately.
58 pub async fn get(&self) -> &T {
59 poll_fn(|cx| match self.try_get() {
60 Some(data) => Poll::Ready(data),
61 None => {
62 cx.waker().wake_by_ref();
63 Poll::Pending
64 }
65 })
66 .await
67 }
68
69 /// Try to get a reference to the underlying value if it exists.
70 pub fn try_get(&self) -> Option<&T> {
71 if self.init.load(Ordering::Relaxed) {
72 Some(unsafe { self.get_ref_unchecked() })
73 } else {
74 None
75 }
76 }
77
78 /// Set the underlying value. If the value is already set, this will return an error with the given value.
79 pub fn init(&self, value: T) -> Result<(), T> {
80 // Critical section is required to ensure that the value is
81 // not simultaniously initialized elsewhere at the same time.
82 critical_section::with(|_| {
83 // If the value is not set, set it and return Ok.
84 if !self.init.load(Ordering::Relaxed) {
85 self.data.set(MaybeUninit::new(value));
86 self.init.store(true, Ordering::Relaxed);
87 Ok(())
88
89 // Otherwise return an error with the given value.
90 } else {
91 Err(value)
92 }
93 })
94 }
95
96 /// Get a reference to the underlying value, initializing it if it does not exist.
97 pub fn get_or_init<F>(&self, f: F) -> &T
98 where
99 F: FnOnce() -> T,
100 {
101 // Critical section is required to ensure that the value is
102 // not simultaniously initialized elsewhere at the same time.
103 critical_section::with(|_| {
104 // If the value is not set, set it.
105 if !self.init.load(Ordering::Relaxed) {
106 self.data.set(MaybeUninit::new(f()));
107 self.init.store(true, Ordering::Relaxed);
108 }
109 });
110
111 // Return a reference to the value.
112 unsafe { self.get_ref_unchecked() }
113 }
114
115 /// Consume the `OnceLock`, returning the underlying value if it was initialized.
116 pub fn into_inner(self) -> Option<T> {
117 if self.init.load(Ordering::Relaxed) {
118 Some(unsafe { self.data.into_inner().assume_init() })
119 } else {
120 None
121 }
122 }
123
124 /// Take the underlying value if it was initialized, uninitializing the `OnceLock` in the process.
125 pub fn take(&mut self) -> Option<T> {
126 // If the value is set, uninitialize the lock and return the value.
127 critical_section::with(|_| {
128 if self.init.load(Ordering::Relaxed) {
129 let val = unsafe { self.data.replace(MaybeUninit::zeroed()).assume_init() };
130 self.init.store(false, Ordering::Relaxed);
131 Some(val)
132
133 // Otherwise return None.
134 } else {
135 None
136 }
137 })
138 }
139
140 /// Check if the value has been set.
141 pub fn is_set(&self) -> bool {
142 self.init.load(Ordering::Relaxed)
143 }
144
145 /// Get a reference to the underlying value.
146 /// # Safety
147 /// Must only be used if a value has been set.
148 unsafe fn get_ref_unchecked(&self) -> &T {
149 (*self.data.as_ptr()).assume_init_ref()
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[test]
158 fn once_lock() {
159 let lock = OnceLock::new();
160 assert_eq!(lock.try_get(), None);
161 assert_eq!(lock.is_set(), false);
162
163 let v = 42;
164 assert_eq!(lock.init(v), Ok(()));
165 assert_eq!(lock.is_set(), true);
166 assert_eq!(lock.try_get(), Some(&v));
167 assert_eq!(lock.try_get(), Some(&v));
168
169 let v = 43;
170 assert_eq!(lock.init(v), Err(v));
171 assert_eq!(lock.is_set(), true);
172 assert_eq!(lock.try_get(), Some(&42));
173 }
174
175 #[test]
176 fn once_lock_get_or_init() {
177 let lock = OnceLock::new();
178 assert_eq!(lock.try_get(), None);
179 assert_eq!(lock.is_set(), false);
180
181 let v = lock.get_or_init(|| 42);
182 assert_eq!(v, &42);
183 assert_eq!(lock.is_set(), true);
184 assert_eq!(lock.try_get(), Some(&42));
185
186 let v = lock.get_or_init(|| 43);
187 assert_eq!(v, &42);
188 assert_eq!(lock.is_set(), true);
189 assert_eq!(lock.try_get(), Some(&42));
190 }
191
192 #[test]
193 fn once_lock_static() {
194 static LOCK: OnceLock<i32> = OnceLock::new();
195
196 let v: &'static i32 = LOCK.get_or_init(|| 42);
197 assert_eq!(v, &42);
198
199 let v: &'static i32 = LOCK.get_or_init(|| 43);
200 assert_eq!(v, &42);
201 }
202
203 #[futures_test::test]
204 async fn once_lock_async() {
205 static LOCK: OnceLock<i32> = OnceLock::new();
206
207 assert!(LOCK.init(42).is_ok());
208
209 let v: &'static i32 = LOCK.get().await;
210 assert_eq!(v, &42);
211 }
212
213 #[test]
214 fn once_lock_into_inner() {
215 let lock: OnceLock<i32> = OnceLock::new();
216
217 let v = lock.get_or_init(|| 42);
218 assert_eq!(v, &42);
219
220 assert_eq!(lock.into_inner(), Some(42));
221 }
222
223 #[test]
224 fn once_lock_take_init() {
225 let mut lock: OnceLock<i32> = OnceLock::new();
226
227 assert_eq!(lock.get_or_init(|| 42), &42);
228 assert_eq!(lock.is_set(), true);
229
230 assert_eq!(lock.take(), Some(42));
231 assert_eq!(lock.is_set(), false);
232
233 assert_eq!(lock.get_or_init(|| 43), &43);
234 assert_eq!(lock.is_set(), true);
235 }
236}
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index bd75c0135..e77678c24 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -325,7 +325,7 @@ where
325/// 325///
326/// Sent data may be reordered based on their priorty within the channel. 326/// Sent data may be reordered based on their priorty within the channel.
327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be recieved as `[3, 2, 1]`. 328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
329pub struct PriorityChannel<M, T, K, const N: usize> 329pub struct PriorityChannel<M, T, K, const N: usize>
330where 330where
331 T: Ord, 331 T: Ord,
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs
index d95ffa7c9..81e60c42b 100644
--- a/embassy-sync/src/ring_buffer.rs
+++ b/embassy-sync/src/ring_buffer.rs
@@ -3,7 +3,7 @@ use core::ops::Range;
3pub struct RingBuffer<const N: usize> { 3pub struct RingBuffer<const N: usize> {
4 start: usize, 4 start: usize,
5 end: usize, 5 end: usize,
6 empty: bool, 6 full: bool,
7} 7}
8 8
9impl<const N: usize> RingBuffer<N> { 9impl<const N: usize> RingBuffer<N> {
@@ -11,13 +11,13 @@ impl<const N: usize> RingBuffer<N> {
11 Self { 11 Self {
12 start: 0, 12 start: 0,
13 end: 0, 13 end: 0,
14 empty: true, 14 full: false,
15 } 15 }
16 } 16 }
17 17
18 pub fn push_buf(&mut self) -> Range<usize> { 18 pub fn push_buf(&mut self) -> Range<usize> {
19 if self.start == self.end && !self.empty { 19 if self.is_full() {
20 trace!(" ringbuf: push_buf empty"); 20 trace!(" ringbuf: push_buf full");
21 return 0..0; 21 return 0..0;
22 } 22 }
23 23
@@ -38,11 +38,11 @@ impl<const N: usize> RingBuffer<N> {
38 } 38 }
39 39
40 self.end = self.wrap(self.end + n); 40 self.end = self.wrap(self.end + n);
41 self.empty = false; 41 self.full = self.start == self.end;
42 } 42 }
43 43
44 pub fn pop_buf(&mut self) -> Range<usize> { 44 pub fn pop_buf(&mut self) -> Range<usize> {
45 if self.empty { 45 if self.is_empty() {
46 trace!(" ringbuf: pop_buf empty"); 46 trace!(" ringbuf: pop_buf empty");
47 return 0..0; 47 return 0..0;
48 } 48 }
@@ -64,20 +64,20 @@ impl<const N: usize> RingBuffer<N> {
64 } 64 }
65 65
66 self.start = self.wrap(self.start + n); 66 self.start = self.wrap(self.start + n);
67 self.empty = self.start == self.end; 67 self.full = false;
68 } 68 }
69 69
70 pub fn is_full(&self) -> bool { 70 pub fn is_full(&self) -> bool {
71 self.start == self.end && !self.empty 71 self.full
72 } 72 }
73 73
74 pub fn is_empty(&self) -> bool { 74 pub fn is_empty(&self) -> bool {
75 self.empty 75 self.start == self.end && !self.full
76 } 76 }
77 77
78 #[allow(unused)] 78 #[allow(unused)]
79 pub fn len(&self) -> usize { 79 pub fn len(&self) -> usize {
80 if self.empty { 80 if self.is_empty() {
81 0 81 0
82 } else if self.start < self.end { 82 } else if self.start < self.end {
83 self.end - self.start 83 self.end - self.start
@@ -89,7 +89,7 @@ impl<const N: usize> RingBuffer<N> {
89 pub fn clear(&mut self) { 89 pub fn clear(&mut self) {
90 self.start = 0; 90 self.start = 0;
91 self.end = 0; 91 self.end = 0;
92 self.empty = true; 92 self.full = false;
93 } 93 }
94 94
95 fn wrap(&self, n: usize) -> usize { 95 fn wrap(&self, n: usize) -> usize {
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index bea67d8be..520f1a896 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -111,7 +111,21 @@ where
111 poll_fn(move |cx| self.poll_wait(cx)) 111 poll_fn(move |cx| self.poll_wait(cx))
112 } 112 }
113 113
114 /// non-blocking method to check whether this signal has been signaled. 114 /// non-blocking method to try and take the signal value.
115 pub fn try_take(&self) -> Option<T> {
116 self.state.lock(|cell| {
117 let state = cell.replace(State::None);
118 match state {
119 State::Signaled(res) => Some(res),
120 state => {
121 cell.set(state);
122 None
123 }
124 }
125 })
126 }
127
128 /// non-blocking method to check whether this signal has been signaled. This does not clear the signal.
115 pub fn signaled(&self) -> bool { 129 pub fn signaled(&self) -> bool {
116 self.state.lock(|cell| { 130 self.state.lock(|cell| {
117 let state = cell.replace(State::None); 131 let state = cell.replace(State::None);
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index f704cbd5d..cfce9a571 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -1,10 +1,7 @@
1//! A zero-copy queue for sending values between asynchronous tasks. 1//! A zero-copy queue for sending values between asynchronous tasks.
2//! 2//!
3//! It can be used concurrently by multiple producers (senders) and multiple 3//! It can be used concurrently by a producer (sender) and a
4//! consumers (receivers), i.e. it is an "MPMC channel". 4//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//! 5//!
9//! This queue takes a Mutex type so that various 6//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used 7//! targets can be attained. For example, a ThreadModeMutex can be used