aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuntc <[email protected]>2021-07-11 11:47:09 +1000
committerhuntc <[email protected]>2021-07-15 12:31:52 +1000
commit108cffcba02d5f84099991d670cddfb458e2c106 (patch)
tree429db59adcfb10d691fde2d4eb3a2dfa66107e43
parentdcd0c38109ed6711d91c4bdff42825f25e3ee402 (diff)
Migrated to the waker registration functionality for Embassy specific optimisations
-rw-r--r--embassy/Cargo.toml1
-rw-r--r--embassy/src/util/mpsc.rs61
2 files changed, 20 insertions, 42 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml
index d26490247..c03fc0df5 100644
--- a/embassy/Cargo.toml
+++ b/embassy/Cargo.toml
@@ -39,6 +39,7 @@ embedded-hal = "0.2.5"
39cast = { version = "=0.2.3", default-features = false } 39cast = { version = "=0.2.3", default-features = false }
40 40
41[dev-dependencies] 41[dev-dependencies]
42embassy = { path = ".", features = ["executor-agnostic"] }
42futures-executor = { version = "0.3", features = [ "thread-pool" ] } 43futures-executor = { version = "0.3", features = [ "thread-pool" ] }
43futures-test = "0.3" 44futures-test = "0.3"
44futures-timer = "0.3" 45futures-timer = "0.3"
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index 8f1bba764..580c6794f 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -51,6 +51,7 @@ use super::CriticalSectionMutex;
51use super::Mutex; 51use super::Mutex;
52use super::NoopMutex; 52use super::NoopMutex;
53use super::ThreadModeMutex; 53use super::ThreadModeMutex;
54use super::WakerRegistration;
54 55
55/// Send values to the associated `Receiver`. 56/// Send values to the associated `Receiver`.
56/// 57///
@@ -149,7 +150,7 @@ where
149 Ok(v) => Poll::Ready(Some(v)), 150 Ok(v) => Poll::Ready(Some(v)),
150 Err(TryRecvError::Closed) => Poll::Ready(None), 151 Err(TryRecvError::Closed) => Poll::Ready(None),
151 Err(TryRecvError::Empty) => { 152 Err(TryRecvError::Empty) => {
152 self.channel.get().set_receiver_waker(cx.waker().clone()); 153 self.channel.get().set_receiver_waker(&cx.waker());
153 Poll::Pending 154 Poll::Pending
154 } 155 }
155 } 156 }
@@ -282,10 +283,7 @@ where
282 Ok(..) => Poll::Ready(Ok(())), 283 Ok(..) => Poll::Ready(Ok(())),
283 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), 284 Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
284 Err(TrySendError::Full(..)) => { 285 Err(TrySendError::Full(..)) => {
285 self.sender 286 self.sender.channel.get().set_senders_waker(&cx.waker());
286 .channel
287 .get()
288 .set_senders_waker(cx.waker().clone());
289 Poll::Pending 287 Poll::Pending
290 // Note we leave the existing UnsafeCell contents - they still 288 // Note we leave the existing UnsafeCell contents - they still
291 // contain the original message. We could create another UnsafeCell 289 // contain the original message. We could create another UnsafeCell
@@ -312,10 +310,7 @@ where
312 if self.sender.is_closed() { 310 if self.sender.is_closed() {
313 Poll::Ready(()) 311 Poll::Ready(())
314 } else { 312 } else {
315 self.sender 313 self.sender.channel.get().set_senders_waker(&cx.waker());
316 .channel
317 .get()
318 .set_senders_waker(cx.waker().clone());
319 Poll::Pending 314 Poll::Pending
320 } 315 }
321 } 316 }
@@ -400,8 +395,8 @@ struct ChannelState<T, const N: usize> {
400 closed: bool, 395 closed: bool,
401 receiver_registered: bool, 396 receiver_registered: bool,
402 senders_registered: u32, 397 senders_registered: u32,
403 receiver_waker: Option<Waker>, 398 receiver_waker: WakerRegistration,
404 senders_waker: Option<Waker>, 399 senders_waker: WakerRegistration,
405} 400}
406 401
407impl<T, const N: usize> ChannelState<T, N> { 402impl<T, const N: usize> ChannelState<T, N> {
@@ -416,8 +411,8 @@ impl<T, const N: usize> ChannelState<T, N> {
416 let closed = false; 411 let closed = false;
417 let receiver_registered = false; 412 let receiver_registered = false;
418 let senders_registered = 0; 413 let senders_registered = 0;
419 let receiver_waker = None; 414 let receiver_waker = WakerRegistration::new();
420 let senders_waker = None; 415 let senders_waker = WakerRegistration::new();
421 ChannelState { 416 ChannelState {
422 buf, 417 buf,
423 read_pos, 418 read_pos,
@@ -534,9 +529,7 @@ where
534 if state.read_pos != state.write_pos || state.full { 529 if state.read_pos != state.write_pos || state.full {
535 if state.full { 530 if state.full {
536 state.full = false; 531 state.full = false;
537 if let Some(w) = state.senders_waker.take() { 532 state.senders_waker.wake();
538 w.wake();
539 }
540 } 533 }
541 let message = 534 let message =
542 unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() }; 535 unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
@@ -546,9 +539,7 @@ where
546 Err(TryRecvError::Empty) 539 Err(TryRecvError::Empty)
547 } else { 540 } else {
548 state.closed = true; 541 state.closed = true;
549 if let Some(w) = state.senders_waker.take() { 542 state.senders_waker.wake();
550 w.wake();
551 }
552 Err(TryRecvError::Closed) 543 Err(TryRecvError::Closed)
553 } 544 }
554 } else { 545 } else {
@@ -567,9 +558,7 @@ where
567 if state.write_pos == state.read_pos { 558 if state.write_pos == state.read_pos {
568 state.full = true; 559 state.full = true;
569 } 560 }
570 if let Some(w) = state.receiver_waker.take() { 561 state.receiver_waker.wake();
571 w.wake();
572 }
573 Ok(()) 562 Ok(())
574 } else { 563 } else {
575 Err(TrySendError::Full(message)) 564 Err(TrySendError::Full(message))
@@ -583,9 +572,7 @@ where
583 fn close(&mut self) { 572 fn close(&mut self) {
584 let state = &mut self.state; 573 let state = &mut self.state;
585 self.mutex.lock(|_| { 574 self.mutex.lock(|_| {
586 if let Some(w) = state.receiver_waker.take() { 575 state.receiver_waker.wake();
587 w.wake();
588 }
589 state.closing = true; 576 state.closing = true;
590 }); 577 });
591 } 578 }
@@ -608,9 +595,7 @@ where
608 self.mutex.lock(|_| { 595 self.mutex.lock(|_| {
609 if state.receiver_registered { 596 if state.receiver_registered {
610 state.closed = true; 597 state.closed = true;
611 if let Some(w) = state.senders_waker.take() { 598 state.senders_waker.wake();
612 w.wake();
613 }
614 } 599 }
615 state.receiver_registered = false; 600 state.receiver_registered = false;
616 }) 601 })
@@ -629,38 +614,30 @@ where
629 assert!(state.senders_registered > 0); 614 assert!(state.senders_registered > 0);
630 state.senders_registered -= 1; 615 state.senders_registered -= 1;
631 if state.senders_registered == 0 { 616 if state.senders_registered == 0 {
632 if let Some(w) = state.receiver_waker.take() { 617 state.receiver_waker.wake();
633 w.wake();
634 }
635 state.closing = true; 618 state.closing = true;
636 } 619 }
637 }) 620 })
638 } 621 }
639 622
640 fn set_receiver_waker(&mut self, receiver_waker: Waker) { 623 fn set_receiver_waker(&mut self, receiver_waker: &Waker) {
641 let state = &mut self.state; 624 let state = &mut self.state;
642 self.mutex.lock(|_| { 625 self.mutex.lock(|_| {
643 state.receiver_waker = Some(receiver_waker); 626 state.receiver_waker.register(receiver_waker);
644 }) 627 })
645 } 628 }
646 629
647 fn set_senders_waker(&mut self, senders_waker: Waker) { 630 fn set_senders_waker(&mut self, senders_waker: &Waker) {
648 let state = &mut self.state; 631 let state = &mut self.state;
649 self.mutex.lock(|_| { 632 self.mutex.lock(|_| {
650
651 // Dispose of any existing sender causing them to be polled again. 633 // Dispose of any existing sender causing them to be polled again.
652 // This could cause a spin given multiple concurrent senders, however given that 634 // This could cause a spin given multiple concurrent senders, however given that
653 // most sends only block waiting for the receiver to become active, this should 635 // most sends only block waiting for the receiver to become active, this should
654 // be a short-lived activity. The upside is a greatly simplified implementation 636 // be a short-lived activity. The upside is a greatly simplified implementation
655 // that avoids the need for intrusive linked-lists and unsafe operations on pinned 637 // that avoids the need for intrusive linked-lists and unsafe operations on pinned
656 // pointers. 638 // pointers.
657 if let Some(waker) = state.senders_waker.clone() { 639 state.senders_waker.wake();
658 if !senders_waker.will_wake(&waker) { 640 state.senders_waker.register(senders_waker);
659 trace!("Waking an an active send waker due to being superseded with a new one. While benign, please report this.");
660 waker.wake();
661 }
662 }
663 state.senders_waker = Some(senders_waker);
664 }) 641 })
665 } 642 }
666} 643}