aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2024-05-20 18:30:19 +0000
committerGitHub <[email protected]>2024-05-20 18:30:19 +0000
commit8e7361f4ca1fe0917eeefaeaced571e6395c265b (patch)
tree172558f68557546e1d6ff154e70054edd79da04c
parentc74acae7c0f5475422dfef5d1e184acfe2f569b0 (diff)
parent5cffaf323b382e90d64e39ee5574ab8a9fa5640d (diff)
Merge pull request #2969 from diondokter/pubsub-cleanup
Pubsub cleanup
-rw-r--r--cyw43/src/runner.rs18
-rw-r--r--embassy-sync/CHANGELOG.md2
-rw-r--r--embassy-sync/src/pubsub/mod.rs93
-rw-r--r--embassy-sync/src/pubsub/publisher.rs58
-rw-r--r--embassy-sync/src/pubsub/subscriber.rs31
5 files changed, 159 insertions, 43 deletions
diff --git a/cyw43/src/runner.rs b/cyw43/src/runner.rs
index c72cf0def..e90316302 100644
--- a/cyw43/src/runner.rs
+++ b/cyw43/src/runner.rs
@@ -1,6 +1,5 @@
1use embassy_futures::select::{select3, Either3}; 1use embassy_futures::select::{select3, Either3};
2use embassy_net_driver_channel as ch; 2use embassy_net_driver_channel as ch;
3use embassy_sync::pubsub::PubSubBehavior;
4use embassy_time::{block_for, Duration, Timer}; 3use embassy_time::{block_for, Duration, Timer};
5use embedded_hal_1::digital::OutputPin; 4use embedded_hal_1::digital::OutputPin;
6 5
@@ -438,13 +437,16 @@ where
438 // publish() is a deadlock risk in the current design as awaiting here prevents ioctls 437 // publish() is a deadlock risk in the current design as awaiting here prevents ioctls
439 // The `Runner` always yields when accessing the device, so consumers always have a chance to receive the event 438 // The `Runner` always yields when accessing the device, so consumers always have a chance to receive the event
440 // (if they are actively awaiting the queue) 439 // (if they are actively awaiting the queue)
441 self.events.queue.publish_immediate(events::Message::new( 440 self.events
442 Status { 441 .queue
443 event_type: evt_type, 442 .immediate_publisher()
444 status, 443 .publish_immediate(events::Message::new(
445 }, 444 Status {
446 event_payload, 445 event_type: evt_type,
447 )); 446 status,
447 },
448 event_payload,
449 ));
448 } 450 }
449 } 451 }
450 CHANNEL_TYPE_DATA => { 452 CHANNEL_TYPE_DATA => {
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
index 7a830a853..bb919b28a 100644
--- a/embassy-sync/CHANGELOG.md
+++ b/embassy-sync/CHANGELOG.md
@@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
10- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `Channel`. 10- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `Channel`.
11- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PriorityChannel`. 11- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PriorityChannel`.
12- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`. 12- Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`.
13- Made `PubSubBehavior` sealed
14 - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)`
13 15
14## 0.5.0 - 2023-12-04 16## 0.5.0 - 2023-12-04
15 17
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
index 754747ab8..af3d6db2a 100644
--- a/embassy-sync/src/pubsub/mod.rs
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -189,7 +189,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
189 } 189 }
190} 190}
191 191
192impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> 192impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
193 for PubSubChannel<M, T, CAP, SUBS, PUBS> 193 for PubSubChannel<M, T, CAP, SUBS, PUBS>
194{ 194{
195 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { 195 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
@@ -248,13 +248,6 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
248 }) 248 })
249 } 249 }
250 250
251 fn space(&self) -> usize {
252 self.inner.lock(|s| {
253 let s = s.borrow();
254 s.queue.capacity() - s.queue.len()
255 })
256 }
257
258 fn unregister_subscriber(&self, subscriber_next_message_id: u64) { 251 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
259 self.inner.lock(|s| { 252 self.inner.lock(|s| {
260 let mut s = s.borrow_mut(); 253 let mut s = s.borrow_mut();
@@ -268,6 +261,26 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
268 s.unregister_publisher() 261 s.unregister_publisher()
269 }) 262 })
270 } 263 }
264
265 fn capacity(&self) -> usize {
266 self.capacity()
267 }
268
269 fn free_capacity(&self) -> usize {
270 self.free_capacity()
271 }
272
273 fn len(&self) -> usize {
274 self.len()
275 }
276
277 fn is_empty(&self) -> bool {
278 self.is_empty()
279 }
280
281 fn is_full(&self) -> bool {
282 self.is_full()
283 }
271} 284}
272 285
273/// Internal state for the PubSub channel 286/// Internal state for the PubSub channel
@@ -421,7 +434,7 @@ pub enum Error {
421 434
422/// 'Middle level' behaviour of the pubsub channel. 435/// 'Middle level' behaviour of the pubsub channel.
423/// This trait is used so that Sub and Pub can be generic over the channel. 436/// This trait is used so that Sub and Pub can be generic over the channel.
424pub trait PubSubBehavior<T> { 437trait SealedPubSubBehavior<T> {
425 /// Try to get a message from the queue with the given message id. 438 /// Try to get a message from the queue with the given message id.
426 /// 439 ///
427 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. 440 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
@@ -439,8 +452,22 @@ pub trait PubSubBehavior<T> {
439 /// Publish a message immediately 452 /// Publish a message immediately
440 fn publish_immediate(&self, message: T); 453 fn publish_immediate(&self, message: T);
441 454
442 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 455 /// Returns the maximum number of elements the channel can hold.
443 fn space(&self) -> usize; 456 fn capacity(&self) -> usize;
457
458 /// Returns the free capacity of the channel.
459 ///
460 /// This is equivalent to `capacity() - len()`
461 fn free_capacity(&self) -> usize;
462
463 /// Returns the number of elements currently in the channel.
464 fn len(&self) -> usize;
465
466 /// Returns whether the channel is empty.
467 fn is_empty(&self) -> bool;
468
469 /// Returns whether the channel is full.
470 fn is_full(&self) -> bool;
444 471
445 /// Let the channel know that a subscriber has dropped 472 /// Let the channel know that a subscriber has dropped
446 fn unregister_subscriber(&self, subscriber_next_message_id: u64); 473 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
@@ -449,6 +476,13 @@ pub trait PubSubBehavior<T> {
449 fn unregister_publisher(&self); 476 fn unregister_publisher(&self);
450} 477}
451 478
479/// 'Middle level' behaviour of the pubsub channel.
480/// This trait is used so that Sub and Pub can be generic over the channel.
481#[allow(private_bounds)]
482pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {}
483
484impl<T, C: SealedPubSubBehavior<T>> PubSubBehavior<T> for C {}
485
452/// The result of the subscriber wait procedure 486/// The result of the subscriber wait procedure
453#[derive(Debug, Clone, PartialEq, Eq)] 487#[derive(Debug, Clone, PartialEq, Eq)]
454#[cfg_attr(feature = "defmt", derive(defmt::Format))] 488#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -581,6 +615,7 @@ mod tests {
581 assert_eq!(pub0.try_publish(0), Ok(())); 615 assert_eq!(pub0.try_publish(0), Ok(()));
582 assert_eq!(pub0.try_publish(0), Ok(())); 616 assert_eq!(pub0.try_publish(0), Ok(()));
583 assert_eq!(pub0.try_publish(0), Ok(())); 617 assert_eq!(pub0.try_publish(0), Ok(()));
618 assert!(pub0.is_full());
584 assert_eq!(pub0.try_publish(0), Err(0)); 619 assert_eq!(pub0.try_publish(0), Err(0));
585 620
586 drop(sub0); 621 drop(sub0);
@@ -613,32 +648,42 @@ mod tests {
613 } 648 }
614 649
615 #[futures_test::test] 650 #[futures_test::test]
616 async fn correct_space() { 651 async fn correct_len() {
617 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); 652 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
618 653
619 let mut sub0 = channel.subscriber().unwrap(); 654 let mut sub0 = channel.subscriber().unwrap();
620 let mut sub1 = channel.subscriber().unwrap(); 655 let mut sub1 = channel.subscriber().unwrap();
621 let pub0 = channel.publisher().unwrap(); 656 let pub0 = channel.publisher().unwrap();
622 657
623 assert_eq!(pub0.space(), 4); 658 assert!(sub0.is_empty());
659 assert!(sub1.is_empty());
660 assert!(pub0.is_empty());
661 assert_eq!(pub0.free_capacity(), 4);
662 assert_eq!(pub0.len(), 0);
624 663
625 pub0.publish(42).await; 664 pub0.publish(42).await;
626 665
627 assert_eq!(pub0.space(), 3); 666 assert_eq!(pub0.free_capacity(), 3);
667 assert_eq!(pub0.len(), 1);
628 668
629 pub0.publish(42).await; 669 pub0.publish(42).await;
630 670
631 assert_eq!(pub0.space(), 2); 671 assert_eq!(pub0.free_capacity(), 2);
672 assert_eq!(pub0.len(), 2);
632 673
633 sub0.next_message().await; 674 sub0.next_message().await;
634 sub0.next_message().await; 675 sub0.next_message().await;
635 676
636 assert_eq!(pub0.space(), 2); 677 assert_eq!(pub0.free_capacity(), 2);
678 assert_eq!(pub0.len(), 2);
637 679
638 sub1.next_message().await; 680 sub1.next_message().await;
639 assert_eq!(pub0.space(), 3); 681 assert_eq!(pub0.free_capacity(), 3);
682 assert_eq!(pub0.len(), 1);
683
640 sub1.next_message().await; 684 sub1.next_message().await;
641 assert_eq!(pub0.space(), 4); 685 assert_eq!(pub0.free_capacity(), 4);
686 assert_eq!(pub0.len(), 0);
642 } 687 }
643 688
644 #[futures_test::test] 689 #[futures_test::test]
@@ -649,29 +694,29 @@ mod tests {
649 let mut sub0 = channel.subscriber().unwrap(); 694 let mut sub0 = channel.subscriber().unwrap();
650 let mut sub1 = channel.subscriber().unwrap(); 695 let mut sub1 = channel.subscriber().unwrap();
651 696
652 assert_eq!(4, pub0.space()); 697 assert_eq!(4, pub0.free_capacity());
653 698
654 pub0.publish(1).await; 699 pub0.publish(1).await;
655 pub0.publish(2).await; 700 pub0.publish(2).await;
656 701
657 assert_eq!(2, channel.space()); 702 assert_eq!(2, channel.free_capacity());
658 703
659 assert_eq!(1, sub0.try_next_message_pure().unwrap()); 704 assert_eq!(1, sub0.try_next_message_pure().unwrap());
660 assert_eq!(2, sub0.try_next_message_pure().unwrap()); 705 assert_eq!(2, sub0.try_next_message_pure().unwrap());
661 706
662 assert_eq!(2, channel.space()); 707 assert_eq!(2, channel.free_capacity());
663 708
664 drop(sub0); 709 drop(sub0);
665 710
666 assert_eq!(2, channel.space()); 711 assert_eq!(2, channel.free_capacity());
667 712
668 assert_eq!(1, sub1.try_next_message_pure().unwrap()); 713 assert_eq!(1, sub1.try_next_message_pure().unwrap());
669 714
670 assert_eq!(3, channel.space()); 715 assert_eq!(3, channel.free_capacity());
671 716
672 drop(sub1); 717 drop(sub1);
673 718
674 assert_eq!(4, channel.space()); 719 assert_eq!(4, channel.free_capacity());
675 } 720 }
676 721
677 struct CloneCallCounter(usize); 722 struct CloneCallCounter(usize);
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs
index e1edc9eb9..26e2c63b7 100644
--- a/embassy-sync/src/pubsub/publisher.rs
+++ b/embassy-sync/src/pubsub/publisher.rs
@@ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
43 self.channel.publish_with_context(message, None) 43 self.channel.publish_with_context(message, None)
44 } 44 }
45 45
46 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 46 /// Returns the maximum number of elements the ***channel*** can hold.
47 pub fn capacity(&self) -> usize {
48 self.channel.capacity()
49 }
50
51 /// Returns the free capacity of the ***channel***.
47 /// 52 ///
48 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 53 /// This is equivalent to `capacity() - len()`
49 /// So checking doesn't give any guarantees.* 54 pub fn free_capacity(&self) -> usize {
50 pub fn space(&self) -> usize { 55 self.channel.free_capacity()
51 self.channel.space() 56 }
57
58 /// Returns the number of elements currently in the ***channel***.
59 pub fn len(&self) -> usize {
60 self.channel.len()
61 }
62
63 /// Returns whether the ***channel*** is empty.
64 pub fn is_empty(&self) -> bool {
65 self.channel.is_empty()
66 }
67
68 /// Returns whether the ***channel*** is full.
69 pub fn is_full(&self) -> bool {
70 self.channel.is_full()
52 } 71 }
53} 72}
54 73
@@ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
124 self.channel.publish_with_context(message, None) 143 self.channel.publish_with_context(message, None)
125 } 144 }
126 145
127 /// The amount of messages that can still be published without having to wait or without having to lag the subscribers 146 /// Returns the maximum number of elements the ***channel*** can hold.
147 pub fn capacity(&self) -> usize {
148 self.channel.capacity()
149 }
150
151 /// Returns the free capacity of the ***channel***.
128 /// 152 ///
129 /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. 153 /// This is equivalent to `capacity() - len()`
130 /// So checking doesn't give any guarantees.* 154 pub fn free_capacity(&self) -> usize {
131 pub fn space(&self) -> usize { 155 self.channel.free_capacity()
132 self.channel.space() 156 }
157
158 /// Returns the number of elements currently in the ***channel***.
159 pub fn len(&self) -> usize {
160 self.channel.len()
161 }
162
163 /// Returns whether the ***channel*** is empty.
164 pub fn is_empty(&self) -> bool {
165 self.channel.is_empty()
166 }
167
168 /// Returns whether the ***channel*** is full.
169 pub fn is_full(&self) -> bool {
170 self.channel.is_full()
133 } 171 }
134} 172}
135 173
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs
index f420a75f0..2e2bd26a9 100644
--- a/embassy-sync/src/pubsub/subscriber.rs
+++ b/embassy-sync/src/pubsub/subscriber.rs
@@ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
65 } 65 }
66 } 66 }
67 67
68 /// The amount of messages this subscriber hasn't received yet 68 /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69 /// for this subscriber.
69 pub fn available(&self) -> u64 { 70 pub fn available(&self) -> u64 {
70 self.channel.available(self.next_message_id) 71 self.channel.available(self.next_message_id)
71 } 72 }
73
74 /// Returns the maximum number of elements the ***channel*** can hold.
75 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 /// Returns the free capacity of the ***channel***.
80 ///
81 /// This is equivalent to `capacity() - len()`
82 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 /// Returns the number of elements currently in the ***channel***.
87 /// See [Self::available] for how many messages are available for this subscriber.
88 pub fn len(&self) -> usize {
89 self.channel.len()
90 }
91
92 /// Returns whether the ***channel*** is empty.
93 pub fn is_empty(&self) -> bool {
94 self.channel.is_empty()
95 }
96
97 /// Returns whether the ***channel*** is full.
98 pub fn is_full(&self) -> bool {
99 self.channel.is_full()
100 }
72} 101}
73 102
74impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { 103impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {