aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync
diff options
context:
space:
mode:
authorPeter Krull <[email protected]>2024-09-23 19:02:59 +0200
committerGitHub <[email protected]>2024-09-23 19:02:59 +0200
commita2c473306f4a7c8e99add2546450ab3a7a97436e (patch)
tree5522a708e492db7d4632dc0a56fe5057244f03f0 /embassy-sync
parente02a987bafd4f0fcf9d80e7c4f6e1504b8b02cec (diff)
parent2935290a6222536d6341103f91bfd732165d3862 (diff)
Merge branch 'embassy-rs:main' into multi-signal
Diffstat (limited to 'embassy-sync')
-rw-r--r--embassy-sync/CHANGELOG.md16
-rw-r--r--embassy-sync/Cargo.toml4
-rw-r--r--embassy-sync/README.md3
-rw-r--r--embassy-sync/build.rs32
-rw-r--r--embassy-sync/build_common.rs94
-rw-r--r--embassy-sync/src/channel.rs128
-rw-r--r--embassy-sync/src/fmt.rs34
-rw-r--r--embassy-sync/src/lazy_lock.rs152
-rw-r--r--embassy-sync/src/lib.rs5
-rw-r--r--embassy-sync/src/mutex.rs209
-rw-r--r--embassy-sync/src/once_lock.rs236
-rw-r--r--embassy-sync/src/pipe.rs2
-rw-r--r--embassy-sync/src/priority_channel.rs54
-rw-r--r--embassy-sync/src/pubsub/mod.rs174
-rw-r--r--embassy-sync/src/pubsub/publisher.rs68
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs36
-rw-r--r--embassy-sync/src/semaphore.rs772
-rw-r--r--embassy-sync/src/signal.rs4
-rw-r--r--embassy-sync/src/waitqueue/multi_waker.rs2
-rw-r--r--embassy-sync/src/zerocopy_channel.rs7
20 files changed, 1923 insertions, 109 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
index e7db97ef7..8f2d26fe0 100644
--- a/embassy-sync/CHANGELOG.md
+++ b/embassy-sync/CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), 5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7 7
8## Unreleased
9
10- Add LazyLock sync primitive.
11
12## 0.6.0 - 2024-05-29
13
14- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `Channel`.
15- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PriorityChannel`.
16- Add `capacity`, `free_capacity`, `clear`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`.
17- Made `PubSubBehavior` sealed
18 - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)`
19- Add OnceLock sync primitive.
20- Add constructor for DynamicChannel
21- Add ready_to_receive functions to Channel and Receiver.
22
8## 0.5.0 - 2023-12-04 23## 0.5.0 - 2023-12-04
9 24
10- Add a PriorityChannel. 25- Add a PriorityChannel.
@@ -35,7 +50,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
35- Remove unnecessary uses of `atomic-polyfill` 50- Remove unnecessary uses of `atomic-polyfill`
36- Add `#[must_use]` to all futures. 51- Add `#[must_use]` to all futures.
37 52
38
39## 0.1.0 - 2022-08-26 53## 0.1.0 - 2022-08-26
40 54
41- First release 55- First release
diff --git a/embassy-sync/Cargo.toml b/embassy-sync/Cargo.toml
index 85673026c..7b7d2bf8e 100644
--- a/embassy-sync/Cargo.toml
+++ b/embassy-sync/Cargo.toml
@@ -1,6 +1,6 @@
1[package] 1[package]
2name = "embassy-sync" 2name = "embassy-sync"
3version = "0.5.0" 3version = "0.6.0"
4edition = "2021" 4edition = "2021"
5description = "no-std, no-alloc synchronization primitives with async support" 5description = "no-std, no-alloc synchronization primitives with async support"
6repository = "https://github.com/embassy-rs/embassy" 6repository = "https://github.com/embassy-rs/embassy"
@@ -20,7 +20,7 @@ src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-sync/
20target = "thumbv7em-none-eabi" 20target = "thumbv7em-none-eabi"
21 21
22[features] 22[features]
23std = [] 23std = ["critical-section/std"]
24turbowakers = [] 24turbowakers = []
25 25
26[dependencies] 26[dependencies]
diff --git a/embassy-sync/README.md b/embassy-sync/README.md
index c2e13799e..97a663d6d 100644
--- a/embassy-sync/README.md
+++ b/embassy-sync/README.md
@@ -5,7 +5,7 @@ An [Embassy](https://embassy.dev) project.
5Synchronization primitives and data structures with async support: 5Synchronization primitives and data structures with async support:
6 6
7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. 7- [`Channel`](channel::Channel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer.
8- [`PriorityChannel`](channel::priority::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are sifted to the front of the channel. 8- [`PriorityChannel`](channel::priority_channel::PriorityChannel) - A Multiple Producer Multiple Consumer (MPMC) channel. Each message is only received by a single consumer. Higher priority items are shifted to the front of the channel.
9- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers. 9- [`PubSubChannel`](pubsub::PubSubChannel) - A broadcast channel (publish-subscribe) channel. Each message is received by all consumers.
10- [`Signal`](signal::Signal) - Signalling latest value to a single consumer. 10- [`Signal`](signal::Signal) - Signalling latest value to a single consumer.
11- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks. 11- [`Mutex`](mutex::Mutex) - Mutex for synchronizing state between asynchronous tasks.
@@ -13,6 +13,7 @@ Synchronization primitives and data structures with async support:
13- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`. 13- [`WakerRegistration`](waitqueue::WakerRegistration) - Utility to register and wake a `Waker`.
14- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API. 14- [`AtomicWaker`](waitqueue::AtomicWaker) - A variant of `WakerRegistration` accessible using a non-mut API.
15- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s. 15- [`MultiWakerRegistration`](waitqueue::MultiWakerRegistration) - Utility registering and waking multiple `Waker`'s.
16- [`LazyLock`](lazy_lock::LazyLock) - A value which is initialized on the first access
16 17
17## Interoperability 18## Interoperability
18 19
diff --git a/embassy-sync/build.rs b/embassy-sync/build.rs
index afd76dad1..ecd2c0c9f 100644
--- a/embassy-sync/build.rs
+++ b/embassy-sync/build.rs
@@ -1,31 +1,7 @@
1use std::env; 1#[path = "./build_common.rs"]
2mod common;
2 3
3fn main() { 4fn main() {
4 println!("cargo:rerun-if-changed=build.rs"); 5 let mut cfgs = common::CfgSet::new();
5 6 common::set_target_cfgs(&mut cfgs);
6 let target = env::var("TARGET").unwrap();
7
8 if target.starts_with("thumbv6m-") {
9 println!("cargo:rustc-cfg=cortex_m");
10 println!("cargo:rustc-cfg=armv6m");
11 } else if target.starts_with("thumbv7m-") {
12 println!("cargo:rustc-cfg=cortex_m");
13 println!("cargo:rustc-cfg=armv7m");
14 } else if target.starts_with("thumbv7em-") {
15 println!("cargo:rustc-cfg=cortex_m");
16 println!("cargo:rustc-cfg=armv7m");
17 println!("cargo:rustc-cfg=armv7em"); // (not currently used)
18 } else if target.starts_with("thumbv8m.base") {
19 println!("cargo:rustc-cfg=cortex_m");
20 println!("cargo:rustc-cfg=armv8m");
21 println!("cargo:rustc-cfg=armv8m_base");
22 } else if target.starts_with("thumbv8m.main") {
23 println!("cargo:rustc-cfg=cortex_m");
24 println!("cargo:rustc-cfg=armv8m");
25 println!("cargo:rustc-cfg=armv8m_main");
26 }
27
28 if target.ends_with("-eabihf") {
29 println!("cargo:rustc-cfg=has_fpu");
30 }
31} 7}
diff --git a/embassy-sync/build_common.rs b/embassy-sync/build_common.rs
new file mode 100644
index 000000000..4f24e6d37
--- /dev/null
+++ b/embassy-sync/build_common.rs
@@ -0,0 +1,94 @@
1// NOTE: this file is copy-pasted between several Embassy crates, because there is no
2// straightforward way to share this code:
3// - it cannot be placed into the root of the repo and linked from each build.rs using `#[path =
4// "../build_common.rs"]`, because `cargo publish` requires that all files published with a crate
5// reside in the crate's directory,
6// - it cannot be symlinked from `embassy-xxx/build_common.rs` to `../build_common.rs`, because
7// symlinks don't work on Windows.
8
9use std::collections::HashSet;
10use std::env;
11
12/// Helper for emitting cargo instruction for enabling configs (`cargo:rustc-cfg=X`) and declaring
13/// them (`cargo:rust-check-cfg=cfg(X)`).
14#[derive(Debug)]
15pub struct CfgSet {
16 enabled: HashSet<String>,
17 declared: HashSet<String>,
18}
19
20impl CfgSet {
21 pub fn new() -> Self {
22 Self {
23 enabled: HashSet::new(),
24 declared: HashSet::new(),
25 }
26 }
27
28 /// Enable a config, which can then be used in `#[cfg(...)]` for conditional compilation.
29 ///
30 /// All configs that can potentially be enabled should be unconditionally declared using
31 /// [`Self::declare()`].
32 pub fn enable(&mut self, cfg: impl AsRef<str>) {
33 if self.enabled.insert(cfg.as_ref().to_owned()) {
34 println!("cargo:rustc-cfg={}", cfg.as_ref());
35 }
36 }
37
38 pub fn enable_all(&mut self, cfgs: &[impl AsRef<str>]) {
39 for cfg in cfgs.iter() {
40 self.enable(cfg.as_ref());
41 }
42 }
43
44 /// Declare a valid config for conditional compilation, without enabling it.
45 ///
46 /// This enables rustc to check that the configs in `#[cfg(...)]` attributes are valid.
47 pub fn declare(&mut self, cfg: impl AsRef<str>) {
48 if self.declared.insert(cfg.as_ref().to_owned()) {
49 println!("cargo:rustc-check-cfg=cfg({})", cfg.as_ref());
50 }
51 }
52
53 pub fn declare_all(&mut self, cfgs: &[impl AsRef<str>]) {
54 for cfg in cfgs.iter() {
55 self.declare(cfg.as_ref());
56 }
57 }
58
59 pub fn set(&mut self, cfg: impl Into<String>, enable: bool) {
60 let cfg = cfg.into();
61 if enable {
62 self.enable(cfg.clone());
63 }
64 self.declare(cfg);
65 }
66}
67
68/// Sets configs that describe the target platform.
69pub fn set_target_cfgs(cfgs: &mut CfgSet) {
70 let target = env::var("TARGET").unwrap();
71
72 if target.starts_with("thumbv6m-") {
73 cfgs.enable_all(&["cortex_m", "armv6m"]);
74 } else if target.starts_with("thumbv7m-") {
75 cfgs.enable_all(&["cortex_m", "armv7m"]);
76 } else if target.starts_with("thumbv7em-") {
77 cfgs.enable_all(&["cortex_m", "armv7m", "armv7em"]);
78 } else if target.starts_with("thumbv8m.base") {
79 cfgs.enable_all(&["cortex_m", "armv8m", "armv8m_base"]);
80 } else if target.starts_with("thumbv8m.main") {
81 cfgs.enable_all(&["cortex_m", "armv8m", "armv8m_main"]);
82 }
83 cfgs.declare_all(&[
84 "cortex_m",
85 "armv6m",
86 "armv7m",
87 "armv7em",
88 "armv8m",
89 "armv8m_base",
90 "armv8m_main",
91 ]);
92
93 cfgs.set("has_fpu", target.ends_with("-eabihf"));
94}
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs
index ff7129303..55ac5fb66 100644
--- a/embassy-sync/src/channel.rs
+++ b/embassy-sync/src/channel.rs
@@ -42,7 +42,7 @@ where
42 M: RawMutex, 42 M: RawMutex,
43{ 43{
44 fn clone(&self) -> Self { 44 fn clone(&self) -> Self {
45 Sender { channel: self.channel } 45 *self
46 } 46 }
47} 47}
48 48
@@ -81,7 +81,7 @@ pub struct DynamicSender<'ch, T> {
81 81
82impl<'ch, T> Clone for DynamicSender<'ch, T> { 82impl<'ch, T> Clone for DynamicSender<'ch, T> {
83 fn clone(&self) -> Self { 83 fn clone(&self) -> Self {
84 DynamicSender { channel: self.channel } 84 *self
85 } 85 }
86} 86}
87 87
@@ -135,7 +135,7 @@ where
135 M: RawMutex, 135 M: RawMutex,
136{ 136{
137 fn clone(&self) -> Self { 137 fn clone(&self) -> Self {
138 Receiver { channel: self.channel } 138 *self
139 } 139 }
140} 140}
141 141
@@ -152,6 +152,13 @@ where
152 self.channel.receive() 152 self.channel.receive()
153 } 153 }
154 154
155 /// Is a value ready to be received in the channel
156 ///
157 /// See [`Channel::ready_to_receive()`].
158 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
159 self.channel.ready_to_receive()
160 }
161
155 /// Attempt to immediately receive the next value. 162 /// Attempt to immediately receive the next value.
156 /// 163 ///
157 /// See [`Channel::try_receive()`] 164 /// See [`Channel::try_receive()`]
@@ -181,7 +188,7 @@ pub struct DynamicReceiver<'ch, T> {
181 188
182impl<'ch, T> Clone for DynamicReceiver<'ch, T> { 189impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
183 fn clone(&self) -> Self { 190 fn clone(&self) -> Self {
184 DynamicReceiver { channel: self.channel } 191 *self
185 } 192 }
186} 193}
187 194
@@ -246,6 +253,26 @@ where
246 } 253 }
247} 254}
248 255
256/// Future returned by [`Channel::ready_to_receive`] and [`Receiver::ready_to_receive`].
257#[must_use = "futures do nothing unless you `.await` or poll them"]
258pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
259where
260 M: RawMutex,
261{
262 channel: &'ch Channel<M, T, N>,
263}
264
265impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
266where
267 M: RawMutex,
268{
269 type Output = ();
270
271 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
272 self.channel.poll_ready_to_receive(cx)
273 }
274}
275
249/// Future returned by [`DynamicReceiver::receive`]. 276/// Future returned by [`DynamicReceiver::receive`].
250#[must_use = "futures do nothing unless you `.await` or poll them"] 277#[must_use = "futures do nothing unless you `.await` or poll them"]
251pub struct DynamicReceiveFuture<'ch, T> { 278pub struct DynamicReceiveFuture<'ch, T> {
@@ -263,6 +290,12 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
263 } 290 }
264} 291}
265 292
293impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
294 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
295 Self { channel: value.channel }
296 }
297}
298
266/// Future returned by [`Channel::send`] and [`Sender::send`]. 299/// Future returned by [`Channel::send`] and [`Sender::send`].
267#[must_use = "futures do nothing unless you `.await` or poll them"] 300#[must_use = "futures do nothing unless you `.await` or poll them"]
268pub struct SendFuture<'ch, M, T, const N: usize> 301pub struct SendFuture<'ch, M, T, const N: usize>
@@ -321,6 +354,15 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
321 354
322impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} 355impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
323 356
357impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
358 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
359 Self {
360 channel: value.channel,
361 message: value.message,
362 }
363 }
364}
365
324pub(crate) trait DynamicChannel<T> { 366pub(crate) trait DynamicChannel<T> {
325 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; 367 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
326 368
@@ -434,6 +476,22 @@ impl<T, const N: usize> ChannelState<T, N> {
434 Poll::Pending 476 Poll::Pending
435 } 477 }
436 } 478 }
479
480 fn clear(&mut self) {
481 self.queue.clear();
482 }
483
484 fn len(&self) -> usize {
485 self.queue.len()
486 }
487
488 fn is_empty(&self) -> bool {
489 self.queue.is_empty()
490 }
491
492 fn is_full(&self) -> bool {
493 self.queue.is_full()
494 }
437} 495}
438 496
439/// A bounded channel for communicating between asynchronous tasks 497/// A bounded channel for communicating between asynchronous tasks
@@ -507,6 +565,16 @@ where
507 Receiver { channel: self } 565 Receiver { channel: self }
508 } 566 }
509 567
568 /// Get a sender for this channel using dynamic dispatch.
569 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
570 DynamicSender { channel: self }
571 }
572
573 /// Get a receiver for this channel using dynamic dispatch.
574 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
575 DynamicReceiver { channel: self }
576 }
577
510 /// Send a value, waiting until there is capacity. 578 /// Send a value, waiting until there is capacity.
511 /// 579 ///
512 /// Sending completes when the value has been pushed to the channel's queue. 580 /// Sending completes when the value has been pushed to the channel's queue.
@@ -540,6 +608,14 @@ where
540 ReceiveFuture { channel: self } 608 ReceiveFuture { channel: self }
541 } 609 }
542 610
611 /// Is a value ready to be received in the channel
612 ///
613 /// If there are no messages in the channel's buffer, this method will
614 /// wait until there is at least one
615 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
616 ReceiveReadyFuture { channel: self }
617 }
618
543 /// Attempt to immediately receive a message. 619 /// Attempt to immediately receive a message.
544 /// 620 ///
545 /// This method will either receive a message from the channel immediately or return an error 621 /// This method will either receive a message from the channel immediately or return an error
@@ -547,6 +623,38 @@ where
547 pub fn try_receive(&self) -> Result<T, TryReceiveError> { 623 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
548 self.lock(|c| c.try_receive()) 624 self.lock(|c| c.try_receive())
549 } 625 }
626
627 /// Returns the maximum number of elements the channel can hold.
628 pub const fn capacity(&self) -> usize {
629 N
630 }
631
632 /// Returns the free capacity of the channel.
633 ///
634 /// This is equivalent to `capacity() - len()`
635 pub fn free_capacity(&self) -> usize {
636 N - self.len()
637 }
638
639 /// Clears all elements in the channel.
640 pub fn clear(&self) {
641 self.lock(|c| c.clear());
642 }
643
644 /// Returns the number of elements currently in the channel.
645 pub fn len(&self) -> usize {
646 self.lock(|c| c.len())
647 }
648
649 /// Returns whether the channel is empty.
650 pub fn is_empty(&self) -> bool {
651 self.lock(|c| c.is_empty())
652 }
653
654 /// Returns whether the channel is full.
655 pub fn is_full(&self) -> bool {
656 self.lock(|c| c.is_full())
657 }
550} 658}
551 659
552/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the 660/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
@@ -648,7 +756,7 @@ mod tests {
648 } 756 }
649 757
650 #[test] 758 #[test]
651 fn dynamic_dispatch() { 759 fn dynamic_dispatch_into() {
652 let c = Channel::<NoopRawMutex, u32, 3>::new(); 760 let c = Channel::<NoopRawMutex, u32, 3>::new();
653 let s: DynamicSender<'_, u32> = c.sender().into(); 761 let s: DynamicSender<'_, u32> = c.sender().into();
654 let r: DynamicReceiver<'_, u32> = c.receiver().into(); 762 let r: DynamicReceiver<'_, u32> = c.receiver().into();
@@ -657,6 +765,16 @@ mod tests {
657 assert_eq!(r.try_receive().unwrap(), 1); 765 assert_eq!(r.try_receive().unwrap(), 1);
658 } 766 }
659 767
768 #[test]
769 fn dynamic_dispatch_constructor() {
770 let c = Channel::<NoopRawMutex, u32, 3>::new();
771 let s = c.dyn_sender();
772 let r = c.dyn_receiver();
773
774 assert!(s.try_send(1).is_ok());
775 assert_eq!(r.try_receive().unwrap(), 1);
776 }
777
660 #[futures_test::test] 778 #[futures_test::test]
661 async fn receiver_receives_given_try_send_async() { 779 async fn receiver_receives_given_try_send_async() {
662 let executor = ThreadPool::new().unwrap(); 780 let executor = ThreadPool::new().unwrap();
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs
index 78e583c1c..8ca61bc39 100644
--- a/embassy-sync/src/fmt.rs
+++ b/embassy-sync/src/fmt.rs
@@ -1,11 +1,12 @@
1#![macro_use] 1#![macro_use]
2#![allow(unused_macros)] 2#![allow(unused)]
3 3
4use core::fmt::{Debug, Display, LowerHex}; 4use core::fmt::{Debug, Display, LowerHex};
5 5
6#[cfg(all(feature = "defmt", feature = "log"))] 6#[cfg(all(feature = "defmt", feature = "log"))]
7compile_error!("You may not enable both `defmt` and `log` features."); 7compile_error!("You may not enable both `defmt` and `log` features.");
8 8
9#[collapse_debuginfo(yes)]
9macro_rules! assert { 10macro_rules! assert {
10 ($($x:tt)*) => { 11 ($($x:tt)*) => {
11 { 12 {
@@ -17,6 +18,7 @@ macro_rules! assert {
17 }; 18 };
18} 19}
19 20
21#[collapse_debuginfo(yes)]
20macro_rules! assert_eq { 22macro_rules! assert_eq {
21 ($($x:tt)*) => { 23 ($($x:tt)*) => {
22 { 24 {
@@ -28,6 +30,7 @@ macro_rules! assert_eq {
28 }; 30 };
29} 31}
30 32
33#[collapse_debuginfo(yes)]
31macro_rules! assert_ne { 34macro_rules! assert_ne {
32 ($($x:tt)*) => { 35 ($($x:tt)*) => {
33 { 36 {
@@ -39,6 +42,7 @@ macro_rules! assert_ne {
39 }; 42 };
40} 43}
41 44
45#[collapse_debuginfo(yes)]
42macro_rules! debug_assert { 46macro_rules! debug_assert {
43 ($($x:tt)*) => { 47 ($($x:tt)*) => {
44 { 48 {
@@ -50,6 +54,7 @@ macro_rules! debug_assert {
50 }; 54 };
51} 55}
52 56
57#[collapse_debuginfo(yes)]
53macro_rules! debug_assert_eq { 58macro_rules! debug_assert_eq {
54 ($($x:tt)*) => { 59 ($($x:tt)*) => {
55 { 60 {
@@ -61,6 +66,7 @@ macro_rules! debug_assert_eq {
61 }; 66 };
62} 67}
63 68
69#[collapse_debuginfo(yes)]
64macro_rules! debug_assert_ne { 70macro_rules! debug_assert_ne {
65 ($($x:tt)*) => { 71 ($($x:tt)*) => {
66 { 72 {
@@ -72,6 +78,7 @@ macro_rules! debug_assert_ne {
72 }; 78 };
73} 79}
74 80
81#[collapse_debuginfo(yes)]
75macro_rules! todo { 82macro_rules! todo {
76 ($($x:tt)*) => { 83 ($($x:tt)*) => {
77 { 84 {
@@ -83,20 +90,19 @@ macro_rules! todo {
83 }; 90 };
84} 91}
85 92
86#[cfg(not(feature = "defmt"))] 93#[collapse_debuginfo(yes)]
87macro_rules! unreachable { 94macro_rules! unreachable {
88 ($($x:tt)*) => { 95 ($($x:tt)*) => {
89 ::core::unreachable!($($x)*) 96 {
90 }; 97 #[cfg(not(feature = "defmt"))]
91} 98 ::core::unreachable!($($x)*);
92 99 #[cfg(feature = "defmt")]
93#[cfg(feature = "defmt")] 100 ::defmt::unreachable!($($x)*);
94macro_rules! unreachable { 101 }
95 ($($x:tt)*) => {
96 ::defmt::unreachable!($($x)*)
97 }; 102 };
98} 103}
99 104
105#[collapse_debuginfo(yes)]
100macro_rules! panic { 106macro_rules! panic {
101 ($($x:tt)*) => { 107 ($($x:tt)*) => {
102 { 108 {
@@ -108,6 +114,7 @@ macro_rules! panic {
108 }; 114 };
109} 115}
110 116
117#[collapse_debuginfo(yes)]
111macro_rules! trace { 118macro_rules! trace {
112 ($s:literal $(, $x:expr)* $(,)?) => { 119 ($s:literal $(, $x:expr)* $(,)?) => {
113 { 120 {
@@ -121,6 +128,7 @@ macro_rules! trace {
121 }; 128 };
122} 129}
123 130
131#[collapse_debuginfo(yes)]
124macro_rules! debug { 132macro_rules! debug {
125 ($s:literal $(, $x:expr)* $(,)?) => { 133 ($s:literal $(, $x:expr)* $(,)?) => {
126 { 134 {
@@ -134,6 +142,7 @@ macro_rules! debug {
134 }; 142 };
135} 143}
136 144
145#[collapse_debuginfo(yes)]
137macro_rules! info { 146macro_rules! info {
138 ($s:literal $(, $x:expr)* $(,)?) => { 147 ($s:literal $(, $x:expr)* $(,)?) => {
139 { 148 {
@@ -147,6 +156,7 @@ macro_rules! info {
147 }; 156 };
148} 157}
149 158
159#[collapse_debuginfo(yes)]
150macro_rules! warn { 160macro_rules! warn {
151 ($s:literal $(, $x:expr)* $(,)?) => { 161 ($s:literal $(, $x:expr)* $(,)?) => {
152 { 162 {
@@ -160,6 +170,7 @@ macro_rules! warn {
160 }; 170 };
161} 171}
162 172
173#[collapse_debuginfo(yes)]
163macro_rules! error { 174macro_rules! error {
164 ($s:literal $(, $x:expr)* $(,)?) => { 175 ($s:literal $(, $x:expr)* $(,)?) => {
165 { 176 {
@@ -174,6 +185,7 @@ macro_rules! error {
174} 185}
175 186
176#[cfg(feature = "defmt")] 187#[cfg(feature = "defmt")]
188#[collapse_debuginfo(yes)]
177macro_rules! unwrap { 189macro_rules! unwrap {
178 ($($x:tt)*) => { 190 ($($x:tt)*) => {
179 ::defmt::unwrap!($($x)*) 191 ::defmt::unwrap!($($x)*)
@@ -181,6 +193,7 @@ macro_rules! unwrap {
181} 193}
182 194
183#[cfg(not(feature = "defmt"))] 195#[cfg(not(feature = "defmt"))]
196#[collapse_debuginfo(yes)]
184macro_rules! unwrap { 197macro_rules! unwrap {
185 ($arg:expr) => { 198 ($arg:expr) => {
186 match $crate::fmt::Try::into_result($arg) { 199 match $crate::fmt::Try::into_result($arg) {
@@ -229,7 +242,6 @@ impl<T, E> Try for Result<T, E> {
229 } 242 }
230} 243}
231 244
232#[allow(unused)]
233pub(crate) struct Bytes<'a>(pub &'a [u8]); 245pub(crate) struct Bytes<'a>(pub &'a [u8]);
234 246
235impl<'a> Debug for Bytes<'a> { 247impl<'a> Debug for Bytes<'a> {
diff --git a/embassy-sync/src/lazy_lock.rs b/embassy-sync/src/lazy_lock.rs
new file mode 100644
index 000000000..18e3c2019
--- /dev/null
+++ b/embassy-sync/src/lazy_lock.rs
@@ -0,0 +1,152 @@
1//! Synchronization primitive for initializing a value once, allowing others to get a reference to the value.
2
3use core::cell::UnsafeCell;
4use core::mem::ManuallyDrop;
5use core::sync::atomic::{AtomicBool, Ordering};
6
7/// The `LazyLock` is a synchronization primitive that allows for
8/// initializing a value once, and allowing others to obtain a
9/// reference to the value. This is useful for lazy initialization of
10/// a static value.
11///
12/// # Example
13/// ```
14/// use futures_executor::block_on;
15/// use embassy_sync::lazy_lock::LazyLock;
16///
17/// // Define a static value that will be lazily initialized
18/// // at runtime at the first access.
19/// static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
20///
21/// let reference = VALUE.get();
22/// assert_eq!(reference, &20);
23/// ```
24pub struct LazyLock<T, F = fn() -> T> {
25 init: AtomicBool,
26 data: UnsafeCell<Data<T, F>>,
27}
28
29union Data<T, F> {
30 value: ManuallyDrop<T>,
31 f: ManuallyDrop<F>,
32}
33
34unsafe impl<T, F> Sync for LazyLock<T, F> {}
35
36impl<T, F: FnOnce() -> T> LazyLock<T, F> {
37 /// Create a new uninitialized `StaticLock`.
38 pub const fn new(init_fn: F) -> Self {
39 Self {
40 init: AtomicBool::new(false),
41 data: UnsafeCell::new(Data {
42 f: ManuallyDrop::new(init_fn),
43 }),
44 }
45 }
46
47 /// Get a reference to the underlying value, initializing it if it
48 /// has not been done already.
49 #[inline]
50 pub fn get(&self) -> &T {
51 self.ensure_init_fast();
52 unsafe { &(*self.data.get()).value }
53 }
54
55 /// Consume the `LazyLock`, returning the underlying value. The
56 /// initialization function will be called if it has not been
57 /// already.
58 #[inline]
59 pub fn into_inner(self) -> T {
60 self.ensure_init_fast();
61 let this = ManuallyDrop::new(self);
62 let data = unsafe { core::ptr::read(&this.data) }.into_inner();
63
64 ManuallyDrop::into_inner(unsafe { data.value })
65 }
66
67 /// Initialize the `LazyLock` if it has not been initialized yet.
68 /// This function is a fast track to [`Self::ensure_init`]
69 /// which does not require a critical section in most cases when
70 /// the value has been initialized already.
71 /// When this function returns, `self.data` is guaranteed to be
72 /// initialized and visible on the current core.
73 #[inline]
74 fn ensure_init_fast(&self) {
75 if !self.init.load(Ordering::Acquire) {
76 self.ensure_init();
77 }
78 }
79
80 /// Initialize the `LazyLock` if it has not been initialized yet.
81 /// When this function returns, `self.data` is guaranteed to be
82 /// initialized and visible on the current core.
83 fn ensure_init(&self) {
84 critical_section::with(|_| {
85 if !self.init.load(Ordering::Acquire) {
86 let data = unsafe { &mut *self.data.get() };
87 let f = unsafe { ManuallyDrop::take(&mut data.f) };
88 let value = f();
89 data.value = ManuallyDrop::new(value);
90
91 self.init.store(true, Ordering::Release);
92 }
93 });
94 }
95}
96
97impl<T, F> Drop for LazyLock<T, F> {
98 fn drop(&mut self) {
99 if self.init.load(Ordering::Acquire) {
100 unsafe { ManuallyDrop::drop(&mut self.data.get_mut().value) };
101 } else {
102 unsafe { ManuallyDrop::drop(&mut self.data.get_mut().f) };
103 }
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 use core::sync::atomic::{AtomicU32, Ordering};
110
111 use super::*;
112
113 #[test]
114 fn test_lazy_lock() {
115 static VALUE: LazyLock<u32> = LazyLock::new(|| 20);
116 let reference = VALUE.get();
117 assert_eq!(reference, &20);
118 }
119 #[test]
120 fn test_lazy_lock_into_inner() {
121 let lazy: LazyLock<u32> = LazyLock::new(|| 20);
122 let value = lazy.into_inner();
123 assert_eq!(value, 20);
124 }
125
126 static DROP_CHECKER: AtomicU32 = AtomicU32::new(0);
127 struct DropCheck;
128
129 impl Drop for DropCheck {
130 fn drop(&mut self) {
131 DROP_CHECKER.fetch_add(1, Ordering::Acquire);
132 }
133 }
134
135 #[test]
136 fn test_lazy_drop() {
137 let lazy: LazyLock<DropCheck> = LazyLock::new(|| DropCheck);
138 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 0);
139 lazy.get();
140 drop(lazy);
141 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
142
143 let dropper = DropCheck;
144 let lazy_fn: LazyLock<u32, _> = LazyLock::new(move || {
145 let _a = dropper;
146 20
147 });
148 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 1);
149 drop(lazy_fn);
150 assert_eq!(DROP_CHECKER.load(Ordering::Acquire), 2);
151 }
152}
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs
index 8a69541a5..df0f5e815 100644
--- a/embassy-sync/src/lib.rs
+++ b/embassy-sync/src/lib.rs
@@ -1,4 +1,4 @@
1#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] 1#![cfg_attr(not(feature = "std"), no_std)]
2#![allow(async_fn_in_trait)] 2#![allow(async_fn_in_trait)]
3#![allow(clippy::new_without_default)] 3#![allow(clippy::new_without_default)]
4#![doc = include_str!("../README.md")] 4#![doc = include_str!("../README.md")]
@@ -12,10 +12,13 @@ mod ring_buffer;
12 12
13pub mod blocking_mutex; 13pub mod blocking_mutex;
14pub mod channel; 14pub mod channel;
15pub mod lazy_lock;
15pub mod mutex; 16pub mod mutex;
17pub mod once_lock;
16pub mod pipe; 18pub mod pipe;
17pub mod priority_channel; 19pub mod priority_channel;
18pub mod pubsub; 20pub mod pubsub;
21pub mod semaphore;
19pub mod signal; 22pub mod signal;
20pub mod waitqueue; 23pub mod waitqueue;
21pub mod watch; 24pub mod watch;
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs
index 72459d660..8c3a3af9f 100644
--- a/embassy-sync/src/mutex.rs
+++ b/embassy-sync/src/mutex.rs
@@ -5,6 +5,7 @@ use core::cell::{RefCell, UnsafeCell};
5use core::future::poll_fn; 5use core::future::poll_fn;
6use core::ops::{Deref, DerefMut}; 6use core::ops::{Deref, DerefMut};
7use core::task::Poll; 7use core::task::Poll;
8use core::{fmt, mem};
8 9
9use crate::blocking_mutex::raw::RawMutex; 10use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex as BlockingMutex; 11use crate::blocking_mutex::Mutex as BlockingMutex;
@@ -128,12 +129,49 @@ where
128 } 129 }
129} 130}
130 131
132impl<M: RawMutex, T> From<T> for Mutex<M, T> {
133 fn from(from: T) -> Self {
134 Self::new(from)
135 }
136}
137
138impl<M, T> Default for Mutex<M, T>
139where
140 M: RawMutex,
141 T: ?Sized + Default,
142{
143 fn default() -> Self {
144 Self::new(Default::default())
145 }
146}
147
148impl<M, T> fmt::Debug for Mutex<M, T>
149where
150 M: RawMutex,
151 T: ?Sized + fmt::Debug,
152{
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 let mut d = f.debug_struct("Mutex");
155 match self.try_lock() {
156 Ok(value) => {
157 d.field("inner", &&*value);
158 }
159 Err(TryLockError) => {
160 d.field("inner", &format_args!("<locked>"));
161 }
162 }
163
164 d.finish_non_exhaustive()
165 }
166}
167
131/// Async mutex guard. 168/// Async mutex guard.
132/// 169///
133/// Owning an instance of this type indicates having 170/// Owning an instance of this type indicates having
134/// successfully locked the mutex, and grants access to the contents. 171/// successfully locked the mutex, and grants access to the contents.
135/// 172///
136/// Dropping it unlocks the mutex. 173/// Dropping it unlocks the mutex.
174#[clippy::has_significant_drop]
137pub struct MutexGuard<'a, M, T> 175pub struct MutexGuard<'a, M, T>
138where 176where
139 M: RawMutex, 177 M: RawMutex,
@@ -142,6 +180,25 @@ where
142 mutex: &'a Mutex<M, T>, 180 mutex: &'a Mutex<M, T>,
143} 181}
144 182
183impl<'a, M, T> MutexGuard<'a, M, T>
184where
185 M: RawMutex,
186 T: ?Sized,
187{
188 /// Returns a locked view over a portion of the locked data.
189 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
190 let mutex = this.mutex;
191 let value = fun(unsafe { &mut *this.mutex.inner.get() });
192 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
193 // locked state is being moved to the returned MappedMutexGuard.
194 mem::forget(this);
195 MappedMutexGuard {
196 state: &mutex.state,
197 value,
198 }
199 }
200}
201
145impl<'a, M, T> Drop for MutexGuard<'a, M, T> 202impl<'a, M, T> Drop for MutexGuard<'a, M, T>
146where 203where
147 M: RawMutex, 204 M: RawMutex,
@@ -180,3 +237,155 @@ where
180 unsafe { &mut *(self.mutex.inner.get()) } 237 unsafe { &mut *(self.mutex.inner.get()) }
181 } 238 }
182} 239}
240
241impl<'a, M, T> fmt::Debug for MutexGuard<'a, M, T>
242where
243 M: RawMutex,
244 T: ?Sized + fmt::Debug,
245{
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 fmt::Debug::fmt(&**self, f)
248 }
249}
250
251impl<'a, M, T> fmt::Display for MutexGuard<'a, M, T>
252where
253 M: RawMutex,
254 T: ?Sized + fmt::Display,
255{
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 fmt::Display::fmt(&**self, f)
258 }
259}
260
261/// A handle to a held `Mutex` that has had a function applied to it via [`MutexGuard::map`] or
262/// [`MappedMutexGuard::map`].
263///
264/// This can be used to hold a subfield of the protected data.
265#[clippy::has_significant_drop]
266pub struct MappedMutexGuard<'a, M, T>
267where
268 M: RawMutex,
269 T: ?Sized,
270{
271 state: &'a BlockingMutex<M, RefCell<State>>,
272 value: *mut T,
273}
274
275impl<'a, M, T> MappedMutexGuard<'a, M, T>
276where
277 M: RawMutex,
278 T: ?Sized,
279{
280 /// Returns a locked view over a portion of the locked data.
281 pub fn map<U>(this: Self, fun: impl FnOnce(&mut T) -> &mut U) -> MappedMutexGuard<'a, M, U> {
282 let state = this.state;
283 let value = fun(unsafe { &mut *this.value });
284 // Don't run the `drop` method for MutexGuard. The ownership of the underlying
285 // locked state is being moved to the returned MappedMutexGuard.
286 mem::forget(this);
287 MappedMutexGuard { state, value }
288 }
289}
290
291impl<'a, M, T> Deref for MappedMutexGuard<'a, M, T>
292where
293 M: RawMutex,
294 T: ?Sized,
295{
296 type Target = T;
297 fn deref(&self) -> &Self::Target {
298 // Safety: the MutexGuard represents exclusive access to the contents
299 // of the mutex, so it's OK to get it.
300 unsafe { &*self.value }
301 }
302}
303
304impl<'a, M, T> DerefMut for MappedMutexGuard<'a, M, T>
305where
306 M: RawMutex,
307 T: ?Sized,
308{
309 fn deref_mut(&mut self) -> &mut Self::Target {
310 // Safety: the MutexGuard represents exclusive access to the contents
311 // of the mutex, so it's OK to get it.
312 unsafe { &mut *self.value }
313 }
314}
315
316impl<'a, M, T> Drop for MappedMutexGuard<'a, M, T>
317where
318 M: RawMutex,
319 T: ?Sized,
320{
321 fn drop(&mut self) {
322 self.state.lock(|s| {
323 let mut s = unwrap!(s.try_borrow_mut());
324 s.locked = false;
325 s.waker.wake();
326 })
327 }
328}
329
330unsafe impl<M, T> Send for MappedMutexGuard<'_, M, T>
331where
332 M: RawMutex + Sync,
333 T: Send + ?Sized,
334{
335}
336
337unsafe impl<M, T> Sync for MappedMutexGuard<'_, M, T>
338where
339 M: RawMutex + Sync,
340 T: Sync + ?Sized,
341{
342}
343
344impl<'a, M, T> fmt::Debug for MappedMutexGuard<'a, M, T>
345where
346 M: RawMutex,
347 T: ?Sized + fmt::Debug,
348{
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 fmt::Debug::fmt(&**self, f)
351 }
352}
353
354impl<'a, M, T> fmt::Display for MappedMutexGuard<'a, M, T>
355where
356 M: RawMutex,
357 T: ?Sized + fmt::Display,
358{
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 fmt::Display::fmt(&**self, f)
361 }
362}
363
364#[cfg(test)]
365mod tests {
366 use crate::blocking_mutex::raw::NoopRawMutex;
367 use crate::mutex::{Mutex, MutexGuard};
368
369 #[futures_test::test]
370 async fn mapped_guard_releases_lock_when_dropped() {
371 let mutex: Mutex<NoopRawMutex, [i32; 2]> = Mutex::new([0, 1]);
372
373 {
374 let guard = mutex.lock().await;
375 assert_eq!(*guard, [0, 1]);
376 let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
377 assert_eq!(*mapped, 1);
378 *mapped = 2;
379 }
380
381 {
382 let guard = mutex.lock().await;
383 assert_eq!(*guard, [0, 2]);
384 let mut mapped = MutexGuard::map(guard, |this| &mut this[1]);
385 assert_eq!(*mapped, 2);
386 *mapped = 3;
387 }
388
389 assert_eq!(*mutex.lock().await, [0, 3]);
390 }
391}
diff --git a/embassy-sync/src/once_lock.rs b/embassy-sync/src/once_lock.rs
new file mode 100644
index 000000000..55608ba32
--- /dev/null
+++ b/embassy-sync/src/once_lock.rs
@@ -0,0 +1,236 @@
1//! Synchronization primitive for initializing a value once, allowing others to await a reference to the value.
2
3use core::cell::Cell;
4use core::future::poll_fn;
5use core::mem::MaybeUninit;
6use core::sync::atomic::{AtomicBool, Ordering};
7use core::task::Poll;
8
9/// The `OnceLock` is a synchronization primitive that allows for
10/// initializing a value once, and allowing others to `.await` a
11/// reference to the value. This is useful for lazy initialization of
12/// a static value.
13///
14/// **Note**: this implementation uses a busy loop to poll the value,
15/// which is not as efficient as registering a dedicated `Waker`.
16/// However, if the usecase for it is to initialize a static variable
17/// relatively early in the program life cycle, it should be fine.
18///
19/// # Example
20/// ```
21/// use futures_executor::block_on;
22/// use embassy_sync::once_lock::OnceLock;
23///
24/// // Define a static value that will be lazily initialized
25/// static VALUE: OnceLock<u32> = OnceLock::new();
26///
27/// let f = async {
28///
29/// // Initialize the value
30/// let reference = VALUE.get_or_init(|| 20);
31/// assert_eq!(reference, &20);
32///
33/// // Wait for the value to be initialized
34/// // and get a static reference it
35/// assert_eq!(VALUE.get().await, &20);
36///
37/// };
38/// block_on(f)
39/// ```
40pub struct OnceLock<T> {
41 init: AtomicBool,
42 data: Cell<MaybeUninit<T>>,
43}
44
45unsafe impl<T> Sync for OnceLock<T> {}
46
47impl<T> OnceLock<T> {
48 /// Create a new uninitialized `OnceLock`.
49 pub const fn new() -> Self {
50 Self {
51 init: AtomicBool::new(false),
52 data: Cell::new(MaybeUninit::zeroed()),
53 }
54 }
55
56 /// Get a reference to the underlying value, waiting for it to be set.
57 /// If the value is already set, this will return immediately.
58 pub async fn get(&self) -> &T {
59 poll_fn(|cx| match self.try_get() {
60 Some(data) => Poll::Ready(data),
61 None => {
62 cx.waker().wake_by_ref();
63 Poll::Pending
64 }
65 })
66 .await
67 }
68
69 /// Try to get a reference to the underlying value if it exists.
70 pub fn try_get(&self) -> Option<&T> {
71 if self.init.load(Ordering::Relaxed) {
72 Some(unsafe { self.get_ref_unchecked() })
73 } else {
74 None
75 }
76 }
77
78 /// Set the underlying value. If the value is already set, this will return an error with the given value.
79 pub fn init(&self, value: T) -> Result<(), T> {
80 // Critical section is required to ensure that the value is
81 // not simultaneously initialized elsewhere at the same time.
82 critical_section::with(|_| {
83 // If the value is not set, set it and return Ok.
84 if !self.init.load(Ordering::Relaxed) {
85 self.data.set(MaybeUninit::new(value));
86 self.init.store(true, Ordering::Relaxed);
87 Ok(())
88
89 // Otherwise return an error with the given value.
90 } else {
91 Err(value)
92 }
93 })
94 }
95
96 /// Get a reference to the underlying value, initializing it if it does not exist.
97 pub fn get_or_init<F>(&self, f: F) -> &T
98 where
99 F: FnOnce() -> T,
100 {
101 // Critical section is required to ensure that the value is
102 // not simultaneously initialized elsewhere at the same time.
103 critical_section::with(|_| {
104 // If the value is not set, set it.
105 if !self.init.load(Ordering::Relaxed) {
106 self.data.set(MaybeUninit::new(f()));
107 self.init.store(true, Ordering::Relaxed);
108 }
109 });
110
111 // Return a reference to the value.
112 unsafe { self.get_ref_unchecked() }
113 }
114
115 /// Consume the `OnceLock`, returning the underlying value if it was initialized.
116 pub fn into_inner(self) -> Option<T> {
117 if self.init.load(Ordering::Relaxed) {
118 Some(unsafe { self.data.into_inner().assume_init() })
119 } else {
120 None
121 }
122 }
123
124 /// Take the underlying value if it was initialized, uninitializing the `OnceLock` in the process.
125 pub fn take(&mut self) -> Option<T> {
126 // If the value is set, uninitialize the lock and return the value.
127 critical_section::with(|_| {
128 if self.init.load(Ordering::Relaxed) {
129 let val = unsafe { self.data.replace(MaybeUninit::zeroed()).assume_init() };
130 self.init.store(false, Ordering::Relaxed);
131 Some(val)
132
133 // Otherwise return None.
134 } else {
135 None
136 }
137 })
138 }
139
140 /// Check if the value has been set.
141 pub fn is_set(&self) -> bool {
142 self.init.load(Ordering::Relaxed)
143 }
144
145 /// Get a reference to the underlying value.
146 /// # Safety
147 /// Must only be used if a value has been set.
148 unsafe fn get_ref_unchecked(&self) -> &T {
149 (*self.data.as_ptr()).assume_init_ref()
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[test]
158 fn once_lock() {
159 let lock = OnceLock::new();
160 assert_eq!(lock.try_get(), None);
161 assert_eq!(lock.is_set(), false);
162
163 let v = 42;
164 assert_eq!(lock.init(v), Ok(()));
165 assert_eq!(lock.is_set(), true);
166 assert_eq!(lock.try_get(), Some(&v));
167 assert_eq!(lock.try_get(), Some(&v));
168
169 let v = 43;
170 assert_eq!(lock.init(v), Err(v));
171 assert_eq!(lock.is_set(), true);
172 assert_eq!(lock.try_get(), Some(&42));
173 }
174
175 #[test]
176 fn once_lock_get_or_init() {
177 let lock = OnceLock::new();
178 assert_eq!(lock.try_get(), None);
179 assert_eq!(lock.is_set(), false);
180
181 let v = lock.get_or_init(|| 42);
182 assert_eq!(v, &42);
183 assert_eq!(lock.is_set(), true);
184 assert_eq!(lock.try_get(), Some(&42));
185
186 let v = lock.get_or_init(|| 43);
187 assert_eq!(v, &42);
188 assert_eq!(lock.is_set(), true);
189 assert_eq!(lock.try_get(), Some(&42));
190 }
191
192 #[test]
193 fn once_lock_static() {
194 static LOCK: OnceLock<i32> = OnceLock::new();
195
196 let v: &'static i32 = LOCK.get_or_init(|| 42);
197 assert_eq!(v, &42);
198
199 let v: &'static i32 = LOCK.get_or_init(|| 43);
200 assert_eq!(v, &42);
201 }
202
203 #[futures_test::test]
204 async fn once_lock_async() {
205 static LOCK: OnceLock<i32> = OnceLock::new();
206
207 assert!(LOCK.init(42).is_ok());
208
209 let v: &'static i32 = LOCK.get().await;
210 assert_eq!(v, &42);
211 }
212
213 #[test]
214 fn once_lock_into_inner() {
215 let lock: OnceLock<i32> = OnceLock::new();
216
217 let v = lock.get_or_init(|| 42);
218 assert_eq!(v, &42);
219
220 assert_eq!(lock.into_inner(), Some(42));
221 }
222
223 #[test]
224 fn once_lock_take_init() {
225 let mut lock: OnceLock<i32> = OnceLock::new();
226
227 assert_eq!(lock.get_or_init(|| 42), &42);
228 assert_eq!(lock.is_set(), true);
229
230 assert_eq!(lock.take(), Some(42));
231 assert_eq!(lock.is_set(), false);
232
233 assert_eq!(lock.get_or_init(|| 43), &43);
234 assert_eq!(lock.is_set(), true);
235 }
236}
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index 42fe8ebd0..cd5b8ed75 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -25,7 +25,7 @@ where
25 M: RawMutex, 25 M: RawMutex,
26{ 26{
27 fn clone(&self) -> Self { 27 fn clone(&self) -> Self {
28 Writer { pipe: self.pipe } 28 *self
29 } 29 }
30} 30}
31 31
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs
index e77678c24..24c6c5a7f 100644
--- a/embassy-sync/src/priority_channel.rs
+++ b/embassy-sync/src/priority_channel.rs
@@ -33,7 +33,7 @@ where
33 M: RawMutex, 33 M: RawMutex,
34{ 34{
35 fn clone(&self) -> Self { 35 fn clone(&self) -> Self {
36 Sender { channel: self.channel } 36 *self
37 } 37 }
38} 38}
39 39
@@ -101,7 +101,7 @@ where
101 M: RawMutex, 101 M: RawMutex,
102{ 102{
103 fn clone(&self) -> Self { 103 fn clone(&self) -> Self {
104 Receiver { channel: self.channel } 104 *self
105 } 105 }
106} 106}
107 107
@@ -314,6 +314,22 @@ where
314 Poll::Pending 314 Poll::Pending
315 } 315 }
316 } 316 }
317
318 fn clear(&mut self) {
319 self.queue.clear();
320 }
321
322 fn len(&self) -> usize {
323 self.queue.len()
324 }
325
326 fn is_empty(&self) -> bool {
327 self.queue.is_empty()
328 }
329
330 fn is_full(&self) -> bool {
331 self.queue.len() == self.queue.capacity()
332 }
317} 333}
318 334
319/// A bounded channel for communicating between asynchronous tasks 335/// A bounded channel for communicating between asynchronous tasks
@@ -323,7 +339,7 @@ where
323/// buffer is full, attempts to `send` new messages will wait until a message is 339/// buffer is full, attempts to `send` new messages will wait until a message is
324/// received from the channel. 340/// received from the channel.
325/// 341///
326/// Sent data may be reordered based on their priorty within the channel. 342/// Sent data may be reordered based on their priority within the channel.
327/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] 343/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
328/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. 344/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
329pub struct PriorityChannel<M, T, K, const N: usize> 345pub struct PriorityChannel<M, T, K, const N: usize>
@@ -433,6 +449,38 @@ where
433 pub fn try_receive(&self) -> Result<T, TryReceiveError> { 449 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
434 self.lock(|c| c.try_receive()) 450 self.lock(|c| c.try_receive())
435 } 451 }
452
453 /// Returns the maximum number of elements the channel can hold.
454 pub const fn capacity(&self) -> usize {
455 N
456 }
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 pub fn free_capacity(&self) -> usize {
462 N - self.len()
463 }
464
465 /// Clears all elements in the channel.
466 pub fn clear(&self) {
467 self.lock(|c| c.clear());
468 }
469
470 /// Returns the number of elements currently in the channel.
471 pub fn len(&self) -> usize {
472 self.lock(|c| c.len())
473 }
474
475 /// Returns whether the channel is empty.
476 pub fn is_empty(&self) -> bool {
477 self.lock(|c| c.is_empty())
478 }
479
480 /// Returns whether the channel is full.
481 pub fn is_full(&self) -> bool {
482 self.lock(|c| c.is_full())
483 }
436} 484}
437 485
438/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the 486/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 6afd54af5..812302e2b 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -160,9 +160,60 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { 160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self)) 161 DynImmediatePublisher(ImmediatePub::new(self))
162 } 162 }
163
164 /// Returns the maximum number of elements the channel can hold.
165 pub const fn capacity(&self) -> usize {
166 CAP
167 }
168
169 /// Returns the free capacity of the channel.
170 ///
171 /// This is equivalent to `capacity() - len()`
172 pub fn free_capacity(&self) -> usize {
173 CAP - self.len()
174 }
175
176 /// Clears all elements in the channel.
177 pub fn clear(&self) {
178 self.inner.lock(|inner| inner.borrow_mut().clear());
179 }
180
181 /// Returns the number of elements currently in the channel.
182 pub fn len(&self) -> usize {
183 self.inner.lock(|inner| inner.borrow().len())
184 }
185
186 /// Returns whether the channel is empty.
187 pub fn is_empty(&self) -> bool {
188 self.inner.lock(|inner| inner.borrow().is_empty())
189 }
190
191 /// Returns whether the channel is full.
192 pub fn is_full(&self) -> bool {
193 self.inner.lock(|inner| inner.borrow().is_full())
194 }
163} 195}
164 196
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> 197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{
200 fn publish_immediate(&self, message: T) {
201 self.inner.lock(|s| {
202 let mut s = s.borrow_mut();
203 s.publish_immediate(message)
204 })
205 }
206
207 fn capacity(&self) -> usize {
208 self.capacity()
209 }
210
211 fn is_full(&self) -> bool {
212 self.is_full()
213 }
214}
215
216impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS> 217 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{ 218{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { 219 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
@@ -214,20 +265,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
214 }) 265 })
215 } 266 }
216 267
217 fn publish_immediate(&self, message: T) {
218 self.inner.lock(|s| {
219 let mut s = s.borrow_mut();
220 s.publish_immediate(message)
221 })
222 }
223
224 fn space(&self) -> usize {
225 self.inner.lock(|s| {
226 let s = s.borrow();
227 s.queue.capacity() - s.queue.len()
228 })
229 }
230
231 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 268 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
232 self.inner.lock(|s| { 269 self.inner.lock(|s| {
233 let mut s = s.borrow_mut(); 270 let mut s = s.borrow_mut();
@@ -241,6 +278,22 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
241 s.unregister_publisher() 278 s.unregister_publisher()
242 }) 279 })
243 } 280 }
281
282 fn free_capacity(&self) -> usize {
283 self.free_capacity()
284 }
285
286 fn clear(&self) {
287 self.clear();
288 }
289
290 fn len(&self) -> usize {
291 self.len()
292 }
293
294 fn is_empty(&self) -> bool {
295 self.is_empty()
296 }
244} 297}
245 298
246/// Internal state for the PubSub channel 299/// Internal state for the PubSub channel
@@ -366,10 +419,26 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
366 fn unregister_publisher(&mut self) { 419 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1; 420 self.publisher_count -= 1;
368 } 421 }
422
423 fn clear(&mut self) {
424 self.queue.clear();
425 }
426
427 fn len(&self) -> usize {
428 self.queue.len()
429 }
430
431 fn is_empty(&self) -> bool {
432 self.queue.is_empty()
433 }
434
435 fn is_full(&self) -> bool {
436 self.queue.is_full()
437 }
369} 438}
370 439
371/// Error type for the [PubSubChannel] 440/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Eq, Clone)] 441#[derive(Debug, PartialEq, Eq, Clone, Copy)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))] 442#[cfg_attr(feature = "defmt", derive(defmt::Format))]
374pub enum Error { 443pub enum Error {
375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or 444 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
@@ -380,12 +449,10 @@ pub enum Error {
380 MaximumPublishersReached, 449 MaximumPublishersReached,
381} 450}
382 451
383/// 'Middle level' behaviour of the pubsub channel. 452trait SealedPubSubBehavior<T> {
384/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id. 453 /// Try to get a message from the queue with the given message id.
387 /// 454 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 455 /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; 456 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390 457
391 /// Get the amount of messages that are between the given the next_message_id and the most recent message. 458 /// Get the amount of messages that are between the given the next_message_id and the most recent message.
@@ -397,11 +464,19 @@ pub trait PubSubBehavior<T> {
397 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. 464 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
398 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; 465 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
399 466
400 /// Publish a message immediately 467 /// Returns the free capacity of the channel.
401 fn publish_immediate(&self, message: T); 468 ///
469 /// This is equivalent to `capacity() - len()`
470 fn free_capacity(&self) -> usize;
471
472 /// Clears all elements in the channel.
473 fn clear(&self);
402 474
403 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 475 /// Returns the number of elements currently in the channel.
404 fn space(&self) -> usize; 476 fn len(&self) -> usize;
477
478 /// Returns whether the channel is empty.
479 fn is_empty(&self) -> bool;
405 480
406 /// Let the channel know that a subscriber has dropped 481 /// Let the channel know that a subscriber has dropped
407 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 482 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -410,6 +485,20 @@ pub trait PubSubBehavior<T> {
410 fn unregister_publisher(&self); 485 fn unregister_publisher(&self);
411} 486}
412 487
488/// 'Middle level' behaviour of the pubsub channel.
489/// This trait is used so that Sub and Pub can be generic over the channel.
490#[allow(private_bounds)]
491pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
492 /// Publish a message immediately
493 fn publish_immediate(&self, message: T);
494
495 /// Returns the maximum number of elements the channel can hold.
496 fn capacity(&self) -> usize;
497
498 /// Returns whether the channel is full.
499 fn is_full(&self) -> bool;
500}
501
413/// The result of the subscriber wait procedure 502/// The result of the subscriber wait procedure
414#[derive(Debug, Clone, PartialEq, Eq)] 503#[derive(Debug, Clone, PartialEq, Eq)]
415#[cfg_attr(feature = "defmt", derive(defmt::Format))] 504#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -542,6 +631,7 @@ mod tests {
542 assert_eq!(pub0.try_publish(0), Ok(())); 631 assert_eq!(pub0.try_publish(0), Ok(()));
543 assert_eq!(pub0.try_publish(0), Ok(())); 632 assert_eq!(pub0.try_publish(0), Ok(()));
544 assert_eq!(pub0.try_publish(0), Ok(())); 633 assert_eq!(pub0.try_publish(0), Ok(()));
634 assert!(pub0.is_full());
545 assert_eq!(pub0.try_publish(0), Err(0)); 635 assert_eq!(pub0.try_publish(0), Err(0));
546 636
547 drop(sub0); 637 drop(sub0);
@@ -574,32 +664,42 @@ mod tests {
574 } 664 }
575 665
576 #[futures_test::test] 666 #[futures_test::test]
577 async fn correct_space() { 667 async fn correct_len() {
578 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 668 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
579 669
580 let mut sub0 = channel.subscriber().unwrap(); 670 let mut sub0 = channel.subscriber().unwrap();
581 let mut sub1 = channel.subscriber().unwrap(); 671 let mut sub1 = channel.subscriber().unwrap();
582 let pub0 = channel.publisher().unwrap(); 672 let pub0 = channel.publisher().unwrap();
583 673
584 assert_eq!(pub0.space(), 4); 674 assert!(sub0.is_empty());
675 assert!(sub1.is_empty());
676 assert!(pub0.is_empty());
677 assert_eq!(pub0.free_capacity(), 4);
678 assert_eq!(pub0.len(), 0);
585 679
586 pub0.publish(42).await; 680 pub0.publish(42).await;
587 681
588 assert_eq!(pub0.space(), 3); 682 assert_eq!(pub0.free_capacity(), 3);
683 assert_eq!(pub0.len(), 1);
589 684
590 pub0.publish(42).await; 685 pub0.publish(42).await;
591 686
592 assert_eq!(pub0.space(), 2); 687 assert_eq!(pub0.free_capacity(), 2);
688 assert_eq!(pub0.len(), 2);
593 689
594 sub0.next_message().await; 690 sub0.next_message().await;
595 sub0.next_message().await; 691 sub0.next_message().await;
596 692
597 assert_eq!(pub0.space(), 2); 693 assert_eq!(pub0.free_capacity(), 2);
694 assert_eq!(pub0.len(), 2);
598 695
599 sub1.next_message().await; 696 sub1.next_message().await;
600 assert_eq!(pub0.space(), 3); 697 assert_eq!(pub0.free_capacity(), 3);
698 assert_eq!(pub0.len(), 1);
699
601 sub1.next_message().await; 700 sub1.next_message().await;
602 assert_eq!(pub0.space(), 4); 701 assert_eq!(pub0.free_capacity(), 4);
702 assert_eq!(pub0.len(), 0);
603 } 703 }
604 704
605 #[futures_test::test] 705 #[futures_test::test]
@@ -610,29 +710,29 @@ mod tests {
610 let mut sub0 = channel.subscriber().unwrap(); 710 let mut sub0 = channel.subscriber().unwrap();
611 let mut sub1 = channel.subscriber().unwrap(); 711 let mut sub1 = channel.subscriber().unwrap();
612 712
613 assert_eq!(4, pub0.space()); 713 assert_eq!(4, pub0.free_capacity());
614 714
615 pub0.publish(1).await; 715 pub0.publish(1).await;
616 pub0.publish(2).await; 716 pub0.publish(2).await;
617 717
618 assert_eq!(2, channel.space()); 718 assert_eq!(2, channel.free_capacity());
619 719
620 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 720 assert_eq!(1, sub0.try_next_message_pure().unwrap());
621 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 721 assert_eq!(2, sub0.try_next_message_pure().unwrap());
622 722
623 assert_eq!(2, channel.space()); 723 assert_eq!(2, channel.free_capacity());
624 724
625 drop(sub0); 725 drop(sub0);
626 726
627 assert_eq!(2, channel.space()); 727 assert_eq!(2, channel.free_capacity());
628 728
629 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 729 assert_eq!(1, sub1.try_next_message_pure().unwrap());
630 730
631 assert_eq!(3, channel.space()); 731 assert_eq!(3, channel.free_capacity());
632 732
633 drop(sub1); 733 drop(sub1);
634 734
635 assert_eq!(4, channel.space()); 735 assert_eq!(4, channel.free_capacity());
636 } 736 }
637 737
638 struct CloneCallCounter(usize); 738 struct CloneCallCounter(usize);
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e1edc9eb9..e66b3b1db 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -43,12 +43,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45 45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 46 /// Returns the maximum number of elements the ***channel*** can hold.
47 pub fn capacity(&self) -> usize {
48 self.channel.capacity()
49 }
50
51 /// Returns the free capacity of the ***channel***.
47 /// 52 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 53 /// This is equivalent to `capacity() - len()`
49 /// So checking doesn't give any guarantees.* 54 pub fn free_capacity(&self) -> usize {
50 pub fn space(&self) -> usize { 55 self.channel.free_capacity()
51 self.channel.space() 56 }
57
58 /// Clears all elements in the ***channel***.
59 pub fn clear(&self) {
60 self.channel.clear();
61 }
62
63 /// Returns the number of elements currently in the ***channel***.
64 pub fn len(&self) -> usize {
65 self.channel.len()
66 }
67
68 /// Returns whether the ***channel*** is empty.
69 pub fn is_empty(&self) -> bool {
70 self.channel.is_empty()
71 }
72
73 /// Returns whether the ***channel*** is full.
74 pub fn is_full(&self) -> bool {
75 self.channel.is_full()
52 } 76 }
53} 77}
54 78
@@ -124,12 +148,36 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
124 self.channel.publish_with_context(message, None) 148 self.channel.publish_with_context(message, None)
125 } 149 }
126 150
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 151 /// Returns the maximum number of elements the ***channel*** can hold.
152 pub fn capacity(&self) -> usize {
153 self.channel.capacity()
154 }
155
156 /// Returns the free capacity of the ***channel***.
128 /// 157 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 158 /// This is equivalent to `capacity() - len()`
130 /// So checking doesn't give any guarantees.* 159 pub fn free_capacity(&self) -> usize {
131 pub fn space(&self) -> usize { 160 self.channel.free_capacity()
132 self.channel.space() 161 }
162
163 /// Clears all elements in the ***channel***.
164 pub fn clear(&self) {
165 self.channel.clear();
166 }
167
168 /// Returns the number of elements currently in the ***channel***.
169 pub fn len(&self) -> usize {
170 self.channel.len()
171 }
172
173 /// Returns whether the ***channel*** is empty.
174 pub fn is_empty(&self) -> bool {
175 self.channel.is_empty()
176 }
177
178 /// Returns whether the ***channel*** is full.
179 pub fn is_full(&self) -> bool {
180 self.channel.is_full()
133 } 181 }
134} 182}
135 183
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index f420a75f0..6ad660cb3 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -65,10 +65,44 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
65 } 65 }
66 } 66 }
67 67
68 /// The amount of messages this subscriber hasn't received yet 68 /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69 /// for this subscriber.
69 pub fn available(&self) -> u64 { 70 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id) 71 self.channel.available(self.next_message_id)
71 } 72 }
73
74 /// Returns the maximum number of elements the ***channel*** can hold.
75 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 /// Returns the free capacity of the ***channel***.
80 ///
81 /// This is equivalent to `capacity() - len()`
82 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 /// Clears all elements in the ***channel***.
87 pub fn clear(&self) {
88 self.channel.clear();
89 }
90
91 /// Returns the number of elements currently in the ***channel***.
92 /// See [Self::available] for how many messages are available for this subscriber.
93 pub fn len(&self) -> usize {
94 self.channel.len()
95 }
96
97 /// Returns whether the ***channel*** is empty.
98 pub fn is_empty(&self) -> bool {
99 self.channel.is_empty()
100 }
101
102 /// Returns whether the ***channel*** is full.
103 pub fn is_full(&self) -> bool {
104 self.channel.is_full()
105 }
72} 106}
73 107
74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
diff --git a/embassy-sync/src/semaphore.rs b/embassy-sync/src/semaphore.rs
new file mode 100644
index 000000000..d30eee30b
--- /dev/null
+++ b/embassy-sync/src/semaphore.rs
@@ -0,0 +1,772 @@
1//! A synchronization primitive for controlling access to a pool of resources.
2use core::cell::{Cell, RefCell};
3use core::convert::Infallible;
4use core::future::{poll_fn, Future};
5use core::task::{Poll, Waker};
6
7use heapless::Deque;
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::WakerRegistration;
12
13/// An asynchronous semaphore.
14///
15/// A semaphore tracks a number of permits, typically representing a pool of shared resources.
16/// Users can acquire permits to synchronize access to those resources. The semaphore does not
17/// contain the resources themselves, only the count of available permits.
18pub trait Semaphore: Sized {
19 /// The error returned when the semaphore is unable to acquire the requested permits.
20 type Error;
21
22 /// Asynchronously acquire one or more permits from the semaphore.
23 async fn acquire(&self, permits: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error>;
24
25 /// Try to immediately acquire one or more permits from the semaphore.
26 fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>>;
27
28 /// Asynchronously acquire all permits controlled by the semaphore.
29 ///
30 /// This method will wait until at least `min` permits are available, then acquire all available permits
31 /// from the semaphore. Note that other tasks may have already acquired some permits which could be released
32 /// back to the semaphore at any time. The number of permits actually acquired may be determined by calling
33 /// [`SemaphoreReleaser::permits`].
34 async fn acquire_all(&self, min: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error>;
35
36 /// Try to immediately acquire all available permits from the semaphore, if at least `min` permits are available.
37 fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>>;
38
39 /// Release `permits` back to the semaphore, making them available to be acquired.
40 fn release(&self, permits: usize);
41
42 /// Reset the number of available permints in the semaphore to `permits`.
43 fn set(&self, permits: usize);
44}
45
46/// A representation of a number of acquired permits.
47///
48/// The acquired permits will be released back to the [`Semaphore`] when this is dropped.
49pub struct SemaphoreReleaser<'a, S: Semaphore> {
50 semaphore: &'a S,
51 permits: usize,
52}
53
54impl<'a, S: Semaphore> Drop for SemaphoreReleaser<'a, S> {
55 fn drop(&mut self) {
56 self.semaphore.release(self.permits);
57 }
58}
59
60impl<'a, S: Semaphore> SemaphoreReleaser<'a, S> {
61 /// The number of acquired permits.
62 pub fn permits(&self) -> usize {
63 self.permits
64 }
65
66 /// Prevent the acquired permits from being released on drop.
67 ///
68 /// Returns the number of acquired permits.
69 pub fn disarm(self) -> usize {
70 let permits = self.permits;
71 core::mem::forget(self);
72 permits
73 }
74}
75
76/// A greedy [`Semaphore`] implementation.
77///
78/// Tasks can acquire permits as soon as they become available, even if another task
79/// is waiting on a larger number of permits.
80pub struct GreedySemaphore<M: RawMutex> {
81 state: Mutex<M, Cell<SemaphoreState>>,
82}
83
84impl<M: RawMutex> Default for GreedySemaphore<M> {
85 fn default() -> Self {
86 Self::new(0)
87 }
88}
89
90impl<M: RawMutex> GreedySemaphore<M> {
91 /// Create a new `Semaphore`.
92 pub const fn new(permits: usize) -> Self {
93 Self {
94 state: Mutex::new(Cell::new(SemaphoreState {
95 permits,
96 waker: WakerRegistration::new(),
97 })),
98 }
99 }
100
101 #[cfg(test)]
102 fn permits(&self) -> usize {
103 self.state.lock(|cell| {
104 let state = cell.replace(SemaphoreState::EMPTY);
105 let permits = state.permits;
106 cell.replace(state);
107 permits
108 })
109 }
110
111 fn poll_acquire(
112 &self,
113 permits: usize,
114 acquire_all: bool,
115 waker: Option<&Waker>,
116 ) -> Poll<Result<SemaphoreReleaser<'_, Self>, Infallible>> {
117 self.state.lock(|cell| {
118 let mut state = cell.replace(SemaphoreState::EMPTY);
119 if let Some(permits) = state.take(permits, acquire_all) {
120 cell.set(state);
121 Poll::Ready(Ok(SemaphoreReleaser {
122 semaphore: self,
123 permits,
124 }))
125 } else {
126 if let Some(waker) = waker {
127 state.register(waker);
128 }
129 cell.set(state);
130 Poll::Pending
131 }
132 })
133 }
134}
135
136impl<M: RawMutex> Semaphore for GreedySemaphore<M> {
137 type Error = Infallible;
138
139 async fn acquire(&self, permits: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error> {
140 poll_fn(|cx| self.poll_acquire(permits, false, Some(cx.waker()))).await
141 }
142
143 fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>> {
144 match self.poll_acquire(permits, false, None) {
145 Poll::Ready(Ok(n)) => Some(n),
146 _ => None,
147 }
148 }
149
150 async fn acquire_all(&self, min: usize) -> Result<SemaphoreReleaser<'_, Self>, Self::Error> {
151 poll_fn(|cx| self.poll_acquire(min, true, Some(cx.waker()))).await
152 }
153
154 fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>> {
155 match self.poll_acquire(min, true, None) {
156 Poll::Ready(Ok(n)) => Some(n),
157 _ => None,
158 }
159 }
160
161 fn release(&self, permits: usize) {
162 if permits > 0 {
163 self.state.lock(|cell| {
164 let mut state = cell.replace(SemaphoreState::EMPTY);
165 state.permits += permits;
166 state.wake();
167 cell.set(state);
168 });
169 }
170 }
171
172 fn set(&self, permits: usize) {
173 self.state.lock(|cell| {
174 let mut state = cell.replace(SemaphoreState::EMPTY);
175 if permits > state.permits {
176 state.wake();
177 }
178 state.permits = permits;
179 cell.set(state);
180 });
181 }
182}
183
184struct SemaphoreState {
185 permits: usize,
186 waker: WakerRegistration,
187}
188
189impl SemaphoreState {
190 const EMPTY: SemaphoreState = SemaphoreState {
191 permits: 0,
192 waker: WakerRegistration::new(),
193 };
194
195 fn register(&mut self, w: &Waker) {
196 self.waker.register(w);
197 }
198
199 fn take(&mut self, mut permits: usize, acquire_all: bool) -> Option<usize> {
200 if self.permits < permits {
201 None
202 } else {
203 if acquire_all {
204 permits = self.permits;
205 }
206 self.permits -= permits;
207 Some(permits)
208 }
209 }
210
211 fn wake(&mut self) {
212 self.waker.wake();
213 }
214}
215
216/// A fair [`Semaphore`] implementation.
217///
218/// Tasks are allowed to acquire permits in FIFO order. A task waiting to acquire
219/// a large number of permits will prevent other tasks from acquiring any permits
220/// until its request is satisfied.
221///
222/// Up to `N` tasks may attempt to acquire permits concurrently. If additional
223/// tasks attempt to acquire a permit, a [`WaitQueueFull`] error will be returned.
224pub struct FairSemaphore<M, const N: usize>
225where
226 M: RawMutex,
227{
228 state: Mutex<M, RefCell<FairSemaphoreState<N>>>,
229}
230
231impl<M, const N: usize> Default for FairSemaphore<M, N>
232where
233 M: RawMutex,
234{
235 fn default() -> Self {
236 Self::new(0)
237 }
238}
239
240impl<M, const N: usize> FairSemaphore<M, N>
241where
242 M: RawMutex,
243{
244 /// Create a new `FairSemaphore`.
245 pub const fn new(permits: usize) -> Self {
246 Self {
247 state: Mutex::new(RefCell::new(FairSemaphoreState::new(permits))),
248 }
249 }
250
251 #[cfg(test)]
252 fn permits(&self) -> usize {
253 self.state.lock(|cell| cell.borrow().permits)
254 }
255
256 fn poll_acquire(
257 &self,
258 permits: usize,
259 acquire_all: bool,
260 cx: Option<(&mut Option<usize>, &Waker)>,
261 ) -> Poll<Result<SemaphoreReleaser<'_, Self>, WaitQueueFull>> {
262 let ticket = cx.as_ref().map(|(x, _)| **x).unwrap_or(None);
263 self.state.lock(|cell| {
264 let mut state = cell.borrow_mut();
265 if let Some(permits) = state.take(ticket, permits, acquire_all) {
266 Poll::Ready(Ok(SemaphoreReleaser {
267 semaphore: self,
268 permits,
269 }))
270 } else if let Some((ticket_ref, waker)) = cx {
271 match state.register(ticket, waker) {
272 Ok(ticket) => {
273 *ticket_ref = Some(ticket);
274 Poll::Pending
275 }
276 Err(err) => Poll::Ready(Err(err)),
277 }
278 } else {
279 Poll::Pending
280 }
281 })
282 }
283}
284
285/// An error indicating the [`FairSemaphore`]'s wait queue is full.
286#[derive(Debug, Clone, Copy, PartialEq, Eq)]
287#[cfg_attr(feature = "defmt", derive(defmt::Format))]
288pub struct WaitQueueFull;
289
290impl<M: RawMutex, const N: usize> Semaphore for FairSemaphore<M, N> {
291 type Error = WaitQueueFull;
292
293 fn acquire(&self, permits: usize) -> impl Future<Output = Result<SemaphoreReleaser<'_, Self>, Self::Error>> {
294 FairAcquire {
295 sema: self,
296 permits,
297 ticket: None,
298 }
299 }
300
301 fn try_acquire(&self, permits: usize) -> Option<SemaphoreReleaser<'_, Self>> {
302 match self.poll_acquire(permits, false, None) {
303 Poll::Ready(Ok(x)) => Some(x),
304 _ => None,
305 }
306 }
307
308 fn acquire_all(&self, min: usize) -> impl Future<Output = Result<SemaphoreReleaser<'_, Self>, Self::Error>> {
309 FairAcquireAll {
310 sema: self,
311 min,
312 ticket: None,
313 }
314 }
315
316 fn try_acquire_all(&self, min: usize) -> Option<SemaphoreReleaser<'_, Self>> {
317 match self.poll_acquire(min, true, None) {
318 Poll::Ready(Ok(x)) => Some(x),
319 _ => None,
320 }
321 }
322
323 fn release(&self, permits: usize) {
324 if permits > 0 {
325 self.state.lock(|cell| {
326 let mut state = cell.borrow_mut();
327 state.permits += permits;
328 state.wake();
329 });
330 }
331 }
332
333 fn set(&self, permits: usize) {
334 self.state.lock(|cell| {
335 let mut state = cell.borrow_mut();
336 if permits > state.permits {
337 state.wake();
338 }
339 state.permits = permits;
340 });
341 }
342}
343
344struct FairAcquire<'a, M: RawMutex, const N: usize> {
345 sema: &'a FairSemaphore<M, N>,
346 permits: usize,
347 ticket: Option<usize>,
348}
349
350impl<'a, M: RawMutex, const N: usize> Drop for FairAcquire<'a, M, N> {
351 fn drop(&mut self) {
352 self.sema
353 .state
354 .lock(|cell| cell.borrow_mut().cancel(self.ticket.take()));
355 }
356}
357
358impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquire<'a, M, N> {
359 type Output = Result<SemaphoreReleaser<'a, FairSemaphore<M, N>>, WaitQueueFull>;
360
361 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
362 self.sema
363 .poll_acquire(self.permits, false, Some((&mut self.ticket, cx.waker())))
364 }
365}
366
367struct FairAcquireAll<'a, M: RawMutex, const N: usize> {
368 sema: &'a FairSemaphore<M, N>,
369 min: usize,
370 ticket: Option<usize>,
371}
372
373impl<'a, M: RawMutex, const N: usize> Drop for FairAcquireAll<'a, M, N> {
374 fn drop(&mut self) {
375 self.sema
376 .state
377 .lock(|cell| cell.borrow_mut().cancel(self.ticket.take()));
378 }
379}
380
381impl<'a, M: RawMutex, const N: usize> core::future::Future for FairAcquireAll<'a, M, N> {
382 type Output = Result<SemaphoreReleaser<'a, FairSemaphore<M, N>>, WaitQueueFull>;
383
384 fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
385 self.sema
386 .poll_acquire(self.min, true, Some((&mut self.ticket, cx.waker())))
387 }
388}
389
390struct FairSemaphoreState<const N: usize> {
391 permits: usize,
392 next_ticket: usize,
393 wakers: Deque<Option<Waker>, N>,
394}
395
396impl<const N: usize> FairSemaphoreState<N> {
397 /// Create a new empty instance
398 const fn new(permits: usize) -> Self {
399 Self {
400 permits,
401 next_ticket: 0,
402 wakers: Deque::new(),
403 }
404 }
405
406 /// Register a waker. If the queue is full the function returns an error
407 fn register(&mut self, ticket: Option<usize>, w: &Waker) -> Result<usize, WaitQueueFull> {
408 self.pop_canceled();
409
410 match ticket {
411 None => {
412 let ticket = self.next_ticket.wrapping_add(self.wakers.len());
413 self.wakers.push_back(Some(w.clone())).or(Err(WaitQueueFull))?;
414 Ok(ticket)
415 }
416 Some(ticket) => {
417 self.set_waker(ticket, Some(w.clone()));
418 Ok(ticket)
419 }
420 }
421 }
422
423 fn cancel(&mut self, ticket: Option<usize>) {
424 if let Some(ticket) = ticket {
425 self.set_waker(ticket, None);
426 }
427 }
428
429 fn set_waker(&mut self, ticket: usize, waker: Option<Waker>) {
430 let i = ticket.wrapping_sub(self.next_ticket);
431 if i < self.wakers.len() {
432 let (a, b) = self.wakers.as_mut_slices();
433 let x = if i < a.len() { &mut a[i] } else { &mut b[i - a.len()] };
434 *x = waker;
435 }
436 }
437
438 fn take(&mut self, ticket: Option<usize>, mut permits: usize, acquire_all: bool) -> Option<usize> {
439 self.pop_canceled();
440
441 if permits > self.permits {
442 return None;
443 }
444
445 match ticket {
446 Some(n) if n != self.next_ticket => return None,
447 None if !self.wakers.is_empty() => return None,
448 _ => (),
449 }
450
451 if acquire_all {
452 permits = self.permits;
453 }
454 self.permits -= permits;
455
456 if ticket.is_some() {
457 self.pop();
458 if self.permits > 0 {
459 self.wake();
460 }
461 }
462
463 Some(permits)
464 }
465
466 fn pop_canceled(&mut self) {
467 while let Some(None) = self.wakers.front() {
468 self.pop();
469 }
470 }
471
472 /// Panics if `self.wakers` is empty
473 fn pop(&mut self) {
474 self.wakers.pop_front().unwrap();
475 self.next_ticket = self.next_ticket.wrapping_add(1);
476 }
477
478 fn wake(&mut self) {
479 self.pop_canceled();
480
481 if let Some(Some(waker)) = self.wakers.front() {
482 waker.wake_by_ref();
483 }
484 }
485}
486
487#[cfg(test)]
488mod tests {
489 mod greedy {
490 use core::pin::pin;
491
492 use futures_util::poll;
493
494 use super::super::*;
495 use crate::blocking_mutex::raw::NoopRawMutex;
496
497 #[test]
498 fn try_acquire() {
499 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
500
501 let a = semaphore.try_acquire(1).unwrap();
502 assert_eq!(a.permits(), 1);
503 assert_eq!(semaphore.permits(), 2);
504
505 core::mem::drop(a);
506 assert_eq!(semaphore.permits(), 3);
507 }
508
509 #[test]
510 fn disarm() {
511 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
512
513 let a = semaphore.try_acquire(1).unwrap();
514 assert_eq!(a.disarm(), 1);
515 assert_eq!(semaphore.permits(), 2);
516 }
517
518 #[futures_test::test]
519 async fn acquire() {
520 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
521
522 let a = semaphore.acquire(1).await.unwrap();
523 assert_eq!(a.permits(), 1);
524 assert_eq!(semaphore.permits(), 2);
525
526 core::mem::drop(a);
527 assert_eq!(semaphore.permits(), 3);
528 }
529
530 #[test]
531 fn try_acquire_all() {
532 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
533
534 let a = semaphore.try_acquire_all(1).unwrap();
535 assert_eq!(a.permits(), 3);
536 assert_eq!(semaphore.permits(), 0);
537 }
538
539 #[futures_test::test]
540 async fn acquire_all() {
541 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
542
543 let a = semaphore.acquire_all(1).await.unwrap();
544 assert_eq!(a.permits(), 3);
545 assert_eq!(semaphore.permits(), 0);
546 }
547
548 #[test]
549 fn release() {
550 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
551 assert_eq!(semaphore.permits(), 3);
552 semaphore.release(2);
553 assert_eq!(semaphore.permits(), 5);
554 }
555
556 #[test]
557 fn set() {
558 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
559 assert_eq!(semaphore.permits(), 3);
560 semaphore.set(2);
561 assert_eq!(semaphore.permits(), 2);
562 }
563
564 #[test]
565 fn contested() {
566 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
567
568 let a = semaphore.try_acquire(1).unwrap();
569 let b = semaphore.try_acquire(3);
570 assert!(b.is_none());
571
572 core::mem::drop(a);
573
574 let b = semaphore.try_acquire(3);
575 assert!(b.is_some());
576 }
577
578 #[futures_test::test]
579 async fn greedy() {
580 let semaphore = GreedySemaphore::<NoopRawMutex>::new(3);
581
582 let a = semaphore.try_acquire(1).unwrap();
583
584 let b_fut = semaphore.acquire(3);
585 let mut b_fut = pin!(b_fut);
586 let b = poll!(b_fut.as_mut());
587 assert!(b.is_pending());
588
589 // Succeed even through `b` is waiting
590 let c = semaphore.try_acquire(1);
591 assert!(c.is_some());
592
593 let b = poll!(b_fut.as_mut());
594 assert!(b.is_pending());
595
596 core::mem::drop(a);
597
598 let b = poll!(b_fut.as_mut());
599 assert!(b.is_pending());
600
601 core::mem::drop(c);
602
603 let b = poll!(b_fut.as_mut());
604 assert!(b.is_ready());
605 }
606 }
607
608 mod fair {
609 use core::pin::pin;
610 use core::time::Duration;
611
612 use futures_executor::ThreadPool;
613 use futures_timer::Delay;
614 use futures_util::poll;
615 use futures_util::task::SpawnExt;
616 use static_cell::StaticCell;
617
618 use super::super::*;
619 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
620
621 #[test]
622 fn try_acquire() {
623 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
624
625 let a = semaphore.try_acquire(1).unwrap();
626 assert_eq!(a.permits(), 1);
627 assert_eq!(semaphore.permits(), 2);
628
629 core::mem::drop(a);
630 assert_eq!(semaphore.permits(), 3);
631 }
632
633 #[test]
634 fn disarm() {
635 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
636
637 let a = semaphore.try_acquire(1).unwrap();
638 assert_eq!(a.disarm(), 1);
639 assert_eq!(semaphore.permits(), 2);
640 }
641
642 #[futures_test::test]
643 async fn acquire() {
644 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
645
646 let a = semaphore.acquire(1).await.unwrap();
647 assert_eq!(a.permits(), 1);
648 assert_eq!(semaphore.permits(), 2);
649
650 core::mem::drop(a);
651 assert_eq!(semaphore.permits(), 3);
652 }
653
654 #[test]
655 fn try_acquire_all() {
656 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
657
658 let a = semaphore.try_acquire_all(1).unwrap();
659 assert_eq!(a.permits(), 3);
660 assert_eq!(semaphore.permits(), 0);
661 }
662
663 #[futures_test::test]
664 async fn acquire_all() {
665 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
666
667 let a = semaphore.acquire_all(1).await.unwrap();
668 assert_eq!(a.permits(), 3);
669 assert_eq!(semaphore.permits(), 0);
670 }
671
672 #[test]
673 fn release() {
674 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
675 assert_eq!(semaphore.permits(), 3);
676 semaphore.release(2);
677 assert_eq!(semaphore.permits(), 5);
678 }
679
680 #[test]
681 fn set() {
682 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
683 assert_eq!(semaphore.permits(), 3);
684 semaphore.set(2);
685 assert_eq!(semaphore.permits(), 2);
686 }
687
688 #[test]
689 fn contested() {
690 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
691
692 let a = semaphore.try_acquire(1).unwrap();
693 let b = semaphore.try_acquire(3);
694 assert!(b.is_none());
695
696 core::mem::drop(a);
697
698 let b = semaphore.try_acquire(3);
699 assert!(b.is_some());
700 }
701
702 #[futures_test::test]
703 async fn fairness() {
704 let semaphore = FairSemaphore::<NoopRawMutex, 2>::new(3);
705
706 let a = semaphore.try_acquire(1);
707 assert!(a.is_some());
708
709 let b_fut = semaphore.acquire(3);
710 let mut b_fut = pin!(b_fut);
711 let b = poll!(b_fut.as_mut()); // Poll `b_fut` once so it is registered
712 assert!(b.is_pending());
713
714 let c = semaphore.try_acquire(1);
715 assert!(c.is_none());
716
717 let c_fut = semaphore.acquire(1);
718 let mut c_fut = pin!(c_fut);
719 let c = poll!(c_fut.as_mut()); // Poll `c_fut` once so it is registered
720 assert!(c.is_pending()); // `c` is blocked behind `b`
721
722 let d = semaphore.acquire(1).await;
723 assert!(matches!(d, Err(WaitQueueFull)));
724
725 core::mem::drop(a);
726
727 let c = poll!(c_fut.as_mut());
728 assert!(c.is_pending()); // `c` is still blocked behind `b`
729
730 let b = poll!(b_fut.as_mut());
731 assert!(b.is_ready());
732
733 let c = poll!(c_fut.as_mut());
734 assert!(c.is_pending()); // `c` is still blocked behind `b`
735
736 core::mem::drop(b);
737
738 let c = poll!(c_fut.as_mut());
739 assert!(c.is_ready());
740 }
741
742 #[futures_test::test]
743 async fn wakers() {
744 let executor = ThreadPool::new().unwrap();
745
746 static SEMAPHORE: StaticCell<FairSemaphore<CriticalSectionRawMutex, 2>> = StaticCell::new();
747 let semaphore = &*SEMAPHORE.init(FairSemaphore::new(3));
748
749 let a = semaphore.try_acquire(2);
750 assert!(a.is_some());
751
752 let b_task = executor
753 .spawn_with_handle(async move { semaphore.acquire(2).await })
754 .unwrap();
755 while semaphore.state.lock(|x| x.borrow().wakers.is_empty()) {
756 Delay::new(Duration::from_millis(50)).await;
757 }
758
759 let c_task = executor
760 .spawn_with_handle(async move { semaphore.acquire(1).await })
761 .unwrap();
762
763 core::mem::drop(a);
764
765 let b = b_task.await.unwrap();
766 assert_eq!(b.permits(), 2);
767
768 let c = c_task.await.unwrap();
769 assert_eq!(c.permits(), 1);
770 }
771 }
772}
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs
index d75750ce7..a0f4b5a74 100644
--- a/embassy-sync/src/signal.rs
+++ b/embassy-sync/src/signal.rs
@@ -65,7 +65,7 @@ where
65 } 65 }
66} 66}
67 67
68impl<M, T: Send> Signal<M, T> 68impl<M, T> Signal<M, T>
69where 69where
70 M: RawMutex, 70 M: RawMutex,
71{ 71{
@@ -125,7 +125,7 @@ where
125 }) 125 })
126 } 126 }
127 127
128 /// non-blocking method to check whether this signal has been signaled. 128 /// non-blocking method to check whether this signal has been signaled. This does not clear the signal.
129 pub fn signaled(&self) -> bool { 129 pub fn signaled(&self) -> bool {
130 self.state.lock(|cell| { 130 self.state.lock(|cell| {
131 let state = cell.replace(State::None); 131 let state = cell.replace(State::None);
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs
index 824d192da..0e520bf40 100644
--- a/embassy-sync/src/waitqueue/multi_waker.rs
+++ b/embassy-sync/src/waitqueue/multi_waker.rs
@@ -14,7 +14,7 @@ impl<const N: usize> MultiWakerRegistration<N> {
14 } 14 }
15 15
16 /// Register a waker. If the buffer is full the function returns it in the error 16 /// Register a waker. If the buffer is full the function returns it in the error
17 pub fn register<'a>(&mut self, w: &'a Waker) { 17 pub fn register(&mut self, w: &Waker) {
18 // If we already have some waker that wakes the same task as `w`, do nothing. 18 // If we already have some waker that wakes the same task as `w`, do nothing.
19 // This avoids cloning wakers, and avoids unnecessary mass-wakes. 19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
20 for w2 in &self.wakers { 20 for w2 in &self.wakers {
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs
index f704cbd5d..cfce9a571 100644
--- a/embassy-sync/src/zerocopy_channel.rs
+++ b/embassy-sync/src/zerocopy_channel.rs
@@ -1,10 +1,7 @@
1//! A zero-copy queue for sending values between asynchronous tasks. 1//! A zero-copy queue for sending values between asynchronous tasks.
2//! 2//!
3//! It can be used concurrently by multiple producers (senders) and multiple 3//! It can be used concurrently by a producer (sender) and a
4//! consumers (receivers), i.e. it is an "MPMC channel". 4//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//! 5//!
9//! This queue takes a Mutex type so that various 6//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used 7//! targets can be attained. For example, a ThreadModeMutex can be used