diff options
| author | Corey Schuhen <[email protected]> | 2025-05-28 18:15:15 +1000 |
|---|---|---|
| committer | Corey Schuhen <[email protected]> | 2025-05-28 20:29:08 +1000 |
| commit | 277f6f7331684a0008930a43b6705ba52873d1f5 (patch) | |
| tree | 6d0028cc89cf1556c811cd7eba18d680bce18439 /embassy-sync/src | |
| parent | 645883d8748995beb8b07f5cee93ac960f6d6a9f (diff) | |
Make Sync capable versions of DynamicSender and DynamicReceiver.
DynamicSender and DynamicReceiver, just seem to be a fat pointer to a
Channel which is already protected by it's own Mutex already. In fact,
you can share the Channel already betwen threads and create Dynamic*er's
in the target threads. It should be safe to share the Dynamic*er's
directly. Can only be used when Mutex M of channel supoorts Sync.
Diffstat (limited to 'embassy-sync/src')
| -rw-r--r-- | embassy-sync/src/channel.rs | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..1b053ecad 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs | |||
| @@ -164,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> { | |||
| 164 | } | 164 | } |
| 165 | } | 165 | } |
| 166 | 166 | ||
| 167 | /// Send-only access to a [`Channel`] without knowing channel size. | ||
| 168 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 169 | pub struct SendDynamicSender<'ch, T> { | ||
| 170 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 171 | } | ||
| 172 | |||
| 173 | impl<'ch, T> Clone for SendDynamicSender<'ch, T> { | ||
| 174 | fn clone(&self) -> Self { | ||
| 175 | *self | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | impl<'ch, T> Copy for SendDynamicSender<'ch, T> {} | ||
| 180 | unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {} | ||
| 181 | unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {} | ||
| 182 | |||
| 183 | impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T> | ||
| 184 | where | ||
| 185 | M: RawMutex + Sync + Send, | ||
| 186 | { | ||
| 187 | fn from(s: Sender<'ch, M, T, N>) -> Self { | ||
| 188 | Self { channel: s.channel } | ||
| 189 | } | ||
| 190 | } | ||
| 191 | |||
| 192 | impl<'ch, T> SendDynamicSender<'ch, T> { | ||
| 193 | /// Sends a value. | ||
| 194 | /// | ||
| 195 | /// See [`Channel::send()`] | ||
| 196 | pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { | ||
| 197 | DynamicSendFuture { | ||
| 198 | channel: self.channel, | ||
| 199 | message: Some(message), | ||
| 200 | } | ||
| 201 | } | ||
| 202 | |||
| 203 | /// Attempt to immediately send a message. | ||
| 204 | /// | ||
| 205 | /// See [`Channel::send()`] | ||
| 206 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | ||
| 207 | self.channel.try_send_with_context(message, None) | ||
| 208 | } | ||
| 209 | |||
| 210 | /// Allows a poll_fn to poll until the channel is ready to send | ||
| 211 | /// | ||
| 212 | /// See [`Channel::poll_ready_to_send()`] | ||
| 213 | pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 214 | self.channel.poll_ready_to_send(cx) | ||
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 167 | /// Receive-only access to a [`Channel`]. | 218 | /// Receive-only access to a [`Channel`]. |
| 168 | pub struct Receiver<'ch, M, T, const N: usize> | 219 | pub struct Receiver<'ch, M, T, const N: usize> |
| 169 | where | 220 | where |
| @@ -317,6 +368,61 @@ where | |||
| 317 | } | 368 | } |
| 318 | } | 369 | } |
| 319 | 370 | ||
| 371 | /// Receive-only access to a [`Channel`] without knowing channel size. | ||
| 372 | /// This version can be sent between threads but can only be created if the underlying mutex is Sync. | ||
| 373 | pub struct SendableDynamicReceiver<'ch, T> { | ||
| 374 | pub(crate) channel: &'ch dyn DynamicChannel<T>, | ||
| 375 | } | ||
| 376 | |||
| 377 | impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> { | ||
| 378 | fn clone(&self) -> Self { | ||
| 379 | *self | ||
| 380 | } | ||
| 381 | } | ||
| 382 | |||
| 383 | impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {} | ||
| 384 | unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {} | ||
| 385 | unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {} | ||
| 386 | |||
| 387 | impl<'ch, T> SendableDynamicReceiver<'ch, T> { | ||
| 388 | /// Receive the next value. | ||
| 389 | /// | ||
| 390 | /// See [`Channel::receive()`]. | ||
| 391 | pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { | ||
| 392 | DynamicReceiveFuture { channel: self.channel } | ||
| 393 | } | ||
| 394 | |||
| 395 | /// Attempt to immediately receive the next value. | ||
| 396 | /// | ||
| 397 | /// See [`Channel::try_receive()`] | ||
| 398 | pub fn try_receive(&self) -> Result<T, TryReceiveError> { | ||
| 399 | self.channel.try_receive_with_context(None) | ||
| 400 | } | ||
| 401 | |||
| 402 | /// Allows a poll_fn to poll until the channel is ready to receive | ||
| 403 | /// | ||
| 404 | /// See [`Channel::poll_ready_to_receive()`] | ||
| 405 | pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { | ||
| 406 | self.channel.poll_ready_to_receive(cx) | ||
| 407 | } | ||
| 408 | |||
| 409 | /// Poll the channel for the next item | ||
| 410 | /// | ||
| 411 | /// See [`Channel::poll_receive()`] | ||
| 412 | pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> { | ||
| 413 | self.channel.poll_receive(cx) | ||
| 414 | } | ||
| 415 | } | ||
| 416 | |||
| 417 | impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendableDynamicReceiver<'ch, T> | ||
| 418 | where | ||
| 419 | M: RawMutex + Sync + Send, | ||
| 420 | { | ||
| 421 | fn from(s: Receiver<'ch, M, T, N>) -> Self { | ||
| 422 | Self { channel: s.channel } | ||
| 423 | } | ||
| 424 | } | ||
| 425 | |||
| 320 | impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> | 426 | impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> |
| 321 | where | 427 | where |
| 322 | M: RawMutex, | 428 | M: RawMutex, |
