diff options
| -rw-r--r-- | oar-p2p.py | 213 |
1 files changed, 120 insertions, 93 deletions
| @@ -8,65 +8,65 @@ from dataclasses import dataclass | |||
| 8 | 8 | ||
| 9 | NFT_TABLE_NAME = "oar-p2p" | 9 | NFT_TABLE_NAME = "oar-p2p" |
| 10 | 10 | ||
| 11 | MACHINE_INTERFACES = { | 11 | MACHINE_INTERFACES = [ |
| 12 | "alakazam-01": None, | 12 | ("alakazam-01", None), |
| 13 | "alakazam-02": None, | 13 | ("alakazam-02", None), |
| 14 | "alakazam-03": None, | 14 | ("alakazam-03", None), |
| 15 | "alakazam-04": None, | 15 | ("alakazam-04", None), |
| 16 | "alakazam-05": None, | 16 | ("alakazam-05", None), |
| 17 | "alakazam-06": None, | 17 | ("alakazam-06", None), |
| 18 | "alakazam-07": None, | 18 | ("alakazam-07", None), |
| 19 | "alakazam-08": None, | 19 | ("alakazam-08", None), |
| 20 | "bulbasaur-1": None, | 20 | ("bulbasaur-1", None), |
| 21 | "bulbasaur-2": None, | 21 | ("bulbasaur-2", None), |
| 22 | "bulbasaur-3": None, | 22 | ("bulbasaur-3", None), |
| 23 | "charmander-1": "bond0", | 23 | ("charmander-1", "bond0"), |
| 24 | "charmander-2": "bond0", | 24 | ("charmander-2", "bond0"), |
| 25 | "charmander-3": "bond0", | 25 | ("charmander-3", "bond0"), |
| 26 | "charmander-4": "bond0", | 26 | ("charmander-4", "bond0"), |
| 27 | "charmander-5": "bond0", | 27 | ("charmander-5", "bond0"), |
| 28 | "gengar-1": None, | 28 | ("gengar-1", "bond0"), |
| 29 | "gengar-2": None, | 29 | ("gengar-2", "bond0"), |
| 30 | "gengar-3": None, | 30 | ("gengar-3", "bond0"), |
| 31 | "gengar-4": None, | 31 | ("gengar-4", "bond0"), |
| 32 | "gengar-5": None, | 32 | ("gengar-5", "bond0"), |
| 33 | "kadabra-01": None, | 33 | ("kadabra-01", None), |
| 34 | "kadabra-02": None, | 34 | ("kadabra-02", None), |
| 35 | "kadabra-03": None, | 35 | ("kadabra-03", None), |
| 36 | "kadabra-04": None, | 36 | ("kadabra-04", None), |
| 37 | "kadabra-05": None, | 37 | ("kadabra-05", None), |
| 38 | "kadabra-06": None, | 38 | ("kadabra-06", None), |
| 39 | "kadabra-07": None, | 39 | ("kadabra-07", None), |
| 40 | "kadabra-08": None, | 40 | ("kadabra-08", None), |
| 41 | "lugia-1": None, | 41 | ("lugia-1", None), |
| 42 | "lugia-2": None, | 42 | ("lugia-2", None), |
| 43 | "lugia-3": None, | 43 | ("lugia-3", None), |
| 44 | "lugia-4": None, | 44 | ("lugia-4", None), |
| 45 | "lugia-5": None, | 45 | ("lugia-5", None), |
| 46 | "magikarp-1": None, | 46 | ("magikarp-1", None), |
| 47 | "moltres-01": None, | 47 | ("moltres-01", None), |
| 48 | "moltres-02": None, | 48 | ("moltres-02", None), |
| 49 | "moltres-03": None, | 49 | ("moltres-03", None), |
| 50 | "moltres-04": None, | 50 | ("moltres-04", None), |
| 51 | "moltres-05": None, | 51 | ("moltres-05", None), |
| 52 | "moltres-06": None, | 52 | ("moltres-06", None), |
| 53 | "moltres-07": None, | 53 | ("moltres-07", None), |
| 54 | "moltres-08": None, | 54 | ("moltres-08", None), |
| 55 | "moltres-09": None, | 55 | ("moltres-09", None), |
| 56 | "moltres-10": None, | 56 | ("moltres-10", None), |
| 57 | "oddish-1": None, | 57 | ("oddish-1", None), |
| 58 | "psyduck-1": None, | 58 | ("psyduck-1", None), |
| 59 | "psyduck-2": None, | 59 | ("psyduck-2", None), |
| 60 | "psyduck-3": None, | 60 | ("psyduck-3", None), |
| 61 | "shelder-1": None, | 61 | ("shelder-1", None), |
| 62 | "squirtle-1": None, | 62 | ("squirtle-1", None), |
| 63 | "squirtle-2": None, | 63 | ("squirtle-2", None), |
| 64 | "squirtle-3": None, | 64 | ("squirtle-3", None), |
| 65 | "squirtle-4": None, | 65 | ("squirtle-4", None), |
| 66 | "staryu-1": None, | 66 | ("staryu-1", None), |
| 67 | "sudowoodo-1": None, | 67 | ("sudowoodo-1", None), |
| 68 | "vulpix-1": None, | 68 | ("vulpix-1", None), |
| 69 | } | 69 | ] |
| 70 | 70 | ||
| 71 | 71 | ||
| 72 | class LatencyMatrix: | 72 | class LatencyMatrix: |
| @@ -105,29 +105,43 @@ class LatencyMatrix: | |||
| 105 | @dataclass | 105 | @dataclass |
| 106 | class MachineConfiguration: | 106 | class MachineConfiguration: |
| 107 | machine: str | 107 | machine: str |
| 108 | addresses: list[str] | ||
| 108 | nft_script: str | 109 | nft_script: str |
| 109 | tc_commands: list[str] | 110 | tc_commands: list[str] |
| 110 | ip_commands: list[str] | 111 | ip_commands: list[str] |
| 111 | 112 | ||
| 112 | 113 | ||
| 113 | def machine_get_interface(machine: str) -> str: | 114 | def machine_get_interface(machine: str) -> str: |
| 114 | interface = MACHINE_INTERFACES.get(machine, None) | 115 | for name, interface in MACHINE_INTERFACES: |
| 115 | assert interface is not None, f"machine interface not configured: {machine}" | 116 | if name == machine: |
| 116 | return interface | 117 | assert interface is not None, f"machine interface not configured: {machine}" |
| 118 | return interface | ||
| 119 | raise ValueError(f"Unknown machine: {machine}") | ||
| 120 | |||
| 121 | |||
| 122 | def machine_get_index(machine: str) -> int: | ||
| 123 | for i, (name, _) in enumerate(MACHINE_INTERFACES): | ||
| 124 | if name == machine: | ||
| 125 | return i | ||
| 126 | raise ValueError(f"Unknown machine: {machine}") | ||
| 117 | 127 | ||
| 118 | 128 | ||
| 119 | def machine_generate_configurations( | 129 | def machine_generate_configurations( |
| 120 | machines: list[str], num_addresses: int, matrix: LatencyMatrix | 130 | machines: list[str], num_addresses_per_machine: int, matrix: LatencyMatrix |
| 121 | ) -> list[MachineConfiguration]: | 131 | ) -> list[MachineConfiguration]: |
| 122 | configurations = [] | 132 | configurations = [] |
| 123 | 133 | ||
| 124 | machine_addr_idxs = defaultdict(list) | 134 | machine_addr_idxs = defaultdict(list) |
| 125 | 135 | addr_idx_to_machine_idx = defaultdict(int) | |
| 126 | for i in range(num_addresses): | 136 | for machine_idx in range(len(machines)): |
| 127 | machine = machines[i % len(machines)] | 137 | for local_addr_idx in range(num_addresses_per_machine): |
| 128 | machine_addr_idxs[machine].append(i) | 138 | addr_idx = machine_idx * num_addresses_per_machine + local_addr_idx |
| 139 | machine_addr_idxs[machines[machine_idx]].append(addr_idx) | ||
| 140 | addr_idx_to_machine_idx[addr_idx] = machine_get_index(machines[machine_idx]) | ||
| 129 | 141 | ||
| 130 | for machine in machines: | 142 | for machine in machines: |
| 143 | machine_ips = [] | ||
| 144 | machine_index = machine_get_index(machine) | ||
| 131 | interface = machine_get_interface(machine) | 145 | interface = machine_get_interface(machine) |
| 132 | addr_idxs = machine_addr_idxs[machine] | 146 | addr_idxs = machine_addr_idxs[machine] |
| 133 | ip_commands = [] | 147 | ip_commands = [] |
| @@ -135,14 +149,16 @@ def machine_generate_configurations( | |||
| 135 | 149 | ||
| 136 | ip_commands.append(f"route add 10.0.0.0/8 dev {interface}") | 150 | ip_commands.append(f"route add 10.0.0.0/8 dev {interface}") |
| 137 | for addr_idx in addr_idxs: | 151 | for addr_idx in addr_idxs: |
| 138 | ip_commands.append( | 152 | addr = address_from_index( |
| 139 | f"addr add {address_from_index(addr_idx)}/32 dev {interface}" | 153 | machine_index, addr_idx % num_addresses_per_machine |
| 140 | ) | 154 | ) |
| 155 | machine_ips.append(addr) | ||
| 156 | ip_commands.append(f"addr add {addr}/32 dev {interface}") | ||
| 141 | 157 | ||
| 142 | latencies_set = set() | 158 | latencies_set = set() |
| 143 | latencies_buckets = defaultdict(list) | 159 | latencies_buckets = defaultdict(list) |
| 144 | for addr_idx in addr_idxs: | 160 | for addr_idx in addr_idxs: |
| 145 | for i in range(num_addresses): | 161 | for i in range(num_addresses_per_machine): |
| 146 | if addr_idx == i: | 162 | if addr_idx == i: |
| 147 | continue | 163 | continue |
| 148 | latency = matrix.get_latency(addr_idx, i) | 164 | latency = matrix.get_latency(addr_idx, i) |
| @@ -180,8 +196,14 @@ def machine_generate_configurations( | |||
| 180 | nft_script += f" elements = {{\n" | 196 | nft_script += f" elements = {{\n" |
| 181 | for src_idx, dst_idx in latencies_buckets[latency]: | 197 | for src_idx, dst_idx in latencies_buckets[latency]: |
| 182 | assert src_idx != dst_idx | 198 | assert src_idx != dst_idx |
| 183 | src_addr = address_from_index(src_idx) | 199 | src_addr = address_from_index( |
| 184 | dst_addr = address_from_index(dst_idx) | 200 | addr_idx_to_machine_idx[src_idx], |
| 201 | src_idx % num_addresses_per_machine, | ||
| 202 | ) | ||
| 203 | dst_addr = address_from_index( | ||
| 204 | addr_idx_to_machine_idx[dst_idx], | ||
| 205 | dst_idx % num_addresses_per_machine, | ||
| 206 | ) | ||
| 185 | nft_script += f" {src_addr} . {dst_addr},\n" | 207 | nft_script += f" {src_addr} . {dst_addr},\n" |
| 186 | nft_script += f" }}\n" | 208 | nft_script += f" }}\n" |
| 187 | nft_script += f" }}\n\n" | 209 | nft_script += f" }}\n\n" |
| @@ -198,6 +220,7 @@ def machine_generate_configurations( | |||
| 198 | configurations.append( | 220 | configurations.append( |
| 199 | MachineConfiguration( | 221 | MachineConfiguration( |
| 200 | machine=machine, | 222 | machine=machine, |
| 223 | addresses=machine_ips, | ||
| 201 | nft_script=nft_script, | 224 | nft_script=nft_script, |
| 202 | tc_commands=tc_commands, | 225 | tc_commands=tc_commands, |
| 203 | ip_commands=ip_commands, | 226 | ip_commands=ip_commands, |
| @@ -322,14 +345,12 @@ set -e | |||
| 322 | 345 | ||
| 323 | 346 | ||
| 324 | def machine_interface(name: str) -> str: | 347 | def machine_interface(name: str) -> str: |
| 325 | if name not in MACHINE_INTERFACES: | 348 | for machine, interface in MACHINE_INTERFACES: |
| 326 | raise ValueError(f"Unknown machine: {name}") | 349 | if machine == name: |
| 327 | 350 | if interface is None: | |
| 328 | interface = MACHINE_INTERFACES[name] | 351 | raise ValueError(f"No interface configured for machine: {name}") |
| 329 | if interface is None: | 352 | return interface |
| 330 | raise ValueError(f"No interface configured for machine: {name}") | 353 | raise ValueError(f"Unknown machine: {name}") |
| 331 | |||
| 332 | return interface | ||
| 333 | 354 | ||
| 334 | 355 | ||
| 335 | async def machine_cleanup_interface(job_id: int, machine: str): | 356 | async def machine_cleanup_interface(job_id: int, machine: str): |
| @@ -425,7 +446,7 @@ async def clean_command(job_id: int): | |||
| 425 | # Clean up all machines in parallel, but don't fail fast | 446 | # Clean up all machines in parallel, but don't fail fast |
| 426 | tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] | 447 | tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] |
| 427 | results = await asyncio.gather(*tasks, return_exceptions=True) | 448 | results = await asyncio.gather(*tasks, return_exceptions=True) |
| 428 | 449 | ||
| 429 | # Check for failures after all tasks complete | 450 | # Check for failures after all tasks complete |
| 430 | failures = [] | 451 | failures = [] |
| 431 | for machine, result in zip(machines, results): | 452 | for machine, result in zip(machines, results): |
| @@ -434,11 +455,13 @@ async def clean_command(job_id: int): | |||
| 434 | print(f"ERROR: Cleanup failed on {machine}: {result}") | 455 | print(f"ERROR: Cleanup failed on {machine}: {result}") |
| 435 | else: | 456 | else: |
| 436 | print(f"Cleanup completed successfully on {machine}") | 457 | print(f"Cleanup completed successfully on {machine}") |
| 437 | 458 | ||
| 438 | if failures: | 459 | if failures: |
| 439 | failed_machines = [machine for machine, _ in failures] | 460 | failed_machines = [machine for machine, _ in failures] |
| 440 | raise Exception(f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}") | 461 | raise Exception( |
| 441 | 462 | f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}" | |
| 463 | ) | ||
| 464 | |||
| 442 | print("Cleanup completed successfully on all machines") | 465 | print("Cleanup completed successfully on all machines") |
| 443 | 466 | ||
| 444 | 467 | ||
| @@ -462,6 +485,10 @@ async def configurations_command(job_id: int, addresses: int, latency_matrix_pat | |||
| 462 | print(f"Machine: {config.machine}") | 485 | print(f"Machine: {config.machine}") |
| 463 | print("-" * 40) | 486 | print("-" * 40) |
| 464 | 487 | ||
| 488 | print("\nAddresses:") | ||
| 489 | for addr in config.addresses: | ||
| 490 | print(addr) | ||
| 491 | |||
| 465 | print("NFT Script:") | 492 | print("NFT Script:") |
| 466 | print(config.nft_script) | 493 | print(config.nft_script) |
| 467 | 494 | ||
| @@ -480,7 +507,7 @@ async def main(): | |||
| 480 | 507 | ||
| 481 | # Setup command | 508 | # Setup command |
| 482 | setup_parser = subparsers.add_parser( | 509 | setup_parser = subparsers.add_parser( |
| 483 | "setup", help="Setup network interfaces and latencies" | 510 | "up", help="Setup network interfaces and latencies" |
| 484 | ) | 511 | ) |
| 485 | setup_parser.add_argument("job_id", type=int, help="OAR job ID") | 512 | setup_parser.add_argument("job_id", type=int, help="OAR job ID") |
| 486 | setup_parser.add_argument( | 513 | setup_parser.add_argument( |
| @@ -491,7 +518,7 @@ async def main(): | |||
| 491 | ) | 518 | ) |
| 492 | 519 | ||
| 493 | # Clean command | 520 | # Clean command |
| 494 | clean_parser = subparsers.add_parser("clean", help="Clean up network interfaces") | 521 | clean_parser = subparsers.add_parser("down", help="Clean up network interfaces") |
| 495 | clean_parser.add_argument("job_id", type=int, help="OAR job ID") | 522 | clean_parser.add_argument("job_id", type=int, help="OAR job ID") |
| 496 | 523 | ||
| 497 | # Configurations command | 524 | # Configurations command |
| @@ -500,7 +527,7 @@ async def main(): | |||
| 500 | ) | 527 | ) |
| 501 | config_parser.add_argument("job_id", type=int, help="OAR job ID") | 528 | config_parser.add_argument("job_id", type=int, help="OAR job ID") |
| 502 | config_parser.add_argument( | 529 | config_parser.add_argument( |
| 503 | "addresses", type=int, help="Number of addresses to allocate" | 530 | "addresses", type=int, help="Number of addresses to allocate per machine" |
| 504 | ) | 531 | ) |
| 505 | config_parser.add_argument( | 532 | config_parser.add_argument( |
| 506 | "latency_matrix", type=str, help="Path to latency matrix file" | 533 | "latency_matrix", type=str, help="Path to latency matrix file" |
| @@ -508,9 +535,9 @@ async def main(): | |||
| 508 | 535 | ||
| 509 | args = parser.parse_args() | 536 | args = parser.parse_args() |
| 510 | 537 | ||
| 511 | if args.command == "setup": | 538 | if args.command == "up": |
| 512 | await setup_command(args.job_id, args.addresses, args.latency_matrix) | 539 | await setup_command(args.job_id, args.addresses, args.latency_matrix) |
| 513 | elif args.command == "clean": | 540 | elif args.command == "down": |
| 514 | await clean_command(args.job_id) | 541 | await clean_command(args.job_id) |
| 515 | elif args.command == "configurations": | 542 | elif args.command == "configurations": |
| 516 | await configurations_command(args.job_id, args.addresses, args.latency_matrix) | 543 | await configurations_command(args.job_id, args.addresses, args.latency_matrix) |
| @@ -518,11 +545,11 @@ async def main(): | |||
| 518 | parser.print_help() | 545 | parser.print_help() |
| 519 | 546 | ||
| 520 | 547 | ||
| 521 | def address_from_index(index: int) -> str: | 548 | def address_from_index(machine_index: int, addr_index: int) -> str: |
| 522 | d = index % 254 | 549 | d = addr_index % 254 |
| 523 | c = (index // 254) % 254 | 550 | c = (addr_index // 254) % 254 |
| 524 | assert c <= 254 | 551 | assert c <= 254 |
| 525 | return f"10.0.{c}.{d+1}" | 552 | return f"10.{machine_index}.{c}.{d+1}" |
| 526 | 553 | ||
| 527 | 554 | ||
| 528 | if __name__ == "__main__": | 555 | if __name__ == "__main__": |
