diff options
| author | diogo464 <[email protected]> | 2025-07-22 10:20:00 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-22 10:20:00 +0100 |
| commit | 8c90229d6e88c88338261496da50c01e51cac3f5 (patch) | |
| tree | dce2080eb236a8cb44f43c4d12a141204eab3c8a /src | |
| parent | c76875a153eecd7168cbf9f1db2987cc7e90834e (diff) | |
feat: added oar job id inference
Diffstat (limited to 'src')
| -rw-r--r-- | src/context.rs | 29 | ||||
| -rw-r--r-- | src/main.rs | 17 | ||||
| -rw-r--r-- | src/oar.rs | 268 |
3 files changed, 307 insertions, 7 deletions
diff --git a/src/context.rs b/src/context.rs index ae25966..8032212 100644 --- a/src/context.rs +++ b/src/context.rs | |||
| @@ -13,20 +13,43 @@ pub enum ExecutionNode { | |||
| 13 | pub struct Context { | 13 | pub struct Context { |
| 14 | pub node: ExecutionNode, | 14 | pub node: ExecutionNode, |
| 15 | job_id: Option<u32>, | 15 | job_id: Option<u32>, |
| 16 | infer_job_id: bool, | ||
| 16 | frontend_hostname: Option<String>, | 17 | frontend_hostname: Option<String>, |
| 17 | } | 18 | } |
| 18 | 19 | ||
| 19 | impl Context { | 20 | impl Context { |
| 20 | pub async fn new(job_id: Option<u32>, frontend_hostname: Option<String>) -> Result<Self> { | 21 | pub async fn new( |
| 22 | job_id: Option<u32>, | ||
| 23 | infer_job_id: bool, | ||
| 24 | frontend_hostname: Option<String>, | ||
| 25 | ) -> Result<Self> { | ||
| 21 | Ok(Self { | 26 | Ok(Self { |
| 22 | node: get_execution_node().await?, | 27 | node: get_execution_node().await?, |
| 23 | job_id, | 28 | job_id, |
| 29 | infer_job_id, | ||
| 24 | frontend_hostname, | 30 | frontend_hostname, |
| 25 | }) | 31 | }) |
| 26 | } | 32 | } |
| 27 | 33 | ||
| 28 | pub fn job_id(&self) -> Result<u32> { | 34 | pub async fn job_id(&self) -> Result<u32> { |
| 29 | self.job_id.ok_or_else(|| eyre::eyre!("missing job id")) | 35 | tracing::debug!("obtaining job id"); |
| 36 | if let Some(job_id) = self.job_id { | ||
| 37 | tracing::debug!("job id was set, using {job_id}"); | ||
| 38 | Ok(job_id) | ||
| 39 | } else if self.infer_job_id { | ||
| 40 | tracing::debug!("job id was not set but inference is enabled, finding job id"); | ||
| 41 | let job_ids = crate::oar::list_user_job_ids(self).await?; | ||
| 42 | match job_ids.len() { | ||
| 43 | 0 => Err(eyre::eyre!("cannot infer job id, no jobs are running")), | ||
| 44 | 1 => Ok(job_ids[0]), | ||
| 45 | _ => Err(eyre::eyre!( | ||
| 46 | "cannot infer job id, multiple jobs are running" | ||
| 47 | )), | ||
| 48 | } | ||
| 49 | } else { | ||
| 50 | tracing::debug!("inference was disabled and job id is not set"); | ||
| 51 | Err(eyre::eyre!("missing job id")) | ||
| 52 | } | ||
| 30 | } | 53 | } |
| 31 | 54 | ||
| 32 | pub fn frontend_hostname(&self) -> Result<&str> { | 55 | pub fn frontend_hostname(&self) -> Result<&str> { |
diff --git a/src/main.rs b/src/main.rs index 9b9c815..cc8afd0 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -46,6 +46,13 @@ struct Common { | |||
| 46 | #[clap(long, env = "OAR_JOB_ID")] | 46 | #[clap(long, env = "OAR_JOB_ID")] |
| 47 | job_id: Option<u32>, | 47 | job_id: Option<u32>, |
| 48 | 48 | ||
| 49 | /// infer the oar job id | ||
| 50 | /// | ||
| 51 | /// if you have a single job running on the cluster, that job will be used, otherwise, | ||
| 52 | /// inference fails. the job_id flag takes precedence of over inference. | ||
| 53 | #[clap(long, env = "OAR_P2P_INFER_JOB_ID")] | ||
| 54 | infer_job_id: bool, | ||
| 55 | |||
| 49 | /// hostname used to access the frontend using ssh. | 56 | /// hostname used to access the frontend using ssh. |
| 50 | /// i.e. `ssh <frontend-hostname>` should work. | 57 | /// i.e. `ssh <frontend-hostname>` should work. |
| 51 | #[clap(long, env = "FRONTEND_HOSTNAME")] | 58 | #[clap(long, env = "FRONTEND_HOSTNAME")] |
| @@ -212,7 +219,12 @@ async fn main() -> Result<()> { | |||
| 212 | } | 219 | } |
| 213 | 220 | ||
| 214 | async fn context_from_common(common: &Common) -> Result<Context> { | 221 | async fn context_from_common(common: &Common) -> Result<Context> { |
| 215 | Context::new(common.job_id, common.frontend_hostname.clone()).await | 222 | Context::new( |
| 223 | common.job_id, | ||
| 224 | common.infer_job_id, | ||
| 225 | common.frontend_hostname.clone(), | ||
| 226 | ) | ||
| 227 | .await | ||
| 216 | } | 228 | } |
| 217 | 229 | ||
| 218 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | 230 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { |
| @@ -658,7 +670,8 @@ async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result< | |||
| 658 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { | 670 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { |
| 659 | tracing::info!("listing machine addresses"); | 671 | tracing::info!("listing machine addresses"); |
| 660 | let interface = machine.interface(); | 672 | let interface = machine.interface(); |
| 661 | let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+' || true"); | 673 | let script = |
| 674 | format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+' || true"); | ||
| 662 | let output = machine_run_script(ctx, machine, &script).await?; | 675 | let output = machine_run_script(ctx, machine, &script).await?; |
| 663 | let stdout = std::str::from_utf8(&output.stdout)?; | 676 | let stdout = std::str::from_utf8(&output.stdout)?; |
| 664 | let mut addresses = Vec::default(); | 677 | let mut addresses = Vec::default(); |
| @@ -12,7 +12,7 @@ use crate::{ | |||
| 12 | pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | 12 | pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { |
| 13 | match ctx.node { | 13 | match ctx.node { |
| 14 | ExecutionNode::Frontend => { | 14 | ExecutionNode::Frontend => { |
| 15 | let job_id = ctx.job_id()?; | 15 | let job_id = ctx.job_id().await?; |
| 16 | let output = Command::new("oarstat") | 16 | let output = Command::new("oarstat") |
| 17 | .arg("-j") | 17 | .arg("-j") |
| 18 | .arg(job_id.to_string()) | 18 | .arg(job_id.to_string()) |
| @@ -36,7 +36,7 @@ pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | |||
| 36 | extract_machines_from_oar_stat_json(stdout, job_id) | 36 | extract_machines_from_oar_stat_json(stdout, job_id) |
| 37 | } | 37 | } |
| 38 | ExecutionNode::Unknown => { | 38 | ExecutionNode::Unknown => { |
| 39 | let job_id = ctx.job_id()?; | 39 | let job_id = ctx.job_id().await?; |
| 40 | let frontend_hostname = ctx.frontend_hostname()?; | 40 | let frontend_hostname = ctx.frontend_hostname()?; |
| 41 | 41 | ||
| 42 | let output = Command::new("ssh") | 42 | let output = Command::new("ssh") |
| @@ -76,6 +76,51 @@ pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | |||
| 76 | } | 76 | } |
| 77 | } | 77 | } |
| 78 | 78 | ||
| 79 | pub async fn list_user_job_ids(ctx: &Context) -> Result<Vec<u32>> { | ||
| 80 | let output = match ctx.node { | ||
| 81 | ExecutionNode::Frontend => Command::new("oarstat").arg("-u").arg("-J").output().await?, | ||
| 82 | ExecutionNode::Unknown => { | ||
| 83 | Command::new("ssh") | ||
| 84 | .arg(ctx.frontend_hostname()?) | ||
| 85 | .arg("oarstat") | ||
| 86 | .arg("-u") | ||
| 87 | .arg("-J") | ||
| 88 | .output() | ||
| 89 | .await? | ||
| 90 | } | ||
| 91 | ExecutionNode::Machine(_) => { | ||
| 92 | return Err(eyre::eyre!( | ||
| 93 | "cannot run oarstat from inside a cluster machine" | ||
| 94 | )); | ||
| 95 | } | ||
| 96 | }; | ||
| 97 | |||
| 98 | if !output.status.success() { | ||
| 99 | tracing::error!( | ||
| 100 | "stdout: {}", | ||
| 101 | std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8") | ||
| 102 | ); | ||
| 103 | tracing::error!( | ||
| 104 | "stderr: {}", | ||
| 105 | std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8") | ||
| 106 | ); | ||
| 107 | return Err(eyre::eyre!("failed to run oarstat")); | ||
| 108 | } | ||
| 109 | |||
| 110 | let stdout = String::from_utf8(output.stdout)?; | ||
| 111 | // for some reason, running oarstat with the -J flag (for json output) when you have no jobs | ||
| 112 | // running results in this error message instead of an empty object, so we will just assume it | ||
| 113 | // meant an empty object | ||
| 114 | let json_string = if stdout | ||
| 115 | == "hash- or arrayref expected (not a simple scalar, use allow_nonref to allow this) at /usr/lib/oar/oarstat line 285." | ||
| 116 | { | ||
| 117 | String::from("{}") | ||
| 118 | } else { | ||
| 119 | stdout | ||
| 120 | }; | ||
| 121 | extract_job_ids_from_oarstat_output(&json_string) | ||
| 122 | } | ||
| 123 | |||
| 79 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { | 124 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { |
| 80 | #[derive(Debug, Deserialize)] | 125 | #[derive(Debug, Deserialize)] |
| 81 | struct JobSchema { | 126 | struct JobSchema { |
| @@ -96,6 +141,26 @@ fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec< | |||
| 96 | Ok(machines) | 141 | Ok(machines) |
| 97 | } | 142 | } |
| 98 | 143 | ||
| 144 | fn extract_job_ids_from_oarstat_output(output: &str) -> Result<Vec<u32>> { | ||
| 145 | let value = serde_json::from_str::<serde_json::Value>(output)?; | ||
| 146 | let object = match value { | ||
| 147 | serde_json::Value::Object(map) => map, | ||
| 148 | _ => { | ||
| 149 | return Err(eyre::eyre!( | ||
| 150 | "expected oar stat output to produce a json object" | ||
| 151 | )); | ||
| 152 | } | ||
| 153 | }; | ||
| 154 | |||
| 155 | let mut job_ids = Vec::default(); | ||
| 156 | for key in object.keys() { | ||
| 157 | tracing::trace!("parsing key '{key}'"); | ||
| 158 | let job_id = key.parse()?; | ||
| 159 | job_ids.push(job_id); | ||
| 160 | } | ||
| 161 | Ok(job_ids) | ||
| 162 | } | ||
| 163 | |||
| 99 | #[cfg(test)] | 164 | #[cfg(test)] |
| 100 | mod test { | 165 | mod test { |
| 101 | use super::*; | 166 | use super::*; |
| @@ -161,4 +226,203 @@ mod test { | |||
| 161 | assert_eq!(machines[0], Machine::Gengar1); | 226 | assert_eq!(machines[0], Machine::Gengar1); |
| 162 | assert_eq!(machines[1], Machine::Gengar2); | 227 | assert_eq!(machines[1], Machine::Gengar2); |
| 163 | } | 228 | } |
| 229 | |||
| 230 | const OAR_STAT_ALL_USER_JOBS_OUTPUT: &'static str = r#" | ||
| 231 | { | ||
| 232 | "37030" : { | ||
| 233 | "dependencies" : [], | ||
| 234 | "jobType" : "PASSIVE", | ||
| 235 | "state" : "Running", | ||
| 236 | "assigned_network_address" : [ | ||
| 237 | "moltres-02" | ||
| 238 | ], | ||
| 239 | "command" : "sleep 365d", | ||
| 240 | "submissionTime" : 1752824505, | ||
| 241 | "name" : null, | ||
| 242 | "Job_Id" : 37030, | ||
| 243 | "stderr_file" : "OAR.37030.stderr", | ||
| 244 | "queue" : "default", | ||
| 245 | "launchingDirectory" : "/home/diogo464", | ||
| 246 | "reservation" : "None", | ||
| 247 | "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' OR dedicated='tardis' )) AND desktop_computing = 'NO') AND drain='NO'", | ||
| 248 | "message" : "R=64,W=1:0:0,J=B (Karma=0.106,quota_ok)", | ||
| 249 | "stdout_file" : "OAR.37030.stdout", | ||
| 250 | "resubmit_job_id" : 0, | ||
| 251 | "types" : [], | ||
| 252 | "cpuset_name" : "diogo464_37030", | ||
| 253 | "array_index" : 1, | ||
| 254 | "project" : "default", | ||
| 255 | "array_id" : 37030, | ||
| 256 | "owner" : "diogo464", | ||
| 257 | "startTime" : 1752824506, | ||
| 258 | "assigned_resources" : [ | ||
| 259 | 893, | ||
| 260 | 894, | ||
| 261 | 895, | ||
| 262 | 896, | ||
| 263 | 897, | ||
| 264 | 898, | ||
| 265 | 899, | ||
| 266 | 900, | ||
| 267 | 901, | ||
| 268 | 902, | ||
| 269 | 903, | ||
| 270 | 904, | ||
| 271 | 905, | ||
| 272 | 906, | ||
| 273 | 907, | ||
| 274 | 908, | ||
| 275 | 909, | ||
| 276 | 910, | ||
| 277 | 911, | ||
| 278 | 912, | ||
| 279 | 913, | ||
| 280 | 914, | ||
| 281 | 915, | ||
| 282 | 916, | ||
| 283 | 917, | ||
| 284 | 918, | ||
| 285 | 919, | ||
| 286 | 920, | ||
| 287 | 921, | ||
| 288 | 922, | ||
| 289 | 923, | ||
| 290 | 924, | ||
| 291 | 925, | ||
| 292 | 926, | ||
| 293 | 927, | ||
| 294 | 928, | ||
| 295 | 929, | ||
| 296 | 930, | ||
| 297 | 931, | ||
| 298 | 932, | ||
| 299 | 933, | ||
| 300 | 934, | ||
| 301 | 935, | ||
| 302 | 936, | ||
| 303 | 937, | ||
| 304 | 938, | ||
| 305 | 939, | ||
| 306 | 940, | ||
| 307 | 941, | ||
| 308 | 942, | ||
| 309 | 943, | ||
| 310 | 944, | ||
| 311 | 945, | ||
| 312 | 946, | ||
| 313 | 947, | ||
| 314 | 948, | ||
| 315 | 949, | ||
| 316 | 950, | ||
| 317 | 951, | ||
| 318 | 952, | ||
| 319 | 953, | ||
| 320 | 954, | ||
| 321 | 955, | ||
| 322 | 956 | ||
| 323 | ] | ||
| 324 | }, | ||
| 325 | "37029" : { | ||
| 326 | "command" : "sleep 365d", | ||
| 327 | "submissionTime" : 1752824490, | ||
| 328 | "dependencies" : [], | ||
| 329 | "jobType" : "PASSIVE", | ||
| 330 | "state" : "Running", | ||
| 331 | "assigned_network_address" : [ | ||
| 332 | "moltres-01" | ||
| 333 | ], | ||
| 334 | "Job_Id" : 37029, | ||
| 335 | "stderr_file" : "OAR.37029.stderr", | ||
| 336 | "name" : null, | ||
| 337 | "types" : [], | ||
| 338 | "cpuset_name" : "diogo464_37029", | ||
| 339 | "launchingDirectory" : "/home/diogo464", | ||
| 340 | "queue" : "default", | ||
| 341 | "reservation" : "None", | ||
| 342 | "message" : "R=64,W=1:0:0,J=B (Karma=0.106,quota_ok)", | ||
| 343 | "stdout_file" : "OAR.37029.stdout", | ||
| 344 | "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' OR dedicated='tardis' )) AND desktop_computing = 'NO') AND drain='NO'", | ||
| 345 | "resubmit_job_id" : 0, | ||
| 346 | "startTime" : 1752824491, | ||
| 347 | "assigned_resources" : [ | ||
| 348 | 829, | ||
| 349 | 830, | ||
| 350 | 831, | ||
| 351 | 832, | ||
| 352 | 833, | ||
| 353 | 834, | ||
| 354 | 835, | ||
| 355 | 836, | ||
| 356 | 837, | ||
| 357 | 838, | ||
| 358 | 839, | ||
| 359 | 840, | ||
| 360 | 841, | ||
| 361 | 842, | ||
| 362 | 843, | ||
| 363 | 844, | ||
| 364 | 845, | ||
| 365 | 846, | ||
| 366 | 847, | ||
| 367 | 848, | ||
| 368 | 849, | ||
| 369 | 850, | ||
| 370 | 851, | ||
| 371 | 852, | ||
| 372 | 853, | ||
| 373 | 854, | ||
| 374 | 855, | ||
| 375 | 856, | ||
| 376 | 857, | ||
| 377 | 858, | ||
| 378 | 859, | ||
| 379 | 860, | ||
| 380 | 861, | ||
| 381 | 862, | ||
| 382 | 863, | ||
| 383 | 864, | ||
| 384 | 865, | ||
| 385 | 866, | ||
| 386 | 867, | ||
| 387 | 868, | ||
| 388 | 869, | ||
| 389 | 870, | ||
| 390 | 871, | ||
| 391 | 872, | ||
| 392 | 873, | ||
| 393 | 874, | ||
| 394 | 875, | ||
| 395 | 876, | ||
| 396 | 877, | ||
| 397 | 878, | ||
| 398 | 879, | ||
| 399 | 880, | ||
| 400 | 881, | ||
| 401 | 882, | ||
| 402 | 883, | ||
| 403 | 884, | ||
| 404 | 885, | ||
| 405 | 886, | ||
| 406 | 887, | ||
| 407 | 888, | ||
| 408 | 889, | ||
| 409 | 890, | ||
| 410 | 891, | ||
| 411 | 892 | ||
| 412 | ], | ||
| 413 | "array_index" : 1, | ||
| 414 | "project" : "default", | ||
| 415 | "array_id" : 37029, | ||
| 416 | "owner" : "diogo464" | ||
| 417 | } | ||
| 418 | } | ||
| 419 | "#; | ||
| 420 | |||
| 421 | #[test] | ||
| 422 | fn test_extract_job_ids_from_oarstat_output() { | ||
| 423 | let job_ids = extract_job_ids_from_oarstat_output(OAR_STAT_ALL_USER_JOBS_OUTPUT).unwrap(); | ||
| 424 | assert_eq!(job_ids.len(), 2); | ||
| 425 | assert!(job_ids.contains(&37030)); | ||
| 426 | assert!(job_ids.contains(&37029)); | ||
| 427 | } | ||
| 164 | } | 428 | } |
