aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2025-01-15 14:50:34 +0000
committerGitHub <[email protected]>2025-01-15 14:50:34 +0000
commit1e6b807acbb106254209153cccd46c7ffd754c04 (patch)
treefc47f92256af3cfb85e0f6d9dfb85b9723c40940
parent27fb1f4dd004bd32c718b932694cb498fe9bff91 (diff)
parentc06862eeaf44eabd291194ed29f9e12558d1abf4 (diff)
Merge pull request #3775 from embassy-rs/dyn-dispatch-pipe
feat: add dynamic dispatch variants of pipe
-rw-r--r--embassy-sync/CHANGELOG.md4
-rw-r--r--embassy-sync/src/pipe.rs273
2 files changed, 277 insertions, 0 deletions
diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md
index a7547422f..b96c9416d 100644
--- a/embassy-sync/CHANGELOG.md
+++ b/embassy-sync/CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), 5The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 6and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7 7
8## Unreleased
9
10- Add dynamic dispatch variant of `Pipe`.
11
8## 0.6.1 - 2024-11-22 12## 0.6.1 - 2024-11-22
9 13
10- Add `LazyLock` sync primitive. 14- Add `LazyLock` sync primitive.
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs
index cd5b8ed75..2598652d2 100644
--- a/embassy-sync/src/pipe.rs
+++ b/embassy-sync/src/pipe.rs
@@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N>
532 } 532 }
533} 533}
534 534
535//
536// Type-erased variants
537//
538
539pub(crate) trait DynamicPipe {
540 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
541 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
542
543 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
544 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
545
546 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
547 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
548
549 fn consume(&self, amt: usize);
550 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
551}
552
553impl<M, const N: usize> DynamicPipe for Pipe<M, N>
554where
555 M: RawMutex,
556{
557 fn consume(&self, amt: usize) {
558 Pipe::consume(self, amt)
559 }
560
561 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
562 Pipe::try_fill_buf_with_context(self, cx)
563 }
564
565 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
566 Pipe::write(self, buf).into()
567 }
568
569 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
570 Pipe::read(self, buf).into()
571 }
572
573 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
574 Pipe::try_read(self, buf)
575 }
576
577 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
578 Pipe::try_write(self, buf)
579 }
580
581 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
582 Pipe::try_write_with_context(self, cx, buf)
583 }
584
585 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
586 Pipe::try_read_with_context(self, cx, buf)
587 }
588}
589
590/// Write-only access to a [`DynamicPipe`].
591pub struct DynamicWriter<'p> {
592 pipe: &'p dyn DynamicPipe,
593}
594
595impl<'p> Clone for DynamicWriter<'p> {
596 fn clone(&self) -> Self {
597 *self
598 }
599}
600
601impl<'p> Copy for DynamicWriter<'p> {}
602
603impl<'p> DynamicWriter<'p> {
604 /// Write some bytes to the pipe.
605 ///
606 /// See [`Pipe::write()`]
607 pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
608 self.pipe.write(buf)
609 }
610
611 /// Attempt to immediately write some bytes to the pipe.
612 ///
613 /// See [`Pipe::try_write()`]
614 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
615 self.pipe.try_write(buf)
616 }
617}
618
619impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
620where
621 M: RawMutex,
622{
623 fn from(value: Writer<'p, M, N>) -> Self {
624 Self { pipe: value.pipe }
625 }
626}
627
628/// Future returned by [`DynamicWriter::write`].
629#[must_use = "futures do nothing unless you `.await` or poll them"]
630pub struct DynamicWriteFuture<'p> {
631 pipe: &'p dyn DynamicPipe,
632 buf: &'p [u8],
633}
634
635impl<'p> Future for DynamicWriteFuture<'p> {
636 type Output = usize;
637
638 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
639 match self.pipe.try_write_with_context(Some(cx), self.buf) {
640 Ok(n) => Poll::Ready(n),
641 Err(TryWriteError::Full) => Poll::Pending,
642 }
643 }
644}
645
646impl<'p> Unpin for DynamicWriteFuture<'p> {}
647
648impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
649where
650 M: RawMutex,
651{
652 fn from(value: WriteFuture<'p, M, N>) -> Self {
653 Self {
654 pipe: value.pipe,
655 buf: value.buf,
656 }
657 }
658}
659
660/// Read-only access to a [`DynamicPipe`].
661pub struct DynamicReader<'p> {
662 pipe: &'p dyn DynamicPipe,
663}
664
665impl<'p> DynamicReader<'p> {
666 /// Read some bytes from the pipe.
667 ///
668 /// See [`Pipe::read()`]
669 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
670 self.pipe.read(buf)
671 }
672
673 /// Attempt to immediately read some bytes from the pipe.
674 ///
675 /// See [`Pipe::try_read()`]
676 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
677 self.pipe.try_read(buf)
678 }
679
680 /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
681 ///
682 /// If no bytes are currently available to read, this function waits until at least one byte is available.
683 ///
684 /// If the reader is at end-of-file (EOF), an empty slice is returned.
685 pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
686 DynamicFillBufFuture { pipe: Some(self.pipe) }
687 }
688
689 /// Try returning contents of the internal buffer.
690 ///
691 /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
692 ///
693 /// If the reader is at end-of-file (EOF), an empty slice is returned.
694 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
695 unsafe { self.pipe.try_fill_buf_with_context(None) }
696 }
697
698 /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
699 pub fn consume(&mut self, amt: usize) {
700 self.pipe.consume(amt)
701 }
702}
703
704impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
705where
706 M: RawMutex,
707{
708 fn from(value: Reader<'p, M, N>) -> Self {
709 Self { pipe: value.pipe }
710 }
711}
712
713/// Future returned by [`Pipe::read`] and [`Reader::read`].
714#[must_use = "futures do nothing unless you `.await` or poll them"]
715pub struct DynamicReadFuture<'p> {
716 pipe: &'p dyn DynamicPipe,
717 buf: &'p mut [u8],
718}
719
720impl<'p> Future for DynamicReadFuture<'p> {
721 type Output = usize;
722
723 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
724 match self.pipe.try_read_with_context(Some(cx), self.buf) {
725 Ok(n) => Poll::Ready(n),
726 Err(TryReadError::Empty) => Poll::Pending,
727 }
728 }
729}
730
731impl<'p> Unpin for DynamicReadFuture<'p> {}
732
733impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
734where
735 M: RawMutex,
736{
737 fn from(value: ReadFuture<'p, M, N>) -> Self {
738 Self {
739 pipe: value.pipe,
740 buf: value.buf,
741 }
742 }
743}
744
745/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`].
746#[must_use = "futures do nothing unless you `.await` or poll them"]
747pub struct DynamicFillBufFuture<'p> {
748 pipe: Option<&'p dyn DynamicPipe>,
749}
750
751impl<'p> Future for DynamicFillBufFuture<'p> {
752 type Output = &'p [u8];
753
754 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755 let pipe = self.pipe.take().unwrap();
756 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
757 Ok(buf) => Poll::Ready(buf),
758 Err(TryReadError::Empty) => {
759 self.pipe = Some(pipe);
760 Poll::Pending
761 }
762 }
763 }
764}
765
766impl<'p> Unpin for DynamicFillBufFuture<'p> {}
767
768impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
769where
770 M: RawMutex,
771{
772 fn from(value: FillBufFuture<'p, M, N>) -> Self {
773 Self {
774 pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
775 }
776 }
777}
778
535#[cfg(test)] 779#[cfg(test)]
536mod tests { 780mod tests {
537 use futures_executor::ThreadPool; 781 use futures_executor::ThreadPool;
@@ -619,6 +863,35 @@ mod tests {
619 let _ = w.clone(); 863 let _ = w.clone();
620 } 864 }
621 865
866 #[test]
867 fn dynamic_dispatch_pipe() {
868 let mut c = Pipe::<NoopRawMutex, 3>::new();
869 let (r, w) = c.split();
870 let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
871
872 assert!(w.try_write(&[42, 43]).is_ok());
873 let buf = r.try_fill_buf().unwrap();
874 assert_eq!(buf, &[42, 43]);
875 let buf = r.try_fill_buf().unwrap();
876 assert_eq!(buf, &[42, 43]);
877 r.consume(1);
878 let buf = r.try_fill_buf().unwrap();
879 assert_eq!(buf, &[43]);
880 r.consume(1);
881 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
882 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
883 assert_eq!(w.try_write(&[45, 46]), Ok(2));
884 let buf = r.try_fill_buf().unwrap();
885 assert_eq!(buf, &[44]); // only one byte due to wraparound.
886 r.consume(1);
887 let buf = r.try_fill_buf().unwrap();
888 assert_eq!(buf, &[45, 46]);
889 assert!(w.try_write(&[47]).is_ok());
890 let buf = r.try_fill_buf().unwrap();
891 assert_eq!(buf, &[45, 46, 47]);
892 r.consume(3);
893 }
894
622 #[futures_test::test] 895 #[futures_test::test]
623 async fn receiver_receives_given_try_write_async() { 896 async fn receiver_receives_given_try_write_async() {
624 let executor = ThreadPool::new().unwrap(); 897 let executor = ThreadPool::new().unwrap();