aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--embassy/src/channel/pubsub.rs106
1 files changed, 34 insertions, 72 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs
index ea0ccb2da..20878187d 100644
--- a/embassy/src/channel/pubsub.rs
+++ b/embassy/src/channel/pubsub.rs
@@ -109,9 +109,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
109 return Err(message); 109 return Err(message);
110 } 110 }
111 // We just did a check for this 111 // We just did a check for this
112 unsafe { 112 s.queue.push_back((message, active_subscriber_count)).ok().unwrap();
113 s.queue.push_back_unchecked((message, active_subscriber_count));
114 }
115 113
116 s.next_message_id += 1; 114 s.next_message_id += 1;
117 115
@@ -138,7 +136,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
138 drop(s); 136 drop(s);
139 137
140 // This will succeed because we made sure there is space 138 // This will succeed because we made sure there is space
141 unsafe { self.try_publish(message).unwrap_unchecked() }; 139 self.try_publish(message).ok().unwrap();
142 }); 140 });
143 } 141 }
144 142
@@ -159,51 +157,41 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
159 } 157 }
160 158
161 // We've checked that the index is valid 159 // We've checked that the index is valid
162 unsafe { 160 let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap();
163 let queue_item = s.queue.iter_mut().nth(current_message_index).unwrap_unchecked();
164
165 // We're reading this item, so decrement the counter
166 queue_item.1 -= 1;
167 let message = queue_item.0.clone();
168 161
169 if current_message_index == 0 && queue_item.1 == 0 { 162 // We're reading this item, so decrement the counter
170 s.queue.pop_front(); 163 queue_item.1 -= 1;
171 s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); 164 let message = queue_item.0.clone();
172 }
173 165
174 Some(WaitResult::Message(message)) 166 if current_message_index == 0 && queue_item.1 == 0 {
167 s.queue.pop_front();
168 s.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake());
175 } 169 }
170
171 Some(WaitResult::Message(message))
176 }) 172 })
177 } 173 }
178 174
179 unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) { 175 fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker) {
180 self.inner.lock(|inner| { 176 self.inner.lock(|inner| {
181 let mut s = inner.borrow_mut(); 177 let mut s = inner.borrow_mut();
182 s.subscriber_wakers 178 s.subscriber_wakers[subscriber_index].as_mut().unwrap().register(waker);
183 .get_unchecked_mut(subscriber_index)
184 .as_mut()
185 .unwrap_unchecked()
186 .register(waker);
187 }) 179 })
188 } 180 }
189 181
190 unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) { 182 fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker) {
191 self.inner.lock(|inner| { 183 self.inner.lock(|inner| {
192 let mut s = inner.borrow_mut(); 184 let mut s = inner.borrow_mut();
193 s.publisher_wakers 185 s.publisher_wakers[publisher_index].as_mut().unwrap().register(waker);
194 .get_unchecked_mut(publisher_index)
195 .as_mut()
196 .unwrap_unchecked()
197 .register(waker);
198 }) 186 })
199 } 187 }
200 188
201 unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { 189 fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) {
202 self.inner.lock(|inner| { 190 self.inner.lock(|inner| {
203 let mut s = inner.borrow_mut(); 191 let mut s = inner.borrow_mut();
204 192
205 // Remove the subscriber from the wakers 193 // Remove the subscriber from the wakers
206 *s.subscriber_wakers.get_unchecked_mut(subscriber_index) = None; 194 s.subscriber_wakers[subscriber_index] = None;
207 195
208 // All messages that haven't been read yet by this subscriber must have their counter decremented 196 // All messages that haven't been read yet by this subscriber must have their counter decremented
209 let start_id = s.next_message_id - s.queue.len() as u64; 197 let start_id = s.next_message_id - s.queue.len() as u64;
@@ -217,11 +205,11 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
217 }) 205 })
218 } 206 }
219 207
220 unsafe fn unregister_publisher(&self, publisher_index: usize) { 208 fn unregister_publisher(&self, publisher_index: usize) {
221 self.inner.lock(|inner| { 209 self.inner.lock(|inner| {
222 let mut s = inner.borrow_mut(); 210 let mut s = inner.borrow_mut();
223 // Remove the publisher from the wakers 211 // Remove the publisher from the wakers
224 *s.publisher_wakers.get_unchecked_mut(publisher_index) = None; 212 s.publisher_wakers[publisher_index] = None;
225 }) 213 })
226 } 214 }
227} 215}
@@ -316,10 +304,8 @@ impl<'a, T: Clone> Subscriber<'a, T> {
316 304
317impl<'a, T: Clone> Drop for Subscriber<'a, T> { 305impl<'a, T: Clone> Drop for Subscriber<'a, T> {
318 fn drop(&mut self) { 306 fn drop(&mut self) {
319 unsafe { 307 self.channel
320 self.channel 308 .unregister_subscriber(self.subscriber_index, self.next_message_id)
321 .unregister_subscriber(self.subscriber_index, self.next_message_id)
322 }
323 } 309 }
324} 310}
325 311
@@ -340,10 +326,8 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
340 } 326 }
341 // No, so we need to reregister our waker and sleep again 327 // No, so we need to reregister our waker and sleep again
342 None => { 328 None => {
343 unsafe { 329 this.channel
344 this.channel 330 .register_subscriber_waker(this.subscriber_index, cx.waker());
345 .register_subscriber_waker(this.subscriber_index, cx.waker());
346 }
347 Poll::Pending 331 Poll::Pending
348 } 332 }
349 // We missed a couple of messages. We must do our internal bookkeeping. 333 // We missed a couple of messages. We must do our internal bookkeeping.
@@ -391,7 +375,7 @@ impl<'a, T: Clone> Publisher<'a, T> {
391 375
392impl<'a, T: Clone> Drop for Publisher<'a, T> { 376impl<'a, T: Clone> Drop for Publisher<'a, T> {
393 fn drop(&mut self) { 377 fn drop(&mut self) {
394 unsafe { self.channel.unregister_publisher(self.publisher_index) } 378 self.channel.unregister_publisher(self.publisher_index)
395 } 379 }
396} 380}
397 381
@@ -434,31 +418,13 @@ trait PubSubBehavior<T> {
434 /// Tries to read the message if available 418 /// Tries to read the message if available
435 fn get_message(&self, message_id: u64) -> Option<WaitResult<T>>; 419 fn get_message(&self, message_id: u64) -> Option<WaitResult<T>>;
436 /// Register the given waker for the given subscriber. 420 /// Register the given waker for the given subscriber.
437 /// 421 fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker);
438 /// ## Safety
439 ///
440 /// The subscriber index must be of a valid and active subscriber
441 unsafe fn register_subscriber_waker(&self, subscriber_index: usize, waker: &Waker);
442 /// Register the given waker for the given publisher. 422 /// Register the given waker for the given publisher.
443 /// 423 fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker);
444 /// ## Safety
445 ///
446 /// The subscriber index must be of a valid and active publisher
447 unsafe fn register_publisher_waker(&self, publisher_index: usize, waker: &Waker);
448 /// Make the channel forget the subscriber. 424 /// Make the channel forget the subscriber.
449 /// 425 fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64);
450 /// ## Safety
451 ///
452 /// The subscriber index must be of a valid and active subscriber which must not be used again
453 /// unless a new subscriber takes on that index.
454 unsafe fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64);
455 /// Make the channel forget the publisher. 426 /// Make the channel forget the publisher.
456 /// 427 fn unregister_publisher(&self, publisher_index: usize);
457 /// ## Safety
458 ///
459 /// The publisher index must be of a valid and active publisher which must not be used again
460 /// unless a new publisher takes on that index.
461 unsafe fn unregister_publisher(&self, publisher_index: usize);
462} 428}
463 429
464/// Future for the subscriber wait action 430/// Future for the subscriber wait action
@@ -479,11 +445,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> {
479 } 445 }
480 // No, so we need to reregister our waker and sleep again 446 // No, so we need to reregister our waker and sleep again
481 None => { 447 None => {
482 unsafe { 448 self.subscriber
483 self.subscriber 449 .channel
484 .channel 450 .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker());
485 .register_subscriber_waker(self.subscriber.subscriber_index, cx.waker());
486 }
487 Poll::Pending 451 Poll::Pending
488 } 452 }
489 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged 453 // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
@@ -517,11 +481,9 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> {
517 // The queue is full, so we need to reregister our waker and go to sleep 481 // The queue is full, so we need to reregister our waker and go to sleep
518 Err(message) => { 482 Err(message) => {
519 this.message = Some(message); 483 this.message = Some(message);
520 unsafe { 484 this.publisher
521 this.publisher 485 .channel
522 .channel 486 .register_publisher_waker(this.publisher.publisher_index, cx.waker());
523 .register_publisher_waker(this.publisher.publisher_index, cx.waker());
524 }
525 Poll::Pending 487 Poll::Pending
526 } 488 }
527 } 489 }