diff options
| author | diogo464 <[email protected]> | 2025-07-09 14:24:25 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-09 14:24:25 +0100 |
| commit | a5178fbb0bde3ff9f863ef0cca48748cb993390a (patch) | |
| tree | 9448cd051c4909f8fb4e1afaff1b9dda3278a5b2 /src/main.rs | |
| parent | 8018cd7a378baee5c5e1fab85bba2592c9244c72 (diff) | |
rust init snapshot
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..dfbddc0 --- /dev/null +++ b/src/main.rs | |||
| @@ -0,0 +1,252 @@ | |||
| 1 | use std::collections::{HashMap, HashSet}; | ||
| 2 | |||
| 3 | use clap::Parser; | ||
| 4 | use eyre::Context as _; | ||
| 5 | use machine::Machine; | ||
| 6 | use serde::Deserialize; | ||
| 7 | use tokio::process::Command; | ||
| 8 | |||
| 9 | pub mod latency_matrix; | ||
| 10 | pub mod machine; | ||
| 11 | |||
| 12 | #[derive(Debug, Parser)] | ||
| 13 | pub struct Args { | ||
| 14 | #[clap(long, env = "OAR_JOB_ID")] | ||
| 15 | pub job_id: Option<u32>, | ||
| 16 | #[clap(long, env = "FRONTEND_HOSTNAME")] | ||
| 17 | pub frontend_hostname: Option<String>, | ||
| 18 | } | ||
| 19 | |||
| 20 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||
| 21 | pub enum ExecutionNode { | ||
| 22 | Frontend, | ||
| 23 | Machine(Machine), | ||
| 24 | Unknown, | ||
| 25 | } | ||
| 26 | |||
| 27 | pub struct Context { | ||
| 28 | pub node: ExecutionNode, | ||
| 29 | pub job_id: Option<u32>, | ||
| 30 | pub frontend_hostname: Option<String>, | ||
| 31 | } | ||
| 32 | |||
| 33 | pub struct MachineConfig { | ||
| 34 | machine | ||
| 35 | } | ||
| 36 | |||
| 37 | #[tokio::main] | ||
| 38 | async fn main() -> eyre::Result<()> { | ||
| 39 | tracing_subscriber::fmt::init(); | ||
| 40 | color_eyre::install()?; | ||
| 41 | |||
| 42 | let args = Args::parse(); | ||
| 43 | let node = get_execution_node()?; | ||
| 44 | let context = Context { | ||
| 45 | node, | ||
| 46 | job_id: args.job_id, | ||
| 47 | frontend_hostname: args.frontend_hostname, | ||
| 48 | }; | ||
| 49 | let machines = list_job_machines(&context).await?; | ||
| 50 | |||
| 51 | // listing oar job machines | ||
| 52 | // if we are in the frontend we use oarstat | ||
| 53 | // if we are in a job machine we read the cpuset file | ||
| 54 | // if we are outside the cluster we use ssh cluster oarstat | ||
| 55 | |||
| 56 | // machine generate configurations | ||
| 57 | // this does not require any connections | ||
| 58 | |||
| 59 | // machine cleanup interface | ||
| 60 | // requires running commands inside a container inside a machine | ||
| 61 | // we could generate a cleanup bash script and execute that in oneshot | ||
| 62 | |||
| 63 | // machine apply configuration | ||
| 64 | // this also requires running some scripts inside a container inside the machine | ||
| 65 | |||
| 66 | Ok(()) | ||
| 67 | } | ||
| 68 | |||
| 69 | async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | ||
| 70 | match ctx.node { | ||
| 71 | ExecutionNode::Frontend => { | ||
| 72 | let job_id = match ctx.job_id { | ||
| 73 | Some(job_id) => job_id, | ||
| 74 | None => return Err(eyre::eyre!("job id is required when running from cluster")), | ||
| 75 | }; | ||
| 76 | |||
| 77 | let output = Command::new("oarstat") | ||
| 78 | .arg("-j") | ||
| 79 | .arg(job_id.to_string()) | ||
| 80 | .arg("-J") | ||
| 81 | .output() | ||
| 82 | .await?; | ||
| 83 | |||
| 84 | if !output.status.success() { | ||
| 85 | tracing::error!( | ||
| 86 | "stdout: {}", | ||
| 87 | std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8") | ||
| 88 | ); | ||
| 89 | tracing::error!( | ||
| 90 | "stderr: {}", | ||
| 91 | std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8") | ||
| 92 | ); | ||
| 93 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 94 | } | ||
| 95 | |||
| 96 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 97 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 98 | } | ||
| 99 | ExecutionNode::Unknown => { | ||
| 100 | let frontend_hostname = match ctx.frontend_hostname.as_ref() { | ||
| 101 | Some(hostname) => hostname, | ||
| 102 | None => { | ||
| 103 | return Err(eyre::eyre!( | ||
| 104 | "frontend hostname is requiredwhen running from outside the cluster" | ||
| 105 | )); | ||
| 106 | } | ||
| 107 | }; | ||
| 108 | |||
| 109 | let job_id = match ctx.job_id { | ||
| 110 | Some(job_id) => job_id, | ||
| 111 | None => return Err(eyre::eyre!("job id is required when running from cluster")), | ||
| 112 | }; | ||
| 113 | |||
| 114 | let output = Command::new("ssh") | ||
| 115 | .arg(frontend_hostname) | ||
| 116 | .arg("oarstat") | ||
| 117 | .arg("-j") | ||
| 118 | .arg(job_id.to_string()) | ||
| 119 | .arg("-J") | ||
| 120 | .output() | ||
| 121 | .await?; | ||
| 122 | |||
| 123 | if !output.status.success() { | ||
| 124 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 125 | } | ||
| 126 | |||
| 127 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 128 | extract_machines_from_oar_stat_json(&stdout, job_id) | ||
| 129 | } | ||
| 130 | ExecutionNode::Machine(_) => { | ||
| 131 | let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?; | ||
| 132 | let content = tokio::fs::read_to_string(&nodefile).await?; | ||
| 133 | let unique_lines = content | ||
| 134 | .lines() | ||
| 135 | .map(|l| l.trim()) | ||
| 136 | .filter(|l| !l.is_empty()) | ||
| 137 | .collect::<HashSet<_>>(); | ||
| 138 | let mut machines = Vec::default(); | ||
| 139 | for hostname in unique_lines { | ||
| 140 | let machine = match Machine::from_hostname(hostname) { | ||
| 141 | Some(machine) => machine, | ||
| 142 | None => return Err(eyre::eyre!("unknown machine: {hostname}")), | ||
| 143 | }; | ||
| 144 | machines.push(machine); | ||
| 145 | } | ||
| 146 | Ok(machines) | ||
| 147 | } | ||
| 148 | } | ||
| 149 | } | ||
| 150 | |||
| 151 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result<Vec<Machine>> { | ||
| 152 | #[derive(Debug, Deserialize)] | ||
| 153 | struct JobSchema { | ||
| 154 | assigned_network_address: Vec<String>, | ||
| 155 | } | ||
| 156 | let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?; | ||
| 157 | let key = job_id.to_string(); | ||
| 158 | let data = map | ||
| 159 | .get(&key) | ||
| 160 | .ok_or_else(|| eyre::eyre!("missing job key"))?; | ||
| 161 | let mut machines = Vec::default(); | ||
| 162 | for hostname in data.assigned_network_address.iter() { | ||
| 163 | match Machine::from_hostname(hostname) { | ||
| 164 | Some(machine) => machines.push(machine), | ||
| 165 | None => return Err(eyre::eyre!("unknown machine: '{hostname}'")), | ||
| 166 | } | ||
| 167 | } | ||
| 168 | Ok(machines) | ||
| 169 | } | ||
| 170 | |||
| 171 | fn get_execution_node() -> eyre::Result<ExecutionNode> { | ||
| 172 | let hostname = get_hostname()?; | ||
| 173 | let node = match hostname.as_str() { | ||
| 174 | "frontend" => ExecutionNode::Frontend, | ||
| 175 | _ => match Machine::from_hostname(&hostname) { | ||
| 176 | Some(machine) => ExecutionNode::Machine(machine), | ||
| 177 | _ => ExecutionNode::Unknown, | ||
| 178 | }, | ||
| 179 | }; | ||
| 180 | Ok(node) | ||
| 181 | } | ||
| 182 | |||
| 183 | fn get_hostname() -> eyre::Result<String> { | ||
| 184 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | ||
| 185 | } | ||
| 186 | |||
#[cfg(test)]
mod test {
    use super::*;

