diff options
| author | diogo464 <[email protected]> | 2025-07-10 22:05:59 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-10 22:05:59 +0100 |
| commit | 66a0329b201a19d2a6d87b265b6fde3423a917c0 (patch) | |
| tree | 62ad156e4c3e378dc64fec38542657eaa9ea0c9c | |
| parent | 4fc26211fda53023f8ce703ccf4b1a2bbfbbe10a (diff) | |
clean enough for now
| -rw-r--r-- | src/context.rs | 58 | ||||
| -rw-r--r-- | src/machine.rs | 24 | ||||
| -rw-r--r-- | src/main.rs | 507 | ||||
| -rw-r--r-- | src/oar.rs | 164 |
4 files changed, 433 insertions, 320 deletions
diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..323a8a5 --- /dev/null +++ b/src/context.rs | |||
| @@ -0,0 +1,58 @@ | |||
| 1 | use eyre::{Context as _, Result}; | ||
| 2 | |||
| 3 | use crate::machine::Machine; | ||
| 4 | |||
| 5 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||
| 6 | pub enum ExecutionNode { | ||
| 7 | Frontend, | ||
| 8 | Machine(Machine), | ||
| 9 | Unknown, | ||
| 10 | } | ||
| 11 | |||
| 12 | #[derive(Debug, Clone)] | ||
| 13 | pub struct Context { | ||
| 14 | pub node: ExecutionNode, | ||
| 15 | job_id: Option<u32>, | ||
| 16 | frontend_hostname: Option<String>, | ||
| 17 | } | ||
| 18 | |||
| 19 | impl Context { | ||
| 20 | pub async fn new(job_id: Option<u32>, frontend_hostname: Option<String>) -> Result<Self> { | ||
| 21 | Ok(Self { | ||
| 22 | node: get_execution_node().await?, | ||
| 23 | job_id, | ||
| 24 | frontend_hostname, | ||
| 25 | }) | ||
| 26 | } | ||
| 27 | |||
| 28 | pub fn job_id(&self) -> Result<u32> { | ||
| 29 | self.job_id.ok_or_else(|| eyre::eyre!("missing job id")) | ||
| 30 | } | ||
| 31 | |||
| 32 | pub fn frontend_hostname(&self) -> Result<&str> { | ||
| 33 | self.frontend_hostname | ||
| 34 | .as_ref() | ||
| 35 | .map(|s| s.as_str()) | ||
| 36 | .ok_or_else(|| eyre::eyre!("missing frontend hostname")) | ||
| 37 | } | ||
| 38 | } | ||
| 39 | |||
| 40 | async fn get_execution_node() -> Result<ExecutionNode> { | ||
| 41 | let hostname = get_hostname().await?; | ||
| 42 | let node = match hostname.as_str() { | ||
| 43 | "frontend" => ExecutionNode::Frontend, | ||
| 44 | _ => match Machine::from_hostname(&hostname) { | ||
| 45 | Some(machine) => ExecutionNode::Machine(machine), | ||
| 46 | _ => ExecutionNode::Unknown, | ||
| 47 | }, | ||
| 48 | }; | ||
| 49 | Ok(node) | ||
| 50 | } | ||
| 51 | |||
| 52 | async fn get_hostname() -> Result<String> { | ||
| 53 | if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await { | ||
| 54 | Ok(hostname) | ||
| 55 | } else { | ||
| 56 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | ||
| 57 | } | ||
| 58 | } | ||
diff --git a/src/machine.rs b/src/machine.rs index f223e66..c4a5875 100644 --- a/src/machine.rs +++ b/src/machine.rs | |||
| @@ -1,3 +1,6 @@ | |||
| 1 | use eyre::{Context as _, Result}; | ||
| 2 | use futures::{StreamExt as _, stream::FuturesUnordered}; | ||
| 3 | |||
| 1 | macro_rules! define_machines { | 4 | macro_rules! define_machines { |
| 2 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { | 5 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { |
| 3 | #[derive(Debug)] | 6 | #[derive(Debug)] |
| @@ -137,3 +140,24 @@ define_machines!( | |||
| 137 | (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()), | 140 | (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()), |
| 138 | (Vulpix1, 56, "vulpix-1", 112, todo!()) | 141 | (Vulpix1, 56, "vulpix-1", 112, todo!()) |
| 139 | ); | 142 | ); |
| 143 | |||
| 144 | pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()> | ||
| 145 | where | ||
| 146 | F: Fn(Machine) -> FUT, | ||
| 147 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 148 | { | ||
| 149 | let mut futures = FuturesUnordered::new(); | ||
| 150 | |||
| 151 | for &machine in machines { | ||
| 152 | let fut = f(machine); | ||
| 153 | let fut = async move { (machine, fut.await) }; | ||
| 154 | futures.push(fut); | ||
| 155 | } | ||
| 156 | |||
| 157 | while let Some((machine, result)) = futures.next().await { | ||
| 158 | if let Err(err) = result { | ||
| 159 | return Err(err).with_context(|| format!("running task on machine {machine}")); | ||
| 160 | } | ||
| 161 | } | ||
| 162 | Ok(()) | ||
| 163 | } | ||
diff --git a/src/main.rs b/src/main.rs index 36c2a9b..f164d47 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -9,7 +9,6 @@ use std::{ | |||
| 9 | use clap::{Args, Parser, Subcommand}; | 9 | use clap::{Args, Parser, Subcommand}; |
| 10 | use eyre::Context as _; | 10 | use eyre::Context as _; |
| 11 | use eyre::Result; | 11 | use eyre::Result; |
| 12 | use futures::{StreamExt as _, stream::FuturesUnordered}; | ||
| 13 | use machine::Machine; | 12 | use machine::Machine; |
| 14 | use serde::Deserialize; | 13 | use serde::Deserialize; |
| 15 | use tokio::{ | 14 | use tokio::{ |
| @@ -18,10 +17,15 @@ use tokio::{ | |||
| 18 | task::JoinSet, | 17 | task::JoinSet, |
| 19 | }; | 18 | }; |
| 20 | 19 | ||
| 21 | use crate::latency_matrix::LatencyMatrix; | 20 | use crate::{ |
| 21 | context::{Context, ExecutionNode}, | ||
| 22 | latency_matrix::LatencyMatrix, | ||
| 23 | }; | ||
| 22 | 24 | ||
| 25 | pub mod context; | ||
| 23 | pub mod latency_matrix; | 26 | pub mod latency_matrix; |
| 24 | pub mod machine; | 27 | pub mod machine; |
| 28 | pub mod oar; | ||
| 25 | 29 | ||
| 26 | const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking"; | 30 | const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking"; |
| 27 | 31 | ||
| @@ -105,20 +109,6 @@ struct RunArgs { | |||
| 105 | schedule: Option<PathBuf>, | 109 | schedule: Option<PathBuf>, |
| 106 | } | 110 | } |
| 107 | 111 | ||
| 108 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||
| 109 | enum ExecutionNode { | ||
| 110 | Frontend, | ||
| 111 | Machine(Machine), | ||
| 112 | Unknown, | ||
| 113 | } | ||
| 114 | |||
| 115 | #[derive(Debug, Clone)] | ||
| 116 | struct Context { | ||
| 117 | node: ExecutionNode, | ||
| 118 | job_id: Option<u32>, | ||
| 119 | frontend_hostname: Option<String>, | ||
| 120 | } | ||
| 121 | |||
| 122 | #[derive(Debug, Clone)] | 112 | #[derive(Debug, Clone)] |
| 123 | struct MachineConfig { | 113 | struct MachineConfig { |
| 124 | machine: Machine, | 114 | machine: Machine, |
| @@ -152,12 +142,7 @@ async fn main() -> Result<()> { | |||
| 152 | } | 142 | } |
| 153 | 143 | ||
| 154 | async fn context_from_common(common: &Common) -> Result<Context> { | 144 | async fn context_from_common(common: &Common) -> Result<Context> { |
| 155 | let node = get_execution_node().await?; | 145 | Context::new(common.job_id, common.frontend_hostname.clone()).await |
| 156 | Ok(Context { | ||
| 157 | node, | ||
| 158 | job_id: common.job_id, | ||
| 159 | frontend_hostname: common.frontend_hostname.clone(), | ||
| 160 | }) | ||
| 161 | } | 146 | } |
| 162 | 147 | ||
| 163 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | 148 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { |
| @@ -167,7 +152,7 @@ async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | |||
| 167 | .context("reading latecy matrix")?; | 152 | .context("reading latecy matrix")?; |
| 168 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) | 153 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) |
| 169 | .context("parsing latency matrix")?; | 154 | .context("parsing latency matrix")?; |
| 170 | let machines = job_list_machines(&context).await?; | 155 | let machines = oar::job_list_machines(&context).await?; |
| 171 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); | 156 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); |
| 172 | machines_net_container_build(&context, &machines).await?; | 157 | machines_net_container_build(&context, &machines).await?; |
| 173 | machines_clean(&context, &machines).await?; | 158 | machines_clean(&context, &machines).await?; |
| @@ -177,7 +162,7 @@ async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | |||
| 177 | 162 | ||
| 178 | async fn cmd_net_down(args: NetDownArgs) -> Result<()> { | 163 | async fn cmd_net_down(args: NetDownArgs) -> Result<()> { |
| 179 | let context = context_from_common(&args.common).await?; | 164 | let context = context_from_common(&args.common).await?; |
| 180 | let machines = job_list_machines(&context).await?; | 165 | let machines = oar::job_list_machines(&context).await?; |
| 181 | machines_net_container_build(&context, &machines).await?; | 166 | machines_net_container_build(&context, &machines).await?; |
| 182 | machines_clean(&context, &machines).await?; | 167 | machines_clean(&context, &machines).await?; |
| 183 | Ok(()) | 168 | Ok(()) |
| @@ -185,7 +170,7 @@ async fn cmd_net_down(args: NetDownArgs) -> Result<()> { | |||
| 185 | 170 | ||
| 186 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { | 171 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { |
| 187 | let context = context_from_common(&args.common).await?; | 172 | let context = context_from_common(&args.common).await?; |
| 188 | let machines = job_list_machines(&context).await?; | 173 | let machines = oar::job_list_machines(&context).await?; |
| 189 | let mut set = JoinSet::default(); | 174 | let mut set = JoinSet::default(); |
| 190 | for machine in machines { | 175 | for machine in machines { |
| 191 | let context = context.clone(); | 176 | let context = context.clone(); |
| @@ -270,7 +255,7 @@ fn parse_schedule(schedule: &str) -> Result<Vec<ScheduledContainer>> { | |||
| 270 | 255 | ||
| 271 | async fn cmd_run(args: RunArgs) -> Result<()> { | 256 | async fn cmd_run(args: RunArgs) -> Result<()> { |
| 272 | let ctx = context_from_common(&args.common).await?; | 257 | let ctx = context_from_common(&args.common).await?; |
| 273 | let machines = job_list_machines(&ctx).await?; | 258 | let machines = oar::job_list_machines(&ctx).await?; |
| 274 | let schedule = match args.schedule { | 259 | let schedule = match args.schedule { |
| 275 | Some(path) => tokio::fs::read_to_string(&path) | 260 | Some(path) => tokio::fs::read_to_string(&path) |
| 276 | .await | 261 | .await |
| @@ -286,104 +271,57 @@ async fn cmd_run(args: RunArgs) -> Result<()> { | |||
| 286 | }; | 271 | }; |
| 287 | let containers = parse_schedule(&schedule)?; | 272 | let containers = parse_schedule(&schedule)?; |
| 288 | 273 | ||
| 289 | machines_foreach(&machines, |machine| machine_containers_clean(&ctx, machine)).await?; | 274 | machine::for_each(&machines, |machine| machine_containers_clean(&ctx, machine)).await?; |
| 290 | machines_foreach(&machines, |machine| { | 275 | machine::for_each(&machines, |machine| { |
| 291 | let ctx = ctx.clone(); | 276 | let ctx = ctx.clone(); |
| 292 | let containers = containers | 277 | let containers = containers |
| 293 | .iter() | 278 | .iter() |
| 294 | .filter(|c| c.machine == machine) | 279 | .filter(|c| c.machine == machine) |
| 295 | .cloned() | 280 | .cloned() |
| 296 | .collect::<Vec<_>>(); | 281 | .collect::<Vec<_>>(); |
| 297 | let mut script = String::default(); | 282 | async move { machine_create_containers(&ctx, machine, &containers).await } |
| 298 | for (idx, container) in containers.iter().enumerate() { | ||
| 299 | script.push_str("docker create \\\n"); | ||
| 300 | script.push_str("\t--pull=always \\\n"); | ||
| 301 | script.push_str("\t--network=host \\\n"); | ||
| 302 | script.push_str("\t--restart=no \\\n"); | ||
| 303 | script.push_str(&format!("\t--name {} \\\n", container.name)); | ||
| 304 | for (key, val) in container.variables.iter() { | ||
| 305 | script.push_str("\t-e "); | ||
| 306 | script.push_str(key); | ||
| 307 | script.push_str("="); | ||
| 308 | script.push_str(val); | ||
| 309 | script.push_str(" \\\n"); | ||
| 310 | } | ||
| 311 | script.push_str("\t"); | ||
| 312 | script.push_str(&container.image); | ||
| 313 | script.push_str(" &\n"); | ||
| 314 | script.push_str(&format!("pid_{idx}=$!\n\n")); | ||
| 315 | } | ||
| 316 | |||
| 317 | for (idx, container) in containers.iter().enumerate() { | ||
| 318 | let name = &container.name; | ||
| 319 | script.push_str(&format!( | ||
| 320 | "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n" | ||
| 321 | )); | ||
| 322 | } | ||
| 323 | tracing::debug!("container creation script:\n{script}"); | ||
| 324 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 325 | }) | 283 | }) |
| 326 | .await?; | 284 | .await?; |
| 327 | 285 | ||
| 328 | tracing::info!("starting all containers on all machines"); | 286 | tracing::info!("starting all containers on all machines"); |
| 329 | machines_foreach( | 287 | machine::for_each( |
| 330 | machines | 288 | machines |
| 331 | .iter() | 289 | .iter() |
| 332 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | 290 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), |
| 333 | |machine| { | 291 | |machine| machine_start_containers(&ctx, machine), |
| 334 | machine_run_script( | ||
| 335 | &ctx, | ||
| 336 | machine, | ||
| 337 | "docker container ls -aq | xargs docker container start", | ||
| 338 | ) | ||
| 339 | }, | ||
| 340 | ) | 292 | ) |
| 341 | .await?; | 293 | .await?; |
| 342 | 294 | ||
| 343 | tracing::info!("waiting for all containers to exit"); | 295 | tracing::info!("waiting for all containers to exit"); |
| 344 | machines_foreach(&machines, |machine| { | 296 | machine::for_each(&machines, |machine| { |
| 345 | let ctx = ctx.clone(); | 297 | let ctx = ctx.clone(); |
| 346 | let containers = containers | 298 | let containers = containers |
| 347 | .iter() | 299 | .iter() |
| 348 | .filter(|c| c.machine == machine) | 300 | .filter(|c| c.machine == machine) |
| 349 | .cloned() | 301 | .cloned() |
| 350 | .collect::<Vec<_>>(); | 302 | .collect::<Vec<_>>(); |
| 351 | let mut script = String::default(); | 303 | async move { |
| 352 | for container in containers { | 304 | machine_containers_wait(&ctx, machine, &containers) |
| 353 | let name = &container.name; | 305 | .await |
| 354 | script.push_str(&format!("if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n")); | 306 | .with_context(|| format!("waiting for containers on {machine}")) |
| 355 | script.push_str(&format!("\techo Container {name} failed\n")); | ||
| 356 | script.push_str(&format!("\tdocker logs {name} 2>1\n")); | ||
| 357 | script.push_str("\texit 1\n"); | ||
| 358 | script.push_str("fi\n\n"); | ||
| 359 | } | 307 | } |
| 360 | script.push_str("exit 0\n"); | ||
| 361 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 362 | }) | 308 | }) |
| 363 | .await?; | 309 | .await?; |
| 364 | 310 | ||
| 365 | tracing::info!("saving logs to disk on all machines"); | 311 | tracing::info!("saving logs to disk on all machines"); |
| 366 | machines_foreach(&machines, |machine| { | 312 | machine::for_each(&machines, |machine| { |
| 367 | let ctx = ctx.clone(); | 313 | let ctx = ctx.clone(); |
| 368 | let containers = containers | 314 | let containers = containers |
| 369 | .iter() | 315 | .iter() |
| 370 | .filter(|c| c.machine == machine) | 316 | .filter(|c| c.machine == machine) |
| 371 | .cloned() | 317 | .cloned() |
| 372 | .collect::<Vec<_>>(); | 318 | .collect::<Vec<_>>(); |
| 373 | let mut script = String::default(); | 319 | async move { machine_containers_save_logs(&ctx, machine, &containers).await } |
| 374 | script.push_str("set -e\n"); | ||
| 375 | script.push_str("mkdir -p /tmp/oar-p2p-logs\n"); | ||
| 376 | script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n"); | ||
| 377 | for container in containers { | ||
| 378 | let name = &container.name; | ||
| 379 | script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n")); | ||
| 380 | } | ||
| 381 | script.push_str("exit 0\n"); | ||
| 382 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 383 | }) | 320 | }) |
| 384 | .await?; | 321 | .await?; |
| 385 | 322 | ||
| 386 | machines_foreach( | 323 | tracing::info!("copying logs from all machines"); |
| 324 | machine::for_each( | ||
| 387 | machines | 325 | machines |
| 388 | .iter() | 326 | .iter() |
| 389 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | 327 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), |
| @@ -394,7 +332,121 @@ async fn cmd_run(args: RunArgs) -> Result<()> { | |||
| 394 | Ok(()) | 332 | Ok(()) |
| 395 | } | 333 | } |
| 396 | 334 | ||
| 335 | fn machine_containers_create_script(containers: &[ScheduledContainer]) -> String { | ||
| 336 | let mut script = String::default(); | ||
| 337 | for (idx, container) in containers.iter().enumerate() { | ||
| 338 | script.push_str("docker create \\\n"); | ||
| 339 | script.push_str("\t--pull=always \\\n"); | ||
| 340 | script.push_str("\t--network=host \\\n"); | ||
| 341 | script.push_str("\t--restart=no \\\n"); | ||
| 342 | script.push_str(&format!("\t--name {} \\\n", container.name)); | ||
| 343 | for (key, val) in container.variables.iter() { | ||
| 344 | script.push_str("\t-e "); | ||
| 345 | script.push_str(key); | ||
| 346 | script.push_str("="); | ||
| 347 | script.push_str(val); | ||
| 348 | script.push_str(" \\\n"); | ||
| 349 | } | ||
| 350 | script.push_str("\t"); | ||
| 351 | script.push_str(&container.image); | ||
| 352 | script.push_str(" &\n"); | ||
| 353 | script.push_str(&format!("pid_{idx}=$!\n\n")); | ||
| 354 | } | ||
| 355 | |||
| 356 | for (idx, container) in containers.iter().enumerate() { | ||
| 357 | let name = &container.name; | ||
| 358 | script.push_str(&format!( | ||
| 359 | "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n" | ||
| 360 | )); | ||
| 361 | } | ||
| 362 | |||
| 363 | script | ||
| 364 | } | ||
| 365 | |||
| 366 | #[tracing::instrument(ret, err, skip(ctx, containers))] | ||
| 367 | async fn machine_create_containers( | ||
| 368 | ctx: &Context, | ||
| 369 | machine: Machine, | ||
| 370 | containers: &[ScheduledContainer], | ||
| 371 | ) -> Result<()> { | ||
| 372 | tracing::info!("creating {} containers", containers.len()); | ||
| 373 | let script = machine_containers_create_script(containers); | ||
| 374 | machine_run_script(&ctx, machine, &script).await?; | ||
| 375 | tracing::info!("containers created"); | ||
| 376 | Ok(()) | ||
| 377 | } | ||
| 378 | |||
| 379 | #[tracing::instrument(ret, err, skip(ctx))] | ||
| 380 | async fn machine_start_containers(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 381 | tracing::info!("starting all containers"); | ||
| 382 | machine_run_script( | ||
| 383 | &ctx, | ||
| 384 | machine, | ||
| 385 | "docker container ls -aq | xargs docker container start", | ||
| 386 | ) | ||
| 387 | .await?; | ||
| 388 | tracing::info!("all containers started"); | ||
| 389 | Ok(()) | ||
| 390 | } | ||
| 391 | |||
| 392 | fn machine_containers_wait_script(containers: &[ScheduledContainer]) -> String { | ||
| 393 | let mut script = String::default(); | ||
| 394 | for container in containers { | ||
| 395 | let name = &container.name; | ||
| 396 | script.push_str(&format!( | ||
| 397 | "if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n" | ||
| 398 | )); | ||
| 399 | script.push_str(&format!("\techo Container {name} failed\n")); | ||
| 400 | script.push_str(&format!("\tdocker logs {name} 2>1\n")); | ||
| 401 | script.push_str("\texit 1\n"); | ||
| 402 | script.push_str("fi\n\n"); | ||
| 403 | } | ||
| 404 | script.push_str("exit 0\n"); | ||
| 405 | script | ||
| 406 | } | ||
| 407 | |||
| 408 | #[tracing::instrument(ret, err, skip(ctx, containers))] | ||
| 409 | async fn machine_containers_wait( | ||
| 410 | ctx: &Context, | ||
| 411 | machine: Machine, | ||
| 412 | containers: &[ScheduledContainer], | ||
| 413 | ) -> Result<()> { | ||
| 414 | tracing::info!("waiting for {} containers to exit", containers.len()); | ||
| 415 | let script = machine_containers_wait_script(containers); | ||
| 416 | machine_run_script(ctx, machine, &script).await?; | ||
| 417 | tracing::info!("all containers exited"); | ||
| 418 | Ok(()) | ||
| 419 | } | ||
| 420 | |||
| 421 | fn machine_containers_save_logs_script(containers: &[ScheduledContainer]) -> String { | ||
| 422 | let mut script = String::default(); | ||
| 423 | script.push_str("set -e\n"); | ||
| 424 | script.push_str("mkdir -p /tmp/oar-p2p-logs\n"); | ||
| 425 | script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n"); | ||
| 426 | for container in containers { | ||
| 427 | let name = &container.name; | ||
| 428 | script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n")); | ||
| 429 | } | ||
| 430 | script.push_str("exit 0\n"); | ||
| 431 | script | ||
| 432 | } | ||
| 433 | |||
| 434 | #[tracing::instrument(ret, err, skip(ctx, containers))] | ||
| 435 | async fn machine_containers_save_logs( | ||
| 436 | ctx: &Context, | ||
| 437 | machine: Machine, | ||
| 438 | containers: &[ScheduledContainer], | ||
| 439 | ) -> Result<()> { | ||
| 440 | tracing::info!("saving logs from {} containers", containers.len()); | ||
| 441 | let script = machine_containers_save_logs_script(containers); | ||
| 442 | machine_run_script(&ctx, machine, &script).await?; | ||
| 443 | tracing::info!("logs saved"); | ||
| 444 | Ok(()) | ||
| 445 | } | ||
| 446 | |||
| 447 | #[tracing::instrument(ret, err, skip(ctx))] | ||
| 397 | async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> { | 448 | async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> { |
| 449 | tracing::info!("copying container logs from machine"); | ||
| 398 | let scp_common = &[ | 450 | let scp_common = &[ |
| 399 | "-o", | 451 | "-o", |
| 400 | "StrictHostKeyChecking=no", | 452 | "StrictHostKeyChecking=no", |
| @@ -406,7 +458,7 @@ async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Pat | |||
| 406 | args.extend(scp_common); | 458 | args.extend(scp_common); |
| 407 | if ctx.node == ExecutionNode::Unknown { | 459 | if ctx.node == ExecutionNode::Unknown { |
| 408 | args.push("-J"); | 460 | args.push("-J"); |
| 409 | args.push(ctx.frontend_hostname.as_ref().expect("TODO")); | 461 | args.push(ctx.frontend_hostname()?); |
| 410 | } | 462 | } |
| 411 | args.push("-r"); | 463 | args.push("-r"); |
| 412 | 464 | ||
| @@ -417,38 +469,15 @@ async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Pat | |||
| 417 | 469 | ||
| 418 | let output = Command::new("scp").args(args).output().await?; | 470 | let output = Command::new("scp").args(args).output().await?; |
| 419 | output.exit_ok()?; | 471 | output.exit_ok()?; |
| 472 | tracing::info!("logs finished copying"); | ||
| 420 | Ok(()) | 473 | Ok(()) |
| 421 | } | 474 | } |
| 422 | 475 | ||
| 423 | async fn machines_foreach<F, FUT, RET>( | 476 | #[tracing::instrument(ret, err, skip(ctx))] |
| 424 | machines: impl IntoIterator<Item = &Machine>, | ||
| 425 | f: F, | ||
| 426 | ) -> Result<()> | ||
| 427 | where | ||
| 428 | F: Fn(Machine) -> FUT, | ||
| 429 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 430 | { | ||
| 431 | let mut futures = FuturesUnordered::new(); | ||
| 432 | |||
| 433 | for &machine in machines { | ||
| 434 | let fut = f(machine); | ||
| 435 | let fut = async move { (machine, fut.await) }; | ||
| 436 | futures.push(fut); | ||
| 437 | } | ||
| 438 | |||
| 439 | while let Some((machine, result)) = futures.next().await { | ||
| 440 | if let Err(err) = result { | ||
| 441 | tracing::error!("error on machine {machine}: {err}"); | ||
| 442 | return Err(err); | ||
| 443 | } | ||
| 444 | } | ||
| 445 | Ok(()) | ||
| 446 | } | ||
| 447 | |||
| 448 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | ||
| 449 | async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> { | 477 | async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> { |
| 450 | tracing::info!("removing all containers..."); | 478 | tracing::info!("removing all containers..."); |
| 451 | machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?; | 479 | machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?; |
| 480 | tracing::info!("all containers removed"); | ||
| 452 | Ok(()) | 481 | Ok(()) |
| 453 | } | 482 | } |
| 454 | 483 | ||
| @@ -496,7 +525,9 @@ async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result< | |||
| 496 | Ok(()) | 525 | Ok(()) |
| 497 | } | 526 | } |
| 498 | 527 | ||
| 528 | #[tracing::instrument(err, skip(ctx))] | ||
| 499 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { | 529 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { |
| 530 | tracing::info!("listing machine addresses"); | ||
| 500 | let interface = machine.interface(); | 531 | let interface = machine.interface(); |
| 501 | let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'"); | 532 | let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'"); |
| 502 | let output = machine_run_script(ctx, machine, &script).await?; | 533 | let output = machine_run_script(ctx, machine, &script).await?; |
| @@ -506,9 +537,11 @@ async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<I | |||
| 506 | tracing::trace!("parsing address from line: '{line}'"); | 537 | tracing::trace!("parsing address from line: '{line}'"); |
| 507 | addresses.push(line.parse()?); | 538 | addresses.push(line.parse()?); |
| 508 | } | 539 | } |
| 540 | tracing::trace!("addresses: {addresses:#?}"); | ||
| 509 | Ok(addresses) | 541 | Ok(addresses) |
| 510 | } | 542 | } |
| 511 | 543 | ||
| 544 | #[tracing::instrument(ret, err, level = tracing::Level::TRACE)] | ||
| 512 | async fn machine_run( | 545 | async fn machine_run( |
| 513 | ctx: &Context, | 546 | ctx: &Context, |
| 514 | machine: Machine, | 547 | machine: Machine, |
| @@ -542,7 +575,7 @@ async fn machine_run( | |||
| 542 | } | 575 | } |
| 543 | } | 576 | } |
| 544 | ExecutionNode::Unknown => { | 577 | ExecutionNode::Unknown => { |
| 545 | let frontend = ctx.frontend_hostname.as_ref().unwrap(); | 578 | let frontend = ctx.frontend_hostname()?; |
| 546 | let mut arguments = Vec::default(); | 579 | let mut arguments = Vec::default(); |
| 547 | arguments.push("ssh"); | 580 | arguments.push("ssh"); |
| 548 | arguments.extend(ssh_common); | 581 | arguments.extend(ssh_common); |
| @@ -557,6 +590,7 @@ async fn machine_run( | |||
| 557 | } | 590 | } |
| 558 | arguments.extend(args); | 591 | arguments.extend(args); |
| 559 | 592 | ||
| 593 | tracing::trace!("running command: {arguments:?}"); | ||
| 560 | let mut proc = Command::new(arguments[0]) | 594 | let mut proc = Command::new(arguments[0]) |
| 561 | .args(&arguments[1..]) | 595 | .args(&arguments[1..]) |
| 562 | .stdout(std::process::Stdio::piped()) | 596 | .stdout(std::process::Stdio::piped()) |
| @@ -582,16 +616,17 @@ async fn machine_run( | |||
| 582 | } | 616 | } |
| 583 | 617 | ||
| 584 | async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> { | 618 | async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> { |
| 585 | tracing::trace!("running script on machine {machine}:\n{script}"); | 619 | tracing::debug!("script body:\n{script}"); |
| 586 | let output = machine_run(ctx, machine, &[], Some(script)).await?; | 620 | let output = machine_run(ctx, machine, &[], Some(script)).await?; |
| 587 | tracing::trace!( | 621 | let stdout = std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>"); |
| 588 | "stdout:\n{}", | 622 | let stderr = std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>"); |
| 589 | std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>") | 623 | if output.status.success() { |
| 590 | ); | 624 | tracing::trace!("stdout:\n{stdout}",); |
| 591 | tracing::trace!( | 625 | tracing::trace!("stderr:\n{stderr}",); |
| 592 | "stderr:\n{}", | 626 | } else { |
| 593 | std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>") | 627 | tracing::error!("stdout:\n{stdout}",); |
| 594 | ); | 628 | tracing::error!("stderr:\n{stderr}",); |
| 629 | } | ||
| 595 | Ok(output.exit_ok()?) | 630 | Ok(output.exit_ok()?) |
| 596 | } | 631 | } |
| 597 | 632 | ||
| @@ -600,7 +635,8 @@ async fn machine_net_container_run_script( | |||
| 600 | machine: Machine, | 635 | machine: Machine, |
| 601 | script: &str, | 636 | script: &str, |
| 602 | ) -> Result<Output> { | 637 | ) -> Result<Output> { |
| 603 | machine_run( | 638 | tracing::debug!("network container script body:\n{script}"); |
| 639 | let output = machine_run( | ||
| 604 | ctx, | 640 | ctx, |
| 605 | machine, | 641 | machine, |
| 606 | &[ | 642 | &[ |
| @@ -614,11 +650,24 @@ async fn machine_net_container_run_script( | |||
| 614 | ], | 650 | ], |
| 615 | Some(script), | 651 | Some(script), |
| 616 | ) | 652 | ) |
| 617 | .await | 653 | .await?; |
| 654 | |||
| 655 | let stdout = std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>"); | ||
| 656 | let stderr = std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>"); | ||
| 657 | if output.status.success() { | ||
| 658 | tracing::trace!("stdout:\n{stdout}",); | ||
| 659 | tracing::trace!("stderr:\n{stderr}",); | ||
| 660 | } else { | ||
| 661 | tracing::error!("stdout:\n{stdout}",); | ||
| 662 | tracing::error!("stderr:\n{stderr}",); | ||
| 663 | } | ||
| 664 | |||
| 665 | Ok(output.exit_ok()?) | ||
| 618 | } | 666 | } |
| 619 | 667 | ||
| 620 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | 668 | #[tracing::instrument(ret, err, skip(ctx))] |
| 621 | async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> { | 669 | async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> { |
| 670 | tracing::info!("building network container..."); | ||
| 622 | let script = r#" | 671 | let script = r#" |
| 623 | set -e | 672 | set -e |
| 624 | cat << EOF > /tmp/oar-p2p.containerfile | 673 | cat << EOF > /tmp/oar-p2p.containerfile |
| @@ -633,11 +682,13 @@ EOF | |||
| 633 | docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile . | 682 | docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile . |
| 634 | "#; | 683 | "#; |
| 635 | machine_run_script(ctx, machine, script).await?; | 684 | machine_run_script(ctx, machine, script).await?; |
| 685 | tracing::info!("network container built"); | ||
| 636 | Ok(()) | 686 | Ok(()) |
| 637 | } | 687 | } |
| 638 | 688 | ||
| 639 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | 689 | #[tracing::instrument(ret, err, skip(ctx))] |
| 640 | async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { | 690 | async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { |
| 691 | tracing::info!("cleaning network interfaces"); | ||
| 641 | let interface = machine.interface(); | 692 | let interface = machine.interface(); |
| 642 | let mut script = String::default(); | 693 | let mut script = String::default(); |
| 643 | script.push_str(&format!( | 694 | script.push_str(&format!( |
| @@ -653,7 +704,8 @@ async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { | |||
| 653 | script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n"); | 704 | script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n"); |
| 654 | script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n"); | 705 | script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n"); |
| 655 | script.push_str("nft delete table oar-p2p 2>/dev/null || true\n"); | 706 | script.push_str("nft delete table oar-p2p 2>/dev/null || true\n"); |
| 656 | let output = machine_net_container_run_script(&ctx, machine, &script).await?; | 707 | machine_net_container_run_script(&ctx, machine, &script).await?; |
| 708 | tracing::info!("network interfaces clean"); | ||
| 657 | Ok(()) | 709 | Ok(()) |
| 658 | } | 710 | } |
| 659 | 711 | ||
| @@ -682,11 +734,15 @@ fn machine_configuration_script(config: &MachineConfig) -> String { | |||
| 682 | script | 734 | script |
| 683 | } | 735 | } |
| 684 | 736 | ||
/// Apply a machine's generated network configuration.
///
/// Renders the configuration shell script for `config` and executes it inside
/// the networking container on the target machine.
#[tracing::instrument(ret, err, skip_all, fields(machine = ?config.machine))]
async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> {
    tracing::info!(
        "configuring machine with {} addresses",
        config.addresses.len()
    );
    // Build the full script up front, then run it remotely in a single pass.
    let script = machine_configuration_script(config);
    machine_net_container_run_script(ctx, config.machine, &script).await?;
    tracing::info!("machine configured");
    Ok(())
}
| 692 | 748 | ||
| @@ -815,192 +871,3 @@ fn machine_generate_configs( | |||
| 815 | } | 871 | } |
| 816 | configs | 872 | configs |
| 817 | } | 873 | } |
| 818 | |||
| 819 | async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | ||
| 820 | match ctx.node { | ||
| 821 | ExecutionNode::Frontend => { | ||
| 822 | let job_id = match ctx.job_id { | ||
| 823 | Some(job_id) => job_id, | ||
| 824 | None => return Err(eyre::eyre!("job id is required when running from cluster")), | ||
| 825 | }; | ||
| 826 | |||
| 827 | let output = Command::new("oarstat") | ||
| 828 | .arg("-j") | ||
| 829 | .arg(job_id.to_string()) | ||
| 830 | .arg("-J") | ||
| 831 | .output() | ||
| 832 | .await?; | ||
| 833 | |||
| 834 | if !output.status.success() { | ||
| 835 | tracing::error!( | ||
| 836 | "stdout: {}", | ||
| 837 | std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8") | ||
| 838 | ); | ||
| 839 | tracing::error!( | ||
| 840 | "stderr: {}", | ||
| 841 | std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8") | ||
| 842 | ); | ||
| 843 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 844 | } | ||
| 845 | |||
| 846 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 847 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 848 | } | ||
| 849 | ExecutionNode::Unknown => { | ||
| 850 | let frontend_hostname = match ctx.frontend_hostname.as_ref() { | ||
| 851 | Some(hostname) => hostname, | ||
| 852 | None => { | ||
| 853 | return Err(eyre::eyre!( | ||
| 854 | "frontend hostname is required when running from outside the cluster" | ||
| 855 | )); | ||
| 856 | } | ||
| 857 | }; | ||
| 858 | |||
| 859 | let job_id = match ctx.job_id { | ||
| 860 | Some(job_id) => job_id, | ||
| 861 | None => return Err(eyre::eyre!("job id is required when running from cluster")), | ||
| 862 | }; | ||
| 863 | |||
| 864 | let output = Command::new("ssh") | ||
| 865 | .arg(frontend_hostname) | ||
| 866 | .arg("oarstat") | ||
| 867 | .arg("-j") | ||
| 868 | .arg(job_id.to_string()) | ||
| 869 | .arg("-J") | ||
| 870 | .output() | ||
| 871 | .await?; | ||
| 872 | |||
| 873 | if !output.status.success() { | ||
| 874 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 875 | } | ||
| 876 | |||
| 877 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 878 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 879 | } | ||
| 880 | ExecutionNode::Machine(_) => { | ||
| 881 | let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?; | ||
| 882 | let content = tokio::fs::read_to_string(&nodefile).await?; | ||
| 883 | let unique_lines = content | ||
| 884 | .lines() | ||
| 885 | .map(|l| l.trim()) | ||
| 886 | .filter(|l| !l.is_empty()) | ||
| 887 | .collect::<HashSet<_>>(); | ||
| 888 | let mut machines = Vec::default(); | ||
| 889 | for hostname in unique_lines { | ||
| 890 | let machine = match Machine::from_hostname(hostname) { | ||
| 891 | Some(machine) => machine, | ||
| 892 | None => return Err(eyre::eyre!("unknown machine: {hostname}")), | ||
| 893 | }; | ||
| 894 | machines.push(machine); | ||
| 895 | } | ||
| 896 | Ok(machines) | ||
| 897 | } | ||
| 898 | } | ||
| 899 | } | ||
| 900 | |||
| 901 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { | ||
| 902 | #[derive(Debug, Deserialize)] | ||
| 903 | struct JobSchema { | ||
| 904 | assigned_network_address: Vec<String>, | ||
| 905 | } | ||
| 906 | let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?; | ||
| 907 | let key = job_id.to_string(); | ||
| 908 | let data = map | ||
| 909 | .get(&key) | ||
| 910 | .ok_or_else(|| eyre::eyre!("missing job key"))?; | ||
| 911 | let mut machines = Vec::default(); | ||
| 912 | for hostname in data.assigned_network_address.iter() { | ||
| 913 | match Machine::from_hostname(hostname) { | ||
| 914 | Some(machine) => machines.push(machine), | ||
| 915 | None => return Err(eyre::eyre!("unknown machine: '{hostname}'")), | ||
| 916 | } | ||
| 917 | } | ||
| 918 | Ok(machines) | ||
| 919 | } | ||
| 920 | |||
| 921 | async fn get_execution_node() -> Result<ExecutionNode> { | ||
| 922 | let hostname = get_hostname().await?; | ||
| 923 | let node = match hostname.as_str() { | ||
| 924 | "frontend" => ExecutionNode::Frontend, | ||
| 925 | _ => match Machine::from_hostname(&hostname) { | ||
| 926 | Some(machine) => ExecutionNode::Machine(machine), | ||
| 927 | _ => ExecutionNode::Unknown, | ||
| 928 | }, | ||
| 929 | }; | ||
| 930 | Ok(node) | ||
| 931 | } | ||
| 932 | |||
| 933 | async fn get_hostname() -> Result<String> { | ||
| 934 | if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await { | ||
| 935 | Ok(hostname) | ||
| 936 | } else { | ||
| 937 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | ||
| 938 | } | ||
| 939 | } | ||
| 940 | |||
#[cfg(test)]
mod test {
    use super::*;

    // Job id matching the top-level key of the captured oarstat output below.
    const OAR_STAT_JSON_JOB_ID: u32 = 36627;
    // Real `oarstat -j <id> -J` output captured from the cluster; only the
    // `assigned_network_address` field is consumed by the parser under test.
    const OAR_STAT_JSON_OUTPUT: &'static str = r#"
{
   "36627" : {
      "types" : [],
      "reservation" : "None",
      "dependencies" : [],
      "Job_Id" : 36627,
      "assigned_network_address" : [
         "gengar-1",
         "gengar-2"
      ],
      "owner" : "diogo464",
      "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
      "startTime" : 1751979909,
      "cpuset_name" : "diogo464_36627",
      "stderr_file" : "OAR.36627.stderr",
      "queue" : "default",
      "state" : "Running",
      "stdout_file" : "OAR.36627.stdout",
      "array_index" : 1,
      "array_id" : 36627,
      "assigned_resources" : [
         419,
         420,
         421,
         422,
         423,
         424,
         425,
         426,
         427,
         428,
         429,
         430,
         431,
         432,
         433,
         434
      ],
      "name" : null,
      "resubmit_job_id" : 0,
      "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
      "launchingDirectory" : "/home/diogo464",
      "jobType" : "PASSIVE",
      "submissionTime" : 1751979897,
      "project" : "default",
      "command" : "sleep 365d"
   }
}
"#;

    // Parsing the captured output must yield exactly the two assigned
    // machines, in the order oarstat reported them.
    #[test]
    fn test_extract_machines_from_oar_stat_json() {
        let machines =
            extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
                .unwrap();
        assert_eq!(machines.len(), 2);
        assert_eq!(machines[0], Machine::Gengar1);
        assert_eq!(machines[1], Machine::Gengar2);
    }
}
diff --git a/src/oar.rs b/src/oar.rs new file mode 100644 index 0000000..d2018b8 --- /dev/null +++ b/src/oar.rs | |||
| @@ -0,0 +1,164 @@ | |||
| 1 | use std::collections::{HashMap, HashSet}; | ||
| 2 | |||
| 3 | use eyre::{Context as _, Result}; | ||
| 4 | use serde::Deserialize; | ||
| 5 | use tokio::process::Command; | ||
| 6 | |||
| 7 | use crate::{ | ||
| 8 | context::{Context, ExecutionNode}, | ||
| 9 | machine::Machine, | ||
| 10 | }; | ||
| 11 | |||
| 12 | pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | ||
| 13 | match ctx.node { | ||
| 14 | ExecutionNode::Frontend => { | ||
| 15 | let job_id = ctx.job_id()?; | ||
| 16 | let output = Command::new("oarstat") | ||
| 17 | .arg("-j") | ||
| 18 | .arg(job_id.to_string()) | ||
| 19 | .arg("-J") | ||
| 20 | .output() | ||
| 21 | .await?; | ||
| 22 | |||
| 23 | if !output.status.success() { | ||
| 24 | tracing::error!( | ||
| 25 | "stdout: {}", | ||
| 26 | std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8") | ||
| 27 | ); | ||
| 28 | tracing::error!( | ||
| 29 | "stderr: {}", | ||
| 30 | std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8") | ||
| 31 | ); | ||
| 32 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 33 | } | ||
| 34 | |||
| 35 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 36 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 37 | } | ||
| 38 | ExecutionNode::Unknown => { | ||
| 39 | let job_id = ctx.job_id()?; | ||
| 40 | let frontend_hostname = ctx.frontend_hostname()?; | ||
| 41 | |||
| 42 | let output = Command::new("ssh") | ||
| 43 | .arg(frontend_hostname) | ||
| 44 | .arg("oarstat") | ||
| 45 | .arg("-j") | ||
| 46 | .arg(job_id.to_string()) | ||
| 47 | .arg("-J") | ||
| 48 | .output() | ||
| 49 | .await?; | ||
| 50 | |||
| 51 | if !output.status.success() { | ||
| 52 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 53 | } | ||
| 54 | |||
| 55 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 56 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 57 | } | ||
| 58 | ExecutionNode::Machine(_) => { | ||
| 59 | let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?; | ||
| 60 | let content = tokio::fs::read_to_string(&nodefile).await?; | ||
| 61 | let unique_lines = content | ||
| 62 | .lines() | ||
| 63 | .map(|l| l.trim()) | ||
| 64 | .filter(|l| !l.is_empty()) | ||
| 65 | .collect::<HashSet<_>>(); | ||
| 66 | let mut machines = Vec::default(); | ||
| 67 | for hostname in unique_lines { | ||
| 68 | let machine = match Machine::from_hostname(hostname) { | ||
| 69 | Some(machine) => machine, | ||
| 70 | None => return Err(eyre::eyre!("unknown machine: {hostname}")), | ||
| 71 | }; | ||
| 72 | machines.push(machine); | ||
| 73 | } | ||
| 74 | Ok(machines) | ||
| 75 | } | ||
| 76 | } | ||
| 77 | } | ||
| 78 | |||
| 79 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { | ||
| 80 | #[derive(Debug, Deserialize)] | ||
| 81 | struct JobSchema { | ||
| 82 | assigned_network_address: Vec<String>, | ||
| 83 | } | ||
| 84 | let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?; | ||
| 85 | let key = job_id.to_string(); | ||
| 86 | let data = map | ||
| 87 | .get(&key) | ||
| 88 | .ok_or_else(|| eyre::eyre!("missing job key"))?; | ||
| 89 | let mut machines = Vec::default(); | ||
| 90 | for hostname in data.assigned_network_address.iter() { | ||
| 91 | match Machine::from_hostname(hostname) { | ||
| 92 | Some(machine) => machines.push(machine), | ||
| 93 | None => return Err(eyre::eyre!("unknown machine: '{hostname}'")), | ||
| 94 | } | ||
| 95 | } | ||
| 96 | Ok(machines) | ||
| 97 | } | ||
| 98 | |||
#[cfg(test)]
mod test {
    use super::*;

    // Job id matching the top-level key of the captured oarstat output below.
    const OAR_STAT_JSON_JOB_ID: u32 = 36627;
    // Real `oarstat -j <id> -J` output captured from the cluster; only the
    // `assigned_network_address` field is consumed by the parser under test.
    const OAR_STAT_JSON_OUTPUT: &'static str = r#"
{
   "36627" : {
      "types" : [],
      "reservation" : "None",
      "dependencies" : [],
      "Job_Id" : 36627,
      "assigned_network_address" : [
         "gengar-1",
         "gengar-2"
      ],
      "owner" : "diogo464",
      "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
      "startTime" : 1751979909,
      "cpuset_name" : "diogo464_36627",
      "stderr_file" : "OAR.36627.stderr",
      "queue" : "default",
      "state" : "Running",
      "stdout_file" : "OAR.36627.stdout",
      "array_index" : 1,
      "array_id" : 36627,
      "assigned_resources" : [
         419,
         420,
         421,
         422,
         423,
         424,
         425,
         426,
         427,
         428,
         429,
         430,
         431,
         432,
         433,
         434
      ],
      "name" : null,
      "resubmit_job_id" : 0,
      "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
      "launchingDirectory" : "/home/diogo464",
      "jobType" : "PASSIVE",
      "submissionTime" : 1751979897,
      "project" : "default",
      "command" : "sleep 365d"
   }
}
"#;

    // Parsing the captured output must yield exactly the two assigned
    // machines, in the order oarstat reported them.
    #[test]
    fn test_extract_machines_from_oar_stat_json() {
        let machines =
            extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
                .unwrap();
        assert_eq!(machines.len(), 2);
        assert_eq!(machines[0], Machine::Gengar1);
        assert_eq!(machines[1], Machine::Gengar2);
    }
}
