aboutsummaryrefslogtreecommitdiff
path: root/src/machine.rs
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-23 20:55:50 +0100
committerdiogo464 <[email protected]>2025-07-23 20:55:50 +0100
commitb13fedfaf55eaa7716195d11d0fb12041ed8dfb5 (patch)
tree88e735c354c98731bd1400fc05dce4f0cd7a0ad0 /src/machine.rs
parentbe4e94f257a1e0a033a3e85c365a7a868965f4c5 (diff)
added concurrency limit via OAR_P2P_CONCURRENCY_LIMIT
the env var OAR_P2P_CONCURRENCY_LIMIT limits the number of parallel "operations" being done on the cluster machines. so, if it is set to 3, then we only work on 3 machines at time. setting to 0 means unlimited.
Diffstat (limited to 'src/machine.rs')
-rw-r--r--src/machine.rs54
1 files changed, 49 insertions, 5 deletions
diff --git a/src/machine.rs b/src/machine.rs
index 958a03c..ce5068f 100644
--- a/src/machine.rs
+++ b/src/machine.rs
@@ -1,5 +1,8 @@
1use std::sync::Arc;
2
1use eyre::{Context as _, Result}; 3use eyre::{Context as _, Result};
2use futures::{StreamExt as _, stream::FuturesUnordered}; 4use futures::{StreamExt as _, stream::FuturesUnordered};
5use tokio::sync::Semaphore;
3 6
4macro_rules! define_machines { 7macro_rules! define_machines {
5 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { 8 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => {
@@ -141,23 +144,64 @@ define_machines!(
141 (Vulpix1, 56, "vulpix-1", 112, todo!()) 144 (Vulpix1, 56, "vulpix-1", 112, todo!())
142); 145);
143 146
144pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()> 147pub async fn for_each<F, FUT, RET>(
148 machines: impl IntoIterator<Item = &Machine>,
149 f: F,
150) -> Result<Vec<(Machine, RET)>>
151where
152 F: Fn(Machine) -> FUT,
153 RET: Send + 'static,
154 FUT: std::future::Future<Output = Result<RET>>,
155{
156 let limit = match std::env::var("OAR_P2P_CONCURRENCY_LIMIT") {
157 Ok(value) => {
158 tracing::trace!("parsing concurrency limit value '{value}'");
159 let limit = value
160 .parse()
161 .expect("invalid value for OAR_P2P_CONCURRENCY_LIMIT");
162 tracing::debug!("using concurrency limit = {limit}");
163 limit
164 }
165 Err(_) => 0,
166 };
167 for_each_with_limit(machines, limit, f).await
168}
169
170pub async fn for_each_with_limit<F, FUT, RET>(
171 machines: impl IntoIterator<Item = &Machine>,
172 limit: usize,
173 f: F,
174) -> Result<Vec<(Machine, RET)>>
145where 175where
146 F: Fn(Machine) -> FUT, 176 F: Fn(Machine) -> FUT,
177 RET: Send + 'static,
147 FUT: std::future::Future<Output = Result<RET>>, 178 FUT: std::future::Future<Output = Result<RET>>,
148{ 179{
180 let sem = Arc::new(Semaphore::new(if limit == 0 {
181 Semaphore::MAX_PERMITS
182 } else {
183 limit
184 }));
149 let mut futures = FuturesUnordered::new(); 185 let mut futures = FuturesUnordered::new();
150 186
151 for &machine in machines { 187 for &machine in machines {
152 let fut = f(machine); 188 let fut = f(machine);
153 let fut = async move { (machine, fut.await) }; 189 let sem = sem.clone();
190 let fut = async move {
191 let _permit = sem.acquire().await.unwrap();
192 (machine, fut.await)
193 };
154 futures.push(fut); 194 futures.push(fut);
155 } 195 }
156 196
197 let mut results = Vec::default();
157 while let Some((machine, result)) = futures.next().await { 198 while let Some((machine, result)) = futures.next().await {
158 if let Err(err) = result { 199 match result {
159 return Err(err).with_context(|| format!("running task on machine {machine}")); 200 Ok(value) => results.push((machine, value)),
201 Err(err) => {
202 return Err(err).with_context(|| format!("running task on machine {machine}"));
203 }
160 } 204 }
161 } 205 }
162 Ok(()) 206 Ok(results)
163} 207}