diff options
| author | Dario Nieuwenhuis <[email protected]> | 2022-08-22 22:00:06 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2022-08-22 22:18:13 +0200 |
| commit | 5677b13a86beca58aa57ecfd7cea0db7ceb189fa (patch) | |
| tree | 0c7425dae57acb94cb6ddca27def7e77609369b3 /embassy-sync/src/pubsub/mod.rs | |
| parent | 21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 (diff) | |
sync: flatten module structure.
Diffstat (limited to 'embassy-sync/src/pubsub/mod.rs')
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 542 |
1 files changed, 542 insertions, 0 deletions
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs new file mode 100644 index 000000000..62a9e4763 --- /dev/null +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -0,0 +1,542 @@ | |||
| 1 | //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. | ||
| 2 | |||
| 3 | #![deny(missing_docs)] | ||
| 4 | |||
| 5 | use core::cell::RefCell; | ||
| 6 | use core::fmt::Debug; | ||
| 7 | use core::task::{Context, Poll, Waker}; | ||
| 8 | |||
| 9 | use heapless::Deque; | ||
| 10 | |||
| 11 | use self::publisher::{ImmediatePub, Pub}; | ||
| 12 | use self::subscriber::Sub; | ||
| 13 | use crate::blocking_mutex::raw::RawMutex; | ||
| 14 | use crate::blocking_mutex::Mutex; | ||
| 15 | use crate::waitqueue::MultiWakerRegistration; | ||
| 16 | |||
| 17 | pub mod publisher; | ||
| 18 | pub mod subscriber; | ||
| 19 | |||
| 20 | pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; | ||
| 21 | pub use subscriber::{DynSubscriber, Subscriber}; | ||
| 22 | |||
| 23 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers | ||
| 24 | /// | ||
| 25 | /// Any published message can be read by all subscribers. | ||
| 26 | /// A publisher can choose how it sends its message. | ||
| 27 | /// | ||
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. | ||
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message | ||
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | ||
| 31 | /// an error to indicate that it has lagged. | ||
| 32 | /// | ||
| 33 | /// ## Example | ||
| 34 | /// | ||
| 35 | /// ``` | ||
| 36 | /// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 37 | /// # use embassy_sync::pubsub::WaitResult; | ||
| 38 | /// # use embassy_sync::pubsub::PubSubChannel; | ||
| 39 | /// # use futures_executor::block_on; | ||
| 40 | /// # let test = async { | ||
| 41 | /// // Create the channel. This can be static as well | ||
| 42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 43 | /// | ||
| 44 | /// // This is a generic subscriber with a direct reference to the channel | ||
| 45 | /// let mut sub0 = channel.subscriber().unwrap(); | ||
| 46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel | ||
| 47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 48 | /// | ||
| 49 | /// let pub0 = channel.publisher().unwrap(); | ||
| 50 | /// | ||
| 51 | /// // Publish a message, but wait if the queue is full | ||
| 52 | /// pub0.publish(42).await; | ||
| 53 | /// | ||
| 54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. | ||
| 55 | /// // This may cause some subscribers to miss a message | ||
| 56 | /// pub0.publish_immediate(43); | ||
| 57 | /// | ||
| 58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result | ||
| 59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 61 | /// | ||
| 62 | /// // Wait again, but this time ignore any Lag results | ||
| 63 | /// assert_eq!(sub0.next_message_pure().await, 43); | ||
| 64 | /// assert_eq!(sub1.next_message_pure().await, 43); | ||
| 65 | /// | ||
| 66 | /// // There's also a polling interface | ||
| 67 | /// assert_eq!(sub0.try_next_message(), None); | ||
| 68 | /// assert_eq!(sub1.try_next_message(), None); | ||
| 69 | /// # }; | ||
| 70 | /// # | ||
| 71 | /// # block_on(test); | ||
| 72 | /// ``` | ||
| 73 | /// | ||
| 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, | ||
| 76 | } | ||
| 77 | |||
| 78 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> | ||
| 79 | PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 80 | { | ||
| 81 | /// Create a new channel | ||
| 82 | pub const fn new() -> Self { | ||
| 83 | Self { | ||
| 84 | inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 89 | /// | ||
| 90 | /// If there are no subscriber slots left, an error will be returned. | ||
| 91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 92 | self.inner.lock(|inner| { | ||
| 93 | let mut s = inner.borrow_mut(); | ||
| 94 | |||
| 95 | if s.subscriber_count >= SUBS { | ||
| 96 | Err(Error::MaximumSubscribersReached) | ||
| 97 | } else { | ||
| 98 | s.subscriber_count += 1; | ||
| 99 | Ok(Subscriber(Sub::new(s.next_message_id, self))) | ||
| 100 | } | ||
| 101 | }) | ||
| 102 | } | ||
| 103 | |||
| 104 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 105 | /// | ||
| 106 | /// If there are no subscriber slots left, an error will be returned. | ||
| 107 | pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> { | ||
| 108 | self.inner.lock(|inner| { | ||
| 109 | let mut s = inner.borrow_mut(); | ||
| 110 | |||
| 111 | if s.subscriber_count >= SUBS { | ||
| 112 | Err(Error::MaximumSubscribersReached) | ||
| 113 | } else { | ||
| 114 | s.subscriber_count += 1; | ||
| 115 | Ok(DynSubscriber(Sub::new(s.next_message_id, self))) | ||
| 116 | } | ||
| 117 | }) | ||
| 118 | } | ||
| 119 | |||
| 120 | /// Create a new publisher | ||
| 121 | /// | ||
| 122 | /// If there are no publisher slots left, an error will be returned. | ||
| 123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 124 | self.inner.lock(|inner| { | ||
| 125 | let mut s = inner.borrow_mut(); | ||
| 126 | |||
| 127 | if s.publisher_count >= PUBS { | ||
| 128 | Err(Error::MaximumPublishersReached) | ||
| 129 | } else { | ||
| 130 | s.publisher_count += 1; | ||
| 131 | Ok(Publisher(Pub::new(self))) | ||
| 132 | } | ||
| 133 | }) | ||
| 134 | } | ||
| 135 | |||
| 136 | /// Create a new publisher | ||
| 137 | /// | ||
| 138 | /// If there are no publisher slots left, an error will be returned. | ||
| 139 | pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> { | ||
| 140 | self.inner.lock(|inner| { | ||
| 141 | let mut s = inner.borrow_mut(); | ||
| 142 | |||
| 143 | if s.publisher_count >= PUBS { | ||
| 144 | Err(Error::MaximumPublishersReached) | ||
| 145 | } else { | ||
| 146 | s.publisher_count += 1; | ||
| 147 | Ok(DynPublisher(Pub::new(self))) | ||
| 148 | } | ||
| 149 | }) | ||
| 150 | } | ||
| 151 | |||
| 152 | /// Create a new publisher that can only send immediate messages. | ||
| 153 | /// This kind of publisher does not take up a publisher slot. | ||
| 154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { | ||
| 155 | ImmediatePublisher(ImmediatePub::new(self)) | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Create a new publisher that can only send immediate messages. | ||
| 159 | /// This kind of publisher does not take up a publisher slot. | ||
| 160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { | ||
| 161 | DynImmediatePublisher(ImmediatePub::new(self)) | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> | ||
| 166 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 167 | { | ||
| 168 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { | ||
| 169 | self.inner.lock(|s| { | ||
| 170 | let mut s = s.borrow_mut(); | ||
| 171 | |||
| 172 | // Check if we can read a message | ||
| 173 | match s.get_message(*next_message_id) { | ||
| 174 | // Yes, so we are done polling | ||
| 175 | Some(WaitResult::Message(message)) => { | ||
| 176 | *next_message_id += 1; | ||
| 177 | Poll::Ready(WaitResult::Message(message)) | ||
| 178 | } | ||
| 179 | // No, so we need to reregister our waker and sleep again | ||
| 180 | None => { | ||
| 181 | if let Some(cx) = cx { | ||
| 182 | s.register_subscriber_waker(cx.waker()); | ||
| 183 | } | ||
| 184 | Poll::Pending | ||
| 185 | } | ||
| 186 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | ||
| 187 | Some(WaitResult::Lagged(amount)) => { | ||
| 188 | *next_message_id += amount; | ||
| 189 | Poll::Ready(WaitResult::Lagged(amount)) | ||
| 190 | } | ||
| 191 | } | ||
| 192 | }) | ||
| 193 | } | ||
| 194 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | ||
| 196 | self.inner.lock(|s| { | ||
| 197 | let mut s = s.borrow_mut(); | ||
| 198 | // Try to publish the message | ||
| 199 | match s.try_publish(message) { | ||
| 200 | // We did it, we are ready | ||
| 201 | Ok(()) => Ok(()), | ||
| 202 | // The queue is full, so we need to reregister our waker and go to sleep | ||
| 203 | Err(message) => { | ||
| 204 | if let Some(cx) = cx { | ||
| 205 | s.register_publisher_waker(cx.waker()); | ||
| 206 | } | ||
| 207 | Err(message) | ||
| 208 | } | ||
| 209 | } | ||
| 210 | }) | ||
| 211 | } | ||
| 212 | |||
| 213 | fn publish_immediate(&self, message: T) { | ||
| 214 | self.inner.lock(|s| { | ||
| 215 | let mut s = s.borrow_mut(); | ||
| 216 | s.publish_immediate(message) | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | ||
| 221 | self.inner.lock(|s| { | ||
| 222 | let mut s = s.borrow_mut(); | ||
| 223 | s.unregister_subscriber(subscriber_next_message_id) | ||
| 224 | }) | ||
| 225 | } | ||
| 226 | |||
| 227 | fn unregister_publisher(&self) { | ||
| 228 | self.inner.lock(|s| { | ||
| 229 | let mut s = s.borrow_mut(); | ||
| 230 | s.unregister_publisher() | ||
| 231 | }) | ||
| 232 | } | ||
| 233 | } | ||
| 234 | |||
| 235 | /// Internal state for the PubSub channel | ||
| 236 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 237 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it | ||
| 238 | queue: Deque<(T, usize), CAP>, | ||
| 239 | /// Every message has an id. | ||
| 240 | /// Don't worry, we won't run out. | ||
| 241 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. | ||
| 242 | next_message_id: u64, | ||
| 243 | /// Collection of wakers for Subscribers that are waiting. | ||
| 244 | subscriber_wakers: MultiWakerRegistration<SUBS>, | ||
| 245 | /// Collection of wakers for Publishers that are waiting. | ||
| 246 | publisher_wakers: MultiWakerRegistration<PUBS>, | ||
| 247 | /// The amount of subscribers that are active | ||
| 248 | subscriber_count: usize, | ||
| 249 | /// The amount of publishers that are active | ||
| 250 | publisher_count: usize, | ||
| 251 | } | ||
| 252 | |||
| 253 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { | ||
| 254 | /// Create a new internal channel state | ||
| 255 | const fn new() -> Self { | ||
| 256 | Self { | ||
| 257 | queue: Deque::new(), | ||
| 258 | next_message_id: 0, | ||
| 259 | subscriber_wakers: MultiWakerRegistration::new(), | ||
| 260 | publisher_wakers: MultiWakerRegistration::new(), | ||
| 261 | subscriber_count: 0, | ||
| 262 | publisher_count: 0, | ||
| 263 | } | ||
| 264 | } | ||
| 265 | |||
| 266 | fn try_publish(&mut self, message: T) -> Result<(), T> { | ||
| 267 | if self.subscriber_count == 0 { | ||
| 268 | // We don't need to publish anything because there is no one to receive it | ||
| 269 | return Ok(()); | ||
| 270 | } | ||
| 271 | |||
| 272 | if self.queue.is_full() { | ||
| 273 | return Err(message); | ||
| 274 | } | ||
| 275 | // We just did a check for this | ||
| 276 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); | ||
| 277 | |||
| 278 | self.next_message_id += 1; | ||
| 279 | |||
| 280 | // Wake all of the subscribers | ||
| 281 | self.subscriber_wakers.wake(); | ||
| 282 | |||
| 283 | Ok(()) | ||
| 284 | } | ||
| 285 | |||
| 286 | fn publish_immediate(&mut self, message: T) { | ||
| 287 | // Make space in the queue if required | ||
| 288 | if self.queue.is_full() { | ||
| 289 | self.queue.pop_front(); | ||
| 290 | } | ||
| 291 | |||
| 292 | // This will succeed because we made sure there is space | ||
| 293 | self.try_publish(message).ok().unwrap(); | ||
| 294 | } | ||
| 295 | |||
| 296 | fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> { | ||
| 297 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 298 | |||
| 299 | if message_id < start_id { | ||
| 300 | return Some(WaitResult::Lagged(start_id - message_id)); | ||
| 301 | } | ||
| 302 | |||
| 303 | let current_message_index = (message_id - start_id) as usize; | ||
| 304 | |||
| 305 | if current_message_index >= self.queue.len() { | ||
| 306 | return None; | ||
| 307 | } | ||
| 308 | |||
| 309 | // We've checked that the index is valid | ||
| 310 | let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); | ||
| 311 | |||
| 312 | // We're reading this item, so decrement the counter | ||
| 313 | queue_item.1 -= 1; | ||
| 314 | let message = queue_item.0.clone(); | ||
| 315 | |||
| 316 | if current_message_index == 0 && queue_item.1 == 0 { | ||
| 317 | self.queue.pop_front(); | ||
| 318 | self.publisher_wakers.wake(); | ||
| 319 | } | ||
| 320 | |||
| 321 | Some(WaitResult::Message(message)) | ||
| 322 | } | ||
| 323 | |||
| 324 | fn register_subscriber_waker(&mut self, waker: &Waker) { | ||
| 325 | match self.subscriber_wakers.register(waker) { | ||
| 326 | Ok(()) => {} | ||
| 327 | Err(_) => { | ||
| 328 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. | ||
| 329 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 330 | // Any future that is still active will simply reregister. | ||
| 331 | // This won't happen a lot, so it's ok. | ||
| 332 | self.subscriber_wakers.wake(); | ||
| 333 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | fn register_publisher_waker(&mut self, waker: &Waker) { | ||
| 339 | match self.publisher_wakers.register(waker) { | ||
| 340 | Ok(()) => {} | ||
| 341 | Err(_) => { | ||
| 342 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 343 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 344 | // Any future that is still active will simply reregister. | ||
| 345 | // This won't happen a lot, so it's ok. | ||
| 346 | self.publisher_wakers.wake(); | ||
| 347 | self.publisher_wakers.register(waker).unwrap(); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | } | ||
| 351 | |||
| 352 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { | ||
| 353 | self.subscriber_count -= 1; | ||
| 354 | |||
| 355 | // All messages that haven't been read yet by this subscriber must have their counter decremented | ||
| 356 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 357 | if subscriber_next_message_id >= start_id { | ||
| 358 | let current_message_index = (subscriber_next_message_id - start_id) as usize; | ||
| 359 | self.queue | ||
| 360 | .iter_mut() | ||
| 361 | .skip(current_message_index) | ||
| 362 | .for_each(|(_, counter)| *counter -= 1); | ||
| 363 | } | ||
| 364 | } | ||
| 365 | |||
| 366 | fn unregister_publisher(&mut self) { | ||
| 367 | self.publisher_count -= 1; | ||
| 368 | } | ||
| 369 | } | ||
| 370 | |||
| 371 | /// Error type for the [PubSubChannel] | ||
| 372 | #[derive(Debug, PartialEq, Eq, Clone)] | ||
| 373 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 374 | pub enum Error { | ||
| 375 | /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or | ||
| 376 | /// the capacity of the channels must be increased. | ||
| 377 | MaximumSubscribersReached, | ||
| 378 | /// All publisher slots are used. To add another publisher, first another publisher must be dropped or | ||
| 379 | /// the capacity of the channels must be increased. | ||
| 380 | MaximumPublishersReached, | ||
| 381 | } | ||
| 382 | |||
| 383 | /// 'Middle level' behaviour of the pubsub channel. | ||
| 384 | /// This trait is used so that Sub and Pub can be generic over the channel. | ||
| 385 | pub trait PubSubBehavior<T> { | ||
| 386 | /// Try to get a message from the queue with the given message id. | ||
| 387 | /// | ||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | ||
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | ||
| 390 | |||
| 391 | /// Try to publish a message to the queue. | ||
| 392 | /// | ||
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | ||
| 394 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; | ||
| 395 | |||
| 396 | /// Publish a message immediately | ||
| 397 | fn publish_immediate(&self, message: T); | ||
| 398 | |||
| 399 | /// Let the channel know that a subscriber has dropped | ||
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | ||
| 401 | |||
| 402 | /// Let the channel know that a publisher has dropped | ||
| 403 | fn unregister_publisher(&self); | ||
| 404 | } | ||
| 405 | |||
| 406 | /// The result of the subscriber wait procedure | ||
| 407 | #[derive(Debug, Clone, PartialEq, Eq)] | ||
| 408 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 409 | pub enum WaitResult<T> { | ||
| 410 | /// The subscriber did not receive all messages and lagged by the given amount of messages. | ||
| 411 | /// (This is the amount of messages that were missed) | ||
| 412 | Lagged(u64), | ||
| 413 | /// A message was received | ||
| 414 | Message(T), | ||
| 415 | } | ||
| 416 | |||
| 417 | #[cfg(test)] | ||
| 418 | mod tests { | ||
| 419 | use super::*; | ||
| 420 | use crate::blocking_mutex::raw::NoopRawMutex; | ||
| 421 | |||
| 422 | #[futures_test::test] | ||
| 423 | async fn dyn_pub_sub_works() { | ||
| 424 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 425 | |||
| 426 | let mut sub0 = channel.dyn_subscriber().unwrap(); | ||
| 427 | let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 428 | let pub0 = channel.dyn_publisher().unwrap(); | ||
| 429 | |||
| 430 | pub0.publish(42).await; | ||
| 431 | |||
| 432 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 433 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 434 | |||
| 435 | assert_eq!(sub0.try_next_message(), None); | ||
| 436 | assert_eq!(sub1.try_next_message(), None); | ||
| 437 | } | ||
| 438 | |||
| 439 | #[futures_test::test] | ||
| 440 | async fn all_subscribers_receive() { | ||
| 441 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 442 | |||
| 443 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 444 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 445 | let pub0 = channel.publisher().unwrap(); | ||
| 446 | |||
| 447 | pub0.publish(42).await; | ||
| 448 | |||
| 449 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 450 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 451 | |||
| 452 | assert_eq!(sub0.try_next_message(), None); | ||
| 453 | assert_eq!(sub1.try_next_message(), None); | ||
| 454 | } | ||
| 455 | |||
| 456 | #[futures_test::test] | ||
| 457 | async fn lag_when_queue_full_on_immediate_publish() { | ||
| 458 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 459 | |||
| 460 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 461 | let pub0 = channel.publisher().unwrap(); | ||
| 462 | |||
| 463 | pub0.publish_immediate(42); | ||
| 464 | pub0.publish_immediate(43); | ||
| 465 | pub0.publish_immediate(44); | ||
| 466 | pub0.publish_immediate(45); | ||
| 467 | pub0.publish_immediate(46); | ||
| 468 | pub0.publish_immediate(47); | ||
| 469 | |||
| 470 | assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); | ||
| 471 | assert_eq!(sub0.next_message().await, WaitResult::Message(44)); | ||
| 472 | assert_eq!(sub0.next_message().await, WaitResult::Message(45)); | ||
| 473 | assert_eq!(sub0.next_message().await, WaitResult::Message(46)); | ||
| 474 | assert_eq!(sub0.next_message().await, WaitResult::Message(47)); | ||
| 475 | assert_eq!(sub0.try_next_message(), None); | ||
| 476 | } | ||
| 477 | |||
| 478 | #[test] | ||
| 479 | fn limited_subs_and_pubs() { | ||
| 480 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 481 | |||
| 482 | let sub0 = channel.subscriber(); | ||
| 483 | let sub1 = channel.subscriber(); | ||
| 484 | let sub2 = channel.subscriber(); | ||
| 485 | let sub3 = channel.subscriber(); | ||
| 486 | let sub4 = channel.subscriber(); | ||
| 487 | |||
| 488 | assert!(sub0.is_ok()); | ||
| 489 | assert!(sub1.is_ok()); | ||
| 490 | assert!(sub2.is_ok()); | ||
| 491 | assert!(sub3.is_ok()); | ||
| 492 | assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); | ||
| 493 | |||
| 494 | drop(sub0); | ||
| 495 | |||
| 496 | let sub5 = channel.subscriber(); | ||
| 497 | assert!(sub5.is_ok()); | ||
| 498 | |||
| 499 | // publishers | ||
| 500 | |||
| 501 | let pub0 = channel.publisher(); | ||
| 502 | let pub1 = channel.publisher(); | ||
| 503 | let pub2 = channel.publisher(); | ||
| 504 | let pub3 = channel.publisher(); | ||
| 505 | let pub4 = channel.publisher(); | ||
| 506 | |||
| 507 | assert!(pub0.is_ok()); | ||
| 508 | assert!(pub1.is_ok()); | ||
| 509 | assert!(pub2.is_ok()); | ||
| 510 | assert!(pub3.is_ok()); | ||
| 511 | assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); | ||
| 512 | |||
| 513 | drop(pub0); | ||
| 514 | |||
| 515 | let pub5 = channel.publisher(); | ||
| 516 | assert!(pub5.is_ok()); | ||
| 517 | } | ||
| 518 | |||
| 519 | #[test] | ||
| 520 | fn publisher_wait_on_full_queue() { | ||
| 521 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 522 | |||
| 523 | let pub0 = channel.publisher().unwrap(); | ||
| 524 | |||
| 525 | // There are no subscribers, so the queue will never be full | ||
| 526 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 527 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 528 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 529 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 530 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 531 | |||
| 532 | let sub0 = channel.subscriber().unwrap(); | ||
| 533 | |||
| 534 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 535 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 536 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 537 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 538 | assert_eq!(pub0.try_publish(0), Err(0)); | ||
| 539 | |||
| 540 | drop(sub0); | ||
| 541 | } | ||
| 542 | } | ||
