diff options
| author | diogo464 <[email protected]> | 2025-07-23 20:55:50 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-23 20:55:50 +0100 |
| commit | b13fedfaf55eaa7716195d11d0fb12041ed8dfb5 (patch) | |
| tree | 88e735c354c98731bd1400fc05dce4f0cd7a0ad0 /src/machine.rs | |
| parent | be4e94f257a1e0a033a3e85c365a7a868965f4c5 (diff) | |
added concurrency limit via OAR_P2P_CONCURRENCY_LIMIT
The env var OAR_P2P_CONCURRENCY_LIMIT limits the number of parallel
"operations" being done on the cluster machines. So, if it is set to 3,
then we only work on 3 machines at a time. Setting it to 0 (or leaving it unset) means unlimited.
Diffstat (limited to 'src/machine.rs')
| -rw-r--r-- | src/machine.rs | 54 |
1 file changed, 49 insertions, 5 deletions
diff --git a/src/machine.rs b/src/machine.rs index 958a03c..ce5068f 100644 --- a/src/machine.rs +++ b/src/machine.rs | |||
| @@ -1,5 +1,8 @@ | |||
| 1 | use std::sync::Arc; | ||
| 2 | |||
| 1 | use eyre::{Context as _, Result}; | 3 | use eyre::{Context as _, Result}; |
| 2 | use futures::{StreamExt as _, stream::FuturesUnordered}; | 4 | use futures::{StreamExt as _, stream::FuturesUnordered}; |
| 5 | use tokio::sync::Semaphore; | ||
| 3 | 6 | ||
| 4 | macro_rules! define_machines { | 7 | macro_rules! define_machines { |
| 5 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { | 8 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { |
| @@ -141,23 +144,64 @@ define_machines!( | |||
| 141 | (Vulpix1, 56, "vulpix-1", 112, todo!()) | 144 | (Vulpix1, 56, "vulpix-1", 112, todo!()) |
| 142 | ); | 145 | ); |
| 143 | 146 | ||
| 144 | pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()> | 147 | pub async fn for_each<F, FUT, RET>( |
| 148 | machines: impl IntoIterator<Item = &Machine>, | ||
| 149 | f: F, | ||
| 150 | ) -> Result<Vec<(Machine, RET)>> | ||
| 151 | where | ||
| 152 | F: Fn(Machine) -> FUT, | ||
| 153 | RET: Send + 'static, | ||
| 154 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 155 | { | ||
| 156 | let limit = match std::env::var("OAR_P2P_CONCURRENCY_LIMIT") { | ||
| 157 | Ok(value) => { | ||
| 158 | tracing::trace!("parsing concurrency limit value '{value}'"); | ||
| 159 | let limit = value | ||
| 160 | .parse() | ||
| 161 | .expect("invalid value for OAR_P2P_CONCURRENCY_LIMIT"); | ||
| 162 | tracing::debug!("using concurrency limit = {limit}"); | ||
| 163 | limit | ||
| 164 | } | ||
| 165 | Err(_) => 0, | ||
| 166 | }; | ||
| 167 | for_each_with_limit(machines, limit, f).await | ||
| 168 | } | ||
| 169 | |||
| 170 | pub async fn for_each_with_limit<F, FUT, RET>( | ||
| 171 | machines: impl IntoIterator<Item = &Machine>, | ||
| 172 | limit: usize, | ||
| 173 | f: F, | ||
| 174 | ) -> Result<Vec<(Machine, RET)>> | ||
| 145 | where | 175 | where |
| 146 | F: Fn(Machine) -> FUT, | 176 | F: Fn(Machine) -> FUT, |
| 177 | RET: Send + 'static, | ||
| 147 | FUT: std::future::Future<Output = Result<RET>>, | 178 | FUT: std::future::Future<Output = Result<RET>>, |
| 148 | { | 179 | { |
| 180 | let sem = Arc::new(Semaphore::new(if limit == 0 { | ||
| 181 | Semaphore::MAX_PERMITS | ||
| 182 | } else { | ||
| 183 | limit | ||
| 184 | })); | ||
| 149 | let mut futures = FuturesUnordered::new(); | 185 | let mut futures = FuturesUnordered::new(); |
| 150 | 186 | ||
| 151 | for &machine in machines { | 187 | for &machine in machines { |
| 152 | let fut = f(machine); | 188 | let fut = f(machine); |
| 153 | let fut = async move { (machine, fut.await) }; | 189 | let sem = sem.clone(); |
| 190 | let fut = async move { | ||
| 191 | let _permit = sem.acquire().await.unwrap(); | ||
| 192 | (machine, fut.await) | ||
| 193 | }; | ||
| 154 | futures.push(fut); | 194 | futures.push(fut); |
| 155 | } | 195 | } |
| 156 | 196 | ||
| 197 | let mut results = Vec::default(); | ||
| 157 | while let Some((machine, result)) = futures.next().await { | 198 | while let Some((machine, result)) = futures.next().await { |
| 158 | if let Err(err) = result { | 199 | match result { |
| 159 | return Err(err).with_context(|| format!("running task on machine {machine}")); | 200 | Ok(value) => results.push((machine, value)), |
| 201 | Err(err) => { | ||
| 202 | return Err(err).with_context(|| format!("running task on machine {machine}")); | ||
| 203 | } | ||
| 160 | } | 204 | } |
| 161 | } | 205 | } |
| 162 | Ok(()) | 206 | Ok(results) |
| 163 | } | 207 | } |
