diff options
| author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2022-06-21 20:04:27 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-06-21 20:04:27 +0000 |
| commit | 9721b2bf5b7cc75f12d7fe39a444084ca11be845 (patch) | |
| tree | e85ad158726156343b1ba157efc7f19178b0b69e /examples | |
| parent | e4fbfaf5682aea8b20ffd559bfbbbbc0edf957a4 (diff) | |
| parent | 78c546f3566d4289ebd9512fa3e0a03eee6fd87f (diff) | |
Merge #817
817: Added a pubsub channel implementation r=lulf a=diondokter
This is similar to Tokio's Broadcast channel, except that it doesn't allocate.
The publishers and subscribers are dynamic. They use an &dyn channel reference because it's really annoying to have to specify the mutex and const generics every time.
Do we need fully generic types as well?
Co-authored-by: Dion Dokter <[email protected]>
Co-authored-by: Dion Dokter <[email protected]>
Diffstat (limited to 'examples')
| -rw-r--r-- | examples/nrf/src/bin/pubsub.rs | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/examples/nrf/src/bin/pubsub.rs b/examples/nrf/src/bin/pubsub.rs new file mode 100644 index 000000000..2c3a355c2 --- /dev/null +++ b/examples/nrf/src/bin/pubsub.rs | |||
| @@ -0,0 +1,106 @@ | |||
| 1 | #![no_std] | ||
| 2 | #![no_main] | ||
| 3 | #![feature(type_alias_impl_trait)] | ||
| 4 | |||
| 5 | use defmt::unwrap; | ||
| 6 | use embassy::blocking_mutex::raw::ThreadModeRawMutex; | ||
| 7 | use embassy::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber}; | ||
| 8 | use embassy::executor::Spawner; | ||
| 9 | use embassy::time::{Duration, Timer}; | ||
| 10 | use {defmt_rtt as _, panic_probe as _}; | ||
| 11 | |||
| 12 | /// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher | ||
| 13 | static MESSAGE_BUS: PubSubChannel<ThreadModeRawMutex, Message, 4, 3, 1> = PubSubChannel::new(); | ||
| 14 | |||
| 15 | #[derive(Clone, defmt::Format)] | ||
| 16 | enum Message { | ||
| 17 | A, | ||
| 18 | B, | ||
| 19 | C, | ||
| 20 | } | ||
| 21 | |||
| 22 | #[embassy::main] | ||
| 23 | async fn main(spawner: Spawner, _p: embassy_nrf::Peripherals) { | ||
| 24 | defmt::info!("Hello World!"); | ||
| 25 | |||
| 26 | // It's good to set up the subscribers before publishing anything. | ||
| 27 | // A subscriber will only yield messages that have been published after its creation. | ||
| 28 | |||
| 29 | spawner.must_spawn(fast_logger(unwrap!(MESSAGE_BUS.subscriber()))); | ||
| 30 | spawner.must_spawn(slow_logger(unwrap!(MESSAGE_BUS.dyn_subscriber()))); | ||
| 31 | spawner.must_spawn(slow_logger_pure(unwrap!(MESSAGE_BUS.dyn_subscriber()))); | ||
| 32 | |||
| 33 | // Get a publisher | ||
| 34 | let message_publisher = unwrap!(MESSAGE_BUS.publisher()); | ||
| 35 | // We can't get more (normal) publishers | ||
| 36 | // We can have an infinite amount of immediate publishers. They can't await a publish, only do an immediate publish | ||
| 37 | defmt::assert!(MESSAGE_BUS.publisher().is_err()); | ||
| 38 | |||
| 39 | let mut index = 0; | ||
| 40 | loop { | ||
| 41 | Timer::after(Duration::from_millis(500)).await; | ||
| 42 | |||
| 43 | let message = match index % 3 { | ||
| 44 | 0 => Message::A, | ||
| 45 | 1 => Message::B, | ||
| 46 | 2..=u32::MAX => Message::C, | ||
| 47 | }; | ||
| 48 | |||
| 49 | // We publish immediately and don't await anything. | ||
| 50 | // If the queue is full, it will cause the oldest message to not be received by some/all subscribers | ||
| 51 | message_publisher.publish_immediate(message); | ||
| 52 | |||
| 53 | // Try to comment out the last one and uncomment this line below. | ||
| 54 | // The behaviour will change: | ||
| 55 | // - The subscribers won't miss any messages any more | ||
| 56 | // - Trying to publish now has some wait time when the queue is full | ||
| 57 | |||
| 58 | // message_publisher.publish(message).await; | ||
| 59 | |||
| 60 | index += 1; | ||
| 61 | } | ||
| 62 | } | ||
| 63 | |||
| 64 | /// A logger task that just awaits the messages it receives | ||
| 65 | /// | ||
| 66 | /// This takes the generic `Subscriber`. This is most performant, but requires you to write down all of the generics | ||
| 67 | #[embassy::task] | ||
| 68 | async fn fast_logger(mut messages: Subscriber<'static, ThreadModeRawMutex, Message, 4, 3, 1>) { | ||
| 69 | loop { | ||
| 70 | let message = messages.next_message().await; | ||
| 71 | defmt::info!("Received message at fast logger: {:?}", message); | ||
| 72 | } | ||
| 73 | } | ||
| 74 | |||
| 75 | /// A logger task that awaits the messages, but also does some other work. | ||
| 76 | /// Because of this, depeding on how the messages were published, the subscriber might miss some messages | ||
| 77 | /// | ||
| 78 | /// This takes the dynamic `DynSubscriber`. This is not as performant as the generic version, but let's you ignore some of the generics | ||
| 79 | #[embassy::task] | ||
| 80 | async fn slow_logger(mut messages: DynSubscriber<'static, Message>) { | ||
| 81 | loop { | ||
| 82 | // Do some work | ||
| 83 | Timer::after(Duration::from_millis(2000)).await; | ||
| 84 | |||
| 85 | // If the publisher has used the `publish_immediate` function, then we may receive a lag message here | ||
| 86 | let message = messages.next_message().await; | ||
| 87 | defmt::info!("Received message at slow logger: {:?}", message); | ||
| 88 | |||
| 89 | // If the previous one was a lag message, then we should receive the next message here immediately | ||
| 90 | let message = messages.next_message().await; | ||
| 91 | defmt::info!("Received message at slow logger: {:?}", message); | ||
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | /// Same as `slow_logger` but it ignores lag results | ||
| 96 | #[embassy::task] | ||
| 97 | async fn slow_logger_pure(mut messages: DynSubscriber<'static, Message>) { | ||
| 98 | loop { | ||
| 99 | // Do some work | ||
| 100 | Timer::after(Duration::from_millis(2000)).await; | ||
| 101 | |||
| 102 | // Instead of receiving lags here, we just ignore that and read the next message | ||
| 103 | let message = messages.next_message_pure().await; | ||
| 104 | defmt::info!("Received message at slow logger pure: {:?}", message); | ||
| 105 | } | ||
| 106 | } | ||
