From 21072bee48ff6ec19b79e0d9527ad8cc34a4e9e0 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 22 Aug 2022 21:46:09 +0200 Subject: split `embassy-util` into `embassy-futures`, `embassy-sync`. --- embassy-util/src/blocking_mutex/mod.rs | 189 -------- embassy-util/src/blocking_mutex/raw.rs | 149 ------- embassy-util/src/channel/mod.rs | 5 - embassy-util/src/channel/mpmc.rs | 596 -------------------------- embassy-util/src/channel/pubsub/mod.rs | 542 ----------------------- embassy-util/src/channel/pubsub/publisher.rs | 182 -------- embassy-util/src/channel/pubsub/subscriber.rs | 152 ------- embassy-util/src/channel/signal.rs | 100 ----- embassy-util/src/fmt.rs | 228 ---------- embassy-util/src/lib.rs | 23 - embassy-util/src/mutex.rs | 167 -------- embassy-util/src/pipe.rs | 551 ------------------------ embassy-util/src/ring_buffer.rs | 146 ------- embassy-util/src/select.rs | 230 ---------- embassy-util/src/waitqueue/mod.rs | 7 - embassy-util/src/waitqueue/multi_waker.rs | 33 -- embassy-util/src/waitqueue/waker.rs | 92 ---- embassy-util/src/yield_now.rs | 25 -- 18 files changed, 3417 deletions(-) delete mode 100644 embassy-util/src/blocking_mutex/mod.rs delete mode 100644 embassy-util/src/blocking_mutex/raw.rs delete mode 100644 embassy-util/src/channel/mod.rs delete mode 100644 embassy-util/src/channel/mpmc.rs delete mode 100644 embassy-util/src/channel/pubsub/mod.rs delete mode 100644 embassy-util/src/channel/pubsub/publisher.rs delete mode 100644 embassy-util/src/channel/pubsub/subscriber.rs delete mode 100644 embassy-util/src/channel/signal.rs delete mode 100644 embassy-util/src/fmt.rs delete mode 100644 embassy-util/src/lib.rs delete mode 100644 embassy-util/src/mutex.rs delete mode 100644 embassy-util/src/pipe.rs delete mode 100644 embassy-util/src/ring_buffer.rs delete mode 100644 embassy-util/src/select.rs delete mode 100644 embassy-util/src/waitqueue/mod.rs delete mode 100644 embassy-util/src/waitqueue/multi_waker.rs delete mode 100644 embassy-util/src/waitqueue/waker.rs delete mode 100644 embassy-util/src/yield_now.rs (limited to 'embassy-util/src') diff --git a/embassy-util/src/blocking_mutex/mod.rs b/embassy-util/src/blocking_mutex/mod.rs deleted file mode 100644 index 8a4a4c642..000000000 --- a/embassy-util/src/blocking_mutex/mod.rs +++ /dev/null @@ -1,189 +0,0 @@ -//! Blocking mutex. -//! -//! This module provides a blocking mutex that can be used to synchronize data. -pub mod raw; - -use core::cell::UnsafeCell; - -use self::raw::RawMutex; - -/// Blocking mutex (not async) -/// -/// Provides a blocking mutual exclusion primitive backed by an implementation of [`raw::RawMutex`]. -/// -/// Which implementation you select depends on the context in which you're using the mutex, and you can choose which kind -/// of interior mutability fits your use case. -/// -/// Use [`CriticalSectionMutex`] when data can be shared between threads and interrupts. -/// -/// Use [`NoopMutex`] when data is only shared between tasks running on the same executor. -/// -/// Use [`ThreadModeMutex`] when data is shared between tasks running on the same executor but you want a global singleton. -/// -/// In all cases, the blocking mutex is intended to be short lived and not held across await points. -/// Use the async [`Mutex`](crate::mutex::Mutex) if you need a lock that is held across await points. -pub struct Mutex { - // NOTE: `raw` must be FIRST, so when using ThreadModeMutex the "can't drop in non-thread-mode" gets - // to run BEFORE dropping `data`. - raw: R, - data: UnsafeCell, -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -impl Mutex { - /// Creates a new mutex in an unlocked state ready for use. - #[inline] - pub const fn new(val: T) -> Mutex { - Mutex { - raw: R::INIT, - data: UnsafeCell::new(val), - } - } - - /// Creates a critical section and grants temporary access to the protected data. - pub fn lock(&self, f: impl FnOnce(&T) -> U) -> U { - self.raw.lock(|| { - let ptr = self.data.get() as *const T; - let inner = unsafe { &*ptr }; - f(inner) - }) - } -} - -impl Mutex { - /// Creates a new mutex based on a pre-existing raw mutex. - /// - /// This allows creating a mutex in a constant context on stable Rust. - #[inline] - pub const fn const_new(raw_mutex: R, val: T) -> Mutex { - Mutex { - raw: raw_mutex, - data: UnsafeCell::new(val), - } - } - - /// Consumes this mutex, returning the underlying data. - #[inline] - pub fn into_inner(self) -> T { - self.data.into_inner() - } - - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the `Mutex` mutably, no actual locking needs to - /// take place---the mutable borrow statically guarantees no locks exist. - #[inline] - pub fn get_mut(&mut self) -> &mut T { - unsafe { &mut *self.data.get() } - } -} - -/// A mutex that allows borrowing data across executors and interrupts. -/// -/// # Safety -/// -/// This mutex is safe to share between different executors and interrupts. -pub type CriticalSectionMutex = Mutex; - -/// A mutex that allows borrowing data in the context of a single executor. -/// -/// # Safety -/// -/// **This Mutex is only safe within a single executor.** -pub type NoopMutex = Mutex; - -impl Mutex { - /// Borrows the data for the duration of the critical section - pub fn borrow<'cs>(&'cs self, _cs: critical_section::CriticalSection<'cs>) -> &'cs T { - let ptr = self.data.get() as *const T; - unsafe { &*ptr } - } -} - -impl Mutex { - /// Borrows the data - pub fn borrow(&self) -> &T { - let ptr = self.data.get() as *const T; - unsafe { &*ptr } - } -} - -// ThreadModeMutex does NOT use the generic mutex from above because it's special: -// it's Send+Sync even if T: !Send. There's no way to do that without specialization (I think?). -// -// There's still a ThreadModeRawMutex for use with the generic Mutex (handy with Channel, for example), -// but that will require T: Send even though it shouldn't be needed. - -#[cfg(any(cortex_m, feature = "std"))] -pub use thread_mode_mutex::*; -#[cfg(any(cortex_m, feature = "std"))] -mod thread_mode_mutex { - use super::*; - - /// A "mutex" that only allows borrowing from thread mode. - /// - /// # Safety - /// - /// **This Mutex is only safe on single-core systems.** - /// - /// On multi-core systems, a `ThreadModeMutex` **is not sufficient** to ensure exclusive access. - pub struct ThreadModeMutex { - inner: UnsafeCell, - } - - // NOTE: ThreadModeMutex only allows borrowing from one execution context ever: thread mode. - // Therefore it cannot be used to send non-sendable stuff between execution contexts, so it can - // be Send+Sync even if T is not Send (unlike CriticalSectionMutex) - unsafe impl Sync for ThreadModeMutex {} - unsafe impl Send for ThreadModeMutex {} - - impl ThreadModeMutex { - /// Creates a new mutex - pub const fn new(value: T) -> Self { - ThreadModeMutex { - inner: UnsafeCell::new(value), - } - } - } - - impl ThreadModeMutex { - /// Lock the `ThreadModeMutex`, granting access to the data. - /// - /// # Panics - /// - /// This will panic if not currently running in thread mode. - pub fn lock(&self, f: impl FnOnce(&T) -> R) -> R { - f(self.borrow()) - } - - /// Borrows the data - /// - /// # Panics - /// - /// This will panic if not currently running in thread mode. - pub fn borrow(&self) -> &T { - assert!( - raw::in_thread_mode(), - "ThreadModeMutex can only be borrowed from thread mode." - ); - unsafe { &*self.inner.get() } - } - } - - impl Drop for ThreadModeMutex { - fn drop(&mut self) { - // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so - // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if - // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, - // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. - assert!( - raw::in_thread_mode(), - "ThreadModeMutex can only be dropped from thread mode." - ); - - // Drop of the inner `T` happens after this. - } - } -} diff --git a/embassy-util/src/blocking_mutex/raw.rs b/embassy-util/src/blocking_mutex/raw.rs deleted file mode 100644 index 15796f1b2..000000000 --- a/embassy-util/src/blocking_mutex/raw.rs +++ /dev/null @@ -1,149 +0,0 @@ -//! Mutex primitives. -//! -//! This module provides a trait for mutexes that can be used in different contexts. -use core::marker::PhantomData; - -/// Raw mutex trait. -/// -/// This mutex is "raw", which means it does not actually contain the protected data, it -/// just implements the mutex mechanism. For most uses you should use [`super::Mutex`] instead, -/// which is generic over a RawMutex and contains the protected data. -/// -/// Note that, unlike other mutexes, implementations only guarantee no -/// concurrent access from other threads: concurrent access from the current -/// thread is allwed. For example, it's possible to lock the same mutex multiple times reentrantly. -/// -/// Therefore, locking a `RawMutex` is only enough to guarantee safe shared (`&`) access -/// to the data, it is not enough to guarantee exclusive (`&mut`) access. -/// -/// # Safety -/// -/// RawMutex implementations must ensure that, while locked, no other thread can lock -/// the RawMutex concurrently. -/// -/// Unsafe code is allowed to rely on this fact, so incorrect implementations will cause undefined behavior. -pub unsafe trait RawMutex { - /// Create a new `RawMutex` instance. - /// - /// This is a const instead of a method to allow creating instances in const context. - const INIT: Self; - - /// Lock this `RawMutex`. - fn lock(&self, f: impl FnOnce() -> R) -> R; -} - -/// A mutex that allows borrowing data across executors and interrupts. -/// -/// # Safety -/// -/// This mutex is safe to share between different executors and interrupts. -pub struct CriticalSectionRawMutex { - _phantom: PhantomData<()>, -} -unsafe impl Send for CriticalSectionRawMutex {} -unsafe impl Sync for CriticalSectionRawMutex {} - -impl CriticalSectionRawMutex { - /// Create a new `CriticalSectionRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } -} - -unsafe impl RawMutex for CriticalSectionRawMutex { - const INIT: Self = Self::new(); - - fn lock(&self, f: impl FnOnce() -> R) -> R { - critical_section::with(|_| f()) - } -} - -// ================ - -/// A mutex that allows borrowing data in the context of a single executor. -/// -/// # Safety -/// -/// **This Mutex is only safe within a single executor.** -pub struct NoopRawMutex { - _phantom: PhantomData<*mut ()>, -} - -unsafe impl Send for NoopRawMutex {} - -impl NoopRawMutex { - /// Create a new `NoopRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } -} - -unsafe impl RawMutex for NoopRawMutex { - const INIT: Self = Self::new(); - fn lock(&self, f: impl FnOnce() -> R) -> R { - f() - } -} - -// ================ - -#[cfg(any(cortex_m, feature = "std"))] -mod thread_mode { - use super::*; - - /// A "mutex" that only allows borrowing from thread mode. - /// - /// # Safety - /// - /// **This Mutex is only safe on single-core systems.** - /// - /// On multi-core systems, a `ThreadModeRawMutex` **is not sufficient** to ensure exclusive access. - pub struct ThreadModeRawMutex { - _phantom: PhantomData<()>, - } - - unsafe impl Send for ThreadModeRawMutex {} - unsafe impl Sync for ThreadModeRawMutex {} - - impl ThreadModeRawMutex { - /// Create a new `ThreadModeRawMutex`. - pub const fn new() -> Self { - Self { _phantom: PhantomData } - } - } - - unsafe impl RawMutex for ThreadModeRawMutex { - const INIT: Self = Self::new(); - fn lock(&self, f: impl FnOnce() -> R) -> R { - assert!(in_thread_mode(), "ThreadModeMutex can only be locked from thread mode."); - - f() - } - } - - impl Drop for ThreadModeRawMutex { - fn drop(&mut self) { - // Only allow dropping from thread mode. Dropping calls drop on the inner `T`, so - // `drop` needs the same guarantees as `lock`. `ThreadModeMutex` is Send even if - // T isn't, so without this check a user could create a ThreadModeMutex in thread mode, - // send it to interrupt context and drop it there, which would "send" a T even if T is not Send. - assert!( - in_thread_mode(), - "ThreadModeMutex can only be dropped from thread mode." - ); - - // Drop of the inner `T` happens after this. - } - } - - pub(crate) fn in_thread_mode() -> bool { - #[cfg(feature = "std")] - return Some("main") == std::thread::current().name(); - - #[cfg(not(feature = "std"))] - // ICSR.VECTACTIVE == 0 - return unsafe { (0xE000ED04 as *const u32).read_volatile() } & 0x1FF == 0; - } -} -#[cfg(any(cortex_m, feature = "std"))] -pub use thread_mode::*; diff --git a/embassy-util/src/channel/mod.rs b/embassy-util/src/channel/mod.rs deleted file mode 100644 index 5df1f5c5c..000000000 --- a/embassy-util/src/channel/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Async channels - -pub mod mpmc; -pub mod pubsub; -pub mod signal; diff --git a/embassy-util/src/channel/mpmc.rs b/embassy-util/src/channel/mpmc.rs deleted file mode 100644 index 263f29bfc..000000000 --- a/embassy-util/src/channel/mpmc.rs +++ /dev/null @@ -1,596 +0,0 @@ -//! A queue for sending values between asynchronous tasks. -//! -//! It can be used concurrently by multiple producers (senders) and multiple -//! consumers (receivers), i.e. it is an "MPMC channel". -//! -//! Receivers are competing for messages. So a message that is received by -//! one receiver is not received by any other. -//! -//! This queue takes a Mutex type so that various -//! targets can be attained. For example, a ThreadModeMutex can be used -//! for single-core Cortex-M targets where messages are only passed -//! between tasks running in thread mode. Similarly, a CriticalSectionMutex -//! can also be used for single-core targets where messages are to be -//! passed from exception mode e.g. out of an interrupt handler. -//! -//! This module provides a bounded channel that has a limit on the number of -//! messages that it can store, and if this limit is reached, trying to send -//! another message will result in an error being returned. -//! - -use core::cell::RefCell; -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use heapless::Deque; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::WakerRegistration; - -/// Send-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Sender<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Sender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> -where - M: RawMutex, -{ - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { - self.channel.send(message) - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send(message) - } -} - -/// Send-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicSender<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicSender<'ch, T> { - fn clone(&self) -> Self { - DynamicSender { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicSender<'ch, T> -where - M: RawMutex, -{ - fn from(s: Sender<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -impl<'ch, T> DynamicSender<'ch, T> { - /// Sends a value. - /// - /// See [`Channel::send()`] - pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { - DynamicSendFuture { - channel: self.channel, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// See [`Channel::send()`] - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.channel.try_send_with_context(message, None) - } -} - -/// Receive-only access to a [`Channel`]. -#[derive(Copy)] -pub struct Receiver<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Receiver { channel: self.channel } - } -} - -impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> -where - M: RawMutex, -{ - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - self.channel.recv() - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv() - } -} - -/// Receive-only access to a [`Channel`] without knowing channel size. -#[derive(Copy)] -pub struct DynamicReceiver<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Clone for DynamicReceiver<'ch, T> { - fn clone(&self) -> Self { - DynamicReceiver { channel: self.channel } - } -} - -impl<'ch, T> DynamicReceiver<'ch, T> { - /// Receive the next value. - /// - /// See [`Channel::recv()`]. - pub fn recv(&self) -> DynamicRecvFuture<'_, T> { - DynamicRecvFuture { channel: self.channel } - } - - /// Attempt to immediately receive the next value. - /// - /// See [`Channel::try_recv()`] - pub fn try_recv(&self) -> Result { - self.channel.try_recv_with_context(None) - } -} - -impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> -where - M: RawMutex, -{ - fn from(s: Receiver<'ch, M, T, N>) -> Self { - Self { channel: s.channel } - } -} - -/// Future returned by [`Channel::recv`] and [`Receiver::recv`]. -pub struct RecvFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, -} - -impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`DynamicReceiver::recv`]. -pub struct DynamicRecvFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, -} - -impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } - } -} - -/// Future returned by [`Channel::send`] and [`Sender::send`]. -pub struct SendFuture<'ch, M, T, const N: usize> -where - M: RawMutex, -{ - channel: &'ch Channel, - message: Option, -} - -impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> -where - M: RawMutex, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} - -/// Future returned by [`DynamicSender::send`]. -pub struct DynamicSendFuture<'ch, T> { - channel: &'ch dyn DynamicChannel, - message: Option, -} - -impl<'ch, T> Future for DynamicSendFuture<'ch, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.message.take() { - Some(m) => match self.channel.try_send_with_context(m, Some(cx)) { - Ok(..) => Poll::Ready(()), - Err(TrySendError::Full(m)) => { - self.message = Some(m); - Poll::Pending - } - }, - None => panic!("Message cannot be None"), - } - } -} - -impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {} - -trait DynamicChannel { - fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError>; - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; -} - -/// Error returned by [`try_recv`](Channel::try_recv). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryRecvError { - /// A message could not be received because the channel is empty. - Empty, -} - -/// Error returned by [`try_send`](Channel::try_send). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TrySendError { - /// The data could not be sent on the channel because the channel is - /// currently full and sending would require blocking. - Full(T), -} - -struct ChannelState { - queue: Deque, - receiver_waker: WakerRegistration, - senders_waker: WakerRegistration, -} - -impl ChannelState { - const fn new() -> Self { - ChannelState { - queue: Deque::new(), - receiver_waker: WakerRegistration::new(), - senders_waker: WakerRegistration::new(), - } - } - - fn try_recv(&mut self) -> Result { - self.try_recv_with_context(None) - } - - fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { - if self.queue.is_full() { - self.senders_waker.wake(); - } - - if let Some(message) = self.queue.pop_front() { - Ok(message) - } else { - if let Some(cx) = cx { - self.receiver_waker.register(cx.waker()); - } - Err(TryRecvError::Empty) - } - } - - fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.try_send_with_context(message, None) - } - - fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - match self.queue.push_back(message) { - Ok(()) => { - self.receiver_waker.wake(); - Ok(()) - } - Err(message) => { - if let Some(cx) = cx { - self.senders_waker.register(cx.waker()); - } - Err(TrySendError::Full(message)) - } - } - } -} - -/// A bounded channel for communicating between asynchronous tasks -/// with backpressure. -/// -/// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is -/// received from the channel. -/// -/// All data sent will become available in the same order as it was sent. -pub struct Channel -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Channel -where - M: RawMutex, -{ - /// Establish a new bounded channel. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy_util::channel::mpmc::Channel; - /// use embassy_util::blocking_mutex::raw::NoopRawMutex; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::new(); - /// ``` - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(ChannelState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut ChannelState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - self.lock(|c| c.try_recv_with_context(cx)) - } - - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - self.lock(|c| c.try_send_with_context(m, cx)) - } - - /// Get a sender for this channel. - pub fn sender(&self) -> Sender<'_, M, T, N> { - Sender { channel: self } - } - - /// Get a receiver for this channel. - pub fn receiver(&self) -> Receiver<'_, M, T, N> { - Receiver { channel: self } - } - - /// Send a value, waiting until there is capacity. - /// - /// Sending completes when the value has been pushed to the channel's queue. - /// This doesn't mean the value has been received yet. - pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> { - SendFuture { - channel: self, - message: Some(message), - } - } - - /// Attempt to immediately send a message. - /// - /// This method differs from [`send`](Channel::send) by returning immediately if the channel's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the channel capacity has been reached, i.e., the channel has `n` - /// buffered values where `n` is the argument passed to [`Channel`], then an - /// error is returned. - pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - self.lock(|c| c.try_send(message)) - } - - /// Receive the next value. - /// - /// If there are no messages in the channel's buffer, this method will - /// wait until a message is sent. - pub fn recv(&self) -> RecvFuture<'_, M, T, N> { - RecvFuture { channel: self } - } - - /// Attempt to immediately receive a message. - /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. - pub fn try_recv(&self) -> Result { - self.lock(|c| c.try_recv()) - } -} - -/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the -/// tradeoff cost of dynamic dispatch. -impl DynamicChannel for Channel -where - M: RawMutex, -{ - fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { - Channel::try_send_with_context(self, m, cx) - } - - fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result { - Channel::try_recv_with_context(self, cx) - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use futures_executor::ThreadPool; - use futures_timer::Delay; - use futures_util::task::SpawnExt; - use static_cell::StaticCell; - - use super::*; - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - - fn capacity(c: &ChannelState) -> usize { - c.queue.capacity() - c.queue.len() - } - - #[test] - fn sending_once() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn sending_when_full() { - let mut c = ChannelState::::new(); - let _ = c.try_send(1); - let _ = c.try_send(1); - let _ = c.try_send(1); - match c.try_send(2) { - Err(TrySendError::Full(2)) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 0); - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = ChannelState::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = ChannelState::::new(); - match c.try_recv() { - Err(TryRecvError::Empty) => assert!(true), - _ => assert!(false), - } - assert_eq!(capacity(&c), 3); - } - - #[test] - fn simple_send_and_receive() { - let c = Channel::::new(); - assert!(c.try_send(1).is_ok()); - assert_eq!(c.try_recv().unwrap(), 1); - } - - #[test] - fn cloning() { - let c = Channel::::new(); - let r1 = c.receiver(); - let s1 = c.sender(); - - let _ = r1.clone(); - let _ = s1.clone(); - } - - #[test] - fn dynamic_dispatch() { - let c = Channel::::new(); - let s: DynamicSender<'_, u32> = c.sender().into(); - let r: DynamicReceiver<'_, u32> = c.receiver().into(); - - assert!(s.try_send(1).is_ok()); - assert_eq!(r.try_recv().unwrap(), 1); - } - - #[futures_test::test] - async fn receiver_receives_given_try_send_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - let c2 = c; - assert!(executor - .spawn(async move { - assert!(c2.try_send(1).is_ok()); - }) - .is_ok()); - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let c = Channel::::new(); - c.send(1).await; - assert_eq!(c.recv().await, 1); - } - - #[futures_test::test] - async fn senders_sends_wait_until_capacity() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Channel::new()); - assert!(c.try_send(1).is_ok()); - - let c2 = c; - let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await }); - let c2 = c; - let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await }); - // Wish I could think of a means of determining that the async send is waiting instead. - // However, I've used the debugger to observe that the send does indeed wait. - Delay::new(Duration::from_millis(500)).await; - assert_eq!(c.recv().await, 1); - assert!(executor - .spawn(async move { - loop { - c.recv().await; - } - }) - .is_ok()); - send_task_1.unwrap().await; - send_task_2.unwrap().await; - } -} diff --git a/embassy-util/src/channel/pubsub/mod.rs b/embassy-util/src/channel/pubsub/mod.rs deleted file mode 100644 index ecc8fbd8f..000000000 --- a/embassy-util/src/channel/pubsub/mod.rs +++ /dev/null @@ -1,542 +0,0 @@ -//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers. - -#![deny(missing_docs)] - -use core::cell::RefCell; -use core::fmt::Debug; -use core::task::{Context, Poll, Waker}; - -use heapless::Deque; - -use self::publisher::{ImmediatePub, Pub}; -use self::subscriber::Sub; -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::MultiWakerRegistration; - -pub mod publisher; -pub mod subscriber; - -pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher}; -pub use subscriber::{DynSubscriber, Subscriber}; - -/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers -/// -/// Any published message can be read by all subscribers. -/// A publisher can choose how it sends its message. -/// -/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue. -/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message -/// in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive -/// an error to indicate that it has lagged. -/// -/// ## Example -/// -/// ``` -/// # use embassy_util::blocking_mutex::raw::NoopRawMutex; -/// # use embassy_util::channel::pubsub::WaitResult; -/// # use embassy_util::channel::pubsub::PubSubChannel; -/// # use futures_executor::block_on; -/// # let test = async { -/// // Create the channel. This can be static as well -/// let channel = PubSubChannel::::new(); -/// -/// // This is a generic subscriber with a direct reference to the channel -/// let mut sub0 = channel.subscriber().unwrap(); -/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel -/// let mut sub1 = channel.dyn_subscriber().unwrap(); -/// -/// let pub0 = channel.publisher().unwrap(); -/// -/// // Publish a message, but wait if the queue is full -/// pub0.publish(42).await; -/// -/// // Publish a message, but if the queue is full, just kick out the oldest message. -/// // This may cause some subscribers to miss a message -/// pub0.publish_immediate(43); -/// -/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result -/// assert_eq!(sub0.next_message().await, WaitResult::Message(42)); -/// assert_eq!(sub1.next_message().await, WaitResult::Message(42)); -/// -/// // Wait again, but this time ignore any Lag results -/// assert_eq!(sub0.next_message_pure().await, 43); -/// assert_eq!(sub1.next_message_pure().await, 43); -/// -/// // There's also a polling interface -/// assert_eq!(sub0.try_next_message(), None); -/// assert_eq!(sub1.try_next_message(), None); -/// # }; -/// # -/// # block_on(test); -/// ``` -/// -pub struct PubSubChannel { - inner: Mutex>>, -} - -impl - PubSubChannel -{ - /// Create a new channel - pub const fn new() -> Self { - Self { - inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())), - } - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(Subscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new subscriber. It will only receive messages that are published after its creation. - /// - /// If there are no subscriber slots left, an error will be returned. - pub fn dyn_subscriber(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.subscriber_count >= SUBS { - Err(Error::MaximumSubscribersReached) - } else { - s.subscriber_count += 1; - Ok(DynSubscriber(Sub::new(s.next_message_id, self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(Publisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher - /// - /// If there are no publisher slots left, an error will be returned. - pub fn dyn_publisher(&self) -> Result, Error> { - self.inner.lock(|inner| { - let mut s = inner.borrow_mut(); - - if s.publisher_count >= PUBS { - Err(Error::MaximumPublishersReached) - } else { - s.publisher_count += 1; - Ok(DynPublisher(Pub::new(self))) - } - }) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn immediate_publisher(&self) -> ImmediatePublisher { - ImmediatePublisher(ImmediatePub::new(self)) - } - - /// Create a new publisher that can only send immediate messages. - /// This kind of publisher does not take up a publisher slot. - pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher { - DynImmediatePublisher(ImmediatePub::new(self)) - } -} - -impl PubSubBehavior - for PubSubChannel -{ - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - - // Check if we can read a message - match s.get_message(*next_message_id) { - // Yes, so we are done polling - Some(WaitResult::Message(message)) => { - *next_message_id += 1; - Poll::Ready(WaitResult::Message(message)) - } - // No, so we need to reregister our waker and sleep again - None => { - if let Some(cx) = cx { - s.register_subscriber_waker(cx.waker()); - } - Poll::Pending - } - // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged - Some(WaitResult::Lagged(amount)) => { - *next_message_id += amount; - Poll::Ready(WaitResult::Lagged(amount)) - } - } - }) - } - - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - // Try to publish the message - match s.try_publish(message) { - // We did it, we are ready - Ok(()) => Ok(()), - // The queue is full, so we need to reregister our waker and go to sleep - Err(message) => { - if let Some(cx) = cx { - s.register_publisher_waker(cx.waker()); - } - Err(message) - } - } - }) - } - - fn publish_immediate(&self, message: T) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.publish_immediate(message) - }) - } - - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_subscriber(subscriber_next_message_id) - }) - } - - fn unregister_publisher(&self) { - self.inner.lock(|s| { - let mut s = s.borrow_mut(); - s.unregister_publisher() - }) - } -} - -/// Internal state for the PubSub channel -struct PubSubState { - /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it - queue: Deque<(T, usize), CAP>, - /// Every message has an id. - /// Don't worry, we won't run out. - /// If a million messages were published every second, then the ID's would run out in about 584942 years. - next_message_id: u64, - /// Collection of wakers for Subscribers that are waiting. - subscriber_wakers: MultiWakerRegistration, - /// Collection of wakers for Publishers that are waiting. - publisher_wakers: MultiWakerRegistration, - /// The amount of subscribers that are active - subscriber_count: usize, - /// The amount of publishers that are active - publisher_count: usize, -} - -impl PubSubState { - /// Create a new internal channel state - const fn new() -> Self { - Self { - queue: Deque::new(), - next_message_id: 0, - subscriber_wakers: MultiWakerRegistration::new(), - publisher_wakers: MultiWakerRegistration::new(), - subscriber_count: 0, - publisher_count: 0, - } - } - - fn try_publish(&mut self, message: T) -> Result<(), T> { - if self.subscriber_count == 0 { - // We don't need to publish anything because there is no one to receive it - return Ok(()); - } - - if self.queue.is_full() { - return Err(message); - } - // We just did a check for this - self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); - - self.next_message_id += 1; - - // Wake all of the subscribers - self.subscriber_wakers.wake(); - - Ok(()) - } - - fn publish_immediate(&mut self, message: T) { - // Make space in the queue if required - if self.queue.is_full() { - self.queue.pop_front(); - } - - // This will succeed because we made sure there is space - self.try_publish(message).ok().unwrap(); - } - - fn get_message(&mut self, message_id: u64) -> Option> { - let start_id = self.next_message_id - self.queue.len() as u64; - - if message_id < start_id { - return Some(WaitResult::Lagged(start_id - message_id)); - } - - let current_message_index = (message_id - start_id) as usize; - - if current_message_index >= self.queue.len() { - return None; - } - - // We've checked that the index is valid - let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap(); - - // We're reading this item, so decrement the counter - queue_item.1 -= 1; - let message = queue_item.0.clone(); - - if current_message_index == 0 && queue_item.1 == 0 { - self.queue.pop_front(); - self.publisher_wakers.wake(); - } - - Some(WaitResult::Message(message)) - } - - fn register_subscriber_waker(&mut self, waker: &Waker) { - match self.subscriber_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a subscriber that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.subscriber_wakers.wake(); - self.subscriber_wakers.register(waker).unwrap(); - } - } - } - - fn register_publisher_waker(&mut self, waker: &Waker) { - match self.publisher_wakers.register(waker) { - Ok(()) => {} - Err(_) => { - // All waker slots were full. This can only happen when there was a publisher that now has dropped. - // We need to throw it away. It's a bit inefficient, but we can wake everything. - // Any future that is still active will simply reregister. - // This won't happen a lot, so it's ok. - self.publisher_wakers.wake(); - self.publisher_wakers.register(waker).unwrap(); - } - } - } - - fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { - self.subscriber_count -= 1; - - // All messages that haven't been read yet by this subscriber must have their counter decremented - let start_id = self.next_message_id - self.queue.len() as u64; - if subscriber_next_message_id >= start_id { - let current_message_index = (subscriber_next_message_id - start_id) as usize; - self.queue - .iter_mut() - .skip(current_message_index) - .for_each(|(_, counter)| *counter -= 1); - } - } - - fn unregister_publisher(&mut self) { - self.publisher_count -= 1; - } -} - -/// Error type for the [PubSubChannel] -#[derive(Debug, PartialEq, Eq, Clone)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum Error { - /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or - /// the capacity of the channels must be increased. - MaximumSubscribersReached, - /// All publisher slots are used. To add another publisher, first another publisher must be dropped or - /// the capacity of the channels must be increased. - MaximumPublishersReached, -} - -/// 'Middle level' behaviour of the pubsub channel. -/// This trait is used so that Sub and Pub can be generic over the channel. -pub trait PubSubBehavior { - /// Try to get a message from the queue with the given message id. - /// - /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. - fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll>; - - /// Try to publish a message to the queue. - /// - /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. - fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; - - /// Publish a message immediately - fn publish_immediate(&self, message: T); - - /// Let the channel know that a subscriber has dropped - fn unregister_subscriber(&self, subscriber_next_message_id: u64); - - /// Let the channel know that a publisher has dropped - fn unregister_publisher(&self); -} - -/// The result of the subscriber wait procedure -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum WaitResult { - /// The subscriber did not receive all messages and lagged by the given amount of messages. - /// (This is the amount of messages that were missed) - Lagged(u64), - /// A message was received - Message(T), -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::blocking_mutex::raw::NoopRawMutex; - - #[futures_test::test] - async fn dyn_pub_sub_works() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.dyn_subscriber().unwrap(); - let mut sub1 = channel.dyn_subscriber().unwrap(); - let pub0 = channel.dyn_publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn all_subscribers_receive() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let mut sub1 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish(42).await; - - assert_eq!(sub0.next_message().await, WaitResult::Message(42)); - assert_eq!(sub1.next_message().await, WaitResult::Message(42)); - - assert_eq!(sub0.try_next_message(), None); - assert_eq!(sub1.try_next_message(), None); - } - - #[futures_test::test] - async fn lag_when_queue_full_on_immediate_publish() { - let channel = PubSubChannel::::new(); - - let mut sub0 = channel.subscriber().unwrap(); - let pub0 = channel.publisher().unwrap(); - - pub0.publish_immediate(42); - pub0.publish_immediate(43); - pub0.publish_immediate(44); - pub0.publish_immediate(45); - pub0.publish_immediate(46); - pub0.publish_immediate(47); - - assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2))); - assert_eq!(sub0.next_message().await, WaitResult::Message(44)); - assert_eq!(sub0.next_message().await, WaitResult::Message(45)); - assert_eq!(sub0.next_message().await, WaitResult::Message(46)); - assert_eq!(sub0.next_message().await, WaitResult::Message(47)); - assert_eq!(sub0.try_next_message(), None); - } - - #[test] - fn limited_subs_and_pubs() { - let channel = PubSubChannel::::new(); - - let sub0 = channel.subscriber(); - let sub1 = channel.subscriber(); - let sub2 = channel.subscriber(); - let sub3 = channel.subscriber(); - let sub4 = channel.subscriber(); - - assert!(sub0.is_ok()); - assert!(sub1.is_ok()); - assert!(sub2.is_ok()); - assert!(sub3.is_ok()); - assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached); - - drop(sub0); - - let sub5 = channel.subscriber(); - assert!(sub5.is_ok()); - - // publishers - - let pub0 = channel.publisher(); - let pub1 = channel.publisher(); - let pub2 = channel.publisher(); - let pub3 = channel.publisher(); - let pub4 = channel.publisher(); - - assert!(pub0.is_ok()); - assert!(pub1.is_ok()); - assert!(pub2.is_ok()); - assert!(pub3.is_ok()); - assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached); - - drop(pub0); - - let pub5 = channel.publisher(); - assert!(pub5.is_ok()); - } - - #[test] - fn publisher_wait_on_full_queue() { - let channel = PubSubChannel::::new(); - - let pub0 = channel.publisher().unwrap(); - - // There are no subscribers, so the queue will never be full - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - - let sub0 = channel.subscriber().unwrap(); - - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Ok(())); - assert_eq!(pub0.try_publish(0), Err(0)); - - drop(sub0); - } -} diff --git a/embassy-util/src/channel/pubsub/publisher.rs b/embassy-util/src/channel/pubsub/publisher.rs deleted file mode 100644 index 705797f60..000000000 --- a/embassy-util/src/channel/pubsub/publisher.rs +++ /dev/null @@ -1,182 +0,0 @@ -//! Implementation of anything directly publisher related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel}; -use crate::blocking_mutex::raw::RawMutex; - -/// A publisher to a channel -pub struct Pub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - - /// Publish a message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message - pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> { - PublisherWaitFuture { - message: Some(message), - publisher: self, - } - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_publisher() - } -} - -/// A publisher that holds a dynamic reference to the channel -pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynPublisher<'a, T> { - type Target = Pub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that holds a generic reference to the channel -pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Pub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Pub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Publisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel. -/// (So an infinite amount is possible) -pub struct ImmediatePub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The channel we are a publisher for - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { - pub(super) fn new(channel: &'a PSB) -> Self { - Self { - channel, - _phantom: Default::default(), - } - } - /// Publish the message right now even when the queue is full. - /// This may cause a subscriber to miss an older message. - pub fn publish_immediate(&self, message: T) { - self.channel.publish_immediate(message) - } - - /// Publish a message if there is space in the message queue - pub fn try_publish(&self, message: T) -> Result<(), T> { - self.channel.publish_with_context(message, None) - } -} - -/// An immediate publisher that holds a dynamic reference to the channel -pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> { - type Target = ImmediatePub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// An immediate publisher that holds a generic reference to the channel -pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) ImmediatePub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = ImmediatePub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the publisher wait action -pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message we need to publish - message: Option, - publisher: &'s Pub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let message = self.message.take().unwrap(); - match self.publisher.channel.publish_with_context(message, Some(cx)) { - Ok(()) => Poll::Ready(()), - Err(message) => { - self.message = Some(message); - Poll::Pending - } - } - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-util/src/channel/pubsub/subscriber.rs b/embassy-util/src/channel/pubsub/subscriber.rs deleted file mode 100644 index b9a2cbe18..000000000 --- a/embassy-util/src/channel/pubsub/subscriber.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! Implementation of anything directly subscriber related - -use core::future::Future; -use core::marker::PhantomData; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use super::{PubSubBehavior, PubSubChannel, WaitResult}; -use crate::blocking_mutex::raw::RawMutex; - -/// A subscriber to a channel -pub struct Sub<'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - /// The message id of the next message we are yet to receive - next_message_id: u64, - /// The channel we are a subscriber to - channel: &'a PSB, - _phantom: PhantomData, -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { - pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self { - Self { - next_message_id, - channel, - _phantom: Default::default(), - } - } - - /// Wait for a published message - pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> { - SubscriberWaitFuture { subscriber: self } - } - - /// Wait for a published message (ignoring lag results) - pub async fn next_message_pure(&mut self) -> T { - loop { - match self.next_message().await { - WaitResult::Lagged(_) => continue, - WaitResult::Message(message) => break message, - } - } - } - - /// Try to see if there's a published message we haven't received yet. - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message(&mut self) -> Option> { - match self.channel.get_message_with_context(&mut self.next_message_id, None) { - Poll::Ready(result) => Some(result), - Poll::Pending => None, - } - } - - /// Try to see if there's a published message we haven't received yet (ignoring lag results). - /// - /// This function does not peek. The message is received if there is one. - pub fn try_next_message_pure(&mut self) -> Option { - loop { - match self.try_next_message() { - Some(WaitResult::Lagged(_)) => continue, - Some(WaitResult::Message(message)) => break Some(message), - None => break None, - } - } - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { - fn drop(&mut self) { - self.channel.unregister_subscriber(self.next_message_id) - } -} - -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {} - -/// Warning: The stream implementation ignores lag results and returns all messages. -/// This might miss some messages without you knowing it. -impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self - .channel - .get_message_with_context(&mut self.next_message_id, Some(cx)) - { - Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), - Poll::Ready(WaitResult::Lagged(_)) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - } -} - -/// A subscriber that holds a dynamic reference to the channel -pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior + 'a, T>); - -impl<'a, T: Clone> Deref for DynSubscriber<'a, T> { - type Target = Sub<'a, dyn PubSubBehavior + 'a, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// A subscriber that holds a generic reference to the channel -pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>( - pub(super) Sub<'a, PubSubChannel, T>, -); - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - type Target = Sub<'a, PubSubChannel, T>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut - for Subscriber<'a, M, T, CAP, SUBS, PUBS> -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the subscriber wait action -pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> { - subscriber: &'s mut Sub<'a, PSB, T>, -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> { - type Output = WaitResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.subscriber - .channel - .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) - } -} - -impl<'s, 'a, PSB: PubSubBehavior + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {} diff --git a/embassy-util/src/channel/signal.rs b/embassy-util/src/channel/signal.rs deleted file mode 100644 index 05889f5a4..000000000 --- a/embassy-util/src/channel/signal.rs +++ /dev/null @@ -1,100 +0,0 @@ -//! A synchronization primitive for passing the latest value to a task. -use core::cell::UnsafeCell; -use core::future::Future; -use core::mem; -use core::task::{Context, Poll, Waker}; - -/// Single-slot signaling primitive. -/// -/// This is similar to a [`Channel`](crate::channel::mpmc::Channel) with a buffer size of 1, except -/// "sending" to it (calling [`Signal::signal`]) when full will overwrite the previous value instead -/// of waiting for the receiver to pop the previous value. -/// -/// It is useful for sending data between tasks when the receiver only cares about -/// the latest data, and therefore it's fine to "lose" messages. This is often the case for "state" -/// updates. -/// -/// For more advanced use cases, you might want to use [`Channel`](crate::channel::mpmc::Channel) instead. -/// -/// Signals are generally declared as `static`s and then borrowed as required. -/// -/// ``` -/// use embassy_util::channel::signal::Signal; -/// -/// enum SomeCommand { -/// On, -/// Off, -/// } -/// -/// static SOME_SIGNAL: Signal = Signal::new(); -/// ``` -pub struct Signal { - state: UnsafeCell>, -} - -enum State { - None, - Waiting(Waker), - Signaled(T), -} - -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { - /// Create a new `Signal`. - pub const fn new() -> Self { - Self { - state: UnsafeCell::new(State::None), - } - } -} - -impl Signal { - /// Mark this Signal as signaled. - pub fn signal(&self, val: T) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { - waker.wake(); - } - }) - } - - /// Remove the queued value in this `Signal`, if any. - pub fn reset(&self) { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - *state = State::None - }) - } - - /// Manually poll the Signal future. - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll { - critical_section::with(|_| unsafe { - let state = &mut *self.state.get(); - match state { - State::None => { - *state = State::Waiting(cx.waker().clone()); - Poll::Pending - } - State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, - State::Waiting(_) => panic!("waker overflow"), - State::Signaled(_) => match mem::replace(state, State::None) { - State::Signaled(res) => Poll::Ready(res), - _ => unreachable!(), - }, - } - }) - } - - /// Future that completes when this Signal has been signaled. - pub fn wait(&self) -> impl Future + '_ { - futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) - } - - /// non-blocking method to check whether this signal has been signaled. - pub fn signaled(&self) -> bool { - critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) - } -} diff --git a/embassy-util/src/fmt.rs b/embassy-util/src/fmt.rs deleted file mode 100644 index f8bb0a035..000000000 --- a/embassy-util/src/fmt.rs +++ /dev/null @@ -1,228 +0,0 @@ -#![macro_use] -#![allow(unused_macros)] - -#[cfg(all(feature = "defmt", feature = "log"))] -compile_error!("You may not enable both `defmt` and `log` features."); - -macro_rules! assert { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert!($($x)*); - } - }; -} - -macro_rules! assert_eq { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert_eq!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert_eq!($($x)*); - } - }; -} - -macro_rules! assert_ne { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::assert_ne!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::assert_ne!($($x)*); - } - }; -} - -macro_rules! debug_assert { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert!($($x)*); - } - }; -} - -macro_rules! debug_assert_eq { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert_eq!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert_eq!($($x)*); - } - }; -} - -macro_rules! debug_assert_ne { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::debug_assert_ne!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::debug_assert_ne!($($x)*); - } - }; -} - -macro_rules! todo { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::todo!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::todo!($($x)*); - } - }; -} - -macro_rules! unreachable { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::unreachable!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::unreachable!($($x)*); - } - }; -} - -macro_rules! panic { - ($($x:tt)*) => { - { - #[cfg(not(feature = "defmt"))] - ::core::panic!($($x)*); - #[cfg(feature = "defmt")] - ::defmt::panic!($($x)*); - } - }; -} - -macro_rules! trace { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::trace!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::trace!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! debug { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::debug!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::debug!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! info { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::info!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::info!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! warn { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::warn!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::warn!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -macro_rules! error { - ($s:literal $(, $x:expr)* $(,)?) => { - { - #[cfg(feature = "log")] - ::log::error!($s $(, $x)*); - #[cfg(feature = "defmt")] - ::defmt::error!($s $(, $x)*); - #[cfg(not(any(feature = "log", feature="defmt")))] - let _ = ($( & $x ),*); - } - }; -} - -#[cfg(feature = "defmt")] -macro_rules! unwrap { - ($($x:tt)*) => { - ::defmt::unwrap!($($x)*) - }; -} - -#[cfg(not(feature = "defmt"))] -macro_rules! unwrap { - ($arg:expr) => { - match $crate::fmt::Try::into_result($arg) { - ::core::result::Result::Ok(t) => t, - ::core::result::Result::Err(e) => { - ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); - } - } - }; - ($arg:expr, $($msg:expr),+ $(,)? ) => { - match $crate::fmt::Try::into_result($arg) { - ::core::result::Result::Ok(t) => t, - ::core::result::Result::Err(e) => { - ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); - } - } - } -} - -#[cfg(feature = "defmt-timestamp-uptime")] -defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub struct NoneError; - -pub trait Try { - type Ok; - type Error; - fn into_result(self) -> Result; -} - -impl Try for Option { - type Ok = T; - type Error = NoneError; - - #[inline] - fn into_result(self) -> Result { - self.ok_or(NoneError) - } -} - -impl Try for Result { - type Ok = T; - type Error = E; - - #[inline] - fn into_result(self) -> Self { - self - } -} diff --git a/embassy-util/src/lib.rs b/embassy-util/src/lib.rs deleted file mode 100644 index 8ec3300d9..000000000 --- a/embassy-util/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] -#![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] -#![allow(clippy::new_without_default)] -#![doc = include_str!("../../README.md")] -#![warn(missing_docs)] - -// This mod MUST go first, so that the others see its macros. -pub(crate) mod fmt; - -// internal use -mod ring_buffer; - -pub mod blocking_mutex; -pub mod channel; -pub mod mutex; -pub mod pipe; -pub mod waitqueue; - -mod select; -mod yield_now; - -pub use select::*; -pub use yield_now::*; diff --git a/embassy-util/src/mutex.rs b/embassy-util/src/mutex.rs deleted file mode 100644 index 75a6e8dd3..000000000 --- a/embassy-util/src/mutex.rs +++ /dev/null @@ -1,167 +0,0 @@ -//! Async mutex. -//! -//! This module provides a mutex that can be used to synchronize data between asynchronous tasks. -use core::cell::{RefCell, UnsafeCell}; -use core::ops::{Deref, DerefMut}; -use core::task::Poll; - -use futures_util::future::poll_fn; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex as BlockingMutex; -use crate::waitqueue::WakerRegistration; - -/// Error returned by [`Mutex::try_lock`] -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub struct TryLockError; - -struct State { - locked: bool, - waker: WakerRegistration, -} - -/// Async mutex. -/// -/// The mutex is generic over a blocking [`RawMutex`](crate::blocking_mutex::raw::RawMutex). -/// The raw mutex is used to guard access to the internal "is locked" flag. It -/// is held for very short periods only, while locking and unlocking. It is *not* held -/// for the entire time the async Mutex is locked. -/// -/// Which implementation you select depends on the context in which you're using the mutex. -/// -/// Use [`CriticalSectionRawMutex`](crate::blocking_mutex::raw::CriticalSectionRawMutex) when data can be shared between threads and interrupts. -/// -/// Use [`NoopRawMutex`](crate::blocking_mutex::raw::NoopRawMutex) when data is only shared between tasks running on the same executor. -/// -/// Use [`ThreadModeRawMutex`](crate::blocking_mutex::raw::ThreadModeRawMutex) when data is shared between tasks running on the same executor but you want a singleton. -/// -pub struct Mutex -where - M: RawMutex, - T: ?Sized, -{ - state: BlockingMutex>, - inner: UnsafeCell, -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -/// Async mutex. -impl Mutex -where - M: RawMutex, -{ - /// Create a new mutex with the given value. - pub const fn new(value: T) -> Self { - Self { - inner: UnsafeCell::new(value), - state: BlockingMutex::new(RefCell::new(State { - locked: false, - waker: WakerRegistration::new(), - })), - } - } -} - -impl Mutex -where - M: RawMutex, - T: ?Sized, -{ - /// Lock the mutex. - /// - /// This will wait for the mutex to be unlocked if it's already locked. - pub async fn lock(&self) -> MutexGuard<'_, M, T> { - poll_fn(|cx| { - let ready = self.state.lock(|s| { - let mut s = s.borrow_mut(); - if s.locked { - s.waker.register(cx.waker()); - false - } else { - s.locked = true; - true - } - }); - - if ready { - Poll::Ready(MutexGuard { mutex: self }) - } else { - Poll::Pending - } - }) - .await - } - - /// Attempt to immediately lock the mutex. - /// - /// If the mutex is already locked, this will return an error instead of waiting. - pub fn try_lock(&self) -> Result, TryLockError> { - self.state.lock(|s| { - let mut s = s.borrow_mut(); - if s.locked { - Err(TryLockError) - } else { - s.locked = true; - Ok(()) - } - })?; - - Ok(MutexGuard { mutex: self }) - } -} - -/// Async mutex guard. -/// -/// Owning an instance of this type indicates having -/// successfully locked the mutex, and grants access to the contents. -/// -/// Dropping it unlocks the mutex. -pub struct MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - mutex: &'a Mutex, -} - -impl<'a, M, T> Drop for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - fn drop(&mut self) { - self.mutex.state.lock(|s| { - let mut s = s.borrow_mut(); - s.locked = false; - s.waker.wake(); - }) - } -} - -impl<'a, M, T> Deref for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - type Target = T; - fn deref(&self) -> &Self::Target { - // Safety: the MutexGuard represents exclusive access to the contents - // of the mutex, so it's OK to get it. - unsafe { &*(self.mutex.inner.get() as *const T) } - } -} - -impl<'a, M, T> DerefMut for MutexGuard<'a, M, T> -where - M: RawMutex, - T: ?Sized, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - // Safety: the MutexGuard represents exclusive access to the contents - // of the mutex, so it's OK to get it. - unsafe { &mut *(self.mutex.inner.get()) } - } -} diff --git a/embassy-util/src/pipe.rs b/embassy-util/src/pipe.rs deleted file mode 100644 index d85b843e4..000000000 --- a/embassy-util/src/pipe.rs +++ /dev/null @@ -1,551 +0,0 @@ -//! Async byte stream pipe. - -use core::cell::RefCell; -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::ring_buffer::RingBuffer; -use crate::waitqueue::WakerRegistration; - -/// Write-only access to a [`Pipe`]. -#[derive(Copy)] -pub struct Writer<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, -} - -impl<'p, M, const N: usize> Clone for Writer<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Writer { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Writer<'p, M, N> -where - M: RawMutex, -{ - /// Writes a value. - /// - /// See [`Pipe::write()`] - pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { - self.pipe.write(buf) - } - - /// Attempt to immediately write a message. - /// - /// See [`Pipe::write()`] - pub fn try_write(&self, buf: &[u8]) -> Result { - self.pipe.try_write(buf) - } -} - -/// Future returned by [`Pipe::write`] and [`Writer::write`]. -pub struct WriteFuture<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, - buf: &'p [u8], -} - -impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N> -where - M: RawMutex, -{ - type Output = usize; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.pipe.try_write_with_context(Some(cx), self.buf) { - Ok(n) => Poll::Ready(n), - Err(TryWriteError::Full) => Poll::Pending, - } - } -} - -impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {} - -/// Read-only access to a [`Pipe`]. -#[derive(Copy)] -pub struct Reader<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, -} - -impl<'p, M, const N: usize> Clone for Reader<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Reader { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Reader<'p, M, N> -where - M: RawMutex, -{ - /// Reads a value. - /// - /// See [`Pipe::read()`] - pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { - self.pipe.read(buf) - } - - /// Attempt to immediately read a message. - /// - /// See [`Pipe::read()`] - pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.pipe.try_read(buf) - } -} - -/// Future returned by [`Pipe::read`] and [`Reader::read`]. -pub struct ReadFuture<'p, M, const N: usize> -where - M: RawMutex, -{ - pipe: &'p Pipe, - buf: &'p mut [u8], -} - -impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N> -where - M: RawMutex, -{ - type Output = usize; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.pipe.try_read_with_context(Some(cx), self.buf) { - Ok(n) => Poll::Ready(n), - Err(TryReadError::Empty) => Poll::Pending, - } - } -} - -impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} - -/// Error returned by [`try_read`](Pipe::try_read). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryReadError { - /// No data could be read from the pipe because it is currently - /// empty, and reading would require blocking. - Empty, -} - -/// Error returned by [`try_write`](Pipe::try_write). -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -#[cfg_attr(feature = "defmt", derive(defmt::Format))] -pub enum TryWriteError { - /// No data could be written to the pipe because it is - /// currently full, and writing would require blocking. - Full, -} - -struct PipeState { - buffer: RingBuffer, - read_waker: WakerRegistration, - write_waker: WakerRegistration, -} - -impl PipeState { - const fn new() -> Self { - PipeState { - buffer: RingBuffer::new(), - read_waker: WakerRegistration::new(), - write_waker: WakerRegistration::new(), - } - } - - fn clear(&mut self) { - self.buffer.clear(); - self.write_waker.wake(); - } - - fn try_read(&mut self, buf: &mut [u8]) -> Result { - self.try_read_with_context(None, buf) - } - - fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - if self.buffer.is_full() { - self.write_waker.wake(); - } - - let available = self.buffer.pop_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.read_waker.register(cx.waker()); - } - return Err(TryReadError::Empty); - } - - let n = available.len().min(buf.len()); - buf[..n].copy_from_slice(&available[..n]); - self.buffer.pop(n); - Ok(n) - } - - fn try_write(&mut self, buf: &[u8]) -> Result { - self.try_write_with_context(None, buf) - } - - fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - if self.buffer.is_empty() { - self.read_waker.wake(); - } - - let available = self.buffer.push_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.write_waker.register(cx.waker()); - } - return Err(TryWriteError::Full); - } - - let n = available.len().min(buf.len()); - available[..n].copy_from_slice(&buf[..n]); - self.buffer.push(n); - Ok(n) - } -} - -/// A bounded pipe for communicating between asynchronous tasks -/// with backpressure. -/// -/// The pipe will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `write` new messages will wait until a message is -/// read from the pipe. -/// -/// All data written will become available in the same order as it was written. -pub struct Pipe -where - M: RawMutex, -{ - inner: Mutex>>, -} - -impl Pipe -where - M: RawMutex, -{ - /// Establish a new bounded pipe. For example, to create one with a NoopMutex: - /// - /// ``` - /// use embassy_util::pipe::Pipe; - /// use embassy_util::blocking_mutex::raw::NoopRawMutex; - /// - /// // Declare a bounded pipe, with a buffer of 256 bytes. - /// let mut pipe = Pipe::::new(); - /// ``` - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(PipeState::new())), - } - } - - fn lock(&self, f: impl FnOnce(&mut PipeState) -> R) -> R { - self.inner.lock(|rc| f(&mut *rc.borrow_mut())) - } - - fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read_with_context(cx, buf)) - } - - fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - self.lock(|c| c.try_write_with_context(cx, buf)) - } - - /// Get a writer for this pipe. - pub fn writer(&self) -> Writer<'_, M, N> { - Writer { pipe: self } - } - - /// Get a reader for this pipe. - pub fn reader(&self) -> Reader<'_, M, N> { - Reader { pipe: self } - } - - /// Write a value, waiting until there is capacity. - /// - /// Writeing completes when the value has been pushed to the pipe's queue. - /// This doesn't mean the value has been read yet. - pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { - WriteFuture { pipe: self, buf } - } - - /// Attempt to immediately write a message. - /// - /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's - /// buffer is full, instead of waiting. - /// - /// # Errors - /// - /// If the pipe capacity has been reached, i.e., the pipe has `n` - /// buffered values where `n` is the argument passed to [`Pipe`], then an - /// error is returned. - pub fn try_write(&self, buf: &[u8]) -> Result { - self.lock(|c| c.try_write(buf)) - } - - /// Receive the next value. - /// - /// If there are no messages in the pipe's buffer, this method will - /// wait until a message is written. - pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { - ReadFuture { pipe: self, buf } - } - - /// Attempt to immediately read a message. - /// - /// This method will either read a message from the pipe immediately or return an error - /// if the pipe is empty. - pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read(buf)) - } - - /// Clear the data in the pipe's buffer. - pub fn clear(&self) { - self.lock(|c| c.clear()) - } - - /// Return whether the pipe is full (no free space in the buffer) - pub fn is_full(&self) -> bool { - self.len() == N - } - - /// Return whether the pipe is empty (no data buffered) - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Total byte capacity. - /// - /// This is the same as the `N` generic param. - pub fn capacity(&self) -> usize { - N - } - - /// Used byte capacity. - pub fn len(&self) -> usize { - self.lock(|c| c.buffer.len()) - } - - /// Free byte capacity. - /// - /// This is equivalent to `capacity() - len()` - pub fn free_capacity(&self) -> usize { - N - self.len() - } -} - -#[cfg(feature = "nightly")] -mod io_impls { - use core::convert::Infallible; - - use futures_util::FutureExt; - - use super::*; - - impl embedded_io::Io for Pipe { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for Pipe { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) - } - } - - impl embedded_io::asynch::Write for Pipe { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } - - impl embedded_io::Io for &Pipe { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for &Pipe { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Pipe::read(self, buf).map(Ok) - } - } - - impl embedded_io::asynch::Write for &Pipe { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Pipe::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } - - impl embedded_io::Io for Reader<'_, M, N> { - type Error = Infallible; - } - - impl embedded_io::asynch::Read for Reader<'_, M, N> { - type ReadFuture<'a> = impl Future> - where - Self: 'a; - - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - Reader::read(self, buf).map(Ok) - } - } - - impl embedded_io::Io for Writer<'_, M, N> { - type Error = Infallible; - } - - impl embedded_io::asynch::Write for Writer<'_, M, N> { - type WriteFuture<'a> = impl Future> - where - Self: 'a; - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - Writer::write(self, buf).map(Ok) - } - - type FlushFuture<'a> = impl Future> - where - Self: 'a; - - fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - futures_util::future::ready(Ok(())) - } - } -} - -#[cfg(test)] -mod tests { - use futures_executor::ThreadPool; - use futures_util::task::SpawnExt; - use static_cell::StaticCell; - - use super::*; - use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - - fn capacity(c: &PipeState) -> usize { - N - c.buffer.len() - } - - #[test] - fn writing_once() { - let mut c = PipeState::<3>::new(); - assert!(c.try_write(&[1]).is_ok()); - assert_eq!(capacity(&c), 2); - } - - #[test] - fn writing_when_full() { - let mut c = PipeState::<3>::new(); - assert_eq!(c.try_write(&[42]), Ok(1)); - assert_eq!(c.try_write(&[43]), Ok(1)); - assert_eq!(c.try_write(&[44]), Ok(1)); - assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); - assert_eq!(capacity(&c), 0); - } - - #[test] - fn receiving_once_with_one_send() { - let mut c = PipeState::<3>::new(); - assert!(c.try_write(&[42]).is_ok()); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Ok(1)); - assert_eq!(buf[0], 42); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn receiving_when_empty() { - let mut c = PipeState::<3>::new(); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); - assert_eq!(capacity(&c), 3); - } - - #[test] - fn simple_send_and_receive() { - let c = Pipe::::new(); - assert!(c.try_write(&[42]).is_ok()); - let mut buf = [0; 16]; - assert_eq!(c.try_read(&mut buf), Ok(1)); - assert_eq!(buf[0], 42); - } - - #[test] - fn cloning() { - let c = Pipe::::new(); - let r1 = c.reader(); - let w1 = c.writer(); - - let _ = r1.clone(); - let _ = w1.clone(); - } - - #[futures_test::test] - async fn receiver_receives_given_try_write_async() { - let executor = ThreadPool::new().unwrap(); - - static CHANNEL: StaticCell> = StaticCell::new(); - let c = &*CHANNEL.init(Pipe::new()); - let c2 = c; - let f = async move { - assert_eq!(c2.try_write(&[42]), Ok(1)); - }; - executor.spawn(f).unwrap(); - let mut buf = [0; 16]; - assert_eq!(c.read(&mut buf).await, 1); - assert_eq!(buf[0], 42); - } - - #[futures_test::test] - async fn sender_send_completes_if_capacity() { - let c = Pipe::::new(); - c.write(&[42]).await; - let mut buf = [0; 16]; - assert_eq!(c.read(&mut buf).await, 1); - assert_eq!(buf[0], 42); - } -} diff --git a/embassy-util/src/ring_buffer.rs b/embassy-util/src/ring_buffer.rs deleted file mode 100644 index 521084024..000000000 --- a/embassy-util/src/ring_buffer.rs +++ /dev/null @@ -1,146 +0,0 @@ -pub struct RingBuffer { - buf: [u8; N], - start: usize, - end: usize, - empty: bool, -} - -impl RingBuffer { - pub const fn new() -> Self { - Self { - buf: [0; N], - start: 0, - end: 0, - empty: true, - } - } - - pub fn push_buf(&mut self) -> &mut [u8] { - if self.start == self.end && !self.empty { - trace!(" ringbuf: push_buf empty"); - return &mut self.buf[..0]; - } - - let n = if self.start <= self.end { - self.buf.len() - self.end - } else { - self.start - self.end - }; - - trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); - &mut self.buf[self.end..self.end + n] - } - - pub fn push(&mut self, n: usize) { - trace!(" ringbuf: push {:?}", n); - if n == 0 { - return; - } - - self.end = self.wrap(self.end + n); - self.empty = false; - } - - pub fn pop_buf(&mut self) -> &mut [u8] { - if self.empty { - trace!(" ringbuf: pop_buf empty"); - return &mut self.buf[..0]; - } - - let n = if self.end <= self.start { - self.buf.len() - self.start - } else { - self.end - self.start - }; - - trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); - &mut self.buf[self.start..self.start + n] - } - - pub fn pop(&mut self, n: usize) { - trace!(" ringbuf: pop {:?}", n); - if n == 0 { - return; - } - - self.start = self.wrap(self.start + n); - self.empty = self.start == self.end; - } - - pub fn is_full(&self) -> bool { - self.start == self.end && !self.empty - } - - pub fn is_empty(&self) -> bool { - self.empty - } - - #[allow(unused)] - pub fn len(&self) -> usize { - if self.empty { - 0 - } else if self.start < self.end { - self.end - self.start - } else { - N + self.end - self.start - } - } - - pub fn clear(&mut self) { - self.start = 0; - self.end = 0; - self.empty = true; - } - - fn wrap(&self, n: usize) -> usize { - assert!(n <= self.buf.len()); - if n == self.buf.len() { - 0 - } else { - n - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn push_pop() { - let mut rb: RingBuffer<4> = RingBuffer::new(); - let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - buf[0] = 1; - buf[1] = 2; - buf[2] = 3; - buf[3] = 4; - rb.push(4); - - let buf = rb.pop_buf(); - assert_eq!(4, buf.len()); - assert_eq!(1, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(3, buf.len()); - assert_eq!(2, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(2, buf.len()); - assert_eq!(3, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(1, buf.len()); - assert_eq!(4, buf[0]); - rb.pop(1); - - let buf = rb.pop_buf(); - assert_eq!(0, buf.len()); - - let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - } -} diff --git a/embassy-util/src/select.rs b/embassy-util/src/select.rs deleted file mode 100644 index 8cecb7fa0..000000000 --- a/embassy-util/src/select.rs +++ /dev/null @@ -1,230 +0,0 @@ -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Result for [`select`]. -#[derive(Debug, Clone)] -pub enum Either { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), -} - -/// Wait for one of two futures to complete. -/// -/// This function returns a new future which polls all the futures. -/// When one of them completes, it will complete with its result value. -/// -/// The other future is dropped. -pub fn select(a: A, b: B) -> Select -where - A: Future, - B: Future, -{ - Select { a, b } -} - -/// Future for the [`select`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select { - a: A, - b: B, -} - -impl Unpin for Select {} - -impl Future for Select -where - A: Future, - B: Future, -{ - type Output = Either; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either::Second(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Result for [`select3`]. -#[derive(Debug, Clone)] -pub enum Either3 { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), - /// Third future finished first. - Third(C), -} - -/// Same as [`select`], but with more futures. -pub fn select3(a: A, b: B, c: C) -> Select3 -where - A: Future, - B: Future, - C: Future, -{ - Select3 { a, b, c } -} - -/// Future for the [`select3`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select3 { - a: A, - b: B, - c: C, -} - -impl Future for Select3 -where - A: Future, - B: Future, - C: Future, -{ - type Output = Either3; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - let c = unsafe { Pin::new_unchecked(&mut this.c) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either3::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either3::Second(x)); - } - if let Poll::Ready(x) = c.poll(cx) { - return Poll::Ready(Either3::Third(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Result for [`select4`]. -#[derive(Debug, Clone)] -pub enum Either4 { - /// First future finished first. - First(A), - /// Second future finished first. - Second(B), - /// Third future finished first. - Third(C), - /// Fourth future finished first. - Fourth(D), -} - -/// Same as [`select`], but with more futures. -pub fn select4(a: A, b: B, c: C, d: D) -> Select4 -where - A: Future, - B: Future, - C: Future, - D: Future, -{ - Select4 { a, b, c, d } -} - -/// Future for the [`select4`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Select4 { - a: A, - b: B, - c: C, - d: D, -} - -impl Future for Select4 -where - A: Future, - B: Future, - C: Future, - D: Future, -{ - type Output = Either4; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; - let a = unsafe { Pin::new_unchecked(&mut this.a) }; - let b = unsafe { Pin::new_unchecked(&mut this.b) }; - let c = unsafe { Pin::new_unchecked(&mut this.c) }; - let d = unsafe { Pin::new_unchecked(&mut this.d) }; - if let Poll::Ready(x) = a.poll(cx) { - return Poll::Ready(Either4::First(x)); - } - if let Poll::Ready(x) = b.poll(cx) { - return Poll::Ready(Either4::Second(x)); - } - if let Poll::Ready(x) = c.poll(cx) { - return Poll::Ready(Either4::Third(x)); - } - if let Poll::Ready(x) = d.poll(cx) { - return Poll::Ready(Either4::Fourth(x)); - } - Poll::Pending - } -} - -// ==================================================================== - -/// Future for the [`select_all`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct SelectAll { - inner: [Fut; N], -} - -/// Creates a new future which will select over a list of futures. -/// -/// The returned future will wait for any future within `iter` to be ready. Upon -/// completion the item resolved will be returned, along with the index of the -/// future that was ready. -/// -/// # Panics -/// -/// This function will panic if the array specified contains no items. -pub fn select_all(arr: [Fut; N]) -> SelectAll { - assert!(N > 0); - SelectAll { inner: arr } -} - -impl Future for SelectAll { - type Output = (Fut::Output, usize); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safety: Since `self` is pinned, `inner` cannot move. Since `inner` cannot move, - // its elements also cannot move. Therefore it is safe to access `inner` and pin - // references to the contained futures. - let item = unsafe { - self.get_unchecked_mut() - .inner - .iter_mut() - .enumerate() - .find_map(|(i, f)| match Pin::new_unchecked(f).poll(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - }) - }; - - match item { - Some((idx, res)) => Poll::Ready((res, idx)), - None => Poll::Pending, - } - } -} diff --git a/embassy-util/src/waitqueue/mod.rs b/embassy-util/src/waitqueue/mod.rs deleted file mode 100644 index 6661a6b61..000000000 --- a/embassy-util/src/waitqueue/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Async low-level wait queues - -mod waker; -pub use waker::*; - -mod multi_waker; -pub use multi_waker::*; diff --git a/embassy-util/src/waitqueue/multi_waker.rs b/embassy-util/src/waitqueue/multi_waker.rs deleted file mode 100644 index 325d2cb3a..000000000 --- a/embassy-util/src/waitqueue/multi_waker.rs +++ /dev/null @@ -1,33 +0,0 @@ -use core::task::Waker; - -use super::WakerRegistration; - -/// Utility struct to register and wake multiple wakers. -pub struct MultiWakerRegistration { - wakers: [WakerRegistration; N], -} - -impl MultiWakerRegistration { - /// Create a new empty instance - pub const fn new() -> Self { - const WAKER: WakerRegistration = WakerRegistration::new(); - Self { wakers: [WAKER; N] } - } - - /// Register a waker. If the buffer is full the function returns it in the error - pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { - if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { - waker_slot.register(w); - Ok(()) - } else { - Err(w) - } - } - - /// Wake all registered wakers. This clears the buffer - pub fn wake(&mut self) { - for waker_slot in self.wakers.iter_mut() { - waker_slot.wake() - } - } -} diff --git a/embassy-util/src/waitqueue/waker.rs b/embassy-util/src/waitqueue/waker.rs deleted file mode 100644 index 64e300eb8..000000000 --- a/embassy-util/src/waitqueue/waker.rs +++ /dev/null @@ -1,92 +0,0 @@ -use core::cell::Cell; -use core::mem; -use core::task::Waker; - -use crate::blocking_mutex::raw::CriticalSectionRawMutex; -use crate::blocking_mutex::Mutex; - -/// Utility struct to register and wake a waker. -#[derive(Debug)] -pub struct WakerRegistration { - waker: Option, -} - -impl WakerRegistration { - /// Create a new `WakerRegistration`. - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(w)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = mem::replace(&mut self.waker, Some(w.clone())) { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake() - } - } - - /// Returns true if a waker is currently registered - pub fn occupied(&self) -> bool { - self.waker.is_some() - } -} - -/// Utility struct to register and wake a waker. -pub struct AtomicWaker { - waker: Mutex>>, -} - -impl AtomicWaker { - /// Create a new `AtomicWaker`. - pub const fn new() -> Self { - Self { - waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), - } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&self, w: &Waker) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - cell.set(match cell.replace(None) { - Some(w2) if (w2.will_wake(w)) => Some(w2), - _ => Some(w.clone()), - }) - }) - } - - /// Wake the registered waker, if any. - pub fn wake(&self) { - critical_section::with(|cs| { - let cell = self.waker.borrow(cs); - if let Some(w) = cell.replace(None) { - w.wake_by_ref(); - cell.set(Some(w)); - } - }) - } -} diff --git a/embassy-util/src/yield_now.rs b/embassy-util/src/yield_now.rs deleted file mode 100644 index 1ebecb916..000000000 --- a/embassy-util/src/yield_now.rs +++ /dev/null @@ -1,25 +0,0 @@ -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Yield from the current task once, allowing other tasks to run. -pub fn yield_now() -> impl Future { - YieldNowFuture { yielded: false } -} - -struct YieldNowFuture { - yielded: bool, -} - -impl Future for YieldNowFuture { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.yielded { - Poll::Ready(()) - } else { - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} -- cgit