diff options
Diffstat (limited to 'embassy-sync/src')
| -rw-r--r-- | embassy-sync/src/channel.rs | 4 | ||||
| -rw-r--r-- | embassy-sync/src/fmt.rs | 3 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 2 | ||||
| -rw-r--r-- | embassy-sync/src/mutex.rs | 19 | ||||
| -rw-r--r-- | embassy-sync/src/pipe.rs | 155 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/mod.rs | 186 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/publisher.rs | 17 | ||||
| -rw-r--r-- | embassy-sync/src/pubsub/subscriber.rs | 6 | ||||
| -rw-r--r-- | embassy-sync/src/signal.rs | 90 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker.rs | 41 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/atomic_waker_turbo.rs | 30 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/mod.rs | 8 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/multi_waker.rs | 49 | ||||
| -rw-r--r-- | embassy-sync/src/waitqueue/waker_registration.rs (renamed from embassy-sync/src/waitqueue/waker.rs) | 42 |
14 files changed, 441 insertions, 211 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 76f42d0e7..77352874d 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -181,6 +181,7 @@ where | |||
| 181 | } | 181 | } |
| 182 | 182 | ||
| 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. | 183 | /// Future returned by [`Channel::recv`] and [`Receiver::recv`]. |
| 184 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 184 | pub struct RecvFuture<'ch, M, T, const N: usize> | 185 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 185 | where | 186 | where |
| 186 | M: RawMutex, | 187 | M: RawMutex, |
| @@ -203,6 +204,7 @@ where | |||
| 203 | } | 204 | } |
| 204 | 205 | ||
| 205 | /// Future returned by [`DynamicReceiver::recv`]. | 206 | /// Future returned by [`DynamicReceiver::recv`]. |
| 207 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 206 | pub struct DynamicRecvFuture<'ch, T> { | 208 | pub struct DynamicRecvFuture<'ch, T> { |
| 207 | channel: &'ch dyn DynamicChannel<T>, | 209 | channel: &'ch dyn DynamicChannel<T>, |
| 208 | } | 210 | } |
| @@ -219,6 +221,7 @@ impl<'ch, T> Future for DynamicRecvFuture<'ch, T> { | |||
| 219 | } | 221 | } |
| 220 | 222 | ||
| 221 | /// Future returned by [`Channel::send`] and [`Sender::send`]. | 223 | /// Future returned by [`Channel::send`] and [`Sender::send`]. |
| 224 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 222 | pub struct SendFuture<'ch, M, T, const N: usize> | 225 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 223 | where | 226 | where |
| 224 | M: RawMutex, | 227 | M: RawMutex, |
| @@ -250,6 +253,7 @@ where | |||
| 250 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} | 253 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {} |
| 251 | 254 | ||
| 252 | /// Future returned by [`DynamicSender::send`]. | 255 | /// Future returned by [`DynamicSender::send`]. |
| 256 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 253 | pub struct DynamicSendFuture<'ch, T> { | 257 | pub struct DynamicSendFuture<'ch, T> { |
| 254 | channel: &'ch dyn DynamicChannel<T>, | 258 | channel: &'ch dyn DynamicChannel<T>, |
| 255 | message: Option<T>, | 259 | message: Option<T>, |
diff --git a/embassy-sync/src/fmt.rs b/embassy-sync/src/fmt.rs index f8bb0a035..066970813 100644 --- a/embassy-sync/src/fmt.rs +++ b/embassy-sync/src/fmt.rs | |||
| @@ -195,9 +195,6 @@ macro_rules! unwrap { | |||
| 195 | } | 195 | } |
| 196 | } | 196 | } |
| 197 | 197 | ||
| 198 | #[cfg(feature = "defmt-timestamp-uptime")] | ||
| 199 | defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } | ||
| 200 | |||
| 201 | #[derive(Debug, Copy, Clone, Eq, PartialEq)] | 198 | #[derive(Debug, Copy, Clone, Eq, PartialEq)] |
| 202 | pub struct NoneError; | 199 | pub struct NoneError; |
| 203 | 200 | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 25150e8aa..53d95d081 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] | 1 | #![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] |
| 2 | #![cfg_attr(feature = "nightly", feature(generic_associated_types, type_alias_impl_trait))] | 2 | #![cfg_attr(feature = "nightly", feature(async_fn_in_trait, impl_trait_projections))] |
| 3 | #![allow(clippy::new_without_default)] | 3 | #![allow(clippy::new_without_default)] |
| 4 | #![doc = include_str!("../README.md")] | 4 | #![doc = include_str!("../README.md")] |
| 5 | #![warn(missing_docs)] | 5 | #![warn(missing_docs)] |
diff --git a/embassy-sync/src/mutex.rs b/embassy-sync/src/mutex.rs index 75a6e8dd3..fcf056d36 100644 --- a/embassy-sync/src/mutex.rs +++ b/embassy-sync/src/mutex.rs | |||
| @@ -2,11 +2,10 @@ | |||
| 2 | //! | 2 | //! |
| 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. | 3 | //! This module provides a mutex that can be used to synchronize data between asynchronous tasks. |
| 4 | use core::cell::{RefCell, UnsafeCell}; | 4 | use core::cell::{RefCell, UnsafeCell}; |
| 5 | use core::future::poll_fn; | ||
| 5 | use core::ops::{Deref, DerefMut}; | 6 | use core::ops::{Deref, DerefMut}; |
| 6 | use core::task::Poll; | 7 | use core::task::Poll; |
| 7 | 8 | ||
| 8 | use futures_util::future::poll_fn; | ||
| 9 | |||
| 10 | use crate::blocking_mutex::raw::RawMutex; | 9 | use crate::blocking_mutex::raw::RawMutex; |
| 11 | use crate::blocking_mutex::Mutex as BlockingMutex; | 10 | use crate::blocking_mutex::Mutex as BlockingMutex; |
| 12 | use crate::waitqueue::WakerRegistration; | 11 | use crate::waitqueue::WakerRegistration; |
| @@ -111,6 +110,22 @@ where | |||
| 111 | 110 | ||
| 112 | Ok(MutexGuard { mutex: self }) | 111 | Ok(MutexGuard { mutex: self }) |
| 113 | } | 112 | } |
| 113 | |||
| 114 | /// Consumes this mutex, returning the underlying data. | ||
| 115 | pub fn into_inner(self) -> T | ||
| 116 | where | ||
| 117 | T: Sized, | ||
| 118 | { | ||
| 119 | self.inner.into_inner() | ||
| 120 | } | ||
| 121 | |||
| 122 | /// Returns a mutable reference to the underlying data. | ||
| 123 | /// | ||
| 124 | /// Since this call borrows the Mutex mutably, no actual locking needs to | ||
| 125 | /// take place -- the mutable borrow statically guarantees no locks exist. | ||
| 126 | pub fn get_mut(&mut self) -> &mut T { | ||
| 127 | self.inner.get_mut() | ||
| 128 | } | ||
| 114 | } | 129 | } |
| 115 | 130 | ||
| 116 | /// Async mutex guard. | 131 | /// Async mutex guard. |
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 7d64b648e..13bf4ef01 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs | |||
| @@ -32,22 +32,23 @@ impl<'p, M, const N: usize> Writer<'p, M, N> | |||
| 32 | where | 32 | where |
| 33 | M: RawMutex, | 33 | M: RawMutex, |
| 34 | { | 34 | { |
| 35 | /// Writes a value. | 35 | /// Write some bytes to the pipe. |
| 36 | /// | 36 | /// |
| 37 | /// See [`Pipe::write()`] | 37 | /// See [`Pipe::write()`] |
| 38 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | 38 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { |
| 39 | self.pipe.write(buf) | 39 | self.pipe.write(buf) |
| 40 | } | 40 | } |
| 41 | 41 | ||
| 42 | /// Attempt to immediately write a message. | 42 | /// Attempt to immediately write some bytes to the pipe. |
| 43 | /// | 43 | /// |
| 44 | /// See [`Pipe::write()`] | 44 | /// See [`Pipe::try_write()`] |
| 45 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | 45 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { |
| 46 | self.pipe.try_write(buf) | 46 | self.pipe.try_write(buf) |
| 47 | } | 47 | } |
| 48 | } | 48 | } |
| 49 | 49 | ||
| 50 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. | 50 | /// Future returned by [`Pipe::write`] and [`Writer::write`]. |
| 51 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 51 | pub struct WriteFuture<'p, M, const N: usize> | 52 | pub struct WriteFuture<'p, M, const N: usize> |
| 52 | where | 53 | where |
| 53 | M: RawMutex, | 54 | M: RawMutex, |
| @@ -94,22 +95,23 @@ impl<'p, M, const N: usize> Reader<'p, M, N> | |||
| 94 | where | 95 | where |
| 95 | M: RawMutex, | 96 | M: RawMutex, |
| 96 | { | 97 | { |
| 97 | /// Reads a value. | 98 | /// Read some bytes from the pipe. |
| 98 | /// | 99 | /// |
| 99 | /// See [`Pipe::read()`] | 100 | /// See [`Pipe::read()`] |
| 100 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | 101 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { |
| 101 | self.pipe.read(buf) | 102 | self.pipe.read(buf) |
| 102 | } | 103 | } |
| 103 | 104 | ||
| 104 | /// Attempt to immediately read a message. | 105 | /// Attempt to immediately read some bytes from the pipe. |
| 105 | /// | 106 | /// |
| 106 | /// See [`Pipe::read()`] | 107 | /// See [`Pipe::try_read()`] |
| 107 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | 108 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { |
| 108 | self.pipe.try_read(buf) | 109 | self.pipe.try_read(buf) |
| 109 | } | 110 | } |
| 110 | } | 111 | } |
| 111 | 112 | ||
| 112 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | 113 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. |
| 114 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 113 | pub struct ReadFuture<'p, M, const N: usize> | 115 | pub struct ReadFuture<'p, M, const N: usize> |
| 114 | where | 116 | where |
| 115 | M: RawMutex, | 117 | M: RawMutex, |
| @@ -219,12 +221,11 @@ impl<const N: usize> PipeState<N> { | |||
| 219 | } | 221 | } |
| 220 | } | 222 | } |
| 221 | 223 | ||
| 222 | /// A bounded pipe for communicating between asynchronous tasks | 224 | /// A bounded byte-oriented pipe for communicating between asynchronous tasks |
| 223 | /// with backpressure. | 225 | /// with backpressure. |
| 224 | /// | 226 | /// |
| 225 | /// The pipe will buffer up to the provided number of messages. Once the | 227 | /// The pipe will buffer up to the provided number of bytes. Once the |
| 226 | /// buffer is full, attempts to `write` new messages will wait until a message is | 228 | /// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up. |
| 227 | /// read from the pipe. | ||
| 228 | /// | 229 | /// |
| 229 | /// All data written will become available in the same order as it was written. | 230 | /// All data written will become available in the same order as it was written. |
| 230 | pub struct Pipe<M, const N: usize> | 231 | pub struct Pipe<M, const N: usize> |
| @@ -275,40 +276,66 @@ where | |||
| 275 | Reader { pipe: self } | 276 | Reader { pipe: self } |
| 276 | } | 277 | } |
| 277 | 278 | ||
| 278 | /// Write a value, waiting until there is capacity. | 279 | /// Write some bytes to the pipe. |
| 280 | /// | ||
| 281 | /// This method writes a nonzero amount of bytes from `buf` into the pipe, and | ||
| 282 | /// returns the amount of bytes written. | ||
| 279 | /// | 283 | /// |
| 280 | /// Writeing completes when the value has been pushed to the pipe's queue. | 284 | /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full, |
| 281 | /// This doesn't mean the value has been read yet. | 285 | /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that |
| 286 | /// returns an error instead of waiting. | ||
| 287 | /// | ||
| 288 | /// It is not guaranteed that all bytes in the buffer are written, even if there's enough | ||
| 289 | /// free space in the pipe buffer for all. In other words, it is possible for `write` to return | ||
| 290 | /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave | ||
| 291 | /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like | ||
| 292 | /// `write_all` from the `embedded-io` crate. | ||
| 282 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { | 293 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> { |
| 283 | WriteFuture { pipe: self, buf } | 294 | WriteFuture { pipe: self, buf } |
| 284 | } | 295 | } |
| 285 | 296 | ||
| 286 | /// Attempt to immediately write a message. | 297 | /// Write all bytes to the pipe. |
| 287 | /// | ||
| 288 | /// This method differs from [`write`](Pipe::write) by returning immediately if the pipe's | ||
| 289 | /// buffer is full, instead of waiting. | ||
| 290 | /// | 298 | /// |
| 291 | /// # Errors | 299 | /// This method writes all bytes from `buf` into the pipe |
| 300 | pub async fn write_all(&self, mut buf: &[u8]) { | ||
| 301 | while !buf.is_empty() { | ||
| 302 | let n = self.write(buf).await; | ||
| 303 | buf = &buf[n..]; | ||
| 304 | } | ||
| 305 | } | ||
| 306 | |||
| 307 | /// Attempt to immediately write some bytes to the pipe. | ||
| 292 | /// | 308 | /// |
| 293 | /// If the pipe capacity has been reached, i.e., the pipe has `n` | 309 | /// This method will either write a nonzero amount of bytes to the pipe immediately, |
| 294 | /// buffered values where `n` is the argument passed to [`Pipe`], then an | 310 | /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant |
| 295 | /// error is returned. | 311 | /// that waits instead of returning an error. |
| 296 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | 312 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { |
| 297 | self.lock(|c| c.try_write(buf)) | 313 | self.lock(|c| c.try_write(buf)) |
| 298 | } | 314 | } |
| 299 | 315 | ||
| 300 | /// Receive the next value. | 316 | /// Read some bytes from the pipe. |
| 317 | /// | ||
| 318 | /// This method reads a nonzero amount of bytes from the pipe into `buf` and | ||
| 319 | /// returns the amount of bytes read. | ||
| 320 | /// | ||
| 321 | /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty, | ||
| 322 | /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that | ||
| 323 | /// returns an error instead of waiting. | ||
| 301 | /// | 324 | /// |
| 302 | /// If there are no messages in the pipe's buffer, this method will | 325 | /// It is not guaranteed that all bytes in the buffer are read, even if there's enough |
| 303 | /// wait until a message is written. | 326 | /// space in `buf` for all. In other words, it is possible for `read` to return |
| 327 | /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes | ||
| 328 | /// in the pipe buffer. You should always `read` in a loop, or use helpers like | ||
| 329 | /// `read_exact` from the `embedded-io` crate. | ||
| 304 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { | 330 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> { |
| 305 | ReadFuture { pipe: self, buf } | 331 | ReadFuture { pipe: self, buf } |
| 306 | } | 332 | } |
| 307 | 333 | ||
| 308 | /// Attempt to immediately read a message. | 334 | /// Attempt to immediately read some bytes from the pipe. |
| 309 | /// | 335 | /// |
| 310 | /// This method will either read a message from the pipe immediately or return an error | 336 | /// This method will either read a nonzero amount of bytes from the pipe immediately, |
| 311 | /// if the pipe is empty. | 337 | /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant |
| 338 | /// that waits instead of returning an error. | ||
| 312 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | 339 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { |
| 313 | self.lock(|c| c.try_read(buf)) | 340 | self.lock(|c| c.try_read(buf)) |
| 314 | } | 341 | } |
| @@ -352,8 +379,6 @@ where | |||
| 352 | mod io_impls { | 379 | mod io_impls { |
| 353 | use core::convert::Infallible; | 380 | use core::convert::Infallible; |
| 354 | 381 | ||
| 355 | use futures_util::FutureExt; | ||
| 356 | |||
| 357 | use super::*; | 382 | use super::*; |
| 358 | 383 | ||
| 359 | impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { | 384 | impl<M: RawMutex, const N: usize> embedded_io::Io for Pipe<M, N> { |
| @@ -361,30 +386,18 @@ mod io_impls { | |||
| 361 | } | 386 | } |
| 362 | 387 | ||
| 363 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { | 388 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Pipe<M, N> { |
| 364 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 389 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 365 | where | 390 | Ok(Pipe::read(self, buf).await) |
| 366 | Self: 'a; | ||
| 367 | |||
| 368 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 369 | Pipe::read(self, buf).map(Ok) | ||
| 370 | } | 391 | } |
| 371 | } | 392 | } |
| 372 | 393 | ||
| 373 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { | 394 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Pipe<M, N> { |
| 374 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 395 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 375 | where | 396 | Ok(Pipe::write(self, buf).await) |
| 376 | Self: 'a; | ||
| 377 | |||
| 378 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 379 | Pipe::write(self, buf).map(Ok) | ||
| 380 | } | 397 | } |
| 381 | 398 | ||
| 382 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 399 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 383 | where | 400 | Ok(()) |
| 384 | Self: 'a; | ||
| 385 | |||
| 386 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 387 | futures_util::future::ready(Ok(())) | ||
| 388 | } | 401 | } |
| 389 | } | 402 | } |
| 390 | 403 | ||
| @@ -393,30 +406,18 @@ mod io_impls { | |||
| 393 | } | 406 | } |
| 394 | 407 | ||
| 395 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { | 408 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for &Pipe<M, N> { |
| 396 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 409 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 397 | where | 410 | Ok(Pipe::read(self, buf).await) |
| 398 | Self: 'a; | ||
| 399 | |||
| 400 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 401 | Pipe::read(self, buf).map(Ok) | ||
| 402 | } | 411 | } |
| 403 | } | 412 | } |
| 404 | 413 | ||
| 405 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { | 414 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for &Pipe<M, N> { |
| 406 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 415 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 407 | where | 416 | Ok(Pipe::write(self, buf).await) |
| 408 | Self: 'a; | ||
| 409 | |||
| 410 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 411 | Pipe::write(self, buf).map(Ok) | ||
| 412 | } | 417 | } |
| 413 | 418 | ||
| 414 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 419 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 415 | where | 420 | Ok(()) |
| 416 | Self: 'a; | ||
| 417 | |||
| 418 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 419 | futures_util::future::ready(Ok(())) | ||
| 420 | } | 421 | } |
| 421 | } | 422 | } |
| 422 | 423 | ||
| @@ -425,12 +426,8 @@ mod io_impls { | |||
| 425 | } | 426 | } |
| 426 | 427 | ||
| 427 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { | 428 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Read for Reader<'_, M, N> { |
| 428 | type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 429 | async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 429 | where | 430 | Ok(Reader::read(self, buf).await) |
| 430 | Self: 'a; | ||
| 431 | |||
| 432 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||
| 433 | Reader::read(self, buf).map(Ok) | ||
| 434 | } | 431 | } |
| 435 | } | 432 | } |
| 436 | 433 | ||
| @@ -439,20 +436,12 @@ mod io_impls { | |||
| 439 | } | 436 | } |
| 440 | 437 | ||
| 441 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { | 438 | impl<M: RawMutex, const N: usize> embedded_io::asynch::Write for Writer<'_, M, N> { |
| 442 | type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | 439 | async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 443 | where | 440 | Ok(Writer::write(self, buf).await) |
| 444 | Self: 'a; | ||
| 445 | |||
| 446 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||
| 447 | Writer::write(self, buf).map(Ok) | ||
| 448 | } | 441 | } |
| 449 | 442 | ||
| 450 | type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | 443 | async fn flush(&mut self) -> Result<(), Self::Error> { |
| 451 | where | 444 | Ok(()) |
| 452 | Self: 'a; | ||
| 453 | |||
| 454 | fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||
| 455 | futures_util::future::ready(Ok(())) | ||
| 456 | } | 445 | } |
| 457 | } | 446 | } |
| 458 | } | 447 | } |
diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 62a9e4763..6afd54af5 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs | |||
| @@ -4,7 +4,7 @@ | |||
| 4 | 4 | ||
| 5 | use core::cell::RefCell; | 5 | use core::cell::RefCell; |
| 6 | use core::fmt::Debug; | 6 | use core::fmt::Debug; |
| 7 | use core::task::{Context, Poll, Waker}; | 7 | use core::task::{Context, Poll}; |
| 8 | 8 | ||
| 9 | use heapless::Deque; | 9 | use heapless::Deque; |
| 10 | 10 | ||
| @@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 179 | // No, so we need to reregister our waker and sleep again | 179 | // No, so we need to reregister our waker and sleep again |
| 180 | None => { | 180 | None => { |
| 181 | if let Some(cx) = cx { | 181 | if let Some(cx) = cx { |
| 182 | s.register_subscriber_waker(cx.waker()); | 182 | s.subscriber_wakers.register(cx.waker()); |
| 183 | } | 183 | } |
| 184 | Poll::Pending | 184 | Poll::Pending |
| 185 | } | 185 | } |
| @@ -192,6 +192,10 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 192 | }) | 192 | }) |
| 193 | } | 193 | } |
| 194 | 194 | ||
| 195 | fn available(&self, next_message_id: u64) -> u64 { | ||
| 196 | self.inner.lock(|s| s.borrow().next_message_id - next_message_id) | ||
| 197 | } | ||
| 198 | |||
| 195 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 199 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 196 | self.inner.lock(|s| { | 200 | self.inner.lock(|s| { |
| 197 | let mut s = s.borrow_mut(); | 201 | let mut s = s.borrow_mut(); |
| @@ -202,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 202 | // The queue is full, so we need to reregister our waker and go to sleep | 206 | // The queue is full, so we need to reregister our waker and go to sleep |
| 203 | Err(message) => { | 207 | Err(message) => { |
| 204 | if let Some(cx) = cx { | 208 | if let Some(cx) = cx { |
| 205 | s.register_publisher_waker(cx.waker()); | 209 | s.publisher_wakers.register(cx.waker()); |
| 206 | } | 210 | } |
| 207 | Err(message) | 211 | Err(message) |
| 208 | } | 212 | } |
| @@ -217,6 +221,13 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 221 | }) |
| 218 | } | 222 | } |
| 219 | 223 | ||
| 224 | fn space(&self) -> usize { | ||
| 225 | self.inner.lock(|s| { | ||
| 226 | let s = s.borrow(); | ||
| 227 | s.queue.capacity() - s.queue.len() | ||
| 228 | }) | ||
| 229 | } | ||
| 230 | |||
| 220 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { | 231 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 221 | self.inner.lock(|s| { | 232 | self.inner.lock(|s| { |
| 222 | let mut s = s.borrow_mut(); | 233 | let mut s = s.borrow_mut(); |
| @@ -311,44 +322,19 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 311 | 322 | ||
| 312 | // We're reading this item, so decrement the counter | 323 | // We're reading this item, so decrement the counter |
| 313 | queue_item.1 -= 1; | 324 | queue_item.1 -= 1; |
| 314 | let message = queue_item.0.clone(); | ||
| 315 | 325 | ||
| 316 | if current_message_index == 0 && queue_item.1 == 0 { | 326 | let message = if current_message_index == 0 && queue_item.1 == 0 { |
| 317 | self.queue.pop_front(); | 327 | let (message, _) = self.queue.pop_front().unwrap(); |
| 318 | self.publisher_wakers.wake(); | 328 | self.publisher_wakers.wake(); |
| 319 | } | 329 | // Return pop'd message without clone |
| 330 | message | ||
| 331 | } else { | ||
| 332 | queue_item.0.clone() | ||
| 333 | }; | ||
| 320 | 334 | ||
| 321 | Some(WaitResult::Message(message)) | 335 | Some(WaitResult::Message(message)) |
| 322 | } | 336 | } |
| 323 | 337 | ||
| 324 | fn register_subscriber_waker(&mut self, waker: &Waker) { | ||
| 325 | match self.subscriber_wakers.register(waker) { | ||
| 326 | Ok(()) => {} | ||
| 327 | Err(_) => { | ||
| 328 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. | ||
| 329 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 330 | // Any future that is still active will simply reregister. | ||
| 331 | // This won't happen a lot, so it's ok. | ||
| 332 | self.subscriber_wakers.wake(); | ||
| 333 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 334 | } | ||
| 335 | } | ||
| 336 | } | ||
| 337 | |||
| 338 | fn register_publisher_waker(&mut self, waker: &Waker) { | ||
| 339 | match self.publisher_wakers.register(waker) { | ||
| 340 | Ok(()) => {} | ||
| 341 | Err(_) => { | ||
| 342 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 343 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 344 | // Any future that is still active will simply reregister. | ||
| 345 | // This won't happen a lot, so it's ok. | ||
| 346 | self.publisher_wakers.wake(); | ||
| 347 | self.publisher_wakers.register(waker).unwrap(); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | } | ||
| 351 | |||
| 352 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { | 338 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { |
| 353 | self.subscriber_count -= 1; | 339 | self.subscriber_count -= 1; |
| 354 | 340 | ||
| @@ -360,6 +346,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 360 | .iter_mut() | 346 | .iter_mut() |
| 361 | .skip(current_message_index) | 347 | .skip(current_message_index) |
| 362 | .for_each(|(_, counter)| *counter -= 1); | 348 | .for_each(|(_, counter)| *counter -= 1); |
| 349 | |||
| 350 | let mut wake_publishers = false; | ||
| 351 | while let Some((_, count)) = self.queue.front() { | ||
| 352 | if *count == 0 { | ||
| 353 | self.queue.pop_front().unwrap(); | ||
| 354 | wake_publishers = true; | ||
| 355 | } else { | ||
| 356 | break; | ||
| 357 | } | ||
| 358 | } | ||
| 359 | |||
| 360 | if wake_publishers { | ||
| 361 | self.publisher_wakers.wake(); | ||
| 362 | } | ||
| 363 | } | 363 | } |
| 364 | } | 364 | } |
| 365 | 365 | ||
| @@ -388,6 +388,10 @@ pub trait PubSubBehavior<T> { | |||
| 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. | 388 | /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. |
| 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; | 389 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 390 | 390 | ||
| 391 | /// Get the amount of messages that are between the given the next_message_id and the most recent message. | ||
| 392 | /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged. | ||
| 393 | fn available(&self, next_message_id: u64) -> u64; | ||
| 394 | |||
| 391 | /// Try to publish a message to the queue. | 395 | /// Try to publish a message to the queue. |
| 392 | /// | 396 | /// |
| 393 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. | 397 | /// If the queue is full and a context is given, then its waker is registered in the publisher wakers. |
| @@ -396,6 +400,9 @@ pub trait PubSubBehavior<T> { | |||
| 396 | /// Publish a message immediately | 400 | /// Publish a message immediately |
| 397 | fn publish_immediate(&self, message: T); | 401 | fn publish_immediate(&self, message: T); |
| 398 | 402 | ||
| 403 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 404 | fn space(&self) -> usize; | ||
| 405 | |||
| 399 | /// Let the channel know that a subscriber has dropped | 406 | /// Let the channel know that a subscriber has dropped |
| 400 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); | 407 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 401 | 408 | ||
| @@ -539,4 +546,113 @@ mod tests { | |||
| 539 | 546 | ||
| 540 | drop(sub0); | 547 | drop(sub0); |
| 541 | } | 548 | } |
| 549 | |||
| 550 | #[futures_test::test] | ||
| 551 | async fn correct_available() { | ||
| 552 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 553 | |||
| 554 | let sub0 = channel.subscriber().unwrap(); | ||
| 555 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 556 | let pub0 = channel.publisher().unwrap(); | ||
| 557 | |||
| 558 | assert_eq!(sub0.available(), 0); | ||
| 559 | assert_eq!(sub1.available(), 0); | ||
| 560 | |||
| 561 | pub0.publish(42).await; | ||
| 562 | |||
| 563 | assert_eq!(sub0.available(), 1); | ||
| 564 | assert_eq!(sub1.available(), 1); | ||
| 565 | |||
| 566 | sub1.next_message().await; | ||
| 567 | |||
| 568 | assert_eq!(sub1.available(), 0); | ||
| 569 | |||
| 570 | pub0.publish(42).await; | ||
| 571 | |||
| 572 | assert_eq!(sub0.available(), 2); | ||
| 573 | assert_eq!(sub1.available(), 1); | ||
| 574 | } | ||
| 575 | |||
| 576 | #[futures_test::test] | ||
| 577 | async fn correct_space() { | ||
| 578 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 579 | |||
| 580 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 581 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 582 | let pub0 = channel.publisher().unwrap(); | ||
| 583 | |||
| 584 | assert_eq!(pub0.space(), 4); | ||
| 585 | |||
| 586 | pub0.publish(42).await; | ||
| 587 | |||
| 588 | assert_eq!(pub0.space(), 3); | ||
| 589 | |||
| 590 | pub0.publish(42).await; | ||
| 591 | |||
| 592 | assert_eq!(pub0.space(), 2); | ||
| 593 | |||
| 594 | sub0.next_message().await; | ||
| 595 | sub0.next_message().await; | ||
| 596 | |||
| 597 | assert_eq!(pub0.space(), 2); | ||
| 598 | |||
| 599 | sub1.next_message().await; | ||
| 600 | assert_eq!(pub0.space(), 3); | ||
| 601 | sub1.next_message().await; | ||
| 602 | assert_eq!(pub0.space(), 4); | ||
| 603 | } | ||
| 604 | |||
| 605 | #[futures_test::test] | ||
| 606 | async fn empty_channel_when_last_subscriber_is_dropped() { | ||
| 607 | let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new(); | ||
| 608 | |||
| 609 | let pub0 = channel.publisher().unwrap(); | ||
| 610 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 611 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 612 | |||
| 613 | assert_eq!(4, pub0.space()); | ||
| 614 | |||
| 615 | pub0.publish(1).await; | ||
| 616 | pub0.publish(2).await; | ||
| 617 | |||
| 618 | assert_eq!(2, channel.space()); | ||
| 619 | |||
| 620 | assert_eq!(1, sub0.try_next_message_pure().unwrap()); | ||
| 621 | assert_eq!(2, sub0.try_next_message_pure().unwrap()); | ||
| 622 | |||
| 623 | assert_eq!(2, channel.space()); | ||
| 624 | |||
| 625 | drop(sub0); | ||
| 626 | |||
| 627 | assert_eq!(2, channel.space()); | ||
| 628 | |||
| 629 | assert_eq!(1, sub1.try_next_message_pure().unwrap()); | ||
| 630 | |||
| 631 | assert_eq!(3, channel.space()); | ||
| 632 | |||
| 633 | drop(sub1); | ||
| 634 | |||
| 635 | assert_eq!(4, channel.space()); | ||
| 636 | } | ||
| 637 | |||
| 638 | struct CloneCallCounter(usize); | ||
| 639 | |||
| 640 | impl Clone for CloneCallCounter { | ||
| 641 | fn clone(&self) -> Self { | ||
| 642 | Self(self.0 + 1) | ||
| 643 | } | ||
| 644 | } | ||
| 645 | |||
| 646 | #[futures_test::test] | ||
| 647 | async fn skip_clone_for_last_message() { | ||
| 648 | let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new(); | ||
| 649 | let pub0 = channel.publisher().unwrap(); | ||
| 650 | let mut sub0 = channel.subscriber().unwrap(); | ||
| 651 | let mut sub1 = channel.subscriber().unwrap(); | ||
| 652 | |||
| 653 | pub0.publish(CloneCallCounter(0)).await; | ||
| 654 | |||
| 655 | assert_eq!(1, sub0.try_next_message_pure().unwrap().0); | ||
| 656 | assert_eq!(0, sub1.try_next_message_pure().unwrap().0); | ||
| 657 | } | ||
| 542 | } | 658 | } |
diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index 705797f60..e1edc9eb9 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs | |||
| @@ -42,6 +42,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> { | |||
| 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 42 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 43 | self.channel.publish_with_context(message, None) | 43 | self.channel.publish_with_context(message, None) |
| 44 | } | 44 | } |
| 45 | |||
| 46 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 47 | /// | ||
| 48 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 49 | /// So checking doesn't give any guarantees.* | ||
| 50 | pub fn space(&self) -> usize { | ||
| 51 | self.channel.space() | ||
| 52 | } | ||
| 45 | } | 53 | } |
| 46 | 54 | ||
| 47 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { | 55 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> { |
| @@ -115,6 +123,14 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { | |||
| 115 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 123 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 116 | self.channel.publish_with_context(message, None) | 124 | self.channel.publish_with_context(message, None) |
| 117 | } | 125 | } |
| 126 | |||
| 127 | /// The amount of messages that can still be published without having to wait or without having to lag the subscribers | ||
| 128 | /// | ||
| 129 | /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. | ||
| 130 | /// So checking doesn't give any guarantees.* | ||
| 131 | pub fn space(&self) -> usize { | ||
| 132 | self.channel.space() | ||
| 133 | } | ||
| 118 | } | 134 | } |
| 119 | 135 | ||
| 120 | /// An immediate publisher that holds a dynamic reference to the channel | 136 | /// An immediate publisher that holds a dynamic reference to the channel |
| @@ -158,6 +174,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 158 | } | 174 | } |
| 159 | 175 | ||
| 160 | /// Future for the publisher wait action | 176 | /// Future for the publisher wait action |
| 177 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 161 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 178 | pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 162 | /// The message we need to publish | 179 | /// The message we need to publish |
| 163 | message: Option<T>, | 180 | message: Option<T>, |
diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index b9a2cbe18..f420a75f0 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs | |||
| @@ -64,6 +64,11 @@ impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> { | |||
| 64 | } | 64 | } |
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | |||
| 68 | /// The amount of messages this subscriber hasn't received yet | ||
| 69 | pub fn available(&self) -> u64 { | ||
| 70 | self.channel.available(self.next_message_id) | ||
| 71 | } | ||
| 67 | } | 72 | } |
| 68 | 73 | ||
| 69 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { | 74 | impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> { |
| @@ -135,6 +140,7 @@ impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: | |||
| 135 | } | 140 | } |
| 136 | 141 | ||
| 137 | /// Future for the subscriber wait action | 142 | /// Future for the subscriber wait action |
| 143 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 138 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { | 144 | pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> { |
| 139 | subscriber: &'s mut Sub<'a, PSB, T>, | 145 | subscriber: &'s mut Sub<'a, PSB, T>, |
| 140 | } | 146 | } |
diff --git a/embassy-sync/src/signal.rs b/embassy-sync/src/signal.rs index f6ebeb9b9..bea67d8be 100644 --- a/embassy-sync/src/signal.rs +++ b/embassy-sync/src/signal.rs | |||
| @@ -1,9 +1,11 @@ | |||
| 1 | //! A synchronization primitive for passing the latest value to a task. | 1 | //! A synchronization primitive for passing the latest value to a task. |
| 2 | use core::cell::UnsafeCell; | 2 | use core::cell::Cell; |
| 3 | use core::future::Future; | 3 | use core::future::{poll_fn, Future}; |
| 4 | use core::mem; | ||
| 5 | use core::task::{Context, Poll, Waker}; | 4 | use core::task::{Context, Poll, Waker}; |
| 6 | 5 | ||
| 6 | use crate::blocking_mutex::raw::RawMutex; | ||
| 7 | use crate::blocking_mutex::Mutex; | ||
| 8 | |||
| 7 | /// Single-slot signaling primitive. | 9 | /// Single-slot signaling primitive. |
| 8 | /// | 10 | /// |
| 9 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except | 11 | /// This is similar to a [`Channel`](crate::channel::Channel) with a buffer size of 1, except |
| @@ -20,16 +22,20 @@ use core::task::{Context, Poll, Waker}; | |||
| 20 | /// | 22 | /// |
| 21 | /// ``` | 23 | /// ``` |
| 22 | /// use embassy_sync::signal::Signal; | 24 | /// use embassy_sync::signal::Signal; |
| 25 | /// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 23 | /// | 26 | /// |
| 24 | /// enum SomeCommand { | 27 | /// enum SomeCommand { |
| 25 | /// On, | 28 | /// On, |
| 26 | /// Off, | 29 | /// Off, |
| 27 | /// } | 30 | /// } |
| 28 | /// | 31 | /// |
| 29 | /// static SOME_SIGNAL: Signal<SomeCommand> = Signal::new(); | 32 | /// static SOME_SIGNAL: Signal<CriticalSectionRawMutex, SomeCommand> = Signal::new(); |
| 30 | /// ``` | 33 | /// ``` |
| 31 | pub struct Signal<T> { | 34 | pub struct Signal<M, T> |
| 32 | state: UnsafeCell<State<T>>, | 35 | where |
| 36 | M: RawMutex, | ||
| 37 | { | ||
| 38 | state: Mutex<M, Cell<State<T>>>, | ||
| 33 | } | 39 | } |
| 34 | 40 | ||
| 35 | enum State<T> { | 41 | enum State<T> { |
| @@ -38,24 +44,36 @@ enum State<T> { | |||
| 38 | Signaled(T), | 44 | Signaled(T), |
| 39 | } | 45 | } |
| 40 | 46 | ||
| 41 | unsafe impl<T: Send> Send for Signal<T> {} | 47 | impl<M, T> Signal<M, T> |
| 42 | unsafe impl<T: Send> Sync for Signal<T> {} | 48 | where |
| 43 | 49 | M: RawMutex, | |
| 44 | impl<T> Signal<T> { | 50 | { |
| 45 | /// Create a new `Signal`. | 51 | /// Create a new `Signal`. |
| 46 | pub const fn new() -> Self { | 52 | pub const fn new() -> Self { |
| 47 | Self { | 53 | Self { |
| 48 | state: UnsafeCell::new(State::None), | 54 | state: Mutex::new(Cell::new(State::None)), |
| 49 | } | 55 | } |
| 50 | } | 56 | } |
| 51 | } | 57 | } |
| 52 | 58 | ||
| 53 | impl<T: Send> Signal<T> { | 59 | impl<M, T> Default for Signal<M, T> |
| 60 | where | ||
| 61 | M: RawMutex, | ||
| 62 | { | ||
| 63 | fn default() -> Self { | ||
| 64 | Self::new() | ||
| 65 | } | ||
| 66 | } | ||
| 67 | |||
| 68 | impl<M, T: Send> Signal<M, T> | ||
| 69 | where | ||
| 70 | M: RawMutex, | ||
| 71 | { | ||
| 54 | /// Mark this Signal as signaled. | 72 | /// Mark this Signal as signaled. |
| 55 | pub fn signal(&self, val: T) { | 73 | pub fn signal(&self, val: T) { |
| 56 | critical_section::with(|_| unsafe { | 74 | self.state.lock(|cell| { |
| 57 | let state = &mut *self.state.get(); | 75 | let state = cell.replace(State::Signaled(val)); |
| 58 | if let State::Waiting(waker) = mem::replace(state, State::Signaled(val)) { | 76 | if let State::Waiting(waker) = state { |
| 59 | waker.wake(); | 77 | waker.wake(); |
| 60 | } | 78 | } |
| 61 | }) | 79 | }) |
| @@ -63,38 +81,46 @@ impl<T: Send> Signal<T> { | |||
| 63 | 81 | ||
| 64 | /// Remove the queued value in this `Signal`, if any. | 82 | /// Remove the queued value in this `Signal`, if any. |
| 65 | pub fn reset(&self) { | 83 | pub fn reset(&self) { |
| 66 | critical_section::with(|_| unsafe { | 84 | self.state.lock(|cell| cell.set(State::None)); |
| 67 | let state = &mut *self.state.get(); | ||
| 68 | *state = State::None | ||
| 69 | }) | ||
| 70 | } | 85 | } |
| 71 | 86 | ||
| 72 | /// Manually poll the Signal future. | 87 | fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { |
| 73 | pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<T> { | 88 | self.state.lock(|cell| { |
| 74 | critical_section::with(|_| unsafe { | 89 | let state = cell.replace(State::None); |
| 75 | let state = &mut *self.state.get(); | ||
| 76 | match state { | 90 | match state { |
| 77 | State::None => { | 91 | State::None => { |
| 78 | *state = State::Waiting(cx.waker().clone()); | 92 | cell.set(State::Waiting(cx.waker().clone())); |
| 93 | Poll::Pending | ||
| 94 | } | ||
| 95 | State::Waiting(w) if w.will_wake(cx.waker()) => { | ||
| 96 | cell.set(State::Waiting(w)); | ||
| 97 | Poll::Pending | ||
| 98 | } | ||
| 99 | State::Waiting(w) => { | ||
| 100 | cell.set(State::Waiting(cx.waker().clone())); | ||
| 101 | w.wake(); | ||
| 79 | Poll::Pending | 102 | Poll::Pending |
| 80 | } | 103 | } |
| 81 | State::Waiting(w) if w.will_wake(cx.waker()) => Poll::Pending, | 104 | State::Signaled(res) => Poll::Ready(res), |
| 82 | State::Waiting(_) => panic!("waker overflow"), | ||
| 83 | State::Signaled(_) => match mem::replace(state, State::None) { | ||
| 84 | State::Signaled(res) => Poll::Ready(res), | ||
| 85 | _ => unreachable!(), | ||
| 86 | }, | ||
| 87 | } | 105 | } |
| 88 | }) | 106 | }) |
| 89 | } | 107 | } |
| 90 | 108 | ||
| 91 | /// Future that completes when this Signal has been signaled. | 109 | /// Future that completes when this Signal has been signaled. |
| 92 | pub fn wait(&self) -> impl Future<Output = T> + '_ { | 110 | pub fn wait(&self) -> impl Future<Output = T> + '_ { |
| 93 | futures_util::future::poll_fn(move |cx| self.poll_wait(cx)) | 111 | poll_fn(move |cx| self.poll_wait(cx)) |
| 94 | } | 112 | } |
| 95 | 113 | ||
| 96 | /// non-blocking method to check whether this signal has been signaled. | 114 | /// non-blocking method to check whether this signal has been signaled. |
| 97 | pub fn signaled(&self) -> bool { | 115 | pub fn signaled(&self) -> bool { |
| 98 | critical_section::with(|_| matches!(unsafe { &*self.state.get() }, State::Signaled(_))) | 116 | self.state.lock(|cell| { |
| 117 | let state = cell.replace(State::None); | ||
| 118 | |||
| 119 | let res = matches!(state, State::Signaled(_)); | ||
| 120 | |||
| 121 | cell.set(state); | ||
| 122 | |||
| 123 | res | ||
| 124 | }) | ||
| 99 | } | 125 | } |
| 100 | } | 126 | } |
diff --git a/embassy-sync/src/waitqueue/atomic_waker.rs b/embassy-sync/src/waitqueue/atomic_waker.rs new file mode 100644 index 000000000..63fe04a6e --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker.rs | |||
| @@ -0,0 +1,41 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::task::Waker; | ||
| 3 | |||
| 4 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 5 | use crate::blocking_mutex::Mutex; | ||
| 6 | |||
| 7 | /// Utility struct to register and wake a waker. | ||
| 8 | pub struct AtomicWaker { | ||
| 9 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | ||
| 10 | } | ||
| 11 | |||
| 12 | impl AtomicWaker { | ||
| 13 | /// Create a new `AtomicWaker`. | ||
| 14 | pub const fn new() -> Self { | ||
| 15 | Self { | ||
| 16 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | ||
| 17 | } | ||
| 18 | } | ||
| 19 | |||
| 20 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 21 | pub fn register(&self, w: &Waker) { | ||
| 22 | critical_section::with(|cs| { | ||
| 23 | let cell = self.waker.borrow(cs); | ||
| 24 | cell.set(match cell.replace(None) { | ||
| 25 | Some(w2) if (w2.will_wake(w)) => Some(w2), | ||
| 26 | _ => Some(w.clone()), | ||
| 27 | }) | ||
| 28 | }) | ||
| 29 | } | ||
| 30 | |||
| 31 | /// Wake the registered waker, if any. | ||
| 32 | pub fn wake(&self) { | ||
| 33 | critical_section::with(|cs| { | ||
| 34 | let cell = self.waker.borrow(cs); | ||
| 35 | if let Some(w) = cell.replace(None) { | ||
| 36 | w.wake_by_ref(); | ||
| 37 | cell.set(Some(w)); | ||
| 38 | } | ||
| 39 | }) | ||
| 40 | } | ||
| 41 | } | ||
diff --git a/embassy-sync/src/waitqueue/atomic_waker_turbo.rs b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs new file mode 100644 index 000000000..5c6a96ec8 --- /dev/null +++ b/embassy-sync/src/waitqueue/atomic_waker_turbo.rs | |||
| @@ -0,0 +1,30 @@ | |||
| 1 | use core::ptr; | ||
| 2 | use core::ptr::NonNull; | ||
| 3 | use core::sync::atomic::{AtomicPtr, Ordering}; | ||
| 4 | use core::task::Waker; | ||
| 5 | |||
| 6 | /// Utility struct to register and wake a waker. | ||
| 7 | pub struct AtomicWaker { | ||
| 8 | waker: AtomicPtr<()>, | ||
| 9 | } | ||
| 10 | |||
| 11 | impl AtomicWaker { | ||
| 12 | /// Create a new `AtomicWaker`. | ||
| 13 | pub const fn new() -> Self { | ||
| 14 | Self { | ||
| 15 | waker: AtomicPtr::new(ptr::null_mut()), | ||
| 16 | } | ||
| 17 | } | ||
| 18 | |||
| 19 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 20 | pub fn register(&self, w: &Waker) { | ||
| 21 | self.waker.store(w.as_turbo_ptr().as_ptr() as _, Ordering::Release); | ||
| 22 | } | ||
| 23 | |||
| 24 | /// Wake the registered waker, if any. | ||
| 25 | pub fn wake(&self) { | ||
| 26 | if let Some(ptr) = NonNull::new(self.waker.load(Ordering::Acquire)) { | ||
| 27 | unsafe { Waker::from_turbo_ptr(ptr) }.wake(); | ||
| 28 | } | ||
| 29 | } | ||
| 30 | } | ||
diff --git a/embassy-sync/src/waitqueue/mod.rs b/embassy-sync/src/waitqueue/mod.rs index 6661a6b61..6b0b0c64e 100644 --- a/embassy-sync/src/waitqueue/mod.rs +++ b/embassy-sync/src/waitqueue/mod.rs | |||
| @@ -1,7 +1,11 @@ | |||
| 1 | //! Async low-level wait queues | 1 | //! Async low-level wait queues |
| 2 | 2 | ||
| 3 | mod waker; | 3 | #[cfg_attr(feature = "turbowakers", path = "atomic_waker_turbo.rs")] |
| 4 | pub use waker::*; | 4 | mod atomic_waker; |
| 5 | pub use atomic_waker::*; | ||
| 6 | |||
| 7 | mod waker_registration; | ||
| 8 | pub use waker_registration::*; | ||
| 5 | 9 | ||
| 6 | mod multi_waker; | 10 | mod multi_waker; |
| 7 | pub use multi_waker::*; | 11 | pub use multi_waker::*; |
diff --git a/embassy-sync/src/waitqueue/multi_waker.rs b/embassy-sync/src/waitqueue/multi_waker.rs index 325d2cb3a..824d192da 100644 --- a/embassy-sync/src/waitqueue/multi_waker.rs +++ b/embassy-sync/src/waitqueue/multi_waker.rs | |||
| @@ -1,33 +1,58 @@ | |||
| 1 | use core::task::Waker; | 1 | use core::task::Waker; |
| 2 | 2 | ||
| 3 | use super::WakerRegistration; | 3 | use heapless::Vec; |
| 4 | 4 | ||
| 5 | /// Utility struct to register and wake multiple wakers. | 5 | /// Utility struct to register and wake multiple wakers. |
| 6 | pub struct MultiWakerRegistration<const N: usize> { | 6 | pub struct MultiWakerRegistration<const N: usize> { |
| 7 | wakers: [WakerRegistration; N], | 7 | wakers: Vec<Waker, N>, |
| 8 | } | 8 | } |
| 9 | 9 | ||
| 10 | impl<const N: usize> MultiWakerRegistration<N> { | 10 | impl<const N: usize> MultiWakerRegistration<N> { |
| 11 | /// Create a new empty instance | 11 | /// Create a new empty instance |
| 12 | pub const fn new() -> Self { | 12 | pub const fn new() -> Self { |
| 13 | const WAKER: WakerRegistration = WakerRegistration::new(); | 13 | Self { wakers: Vec::new() } |
| 14 | Self { wakers: [WAKER; N] } | ||
| 15 | } | 14 | } |
| 16 | 15 | ||
| 17 | /// Register a waker. If the buffer is full the function returns it in the error | 16 | /// Register a waker. If the buffer is full the function returns it in the error |
| 18 | pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { | 17 | pub fn register<'a>(&mut self, w: &'a Waker) { |
| 19 | if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { | 18 | // If we already have some waker that wakes the same task as `w`, do nothing. |
| 20 | waker_slot.register(w); | 19 | // This avoids cloning wakers, and avoids unnecessary mass-wakes. |
| 21 | Ok(()) | 20 | for w2 in &self.wakers { |
| 22 | } else { | 21 | if w.will_wake(w2) { |
| 23 | Err(w) | 22 | return; |
| 23 | } | ||
| 24 | } | ||
| 25 | |||
| 26 | if self.wakers.is_full() { | ||
| 27 | // All waker slots were full. It's a bit inefficient, but we can wake everything. | ||
| 28 | // Any future that is still active will simply reregister. | ||
| 29 | // This won't happen a lot, so it's ok. | ||
| 30 | self.wake(); | ||
| 31 | } | ||
| 32 | |||
| 33 | if self.wakers.push(w.clone()).is_err() { | ||
| 34 | // This can't happen unless N=0 | ||
| 35 | // (Either `wakers` wasn't full, or it was in which case `wake()` empied it) | ||
| 36 | panic!("tried to push a waker to a zero-length MultiWakerRegistration") | ||
| 24 | } | 37 | } |
| 25 | } | 38 | } |
| 26 | 39 | ||
| 27 | /// Wake all registered wakers. This clears the buffer | 40 | /// Wake all registered wakers. This clears the buffer |
| 28 | pub fn wake(&mut self) { | 41 | pub fn wake(&mut self) { |
| 29 | for waker_slot in self.wakers.iter_mut() { | 42 | // heapless::Vec has no `drain()`, do it unsafely ourselves... |
| 30 | waker_slot.wake() | 43 | |
| 44 | // First set length to 0, without dropping the contents. | ||
| 45 | // This is necessary for soundness: if wake() panics and we're using panic=unwind. | ||
| 46 | // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state. | ||
| 47 | // (it'll leak wakers, but that's not UB) | ||
| 48 | let len = self.wakers.len(); | ||
| 49 | unsafe { self.wakers.set_len(0) } | ||
| 50 | |||
| 51 | for i in 0..len { | ||
| 52 | // Move a waker out of the vec. | ||
| 53 | let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() }; | ||
| 54 | // Wake it by value, which consumes (drops) it. | ||
| 55 | waker.wake(); | ||
| 31 | } | 56 | } |
| 32 | } | 57 | } |
| 33 | } | 58 | } |
diff --git a/embassy-sync/src/waitqueue/waker.rs b/embassy-sync/src/waitqueue/waker_registration.rs index 64e300eb8..9b666e7c4 100644 --- a/embassy-sync/src/waitqueue/waker.rs +++ b/embassy-sync/src/waitqueue/waker_registration.rs | |||
| @@ -1,12 +1,8 @@ | |||
| 1 | use core::cell::Cell; | ||
| 2 | use core::mem; | 1 | use core::mem; |
| 3 | use core::task::Waker; | 2 | use core::task::Waker; |
| 4 | 3 | ||
| 5 | use crate::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| 6 | use crate::blocking_mutex::Mutex; | ||
| 7 | |||
| 8 | /// Utility struct to register and wake a waker. | 4 | /// Utility struct to register and wake a waker. |
| 9 | #[derive(Debug)] | 5 | #[derive(Debug, Default)] |
| 10 | pub struct WakerRegistration { | 6 | pub struct WakerRegistration { |
| 11 | waker: Option<Waker>, | 7 | waker: Option<Waker>, |
| 12 | } | 8 | } |
| @@ -54,39 +50,3 @@ impl WakerRegistration { | |||
| 54 | self.waker.is_some() | 50 | self.waker.is_some() |
| 55 | } | 51 | } |
| 56 | } | 52 | } |
| 57 | |||
| 58 | /// Utility struct to register and wake a waker. | ||
| 59 | pub struct AtomicWaker { | ||
| 60 | waker: Mutex<CriticalSectionRawMutex, Cell<Option<Waker>>>, | ||
| 61 | } | ||
| 62 | |||
| 63 | impl AtomicWaker { | ||
| 64 | /// Create a new `AtomicWaker`. | ||
| 65 | pub const fn new() -> Self { | ||
| 66 | Self { | ||
| 67 | waker: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), | ||
| 68 | } | ||
| 69 | } | ||
| 70 | |||
| 71 | /// Register a waker. Overwrites the previous waker, if any. | ||
| 72 | pub fn register(&self, w: &Waker) { | ||
| 73 | critical_section::with(|cs| { | ||
| 74 | let cell = self.waker.borrow(cs); | ||
| 75 | cell.set(match cell.replace(None) { | ||
| 76 | Some(w2) if (w2.will_wake(w)) => Some(w2), | ||
| 77 | _ => Some(w.clone()), | ||
| 78 | }) | ||
| 79 | }) | ||
| 80 | } | ||
| 81 | |||
| 82 | /// Wake the registered waker, if any. | ||
| 83 | pub fn wake(&self) { | ||
| 84 | critical_section::with(|cs| { | ||
| 85 | let cell = self.waker.borrow(cs); | ||
| 86 | if let Some(w) = cell.replace(None) { | ||
| 87 | w.wake_by_ref(); | ||
| 88 | cell.set(Some(w)); | ||
| 89 | } | ||
| 90 | }) | ||
| 91 | } | ||
| 92 | } | ||
