diff options
| author | diogo464 <[email protected]> | 2025-07-23 20:55:50 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-23 20:55:50 +0100 |
| commit | b13fedfaf55eaa7716195d11d0fb12041ed8dfb5 (patch) | |
| tree | 88e735c354c98731bd1400fc05dce4f0cd7a0ad0 /src | |
| parent | be4e94f257a1e0a033a3e85c365a7a868965f4c5 (diff) | |
added concurrency limit via OAR_P2P_CONCURRENCY_LIMIT
The env var OAR_P2P_CONCURRENCY_LIMIT limits the number of parallel
"operations" being done on the cluster machines. So, if it is set to 3,
then we only work on 3 machines at a time. Setting it to 0 means unlimited.
Diffstat (limited to 'src')
| -rw-r--r-- | src/machine.rs | 54 | ||||
| -rw-r--r-- | src/main.rs | 50 |
2 files changed, 69 insertions, 35 deletions
diff --git a/src/machine.rs b/src/machine.rs index 958a03c..ce5068f 100644 --- a/src/machine.rs +++ b/src/machine.rs | |||
| @@ -1,5 +1,8 @@ | |||
| 1 | use std::sync::Arc; | ||
| 2 | |||
| 1 | use eyre::{Context as _, Result}; | 3 | use eyre::{Context as _, Result}; |
| 2 | use futures::{StreamExt as _, stream::FuturesUnordered}; | 4 | use futures::{StreamExt as _, stream::FuturesUnordered}; |
| 5 | use tokio::sync::Semaphore; | ||
| 3 | 6 | ||
| 4 | macro_rules! define_machines { | 7 | macro_rules! define_machines { |
| 5 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { | 8 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { |
| @@ -141,23 +144,64 @@ define_machines!( | |||
| 141 | (Vulpix1, 56, "vulpix-1", 112, todo!()) | 144 | (Vulpix1, 56, "vulpix-1", 112, todo!()) |
| 142 | ); | 145 | ); |
| 143 | 146 | ||
| 144 | pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()> | 147 | pub async fn for_each<F, FUT, RET>( |
| 148 | machines: impl IntoIterator<Item = &Machine>, | ||
| 149 | f: F, | ||
| 150 | ) -> Result<Vec<(Machine, RET)>> | ||
| 151 | where | ||
| 152 | F: Fn(Machine) -> FUT, | ||
| 153 | RET: Send + 'static, | ||
| 154 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 155 | { | ||
| 156 | let limit = match std::env::var("OAR_P2P_CONCURRENCY_LIMIT") { | ||
| 157 | Ok(value) => { | ||
| 158 | tracing::trace!("parsing concurrency limit value '{value}'"); | ||
| 159 | let limit = value | ||
| 160 | .parse() | ||
| 161 | .expect("invalid value for OAR_P2P_CONCURRENCY_LIMIT"); | ||
| 162 | tracing::debug!("using concurrency limit = {limit}"); | ||
| 163 | limit | ||
| 164 | } | ||
| 165 | Err(_) => 0, | ||
| 166 | }; | ||
| 167 | for_each_with_limit(machines, limit, f).await | ||
| 168 | } | ||
| 169 | |||
| 170 | pub async fn for_each_with_limit<F, FUT, RET>( | ||
| 171 | machines: impl IntoIterator<Item = &Machine>, | ||
| 172 | limit: usize, | ||
| 173 | f: F, | ||
| 174 | ) -> Result<Vec<(Machine, RET)>> | ||
| 145 | where | 175 | where |
| 146 | F: Fn(Machine) -> FUT, | 176 | F: Fn(Machine) -> FUT, |
| 177 | RET: Send + 'static, | ||
| 147 | FUT: std::future::Future<Output = Result<RET>>, | 178 | FUT: std::future::Future<Output = Result<RET>>, |
| 148 | { | 179 | { |
| 180 | let sem = Arc::new(Semaphore::new(if limit == 0 { | ||
| 181 | Semaphore::MAX_PERMITS | ||
| 182 | } else { | ||
| 183 | limit | ||
| 184 | })); | ||
| 149 | let mut futures = FuturesUnordered::new(); | 185 | let mut futures = FuturesUnordered::new(); |
| 150 | 186 | ||
| 151 | for &machine in machines { | 187 | for &machine in machines { |
| 152 | let fut = f(machine); | 188 | let fut = f(machine); |
| 153 | let fut = async move { (machine, fut.await) }; | 189 | let sem = sem.clone(); |
| 190 | let fut = async move { | ||
| 191 | let _permit = sem.acquire().await.unwrap(); | ||
| 192 | (machine, fut.await) | ||
| 193 | }; | ||
| 154 | futures.push(fut); | 194 | futures.push(fut); |
| 155 | } | 195 | } |
| 156 | 196 | ||
| 197 | let mut results = Vec::default(); | ||
| 157 | while let Some((machine, result)) = futures.next().await { | 198 | while let Some((machine, result)) = futures.next().await { |
| 158 | if let Err(err) = result { | 199 | match result { |
| 159 | return Err(err).with_context(|| format!("running task on machine {machine}")); | 200 | Ok(value) => results.push((machine, value)), |
| 201 | Err(err) => { | ||
| 202 | return Err(err).with_context(|| format!("running task on machine {machine}")); | ||
| 203 | } | ||
| 160 | } | 204 | } |
| 161 | } | 205 | } |
| 162 | Ok(()) | 206 | Ok(results) |
| 163 | } | 207 | } |
diff --git a/src/main.rs b/src/main.rs index cc8afd0..68cbbdd 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -15,7 +15,6 @@ use serde::Deserialize; | |||
| 15 | use tokio::{ | 15 | use tokio::{ |
| 16 | io::{AsyncReadExt as _, AsyncWriteExt as _}, | 16 | io::{AsyncReadExt as _, AsyncWriteExt as _}, |
| 17 | process::Command, | 17 | process::Command, |
| 18 | task::JoinSet, | ||
| 19 | }; | 18 | }; |
| 20 | 19 | ||
| 21 | use crate::{ | 20 | use crate::{ |
| @@ -253,14 +252,14 @@ async fn cmd_net_down(args: NetDownArgs) -> Result<()> { | |||
| 253 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { | 252 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { |
| 254 | let context = context_from_common(&args.common).await?; | 253 | let context = context_from_common(&args.common).await?; |
| 255 | let machines = oar::job_list_machines(&context).await?; | 254 | let machines = oar::job_list_machines(&context).await?; |
| 256 | let mut set = JoinSet::default(); | 255 | let results = machine::for_each(machines.iter(), |machine| { |
| 257 | for machine in machines { | ||
| 258 | let context = context.clone(); | 256 | let context = context.clone(); |
| 259 | set.spawn(async move { (machine, machine_list_addresses(&context, machine).await) }); | 257 | async move { machine_list_addresses(&context, machine).await } |
| 260 | } | 258 | }) |
| 259 | .await?; | ||
| 260 | |||
| 261 | let mut addresses = Vec::default(); | 261 | let mut addresses = Vec::default(); |
| 262 | for (machine, result) in set.join_all().await { | 262 | for (machine, addrs) in results { |
| 263 | let addrs = result?; | ||
| 264 | for addr in addrs { | 263 | for addr in addrs { |
| 265 | addresses.push((machine, addr)); | 264 | addresses.push((machine, addr)); |
| 266 | } | 265 | } |
| @@ -625,44 +624,35 @@ async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> | |||
| 625 | #[tracing::instrument(ret, err, skip_all)] | 624 | #[tracing::instrument(ret, err, skip_all)] |
| 626 | async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> { | 625 | async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> { |
| 627 | tracing::info!("cleaning machines: {machines:?}"); | 626 | tracing::info!("cleaning machines: {machines:?}"); |
| 628 | let mut set = JoinSet::default(); | 627 | machine::for_each(machines, |machine| { |
| 629 | for &machine in machines { | ||
| 630 | let ctx = ctx.clone(); | 628 | let ctx = ctx.clone(); |
| 631 | set.spawn(async move { machine_clean(&ctx, machine).await }); | 629 | async move { machine_clean(&ctx, machine).await } |
| 632 | } | 630 | }) |
| 633 | let results = set.join_all().await; | 631 | .await?; |
| 634 | for result in results { | ||
| 635 | result?; | ||
| 636 | } | ||
| 637 | Ok(()) | 632 | Ok(()) |
| 638 | } | 633 | } |
| 639 | 634 | ||
| 640 | #[tracing::instrument(ret, err, skip_all)] | 635 | #[tracing::instrument(ret, err, skip_all)] |
| 641 | async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> { | 636 | async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> { |
| 642 | tracing::info!("building networking container for machines: {machines:?}"); | 637 | tracing::info!("building networking container for machines: {machines:?}"); |
| 643 | let mut set = JoinSet::default(); | 638 | machine::for_each(machines, |machine| { |
| 644 | for &machine in machines { | ||
| 645 | let ctx = ctx.clone(); | 639 | let ctx = ctx.clone(); |
| 646 | set.spawn(async move { machine_net_container_build(&ctx, machine).await }); | 640 | async move { machine_net_container_build(&ctx, machine).await } |
| 647 | } | 641 | }) |
| 648 | for result in set.join_all().await { | 642 | .await?; |
| 649 | result?; | ||
| 650 | } | ||
| 651 | Ok(()) | 643 | Ok(()) |
| 652 | } | 644 | } |
| 653 | 645 | ||
| 654 | #[tracing::instrument(ret, err, skip_all)] | 646 | #[tracing::instrument(ret, err, skip_all)] |
| 655 | async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> { | 647 | async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> { |
| 656 | tracing::info!("configuring machines"); | 648 | tracing::info!("configuring machines"); |
| 657 | let mut set = JoinSet::default(); | 649 | let machines = configs.iter().map(|c| &c.machine); |
| 658 | for config in configs { | 650 | machine::for_each(machines, |machine| { |
| 659 | let ctx = ctx.clone(); | 651 | let ctx = ctx.clone(); |
| 660 | let config = config.clone(); | 652 | let config = configs.iter().find(|c| c.machine == machine).unwrap(); |
| 661 | set.spawn(async move { machine_configure(&ctx, &config).await }); | 653 | async move { machine_configure(&ctx, &config).await } |
| 662 | } | 654 | }) |
| 663 | for result in set.join_all().await { | 655 | .await?; |
| 664 | result?; | ||
| 665 | } | ||
| 666 | Ok(()) | 656 | Ok(()) |
| 667 | } | 657 | } |
| 668 | 658 | ||
