diff options
| -rw-r--r-- | embassy/src/channel/pubsub.rs | 106 |
1 files changed, 34 insertions, 72 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index ea0ccb2da..20878187d 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs | |||
| @@ -109,9 +109,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 109 | return Err(message); | 109 | return Err(message); |
| 110 | } | 110 | } |
| 111 | // We just did a check for this | 111 | // We just did a check for this |
| 112 | unsafe { | 112 | s.queue.push_back((message, active_subscriber_count)).ok().unwrap(); |
| 113 | s.queue.push_back_unchecked((message, active_subscriber_count)); | ||
| 114 | } | ||
| 115 | 113 | ||
| 116 | s.next_message_id += 1; | 114 | s.next_message_id += 1; |
| 117 | 115 | ||
| @@ -138,7 +136,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 138 | drop(s); | 136 | drop(s); |
| 139 | 137 | ||
| 140 | // This will succeed because we made sure there is space | 138 | // This will succeed because we made sure there is space |
| 141 | unsafe { self.try_publish(message).unwrap_unchecked() }; | 139 | self.try_publish(message).ok().unwrap(); |
| 142 | }); | 140 | }); |
| 143 | } | 141 | } |
| 144 | 142 | ||
| @@ -159,51 +157,41 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 159 | } | 157 | } |
| 160 | 158 | ||
| 161 | // We've checked that the index is valid | 159 | // We've checked that the index is valid |
| 162 | unsafe { | 160 | let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap(); |
| 163 | let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap_unchecked(); | ||
| 164 | |||
| 165 | // We're reading this item, so decrement the counter | ||
| 166 | queue_item.1 -= 1; | ||
| 167 | let message = queue_item.0.clone(); | ||
| 168 | 161 | ||
| 169 | if current_message_index == 0 && queue_item.1 == 0 { | 162 | // We're reading this item, so decrement the counter |
| 170 | s.queue.pop_front(); | 163 | queue_item.1 -= 1; |
| 171 | s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); | 164 | let message = queue_item.0.clone(); |
| 172 | } | ||
| 173 | 165 | ||
| 174 | Some(WaitResult::Message(message)) | 166 | if current_message_index == 0 && queue_item.1 == 0 { |
| 167 | s.queue.pop_front(); | ||
| 168 | s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); | ||
| 175 | } | 169 | } |
| 170 | |||
| 171 | Some(WaitResult::Message(message)) | ||
| 176 | }) | 172 | }) |
| 177 | } | 173 | } |
| 178 | 174 | ||
| 179 | unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) { | 175 | fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) { |
| 180 | self.inner.lock(|inner| { | 176 | self.inner.lock(|inner| { |
| 181 | let mut s = inner.borrow_mut(); | 177 | let mut s = inner.borrow_mut(); |
| 182 | s.subscriber_wakers | 178 | s.subscriber_wakers[subscriber_index].as_mut().unwrap().register(waker); |
| 183 | .get_unchecked_mut(subscriber_index) | ||
| 184 | .as_mut() | ||
| 185 | .unwrap_unchecked() | ||
| 186 | .register(waker); | ||
| 187 | }) | 179 | }) |
| 188 | } | 180 | } |
| 189 | 181 | ||
| 190 | unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) { | 182 | fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) { |
| 191 | self.inner.lock(|inner| { | 183 | self.inner.lock(|inner| { |
| 192 | let mut s = inner.borrow_mut(); | 184 | let mut s = inner.borrow_mut(); |
| 193 | s.publisher_wakers | 185 | s.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); |
| 194 | .get_unchecked_mut(publisher_index) | ||
| 195 | .as_mut() | ||
| 196 | .unwrap_unchecked() | ||
| 197 | .register(waker); | ||
| 198 | }) | 186 | }) |
| 199 | } | 187 | } |
| 200 | 188 | ||
| 201 | unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { | 189 | fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { |
| 202 | self.inner.lock(|inner| { | 190 | self.inner.lock(|inner| { |
| 203 | let mut s = inner.borrow_mut(); | 191 | let mut s = inner.borrow_mut(); |
| 204 | 192 | ||
| 205 | // Remove the subscriber from the wakers | 193 | // Remove the subscriber from the wakers |
| 206 | *s.subscriber_wakers.get_unchecked_mut(subscriber_index) = None; | 194 | s.subscriber_wakers[subscriber_index] = None; |
| 207 | 195 | ||
| 208 | // All messages that haven't been read yet by this subscriber must have their counter decremented | 196 | // All messages that haven't been read yet by this subscriber must have their counter decremented |
| 209 | let start_id = s.next_message_id - s.queue.len() as u64; | 197 | let start_id = s.next_message_id - s.queue.len() as u64; |
| @@ -217,11 +205,11 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 217 | }) | 205 | }) |
| 218 | } | 206 | } |
| 219 | 207 | ||
| 220 | unsafe fn unregister_publisher(&self, publisher_index: usize) { | 208 | fn unregister_publisher(&self, publisher_index: usize) { |
| 221 | self.inner.lock(|inner| { | 209 | self.inner.lock(|inner| { |
| 222 | let mut s = inner.borrow_mut(); | 210 | let mut s = inner.borrow_mut(); |
| 223 | // Remove the publisher from the wakers | 211 | // Remove the publisher from the wakers |
| 224 | *s.publisher_wakers.get_unchecked_mut(publisher_index) = None; | 212 | s.publisher_wakers[publisher_index] = None; |
| 225 | }) | 213 | }) |
| 226 | } | 214 | } |
| 227 | } | 215 | } |
| @@ -316,10 +304,8 @@ impl<'a, T: Clone> Subscriber<'a, T> { | |||
| 316 | 304 | ||
| 317 | impl<'a, T: Clone> Drop for Subscriber<'a, T> { | 305 | impl<'a, T: Clone> Drop for Subscriber<'a, T> { |
| 318 | fn drop(&mut self) { | 306 | fn drop(&mut self) { |
| 319 | unsafe { | 307 | self.channel |
| 320 | self.channel | 308 | .unregister_subscriber(self.subscriber_index, self.next_message_id) |
| 321 | .unregister_subscriber(self.subscriber_index, self.next_message_id) | ||
| 322 | } | ||
| 323 | } | 309 | } |
| 324 | } | 310 | } |
| 325 | 311 | ||
| @@ -340,10 +326,8 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | |||
| 340 | } | 326 | } |
| 341 | // No, so we need to reregister our waker and sleep again | 327 | // No, so we need to reregister our waker and sleep again |
| 342 | None => { | 328 | None => { |
| 343 | unsafe { | 329 | this.channel |
| 344 | this.channel | 330 | .register_subscriber_waker(this.subscriber_index, cx.waker()); |
| 345 | .register_subscriber_waker(this.subscriber_index, cx.waker()); | ||
| 346 | } | ||
| 347 | Poll::Pending | 331 | Poll::Pending |
| 348 | } | 332 | } |
| 349 | // We missed a couple of messages. We must do our internal bookkeeping. | 333 | // We missed a couple of messages. We must do our internal bookkeeping. |
| @@ -391,7 +375,7 @@ impl<'a, T: Clone> Publisher<'a, T> { | |||
| 391 | 375 | ||
| 392 | impl<'a, T: Clone> Drop for Publisher<'a, T> { | 376 | impl<'a, T: Clone> Drop for Publisher<'a, T> { |
| 393 | fn drop(&mut self) { | 377 | fn drop(&mut self) { |
| 394 | unsafe { self.channel.unregister_publisher(self.publisher_index) } | 378 | self.channel.unregister_publisher(self.publisher_index) |
| 395 | } | 379 | } |
| 396 | } | 380 | } |
| 397 | 381 | ||
| @@ -434,31 +418,13 @@ trait PubSubBehavior<T> { | |||
| 434 | /// Tries to read the message if available | 418 | /// Tries to read the message if available |
| 435 | fn get_message(&self, message_id: u64) -> Option<WaitResult<T>>; | 419 | fn get_message(&self, message_id: u64) -> Option<WaitResult<T>>; |
| 436 | /// Register the given waker for the given subscriber. | 420 | /// Register the given waker for the given subscriber. |
| 437 | /// | 421 | fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); |
| 438 | /// ## Safety | ||
| 439 | /// | ||
| 440 | /// The subscriber index must be of a valid and active subscriber | ||
| 441 | unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker); | ||
| 442 | /// Register the given waker for the given publisher. | 422 | /// Register the given waker for the given publisher. |
| 443 | /// | 423 | fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); |
| 444 | /// ## Safety | ||
| 445 | /// | ||
| 446 | /// The subscriber index must be of a valid and active publisher | ||
| 447 | unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker); | ||
| 448 | /// Make the channel forget the subscriber. | 424 | /// Make the channel forget the subscriber. |
| 449 | /// | 425 | fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); |
| 450 | /// ## Safety | ||
| 451 | /// | ||
| 452 | /// The subscriber index must be of a valid and active subscriber which must not be used again | ||
| 453 | /// unless a new subscriber takes on that index. | ||
| 454 | unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); | ||
| 455 | /// Make the channel forget the publisher. | 426 | /// Make the channel forget the publisher. |
| 456 | /// | 427 | fn unregister_publisher(&self, publisher_index: usize); |
| 457 | /// ## Safety | ||
| 458 | /// | ||
| 459 | /// The publisher index must be of a valid and active publisher which must not be used again | ||
| 460 | /// unless a new publisher takes on that index. | ||
| 461 | unsafe fn unregister_publisher(&self, publisher_index: usize); | ||
| 462 | } | 428 | } |
| 463 | 429 | ||
| 464 | /// Future for the subscriber wait action | 430 | /// Future for the subscriber wait action |
| @@ -479,11 +445,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { | |||
| 479 | } | 445 | } |
| 480 | // No, so we need to reregister our waker and sleep again | 446 | // No, so we need to reregister our waker and sleep again |
| 481 | None => { | 447 | None => { |
| 482 | unsafe { | 448 | self.subscriber |
| 483 | self.subscriber | 449 | .channel |
| 484 | .channel | 450 | .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); |
| 485 | .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker()); | ||
| 486 | } | ||
| 487 | Poll::Pending | 451 | Poll::Pending |
| 488 | } | 452 | } |
| 489 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged | 453 | // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged |
| @@ -517,11 +481,9 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { | |||
| 517 | // The queue is full, so we need to reregister our waker and go to sleep | 481 | // The queue is full, so we need to reregister our waker and go to sleep |
| 518 | Err(message) => { | 482 | Err(message) => { |
| 519 | this.message = Some(message); | 483 | this.message = Some(message); |
| 520 | unsafe { | 484 | this.publisher |
| 521 | this.publisher | 485 | .channel |
| 522 | .channel | 486 | .register_publisher_waker(this.publisher.publisher_index, cx.waker()); |
| 523 | .register_publisher_waker(this.publisher.publisher_index, cx.waker()); | ||
| 524 | } | ||
| 525 | Poll::Pending | 487 | Poll::Pending |
| 526 | } | 488 | } |
| 527 | } | 489 | } |
