diff options
| author | Frostie314159 <[email protected]> | 2024-03-21 14:17:03 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-03-21 14:17:03 +0100 |
| commit | 7efe8e0005ca75c3753081848e8b176f8ac3a9ba (patch) | |
| tree | 3690b3b873b4643a07e29b60bf92596a819d30f0 /embassy-sync/src | |
| parent | 8707462ec23807782796fbac4295bc5bce9ff136 (diff) | |
| parent | 29d388042cdbd76d46f66ebe58cc580edb4515f8 (diff) | |
Merge branch 'embassy-rs:main' into reset-at-after
Diffstat (limited to 'embassy-sync/src')
| -rw-r--r-- | embassy-sync/src/channel.rs | 37 | ||||
| -rw-r--r-- | embassy-sync/src/fmt.rs | 3 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/priority_channel.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/ring_buffer.rs | 22 | ||||
| -rw-r--r-- | embassy-sync/src/signal.rs | 16 | ||||
| -rw-r--r-- | embassy-sync/src/zerocopy_channel.rs | 7 |
7 files changed, 66 insertions, 23 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index ff7129303..48f4dafd6 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -263,6 +263,12 @@ impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> { | |||
| 263 | } | 263 | } |
| 264 | } | 264 | } |
| 265 | 265 | ||
| 266 | impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> { | ||
| 267 | fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self { | ||
| 268 | Self { channel: value.channel } | ||
| 269 | } | ||
| 270 | } | ||
| 271 | |||
| 266 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | 272 | /// Future returned by [`Channel::send`] and [`Sender::send`]. |
| 267 | #[must_use = "futures do nothing unless you `.await` or poll them"] | 273 | #[must_use = "futures do nothing unless you `.await` or poll them"] |
| 268 | pub struct SendFuture<'ch, M, T, const N: usize> | 274 | pub struct SendFuture<'ch, M, T, const N: usize> |
| @@ -321,6 +327,15 @@ impl<'ch, T> Future for DynamicSendFuture<'ch, T> { | |||
| 321 | 327 | ||
| 322 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | 328 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} |
| 323 | 329 | ||
| 330 | impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> { | ||
| 331 | fn from(value: SendFuture<'ch, M, T, N>) -> Self { | ||
| 332 | Self { | ||
| 333 | channel: value.channel, | ||
| 334 | message: value.message, | ||
| 335 | } | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 324 | pub(crate) trait DynamicChannel<T> { | 339 | pub(crate) trait DynamicChannel<T> { |
| 325 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; | 340 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; |
| 326 | 341 | ||
| @@ -507,6 +522,16 @@ where | |||
| 507 | Receiver { channel: self } | 522 | Receiver { channel: self } |
| 508 | } | 523 | } |
| 509 | 524 | ||
| 525 | /// Get a sender for this channel using dynamic dispatch. | ||
| 526 | pub fn dyn_sender(&self) -> DynamicSender<'_, T> { | ||
| 527 | DynamicSender { channel: self } | ||
| 528 | } | ||
| 529 | |||
| 530 | /// Get a receiver for this channel using dynamic dispatch. | ||
| 531 | pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> { | ||
| 532 | DynamicReceiver { channel: self } | ||
| 533 | } | ||
| 534 | |||
| 510 | /// Send a value, waiting until there is capacity. | 535 | /// Send a value, waiting until there is capacity. |
| 511 | /// | 536 | /// |
| 512 | /// Sending completes when the value has been pushed to the channel's queue. | 537 | /// Sending completes when the value has been pushed to the channel's queue. |
| @@ -648,7 +673,7 @@ mod tests { | |||
| 648 | } | 673 | } |
| 649 | 674 | ||
| 650 | #[test] | 675 | #[test] |
| 651 | fn dynamic_dispatch() { | 676 | fn dynamic_dispatch_into() { |
| 652 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | 677 | let c = Channel::<NoopRawMutex, u32, 3>::new(); |
| 653 | let s: DynamicSender<'_, u32> = c.sender().into(); | 678 | let s: DynamicSender<'_, u32> = c.sender().into(); |
| 654 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | 679 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); |
| @@ -657,6 +682,16 @@ mod tests { | |||
| 657 | assert_eq!(r.try_receive().unwrap(), 1); | 682 | assert_eq!(r.try_receive().unwrap(), 1); |
| 658 | } | 683 | } |
| 659 | 684 | ||
| 685 | #[test] | ||
| 686 | fn dynamic_dispatch_constructor() { | ||
| 687 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 688 | let s = c.dyn_sender(); | ||
| 689 | let r = c.dyn_receiver(); | ||
| 690 | |||
| 691 | assert!(s.try_send(1).is_ok()); | ||
| 692 | assert_eq!(r.try_receive().unwrap(), 1); | ||
| 693 | } | ||
| 694 | |||
| 660 | #[futures_test::test] | 695 | #[futures_test::test] |
| 661 | async fn receiver_receives_given_try_send_async() { | 696 | async fn receiver_receives_given_try_send_async() { |
| 662 | let executor = ThreadPool::new().unwrap(); | 697 | let executor = ThreadPool::new().unwrap(); |
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index 78e583c1c..2ac42c557 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | #![macro_use] | 1 | #![macro_use] |
| 2 | #![allow(unused_macros)] | 2 | #![allow(unused)] |
| 3 | 3 | ||
| 4 | use core::fmt::{Debug, Display, LowerHex}; | 4 | use core::fmt::{Debug, Display, LowerHex}; |
| 5 | 5 | ||
| @@ -229,7 +229,6 @@ impl<T, E> Try for Result<T, E> { | |||
| 229 | } | 229 | } |
| 230 | } | 230 | } |
| 231 | 231 | ||
| 232 | #[allow(unused)] | ||
| 233 | pub(crate) struct Bytes<'a>(pub &'a [u8]); | 232 | pub(crate) struct Bytes<'a>(pub &'a [u8]); |
| 234 | 233 | ||
| 235 | impl<'a> Debug for Bytes<'a> { | 234 | impl<'a> Debug for Bytes<'a> { |
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index b0ccfde57..d88c76db5 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -1,6 +1,4 @@ | |||
| 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] | 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] |
| 2 | #![cfg_attr(nightly, feature(async_fn_in_trait, impl_trait_projections))] | ||
| 3 | #![cfg_attr(nightly, allow(stable_features, unknown_lints))] | ||
| 4 | #![allow(async_fn_in_trait)] | 2 | #![allow(async_fn_in_trait)] |
| 5 | #![allow(clippy::new_without_default)] | 3 | #![allow(clippy::new_without_default)] |
| 6 | #![doc = include_str!("../README.md")] | 4 | #![doc = include_str!("../README.md")] |
diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index bd75c0135..e77678c24 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs | |||
| @@ -325,7 +325,7 @@ where | |||
| 325 | /// | 325 | /// |
| 326 | /// Sent data may be reordered based on their priorty within the channel. | 326 | /// Sent data may be reordered based on their priorty within the channel. |
| 327 | /// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] | 327 | /// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`] |
| 328 | /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be recieved as `[3, 2, 1]`. | 328 | /// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`. |
| 329 | pub struct PriorityChannel<M, T, K, const N: usize> | 329 | pub struct PriorityChannel<M, T, K, const N: usize> |
| 330 | where | 330 | where |
| 331 | T: Ord, | 331 | T: Ord, |
diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index d95ffa7c9..81e60c42b 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs | |||
| @@ -3,7 +3,7 @@ use core::ops::Range; | |||
| 3 | pub struct RingBuffer<const N: usize> { | 3 | pub struct RingBuffer<const N: usize> { |
| 4 | start: usize, | 4 | start: usize, |
| 5 | end: usize, | 5 | end: usize, |
| 6 | empty: bool, | 6 | full: bool, |
| 7 | } | 7 | } |
| 8 | 8 | ||
| 9 | impl<const N: usize> RingBuffer<N> { | 9 | impl<const N: usize> RingBuffer<N> { |
| @@ -11,13 +11,13 @@ impl<const N: usize> RingBuffer<N> { | |||
| 11 | Self { | 11 | Self { |
| 12 | start: 0, | 12 | start: 0, |
| 13 | end: 0, | 13 | end: 0, |
| 14 | empty: true, | 14 | full: false, |
| 15 | } | 15 | } |
| 16 | } | 16 | } |
| 17 | 17 | ||
| 18 | pub fn push_buf(&mut self) -> Range<usize> { | 18 | pub fn push_buf(&mut self) -> Range<usize> { |
| 19 | if self.start == self.end && !self.empty { | 19 | if self.is_full() { |
| 20 | trace!(" ringbuf: push_buf empty"); | 20 | trace!(" ringbuf: push_buf full"); |
| 21 | return 0..0; | 21 | return 0..0; |
| 22 | } | 22 | } |
| 23 | 23 | ||
| @@ -38,11 +38,11 @@ impl<const N: usize> RingBuffer<N> { | |||
| 38 | } | 38 | } |
| 39 | 39 | ||
| 40 | self.end = self.wrap(self.end + n); | 40 | self.end = self.wrap(self.end + n); |
| 41 | self.empty = false; | 41 | self.full = self.start == self.end; |
| 42 | } | 42 | } |
| 43 | 43 | ||
| 44 | pub fn pop_buf(&mut self) -> Range<usize> { | 44 | pub fn pop_buf(&mut self) -> Range<usize> { |
| 45 | if self.empty { | 45 | if self.is_empty() { |
| 46 | trace!(" ringbuf: pop_buf empty"); | 46 | trace!(" ringbuf: pop_buf empty"); |
| 47 | return 0..0; | 47 | return 0..0; |
| 48 | } | 48 | } |
| @@ -64,20 +64,20 @@ impl<const N: usize> RingBuffer<N> { | |||
| 64 | } | 64 | } |
| 65 | 65 | ||
| 66 | self.start = self.wrap(self.start + n); | 66 | self.start = self.wrap(self.start + n); |
| 67 | self.empty = self.start == self.end; | 67 | self.full = false; |
| 68 | } | 68 | } |
| 69 | 69 | ||
| 70 | pub fn is_full(&self) -> bool { | 70 | pub fn is_full(&self) -> bool { |
| 71 | self.start == self.end && !self.empty | 71 | self.full |
| 72 | } | 72 | } |
| 73 | 73 | ||
| 74 | pub fn is_empty(&self) -> bool { | 74 | pub fn is_empty(&self) -> bool { |
| 75 | self.empty | 75 | self.start == self.end && !self.full |
| 76 | } | 76 | } |
| 77 | 77 | ||
| 78 | #[allow(unused)] | 78 | #[allow(unused)] |
| 79 | pub fn len(&self) -> usize { | 79 | pub fn len(&self) -> usize { |
| 80 | if self.empty { | 80 | if self.is_empty() { |
| 81 | 0 | 81 | 0 |
| 82 | } else if self.start < self.end { | 82 | } else if self.start < self.end { |
| 83 | self.end - self.start | 83 | self.end - self.start |
| @@ -89,7 +89,7 @@ impl<const N: usize> RingBuffer<N> { | |||
| 89 | pub fn clear(&mut self) { | 89 | pub fn clear(&mut self) { |
| 90 | self.start = 0; | 90 | self.start = 0; |
| 91 | self.end = 0; | 91 | self.end = 0; |
| 92 | self.empty = true; | 92 | self.full = false; |
| 93 | } | 93 | } |
| 94 | 94 | ||
| 95 | fn wrap(&self, n: usize) -> usize { | 95 | fn wrap(&self, n: usize) -> usize { |
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index bea67d8be..520f1a896 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs | |||
| @@ -111,7 +111,21 @@ where | |||
| 111 | poll_fn(move |cx| self.poll_wait(cx)) | 111 | poll_fn(move |cx| self.poll_wait(cx)) |
| 112 | } | 112 | } |
| 113 | 113 | ||
| 114 | /// non-blocking method to check whether this signal has been signaled. | 114 | /// non-blocking method to try and take the signal value. |
| 115 | pub fn try_take(&self) -> Option<T> { | ||
| 116 | self.state.lock(|cell| { | ||
| 117 | let state = cell.replace(State::None); | ||
| 118 | match state { | ||
| 119 | State::Signaled(res) => Some(res), | ||
| 120 | state => { | ||
| 121 | cell.set(state); | ||
| 122 | None | ||
| 123 | } | ||
| 124 | } | ||
| 125 | }) | ||
| 126 | } | ||
| 127 | |||
| 128 | /// non-blocking method to check whether this signal has been signaled. This does not clear the signal. | ||
| 115 | pub fn signaled(&self) -> bool { | 129 | pub fn signaled(&self) -> bool { |
| 116 | self.state.lock(|cell| { | 130 | self.state.lock(|cell| { |
| 117 | let state = cell.replace(State::None); | 131 | let state = cell.replace(State::None); |
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs index f704cbd5d..cfce9a571 100644 --- a/embassy-sync/src/zerocopy_channel.rs +++ b/embassy-sync/src/zerocopy_channel.rs | |||
| @@ -1,10 +1,7 @@ | |||
| 1 | //! A zero-copy queue for sending values between asynchronous tasks. | 1 | //! A zero-copy queue for sending values between asynchronous tasks. |
| 2 | //! | 2 | //! |
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | 3 | //! It can be used concurrently by a producer (sender) and a |
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | 4 | //! consumer (receiver), i.e. it is an "SPSC channel". |
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | 5 | //! |
| 9 | //! This queue takes a Mutex type so that various | 6 | //! This queue takes a Mutex type so that various |
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | 7 | //! targets can be attained. For example, a ThreadModeMutex can be used |
