aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-22 10:20:00 +0100
committerdiogo464 <[email protected]>2025-07-22 10:20:00 +0100
commit8c90229d6e88c88338261496da50c01e51cac3f5 (patch)
treedce2080eb236a8cb44f43c4d12a141204eab3c8a
parentc76875a153eecd7168cbf9f1db2987cc7e90834e (diff)
feat: added oar job id inference
-rw-r--r--README.md6
-rw-r--r--src/context.rs29
-rw-r--r--src/main.rs17
-rw-r--r--src/oar.rs268
4 files changed, 313 insertions, 7 deletions
diff --git a/README.md b/README.md
index 7499b41..25855b1 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,12 @@ before setting up a network you need to create a job on the cluster and setup so
32```bash 32```bash
33export OAR_JOB_ID="<your job id>" 33export OAR_JOB_ID="<your job id>"
34export FRONTEND_HOSTNAME="<cluster's hostname, ex: dicluster>" 34export FRONTEND_HOSTNAME="<cluster's hostname, ex: dicluster>"
35
36# optionally, you can set this variable to infer the job id
37# if you only have one active job in the cluster, this flag will
38# tell oar-p2p to auto detect that job and use it so you dont have to
39# specify the job id.
40# export OAR_P2P_INFER_JOB_ID=true
35``` 41```
36you can now use a tool like [direnv](https://direnv.net) or just `source` the file with those variables. 42you can now use a tool like [direnv](https://direnv.net) or just `source` the file with those variables.
37 43
diff --git a/src/context.rs b/src/context.rs
index ae25966..8032212 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -13,20 +13,43 @@ pub enum ExecutionNode {
13pub struct Context { 13pub struct Context {
14 pub node: ExecutionNode, 14 pub node: ExecutionNode,
15 job_id: Option<u32>, 15 job_id: Option<u32>,
16 infer_job_id: bool,
16 frontend_hostname: Option<String>, 17 frontend_hostname: Option<String>,
17} 18}
18 19
19impl Context { 20impl Context {
20 pub async fn new(job_id: Option<u32>, frontend_hostname: Option<String>) -> Result<Self> { 21 pub async fn new(
22 job_id: Option<u32>,
23 infer_job_id: bool,
24 frontend_hostname: Option<String>,
25 ) -> Result<Self> {
21 Ok(Self { 26 Ok(Self {
22 node: get_execution_node().await?, 27 node: get_execution_node().await?,
23 job_id, 28 job_id,
29 infer_job_id,
24 frontend_hostname, 30 frontend_hostname,
25 }) 31 })
26 } 32 }
27 33
28 pub fn job_id(&self) -> Result<u32> { 34 pub async fn job_id(&self) -> Result<u32> {
29 self.job_id.ok_or_else(|| eyre::eyre!("missing job id")) 35 tracing::debug!("obtaining job id");
36 if let Some(job_id) = self.job_id {
37 tracing::debug!("job id was set, using {job_id}");
38 Ok(job_id)
39 } else if self.infer_job_id {
40 tracing::debug!("job id was not set but inference is enabled, finding job id");
41 let job_ids = crate::oar::list_user_job_ids(self).await?;
42 match job_ids.len() {
43 0 => Err(eyre::eyre!("cannot infer job id, no jobs are running")),
44 1 => Ok(job_ids[0]),
45 _ => Err(eyre::eyre!(
46 "cannot infer job id, multiple jobs are running"
47 )),
48 }
49 } else {
50 tracing::debug!("inference was disabled and job id is not set");
51 Err(eyre::eyre!("missing job id"))
52 }
30 } 53 }
31 54
32 pub fn frontend_hostname(&self) -> Result<&str> { 55 pub fn frontend_hostname(&self) -> Result<&str> {
diff --git a/src/main.rs b/src/main.rs
index 9b9c815..cc8afd0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -46,6 +46,13 @@ struct Common {
46 #[clap(long, env = "OAR_JOB_ID")] 46 #[clap(long, env = "OAR_JOB_ID")]
47 job_id: Option<u32>, 47 job_id: Option<u32>,
48 48
49 /// infer the oar job id
50 ///
51 /// if you have a single job running on the cluster, that job will be used, otherwise,
52 /// inference fails. the job_id flag takes precedence over inference.
53 #[clap(long, env = "OAR_P2P_INFER_JOB_ID")]
54 infer_job_id: bool,
55
49 /// hostname used to access the frontend using ssh. 56 /// hostname used to access the frontend using ssh.
50 /// i.e. `ssh <frontend-hostname>` should work. 57 /// i.e. `ssh <frontend-hostname>` should work.
51 #[clap(long, env = "FRONTEND_HOSTNAME")] 58 #[clap(long, env = "FRONTEND_HOSTNAME")]
@@ -212,7 +219,12 @@ async fn main() -> Result<()> {
212} 219}
213 220
214async fn context_from_common(common: &Common) -> Result<Context> { 221async fn context_from_common(common: &Common) -> Result<Context> {
215 Context::new(common.job_id, common.frontend_hostname.clone()).await 222 Context::new(
223 common.job_id,
224 common.infer_job_id,
225 common.frontend_hostname.clone(),
226 )
227 .await
216} 228}
217 229
218async fn cmd_net_up(args: NetUpArgs) -> Result<()> { 230async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
@@ -658,7 +670,8 @@ async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<
658async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { 670async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> {
659 tracing::info!("listing machine addresses"); 671 tracing::info!("listing machine addresses");
660 let interface = machine.interface(); 672 let interface = machine.interface();
661 let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+' || true"); 673 let script =
674 format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+' || true");
662 let output = machine_run_script(ctx, machine, &script).await?; 675 let output = machine_run_script(ctx, machine, &script).await?;
663 let stdout = std::str::from_utf8(&output.stdout)?; 676 let stdout = std::str::from_utf8(&output.stdout)?;
664 let mut addresses = Vec::default(); 677 let mut addresses = Vec::default();
diff --git a/src/oar.rs b/src/oar.rs
index 2047d56..5a702b4 100644
--- a/src/oar.rs
+++ b/src/oar.rs
@@ -12,7 +12,7 @@ use crate::{
12pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { 12pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
13 match ctx.node { 13 match ctx.node {
14 ExecutionNode::Frontend => { 14 ExecutionNode::Frontend => {
15 let job_id = ctx.job_id()?; 15 let job_id = ctx.job_id().await?;
16 let output = Command::new("oarstat") 16 let output = Command::new("oarstat")
17 .arg("-j") 17 .arg("-j")
18 .arg(job_id.to_string()) 18 .arg(job_id.to_string())
@@ -36,7 +36,7 @@ pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
36 extract_machines_from_oar_stat_json(stdout, job_id) 36 extract_machines_from_oar_stat_json(stdout, job_id)
37 } 37 }
38 ExecutionNode::Unknown => { 38 ExecutionNode::Unknown => {
39 let job_id = ctx.job_id()?; 39 let job_id = ctx.job_id().await?;
40 let frontend_hostname = ctx.frontend_hostname()?; 40 let frontend_hostname = ctx.frontend_hostname()?;
41 41
42 let output = Command::new("ssh") 42 let output = Command::new("ssh")
@@ -76,6 +76,51 @@ pub async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
76 } 76 }
77} 77}
78 78
79pub async fn list_user_job_ids(ctx: &Context) -> Result<Vec<u32>> {
80 let output = match ctx.node {
81 ExecutionNode::Frontend => Command::new("oarstat").arg("-u").arg("-J").output().await?,
82 ExecutionNode::Unknown => {
83 Command::new("ssh")
84 .arg(ctx.frontend_hostname()?)
85 .arg("oarstat")
86 .arg("-u")
87 .arg("-J")
88 .output()
89 .await?
90 }
91 ExecutionNode::Machine(_) => {
92 return Err(eyre::eyre!(
93 "cannot run oarstat from inside a cluster machine"
94 ));
95 }
96 };
97
98 if !output.status.success() {
99 tracing::error!(
100 "stdout: {}",
101 std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8")
102 );
103 tracing::error!(
104 "stderr: {}",
105 std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8")
106 );
107 return Err(eyre::eyre!("failed to run oarstat"));
108 }
109
110 let stdout = String::from_utf8(output.stdout)?;
111 // for some reason, running oarstat with the -J flag (for json output) when you have no jobs
112 // running results in this error message instead of an empty object, so we will just assume it
113 // meant an empty object
114 let json_string = if stdout
115 == "hash- or arrayref expected (not a simple scalar, use allow_nonref to allow this) at /usr/lib/oar/oarstat line 285."
116 {
117 String::from("{}")
118 } else {
119 stdout
120 };
121 extract_job_ids_from_oarstat_output(&json_string)
122}
123
79fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { 124fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> {
80 #[derive(Debug, Deserialize)] 125 #[derive(Debug, Deserialize)]
81 struct JobSchema { 126 struct JobSchema {
@@ -96,6 +141,26 @@ fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<
96 Ok(machines) 141 Ok(machines)
97} 142}
98 143
144fn extract_job_ids_from_oarstat_output(output: &str) -> Result<Vec<u32>> {
145 let value = serde_json::from_str::<serde_json::Value>(output)?;
146 let object = match value {
147 serde_json::Value::Object(map) => map,
148 _ => {
149 return Err(eyre::eyre!(
150 "expected oar stat output to produce a json object"
151 ));
152 }
153 };
154
155 let mut job_ids = Vec::default();
156 for key in object.keys() {
157 tracing::trace!("parsing key '{key}'");
158 let job_id = key.parse()?;
159 job_ids.push(job_id);
160 }
161 Ok(job_ids)
162}
163
99#[cfg(test)] 164#[cfg(test)]
100mod test { 165mod test {
101 use super::*; 166 use super::*;
@@ -161,4 +226,203 @@ mod test {
161 assert_eq!(machines[0], Machine::Gengar1); 226 assert_eq!(machines[0], Machine::Gengar1);
162 assert_eq!(machines[1], Machine::Gengar2); 227 assert_eq!(machines[1], Machine::Gengar2);
163 } 228 }
229
230 const OAR_STAT_ALL_USER_JOBS_OUTPUT: &'static str = r#"
231{
232 "37030" : {
233 "dependencies" : [],
234 "jobType" : "PASSIVE",
235 "state" : "Running",
236 "assigned_network_address" : [
237 "moltres-02"
238 ],
239 "command" : "sleep 365d",
240 "submissionTime" : 1752824505,
241 "name" : null,
242 "Job_Id" : 37030,
243 "stderr_file" : "OAR.37030.stderr",
244 "queue" : "default",
245 "launchingDirectory" : "/home/diogo464",
246 "reservation" : "None",
247 "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' OR dedicated='tardis' )) AND desktop_computing = 'NO') AND drain='NO'",
248 "message" : "R=64,W=1:0:0,J=B (Karma=0.106,quota_ok)",
249 "stdout_file" : "OAR.37030.stdout",
250 "resubmit_job_id" : 0,
251 "types" : [],
252 "cpuset_name" : "diogo464_37030",
253 "array_index" : 1,
254 "project" : "default",
255 "array_id" : 37030,
256 "owner" : "diogo464",
257 "startTime" : 1752824506,
258 "assigned_resources" : [
259 893,
260 894,
261 895,
262 896,
263 897,
264 898,
265 899,
266 900,
267 901,
268 902,
269 903,
270 904,
271 905,
272 906,
273 907,
274 908,
275 909,
276 910,
277 911,
278 912,
279 913,
280 914,
281 915,
282 916,
283 917,
284 918,
285 919,
286 920,
287 921,
288 922,
289 923,
290 924,
291 925,
292 926,
293 927,
294 928,
295 929,
296 930,
297 931,
298 932,
299 933,
300 934,
301 935,
302 936,
303 937,
304 938,
305 939,
306 940,
307 941,
308 942,
309 943,
310 944,
311 945,
312 946,
313 947,
314 948,
315 949,
316 950,
317 951,
318 952,
319 953,
320 954,
321 955,
322 956
323 ]
324 },
325 "37029" : {
326 "command" : "sleep 365d",
327 "submissionTime" : 1752824490,
328 "dependencies" : [],
329 "jobType" : "PASSIVE",
330 "state" : "Running",
331 "assigned_network_address" : [
332 "moltres-01"
333 ],
334 "Job_Id" : 37029,
335 "stderr_file" : "OAR.37029.stderr",
336 "name" : null,
337 "types" : [],
338 "cpuset_name" : "diogo464_37029",
339 "launchingDirectory" : "/home/diogo464",
340 "queue" : "default",
341 "reservation" : "None",
342 "message" : "R=64,W=1:0:0,J=B (Karma=0.106,quota_ok)",
343 "stdout_file" : "OAR.37029.stdout",
344 "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' OR dedicated='tardis' )) AND desktop_computing = 'NO') AND drain='NO'",
345 "resubmit_job_id" : 0,
346 "startTime" : 1752824491,
347 "assigned_resources" : [
348 829,
349 830,
350 831,
351 832,
352 833,
353 834,
354 835,
355 836,
356 837,
357 838,
358 839,
359 840,
360 841,
361 842,
362 843,
363 844,
364 845,
365 846,
366 847,
367 848,
368 849,
369 850,
370 851,
371 852,
372 853,
373 854,
374 855,
375 856,
376 857,
377 858,
378 859,
379 860,
380 861,
381 862,
382 863,
383 864,
384 865,
385 866,
386 867,
387 868,
388 869,
389 870,
390 871,
391 872,
392 873,
393 874,
394 875,
395 876,
396 877,
397 878,
398 879,
399 880,
400 881,
401 882,
402 883,
403 884,
404 885,
405 886,
406 887,
407 888,
408 889,
409 890,
410 891,
411 892
412 ],
413 "array_index" : 1,
414 "project" : "default",
415 "array_id" : 37029,
416 "owner" : "diogo464"
417 }
418}
419"#;
420
421 #[test]
422 fn test_extract_job_ids_from_oarstat_output() {
423 let job_ids = extract_job_ids_from_oarstat_output(OAR_STAT_ALL_USER_JOBS_OUTPUT).unwrap();
424 assert_eq!(job_ids.len(), 2);
425 assert!(job_ids.contains(&37030));
426 assert!(job_ids.contains(&37029));
427 }
164} 428}