diff options
| author | huntc <[email protected]> | 2021-07-11 11:47:09 +1000 |
|---|---|---|
| committer | huntc <[email protected]> | 2021-07-15 12:31:52 +1000 |
| commit | 108cffcba02d5f84099991d670cddfb458e2c106 (patch) | |
| tree | 429db59adcfb10d691fde2d4eb3a2dfa66107e43 | |
| parent | dcd0c38109ed6711d91c4bdff42825f25e3ee402 (diff) | |
Migrated to the waker registration functionality for Embassy specific optimisations
| -rw-r--r-- | embassy/Cargo.toml | 1 | ||||
| -rw-r--r-- | embassy/src/util/mpsc.rs | 61 |
2 files changed, 20 insertions, 42 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index d26490247..c03fc0df5 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml | |||
| @@ -39,6 +39,7 @@ embedded-hal = "0.2.5" | |||
| 39 | cast = { version = "=0.2.3", default-features = false } | 39 | cast = { version = "=0.2.3", default-features = false } |
| 40 | 40 | ||
| 41 | [dev-dependencies] | 41 | [dev-dependencies] |
| 42 | embassy = { path = ".", features = ["executor-agnostic"] } | ||
| 42 | futures-executor = { version = "0.3", features = [ "thread-pool" ] } | 43 | futures-executor = { version = "0.3", features = [ "thread-pool" ] } |
| 43 | futures-test = "0.3" | 44 | futures-test = "0.3" |
| 44 | futures-timer = "0.3" | 45 | futures-timer = "0.3" |
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 8f1bba764..580c6794f 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs | |||
| @@ -51,6 +51,7 @@ use super::CriticalSectionMutex; | |||
| 51 | use super::Mutex; | 51 | use super::Mutex; |
| 52 | use super::NoopMutex; | 52 | use super::NoopMutex; |
| 53 | use super::ThreadModeMutex; | 53 | use super::ThreadModeMutex; |
| 54 | use super::WakerRegistration; | ||
| 54 | 55 | ||
| 55 | /// Send values to the associated `Receiver`. | 56 | /// Send values to the associated `Receiver`. |
| 56 | /// | 57 | /// |
| @@ -149,7 +150,7 @@ where | |||
| 149 | Ok(v) => Poll::Ready(Some(v)), | 150 | Ok(v) => Poll::Ready(Some(v)), |
| 150 | Err(TryRecvError::Closed) => Poll::Ready(None), | 151 | Err(TryRecvError::Closed) => Poll::Ready(None), |
| 151 | Err(TryRecvError::Empty) => { | 152 | Err(TryRecvError::Empty) => { |
| 152 | self.channel.get().set_receiver_waker(cx.waker().clone()); | 153 | self.channel.get().set_receiver_waker(&cx.waker()); |
| 153 | Poll::Pending | 154 | Poll::Pending |
| 154 | } | 155 | } |
| 155 | } | 156 | } |
| @@ -282,10 +283,7 @@ where | |||
| 282 | Ok(..) => Poll::Ready(Ok(())), | 283 | Ok(..) => Poll::Ready(Ok(())), |
| 283 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | 284 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), |
| 284 | Err(TrySendError::Full(..)) => { | 285 | Err(TrySendError::Full(..)) => { |
| 285 | self.sender | 286 | self.sender.channel.get().set_senders_waker(&cx.waker()); |
| 286 | .channel | ||
| 287 | .get() | ||
| 288 | .set_senders_waker(cx.waker().clone()); | ||
| 289 | Poll::Pending | 287 | Poll::Pending |
| 290 | // Note we leave the existing UnsafeCell contents - they still | 288 | // Note we leave the existing UnsafeCell contents - they still |
| 291 | // contain the original message. We could create another UnsafeCell | 289 | // contain the original message. We could create another UnsafeCell |
| @@ -312,10 +310,7 @@ where | |||
| 312 | if self.sender.is_closed() { | 310 | if self.sender.is_closed() { |
| 313 | Poll::Ready(()) | 311 | Poll::Ready(()) |
| 314 | } else { | 312 | } else { |
| 315 | self.sender | 313 | self.sender.channel.get().set_senders_waker(&cx.waker()); |
| 316 | .channel | ||
| 317 | .get() | ||
| 318 | .set_senders_waker(cx.waker().clone()); | ||
| 319 | Poll::Pending | 314 | Poll::Pending |
| 320 | } | 315 | } |
| 321 | } | 316 | } |
| @@ -400,8 +395,8 @@ struct ChannelState<T, const N: usize> { | |||
| 400 | closed: bool, | 395 | closed: bool, |
| 401 | receiver_registered: bool, | 396 | receiver_registered: bool, |
| 402 | senders_registered: u32, | 397 | senders_registered: u32, |
| 403 | receiver_waker: Option<Waker>, | 398 | receiver_waker: WakerRegistration, |
| 404 | senders_waker: Option<Waker>, | 399 | senders_waker: WakerRegistration, |
| 405 | } | 400 | } |
| 406 | 401 | ||
| 407 | impl<T, const N: usize> ChannelState<T, N> { | 402 | impl<T, const N: usize> ChannelState<T, N> { |
| @@ -416,8 +411,8 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 416 | let closed = false; | 411 | let closed = false; |
| 417 | let receiver_registered = false; | 412 | let receiver_registered = false; |
| 418 | let senders_registered = 0; | 413 | let senders_registered = 0; |
| 419 | let receiver_waker = None; | 414 | let receiver_waker = WakerRegistration::new(); |
| 420 | let senders_waker = None; | 415 | let senders_waker = WakerRegistration::new(); |
| 421 | ChannelState { | 416 | ChannelState { |
| 422 | buf, | 417 | buf, |
| 423 | read_pos, | 418 | read_pos, |
| @@ -534,9 +529,7 @@ where | |||
| 534 | if state.read_pos != state.write_pos || state.full { | 529 | if state.read_pos != state.write_pos || state.full { |
| 535 | if state.full { | 530 | if state.full { |
| 536 | state.full = false; | 531 | state.full = false; |
| 537 | if let Some(w) = state.senders_waker.take() { | 532 | state.senders_waker.wake(); |
| 538 | w.wake(); | ||
| 539 | } | ||
| 540 | } | 533 | } |
| 541 | let message = | 534 | let message = |
| 542 | unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() }; | 535 | unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() }; |
| @@ -546,9 +539,7 @@ where | |||
| 546 | Err(TryRecvError::Empty) | 539 | Err(TryRecvError::Empty) |
| 547 | } else { | 540 | } else { |
| 548 | state.closed = true; | 541 | state.closed = true; |
| 549 | if let Some(w) = state.senders_waker.take() { | 542 | state.senders_waker.wake(); |
| 550 | w.wake(); | ||
| 551 | } | ||
| 552 | Err(TryRecvError::Closed) | 543 | Err(TryRecvError::Closed) |
| 553 | } | 544 | } |
| 554 | } else { | 545 | } else { |
| @@ -567,9 +558,7 @@ where | |||
| 567 | if state.write_pos == state.read_pos { | 558 | if state.write_pos == state.read_pos { |
| 568 | state.full = true; | 559 | state.full = true; |
| 569 | } | 560 | } |
| 570 | if let Some(w) = state.receiver_waker.take() { | 561 | state.receiver_waker.wake(); |
| 571 | w.wake(); | ||
| 572 | } | ||
| 573 | Ok(()) | 562 | Ok(()) |
| 574 | } else { | 563 | } else { |
| 575 | Err(TrySendError::Full(message)) | 564 | Err(TrySendError::Full(message)) |
| @@ -583,9 +572,7 @@ where | |||
| 583 | fn close(&mut self) { | 572 | fn close(&mut self) { |
| 584 | let state = &mut self.state; | 573 | let state = &mut self.state; |
| 585 | self.mutex.lock(|_| { | 574 | self.mutex.lock(|_| { |
| 586 | if let Some(w) = state.receiver_waker.take() { | 575 | state.receiver_waker.wake(); |
| 587 | w.wake(); | ||
| 588 | } | ||
| 589 | state.closing = true; | 576 | state.closing = true; |
| 590 | }); | 577 | }); |
| 591 | } | 578 | } |
| @@ -608,9 +595,7 @@ where | |||
| 608 | self.mutex.lock(|_| { | 595 | self.mutex.lock(|_| { |
| 609 | if state.receiver_registered { | 596 | if state.receiver_registered { |
| 610 | state.closed = true; | 597 | state.closed = true; |
| 611 | if let Some(w) = state.senders_waker.take() { | 598 | state.senders_waker.wake(); |
| 612 | w.wake(); | ||
| 613 | } | ||
| 614 | } | 599 | } |
| 615 | state.receiver_registered = false; | 600 | state.receiver_registered = false; |
| 616 | }) | 601 | }) |
| @@ -629,38 +614,30 @@ where | |||
| 629 | assert!(state.senders_registered > 0); | 614 | assert!(state.senders_registered > 0); |
| 630 | state.senders_registered -= 1; | 615 | state.senders_registered -= 1; |
| 631 | if state.senders_registered == 0 { | 616 | if state.senders_registered == 0 { |
| 632 | if let Some(w) = state.receiver_waker.take() { | 617 | state.receiver_waker.wake(); |
| 633 | w.wake(); | ||
| 634 | } | ||
| 635 | state.closing = true; | 618 | state.closing = true; |
| 636 | } | 619 | } |
| 637 | }) | 620 | }) |
| 638 | } | 621 | } |
| 639 | 622 | ||
| 640 | fn set_receiver_waker(&mut self, receiver_waker: Waker) { | 623 | fn set_receiver_waker(&mut self, receiver_waker: &Waker) { |
| 641 | let state = &mut self.state; | 624 | let state = &mut self.state; |
| 642 | self.mutex.lock(|_| { | 625 | self.mutex.lock(|_| { |
| 643 | state.receiver_waker = Some(receiver_waker); | 626 | state.receiver_waker.register(receiver_waker); |
| 644 | }) | 627 | }) |
| 645 | } | 628 | } |
| 646 | 629 | ||
| 647 | fn set_senders_waker(&mut self, senders_waker: Waker) { | 630 | fn set_senders_waker(&mut self, senders_waker: &Waker) { |
| 648 | let state = &mut self.state; | 631 | let state = &mut self.state; |
| 649 | self.mutex.lock(|_| { | 632 | self.mutex.lock(|_| { |
| 650 | |||
| 651 | // Dispose of any existing sender causing them to be polled again. | 633 | // Dispose of any existing sender causing them to be polled again. |
| 652 | // This could cause a spin given multiple concurrent senders, however given that | 634 | // This could cause a spin given multiple concurrent senders, however given that |
| 653 | // most sends only block waiting for the receiver to become active, this should | 635 | // most sends only block waiting for the receiver to become active, this should |
| 654 | // be a short-lived activity. The upside is a greatly simplified implementation | 636 | // be a short-lived activity. The upside is a greatly simplified implementation |
| 655 | // that avoids the need for intrusive linked-lists and unsafe operations on pinned | 637 | // that avoids the need for intrusive linked-lists and unsafe operations on pinned |
| 656 | // pointers. | 638 | // pointers. |
| 657 | if let Some(waker) = state.senders_waker.clone() { | 639 | state.senders_waker.wake(); |
| 658 | if !senders_waker.will_wake(&waker) { | 640 | state.senders_waker.register(senders_waker); |
| 659 | trace!("Waking an an active send waker due to being superseded with a new one. While benign, please report this."); | ||
| 660 | waker.wake(); | ||
| 661 | } | ||
| 662 | } | ||
| 663 | state.senders_waker = Some(senders_waker); | ||
| 664 | }) | 641 | }) |
| 665 | } | 642 | } |
| 666 | } | 643 | } |
