aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2022-06-21 15:47:20 +0200
committerDion Dokter <[email protected]>2022-06-21 15:47:20 +0200
commit78c546f3566d4289ebd9512fa3e0a03eee6fd87f (patch)
tree11722d0299a679ad6a6a971abb05fc6ff36bb28f
parent1eec7e69f1be1bf91b8ca576cc8d8e30629705be (diff)
Added example and some defmt
-rw-r--r--embassy/src/channel/pubsub/mod.rs2
-rw-r--r--examples/nrf/src/bin/pubsub.rs106
2 files changed, 108 insertions, 0 deletions
diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs
index 12f0d24ab..9bfb845e0 100644
--- a/embassy/src/channel/pubsub/mod.rs
+++ b/embassy/src/channel/pubsub/mod.rs
@@ -370,6 +370,7 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
370 370
371/// Error type for the [PubSubChannel] 371/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Clone)] 372#[derive(Debug, PartialEq, Clone)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))]
373pub enum Error { 374pub enum Error {
374 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or 375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
375 /// the capacity of the channels must be increased. 376 /// the capacity of the channels must be increased.
@@ -404,6 +405,7 @@ pub trait PubSubBehavior<T> {
404 405
405/// The result of the subscriber wait procedure 406/// The result of the subscriber wait procedure
406#[derive(Debug, Clone, PartialEq)] 407#[derive(Debug, Clone, PartialEq)]
408#[cfg_attr(feature = "defmt", derive(defmt::Format))]
407pub enum WaitResult<T> { 409pub enum WaitResult<T> {
408 /// The subscriber did not receive all messages and lagged by the given amount of messages. 410 /// The subscriber did not receive all messages and lagged by the given amount of messages.
409 /// (This is the amount of messages that were missed) 411 /// (This is the amount of messages that were missed)
diff --git a/examples/nrf/src/bin/pubsub.rs b/examples/nrf/src/bin/pubsub.rs
new file mode 100644
index 000000000..2c3a355c2
--- /dev/null
+++ b/examples/nrf/src/bin/pubsub.rs
@@ -0,0 +1,106 @@
1#![no_std]
2#![no_main]
3#![feature(type_alias_impl_trait)]
4
5use defmt::unwrap;
6use embassy::blocking_mutex::raw::ThreadModeRawMutex;
7use embassy::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber};
8use embassy::executor::Spawner;
9use embassy::time::{Duration, Timer};
10use {defmt_rtt as _, panic_probe as _};
11
12/// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher
13static MESSAGE_BUS: PubSubChannel<ThreadModeRawMutex, Message, 4, 3, 1> = PubSubChannel::new();
14
15#[derive(Clone, defmt::Format)]
16enum Message {
17 A,
18 B,
19 C,
20}
21
22#[embassy::main]
23async fn main(spawner: Spawner, _p: embassy_nrf::Peripherals) {
24 defmt::info!("Hello World!");
25
26 // It's good to set up the subscribers before publishing anything.
27 // A subscriber will only yield messages that have been published after its creation.
28
29 spawner.must_spawn(fast_logger(unwrap!(MESSAGE_BUS.subscriber())));
30 spawner.must_spawn(slow_logger(unwrap!(MESSAGE_BUS.dyn_subscriber())));
31 spawner.must_spawn(slow_logger_pure(unwrap!(MESSAGE_BUS.dyn_subscriber())));
32
33 // Get a publisher
34 let message_publisher = unwrap!(MESSAGE_BUS.publisher());
35 // We can't get more (normal) publishers
36 // We can have an infinite amount of immediate publishers. They can't await a publish, only do an immediate publish
37 defmt::assert!(MESSAGE_BUS.publisher().is_err());
38
39 let mut index = 0;
40 loop {
41 Timer::after(Duration::from_millis(500)).await;
42
43 let message = match index % 3 {
44 0 => Message::A,
45 1 => Message::B,
46 2..=u32::MAX => Message::C,
47 };
48
49 // We publish immediately and don't await anything.
50 // If the queue is full, it will cause the oldest message to not be received by some/all subscribers
51 message_publisher.publish_immediate(message);
52
53 // Try to comment out the last one and uncomment this line below.
54 // The behaviour will change:
55 // - The subscribers won't miss any messages any more
56 // - Trying to publish now has some wait time when the queue is full
57
58 // message_publisher.publish(message).await;
59
60 index += 1;
61 }
62}
63
64/// A logger task that just awaits the messages it receives
65///
66/// This takes the generic `Subscriber`. This is most performant, but requires you to write down all of the generics
67#[embassy::task]
68async fn fast_logger(mut messages: Subscriber<'static, ThreadModeRawMutex, Message, 4, 3, 1>) {
69 loop {
70 let message = messages.next_message().await;
71 defmt::info!("Received message at fast logger: {:?}", message);
72 }
73}
74
75/// A logger task that awaits the messages, but also does some other work.
76/// Because of this, depeding on how the messages were published, the subscriber might miss some messages
77///
78/// This takes the dynamic `DynSubscriber`. This is not as performant as the generic version, but let's you ignore some of the generics
79#[embassy::task]
80async fn slow_logger(mut messages: DynSubscriber<'static, Message>) {
81 loop {
82 // Do some work
83 Timer::after(Duration::from_millis(2000)).await;
84
85 // If the publisher has used the `publish_immediate` function, then we may receive a lag message here
86 let message = messages.next_message().await;
87 defmt::info!("Received message at slow logger: {:?}", message);
88
89 // If the previous one was a lag message, then we should receive the next message here immediately
90 let message = messages.next_message().await;
91 defmt::info!("Received message at slow logger: {:?}", message);
92 }
93}
94
95/// Same as `slow_logger` but it ignores lag results
96#[embassy::task]
97async fn slow_logger_pure(mut messages: DynSubscriber<'static, Message>) {
98 loop {
99 // Do some work
100 Timer::after(Duration::from_millis(2000)).await;
101
102 // Instead of receiving lags here, we just ignore that and read the next message
103 let message = messages.next_message_pure().await;
104 defmt::info!("Received message at slow logger pure: {:?}", message);
105 }
106}