diff options
Diffstat (limited to 'embassy-sync/src/channel')
| -rw-r--r-- | embassy-sync/src/channel/mod.rs | 5 | ||||
| -rw-r--r-- | embassy-sync/src/channel/mpmc.rs | 596 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/mod.rs | 542 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/publisher.rs | 182 | ||||
| -rw-r--r-- | embassy-sync/src/channel/pubsub/subscriber.rs | 152 | ||||
| -rw-r--r-- | embassy-sync/src/channel/signal.rs | 100 |
6 files changed, 1577 insertions, 0 deletions
diff --git a/embassy-sync/src/channel/mod.rs b/embassy-sync/src/channel/mod.rs new file mode 100644 index 000000000..5df1f5c5c --- /dev/null +++ b/embassy-sync/src/channel/mod.rs | |||
| @@ -0,0 +1,5 @@ | |||
| 1 | //! Async channels | ||
| 2 | |||
| 3 | pub mod mpmc; | ||
| 4 | pub mod pubsub; | ||
| 5 | pub mod signal; | ||
diff --git a/embassy-sync/src/channel/mpmc.rs b/embassy-sync/src/channel/mpmc.rs new file mode 100644 index 000000000..7bebd3412 --- /dev/null +++ b/embassy-sync/src/channel/mpmc.rs | |||
| @@ -0,0 +1,596 @@ | |||
| 1 | //! A queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | ||
| 9 | //! This queue takes a Mutex type so that various | ||
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 11 | //! for single-core Cortex-M targets where messages are only passed | ||
| 12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 13 | //! can also be used for single-core targets where messages are to be | ||
| 14 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 15 | //! | ||
| 16 | //! This module provides a bounded channel that has a limit on the number of | ||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 18 | //! another message will result in an error being returned. | ||
| 19 | //! | ||
| 20 | |||
| 21 | use core::cell::RefCell; | ||
| 22 | use core::future::Future; | ||
| 23 | use core::pin::Pin; | ||
| 24 | use core::task::{Context, Poll}; | ||
| 25 | |||
| 26 | use heapless::Deque; | ||
| 27 | |||
| 28 | use crate::blocking_mutex::raw::RawMutex; | ||
| 29 | use crate::blocking_mutex::Mutex; | ||
| 30 | use crate::waitqueue::WakerRegistration; | ||
| 31 | |||
| 32 | /// Send-only access to a [`Channel`]. | ||
| 33 | #[derive(Copy)] | ||
| 34 | pub struct Sender<'ch, M, T, const N: usize> | ||
| 35 | where | ||
| 36 | M: RawMutex, | ||
| 37 | { | ||
| 38 | channel: &'ch Channel<M, T, N>, | ||
| 39 | } | ||
| 40 | |||
| 41 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | ||
| 42 | where | ||
| 43 | M: RawMutex, | ||
| 44 | { | ||
| 45 | fn clone(&self) -> Self { | ||
| 46 | Sender { channel: self.channel } | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | ||
| 51 | where | ||
| 52 | M: RawMutex, | ||
| 53 | { | ||
| 54 | /// Sends a value. | ||
| 55 | /// | ||
| 56 | /// See [`Channel::send()`] | ||
| 57 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | ||
| 58 | self.channel.send(message) | ||
| 59 | } | ||
| 60 | |||
| 61 | /// Attempt to immediately send a message. | ||
| 62 | /// | ||
| 63 | /// See [`Channel::send()`] | ||
| 64 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 65 | self.channel.try_send(message) | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 70 | #[derive(Copy)] | ||
| 71 | pub struct DynamicSender<'ch, T> { | ||
| 72 | channel: &'ch dyn DynamicChannel<T>, | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'ch, T> Clone for DynamicSender<'ch, T> { | ||
| 76 | fn clone(&self) -> Self { | ||
| 77 | DynamicSender { channel: self.channel } | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T> | ||
| 82 | where | ||
| 83 | M: RawMutex, | ||
| 84 | { | ||
| 85 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 86 | Self { channel: s.channel } | ||
| 87 | } | ||
| 88 | } | ||
| 89 | |||
| 90 | impl<'ch, T> DynamicSender<'ch, T> { | ||
| 91 | /// Sends a value. | ||
| 92 | /// | ||
| 93 | /// See [`Channel::send()`] | ||
| 94 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 95 | DynamicSendFuture { | ||
| 96 | channel: self.channel, | ||
| 97 | message: Some(message), | ||
| 98 | } | ||
| 99 | } | ||
| 100 | |||
| 101 | /// Attempt to immediately send a message. | ||
| 102 | /// | ||
| 103 | /// See [`Channel::send()`] | ||
| 104 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 105 | self.channel.try_send_with_context(message, None) | ||
| 106 | } | ||
| 107 | } | ||
| 108 | |||
| 109 | /// Receive-only access to a [`Channel`]. | ||
| 110 | #[derive(Copy)] | ||
| 111 | pub struct Receiver<'ch, M, T, const N: usize> | ||
| 112 | where | ||
| 113 | M: RawMutex, | ||
| 114 | { | ||
| 115 | channel: &'ch Channel<M, T, N>, | ||
| 116 | } | ||
| 117 | |||
| 118 | impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> | ||
| 119 | where | ||
| 120 | M: RawMutex, | ||
| 121 | { | ||
| 122 | fn clone(&self) -> Self { | ||
| 123 | Receiver { channel: self.channel } | ||
| 124 | } | ||
| 125 | } | ||
| 126 | |||
| 127 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | ||
| 128 | where | ||
| 129 | M: RawMutex, | ||
| 130 | { | ||
| 131 | /// Receive the next value. | ||
| 132 | /// | ||
| 133 | /// See [`Channel::recv()`]. | ||
| 134 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 135 | self.channel.recv() | ||
| 136 | } | ||
| 137 | |||
| 138 | /// Attempt to immediately receive the next value. | ||
| 139 | /// | ||
| 140 | /// See [`Channel::try_recv()`] | ||
| 141 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 142 | self.channel.try_recv() | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 147 | #[derive(Copy)] | ||
| 148 | pub struct DynamicReceiver<'ch, T> { | ||
| 149 | channel: &'ch dyn DynamicChannel<T>, | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'ch, T> Clone for DynamicReceiver<'ch, T> { | ||
| 153 | fn clone(&self) -> Self { | ||
| 154 | DynamicReceiver { channel: self.channel } | ||
| 155 | } | ||
| 156 | } | ||
| 157 | |||
| 158 | impl<'ch, T> DynamicReceiver<'ch, T> { | ||
| 159 | /// Receive the next value. | ||
| 160 | /// | ||
| 161 | /// See [`Channel::recv()`]. | ||
| 162 | pub fn recv(&self) -> DynamicRecvFuture<'_, T> { | ||
| 163 | DynamicRecvFuture { channel: self.channel } | ||
| 164 | } | ||
| 165 | |||
| 166 | /// Attempt to immediately receive the next value. | ||
| 167 | /// | ||
| 168 | /// See [`Channel::try_recv()`] | ||
| 169 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 170 | self.channel.try_recv_with_context(None) | ||
| 171 | } | ||
| 172 | } | ||
| 173 | |||
| 174 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T> | ||
| 175 | where | ||
| 176 | M: RawMutex, | ||
| 177 | { | ||
| 178 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 179 | Self { channel: s.channel } | ||
| 180 | } | ||
| 181 | } | ||
| 182 | |||
| 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. | ||
| 184 | pub struct RecvFuture<'ch, M, T, const N: usize> | ||
| 185 | where | ||
| 186 | M: RawMutex, | ||
| 187 | { | ||
| 188 | channel: &'ch Channel<M, T, N>, | ||
| 189 | } | ||
| 190 | |||
| 191 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | ||
| 192 | where | ||
| 193 | M: RawMutex, | ||
| 194 | { | ||
| 195 | type Output = T; | ||
| 196 | |||
| 197 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 198 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 199 | Ok(v) => Poll::Ready(v), | ||
| 200 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 201 | } | ||
| 202 | } | ||
| 203 | } | ||
| 204 | |||
| 205 | /// Future returned by [`DynamicReceiver::recv`]. | ||
| 206 | pub struct DynamicRecvFuture<'ch, T> { | ||
| 207 | channel: &'ch dyn DynamicChannel<T>, | ||
| 208 | } | ||
| 209 | |||
| 210 | impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | ||
| 211 | type Output = T; | ||
| 212 | |||
| 213 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { | ||
| 214 | match self.channel.try_recv_with_context(Some(cx)) { | ||
| 215 | Ok(v) => Poll::Ready(v), | ||
| 216 | Err(TryRecvError::Empty) => Poll::Pending, | ||
| 217 | } | ||
| 218 | } | ||
| 219 | } | ||
| 220 | |||
| 221 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | ||
| 222 | pub struct SendFuture<'ch, M, T, const N: usize> | ||
| 223 | where | ||
| 224 | M: RawMutex, | ||
| 225 | { | ||
| 226 | channel: &'ch Channel<M, T, N>, | ||
| 227 | message: Option<T>, | ||
| 228 | } | ||
| 229 | |||
| 230 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | ||
| 231 | where | ||
| 232 | M: RawMutex, | ||
| 233 | { | ||
| 234 | type Output = (); | ||
| 235 | |||
| 236 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 237 | match self.message.take() { | ||
| 238 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 239 | Ok(..) => Poll::Ready(()), | ||
| 240 | Err(TrySendError::Full(m)) => { | ||
| 241 | self.message = Some(m); | ||
| 242 | Poll::Pending | ||
| 243 | } | ||
| 244 | }, | ||
| 245 | None => panic!("Message cannot be None"), | ||
| 246 | } | ||
| 247 | } | ||
| 248 | } | ||
| 249 | |||
| 250 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | ||
| 251 | |||
| 252 | /// Future returned by [`DynamicSender::send`]. | ||
| 253 | pub struct DynamicSendFuture<'ch, T> { | ||
| 254 | channel: &'ch dyn DynamicChannel<T>, | ||
| 255 | message: Option<T>, | ||
| 256 | } | ||
| 257 | |||
| 258 | impl<'ch, T> Future for DynamicSendFuture<'ch, T> { | ||
| 259 | type Output = (); | ||
| 260 | |||
| 261 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 262 | match self.message.take() { | ||
| 263 | Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { | ||
| 264 | Ok(..) => Poll::Ready(()), | ||
| 265 | Err(TrySendError::Full(m)) => { | ||
| 266 | self.message = Some(m); | ||
| 267 | Poll::Pending | ||
| 268 | } | ||
| 269 | }, | ||
| 270 | None => panic!("Message cannot be None"), | ||
| 271 | } | ||
| 272 | } | ||
| 273 | } | ||
| 274 | |||
| 275 | impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} | ||
| 276 | |||
| 277 | trait DynamicChannel<T> { | ||
| 278 | fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>; | ||
| 279 | |||
| 280 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>; | ||
| 281 | } | ||
| 282 | |||
| 283 | /// Error returned by [`try_recv`](Channel::try_recv). | ||
| 284 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 285 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 286 | pub enum TryRecvError { | ||
| 287 | /// A message could not be received because the channel is empty. | ||
| 288 | Empty, | ||
| 289 | } | ||
| 290 | |||
| 291 | /// Error returned by [`try_send`](Channel::try_send). | ||
| 292 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] | ||
| 293 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 294 | pub enum TrySendError<T> { | ||
| 295 | /// The data could not be sent on the channel because the channel is | ||
| 296 | /// currently full and sending would require blocking. | ||
| 297 | Full(T), | ||
| 298 | } | ||
| 299 | |||
| 300 | struct ChannelState<T, const N: usize> { | ||
| 301 | queue: Deque<T, N>, | ||
| 302 | receiver_waker: WakerRegistration, | ||
| 303 | senders_waker: WakerRegistration, | ||
| 304 | } | ||
| 305 | |||
| 306 | impl<T, const N: usize> ChannelState<T, N> { | ||
| 307 | const fn new() -> Self { | ||
| 308 | ChannelState { | ||
| 309 | queue: Deque::new(), | ||
| 310 | receiver_waker: WakerRegistration::new(), | ||
| 311 | senders_waker: WakerRegistration::new(), | ||
| 312 | } | ||
| 313 | } | ||
| 314 | |||
| 315 | fn try_recv(&mut self) -> Result<T, TryRecvError> { | ||
| 316 | self.try_recv_with_context(None) | ||
| 317 | } | ||
| 318 | |||
| 319 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 320 | if self.queue.is_full() { | ||
| 321 | self.senders_waker.wake(); | ||
| 322 | } | ||
| 323 | |||
| 324 | if let Some(message) = self.queue.pop_front() { | ||
| 325 | Ok(message) | ||
| 326 | } else { | ||
| 327 | if let Some(cx) = cx { | ||
| 328 | self.receiver_waker.register(cx.waker()); | ||
| 329 | } | ||
| 330 | Err(TryRecvError::Empty) | ||
| 331 | } | ||
| 332 | } | ||
| 333 | |||
| 334 | fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { | ||
| 335 | self.try_send_with_context(message, None) | ||
| 336 | } | ||
| 337 | |||
| 338 | fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 339 | match self.queue.push_back(message) { | ||
| 340 | Ok(()) => { | ||
| 341 | self.receiver_waker.wake(); | ||
| 342 | Ok(()) | ||
| 343 | } | ||
| 344 | Err(message) => { | ||
| 345 | if let Some(cx) = cx { | ||
| 346 | self.senders_waker.register(cx.waker()); | ||
| 347 | } | ||
| 348 | Err(TrySendError::Full(message)) | ||
| 349 | } | ||
| 350 | } | ||
| 351 | } | ||
| 352 | } | ||
| 353 | |||
| 354 | /// A bounded channel for communicating between asynchronous tasks | ||
| 355 | /// with backpressure. | ||
| 356 | /// | ||
| 357 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 358 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 359 | /// received from the channel. | ||
| 360 | /// | ||
| 361 | /// All data sent will become available in the same order as it was sent. | ||
| 362 | pub struct Channel<M, T, const N: usize> | ||
| 363 | where | ||
| 364 | M: RawMutex, | ||
| 365 | { | ||
| 366 | inner: Mutex<M, RefCell<ChannelState<T, N>>>, | ||
| 367 | } | ||
| 368 | |||
| 369 | impl<M, T, const N: usize> Channel<M, T, N> | ||
| 370 | where | ||
| 371 | M: RawMutex, | ||
| 372 | { | ||
| 373 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | ||
| 374 | /// | ||
| 375 | /// ``` | ||
| 376 | /// use embassy_sync::channel::mpmc::Channel; | ||
| 377 | /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 378 | /// | ||
| 379 | /// // Declare a bounded channel of 3 u32s. | ||
| 380 | /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 381 | /// ``` | ||
| 382 | pub const fn new() -> Self { | ||
| 383 | Self { | ||
| 384 | inner: Mutex::new(RefCell::new(ChannelState::new())), | ||
| 385 | } | ||
| 386 | } | ||
| 387 | |||
| 388 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { | ||
| 389 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) | ||
| 390 | } | ||
| 391 | |||
| 392 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 393 | self.lock(|c| c.try_recv_with_context(cx)) | ||
| 394 | } | ||
| 395 | |||
| 396 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 397 | self.lock(|c| c.try_send_with_context(m, cx)) | ||
| 398 | } | ||
| 399 | |||
| 400 | /// Get a sender for this channel. | ||
| 401 | pub fn sender(&self) -> Sender<'_, M, T, N> { | ||
| 402 | Sender { channel: self } | ||
| 403 | } | ||
| 404 | |||
| 405 | /// Get a receiver for this channel. | ||
| 406 | pub fn receiver(&self) -> Receiver<'_, M, T, N> { | ||
| 407 | Receiver { channel: self } | ||
| 408 | } | ||
| 409 | |||
| 410 | /// Send a value, waiting until there is capacity. | ||
| 411 | /// | ||
| 412 | /// Sending completes when the value has been pushed to the channel's queue. | ||
| 413 | /// This doesn't mean the value has been received yet. | ||
| 414 | pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { | ||
| 415 | SendFuture { | ||
| 416 | channel: self, | ||
| 417 | message: Some(message), | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | /// Attempt to immediately send a message. | ||
| 422 | /// | ||
| 423 | /// This method differs from [`send`](Channel::send) by returning immediately if the channel's | ||
| 424 | /// buffer is full, instead of waiting. | ||
| 425 | /// | ||
| 426 | /// # Errors | ||
| 427 | /// | ||
| 428 | /// If the channel capacity has been reached, i.e., the channel has `n` | ||
| 429 | /// buffered values where `n` is the argument passed to [`Channel`], then an | ||
| 430 | /// error is returned. | ||
| 431 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 432 | self.lock(|c| c.try_send(message)) | ||
| 433 | } | ||
| 434 | |||
| 435 | /// Receive the next value. | ||
| 436 | /// | ||
| 437 | /// If there are no messages in the channel's buffer, this method will | ||
| 438 | /// wait until a message is sent. | ||
| 439 | pub fn recv(&self) -> RecvFuture<'_, M, T, N> { | ||
| 440 | RecvFuture { channel: self } | ||
| 441 | } | ||
| 442 | |||
| 443 | /// Attempt to immediately receive a message. | ||
| 444 | /// | ||
| 445 | /// This method will either receive a message from the channel immediately or return an error | ||
| 446 | /// if the channel is empty. | ||
| 447 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | ||
| 448 | self.lock(|c| c.try_recv()) | ||
| 449 | } | ||
| 450 | } | ||
| 451 | |||
| 452 | /// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the | ||
| 453 | /// tradeoff cost of dynamic dispatch. | ||
| 454 | impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N> | ||
| 455 | where | ||
| 456 | M: RawMutex, | ||
| 457 | { | ||
| 458 | fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> { | ||
| 459 | Channel::try_send_with_context(self, m, cx) | ||
| 460 | } | ||
| 461 | |||
| 462 | fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | ||
| 463 | Channel::try_recv_with_context(self, cx) | ||
| 464 | } | ||
| 465 | } | ||
| 466 | |||
| 467 | #[cfg(test)] | ||
| 468 | mod tests { | ||
| 469 | use core::time::Duration; | ||
| 470 | |||
| 471 | use futures_executor::ThreadPool; | ||
| 472 | use futures_timer::Delay; | ||
| 473 | use futures_util::task::SpawnExt; | ||
| 474 | use static_cell::StaticCell; | ||
| 475 | |||
| 476 | use super::*; | ||
| 477 | use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; | ||
| 478 | |||
| 479 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | ||
| 480 | c.queue.capacity() - c.queue.len() | ||
| 481 | } | ||
| 482 | |||
| 483 | #[test] | ||
| 484 | fn sending_once() { | ||
| 485 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 486 | assert!(c.try_send(1).is_ok()); | ||
| 487 | assert_eq!(capacity(&c), 2); | ||
| 488 | } | ||
| 489 | |||
| 490 | #[test] | ||
| 491 | fn sending_when_full() { | ||
| 492 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 493 | let _ = c.try_send(1); | ||
| 494 | let _ = c.try_send(1); | ||
| 495 | let _ = c.try_send(1); | ||
| 496 | match c.try_send(2) { | ||
| 497 | Err(TrySendError::Full(2)) => assert!(true), | ||
| 498 | _ => assert!(false), | ||
| 499 | } | ||
| 500 | assert_eq!(capacity(&c), 0); | ||
| 501 | } | ||
| 502 | |||
| 503 | #[test] | ||
| 504 | fn receiving_once_with_one_send() { | ||
| 505 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 506 | assert!(c.try_send(1).is_ok()); | ||
| 507 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 508 | assert_eq!(capacity(&c), 3); | ||
| 509 | } | ||
| 510 | |||
| 511 | #[test] | ||
| 512 | fn receiving_when_empty() { | ||
| 513 | let mut c = ChannelState::<u32, 3>::new(); | ||
| 514 | match c.try_recv() { | ||
| 515 | Err(TryRecvError::Empty) => assert!(true), | ||
| 516 | _ => assert!(false), | ||
| 517 | } | ||
| 518 | assert_eq!(capacity(&c), 3); | ||
| 519 | } | ||
| 520 | |||
| 521 | #[test] | ||
| 522 | fn simple_send_and_receive() { | ||
| 523 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 524 | assert!(c.try_send(1).is_ok()); | ||
| 525 | assert_eq!(c.try_recv().unwrap(), 1); | ||
| 526 | } | ||
| 527 | |||
| 528 | #[test] | ||
| 529 | fn cloning() { | ||
| 530 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 531 | let r1 = c.receiver(); | ||
| 532 | let s1 = c.sender(); | ||
| 533 | |||
| 534 | let _ = r1.clone(); | ||
| 535 | let _ = s1.clone(); | ||
| 536 | } | ||
| 537 | |||
| 538 | #[test] | ||
| 539 | fn dynamic_dispatch() { | ||
| 540 | let c = Channel::<NoopRawMutex, u32, 3>::new(); | ||
| 541 | let s: DynamicSender<'_, u32> = c.sender().into(); | ||
| 542 | let r: DynamicReceiver<'_, u32> = c.receiver().into(); | ||
| 543 | |||
| 544 | assert!(s.try_send(1).is_ok()); | ||
| 545 | assert_eq!(r.try_recv().unwrap(), 1); | ||
| 546 | } | ||
| 547 | |||
| 548 | #[futures_test::test] | ||
| 549 | async fn receiver_receives_given_try_send_async() { | ||
| 550 | let executor = ThreadPool::new().unwrap(); | ||
| 551 | |||
| 552 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new(); | ||
| 553 | let c = &*CHANNEL.init(Channel::new()); | ||
| 554 | let c2 = c; | ||
| 555 | assert!(executor | ||
| 556 | .spawn(async move { | ||
| 557 | assert!(c2.try_send(1).is_ok()); | ||
| 558 | }) | ||
| 559 | .is_ok()); | ||
| 560 | assert_eq!(c.recv().await, 1); | ||
| 561 | } | ||
| 562 | |||
| 563 | #[futures_test::test] | ||
| 564 | async fn sender_send_completes_if_capacity() { | ||
| 565 | let c = Channel::<CriticalSectionRawMutex, u32, 1>::new(); | ||
| 566 | c.send(1).await; | ||
| 567 | assert_eq!(c.recv().await, 1); | ||
| 568 | } | ||
| 569 | |||
| 570 | #[futures_test::test] | ||
| 571 | async fn senders_sends_wait_until_capacity() { | ||
| 572 | let executor = ThreadPool::new().unwrap(); | ||
| 573 | |||
| 574 | static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new(); | ||
| 575 | let c = &*CHANNEL.init(Channel::new()); | ||
| 576 | assert!(c.try_send(1).is_ok()); | ||
| 577 | |||
| 578 | let c2 = c; | ||
| 579 | let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); | ||
| 580 | let c2 = c; | ||
| 581 | let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); | ||
| 582 | // Wish I could think of a means of determining that the async send is waiting instead. | ||
| 583 | // However, I've used the debugger to observe that the send does indeed wait. | ||
| 584 | Delay::new(Duration::from_millis(500)).await; | ||
| 585 | assert_eq!(c.recv().await, 1); | ||
| 586 | assert!(executor | ||
| 587 | .spawn(async move { | ||
| 588 | loop { | ||
| 589 | c.recv().await; | ||
| 590 | } | ||
| 591 | }) | ||
| 592 | .is_ok()); | ||
| 593 | send_task_1.unwrap().await; | ||
| 594 | send_task_2.unwrap().await; | ||
| 595 | } | ||
| 596 | } | ||
diff --git a/embassy-sync/src/channel/pubsub/mod.rs b/embassy-sync/src/channel/pubsub/mod.rs new file mode 100644 index 000000000..f62b4d118 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/mod.rs | |||
| @@ -0,0 +1,542 @@ | |||
| 1 | //! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. | ||
| 2 | |||
| 3 | #![deny(missing_docs)] | ||
| 4 | |||
| 5 | use core::cell::RefCell; | ||
| 6 | use core::fmt::Debug; | ||
| 7 | use core::task::{Context, Poll, Waker}; | ||
| 8 | |||
| 9 | use heapless::Deque; | ||
| 10 | |||
| 11 | use self::publisher::{ImmediatePub, Pub}; | ||
| 12 | use self::subscriber::Sub; | ||
| 13 | use crate::blocking_mutex::raw::RawMutex; | ||
| 14 | use crate::blocking_mutex::Mutex; | ||
| 15 | use crate::waitqueue::MultiWakerRegistration; | ||
| 16 | |||
| 17 | pub mod publisher; | ||
| 18 | pub mod subscriber; | ||
| 19 | |||
| 20 | pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; | ||
| 21 | pub use subscriber::{DynSubscriber, Subscriber}; | ||
| 22 | |||
| 23 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers | ||
| 24 | /// | ||
| 25 | /// Any published message can be read by all subscribers. | ||
| 26 | /// A publisher can choose how it sends its message. | ||
| 27 | /// | ||
| 28 | /// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. | ||
| 29 | /// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message | ||
| 30 | /// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive | ||
| 31 | /// an error to indicate that it has lagged. | ||
| 32 | /// | ||
| 33 | /// ## Example | ||
| 34 | /// | ||
| 35 | /// ``` | ||
| 36 | /// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; | ||
| 37 | /// # use embassy_sync::channel::pubsub::WaitResult; | ||
| 38 | /// # use embassy_sync::channel::pubsub::PubSubChannel; | ||
| 39 | /// # use futures_executor::block_on; | ||
| 40 | /// # let test = async { | ||
| 41 | /// // Create the channel. This can be static as well | ||
| 42 | /// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 43 | /// | ||
| 44 | /// // This is a generic subscriber with a direct reference to the channel | ||
| 45 | /// let mut sub0 = channel.subscriber().unwrap(); | ||
| 46 | /// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel | ||
| 47 | /// let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 48 | /// | ||
| 49 | /// let pub0 = channel.publisher().unwrap(); | ||
| 50 | /// | ||
| 51 | /// // Publish a message, but wait if the queue is full | ||
| 52 | /// pub0.publish(42).await; | ||
| 53 | /// | ||
| 54 | /// // Publish a message, but if the queue is full, just kick out the oldest message. | ||
| 55 | /// // This may cause some subscribers to miss a message | ||
| 56 | /// pub0.publish_immediate(43); | ||
| 57 | /// | ||
| 58 | /// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result | ||
| 59 | /// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 60 | /// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 61 | /// | ||
| 62 | /// // Wait again, but this time ignore any Lag results | ||
| 63 | /// assert_eq!(sub0.next_message_pure().await, 43); | ||
| 64 | /// assert_eq!(sub1.next_message_pure().await, 43); | ||
| 65 | /// | ||
| 66 | /// // There's also a polling interface | ||
| 67 | /// assert_eq!(sub0.try_next_message(), None); | ||
| 68 | /// assert_eq!(sub1.try_next_message(), None); | ||
| 69 | /// # }; | ||
| 70 | /// # | ||
| 71 | /// # block_on(test); | ||
| 72 | /// ``` | ||
| 73 | /// | ||
| 74 | pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 75 | inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>, | ||
| 76 | } | ||
| 77 | |||
| 78 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> | ||
| 79 | PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 80 | { | ||
| 81 | /// Create a new channel | ||
| 82 | pub const fn new() -> Self { | ||
| 83 | Self { | ||
| 84 | inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 89 | /// | ||
| 90 | /// If there are no subscriber slots left, an error will be returned. | ||
| 91 | pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 92 | self.inner.lock(|inner| { | ||
| 93 | let mut s = inner.borrow_mut(); | ||
| 94 | |||
| 95 | if s.subscriber_count >= SUBS { | ||
| 96 | Err(Error::MaximumSubscribersReached) | ||
| 97 | } else { | ||
| 98 | s.subscriber_count += 1; | ||
| 99 | Ok(Subscriber(Sub::new(s.next_message_id, self))) | ||
| 100 | } | ||
| 101 | }) | ||
| 102 | } | ||
| 103 | |||
| 104 | /// Create a new subscriber. It will only receive messages that are published after its creation. | ||
| 105 | /// | ||
| 106 | /// If there are no subscriber slots left, an error will be returned. | ||
| 107 | pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> { | ||
| 108 | self.inner.lock(|inner| { | ||
| 109 | let mut s = inner.borrow_mut(); | ||
| 110 | |||
| 111 | if s.subscriber_count >= SUBS { | ||
| 112 | Err(Error::MaximumSubscribersReached) | ||
| 113 | } else { | ||
| 114 | s.subscriber_count += 1; | ||
| 115 | Ok(DynSubscriber(Sub::new(s.next_message_id, self))) | ||
| 116 | } | ||
| 117 | }) | ||
| 118 | } | ||
| 119 | |||
| 120 | /// Create a new publisher | ||
| 121 | /// | ||
| 122 | /// If there are no publisher slots left, an error will be returned. | ||
| 123 | pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> { | ||
| 124 | self.inner.lock(|inner| { | ||
| 125 | let mut s = inner.borrow_mut(); | ||
| 126 | |||
| 127 | if s.publisher_count >= PUBS { | ||
| 128 | Err(Error::MaximumPublishersReached) | ||
| 129 | } else { | ||
| 130 | s.publisher_count += 1; | ||
| 131 | Ok(Publisher(Pub::new(self))) | ||
| 132 | } | ||
| 133 | }) | ||
| 134 | } | ||
| 135 | |||
| 136 | /// Create a new publisher | ||
| 137 | /// | ||
| 138 | /// If there are no publisher slots left, an error will be returned. | ||
| 139 | pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> { | ||
| 140 | self.inner.lock(|inner| { | ||
| 141 | let mut s = inner.borrow_mut(); | ||
| 142 | |||
| 143 | if s.publisher_count >= PUBS { | ||
| 144 | Err(Error::MaximumPublishersReached) | ||
| 145 | } else { | ||
| 146 | s.publisher_count += 1; | ||
| 147 | Ok(DynPublisher(Pub::new(self))) | ||
| 148 | } | ||
| 149 | }) | ||
| 150 | } | ||
| 151 | |||
| 152 | /// Create a new publisher that can only send immediate messages. | ||
| 153 | /// This kind of publisher does not take up a publisher slot. | ||
| 154 | pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> { | ||
| 155 | ImmediatePublisher(ImmediatePub::new(self)) | ||
| 156 | } | ||
| 157 | |||
| 158 | /// Create a new publisher that can only send immediate messages. | ||
| 159 | /// This kind of publisher does not take up a publisher slot. | ||
| 160 | pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> { | ||
| 161 | DynImmediatePublisher(ImmediatePub::new(self)) | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> | ||
| 166 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | ||
| 167 | { | ||
| 168 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { | ||
| 169 | self.inner.lock(|s| { | ||
| 170 | let mut s = s.borrow_mut(); | ||
| 171 | |||
| 172 | // Check if we can read a message | ||
| 173 | match s.get_message(*next_message_id) { | ||
| 174 | // Yes, so we are done polling | ||
| 175 | Some(WaitResult::Message(message)) => { | ||
| 176 | *next_message_id += 1; | ||
| 177 | Poll::Ready(WaitResult::Message(message)) | ||
| 178 | } | ||
| 179 | // No, so we need to reregister our waker and sleep again | ||
| 180 | None => { | ||
| 181 | if let Some(cx) = cx { | ||
| 182 | s.register_subscriber_waker(cx.waker()); | ||
| 183 | } | ||
| 184 | Poll::Pending | ||
| 185 | } | ||
| 186 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | ||
| 187 | Some(WaitResult::Lagged(amount)) => { | ||
| 188 | *next_message_id += amount; | ||
| 189 | Poll::Ready(WaitResult::Lagged(amount)) | ||
| 190 | } | ||
| 191 | } | ||
| 192 | }) | ||
| 193 | } | ||
| 194 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | ||
| 196 | self.inner.lock(|s| { | ||
| 197 | let mut s = s.borrow_mut(); | ||
| 198 | // Try to publish the message | ||
| 199 | match s.try_publish(message) { | ||
| 200 | // We did it, we are ready | ||
| 201 | Ok(()) => Ok(()), | ||
| 202 | // The queue is full, so we need to reregister our waker and go to sleep | ||
| 203 | Err(message) => { | ||
| 204 | if let Some(cx) = cx { | ||
| 205 | s.register_publisher_waker(cx.waker()); | ||
| 206 | } | ||
| 207 | Err(message) | ||
| 208 | } | ||
| 209 | } | ||
| 210 | }) | ||
| 211 | } | ||
| 212 | |||
| 213 | fn publish_immediate(&self, message: T) { | ||
| 214 | self.inner.lock(|s| { | ||
| 215 | let mut s = s.borrow_mut(); | ||
| 216 | s.publish_immediate(message) | ||
| 217 | }) | ||
| 218 | } | ||
| 219 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | ||
| 221 | self.inner.lock(|s| { | ||
| 222 | let mut s = s.borrow_mut(); | ||
| 223 | s.unregister_subscriber(subscriber_next_message_id) | ||
| 224 | }) | ||
| 225 | } | ||
| 226 | |||
| 227 | fn unregister_publisher(&self) { | ||
| 228 | self.inner.lock(|s| { | ||
| 229 | let mut s = s.borrow_mut(); | ||
| 230 | s.unregister_publisher() | ||
| 231 | }) | ||
| 232 | } | ||
| 233 | } | ||
| 234 | |||
| 235 | /// Internal state for the PubSub channel | ||
| 236 | struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { | ||
| 237 | /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it | ||
| 238 | queue: Deque<(T, usize), CAP>, | ||
| 239 | /// Every message has an id. | ||
| 240 | /// Don't worry, we won't run out. | ||
| 241 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. | ||
| 242 | next_message_id: u64, | ||
| 243 | /// Collection of wakers for Subscribers that are waiting. | ||
| 244 | subscriber_wakers: MultiWakerRegistration<SUBS>, | ||
| 245 | /// Collection of wakers for Publishers that are waiting. | ||
| 246 | publisher_wakers: MultiWakerRegistration<PUBS>, | ||
| 247 | /// The amount of subscribers that are active | ||
| 248 | subscriber_count: usize, | ||
| 249 | /// The amount of publishers that are active | ||
| 250 | publisher_count: usize, | ||
| 251 | } | ||
| 252 | |||
| 253 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { | ||
| 254 | /// Create a new internal channel state | ||
| 255 | const fn new() -> Self { | ||
| 256 | Self { | ||
| 257 | queue: Deque::new(), | ||
| 258 | next_message_id: 0, | ||
| 259 | subscriber_wakers: MultiWakerRegistration::new(), | ||
| 260 | publisher_wakers: MultiWakerRegistration::new(), | ||
| 261 | subscriber_count: 0, | ||
| 262 | publisher_count: 0, | ||
| 263 | } | ||
| 264 | } | ||
| 265 | |||
| 266 | fn try_publish(&mut self, message: T) -> Result<(), T> { | ||
| 267 | if self.subscriber_count == 0 { | ||
| 268 | // We don't need to publish anything because there is no one to receive it | ||
| 269 | return Ok(()); | ||
| 270 | } | ||
| 271 | |||
| 272 | if self.queue.is_full() { | ||
| 273 | return Err(message); | ||
| 274 | } | ||
| 275 | // We just did a check for this | ||
| 276 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); | ||
| 277 | |||
| 278 | self.next_message_id += 1; | ||
| 279 | |||
| 280 | // Wake all of the subscribers | ||
| 281 | self.subscriber_wakers.wake(); | ||
| 282 | |||
| 283 | Ok(()) | ||
| 284 | } | ||
| 285 | |||
| 286 | fn publish_immediate(&mut self, message: T) { | ||
| 287 | // Make space in the queue if required | ||
| 288 | if self.queue.is_full() { | ||
| 289 | self.queue.pop_front(); | ||
| 290 | } | ||
| 291 | |||
| 292 | // This will succeed because we made sure there is space | ||
| 293 | self.try_publish(message).ok().unwrap(); | ||
| 294 | } | ||
| 295 | |||
| 296 | fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> { | ||
| 297 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 298 | |||
| 299 | if message_id < start_id { | ||
| 300 | return Some(WaitResult::Lagged(start_id - message_id)); | ||
| 301 | } | ||
| 302 | |||
| 303 | let current_message_index = (message_id - start_id) as usize; | ||
| 304 | |||
| 305 | if current_message_index >= self.queue.len() { | ||
| 306 | return None; | ||
| 307 | } | ||
| 308 | |||
| 309 | // We've checked that the index is valid | ||
| 310 | let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); | ||
| 311 | |||
| 312 | // We're reading this item, so decrement the counter | ||
| 313 | queue_item.1 -= 1; | ||
| 314 | let message = queue_item.0.clone(); | ||
| 315 | |||
| 316 | if current_message_index == 0 && queue_item.1 == 0 { | ||
| 317 | self.queue.pop_front(); | ||
| 318 | self.publisher_wakers.wake(); | ||
| 319 | } | ||
| 320 | |||
| 321 | Some(WaitResult::Message(message)) | ||
| 322 | } | ||
| 323 | |||
| 324 | fn register_subscriber_waker(&mut self, waker: &Waker) { | ||
| 325 | match self.subscriber_wakers.register(waker) { | ||
| 326 | Ok(()) => {} | ||
| 327 | Err(_) => { | ||
| 328 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. | ||
| 329 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 330 | // Any future that is still active will simply reregister. | ||
| 331 | // This won't happen a lot, so it's ok. | ||
| 332 | self.subscriber_wakers.wake(); | ||
| 333 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | fn register_publisher_waker(&mut self, waker: &Waker) { | ||
| 339 | match self.publisher_wakers.register(waker) { | ||
| 340 | Ok(()) => {} | ||
| 341 | Err(_) => { | ||
| 342 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 343 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 344 | // Any future that is still active will simply reregister. | ||
| 345 | // This won't happen a lot, so it's ok. | ||
| 346 | self.publisher_wakers.wake(); | ||
| 347 | self.publisher_wakers.register(waker).unwrap(); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | } | ||
| 351 | |||
| 352 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { | ||
| 353 | self.subscriber_count -= 1; | ||
| 354 | |||
| 355 | // All messages that haven't been read yet by this subscriber must have their counter decremented | ||
| 356 | let start_id = self.next_message_id - self.queue.len() as u64; | ||
| 357 | if subscriber_next_message_id >= start_id { | ||
| 358 | let current_message_index = (subscriber_next_message_id - start_id) as usize; | ||
| 359 | self.queue | ||
| 360 | .iter_mut() | ||
| 361 | .skip(current_message_index) | ||
| 362 | .for_each(|(_, counter)| *counter -= 1); | ||
| 363 | } | ||
| 364 | } | ||
| 365 | |||
| 366 | fn unregister_publisher(&mut self) { | ||
| 367 | self.publisher_count -= 1; | ||
| 368 | } | ||
| 369 | } | ||
| 370 | |||
| 371 | /// Error type for the [PubSubChannel] | ||
| 372 | #[derive(Debug, PartialEq, Eq, Clone)] | ||
| 373 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 374 | pub enum Error { | ||
| 375 | /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or | ||
| 376 | /// the capacity of the channels must be increased. | ||
| 377 | MaximumSubscribersReached, | ||
| 378 | /// All publisher slots are used. To add another publisher, first another publisher must be dropped or | ||
| 379 | /// the capacity of the channels must be increased. | ||
| 380 | MaximumPublishersReached, | ||
| 381 | } | ||
| 382 | |||
| 383 | /// 'Middle level' behaviour of the pubsub channel. | ||
| 384 | /// This trait is used so that Sub and Pub can be generic over the channel. | ||
| 385 | pub trait PubSubBehavior<T> { | ||
| 386 | /// Try to get a message from the queue with the given message id. | ||
| 387 | /// | ||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | ||
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | ||
| 390 | |||
| 391 | /// Try to publish a message to the queue. | ||
| 392 | /// | ||
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | ||
| 394 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; | ||
| 395 | |||
| 396 | /// Publish a message immediately | ||
| 397 | fn publish_immediate(&self, message: T); | ||
| 398 | |||
| 399 | /// Let the channel know that a subscriber has dropped | ||
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | ||
| 401 | |||
| 402 | /// Let the channel know that a publisher has dropped | ||
| 403 | fn unregister_publisher(&self); | ||
| 404 | } | ||
| 405 | |||
| 406 | /// The result of the subscriber wait procedure | ||
| 407 | #[derive(Debug, Clone, PartialEq, Eq)] | ||
| 408 | #[cfg_attr(feature = "defmt", derive(defmt::Format))] | ||
| 409 | pub enum WaitResult<T> { | ||
| 410 | /// The subscriber did not receive all messages and lagged by the given amount of messages. | ||
| 411 | /// (This is the amount of messages that were missed) | ||
| 412 | Lagged(u64), | ||
| 413 | /// A message was received | ||
| 414 | Message(T), | ||
| 415 | } | ||
| 416 | |||
| 417 | #[cfg(test)] | ||
| 418 | mod tests { | ||
| 419 | use super::*; | ||
| 420 | use crate::blocking_mutex::raw::NoopRawMutex; | ||
| 421 | |||
| 422 | #[futures_test::test] | ||
| 423 | async fn dyn_pub_sub_works() { | ||
| 424 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 425 | |||
| 426 | let mut sub0 = channel.dyn_subscriber().unwrap(); | ||
| 427 | let mut sub1 = channel.dyn_subscriber().unwrap(); | ||
| 428 | let pub0 = channel.dyn_publisher().unwrap(); | ||
| 429 | |||
| 430 | pub0.publish(42).await; | ||
| 431 | |||
| 432 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 433 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 434 | |||
| 435 | assert_eq!(sub0.try_next_message(), None); | ||
| 436 | assert_eq!(sub1.try_next_message(), None); | ||
| 437 | } | ||
| 438 | |||
| 439 | #[futures_test::test] | ||
| 440 | async fn all_subscribers_receive() { | ||
| 441 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 442 | |||
| 443 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 444 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 445 | let pub0 = channel.publisher().unwrap(); | ||
| 446 | |||
| 447 | pub0.publish(42).await; | ||
| 448 | |||
| 449 | assert_eq!(sub0.next_message().await, WaitResult::Message(42)); | ||
| 450 | assert_eq!(sub1.next_message().await, WaitResult::Message(42)); | ||
| 451 | |||
| 452 | assert_eq!(sub0.try_next_message(), None); | ||
| 453 | assert_eq!(sub1.try_next_message(), None); | ||
| 454 | } | ||
| 455 | |||
| 456 | #[futures_test::test] | ||
| 457 | async fn lag_when_queue_full_on_immediate_publish() { | ||
| 458 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 459 | |||
| 460 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 461 | let pub0 = channel.publisher().unwrap(); | ||
| 462 | |||
| 463 | pub0.publish_immediate(42); | ||
| 464 | pub0.publish_immediate(43); | ||
| 465 | pub0.publish_immediate(44); | ||
| 466 | pub0.publish_immediate(45); | ||
| 467 | pub0.publish_immediate(46); | ||
| 468 | pub0.publish_immediate(47); | ||
| 469 | |||
| 470 | assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); | ||
| 471 | assert_eq!(sub0.next_message().await, WaitResult::Message(44)); | ||
| 472 | assert_eq!(sub0.next_message().await, WaitResult::Message(45)); | ||
| 473 | assert_eq!(sub0.next_message().await, WaitResult::Message(46)); | ||
| 474 | assert_eq!(sub0.next_message().await, WaitResult::Message(47)); | ||
| 475 | assert_eq!(sub0.try_next_message(), None); | ||
| 476 | } | ||
| 477 | |||
| 478 | #[test] | ||
| 479 | fn limited_subs_and_pubs() { | ||
| 480 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 481 | |||
| 482 | let sub0 = channel.subscriber(); | ||
| 483 | let sub1 = channel.subscriber(); | ||
| 484 | let sub2 = channel.subscriber(); | ||
| 485 | let sub3 = channel.subscriber(); | ||
| 486 | let sub4 = channel.subscriber(); | ||
| 487 | |||
| 488 | assert!(sub0.is_ok()); | ||
| 489 | assert!(sub1.is_ok()); | ||
| 490 | assert!(sub2.is_ok()); | ||
| 491 | assert!(sub3.is_ok()); | ||
| 492 | assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); | ||
| 493 | |||
| 494 | drop(sub0); | ||
| 495 | |||
| 496 | let sub5 = channel.subscriber(); | ||
| 497 | assert!(sub5.is_ok()); | ||
| 498 | |||
| 499 | // publishers | ||
| 500 | |||
| 501 | let pub0 = channel.publisher(); | ||
| 502 | let pub1 = channel.publisher(); | ||
| 503 | let pub2 = channel.publisher(); | ||
| 504 | let pub3 = channel.publisher(); | ||
| 505 | let pub4 = channel.publisher(); | ||
| 506 | |||
| 507 | assert!(pub0.is_ok()); | ||
| 508 | assert!(pub1.is_ok()); | ||
| 509 | assert!(pub2.is_ok()); | ||
| 510 | assert!(pub3.is_ok()); | ||
| 511 | assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); | ||
| 512 | |||
| 513 | drop(pub0); | ||
| 514 | |||
| 515 | let pub5 = channel.publisher(); | ||
| 516 | assert!(pub5.is_ok()); | ||
| 517 | } | ||
| 518 | |||
| 519 | #[test] | ||
| 520 | fn publisher_wait_on_full_queue() { | ||
| 521 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 522 | |||
| 523 | let pub0 = channel.publisher().unwrap(); | ||
| 524 | |||
| 525 | // There are no subscribers, so the queue will never be full | ||
| 526 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 527 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 528 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 529 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 530 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 531 | |||
| 532 | let sub0 = channel.subscriber().unwrap(); | ||
| 533 | |||
| 534 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 535 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 536 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 537 | assert_eq!(pub0.try_publish(0), Ok(())); | ||
| 538 | assert_eq!(pub0.try_publish(0), Err(0)); | ||
| 539 | |||
| 540 | drop(sub0); | ||
| 541 | } | ||
| 542 | } | ||
diff --git a/embassy-sync/src/channel/pubsub/publisher.rs b/embassy-sync/src/channel/pubsub/publisher.rs new file mode 100644 index 000000000..705797f60 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/publisher.rs | |||
| @@ -0,0 +1,182 @@ | |||
| 1 | //! Implementation of anything directly publisher related | ||
| 2 | |||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::pin::Pin; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use super::{PubSubBehavior, PubSubChannel}; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | |||
| 12 | /// A publisher to a channel | ||
| 13 | pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 14 | /// The channel we are a publisher for | ||
| 15 | channel: &'a PSB, | ||
| 16 | _phantom: PhantomData<T>, | ||
| 17 | } | ||
| 18 | |||
| 19 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | ||
| 20 | pub(super) fn new(channel: &'a PSB) -> Self { | ||
| 21 | Self { | ||
| 22 | channel, | ||
| 23 | _phantom: Default::default(), | ||
| 24 | } | ||
| 25 | } | ||
| 26 | |||
| 27 | /// Publish a message right now even when the queue is full. | ||
| 28 | /// This may cause a subscriber to miss an older message. | ||
| 29 | pub fn publish_immediate(&self, message: T) { | ||
| 30 | self.channel.publish_immediate(message) | ||
| 31 | } | ||
| 32 | |||
| 33 | /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message | ||
| 34 | pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { | ||
| 35 | PublisherWaitFuture { | ||
| 36 | message: Some(message), | ||
| 37 | publisher: self, | ||
| 38 | } | ||
| 39 | } | ||
| 40 | |||
| 41 | /// Publish a message if there is space in the message queue | ||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | ||
| 43 | self.channel.publish_with_context(message, None) | ||
| 44 | } | ||
| 45 | } | ||
| 46 | |||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | ||
| 48 | fn drop(&mut self) { | ||
| 49 | self.channel.unregister_publisher() | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | /// A publisher that holds a dynamic reference to the channel | ||
| 54 | pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 55 | |||
| 56 | impl<'a, T: Clone> Deref for DynPublisher<'a, T> { | ||
| 57 | type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 58 | |||
| 59 | fn deref(&self) -> &Self::Target { | ||
| 60 | &self.0 | ||
| 61 | } | ||
| 62 | } | ||
| 63 | |||
| 64 | impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { | ||
| 65 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 66 | &mut self.0 | ||
| 67 | } | ||
| 68 | } | ||
| 69 | |||
| 70 | /// A publisher that holds a generic reference to the channel | ||
| 71 | pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 72 | pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 73 | ); | ||
| 74 | |||
| 75 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 76 | for Publisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 77 | { | ||
| 78 | type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 79 | |||
| 80 | fn deref(&self) -> &Self::Target { | ||
| 81 | &self.0 | ||
| 82 | } | ||
| 83 | } | ||
| 84 | |||
| 85 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 86 | for Publisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 87 | { | ||
| 88 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 89 | &mut self.0 | ||
| 90 | } | ||
| 91 | } | ||
| 92 | |||
| 93 | /// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. | ||
| 94 | /// (So an infinite amount is possible) | ||
| 95 | pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 96 | /// The channel we are a publisher for | ||
| 97 | channel: &'a PSB, | ||
| 98 | _phantom: PhantomData<T>, | ||
| 99 | } | ||
| 100 | |||
| 101 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | ||
| 102 | pub(super) fn new(channel: &'a PSB) -> Self { | ||
| 103 | Self { | ||
| 104 | channel, | ||
| 105 | _phantom: Default::default(), | ||
| 106 | } | ||
| 107 | } | ||
| 108 | /// Publish the message right now even when the queue is full. | ||
| 109 | /// This may cause a subscriber to miss an older message. | ||
| 110 | pub fn publish_immediate(&self, message: T) { | ||
| 111 | self.channel.publish_immediate(message) | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Publish a message if there is space in the message queue | ||
| 115 | pub fn try_publish(&self, message: T) -> Result<(), T> { | ||
| 116 | self.channel.publish_with_context(message, None) | ||
| 117 | } | ||
| 118 | } | ||
| 119 | |||
| 120 | /// An immediate publisher that holds a dynamic reference to the channel | ||
| 121 | pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 122 | |||
| 123 | impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { | ||
| 124 | type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 125 | |||
| 126 | fn deref(&self) -> &Self::Target { | ||
| 127 | &self.0 | ||
| 128 | } | ||
| 129 | } | ||
| 130 | |||
| 131 | impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { | ||
| 132 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 133 | &mut self.0 | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | /// An immediate publisher that holds a generic reference to the channel | ||
| 138 | pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 139 | pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 140 | ); | ||
| 141 | |||
| 142 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 143 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 144 | { | ||
| 145 | type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 146 | |||
| 147 | fn deref(&self) -> &Self::Target { | ||
| 148 | &self.0 | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 153 | for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> | ||
| 154 | { | ||
| 155 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 156 | &mut self.0 | ||
| 157 | } | ||
| 158 | } | ||
| 159 | |||
| 160 | /// Future for the publisher wait action | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 162 | /// The message we need to publish | ||
| 163 | message: Option<T>, | ||
| 164 | publisher: &'s Pub<'a, PSB, T>, | ||
| 165 | } | ||
| 166 | |||
| 167 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { | ||
| 168 | type Output = (); | ||
| 169 | |||
| 170 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 171 | let message = self.message.take().unwrap(); | ||
| 172 | match self.publisher.channel.publish_with_context(message, Some(cx)) { | ||
| 173 | Ok(()) => Poll::Ready(()), | ||
| 174 | Err(message) => { | ||
| 175 | self.message = Some(message); | ||
| 176 | Poll::Pending | ||
| 177 | } | ||
| 178 | } | ||
| 179 | } | ||
| 180 | } | ||
| 181 | |||
| 182 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} | ||
diff --git a/embassy-sync/src/channel/pubsub/subscriber.rs b/embassy-sync/src/channel/pubsub/subscriber.rs new file mode 100644 index 000000000..b9a2cbe18 --- /dev/null +++ b/embassy-sync/src/channel/pubsub/subscriber.rs | |||
| @@ -0,0 +1,152 @@ | |||
| 1 | //! Implementation of anything directly subscriber related | ||
| 2 | |||
| 3 | use core::future::Future; | ||
| 4 | use core::marker::PhantomData; | ||
| 5 | use core::ops::{Deref, DerefMut}; | ||
| 6 | use core::pin::Pin; | ||
| 7 | use core::task::{Context, Poll}; | ||
| 8 | |||
| 9 | use super::{PubSubBehavior, PubSubChannel, WaitResult}; | ||
| 10 | use crate::blocking_mutex::raw::RawMutex; | ||
| 11 | |||
| 12 | /// A subscriber to a channel | ||
| 13 | pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 14 | /// The message id of the next message we are yet to receive | ||
| 15 | next_message_id: u64, | ||
| 16 | /// The channel we are a subscriber to | ||
| 17 | channel: &'a PSB, | ||
| 18 | _phantom: PhantomData<T>, | ||
| 19 | } | ||
| 20 | |||
| 21 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | ||
| 22 | pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { | ||
| 23 | Self { | ||
| 24 | next_message_id, | ||
| 25 | channel, | ||
| 26 | _phantom: Default::default(), | ||
| 27 | } | ||
| 28 | } | ||
| 29 | |||
| 30 | /// Wait for a published message | ||
| 31 | pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 32 | SubscriberWaitFuture { subscriber: self } | ||
| 33 | } | ||
| 34 | |||
| 35 | /// Wait for a published message (ignoring lag results) | ||
| 36 | pub async fn next_message_pure(&mut self) -> T { | ||
| 37 | loop { | ||
| 38 | match self.next_message().await { | ||
| 39 | WaitResult::Lagged(_) => continue, | ||
| 40 | WaitResult::Message(message) => break message, | ||
| 41 | } | ||
| 42 | } | ||
| 43 | } | ||
| 44 | |||
| 45 | /// Try to see if there's a published message we haven't received yet. | ||
| 46 | /// | ||
| 47 | /// This function does not peek. The message is received if there is one. | ||
| 48 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { | ||
| 49 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { | ||
| 50 | Poll::Ready(result) => Some(result), | ||
| 51 | Poll::Pending => None, | ||
| 52 | } | ||
| 53 | } | ||
| 54 | |||
| 55 | /// Try to see if there's a published message we haven't received yet (ignoring lag results). | ||
| 56 | /// | ||
| 57 | /// This function does not peek. The message is received if there is one. | ||
| 58 | pub fn try_next_message_pure(&mut self) -> Option<T> { | ||
| 59 | loop { | ||
| 60 | match self.try_next_message() { | ||
| 61 | Some(WaitResult::Lagged(_)) => continue, | ||
| 62 | Some(WaitResult::Message(message)) => break Some(message), | ||
| 63 | None => break None, | ||
| 64 | } | ||
| 65 | } | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | ||
| 70 | fn drop(&mut self) { | ||
| 71 | self.channel.unregister_subscriber(self.next_message_id) | ||
| 72 | } | ||
| 73 | } | ||
| 74 | |||
| 75 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} | ||
| 76 | |||
| 77 | /// Warning: The stream implementation ignores lag results and returns all messages. | ||
| 78 | /// This might miss some messages without you knowing it. | ||
| 79 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { | ||
| 80 | type Item = T; | ||
| 81 | |||
| 82 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| 83 | match self | ||
| 84 | .channel | ||
| 85 | .get_message_with_context(&mut self.next_message_id, Some(cx)) | ||
| 86 | { | ||
| 87 | Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), | ||
| 88 | Poll::Ready(WaitResult::Lagged(_)) => { | ||
| 89 | cx.waker().wake_by_ref(); | ||
| 90 | Poll::Pending | ||
| 91 | } | ||
| 92 | Poll::Pending => Poll::Pending, | ||
| 93 | } | ||
| 94 | } | ||
| 95 | } | ||
| 96 | |||
| 97 | /// A subscriber that holds a dynamic reference to the channel | ||
| 98 | pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>); | ||
| 99 | |||
| 100 | impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { | ||
| 101 | type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>; | ||
| 102 | |||
| 103 | fn deref(&self) -> &Self::Target { | ||
| 104 | &self.0 | ||
| 105 | } | ||
| 106 | } | ||
| 107 | |||
| 108 | impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { | ||
| 109 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 110 | &mut self.0 | ||
| 111 | } | ||
| 112 | } | ||
| 113 | |||
| 114 | /// A subscriber that holds a generic reference to the channel | ||
| 115 | pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( | ||
| 116 | pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>, | ||
| 117 | ); | ||
| 118 | |||
| 119 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref | ||
| 120 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 121 | { | ||
| 122 | type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>; | ||
| 123 | |||
| 124 | fn deref(&self) -> &Self::Target { | ||
| 125 | &self.0 | ||
| 126 | } | ||
| 127 | } | ||
| 128 | |||
| 129 | impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut | ||
| 130 | for Subscriber<'a, M, T, CAP, SUBS, PUBS> | ||
| 131 | { | ||
| 132 | fn deref_mut(&mut self) -> &mut Self::Target { | ||
| 133 | &mut self.0 | ||
| 134 | } | ||
| 135 | } | ||
| 136 | |||
| 137 | /// Future for the subscriber wait action | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | ||
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | ||
| 140 | } | ||
| 141 | |||
| 142 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { | ||
| 143 | type Output = WaitResult<T>; | ||
| 144 | |||
| 145 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 146 | self.subscriber | ||
| 147 | .channel | ||
| 148 | .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} | ||
diff --git a/embassy-sync/src/channel/signal.rs b/embassy-sync/src/channel/signal.rs new file mode 100644 index 000000000..9279266c1 --- /dev/null +++ b/embassy-sync/src/channel/signal.rs | |||
| @@ -0,0 +1,100 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to a task. | ||
| 2 | use core::cell::UnsafeCell; | ||
| 3 | use core::future::Future; | ||
| 4 | use core::mem; | ||
| 5 | use core::task::{Context, Poll, Waker}; | ||
| 6 | |||
| 7 | /// Single-slot signaling primitive. | ||
| 8 | /// | ||
| 9 | /// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except | ||
| 10 | /// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead | ||
| 11 | /// of waiting for the receiver to pop the previous value. | ||
| 12 | /// | ||
| 13 | /// It is useful for sending data between tasks when the receiver only cares about | ||
| 14 | /// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" | ||
| 15 | /// updates. | ||
| 16 | /// | ||
| 17 | /// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. | ||
| 18 | /// | ||
| 19 | /// Signals are generally declared as `static`s and then borrowed as required. | ||
| 20 | /// | ||
| 21 | /// ``` | ||
| 22 | /// use embassy_sync::channel::signal::Signal; | ||
| 23 | /// | ||
| 24 | /// enum SomeCommand { | ||
| 25 | /// On, | ||
| 26 | /// Off, | ||
| 27 | /// } | ||
| 28 | /// | ||
| 29 | /// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); | ||
| 30 | /// ``` | ||
| 31 | pub struct Signal<T> { | ||
| 32 | state: UnsafeCell<State<T>>, | ||
| 33 | } | ||
| 34 | |||
| 35 | enum State<T> { | ||
| 36 | None, | ||
| 37 | Waiting(Waker), | ||
| 38 | Signaled(T), | ||
| 39 | } | ||
| 40 | |||
| 41 | unsafe impl<T: Send> Send for Signal<T> {} | ||
| 42 | unsafe impl<T: Send> Sync for Signal<T> {} | ||
| 43 | |||
| 44 | impl<T> Signal<T> { | ||
| 45 | /// Create a new `Signal`. | ||
| 46 | pub const fn new() -> Self { | ||
| 47 | Self { | ||
| 48 | state: UnsafeCell::new(State::None), | ||
| 49 | } | ||
| 50 | } | ||
| 51 | } | ||
| 52 | |||
| 53 | impl<T: Send> Signal<T> { | ||
| 54 | /// Mark this Signal as signaled. | ||
| 55 | pub fn signal(&self, val: T) { | ||
| 56 | critical_section::with(|_| unsafe { | ||
| 57 | let state = &mut *self.state.get(); | ||
| 58 | if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { | ||
| 59 | waker.wake(); | ||
| 60 | } | ||
| 61 | }) | ||
| 62 | } | ||
| 63 | |||
| 64 | /// Remove the queued value in this `Signal`, if any. | ||
| 65 | pub fn reset(&self) { | ||
| 66 | critical_section::with(|_| unsafe { | ||
| 67 | let state = &mut *self.state.get(); | ||
| 68 | *state = State::None | ||
| 69 | }) | ||
| 70 | } | ||
| 71 | |||
| 72 | /// Manually poll the Signal future. | ||
| 73 | pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 74 | critical_section::with(|_| unsafe { | ||
| 75 | let state = &mut *self.state.get(); | ||
| 76 | match state { | ||
| 77 | State::None => { | ||
| 78 | *state = State::Waiting(cx.waker().clone()); | ||
| 79 | Poll::Pending | ||
| 80 | } | ||
| 81 | State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, | ||
| 82 | State::Waiting(_) => panic!("waker overflow"), | ||
| 83 | State::Signaled(_) => match mem::replace(state, State::None) { | ||
| 84 | State::Signaled(res) => Poll::Ready(res), | ||
| 85 | _ => unreachable!(), | ||
| 86 | }, | ||
| 87 | } | ||
| 88 | }) | ||
| 89 | } | ||
| 90 | |||
| 91 | /// Future that completes when this Signal has been signaled. | ||
| 92 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | ||
| 93 | futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) | ||
| 94 | } | ||
| 95 | |||
| 96 | /// non-blocking method to check whether this signal has been signaled. | ||
| 97 | pub fn signaled(&self) -> bool { | ||
| 98 | critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) | ||
| 99 | } | ||
| 100 | } | ||
