aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuntc <[email protected]>2021-07-14 13:31:23 +1000
committerhuntc <[email protected]>2021-07-15 12:31:53 +1000
commitd86892ca566907aa7b9c29971229262557be49dc (patch)
tree7641563de575d174c993956a54162dace63d9c0a
parenta247fa4f2c90993bad3501349029c52e7bb06f9d (diff)
Removed the closing state as it was not required
-rw-r--r--embassy/src/util/mpsc.rs35
1 files changed, 13 insertions, 22 deletions
diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs
index b64d81c89..7f37eece4 100644
--- a/embassy/src/util/mpsc.rs
+++ b/embassy/src/util/mpsc.rs
@@ -390,7 +390,6 @@ struct ChannelState<T, const N: usize> {
390 read_pos: usize, 390 read_pos: usize,
391 write_pos: usize, 391 write_pos: usize,
392 full: bool, 392 full: bool,
393 closing: bool,
394 closed: bool, 393 closed: bool,
395 receiver_registered: bool, 394 receiver_registered: bool,
396 senders_registered: u32, 395 senders_registered: u32,
@@ -407,7 +406,6 @@ impl<T, const N: usize> ChannelState<T, N> {
407 read_pos: 0, 406 read_pos: 0,
408 write_pos: 0, 407 write_pos: 0,
409 full: false, 408 full: false,
410 closing: false,
411 closed: false, 409 closed: false,
412 receiver_registered: false, 410 receiver_registered: false,
413 senders_registered: 0, 411 senders_registered: 0,
@@ -528,25 +526,18 @@ where
528 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { 526 fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
529 let mut state = &mut self.state; 527 let mut state = &mut self.state;
530 self.mutex.lock(|_| { 528 self.mutex.lock(|_| {
531 if !state.closed { 529 if state.read_pos != state.write_pos || state.full {
532 if state.read_pos != state.write_pos || state.full { 530 if state.full {
533 if state.full { 531 state.full = false;
534 state.full = false;
535 state.senders_waker.wake();
536 }
537 let message =
538 unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
539 state.read_pos = (state.read_pos + 1) % state.buf.len();
540 Ok(message)
541 } else if !state.closing {
542 cx.into_iter()
543 .for_each(|cx| Self::set_receiver_waker(&mut state, &cx.waker()));
544 Err(TryRecvError::Empty)
545 } else {
546 state.closed = true;
547 state.senders_waker.wake(); 532 state.senders_waker.wake();
548 Err(TryRecvError::Closed)
549 } 533 }
534 let message = unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
535 state.read_pos = (state.read_pos + 1) % state.buf.len();
536 Ok(message)
537 } else if !state.closed {
538 cx.into_iter()
539 .for_each(|cx| Self::set_receiver_waker(&mut state, &cx.waker()));
540 Err(TryRecvError::Empty)
550 } else { 541 } else {
551 Err(TryRecvError::Closed) 542 Err(TryRecvError::Closed)
552 } 543 }
@@ -588,7 +579,7 @@ where
588 let state = &mut self.state; 579 let state = &mut self.state;
589 self.mutex.lock(|_| { 580 self.mutex.lock(|_| {
590 state.receiver_waker.wake(); 581 state.receiver_waker.wake();
591 state.closing = true; 582 state.closed = true;
592 }); 583 });
593 } 584 }
594 585
@@ -599,7 +590,7 @@ where
599 fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool { 590 fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool {
600 let mut state = &mut self.state; 591 let mut state = &mut self.state;
601 self.mutex.lock(|_| { 592 self.mutex.lock(|_| {
602 if state.closing || state.closed { 593 if state.closed {
603 cx.into_iter() 594 cx.into_iter()
604 .for_each(|cx| Self::set_senders_waker(&mut state, &cx.waker())); 595 .for_each(|cx| Self::set_senders_waker(&mut state, &cx.waker()));
605 true 596 true
@@ -642,7 +633,7 @@ where
642 state.senders_registered -= 1; 633 state.senders_registered -= 1;
643 if state.senders_registered == 0 { 634 if state.senders_registered == 0 {
644 state.receiver_waker.wake(); 635 state.receiver_waker.wake();
645 state.closing = true; 636 state.closed = true;
646 } 637 }
647 }) 638 })
648 } 639 }