diff options
| author | diogo464 <[email protected]> | 2025-07-10 20:05:11 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-10 20:05:11 +0100 |
| commit | 4fc26211fda53023f8ce703ccf4b1a2bbfbbe10a (patch) | |
| tree | 711fcea1be7a22cf3ef7d3ebf98ea01d7cade62d | |
| parent | a5178fbb0bde3ff9f863ef0cca48748cb993390a (diff) | |
it works, now needs cleanup
| -rw-r--r-- | Cargo.lock | 100 | ||||
| -rw-r--r-- | Cargo.toml | 5 | ||||
| -rw-r--r-- | src/machine.rs | 156 | ||||
| -rw-r--r-- | src/main.rs | 832 |
4 files changed, 991 insertions, 102 deletions
| @@ -199,6 +199,95 @@ dependencies = [ | |||
| 199 | ] | 199 | ] |
| 200 | 200 | ||
| 201 | [[package]] | 201 | [[package]] |
| 202 | name = "futures" | ||
| 203 | version = "0.3.31" | ||
| 204 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 205 | checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" | ||
| 206 | dependencies = [ | ||
| 207 | "futures-channel", | ||
| 208 | "futures-core", | ||
| 209 | "futures-executor", | ||
| 210 | "futures-io", | ||
| 211 | "futures-sink", | ||
| 212 | "futures-task", | ||
| 213 | "futures-util", | ||
| 214 | ] | ||
| 215 | |||
| 216 | [[package]] | ||
| 217 | name = "futures-channel" | ||
| 218 | version = "0.3.31" | ||
| 219 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 220 | checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" | ||
| 221 | dependencies = [ | ||
| 222 | "futures-core", | ||
| 223 | "futures-sink", | ||
| 224 | ] | ||
| 225 | |||
| 226 | [[package]] | ||
| 227 | name = "futures-core" | ||
| 228 | version = "0.3.31" | ||
| 229 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 230 | checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" | ||
| 231 | |||
| 232 | [[package]] | ||
| 233 | name = "futures-executor" | ||
| 234 | version = "0.3.31" | ||
| 235 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 236 | checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" | ||
| 237 | dependencies = [ | ||
| 238 | "futures-core", | ||
| 239 | "futures-task", | ||
| 240 | "futures-util", | ||
| 241 | ] | ||
| 242 | |||
| 243 | [[package]] | ||
| 244 | name = "futures-io" | ||
| 245 | version = "0.3.31" | ||
| 246 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 247 | checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" | ||
| 248 | |||
| 249 | [[package]] | ||
| 250 | name = "futures-macro" | ||
| 251 | version = "0.3.31" | ||
| 252 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 253 | checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" | ||
| 254 | dependencies = [ | ||
| 255 | "proc-macro2", | ||
| 256 | "quote", | ||
| 257 | "syn", | ||
| 258 | ] | ||
| 259 | |||
| 260 | [[package]] | ||
| 261 | name = "futures-sink" | ||
| 262 | version = "0.3.31" | ||
| 263 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 264 | checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" | ||
| 265 | |||
| 266 | [[package]] | ||
| 267 | name = "futures-task" | ||
| 268 | version = "0.3.31" | ||
| 269 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 270 | checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" | ||
| 271 | |||
| 272 | [[package]] | ||
| 273 | name = "futures-util" | ||
| 274 | version = "0.3.31" | ||
| 275 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 276 | checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" | ||
| 277 | dependencies = [ | ||
| 278 | "futures-channel", | ||
| 279 | "futures-core", | ||
| 280 | "futures-io", | ||
| 281 | "futures-macro", | ||
| 282 | "futures-sink", | ||
| 283 | "futures-task", | ||
| 284 | "memchr", | ||
| 285 | "pin-project-lite", | ||
| 286 | "pin-utils", | ||
| 287 | "slab", | ||
| 288 | ] | ||
| 289 | |||
| 290 | [[package]] | ||
| 202 | name = "gimli" | 291 | name = "gimli" |
| 203 | version = "0.31.1" | 292 | version = "0.31.1" |
| 204 | source = "registry+https://github.com/rust-lang/crates.io-index" | 293 | source = "registry+https://github.com/rust-lang/crates.io-index" |
| @@ -313,12 +402,13 @@ dependencies = [ | |||
| 313 | ] | 402 | ] |
| 314 | 403 | ||
| 315 | [[package]] | 404 | [[package]] |
| 316 | name = "oar-p2p-net" | 405 | name = "oar-p2p" |
| 317 | version = "0.1.0" | 406 | version = "0.0.0" |
| 318 | dependencies = [ | 407 | dependencies = [ |
| 319 | "clap", | 408 | "clap", |
| 320 | "color-eyre", | 409 | "color-eyre", |
| 321 | "eyre", | 410 | "eyre", |
| 411 | "futures", | ||
| 322 | "serde", | 412 | "serde", |
| 323 | "serde_json", | 413 | "serde_json", |
| 324 | "thiserror", | 414 | "thiserror", |
| @@ -390,6 +480,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| 390 | checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" | 480 | checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" |
| 391 | 481 | ||
| 392 | [[package]] | 482 | [[package]] |
| 483 | name = "pin-utils" | ||
| 484 | version = "0.1.0" | ||
| 485 | source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| 486 | checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" | ||
| 487 | |||
| 488 | [[package]] | ||
| 393 | name = "proc-macro2" | 489 | name = "proc-macro2" |
| 394 | version = "1.0.95" | 490 | version = "1.0.95" |
| 395 | source = "registry+https://github.com/rust-lang/crates.io-index" | 491 | source = "registry+https://github.com/rust-lang/crates.io-index" |
| @@ -1,12 +1,13 @@ | |||
| 1 | [package] | 1 | [package] |
| 2 | name = "oar-p2p-net" | 2 | name = "oar-p2p" |
| 3 | version = "0.1.0" | 3 | version = "0.0.0" |
| 4 | edition = "2024" | 4 | edition = "2024" |
| 5 | 5 | ||
| 6 | [dependencies] | 6 | [dependencies] |
| 7 | clap = { version = "4.5.40", features = ["derive", "env"] } | 7 | clap = { version = "4.5.40", features = ["derive", "env"] } |
| 8 | color-eyre = "0.6.5" | 8 | color-eyre = "0.6.5" |
| 9 | eyre = "0.6.12" | 9 | eyre = "0.6.12" |
| 10 | futures = "0.3.31" | ||
| 10 | serde = { version = "1.0.219", features = ["derive"] } | 11 | serde = { version = "1.0.219", features = ["derive"] } |
| 11 | serde_json = "1.0.140" | 12 | serde_json = "1.0.140" |
| 12 | thiserror = "2.0.12" | 13 | thiserror = "2.0.12" |
diff --git a/src/machine.rs b/src/machine.rs index f1ad94d..f223e66 100644 --- a/src/machine.rs +++ b/src/machine.rs | |||
| @@ -1,6 +1,17 @@ | |||
| 1 | macro_rules! define_machines { | 1 | macro_rules! define_machines { |
| 2 | ($(($name:ident, $idx:expr, $hostname:expr, $interface:expr)),*) => { | 2 | ($(($name:ident, $idx:expr, $hostname:expr, $cpus:expr, $interface:expr)),*) => { |
| 3 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | 3 | #[derive(Debug)] |
| 4 | pub struct UnknownMachine; | ||
| 5 | |||
| 6 | impl std::fmt::Display for UnknownMachine { | ||
| 7 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 8 | f.write_str("unknown machine") | ||
| 9 | } | ||
| 10 | } | ||
| 11 | |||
| 12 | impl std::error::Error for UnknownMachine {} | ||
| 13 | |||
| 14 | #[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] | ||
| 4 | pub enum Machine { | 15 | pub enum Machine { |
| 5 | $($name,)* | 16 | $($name,)* |
| 6 | } | 17 | } |
| @@ -11,6 +22,17 @@ macro_rules! define_machines { | |||
| 11 | } | 22 | } |
| 12 | } | 23 | } |
| 13 | 24 | ||
| 25 | impl std::str::FromStr for Machine { | ||
| 26 | type Err = UnknownMachine; | ||
| 27 | |||
| 28 | fn from_str(v: &str) -> Result<Self, Self::Err> { | ||
| 29 | match v { | ||
| 30 | $($hostname => Ok(Self::$name),)* | ||
| 31 | _ => Err(UnknownMachine), | ||
| 32 | } | ||
| 33 | } | ||
| 34 | } | ||
| 35 | |||
| 14 | impl Machine { | 36 | impl Machine { |
| 15 | pub fn hostname(&self) -> &'static str { | 37 | pub fn hostname(&self) -> &'static str { |
| 16 | match self { | 38 | match self { |
| @@ -31,6 +53,19 @@ macro_rules! define_machines { | |||
| 31 | } | 53 | } |
| 32 | } | 54 | } |
| 33 | 55 | ||
| 56 | pub fn from_index(index: usize) -> Option<Self> { | ||
| 57 | match index { | ||
| 58 | $($idx => Some(Self::$name),)* | ||
| 59 | _ => None, | ||
| 60 | } | ||
| 61 | } | ||
| 62 | |||
| 63 | pub fn cpus(&self) -> u32 { | ||
| 64 | match self { | ||
| 65 | $(Self::$name => $cpus,)* | ||
| 66 | } | ||
| 67 | } | ||
| 68 | |||
| 34 | pub fn interface(&self) -> &'static str { | 69 | pub fn interface(&self) -> &'static str { |
| 35 | match self { | 70 | match self { |
| 36 | $(Self::$name => $interface,)* | 71 | $(Self::$name => $interface,)* |
| @@ -40,62 +75,65 @@ macro_rules! define_machines { | |||
| 40 | }; | 75 | }; |
| 41 | } | 76 | } |
| 42 | 77 | ||
| 78 | // node cpu counts | ||
| 79 | // oarnodes | grep '^network_address' | cut -d' ' -f3 | sort | uniq -c | ||
| 80 | |||
| 43 | define_machines!( | 81 | define_machines!( |
| 44 | (Alakazam01, 0, "alakazam-01", todo!()), | 82 | (Alakazam01, 0, "alakazam-01", 64, todo!()), |
| 45 | (Alakazam02, 1, "alakazam-02", todo!()), | 83 | (Alakazam02, 1, "alakazam-02", 64, todo!()), |
| 46 | (Alakazam03, 2, "alakazam-03", todo!()), | 84 | (Alakazam03, 2, "alakazam-03", 64, todo!()), |
| 47 | (Alakazam04, 3, "alakazam-04", todo!()), | 85 | (Alakazam04, 3, "alakazam-04", 64, todo!()), |
| 48 | (Alakazam05, 4, "alakazam-05", todo!()), | 86 | (Alakazam05, 4, "alakazam-05", 64, todo!()), |
| 49 | (Alakazam06, 5, "alakazam-06", todo!()), | 87 | (Alakazam06, 5, "alakazam-06", 64, todo!()), |
| 50 | (Alakazam07, 6, "alakazam-07", todo!()), | 88 | (Alakazam07, 6, "alakazam-07", 64, todo!()), |
| 51 | (Alakazam08, 7, "alakazam-08", todo!()), | 89 | (Alakazam08, 7, "alakazam-08", 64, todo!()), |
| 52 | (Bulbasaur1, 8, "bulbasaur-1", todo!()), | 90 | (Bulbasaur1, 8, "bulbasaur-1", 16, todo!()), |
| 53 | (Bulbasaur2, 9, "bulbasaur-2", todo!()), | 91 | (Bulbasaur2, 9, "bulbasaur-2", 16, todo!()), |
| 54 | (Bulbasaur3, 10, "bulbasaur-3", todo!()), | 92 | (Bulbasaur3, 10, "bulbasaur-3", 16, todo!()), |
| 55 | (Charmander1, 11, "charmander-1", "bond0"), | 93 | (Charmander1, 11, "charmander-1", 32, "bond0"), |
| 56 | (Charmander2, 12, "charmander-2", "bond0"), | 94 | (Charmander2, 12, "charmander-2", 32, "bond0"), |
| 57 | (Charmander3, 13, "charmander-3", "bond0"), | 95 | (Charmander3, 13, "charmander-3", 32, "bond0"), |
| 58 | (Charmander4, 14, "charmander-4", "bond0"), | 96 | (Charmander4, 14, "charmander-4", 32, "bond0"), |
| 59 | (Charmander5, 15, "charmander-5", "bond0"), | 97 | (Charmander5, 15, "charmander-5", 32, "bond0"), |
| 60 | (Gengar1, 16, "gengar-1", "bond0"), | 98 | (Gengar1, 16, "gengar-1", 8, "bond0"), |
| 61 | (Gengar2, 17, "gengar-2", "bond0"), | 99 | (Gengar2, 17, "gengar-2", 8, "bond0"), |
| 62 | (Gengar3, 18, "gengar-3", "bond0"), | 100 | (Gengar3, 18, "gengar-3", 8, "bond0"), |
| 63 | (Gengar4, 19, "gengar-4", "bond0"), | 101 | (Gengar4, 19, "gengar-4", 8, "bond0"), |
| 64 | (Gengar5, 20, "gengar-5", "bond0"), | 102 | (Gengar5, 20, "gengar-5", 8, "bond0"), |
| 65 | (Kadabra01, 21, "kadabra-01", todo!()), | 103 | (Kadabra01, 21, "kadabra-01", 64, todo!()), |
| 66 | (Kadabra02, 22, "kadabra-02", todo!()), | 104 | (Kadabra02, 22, "kadabra-02", 64, todo!()), |
| 67 | (Kadabra03, 23, "kadabra-03", todo!()), | 105 | (Kadabra03, 23, "kadabra-03", 64, todo!()), |
| 68 | (Kadabra04, 24, "kadabra-04", todo!()), | 106 | (Kadabra04, 24, "kadabra-04", 64, todo!()), |
| 69 | (Kadabra05, 25, "kadabra-05", todo!()), | 107 | (Kadabra05, 25, "kadabra-05", 64, todo!()), |
| 70 | (Kadabra06, 26, "kadabra-06", todo!()), | 108 | (Kadabra06, 26, "kadabra-06", 64, todo!()), |
| 71 | (Kadabra07, 27, "kadabra-07", todo!()), | 109 | (Kadabra07, 27, "kadabra-07", 64, todo!()), |
| 72 | (Kadabra08, 28, "kadabra-08", todo!()), | 110 | (Kadabra08, 28, "kadabra-08", 64, todo!()), |
| 73 | (Lugia1, 29, "lugia-1", "bond0"), | 111 | (Lugia1, 29, "lugia-1", 64, "bond0"), |
| 74 | (Lugia2, 30, "lugia-2", "bond0"), | 112 | (Lugia2, 30, "lugia-2", 64, "bond0"), |
| 75 | (Lugia3, 31, "lugia-3", "bond0"), | 113 | (Lugia3, 31, "lugia-3", 64, "bond0"), |
| 76 | (Lugia4, 32, "lugia-4", "bond0"), | 114 | (Lugia4, 32, "lugia-4", 64, "bond0"), |
| 77 | (Lugia5, 33, "lugia-5", "bond0"), | 115 | (Lugia5, 33, "lugia-5", 64, "bond0"), |
| 78 | (Magikarp1, 34, "magikarp-1", todo!()), | 116 | (Magikarp1, 34, "magikarp-1", 16, todo!()), |
| 79 | (Moltres01, 35, "moltres-01", todo!()), | 117 | (Moltres01, 35, "moltres-01", 64, todo!()), |
| 80 | (Moltres02, 36, "moltres-02", todo!()), | 118 | (Moltres02, 36, "moltres-02", 64, todo!()), |
| 81 | (Moltres03, 37, "moltres-03", todo!()), | 119 | (Moltres03, 37, "moltres-03", 64, todo!()), |
| 82 | (Moltres04, 38, "moltres-04", todo!()), | 120 | (Moltres04, 38, "moltres-04", 64, todo!()), |
| 83 | (Moltres05, 39, "moltres-05", todo!()), | 121 | (Moltres05, 39, "moltres-05", 64, todo!()), |
| 84 | (Moltres06, 40, "moltres-06", todo!()), | 122 | (Moltres06, 40, "moltres-06", 64, todo!()), |
| 85 | (Moltres07, 41, "moltres-07", todo!()), | 123 | (Moltres07, 41, "moltres-07", 64, todo!()), |
| 86 | (Moltres08, 42, "moltres-08", todo!()), | 124 | (Moltres08, 42, "moltres-08", 64, todo!()), |
| 87 | (Moltres09, 43, "moltres-09", todo!()), | 125 | (Moltres09, 43, "moltres-09", 64, todo!()), |
| 88 | (Moltres10, 44, "moltres-10", todo!()), | 126 | (Moltres10, 44, "moltres-10", 64, todo!()), |
| 89 | (Oddish1, 45, "oddish-1", todo!()), | 127 | (Oddish1, 45, "oddish-1", 4, todo!()), |
| 90 | (Psyduck1, 46, "psyduck-1", todo!()), | 128 | (Psyduck1, 46, "psyduck-1", 8, todo!()), |
| 91 | (Psyduck2, 47, "psyduck-2", todo!()), | 129 | (Psyduck2, 47, "psyduck-2", 8, todo!()), |
| 92 | (Psyduck3, 48, "psyduck-3", todo!()), | 130 | (Psyduck3, 48, "psyduck-3", 8, todo!()), |
| 93 | (Shelder1, 49, "shelder-1", todo!()), | 131 | (Shelder1, 49, "shelder-1", 64, todo!()), |
| 94 | (Squirtle1, 50, "squirtle-1", todo!()), | 132 | (Squirtle1, 50, "squirtle-1", 24, todo!()), |
| 95 | (Squirtle2, 51, "squirtle-2", todo!()), | 133 | (Squirtle2, 51, "squirtle-2", 24, todo!()), |
| 96 | (Squirtle3, 52, "squirtle-3", todo!()), | 134 | (Squirtle3, 52, "squirtle-3", 24, todo!()), |
| 97 | (Squirtle4, 53, "squirtle-4", todo!()), | 135 | (Squirtle4, 53, "squirtle-4", 24, todo!()), |
| 98 | (Staryu1, 54, "staryu-1", todo!()), | 136 | (Staryu1, 54, "staryu-1", 12, todo!()), |
| 99 | (Sudowoodo1, 55, "sudowoodo-1", todo!()), | 137 | (Sudowoodo1, 55, "sudowoodo-1", 16, todo!()), |
| 100 | (Vulpix1, 56, "vulpix-1", todo!()) | 138 | (Vulpix1, 56, "vulpix-1", 112, todo!()) |
| 101 | ); | 139 | ); |
diff --git a/src/main.rs b/src/main.rs index dfbddc0..36c2a9b 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -1,72 +1,822 @@ | |||
| 1 | use std::collections::{HashMap, HashSet}; | 1 | #![feature(exit_status_error)] |
| 2 | use std::{ | ||
| 3 | collections::{HashMap, HashSet}, | ||
| 4 | net::Ipv4Addr, | ||
| 5 | path::{Path, PathBuf}, | ||
| 6 | process::Output, | ||
| 7 | }; | ||
| 2 | 8 | ||
| 3 | use clap::Parser; | 9 | use clap::{Args, Parser, Subcommand}; |
| 4 | use eyre::Context as _; | 10 | use eyre::Context as _; |
| 11 | use eyre::Result; | ||
| 12 | use futures::{StreamExt as _, stream::FuturesUnordered}; | ||
| 5 | use machine::Machine; | 13 | use machine::Machine; |
| 6 | use serde::Deserialize; | 14 | use serde::Deserialize; |
| 7 | use tokio::process::Command; | 15 | use tokio::{ |
| 16 | io::{AsyncReadExt as _, AsyncWriteExt as _}, | ||
| 17 | process::Command, | ||
| 18 | task::JoinSet, | ||
| 19 | }; | ||
| 20 | |||
| 21 | use crate::latency_matrix::LatencyMatrix; | ||
| 8 | 22 | ||
| 9 | pub mod latency_matrix; | 23 | pub mod latency_matrix; |
| 10 | pub mod machine; | 24 | pub mod machine; |
| 11 | 25 | ||
| 26 | const CONTAINER_IMAGE_NAME: &'static str = "local/oar-p2p-networking"; | ||
| 27 | |||
| 12 | #[derive(Debug, Parser)] | 28 | #[derive(Debug, Parser)] |
| 13 | pub struct Args { | 29 | struct Cli { |
| 30 | #[clap(subcommand)] | ||
| 31 | cmd: SubCmd, | ||
| 32 | } | ||
| 33 | |||
| 34 | #[derive(Debug, Args)] | ||
| 35 | struct Common { | ||
| 14 | #[clap(long, env = "OAR_JOB_ID")] | 36 | #[clap(long, env = "OAR_JOB_ID")] |
| 15 | pub job_id: Option<u32>, | 37 | job_id: Option<u32>, |
| 38 | |||
| 16 | #[clap(long, env = "FRONTEND_HOSTNAME")] | 39 | #[clap(long, env = "FRONTEND_HOSTNAME")] |
| 17 | pub frontend_hostname: Option<String>, | 40 | frontend_hostname: Option<String>, |
| 41 | } | ||
| 42 | |||
| 43 | #[derive(Debug, Subcommand)] | ||
| 44 | enum SubCmd { | ||
| 45 | Net(NetArgs), | ||
| 46 | Run(RunArgs), | ||
| 47 | } | ||
| 48 | |||
| 49 | #[derive(Debug, Args)] | ||
| 50 | struct NetArgs { | ||
| 51 | #[clap(subcommand)] | ||
| 52 | cmd: NetSubCmd, | ||
| 53 | } | ||
| 54 | |||
| 55 | #[derive(Debug, Subcommand)] | ||
| 56 | enum NetSubCmd { | ||
| 57 | Up(NetUpArgs), | ||
| 58 | Down(NetDownArgs), | ||
| 59 | Show(NetShowArgs), | ||
| 60 | Preview(NetPreviewArgs), | ||
| 61 | } | ||
| 62 | |||
| 63 | #[derive(Debug, Args)] | ||
| 64 | struct NetUpArgs { | ||
| 65 | #[clap(flatten)] | ||
| 66 | common: Common, | ||
| 67 | #[clap(long)] | ||
| 68 | addr_per_cpu: u32, | ||
| 69 | #[clap(long)] | ||
| 70 | latency_matrix: PathBuf, | ||
| 71 | } | ||
| 72 | |||
| 73 | #[derive(Debug, Args)] | ||
| 74 | struct NetDownArgs { | ||
| 75 | #[clap(flatten)] | ||
| 76 | common: Common, | ||
| 77 | } | ||
| 78 | |||
| 79 | #[derive(Debug, Args)] | ||
| 80 | struct NetShowArgs { | ||
| 81 | #[clap(flatten)] | ||
| 82 | common: Common, | ||
| 83 | } | ||
| 84 | |||
| 85 | #[derive(Debug, Args)] | ||
| 86 | struct NetPreviewArgs { | ||
| 87 | #[clap(long)] | ||
| 88 | machine: Vec<Machine>, | ||
| 89 | |||
| 90 | #[clap(long)] | ||
| 91 | addr_per_cpu: u32, | ||
| 92 | |||
| 93 | #[clap(long)] | ||
| 94 | latency_matrix: PathBuf, | ||
| 95 | } | ||
| 96 | |||
| 97 | #[derive(Debug, Args)] | ||
| 98 | struct RunArgs { | ||
| 99 | #[clap(flatten)] | ||
| 100 | common: Common, | ||
| 101 | |||
| 102 | #[clap(long)] | ||
| 103 | output_dir: PathBuf, | ||
| 104 | |||
| 105 | schedule: Option<PathBuf>, | ||
| 18 | } | 106 | } |
| 19 | 107 | ||
| 20 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | 108 | #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] |
| 21 | pub enum ExecutionNode { | 109 | enum ExecutionNode { |
| 22 | Frontend, | 110 | Frontend, |
| 23 | Machine(Machine), | 111 | Machine(Machine), |
| 24 | Unknown, | 112 | Unknown, |
| 25 | } | 113 | } |
| 26 | 114 | ||
| 27 | pub struct Context { | 115 | #[derive(Debug, Clone)] |
| 28 | pub node: ExecutionNode, | 116 | struct Context { |
| 29 | pub job_id: Option<u32>, | 117 | node: ExecutionNode, |
| 30 | pub frontend_hostname: Option<String>, | 118 | job_id: Option<u32>, |
| 119 | frontend_hostname: Option<String>, | ||
| 31 | } | 120 | } |
| 32 | 121 | ||
| 33 | pub struct MachineConfig { | 122 | #[derive(Debug, Clone)] |
| 34 | machine | 123 | struct MachineConfig { |
| 124 | machine: Machine, | ||
| 125 | addresses: Vec<Ipv4Addr>, | ||
| 126 | nft_script: String, | ||
| 127 | tc_commands: Vec<String>, | ||
| 128 | ip_commands: Vec<String>, | ||
| 35 | } | 129 | } |
| 36 | 130 | ||
| 37 | #[tokio::main] | 131 | #[tokio::main] |
| 38 | async fn main() -> eyre::Result<()> { | 132 | async fn main() -> Result<()> { |
| 39 | tracing_subscriber::fmt::init(); | 133 | tracing_subscriber::fmt() |
| 134 | .with_env_filter( | ||
| 135 | tracing_subscriber::EnvFilter::try_from_default_env() | ||
| 136 | .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), | ||
| 137 | ) | ||
| 138 | .with_writer(std::io::stderr) | ||
| 139 | .init(); | ||
| 40 | color_eyre::install()?; | 140 | color_eyre::install()?; |
| 41 | 141 | ||
| 42 | let args = Args::parse(); | 142 | let cli = Cli::parse(); |
| 43 | let node = get_execution_node()?; | 143 | match cli.cmd { |
| 44 | let context = Context { | 144 | SubCmd::Net(args) => match args.cmd { |
| 145 | NetSubCmd::Up(args) => cmd_net_up(args).await, | ||
| 146 | NetSubCmd::Down(args) => cmd_net_down(args).await, | ||
| 147 | NetSubCmd::Show(args) => cmd_net_show(args).await, | ||
| 148 | NetSubCmd::Preview(args) => cmd_net_preview(args).await, | ||
| 149 | }, | ||
| 150 | SubCmd::Run(args) => cmd_run(args).await, | ||
| 151 | } | ||
| 152 | } | ||
| 153 | |||
| 154 | async fn context_from_common(common: &Common) -> Result<Context> { | ||
| 155 | let node = get_execution_node().await?; | ||
| 156 | Ok(Context { | ||
| 45 | node, | 157 | node, |
| 46 | job_id: args.job_id, | 158 | job_id: common.job_id, |
| 47 | frontend_hostname: args.frontend_hostname, | 159 | frontend_hostname: common.frontend_hostname.clone(), |
| 160 | }) | ||
| 161 | } | ||
| 162 | |||
| 163 | async fn cmd_net_up(args: NetUpArgs) -> Result<()> { | ||
| 164 | let context = context_from_common(&args.common).await?; | ||
| 165 | let matrix_content = tokio::fs::read_to_string(&args.latency_matrix) | ||
| 166 | .await | ||
| 167 | .context("reading latecy matrix")?; | ||
| 168 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) | ||
| 169 | .context("parsing latency matrix")?; | ||
| 170 | let machines = job_list_machines(&context).await?; | ||
| 171 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); | ||
| 172 | machines_net_container_build(&context, &machines).await?; | ||
| 173 | machines_clean(&context, &machines).await?; | ||
| 174 | machines_configure(&context, &configs).await?; | ||
| 175 | Ok(()) | ||
| 176 | } | ||
| 177 | |||
| 178 | async fn cmd_net_down(args: NetDownArgs) -> Result<()> { | ||
| 179 | let context = context_from_common(&args.common).await?; | ||
| 180 | let machines = job_list_machines(&context).await?; | ||
| 181 | machines_net_container_build(&context, &machines).await?; | ||
| 182 | machines_clean(&context, &machines).await?; | ||
| 183 | Ok(()) | ||
| 184 | } | ||
| 185 | |||
| 186 | async fn cmd_net_show(args: NetShowArgs) -> Result<()> { | ||
| 187 | let context = context_from_common(&args.common).await?; | ||
| 188 | let machines = job_list_machines(&context).await?; | ||
| 189 | let mut set = JoinSet::default(); | ||
| 190 | for machine in machines { | ||
| 191 | let context = context.clone(); | ||
| 192 | set.spawn(async move { (machine, machine_list_addresses(&context, machine).await) }); | ||
| 193 | } | ||
| 194 | let mut addresses = Vec::default(); | ||
| 195 | for (machine, result) in set.join_all().await { | ||
| 196 | let addrs = result?; | ||
| 197 | for addr in addrs { | ||
| 198 | addresses.push((machine, addr)); | ||
| 199 | } | ||
| 200 | } | ||
| 201 | addresses.sort(); | ||
| 202 | for (machine, addr) in addresses { | ||
| 203 | println!("{} {}", machine, addr); | ||
| 204 | } | ||
| 205 | Ok(()) | ||
| 206 | } | ||
| 207 | |||
| 208 | async fn cmd_net_preview(args: NetPreviewArgs) -> Result<()> { | ||
| 209 | let matrix_content = tokio::fs::read_to_string(&args.latency_matrix) | ||
| 210 | .await | ||
| 211 | .context("reading latecy matrix")?; | ||
| 212 | let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) | ||
| 213 | .context("parsing latency matrix")?; | ||
| 214 | let machines = args.machine; | ||
| 215 | let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); | ||
| 216 | |||
| 217 | for config in configs { | ||
| 218 | (0..20).for_each(|_| print!("-")); | ||
| 219 | print!(" {} ", config.machine); | ||
| 220 | (0..20).for_each(|_| print!("-")); | ||
| 221 | println!(); | ||
| 222 | println!("{}", machine_configuration_script(&config)); | ||
| 223 | } | ||
| 224 | Ok(()) | ||
| 225 | } | ||
| 226 | |||
| 227 | fn machine_from_addr(addr: Ipv4Addr) -> Result<Machine> { | ||
| 228 | let machine_index = usize::from(addr.octets()[1]); | ||
| 229 | Machine::from_index(machine_index) | ||
| 230 | .ok_or_else(|| eyre::eyre!("failed to resolve machine from address {addr}")) | ||
| 231 | } | ||
| 232 | |||
| 233 | #[derive(Debug, Clone)] | ||
| 234 | struct ScheduledContainer { | ||
| 235 | name: String, | ||
| 236 | image: String, | ||
| 237 | machine: Machine, | ||
| 238 | address: Ipv4Addr, | ||
| 239 | variables: HashMap<String, String>, | ||
| 240 | } | ||
| 241 | |||
| 242 | fn parse_schedule(schedule: &str) -> Result<Vec<ScheduledContainer>> { | ||
| 243 | #[derive(Debug, Deserialize)] | ||
| 244 | struct ScheduleItem { | ||
| 245 | name: Option<String>, | ||
| 246 | address: Ipv4Addr, | ||
| 247 | image: String, | ||
| 248 | env: HashMap<String, String>, | ||
| 249 | } | ||
| 250 | |||
| 251 | let items = serde_json::from_str::<Vec<ScheduleItem>>(schedule)?; | ||
| 252 | let mut containers = Vec::default(); | ||
| 253 | for item in items { | ||
| 254 | let name = match item.name { | ||
| 255 | Some(name) => name, | ||
| 256 | None => item.address.to_string(), | ||
| 257 | }; | ||
| 258 | let machine = machine_from_addr(item.address)?; | ||
| 259 | |||
| 260 | containers.push(ScheduledContainer { | ||
| 261 | name, | ||
| 262 | image: item.image, | ||
| 263 | machine, | ||
| 264 | address: item.address, | ||
| 265 | variables: item.env, | ||
| 266 | }); | ||
| 267 | } | ||
| 268 | Ok(containers) | ||
| 269 | } | ||
| 270 | |||
| 271 | async fn cmd_run(args: RunArgs) -> Result<()> { | ||
| 272 | let ctx = context_from_common(&args.common).await?; | ||
| 273 | let machines = job_list_machines(&ctx).await?; | ||
| 274 | let schedule = match args.schedule { | ||
| 275 | Some(path) => tokio::fs::read_to_string(&path) | ||
| 276 | .await | ||
| 277 | .with_context(|| format!("reading schedule file: {}", path.display()))?, | ||
| 278 | None => { | ||
| 279 | let mut stdin = String::default(); | ||
| 280 | tokio::io::stdin() | ||
| 281 | .read_to_string(&mut stdin) | ||
| 282 | .await | ||
| 283 | .context("reading schedule from stdin")?; | ||
| 284 | stdin | ||
| 285 | } | ||
| 286 | }; | ||
| 287 | let containers = parse_schedule(&schedule)?; | ||
| 288 | |||
| 289 | machines_foreach(&machines, |machine| machine_containers_clean(&ctx, machine)).await?; | ||
| 290 | machines_foreach(&machines, |machine| { | ||
| 291 | let ctx = ctx.clone(); | ||
| 292 | let containers = containers | ||
| 293 | .iter() | ||
| 294 | .filter(|c| c.machine == machine) | ||
| 295 | .cloned() | ||
| 296 | .collect::<Vec<_>>(); | ||
| 297 | let mut script = String::default(); | ||
| 298 | for (idx, container) in containers.iter().enumerate() { | ||
| 299 | script.push_str("docker create \\\n"); | ||
| 300 | script.push_str("\t--pull=always \\\n"); | ||
| 301 | script.push_str("\t--network=host \\\n"); | ||
| 302 | script.push_str("\t--restart=no \\\n"); | ||
| 303 | script.push_str(&format!("\t--name {} \\\n", container.name)); | ||
| 304 | for (key, val) in container.variables.iter() { | ||
| 305 | script.push_str("\t-e "); | ||
| 306 | script.push_str(key); | ||
| 307 | script.push_str("="); | ||
| 308 | script.push_str(val); | ||
| 309 | script.push_str(" \\\n"); | ||
| 310 | } | ||
| 311 | script.push_str("\t"); | ||
| 312 | script.push_str(&container.image); | ||
| 313 | script.push_str(" &\n"); | ||
| 314 | script.push_str(&format!("pid_{idx}=$!\n\n")); | ||
| 315 | } | ||
| 316 | |||
| 317 | for (idx, container) in containers.iter().enumerate() { | ||
| 318 | let name = &container.name; | ||
| 319 | script.push_str(&format!( | ||
| 320 | "wait $pid_{idx} || {{ echo Failed to create container {name} ; exit 1 ; }}\n" | ||
| 321 | )); | ||
| 322 | } | ||
| 323 | tracing::debug!("container creation script:\n{script}"); | ||
| 324 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 325 | }) | ||
| 326 | .await?; | ||
| 327 | |||
| 328 | tracing::info!("starting all containers on all machines"); | ||
| 329 | machines_foreach( | ||
| 330 | machines | ||
| 331 | .iter() | ||
| 332 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | ||
| 333 | |machine| { | ||
| 334 | machine_run_script( | ||
| 335 | &ctx, | ||
| 336 | machine, | ||
| 337 | "docker container ls -aq | xargs docker container start", | ||
| 338 | ) | ||
| 339 | }, | ||
| 340 | ) | ||
| 341 | .await?; | ||
| 342 | |||
| 343 | tracing::info!("waiting for all containers to exit"); | ||
| 344 | machines_foreach(&machines, |machine| { | ||
| 345 | let ctx = ctx.clone(); | ||
| 346 | let containers = containers | ||
| 347 | .iter() | ||
| 348 | .filter(|c| c.machine == machine) | ||
| 349 | .cloned() | ||
| 350 | .collect::<Vec<_>>(); | ||
| 351 | let mut script = String::default(); | ||
| 352 | for container in containers { | ||
| 353 | let name = &container.name; | ||
| 354 | script.push_str(&format!("if [ \"$(docker wait {name})\" -ne \"0\" ] ; then\n")); | ||
| 355 | script.push_str(&format!("\techo Container {name} failed\n")); | ||
| 356 | script.push_str(&format!("\tdocker logs {name} 2>1\n")); | ||
| 357 | script.push_str("\texit 1\n"); | ||
| 358 | script.push_str("fi\n\n"); | ||
| 359 | } | ||
| 360 | script.push_str("exit 0\n"); | ||
| 361 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 362 | }) | ||
| 363 | .await?; | ||
| 364 | |||
| 365 | tracing::info!("saving logs to disk on all machines"); | ||
| 366 | machines_foreach(&machines, |machine| { | ||
| 367 | let ctx = ctx.clone(); | ||
| 368 | let containers = containers | ||
| 369 | .iter() | ||
| 370 | .filter(|c| c.machine == machine) | ||
| 371 | .cloned() | ||
| 372 | .collect::<Vec<_>>(); | ||
| 373 | let mut script = String::default(); | ||
| 374 | script.push_str("set -e\n"); | ||
| 375 | script.push_str("mkdir -p /tmp/oar-p2p-logs\n"); | ||
| 376 | script.push_str("find /tmp/oar-p2p-logs -maxdepth 1 -type f -delete\n"); | ||
| 377 | for container in containers { | ||
| 378 | let name = &container.name; | ||
| 379 | script.push_str(&format!("docker logs {name} 1> /tmp/oar-p2p-logs/{name}.stdout 2> /tmp/oar-p2p-logs/{name}.stderr\n")); | ||
| 380 | } | ||
| 381 | script.push_str("exit 0\n"); | ||
| 382 | async move { machine_run_script(&ctx, machine, &script).await } | ||
| 383 | }) | ||
| 384 | .await?; | ||
| 385 | |||
| 386 | machines_foreach( | ||
| 387 | machines | ||
| 388 | .iter() | ||
| 389 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | ||
| 390 | |machine| machine_copy_logs_dir(&ctx, machine, &args.output_dir), | ||
| 391 | ) | ||
| 392 | .await?; | ||
| 393 | |||
| 394 | Ok(()) | ||
| 395 | } | ||
| 396 | |||
| 397 | async fn machine_copy_logs_dir(ctx: &Context, machine: Machine, output_dir: &Path) -> Result<()> { | ||
| 398 | let scp_common = &[ | ||
| 399 | "-o", | ||
| 400 | "StrictHostKeyChecking=no", | ||
| 401 | "-o", | ||
| 402 | "UserKnownHostsFile=/dev/null", | ||
| 403 | ]; | ||
| 404 | |||
| 405 | let mut args = vec![]; | ||
| 406 | args.extend(scp_common); | ||
| 407 | if ctx.node == ExecutionNode::Unknown { | ||
| 408 | args.push("-J"); | ||
| 409 | args.push(ctx.frontend_hostname.as_ref().expect("TODO")); | ||
| 410 | } | ||
| 411 | args.push("-r"); | ||
| 412 | |||
| 413 | let source_path = format!("{}:/tmp/oar-p2p-logs", machine.hostname()); | ||
| 414 | let destination_path = output_dir.display().to_string(); | ||
| 415 | args.push(&source_path); | ||
| 416 | args.push(&destination_path); | ||
| 417 | |||
| 418 | let output = Command::new("scp").args(args).output().await?; | ||
| 419 | output.exit_ok()?; | ||
| 420 | Ok(()) | ||
| 421 | } | ||
| 422 | |||
| 423 | async fn machines_foreach<F, FUT, RET>( | ||
| 424 | machines: impl IntoIterator<Item = &Machine>, | ||
| 425 | f: F, | ||
| 426 | ) -> Result<()> | ||
| 427 | where | ||
| 428 | F: Fn(Machine) -> FUT, | ||
| 429 | FUT: std::future::Future<Output = Result<RET>>, | ||
| 430 | { | ||
| 431 | let mut futures = FuturesUnordered::new(); | ||
| 432 | |||
| 433 | for &machine in machines { | ||
| 434 | let fut = f(machine); | ||
| 435 | let fut = async move { (machine, fut.await) }; | ||
| 436 | futures.push(fut); | ||
| 437 | } | ||
| 438 | |||
| 439 | while let Some((machine, result)) = futures.next().await { | ||
| 440 | if let Err(err) = result { | ||
| 441 | tracing::error!("error on machine {machine}: {err}"); | ||
| 442 | return Err(err); | ||
| 443 | } | ||
| 444 | } | ||
| 445 | Ok(()) | ||
| 446 | } | ||
| 447 | |||
| 448 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | ||
| 449 | async fn machine_containers_clean(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 450 | tracing::info!("removing all containers..."); | ||
| 451 | machine_run_script(ctx, machine, "docker ps -aq | xargs -r docker rm -f").await?; | ||
| 452 | Ok(()) | ||
| 453 | } | ||
| 454 | |||
| 455 | #[tracing::instrument(ret, err, skip_all)] | ||
| 456 | async fn machines_clean(ctx: &Context, machines: &[Machine]) -> Result<()> { | ||
| 457 | tracing::info!("cleaning machines: {machines:?}"); | ||
| 458 | let mut set = JoinSet::default(); | ||
| 459 | for &machine in machines { | ||
| 460 | let ctx = ctx.clone(); | ||
| 461 | set.spawn(async move { machine_clean(&ctx, machine).await }); | ||
| 462 | } | ||
| 463 | let results = set.join_all().await; | ||
| 464 | for result in results { | ||
| 465 | result?; | ||
| 466 | } | ||
| 467 | Ok(()) | ||
| 468 | } | ||
| 469 | |||
| 470 | #[tracing::instrument(ret, err, skip_all)] | ||
| 471 | async fn machines_net_container_build(ctx: &Context, machines: &[Machine]) -> Result<()> { | ||
| 472 | tracing::info!("building networking container for machines: {machines:?}"); | ||
| 473 | let mut set = JoinSet::default(); | ||
| 474 | for &machine in machines { | ||
| 475 | let ctx = ctx.clone(); | ||
| 476 | set.spawn(async move { machine_net_container_build(&ctx, machine).await }); | ||
| 477 | } | ||
| 478 | for result in set.join_all().await { | ||
| 479 | result?; | ||
| 480 | } | ||
| 481 | Ok(()) | ||
| 482 | } | ||
| 483 | |||
| 484 | #[tracing::instrument(ret, err, skip_all)] | ||
| 485 | async fn machines_configure(ctx: &Context, configs: &[MachineConfig]) -> Result<()> { | ||
| 486 | tracing::info!("configuring machines"); | ||
| 487 | let mut set = JoinSet::default(); | ||
| 488 | for config in configs { | ||
| 489 | let ctx = ctx.clone(); | ||
| 490 | let config = config.clone(); | ||
| 491 | set.spawn(async move { machine_configure(&ctx, &config).await }); | ||
| 492 | } | ||
| 493 | for result in set.join_all().await { | ||
| 494 | result?; | ||
| 495 | } | ||
| 496 | Ok(()) | ||
| 497 | } | ||
| 498 | |||
| 499 | async fn machine_list_addresses(ctx: &Context, machine: Machine) -> Result<Vec<Ipv4Addr>> { | ||
| 500 | let interface = machine.interface(); | ||
| 501 | let script = format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+'"); | ||
| 502 | let output = machine_run_script(ctx, machine, &script).await?; | ||
| 503 | let stdout = std::str::from_utf8(&output.stdout)?; | ||
| 504 | let mut addresses = Vec::default(); | ||
| 505 | for line in stdout.lines().map(str::trim).filter(|l| !l.is_empty()) { | ||
| 506 | tracing::trace!("parsing address from line: '{line}'"); | ||
| 507 | addresses.push(line.parse()?); | ||
| 508 | } | ||
| 509 | Ok(addresses) | ||
| 510 | } | ||
| 511 | |||
| 512 | async fn machine_run( | ||
| 513 | ctx: &Context, | ||
| 514 | machine: Machine, | ||
| 515 | args: &[&str], | ||
| 516 | stdin: Option<&str>, | ||
| 517 | ) -> Result<Output> { | ||
| 518 | let ssh_common = &[ | ||
| 519 | "-o", | ||
| 520 | "StrictHostKeyChecking=no", | ||
| 521 | "-o", | ||
| 522 | "UserKnownHostsFile=/dev/null", | ||
| 523 | ]; | ||
| 524 | |||
| 525 | let mut arguments = match ctx.node { | ||
| 526 | ExecutionNode::Frontend => { | ||
| 527 | let mut arguments = Vec::default(); | ||
| 528 | arguments.push("ssh"); | ||
| 529 | arguments.extend(ssh_common); | ||
| 530 | arguments.push(machine.hostname()); | ||
| 531 | arguments | ||
| 532 | } | ||
| 533 | ExecutionNode::Machine(m) => { | ||
| 534 | if m == machine { | ||
| 535 | vec![] | ||
| 536 | } else { | ||
| 537 | let mut arguments = Vec::default(); | ||
| 538 | arguments.push("ssh"); | ||
| 539 | arguments.extend(ssh_common); | ||
| 540 | arguments.push(machine.hostname()); | ||
| 541 | arguments | ||
| 542 | } | ||
| 543 | } | ||
| 544 | ExecutionNode::Unknown => { | ||
| 545 | let frontend = ctx.frontend_hostname.as_ref().unwrap(); | ||
| 546 | let mut arguments = Vec::default(); | ||
| 547 | arguments.push("ssh"); | ||
| 548 | arguments.extend(ssh_common); | ||
| 549 | arguments.push("-J"); | ||
| 550 | arguments.push(frontend); | ||
| 551 | arguments.push(machine.hostname()); | ||
| 552 | arguments | ||
| 553 | } | ||
| 48 | }; | 554 | }; |
| 49 | let machines = list_job_machines(&context).await?; | 555 | if args.is_empty() { |
| 556 | arguments.push("bash"); | ||
| 557 | } | ||
| 558 | arguments.extend(args); | ||
| 50 | 559 | ||
| 51 | // listing oar job machines | 560 | let mut proc = Command::new(arguments[0]) |
| 52 | // if we are in the frontend we use oarstat | 561 | .args(&arguments[1..]) |
| 53 | // if we are in a job machine we read the cpuset file | 562 | .stdout(std::process::Stdio::piped()) |
| 54 | // if we are outside the cluster we use ssh cluster oarstat | 563 | .stderr(std::process::Stdio::piped()) |
| 564 | .stdin(std::process::Stdio::piped()) | ||
| 565 | .spawn() | ||
| 566 | .context("spawning process")?; | ||
| 55 | 567 | ||
| 56 | // machine generate configurations | 568 | if let Some(stdin) = stdin { |
| 57 | // this does not require any connections | 569 | let proc_stdin = proc.stdin.as_mut().unwrap(); |
| 570 | proc_stdin | ||
| 571 | .write_all(stdin.as_bytes()) | ||
| 572 | .await | ||
| 573 | .context("writing stdin")?; | ||
| 574 | } | ||
| 58 | 575 | ||
| 59 | // machine cleanup interface | 576 | let output = proc |
| 60 | // requires running commands inside a container inside a machine | 577 | .wait_with_output() |
| 61 | // we could generate a cleanup bash script and execute that in oneshot | 578 | .await |
| 579 | .context("waiting for process to exit")?; | ||
| 62 | 580 | ||
| 63 | // machine apply configuration | 581 | Ok(output) |
| 64 | // this also requires running some scripts inside a container inside the machine | 582 | } |
| 65 | 583 | ||
| 584 | async fn machine_run_script(ctx: &Context, machine: Machine, script: &str) -> Result<Output> { | ||
| 585 | tracing::trace!("running script on machine {machine}:\n{script}"); | ||
| 586 | let output = machine_run(ctx, machine, &[], Some(script)).await?; | ||
| 587 | tracing::trace!( | ||
| 588 | "stdout:\n{}", | ||
| 589 | std::str::from_utf8(&output.stdout).unwrap_or("<invalid utf-8>") | ||
| 590 | ); | ||
| 591 | tracing::trace!( | ||
| 592 | "stderr:\n{}", | ||
| 593 | std::str::from_utf8(&output.stderr).unwrap_or("<invalid utf-8>") | ||
| 594 | ); | ||
| 595 | Ok(output.exit_ok()?) | ||
| 596 | } | ||
| 597 | |||
| 598 | async fn machine_net_container_run_script( | ||
| 599 | ctx: &Context, | ||
| 600 | machine: Machine, | ||
| 601 | script: &str, | ||
| 602 | ) -> Result<Output> { | ||
| 603 | machine_run( | ||
| 604 | ctx, | ||
| 605 | machine, | ||
| 606 | &[ | ||
| 607 | "docker", | ||
| 608 | "run", | ||
| 609 | "--rm", | ||
| 610 | "-i", | ||
| 611 | "--net=host", | ||
| 612 | "--privileged", | ||
| 613 | CONTAINER_IMAGE_NAME, | ||
| 614 | ], | ||
| 615 | Some(script), | ||
| 616 | ) | ||
| 617 | .await | ||
| 618 | } | ||
| 619 | |||
| 620 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] | ||
| 621 | async fn machine_net_container_build(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 622 | let script = r#" | ||
| 623 | set -e | ||
| 624 | cat << EOF > /tmp/oar-p2p.containerfile | ||
| 625 | FROM alpine:latest | ||
| 626 | RUN apk update && \ | ||
| 627 | apk add --no-cache bash grep iproute2 iproute2-tc nftables && \ | ||
| 628 | rm -rf /var/cache/apk/* | ||
| 629 | |||
| 630 | WORKDIR /work | ||
| 631 | EOF | ||
| 632 | |||
| 633 | docker build -t local/oar-p2p-networking:latest -f /tmp/oar-p2p.containerfile . | ||
| 634 | "#; | ||
| 635 | machine_run_script(ctx, machine, script).await?; | ||
| 66 | Ok(()) | 636 | Ok(()) |
| 67 | } | 637 | } |
| 68 | 638 | ||
| 69 | async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | 639 | #[tracing::instrument(ret, err, skip_all, fields(machine = machine.to_string()))] |
| 640 | async fn machine_clean(ctx: &Context, machine: Machine) -> Result<()> { | ||
| 641 | let interface = machine.interface(); | ||
| 642 | let mut script = String::default(); | ||
| 643 | script.push_str(&format!( | ||
| 644 | "ip route del 10.0.0.0/8 dev {interface} || true\n" | ||
| 645 | )); | ||
| 646 | script.push_str(&format!("ip addr show {interface} | grep -oE '10\\.[0-9]+\\.[0-9]+\\.[0-9]+/32' | sed 's/\\(.*\\)/addr del \\1 dev {interface}/' | ip -b -\n")); | ||
| 647 | script.push_str(&format!( | ||
| 648 | "tc qdisc del dev {interface} root 2>/dev/null || true\n" | ||
| 649 | )); | ||
| 650 | script.push_str(&format!( | ||
| 651 | "tc qdisc del dev {interface} ingress 2>/dev/null || true\n" | ||
| 652 | )); | ||
| 653 | script.push_str("tc qdisc del dev lo root 2>/dev/null || true\n"); | ||
| 654 | script.push_str("tc qdisc del dev lo ingress 2>/dev/null || true\n"); | ||
| 655 | script.push_str("nft delete table oar-p2p 2>/dev/null || true\n"); | ||
| 656 | let output = machine_net_container_run_script(&ctx, machine, &script).await?; | ||
| 657 | Ok(()) | ||
| 658 | } | ||
| 659 | |||
| 660 | fn machine_configuration_script(config: &MachineConfig) -> String { | ||
| 661 | let mut script = String::default(); | ||
| 662 | // ip configuration | ||
| 663 | script.push_str("cat << EOF | ip -b -\n"); | ||
| 664 | for command in config.ip_commands.iter() { | ||
| 665 | script.push_str(command); | ||
| 666 | script.push_str("\n"); | ||
| 667 | } | ||
| 668 | script.push_str("\nEOF\n"); | ||
| 669 | |||
| 670 | // tc configuration | ||
| 671 | script.push_str("cat << EOF | tc -b -\n"); | ||
| 672 | for command in config.tc_commands.iter() { | ||
| 673 | script.push_str(command); | ||
| 674 | script.push_str("\n"); | ||
| 675 | } | ||
| 676 | script.push_str("\nEOF\n"); | ||
| 677 | |||
| 678 | // nft configuration | ||
| 679 | script.push_str("cat << EOF | nft -f -\n"); | ||
| 680 | script.push_str(&config.nft_script); | ||
| 681 | script.push_str("\nEOF\n"); | ||
| 682 | script | ||
| 683 | } | ||
| 684 | |||
| 685 | #[tracing::instrument(ret, err, skip_all, fields(machine = config.machine.to_string()))] | ||
| 686 | async fn machine_configure(ctx: &Context, config: &MachineConfig) -> Result<()> { | ||
| 687 | let script = machine_configuration_script(config); | ||
| 688 | tracing::debug!("configuration script:\n{script}"); | ||
| 689 | machine_net_container_run_script(ctx, config.machine, &script).await?; | ||
| 690 | Ok(()) | ||
| 691 | } | ||
| 692 | |||
| 693 | fn machine_address_for_idx(machine: Machine, idx: u32) -> Ipv4Addr { | ||
| 694 | let c = u8::try_from(idx / 254).unwrap(); | ||
| 695 | let d = u8::try_from(idx % 254 + 1).unwrap(); | ||
| 696 | Ipv4Addr::new(10, machine.index().try_into().unwrap(), c, d) | ||
| 697 | } | ||
| 698 | |||
| 699 | fn machine_generate_configs( | ||
| 700 | matrix: &LatencyMatrix, | ||
| 701 | machines: &[Machine], | ||
| 702 | addr_per_cpu: u32, | ||
| 703 | ) -> Vec<MachineConfig> { | ||
| 704 | let mut configs = Vec::default(); | ||
| 705 | let mut addresses = Vec::default(); | ||
| 706 | let mut address_to_index = HashMap::<Ipv4Addr, usize>::default(); | ||
| 707 | |||
| 708 | // gather all addresses across all machines | ||
| 709 | for &machine in machines { | ||
| 710 | for i in 0..(addr_per_cpu * machine.cpus()) { | ||
| 711 | let address = machine_address_for_idx(machine, i); | ||
| 712 | addresses.push(address); | ||
| 713 | address_to_index.insert(address, addresses.len() - 1); | ||
| 714 | } | ||
| 715 | } | ||
| 716 | |||
| 717 | for &machine in machines { | ||
| 718 | let mut machine_addresses = Vec::default(); | ||
| 719 | let mut machine_ip_commands = Vec::default(); | ||
| 720 | let mut machine_tc_commands = Vec::default(); | ||
| 721 | let mut machine_nft_script = String::default(); | ||
| 722 | |||
| 723 | machine_ip_commands.push(format!("route add 10.0.0.0/8 dev {}", machine.interface())); | ||
| 724 | for i in 0..(addr_per_cpu * machine.cpus()) { | ||
| 725 | let address = machine_address_for_idx(machine, i); | ||
| 726 | machine_addresses.push(address); | ||
| 727 | machine_ip_commands.push(format!("addr add {address}/32 dev {}", machine.interface())); | ||
| 728 | } | ||
| 729 | |||
| 730 | let mut latencies_set = HashSet::<u32>::default(); | ||
| 731 | let mut latencies_buckets = Vec::<u32>::default(); | ||
| 732 | let mut latencies_addr_pairs = HashMap::<u32, Vec<(Ipv4Addr, Ipv4Addr)>>::default(); | ||
| 733 | for &addr in &machine_addresses { | ||
| 734 | let addr_idx = address_to_index[&addr]; | ||
| 735 | for other_idx in (0..addresses.len()).filter(|i| *i != addr_idx) { | ||
| 736 | let other = addresses[other_idx]; | ||
| 737 | let latency = matrix.latency(addr_idx, other_idx); | ||
| 738 | let latency_millis = u32::try_from(latency.as_millis()).unwrap(); | ||
| 739 | if !latencies_set.contains(&latency_millis) { | ||
| 740 | latencies_set.insert(latency_millis); | ||
| 741 | latencies_buckets.push(latency_millis); | ||
| 742 | } | ||
| 743 | latencies_addr_pairs | ||
| 744 | .entry(latency_millis) | ||
| 745 | .or_default() | ||
| 746 | .push((addr, other)); | ||
| 747 | } | ||
| 748 | } | ||
| 749 | |||
| 750 | for iface in &["lo", machine.interface()] { | ||
| 751 | machine_tc_commands.push(format!( | ||
| 752 | "qdisc add dev {iface} root handle 1: htb default 9999" | ||
| 753 | )); | ||
| 754 | machine_tc_commands.push(format!( | ||
| 755 | "class add dev {iface} parent 1: classid 1:9999 htb rate 10gbit" | ||
| 756 | )); | ||
| 757 | for (idx, &latency_millis) in latencies_buckets.iter().enumerate() { | ||
| 758 | // tc class for latency at idx X is X + 1 | ||
| 759 | let latency_class_id = idx + 1; | ||
| 760 | // mark for latency at idx X is X + 1 | ||
| 761 | let latency_mark = idx + 1; | ||
| 762 | |||
| 763 | machine_tc_commands.push(format!( | ||
| 764 | "class add dev {iface} parent 1: classid 1:{} htb rate 10gbit", | ||
| 765 | latency_class_id | ||
| 766 | )); | ||
| 767 | // why idx + 2 here? I dont remember anymore and forgot to comment | ||
| 768 | machine_tc_commands.push(format!( | ||
| 769 | "qdisc add dev {iface} parent 1:{} handle {}: netem delay {latency_millis}ms", | ||
| 770 | latency_class_id, | ||
| 771 | idx + 2 | ||
| 772 | )); | ||
| 773 | // TODO: is the order of these things correct? | ||
| 774 | machine_tc_commands.push(format!( | ||
| 775 | "filter add dev {iface} parent 1:0 prio 1 handle {} fw flowid 1:{}", | ||
| 776 | latency_mark, latency_class_id, | ||
| 777 | )); | ||
| 778 | } | ||
| 779 | } | ||
| 780 | |||
| 781 | machine_nft_script.push_str("table ip oar-p2p {\n"); | ||
| 782 | machine_nft_script.push_str("\tmap mark_pairs {\n"); | ||
| 783 | machine_nft_script.push_str("\t\ttype ipv4_addr . ipv4_addr : mark\n"); | ||
| 784 | machine_nft_script.push_str("\t\telements = {\n"); | ||
| 785 | for (latency_idx, &latency_millis) in latencies_buckets.iter().enumerate() { | ||
| 786 | let latency_mark = latency_idx + 1; | ||
| 787 | let pairs = match latencies_addr_pairs.get(&latency_millis) { | ||
| 788 | Some(pairs) => pairs, | ||
| 789 | None => continue, | ||
| 790 | }; | ||
| 791 | |||
| 792 | for (src, dst) in pairs { | ||
| 793 | assert_ne!(src, dst); | ||
| 794 | machine_nft_script.push_str(&format!("\t\t\t{src} . {dst} : {latency_mark},\n")); | ||
| 795 | } | ||
| 796 | } | ||
| 797 | machine_nft_script.push_str("\t\t}\n"); | ||
| 798 | machine_nft_script.push_str("\t}\n"); | ||
| 799 | machine_nft_script.push_str("\n"); | ||
| 800 | machine_nft_script.push_str("\tchain postrouting {\n"); | ||
| 801 | machine_nft_script.push_str("\t\ttype filter hook postrouting priority mangle -1\n"); | ||
| 802 | machine_nft_script.push_str("\t\tpolicy accept\n"); | ||
| 803 | machine_nft_script | ||
| 804 | .push_str("\t\tmeta mark set ip saddr . ip daddr map @mark_pairs counter\n"); | ||
| 805 | machine_nft_script.push_str("\t}\n"); | ||
| 806 | machine_nft_script.push_str("}\n"); | ||
| 807 | |||
| 808 | configs.push(MachineConfig { | ||
| 809 | machine, | ||
| 810 | addresses: machine_addresses, | ||
| 811 | nft_script: machine_nft_script, | ||
| 812 | tc_commands: machine_tc_commands, | ||
| 813 | ip_commands: machine_ip_commands, | ||
| 814 | }); | ||
| 815 | } | ||
| 816 | configs | ||
| 817 | } | ||
| 818 | |||
| 819 | async fn job_list_machines(ctx: &Context) -> Result<Vec<Machine>> { | ||
| 70 | match ctx.node { | 820 | match ctx.node { |
| 71 | ExecutionNode::Frontend => { | 821 | ExecutionNode::Frontend => { |
| 72 | let job_id = match ctx.job_id { | 822 | let job_id = match ctx.job_id { |
| @@ -101,7 +851,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | |||
| 101 | Some(hostname) => hostname, | 851 | Some(hostname) => hostname, |
| 102 | None => { | 852 | None => { |
| 103 | return Err(eyre::eyre!( | 853 | return Err(eyre::eyre!( |
| 104 | "frontend hostname is requiredwhen running from outside the cluster" | 854 | "frontend hostname is required when running from outside the cluster" |
| 105 | )); | 855 | )); |
| 106 | } | 856 | } |
| 107 | }; | 857 | }; |
| @@ -148,7 +898,7 @@ async fn list_job_machines(ctx: &Context) -> eyre::Result<Vec<Machine>> { | |||
| 148 | } | 898 | } |
| 149 | } | 899 | } |
| 150 | 900 | ||
| 151 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Result<Vec<Machine>> { | 901 | fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> Result<Vec<Machine>> { |
| 152 | #[derive(Debug, Deserialize)] | 902 | #[derive(Debug, Deserialize)] |
| 153 | struct JobSchema { | 903 | struct JobSchema { |
| 154 | assigned_network_address: Vec<String>, | 904 | assigned_network_address: Vec<String>, |
| @@ -168,8 +918,8 @@ fn extract_machines_from_oar_stat_json(output: &str, job_id: u32) -> eyre::Resul | |||
| 168 | Ok(machines) | 918 | Ok(machines) |
| 169 | } | 919 | } |
| 170 | 920 | ||
| 171 | fn get_execution_node() -> eyre::Result<ExecutionNode> { | 921 | async fn get_execution_node() -> Result<ExecutionNode> { |
| 172 | let hostname = get_hostname()?; | 922 | let hostname = get_hostname().await?; |
| 173 | let node = match hostname.as_str() { | 923 | let node = match hostname.as_str() { |
| 174 | "frontend" => ExecutionNode::Frontend, | 924 | "frontend" => ExecutionNode::Frontend, |
| 175 | _ => match Machine::from_hostname(&hostname) { | 925 | _ => match Machine::from_hostname(&hostname) { |
| @@ -180,8 +930,12 @@ fn get_execution_node() -> eyre::Result<ExecutionNode> { | |||
| 180 | Ok(node) | 930 | Ok(node) |
| 181 | } | 931 | } |
| 182 | 932 | ||
| 183 | fn get_hostname() -> eyre::Result<String> { | 933 | async fn get_hostname() -> Result<String> { |
| 184 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | 934 | if let Ok(hostname) = tokio::fs::read_to_string("/etc/hostname").await { |
| 935 | Ok(hostname) | ||
| 936 | } else { | ||
| 937 | std::env::var("HOSTNAME").context("reading HOSTNAME env var") | ||
| 938 | } | ||
| 185 | } | 939 | } |
| 186 | 940 | ||
| 187 | #[cfg(test)] | 941 | #[cfg(test)] |
