From 5677b13a86beca58aa57ecfd7cea0db7ceb189fa Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 22 Aug 2022 22:00:06 +0200 Subject: sync: flatten module structure. --- embassy-sync/src/channel.rs | 596 ++++++++++++++++++++++++++ embassy-sync/src/channel/mod.rs | 5 - embassy-sync/src/channel/mpmc.rs | 596 -------------------------- embassy-sync/src/channel/pubsub/mod.rs | 542 ----------------------- embassy-sync/src/channel/pubsub/publisher.rs | 182 -------- embassy-sync/src/channel/pubsub/subscriber.rs | 152 ------- embassy-sync/src/channel/signal.rs | 100 ----- embassy-sync/src/lib.rs | 2 + embassy-sync/src/pubsub/mod.rs | 542 +++++++++++++++++++++++ embassy-sync/src/pubsub/publisher.rs | 182 ++++++++ embassy-sync/src/pubsub/subscriber.rs | 152 +++++++ embassy-sync/src/signal.rs | 100 +++++ 12 files changed, 1574 insertions(+), 1577 deletions(-) create mode 100644 embassy-sync/src/channel.rs delete mode 100644 embassy-sync/src/channel/mod.rs delete mode 100644 embassy-sync/src/channel/mpmc.rs delete mode 100644 embassy-sync/src/channel/pubsub/mod.rs delete mode 100644 embassy-sync/src/channel/pubsub/publisher.rs delete mode 100644 embassy-sync/src/channel/pubsub/subscriber.rs delete mode 100644 embassy-sync/src/channel/signal.rs create mode 100644 embassy-sync/src/pubsub/mod.rs create mode 100644 embassy-sync/src/pubsub/publisher.rs create mode 100644 embassy-sync/src/pubsub/subscriber.rs create mode 100644 embassy-sync/src/signal.rs (limited to 'embassy-sync/src') diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs new file mode 100644 index 000000000..76f42d0e7 --- /dev/null +++ b/embassy-sync/src/channel.rs @@ -0,0 +1,596 @@ +//! A queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. +//! + +use core::cell::RefCell; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use heapless::Deque; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::WakerRegistration; + +/// Send-only access to a [`Channel`]. +#[derive(Copy)] +pub struct Sender<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Sender { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> +where + M: RawMutex, +{ + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { + self.channel.send(message) + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send(message) + } +} + +/// Send-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicSender<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicSender<'ch, T> { + fn clone(&self) -> Self { + DynamicSender { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, T> +where + M: RawMutex, +{ + fn from(s: Sender<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, T> DynamicSender<'ch, T> { + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } +} + +/// Receive-only access to a [`Channel`]. +#[derive(Copy)] +pub struct Receiver<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> +where + M: RawMutex, +{ + fn clone(&self) -> Self { + Receiver { channel: self.channel } + } +} + +impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> +where + M: RawMutex, +{ + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + self.channel.recv() + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } +} + +/// Receive-only access to a [`Channel`] without knowing channel size. +#[derive(Copy)] +pub struct DynamicReceiver<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for DynamicReceiver<'ch, T> { + fn clone(&self) -> Self { + DynamicReceiver { channel: self.channel } + } +} + +impl<'ch, T> DynamicReceiver<'ch, T> { + /// Receive the next value. + /// + /// See [`Channel::recv()`]. + pub fn recv(&self) -> DynamicRecvFuture<'_, T> { + DynamicRecvFuture { channel: self.channel } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_recv()`] + pub fn try_recv(&self) -> Result { + self.channel.try_recv_with_context(None) + } +} + +impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> +where + M: RawMutex, +{ + fn from(s: Receiver<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } + } +} + +/// Future returned by [`DynamicReceiver::recv`]. +pub struct DynamicRecvFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.channel.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(v), + Err(TryRecvError::Empty) => Poll::Pending, + } + } +} + +/// Future returned by [`Channel::send`] and [`Sender::send`]. +pub struct SendFuture<'ch, M, T, const N: usize> +where + M: RawMutex, +{ + channel: &'ch Channel, + message: Option, +} + +impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> +where + M: RawMutex, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} + +/// Future returned by [`DynamicSender::send`]. +pub struct DynamicSendFuture<'ch, T> { + channel: &'ch dyn DynamicChannel, + message: Option, +} + +impl<'ch, T> Future for DynamicSendFuture<'ch, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.message.take() { + Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { + Ok(..) => Poll::Ready(()), + Err(TrySendError::Full(m)) => { + self.message = Some(m); + Poll::Pending + } + }, + None => panic!("Message cannot be None"), + } + } +} + +impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} + +trait DynamicChannel { + fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; +} + +/// Error returned by [`try_recv`](Channel::try_recv). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TryRecvError { + /// A message could not be received because the channel is empty. + Empty, +} + +/// Error returned by [`try_send`](Channel::try_send). +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum TrySendError { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), +} + +struct ChannelState { + queue: Deque, + receiver_waker: WakerRegistration, + senders_waker: WakerRegistration, +} + +impl ChannelState { + const fn new() -> Self { + ChannelState { + queue: Deque::new(), + receiver_waker: WakerRegistration::new(), + senders_waker: WakerRegistration::new(), + } + } + + fn try_recv(&mut self) -> Result { + self.try_recv_with_context(None) + } + + fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop_front() { + Ok(message) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryRecvError::Empty) + } + } + + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + self.try_send_with_context(message, None) + } + + fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + match self.queue.push_back(message) { + Ok(()) => { + self.receiver_waker.wake(); + Ok(()) + } + Err(message) => { + if let Some(cx) = cx { + self.senders_waker.register(cx.waker()); + } + Err(TrySendError::Full(message)) + } + } + } +} + +/// A bounded channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +pub struct Channel +where + M: RawMutex, +{ + inner: Mutex>>, +} + +impl Channel +where + M: RawMutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: + /// + /// ``` + /// use embassy_sync::channel::Channel; + /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; + /// + /// // Declare a bounded channel of 3 u32s. + /// let mut channel = Channel::::new(); + /// ``` + pub const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(ChannelState::new())), + } + } + + fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { + self.inner.lock(|rc| f(&mut *rc.borrow_mut())) + } + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + self.lock(|c| c.try_recv_with_context(cx)) + } + + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + self.lock(|c| c.try_send_with_context(m, cx)) + } + + /// Get a sender for this channel. + pub fn sender(&self) -> Sender<'_, M, T, N> { + Sender { channel: self } + } + + /// Get a receiver for this channel. + pub fn receiver(&self) -> Receiver<'_, M, T, N> { + Receiver { channel: self } + } + + /// Send a value, waiting until there is capacity. + /// + /// Sending completes when the value has been pushed to the channel's queue. + /// This doesn't mean the value has been received yet. + pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { + SendFuture { + channel: self, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// This method differs from [`send`](Channel::send) by returning immediately if the channel's + /// buffer is full, instead of waiting. + /// + /// # Errors + /// + /// If the channel capacity has been reached, i.e., the channel has `n` + /// buffered values where `n` is the argument passed to [`Channel`], then an + /// error is returned. + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.lock(|c| c.try_send(message)) + } + + /// Receive the next value. + /// + /// If there are no messages in the channel's buffer, this method will + /// wait until a message is sent. + pub fn recv(&self) -> RecvFuture<'_, M, T, N> { + RecvFuture { channel: self } + } + + /// Attempt to immediately receive a message. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + pub fn try_recv(&self) -> Result { + self.lock(|c| c.try_recv()) + } +} + +/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the +/// tradeoff cost of dynamic dispatch. +impl DynamicChannel for Channel +where + M: RawMutex, +{ + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { + Channel::try_send_with_context(self, m, cx) + } + + fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { + Channel::try_recv_with_context(self, cx) + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use futures_executor::ThreadPool; + use futures_timer::Delay; + use futures_util::task::SpawnExt; + use static_cell::StaticCell; + + use super::*; + use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; + + fn capacity(c: &ChannelState) -> usize { + c.queue.capacity() - c.queue.len() + } + + #[test] + fn sending_once() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(capacity(&c), 2); + } + + #[test] + fn sending_when_full() { + let mut c = ChannelState::::new(); + let _ = c.try_send(1); + let _ = c.try_send(1); + let _ = c.try_send(1); + match c.try_send(2) { + Err(TrySendError::Full(2)) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 0); + } + + #[test] + fn receiving_once_with_one_send() { + let mut c = ChannelState::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + assert_eq!(capacity(&c), 3); + } + + #[test] + fn receiving_when_empty() { + let mut c = ChannelState::::new(); + match c.try_recv() { + Err(TryRecvError::Empty) => assert!(true), + _ => assert!(false), + } + assert_eq!(capacity(&c), 3); + } + + #[test] + fn simple_send_and_receive() { + let c = Channel::::new(); + assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_recv().unwrap(), 1); + } + + #[test] + fn cloning() { + let c = Channel::::new(); + let r1 = c.receiver(); + let s1 = c.sender(); + + let _ = r1.clone(); + let _ = s1.clone(); + } + + #[test] + fn dynamic_dispatch() { + let c = Channel::::new(); + let s: DynamicSender<'_, u32> = c.sender().into(); + let r: DynamicReceiver<'_, u32> = c.receiver().into(); + + assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_recv().unwrap(), 1); + } + + #[futures_test::test] + async fn receiver_receives_given_try_send_async() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(Channel::new()); + let c2 = c; + assert!(executor + .spawn(async move { + assert!(c2.try_send(1).is_ok()); + }) + .is_ok()); + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn sender_send_completes_if_capacity() { + let c = Channel::::new(); + c.send(1).await; + assert_eq!(c.recv().await, 1); + } + + #[futures_test::test] + async fn senders_sends_wait_until_capacity() { + let executor = ThreadPool::new().unwrap(); + + static CHANNEL: StaticCell> = StaticCell::new(); + let c = &*CHANNEL.init(Channel::new()); + assert!(c.try_send(1).is_ok()); + + let c2 = c; + let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); + let c2 = c; + let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); + // Wish I could think of a means of determining that the async send is waiting instead. + // However, I've used the debugger to observe that the send does indeed wait. + Delay::new(Duration::from_millis(500)).await; + assert_eq!(c.recv().await, 1); + assert!(executor + .spawn(async move { + loop { + c.recv().await; + } + }) + .is_ok()); + send_task_1.unwrap().await; + send_task_2.unwrap().await; + } +} diff --git a/embassy-sync/src/channel/mod.rs b/embassy-sync/src/channel/mod.rs deleted file mode 100644 index 5df1f5c5c..000000000 --- a/embassy-sync/src/channel/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Async channels - -pub mod mpmc; -pub mod pubsub; -pub mod signal; diff --git a/embassy-sync/src/channel/mpmc.rs b/embassy-sync/src/channel/mpmc.rs deleted file mode 100644 index 7bebd3412..000000000 --- a/embassy-sync/src/channel/mpmc.rs +++ /dev/null @@ -1,596 +0,0 @@ -//! A queue for sending values between asynchronous tasks. -//! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. -//! -//! This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. -//! - -use core::cell::RefCell; -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use heapless::Deque; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; - -/// Send-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Sender<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Sender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> -where - M: RawMutex, -{ - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { - self.channel.send(message) - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send(message) - } -} - -/// Send-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicSender<'ch, T> { - fn clone(&self) -> Self { - DynamicSender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, T> -where - M: RawMutex, -{ - fn from(s: Sender<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -impl<'ch, T> DynamicSender<'ch, T> { - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { - DynamicSendFuture { - channel: self.channel, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send_with_context(message, None) - } -} - -/// Receive-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Receiver<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Receiver { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - self.channel.recv() - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv() - } -} - -/// Receive-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicReceiver<'ch, T> { - fn clone(&self) -> Self { - DynamicReceiver { channel: self.channel } - } -} - -impl<'ch, T> DynamicReceiver<'ch, T> { - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> DynamicRecvFuture<'_, T> { - DynamicRecvFuture { channel: self.channel } - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv_with_context(None) - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> -where - M: RawMutex, -{ - fn from(s: Receiver<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. -pub struct RecvFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`DynamicReceiver::recv`]. -pub struct DynamicRecvFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`Channel::send`] and [`Sender::send`]. -pub struct SendFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, - message: Option, -} - -impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} - -/// Future returned by [`DynamicSender::send`]. -pub struct DynamicSendFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, - message: Option, -} - -impl<'ch, T> Future for DynamicSendFuture<'ch, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} - -trait DynamicChannel { - fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; -} - -/// Error returned by [`try_recv`](Channel::try_recv). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { - /// A message could not be received because the channel is empty. - Empty, -} - -/// Error returned by [`try_send`](Channel::try_send). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), -} - -struct ChannelState { - queue: Deque, - receiver_waker: WakerRegistration, - senders_waker: WakerRegistration, -} - -impl ChannelState { - const fn new() -> Self { - ChannelState { - queue: Deque::new(), - receiver_waker: WakerRegistration::new(), - senders_waker: WakerRegistration::new(), - } - } - - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) - } - - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { - if self.queue.is_full() { - self.senders_waker.wake(); - } - - if let Some(message) = self.queue.pop_front() { - Ok(message) - } else { - if let Some(cx) = cx { - self.receiver_waker.register(cx.waker()); - } - Err(TryRecvError::Empty) - } - } - - fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.try_send_with_context(message, None) - } - - fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - match self.queue.push_back(message) { - Ok(()) => { - self.receiver_waker.wake(); - Ok(()) - } - Err(message) => { - if let Some(cx) = cx { - self.senders_waker.register(cx.waker()); - } - Err(TrySendError::Full(message)) - } - } - } -} - -/// A bounded channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -pub struct Channel -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Channel -where - M: RawMutex, -{ - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy_sync::channel::mpmc::Channel; - /// use embassy_sync::blocking_mutex::raw::NoopRawMutex; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// ``` - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(ChannelState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - self.lock(|c| c.try_recv_with_context(cx)) - } - - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - self.lock(|c| c.try_send_with_context(m, cx)) - } - - /// Get a sender for this channel. - pub fn sender(&self) -> Sender<'_, M, T, N> { - Sender { channel: self } - } - - /// Get a receiver for this channel. - pub fn receiver(&self) -> Receiver<'_, M, T, N> { - Receiver { channel: self } - } - - /// Send a value, waiting until there is capacity. - /// - /// Sending completes when the value has been pushed to the channel's queue. - /// This doesn't mean the value has been received yet. - pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { - SendFuture { - channel: self, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// This method differs from [`send`](Channel::send) by returning immediately if the channel's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the channel capacity has been reached, i.e., the channel has `n` - /// buffered values where `n` is the argument passed to [`Channel`], then an - /// error is returned. - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.lock(|c| c.try_send(message)) - } - - /// Receive the next value. - /// - /// If there are no messages in the channel's buffer, this method will - /// wait until a message is sent. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - RecvFuture { channel: self } - } - - /// Attempt to immediately receive a message. - /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. - pub fn try_recv(&self) -> Result { - self.lock(|c| c.try_recv()) - } -} - -/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the -/// tradeoff cost of dynamic dispatch. -impl DynamicChannel for Channel -where - M: RawMutex, -{ - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - Channel::try_send_with_context(self, m, cx) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - Channel::try_recv_with_context(self, cx) - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use futures_executor::ThreadPool; - use futures_timer::Delay; - use futures_util::task::SpawnExt; - use static_cell::StaticCell; - - use super::*; - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - - fn capacity(c: &ChannelState) -> usize { - c.queue.capacity() - c.queue.len() - } - - #[test] - fn sending_once() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn sending_when_full() { - let mut c = ChannelState::::new(); - let _ = c.try_send(1); - let _ = c.try_send(1); - let _ = c.try_send(1); - match c.try_send(2) { - Err(TrySendError::Full(2)) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 0); - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 3); - } - - #[test] - fn simple_send_and_receive() { - let c = Channel::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - } - - #[test] - fn cloning() { - let c = Channel::::new(); - let r1 = c.receiver(); - let s1 = c.sender(); - - let _ = r1.clone(); - let _ = s1.clone(); - } - - #[test] - fn dynamic_dispatch() { - let c = Channel::::new(); - let s: DynamicSender<'_, u32> = c.sender().into(); - let r: DynamicReceiver<'_, u32> = c.receiver().into(); - - assert!(s.try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); - } - - #[futures_test::test] - async fn receiver_receives_given_try_send_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - let c2 = c; - assert!(executor - .spawn(async move { - assert!(c2.try_send(1).is_ok()); - }) - .is_ok()); - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let c = Channel::::new(); - c.send(1).await; - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn senders_sends_wait_until_capacity() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - assert!(c.try_send(1).is_ok()); - - let c2 = c; - let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); - let c2 = c; - let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); - // Wish I could think of a means of determining that the async send is waiting instead. - // However, I've used the debugger to observe that the send does indeed wait. - Delay::new(Duration::from_millis(500)).await; - assert_eq!(c.recv().await, 1); - assert!(executor - .spawn(async move { - loop { - c.recv().await; - } - }) - .is_ok()); - send_task_1.unwrap().await; - send_task_2.unwrap().await; - } -} diff --git a/embassy-sync/src/channel/pubsub/mod.rs b/embassy-sync/src/channel/pubsub/mod.rs deleted file mode 100644 index f62b4d118..000000000 --- a/embassy-sync/src/channel/pubsub/mod.rs +++ /dev/null @@ -1,542 +0,0 @@ -//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. - -#![deny(missing_docs)] - -use core::cell::RefCell; -use core::fmt::Debug; -use core::task::{Context, Poll, Waker}; - -use heapless::Deque; - -use self::publisher::{ImmediatePub, Pub}; -use self::subscriber::Sub; -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::MultiWakerRegistration; - -pub mod publisher; -pub mod subscriber; - -pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; -pub use subscriber::{DynSubscriber, Subscriber}; - -/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers -/// -/// Any published message can be read by all subscribers. -/// A publisher can choose how it sends its message. -/// -/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. -/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message -/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive -/// an error to indicate that it has lagged. -/// -/// ## Example -/// -/// ``` -/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; -/// # use embassy_sync::channel::pubsub::WaitResult; -/// # use embassy_sync::channel::pubsub::PubSubChannel; -/// # use futures_executor::block_on; -/// # let test = async { -/// // Create the channel. This can be static as well -/// let channel = PubSubChannel::::new(); -/// -/// // This is a generic subscriber with a direct reference to the channel -/// let mut sub0 = channel.subscriber().unwrap(); -/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel -/// let mut sub1 = channel.dyn_subscriber().unwrap(); -/// -/// let pub0 = channel.publisher().unwrap(); -/// -/// // Publish a message, but wait if the queue is full -/// pub0.publish(42).await; -/// -/// // Publish a message, but if the queue is full, just kick out the oldest message. -/// // This may cause some subscribers to miss a message -/// pub0.publish_immediate(43); -/// -/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result -/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); -/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); -/// -/// // Wait again, but this time ignore any Lag results -/// assert_eq!(sub0.next_message_pure().await, 43); -/// assert_eq!(sub1.next_message_pure().await, 43); -/// -/// // There's also a polling interface -/// assert_eq!(sub0.try_next_message(), None); -/// assert_eq!(sub1.try_next_message(), None); -/// # }; -/// # -/// # block_on(test); -/// ``` -/// -pub struct PubSubChannel { - inner: Mutex>>, -} - -impl - PubSubChannel -{ - /// Create a new channel - pub const fn new() -> Self { - Self { - inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), - } - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(Subscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn dyn_subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(DynSubscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(Publisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn dyn_publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(DynPublisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn immediate_publisher(&self) -> ImmediatePublisher { - ImmediatePublisher(ImmediatePub::new(self)) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { - DynImmediatePublisher(ImmediatePub::new(self)) - } -} - -impl PubSubBehavior - for PubSubChannel -{ - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - - // Check if we can read a message - match s.get_message(*next_message_id) { - // Yes, so we are done polling - Some(WaitResult::Message(message)) => { - *next_message_id += 1; - Poll::Ready(WaitResult::Message(message)) - } - // No, so we need to reregister our waker and sleep again - None => { - if let Some(cx) = cx { - s.register_subscriber_waker(cx.waker()); - } - Poll::Pending - } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged - Some(WaitResult::Lagged(amount)) => { - *next_message_id += amount; - Poll::Ready(WaitResult::Lagged(amount)) - } - } - }) - } - - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - // Try to publish the message - match s.try_publish(message) { - // We did it, we are ready - Ok(()) => Ok(()), - // The queue is full, so we need to reregister our waker and go to sleep - Err(message) => { - if let Some(cx) = cx { - s.register_publisher_waker(cx.waker()); - } - Err(message) - } - } - }) - } - - fn publish_immediate(&self, message: T) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.publish_immediate(message) - }) - } - - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_subscriber(subscriber_next_message_id) - }) - } - - fn unregister_publisher(&self) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_publisher() - }) - } -} - -/// Internal state for the PubSub channel -struct PubSubState { - /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it - queue: Deque<(T, usize), CAP>, - /// Every message has an id. - /// Don't worry, we won't run out. - /// If a million messages were published every second, then the ID's would run out in about 584942 years. - next_message_id: u64, - /// Collection of wakers for Subscribers that are waiting. - subscriber_wakers: MultiWakerRegistration, - /// Collection of wakers for Publishers that are waiting. - publisher_wakers: MultiWakerRegistration, - /// The amount of subscribers that are active - subscriber_count: usize, - /// The amount of publishers that are active - publisher_count: usize, -} - -impl PubSubState { - /// Create a new internal channel state - const fn new() -> Self { - Self { - queue: Deque::new(), - next_message_id: 0, - subscriber_wakers: MultiWakerRegistration::new(), - publisher_wakers: MultiWakerRegistration::new(), - subscriber_count: 0, - publisher_count: 0, - } - } - - fn try_publish(&mut self, message: T) -> Result<(), T> { - if self.subscriber_count == 0 { - // We don't need to publish anything because there is no one to receive it - return Ok(()); - } - - if self.queue.is_full() { - return Err(message); - } - // We just did a check for this - self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); - - self.next_message_id += 1; - - // Wake all of the subscribers - self.subscriber_wakers.wake(); - - Ok(()) - } - - fn publish_immediate(&mut self, message: T) { - // Make space in the queue if required - if self.queue.is_full() { - self.queue.pop_front(); - } - - // This will succeed because we made sure there is space - self.try_publish(message).ok().unwrap(); - } - - fn get_message(&mut self, message_id: u64) -> Option> { - let start_id = self.next_message_id - self.queue.len() as u64; - - if message_id < start_id { - return Some(WaitResult::Lagged(start_id - message_id)); - } - - let current_message_index = (message_id - start_id) as usize; - - if current_message_index >= self.queue.len() { - return None; - } - - // We've checked that the index is valid - let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); - - // We're reading this item, so decrement the counter - queue_item.1 -= 1; - let message = queue_item.0.clone(); - - if current_message_index == 0 && queue_item.1 == 0 { - self.queue.pop_front(); - self.publisher_wakers.wake(); - } - - Some(WaitResult::Message(message)) - } - - fn register_subscriber_waker(&mut self, waker: &Waker) { - match self.subscriber_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a subscriber that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.subscriber_wakers.wake(); - self.subscriber_wakers.register(waker).unwrap(); - } - } - } - - fn register_publisher_waker(&mut self, waker: &Waker) { - match self.publisher_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a publisher that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.publisher_wakers.wake(); - self.publisher_wakers.register(waker).unwrap(); - } - } - } - - fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { - self.subscriber_count -= 1; - - // All messages that haven't been read yet by this subscriber must have their counter decremented - let start_id = self.next_message_id - self.queue.len() as u64; - if subscriber_next_message_id >= start_id { - let current_message_index = (subscriber_next_message_id - start_id) as usize; - self.queue - .iter_mut() - .skip(current_message_index) - .for_each(|(_, counter)| *counter -= 1); - } - } - - fn unregister_publisher(&mut self) { - self.publisher_count -= 1; - } -} - -/// Error type for the [PubSubChannel] -#[derive(Debug, PartialEq, Eq, Clone)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum Error { - /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or - /// the capacity of the channels must be increased. - MaximumSubscribersReached, - /// All publisher slots are used. To add another publisher, first another publisher must be dropped or - /// the capacity of the channels must be increased. - MaximumPublishersReached, -} - -/// 'Middle level' behaviour of the pubsub channel. -/// This trait is used so that Sub and Pub can be generic over the channel. -pub trait PubSubBehavior { - /// Try to get a message from the queue with the given message id. - /// - /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; - - /// Try to publish a message to the queue. - /// - /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; - - /// Publish a message immediately - fn publish_immediate(&self, message: T); - - /// Let the channel know that a subscriber has dropped - fn unregister_subscriber(&self, subscriber_next_message_id: u64); - - /// Let the channel know that a publisher has dropped - fn unregister_publisher(&self); -} - -/// The result of the subscriber wait procedure -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum WaitResult { - /// The subscriber did not receive all messages and lagged by the given amount of messages. - /// (This is the amount of messages that were missed) - Lagged(u64), - /// A message was received - Message(T), -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::blocking_mutex::raw::NoopRawMutex; - - #[futures_test::test] - async fn dyn_pub_sub_works() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.dyn_subscriber().unwrap(); - let mut sub1 = channel.dyn_subscriber().unwrap(); - let pub0 = channel.dyn_publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn all_subscribers_receive() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let mut sub1 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn lag_when_queue_full_on_immediate_publish() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish_immediate(42); - pub0.publish_immediate(43); - pub0.publish_immediate(44); - pub0.publish_immediate(45); - pub0.publish_immediate(46); - pub0.publish_immediate(47); - - assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); - assert_eq!(sub0.next_message().await, WaitResult::Message(44)); - assert_eq!(sub0.next_message().await, WaitResult::Message(45)); - assert_eq!(sub0.next_message().await, WaitResult::Message(46)); - assert_eq!(sub0.next_message().await, WaitResult::Message(47)); - assert_eq!(sub0.try_next_message(), None); - } - - #[test] - fn limited_subs_and_pubs() { - let channel = PubSubChannel::::new(); - - let sub0 = channel.subscriber(); - let sub1 = channel.subscriber(); - let sub2 = channel.subscriber(); - let sub3 = channel.subscriber(); - let sub4 = channel.subscriber(); - - assert!(sub0.is_ok()); - assert!(sub1.is_ok()); - assert!(sub2.is_ok()); - assert!(sub3.is_ok()); - assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); - - drop(sub0); - - let sub5 = channel.subscriber(); - assert!(sub5.is_ok()); - - // publishers - - let pub0 = channel.publisher(); - let pub1 = channel.publisher(); - let pub2 = channel.publisher(); - let pub3 = channel.publisher(); - let pub4 = channel.publisher(); - - assert!(pub0.is_ok()); - assert!(pub1.is_ok()); - assert!(pub2.is_ok()); - assert!(pub3.is_ok()); - assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); - - drop(pub0); - - let pub5 = channel.publisher(); - assert!(pub5.is_ok()); - } - - #[test] - fn publisher_wait_on_full_queue() { - let channel = PubSubChannel::::new(); - - let pub0 = channel.publisher().unwrap(); - - // There are no subscribers, so the queue will never be full - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - - let sub0 = channel.subscriber().unwrap(); - - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Err(0)); - - drop(sub0); - } -} diff --git a/embassy-sync/src/channel/pubsub/publisher.rs b/embassy-sync/src/channel/pubsub/publisher.rs deleted file mode 100644 index 705797f60..000000000 --- a/embassy-sync/src/channel/pubsub/publisher.rs +++ /dev/null @@ -1,182 +0,0 @@ -//! Implementation of anything directly publisher related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel}; -use crate::blocking_mutex::raw::RawMutex; - -/// A publisher to a channel -pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - - /// Publish a message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { - PublisherWaitFuture { - message: Some(message), - publisher: self, - } - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_publisher() - } -} - -/// A publisher that holds a dynamic reference to the channel -pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynPublisher<'a, T> { - type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that holds a generic reference to the channel -pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Pub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Pub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - /// Publish the message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -/// An immediate publisher that holds a dynamic reference to the channel -pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { - type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// An immediate publisher that holds a generic reference to the channel -pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) ImmediatePub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = ImmediatePub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message we need to publish - message: Option, - publisher: &'s Pub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let message = self.message.take().unwrap(); - match self.publisher.channel.publish_with_context(message, Some(cx)) { - Ok(()) => Poll::Ready(()), - Err(message) => { - self.message = Some(message); - Poll::Pending - } - } - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/channel/pubsub/subscriber.rs b/embassy-sync/src/channel/pubsub/subscriber.rs deleted file mode 100644 index b9a2cbe18..000000000 --- a/embassy-sync/src/channel/pubsub/subscriber.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! Implementation of anything directly subscriber related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel, WaitResult}; -use crate::blocking_mutex::raw::RawMutex; - -/// A subscriber to a channel -pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message id of the next message we are yet to receive - next_message_id: u64, - /// The channel we are a subscriber to - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { - pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { - Self { - next_message_id, - channel, - _phantom: Default::default(), - } - } - - /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } - } - - /// Wait for a published message (ignoring lag results) - pub async fn next_message_pure(&mut self) -> T { - loop { - match self.next_message().await { - WaitResult::Lagged(_) => continue, - WaitResult::Message(message) => break message, - } - } - } - - /// Try to see if there's a published message we haven't received yet. - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message(&mut self) -> Option> { - match self.channel.get_message_with_context(&mut self.next_message_id, None) { - Poll::Ready(result) => Some(result), - Poll::Pending => None, - } - } - - /// Try to see if there's a published message we haven't received yet (ignoring lag results). - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message_pure(&mut self) -> Option { - loop { - match self.try_next_message() { - Some(WaitResult::Lagged(_)) => continue, - Some(WaitResult::Message(message)) => break Some(message), - None => break None, - } - } - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_subscriber(self.next_message_id) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} - -/// Warning: The stream implementation ignores lag results and returns all messages. -/// This might miss some messages without you knowing it. -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self - .channel - .get_message_with_context(&mut self.next_message_id, Some(cx)) - { - Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), - Poll::Ready(WaitResult::Lagged(_)) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - } -} - -/// A subscriber that holds a dynamic reference to the channel -pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { - type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A subscriber that holds a generic reference to the channel -pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Sub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Sub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - subscriber: &'s mut Sub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { - type Output = WaitResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.subscriber - .channel - .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/channel/signal.rs b/embassy-sync/src/channel/signal.rs deleted file mode 100644 index 9279266c1..000000000 --- a/embassy-sync/src/channel/signal.rs +++ /dev/null @@ -1,100 +0,0 @@ -//! A synchronization primitive for passing the latest value to a task. -use core::cell::UnsafeCell; -use core::future::Future; -use core::mem; -use core::task::{Context, Poll, Waker}; - -/// Single-slot signaling primitive. -/// -/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except -/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead -/// of waiting for the receiver to pop the previous value. -/// -/// It is useful for sending data between tasks when the receiver only cares about -/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" -/// updates. -/// -/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. -/// -/// Signals are generally declared as `static`s and then borrowed as required. -/// -/// ``` -/// use embassy_sync::channel::signal::Signal; -/// -/// enum SomeCommand { -/// On, -/// Off, -/// } -/// -/// static SOME_SIGNAL: Signal = Signal::new(); -/// ``` -pub struct Signal { - state: UnsafeCell>, -} - -enum State { - None, - Waiting(Waker), - Signaled(T), -} - -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { - /// Create a new `Signal`. - pub const fn new() -> Self { - Self { - state: UnsafeCell::new(State::None), - } - } -} - -impl Signal { - /// Mark this Signal as signaled. - pub fn signal(&self, val: T) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { - waker.wake(); - } - }) - } - - /// Remove the queued value in this `Signal`, if any. - pub fn reset(&self) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - *state = State::None - }) - } - - /// Manually poll the Signal future. - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - match state { - State::None => { - *state = State::Waiting(cx.waker().clone()); - Poll::Pending - } - State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, - State::Waiting(_) => panic!("waker overflow"), - State::Signaled(_) => match mem::replace(state, State::None) { - State::Signaled(res) => Poll::Ready(res), - _ => unreachable!(), - }, - } - }) - } - - /// Future that completes when this Signal has been signaled. - pub fn wait(&self) -> impl Future + '_ { - futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) - } - - /// non-blocking method to check whether this signal has been signaled. - pub fn signaled(&self) -> bool { - critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) - } -} diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 7d8815903..8e81e5cbe 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -14,4 +14,6 @@ pub mod blocking_mutex; pub mod channel; pub mod mutex; pub mod pipe; +pub mod pubsub; +pub mod signal; pub mod waitqueue; diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs new file mode 100644 index 000000000..62a9e4763 --- /dev/null +++ b/embassy-sync/src/pubsub/mod.rs @@ -0,0 +1,542 @@ +//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. + +#![deny(missing_docs)] + +use core::cell::RefCell; +use core::fmt::Debug; +use core::task::{Context, Poll, Waker}; + +use heapless::Deque; + +use self::publisher::{ImmediatePub, Pub}; +use self::subscriber::Sub; +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::MultiWakerRegistration; + +pub mod publisher; +pub mod subscriber; + +pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; +pub use subscriber::{DynSubscriber, Subscriber}; + +/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers +/// +/// Any published message can be read by all subscribers. +/// A publisher can choose how it sends its message. +/// +/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. +/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message +/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive +/// an error to indicate that it has lagged. +/// +/// ## Example +/// +/// ``` +/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex; +/// # use embassy_sync::pubsub::WaitResult; +/// # use embassy_sync::pubsub::PubSubChannel; +/// # use futures_executor::block_on; +/// # let test = async { +/// // Create the channel. This can be static as well +/// let channel = PubSubChannel::::new(); +/// +/// // This is a generic subscriber with a direct reference to the channel +/// let mut sub0 = channel.subscriber().unwrap(); +/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel +/// let mut sub1 = channel.dyn_subscriber().unwrap(); +/// +/// let pub0 = channel.publisher().unwrap(); +/// +/// // Publish a message, but wait if the queue is full +/// pub0.publish(42).await; +/// +/// // Publish a message, but if the queue is full, just kick out the oldest message. +/// // This may cause some subscribers to miss a message +/// pub0.publish_immediate(43); +/// +/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result +/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); +/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); +/// +/// // Wait again, but this time ignore any Lag results +/// assert_eq!(sub0.next_message_pure().await, 43); +/// assert_eq!(sub1.next_message_pure().await, 43); +/// +/// // There's also a polling interface +/// assert_eq!(sub0.try_next_message(), None); +/// assert_eq!(sub1.try_next_message(), None); +/// # }; +/// # +/// # block_on(test); +/// ``` +/// +pub struct PubSubChannel { + inner: Mutex>>, +} + +impl + PubSubChannel +{ + /// Create a new channel + pub const fn new() -> Self { + Self { + inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), + } + } + + /// Create a new subscriber. It will only receive messages that are published after its creation. + /// + /// If there are no subscriber slots left, an error will be returned. + pub fn subscriber(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(Subscriber(Sub::new(s.next_message_id, self))) + } + }) + } + + /// Create a new subscriber. It will only receive messages that are published after its creation. + /// + /// If there are no subscriber slots left, an error will be returned. + pub fn dyn_subscriber(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.subscriber_count >= SUBS { + Err(Error::MaximumSubscribersReached) + } else { + s.subscriber_count += 1; + Ok(DynSubscriber(Sub::new(s.next_message_id, self))) + } + }) + } + + /// Create a new publisher + /// + /// If there are no publisher slots left, an error will be returned. + pub fn publisher(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(Publisher(Pub::new(self))) + } + }) + } + + /// Create a new publisher + /// + /// If there are no publisher slots left, an error will be returned. + pub fn dyn_publisher(&self) -> Result, Error> { + self.inner.lock(|inner| { + let mut s = inner.borrow_mut(); + + if s.publisher_count >= PUBS { + Err(Error::MaximumPublishersReached) + } else { + s.publisher_count += 1; + Ok(DynPublisher(Pub::new(self))) + } + }) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn immediate_publisher(&self) -> ImmediatePublisher { + ImmediatePublisher(ImmediatePub::new(self)) + } + + /// Create a new publisher that can only send immediate messages. + /// This kind of publisher does not take up a publisher slot. + pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { + DynImmediatePublisher(ImmediatePub::new(self)) + } +} + +impl PubSubBehavior + for PubSubChannel +{ + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + + // Check if we can read a message + match s.get_message(*next_message_id) { + // Yes, so we are done polling + Some(WaitResult::Message(message)) => { + *next_message_id += 1; + Poll::Ready(WaitResult::Message(message)) + } + // No, so we need to reregister our waker and sleep again + None => { + if let Some(cx) = cx { + s.register_subscriber_waker(cx.waker()); + } + Poll::Pending + } + // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged + Some(WaitResult::Lagged(amount)) => { + *next_message_id += amount; + Poll::Ready(WaitResult::Lagged(amount)) + } + } + }) + } + + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + // Try to publish the message + match s.try_publish(message) { + // We did it, we are ready + Ok(()) => Ok(()), + // The queue is full, so we need to reregister our waker and go to sleep + Err(message) => { + if let Some(cx) = cx { + s.register_publisher_waker(cx.waker()); + } + Err(message) + } + } + }) + } + + fn publish_immediate(&self, message: T) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.publish_immediate(message) + }) + } + + fn unregister_subscriber(&self, subscriber_next_message_id: u64) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_subscriber(subscriber_next_message_id) + }) + } + + fn unregister_publisher(&self) { + self.inner.lock(|s| { + let mut s = s.borrow_mut(); + s.unregister_publisher() + }) + } +} + +/// Internal state for the PubSub channel +struct PubSubState { + /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it + queue: Deque<(T, usize), CAP>, + /// Every message has an id. + /// Don't worry, we won't run out. + /// If a million messages were published every second, then the ID's would run out in about 584942 years. + next_message_id: u64, + /// Collection of wakers for Subscribers that are waiting. + subscriber_wakers: MultiWakerRegistration, + /// Collection of wakers for Publishers that are waiting. + publisher_wakers: MultiWakerRegistration, + /// The amount of subscribers that are active + subscriber_count: usize, + /// The amount of publishers that are active + publisher_count: usize, +} + +impl PubSubState { + /// Create a new internal channel state + const fn new() -> Self { + Self { + queue: Deque::new(), + next_message_id: 0, + subscriber_wakers: MultiWakerRegistration::new(), + publisher_wakers: MultiWakerRegistration::new(), + subscriber_count: 0, + publisher_count: 0, + } + } + + fn try_publish(&mut self, message: T) -> Result<(), T> { + if self.subscriber_count == 0 { + // We don't need to publish anything because there is no one to receive it + return Ok(()); + } + + if self.queue.is_full() { + return Err(message); + } + // We just did a check for this + self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); + + self.next_message_id += 1; + + // Wake all of the subscribers + self.subscriber_wakers.wake(); + + Ok(()) + } + + fn publish_immediate(&mut self, message: T) { + // Make space in the queue if required + if self.queue.is_full() { + self.queue.pop_front(); + } + + // This will succeed because we made sure there is space + self.try_publish(message).ok().unwrap(); + } + + fn get_message(&mut self, message_id: u64) -> Option> { + let start_id = self.next_message_id - self.queue.len() as u64; + + if message_id < start_id { + return Some(WaitResult::Lagged(start_id - message_id)); + } + + let current_message_index = (message_id - start_id) as usize; + + if current_message_index >= self.queue.len() { + return None; + } + + // We've checked that the index is valid + let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); + + // We're reading this item, so decrement the counter + queue_item.1 -= 1; + let message = queue_item.0.clone(); + + if current_message_index == 0 && queue_item.1 == 0 { + self.queue.pop_front(); + self.publisher_wakers.wake(); + } + + Some(WaitResult::Message(message)) + } + + fn register_subscriber_waker(&mut self, waker: &Waker) { + match self.subscriber_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a subscriber that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.subscriber_wakers.wake(); + self.subscriber_wakers.register(waker).unwrap(); + } + } + } + + fn register_publisher_waker(&mut self, waker: &Waker) { + match self.publisher_wakers.register(waker) { + Ok(()) => {} + Err(_) => { + // All waker slots were full. This can only happen when there was a publisher that now has dropped. + // We need to throw it away. It's a bit inefficient, but we can wake everything. + // Any future that is still active will simply reregister. + // This won't happen a lot, so it's ok. + self.publisher_wakers.wake(); + self.publisher_wakers.register(waker).unwrap(); + } + } + } + + fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { + self.subscriber_count -= 1; + + // All messages that haven't been read yet by this subscriber must have their counter decremented + let start_id = self.next_message_id - self.queue.len() as u64; + if subscriber_next_message_id >= start_id { + let current_message_index = (subscriber_next_message_id - start_id) as usize; + self.queue + .iter_mut() + .skip(current_message_index) + .for_each(|(_, counter)| *counter -= 1); + } + } + + fn unregister_publisher(&mut self) { + self.publisher_count -= 1; + } +} + +/// Error type for the [PubSubChannel] +#[derive(Debug, PartialEq, Eq, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum Error { + /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or + /// the capacity of the channels must be increased. + MaximumSubscribersReached, + /// All publisher slots are used. To add another publisher, first another publisher must be dropped or + /// the capacity of the channels must be increased. + MaximumPublishersReached, +} + +/// 'Middle level' behaviour of the pubsub channel. +/// This trait is used so that Sub and Pub can be generic over the channel. +pub trait PubSubBehavior { + /// Try to get a message from the queue with the given message id. + /// + /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. + fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; + + /// Try to publish a message to the queue. + /// + /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. + fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; + + /// Publish a message immediately + fn publish_immediate(&self, message: T); + + /// Let the channel know that a subscriber has dropped + fn unregister_subscriber(&self, subscriber_next_message_id: u64); + + /// Let the channel know that a publisher has dropped + fn unregister_publisher(&self); +} + +/// The result of the subscriber wait procedure +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum WaitResult { + /// The subscriber did not receive all messages and lagged by the given amount of messages. + /// (This is the amount of messages that were missed) + Lagged(u64), + /// A message was received + Message(T), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blocking_mutex::raw::NoopRawMutex; + + #[futures_test::test] + async fn dyn_pub_sub_works() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.dyn_subscriber().unwrap(); + let mut sub1 = channel.dyn_subscriber().unwrap(); + let pub0 = channel.dyn_publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + + #[futures_test::test] + async fn all_subscribers_receive() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish(42).await; + + assert_eq!(sub0.next_message().await, WaitResult::Message(42)); + assert_eq!(sub1.next_message().await, WaitResult::Message(42)); + + assert_eq!(sub0.try_next_message(), None); + assert_eq!(sub1.try_next_message(), None); + } + + #[futures_test::test] + async fn lag_when_queue_full_on_immediate_publish() { + let channel = PubSubChannel::::new(); + + let mut sub0 = channel.subscriber().unwrap(); + let pub0 = channel.publisher().unwrap(); + + pub0.publish_immediate(42); + pub0.publish_immediate(43); + pub0.publish_immediate(44); + pub0.publish_immediate(45); + pub0.publish_immediate(46); + pub0.publish_immediate(47); + + assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); + assert_eq!(sub0.next_message().await, WaitResult::Message(44)); + assert_eq!(sub0.next_message().await, WaitResult::Message(45)); + assert_eq!(sub0.next_message().await, WaitResult::Message(46)); + assert_eq!(sub0.next_message().await, WaitResult::Message(47)); + assert_eq!(sub0.try_next_message(), None); + } + + #[test] + fn limited_subs_and_pubs() { + let channel = PubSubChannel::::new(); + + let sub0 = channel.subscriber(); + let sub1 = channel.subscriber(); + let sub2 = channel.subscriber(); + let sub3 = channel.subscriber(); + let sub4 = channel.subscriber(); + + assert!(sub0.is_ok()); + assert!(sub1.is_ok()); + assert!(sub2.is_ok()); + assert!(sub3.is_ok()); + assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); + + drop(sub0); + + let sub5 = channel.subscriber(); + assert!(sub5.is_ok()); + + // publishers + + let pub0 = channel.publisher(); + let pub1 = channel.publisher(); + let pub2 = channel.publisher(); + let pub3 = channel.publisher(); + let pub4 = channel.publisher(); + + assert!(pub0.is_ok()); + assert!(pub1.is_ok()); + assert!(pub2.is_ok()); + assert!(pub3.is_ok()); + assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); + + drop(pub0); + + let pub5 = channel.publisher(); + assert!(pub5.is_ok()); + } + + #[test] + fn publisher_wait_on_full_queue() { + let channel = PubSubChannel::::new(); + + let pub0 = channel.publisher().unwrap(); + + // There are no subscribers, so the queue will never be full + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + + let sub0 = channel.subscriber().unwrap(); + + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Ok(())); + assert_eq!(pub0.try_publish(0), Err(0)); + + drop(sub0); + } +} diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs new file mode 100644 index 000000000..705797f60 --- /dev/null +++ b/embassy-sync/src/pubsub/publisher.rs @@ -0,0 +1,182 @@ +//! Implementation of anything directly publisher related + +use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{PubSubBehavior, PubSubChannel}; +use crate::blocking_mutex::raw::RawMutex; + +/// A publisher to a channel +pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + + /// Publish a message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message + pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { + PublisherWaitFuture { + message: Some(message), + publisher: self, + } + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_publisher() + } +} + +/// A publisher that holds a dynamic reference to the channel +pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynPublisher<'a, T> { + type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that holds a generic reference to the channel +pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Pub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Pub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Publisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. +/// (So an infinite amount is possible) +pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The channel we are a publisher for + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { + pub(super) fn new(channel: &'a PSB) -> Self { + Self { + channel, + _phantom: Default::default(), + } + } + /// Publish the message right now even when the queue is full. + /// This may cause a subscriber to miss an older message. + pub fn publish_immediate(&self, message: T) { + self.channel.publish_immediate(message) + } + + /// Publish a message if there is space in the message queue + pub fn try_publish(&self, message: T) -> Result<(), T> { + self.channel.publish_with_context(message, None) + } +} + +/// An immediate publisher that holds a dynamic reference to the channel +pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { + type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// An immediate publisher that holds a generic reference to the channel +pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) ImmediatePub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = ImmediatePub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the publisher wait action +pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message we need to publish + message: Option, + publisher: &'s Pub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let message = self.message.take().unwrap(); + match self.publisher.channel.publish_with_context(message, Some(cx)) { + Ok(()) => Poll::Ready(()), + Err(message) => { + self.message = Some(message); + Poll::Pending + } + } + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs new file mode 100644 index 000000000..b9a2cbe18 --- /dev/null +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -0,0 +1,152 @@ +//! Implementation of anything directly subscriber related + +use core::future::Future; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use super::{PubSubBehavior, PubSubChannel, WaitResult}; +use crate::blocking_mutex::raw::RawMutex; + +/// A subscriber to a channel +pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + /// The message id of the next message we are yet to receive + next_message_id: u64, + /// The channel we are a subscriber to + channel: &'a PSB, + _phantom: PhantomData, +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { + pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { + Self { + next_message_id, + channel, + _phantom: Default::default(), + } + } + + /// Wait for a published message + pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { + SubscriberWaitFuture { subscriber: self } + } + + /// Wait for a published message (ignoring lag results) + pub async fn next_message_pure(&mut self) -> T { + loop { + match self.next_message().await { + WaitResult::Lagged(_) => continue, + WaitResult::Message(message) => break message, + } + } + } + + /// Try to see if there's a published message we haven't received yet. + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message(&mut self) -> Option> { + match self.channel.get_message_with_context(&mut self.next_message_id, None) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } + } + + /// Try to see if there's a published message we haven't received yet (ignoring lag results). + /// + /// This function does not peek. The message is received if there is one. + pub fn try_next_message_pure(&mut self) -> Option { + loop { + match self.try_next_message() { + Some(WaitResult::Lagged(_)) => continue, + Some(WaitResult::Message(message)) => break Some(message), + None => break None, + } + } + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { + fn drop(&mut self) { + self.channel.unregister_subscriber(self.next_message_id) + } +} + +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} + +/// Warning: The stream implementation ignores lag results and returns all messages. +/// This might miss some messages without you knowing it. +impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self + .channel + .get_message_with_context(&mut self.next_message_id, Some(cx)) + { + Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), + Poll::Ready(WaitResult::Lagged(_)) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } +} + +/// A subscriber that holds a dynamic reference to the channel +pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); + +impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { + type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A subscriber that holds a generic reference to the channel +pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( + pub(super) Sub<'a, PubSubChannel, T>, +); + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + type Target = Sub<'a, PubSubChannel, T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut + for Subscriber<'a, M, T, CAP, SUBS, PUBS> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the subscriber wait action +pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { + subscriber: &'s mut Sub<'a, PSB, T>, +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { + type Output = WaitResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.subscriber + .channel + .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) + } +} + +impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs new file mode 100644 index 000000000..3f665e388 --- /dev/null +++ b/embassy-sync/src/signal.rs @@ -0,0 +1,100 @@ +//! A synchronization primitive for passing the latest value to a task. +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem; +use core::task::{Context, Poll, Waker}; + +/// Single-slot signaling primitive. +/// +/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except +/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead +/// of waiting for the receiver to pop the previous value. +/// +/// It is useful for sending data between tasks when the receiver only cares about +/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" +/// updates. +/// +/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. +/// +/// Signals are generally declared as `static`s and then borrowed as required. +/// +/// ``` +/// use embassy_sync::signal::Signal; +/// +/// enum SomeCommand { +/// On, +/// Off, +/// } +/// +/// static SOME_SIGNAL: Signal = Signal::new(); +/// ``` +pub struct Signal { + state: UnsafeCell>, +} + +enum State { + None, + Waiting(Waker), + Signaled(T), +} + +unsafe impl Send for Signal {} +unsafe impl Sync for Signal {} + +impl Signal { + /// Create a new `Signal`. + pub const fn new() -> Self { + Self { + state: UnsafeCell::new(State::None), + } + } +} + +impl Signal { + /// Mark this Signal as signaled. + pub fn signal(&self, val: T) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { + waker.wake(); + } + }) + } + + /// Remove the queued value in this `Signal`, if any. + pub fn reset(&self) { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + *state = State::None + }) + } + + /// Manually poll the Signal future. + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { + critical_section::with(|_| unsafe { + let state = &mut *self.state.get(); + match state { + State::None => { + *state = State::Waiting(cx.waker().clone()); + Poll::Pending + } + State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, + State::Waiting(_) => panic!("waker overflow"), + State::Signaled(_) => match mem::replace(state, State::None) { + State::Signaled(res) => Poll::Ready(res), + _ => unreachable!(), + }, + } + }) + } + + /// Future that completes when this Signal has been signaled. + pub fn wait(&self) -> impl Future + '_ { + futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) + } + + /// non-blocking method to check whether this signal has been signaled. + pub fn signaled(&self) -> bool { + critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) + } +} -- cgit