diff options
| author | Dion Dokter <[email protected]> | 2022-06-17 13:54:34 +0200 |
|---|---|---|
| committer | Dion Dokter <[email protected]> | 2022-06-17 13:54:34 +0200 |
| commit | eb304c244839df742f4a76a4072f13179e397d0b (patch) | |
| tree | 9c45319a4adaafc6b0a6c57e8a02eb957b4d58a4 | |
| parent | 2a4cdd05faa40a7ce63f66e45080cdacb5171747 (diff) | |
Added a function to WakerRegistration to check if it's occupied.
Created a MultiWakerRegistration that can hold multiple wakers.
Got rid of some options and the pub/sub_index
| -rw-r--r-- | embassy/src/channel/pubsub.rs | 176 | ||||
| -rw-r--r-- | embassy/src/waitqueue/mod.rs | 3 | ||||
| -rw-r--r-- | embassy/src/waitqueue/multi_waker.rs | 31 | ||||
| -rw-r--r-- | embassy/src/waitqueue/waker.rs | 5 | ||||
| -rw-r--r-- | embassy/src/waitqueue/waker_agnostic.rs | 5 |
5 files changed, 120 insertions, 100 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs index cc00f47af..41f275c4e 100644 --- a/embassy/src/channel/pubsub.rs +++ b/embassy/src/channel/pubsub.rs | |||
| @@ -10,7 +10,7 @@ use heapless::Deque; | |||
| 10 | 10 | ||
| 11 | use crate::blocking_mutex::raw::RawMutex; | 11 | use crate::blocking_mutex::raw::RawMutex; |
| 12 | use crate::blocking_mutex::Mutex; | 12 | use crate::blocking_mutex::Mutex; |
| 13 | use crate::waitqueue::WakerRegistration; | 13 | use crate::waitqueue::MultiWakerRegistration; |
| 14 | 14 | ||
| 15 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers | 15 | /// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers |
| 16 | /// | 16 | /// |
| @@ -42,21 +42,15 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 42 | self.inner.lock(|inner| { | 42 | self.inner.lock(|inner| { |
| 43 | let mut s = inner.borrow_mut(); | 43 | let mut s = inner.borrow_mut(); |
| 44 | 44 | ||
| 45 | // Search for an empty subscriber spot | 45 | if s.subscriber_count >= SUBS { |
| 46 | for (i, sub_spot) in s.subscriber_wakers.iter_mut().enumerate() { | 46 | Err(Error::MaximumSubscribersReached) |
| 47 | if sub_spot.is_none() { | 47 | } else { |
| 48 | // We've found a spot, so now fill it and create the subscriber | 48 | s.subscriber_count += 1; |
| 49 | *sub_spot = Some(WakerRegistration::new()); | 49 | Ok(Subscriber { |
| 50 | return Ok(Subscriber { | 50 | next_message_id: s.next_message_id, |
| 51 | subscriber_index: i, | 51 | channel: self, |
| 52 | next_message_id: s.next_message_id, | 52 | }) |
| 53 | channel: self, | ||
| 54 | }); | ||
| 55 | } | ||
| 56 | } | 53 | } |
| 57 | |||
| 58 | // No spot was found, we're full | ||
| 59 | Err(Error::MaximumSubscribersReached) | ||
| 60 | }) | 54 | }) |
| 61 | } | 55 | } |
| 62 | 56 | ||
| @@ -67,20 +61,12 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 67 | self.inner.lock(|inner| { | 61 | self.inner.lock(|inner| { |
| 68 | let mut s = inner.borrow_mut(); | 62 | let mut s = inner.borrow_mut(); |
| 69 | 63 | ||
| 70 | // Search for an empty publisher spot | 64 | if s.publisher_count >= PUBS { |
| 71 | for (i, pub_spot) in s.publisher_wakers.iter_mut().enumerate() { | 65 | Err(Error::MaximumPublishersReached) |
| 72 | if pub_spot.is_none() { | 66 | } else { |
| 73 | // We've found a spot, so now fill it and create the subscriber | 67 | s.publisher_count += 1; |
| 74 | *pub_spot = Some(WakerRegistration::new()); | 68 | Ok(Publisher { channel: self }) |
| 75 | return Ok(Publisher { | ||
| 76 | publisher_index: i, | ||
| 77 | channel: self, | ||
| 78 | }); | ||
| 79 | } | ||
| 80 | } | 69 | } |
| 81 | |||
| 82 | // No spot was found, we're full | ||
| 83 | Err(Error::MaximumPublishersReached) | ||
| 84 | }) | 70 | }) |
| 85 | } | 71 | } |
| 86 | 72 | ||
| @@ -94,12 +80,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 94 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> | 80 | impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> |
| 95 | for PubSubChannel<M, T, CAP, SUBS, PUBS> | 81 | for PubSubChannel<M, T, CAP, SUBS, PUBS> |
| 96 | { | 82 | { |
| 97 | fn get_message_with_context( | 83 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> { |
| 98 | &self, | ||
| 99 | next_message_id: &mut u64, | ||
| 100 | subscriber_index: usize, | ||
| 101 | cx: Option<&mut Context<'_>>, | ||
| 102 | ) -> Poll<WaitResult<T>> { | ||
| 103 | self.inner.lock(|s| { | 84 | self.inner.lock(|s| { |
| 104 | let mut s = s.borrow_mut(); | 85 | let mut s = s.borrow_mut(); |
| 105 | 86 | ||
| @@ -113,7 +94,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 113 | // No, so we need to reregister our waker and sleep again | 94 | // No, so we need to reregister our waker and sleep again |
| 114 | None => { | 95 | None => { |
| 115 | if let Some(cx) = cx { | 96 | if let Some(cx) = cx { |
| 116 | s.register_subscriber_waker(subscriber_index, cx.waker()); | 97 | s.register_subscriber_waker(cx.waker()); |
| 117 | } | 98 | } |
| 118 | Poll::Pending | 99 | Poll::Pending |
| 119 | } | 100 | } |
| @@ -126,7 +107,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 126 | }) | 107 | }) |
| 127 | } | 108 | } |
| 128 | 109 | ||
| 129 | fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T> { | 110 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> { |
| 130 | self.inner.lock(|s| { | 111 | self.inner.lock(|s| { |
| 131 | let mut s = s.borrow_mut(); | 112 | let mut s = s.borrow_mut(); |
| 132 | // Try to publish the message | 113 | // Try to publish the message |
| @@ -136,7 +117,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 136 | // The queue is full, so we need to reregister our waker and go to sleep | 117 | // The queue is full, so we need to reregister our waker and go to sleep |
| 137 | Err(message) => { | 118 | Err(message) => { |
| 138 | if let Some(cx) = cx { | 119 | if let Some(cx) = cx { |
| 139 | s.register_publisher_waker(publisher_index, cx.waker()); | 120 | s.register_publisher_waker(cx.waker()); |
| 140 | } | 121 | } |
| 141 | Err(message) | 122 | Err(message) |
| 142 | } | 123 | } |
| @@ -151,17 +132,17 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi | |||
| 151 | }) | 132 | }) |
| 152 | } | 133 | } |
| 153 | 134 | ||
| 154 | fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { | 135 | fn unregister_subscriber(&self, subscriber_next_message_id: u64) { |
| 155 | self.inner.lock(|s| { | 136 | self.inner.lock(|s| { |
| 156 | let mut s = s.borrow_mut(); | 137 | let mut s = s.borrow_mut(); |
| 157 | s.unregister_subscriber(subscriber_index, subscriber_next_message_id) | 138 | s.unregister_subscriber(subscriber_next_message_id) |
| 158 | }) | 139 | }) |
| 159 | } | 140 | } |
| 160 | 141 | ||
| 161 | fn unregister_publisher(&self, publisher_index: usize) { | 142 | fn unregister_publisher(&self) { |
| 162 | self.inner.lock(|s| { | 143 | self.inner.lock(|s| { |
| 163 | let mut s = s.borrow_mut(); | 144 | let mut s = s.borrow_mut(); |
| 164 | s.unregister_publisher(publisher_index) | 145 | s.unregister_publisher() |
| 165 | }) | 146 | }) |
| 166 | } | 147 | } |
| 167 | } | 148 | } |
| @@ -175,29 +156,30 @@ struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: us | |||
| 175 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. | 156 | /// If a million messages were published every second, then the ID's would run out in about 584942 years. |
| 176 | next_message_id: u64, | 157 | next_message_id: u64, |
| 177 | /// Collection of wakers for Subscribers that are waiting. | 158 | /// Collection of wakers for Subscribers that are waiting. |
| 178 | /// The [Subscriber::subscriber_index] field indexes into this array. | 159 | subscriber_wakers: MultiWakerRegistration<SUBS>, |
| 179 | subscriber_wakers: [Option<WakerRegistration>; SUBS], | ||
| 180 | /// Collection of wakers for Publishers that are waiting. | 160 | /// Collection of wakers for Publishers that are waiting. |
| 181 | /// The [Publisher::publisher_index] field indexes into this array. | 161 | publisher_wakers: MultiWakerRegistration<PUBS>, |
| 182 | publisher_wakers: [Option<WakerRegistration>; PUBS], | 162 | /// The amount of subscribers that are active |
| 163 | subscriber_count: usize, | ||
| 164 | /// The amount of publishers that are active | ||
| 165 | publisher_count: usize, | ||
| 183 | } | 166 | } |
| 184 | 167 | ||
| 185 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { | 168 | impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { |
| 186 | /// Create a new internal channel state | 169 | /// Create a new internal channel state |
| 187 | const fn new() -> Self { | 170 | const fn new() -> Self { |
| 188 | const WAKER_INIT: Option<WakerRegistration> = None; | ||
| 189 | Self { | 171 | Self { |
| 190 | queue: Deque::new(), | 172 | queue: Deque::new(), |
| 191 | next_message_id: 0, | 173 | next_message_id: 0, |
| 192 | subscriber_wakers: [WAKER_INIT; SUBS], | 174 | subscriber_wakers: MultiWakerRegistration::new(), |
| 193 | publisher_wakers: [WAKER_INIT; PUBS], | 175 | publisher_wakers: MultiWakerRegistration::new(), |
| 176 | subscriber_count: 0, | ||
| 177 | publisher_count: 0, | ||
| 194 | } | 178 | } |
| 195 | } | 179 | } |
| 196 | 180 | ||
| 197 | fn try_publish(&mut self, message: T) -> Result<(), T> { | 181 | fn try_publish(&mut self, message: T) -> Result<(), T> { |
| 198 | let active_subscriber_count = self.subscriber_wakers.iter().flatten().count(); | 182 | if self.subscriber_count == 0 { |
| 199 | |||
| 200 | if active_subscriber_count == 0 { | ||
| 201 | // We don't need to publish anything because there is no one to receive it | 183 | // We don't need to publish anything because there is no one to receive it |
| 202 | return Ok(()); | 184 | return Ok(()); |
| 203 | } | 185 | } |
| @@ -206,14 +188,12 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 206 | return Err(message); | 188 | return Err(message); |
| 207 | } | 189 | } |
| 208 | // We just did a check for this | 190 | // We just did a check for this |
| 209 | self.queue.push_back((message, active_subscriber_count)).ok().unwrap(); | 191 | self.queue.push_back((message, self.subscriber_count)).ok().unwrap(); |
| 210 | 192 | ||
| 211 | self.next_message_id += 1; | 193 | self.next_message_id += 1; |
| 212 | 194 | ||
| 213 | // Wake all of the subscribers | 195 | // Wake all of the subscribers |
| 214 | for active_subscriber in self.subscriber_wakers.iter_mut().flatten() { | 196 | self.subscriber_wakers.wake(); |
| 215 | active_subscriber.wake() | ||
| 216 | } | ||
| 217 | 197 | ||
| 218 | Ok(()) | 198 | Ok(()) |
| 219 | } | 199 | } |
| @@ -250,26 +230,42 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 250 | 230 | ||
| 251 | if current_message_index == 0 && queue_item.1 == 0 { | 231 | if current_message_index == 0 && queue_item.1 == 0 { |
| 252 | self.queue.pop_front(); | 232 | self.queue.pop_front(); |
| 253 | self.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); | 233 | self.publisher_wakers.wake(); |
| 254 | } | 234 | } |
| 255 | 235 | ||
| 256 | Some(WaitResult::Message(message)) | 236 | Some(WaitResult::Message(message)) |
| 257 | } | 237 | } |
| 258 | 238 | ||
| 259 | fn register_subscriber_waker(&mut self, subscriber_index: usize, waker: &Waker) { | 239 | fn register_subscriber_waker(&mut self, waker: &Waker) { |
| 260 | self.subscriber_wakers[subscriber_index] | 240 | match self.subscriber_wakers.register(waker) { |
| 261 | .as_mut() | 241 | Ok(()) => {} |
| 262 | .unwrap() | 242 | Err(_) => { |
| 263 | .register(waker); | 243 | // All waker slots were full. This can only happen when there was a subscriber that now has dropped. |
| 244 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 245 | // Any future that is still active will simply reregister. | ||
| 246 | // This won't happen a lot, so it's ok. | ||
| 247 | self.subscriber_wakers.wake(); | ||
| 248 | self.subscriber_wakers.register(waker).unwrap(); | ||
| 249 | } | ||
| 250 | } | ||
| 264 | } | 251 | } |
| 265 | 252 | ||
| 266 | fn register_publisher_waker(&mut self, publisher_index: usize, waker: &Waker) { | 253 | fn register_publisher_waker(&mut self, waker: &Waker) { |
| 267 | self.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); | 254 | match self.publisher_wakers.register(waker) { |
| 255 | Ok(()) => {} | ||
| 256 | Err(_) => { | ||
| 257 | // All waker slots were full. This can only happen when there was a publisher that now has dropped. | ||
| 258 | // We need to throw it away. It's a bit inefficient, but we can wake everything. | ||
| 259 | // Any future that is still active will simply reregister. | ||
| 260 | // This won't happen a lot, so it's ok. | ||
| 261 | self.publisher_wakers.wake(); | ||
| 262 | self.publisher_wakers.register(waker).unwrap(); | ||
| 263 | } | ||
| 264 | } | ||
| 268 | } | 265 | } |
| 269 | 266 | ||
| 270 | fn unregister_subscriber(&mut self, subscriber_index: usize, subscriber_next_message_id: u64) { | 267 | fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) { |
| 271 | // Remove the subscriber from the wakers | 268 | self.subscriber_count -= 1; |
| 272 | self.subscriber_wakers[subscriber_index] = None; | ||
| 273 | 269 | ||
| 274 | // All messages that haven't been read yet by this subscriber must have their counter decremented | 270 | // All messages that haven't been read yet by this subscriber must have their counter decremented |
| 275 | let start_id = self.next_message_id - self.queue.len() as u64; | 271 | let start_id = self.next_message_id - self.queue.len() as u64; |
| @@ -282,9 +278,8 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 282 | } | 278 | } |
| 283 | } | 279 | } |
| 284 | 280 | ||
| 285 | fn unregister_publisher(&mut self, publisher_index: usize) { | 281 | fn unregister_publisher(&mut self) { |
| 286 | // Remove the publisher from the wakers | 282 | self.publisher_count -= 1; |
| 287 | self.publisher_wakers[publisher_index] = None; | ||
| 288 | } | 283 | } |
| 289 | } | 284 | } |
| 290 | 285 | ||
| @@ -293,8 +288,6 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta | |||
| 293 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's | 288 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's |
| 294 | /// generics are erased on this subscriber | 289 | /// generics are erased on this subscriber |
| 295 | pub struct Subscriber<'a, T: Clone> { | 290 | pub struct Subscriber<'a, T: Clone> { |
| 296 | /// Our index into the channel | ||
| 297 | subscriber_index: usize, | ||
| 298 | /// The message id of the next message we are yet to receive | 291 | /// The message id of the next message we are yet to receive |
| 299 | next_message_id: u64, | 292 | next_message_id: u64, |
| 300 | /// The channel we are a subscriber to | 293 | /// The channel we are a subscriber to |
| @@ -321,10 +314,7 @@ impl<'a, T: Clone> Subscriber<'a, T> { | |||
| 321 | /// | 314 | /// |
| 322 | /// This function does not peek. The message is received if there is one. | 315 | /// This function does not peek. The message is received if there is one. |
| 323 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { | 316 | pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { |
| 324 | match self | 317 | match self.channel.get_message_with_context(&mut self.next_message_id, None) { |
| 325 | .channel | ||
| 326 | .get_message_with_context(&mut self.next_message_id, self.subscriber_index, None) | ||
| 327 | { | ||
| 328 | Poll::Ready(result) => Some(result), | 318 | Poll::Ready(result) => Some(result), |
| 329 | Poll::Pending => None, | 319 | Poll::Pending => None, |
| 330 | } | 320 | } |
| @@ -346,8 +336,7 @@ impl<'a, T: Clone> Subscriber<'a, T> { | |||
| 346 | 336 | ||
| 347 | impl<'a, T: Clone> Drop for Subscriber<'a, T> { | 337 | impl<'a, T: Clone> Drop for Subscriber<'a, T> { |
| 348 | fn drop(&mut self) { | 338 | fn drop(&mut self) { |
| 349 | self.channel | 339 | self.channel.unregister_subscriber(self.next_message_id) |
| 350 | .unregister_subscriber(self.subscriber_index, self.next_message_id) | ||
| 351 | } | 340 | } |
| 352 | } | 341 | } |
| 353 | 342 | ||
| @@ -357,10 +346,9 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | |||
| 357 | type Item = T; | 346 | type Item = T; |
| 358 | 347 | ||
| 359 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | 348 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 360 | let sub_index = self.subscriber_index; | ||
| 361 | match self | 349 | match self |
| 362 | .channel | 350 | .channel |
| 363 | .get_message_with_context(&mut self.next_message_id, sub_index, Some(cx)) | 351 | .get_message_with_context(&mut self.next_message_id, Some(cx)) |
| 364 | { | 352 | { |
| 365 | Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), | 353 | Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), |
| 366 | Poll::Ready(WaitResult::Lagged(_)) => { | 354 | Poll::Ready(WaitResult::Lagged(_)) => { |
| @@ -377,8 +365,6 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { | |||
| 377 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's | 365 | /// This instance carries a reference to the channel, but uses a trait object for it so that the channel's |
| 378 | /// generics are erased on this subscriber | 366 | /// generics are erased on this subscriber |
| 379 | pub struct Publisher<'a, T: Clone> { | 367 | pub struct Publisher<'a, T: Clone> { |
| 380 | /// Our index into the channel | ||
| 381 | publisher_index: usize, | ||
| 382 | /// The channel we are a publisher for | 368 | /// The channel we are a publisher for |
| 383 | channel: &'a dyn PubSubBehavior<T>, | 369 | channel: &'a dyn PubSubBehavior<T>, |
| 384 | } | 370 | } |
| @@ -400,13 +386,13 @@ impl<'a, T: Clone> Publisher<'a, T> { | |||
| 400 | 386 | ||
| 401 | /// Publish a message if there is space in the message queue | 387 | /// Publish a message if there is space in the message queue |
| 402 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 388 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 403 | self.channel.publish_with_context(message, self.publisher_index, None) | 389 | self.channel.publish_with_context(message, None) |
| 404 | } | 390 | } |
| 405 | } | 391 | } |
| 406 | 392 | ||
| 407 | impl<'a, T: Clone> Drop for Publisher<'a, T> { | 393 | impl<'a, T: Clone> Drop for Publisher<'a, T> { |
| 408 | fn drop(&mut self) { | 394 | fn drop(&mut self) { |
| 409 | self.channel.unregister_publisher(self.publisher_index) | 395 | self.channel.unregister_publisher() |
| 410 | } | 396 | } |
| 411 | } | 397 | } |
| 412 | 398 | ||
| @@ -426,7 +412,7 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> { | |||
| 426 | 412 | ||
| 427 | /// Publish a message if there is space in the message queue | 413 | /// Publish a message if there is space in the message queue |
| 428 | pub fn try_publish(&self, message: T) -> Result<(), T> { | 414 | pub fn try_publish(&self, message: T) -> Result<(), T> { |
| 429 | self.channel.publish_with_context(message, usize::MAX, None) | 415 | self.channel.publish_with_context(message, None) |
| 430 | } | 416 | } |
| 431 | } | 417 | } |
| 432 | 418 | ||
| @@ -442,20 +428,15 @@ pub enum Error { | |||
| 442 | } | 428 | } |
| 443 | 429 | ||
| 444 | trait PubSubBehavior<T> { | 430 | trait PubSubBehavior<T> { |
| 445 | fn get_message_with_context( | 431 | fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>; |
| 446 | &self, | ||
| 447 | next_message_id: &mut u64, | ||
| 448 | subscriber_index: usize, | ||
| 449 | cx: Option<&mut Context<'_>>, | ||
| 450 | ) -> Poll<WaitResult<T>>; | ||
| 451 | 432 | ||
| 452 | fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T>; | 433 | fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>; |
| 453 | 434 | ||
| 454 | fn publish_immediate(&self, message: T); | 435 | fn publish_immediate(&self, message: T); |
| 455 | 436 | ||
| 456 | fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); | 437 | fn unregister_subscriber(&self, subscriber_next_message_id: u64); |
| 457 | 438 | ||
| 458 | fn unregister_publisher(&self, publisher_index: usize); | 439 | fn unregister_publisher(&self); |
| 459 | } | 440 | } |
| 460 | 441 | ||
| 461 | /// Future for the subscriber wait action | 442 | /// Future for the subscriber wait action |
| @@ -467,10 +448,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> { | |||
| 467 | type Output = WaitResult<T>; | 448 | type Output = WaitResult<T>; |
| 468 | 449 | ||
| 469 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 450 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 470 | let sub_index = self.subscriber.subscriber_index; | ||
| 471 | self.subscriber | 451 | self.subscriber |
| 472 | .channel | 452 | .channel |
| 473 | .get_message_with_context(&mut self.subscriber.next_message_id, sub_index, Some(cx)) | 453 | .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx)) |
| 474 | } | 454 | } |
| 475 | } | 455 | } |
| 476 | 456 | ||
| @@ -488,11 +468,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { | |||
| 488 | 468 | ||
| 489 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 469 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 490 | let message = self.message.take().unwrap(); | 470 | let message = self.message.take().unwrap(); |
| 491 | match self | 471 | match self.publisher.channel.publish_with_context(message, Some(cx)) { |
| 492 | .publisher | ||
| 493 | .channel | ||
| 494 | .publish_with_context(message, self.publisher.publisher_index, Some(cx)) | ||
| 495 | { | ||
| 496 | Ok(()) => Poll::Ready(()), | 472 | Ok(()) => Poll::Ready(()), |
| 497 | Err(message) => { | 473 | Err(message) => { |
| 498 | self.message = Some(message); | 474 | self.message = Some(message); |
diff --git a/embassy/src/waitqueue/mod.rs b/embassy/src/waitqueue/mod.rs index a2bafad99..5c4e1bc3b 100644 --- a/embassy/src/waitqueue/mod.rs +++ b/embassy/src/waitqueue/mod.rs | |||
| @@ -3,3 +3,6 @@ | |||
| 3 | #[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] | 3 | #[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] |
| 4 | mod waker; | 4 | mod waker; |
| 5 | pub use waker::*; | 5 | pub use waker::*; |
| 6 | |||
| 7 | mod multi_waker; | ||
| 8 | pub use multi_waker::*; | ||
diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs new file mode 100644 index 000000000..6e8710cb4 --- /dev/null +++ b/embassy/src/waitqueue/multi_waker.rs | |||
| @@ -0,0 +1,31 @@ | |||
| 1 | use core::task::Waker; | ||
| 2 | |||
| 3 | use super::WakerRegistration; | ||
| 4 | |||
| 5 | pub struct MultiWakerRegistration<const N: usize> { | ||
| 6 | wakers: [WakerRegistration; N], | ||
| 7 | } | ||
| 8 | |||
| 9 | impl<const N: usize> MultiWakerRegistration<N> { | ||
| 10 | pub const fn new() -> Self { | ||
| 11 | const WAKER: WakerRegistration = WakerRegistration::new(); | ||
| 12 | Self { wakers: [WAKER; N] } | ||
| 13 | } | ||
| 14 | |||
| 15 | /// Register a waker. If the buffer is full the function returns it in the error | ||
| 16 | pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> { | ||
| 17 | if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) { | ||
| 18 | waker_slot.register(w); | ||
| 19 | Ok(()) | ||
| 20 | } else { | ||
| 21 | Err(w) | ||
| 22 | } | ||
| 23 | } | ||
| 24 | |||
| 25 | /// Wake all registered wakers. This clears the buffer | ||
| 26 | pub fn wake(&mut self) { | ||
| 27 | for waker_slot in self.wakers.iter_mut() { | ||
| 28 | waker_slot.wake() | ||
| 29 | } | ||
| 30 | } | ||
| 31 | } | ||
diff --git a/embassy/src/waitqueue/waker.rs b/embassy/src/waitqueue/waker.rs index da907300a..a90154cce 100644 --- a/embassy/src/waitqueue/waker.rs +++ b/embassy/src/waitqueue/waker.rs | |||
| @@ -50,6 +50,11 @@ impl WakerRegistration { | |||
| 50 | unsafe { wake_task(w) } | 50 | unsafe { wake_task(w) } |
| 51 | } | 51 | } |
| 52 | } | 52 | } |
| 53 | |||
| 54 | /// Returns true if a waker is currently registered | ||
| 55 | pub fn occupied(&self) -> bool { | ||
| 56 | self.waker.is_some() | ||
| 57 | } | ||
| 53 | } | 58 | } |
| 54 | 59 | ||
| 55 | // SAFETY: `WakerRegistration` effectively contains an `Option<Waker>`, | 60 | // SAFETY: `WakerRegistration` effectively contains an `Option<Waker>`, |
diff --git a/embassy/src/waitqueue/waker_agnostic.rs b/embassy/src/waitqueue/waker_agnostic.rs index 89430aa4c..62e3adb79 100644 --- a/embassy/src/waitqueue/waker_agnostic.rs +++ b/embassy/src/waitqueue/waker_agnostic.rs | |||
| @@ -47,6 +47,11 @@ impl WakerRegistration { | |||
| 47 | w.wake() | 47 | w.wake() |
| 48 | } | 48 | } |
| 49 | } | 49 | } |
| 50 | |||
| 51 | /// Returns true if a waker is currently registered | ||
| 52 | pub fn occupied(&self) -> bool { | ||
| 53 | self.waker.is_some() | ||
| 54 | } | ||
| 50 | } | 55 | } |
| 51 | 56 | ||
| 52 | /// Utility struct to register and wake a waker. | 57 | /// Utility struct to register and wake a waker. |
