diff options
| author | Dario Nieuwenhuis <[email protected]> | 2021-09-13 00:25:53 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-09-13 00:25:53 +0200 |
| commit | f1c35b40c74db489da8e04f1c2e87a1d4030c617 (patch) | |
| tree | c8f8671800eff4f2497ea8e4c7a96ba752db4aaa | |
| parent | 67fa6b06fafc8635d2063e687904d30864f45a05 (diff) | |
| parent | 70e5877d6823ba0894241e8bedc5cefa7e21bceb (diff) | |
Merge pull request #396 from embassy-rs/channel-fixes
embassy/channel: several improvements
| -rw-r--r-- | embassy/Cargo.toml | 1 | ||||
| -rw-r--r-- | embassy/src/blocking_mutex/kind.rs | 19 | ||||
| -rw-r--r-- | embassy/src/blocking_mutex/mod.rs | 10 | ||||
| -rw-r--r-- | embassy/src/channel/mpsc.rs | 264 | ||||
| -rw-r--r-- | examples/nrf/src/bin/mpsc.rs | 7 |
5 files changed, 116 insertions, 185 deletions
diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index 0a8ab4434..ae06bc198 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml | |||
| @@ -42,6 +42,7 @@ embassy-traits = { version = "0.1.0", path = "../embassy-traits"} | |||
| 42 | atomic-polyfill = "0.1.3" | 42 | atomic-polyfill = "0.1.3" |
| 43 | critical-section = "0.2.1" | 43 | critical-section = "0.2.1" |
| 44 | embedded-hal = "0.2.6" | 44 | embedded-hal = "0.2.6" |
| 45 | heapless = "0.7.5" | ||
| 45 | 46 | ||
| 46 | [dev-dependencies] | 47 | [dev-dependencies] |
| 47 | embassy = { path = ".", features = ["executor-agnostic"] } | 48 | embassy = { path = ".", features = ["executor-agnostic"] } |
diff --git a/embassy/src/blocking_mutex/kind.rs b/embassy/src/blocking_mutex/kind.rs new file mode 100644 index 000000000..30fc90497 --- /dev/null +++ b/embassy/src/blocking_mutex/kind.rs | |||
| @@ -0,0 +1,19 @@ | |||
| 1 | use super::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; | ||
| 2 | |||
| 3 | pub trait MutexKind { | ||
| 4 | type Mutex<T>: Mutex<Data = T>; | ||
| 5 | } | ||
| 6 | |||
| 7 | pub enum CriticalSection {} | ||
| 8 | impl MutexKind for CriticalSection { | ||
| 9 | type Mutex<T> = CriticalSectionMutex<T>; | ||
| 10 | } | ||
| 11 | |||
| 12 | pub enum ThreadMode {} | ||
| 13 | impl MutexKind for ThreadMode { | ||
| 14 | type Mutex<T> = ThreadModeMutex<T>; | ||
| 15 | } | ||
| 16 | pub enum Noop {} | ||
| 17 | impl MutexKind for Noop { | ||
| 18 | type Mutex<T> = NoopMutex<T>; | ||
| 19 | } | ||
diff --git a/embassy/src/blocking_mutex/mod.rs b/embassy/src/blocking_mutex/mod.rs index d112d2ede..641a1ed93 100644 --- a/embassy/src/blocking_mutex/mod.rs +++ b/embassy/src/blocking_mutex/mod.rs | |||
| @@ -1,5 +1,7 @@ | |||
| 1 | //! Blocking mutex (not async) | 1 | //! Blocking mutex (not async) |
| 2 | 2 | ||
| 3 | pub mod kind; | ||
| 4 | |||
| 3 | use core::cell::UnsafeCell; | 5 | use core::cell::UnsafeCell; |
| 4 | use critical_section::CriticalSection; | 6 | use critical_section::CriticalSection; |
| 5 | 7 | ||
| @@ -13,7 +15,7 @@ pub trait Mutex { | |||
| 13 | fn new(data: Self::Data) -> Self; | 15 | fn new(data: Self::Data) -> Self; |
| 14 | 16 | ||
| 15 | /// Creates a critical section and grants temporary access to the protected data. | 17 | /// Creates a critical section and grants temporary access to the protected data. |
| 16 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; | 18 | fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R; |
| 17 | } | 19 | } |
| 18 | 20 | ||
| 19 | /// A "mutex" based on critical sections | 21 | /// A "mutex" based on critical sections |
| @@ -55,7 +57,7 @@ impl<T> Mutex for CriticalSectionMutex<T> { | |||
| 55 | Self::new(data) | 57 | Self::new(data) |
| 56 | } | 58 | } |
| 57 | 59 | ||
| 58 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 60 | fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 59 | critical_section::with(|cs| f(self.borrow(cs))) | 61 | critical_section::with(|cs| f(self.borrow(cs))) |
| 60 | } | 62 | } |
| 61 | } | 63 | } |
| @@ -102,7 +104,7 @@ impl<T> Mutex for ThreadModeMutex<T> { | |||
| 102 | Self::new(data) | 104 | Self::new(data) |
| 103 | } | 105 | } |
| 104 | 106 | ||
| 105 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 107 | fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 106 | f(self.borrow()) | 108 | f(self.borrow()) |
| 107 | } | 109 | } |
| 108 | } | 110 | } |
| @@ -155,7 +157,7 @@ impl<T> Mutex for NoopMutex<T> { | |||
| 155 | Self::new(data) | 157 | Self::new(data) |
| 156 | } | 158 | } |
| 157 | 159 | ||
| 158 | fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { | 160 | fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R { |
| 159 | f(self.borrow()) | 161 | f(self.borrow()) |
| 160 | } | 162 | } |
| 161 | } | 163 | } |
diff --git a/embassy/src/channel/mpsc.rs b/embassy/src/channel/mpsc.rs index b20d48a95..9a57c0b19 100644 --- a/embassy/src/channel/mpsc.rs +++ b/embassy/src/channel/mpsc.rs | |||
| @@ -37,19 +37,18 @@ | |||
| 37 | //! | 37 | //! |
| 38 | //! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html | 38 | //! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html |
| 39 | 39 | ||
| 40 | use core::cell::UnsafeCell; | 40 | use core::cell::RefCell; |
| 41 | use core::fmt; | 41 | use core::fmt; |
| 42 | use core::marker::PhantomData; | ||
| 43 | use core::mem::MaybeUninit; | ||
| 44 | use core::pin::Pin; | 42 | use core::pin::Pin; |
| 45 | use core::ptr; | ||
| 46 | use core::task::Context; | 43 | use core::task::Context; |
| 47 | use core::task::Poll; | 44 | use core::task::Poll; |
| 48 | use core::task::Waker; | 45 | use core::task::Waker; |
| 49 | 46 | ||
| 50 | use futures::Future; | 47 | use futures::Future; |
| 48 | use heapless::Deque; | ||
| 51 | 49 | ||
| 52 | use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; | 50 | use crate::blocking_mutex::kind::MutexKind; |
| 51 | use crate::blocking_mutex::Mutex; | ||
| 53 | use crate::waitqueue::WakerRegistration; | 52 | use crate::waitqueue::WakerRegistration; |
| 54 | 53 | ||
| 55 | /// Send values to the associated `Receiver`. | 54 | /// Send values to the associated `Receiver`. |
| @@ -57,36 +56,19 @@ use crate::waitqueue::WakerRegistration; | |||
| 57 | /// Instances are created by the [`split`](split) function. | 56 | /// Instances are created by the [`split`](split) function. |
| 58 | pub struct Sender<'ch, M, T, const N: usize> | 57 | pub struct Sender<'ch, M, T, const N: usize> |
| 59 | where | 58 | where |
| 60 | M: Mutex<Data = ()>, | 59 | M: MutexKind, |
| 61 | { | 60 | { |
| 62 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | 61 | channel: &'ch Channel<M, T, N>, |
| 63 | } | 62 | } |
| 64 | 63 | ||
| 65 | // Safe to pass the sender around | ||
| 66 | unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 67 | {} | ||
| 68 | unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync | ||
| 69 | {} | ||
| 70 | |||
| 71 | /// Receive values from the associated `Sender`. | 64 | /// Receive values from the associated `Sender`. |
| 72 | /// | 65 | /// |
| 73 | /// Instances are created by the [`split`](split) function. | 66 | /// Instances are created by the [`split`](split) function. |
| 74 | pub struct Receiver<'ch, M, T, const N: usize> | 67 | pub struct Receiver<'ch, M, T, const N: usize> |
| 75 | where | 68 | where |
| 76 | M: Mutex<Data = ()>, | 69 | M: MutexKind, |
| 77 | { | ||
| 78 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | ||
| 79 | _receiver_consumed: &'ch mut PhantomData<()>, | ||
| 80 | } | ||
| 81 | |||
| 82 | // Safe to pass the receiver around | ||
| 83 | unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where | ||
| 84 | M: Mutex<Data = ()> + Sync | ||
| 85 | { | ||
| 86 | } | ||
| 87 | unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where | ||
| 88 | M: Mutex<Data = ()> + Sync | ||
| 89 | { | 70 | { |
| 71 | channel: &'ch Channel<M, T, N>, | ||
| 90 | } | 72 | } |
| 91 | 73 | ||
| 92 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. | 74 | /// Splits a bounded mpsc channel into a `Sender` and `Receiver`. |
| @@ -117,16 +99,11 @@ pub fn split<M, T, const N: usize>( | |||
| 117 | channel: &mut Channel<M, T, N>, | 99 | channel: &mut Channel<M, T, N>, |
| 118 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) | 100 | ) -> (Sender<M, T, N>, Receiver<M, T, N>) |
| 119 | where | 101 | where |
| 120 | M: Mutex<Data = ()>, | 102 | M: MutexKind, |
| 121 | { | 103 | { |
| 122 | let sender = Sender { | 104 | let sender = Sender { channel }; |
| 123 | channel_cell: &channel.channel_cell, | 105 | let receiver = Receiver { channel }; |
| 124 | }; | 106 | channel.lock(|c| { |
| 125 | let receiver = Receiver { | ||
| 126 | channel_cell: &channel.channel_cell, | ||
| 127 | _receiver_consumed: &mut channel.receiver_consumed, | ||
| 128 | }; | ||
| 129 | Channel::lock(&channel.channel_cell, |c| { | ||
| 130 | c.register_receiver(); | 107 | c.register_receiver(); |
| 131 | c.register_sender(); | 108 | c.register_sender(); |
| 132 | }); | 109 | }); |
| @@ -135,7 +112,7 @@ where | |||
| 135 | 112 | ||
| 136 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> | 113 | impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N> |
| 137 | where | 114 | where |
| 138 | M: Mutex<Data = ()>, | 115 | M: MutexKind, |
| 139 | { | 116 | { |
| 140 | /// Receives the next value for this receiver. | 117 | /// Receives the next value for this receiver. |
| 141 | /// | 118 | /// |
| @@ -155,7 +132,7 @@ where | |||
| 155 | /// [`close`]: Self::close | 132 | /// [`close`]: Self::close |
| 156 | pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { | 133 | pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { |
| 157 | RecvFuture { | 134 | RecvFuture { |
| 158 | channel_cell: self.channel_cell, | 135 | channel: self.channel, |
| 159 | } | 136 | } |
| 160 | } | 137 | } |
| 161 | 138 | ||
| @@ -164,7 +141,7 @@ where | |||
| 164 | /// This method will either receive a message from the channel immediately or return an error | 141 | /// This method will either receive a message from the channel immediately or return an error |
| 165 | /// if the channel is empty. | 142 | /// if the channel is empty. |
| 166 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | 143 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
| 167 | Channel::lock(self.channel_cell, |c| c.try_recv()) | 144 | self.channel.lock(|c| c.try_recv()) |
| 168 | } | 145 | } |
| 169 | 146 | ||
| 170 | /// Closes the receiving half of a channel without dropping it. | 147 | /// Closes the receiving half of a channel without dropping it. |
| @@ -178,56 +155,45 @@ where | |||
| 178 | /// until those are released. | 155 | /// until those are released. |
| 179 | /// | 156 | /// |
| 180 | pub fn close(&mut self) { | 157 | pub fn close(&mut self) { |
| 181 | Channel::lock(self.channel_cell, |c| c.close()) | 158 | self.channel.lock(|c| c.close()) |
| 182 | } | 159 | } |
| 183 | } | 160 | } |
| 184 | 161 | ||
| 185 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> | 162 | impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> |
| 186 | where | 163 | where |
| 187 | M: Mutex<Data = ()>, | 164 | M: MutexKind, |
| 188 | { | 165 | { |
| 189 | fn drop(&mut self) { | 166 | fn drop(&mut self) { |
| 190 | Channel::lock(self.channel_cell, |c| c.deregister_receiver()) | 167 | self.channel.lock(|c| c.deregister_receiver()) |
| 191 | } | 168 | } |
| 192 | } | 169 | } |
| 193 | 170 | ||
| 194 | pub struct RecvFuture<'ch, M, T, const N: usize> | 171 | pub struct RecvFuture<'ch, M, T, const N: usize> |
| 195 | where | 172 | where |
| 196 | M: Mutex<Data = ()>, | 173 | M: MutexKind, |
| 197 | { | 174 | { |
| 198 | channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>, | 175 | channel: &'ch Channel<M, T, N>, |
| 199 | } | 176 | } |
| 200 | 177 | ||
| 201 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> | 178 | impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> |
| 202 | where | 179 | where |
| 203 | M: Mutex<Data = ()>, | 180 | M: MutexKind, |
| 204 | { | 181 | { |
| 205 | type Output = Option<T>; | 182 | type Output = Option<T>; |
| 206 | 183 | ||
| 207 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { | 184 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
| 208 | Channel::lock(self.channel_cell, |c| { | 185 | self.channel |
| 209 | match c.try_recv_with_context(Some(cx)) { | 186 | .lock(|c| match c.try_recv_with_context(Some(cx)) { |
| 210 | Ok(v) => Poll::Ready(Some(v)), | 187 | Ok(v) => Poll::Ready(Some(v)), |
| 211 | Err(TryRecvError::Closed) => Poll::Ready(None), | 188 | Err(TryRecvError::Closed) => Poll::Ready(None), |
| 212 | Err(TryRecvError::Empty) => Poll::Pending, | 189 | Err(TryRecvError::Empty) => Poll::Pending, |
| 213 | } | 190 | }) |
| 214 | }) | ||
| 215 | } | 191 | } |
| 216 | } | 192 | } |
| 217 | 193 | ||
| 218 | // Safe to pass the receive future around since it locks channel whenever polled | ||
| 219 | unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where | ||
| 220 | M: Mutex<Data = ()> + Sync | ||
| 221 | { | ||
| 222 | } | ||
| 223 | unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where | ||
| 224 | M: Mutex<Data = ()> + Sync | ||
| 225 | { | ||
| 226 | } | ||
| 227 | |||
| 228 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> | 194 | impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> |
| 229 | where | 195 | where |
| 230 | M: Mutex<Data = ()>, | 196 | M: MutexKind, |
| 231 | { | 197 | { |
| 232 | /// Sends a value, waiting until there is capacity. | 198 | /// Sends a value, waiting until there is capacity. |
| 233 | /// | 199 | /// |
| @@ -249,7 +215,7 @@ where | |||
| 249 | /// [`Receiver`]: Receiver | 215 | /// [`Receiver`]: Receiver |
| 250 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { | 216 | pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { |
| 251 | SendFuture { | 217 | SendFuture { |
| 252 | sender: self.clone(), | 218 | channel: self.channel, |
| 253 | message: Some(message), | 219 | message: Some(message), |
| 254 | } | 220 | } |
| 255 | } | 221 | } |
| @@ -275,7 +241,7 @@ where | |||
| 275 | /// [`channel`]: channel | 241 | /// [`channel`]: channel |
| 276 | /// [`close`]: Receiver::close | 242 | /// [`close`]: Receiver::close |
| 277 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { | 243 | pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { |
| 278 | Channel::lock(self.channel_cell, |c| c.try_send(message)) | 244 | self.channel.lock(|c| c.try_send(message)) |
| 279 | } | 245 | } |
| 280 | 246 | ||
| 281 | /// Completes when the receiver has dropped. | 247 | /// Completes when the receiver has dropped. |
| @@ -284,7 +250,7 @@ where | |||
| 284 | /// values is canceled and immediately stop doing work. | 250 | /// values is canceled and immediately stop doing work. |
| 285 | pub async fn closed(&self) { | 251 | pub async fn closed(&self) { |
| 286 | CloseFuture { | 252 | CloseFuture { |
| 287 | sender: self.clone(), | 253 | channel: self.channel, |
| 288 | } | 254 | } |
| 289 | .await | 255 | .await |
| 290 | } | 256 | } |
| @@ -296,29 +262,27 @@ where | |||
| 296 | /// [`Receiver`]: crate::sync::mpsc::Receiver | 262 | /// [`Receiver`]: crate::sync::mpsc::Receiver |
| 297 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close | 263 | /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close |
| 298 | pub fn is_closed(&self) -> bool { | 264 | pub fn is_closed(&self) -> bool { |
| 299 | Channel::lock(self.channel_cell, |c| c.is_closed()) | 265 | self.channel.lock(|c| c.is_closed()) |
| 300 | } | 266 | } |
| 301 | } | 267 | } |
| 302 | 268 | ||
| 303 | pub struct SendFuture<'ch, M, T, const N: usize> | 269 | pub struct SendFuture<'ch, M, T, const N: usize> |
| 304 | where | 270 | where |
| 305 | M: Mutex<Data = ()>, | 271 | M: MutexKind, |
| 306 | { | 272 | { |
| 307 | sender: Sender<'ch, M, T, N>, | 273 | channel: &'ch Channel<M, T, N>, |
| 308 | message: Option<T>, | 274 | message: Option<T>, |
| 309 | } | 275 | } |
| 310 | 276 | ||
| 311 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> | 277 | impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N> |
| 312 | where | 278 | where |
| 313 | M: Mutex<Data = ()>, | 279 | M: MutexKind, |
| 314 | { | 280 | { |
| 315 | type Output = Result<(), SendError<T>>; | 281 | type Output = Result<(), SendError<T>>; |
| 316 | 282 | ||
| 317 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 283 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 318 | match self.message.take() { | 284 | match self.message.take() { |
| 319 | Some(m) => match Channel::lock(self.sender.channel_cell, |c| { | 285 | Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) { |
| 320 | c.try_send_with_context(m, Some(cx)) | ||
| 321 | }) { | ||
| 322 | Ok(..) => Poll::Ready(Ok(())), | 286 | Ok(..) => Poll::Ready(Ok(())), |
| 323 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), | 287 | Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))), |
| 324 | Err(TrySendError::Full(m)) => { | 288 | Err(TrySendError::Full(m)) => { |
| @@ -331,25 +295,23 @@ where | |||
| 331 | } | 295 | } |
| 332 | } | 296 | } |
| 333 | 297 | ||
| 334 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {} | 298 | impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: MutexKind {} |
| 335 | 299 | ||
| 336 | struct CloseFuture<'ch, M, T, const N: usize> | 300 | struct CloseFuture<'ch, M, T, const N: usize> |
| 337 | where | 301 | where |
| 338 | M: Mutex<Data = ()>, | 302 | M: MutexKind, |
| 339 | { | 303 | { |
| 340 | sender: Sender<'ch, M, T, N>, | 304 | channel: &'ch Channel<M, T, N>, |
| 341 | } | 305 | } |
| 342 | 306 | ||
| 343 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> | 307 | impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N> |
| 344 | where | 308 | where |
| 345 | M: Mutex<Data = ()>, | 309 | M: MutexKind, |
| 346 | { | 310 | { |
| 347 | type Output = (); | 311 | type Output = (); |
| 348 | 312 | ||
| 349 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 313 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 350 | if Channel::lock(self.sender.channel_cell, |c| { | 314 | if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) { |
| 351 | c.is_closed_with_context(Some(cx)) | ||
| 352 | }) { | ||
| 353 | Poll::Ready(()) | 315 | Poll::Ready(()) |
| 354 | } else { | 316 | } else { |
| 355 | Poll::Pending | 317 | Poll::Pending |
| @@ -359,22 +321,21 @@ where | |||
| 359 | 321 | ||
| 360 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> | 322 | impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N> |
| 361 | where | 323 | where |
| 362 | M: Mutex<Data = ()>, | 324 | M: MutexKind, |
| 363 | { | 325 | { |
| 364 | fn drop(&mut self) { | 326 | fn drop(&mut self) { |
| 365 | Channel::lock(self.channel_cell, |c| c.deregister_sender()) | 327 | self.channel.lock(|c| c.deregister_sender()) |
| 366 | } | 328 | } |
| 367 | } | 329 | } |
| 368 | 330 | ||
| 369 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> | 331 | impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N> |
| 370 | where | 332 | where |
| 371 | M: Mutex<Data = ()>, | 333 | M: MutexKind, |
| 372 | { | 334 | { |
| 373 | #[allow(clippy::clone_double_ref)] | ||
| 374 | fn clone(&self) -> Self { | 335 | fn clone(&self) -> Self { |
| 375 | Channel::lock(self.channel_cell, |c| c.register_sender()); | 336 | self.channel.lock(|c| c.register_sender()); |
| 376 | Sender { | 337 | Sender { |
| 377 | channel_cell: self.channel_cell.clone(), | 338 | channel: self.channel, |
| 378 | } | 339 | } |
| 379 | } | 340 | } |
| 380 | } | 341 | } |
| @@ -446,10 +407,7 @@ impl<T> defmt::Format for TrySendError<T> { | |||
| 446 | } | 407 | } |
| 447 | 408 | ||
| 448 | struct ChannelState<T, const N: usize> { | 409 | struct ChannelState<T, const N: usize> { |
| 449 | buf: [MaybeUninit<UnsafeCell<T>>; N], | 410 | queue: Deque<T, N>, |
| 450 | read_pos: usize, | ||
| 451 | write_pos: usize, | ||
| 452 | full: bool, | ||
| 453 | closed: bool, | 411 | closed: bool, |
| 454 | receiver_registered: bool, | 412 | receiver_registered: bool, |
| 455 | senders_registered: u32, | 413 | senders_registered: u32, |
| @@ -458,14 +416,9 @@ struct ChannelState<T, const N: usize> { | |||
| 458 | } | 416 | } |
| 459 | 417 | ||
| 460 | impl<T, const N: usize> ChannelState<T, N> { | 418 | impl<T, const N: usize> ChannelState<T, N> { |
| 461 | const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit(); | ||
| 462 | |||
| 463 | const fn new() -> Self { | 419 | const fn new() -> Self { |
| 464 | ChannelState { | 420 | ChannelState { |
| 465 | buf: [Self::INIT; N], | 421 | queue: Deque::new(), |
| 466 | read_pos: 0, | ||
| 467 | write_pos: 0, | ||
| 468 | full: false, | ||
| 469 | closed: false, | 422 | closed: false, |
| 470 | receiver_registered: false, | 423 | receiver_registered: false, |
| 471 | senders_registered: 0, | 424 | senders_registered: 0, |
| @@ -479,17 +432,16 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 479 | } | 432 | } |
| 480 | 433 | ||
| 481 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { | 434 | fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { |
| 482 | if self.read_pos != self.write_pos || self.full { | 435 | if self.queue.is_full() { |
| 483 | if self.full { | 436 | self.senders_waker.wake(); |
| 484 | self.full = false; | 437 | } |
| 485 | self.senders_waker.wake(); | 438 | |
| 486 | } | 439 | if let Some(message) = self.queue.pop_front() { |
| 487 | let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() }; | ||
| 488 | self.read_pos = (self.read_pos + 1) % self.buf.len(); | ||
| 489 | Ok(message) | 440 | Ok(message) |
| 490 | } else if !self.closed { | 441 | } else if !self.closed { |
| 491 | cx.into_iter() | 442 | if let Some(cx) = cx { |
| 492 | .for_each(|cx| self.set_receiver_waker(&cx.waker())); | 443 | self.set_receiver_waker(cx.waker()); |
| 444 | } | ||
| 493 | Err(TryRecvError::Empty) | 445 | Err(TryRecvError::Empty) |
| 494 | } else { | 446 | } else { |
| 495 | Err(TryRecvError::Closed) | 447 | Err(TryRecvError::Closed) |
| @@ -505,22 +457,21 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 505 | message: T, | 457 | message: T, |
| 506 | cx: Option<&mut Context<'_>>, | 458 | cx: Option<&mut Context<'_>>, |
| 507 | ) -> Result<(), TrySendError<T>> { | 459 | ) -> Result<(), TrySendError<T>> { |
| 508 | if !self.closed { | 460 | if self.closed { |
| 509 | if !self.full { | 461 | return Err(TrySendError::Closed(message)); |
| 510 | self.buf[self.write_pos] = MaybeUninit::new(message.into()); | 462 | } |
| 511 | self.write_pos = (self.write_pos + 1) % self.buf.len(); | 463 | |
| 512 | if self.write_pos == self.read_pos { | 464 | match self.queue.push_back(message) { |
| 513 | self.full = true; | 465 | Ok(()) => { |
| 514 | } | ||
| 515 | self.receiver_waker.wake(); | 466 | self.receiver_waker.wake(); |
| 467 | |||
| 516 | Ok(()) | 468 | Ok(()) |
| 517 | } else { | 469 | } |
| 470 | Err(message) => { | ||
| 518 | cx.into_iter() | 471 | cx.into_iter() |
| 519 | .for_each(|cx| self.set_senders_waker(&cx.waker())); | 472 | .for_each(|cx| self.set_senders_waker(&cx.waker())); |
| 520 | Err(TrySendError::Full(message)) | 473 | Err(TrySendError::Full(message)) |
| 521 | } | 474 | } |
| 522 | } else { | ||
| 523 | Err(TrySendError::Closed(message)) | ||
| 524 | } | 475 | } |
| 525 | } | 476 | } |
| 526 | 477 | ||
| @@ -585,16 +536,6 @@ impl<T, const N: usize> ChannelState<T, N> { | |||
| 585 | } | 536 | } |
| 586 | } | 537 | } |
| 587 | 538 | ||
| 588 | impl<T, const N: usize> Drop for ChannelState<T, N> { | ||
| 589 | fn drop(&mut self) { | ||
| 590 | while self.read_pos != self.write_pos || self.full { | ||
| 591 | self.full = false; | ||
| 592 | unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) }; | ||
| 593 | self.read_pos = (self.read_pos + 1) % N; | ||
| 594 | } | ||
| 595 | } | ||
| 596 | } | ||
| 597 | |||
| 598 | /// A a bounded mpsc channel for communicating between asynchronous tasks | 539 | /// A a bounded mpsc channel for communicating between asynchronous tasks |
| 599 | /// with backpressure. | 540 | /// with backpressure. |
| 600 | /// | 541 | /// |
| @@ -605,61 +546,35 @@ impl<T, const N: usize> Drop for ChannelState<T, N> { | |||
| 605 | /// All data sent will become available in the same order as it was sent. | 546 | /// All data sent will become available in the same order as it was sent. |
| 606 | pub struct Channel<M, T, const N: usize> | 547 | pub struct Channel<M, T, const N: usize> |
| 607 | where | 548 | where |
| 608 | M: Mutex<Data = ()>, | 549 | M: MutexKind, |
| 609 | { | 550 | { |
| 610 | channel_cell: UnsafeCell<ChannelCell<M, T, N>>, | 551 | inner: M::Mutex<RefCell<ChannelState<T, N>>>, |
| 611 | receiver_consumed: PhantomData<()>, | ||
| 612 | } | 552 | } |
| 613 | 553 | ||
| 614 | struct ChannelCell<M, T, const N: usize> | ||
| 615 | where | ||
| 616 | M: Mutex<Data = ()>, | ||
| 617 | { | ||
| 618 | mutex: M, | ||
| 619 | state: ChannelState<T, N>, | ||
| 620 | } | ||
| 621 | |||
| 622 | pub type WithCriticalSections = CriticalSectionMutex<()>; | ||
| 623 | |||
| 624 | pub type WithThreadModeOnly = ThreadModeMutex<()>; | ||
| 625 | |||
| 626 | pub type WithNoThreads = NoopMutex<()>; | ||
| 627 | |||
| 628 | impl<M, T, const N: usize> Channel<M, T, N> | 554 | impl<M, T, const N: usize> Channel<M, T, N> |
| 629 | where | 555 | where |
| 630 | M: Mutex<Data = ()>, | 556 | M: MutexKind, |
| 631 | { | 557 | { |
| 632 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: | 558 | /// Establish a new bounded channel. For example, to create one with a NoopMutex: |
| 633 | /// | 559 | /// |
| 634 | /// ``` | 560 | /// ``` |
| 635 | /// use embassy::channel::mpsc; | 561 | /// use embassy::channel::mpsc; |
| 636 | /// use embassy::channel::mpsc::{Channel, WithNoThreads}; | 562 | /// use embassy::blocking_mutex::kind::Noop; |
| 563 | /// use embassy::channel::mpsc::Channel; | ||
| 637 | /// | 564 | /// |
| 638 | /// // Declare a bounded channel of 3 u32s. | 565 | /// // Declare a bounded channel of 3 u32s. |
| 639 | /// let mut channel = Channel::<WithNoThreads, u32, 3>::new(); | 566 | /// let mut channel = Channel::<Noop, u32, 3>::new(); |
| 640 | /// // once we have a channel, obtain its sender and receiver | 567 | /// // once we have a channel, obtain its sender and receiver |
| 641 | /// let (sender, receiver) = mpsc::split(&mut channel); | 568 | /// let (sender, receiver) = mpsc::split(&mut channel); |
| 642 | /// ``` | 569 | /// ``` |
| 643 | pub fn new() -> Self { | 570 | pub fn new() -> Self { |
| 644 | let mutex = M::new(()); | 571 | Self { |
| 645 | let state = ChannelState::new(); | 572 | inner: M::Mutex::new(RefCell::new(ChannelState::new())), |
| 646 | let channel_cell = ChannelCell { mutex, state }; | ||
| 647 | Channel { | ||
| 648 | channel_cell: UnsafeCell::new(channel_cell), | ||
| 649 | receiver_consumed: PhantomData, | ||
| 650 | } | 573 | } |
| 651 | } | 574 | } |
| 652 | 575 | ||
| 653 | fn lock<R>( | 576 | fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R { |
| 654 | channel_cell: &UnsafeCell<ChannelCell<M, T, N>>, | 577 | self.inner.lock(|rc| f(&mut *rc.borrow_mut())) |
| 655 | f: impl FnOnce(&mut ChannelState<T, N>) -> R, | ||
| 656 | ) -> R { | ||
| 657 | unsafe { | ||
| 658 | let channel_cell = &mut *(channel_cell.get()); | ||
| 659 | let mutex = &mut channel_cell.mutex; | ||
| 660 | let mut state = &mut channel_cell.state; | ||
| 661 | mutex.lock(|_| f(&mut state)) | ||
| 662 | } | ||
| 663 | } | 578 | } |
| 664 | } | 579 | } |
| 665 | 580 | ||
| @@ -671,20 +586,13 @@ mod tests { | |||
| 671 | use futures_executor::ThreadPool; | 586 | use futures_executor::ThreadPool; |
| 672 | use futures_timer::Delay; | 587 | use futures_timer::Delay; |
| 673 | 588 | ||
| 589 | use crate::blocking_mutex::kind::{CriticalSection, Noop}; | ||
| 674 | use crate::util::Forever; | 590 | use crate::util::Forever; |
| 675 | 591 | ||
| 676 | use super::*; | 592 | use super::*; |
| 677 | 593 | ||
| 678 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { | 594 | fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { |
| 679 | if !c.full { | 595 | c.queue.capacity() - c.queue.len() |
| 680 | if c.write_pos > c.read_pos { | ||
| 681 | (c.buf.len() - c.write_pos) + c.read_pos | ||
| 682 | } else { | ||
| 683 | (c.buf.len() - c.read_pos) + c.write_pos | ||
| 684 | } | ||
| 685 | } else { | ||
| 686 | 0 | ||
| 687 | } | ||
| 688 | } | 596 | } |
| 689 | 597 | ||
| 690 | #[test] | 598 | #[test] |
| @@ -747,7 +655,7 @@ mod tests { | |||
| 747 | 655 | ||
| 748 | #[test] | 656 | #[test] |
| 749 | fn simple_send_and_receive() { | 657 | fn simple_send_and_receive() { |
| 750 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 658 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 751 | let (s, r) = split(&mut c); | 659 | let (s, r) = split(&mut c); |
| 752 | assert!(s.clone().try_send(1).is_ok()); | 660 | assert!(s.clone().try_send(1).is_ok()); |
| 753 | assert_eq!(r.try_recv().unwrap(), 1); | 661 | assert_eq!(r.try_recv().unwrap(), 1); |
| @@ -755,7 +663,7 @@ mod tests { | |||
| 755 | 663 | ||
| 756 | #[test] | 664 | #[test] |
| 757 | fn should_close_without_sender() { | 665 | fn should_close_without_sender() { |
| 758 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 666 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 759 | let (s, r) = split(&mut c); | 667 | let (s, r) = split(&mut c); |
| 760 | drop(s); | 668 | drop(s); |
| 761 | match r.try_recv() { | 669 | match r.try_recv() { |
| @@ -766,7 +674,7 @@ mod tests { | |||
| 766 | 674 | ||
| 767 | #[test] | 675 | #[test] |
| 768 | fn should_close_once_drained() { | 676 | fn should_close_once_drained() { |
| 769 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 677 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 770 | let (s, r) = split(&mut c); | 678 | let (s, r) = split(&mut c); |
| 771 | assert!(s.try_send(1).is_ok()); | 679 | assert!(s.try_send(1).is_ok()); |
| 772 | drop(s); | 680 | drop(s); |
| @@ -779,7 +687,7 @@ mod tests { | |||
| 779 | 687 | ||
| 780 | #[test] | 688 | #[test] |
| 781 | fn should_reject_send_when_receiver_dropped() { | 689 | fn should_reject_send_when_receiver_dropped() { |
| 782 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 690 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 783 | let (s, r) = split(&mut c); | 691 | let (s, r) = split(&mut c); |
| 784 | drop(r); | 692 | drop(r); |
| 785 | match s.try_send(1) { | 693 | match s.try_send(1) { |
| @@ -790,7 +698,7 @@ mod tests { | |||
| 790 | 698 | ||
| 791 | #[test] | 699 | #[test] |
| 792 | fn should_reject_send_when_channel_closed() { | 700 | fn should_reject_send_when_channel_closed() { |
| 793 | let mut c = Channel::<WithNoThreads, u32, 3>::new(); | 701 | let mut c = Channel::<Noop, u32, 3>::new(); |
| 794 | let (s, mut r) = split(&mut c); | 702 | let (s, mut r) = split(&mut c); |
| 795 | assert!(s.try_send(1).is_ok()); | 703 | assert!(s.try_send(1).is_ok()); |
| 796 | r.close(); | 704 | r.close(); |
| @@ -806,7 +714,7 @@ mod tests { | |||
| 806 | async fn receiver_closes_when_sender_dropped_async() { | 714 | async fn receiver_closes_when_sender_dropped_async() { |
| 807 | let executor = ThreadPool::new().unwrap(); | 715 | let executor = ThreadPool::new().unwrap(); |
| 808 | 716 | ||
| 809 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | 717 | static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new(); |
| 810 | let c = CHANNEL.put(Channel::new()); | 718 | let c = CHANNEL.put(Channel::new()); |
| 811 | let (s, mut r) = split(c); | 719 | let (s, mut r) = split(c); |
| 812 | assert!(executor | 720 | assert!(executor |
| @@ -821,7 +729,7 @@ mod tests { | |||
| 821 | async fn receiver_receives_given_try_send_async() { | 729 | async fn receiver_receives_given_try_send_async() { |
| 822 | let executor = ThreadPool::new().unwrap(); | 730 | let executor = ThreadPool::new().unwrap(); |
| 823 | 731 | ||
| 824 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new(); | 732 | static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new(); |
| 825 | let c = CHANNEL.put(Channel::new()); | 733 | let c = CHANNEL.put(Channel::new()); |
| 826 | let (s, mut r) = split(c); | 734 | let (s, mut r) = split(c); |
| 827 | assert!(executor | 735 | assert!(executor |
| @@ -834,7 +742,7 @@ mod tests { | |||
| 834 | 742 | ||
| 835 | #[futures_test::test] | 743 | #[futures_test::test] |
| 836 | async fn sender_send_completes_if_capacity() { | 744 | async fn sender_send_completes_if_capacity() { |
| 837 | let mut c = Channel::<WithCriticalSections, u32, 1>::new(); | 745 | let mut c = Channel::<CriticalSection, u32, 1>::new(); |
| 838 | let (s, mut r) = split(&mut c); | 746 | let (s, mut r) = split(&mut c); |
| 839 | assert!(s.send(1).await.is_ok()); | 747 | assert!(s.send(1).await.is_ok()); |
| 840 | assert_eq!(r.recv().await, Some(1)); | 748 | assert_eq!(r.recv().await, Some(1)); |
| @@ -842,7 +750,7 @@ mod tests { | |||
| 842 | 750 | ||
| 843 | #[futures_test::test] | 751 | #[futures_test::test] |
| 844 | async fn sender_send_completes_if_closed() { | 752 | async fn sender_send_completes_if_closed() { |
| 845 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 753 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 846 | let c = CHANNEL.put(Channel::new()); | 754 | let c = CHANNEL.put(Channel::new()); |
| 847 | let (s, r) = split(c); | 755 | let (s, r) = split(c); |
| 848 | drop(r); | 756 | drop(r); |
| @@ -856,7 +764,7 @@ mod tests { | |||
| 856 | async fn senders_sends_wait_until_capacity() { | 764 | async fn senders_sends_wait_until_capacity() { |
| 857 | let executor = ThreadPool::new().unwrap(); | 765 | let executor = ThreadPool::new().unwrap(); |
| 858 | 766 | ||
| 859 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 767 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 860 | let c = CHANNEL.put(Channel::new()); | 768 | let c = CHANNEL.put(Channel::new()); |
| 861 | let (s0, mut r) = split(c); | 769 | let (s0, mut r) = split(c); |
| 862 | assert!(s0.try_send(1).is_ok()); | 770 | assert!(s0.try_send(1).is_ok()); |
| @@ -876,7 +784,7 @@ mod tests { | |||
| 876 | 784 | ||
| 877 | #[futures_test::test] | 785 | #[futures_test::test] |
| 878 | async fn sender_close_completes_if_closing() { | 786 | async fn sender_close_completes_if_closing() { |
| 879 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 787 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 880 | let c = CHANNEL.put(Channel::new()); | 788 | let c = CHANNEL.put(Channel::new()); |
| 881 | let (s, mut r) = split(c); | 789 | let (s, mut r) = split(c); |
| 882 | r.close(); | 790 | r.close(); |
| @@ -885,7 +793,7 @@ mod tests { | |||
| 885 | 793 | ||
| 886 | #[futures_test::test] | 794 | #[futures_test::test] |
| 887 | async fn sender_close_completes_if_closed() { | 795 | async fn sender_close_completes_if_closed() { |
| 888 | static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new(); | 796 | static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new(); |
| 889 | let c = CHANNEL.put(Channel::new()); | 797 | let c = CHANNEL.put(Channel::new()); |
| 890 | let (s, r) = split(c); | 798 | let (s, r) = split(c); |
| 891 | drop(r); | 799 | drop(r); |
diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index 79fa3dfb9..c85b7c282 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs | |||
| @@ -6,7 +6,8 @@ | |||
| 6 | mod example_common; | 6 | mod example_common; |
| 7 | 7 | ||
| 8 | use defmt::unwrap; | 8 | use defmt::unwrap; |
| 9 | use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads}; | 9 | use embassy::blocking_mutex::kind::Noop; |
| 10 | use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError}; | ||
| 10 | use embassy::executor::Spawner; | 11 | use embassy::executor::Spawner; |
| 11 | use embassy::time::{Duration, Timer}; | 12 | use embassy::time::{Duration, Timer}; |
| 12 | use embassy::util::Forever; | 13 | use embassy::util::Forever; |
| @@ -19,10 +20,10 @@ enum LedState { | |||
| 19 | Off, | 20 | Off, |
| 20 | } | 21 | } |
| 21 | 22 | ||
| 22 | static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new(); | 23 | static CHANNEL: Forever<Channel<Noop, LedState, 1>> = Forever::new(); |
| 23 | 24 | ||
| 24 | #[embassy::task(pool_size = 1)] | 25 | #[embassy::task(pool_size = 1)] |
| 25 | async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { | 26 | async fn my_task(sender: Sender<'static, Noop, LedState, 1>) { |
| 26 | loop { | 27 | loop { |
| 27 | let _ = sender.send(LedState::On).await; | 28 | let _ = sender.send(LedState::On).await; |
| 28 | Timer::after(Duration::from_secs(1)).await; | 29 | Timer::after(Duration::from_secs(1)).await; |
