From a5178fbb0bde3ff9f863ef0cca48748cb993390a Mon Sep 17 00:00:00 2001 From: diogo464 Date: Wed, 9 Jul 2025 14:24:25 +0100 Subject: rust init snapshot --- src/latency_matrix.rs | 99 ++++++++++++++++++++ src/machine.rs | 101 ++++++++++++++++++++ src/main.rs | 252 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 452 insertions(+) create mode 100644 src/latency_matrix.rs create mode 100644 src/machine.rs create mode 100644 src/main.rs (limited to 'src') diff --git a/src/latency_matrix.rs b/src/latency_matrix.rs new file mode 100644 index 0000000..b40df9e --- /dev/null +++ b/src/latency_matrix.rs @@ -0,0 +1,99 @@ +use std::str::FromStr; +use std::time::Duration; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum InvalidLatencyMatrix { + #[error( + "invalid line dimension: line {line} had dimension {dimension} but expected {expected}" + )] + InvalidLineDimension { + line: usize, + dimension: usize, + expected: usize, + }, + #[error("invalid latency value '{value}': {error}")] + InvalidLatencyValue { value: String, error: String }, +} + +pub enum TimeUnit { + Seconds, + Milliseconds, +} + +#[derive(Debug, Clone)] +pub struct LatencyMatrix { + dimension: usize, + latencies: Vec, +} + +impl LatencyMatrix { + fn new(dimension: usize, latencies: Vec) -> Self { + assert_eq!(dimension * dimension, latencies.len()); + Self { + dimension, + latencies, + } + } + + pub fn latency(&self, row: usize, col: usize) -> Duration { + self.latencies[self.dimension * row + col] + } + + pub fn dimension(&self) -> usize { + self.dimension + } + + pub fn parse(content: &str, unit: TimeUnit) -> Result { + let mut dimension = None; + let mut latencies = Vec::default(); + for (line_idx, line) in content.lines().enumerate() { + let line = line.trim(); + if line.is_empty() { + continue; + } + + let mut current_dimension = 0; + for component in line.split_whitespace() { + current_dimension += 1; + let component_value = match component.parse::() { + Ok(value) => value, + Err(err) => { + return Err(InvalidLatencyMatrix::InvalidLatencyValue { + value: component.to_string(), + error: err.to_string(), + }); + } + }; + + latencies.push(Duration::from_secs_f64(match unit { + TimeUnit::Seconds => component_value, + TimeUnit::Milliseconds => component_value / 1000.0, + })); + } + + match dimension { + Some(dimension) => { + if current_dimension != dimension { + return Err(InvalidLatencyMatrix::InvalidLineDimension { + line: line_idx, + dimension: current_dimension, + expected: dimension, + }); + } + } + None => dimension = Some(current_dimension), + } + } + + Ok(Self::new(dimension.unwrap_or(0), latencies)) + } +} + +impl FromStr for LatencyMatrix { + type Err = InvalidLatencyMatrix; + + fn from_str(s: &str) -> Result { + Self::parse(s, TimeUnit::Milliseconds) + } +} diff --git a/src/machine.rs b/src/machine.rs new file mode 100644 index 0000000..f1ad94d --- /dev/null +++ b/src/machine.rs @@ -0,0 +1,101 @@ +macro_rules! define_machines { + ($(($name:ident, $idx:expr, $hostname:expr, $interface:expr)),*) => { + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub enum Machine { + $($name,)* + } + + impl std::fmt::Display for Machine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.hostname()) + } + } + + impl Machine { + pub fn hostname(&self) -> &'static str { + match self { + $(Self::$name => $hostname,)* + } + } + + pub fn index(&self) -> usize { + match self { + $(Self::$name => $idx,)* + } + } + + pub fn from_hostname(hostname: &str) -> Option { + match hostname { + $($hostname => Some(Self::$name),)* + _ => None + } + } + + pub fn interface(&self) -> &'static str { + match self { + $(Self::$name => $interface,)* + } + } + } + }; +} + +define_machines!( + (Alakazam01, 0, "alakazam-01", todo!()), + (Alakazam02, 1, "alakazam-02", todo!()), + (Alakazam03, 2, "alakazam-03", todo!()), + (Alakazam04, 3, "alakazam-04", todo!()), + (Alakazam05, 4, "alakazam-05", todo!()), + (Alakazam06, 5, "alakazam-06", todo!()), + (Alakazam07, 6, "alakazam-07", todo!()), + (Alakazam08, 7, "alakazam-08", todo!()), + (Bulbasaur1, 8, "bulbasaur-1", todo!()), + (Bulbasaur2, 9, "bulbasaur-2", todo!()), + (Bulbasaur3, 10, "bulbasaur-3", todo!()), + (Charmander1, 11, "charmander-1", "bond0"), + (Charmander2, 12, "charmander-2", "bond0"), + (Charmander3, 13, "charmander-3", "bond0"), + (Charmander4, 14, "charmander-4", "bond0"), + (Charmander5, 15, "charmander-5", "bond0"), + (Gengar1, 16, "gengar-1", "bond0"), + (Gengar2, 17, "gengar-2", "bond0"), + (Gengar3, 18, "gengar-3", "bond0"), + (Gengar4, 19, "gengar-4", "bond0"), + (Gengar5, 20, "gengar-5", "bond0"), + (Kadabra01, 21, "kadabra-01", todo!()), + (Kadabra02, 22, "kadabra-02", todo!()), + (Kadabra03, 23, "kadabra-03", todo!()), + (Kadabra04, 24, "kadabra-04", todo!()), + (Kadabra05, 25, "kadabra-05", todo!()), + (Kadabra06, 26, "kadabra-06", todo!()), + (Kadabra07, 27, "kadabra-07", todo!()), + (Kadabra08, 28, "kadabra-08", todo!()), + (Lugia1, 29, "lugia-1", "bond0"), + (Lugia2, 30, "lugia-2", "bond0"), + (Lugia3, 31, "lugia-3", "bond0"), + (Lugia4, 32, "lugia-4", "bond0"), + (Lugia5, 33, "lugia-5", "bond0"), + (Magikarp1, 34, "magikarp-1", todo!()), + (Moltres01, 35, "moltres-01", todo!()), + (Moltres02, 36, "moltres-02", todo!()), + (Moltres03, 37, "moltres-03", todo!()), + (Moltres04, 38, "moltres-04", todo!()), + (Moltres05, 39, "moltres-05", todo!()), + (Moltres06, 40, "moltres-06", todo!()), + (Moltres07, 41, "moltres-07", todo!()), + (Moltres08, 42, "moltres-08", todo!()), + (Moltres09, 43, "moltres-09", todo!()), + (Moltres10, 44, "moltres-10", todo!()), + (Oddish1, 45, "oddish-1", todo!()), + (Psyduck1, 46, "psyduck-1", todo!()), + (Psyduck2, 47, "psyduck-2", todo!()), + (Psyduck3, 48, "psyduck-3", todo!()), + (Shelder1, 49, "shelder-1", todo!()), + (Squirtle1, 50, "squirtle-1", todo!()), + (Squirtle2, 51, "squirtle-2", todo!()), + (Squirtle3, 52, "squirtle-3", todo!()), + (Squirtle4, 53, "squirtle-4", todo!()), + (Staryu1, 54, "staryu-1", todo!()), + (Sudowoodo1, 55, "sudowoodo-1", todo!()), + (Vulpix1, 56, "vulpix-1", todo!()) +); diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..dfbddc0 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,252 @@ +use std::collections::{HashMap, HashSet}; + +use clap::Parser; +use eyre::Context as _; +use machine::Machine; +use serde::Deserialize; +use tokio::process::Command; + +pub mod latency_matrix; +pub mod machine; + +#[derive(Debug, Parser)] +pub struct Args { + #[clap(long, env = "OAR_JOB_ID")] + pub job_id: Option, + #[clap(long, env = "FRONTEND_HOSTNAME")] + pub frontend_hostname: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ExecutionNode { + Frontend, + Machine(Machine), + Unknown, +} + +pub struct Context { + pub node: ExecutionNode, + pub job_id: Option, + pub frontend_hostname: Option, +} + +pub struct MachineConfig { + machine +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + tracing_subscriber::fmt::init(); + color_eyre::install()?; + + let args = Args::parse(); + let node = get_execution_node()?; + let context = Context { + node, + job_id: args.job_id, + frontend_hostname: args.frontend_hostname, + }; + let machines = list_job_machines(&context).await?; + + // listing oar job machines + // if we are in the frontend we use oarstat + // if we are in a job machine we read the cpuset file + // if we are outside the cluster we use ssh cluster oarstat + + // machine generate configurations + // this does not require any connections + + // machine cleanup interface + // requires running commands inside a container inside a machine + // we could generate a cleanup bash script and execute that in oneshot + + // machine apply configuration + // this also requires running some scripts inside a container inside the machine + + Ok(()) +} + +async fn list_job_machines(ctx: &Context) -> eyre::Result> { + match ctx.node { + ExecutionNode::Frontend => { + let job_id = match ctx.job_id { + Some(job_id) => job_id, + None => return Err(eyre::eyre!("job id is required when running from cluster")), + }; + + let output = Command::new("oarstat") + .arg("-j") + .arg(job_id.to_string()) + .arg("-J") + .output() + .await?; + + if !output.status.success() { + tracing::error!( + "stdout: {}", + std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8") + ); + tracing::error!( + "stderr: {}", + std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8") + ); + return Err(eyre::eyre!("failed to run oarstat")); + } + + let stdout = std::str::from_utf8(&output.stdout)?; + extract_machines_from_oar_stat_json(&stdout, job_id) + } + ExecutionNode::Unknown => { + let frontend_hostname = match ctx.frontend_hostname.as_ref() { + Some(hostname) => hostname, + None => { + return Err(eyre::eyre!( + "frontend hostname is requiredwhen running from outside the cluster" + )); + } + }; + + let job_id = match ctx.job_id { + Some(job_id) => job_id, + None => return Err(eyre::eyre!("job id is required when running from cluster")), + }; + + let output = Command::new("ssh") + .arg(frontend_hostname) + .arg("oarstat") + .arg("-j") + .arg(job_id.to_string()) + .arg("-J") + .output() + .await?; + + if !output.status.success() { + return Err(eyre::eyre!("failed to run oarstat")); + } + + let stdout = std::str::from_utf8(&output.stdout)?; + extract_machines_from_oar_stat_json(&stdout, job_id) + } + ExecutionNode::Machine(_) => { + let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?; + let content = tokio::fs::read_to_string(&nodefile).await?; + let unique_lines = content + .lines() + .map(|l| l.trim()) + .filter(|l| !l.is_empty()) + .collect::>(); + let mut machines = Vec::default(); + for hostname in unique_lines { + let machine = match Machine::from_hostname(hostname) { + Some(machine) => machine, + None => return Err(eyre::eyre!("unknown machine: {hostname}")), + }; + machines.push(machine); + } + Ok(machines) + } + } +} + +fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result> { + #[derive(Debug, Deserialize)] + struct JobSchema { + assigned_network_address: Vec, + } + let map = serde_json::from_str::>(output)?; + let key = job_id.to_string(); + let data = map + .get(&key) + .ok_or_else(|| eyre::eyre!("missing job key"))?; + let mut machines = Vec::default(); + for hostname in data.assigned_network_address.iter() { + match Machine::from_hostname(hostname) { + Some(machine) => machines.push(machine), + None => return Err(eyre::eyre!("unknown machine: '{hostname}'")), + } + } + Ok(machines) +} + +fn get_execution_node() -> eyre::Result { + let hostname = get_hostname()?; + let node = match hostname.as_str() { + "frontend" => ExecutionNode::Frontend, + _ => match Machine::from_hostname(&hostname) { + Some(machine) => ExecutionNode::Machine(machine), + _ => ExecutionNode::Unknown, + }, + }; + Ok(node) +} + +fn get_hostname() -> eyre::Result { + std::env::var("HOSTNAME").context("reading HOSTNAME env var") +} + +#[cfg(test)] +mod test { + use super::*; + + const OAR_STAT_JSON_JOB_ID: u32 = 36627; + const OAR_STAT_JSON_OUTPUT: &'static str = r#" +{ + "36627" : { + "types" : [], + "reservation" : "None", + "dependencies" : [], + "Job_Id" : 36627, + "assigned_network_address" : [ + "gengar-1", + "gengar-2" + ], + "owner" : "diogo464", + "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'", + "startTime" : 1751979909, + "cpuset_name" : "diogo464_36627", + "stderr_file" : "OAR.36627.stderr", + "queue" : "default", + "state" : "Running", + "stdout_file" : "OAR.36627.stdout", + "array_index" : 1, + "array_id" : 36627, + "assigned_resources" : [ + 419, + 420, + 421, + 422, + 423, + 424, + 425, + 426, + 427, + 428, + 429, + 430, + 431, + 432, + 433, + 434 + ], + "name" : null, + "resubmit_job_id" : 0, + "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)", + "launchingDirectory" : "/home/diogo464", + "jobType" : "PASSIVE", + "submissionTime" : 1751979897, + "project" : "default", + "command" : "sleep 365d" + } +} +"#; + + #[test] + fn test_extract_machines_from_oar_stat_json() { + let machines = + extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID) + .unwrap(); + assert_eq!(machines.len(), 2); + assert_eq!(machines[0], Machine::Gengar1); + assert_eq!(machines[1], Machine::Gengar2); + } +} -- cgit