diff options
| author | diogo464 <[email protected]> | 2025-07-10 20:05:11 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-10 20:05:11 +0100 |
| commit | 4fc26211fda53023f8ce703ccf4b1a2bbfbbe10a (patch) | |
| tree | 711fcea1be7a22cf3ef7d3ebf98ea01d7cade62d /src/main.rs | |
| parent | a5178fbb0bde3ff9f863ef0cca48748cb993390a (diff) | |
it works, now needs cleanup
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 832 |
1 files changed, 793 insertions, 39 deletions
diff --git a/src/main.rs b/src/main.rs index dfbddc0..36c2a9b 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -1,72 +1,822 @@ | |||
| 1 | use std::collections::{HashMap, HashSet}; | 1 | #![feature(exit_status_error)] |
| 2 | use std::{ | ||
| 3 | collections::{HashMap, HashSet}, | ||
| 4 | net::Ipv4Addr, | ||
| 5 | path::{Path, PathBuf}, | ||
| 6 | process::Output, | ||
| 7 | }; | ||
| 2 | 8 | ||
| 3 | use clap::Parser; | 9 | use clap::{Args, Parser, Subcommand}; |
| 4 | use eyre::Context as _; | 10 | use eyre::Context as _; |
| 11 | use eyre::Result; | ||
| 12 | use futures::{StreamExt as _, stream::FuturesUnordered}; | ||
| 5 | use machine::Machine; | 13 | use machine::Machine; |
| 6 | use serde::Deserialize; | 14 | use serde::Deserialize; |
| 7 | use tokio::process::Command; | 15 | use tokio::{ |
| 16 | io::{AsyncReadExt as _, AsyncWriteExt as _}, | ||
| 17 | process::Command, | ||
| 18 | task::JoinSet, | ||
| 19 | }; | ||
| 20 | |||
| 21 | use crate::latency_matrix::LatencyMatrix; | ||
| 8 | 22 | ||
| 9 | pub mod latency_matrix; | 23 | pub mod latency_matrix; |
| 10 | pub mod machine; | 24 | pub mod machine; |
| 11 | 25 | ||
| 26 | const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking"; | ||
| 27 | |||
| 12 | #[derive(Debug, Parser)] | 28 | #[derive(Debug, Parser)] |
| 13 | pub struct Args { | 29 | struct Cli { |
| 30 | #[clap(subcommand)] | ||
| 31 | cmd: SubCmd, | ||
| 32 | } | ||
| 33 | |||
| 34 | #[derive(Debug, Args)] | ||
| 35 | struct Common { | ||
| 14 | #[clap(long, env = "OAR_JOB_ID")] | 36 | #[clap(long, env = "OAR_JOB_ID")] |
| 15 | pub job_id: Option<u32>, | 37 | job_id: Option<u32>, |
| 38 | |||
| 16 | #[clap(long, env = "FRONTEND_HOSTNAME")] | 39 | #[clap(long, env = "FRONTEND_HOSTNAME")] |
| 17 | pub frontend_hostname: Option<String>, | 40 | frontend_hostname: Option<String>, |
| 41 | } | ||
| 42 | |||
| 43 | #[derive(Debug, Subcommand)] | ||
| 44 | enum SubCmd { | ||
| 45 | Net(NetArgs), | ||
| 46 | Run(RunArgs), | ||
| 47 | } | ||
| 48 | |||
| 49 | #[derive(Debug, Args)] | ||
| 50 | struct NetArgs { | ||
| 51 | #[clap(subcommand)] | ||
| 52 | cmd: NetSubCmd, | ||
| 53 | } | ||
| 54 | |||
| 55 | #[derive(Debug, Subcommand)] | ||
| 56 | enum NetSubCmd { | ||
| 57 | Up(NetUpArgs), | ||
| 58 | Down(NetDownArgs), | ||
| 59 | Show(NetShowArgs), | ||
| 60 | Preview(NetPreviewArgs), | ||
| 61 | } | ||
| 62 | |||
| 63 | #[derive(Debug, Args)] | ||
| 64 | struct NetUpArgs { | ||
| 65 | #[clap(flatten)] | ||
| 66 | common: Common, | ||
| 67 | #[clap(long)] | ||
| 68 | addr_per_cpu: u32, | ||
| 69 | #[clap(long)] | ||
| 70 | latency_matrix: PathBuf, | ||
| 71 | } | ||
| 72 | |||
| 73 | #[derive(Debug, Args)] | ||
| 74 | struct NetDownArgs { | ||
| 75 | #[clap(flatten)] | ||
| 76 | common: Common, | ||
| 77 | } | ||
| 78 | |||
| 79 | #[derive(Debug, Args)] | ||
| 80 | struct NetShowArgs { | ||
| 81 | #[clap(flatten)] | ||
| 82 | common: Common, | ||
| 83 | } | ||
| 84 | |||
| 85 | #[derive(Debug, Args)] | ||
| 86 | struct NetPreviewArgs { | ||
| 87 | #[clap(long)] | ||
| 88 | machine: Vec<Machine>, | ||
| 89 | |||
| 90 | #[clap(long)] | ||
| 91 | addr_per_cpu: u32, | ||
| 92 | |||
| 93 | #[clap(long)] | ||
| 94 | latency_matrix: PathBuf, | ||
| 95 | } | ||
| 96 | |||
| 97 | #[derive(Debug, Args)] | ||
| 98 | struct RunArgs { | ||
| 99 | #[clap(flatten)] | ||
| 100 | common: Common, | ||
| 101 | |||
| 102 | #[clap(long)] | ||
| 103 | output_dir: PathBuf, | ||
| 104 | |||
| 105 | schedule: Option<PathBuf>, | ||
| 18 | } | 106 | } |
| 19 | 107 | ||
| 20 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | 108 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] |
| 21 | pub enum ExecutionNode { | 109 | enum ExecutionNode { |
| 22 | Frontend, | 110 | Frontend, |
| 23 | Machine(Machine), | 111 | Machine(Machine), |
| 24 | Unknown, | 112 | Unknown, |
| 25 | } | 113 | } |
| 26 | 114 | ||
| 27 | pub struct Context { | 115 | #[derive(Debug, Clone)] |
| 28 | pub node: ExecutionNode, | 116 | struct Context { |
| 29 | pub job_id: Option<u32>, | 117 | node: ExecutionNode, |
| 30 | pub frontend_hostname: Option<String>, | 118 | job_id: Option<u32>, |
| 119 | frontend_hostname: Option<String>, | ||
| 31 | } | 120 | } |
| 32 | 121 | ||
| 33 | pub struct MachineConfig { | 122 | #[derive(Debug, Clone)] |
| 34 | machine | 123 | struct MachineConfig { |
| 124 | machine: Machine, | ||
| 125 | addresses: Vec<Ipv4Addr>, | ||
| 126 | nft_script: String, | ||
| 127 | tc_commands: Vec<String>, | ||
| 128 | ip_commands: Vec<String>, | ||
| 35 | } | 129 | } |
| 36 | 130 | ||
| 37 | #[tokio::main] | 131 | #[tokio::main] |
| 38 | async fn main() -> eyre::Result<()> { | 132 | async fn main() -> Result<()> { |
| 39 | tracing_subscriber::fmt::init(); | 133 | tracing_subscriber::fmt() |
| 134 | .with_env_filter( | ||
| 135 | tracing_subscriber::EnvFilter::try_from_default_env() | ||
| 136 | .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), | ||
| 137 | ) | ||
| 138 | .with_writer(std::io::stderr) | ||
| 139 | .init(); | ||
| 40 | color_eyre::install()?; | 140 | color_eyre::install()?; |
| 41 | 141 | ||
| 42 | let args = Args::parse(); | 142 | let cli = Cli::parse(); |
| 43 | let node = get_execution_node()?; | 143 | match cli.cmd { |
| 44 | let context = Context { | 144 | SubCmd::Net(args) => match args.cmd { |
| 145 | NetSubCmd::Up(args) => cmd_net_up(args).await, | ||
| 146 | NetSubCmd::Down(args) => cmd_net_down(args).await, | ||
| 147 | NetSubCmd::Show(args) => cmd_net_show(args).await, | ||
| 148 | NetSubCmd::Preview(args) => cmd_net_preview(args).await, | ||
| 149 | }, | ||
| 150 | SubCmd::Run(args) => cmd_run(args).await, | ||
| 151 | } | ||
| 152 | } | ||
| 153 | |||
| 154 | async fn context_from_common(common: &Common) -> Result<Context> { | ||
| 155 | let node = get_execution_node().await?; | ||
| 156 | Ok(Context { | ||
| 45 | node, | 157 | node, |
| 46 | job_id: args.job_id, | 158 | job_id: common.job_id, |
| 47 | frontend_hostname: args.frontend_hostname, | 159 | frontend_hostname: common.frontend_hostname.clone(), |
| 160 | }) | ||
| 161 | } | ||
| 162 | |||
| 163 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | ||
| 164 | let context = context_from_common(&args.common).await?; | ||
| 165 | let matrix_content = tokio::fs::read_to_string(&args.latency_matrix) | ||
| 166 | .await | ||
| 167 | .context("reading latecy matrix")?; | ||
| 168 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) | ||
| 169 | .context("parsing latency matrix")?; | ||
| 170 | let machines = job_list_machines(&context).await?; | ||
| 171 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); | ||
| 172 | machines_net_container_build(&context, &machines).await?; | ||
| 173 | machines_clean(&context, &machines).await?; | ||
| 174 | machines_configure(&context, &configs).await?; | ||
| 175 | Ok(()) | ||
| 176 | } | ||
| 177 | |||
| 178 | async fn cmd_net_down(args: NetDownArgs) -> Result<()> { | ||
| 179 | let context = context_from_common(&args.common).await?; | ||
| 180 | let machines = job_list_machines(&context).await?; | ||
| 181 | machines_net_container_build(&context, &machines).await?; | ||
| 182 | machines_clean(&context, &machines).await?; | ||
| 183 | Ok(()) | ||
| 184 | } | ||
| 185 | |||
| 186 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { | ||
| 187 | let context = context_from_common(&args.common).await?; | ||
| 188 | let machines = job_list_machines(&context).await?; | ||
| 189 | let mut set = JoinSet::default(); | ||
| 190 | for machine in machines { | ||
| 191 | let context = context.clone(); | ||
| 192 | set.spawn(async move { (machine, machine_list_addresses(&context, machine).await) }); | ||
| 193 | } | ||
| 194 | let mut addresses = Vec::default(); | ||
| 195 | for (machine, result) in set.join_all().await { | ||
| 196 | let addrs = result?; | ||
| 197 | for addr in addrs { | ||
| 198 | addresses.push((machine, addr)); | ||
| 199 | } | ||
| 200 | } | ||
| 201 | addresses.sort(); | ||
| 202 | for (machine, addr) in addresses { | ||
| 203 | println!("{} {}", machine, addr); | ||
| 204 | } | ||
| 205 | Ok(()) | ||
| 206 | } | ||
| 207 | |||
| 208 | async fn cmd_net_preview(args: NetPreviewArgs) -> Result<()> { | ||
| 209 | let matrix_content = tokio::fs::read_to_string(&args.latency_matrix) | ||
| 210 | .await | ||
| 211 | .context("reading latecy matrix")?; | ||
| 212 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) | ||
| 213 | .context("parsing latency matrix")?; | ||
| 214 | let machines = args.machine; | ||
| 215 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); | ||
| 216 | |||
| 217 | for config in configs { | ||
| 218 | (0..20).for_each(|_| print!("-")); | ||
| 219 | print!(" {} ", config.machine); | ||
| 220 | (0..20).for_each(|_| print!("-")); | ||
| 221 | println!(); | ||
| 222 | println!("{}", machine_configuration_script(&config)); | ||
| 223 | } | ||
| 224 | Ok(()) | ||
| 225 | } | ||
| 226 | |||
| 227 | fn machine_from_addr(addr: Ipv4Addr) -> Result<Machine> { | ||
| 228 | let machine_index = usize::from(addr.octets()[1]); | ||
| 229 | Machine::from_index(machine_index) | ||
| 230 | .ok_or_else(|| eyre::eyre!("failed to resolve machine from address {addr}")) | ||
| 231 | } | ||
| 232 | |||
| 233 | #[derive(Debug, Clone)] | ||
| 234 | struct ScheduledContainer { | ||
| 235 | name: String, | ||
| 236 | image: String, | ||
| 237 | machine: Machine, | ||
| 238 | address: Ipv4Addr, | ||
| 239 | variables: HashMap<String, String>, | ||
| 240 | } | ||
| 241 | |||
| 242 | fn parse_schedule(schedule: &str) -> Result<Vec<ScheduledContainer>> { | ||
| 243 | #[derive(Debug, Deserialize)] | ||
| 244 | struct ScheduleItem { | ||
| 245 | name: Option<String>, | ||
| 246 | address: Ipv4Addr, | ||
| 247 | image: String, | ||
| 248 | env: HashMap<String, String>, | ||
| 249 | } | ||
| 250 | |||
| 251 | let items = serde_json::from_str::<Vec<ScheduleItem>>(schedule)?; | ||
| 252 | let mut containers = Vec::default(); | ||
| 253 | for item in items { | ||
| 254 | let name = match item.name { | ||
| 255 | Some(name) => name, | ||
| 256 | None => item.address.to_string(), | ||
| 257 | }; | ||
| 258 | let machine = machine_from_addr(item.address)?; | ||
| 259 | |||
| 260 | containers.push(ScheduledContainer { | ||
| 261 | name, | ||
| 262 | image: item.image, | ||
| 263 | machine, | ||
| 264 | address: item.address, | ||
| 265 | variables: item.env, | ||
| 266 | }); | ||
| 267 | } | ||
| 268 | Ok(containers) | ||
| 269 | } | ||
| 270 | |||
| 271 | async fn cmd_run(args: RunArgs) -> Result<()> { | ||
| 272 | let ctx = context_from_common(&args.common).await?; | ||
| 273 | let machines = job_list_machines(&ctx).await?; | ||
| 274 | let schedule = match args.schedule { | ||
| 275 | Some(path) => tokio::fs::read_to_string(&path) | ||
| 276 | .await | ||
| 277 | .with_context(|| format!("reading schedule file: {}", path.display()))?, | ||
| 278 | None => { | ||
| 279 | let mut stdin = String::default(); | ||
| 280 | tokio::io::stdin() | ||
| 281 | .read_to_string(&mut stdin) | ||
| 282 | .await | ||
| 283 | .context("reading schedule from stdin")?; | ||
| 284 | stdin | ||
| 285 | } | ||
| 286 | }; | ||
| 287 | let containers = parse_schedule(&schedule)?; | ||
| 288 | |||
| 289 | machines_foreach(&machines, |machine| machine_containers_clean(&ctx, machine)).await?; | ||
| 290 | machines_foreach(&machines, |machine| { | ||
| 291 | let ctx = ctx.clone(); | ||
| 292 | let containers = containers | ||
| 293 | .iter() | ||
| 294 | .filter(|c| c.machine == machine) | ||
| 295 | .cloned() | ||
| 296 | .collect::<Vec<_>>(); | ||
| 297 | let mut script = String::default(); | ||
| 298 | for (idx, container) in containers.iter().enumerate() { | ||
| 299 | script.push_str("docker create \\\n"); | ||
| 300 | script.push_str("\t--pull=always \\\n"); | ||
| 301 | script.push_str("\t--network=host \\\n"); | ||
| 302 | script.push_str("\t--restart=no \\\n"); | ||
| 303 | script.push_str(&format!("\t--name {} \\\n", container.name)); | ||
| 304 | for (key, val) in container.variables.iter() { | ||
| 305 | script.push_str("\t-e "); | ||
| 306 | script.push_str(key); | ||
| 307 | script.push_str("="); | ||
| 308 | script.push_str(val); | ||
| 309 | script.push_str(" \\\n"); | ||
| 310 | } | ||
| 311 | script.push_str("\t"); | ||
| 312 | script.push_str(&container.image); | ||
| 313 | script.push_str(" &\n"); | ||
| 314 | script.push_str(&format!("pid_{idx}=$!\n\n")); | ||
| 315 | } | ||
| 316 | |||
| 317 | for (idx, container) in containers.iter().enumerate() { | ||
| 318 | let name = &container.name; | ||
| 319 | script.push_str(&format!( | ||
| 320 | "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n" | ||
| 321 | )); | ||
| 322 | } | ||
| 323 | tracing::debug!("container creation script:\n{script}"); | ||
| 324 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 325 | }) | ||
| 326 | .await?; | ||
| 327 | |||
| 328 | tracing::info!("starting all containers on all machines"); | ||
| 329 | machines_foreach( | ||
| 330 | machines | ||
| 331 | .iter() | ||
| 332 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | ||
| 333 | |machine| { | ||
| 334 | machine_run_script( | ||
| 335 | &ctx, | ||
| 336 | machine, | ||
| 337 | "docker container ls -aq | xargs docker container start", | ||
| 338 | ) | ||
| 339 | }, | ||
| 340 | ) | ||
| 341 | .await?; | ||
| 342 | |||
| 343 | tracing::info!("waiting for all containers to exit"); | ||
| 344 | machines_foreach(&machines, |machine| { | ||
| 345 | let ctx = ctx.clone(); | ||
| 346 | let containers = containers | ||
| 347 | .iter() | ||
| 348 | .filter(|c| c.machine == machine) | ||
| 349 | .cloned() | ||
| 350 | .collect::<Vec<_>>(); | ||
| 351 | let mut script = String::default(); | ||
| 352 | for container in containers { | ||
| 353 | let name = &container.name; | ||
| 354 | script.push_str(&format!("if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n")); | ||
| 355 | script.push_str(&format!("\techo Container {name} failed\n")); | ||
| 356 | script.push_str(&format!("\tdocker logs {name} 2>1\n")); | ||
| 357 | script.push_str("\texit 1\n"); | ||
| 358 | script.push_str("fi\n\n"); | ||
| 359 | } | ||
| 360 | script.push_str("exit 0\n"); | ||
| 361 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 362 | }) | ||
| 363 | .await?; | ||
| 364 | |||
| 365 | tracing::info!("saving logs to disk on all machines"); | ||
| 366 | machines_foreach(&machines, |machine| { | ||
| 367 | let ctx = ctx.clone(); | ||
| 368 | let containers = containers | ||
| 369 | .iter() | ||
| 370 | .filter(|c| c.machine == machine) | ||
| 371 | .cloned() | ||
| 372 | .collect::<Vec<_>>(); | ||
| 373 | let mut script = String::default(); | ||
| 374 | script.push_str("set -e\n"); | ||
| 375 | script.push_str("mkdir -p /tmp/oar-p2p-logs\n"); | ||
| 376 | script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n"); | ||
| 377 | for container in containers { | ||
| 378 | let name = &container.name; | ||
| 379 | script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n")); | ||
| 380 | } | ||
| 381 | script.push_str("exit 0\n"); | ||
| 382 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 383 | }) | ||
| 384 | .await?; | ||
| 385 | |||
| 386 | machines_foreach( | ||
| 387 | machines | ||
| 388 | .iter() | ||
| 389 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | ||
| 390 | |machine| machine_copy_logs_dir(&ctx, machine, &args.output_dir), | ||
| 391 | ) | ||
| 392 | .await?; | ||
| 393 | |||
| 394 | Ok(()) | ||
| 395 | } | ||
| 396 | |||
| 397 | async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> { | ||
| 398 | let scp_common = &[ | ||
| 399 | "-o", | ||
| 400 | "StrictHostKeyChecking=no", | ||
| 401 | "-o", | ||
| 402 | "UserKnownHostsFile=/dev/null", | ||
| 403 | ]; | ||
| 404 | |||
| 405 | let mut args = vec![]; | ||
| 406 | args.extend(scp_common); | ||
| 407 | if ctx.node == ExecutionNode::Unknown { | ||
| 408 | args.push("-J"); | ||
| 409 | args.push(ctx.frontend_hostname.as_ref().expect("TODO")); | ||
| 410 | } | ||
| 411 | args.push("-r"); | ||
| 412 | |||
| 413 | let source_path = format!("{}:/tmp/oar-p2p-logs", machine.hostname()); | ||
| 414 | let destination_path = output_dir.display().to_string(); | ||
| 415 | args.push(&source_path); | ||
| 416 | args.push(&destination_path); | ||
| 417 | |||
| 418 | let output = Command::new("scp").args(args).output().await?; | ||
| 419 | output.exit_ok()?; | ||
| 420 | Ok(()) | ||
| 421 | } | ||
| 422 | |||
| 423 | async fn machines_foreach<F, FUT, RET>( | ||
| 424 | machines: impl IntoIterator<Item = &Machine>, | ||
| 425 | f: F, | ||
| 426 | ) -> Result<()> | ||
| 427 | where | ||
| 428 | F: Fn(Machine) -> FUT, | ||
| 429 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 430 | { | ||
| 431 | let mut futures = FuturesUnordered::new(); | ||
| 432 | |||
| 433 | for &machine in machines { | ||
| 434 | let fut = f(machine); | ||
| 435 | let fut = async move { (machine, fut.await) }; | ||
| 436 | futures.push(fut); | ||
| 437 | } | ||
| 438 | |||
| 439 | while let Some((machine, result)) = futures.next().await { | ||
| 440 | if let Err(err) = result { | ||
| 441 | tracing::error!("error on machine {machine}: {err}"); | ||
| 442 | return Err(err); | ||
| 443 | } | ||
| 444 | } | ||
| 445 | Ok(()) | ||
| 446 | } | ||
| 447 | |||
| 448 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | ||
| 449 | async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 450 | tracing::info!("removing all containers..."); | ||
| 451 | machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?; | ||
| 452 | Ok(()) | ||
| 453 | } | ||
| 454 | |||
| 455 | #[tracing::instrument(ret, err, skip_all)] | ||
| 456 | async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> { | ||
| 457 | tracing::info!("cleaning machines: {machines:?}"); | ||
| 458 | let mut set = JoinSet::default(); | ||
| 459 | for &machine in machines { | ||
| 460 | let ctx = ctx.clone(); | ||
| 461 | set.spawn(async move { machine_clean(&ctx, machine).await }); | ||
| 462 | } | ||
| 463 | let results = set.join_all().await; | ||
| 464 | for result in results { | ||
| 465 | result?; | ||
| 466 | } | ||
| 467 | Ok(()) | ||
| 468 | } | ||
| 469 | |||
| 470 | #[tracing::instrument(ret, err, skip_all)] | ||
| 471 | async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> { | ||
| 472 | tracing::info!("building networking container for machines: {machines:?}"); | ||
| 473 | let mut set = JoinSet::default(); | ||
| 474 | for &machine in machines { | ||
| 475 | let ctx = ctx.clone(); | ||
| 476 | set.spawn(async move { machine_net_container_build(&ctx, machine).await }); | ||
| 477 | } | ||
| 478 | for result in set.join_all().await { | ||
| 479 | result?; | ||
| 480 | } | ||
| 481 | Ok(()) | ||
| 482 | } | ||
| 483 | |||
| 484 | #[tracing::instrument(ret, err, skip_all)] | ||
| 485 | async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> { | ||
| 486 | tracing::info!("configuring machines"); | ||
| 487 | let mut set = JoinSet::default(); | ||
| 488 | for config in configs { | ||
| 489 | let ctx = ctx.clone(); | ||
| 490 | let config = config.clone(); | ||
| 491 | set.spawn(async move { machine_configure(&ctx, &config).await }); | ||
| 492 | } | ||
| 493 | for result in set.join_all().await { | ||
| 494 | result?; | ||
| 495 | } | ||
| 496 | Ok(()) | ||
| 497 | } | ||
| 498 | |||
| 499 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { | ||
| 500 | let interface = machine.interface(); | ||
| 501 | let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'"); | ||
| 502 | let output = machine_run_script(ctx, machine, &script).await?; | ||
| 503 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 504 | let mut addresses = Vec::default(); | ||
| 505 | for line in stdout.lines().map(str::trim).filter(|l| !l.is_empty()) { | ||
| 506 | tracing::trace!("parsing address from line: '{line}'"); | ||
| 507 | addresses.push(line.parse()?); | ||
| 508 | } | ||
| 509 | Ok(addresses) | ||
| 510 | } | ||
| 511 | |||
| 512 | async fn machine_run( | ||
| 513 | ctx: &Context, | ||
| 514 | machine: Machine, | ||
| 515 | args: &[&str], | ||
| 516 | stdin: Option<&str>, | ||
| 517 | ) -> Result<Output> { | ||
| 518 | let ssh_common = &[ | ||
| 519 | "-o", | ||
| 520 | "StrictHostKeyChecking=no", | ||
| 521 | "-o", | ||
| 522 | "UserKnownHostsFile=/dev/null", | ||
| 523 | ]; | ||
| 524 | |||
| 525 | let mut arguments = match ctx.node { | ||
| 526 | ExecutionNode::Frontend => { | ||
| 527 | let mut arguments = Vec::default(); | ||
| 528 | arguments.push("ssh"); | ||
| 529 | arguments.extend(ssh_common); | ||
| 530 | arguments.push(machine.hostname()); | ||
| 531 | arguments | ||
| 532 | } | ||
| 533 | ExecutionNode::Machine(m) => { | ||
| 534 | if m == machine { | ||
| 535 | vec![] | ||
| 536 | } else { | ||
| 537 | let mut arguments = Vec::default(); | ||
| 538 | arguments.push("ssh"); | ||
| 539 | arguments.extend(ssh_common); | ||
| 540 | arguments.push(machine.hostname()); | ||
| 541 | arguments | ||
| 542 | } | ||
| 543 | } | ||
| 544 | ExecutionNode::Unknown => { | ||
| 545 | let frontend = ctx.frontend_hostname.as_ref().unwrap(); | ||
| 546 | let mut arguments = Vec::default(); | ||
| 547 | arguments.push("ssh"); | ||
| 548 | arguments.extend(ssh_common); | ||
| 549 | arguments.push("-J"); | ||
| 550 | arguments.push(frontend); | ||
| 551 | arguments.push(machine.hostname()); | ||
| 552 | arguments | ||
| 553 | } | ||
| 48 | }; | 554 | }; |
| 49 | let machines = list_job_machines(&context).await?; | 555 | if args.is_empty() { |
| 556 | arguments.push("bash"); | ||
| 557 | } | ||
| 558 | arguments.extend(args); | ||
| 50 | 559 | ||
| 51 | // listing oar job machines | 560 | let mut proc = Command::new(arguments[0]) |
| 52 | // if we are in the frontend we use oarstat | 561 | .args(&arguments[1..]) |
| 53 | // if we are in a job machine we read the cpuset file | 562 | .stdout(std::process::Stdio::piped()) |
| 54 | // if we are outside the cluster we use ssh cluster oarstat | 563 | .stderr(std::process::Stdio::piped()) |
| 564 | .stdin(std::process::Stdio::piped()) | ||
| 565 | .spawn() | ||
| 566 | .context("spawning process")?; | ||
| 55 | 567 | ||
| 56 | // machine generate configurations | 568 | if let Some(stdin) = stdin { |
| 57 | // this does not require any connections | 569 | let proc_stdin = proc.stdin.as_mut().unwrap(); |
| 570 | proc_stdin | ||
| 571 | .write_all(stdin.as_bytes()) | ||
| 572 | .await | ||
| 573 | .context("writing stdin")?; | ||
| 574 | } | ||
| 58 | 575 | ||
| 59 | // machine cleanup interface | 576 | let output = proc |
| 60 | // requires running commands inside a container inside a machine | 577 | .wait_with_output() |
| 61 | // we could generate a cleanup bash script and execute that in oneshot | 578 | .await |
| 579 | .context("waiting for process to exit")?; | ||
| 62 | 580 | ||
| 63 | // machine apply configuration | 581 | Ok(output) |
| 64 | // this also requires running some scripts inside a container inside the machine | 582 | } |
| 65 | 583 | ||
| 584 | async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> { | ||
| 585 | tracing::trace!("running script on machine {machine}:\n{script}"); | ||
| 586 | let output = machine_run(ctx, machine, &[], Some(script)).await?; | ||
| 587 | tracing::trace!( | ||
| 588 | "stdout:\n{}", | ||
| 589 | std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>") | ||
| 590 | ); | ||
| 591 | tracing::trace!( | ||
| 592 | "stderr:\n{}", | ||
| 593 | std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>") | ||
| 594 | ); | ||
| 595 | Ok(output.exit_ok()?) | ||
| 596 | } | ||
| 597 | |||
| 598 | async fn machine_net_container_run_script( | ||
| 599 | ctx: &Context, | ||
| 600 | machine: Machine, | ||
| 601 | script: &str, | ||
| 602 | ) -> Result<Output> { | ||
| 603 | machine_run( | ||
| 604 | ctx, | ||
| 605 | machine, | ||
| 606 | &[ | ||
| 607 | "docker", | ||
| 608 | "run", | ||
| 609 | "--rm", | ||
| 610 | "-i", | ||
| 611 | "--net=host", | ||
| 612 | "--privileged", | ||
| 613 | CONTAINER_IMAGE_NAME, | ||
| 614 | ], | ||
| 615 | Some(script), | ||
| 616 | ) | ||
| 617 | .await | ||
| 618 | } | ||
| 619 | |||
| 620 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | ||
| 621 | async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 622 | let script = r#" | ||
| 623 | set -e | ||
| 624 | cat << EOF > /tmp/oar-p2p.containerfile | ||
| 625 | FROM alpine:latest | ||
| 626 | RUN apk update && \ | ||
| 627 | apk add --no-cache bash grep iproute2 iproute2-tc nftables && \ | ||
| 628 | rm -rf /var/cache/apk/* | ||
| 629 | |||
| 630 | WORKDIR /work | ||
| 631 | EOF | ||
| 632 | |||
| 633 | docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile . | ||
| 634 | "#; | ||
| 635 | machine_run_script(ctx, machine, script).await?; | ||
| 66 | Ok(()) | 636 | Ok(()) |
| 67 | } | 637 | } |
| 68 | 638 | ||
| 69 | async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | 639 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] |
| 640 | async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 641 | let interface = machine.interface(); | ||
| 642 | let mut script = String::default(); | ||
| 643 | script.push_str(&format!( | ||
| 644 | "ip route del 10.0.0.0/8 dev {interface} || true\n" | ||
| 645 | )); | ||
| 646 | script.push_str(&format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+/32' | sed 's/\\(.*\\)/addr del \\1 dev {interface}/' | ip -b -\n")); | ||
| 647 | script.push_str(&format!( | ||
| 648 | "tc qdisc del dev {interface} root 2>/dev/null || true\n" | ||
| 649 | )); | ||
| 650 | script.push_str(&format!( | ||
| 651 | "tc qdisc del dev {interface} ingress 2>/dev/null || true\n" | ||
| 652 | )); | ||
| 653 | script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n"); | ||
| 654 | script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n"); | ||
| 655 | script.push_str("nft delete table oar-p2p 2>/dev/null || true\n"); | ||
| 656 | let output = machine_net_container_run_script(&ctx, machine, &script).await?; | ||
| 657 | Ok(()) | ||
| 658 | } | ||
| 659 | |||
| 660 | fn machine_configuration_script(config: &MachineConfig) -> String { | ||
| 661 | let mut script = String::default(); | ||
| 662 | // ip configuration | ||
| 663 | script.push_str("cat << EOF | ip -b -\n"); | ||
| 664 | for command in config.ip_commands.iter() { | ||
| 665 | script.push_str(command); | ||
| 666 | script.push_str("\n"); | ||
| 667 | } | ||
| 668 | script.push_str("\nEOF\n"); | ||
| 669 | |||
| 670 | // tc configuration | ||
| 671 | script.push_str("cat << EOF | tc -b -\n"); | ||
| 672 | for command in config.tc_commands.iter() { | ||
| 673 | script.push_str(command); | ||
| 674 | script.push_str("\n"); | ||
| 675 | } | ||
| 676 | script.push_str("\nEOF\n"); | ||
| 677 | |||
| 678 | // nft configuration | ||
| 679 | script.push_str("cat << EOF | nft -f -\n"); | ||
| 680 | script.push_str(&config.nft_script); | ||
| 681 | script.push_str("\nEOF\n"); | ||
| 682 | script | ||
| 683 | } | ||
| 684 | |||
| 685 | #[tracing::instrument(ret, err, skip_all, fields(machine = config.machine.to_string()))] | ||
| 686 | async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> { | ||
| 687 | let script = machine_configuration_script(config); | ||
| 688 | tracing::debug!("configuration script:\n{script}"); | ||
| 689 | machine_net_container_run_script(ctx, config.machine, &script).await?; | ||
| 690 | Ok(()) | ||
| 691 | } | ||
| 692 | |||
| 693 | fn machine_address_for_idx(machine: Machine, idx: u32) -> Ipv4Addr { | ||
| 694 | let c = u8::try_from(idx / 254).unwrap(); | ||
| 695 | let d = u8::try_from(idx % 254 + 1).unwrap(); | ||
| 696 | Ipv4Addr::new(10, machine.index().try_into().unwrap(), c, d) | ||
| 697 | } | ||
| 698 | |||
| 699 | fn machine_generate_configs( | ||
| 700 | matrix: &LatencyMatrix, | ||
| 701 | machines: &[Machine], | ||
| 702 | addr_per_cpu: u32, | ||
| 703 | ) -> Vec<MachineConfig> { | ||
| 704 | let mut configs = Vec::default(); | ||
| 705 | let mut addresses = Vec::default(); | ||
| 706 | let mut address_to_index = HashMap::<Ipv4Addr, usize>::default(); | ||
| 707 | |||
| 708 | // gather all addresses across all machines | ||
| 709 | for &machine in machines { | ||
| 710 | for i in 0..(addr_per_cpu * machine.cpus()) { | ||
| 711 | let address = machine_address_for_idx(machine, i); | ||
| 712 | addresses.push(address); | ||
| 713 | address_to_index.insert(address, addresses.len() - 1); | ||
| 714 | } | ||
| 715 | } | ||
| 716 | |||
| 717 | for &machine in machines { | ||
| 718 | let mut machine_addresses = Vec::default(); | ||
| 719 | let mut machine_ip_commands = Vec::default(); | ||
| 720 | let mut machine_tc_commands = Vec::default(); | ||
| 721 | let mut machine_nft_script = String::default(); | ||
| 722 | |||
| 723 | machine_ip_commands.push(format!("route add 10.0.0.0/8 dev {}", machine.interface())); | ||
| 724 | for i in 0..(addr_per_cpu * machine.cpus()) { | ||
| 725 | let address = machine_address_for_idx(machine, i); | ||
| 726 | machine_addresses.push(address); | ||
| 727 | machine_ip_commands.push(format!("addr add {address}/32 dev {}", machine.interface())); | ||
| 728 | } | ||
| 729 | |||
| 730 | let mut latencies_set = HashSet::<u32>::default(); | ||
| 731 | let mut latencies_buckets = Vec::<u32>::default(); | ||
| 732 | let mut latencies_addr_pairs = HashMap::<u32, Vec<(Ipv4Addr, Ipv4Addr)>>::default(); | ||
| 733 | for &addr in &machine_addresses { | ||
| 734 | let addr_idx = address_to_index[&addr]; | ||
| 735 | for other_idx in (0..addresses.len()).filter(|i| *i != addr_idx) { | ||
| 736 | let other = addresses[other_idx]; | ||
| 737 | let latency = matrix.latency(addr_idx, other_idx); | ||
| 738 | let latency_millis = u32::try_from(latency.as_millis()).unwrap(); | ||
| 739 | if !latencies_set.contains(&latency_millis) { | ||
| 740 | latencies_set.insert(latency_millis); | ||
| 741 | latencies_buckets.push(latency_millis); | ||
| 742 | } | ||
| 743 | latencies_addr_pairs | ||
| 744 | .entry(latency_millis) | ||
| 745 | .or_default() | ||
| 746 | .push((addr, other)); | ||
| 747 | } | ||
| 748 | } | ||
| 749 | |||
| 750 | for iface in &["lo", machine.interface()] { | ||
| 751 | machine_tc_commands.push(format!( | ||
| 752 | "qdisc add dev {iface} root handle 1: htb default 9999" | ||
| 753 | )); | ||
| 754 | machine_tc_commands.push(format!( | ||
| 755 | "class add dev {iface} parent 1: classid 1:9999 htb rate 10gbit" | ||
| 756 | )); | ||
| 757 | for (idx, &latency_millis) in latencies_buckets.iter().enumerate() { | ||
| 758 | // tc class for latency at idx X is X + 1 | ||
| 759 | let latency_class_id = idx + 1; | ||
| 760 | // mark for latency at idx X is X + 1 | ||
| 761 | let latency_mark = idx + 1; | ||
| 762 | |||
| 763 | machine_tc_commands.push(format!( | ||
| 764 | "class add dev {iface} parent 1: classid 1:{} htb rate 10gbit", | ||
| 765 | latency_class_id | ||
| 766 | )); | ||
| 767 | // why idx + 2 here? I dont remember anymore and forgot to comment | ||
| 768 | machine_tc_commands.push(format!( | ||
| 769 | "qdisc add dev {iface} parent 1:{} handle {}: netem delay {latency_millis}ms", | ||
| 770 | latency_class_id, | ||
| 771 | idx + 2 | ||
| 772 | )); | ||
| 773 | // TODO: is the order of these things correct? | ||
| 774 | machine_tc_commands.push(format!( | ||
| 775 | "filter add dev {iface} parent 1:0 prio 1 handle {} fw flowid 1:{}", | ||
| 776 | latency_mark, latency_class_id, | ||
| 777 | )); | ||
| 778 | } | ||
| 779 | } | ||
| 780 | |||
| 781 | machine_nft_script.push_str("table ip oar-p2p {\n"); | ||
| 782 | machine_nft_script.push_str("\tmap mark_pairs {\n"); | ||
| 783 | machine_nft_script.push_str("\t\ttype ipv4_addr . ipv4_addr : mark\n"); | ||
| 784 | machine_nft_script.push_str("\t\telements = {\n"); | ||
| 785 | for (latency_idx, &latency_millis) in latencies_buckets.iter().enumerate() { | ||
| 786 | let latency_mark = latency_idx + 1; | ||
| 787 | let pairs = match latencies_addr_pairs.get(&latency_millis) { | ||
| 788 | Some(pairs) => pairs, | ||
| 789 | None => continue, | ||
| 790 | }; | ||
| 791 | |||
| 792 | for (src, dst) in pairs { | ||
| 793 | assert_ne!(src, dst); | ||
| 794 | machine_nft_script.push_str(&format!("\t\t\t{src} . {dst} : {latency_mark},\n")); | ||
| 795 | } | ||
| 796 | } | ||
| 797 | machine_nft_script.push_str("\t\t}\n"); | ||
| 798 | machine_nft_script.push_str("\t}\n"); | ||
| 799 | machine_nft_script.push_str("\n"); | ||
| 800 | machine_nft_script.push_str("\tchain postrouting {\n"); | ||
| 801 | machine_nft_script.push_str("\t\ttype filter hook postrouting priority mangle -1\n"); | ||
| 802 | machine_nft_script.push_str("\t\tpolicy accept\n"); | ||
| 803 | machine_nft_script | ||
| 804 | .push_str("\t\tmeta mark set ip saddr . ip daddr map @mark_pairs counter\n"); | ||
| 805 | machine_nft_script.push_str("\t}\n"); | ||
| 806 | machine_nft_script.push_str("}\n"); | ||
| 807 | |||
| 808 | configs.push(MachineConfig { | ||
| 809 | machine, | ||
| 810 | addresses: machine_addresses, | ||
| 811 | nft_script: machine_nft_script, | ||
| 812 | tc_commands: machine_tc_commands, | ||
| 813 | ip_commands: machine_ip_commands, | ||
| 814 | }); | ||
| 815 | } | ||
| 816 | configs | ||
| 817 | } | ||
| 818 | |||
| 819 | async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | ||
| 70 | match ctx.node { | 820 | match ctx.node { |
| 71 | ExecutionNode::Frontend => { | 821 | ExecutionNode::Frontend => { |
| 72 | let job_id = match ctx.job_id { | 822 | let job_id = match ctx.job_id { |
| @@ -101,7 +851,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | |||
| 101 | Some(hostname) => hostname, | 851 | Some(hostname) => hostname, |
| 102 | None => { | 852 | None => { |
| 103 | return Err(eyre::eyre!( | 853 | return Err(eyre::eyre!( |
| 104 | "frontend hostname is requiredwhen running from outside the cluster" | 854 | "frontend hostname is required when running from outside the cluster" |
| 105 | )); | 855 | )); |
| 106 | } | 856 | } |
| 107 | }; | 857 | }; |
| @@ -148,7 +898,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | |||
| 148 | } | 898 | } |
| 149 | } | 899 | } |
| 150 | 900 | ||
| 151 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result<Vec<Machine>> { | 901 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { |
| 152 | #[derive(Debug, Deserialize)] | 902 | #[derive(Debug, Deserialize)] |
| 153 | struct JobSchema { | 903 | struct JobSchema { |
| 154 | assigned_network_address: Vec<String>, | 904 | assigned_network_address: Vec<String>, |
| @@ -168,8 +918,8 @@ fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Resul | |||
| 168 | Ok(machines) | 918 | Ok(machines) |
| 169 | } | 919 | } |
| 170 | 920 | ||
| 171 | fn get_execution_node() -> eyre::Result<ExecutionNode> { | 921 | async fn get_execution_node() -> Result<ExecutionNode> { |
| 172 | let hostname = get_hostname()?; | 922 | let hostname = get_hostname().await?; |
| 173 | let node = match hostname.as_str() { | 923 | let node = match hostname.as_str() { |
| 174 | "frontend" => ExecutionNode::Frontend, | 924 | "frontend" => ExecutionNode::Frontend, |
| 175 | _ => match Machine::from_hostname(&hostname) { | 925 | _ => match Machine::from_hostname(&hostname) { |
| @@ -180,8 +930,12 @@ fn get_execution_node() -> eyre::Result<ExecutionNode> { | |||
| 180 | Ok(node) | 930 | Ok(node) |
| 181 | } | 931 | } |
| 182 | 932 | ||
| 183 | fn get_hostname() -> eyre::Result<String> { | 933 | async fn get_hostname() -> Result<String> { |
| 184 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | 934 | if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await { |
| 935 | Ok(hostname) | ||
| 936 | } else { | ||
| 937 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | ||
| 938 | } | ||
| 185 | } | 939 | } |
| 186 | 940 | ||
| 187 | #[cfg(test)] | 941 | #[cfg(test)] |
