aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-10 22:05:59 +0100
committerdiogo464 <[email protected]>2025-07-10 22:05:59 +0100
commit66a0329b201a19d2a6d87b265b6fde3423a917c0 (patch)
tree62ad156e4c3e378dc64fec38542657eaa9ea0c9c
parent4fc26211fda53023f8ce703ccf4b1a2bbfbbe10a (diff)
clean enough for now
-rw-r--r--src/context.rs58
-rw-r--r--src/machine.rs24
-rw-r--r--src/main.rs507
-rw-r--r--src/oar.rs164
4 files changed, 433 insertions, 320 deletions
diff --git a/src/context.rs b/src/context.rs
new file mode 100644
index 0000000..323a8a5
--- /dev/null
+++ b/src/context.rs
@@ -0,0 +1,58 @@
1use eyre::{Context as _, Result};
2
3use crate::machine::Machine;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
6pub enum ExecutionNode {
7 Frontend,
8 Machine(Machine),
9 Unknown,
10}
11
12#[derive(Debug, Clone)]
13pub struct Context {
14 pub node: ExecutionNode,
15 job_id: Option<u32>,
16 frontend_hostname: Option<String>,
17}
18
19impl Context {
20 pub async fn new(job_id: Option<u32>, frontend_hostname: Option<String>) -> Result<Self> {
21 Ok(Self {
22 node: get_execution_node().await?,
23 job_id,
24 frontend_hostname,
25 })
26 }
27
28 pub fn job_id(&self) -> Result<u32> {
29 self.job_id.ok_or_else(|| eyre::eyre!("missing job id"))
30 }
31
32 pub fn frontend_hostname(&self) -> Result<&str> {
33 self.frontend_hostname
34 .as_ref()
35 .map(|s| s.as_str())
36 .ok_or_else(|| eyre::eyre!("missing frontend hostname"))
37 }
38}
39
40async fn get_execution_node() -> Result<ExecutionNode> {
41 let hostname = get_hostname().await?;
42 let node = match hostname.as_str() {
43 "frontend" => ExecutionNode::Frontend,
44 _ => match Machine::from_hostname(&hostname) {
45 Some(machine) => ExecutionNode::Machine(machine),
46 _ => ExecutionNode::Unknown,
47 },
48 };
49 Ok(node)
50}
51
52async fn get_hostname() -> Result<String> {
53 if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await {
54 Ok(hostname)
55 } else {
56 std::env::var("HOSTNAME").context("reading HOSTNAME env var")
57 }
58}
diff --git a/src/machine.rs b/src/machine.rs
index f223e66..c4a5875 100644
--- a/src/machine.rs
+++ b/src/machine.rs
@@ -1,3 +1,6 @@
1use eyre::{Context as _, Result};
2use futures::{StreamExt as _, stream::FuturesUnordered};
3
1macro_rules! define_machines { 4macro_rules! define_machines {
2 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { 5 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => {
3 #[derive(Debug)] 6 #[derive(Debug)]
@@ -137,3 +140,24 @@ define_machines!(
137 (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()), 140 (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()),
138 (Vulpix1, 56, "vulpix-1", 112, todo!()) 141 (Vulpix1, 56, "vulpix-1", 112, todo!())
139); 142);
143
144pub async fn for_each<F, FUT, RET>(machines: impl IntoIterator<Item = &Machine>, f: F) -> Result<()>
145where
146 F: Fn(Machine) -> FUT,
147 FUT: std::future::Future<Output = Result<RET>>,
148{
149 let mut futures = FuturesUnordered::new();
150
151 for &machine in machines {
152 let fut = f(machine);
153 let fut = async move { (machine, fut.await) };
154 futures.push(fut);
155 }
156
157 while let Some((machine, result)) = futures.next().await {
158 if let Err(err) = result {
159 return Err(err).with_context(|| format!("running task on machine {machine}"));
160 }
161 }
162 Ok(())
163}
diff --git a/src/main.rs b/src/main.rs
index 36c2a9b..f164d47 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,7 +9,6 @@ use std::{
9use clap::{Args, Parser, Subcommand}; 9use clap::{Args, Parser, Subcommand};
10use eyre::Context as _; 10use eyre::Context as _;
11use eyre::Result; 11use eyre::Result;
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use machine::Machine; 12use machine::Machine;
14use serde::Deserialize; 13use serde::Deserialize;
15use tokio::{ 14use tokio::{
@@ -18,10 +17,15 @@ use tokio::{
18 task::JoinSet, 17 task::JoinSet,
19}; 18};
20 19
21use crate::latency_matrix::LatencyMatrix; 20use crate::{
21 context::{Context, ExecutionNode},
22 latency_matrix::LatencyMatrix,
23};
22 24
25pub mod context;
23pub mod latency_matrix; 26pub mod latency_matrix;
24pub mod machine; 27pub mod machine;
28pub mod oar;
25 29
26const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking"; 30const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking";
27 31
@@ -105,20 +109,6 @@ struct RunArgs {
105 schedule: Option<PathBuf>, 109 schedule: Option<PathBuf>,
106} 110}
107 111
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
109enum ExecutionNode {
110 Frontend,
111 Machine(Machine),
112 Unknown,
113}
114
115#[derive(Debug, Clone)]
116struct Context {
117 node: ExecutionNode,
118 job_id: Option<u32>,
119 frontend_hostname: Option<String>,
120}
121
122#[derive(Debug, Clone)] 112#[derive(Debug, Clone)]
123struct MachineConfig { 113struct MachineConfig {
124 machine: Machine, 114 machine: Machine,
@@ -152,12 +142,7 @@ async fn main() -> Result<()> {
152} 142}
153 143
154async fn context_from_common(common: &Common) -> Result<Context> { 144async fn context_from_common(common: &Common) -> Result<Context> {
155 let node = get_execution_node().await?; 145 Context::new(common.job_id, common.frontend_hostname.clone()).await
156 Ok(Context {
157 node,
158 job_id: common.job_id,
159 frontend_hostname: common.frontend_hostname.clone(),
160 })
161} 146}
162 147
163async fn cmd_net_up(args: NetUpArgs) -> Result<()> { 148async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
@@ -167,7 +152,7 @@ async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
167 .context("reading latecy matrix")?; 152 .context("reading latecy matrix")?;
168 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) 153 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds)
169 .context("parsing latency matrix")?; 154 .context("parsing latency matrix")?;
170 let machines = job_list_machines(&context).await?; 155 let machines = oar::job_list_machines(&context).await?;
171 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); 156 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu);
172 machines_net_container_build(&context, &machines).await?; 157 machines_net_container_build(&context, &machines).await?;
173 machines_clean(&context, &machines).await?; 158 machines_clean(&context, &machines).await?;
@@ -177,7 +162,7 @@ async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
177 162
178async fn cmd_net_down(args: NetDownArgs) -> Result<()> { 163async fn cmd_net_down(args: NetDownArgs) -> Result<()> {
179 let context = context_from_common(&args.common).await?; 164 let context = context_from_common(&args.common).await?;
180 let machines = job_list_machines(&context).await?; 165 let machines = oar::job_list_machines(&context).await?;
181 machines_net_container_build(&context, &machines).await?; 166 machines_net_container_build(&context, &machines).await?;
182 machines_clean(&context, &machines).await?; 167 machines_clean(&context, &machines).await?;
183 Ok(()) 168 Ok(())
@@ -185,7 +170,7 @@ async fn cmd_net_down(args: NetDownArgs) -> Result<()> {
185 170
186async fn cmd_net_show(args: NetShowArgs) -> Result<()> { 171async fn cmd_net_show(args: NetShowArgs) -> Result<()> {
187 let context = context_from_common(&args.common).await?; 172 let context = context_from_common(&args.common).await?;
188 let machines = job_list_machines(&context).await?; 173 let machines = oar::job_list_machines(&context).await?;
189 let mut set = JoinSet::default(); 174 let mut set = JoinSet::default();
190 for machine in machines { 175 for machine in machines {
191 let context = context.clone(); 176 let context = context.clone();
@@ -270,7 +255,7 @@ fn parse_schedule(schedule: &str) -> Result<Vec<ScheduledContainer>> {
270 255
271async fn cmd_run(args: RunArgs) -> Result<()> { 256async fn cmd_run(args: RunArgs) -> Result<()> {
272 let ctx = context_from_common(&args.common).await?; 257 let ctx = context_from_common(&args.common).await?;
273 let machines = job_list_machines(&ctx).await?; 258 let machines = oar::job_list_machines(&ctx).await?;
274 let schedule = match args.schedule { 259 let schedule = match args.schedule {
275 Some(path) => tokio::fs::read_to_string(&path) 260 Some(path) => tokio::fs::read_to_string(&path)
276 .await 261 .await
@@ -286,104 +271,57 @@ async fn cmd_run(args: RunArgs) -> Result<()> {
286 }; 271 };
287 let containers = parse_schedule(&schedule)?; 272 let containers = parse_schedule(&schedule)?;
288 273
289 machines_foreach(&machines, |machine| machine_containers_clean(&ctx, machine)).await?; 274 machine::for_each(&machines, |machine| machine_containers_clean(&ctx, machine)).await?;
290 machines_foreach(&machines, |machine| { 275 machine::for_each(&machines, |machine| {
291 let ctx = ctx.clone(); 276 let ctx = ctx.clone();
292 let containers = containers 277 let containers = containers
293 .iter() 278 .iter()
294 .filter(|c| c.machine == machine) 279 .filter(|c| c.machine == machine)
295 .cloned() 280 .cloned()
296 .collect::<Vec<_>>(); 281 .collect::<Vec<_>>();
297 let mut script = String::default(); 282 async move { machine_create_containers(&ctx, machine, &containers).await }
298 for (idx, container) in containers.iter().enumerate() {
299 script.push_str("docker create \\\n");
300 script.push_str("\t--pull=always \\\n");
301 script.push_str("\t--network=host \\\n");
302 script.push_str("\t--restart=no \\\n");
303 script.push_str(&format!("\t--name {} \\\n", container.name));
304 for (key, val) in container.variables.iter() {
305 script.push_str("\t-e ");
306 script.push_str(key);
307 script.push_str("=");
308 script.push_str(val);
309 script.push_str(" \\\n");
310 }
311 script.push_str("\t");
312 script.push_str(&container.image);
313 script.push_str(" &\n");
314 script.push_str(&format!("pid_{idx}=$!\n\n"));
315 }
316
317 for (idx, container) in containers.iter().enumerate() {
318 let name = &container.name;
319 script.push_str(&format!(
320 "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n"
321 ));
322 }
323 tracing::debug!("container creation script:\n{script}");
324 async move { machine_run_script(&ctx, machine, &script).await }
325 }) 283 })
326 .await?; 284 .await?;
327 285
328 tracing::info!("starting all containers on all machines"); 286 tracing::info!("starting all containers on all machines");
329 machines_foreach( 287 machine::for_each(
330 machines 288 machines
331 .iter() 289 .iter()
332 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), 290 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)),
333 |machine| { 291 |machine| machine_start_containers(&ctx, machine),
334 machine_run_script(
335 &ctx,
336 machine,
337 "docker container ls -aq | xargs docker container start",
338 )
339 },
340 ) 292 )
341 .await?; 293 .await?;
342 294
343 tracing::info!("waiting for all containers to exit"); 295 tracing::info!("waiting for all containers to exit");
344 machines_foreach(&machines, |machine| { 296 machine::for_each(&machines, |machine| {
345 let ctx = ctx.clone(); 297 let ctx = ctx.clone();
346 let containers = containers 298 let containers = containers
347 .iter() 299 .iter()
348 .filter(|c| c.machine == machine) 300 .filter(|c| c.machine == machine)
349 .cloned() 301 .cloned()
350 .collect::<Vec<_>>(); 302 .collect::<Vec<_>>();
351 let mut script = String::default(); 303 async move {
352 for container in containers { 304 machine_containers_wait(&ctx, machine, &containers)
353 let name = &container.name; 305 .await
354 script.push_str(&format!("if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n")); 306 .with_context(|| format!("waiting for containers on {machine}"))
355 script.push_str(&format!("\techo Container {name} failed\n"));
356 script.push_str(&format!("\tdocker logs {name} 2>1\n"));
357 script.push_str("\texit 1\n");
358 script.push_str("fi\n\n");
359 } 307 }
360 script.push_str("exit 0\n");
361 async move { machine_run_script(&ctx, machine, &script).await }
362 }) 308 })
363 .await?; 309 .await?;
364 310
365 tracing::info!("saving logs to disk on all machines"); 311 tracing::info!("saving logs to disk on all machines");
366 machines_foreach(&machines, |machine| { 312 machine::for_each(&machines, |machine| {
367 let ctx = ctx.clone(); 313 let ctx = ctx.clone();
368 let containers = containers 314 let containers = containers
369 .iter() 315 .iter()
370 .filter(|c| c.machine == machine) 316 .filter(|c| c.machine == machine)
371 .cloned() 317 .cloned()
372 .collect::<Vec<_>>(); 318 .collect::<Vec<_>>();
373 let mut script = String::default(); 319 async move { machine_containers_save_logs(&ctx, machine, &containers).await }
374 script.push_str("set -e\n");
375 script.push_str("mkdir -p /tmp/oar-p2p-logs\n");
376 script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n");
377 for container in containers {
378 let name = &container.name;
379 script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n"));
380 }
381 script.push_str("exit 0\n");
382 async move { machine_run_script(&ctx, machine, &script).await }
383 }) 320 })
384 .await?; 321 .await?;
385 322
386 machines_foreach( 323 tracing::info!("copying logs from all machines");
324 machine::for_each(
387 machines 325 machines
388 .iter() 326 .iter()
389 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), 327 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)),
@@ -394,7 +332,121 @@ async fn cmd_run(args: RunArgs) -> Result<()> {
394 Ok(()) 332 Ok(())
395} 333}
396 334
335fn machine_containers_create_script(containers: &[ScheduledContainer]) -> String {
336 let mut script = String::default();
337 for (idx, container) in containers.iter().enumerate() {
338 script.push_str("docker create \\\n");
339 script.push_str("\t--pull=always \\\n");
340 script.push_str("\t--network=host \\\n");
341 script.push_str("\t--restart=no \\\n");
342 script.push_str(&format!("\t--name {} \\\n", container.name));
343 for (key, val) in container.variables.iter() {
344 script.push_str("\t-e ");
345 script.push_str(key);
346 script.push_str("=");
347 script.push_str(val);
348 script.push_str(" \\\n");
349 }
350 script.push_str("\t");
351 script.push_str(&container.image);
352 script.push_str(" &\n");
353 script.push_str(&format!("pid_{idx}=$!\n\n"));
354 }
355
356 for (idx, container) in containers.iter().enumerate() {
357 let name = &container.name;
358 script.push_str(&format!(
359 "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n"
360 ));
361 }
362
363 script
364}
365
366#[tracing::instrument(ret, err, skip(ctx, containers))]
367async fn machine_create_containers(
368 ctx: &Context,
369 machine: Machine,
370 containers: &[ScheduledContainer],
371) -> Result<()> {
372 tracing::info!("creating {} containers", containers.len());
373 let script = machine_containers_create_script(containers);
374 machine_run_script(&ctx, machine, &script).await?;
375 tracing::info!("containers created");
376 Ok(())
377}
378
379#[tracing::instrument(ret, err, skip(ctx))]
380async fn machine_start_containers(ctx: &Context, machine: Machine) -> Result<()> {
381 tracing::info!("starting all containers");
382 machine_run_script(
383 &ctx,
384 machine,
385 "docker container ls -aq | xargs docker container start",
386 )
387 .await?;
388 tracing::info!("all containers started");
389 Ok(())
390}
391
392fn machine_containers_wait_script(containers: &[ScheduledContainer]) -> String {
393 let mut script = String::default();
394 for container in containers {
395 let name = &container.name;
396 script.push_str(&format!(
397 "if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n"
398 ));
399 script.push_str(&format!("\techo Container {name} failed\n"));
400 script.push_str(&format!("\tdocker logs {name} 2>1\n"));
401 script.push_str("\texit 1\n");
402 script.push_str("fi\n\n");
403 }
404 script.push_str("exit 0\n");
405 script
406}
407
408#[tracing::instrument(ret, err, skip(ctx, containers))]
409async fn machine_containers_wait(
410 ctx: &Context,
411 machine: Machine,
412 containers: &[ScheduledContainer],
413) -> Result<()> {
414 tracing::info!("waiting for {} containers to exit", containers.len());
415 let script = machine_containers_wait_script(containers);
416 machine_run_script(ctx, machine, &script).await?;
417 tracing::info!("all containers exited");
418 Ok(())
419}
420
421fn machine_containers_save_logs_script(containers: &[ScheduledContainer]) -> String {
422 let mut script = String::default();
423 script.push_str("set -e\n");
424 script.push_str("mkdir -p /tmp/oar-p2p-logs\n");
425 script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n");
426 for container in containers {
427 let name = &container.name;
428 script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n"));
429 }
430 script.push_str("exit 0\n");
431 script
432}
433
434#[tracing::instrument(ret, err, skip(ctx, containers))]
435async fn machine_containers_save_logs(
436 ctx: &Context,
437 machine: Machine,
438 containers: &[ScheduledContainer],
439) -> Result<()> {
440 tracing::info!("saving logs from {} containers", containers.len());
441 let script = machine_containers_save_logs_script(containers);
442 machine_run_script(&ctx, machine, &script).await?;
443 tracing::info!("logs saved");
444 Ok(())
445}
446
447#[tracing::instrument(ret, err, skip(ctx))]
397async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> { 448async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> {
449 tracing::info!("copying container logs from machine");
398 let scp_common = &[ 450 let scp_common = &[
399 "-o", 451 "-o",
400 "StrictHostKeyChecking=no", 452 "StrictHostKeyChecking=no",
@@ -406,7 +458,7 @@ async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Pat
406 args.extend(scp_common); 458 args.extend(scp_common);
407 if ctx.node == ExecutionNode::Unknown { 459 if ctx.node == ExecutionNode::Unknown {
408 args.push("-J"); 460 args.push("-J");
409 args.push(ctx.frontend_hostname.as_ref().expect("TODO")); 461 args.push(ctx.frontend_hostname()?);
410 } 462 }
411 args.push("-r"); 463 args.push("-r");
412 464
@@ -417,38 +469,15 @@ async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Pat
417 469
418 let output = Command::new("scp").args(args).output().await?; 470 let output = Command::new("scp").args(args).output().await?;
419 output.exit_ok()?; 471 output.exit_ok()?;
472 tracing::info!("logs finished copying");
420 Ok(()) 473 Ok(())
421} 474}
422 475
423async fn machines_foreach<F, FUT, RET>( 476#[tracing::instrument(ret, err, skip(ctx))]
424 machines: impl IntoIterator<Item = &Machine>,
425 f: F,
426) -> Result<()>
427where
428 F: Fn(Machine) -> FUT,
429 FUT: std::future::Future<Output = Result<RET>>,
430{
431 let mut futures = FuturesUnordered::new();
432
433 for &machine in machines {
434 let fut = f(machine);
435 let fut = async move { (machine, fut.await) };
436 futures.push(fut);
437 }
438
439 while let Some((machine, result)) = futures.next().await {
440 if let Err(err) = result {
441 tracing::error!("error on machine {machine}: {err}");
442 return Err(err);
443 }
444 }
445 Ok(())
446}
447
448#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))]
449async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> { 477async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> {
450 tracing::info!("removing all containers..."); 478 tracing::info!("removing all containers...");
451 machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?; 479 machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?;
480 tracing::info!("all containers removed");
452 Ok(()) 481 Ok(())
453} 482}
454 483
@@ -496,7 +525,9 @@ async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<
496 Ok(()) 525 Ok(())
497} 526}
498 527
528#[tracing::instrument(err, skip(ctx))]
499async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { 529async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> {
530 tracing::info!("listing machine addresses");
500 let interface = machine.interface(); 531 let interface = machine.interface();
501 let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'"); 532 let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'");
502 let output = machine_run_script(ctx, machine, &script).await?; 533 let output = machine_run_script(ctx, machine, &script).await?;
@@ -506,9 +537,11 @@ async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<I
506 tracing::trace!("parsing address from line: '{line}'"); 537 tracing::trace!("parsing address from line: '{line}'");
507 addresses.push(line.parse()?); 538 addresses.push(line.parse()?);
508 } 539 }
540 tracing::trace!("addresses: {addresses:#?}");
509 Ok(addresses) 541 Ok(addresses)
510} 542}
511 543
544#[tracing::instrument(ret, err, level = tracing::Level::TRACE)]
512async fn machine_run( 545async fn machine_run(
513 ctx: &Context, 546 ctx: &Context,
514 machine: Machine, 547 machine: Machine,
@@ -542,7 +575,7 @@ async fn machine_run(
542 } 575 }
543 } 576 }
544 ExecutionNode::Unknown => { 577 ExecutionNode::Unknown => {
545 let frontend = ctx.frontend_hostname.as_ref().unwrap(); 578 let frontend = ctx.frontend_hostname()?;
546 let mut arguments = Vec::default(); 579 let mut arguments = Vec::default();
547 arguments.push("ssh"); 580 arguments.push("ssh");
548 arguments.extend(ssh_common); 581 arguments.extend(ssh_common);
@@ -557,6 +590,7 @@ async fn machine_run(
557 } 590 }
558 arguments.extend(args); 591 arguments.extend(args);
559 592
593 tracing::trace!("running command: {arguments:?}");
560 let mut proc = Command::new(arguments[0]) 594 let mut proc = Command::new(arguments[0])
561 .args(&arguments[1..]) 595 .args(&arguments[1..])
562 .stdout(std::process::Stdio::piped()) 596 .stdout(std::process::Stdio::piped())
@@ -582,16 +616,17 @@ async fn machine_run(
582} 616}
583 617
584async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> { 618async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> {
585 tracing::trace!("running script on machine {machine}:\n{script}"); 619 tracing::debug!("script body:\n{script}");
586 let output = machine_run(ctx, machine, &[], Some(script)).await?; 620 let output = machine_run(ctx, machine, &[], Some(script)).await?;
587 tracing::trace!( 621 let stdout = std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>");
588 "stdout:\n{}", 622 let stderr = std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>");
589 std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>") 623 if output.status.success() {
590 ); 624 tracing::trace!("stdout:\n{stdout}",);
591 tracing::trace!( 625 tracing::trace!("stderr:\n{stderr}",);
592 "stderr:\n{}", 626 } else {
593 std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>") 627 tracing::error!("stdout:\n{stdout}",);
594 ); 628 tracing::error!("stderr:\n{stderr}",);
629 }
595 Ok(output.exit_ok()?) 630 Ok(output.exit_ok()?)
596} 631}
597 632
@@ -600,7 +635,8 @@ async fn machine_net_container_run_script(
600 machine: Machine, 635 machine: Machine,
601 script: &str, 636 script: &str,
602) -> Result<Output> { 637) -> Result<Output> {
603 machine_run( 638 tracing::debug!("network container script body:\n{script}");
639 let output = machine_run(
604 ctx, 640 ctx,
605 machine, 641 machine,
606 &[ 642 &[
@@ -614,11 +650,24 @@ async fn machine_net_container_run_script(
614 ], 650 ],
615 Some(script), 651 Some(script),
616 ) 652 )
617 .await 653 .await?;
654
655 let stdout = std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>");
656 let stderr = std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>");
657 if output.status.success() {
658 tracing::trace!("stdout:\n{stdout}",);
659 tracing::trace!("stderr:\n{stderr}",);
660 } else {
661 tracing::error!("stdout:\n{stdout}",);
662 tracing::error!("stderr:\n{stderr}",);
663 }
664
665 Ok(output.exit_ok()?)
618} 666}
619 667
620#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] 668#[tracing::instrument(ret, err, skip(ctx))]
621async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> { 669async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> {
670 tracing::info!("building network container...");
622 let script = r#" 671 let script = r#"
623set -e 672set -e
624cat << EOF > /tmp/oar-p2p.containerfile 673cat << EOF > /tmp/oar-p2p.containerfile
@@ -633,11 +682,13 @@ EOF
633docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile . 682docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile .
634"#; 683"#;
635 machine_run_script(ctx, machine, script).await?; 684 machine_run_script(ctx, machine, script).await?;
685 tracing::info!("network container built");
636 Ok(()) 686 Ok(())
637} 687}
638 688
639#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] 689#[tracing::instrument(ret, err, skip(ctx))]
640async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { 690async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> {
691 tracing::info!("cleaning network interfaces");
641 let interface = machine.interface(); 692 let interface = machine.interface();
642 let mut script = String::default(); 693 let mut script = String::default();
643 script.push_str(&format!( 694 script.push_str(&format!(
@@ -653,7 +704,8 @@ async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> {
653 script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n"); 704 script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n");
654 script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n"); 705 script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n");
655 script.push_str("nft delete table oar-p2p 2>/dev/null || true\n"); 706 script.push_str("nft delete table oar-p2p 2>/dev/null || true\n");
656 let output = machine_net_container_run_script(&ctx, machine, &script).await?; 707 machine_net_container_run_script(&ctx, machine, &script).await?;
708 tracing::info!("network interfaces clean");
657 Ok(()) 709 Ok(())
658} 710}
659 711
@@ -682,11 +734,15 @@ fn machine_configuration_script(config: &MachineConfig) -> String {
682 script 734 script
683} 735}
684 736
685#[tracing::instrument(ret, err, skip_all, fields(machine = config.machine.to_string()))] 737#[tracing::instrument(ret, err, skip_all, fields(machine = ?config.machine))]
686async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> { 738async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> {
739 tracing::info!(
740 "configuring machine with {} addresses",
741 config.addresses.len()
742 );
687 let script = machine_configuration_script(config); 743 let script = machine_configuration_script(config);
688 tracing::debug!("configuration script:\n{script}");
689 machine_net_container_run_script(ctx, config.machine, &script).await?; 744 machine_net_container_run_script(ctx, config.machine, &script).await?;
745 tracing::info!("machine configured");
690 Ok(()) 746 Ok(())
691} 747}
692 748
@@ -815,192 +871,3 @@ fn machine_generate_configs(
815 } 871 }
816 configs 872 configs
817} 873}
818
819async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
820 match ctx.node {
821 ExecutionNode::Frontend => {
822 let job_id = match ctx.job_id {
823 Some(job_id) => job_id,
824 None => return Err(eyre::eyre!("job id is required when running from cluster")),
825 };
826
827 let output = Command::new("oarstat")
828 .arg("-j")
829 .arg(job_id.to_string())
830 .arg("-J")
831 .output()
832 .await?;
833
834 if !output.status.success() {
835 tracing::error!(
836 "stdout: {}",
837 std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8")
838 );
839 tracing::error!(
840 "stderr: {}",
841 std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8")
842 );
843 return Err(eyre::eyre!("failed to run oarstat"));
844 }
845
846 let stdout = std::str::from_utf8(&output.stdout)?;
847 extract_machines_from_oar_stat_json(&stdout, job_id)
848 }
849 ExecutionNode::Unknown => {
850 let frontend_hostname = match ctx.frontend_hostname.as_ref() {
851 Some(hostname) => hostname,
852 None => {
853 return Err(eyre::eyre!(
854 "frontend hostname is required when running from outside the cluster"
855 ));
856 }
857 };
858
859 let job_id = match ctx.job_id {
860 Some(job_id) => job_id,
861 None => return Err(eyre::eyre!("job id is required when running from cluster")),
862 };
863
864 let output = Command::new("ssh")
865 .arg(frontend_hostname)
866 .arg("oarstat")
867 .arg("-j")
868 .arg(job_id.to_string())
869 .arg("-J")
870 .output()
871 .await?;
872
873 if !output.status.success() {
874 return Err(eyre::eyre!("failed to run oarstat"));
875 }
876
877 let stdout = std::str::from_utf8(&output.stdout)?;
878 extract_machines_from_oar_stat_json(&stdout, job_id)
879 }
880 ExecutionNode::Machine(_) => {
881 let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?;
882 let content = tokio::fs::read_to_string(&nodefile).await?;
883 let unique_lines = content
884 .lines()
885 .map(|l| l.trim())
886 .filter(|l| !l.is_empty())
887 .collect::<HashSet<_>>();
888 let mut machines = Vec::default();
889 for hostname in unique_lines {
890 let machine = match Machine::from_hostname(hostname) {
891 Some(machine) => machine,
892 None => return Err(eyre::eyre!("unknown machine: {hostname}")),
893 };
894 machines.push(machine);
895 }
896 Ok(machines)
897 }
898 }
899}
900
901fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> {
902 #[derive(Debug, Deserialize)]
903 struct JobSchema {
904 assigned_network_address: Vec<String>,
905 }
906 let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?;
907 let key = job_id.to_string();
908 let data = map
909 .get(&key)
910 .ok_or_else(|| eyre::eyre!("missing job key"))?;
911 let mut machines = Vec::default();
912 for hostname in data.assigned_network_address.iter() {
913 match Machine::from_hostname(hostname) {
914 Some(machine) => machines.push(machine),
915 None => return Err(eyre::eyre!("unknown machine: '{hostname}'")),
916 }
917 }
918 Ok(machines)
919}
920
921async fn get_execution_node() -> Result<ExecutionNode> {
922 let hostname = get_hostname().await?;
923 let node = match hostname.as_str() {
924 "frontend" => ExecutionNode::Frontend,
925 _ => match Machine::from_hostname(&hostname) {
926 Some(machine) => ExecutionNode::Machine(machine),
927 _ => ExecutionNode::Unknown,
928 },
929 };
930 Ok(node)
931}
932
933async fn get_hostname() -> Result<String> {
934 if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await {
935 Ok(hostname)
936 } else {
937 std::env::var("HOSTNAME").context("reading HOSTNAME env var")
938 }
939}
940
941#[cfg(test)]
942mod test {
943 use super::*;
944
945 const OAR_STAT_JSON_JOB_ID: u32 = 36627;
946 const OAR_STAT_JSON_OUTPUT: &'static str = r#"
947{
948 "36627" : {
949 "types" : [],
950 "reservation" : "None",
951 "dependencies" : [],
952 "Job_Id" : 36627,
953 "assigned_network_address" : [
954 "gengar-1",
955 "gengar-2"
956 ],
957 "owner" : "diogo464",
958 "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
959 "startTime" : 1751979909,
960 "cpuset_name" : "diogo464_36627",
961 "stderr_file" : "OAR.36627.stderr",
962 "queue" : "default",
963 "state" : "Running",
964 "stdout_file" : "OAR.36627.stdout",
965 "array_index" : 1,
966 "array_id" : 36627,
967 "assigned_resources" : [
968 419,
969 420,
970 421,
971 422,
972 423,
973 424,
974 425,
975 426,
976 427,
977 428,
978 429,
979 430,
980 431,
981 432,
982 433,
983 434
984 ],
985 "name" : null,
986 "resubmit_job_id" : 0,
987 "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
988 "launchingDirectory" : "/home/diogo464",
989 "jobType" : "PASSIVE",
990 "submissionTime" : 1751979897,
991 "project" : "default",
992 "command" : "sleep 365d"
993 }
994}
995"#;
996
997 #[test]
998 fn test_extract_machines_from_oar_stat_json() {
999 let machines =
1000 extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
1001 .unwrap();
1002 assert_eq!(machines.len(), 2);
1003 assert_eq!(machines[0], Machine::Gengar1);
1004 assert_eq!(machines[1], Machine::Gengar2);
1005 }
1006}
diff --git a/src/oar.rs b/src/oar.rs
new file mode 100644
index 0000000..d2018b8
--- /dev/null
+++ b/src/oar.rs
@@ -0,0 +1,164 @@
1use std::collections::{HashMap, HashSet};
2
3use eyre::{Context as _, Result};
4use serde::Deserialize;
5use tokio::process::Command;
6
7use crate::{
8 context::{Context, ExecutionNode},
9 machine::Machine,
10};
11
12pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
13 match ctx.node {
14 ExecutionNode::Frontend => {
15 let job_id = ctx.job_id()?;
16 let output = Command::new("oarstat")
17 .arg("-j")
18 .arg(job_id.to_string())
19 .arg("-J")
20 .output()
21 .await?;
22
23 if !output.status.success() {
24 tracing::error!(
25 "stdout: {}",
26 std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8")
27 );
28 tracing::error!(
29 "stderr: {}",
30 std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8")
31 );
32 return Err(eyre::eyre!("failed to run oarstat"));
33 }
34
35 let stdout = std::str::from_utf8(&output.stdout)?;
36 extract_machines_from_oar_stat_json(&stdout, job_id)
37 }
38 ExecutionNode::Unknown => {
39 let job_id = ctx.job_id()?;
40 let frontend_hostname = ctx.frontend_hostname()?;
41
42 let output = Command::new("ssh")
43 .arg(frontend_hostname)
44 .arg("oarstat")
45 .arg("-j")
46 .arg(job_id.to_string())
47 .arg("-J")
48 .output()
49 .await?;
50
51 if !output.status.success() {
52 return Err(eyre::eyre!("failed to run oarstat"));
53 }
54
55 let stdout = std::str::from_utf8(&output.stdout)?;
56 extract_machines_from_oar_stat_json(&stdout, job_id)
57 }
58 ExecutionNode::Machine(_) => {
59 let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?;
60 let content = tokio::fs::read_to_string(&nodefile).await?;
61 let unique_lines = content
62 .lines()
63 .map(|l| l.trim())
64 .filter(|l| !l.is_empty())
65 .collect::<HashSet<_>>();
66 let mut machines = Vec::default();
67 for hostname in unique_lines {
68 let machine = match Machine::from_hostname(hostname) {
69 Some(machine) => machine,
70 None => return Err(eyre::eyre!("unknown machine: {hostname}")),
71 };
72 machines.push(machine);
73 }
74 Ok(machines)
75 }
76 }
77}
78
79fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> {
80 #[derive(Debug, Deserialize)]
81 struct JobSchema {
82 assigned_network_address: Vec<String>,
83 }
84 let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?;
85 let key = job_id.to_string();
86 let data = map
87 .get(&key)
88 .ok_or_else(|| eyre::eyre!("missing job key"))?;
89 let mut machines = Vec::default();
90 for hostname in data.assigned_network_address.iter() {
91 match Machine::from_hostname(hostname) {
92 Some(machine) => machines.push(machine),
93 None => return Err(eyre::eyre!("unknown machine: '{hostname}'")),
94 }
95 }
96 Ok(machines)
97}
98
99#[cfg(test)]
100mod test {
101 use super::*;
102
103 const OAR_STAT_JSON_JOB_ID: u32 = 36627;
104 const OAR_STAT_JSON_OUTPUT: &'static str = r#"
105{
106 "36627" : {
107 "types" : [],
108 "reservation" : "None",
109 "dependencies" : [],
110 "Job_Id" : 36627,
111 "assigned_network_address" : [
112 "gengar-1",
113 "gengar-2"
114 ],
115 "owner" : "diogo464",
116 "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
117 "startTime" : 1751979909,
118 "cpuset_name" : "diogo464_36627",
119 "stderr_file" : "OAR.36627.stderr",
120 "queue" : "default",
121 "state" : "Running",
122 "stdout_file" : "OAR.36627.stdout",
123 "array_index" : 1,
124 "array_id" : 36627,
125 "assigned_resources" : [
126 419,
127 420,
128 421,
129 422,
130 423,
131 424,
132 425,
133 426,
134 427,
135 428,
136 429,
137 430,
138 431,
139 432,
140 433,
141 434
142 ],
143 "name" : null,
144 "resubmit_job_id" : 0,
145 "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
146 "launchingDirectory" : "/home/diogo464",
147 "jobType" : "PASSIVE",
148 "submissionTime" : 1751979897,
149 "project" : "default",
150 "command" : "sleep 365d"
151 }
152}
153"#;
154
155 #[test]
156 fn test_extract_machines_from_oar_stat_json() {
157 let machines =
158 extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
159 .unwrap();
160 assert_eq!(machines.len(), 2);
161 assert_eq!(machines[0], Machine::Gengar1);
162 assert_eq!(machines[1], Machine::Gengar2);
163 }
164}