diff options
| author | Dario Nieuwenhuis <[email protected]> | 2023-09-04 23:38:11 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-04 23:38:11 +0200 |
| commit | ce662766be80d75b5f9294ae4b792f7db252ccd3 (patch) | |
| tree | 3f785aa5a74823a00c980ce7e24ae62d5e89ae0f | |
| parent | a03b6be693293f8a646a96eb3f26c3e79e0882fe (diff) | |
| parent | 615882ebd67f4e7e60fb8aa1505b1272655c4fa4 (diff) | |
Merge pull request #1771 from rubdos/zero-copy-channel
embassy_sync::zero_copy_channel
| -rw-r--r-- | embassy-net-driver-channel/src/lib.rs | 235 | ||||
| -rw-r--r-- | embassy-sync/src/lib.rs | 1 | ||||
| -rw-r--r-- | embassy-sync/src/zerocopy_channel.rs | 260 |
3 files changed, 273 insertions, 223 deletions
diff --git a/embassy-net-driver-channel/src/lib.rs b/embassy-net-driver-channel/src/lib.rs index f2aa6b254..bf7ae5217 100644 --- a/embassy-net-driver-channel/src/lib.rs +++ b/embassy-net-driver-channel/src/lib.rs | |||
| @@ -14,6 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium}; | |||
| 14 | use embassy_sync::blocking_mutex::raw::NoopRawMutex; | 14 | use embassy_sync::blocking_mutex::raw::NoopRawMutex; |
| 15 | use embassy_sync::blocking_mutex::Mutex; | 15 | use embassy_sync::blocking_mutex::Mutex; |
| 16 | use embassy_sync::waitqueue::WakerRegistration; | 16 | use embassy_sync::waitqueue::WakerRegistration; |
| 17 | use embassy_sync::zerocopy_channel; | ||
| 17 | 18 | ||
| 18 | pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> { | 19 | pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> { |
| 19 | rx: [PacketBuf<MTU>; N_RX], | 20 | rx: [PacketBuf<MTU>; N_RX], |
| @@ -130,24 +131,24 @@ impl<'d, const MTU: usize> Runner<'d, MTU> { | |||
| 130 | } | 131 | } |
| 131 | 132 | ||
| 132 | pub async fn tx_buf(&mut self) -> &mut [u8] { | 133 | pub async fn tx_buf(&mut self) -> &mut [u8] { |
| 133 | let p = self.tx_chan.recv().await; | 134 | let p = self.tx_chan.receive().await; |
| 134 | &mut p.buf[..p.len] | 135 | &mut p.buf[..p.len] |
| 135 | } | 136 | } |
| 136 | 137 | ||
| 137 | pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { | 138 | pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { |
| 138 | let p = self.tx_chan.try_recv()?; | 139 | let p = self.tx_chan.try_receive()?; |
| 139 | Some(&mut p.buf[..p.len]) | 140 | Some(&mut p.buf[..p.len]) |
| 140 | } | 141 | } |
| 141 | 142 | ||
| 142 | pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { | 143 | pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { |
| 143 | match self.tx_chan.poll_recv(cx) { | 144 | match self.tx_chan.poll_receive(cx) { |
| 144 | Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), | 145 | Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), |
| 145 | Poll::Pending => Poll::Pending, | 146 | Poll::Pending => Poll::Pending, |
| 146 | } | 147 | } |
| 147 | } | 148 | } |
| 148 | 149 | ||
| 149 | pub fn tx_done(&mut self) { | 150 | pub fn tx_done(&mut self) { |
| 150 | self.tx_chan.recv_done(); | 151 | self.tx_chan.receive_done(); |
| 151 | } | 152 | } |
| 152 | } | 153 | } |
| 153 | 154 | ||
| @@ -204,24 +205,24 @@ impl<'d, const MTU: usize> RxRunner<'d, MTU> { | |||
| 204 | 205 | ||
| 205 | impl<'d, const MTU: usize> TxRunner<'d, MTU> { | 206 | impl<'d, const MTU: usize> TxRunner<'d, MTU> { |
| 206 | pub async fn tx_buf(&mut self) -> &mut [u8] { | 207 | pub async fn tx_buf(&mut self) -> &mut [u8] { |
| 207 | let p = self.tx_chan.recv().await; | 208 | let p = self.tx_chan.receive().await; |
| 208 | &mut p.buf[..p.len] | 209 | &mut p.buf[..p.len] |
| 209 | } | 210 | } |
| 210 | 211 | ||
| 211 | pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { | 212 | pub fn try_tx_buf(&mut self) -> Option<&mut [u8]> { |
| 212 | let p = self.tx_chan.try_recv()?; | 213 | let p = self.tx_chan.try_receive()?; |
| 213 | Some(&mut p.buf[..p.len]) | 214 | Some(&mut p.buf[..p.len]) |
| 214 | } | 215 | } |
| 215 | 216 | ||
| 216 | pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { | 217 | pub fn poll_tx_buf(&mut self, cx: &mut Context) -> Poll<&mut [u8]> { |
| 217 | match self.tx_chan.poll_recv(cx) { | 218 | match self.tx_chan.poll_receive(cx) { |
| 218 | Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), | 219 | Poll::Ready(p) => Poll::Ready(&mut p.buf[..p.len]), |
| 219 | Poll::Pending => Poll::Pending, | 220 | Poll::Pending => Poll::Pending, |
| 220 | } | 221 | } |
| 221 | } | 222 | } |
| 222 | 223 | ||
| 223 | pub fn tx_done(&mut self) { | 224 | pub fn tx_done(&mut self) { |
| 224 | self.tx_chan.recv_done(); | 225 | self.tx_chan.receive_done(); |
| 225 | } | 226 | } |
| 226 | } | 227 | } |
| 227 | 228 | ||
| @@ -293,7 +294,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> { | |||
| 293 | type TxToken<'a> = TxToken<'a, MTU> where Self: 'a ; | 294 | type TxToken<'a> = TxToken<'a, MTU> where Self: 'a ; |
| 294 | 295 | ||
| 295 | fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { | 296 | fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { |
| 296 | if self.rx.poll_recv(cx).is_ready() && self.tx.poll_send(cx).is_ready() { | 297 | if self.rx.poll_receive(cx).is_ready() && self.tx.poll_send(cx).is_ready() { |
| 297 | Some((RxToken { rx: self.rx.borrow() }, TxToken { tx: self.tx.borrow() })) | 298 | Some((RxToken { rx: self.rx.borrow() }, TxToken { tx: self.tx.borrow() })) |
| 298 | } else { | 299 | } else { |
| 299 | None | 300 | None |
| @@ -337,9 +338,9 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> { | |||
| 337 | F: FnOnce(&mut [u8]) -> R, | 338 | F: FnOnce(&mut [u8]) -> R, |
| 338 | { | 339 | { |
| 339 | // NOTE(unwrap): we checked the queue wasn't full when creating the token. | 340 | // NOTE(unwrap): we checked the queue wasn't full when creating the token. |
| 340 | let pkt = unwrap!(self.rx.try_recv()); | 341 | let pkt = unwrap!(self.rx.try_receive()); |
| 341 | let r = f(&mut pkt.buf[..pkt.len]); | 342 | let r = f(&mut pkt.buf[..pkt.len]); |
| 342 | self.rx.recv_done(); | 343 | self.rx.receive_done(); |
| 343 | r | 344 | r |
| 344 | } | 345 | } |
| 345 | } | 346 | } |
| @@ -361,215 +362,3 @@ impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> { | |||
| 361 | r | 362 | r |
| 362 | } | 363 | } |
| 363 | } | 364 | } |
| 364 | |||
| 365 | mod zerocopy_channel { | ||
| 366 | use core::cell::RefCell; | ||
| 367 | use core::future::poll_fn; | ||
| 368 | use core::marker::PhantomData; | ||
| 369 | use core::task::{Context, Poll}; | ||
| 370 | |||
| 371 | use embassy_sync::blocking_mutex::raw::RawMutex; | ||
| 372 | use embassy_sync::blocking_mutex::Mutex; | ||
| 373 | use embassy_sync::waitqueue::WakerRegistration; | ||
| 374 | |||
| 375 | pub struct Channel<'a, M: RawMutex, T> { | ||
| 376 | buf: *mut T, | ||
| 377 | phantom: PhantomData<&'a mut T>, | ||
| 378 | state: Mutex<M, RefCell<State>>, | ||
| 379 | } | ||
| 380 | |||
| 381 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { | ||
| 382 | pub fn new(buf: &'a mut [T]) -> Self { | ||
| 383 | let len = buf.len(); | ||
| 384 | assert!(len != 0); | ||
| 385 | |||
| 386 | Self { | ||
| 387 | buf: buf.as_mut_ptr(), | ||
| 388 | phantom: PhantomData, | ||
| 389 | state: Mutex::new(RefCell::new(State { | ||
| 390 | len, | ||
| 391 | front: 0, | ||
| 392 | back: 0, | ||
| 393 | full: false, | ||
| 394 | send_waker: WakerRegistration::new(), | ||
| 395 | recv_waker: WakerRegistration::new(), | ||
| 396 | })), | ||
| 397 | } | ||
| 398 | } | ||
| 399 | |||
| 400 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | ||
| 401 | (Sender { channel: self }, Receiver { channel: self }) | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | pub struct Sender<'a, M: RawMutex, T> { | ||
| 406 | channel: &'a Channel<'a, M, T>, | ||
| 407 | } | ||
| 408 | |||
| 409 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { | ||
| 410 | pub fn borrow(&mut self) -> Sender<'_, M, T> { | ||
| 411 | Sender { channel: self.channel } | ||
| 412 | } | ||
| 413 | |||
| 414 | pub fn try_send(&mut self) -> Option<&mut T> { | ||
| 415 | self.channel.state.lock(|s| { | ||
| 416 | let s = &mut *s.borrow_mut(); | ||
| 417 | match s.push_index() { | ||
| 418 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 419 | None => None, | ||
| 420 | } | ||
| 421 | }) | ||
| 422 | } | ||
| 423 | |||
| 424 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 425 | self.channel.state.lock(|s| { | ||
| 426 | let s = &mut *s.borrow_mut(); | ||
| 427 | match s.push_index() { | ||
| 428 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 429 | None => { | ||
| 430 | s.recv_waker.register(cx.waker()); | ||
| 431 | Poll::Pending | ||
| 432 | } | ||
| 433 | } | ||
| 434 | }) | ||
| 435 | } | ||
| 436 | |||
| 437 | pub async fn send(&mut self) -> &mut T { | ||
| 438 | let i = poll_fn(|cx| { | ||
| 439 | self.channel.state.lock(|s| { | ||
| 440 | let s = &mut *s.borrow_mut(); | ||
| 441 | match s.push_index() { | ||
| 442 | Some(i) => Poll::Ready(i), | ||
| 443 | None => { | ||
| 444 | s.recv_waker.register(cx.waker()); | ||
| 445 | Poll::Pending | ||
| 446 | } | ||
| 447 | } | ||
| 448 | }) | ||
| 449 | }) | ||
| 450 | .await; | ||
| 451 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 452 | } | ||
| 453 | |||
| 454 | pub fn send_done(&mut self) { | ||
| 455 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | ||
| 456 | } | ||
| 457 | } | ||
| 458 | pub struct Receiver<'a, M: RawMutex, T> { | ||
| 459 | channel: &'a Channel<'a, M, T>, | ||
| 460 | } | ||
| 461 | |||
| 462 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | ||
| 463 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | ||
| 464 | Receiver { channel: self.channel } | ||
| 465 | } | ||
| 466 | |||
| 467 | pub fn try_recv(&mut self) -> Option<&mut T> { | ||
| 468 | self.channel.state.lock(|s| { | ||
| 469 | let s = &mut *s.borrow_mut(); | ||
| 470 | match s.pop_index() { | ||
| 471 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 472 | None => None, | ||
| 473 | } | ||
| 474 | }) | ||
| 475 | } | ||
| 476 | |||
| 477 | pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 478 | self.channel.state.lock(|s| { | ||
| 479 | let s = &mut *s.borrow_mut(); | ||
| 480 | match s.pop_index() { | ||
| 481 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 482 | None => { | ||
| 483 | s.send_waker.register(cx.waker()); | ||
| 484 | Poll::Pending | ||
| 485 | } | ||
| 486 | } | ||
| 487 | }) | ||
| 488 | } | ||
| 489 | |||
| 490 | pub async fn recv(&mut self) -> &mut T { | ||
| 491 | let i = poll_fn(|cx| { | ||
| 492 | self.channel.state.lock(|s| { | ||
| 493 | let s = &mut *s.borrow_mut(); | ||
| 494 | match s.pop_index() { | ||
| 495 | Some(i) => Poll::Ready(i), | ||
| 496 | None => { | ||
| 497 | s.send_waker.register(cx.waker()); | ||
| 498 | Poll::Pending | ||
| 499 | } | ||
| 500 | } | ||
| 501 | }) | ||
| 502 | }) | ||
| 503 | .await; | ||
| 504 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 505 | } | ||
| 506 | |||
| 507 | pub fn recv_done(&mut self) { | ||
| 508 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | ||
| 509 | } | ||
| 510 | } | ||
| 511 | |||
| 512 | struct State { | ||
| 513 | len: usize, | ||
| 514 | |||
| 515 | /// Front index. Always 0..=(N-1) | ||
| 516 | front: usize, | ||
| 517 | /// Back index. Always 0..=(N-1). | ||
| 518 | back: usize, | ||
| 519 | |||
| 520 | /// Used to distinguish "empty" and "full" cases when `front == back`. | ||
| 521 | /// May only be `true` if `front == back`, always `false` otherwise. | ||
| 522 | full: bool, | ||
| 523 | |||
| 524 | send_waker: WakerRegistration, | ||
| 525 | recv_waker: WakerRegistration, | ||
| 526 | } | ||
| 527 | |||
| 528 | impl State { | ||
| 529 | fn increment(&self, i: usize) -> usize { | ||
| 530 | if i + 1 == self.len { | ||
| 531 | 0 | ||
| 532 | } else { | ||
| 533 | i + 1 | ||
| 534 | } | ||
| 535 | } | ||
| 536 | |||
| 537 | fn is_full(&self) -> bool { | ||
| 538 | self.full | ||
| 539 | } | ||
| 540 | |||
| 541 | fn is_empty(&self) -> bool { | ||
| 542 | self.front == self.back && !self.full | ||
| 543 | } | ||
| 544 | |||
| 545 | fn push_index(&mut self) -> Option<usize> { | ||
| 546 | match self.is_full() { | ||
| 547 | true => None, | ||
| 548 | false => Some(self.back), | ||
| 549 | } | ||
| 550 | } | ||
| 551 | |||
| 552 | fn push_done(&mut self) { | ||
| 553 | assert!(!self.is_full()); | ||
| 554 | self.back = self.increment(self.back); | ||
| 555 | if self.back == self.front { | ||
| 556 | self.full = true; | ||
| 557 | } | ||
| 558 | self.send_waker.wake(); | ||
| 559 | } | ||
| 560 | |||
| 561 | fn pop_index(&mut self) -> Option<usize> { | ||
| 562 | match self.is_empty() { | ||
| 563 | true => None, | ||
| 564 | false => Some(self.front), | ||
| 565 | } | ||
| 566 | } | ||
| 567 | |||
| 568 | fn pop_done(&mut self) { | ||
| 569 | assert!(!self.is_empty()); | ||
| 570 | self.front = self.increment(self.front); | ||
| 571 | self.full = false; | ||
| 572 | self.recv_waker.wake(); | ||
| 573 | } | ||
| 574 | } | ||
| 575 | } | ||
diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index 53d95d081..8a9f841ee 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs | |||
| @@ -17,3 +17,4 @@ pub mod pipe; | |||
| 17 | pub mod pubsub; | 17 | pub mod pubsub; |
| 18 | pub mod signal; | 18 | pub mod signal; |
| 19 | pub mod waitqueue; | 19 | pub mod waitqueue; |
| 20 | pub mod zerocopy_channel; | ||
diff --git a/embassy-sync/src/zerocopy_channel.rs b/embassy-sync/src/zerocopy_channel.rs new file mode 100644 index 000000000..f704cbd5d --- /dev/null +++ b/embassy-sync/src/zerocopy_channel.rs | |||
| @@ -0,0 +1,260 @@ | |||
| 1 | //! A zero-copy queue for sending values between asynchronous tasks. | ||
| 2 | //! | ||
| 3 | //! It can be used concurrently by multiple producers (senders) and multiple | ||
| 4 | //! consumers (receivers), i.e. it is an "MPMC channel". | ||
| 5 | //! | ||
| 6 | //! Receivers are competing for messages. So a message that is received by | ||
| 7 | //! one receiver is not received by any other. | ||
| 8 | //! | ||
| 9 | //! This queue takes a Mutex type so that various | ||
| 10 | //! targets can be attained. For example, a ThreadModeMutex can be used | ||
| 11 | //! for single-core Cortex-M targets where messages are only passed | ||
| 12 | //! between tasks running in thread mode. Similarly, a CriticalSectionMutex | ||
| 13 | //! can also be used for single-core targets where messages are to be | ||
| 14 | //! passed from exception mode e.g. out of an interrupt handler. | ||
| 15 | //! | ||
| 16 | //! This module provides a bounded channel that has a limit on the number of | ||
| 17 | //! messages that it can store, and if this limit is reached, trying to send | ||
| 18 | //! another message will result in an error being returned. | ||
| 19 | |||
| 20 | use core::cell::RefCell; | ||
| 21 | use core::future::poll_fn; | ||
| 22 | use core::marker::PhantomData; | ||
| 23 | use core::task::{Context, Poll}; | ||
| 24 | |||
| 25 | use crate::blocking_mutex::raw::RawMutex; | ||
| 26 | use crate::blocking_mutex::Mutex; | ||
| 27 | use crate::waitqueue::WakerRegistration; | ||
| 28 | |||
| 29 | /// A bounded zero-copy channel for communicating between asynchronous tasks | ||
| 30 | /// with backpressure. | ||
| 31 | /// | ||
| 32 | /// The channel will buffer up to the provided number of messages. Once the | ||
| 33 | /// buffer is full, attempts to `send` new messages will wait until a message is | ||
| 34 | /// received from the channel. | ||
| 35 | /// | ||
| 36 | /// All data sent will become available in the same order as it was sent. | ||
| 37 | /// | ||
| 38 | /// The channel requires a buffer of recyclable elements. Writing to the channel is done through | ||
| 39 | /// an `&mut T`. | ||
| 40 | pub struct Channel<'a, M: RawMutex, T> { | ||
| 41 | buf: *mut T, | ||
| 42 | phantom: PhantomData<&'a mut T>, | ||
| 43 | state: Mutex<M, RefCell<State>>, | ||
| 44 | } | ||
| 45 | |||
| 46 | impl<'a, M: RawMutex, T> Channel<'a, M, T> { | ||
| 47 | /// Initialize a new [`Channel`]. | ||
| 48 | /// | ||
| 49 | /// The provided buffer will be used and reused by the channel's logic, and thus dictates the | ||
| 50 | /// channel's capacity. | ||
| 51 | pub fn new(buf: &'a mut [T]) -> Self { | ||
| 52 | let len = buf.len(); | ||
| 53 | assert!(len != 0); | ||
| 54 | |||
| 55 | Self { | ||
| 56 | buf: buf.as_mut_ptr(), | ||
| 57 | phantom: PhantomData, | ||
| 58 | state: Mutex::new(RefCell::new(State { | ||
| 59 | len, | ||
| 60 | front: 0, | ||
| 61 | back: 0, | ||
| 62 | full: false, | ||
| 63 | send_waker: WakerRegistration::new(), | ||
| 64 | receive_waker: WakerRegistration::new(), | ||
| 65 | })), | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 69 | /// Creates a [`Sender`] and [`Receiver`] from an existing channel. | ||
| 70 | /// | ||
| 71 | /// Further Senders and Receivers can be created through [`Sender::borrow`] and | ||
| 72 | /// [`Receiver::borrow`] respectively. | ||
| 73 | pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { | ||
| 74 | (Sender { channel: self }, Receiver { channel: self }) | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | /// Send-only access to a [`Channel`]. | ||
| 79 | pub struct Sender<'a, M: RawMutex, T> { | ||
| 80 | channel: &'a Channel<'a, M, T>, | ||
| 81 | } | ||
| 82 | |||
| 83 | impl<'a, M: RawMutex, T> Sender<'a, M, T> { | ||
| 84 | /// Creates one further [`Sender`] over the same channel. | ||
| 85 | pub fn borrow(&mut self) -> Sender<'_, M, T> { | ||
| 86 | Sender { channel: self.channel } | ||
| 87 | } | ||
| 88 | |||
| 89 | /// Attempts to send a value over the channel. | ||
| 90 | pub fn try_send(&mut self) -> Option<&mut T> { | ||
| 91 | self.channel.state.lock(|s| { | ||
| 92 | let s = &mut *s.borrow_mut(); | ||
| 93 | match s.push_index() { | ||
| 94 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 95 | None => None, | ||
| 96 | } | ||
| 97 | }) | ||
| 98 | } | ||
| 99 | |||
| 100 | /// Attempts to send a value over the channel. | ||
| 101 | pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 102 | self.channel.state.lock(|s| { | ||
| 103 | let s = &mut *s.borrow_mut(); | ||
| 104 | match s.push_index() { | ||
| 105 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 106 | None => { | ||
| 107 | s.receive_waker.register(cx.waker()); | ||
| 108 | Poll::Pending | ||
| 109 | } | ||
| 110 | } | ||
| 111 | }) | ||
| 112 | } | ||
| 113 | |||
| 114 | /// Asynchronously send a value over the channel. | ||
| 115 | pub async fn send(&mut self) -> &mut T { | ||
| 116 | let i = poll_fn(|cx| { | ||
| 117 | self.channel.state.lock(|s| { | ||
| 118 | let s = &mut *s.borrow_mut(); | ||
| 119 | match s.push_index() { | ||
| 120 | Some(i) => Poll::Ready(i), | ||
| 121 | None => { | ||
| 122 | s.receive_waker.register(cx.waker()); | ||
| 123 | Poll::Pending | ||
| 124 | } | ||
| 125 | } | ||
| 126 | }) | ||
| 127 | }) | ||
| 128 | .await; | ||
| 129 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 130 | } | ||
| 131 | |||
| 132 | /// Notify the channel that the sending of the value has been finalized. | ||
| 133 | pub fn send_done(&mut self) { | ||
| 134 | self.channel.state.lock(|s| s.borrow_mut().push_done()) | ||
| 135 | } | ||
| 136 | } | ||
| 137 | |||
| 138 | /// Receive-only access to a [`Channel`]. | ||
| 139 | pub struct Receiver<'a, M: RawMutex, T> { | ||
| 140 | channel: &'a Channel<'a, M, T>, | ||
| 141 | } | ||
| 142 | |||
| 143 | impl<'a, M: RawMutex, T> Receiver<'a, M, T> { | ||
| 144 | /// Creates one further [`Sender`] over the same channel. | ||
| 145 | pub fn borrow(&mut self) -> Receiver<'_, M, T> { | ||
| 146 | Receiver { channel: self.channel } | ||
| 147 | } | ||
| 148 | |||
| 149 | /// Attempts to receive a value over the channel. | ||
| 150 | pub fn try_receive(&mut self) -> Option<&mut T> { | ||
| 151 | self.channel.state.lock(|s| { | ||
| 152 | let s = &mut *s.borrow_mut(); | ||
| 153 | match s.pop_index() { | ||
| 154 | Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 155 | None => None, | ||
| 156 | } | ||
| 157 | }) | ||
| 158 | } | ||
| 159 | |||
| 160 | /// Attempts to asynchronously receive a value over the channel. | ||
| 161 | pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { | ||
| 162 | self.channel.state.lock(|s| { | ||
| 163 | let s = &mut *s.borrow_mut(); | ||
| 164 | match s.pop_index() { | ||
| 165 | Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }), | ||
| 166 | None => { | ||
| 167 | s.send_waker.register(cx.waker()); | ||
| 168 | Poll::Pending | ||
| 169 | } | ||
| 170 | } | ||
| 171 | }) | ||
| 172 | } | ||
| 173 | |||
| 174 | /// Asynchronously receive a value over the channel. | ||
| 175 | pub async fn receive(&mut self) -> &mut T { | ||
| 176 | let i = poll_fn(|cx| { | ||
| 177 | self.channel.state.lock(|s| { | ||
| 178 | let s = &mut *s.borrow_mut(); | ||
| 179 | match s.pop_index() { | ||
| 180 | Some(i) => Poll::Ready(i), | ||
| 181 | None => { | ||
| 182 | s.send_waker.register(cx.waker()); | ||
| 183 | Poll::Pending | ||
| 184 | } | ||
| 185 | } | ||
| 186 | }) | ||
| 187 | }) | ||
| 188 | .await; | ||
| 189 | unsafe { &mut *self.channel.buf.add(i) } | ||
| 190 | } | ||
| 191 | |||
| 192 | /// Notify the channel that the receiving of the value has been finalized. | ||
| 193 | pub fn receive_done(&mut self) { | ||
| 194 | self.channel.state.lock(|s| s.borrow_mut().pop_done()) | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | struct State { | ||
| 199 | len: usize, | ||
| 200 | |||
| 201 | /// Front index. Always 0..=(N-1) | ||
| 202 | front: usize, | ||
| 203 | /// Back index. Always 0..=(N-1). | ||
| 204 | back: usize, | ||
| 205 | |||
| 206 | /// Used to distinguish "empty" and "full" cases when `front == back`. | ||
| 207 | /// May only be `true` if `front == back`, always `false` otherwise. | ||
| 208 | full: bool, | ||
| 209 | |||
| 210 | send_waker: WakerRegistration, | ||
| 211 | receive_waker: WakerRegistration, | ||
| 212 | } | ||
| 213 | |||
| 214 | impl State { | ||
| 215 | fn increment(&self, i: usize) -> usize { | ||
| 216 | if i + 1 == self.len { | ||
| 217 | 0 | ||
| 218 | } else { | ||
| 219 | i + 1 | ||
| 220 | } | ||
| 221 | } | ||
| 222 | |||
| 223 | fn is_full(&self) -> bool { | ||
| 224 | self.full | ||
| 225 | } | ||
| 226 | |||
| 227 | fn is_empty(&self) -> bool { | ||
| 228 | self.front == self.back && !self.full | ||
| 229 | } | ||
| 230 | |||
| 231 | fn push_index(&mut self) -> Option<usize> { | ||
| 232 | match self.is_full() { | ||
| 233 | true => None, | ||
| 234 | false => Some(self.back), | ||
| 235 | } | ||
| 236 | } | ||
| 237 | |||
| 238 | fn push_done(&mut self) { | ||
| 239 | assert!(!self.is_full()); | ||
| 240 | self.back = self.increment(self.back); | ||
| 241 | if self.back == self.front { | ||
| 242 | self.full = true; | ||
| 243 | } | ||
| 244 | self.send_waker.wake(); | ||
| 245 | } | ||
| 246 | |||
| 247 | fn pop_index(&mut self) -> Option<usize> { | ||
| 248 | match self.is_empty() { | ||
| 249 | true => None, | ||
| 250 | false => Some(self.front), | ||
| 251 | } | ||
| 252 | } | ||
| 253 | |||
| 254 | fn pop_done(&mut self) { | ||
| 255 | assert!(!self.is_empty()); | ||
| 256 | self.front = self.increment(self.front); | ||
| 257 | self.full = false; | ||
| 258 | self.receive_waker.wake(); | ||
| 259 | } | ||
| 260 | } | ||
