aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDion Dokter <[email protected]>2022-06-17 13:54:34 +0200
committerDion Dokter <[email protected]>2022-06-17 13:54:34 +0200
commiteb304c244839df742f4a76a4072f13179e397d0b (patch)
tree9c45319a4adaafc6b0a6c57e8a02eb957b4d58a4
parent2a4cdd05faa40a7ce63f66e45080cdacb5171747 (diff)
Added a function to WakerRegistration to check if it's occupied.
Created a MultiWakerRegistration that can hold multiple wakers. Got rid of some options and the pub/sub_index
-rw-r--r--embassy/src/channel/pubsub.rs176
-rw-r--r--embassy/src/waitqueue/mod.rs3
-rw-r--r--embassy/src/waitqueue/multi_waker.rs31
-rw-r--r--embassy/src/waitqueue/waker.rs5
-rw-r--r--embassy/src/waitqueue/waker_agnostic.rs5
5 files changed, 120 insertions, 100 deletions
diff --git a/embassy/src/channel/pubsub.rs b/embassy/src/channel/pubsub.rs
index cc00f47af..41f275c4e 100644
--- a/embassy/src/channel/pubsub.rs
+++ b/embassy/src/channel/pubsub.rs
@@ -10,7 +10,7 @@ use heapless::Deque;
10 10
11use crate::blocking_mutex::raw::RawMutex; 11use crate::blocking_mutex::raw::RawMutex;
12use crate::blocking_mutex::Mutex; 12use crate::blocking_mutex::Mutex;
13use crate::waitqueue::WakerRegistration; 13use crate::waitqueue::MultiWakerRegistration;
14 14
15/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers 15/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
16/// 16///
@@ -42,21 +42,15 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
42 self.inner.lock(|inner| { 42 self.inner.lock(|inner| {
43 let mut s = inner.borrow_mut(); 43 let mut s = inner.borrow_mut();
44 44
45 // Search for an empty subscriber spot 45 if s.subscriber_count >= SUBS {
46 for (i, sub_spot) in s.subscriber_wakers.iter_mut().enumerate() { 46 Err(Error::MaximumSubscribersReached)
47 if sub_spot.is_none() { 47 } else {
48 // We've found a spot, so now fill it and create the subscriber 48 s.subscriber_count += 1;
49 *sub_spot = Some(WakerRegistration::new()); 49 Ok(Subscriber {
50 return Ok(Subscriber { 50 next_message_id: s.next_message_id,
51 subscriber_index: i, 51 channel: self,
52 next_message_id: s.next_message_id, 52 })
53 channel: self,
54 });
55 }
56 } 53 }
57
58 // No spot was found, we're full
59 Err(Error::MaximumSubscribersReached)
60 }) 54 })
61 } 55 }
62 56
@@ -67,20 +61,12 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
67 self.inner.lock(|inner| { 61 self.inner.lock(|inner| {
68 let mut s = inner.borrow_mut(); 62 let mut s = inner.borrow_mut();
69 63
70 // Search for an empty publisher spot 64 if s.publisher_count >= PUBS {
71 for (i, pub_spot) in s.publisher_wakers.iter_mut().enumerate() { 65 Err(Error::MaximumPublishersReached)
72 if pub_spot.is_none() { 66 } else {
73 // We've found a spot, so now fill it and create the subscriber 67 s.publisher_count += 1;
74 *pub_spot = Some(WakerRegistration::new()); 68 Ok(Publisher { channel: self })
75 return Ok(Publisher {
76 publisher_index: i,
77 channel: self,
78 });
79 }
80 } 69 }
81
82 // No spot was found, we're full
83 Err(Error::MaximumPublishersReached)
84 }) 70 })
85 } 71 }
86 72
@@ -94,12 +80,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
94impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> 80impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T>
95 for PubSubChannel<M, T, CAP, SUBS, PUBS> 81 for PubSubChannel<M, T, CAP, SUBS, PUBS>
96{ 82{
97 fn get_message_with_context( 83 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
98 &self,
99 next_message_id: &mut u64,
100 subscriber_index: usize,
101 cx: Option<&mut Context<'_>>,
102 ) -> Poll<WaitResult<T>> {
103 self.inner.lock(|s| { 84 self.inner.lock(|s| {
104 let mut s = s.borrow_mut(); 85 let mut s = s.borrow_mut();
105 86
@@ -113,7 +94,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
113 // No, so we need to reregister our waker and sleep again 94 // No, so we need to reregister our waker and sleep again
114 None => { 95 None => {
115 if let Some(cx) = cx { 96 if let Some(cx) = cx {
116 s.register_subscriber_waker(subscriber_index, cx.waker()); 97 s.register_subscriber_waker(cx.waker());
117 } 98 }
118 Poll::Pending 99 Poll::Pending
119 } 100 }
@@ -126,7 +107,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
126 }) 107 })
127 } 108 }
128 109
129 fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T> { 110 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
130 self.inner.lock(|s| { 111 self.inner.lock(|s| {
131 let mut s = s.borrow_mut(); 112 let mut s = s.borrow_mut();
132 // Try to publish the message 113 // Try to publish the message
@@ -136,7 +117,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
136 // The queue is full, so we need to reregister our waker and go to sleep 117 // The queue is full, so we need to reregister our waker and go to sleep
137 Err(message) => { 118 Err(message) => {
138 if let Some(cx) = cx { 119 if let Some(cx) = cx {
139 s.register_publisher_waker(publisher_index, cx.waker()); 120 s.register_publisher_waker(cx.waker());
140 } 121 }
141 Err(message) 122 Err(message)
142 } 123 }
@@ -151,17 +132,17 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
151 }) 132 })
152 } 133 }
153 134
154 fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64) { 135 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
155 self.inner.lock(|s| { 136 self.inner.lock(|s| {
156 let mut s = s.borrow_mut(); 137 let mut s = s.borrow_mut();
157 s.unregister_subscriber(subscriber_index, subscriber_next_message_id) 138 s.unregister_subscriber(subscriber_next_message_id)
158 }) 139 })
159 } 140 }
160 141
161 fn unregister_publisher(&self, publisher_index: usize) { 142 fn unregister_publisher(&self) {
162 self.inner.lock(|s| { 143 self.inner.lock(|s| {
163 let mut s = s.borrow_mut(); 144 let mut s = s.borrow_mut();
164 s.unregister_publisher(publisher_index) 145 s.unregister_publisher()
165 }) 146 })
166 } 147 }
167} 148}
@@ -175,29 +156,30 @@ struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: us
175 /// If a million messages were published every second, then the ID's would run out in about 584942 years. 156 /// If a million messages were published every second, then the ID's would run out in about 584942 years.
176 next_message_id: u64, 157 next_message_id: u64,
177 /// Collection of wakers for Subscribers that are waiting. 158 /// Collection of wakers for Subscribers that are waiting.
178 /// The [Subscriber::subscriber_index] field indexes into this array. 159 subscriber_wakers: MultiWakerRegistration<SUBS>,
179 subscriber_wakers: [Option<WakerRegistration>; SUBS],
180 /// Collection of wakers for Publishers that are waiting. 160 /// Collection of wakers for Publishers that are waiting.
181 /// The [Publisher::publisher_index] field indexes into this array. 161 publisher_wakers: MultiWakerRegistration<PUBS>,
182 publisher_wakers: [Option<WakerRegistration>; PUBS], 162 /// The amount of subscribers that are active
163 subscriber_count: usize,
164 /// The amount of publishers that are active
165 publisher_count: usize,
183} 166}
184 167
185impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> { 168impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
186 /// Create a new internal channel state 169 /// Create a new internal channel state
187 const fn new() -> Self { 170 const fn new() -> Self {
188 const WAKER_INIT: Option<WakerRegistration> = None;
189 Self { 171 Self {
190 queue: Deque::new(), 172 queue: Deque::new(),
191 next_message_id: 0, 173 next_message_id: 0,
192 subscriber_wakers: [WAKER_INIT; SUBS], 174 subscriber_wakers: MultiWakerRegistration::new(),
193 publisher_wakers: [WAKER_INIT; PUBS], 175 publisher_wakers: MultiWakerRegistration::new(),
176 subscriber_count: 0,
177 publisher_count: 0,
194 } 178 }
195 } 179 }
196 180
197 fn try_publish(&mut self, message: T) -> Result<(), T> { 181 fn try_publish(&mut self, message: T) -> Result<(), T> {
198 let active_subscriber_count = self.subscriber_wakers.iter().flatten().count(); 182 if self.subscriber_count == 0 {
199
200 if active_subscriber_count == 0 {
201 // We don't need to publish anything because there is no one to receive it 183 // We don't need to publish anything because there is no one to receive it
202 return Ok(()); 184 return Ok(());
203 } 185 }
@@ -206,14 +188,12 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
206 return Err(message); 188 return Err(message);
207 } 189 }
208 // We just did a check for this 190 // We just did a check for this
209 self.queue.push_back((message, active_subscriber_count)).ok().unwrap(); 191 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
210 192
211 self.next_message_id += 1; 193 self.next_message_id += 1;
212 194
213 // Wake all of the subscribers 195 // Wake all of the subscribers
214 for active_subscriber in self.subscriber_wakers.iter_mut().flatten() { 196 self.subscriber_wakers.wake();
215 active_subscriber.wake()
216 }
217 197
218 Ok(()) 198 Ok(())
219 } 199 }
@@ -250,26 +230,42 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
250 230
251 if current_message_index == 0 && queue_item.1 == 0 { 231 if current_message_index == 0 && queue_item.1 == 0 {
252 self.queue.pop_front(); 232 self.queue.pop_front();
253 self.publisher_wakers.iter_mut().flatten().for_each(|w| w.wake()); 233 self.publisher_wakers.wake();
254 } 234 }
255 235
256 Some(WaitResult::Message(message)) 236 Some(WaitResult::Message(message))
257 } 237 }
258 238
259 fn register_subscriber_waker(&mut self, subscriber_index: usize, waker: &Waker) { 239 fn register_subscriber_waker(&mut self, waker: &Waker) {
260 self.subscriber_wakers[subscriber_index] 240 match self.subscriber_wakers.register(waker) {
261 .as_mut() 241 Ok(()) => {}
262 .unwrap() 242 Err(_) => {
263 .register(waker); 243 // All waker slots were full. This can only happen when there was a subscriber that now has dropped.
244 // We need to throw it away. It's a bit inefficient, but we can wake everything.
245 // Any future that is still active will simply reregister.
246 // This won't happen a lot, so it's ok.
247 self.subscriber_wakers.wake();
248 self.subscriber_wakers.register(waker).unwrap();
249 }
250 }
264 } 251 }
265 252
266 fn register_publisher_waker(&mut self, publisher_index: usize, waker: &Waker) { 253 fn register_publisher_waker(&mut self, waker: &Waker) {
267 self.publisher_wakers[publisher_index].as_mut().unwrap().register(waker); 254 match self.publisher_wakers.register(waker) {
255 Ok(()) => {}
256 Err(_) => {
257 // All waker slots were full. This can only happen when there was a publisher that now has dropped.
258 // We need to throw it away. It's a bit inefficient, but we can wake everything.
259 // Any future that is still active will simply reregister.
260 // This won't happen a lot, so it's ok.
261 self.publisher_wakers.wake();
262 self.publisher_wakers.register(waker).unwrap();
263 }
264 }
268 } 265 }
269 266
270 fn unregister_subscriber(&mut self, subscriber_index: usize, subscriber_next_message_id: u64) { 267 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
271 // Remove the subscriber from the wakers 268 self.subscriber_count -= 1;
272 self.subscriber_wakers[subscriber_index] = None;
273 269
274 // All messages that haven't been read yet by this subscriber must have their counter decremented 270 // All messages that haven't been read yet by this subscriber must have their counter decremented
275 let start_id = self.next_message_id - self.queue.len() as u64; 271 let start_id = self.next_message_id - self.queue.len() as u64;
@@ -282,9 +278,8 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
282 } 278 }
283 } 279 }
284 280
285 fn unregister_publisher(&mut self, publisher_index: usize) { 281 fn unregister_publisher(&mut self) {
286 // Remove the publisher from the wakers 282 self.publisher_count -= 1;
287 self.publisher_wakers[publisher_index] = None;
288 } 283 }
289} 284}
290 285
@@ -293,8 +288,6 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
293/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's 288/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
294/// generics are erased on this subscriber 289/// generics are erased on this subscriber
295pub struct Subscriber<'a, T: Clone> { 290pub struct Subscriber<'a, T: Clone> {
296 /// Our index into the channel
297 subscriber_index: usize,
298 /// The message id of the next message we are yet to receive 291 /// The message id of the next message we are yet to receive
299 next_message_id: u64, 292 next_message_id: u64,
300 /// The channel we are a subscriber to 293 /// The channel we are a subscriber to
@@ -321,10 +314,7 @@ impl<'a, T: Clone> Subscriber<'a, T> {
321 /// 314 ///
322 /// This function does not peek. The message is received if there is one. 315 /// This function does not peek. The message is received if there is one.
323 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> { 316 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
324 match self 317 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
325 .channel
326 .get_message_with_context(&mut self.next_message_id, self.subscriber_index, None)
327 {
328 Poll::Ready(result) => Some(result), 318 Poll::Ready(result) => Some(result),
329 Poll::Pending => None, 319 Poll::Pending => None,
330 } 320 }
@@ -346,8 +336,7 @@ impl<'a, T: Clone> Subscriber<'a, T> {
346 336
347impl<'a, T: Clone> Drop for Subscriber<'a, T> { 337impl<'a, T: Clone> Drop for Subscriber<'a, T> {
348 fn drop(&mut self) { 338 fn drop(&mut self) {
349 self.channel 339 self.channel.unregister_subscriber(self.next_message_id)
350 .unregister_subscriber(self.subscriber_index, self.next_message_id)
351 } 340 }
352} 341}
353 342
@@ -357,10 +346,9 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
357 type Item = T; 346 type Item = T;
358 347
359 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 348 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
360 let sub_index = self.subscriber_index;
361 match self 349 match self
362 .channel 350 .channel
363 .get_message_with_context(&mut self.next_message_id, sub_index, Some(cx)) 351 .get_message_with_context(&mut self.next_message_id, Some(cx))
364 { 352 {
365 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), 353 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
366 Poll::Ready(WaitResult::Lagged(_)) => { 354 Poll::Ready(WaitResult::Lagged(_)) => {
@@ -377,8 +365,6 @@ impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
377/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's 365/// This instance carries a reference to the channel, but uses a trait object for it so that the channel's
378/// generics are erased on this subscriber 366/// generics are erased on this subscriber
379pub struct Publisher<'a, T: Clone> { 367pub struct Publisher<'a, T: Clone> {
380 /// Our index into the channel
381 publisher_index: usize,
382 /// The channel we are a publisher for 368 /// The channel we are a publisher for
383 channel: &'a dyn PubSubBehavior<T>, 369 channel: &'a dyn PubSubBehavior<T>,
384} 370}
@@ -400,13 +386,13 @@ impl<'a, T: Clone> Publisher<'a, T> {
400 386
401 /// Publish a message if there is space in the message queue 387 /// Publish a message if there is space in the message queue
402 pub fn try_publish(&self, message: T) -> Result<(), T> { 388 pub fn try_publish(&self, message: T) -> Result<(), T> {
403 self.channel.publish_with_context(message, self.publisher_index, None) 389 self.channel.publish_with_context(message, None)
404 } 390 }
405} 391}
406 392
407impl<'a, T: Clone> Drop for Publisher<'a, T> { 393impl<'a, T: Clone> Drop for Publisher<'a, T> {
408 fn drop(&mut self) { 394 fn drop(&mut self) {
409 self.channel.unregister_publisher(self.publisher_index) 395 self.channel.unregister_publisher()
410 } 396 }
411} 397}
412 398
@@ -426,7 +412,7 @@ impl<'a, T: Clone> ImmediatePublisher<'a, T> {
426 412
427 /// Publish a message if there is space in the message queue 413 /// Publish a message if there is space in the message queue
428 pub fn try_publish(&self, message: T) -> Result<(), T> { 414 pub fn try_publish(&self, message: T) -> Result<(), T> {
429 self.channel.publish_with_context(message, usize::MAX, None) 415 self.channel.publish_with_context(message, None)
430 } 416 }
431} 417}
432 418
@@ -442,20 +428,15 @@ pub enum Error {
442} 428}
443 429
444trait PubSubBehavior<T> { 430trait PubSubBehavior<T> {
445 fn get_message_with_context( 431 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
446 &self,
447 next_message_id: &mut u64,
448 subscriber_index: usize,
449 cx: Option<&mut Context<'_>>,
450 ) -> Poll<WaitResult<T>>;
451 432
452 fn publish_with_context(&self, message: T, publisher_index: usize, cx: Option<&mut Context<'_>>) -> Result<(), T>; 433 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
453 434
454 fn publish_immediate(&self, message: T); 435 fn publish_immediate(&self, message: T);
455 436
456 fn unregister_subscriber(&self, subscriber_index: usize, subscriber_next_message_id: u64); 437 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
457 438
458 fn unregister_publisher(&self, publisher_index: usize); 439 fn unregister_publisher(&self);
459} 440}
460 441
461/// Future for the subscriber wait action 442/// Future for the subscriber wait action
@@ -467,10 +448,9 @@ impl<'s, 'a, T: Clone> Future for SubscriberWaitFuture<'s, 'a, T> {
467 type Output = WaitResult<T>; 448 type Output = WaitResult<T>;
468 449
469 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
470 let sub_index = self.subscriber.subscriber_index;
471 self.subscriber 451 self.subscriber
472 .channel 452 .channel
473 .get_message_with_context(&mut self.subscriber.next_message_id, sub_index, Some(cx)) 453 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
474 } 454 }
475} 455}
476 456
@@ -488,11 +468,7 @@ impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> {
488 468
489 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 469 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
490 let message = self.message.take().unwrap(); 470 let message = self.message.take().unwrap();
491 match self 471 match self.publisher.channel.publish_with_context(message, Some(cx)) {
492 .publisher
493 .channel
494 .publish_with_context(message, self.publisher.publisher_index, Some(cx))
495 {
496 Ok(()) => Poll::Ready(()), 472 Ok(()) => Poll::Ready(()),
497 Err(message) => { 473 Err(message) => {
498 self.message = Some(message); 474 self.message = Some(message);
diff --git a/embassy/src/waitqueue/mod.rs b/embassy/src/waitqueue/mod.rs
index a2bafad99..5c4e1bc3b 100644
--- a/embassy/src/waitqueue/mod.rs
+++ b/embassy/src/waitqueue/mod.rs
@@ -3,3 +3,6 @@
3#[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")] 3#[cfg_attr(feature = "executor-agnostic", path = "waker_agnostic.rs")]
4mod waker; 4mod waker;
5pub use waker::*; 5pub use waker::*;
6
7mod multi_waker;
8pub use multi_waker::*;
diff --git a/embassy/src/waitqueue/multi_waker.rs b/embassy/src/waitqueue/multi_waker.rs
new file mode 100644
index 000000000..6e8710cb4
--- /dev/null
+++ b/embassy/src/waitqueue/multi_waker.rs
@@ -0,0 +1,31 @@
1use core::task::Waker;
2
3use super::WakerRegistration;
4
5pub struct MultiWakerRegistration<const N: usize> {
6 wakers: [WakerRegistration; N],
7}
8
9impl<const N: usize> MultiWakerRegistration<N> {
10 pub const fn new() -> Self {
11 const WAKER: WakerRegistration = WakerRegistration::new();
12 Self { wakers: [WAKER; N] }
13 }
14
15 /// Register a waker. If the buffer is full the function returns it in the error
16 pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> {
17 if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) {
18 waker_slot.register(w);
19 Ok(())
20 } else {
21 Err(w)
22 }
23 }
24
25 /// Wake all registered wakers. This clears the buffer
26 pub fn wake(&mut self) {
27 for waker_slot in self.wakers.iter_mut() {
28 waker_slot.wake()
29 }
30 }
31}
diff --git a/embassy/src/waitqueue/waker.rs b/embassy/src/waitqueue/waker.rs
index da907300a..a90154cce 100644
--- a/embassy/src/waitqueue/waker.rs
+++ b/embassy/src/waitqueue/waker.rs
@@ -50,6 +50,11 @@ impl WakerRegistration {
50 unsafe { wake_task(w) } 50 unsafe { wake_task(w) }
51 } 51 }
52 } 52 }
53
54 /// Returns true if a waker is currently registered
55 pub fn occupied(&self) -> bool {
56 self.waker.is_some()
57 }
53} 58}
54 59
55// SAFETY: `WakerRegistration` effectively contains an `Option<Waker>`, 60// SAFETY: `WakerRegistration` effectively contains an `Option<Waker>`,
diff --git a/embassy/src/waitqueue/waker_agnostic.rs b/embassy/src/waitqueue/waker_agnostic.rs
index 89430aa4c..62e3adb79 100644
--- a/embassy/src/waitqueue/waker_agnostic.rs
+++ b/embassy/src/waitqueue/waker_agnostic.rs
@@ -47,6 +47,11 @@ impl WakerRegistration {
47 w.wake() 47 w.wake()
48 } 48 }
49 } 49 }
50
51 /// Returns true if a waker is currently registered
52 pub fn occupied(&self) -> bool {
53 self.waker.is_some()
54 }
50} 55}
51 56
52/// Utility struct to register and wake a waker. 57/// Utility struct to register and wake a waker.