aboutsummaryrefslogtreecommitdiff
path: root/embassy-sync/src/pubsub/mod.rs
diff options
context:
space:
mode:
authorDario Nieuwenhuis <[email protected]>2022-08-22 22:00:06 +0200
committerDario Nieuwenhuis <[email protected]>2022-08-22 22:18:13 +0200
commit5677b13a86beca58aa57ecfd7cea0db7ceb189fa (patch)
tree0c7425dae57acb94cb6ddca27def7e77609369b3 /embassy-sync/src/pubsub/mod.rs
parent21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 (diff)
sync: flatten module structure.
Diffstat (limited to 'embassy-sync/src/pubsub/mod.rs')
-rw-r--r--embassy-sync/src/pubsub/mod.rs542
1 files changed, 542 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs
new file mode 100644
index 000000000..62a9e4763
--- /dev/null
+++ b/embassy-sync/src/pubsub/mod.rs
@@ -0,0 +1,542 @@
1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll, Waker};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31/// an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_sync::pubsub::WaitResult;
38/// # use embassy_sync::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 /// Create a new channel
82 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 /// Create a new subscriber. It will only receive messages that are published after its creation.
89 ///
90 /// If there are no subscriber slots left, an error will be returned.
91 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 /// Create a new subscriber. It will only receive messages that are published after its creation.
105 ///
106 /// If there are no subscriber slots left, an error will be returned.
107 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 /// Create a new publisher
121 ///
122 /// If there are no publisher slots left, an error will be returned.
123 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 /// Create a new publisher
137 ///
138 /// If there are no publisher slots left, an error will be returned.
139 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 /// Create a new publisher that can only send immediate messages.
153 /// This kind of publisher does not take up a publisher slot.
154 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 /// Create a new publisher that can only send immediate messages.
159 /// This kind of publisher does not take up a publisher slot.
160 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163}
164
165impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
166 for PubSubChannel<M, T, CAP, SUBS, PUBS>
167{
168 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
169 self.inner.lock(|s| {
170 let mut s = s.borrow_mut();
171
172 // Check if we can read a message
173 match s.get_message(*next_message_id) {
174 // Yes, so we are done polling
175 Some(WaitResult::Message(message)) => {
176 *next_message_id += 1;
177 Poll::Ready(WaitResult::Message(message))
178 }
179 // No, so we need to reregister our waker and sleep again
180 None => {
181 if let Some(cx) = cx {
182 s.register_subscriber_waker(cx.waker());
183 }
184 Poll::Pending
185 }
186 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
187 Some(WaitResult::Lagged(amount)) => {
188 *next_message_id += amount;
189 Poll::Ready(WaitResult::Lagged(amount))
190 }
191 }
192 })
193 }
194
195 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
196 self.inner.lock(|s| {
197 let mut s = s.borrow_mut();
198 // Try to publish the message
199 match s.try_publish(message) {
200 // We did it, we are ready
201 Ok(()) => Ok(()),
202 // The queue is full, so we need to reregister our waker and go to sleep
203 Err(message) => {
204 if let Some(cx) = cx {
205 s.register_publisher_waker(cx.waker());
206 }
207 Err(message)
208 }
209 }
210 })
211 }
212
213 fn publish_immediate(&self, message: T) {
214 self.inner.lock(|s| {
215 let mut s = s.borrow_mut();
216 s.publish_immediate(message)
217 })
218 }
219
220 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
221 self.inner.lock(|s| {
222 let mut s = s.borrow_mut();
223 s.unregister_subscriber(subscriber_next_message_id)
224 })
225 }
226
227 fn unregister_publisher(&self) {
228 self.inner.lock(|s| {
229 let mut s = s.borrow_mut();
230 s.unregister_publisher()
231 })
232 }
233}
234
235/// Internal state for the PubSub channel
236struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
237 /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
238 queue: Deque<(T, usize), CAP>,
239 /// Every message has an id.
240 /// Don't worry, we won't run out.
241 /// If a million messages were published every second, then the ID's would run out in about 584942 years.
242 next_message_id: u64,
243 /// Collection of wakers for Subscribers that are waiting.
244 subscriber_wakers: MultiWakerRegistration<SUBS>,
245 /// Collection of wakers for Publishers that are waiting.
246 publisher_wakers: MultiWakerRegistration<PUBS>,
247 /// The amount of subscribers that are active
248 subscriber_count: usize,
249 /// The amount of publishers that are active
250 publisher_count: usize,
251}
252
253impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
254 /// Create a new internal channel state
255 const fn new() -> Self {
256 Self {
257 queue: Deque::new(),
258 next_message_id: 0,
259 subscriber_wakers: MultiWakerRegistration::new(),
260 publisher_wakers: MultiWakerRegistration::new(),
261 subscriber_count: 0,
262 publisher_count: 0,
263 }
264 }
265
266 fn try_publish(&mut self, message: T) -> Result<(), T> {
267 if self.subscriber_count == 0 {
268 // We don't need to publish anything because there is no one to receive it
269 return Ok(());
270 }
271
272 if self.queue.is_full() {
273 return Err(message);
274 }
275 // We just did a check for this
276 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
277
278 self.next_message_id += 1;
279
280 // Wake all of the subscribers
281 self.subscriber_wakers.wake();
282
283 Ok(())
284 }
285
286 fn publish_immediate(&mut self, message: T) {
287 // Make space in the queue if required
288 if self.queue.is_full() {
289 self.queue.pop_front();
290 }
291
292 // This will succeed because we made sure there is space
293 self.try_publish(message).ok().unwrap();
294 }
295
296 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
297 let start_id = self.next_message_id - self.queue.len() as u64;
298
299 if message_id < start_id {
300 return Some(WaitResult::Lagged(start_id - message_id));
301 }
302
303 let current_message_index = (message_id - start_id) as usize;
304
305 if current_message_index >= self.queue.len() {
306 return None;
307 }
308
309 // We've checked that the index is valid
310 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
311
312 // We're reading this item, so decrement the counter
313 queue_item.1 -= 1;
314 let message = queue_item.0.clone();
315
316 if current_message_index == 0 && queue_item.1 == 0 {
317 self.queue.pop_front();
318 self.publisher_wakers.wake();
319 }
320
321 Some(WaitResult::Message(message))
322 }
323
324 fn register_subscriber_waker(&mut self, waker: &Waker) {
325 match self.subscriber_wakers.register(waker) {
326 Ok(()) => {}
327 Err(_) => {
328 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
329 // We need to throw it away. It's a bit inefficient, but we can wake everything.
330 // Any future that is still active will simply reregister.
331 // This won't happen a lot, so it's ok.
332 self.subscriber_wakers.wake();
333 self.subscriber_wakers.register(waker).unwrap();
334 }
335 }
336 }
337
338 fn register_publisher_waker(&mut self, waker: &Waker) {
339 match self.publisher_wakers.register(waker) {
340 Ok(()) => {}
341 Err(_) => {
342 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
343 // We need to throw it away. It's a bit inefficient, but we can wake everything.
344 // Any future that is still active will simply reregister.
345 // This won't happen a lot, so it's ok.
346 self.publisher_wakers.wake();
347 self.publisher_wakers.register(waker).unwrap();
348 }
349 }
350 }
351
352 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
353 self.subscriber_count -= 1;
354
355 // All messages that haven't been read yet by this subscriber must have their counter decremented
356 let start_id = self.next_message_id - self.queue.len() as u64;
357 if subscriber_next_message_id >= start_id {
358 let current_message_index = (subscriber_next_message_id - start_id) as usize;
359 self.queue
360 .iter_mut()
361 .skip(current_message_index)
362 .for_each(|(_, counter)| *counter -= 1);
363 }
364 }
365
366 fn unregister_publisher(&mut self) {
367 self.publisher_count -= 1;
368 }
369}
370
371/// Error type for the [PubSubChannel]
372#[derive(Debug, PartialEq, Eq, Clone)]
373#[cfg_attr(feature = "defmt", derive(defmt::Format))]
374pub enum Error {
375 /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
376 /// the capacity of the channels must be increased.
377 MaximumSubscribersReached,
378 /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
379 /// the capacity of the channels must be increased.
380 MaximumPublishersReached,
381}
382
383/// 'Middle level' behaviour of the pubsub channel.
384/// This trait is used so that Sub and Pub can be generic over the channel.
385pub trait PubSubBehavior<T> {
386 /// Try to get a message from the queue with the given message id.
387 ///
388 /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers.
389 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
390
391 /// Try to publish a message to the queue.
392 ///
393 /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
394 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
395
396 /// Publish a message immediately
397 fn publish_immediate(&self, message: T);
398
399 /// Let the channel know that a subscriber has dropped
400 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
401
402 /// Let the channel know that a publisher has dropped
403 fn unregister_publisher(&self);
404}
405
406/// The result of the subscriber wait procedure
407#[derive(Debug, Clone, PartialEq, Eq)]
408#[cfg_attr(feature = "defmt", derive(defmt::Format))]
409pub enum WaitResult<T> {
410 /// The subscriber did not receive all messages and lagged by the given amount of messages.
411 /// (This is the amount of messages that were missed)
412 Lagged(u64),
413 /// A message was received
414 Message(T),
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use crate::blocking_mutex::raw::NoopRawMutex;
421
422 #[futures_test::test]
423 async fn dyn_pub_sub_works() {
424 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
425
426 let mut sub0 = channel.dyn_subscriber().unwrap();
427 let mut sub1 = channel.dyn_subscriber().unwrap();
428 let pub0 = channel.dyn_publisher().unwrap();
429
430 pub0.publish(42).await;
431
432 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
433 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
434
435 assert_eq!(sub0.try_next_message(), None);
436 assert_eq!(sub1.try_next_message(), None);
437 }
438
439 #[futures_test::test]
440 async fn all_subscribers_receive() {
441 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
442
443 let mut sub0 = channel.subscriber().unwrap();
444 let mut sub1 = channel.subscriber().unwrap();
445 let pub0 = channel.publisher().unwrap();
446
447 pub0.publish(42).await;
448
449 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
450 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
451
452 assert_eq!(sub0.try_next_message(), None);
453 assert_eq!(sub1.try_next_message(), None);
454 }
455
456 #[futures_test::test]
457 async fn lag_when_queue_full_on_immediate_publish() {
458 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
459
460 let mut sub0 = channel.subscriber().unwrap();
461 let pub0 = channel.publisher().unwrap();
462
463 pub0.publish_immediate(42);
464 pub0.publish_immediate(43);
465 pub0.publish_immediate(44);
466 pub0.publish_immediate(45);
467 pub0.publish_immediate(46);
468 pub0.publish_immediate(47);
469
470 assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
471 assert_eq!(sub0.next_message().await, WaitResult::Message(44));
472 assert_eq!(sub0.next_message().await, WaitResult::Message(45));
473 assert_eq!(sub0.next_message().await, WaitResult::Message(46));
474 assert_eq!(sub0.next_message().await, WaitResult::Message(47));
475 assert_eq!(sub0.try_next_message(), None);
476 }
477
478 #[test]
479 fn limited_subs_and_pubs() {
480 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
481
482 let sub0 = channel.subscriber();
483 let sub1 = channel.subscriber();
484 let sub2 = channel.subscriber();
485 let sub3 = channel.subscriber();
486 let sub4 = channel.subscriber();
487
488 assert!(sub0.is_ok());
489 assert!(sub1.is_ok());
490 assert!(sub2.is_ok());
491 assert!(sub3.is_ok());
492 assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
493
494 drop(sub0);
495
496 let sub5 = channel.subscriber();
497 assert!(sub5.is_ok());
498
499 // publishers
500
501 let pub0 = channel.publisher();
502 let pub1 = channel.publisher();
503 let pub2 = channel.publisher();
504 let pub3 = channel.publisher();
505 let pub4 = channel.publisher();
506
507 assert!(pub0.is_ok());
508 assert!(pub1.is_ok());
509 assert!(pub2.is_ok());
510 assert!(pub3.is_ok());
511 assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
512
513 drop(pub0);
514
515 let pub5 = channel.publisher();
516 assert!(pub5.is_ok());
517 }
518
519 #[test]
520 fn publisher_wait_on_full_queue() {
521 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
522
523 let pub0 = channel.publisher().unwrap();
524
525 // There are no subscribers, so the queue will never be full
526 assert_eq!(pub0.try_publish(0), Ok(()));
527 assert_eq!(pub0.try_publish(0), Ok(()));
528 assert_eq!(pub0.try_publish(0), Ok(()));
529 assert_eq!(pub0.try_publish(0), Ok(()));
530 assert_eq!(pub0.try_publish(0), Ok(()));
531
532 let sub0 = channel.subscriber().unwrap();
533
534 assert_eq!(pub0.try_publish(0), Ok(()));
535 assert_eq!(pub0.try_publish(0), Ok(()));
536 assert_eq!(pub0.try_publish(0), Ok(()));
537 assert_eq!(pub0.try_publish(0), Ok(()));
538 assert_eq!(pub0.try_publish(0), Err(0));
539
540 drop(sub0);
541 }
542}