diff options
| author | Ulf Lilleengen <[email protected]> | 2025-01-15 14:50:34 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-01-15 14:50:34 +0000 |
| commit | 1e6b807acbb106254209153cccd46c7ffd754c04 (patch) | |
| tree | fc47f92256af3cfb85e0f6d9dfb85b9723c40940 /embassy-sync/src/pipe.rs | |
| parent | 27fb1f4dd004bd32c718b932694cb498fe9bff91 (diff) | |
| parent | c06862eeaf44eabd291194ed29f9e12558d1abf4 (diff) | |
Merge pull request #3775 from embassy-rs/dyn-dispatch-pipe
feat: add dynamic dispatch variants of pipe
Diffstat (limited to 'embassy-sync/src/pipe.rs')
| -rw-r--r-- | embassy-sync/src/pipe.rs | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index cd5b8ed75..2598652d2 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs | |||
| @@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> | |||
| 532 | } | 532 | } |
| 533 | } | 533 | } |
| 534 | 534 | ||
| 535 | // | ||
| 536 | // Type-erased variants | ||
| 537 | // | ||
| 538 | |||
| 539 | pub(crate) trait DynamicPipe { | ||
| 540 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>; | ||
| 541 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>; | ||
| 542 | |||
| 543 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 544 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 545 | |||
| 546 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>; | ||
| 547 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>; | ||
| 548 | |||
| 549 | fn consume(&self, amt: usize); | ||
| 550 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>; | ||
| 551 | } | ||
| 552 | |||
| 553 | impl<M, const N: usize> DynamicPipe for Pipe<M, N> | ||
| 554 | where | ||
| 555 | M: RawMutex, | ||
| 556 | { | ||
| 557 | fn consume(&self, amt: usize) { | ||
| 558 | Pipe::consume(self, amt) | ||
| 559 | } | ||
| 560 | |||
| 561 | unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { | ||
| 562 | Pipe::try_fill_buf_with_context(self, cx) | ||
| 563 | } | ||
| 564 | |||
| 565 | fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 566 | Pipe::write(self, buf).into() | ||
| 567 | } | ||
| 568 | |||
| 569 | fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 570 | Pipe::read(self, buf).into() | ||
| 571 | } | ||
| 572 | |||
| 573 | fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 574 | Pipe::try_read(self, buf) | ||
| 575 | } | ||
| 576 | |||
| 577 | fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 578 | Pipe::try_write(self, buf) | ||
| 579 | } | ||
| 580 | |||
| 581 | fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 582 | Pipe::try_write_with_context(self, cx, buf) | ||
| 583 | } | ||
| 584 | |||
| 585 | fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 586 | Pipe::try_read_with_context(self, cx, buf) | ||
| 587 | } | ||
| 588 | } | ||
| 589 | |||
| 590 | /// Write-only access to a [`DynamicPipe`]. | ||
| 591 | pub struct DynamicWriter<'p> { | ||
| 592 | pipe: &'p dyn DynamicPipe, | ||
| 593 | } | ||
| 594 | |||
| 595 | impl<'p> Clone for DynamicWriter<'p> { | ||
| 596 | fn clone(&self) -> Self { | ||
| 597 | *self | ||
| 598 | } | ||
| 599 | } | ||
| 600 | |||
| 601 | impl<'p> Copy for DynamicWriter<'p> {} | ||
| 602 | |||
| 603 | impl<'p> DynamicWriter<'p> { | ||
| 604 | /// Write some bytes to the pipe. | ||
| 605 | /// | ||
| 606 | /// See [`Pipe::write()`] | ||
| 607 | pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> { | ||
| 608 | self.pipe.write(buf) | ||
| 609 | } | ||
| 610 | |||
| 611 | /// Attempt to immediately write some bytes to the pipe. | ||
| 612 | /// | ||
| 613 | /// See [`Pipe::try_write()`] | ||
| 614 | pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { | ||
| 615 | self.pipe.try_write(buf) | ||
| 616 | } | ||
| 617 | } | ||
| 618 | |||
| 619 | impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p> | ||
| 620 | where | ||
| 621 | M: RawMutex, | ||
| 622 | { | ||
| 623 | fn from(value: Writer<'p, M, N>) -> Self { | ||
| 624 | Self { pipe: value.pipe } | ||
| 625 | } | ||
| 626 | } | ||
| 627 | |||
| 628 | /// Future returned by [`DynamicWriter::write`]. | ||
| 629 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 630 | pub struct DynamicWriteFuture<'p> { | ||
| 631 | pipe: &'p dyn DynamicPipe, | ||
| 632 | buf: &'p [u8], | ||
| 633 | } | ||
| 634 | |||
| 635 | impl<'p> Future for DynamicWriteFuture<'p> { | ||
| 636 | type Output = usize; | ||
| 637 | |||
| 638 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 639 | match self.pipe.try_write_with_context(Some(cx), self.buf) { | ||
| 640 | Ok(n) => Poll::Ready(n), | ||
| 641 | Err(TryWriteError::Full) => Poll::Pending, | ||
| 642 | } | ||
| 643 | } | ||
| 644 | } | ||
| 645 | |||
| 646 | impl<'p> Unpin for DynamicWriteFuture<'p> {} | ||
| 647 | |||
| 648 | impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p> | ||
| 649 | where | ||
| 650 | M: RawMutex, | ||
| 651 | { | ||
| 652 | fn from(value: WriteFuture<'p, M, N>) -> Self { | ||
| 653 | Self { | ||
| 654 | pipe: value.pipe, | ||
| 655 | buf: value.buf, | ||
| 656 | } | ||
| 657 | } | ||
| 658 | } | ||
| 659 | |||
| 660 | /// Read-only access to a [`DynamicPipe`]. | ||
| 661 | pub struct DynamicReader<'p> { | ||
| 662 | pipe: &'p dyn DynamicPipe, | ||
| 663 | } | ||
| 664 | |||
| 665 | impl<'p> DynamicReader<'p> { | ||
| 666 | /// Read some bytes from the pipe. | ||
| 667 | /// | ||
| 668 | /// See [`Pipe::read()`] | ||
| 669 | pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> { | ||
| 670 | self.pipe.read(buf) | ||
| 671 | } | ||
| 672 | |||
| 673 | /// Attempt to immediately read some bytes from the pipe. | ||
| 674 | /// | ||
| 675 | /// See [`Pipe::try_read()`] | ||
| 676 | pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { | ||
| 677 | self.pipe.try_read(buf) | ||
| 678 | } | ||
| 679 | |||
| 680 | /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. | ||
| 681 | /// | ||
| 682 | /// If no bytes are currently available to read, this function waits until at least one byte is available. | ||
| 683 | /// | ||
| 684 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 685 | pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> { | ||
| 686 | DynamicFillBufFuture { pipe: Some(self.pipe) } | ||
| 687 | } | ||
| 688 | |||
| 689 | /// Try returning contents of the internal buffer. | ||
| 690 | /// | ||
| 691 | /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. | ||
| 692 | /// | ||
| 693 | /// If the reader is at end-of-file (EOF), an empty slice is returned. | ||
| 694 | pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { | ||
| 695 | unsafe { self.pipe.try_fill_buf_with_context(None) } | ||
| 696 | } | ||
| 697 | |||
| 698 | /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. | ||
| 699 | pub fn consume(&mut self, amt: usize) { | ||
| 700 | self.pipe.consume(amt) | ||
| 701 | } | ||
| 702 | } | ||
| 703 | |||
| 704 | impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p> | ||
| 705 | where | ||
| 706 | M: RawMutex, | ||
| 707 | { | ||
| 708 | fn from(value: Reader<'p, M, N>) -> Self { | ||
| 709 | Self { pipe: value.pipe } | ||
| 710 | } | ||
| 711 | } | ||
| 712 | |||
| 713 | /// Future returned by [`Pipe::read`] and [`Reader::read`]. | ||
| 714 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 715 | pub struct DynamicReadFuture<'p> { | ||
| 716 | pipe: &'p dyn DynamicPipe, | ||
| 717 | buf: &'p mut [u8], | ||
| 718 | } | ||
| 719 | |||
| 720 | impl<'p> Future for DynamicReadFuture<'p> { | ||
| 721 | type Output = usize; | ||
| 722 | |||
| 723 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 724 | match self.pipe.try_read_with_context(Some(cx), self.buf) { | ||
| 725 | Ok(n) => Poll::Ready(n), | ||
| 726 | Err(TryReadError::Empty) => Poll::Pending, | ||
| 727 | } | ||
| 728 | } | ||
| 729 | } | ||
| 730 | |||
| 731 | impl<'p> Unpin for DynamicReadFuture<'p> {} | ||
| 732 | |||
| 733 | impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p> | ||
| 734 | where | ||
| 735 | M: RawMutex, | ||
| 736 | { | ||
| 737 | fn from(value: ReadFuture<'p, M, N>) -> Self { | ||
| 738 | Self { | ||
| 739 | pipe: value.pipe, | ||
| 740 | buf: value.buf, | ||
| 741 | } | ||
| 742 | } | ||
| 743 | } | ||
| 744 | |||
| 745 | /// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`]. | ||
| 746 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 747 | pub struct DynamicFillBufFuture<'p> { | ||
| 748 | pipe: Option<&'p dyn DynamicPipe>, | ||
| 749 | } | ||
| 750 | |||
| 751 | impl<'p> Future for DynamicFillBufFuture<'p> { | ||
| 752 | type Output = &'p [u8]; | ||
| 753 | |||
| 754 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| 755 | let pipe = self.pipe.take().unwrap(); | ||
| 756 | match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { | ||
| 757 | Ok(buf) => Poll::Ready(buf), | ||
| 758 | Err(TryReadError::Empty) => { | ||
| 759 | self.pipe = Some(pipe); | ||
| 760 | Poll::Pending | ||
| 761 | } | ||
| 762 | } | ||
| 763 | } | ||
| 764 | } | ||
| 765 | |||
| 766 | impl<'p> Unpin for DynamicFillBufFuture<'p> {} | ||
| 767 | |||
| 768 | impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p> | ||
| 769 | where | ||
| 770 | M: RawMutex, | ||
| 771 | { | ||
| 772 | fn from(value: FillBufFuture<'p, M, N>) -> Self { | ||
| 773 | Self { | ||
| 774 | pipe: value.pipe.map(|p| p as &dyn DynamicPipe), | ||
| 775 | } | ||
| 776 | } | ||
| 777 | } | ||
| 778 | |||
| 535 | #[cfg(test)] | 779 | #[cfg(test)] |
| 536 | mod tests { | 780 | mod tests { |
| 537 | use futures_executor::ThreadPool; | 781 | use futures_executor::ThreadPool; |
| @@ -619,6 +863,35 @@ mod tests { | |||
| 619 | let _ = w.clone(); | 863 | let _ = w.clone(); |
| 620 | } | 864 | } |
| 621 | 865 | ||
| 866 | #[test] | ||
| 867 | fn dynamic_dispatch_pipe() { | ||
| 868 | let mut c = Pipe::<NoopRawMutex, 3>::new(); | ||
| 869 | let (r, w) = c.split(); | ||
| 870 | let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into()); | ||
| 871 | |||
| 872 | assert!(w.try_write(&[42, 43]).is_ok()); | ||
| 873 | let buf = r.try_fill_buf().unwrap(); | ||
| 874 | assert_eq!(buf, &[42, 43]); | ||
| 875 | let buf = r.try_fill_buf().unwrap(); | ||
| 876 | assert_eq!(buf, &[42, 43]); | ||
| 877 | r.consume(1); | ||
| 878 | let buf = r.try_fill_buf().unwrap(); | ||
| 879 | assert_eq!(buf, &[43]); | ||
| 880 | r.consume(1); | ||
| 881 | assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); | ||
| 882 | assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); | ||
| 883 | assert_eq!(w.try_write(&[45, 46]), Ok(2)); | ||
| 884 | let buf = r.try_fill_buf().unwrap(); | ||
| 885 | assert_eq!(buf, &[44]); // only one byte due to wraparound. | ||
| 886 | r.consume(1); | ||
| 887 | let buf = r.try_fill_buf().unwrap(); | ||
| 888 | assert_eq!(buf, &[45, 46]); | ||
| 889 | assert!(w.try_write(&[47]).is_ok()); | ||
| 890 | let buf = r.try_fill_buf().unwrap(); | ||
| 891 | assert_eq!(buf, &[45, 46, 47]); | ||
| 892 | r.consume(3); | ||
| 893 | } | ||
| 894 | |||
| 622 | #[futures_test::test] | 895 | #[futures_test::test] |
| 623 | async fn receiver_receives_given_try_write_async() { | 896 | async fn receiver_receives_given_try_write_async() { |
| 624 | let executor = ThreadPool::new().unwrap(); | 897 | let executor = ThreadPool::new().unwrap(); |
