aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-23 20:55:50 +0100
committerdiogo464 <[email protected]>2025-07-23 20:55:50 +0100
commitb13fedfaf55eaa7716195d11d0fb12041ed8dfb5 (patch)
tree88e735c354c98731bd1400fc05dce4f0cd7a0ad0 /src
parentbe4e94f257a1e0a033a3e85c365a7a868965f4c5 (diff)
added concurrency limit via OAR_P2P_CONCURRENCY_LIMIT
The env var OAR_P2P_CONCURRENCY_LIMIT limits the number of parallel "operations" performed on the cluster machines. For example, if it is set to 3, we only work on 3 machines at a time. Setting it to 0, or leaving it unset, means unlimited.
Diffstat (limited to 'src')
-rw-r--r--src/machine.rs54
-rw-r--r--src/main.rs50
2 files changed, 69 insertions, 35 deletions
diff --git a/src/machine.rs b/src/machine.rs
index 958a03c..ce5068f 100644
--- a/src/machine.rs
+++ b/src/machine.rs
@@ -1,5 +1,8 @@
1use std::sync::Arc;
2
1use eyre::{Context as _, Result}; 3use eyre::{Context as _, Result};
2use futures::{StreamExt as _, stream::FuturesUnordered}; 4use futures::{StreamExt as _, stream::FuturesUnordered};
5use tokio::sync::Semaphore;
3 6
4macro_rules! define_machines { 7macro_rules! define_machines {
5 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { 8 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => {
@@ -141,23 +144,64 @@ define_machines!(
141 (Vulpix1, 56, "vulpix-1", 112, todo!()) 144 (Vulpix1, 56, "vulpix-1", 112, todo!())
142); 145);
143 146
144pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()> 147pub async fn for_each<F, FUT, RET>(
148 machines: impl IntoIterator<Item = &Machine>,
149 f: F,
150) -> Result<Vec<(Machine, RET)>>
151where
152 F: Fn(Machine) -> FUT,
153 RET: Send + 'static,
154 FUT: std::future::Future<Output = Result<RET>>,
155{
156 let limit = match std::env::var("OAR_P2P_CONCURRENCY_LIMIT") {
157 Ok(value) => {
158 tracing::trace!("parsing concurrency limit value '{value}'");
159 let limit = value
160 .parse()
161 .expect("invalid value for OAR_P2P_CONCURRENCY_LIMIT");
162 tracing::debug!("using concurrency limit = {limit}");
163 limit
164 }
165 Err(_) => 0,
166 };
167 for_each_with_limit(machines, limit, f).await
168}
169
170pub async fn for_each_with_limit<F, FUT, RET>(
171 machines: impl IntoIterator<Item = &Machine>,
172 limit: usize,
173 f: F,
174) -> Result<Vec<(Machine, RET)>>
145where 175where
146 F: Fn(Machine) -> FUT, 176 F: Fn(Machine) -> FUT,
177 RET: Send + 'static,
147 FUT: std::future::Future<Output = Result<RET>>, 178 FUT: std::future::Future<Output = Result<RET>>,
148{ 179{
180 let sem = Arc::new(Semaphore::new(if limit == 0 {
181 Semaphore::MAX_PERMITS
182 } else {
183 limit
184 }));
149 let mut futures = FuturesUnordered::new(); 185 let mut futures = FuturesUnordered::new();
150 186
151 for &machine in machines { 187 for &machine in machines {
152 let fut = f(machine); 188 let fut = f(machine);
153 let fut = async move { (machine, fut.await) }; 189 let sem = sem.clone();
190 let fut = async move {
191 let _permit = sem.acquire().await.unwrap();
192 (machine, fut.await)
193 };
154 futures.push(fut); 194 futures.push(fut);
155 } 195 }
156 196
197 let mut results = Vec::default();
157 while let Some((machine, result)) = futures.next().await { 198 while let Some((machine, result)) = futures.next().await {
158 if let Err(err) = result { 199 match result {
159 return Err(err).with_context(|| format!("running task on machine {machine}")); 200 Ok(value) => results.push((machine, value)),
201 Err(err) => {
202 return Err(err).with_context(|| format!("running task on machine {machine}"));
203 }
160 } 204 }
161 } 205 }
162 Ok(()) 206 Ok(results)
163} 207}
diff --git a/src/main.rs b/src/main.rs
index cc8afd0..68cbbdd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,7 +15,6 @@ use serde::Deserialize;
15use tokio::{ 15use tokio::{
16 io::{AsyncReadExt as _, AsyncWriteExt as _}, 16 io::{AsyncReadExt as _, AsyncWriteExt as _},
17 process::Command, 17 process::Command,
18 task::JoinSet,
19}; 18};
20 19
21use crate::{ 20use crate::{
@@ -253,14 +252,14 @@ async fn cmd_net_down(args: NetDownArgs) -> Result<()> {
253async fn cmd_net_show(args: NetShowArgs) -> Result<()> { 252async fn cmd_net_show(args: NetShowArgs) -> Result<()> {
254 let context = context_from_common(&args.common).await?; 253 let context = context_from_common(&args.common).await?;
255 let machines = oar::job_list_machines(&context).await?; 254 let machines = oar::job_list_machines(&context).await?;
256 let mut set = JoinSet::default(); 255 let results = machine::for_each(machines.iter(), |machine| {
257 for machine in machines {
258 let context = context.clone(); 256 let context = context.clone();
259 set.spawn(async move { (machine, machine_list_addresses(&context, machine).await) }); 257 async move { machine_list_addresses(&context, machine).await }
260 } 258 })
259 .await?;
260
261 let mut addresses = Vec::default(); 261 let mut addresses = Vec::default();
262 for (machine, result) in set.join_all().await { 262 for (machine, addrs) in results {
263 let addrs = result?;
264 for addr in addrs { 263 for addr in addrs {
265 addresses.push((machine, addr)); 264 addresses.push((machine, addr));
266 } 265 }
@@ -625,44 +624,35 @@ async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()>
625#[tracing::instrument(ret, err, skip_all)] 624#[tracing::instrument(ret, err, skip_all)]
626async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> { 625async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> {
627 tracing::info!("cleaning machines: {machines:?}"); 626 tracing::info!("cleaning machines: {machines:?}");
628 let mut set = JoinSet::default(); 627 machine::for_each(machines, |machine| {
629 for &machine in machines {
630 let ctx = ctx.clone(); 628 let ctx = ctx.clone();
631 set.spawn(async move { machine_clean(&ctx, machine).await }); 629 async move { machine_clean(&ctx, machine).await }
632 } 630 })
633 let results = set.join_all().await; 631 .await?;
634 for result in results {
635 result?;
636 }
637 Ok(()) 632 Ok(())
638} 633}
639 634
640#[tracing::instrument(ret, err, skip_all)] 635#[tracing::instrument(ret, err, skip_all)]
641async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> { 636async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> {
642 tracing::info!("building networking container for machines: {machines:?}"); 637 tracing::info!("building networking container for machines: {machines:?}");
643 let mut set = JoinSet::default(); 638 machine::for_each(machines, |machine| {
644 for &machine in machines {
645 let ctx = ctx.clone(); 639 let ctx = ctx.clone();
646 set.spawn(async move { machine_net_container_build(&ctx, machine).await }); 640 async move { machine_net_container_build(&ctx, machine).await }
647 } 641 })
648 for result in set.join_all().await { 642 .await?;
649 result?;
650 }
651 Ok(()) 643 Ok(())
652} 644}
653 645
654#[tracing::instrument(ret, err, skip_all)] 646#[tracing::instrument(ret, err, skip_all)]
655async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> { 647async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> {
656 tracing::info!("configuring machines"); 648 tracing::info!("configuring machines");
657 let mut set = JoinSet::default(); 649 let machines = configs.iter().map(|c| &c.machine);
658 for config in configs { 650 machine::for_each(machines, |machine| {
659 let ctx = ctx.clone(); 651 let ctx = ctx.clone();
660 let config = config.clone(); 652 let config = configs.iter().find(|c| c.machine == machine).unwrap();
661 set.spawn(async move { machine_configure(&ctx, &config).await }); 653 async move { machine_configure(&ctx, &config).await }
662 } 654 })
663 for result in set.join_all().await { 655 .await?;
664 result?;
665 }
666 Ok(()) 656 Ok(())
667} 657}
668 658