aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-10 20:05:11 +0100
committerdiogo464 <[email protected]>2025-07-10 20:05:11 +0100
commit4fc26211fda53023f8ce703ccf4b1a2bbfbbe10a (patch)
tree711fcea1be7a22cf3ef7d3ebf98ea01d7cade62d /src
parenta5178fbb0bde3ff9f863ef0cca48748cb993390a (diff)
it works, now needs cleanup
Diffstat (limited to 'src')
-rw-r--r--src/machine.rs156
-rw-r--r--src/main.rs832
2 files changed, 890 insertions, 98 deletions
diff --git a/src/machine.rs b/src/machine.rs
index f1ad94d..f223e66 100644
--- a/src/machine.rs
+++ b/src/machine.rs
@@ -1,6 +1,17 @@
1macro_rules! define_machines { 1macro_rules! define_machines {
2 ($(($name:ident, $idx:expr, $hostname:expr, $interface:expr)),*) => { 2 ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => {
3 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 3 #[derive(Debug)]
4 pub struct UnknownMachine;
5
6 impl std::fmt::Display for UnknownMachine {
7 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8 f.write_str("unknown machine")
9 }
10 }
11
12 impl std::error::Error for UnknownMachine {}
13
14 #[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
4 pub enum Machine { 15 pub enum Machine {
5 $($name,)* 16 $($name,)*
6 } 17 }
@@ -11,6 +22,17 @@ macro_rules! define_machines {
11 } 22 }
12 } 23 }
13 24
25 impl std::str::FromStr for Machine {
26 type Err = UnknownMachine;
27
28 fn from_str(v: &str) -> Result<Self, Self::Err> {
29 match v {
30 $($hostname => Ok(Self::$name),)*
31 _ => Err(UnknownMachine),
32 }
33 }
34 }
35
14 impl Machine { 36 impl Machine {
15 pub fn hostname(&self) -> &'static str { 37 pub fn hostname(&self) -> &'static str {
16 match self { 38 match self {
@@ -31,6 +53,19 @@ macro_rules! define_machines {
31 } 53 }
32 } 54 }
33 55
56 pub fn from_index(index: usize) -> Option<Self> {
57 match index {
58 $($idx => Some(Self::$name),)*
59 _ => None,
60 }
61 }
62
63 pub fn cpus(&self) -> u32 {
64 match self {
65 $(Self::$name => $cpus,)*
66 }
67 }
68
34 pub fn interface(&self) -> &'static str { 69 pub fn interface(&self) -> &'static str {
35 match self { 70 match self {
36 $(Self::$name => $interface,)* 71 $(Self::$name => $interface,)*
@@ -40,62 +75,65 @@ macro_rules! define_machines {
40 }; 75 };
41} 76}
42 77
78// node cpu counts
79// oarnodes | grep '^network_address' | cut -d' ' -f3 | sort | uniq -c
80
43define_machines!( 81define_machines!(
44 (Alakazam01, 0, "alakazam-01", todo!()), 82 (Alakazam01, 0, "alakazam-01", 64, todo!()),
45 (Alakazam02, 1, "alakazam-02", todo!()), 83 (Alakazam02, 1, "alakazam-02", 64, todo!()),
46 (Alakazam03, 2, "alakazam-03", todo!()), 84 (Alakazam03, 2, "alakazam-03", 64, todo!()),
47 (Alakazam04, 3, "alakazam-04", todo!()), 85 (Alakazam04, 3, "alakazam-04", 64, todo!()),
48 (Alakazam05, 4, "alakazam-05", todo!()), 86 (Alakazam05, 4, "alakazam-05", 64, todo!()),
49 (Alakazam06, 5, "alakazam-06", todo!()), 87 (Alakazam06, 5, "alakazam-06", 64, todo!()),
50 (Alakazam07, 6, "alakazam-07", todo!()), 88 (Alakazam07, 6, "alakazam-07", 64, todo!()),
51 (Alakazam08, 7, "alakazam-08", todo!()), 89 (Alakazam08, 7, "alakazam-08", 64, todo!()),
52 (Bulbasaur1, 8, "bulbasaur-1", todo!()), 90 (Bulbasaur1, 8, "bulbasaur-1", 16, todo!()),
53 (Bulbasaur2, 9, "bulbasaur-2", todo!()), 91 (Bulbasaur2, 9, "bulbasaur-2", 16, todo!()),
54 (Bulbasaur3, 10, "bulbasaur-3", todo!()), 92 (Bulbasaur3, 10, "bulbasaur-3", 16, todo!()),
55 (Charmander1, 11, "charmander-1", "bond0"), 93 (Charmander1, 11, "charmander-1", 32, "bond0"),
56 (Charmander2, 12, "charmander-2", "bond0"), 94 (Charmander2, 12, "charmander-2", 32, "bond0"),
57 (Charmander3, 13, "charmander-3", "bond0"), 95 (Charmander3, 13, "charmander-3", 32, "bond0"),
58 (Charmander4, 14, "charmander-4", "bond0"), 96 (Charmander4, 14, "charmander-4", 32, "bond0"),
59 (Charmander5, 15, "charmander-5", "bond0"), 97 (Charmander5, 15, "charmander-5", 32, "bond0"),
60 (Gengar1, 16, "gengar-1", "bond0"), 98 (Gengar1, 16, "gengar-1", 8, "bond0"),
61 (Gengar2, 17, "gengar-2", "bond0"), 99 (Gengar2, 17, "gengar-2", 8, "bond0"),
62 (Gengar3, 18, "gengar-3", "bond0"), 100 (Gengar3, 18, "gengar-3", 8, "bond0"),
63 (Gengar4, 19, "gengar-4", "bond0"), 101 (Gengar4, 19, "gengar-4", 8, "bond0"),
64 (Gengar5, 20, "gengar-5", "bond0"), 102 (Gengar5, 20, "gengar-5", 8, "bond0"),
65 (Kadabra01, 21, "kadabra-01", todo!()), 103 (Kadabra01, 21, "kadabra-01", 64, todo!()),
66 (Kadabra02, 22, "kadabra-02", todo!()), 104 (Kadabra02, 22, "kadabra-02", 64, todo!()),
67 (Kadabra03, 23, "kadabra-03", todo!()), 105 (Kadabra03, 23, "kadabra-03", 64, todo!()),
68 (Kadabra04, 24, "kadabra-04", todo!()), 106 (Kadabra04, 24, "kadabra-04", 64, todo!()),
69 (Kadabra05, 25, "kadabra-05", todo!()), 107 (Kadabra05, 25, "kadabra-05", 64, todo!()),
70 (Kadabra06, 26, "kadabra-06", todo!()), 108 (Kadabra06, 26, "kadabra-06", 64, todo!()),
71 (Kadabra07, 27, "kadabra-07", todo!()), 109 (Kadabra07, 27, "kadabra-07", 64, todo!()),
72 (Kadabra08, 28, "kadabra-08", todo!()), 110 (Kadabra08, 28, "kadabra-08", 64, todo!()),
73 (Lugia1, 29, "lugia-1", "bond0"), 111 (Lugia1, 29, "lugia-1", 64, "bond0"),
74 (Lugia2, 30, "lugia-2", "bond0"), 112 (Lugia2, 30, "lugia-2", 64, "bond0"),
75 (Lugia3, 31, "lugia-3", "bond0"), 113 (Lugia3, 31, "lugia-3", 64, "bond0"),
76 (Lugia4, 32, "lugia-4", "bond0"), 114 (Lugia4, 32, "lugia-4", 64, "bond0"),
77 (Lugia5, 33, "lugia-5", "bond0"), 115 (Lugia5, 33, "lugia-5", 64, "bond0"),
78 (Magikarp1, 34, "magikarp-1", todo!()), 116 (Magikarp1, 34, "magikarp-1", 16, todo!()),
79 (Moltres01, 35, "moltres-01", todo!()), 117 (Moltres01, 35, "moltres-01", 64, todo!()),
80 (Moltres02, 36, "moltres-02", todo!()), 118 (Moltres02, 36, "moltres-02", 64, todo!()),
81 (Moltres03, 37, "moltres-03", todo!()), 119 (Moltres03, 37, "moltres-03", 64, todo!()),
82 (Moltres04, 38, "moltres-04", todo!()), 120 (Moltres04, 38, "moltres-04", 64, todo!()),
83 (Moltres05, 39, "moltres-05", todo!()), 121 (Moltres05, 39, "moltres-05", 64, todo!()),
84 (Moltres06, 40, "moltres-06", todo!()), 122 (Moltres06, 40, "moltres-06", 64, todo!()),
85 (Moltres07, 41, "moltres-07", todo!()), 123 (Moltres07, 41, "moltres-07", 64, todo!()),
86 (Moltres08, 42, "moltres-08", todo!()), 124 (Moltres08, 42, "moltres-08", 64, todo!()),
87 (Moltres09, 43, "moltres-09", todo!()), 125 (Moltres09, 43, "moltres-09", 64, todo!()),
88 (Moltres10, 44, "moltres-10", todo!()), 126 (Moltres10, 44, "moltres-10", 64, todo!()),
89 (Oddish1, 45, "oddish-1", todo!()), 127 (Oddish1, 45, "oddish-1", 4, todo!()),
90 (Psyduck1, 46, "psyduck-1", todo!()), 128 (Psyduck1, 46, "psyduck-1", 8, todo!()),
91 (Psyduck2, 47, "psyduck-2", todo!()), 129 (Psyduck2, 47, "psyduck-2", 8, todo!()),
92 (Psyduck3, 48, "psyduck-3", todo!()), 130 (Psyduck3, 48, "psyduck-3", 8, todo!()),
93 (Shelder1, 49, "shelder-1", todo!()), 131 (Shelder1, 49, "shelder-1", 64, todo!()),
94 (Squirtle1, 50, "squirtle-1", todo!()), 132 (Squirtle1, 50, "squirtle-1", 24, todo!()),
95 (Squirtle2, 51, "squirtle-2", todo!()), 133 (Squirtle2, 51, "squirtle-2", 24, todo!()),
96 (Squirtle3, 52, "squirtle-3", todo!()), 134 (Squirtle3, 52, "squirtle-3", 24, todo!()),
97 (Squirtle4, 53, "squirtle-4", todo!()), 135 (Squirtle4, 53, "squirtle-4", 24, todo!()),
98 (Staryu1, 54, "staryu-1", todo!()), 136 (Staryu1, 54, "staryu-1", 12, todo!()),
99 (Sudowoodo1, 55, "sudowoodo-1", todo!()), 137 (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()),
100 (Vulpix1, 56, "vulpix-1", todo!()) 138 (Vulpix1, 56, "vulpix-1", 112, todo!())
101); 139);
diff --git a/src/main.rs b/src/main.rs
index dfbddc0..36c2a9b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,72 +1,822 @@
1use std::collections::{HashMap, HashSet}; 1#![feature(exit_status_error)]
2use std::{
3 collections::{HashMap, HashSet},
4 net::Ipv4Addr,
5 path::{Path, PathBuf},
6 process::Output,
7};
2 8
3use clap::Parser; 9use clap::{Args, Parser, Subcommand};
4use eyre::Context as _; 10use eyre::Context as _;
11use eyre::Result;
12use futures::{StreamExt as _, stream::FuturesUnordered};
5use machine::Machine; 13use machine::Machine;
6use serde::Deserialize; 14use serde::Deserialize;
7use tokio::process::Command; 15use tokio::{
16 io::{AsyncReadExt as _, AsyncWriteExt as _},
17 process::Command,
18 task::JoinSet,
19};
20
21use crate::latency_matrix::LatencyMatrix;
8 22
9pub mod latency_matrix; 23pub mod latency_matrix;
10pub mod machine; 24pub mod machine;
11 25
/// Docker image used to run the privileged networking scripts on each machine
/// (passed as the image argument of `docker run` by
/// `machine_net_container_run_script`).
const CONTAINER_IMAGE_NAME: &str = "local/oar-p2p-networking";
27
// Top-level command line: holds only the selected subcommand.
#[derive(Debug, Parser)]
struct Cli {
    // Subcommand chosen by the user (`net ...` or `run ...`).
    #[clap(subcommand)]
    cmd: SubCmd,
}
33
// Options shared by the subcommands that talk to the cluster; flattened into
// each of their argument structs.
#[derive(Debug, Args)]
struct Common {
    // OAR job id; read from `--job-id` or the `OAR_JOB_ID` environment variable.
    #[clap(long, env = "OAR_JOB_ID")]
    job_id: Option<u32>,

    // Frontend host; read from `--frontend-hostname` or `FRONTEND_HOSTNAME`.
    // Used as the ssh/scp jump host (`-J`) when the execution node is unknown.
    #[clap(long, env = "FRONTEND_HOSTNAME")]
    frontend_hostname: Option<String>,
}
42
// Top-level subcommands.
#[derive(Debug, Subcommand)]
enum SubCmd {
    // Network management (`net up|down|show|preview`).
    Net(NetArgs),
    // Run a container schedule on the job's machines.
    Run(RunArgs),
}
48
// Arguments of the `net` subcommand: just the nested network subcommand.
#[derive(Debug, Args)]
struct NetArgs {
    #[clap(subcommand)]
    cmd: NetSubCmd,
}
54
// Subcommands of `net`.
#[derive(Debug, Subcommand)]
enum NetSubCmd {
    // Build the networking image, clean the machines and apply a new configuration.
    Up(NetUpArgs),
    // Build the networking image and clean the machines.
    Down(NetDownArgs),
    // Print the 10.x addresses currently present on every machine of the job.
    Show(NetShowArgs),
    // Print the configuration scripts that would be generated, without running them.
    Preview(NetPreviewArgs),
}
62
// Arguments of `net up`.
#[derive(Debug, Args)]
struct NetUpArgs {
    #[clap(flatten)]
    common: Common,
    // Number of addresses to create per CPU; multiplied by `Machine::cpus()`.
    #[clap(long)]
    addr_per_cpu: u32,
    // Path of the latency matrix file (parsed with millisecond units).
    #[clap(long)]
    latency_matrix: PathBuf,
}
72
// Arguments of `net down`: only the shared cluster options.
#[derive(Debug, Args)]
struct NetDownArgs {
    #[clap(flatten)]
    common: Common,
}
78
// Arguments of `net show`: only the shared cluster options.
#[derive(Debug, Args)]
struct NetShowArgs {
    #[clap(flatten)]
    common: Common,
}
84
// Arguments of `net preview`. Takes the machine list explicitly instead of
// the shared cluster options.
#[derive(Debug, Args)]
struct NetPreviewArgs {
    // Machines to generate configurations for (repeatable `--machine`).
    #[clap(long)]
    machine: Vec<Machine>,

    // Number of addresses to create per CPU; multiplied by `Machine::cpus()`.
    #[clap(long)]
    addr_per_cpu: u32,

    // Path of the latency matrix file (parsed with millisecond units).
    #[clap(long)]
    latency_matrix: PathBuf,
}
96
// Arguments of `run`.
#[derive(Debug, Args)]
struct RunArgs {
    #[clap(flatten)]
    common: Common,

    // Local directory the per-machine log directories are copied into with `scp -r`.
    #[clap(long)]
    output_dir: PathBuf,

    // Schedule file (JSON); when omitted the schedule is read from stdin.
    schedule: Option<PathBuf>,
}
19 107
/// Where this program is running, which decides how commands reach a machine
/// (see `machine_run`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ExecutionNode {
    /// On the cluster frontend: machines are reached with a direct `ssh`.
    Frontend,
    /// On a job machine: commands for that same machine run locally, others via `ssh`.
    Machine(Machine),
    /// Anywhere else: `ssh` goes through the frontend as a jump host.
    Unknown,
}
26 114
/// Execution context built from the shared command-line options
/// (see `context_from_common`).
#[derive(Debug, Clone)]
struct Context {
    /// Node this process is running on.
    node: ExecutionNode,
    /// OAR job id, if one was provided.
    job_id: Option<u32>,
    /// Frontend hostname, if one was provided; needed when `node` is `Unknown`.
    frontend_hostname: Option<String>,
}
32 121
/// Network configuration generated for one machine by `machine_generate_configs`.
#[derive(Debug, Clone)]
struct MachineConfig {
    /// Machine the configuration applies to.
    machine: Machine,
    /// Addresses assigned to this machine.
    addresses: Vec<Ipv4Addr>,
    /// nftables ruleset, fed to `nft -f -`.
    nft_script: String,
    /// Batch commands fed to `tc -b -`.
    tc_commands: Vec<String>,
    /// Batch commands fed to `ip -b -`.
    ip_commands: Vec<String>,
}
36 130
37#[tokio::main] 131#[tokio::main]
38async fn main() -> eyre::Result<()> { 132async fn main() -> Result<()> {
39 tracing_subscriber::fmt::init(); 133 tracing_subscriber::fmt()
134 .with_env_filter(
135 tracing_subscriber::EnvFilter::try_from_default_env()
136 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
137 )
138 .with_writer(std::io::stderr)
139 .init();
40 color_eyre::install()?; 140 color_eyre::install()?;
41 141
42 let args = Args::parse(); 142 let cli = Cli::parse();
43 let node = get_execution_node()?; 143 match cli.cmd {
44 let context = Context { 144 SubCmd::Net(args) => match args.cmd {
145 NetSubCmd::Up(args) => cmd_net_up(args).await,
146 NetSubCmd::Down(args) => cmd_net_down(args).await,
147 NetSubCmd::Show(args) => cmd_net_show(args).await,
148 NetSubCmd::Preview(args) => cmd_net_preview(args).await,
149 },
150 SubCmd::Run(args) => cmd_run(args).await,
151 }
152}
153
154async fn context_from_common(common: &Common) -> Result<Context> {
155 let node = get_execution_node().await?;
156 Ok(Context {
45 node, 157 node,
46 job_id: args.job_id, 158 job_id: common.job_id,
47 frontend_hostname: args.frontend_hostname, 159 frontend_hostname: common.frontend_hostname.clone(),
160 })
161}
162
163async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
164 let context = context_from_common(&args.common).await?;
165 let matrix_content = tokio::fs::read_to_string(&args.latency_matrix)
166 .await
167 .context("reading latecy matrix")?;
168 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds)
169 .context("parsing latency matrix")?;
170 let machines = job_list_machines(&context).await?;
171 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu);
172 machines_net_container_build(&context, &machines).await?;
173 machines_clean(&context, &machines).await?;
174 machines_configure(&context, &configs).await?;
175 Ok(())
176}
177
178async fn cmd_net_down(args: NetDownArgs) -> Result<()> {
179 let context = context_from_common(&args.common).await?;
180 let machines = job_list_machines(&context).await?;
181 machines_net_container_build(&context, &machines).await?;
182 machines_clean(&context, &machines).await?;
183 Ok(())
184}
185
186async fn cmd_net_show(args: NetShowArgs) -> Result<()> {
187 let context = context_from_common(&args.common).await?;
188 let machines = job_list_machines(&context).await?;
189 let mut set = JoinSet::default();
190 for machine in machines {
191 let context = context.clone();
192 set.spawn(async move { (machine, machine_list_addresses(&context, machine).await) });
193 }
194 let mut addresses = Vec::default();
195 for (machine, result) in set.join_all().await {
196 let addrs = result?;
197 for addr in addrs {
198 addresses.push((machine, addr));
199 }
200 }
201 addresses.sort();
202 for (machine, addr) in addresses {
203 println!("{} {}", machine, addr);
204 }
205 Ok(())
206}
207
208async fn cmd_net_preview(args: NetPreviewArgs) -> Result<()> {
209 let matrix_content = tokio::fs::read_to_string(&args.latency_matrix)
210 .await
211 .context("reading latecy matrix")?;
212 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds)
213 .context("parsing latency matrix")?;
214 let machines = args.machine;
215 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu);
216
217 for config in configs {
218 (0..20).for_each(|_| print!("-"));
219 print!(" {} ", config.machine);
220 (0..20).for_each(|_| print!("-"));
221 println!();
222 println!("{}", machine_configuration_script(&config));
223 }
224 Ok(())
225}
226
227fn machine_from_addr(addr: Ipv4Addr) -> Result<Machine> {
228 let machine_index = usize::from(addr.octets()[1]);
229 Machine::from_index(machine_index)
230 .ok_or_else(|| eyre::eyre!("failed to resolve machine from address {addr}"))
231}
232
/// One container to run, as resolved from a schedule entry by `parse_schedule`.
#[derive(Debug, Clone)]
struct ScheduledContainer {
    /// Container name (`docker create --name`); defaults to the address text.
    name: String,
    /// Image to create the container from.
    image: String,
    /// Machine the container runs on, derived from `address`.
    machine: Machine,
    /// Address given in the schedule entry.
    address: Ipv4Addr,
    /// Environment variables passed to the container with `-e`.
    variables: HashMap<String, String>,
}
241
242fn parse_schedule(schedule: &str) -> Result<Vec<ScheduledContainer>> {
243 #[derive(Debug, Deserialize)]
244 struct ScheduleItem {
245 name: Option<String>,
246 address: Ipv4Addr,
247 image: String,
248 env: HashMap<String, String>,
249 }
250
251 let items = serde_json::from_str::<Vec<ScheduleItem>>(schedule)?;
252 let mut containers = Vec::default();
253 for item in items {
254 let name = match item.name {
255 Some(name) => name,
256 None => item.address.to_string(),
257 };
258 let machine = machine_from_addr(item.address)?;
259
260 containers.push(ScheduledContainer {
261 name,
262 image: item.image,
263 machine,
264 address: item.address,
265 variables: item.env,
266 });
267 }
268 Ok(containers)
269}
270
271async fn cmd_run(args: RunArgs) -> Result<()> {
272 let ctx = context_from_common(&args.common).await?;
273 let machines = job_list_machines(&ctx).await?;
274 let schedule = match args.schedule {
275 Some(path) => tokio::fs::read_to_string(&path)
276 .await
277 .with_context(|| format!("reading schedule file: {}", path.display()))?,
278 None => {
279 let mut stdin = String::default();
280 tokio::io::stdin()
281 .read_to_string(&mut stdin)
282 .await
283 .context("reading schedule from stdin")?;
284 stdin
285 }
286 };
287 let containers = parse_schedule(&schedule)?;
288
289 machines_foreach(&machines, |machine| machine_containers_clean(&ctx, machine)).await?;
290 machines_foreach(&machines, |machine| {
291 let ctx = ctx.clone();
292 let containers = containers
293 .iter()
294 .filter(|c| c.machine == machine)
295 .cloned()
296 .collect::<Vec<_>>();
297 let mut script = String::default();
298 for (idx, container) in containers.iter().enumerate() {
299 script.push_str("docker create \\\n");
300 script.push_str("\t--pull=always \\\n");
301 script.push_str("\t--network=host \\\n");
302 script.push_str("\t--restart=no \\\n");
303 script.push_str(&format!("\t--name {} \\\n", container.name));
304 for (key, val) in container.variables.iter() {
305 script.push_str("\t-e ");
306 script.push_str(key);
307 script.push_str("=");
308 script.push_str(val);
309 script.push_str(" \\\n");
310 }
311 script.push_str("\t");
312 script.push_str(&container.image);
313 script.push_str(" &\n");
314 script.push_str(&format!("pid_{idx}=$!\n\n"));
315 }
316
317 for (idx, container) in containers.iter().enumerate() {
318 let name = &container.name;
319 script.push_str(&format!(
320 "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n"
321 ));
322 }
323 tracing::debug!("container creation script:\n{script}");
324 async move { machine_run_script(&ctx, machine, &script).await }
325 })
326 .await?;
327
328 tracing::info!("starting all containers on all machines");
329 machines_foreach(
330 machines
331 .iter()
332 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)),
333 |machine| {
334 machine_run_script(
335 &ctx,
336 machine,
337 "docker container ls -aq | xargs docker container start",
338 )
339 },
340 )
341 .await?;
342
343 tracing::info!("waiting for all containers to exit");
344 machines_foreach(&machines, |machine| {
345 let ctx = ctx.clone();
346 let containers = containers
347 .iter()
348 .filter(|c| c.machine == machine)
349 .cloned()
350 .collect::<Vec<_>>();
351 let mut script = String::default();
352 for container in containers {
353 let name = &container.name;
354 script.push_str(&format!("if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n"));
355 script.push_str(&format!("\techo Container {name} failed\n"));
356 script.push_str(&format!("\tdocker logs {name} 2>1\n"));
357 script.push_str("\texit 1\n");
358 script.push_str("fi\n\n");
359 }
360 script.push_str("exit 0\n");
361 async move { machine_run_script(&ctx, machine, &script).await }
362 })
363 .await?;
364
365 tracing::info!("saving logs to disk on all machines");
366 machines_foreach(&machines, |machine| {
367 let ctx = ctx.clone();
368 let containers = containers
369 .iter()
370 .filter(|c| c.machine == machine)
371 .cloned()
372 .collect::<Vec<_>>();
373 let mut script = String::default();
374 script.push_str("set -e\n");
375 script.push_str("mkdir -p /tmp/oar-p2p-logs\n");
376 script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n");
377 for container in containers {
378 let name = &container.name;
379 script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n"));
380 }
381 script.push_str("exit 0\n");
382 async move { machine_run_script(&ctx, machine, &script).await }
383 })
384 .await?;
385
386 machines_foreach(
387 machines
388 .iter()
389 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)),
390 |machine| machine_copy_logs_dir(&ctx, machine, &args.output_dir),
391 )
392 .await?;
393
394 Ok(())
395}
396
397async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> {
398 let scp_common = &[
399 "-o",
400 "StrictHostKeyChecking=no",
401 "-o",
402 "UserKnownHostsFile=/dev/null",
403 ];
404
405 let mut args = vec![];
406 args.extend(scp_common);
407 if ctx.node == ExecutionNode::Unknown {
408 args.push("-J");
409 args.push(ctx.frontend_hostname.as_ref().expect("TODO"));
410 }
411 args.push("-r");
412
413 let source_path = format!("{}:/tmp/oar-p2p-logs", machine.hostname());
414 let destination_path = output_dir.display().to_string();
415 args.push(&source_path);
416 args.push(&destination_path);
417
418 let output = Command::new("scp").args(args).output().await?;
419 output.exit_ok()?;
420 Ok(())
421}
422
423async fn machines_foreach<F, FUT, RET>(
424 machines: impl IntoIterator<Item = &Machine>,
425 f: F,
426) -> Result<()>
427where
428 F: Fn(Machine) -> FUT,
429 FUT: std::future::Future<Output = Result<RET>>,
430{
431 let mut futures = FuturesUnordered::new();
432
433 for &machine in machines {
434 let fut = f(machine);
435 let fut = async move { (machine, fut.await) };
436 futures.push(fut);
437 }
438
439 while let Some((machine, result)) = futures.next().await {
440 if let Err(err) = result {
441 tracing::error!("error on machine {machine}: {err}");
442 return Err(err);
443 }
444 }
445 Ok(())
446}
447
448#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))]
449async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> {
450 tracing::info!("removing all containers...");
451 machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?;
452 Ok(())
453}
454
455#[tracing::instrument(ret, err, skip_all)]
456async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> {
457 tracing::info!("cleaning machines: {machines:?}");
458 let mut set = JoinSet::default();
459 for &machine in machines {
460 let ctx = ctx.clone();
461 set.spawn(async move { machine_clean(&ctx, machine).await });
462 }
463 let results = set.join_all().await;
464 for result in results {
465 result?;
466 }
467 Ok(())
468}
469
470#[tracing::instrument(ret, err, skip_all)]
471async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> {
472 tracing::info!("building networking container for machines: {machines:?}");
473 let mut set = JoinSet::default();
474 for &machine in machines {
475 let ctx = ctx.clone();
476 set.spawn(async move { machine_net_container_build(&ctx, machine).await });
477 }
478 for result in set.join_all().await {
479 result?;
480 }
481 Ok(())
482}
483
484#[tracing::instrument(ret, err, skip_all)]
485async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> {
486 tracing::info!("configuring machines");
487 let mut set = JoinSet::default();
488 for config in configs {
489 let ctx = ctx.clone();
490 let config = config.clone();
491 set.spawn(async move { machine_configure(&ctx, &config).await });
492 }
493 for result in set.join_all().await {
494 result?;
495 }
496 Ok(())
497}
498
499async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> {
500 let interface = machine.interface();
501 let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'");
502 let output = machine_run_script(ctx, machine, &script).await?;
503 let stdout = std::str::from_utf8(&output.stdout)?;
504 let mut addresses = Vec::default();
505 for line in stdout.lines().map(str::trim).filter(|l| !l.is_empty()) {
506 tracing::trace!("parsing address from line: '{line}'");
507 addresses.push(line.parse()?);
508 }
509 Ok(addresses)
510}
511
512async fn machine_run(
513 ctx: &Context,
514 machine: Machine,
515 args: &[&str],
516 stdin: Option<&str>,
517) -> Result<Output> {
518 let ssh_common = &[
519 "-o",
520 "StrictHostKeyChecking=no",
521 "-o",
522 "UserKnownHostsFile=/dev/null",
523 ];
524
525 let mut arguments = match ctx.node {
526 ExecutionNode::Frontend => {
527 let mut arguments = Vec::default();
528 arguments.push("ssh");
529 arguments.extend(ssh_common);
530 arguments.push(machine.hostname());
531 arguments
532 }
533 ExecutionNode::Machine(m) => {
534 if m == machine {
535 vec![]
536 } else {
537 let mut arguments = Vec::default();
538 arguments.push("ssh");
539 arguments.extend(ssh_common);
540 arguments.push(machine.hostname());
541 arguments
542 }
543 }
544 ExecutionNode::Unknown => {
545 let frontend = ctx.frontend_hostname.as_ref().unwrap();
546 let mut arguments = Vec::default();
547 arguments.push("ssh");
548 arguments.extend(ssh_common);
549 arguments.push("-J");
550 arguments.push(frontend);
551 arguments.push(machine.hostname());
552 arguments
553 }
48 }; 554 };
49 let machines = list_job_machines(&context).await?; 555 if args.is_empty() {
556 arguments.push("bash");
557 }
558 arguments.extend(args);
50 559
51 // listing oar job machines 560 let mut proc = Command::new(arguments[0])
52 // if we are in the frontend we use oarstat 561 .args(&arguments[1..])
53 // if we are in a job machine we read the cpuset file 562 .stdout(std::process::Stdio::piped())
54 // if we are outside the cluster we use ssh cluster oarstat 563 .stderr(std::process::Stdio::piped())
564 .stdin(std::process::Stdio::piped())
565 .spawn()
566 .context("spawning process")?;
55 567
56 // machine generate configurations 568 if let Some(stdin) = stdin {
57 // this does not require any connections 569 let proc_stdin = proc.stdin.as_mut().unwrap();
570 proc_stdin
571 .write_all(stdin.as_bytes())
572 .await
573 .context("writing stdin")?;
574 }
58 575
59 // machine cleanup interface 576 let output = proc
60 // requires running commands inside a container inside a machine 577 .wait_with_output()
61 // we could generate a cleanup bash script and execute that in oneshot 578 .await
579 .context("waiting for process to exit")?;
62 580
63 // machine apply configuration 581 Ok(output)
64 // this also requires running some scripts inside a container inside the machine 582}
65 583
584async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> {
585 tracing::trace!("running script on machine {machine}:\n{script}");
586 let output = machine_run(ctx, machine, &[], Some(script)).await?;
587 tracing::trace!(
588 "stdout:\n{}",
589 std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>")
590 );
591 tracing::trace!(
592 "stderr:\n{}",
593 std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>")
594 );
595 Ok(output.exit_ok()?)
596}
597
598async fn machine_net_container_run_script(
599 ctx: &Context,
600 machine: Machine,
601 script: &str,
602) -> Result<Output> {
603 machine_run(
604 ctx,
605 machine,
606 &[
607 "docker",
608 "run",
609 "--rm",
610 "-i",
611 "--net=host",
612 "--privileged",
613 CONTAINER_IMAGE_NAME,
614 ],
615 Some(script),
616 )
617 .await
618}
619
620#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))]
621async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> {
622 let script = r#"
623set -e
624cat << EOF > /tmp/oar-p2p.containerfile
625FROM alpine:latest
626RUN apk update && \
627 apk add --no-cache bash grep iproute2 iproute2-tc nftables && \
628 rm -rf /var/cache/apk/*
629
630WORKDIR /work
631EOF
632
633docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile .
634"#;
635 machine_run_script(ctx, machine, script).await?;
66 Ok(()) 636 Ok(())
67} 637}
68 638
69async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { 639#[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))]
640async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> {
641 let interface = machine.interface();
642 let mut script = String::default();
643 script.push_str(&format!(
644 "ip route del 10.0.0.0/8 dev {interface} || true\n"
645 ));
646 script.push_str(&format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+/32' | sed 's/\\(.*\\)/addr del \\1 dev {interface}/' | ip -b -\n"));
647 script.push_str(&format!(
648 "tc qdisc del dev {interface} root 2>/dev/null || true\n"
649 ));
650 script.push_str(&format!(
651 "tc qdisc del dev {interface} ingress 2>/dev/null || true\n"
652 ));
653 script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n");
654 script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n");
655 script.push_str("nft delete table oar-p2p 2>/dev/null || true\n");
656 let output = machine_net_container_run_script(&ctx, machine, &script).await?;
657 Ok(())
658}
659
660fn machine_configuration_script(config: &MachineConfig) -> String {
661 let mut script = String::default();
662 // ip configuration
663 script.push_str("cat << EOF | ip -b -\n");
664 for command in config.ip_commands.iter() {
665 script.push_str(command);
666 script.push_str("\n");
667 }
668 script.push_str("\nEOF\n");
669
670 // tc configuration
671 script.push_str("cat << EOF | tc -b -\n");
672 for command in config.tc_commands.iter() {
673 script.push_str(command);
674 script.push_str("\n");
675 }
676 script.push_str("\nEOF\n");
677
678 // nft configuration
679 script.push_str("cat << EOF | nft -f -\n");
680 script.push_str(&config.nft_script);
681 script.push_str("\nEOF\n");
682 script
683}
684
685#[tracing::instrument(ret, err, skip_all, fields(machine = config.machine.to_string()))]
686async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> {
687 let script = machine_configuration_script(config);
688 tracing::debug!("configuration script:\n{script}");
689 machine_net_container_run_script(ctx, config.machine, &script).await?;
690 Ok(())
691}
692
693fn machine_address_for_idx(machine: Machine, idx: u32) -> Ipv4Addr {
694 let c = u8::try_from(idx / 254).unwrap();
695 let d = u8::try_from(idx % 254 + 1).unwrap();
696 Ipv4Addr::new(10, machine.index().try_into().unwrap(), c, d)
697}
698
699fn machine_generate_configs(
700 matrix: &LatencyMatrix,
701 machines: &[Machine],
702 addr_per_cpu: u32,
703) -> Vec<MachineConfig> {
704 let mut configs = Vec::default();
705 let mut addresses = Vec::default();
706 let mut address_to_index = HashMap::<Ipv4Addr, usize>::default();
707
708 // gather all addresses across all machines
709 for &machine in machines {
710 for i in 0..(addr_per_cpu * machine.cpus()) {
711 let address = machine_address_for_idx(machine, i);
712 addresses.push(address);
713 address_to_index.insert(address, addresses.len() - 1);
714 }
715 }
716
717 for &machine in machines {
718 let mut machine_addresses = Vec::default();
719 let mut machine_ip_commands = Vec::default();
720 let mut machine_tc_commands = Vec::default();
721 let mut machine_nft_script = String::default();
722
723 machine_ip_commands.push(format!("route add 10.0.0.0/8 dev {}", machine.interface()));
724 for i in 0..(addr_per_cpu * machine.cpus()) {
725 let address = machine_address_for_idx(machine, i);
726 machine_addresses.push(address);
727 machine_ip_commands.push(format!("addr add {address}/32 dev {}", machine.interface()));
728 }
729
730 let mut latencies_set = HashSet::<u32>::default();
731 let mut latencies_buckets = Vec::<u32>::default();
732 let mut latencies_addr_pairs = HashMap::<u32, Vec<(Ipv4Addr, Ipv4Addr)>>::default();
733 for &addr in &machine_addresses {
734 let addr_idx = address_to_index[&addr];
735 for other_idx in (0..addresses.len()).filter(|i| *i != addr_idx) {
736 let other = addresses[other_idx];
737 let latency = matrix.latency(addr_idx, other_idx);
738 let latency_millis = u32::try_from(latency.as_millis()).unwrap();
739 if !latencies_set.contains(&latency_millis) {
740 latencies_set.insert(latency_millis);
741 latencies_buckets.push(latency_millis);
742 }
743 latencies_addr_pairs
744 .entry(latency_millis)
745 .or_default()
746 .push((addr, other));
747 }
748 }
749
750 for iface in &["lo", machine.interface()] {
751 machine_tc_commands.push(format!(
752 "qdisc add dev {iface} root handle 1: htb default 9999"
753 ));
754 machine_tc_commands.push(format!(
755 "class add dev {iface} parent 1: classid 1:9999 htb rate 10gbit"
756 ));
757 for (idx, &latency_millis) in latencies_buckets.iter().enumerate() {
758 // tc class for latency at idx X is X + 1
759 let latency_class_id = idx + 1;
760 // mark for latency at idx X is X + 1
761 let latency_mark = idx + 1;
762
763 machine_tc_commands.push(format!(
764 "class add dev {iface} parent 1: classid 1:{} htb rate 10gbit",
765 latency_class_id
766 ));
767 // why idx + 2 here? I dont remember anymore and forgot to comment
768 machine_tc_commands.push(format!(
769 "qdisc add dev {iface} parent 1:{} handle {}: netem delay {latency_millis}ms",
770 latency_class_id,
771 idx + 2
772 ));
773 // TODO: is the order of these things correct?
774 machine_tc_commands.push(format!(
775 "filter add dev {iface} parent 1:0 prio 1 handle {} fw flowid 1:{}",
776 latency_mark, latency_class_id,
777 ));
778 }
779 }
780
781 machine_nft_script.push_str("table ip oar-p2p {\n");
782 machine_nft_script.push_str("\tmap mark_pairs {\n");
783 machine_nft_script.push_str("\t\ttype ipv4_addr . ipv4_addr : mark\n");
784 machine_nft_script.push_str("\t\telements = {\n");
785 for (latency_idx, &latency_millis) in latencies_buckets.iter().enumerate() {
786 let latency_mark = latency_idx + 1;
787 let pairs = match latencies_addr_pairs.get(&latency_millis) {
788 Some(pairs) => pairs,
789 None => continue,
790 };
791
792 for (src, dst) in pairs {
793 assert_ne!(src, dst);
794 machine_nft_script.push_str(&format!("\t\t\t{src} . {dst} : {latency_mark},\n"));
795 }
796 }
797 machine_nft_script.push_str("\t\t}\n");
798 machine_nft_script.push_str("\t}\n");
799 machine_nft_script.push_str("\n");
800 machine_nft_script.push_str("\tchain postrouting {\n");
801 machine_nft_script.push_str("\t\ttype filter hook postrouting priority mangle -1\n");
802 machine_nft_script.push_str("\t\tpolicy accept\n");
803 machine_nft_script
804 .push_str("\t\tmeta mark set ip saddr . ip daddr map @mark_pairs counter\n");
805 machine_nft_script.push_str("\t}\n");
806 machine_nft_script.push_str("}\n");
807
808 configs.push(MachineConfig {
809 machine,
810 addresses: machine_addresses,
811 nft_script: machine_nft_script,
812 tc_commands: machine_tc_commands,
813 ip_commands: machine_ip_commands,
814 });
815 }
816 configs
817}
818
819async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> {
70 match ctx.node { 820 match ctx.node {
71 ExecutionNode::Frontend => { 821 ExecutionNode::Frontend => {
72 let job_id = match ctx.job_id { 822 let job_id = match ctx.job_id {
@@ -101,7 +851,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> {
101 Some(hostname) => hostname, 851 Some(hostname) => hostname,
102 None => { 852 None => {
103 return Err(eyre::eyre!( 853 return Err(eyre::eyre!(
104 "frontend hostname is requiredwhen running from outside the cluster" 854 "frontend hostname is required when running from outside the cluster"
105 )); 855 ));
106 } 856 }
107 }; 857 };
@@ -148,7 +898,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> {
148 } 898 }
149} 899}
150 900
151fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result<Vec<Machine>> { 901fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> {
152 #[derive(Debug, Deserialize)] 902 #[derive(Debug, Deserialize)]
153 struct JobSchema { 903 struct JobSchema {
154 assigned_network_address: Vec<String>, 904 assigned_network_address: Vec<String>,
@@ -168,8 +918,8 @@ fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Resul
168 Ok(machines) 918 Ok(machines)
169} 919}
170 920
171fn get_execution_node() -> eyre::Result<ExecutionNode> { 921async fn get_execution_node() -> Result<ExecutionNode> {
172 let hostname = get_hostname()?; 922 let hostname = get_hostname().await?;
173 let node = match hostname.as_str() { 923 let node = match hostname.as_str() {
174 "frontend" => ExecutionNode::Frontend, 924 "frontend" => ExecutionNode::Frontend,
175 _ => match Machine::from_hostname(&hostname) { 925 _ => match Machine::from_hostname(&hostname) {
@@ -180,8 +930,12 @@ fn get_execution_node() -> eyre::Result<ExecutionNode> {
180 Ok(node) 930 Ok(node)
181} 931}
182 932
183fn get_hostname() -> eyre::Result<String> { 933async fn get_hostname() -> Result<String> {
184 std::env::var("HOSTNAME").context("reading HOSTNAME env var") 934 if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await {
935 Ok(hostname)
936 } else {
937 std::env::var("HOSTNAME").context("reading HOSTNAME env var")
938 }
185} 939}
186 940
187#[cfg(test)] 941#[cfg(test)]