    /// Job id used as the top-level key of the fixture below.
    const OAR_STAT_JSON_JOB_ID: u32 = 36627;

    /// Sample `oarstat -J` output for a single running job on two machines.
    const OAR_STAT_JSON_OUTPUT: &str = r#"
{
    "36627" : {
        "types" : [],
        "reservation" : "None",
        "dependencies" : [],
        "Job_Id" : 36627,
        "assigned_network_address" : [
            "gengar-1",
            "gengar-2"
        ],
        "owner" : "diogo464",
        "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
        "startTime" : 1751979909,
        "cpuset_name" : "diogo464_36627",
        "stderr_file" : "OAR.36627.stderr",
        "queue" : "default",
        "state" : "Running",
        "stdout_file" : "OAR.36627.stdout",
        "array_index" : 1,
        "array_id" : 36627,
        "assigned_resources" : [
            419,
            420,
            421,
            422,
            423,
            424,
            425,
            426,
            427,
            428,
            429,
            430,
            431,
            432,
            433,
            434
        ],
        "name" : null,
        "resubmit_job_id" : 0,
        "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
        "launchingDirectory" : "/home/diogo464",
        "jobType" : "PASSIVE",
        "submissionTime" : 1751979897,
        "project" : "default",
        "command" : "sleep 365d"
    }
}
"#;

    /// The two hostnames in `assigned_network_address` map to their machines, in order.
    #[test]
    fn test_extract_machines_from_oar_stat_json() {
        let machines =
            extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
                .unwrap();
        assert_eq!(machines, vec![Machine::Gengar1, Machine::Gengar2]);
    }
}
