diff options
| author | Dario Nieuwenhuis <[email protected]> | 2022-08-28 22:52:31 +0200 |
|---|---|---|
| committer | Dario Nieuwenhuis <[email protected]> | 2022-08-28 23:01:07 +0200 |
| commit | 2a0df652f37ac00ba3f568b457eabe4e352bc3c3 (patch) | |
| tree | 6b5a4bd4bbc853f98f3210f5bfb19f709622ebb5 | |
| parent | 764ee3b72cc8770f7c398ba1aee41fbd34f07764 (diff) | |
futures: add joins
| -rw-r--r-- | embassy-futures/src/join.rs | 252 | ||||
| -rw-r--r-- | embassy-futures/src/lib.rs | 2 |
2 files changed, 254 insertions, 0 deletions
diff --git a/embassy-futures/src/join.rs b/embassy-futures/src/join.rs new file mode 100644 index 000000000..39a78ccd3 --- /dev/null +++ b/embassy-futures/src/join.rs | |||
| @@ -0,0 +1,252 @@ | |||
| 1 | use core::future::Future; | ||
| 2 | use core::pin::Pin; | ||
| 3 | use core::task::{Context, Poll}; | ||
| 4 | use core::{fmt, mem}; | ||
| 5 | |||
| 6 | #[derive(Debug)] | ||
| 7 | enum MaybeDone<Fut: Future> { | ||
| 8 | /// A not-yet-completed future | ||
| 9 | Future(/* #[pin] */ Fut), | ||
| 10 | /// The output of the completed future | ||
| 11 | Done(Fut::Output), | ||
| 12 | /// The empty variant after the result of a [`MaybeDone`] has been | ||
| 13 | /// taken using the [`take_output`](MaybeDone::take_output) method. | ||
| 14 | Gone, | ||
| 15 | } | ||
| 16 | |||
| 17 | impl<Fut: Future> MaybeDone<Fut> { | ||
| 18 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool { | ||
| 19 | let this = unsafe { self.get_unchecked_mut() }; | ||
| 20 | match this { | ||
| 21 | Self::Future(fut) => match unsafe { Pin::new_unchecked(fut) }.poll(cx) { | ||
| 22 | Poll::Ready(res) => { | ||
| 23 | *this = Self::Done(res); | ||
| 24 | true | ||
| 25 | } | ||
| 26 | Poll::Pending => false, | ||
| 27 | }, | ||
| 28 | _ => true, | ||
| 29 | } | ||
| 30 | } | ||
| 31 | |||
| 32 | fn take_output(&mut self) -> Fut::Output { | ||
| 33 | match &*self { | ||
| 34 | Self::Done(_) => {} | ||
| 35 | Self::Future(_) | Self::Gone => panic!("take_output when MaybeDone is not done."), | ||
| 36 | } | ||
| 37 | match mem::replace(self, Self::Gone) { | ||
| 38 | MaybeDone::Done(output) => output, | ||
| 39 | _ => unreachable!(), | ||
| 40 | } | ||
| 41 | } | ||
| 42 | } | ||
| 43 | |||
| 44 | impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} | ||
| 45 | |||
| 46 | macro_rules! generate { | ||
| 47 | ($( | ||
| 48 | $(#[$doc:meta])* | ||
| 49 | ($Join:ident, <$($Fut:ident),*>), | ||
| 50 | )*) => ($( | ||
| 51 | $(#[$doc])* | ||
| 52 | #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| 53 | #[allow(non_snake_case)] | ||
| 54 | pub struct $Join<$($Fut: Future),*> { | ||
| 55 | $( | ||
| 56 | $Fut: MaybeDone<$Fut>, | ||
| 57 | )* | ||
| 58 | } | ||
| 59 | |||
| 60 | impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*> | ||
| 61 | where | ||
| 62 | $( | ||
| 63 | $Fut: Future + fmt::Debug, | ||
| 64 | $Fut::Output: fmt::Debug, | ||
| 65 | )* | ||
| 66 | { | ||
| 67 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| 68 | f.debug_struct(stringify!($Join)) | ||
| 69 | $(.field(stringify!($Fut), &self.$Fut))* | ||
| 70 | .finish() | ||
| 71 | } | ||
| 72 | } | ||
| 73 | |||
| 74 | impl<$($Fut: Future),*> $Join<$($Fut),*> { | ||
| 75 | #[allow(non_snake_case)] | ||
| 76 | fn new($($Fut: $Fut),*) -> Self { | ||
| 77 | Self { | ||
| 78 | $($Fut: MaybeDone::Future($Fut)),* | ||
| 79 | } | ||
| 80 | } | ||
| 81 | } | ||
| 82 | |||
| 83 | impl<$($Fut: Future),*> Future for $Join<$($Fut),*> { | ||
| 84 | type Output = ($($Fut::Output),*); | ||
| 85 | |||
| 86 | fn poll( | ||
| 87 | self: Pin<&mut Self>, cx: &mut Context<'_> | ||
| 88 | ) -> Poll<Self::Output> { | ||
| 89 | let this = unsafe { self.get_unchecked_mut() }; | ||
| 90 | let mut all_done = true; | ||
| 91 | $( | ||
| 92 | all_done &= unsafe { Pin::new_unchecked(&mut this.$Fut) }.poll(cx); | ||
| 93 | )* | ||
| 94 | |||
| 95 | if all_done { | ||
| 96 | Poll::Ready(($(this.$Fut.take_output()), *)) | ||
| 97 | } else { | ||
| 98 | Poll::Pending | ||
| 99 | } | ||
| 100 | } | ||
| 101 | } | ||
| 102 | )*) | ||
| 103 | } | ||
| 104 | |||
| 105 | generate! { | ||
| 106 | /// Future for the [`join`](join()) function. | ||
| 107 | (Join, <Fut1, Fut2>), | ||
| 108 | |||
| 109 | /// Future for the [`join3`] function. | ||
| 110 | (Join3, <Fut1, Fut2, Fut3>), | ||
| 111 | |||
| 112 | /// Future for the [`join4`] function. | ||
| 113 | (Join4, <Fut1, Fut2, Fut3, Fut4>), | ||
| 114 | |||
| 115 | /// Future for the [`join5`] function. | ||
| 116 | (Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>), | ||
| 117 | } | ||
| 118 | |||
| 119 | /// Joins the result of two futures, waiting for them both to complete. | ||
| 120 | /// | ||
| 121 | /// This function will return a new future which awaits both futures to | ||
| 122 | /// complete. The returned future will finish with a tuple of both results. | ||
| 123 | /// | ||
| 124 | /// Note that this function consumes the passed futures and returns a | ||
| 125 | /// wrapped version of it. | ||
| 126 | /// | ||
| 127 | /// # Examples | ||
| 128 | /// | ||
| 129 | /// ``` | ||
| 130 | /// # embassy_futures::block_on(async { | ||
| 131 | /// | ||
| 132 | /// let a = async { 1 }; | ||
| 133 | /// let b = async { 2 }; | ||
| 134 | /// let pair = embassy_futures::join(a, b).await; | ||
| 135 | /// | ||
| 136 | /// assert_eq!(pair, (1, 2)); | ||
| 137 | /// # }); | ||
| 138 | /// ``` | ||
| 139 | pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2> | ||
| 140 | where | ||
| 141 | Fut1: Future, | ||
| 142 | Fut2: Future, | ||
| 143 | { | ||
| 144 | Join::new(future1, future2) | ||
| 145 | } | ||
| 146 | |||
| 147 | /// Joins the result of three futures, waiting for them all to complete. | ||
| 148 | /// | ||
| 149 | /// This function will return a new future which awaits all futures to | ||
| 150 | /// complete. The returned future will finish with a tuple of all results. | ||
| 151 | /// | ||
| 152 | /// Note that this function consumes the passed futures and returns a | ||
| 153 | /// wrapped version of it. | ||
| 154 | /// | ||
| 155 | /// # Examples | ||
| 156 | /// | ||
| 157 | /// ``` | ||
| 158 | /// # embassy_futures::block_on(async { | ||
| 159 | /// | ||
| 160 | /// let a = async { 1 }; | ||
| 161 | /// let b = async { 2 }; | ||
| 162 | /// let c = async { 3 }; | ||
| 163 | /// let res = embassy_futures::join3(a, b, c).await; | ||
| 164 | /// | ||
| 165 | /// assert_eq!(res, (1, 2, 3)); | ||
| 166 | /// # }); | ||
| 167 | /// ``` | ||
| 168 | pub fn join3<Fut1, Fut2, Fut3>(future1: Fut1, future2: Fut2, future3: Fut3) -> Join3<Fut1, Fut2, Fut3> | ||
| 169 | where | ||
| 170 | Fut1: Future, | ||
| 171 | Fut2: Future, | ||
| 172 | Fut3: Future, | ||
| 173 | { | ||
| 174 | Join3::new(future1, future2, future3) | ||
| 175 | } | ||
| 176 | |||
| 177 | /// Joins the result of four futures, waiting for them all to complete. | ||
| 178 | /// | ||
| 179 | /// This function will return a new future which awaits all futures to | ||
| 180 | /// complete. The returned future will finish with a tuple of all results. | ||
| 181 | /// | ||
| 182 | /// Note that this function consumes the passed futures and returns a | ||
| 183 | /// wrapped version of it. | ||
| 184 | /// | ||
| 185 | /// # Examples | ||
| 186 | /// | ||
| 187 | /// ``` | ||
| 188 | /// # embassy_futures::block_on(async { | ||
| 189 | /// | ||
| 190 | /// let a = async { 1 }; | ||
| 191 | /// let b = async { 2 }; | ||
| 192 | /// let c = async { 3 }; | ||
| 193 | /// let d = async { 4 }; | ||
| 194 | /// let res = embassy_futures::join4(a, b, c, d).await; | ||
| 195 | /// | ||
| 196 | /// assert_eq!(res, (1, 2, 3, 4)); | ||
| 197 | /// # }); | ||
| 198 | /// ``` | ||
| 199 | pub fn join4<Fut1, Fut2, Fut3, Fut4>( | ||
| 200 | future1: Fut1, | ||
| 201 | future2: Fut2, | ||
| 202 | future3: Fut3, | ||
| 203 | future4: Fut4, | ||
| 204 | ) -> Join4<Fut1, Fut2, Fut3, Fut4> | ||
| 205 | where | ||
| 206 | Fut1: Future, | ||
| 207 | Fut2: Future, | ||
| 208 | Fut3: Future, | ||
| 209 | Fut4: Future, | ||
| 210 | { | ||
| 211 | Join4::new(future1, future2, future3, future4) | ||
| 212 | } | ||
| 213 | |||
| 214 | /// Joins the result of five futures, waiting for them all to complete. | ||
| 215 | /// | ||
| 216 | /// This function will return a new future which awaits all futures to | ||
| 217 | /// complete. The returned future will finish with a tuple of all results. | ||
| 218 | /// | ||
| 219 | /// Note that this function consumes the passed futures and returns a | ||
| 220 | /// wrapped version of it. | ||
| 221 | /// | ||
| 222 | /// # Examples | ||
| 223 | /// | ||
| 224 | /// ``` | ||
| 225 | /// # embassy_futures::block_on(async { | ||
| 226 | /// | ||
| 227 | /// let a = async { 1 }; | ||
| 228 | /// let b = async { 2 }; | ||
| 229 | /// let c = async { 3 }; | ||
| 230 | /// let d = async { 4 }; | ||
| 231 | /// let e = async { 5 }; | ||
| 232 | /// let res = embassy_futures::join5(a, b, c, d, e).await; | ||
| 233 | /// | ||
| 234 | /// assert_eq!(res, (1, 2, 3, 4, 5)); | ||
| 235 | /// # }); | ||
| 236 | /// ``` | ||
| 237 | pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>( | ||
| 238 | future1: Fut1, | ||
| 239 | future2: Fut2, | ||
| 240 | future3: Fut3, | ||
| 241 | future4: Fut4, | ||
| 242 | future5: Fut5, | ||
| 243 | ) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5> | ||
| 244 | where | ||
| 245 | Fut1: Future, | ||
| 246 | Fut2: Future, | ||
| 247 | Fut3: Future, | ||
| 248 | Fut4: Future, | ||
| 249 | Fut5: Future, | ||
| 250 | { | ||
| 251 | Join5::new(future1, future2, future3, future4, future5) | ||
| 252 | } | ||
diff --git a/embassy-futures/src/lib.rs b/embassy-futures/src/lib.rs index 41e27047d..ea135b3ab 100644 --- a/embassy-futures/src/lib.rs +++ b/embassy-futures/src/lib.rs | |||
| @@ -6,9 +6,11 @@ | |||
| 6 | pub(crate) mod fmt; | 6 | pub(crate) mod fmt; |
| 7 | 7 | ||
| 8 | mod block_on; | 8 | mod block_on; |
| 9 | mod join; | ||
| 9 | mod select; | 10 | mod select; |
| 10 | mod yield_now; | 11 | mod yield_now; |
| 11 | 12 | ||
| 12 | pub use block_on::*; | 13 | pub use block_on::*; |
| 14 | pub use join::*; | ||
| 13 | pub use select::*; | 15 | pub use select::*; |
| 14 | pub use yield_now::*; | 16 | pub use yield_now::*; |
