about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: diogo464 <[email protected]> 2025-07-09 14:24:25 +0100
committer: diogo464 <[email protected]> 2025-07-09 14:24:25 +0100
commit: a5178fbb0bde3ff9f863ef0cca48748cb993390a (patch)
tree: 9448cd051c4909f8fb4e1afaff1b9dda3278a5b2 /src
parent: 8018cd7a378baee5c5e1fab85bba2592c9244c72 (diff)
rust init snapshot
Diffstat (limited to 'src')
-rw-r--r-- src/latency_matrix.rs 99
-rw-r--r-- src/machine.rs 101
-rw-r--r-- src/main.rs 252
3 files changed, 452 insertions, 0 deletions
diff --git a/src/latency_matrix.rs b/src/latency_matrix.rs
new file mode 100644
index 0000000..b40df9e
--- /dev/null
+++ b/src/latency_matrix.rs
@@ -0,0 +1,99 @@
1use std::str::FromStr;
2use std::time::Duration;
3use thiserror::Error;
4
5#[derive(Debug, Error)]
6pub enum InvalidLatencyMatrix {
7 #[error(
8 "invalid line dimension: line {line} had dimension {dimension} but expected {expected}"
9 )]
10 InvalidLineDimension {
11 line: usize,
12 dimension: usize,
13 expected: usize,
14 },
15 #[error("invalid latency value '{value}': {error}")]
16 InvalidLatencyValue { value: String, error: String },
17}
18
19pub enum TimeUnit {
20 Seconds,
21 Milliseconds,
22}
23
24#[derive(Debug, Clone)]
25pub struct LatencyMatrix {
26 dimension: usize,
27 latencies: Vec<Duration>,
28}
29
30impl LatencyMatrix {
31 fn new(dimension: usize, latencies: Vec<Duration>) -> Self {
32 assert_eq!(dimension * dimension, latencies.len());
33 Self {
34 dimension,
35 latencies,
36 }
37 }
38
39 pub fn latency(&self, row: usize, col: usize) -> Duration {
40 self.latencies[self.dimension * row + col]
41 }
42
43 pub fn dimension(&self) -> usize {
44 self.dimension
45 }
46
47 pub fn parse(content: &str, unit: TimeUnit) -> Result<Self, InvalidLatencyMatrix> {
48 let mut dimension = None;
49 let mut latencies = Vec::default();
50 for (line_idx, line) in content.lines().enumerate() {
51 let line = line.trim();
52 if line.is_empty() {
53 continue;
54 }
55
56 let mut current_dimension = 0;
57 for component in line.split_whitespace() {
58 current_dimension += 1;
59 let component_value = match component.parse::<f64>() {
60 Ok(value) => value,
61 Err(err) => {
62 return Err(InvalidLatencyMatrix::InvalidLatencyValue {
63 value: component.to_string(),
64 error: err.to_string(),
65 });
66 }
67 };
68
69 latencies.push(Duration::from_secs_f64(match unit {
70 TimeUnit::Seconds => component_value,
71 TimeUnit::Milliseconds => component_value / 1000.0,
72 }));
73 }
74
75 match dimension {
76 Some(dimension) => {
77 if current_dimension != dimension {
78 return Err(InvalidLatencyMatrix::InvalidLineDimension {
79 line: line_idx,
80 dimension: current_dimension,
81 expected: dimension,
82 });
83 }
84 }
85 None => dimension = Some(current_dimension),
86 }
87 }
88
89 Ok(Self::new(dimension.unwrap_or(0), latencies))
90 }
91}
92
93impl FromStr for LatencyMatrix {
94 type Err = InvalidLatencyMatrix;
95
96 fn from_str(s: &str) -> Result<Self, Self::Err> {
97 Self::parse(s, TimeUnit::Milliseconds)
98 }
99}
diff --git a/src/machine.rs b/src/machine.rs
new file mode 100644
index 0000000..f1ad94d
--- /dev/null
+++ b/src/machine.rs
@@ -0,0 +1,101 @@
/// Generates the `Machine` enum and its lookup methods from a list of
/// `(Variant, index, "hostname", interface)` tuples.
///
/// * `index` is the expression returned by `Machine::index`.
/// * `hostname` must be a literal because it is also used as a `match`
///   pattern in `Machine::from_hostname`.
/// * `interface` is the expression returned by `Machine::interface`; it is
///   evaluated when that method is called.
///
/// A trailing comma after the last tuple is accepted.
macro_rules! define_machines {
    ($(($name:ident, $idx:expr, $hostname:literal, $interface:expr)),* $(,)?) => {
        /// One variant per machine passed to `define_machines!`.
        #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
        pub enum Machine {
            $($name,)*
        }

        /// Displays a machine as its hostname.
        impl std::fmt::Display for Machine {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str(self.hostname())
            }
        }

        impl Machine {
            /// Hostname declared for this machine.
            pub fn hostname(&self) -> &'static str {
                match self {
                    $(Self::$name => $hostname,)*
                }
            }

            /// Index declared for this machine.
            pub fn index(&self) -> usize {
                match self {
                    $(Self::$name => $idx,)*
                }
            }

            /// Looks a machine up by its exact hostname; `None` if no machine
            /// was declared with that hostname.
            pub fn from_hostname(hostname: &str) -> Option<Self> {
                match hostname {
                    $($hostname => Some(Self::$name),)*
                    _ => None
                }
            }

            /// Interface expression declared for this machine.
            pub fn interface(&self) -> &'static str {
                match self {
                    $(Self::$name => $interface,)*
                }
            }
        }
    };
}
42
43define_machines!(
44 (Alakazam01, 0, "alakazam-01", todo!()),
45 (Alakazam02, 1, "alakazam-02", todo!()),
46 (Alakazam03, 2, "alakazam-03", todo!()),
47 (Alakazam04, 3, "alakazam-04", todo!()),
48 (Alakazam05, 4, "alakazam-05", todo!()),
49 (Alakazam06, 5, "alakazam-06", todo!()),
50 (Alakazam07, 6, "alakazam-07", todo!()),
51 (Alakazam08, 7, "alakazam-08", todo!()),
52 (Bulbasaur1, 8, "bulbasaur-1", todo!()),
53 (Bulbasaur2, 9, "bulbasaur-2", todo!()),
54 (Bulbasaur3, 10, "bulbasaur-3", todo!()),
55 (Charmander1, 11, "charmander-1", "bond0"),
56 (Charmander2, 12, "charmander-2", "bond0"),
57 (Charmander3, 13, "charmander-3", "bond0"),
58 (Charmander4, 14, "charmander-4", "bond0"),
59 (Charmander5, 15, "charmander-5", "bond0"),
60 (Gengar1, 16, "gengar-1", "bond0"),
61 (Gengar2, 17, "gengar-2", "bond0"),
62 (Gengar3, 18, "gengar-3", "bond0"),
63 (Gengar4, 19, "gengar-4", "bond0"),
64 (Gengar5, 20, "gengar-5", "bond0"),
65 (Kadabra01, 21, "kadabra-01", todo!()),
66 (Kadabra02, 22, "kadabra-02", todo!()),
67 (Kadabra03, 23, "kadabra-03", todo!()),
68 (Kadabra04, 24, "kadabra-04", todo!()),
69 (Kadabra05, 25, "kadabra-05", todo!()),
70 (Kadabra06, 26, "kadabra-06", todo!()),
71 (Kadabra07, 27, "kadabra-07", todo!()),
72 (Kadabra08, 28, "kadabra-08", todo!()),
73 (Lugia1, 29, "lugia-1", "bond0"),
74 (Lugia2, 30, "lugia-2", "bond0"),
75 (Lugia3, 31, "lugia-3", "bond0"),
76 (Lugia4, 32, "lugia-4", "bond0"),
77 (Lugia5, 33, "lugia-5", "bond0"),
78 (Magikarp1, 34, "magikarp-1", todo!()),
79 (Moltres01, 35, "moltres-01", todo!()),
80 (Moltres02, 36, "moltres-02", todo!()),
81 (Moltres03, 37, "moltres-03", todo!()),
82 (Moltres04, 38, "moltres-04", todo!()),
83 (Moltres05, 39, "moltres-05", todo!()),
84 (Moltres06, 40, "moltres-06", todo!()),
85 (Moltres07, 41, "moltres-07", todo!()),
86 (Moltres08, 42, "moltres-08", todo!()),
87 (Moltres09, 43, "moltres-09", todo!()),
88 (Moltres10, 44, "moltres-10", todo!()),
89 (Oddish1, 45, "oddish-1", todo!()),
90 (Psyduck1, 46, "psyduck-1", todo!()),
91 (Psyduck2, 47, "psyduck-2", todo!()),
92 (Psyduck3, 48, "psyduck-3", todo!()),
93 (Shelder1, 49, "shelder-1", todo!()),
94 (Squirtle1, 50, "squirtle-1", todo!()),
95 (Squirtle2, 51, "squirtle-2", todo!()),
96 (Squirtle3, 52, "squirtle-3", todo!()),
97 (Squirtle4, 53, "squirtle-4", todo!()),
98 (Staryu1, 54, "staryu-1", todo!()),
99 (Sudowoodo1, 55, "sudowoodo-1", todo!()),
100 (Vulpix1, 56, "vulpix-1", todo!())
101);
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..dfbddc0
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,252 @@
1use std::collections::{HashMap, HashSet};
2
3use clap::Parser;
4use eyre::Context as _;
5use machine::Machine;
6use serde::Deserialize;
7use tokio::process::Command;
8
9pub mod latency_matrix;
10pub mod machine;
11
12#[derive(Debug, Parser)]
13pub struct Args {
14 #[clap(long, env = "OAR_JOB_ID")]
15 pub job_id: Option<u32>,
16 #[clap(long, env = "FRONTEND_HOSTNAME")]
17 pub frontend_hostname: Option<String>,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub enum ExecutionNode {
22 Frontend,
23 Machine(Machine),
24 Unknown,
25}
26
27pub struct Context {
28 pub node: ExecutionNode,
29 pub job_id: Option<u32>,
30 pub frontend_hostname: Option<String>,
31}
32
33pub struct MachineConfig {
34 machine
35}
36
37#[tokio::main]
38async fn main() -> eyre::Result<()> {
39 tracing_subscriber::fmt::init();
40 color_eyre::install()?;
41
42 let args = Args::parse();
43 let node = get_execution_node()?;
44 let context = Context {
45 node,
46 job_id: args.job_id,
47 frontend_hostname: args.frontend_hostname,
48 };
49 let machines = list_job_machines(&context).await?;
50
51 // listing oar job machines
52 // if we are in the frontend we use oarstat
53 // if we are in a job machine we read the cpuset file
54 // if we are outside the cluster we use ssh cluster oarstat
55
56 // machine generate configurations
57 // this does not require any connections
58
59 // machine cleanup interface
60 // requires running commands inside a container inside a machine
61 // we could generate a cleanup bash script and execute that in oneshot
62
63 // machine apply configuration
64 // this also requires running some scripts inside a container inside the machine
65
66 Ok(())
67}
68
69async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> {
70 match ctx.node {
71 ExecutionNode::Frontend => {
72 let job_id = match ctx.job_id {
73 Some(job_id) => job_id,
74 None => return Err(eyre::eyre!("job id is required when running from cluster")),
75 };
76
77 let output = Command::new("oarstat")
78 .arg("-j")
79 .arg(job_id.to_string())
80 .arg("-J")
81 .output()
82 .await?;
83
84 if !output.status.success() {
85 tracing::error!(
86 "stdout: {}",
87 std::str::from_utf8(&output.stdout).unwrap_or("stderr contains invalid uft-8")
88 );
89 tracing::error!(
90 "stderr: {}",
91 std::str::from_utf8(&output.stderr).unwrap_or("stderr contains invalid uft-8")
92 );
93 return Err(eyre::eyre!("failed to run oarstat"));
94 }
95
96 let stdout = std::str::from_utf8(&output.stdout)?;
97 extract_machines_from_oar_stat_json(&stdout, job_id)
98 }
99 ExecutionNode::Unknown => {
100 let frontend_hostname = match ctx.frontend_hostname.as_ref() {
101 Some(hostname) => hostname,
102 None => {
103 return Err(eyre::eyre!(
104 "frontend hostname is requiredwhen running from outside the cluster"
105 ));
106 }
107 };
108
109 let job_id = match ctx.job_id {
110 Some(job_id) => job_id,
111 None => return Err(eyre::eyre!("job id is required when running from cluster")),
112 };
113
114 let output = Command::new("ssh")
115 .arg(frontend_hostname)
116 .arg("oarstat")
117 .arg("-j")
118 .arg(job_id.to_string())
119 .arg("-J")
120 .output()
121 .await?;
122
123 if !output.status.success() {
124 return Err(eyre::eyre!("failed to run oarstat"));
125 }
126
127 let stdout = std::str::from_utf8(&output.stdout)?;
128 extract_machines_from_oar_stat_json(&stdout, job_id)
129 }
130 ExecutionNode::Machine(_) => {
131 let nodefile = std::env::var("OAR_NODEFILE").context("reading OAR_NODEFILE env var")?;
132 let content = tokio::fs::read_to_string(&nodefile).await?;
133 let unique_lines = content
134 .lines()
135 .map(|l| l.trim())
136 .filter(|l| !l.is_empty())
137 .collect::<HashSet<_>>();
138 let mut machines = Vec::default();
139 for hostname in unique_lines {
140 let machine = match Machine::from_hostname(hostname) {
141 Some(machine) => machine,
142 None => return Err(eyre::eyre!("unknown machine: {hostname}")),
143 };
144 machines.push(machine);
145 }
146 Ok(machines)
147 }
148 }
149}
150
151fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result<Vec<Machine>> {
152 #[derive(Debug, Deserialize)]
153 struct JobSchema {
154 assigned_network_address: Vec<String>,
155 }
156 let map = serde_json::from_str::<HashMap<String, JobSchema>>(output)?;
157 let key = job_id.to_string();
158 let data = map
159 .get(&key)
160 .ok_or_else(|| eyre::eyre!("missing job key"))?;
161 let mut machines = Vec::default();
162 for hostname in data.assigned_network_address.iter() {
163 match Machine::from_hostname(hostname) {
164 Some(machine) => machines.push(machine),
165 None => return Err(eyre::eyre!("unknown machine: '{hostname}'")),
166 }
167 }
168 Ok(machines)
169}
170
171fn get_execution_node() -> eyre::Result<ExecutionNode> {
172 let hostname = get_hostname()?;
173 let node = match hostname.as_str() {
174 "frontend" => ExecutionNode::Frontend,
175 _ => match Machine::from_hostname(&hostname) {
176 Some(machine) => ExecutionNode::Machine(machine),
177 _ => ExecutionNode::Unknown,
178 },
179 };
180 Ok(node)
181}
182
183fn get_hostname() -> eyre::Result<String> {
184 std::env::var("HOSTNAME").context("reading HOSTNAME env var")
185}
186
#[cfg(test)]
mod test {
    use super::*;

    /// Job id used as the key of the sample output below.
    const OAR_STAT_JSON_JOB_ID: u32 = 36627;
    /// Sample `oarstat -j <id> -J` output for a job with two machines.
    const OAR_STAT_JSON_OUTPUT: &str = r#"
{
   "36627" : {
      "types" : [],
      "reservation" : "None",
      "dependencies" : [],
      "Job_Id" : 36627,
      "assigned_network_address" : [
         "gengar-1",
         "gengar-2"
      ],
      "owner" : "diogo464",
      "properties" : "(( ( dedicated='NO' OR dedicated='protocol-labs' )) AND desktop_computing = 'NO') AND drain='NO'",
      "startTime" : 1751979909,
      "cpuset_name" : "diogo464_36627",
      "stderr_file" : "OAR.36627.stderr",
      "queue" : "default",
      "state" : "Running",
      "stdout_file" : "OAR.36627.stdout",
      "array_index" : 1,
      "array_id" : 36627,
      "assigned_resources" : [
         419,
         420,
         421,
         422,
         423,
         424,
         425,
         426,
         427,
         428,
         429,
         430,
         431,
         432,
         433,
         434
      ],
      "name" : null,
      "resubmit_job_id" : 0,
      "message" : "R=16,W=12:0:0,J=B (Karma=0.087,quota_ok)",
      "launchingDirectory" : "/home/diogo464",
      "jobType" : "PASSIVE",
      "submissionTime" : 1751979897,
      "project" : "default",
      "command" : "sleep 365d"
   }
}
"#;

    #[test]
    fn test_extract_machines_from_oar_stat_json() {
        let machines =
            extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID)
                .unwrap();
        assert_eq!(machines.len(), 2);
        assert_eq!(machines[0], Machine::Gengar1);
        assert_eq!(machines[1], Machine::Gengar2);
    }

    /// A job id that is not a key of the JSON object must be reported as an
    /// error rather than as an empty list.
    #[test]
    fn test_extract_machines_missing_job_key() {
        let result =
            extract_machines_from_oar_stat_json(OAR_STAT_JSON_OUTPUT, OAR_STAT_JSON_JOB_ID + 1);
        assert!(result.is_err());
    }

    /// A hostname that does not map to a `Machine` must be reported as an
    /// error.
    #[test]
    fn test_extract_machines_unknown_hostname() {
        let output = r#"{ "1" : { "assigned_network_address" : [ "not-a-machine" ] } }"#;
        assert!(extract_machines_from_oar_stat_json(output, 1).is_err());
    }
}