aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-06-29 15:04:14 +0100
committerdiogo464 <[email protected]>2025-06-29 15:04:14 +0100
commitbb76625240ba60a1fc0d80d80ae0aefd56c66682 (patch)
tree4b684586161ee04ea25c5e37f409c03ac5dd75c7
parent71446823aba2d2250ccc4dde727751665b2308a1 (diff)
update
-rw-r--r--oar-p2p.py213
1 files changed, 120 insertions, 93 deletions
diff --git a/oar-p2p.py b/oar-p2p.py
index af24142..dcffa2b 100644
--- a/oar-p2p.py
+++ b/oar-p2p.py
@@ -8,65 +8,65 @@ from dataclasses import dataclass
8 8
9NFT_TABLE_NAME = "oar-p2p" 9NFT_TABLE_NAME = "oar-p2p"
10 10
11MACHINE_INTERFACES = { 11MACHINE_INTERFACES = [
12 "alakazam-01": None, 12 ("alakazam-01", None),
13 "alakazam-02": None, 13 ("alakazam-02", None),
14 "alakazam-03": None, 14 ("alakazam-03", None),
15 "alakazam-04": None, 15 ("alakazam-04", None),
16 "alakazam-05": None, 16 ("alakazam-05", None),
17 "alakazam-06": None, 17 ("alakazam-06", None),
18 "alakazam-07": None, 18 ("alakazam-07", None),
19 "alakazam-08": None, 19 ("alakazam-08", None),
20 "bulbasaur-1": None, 20 ("bulbasaur-1", None),
21 "bulbasaur-2": None, 21 ("bulbasaur-2", None),
22 "bulbasaur-3": None, 22 ("bulbasaur-3", None),
23 "charmander-1": "bond0", 23 ("charmander-1", "bond0"),
24 "charmander-2": "bond0", 24 ("charmander-2", "bond0"),
25 "charmander-3": "bond0", 25 ("charmander-3", "bond0"),
26 "charmander-4": "bond0", 26 ("charmander-4", "bond0"),
27 "charmander-5": "bond0", 27 ("charmander-5", "bond0"),
28 "gengar-1": None, 28 ("gengar-1", "bond0"),
29 "gengar-2": None, 29 ("gengar-2", "bond0"),
30 "gengar-3": None, 30 ("gengar-3", "bond0"),
31 "gengar-4": None, 31 ("gengar-4", "bond0"),
32 "gengar-5": None, 32 ("gengar-5", "bond0"),
33 "kadabra-01": None, 33 ("kadabra-01", None),
34 "kadabra-02": None, 34 ("kadabra-02", None),
35 "kadabra-03": None, 35 ("kadabra-03", None),
36 "kadabra-04": None, 36 ("kadabra-04", None),
37 "kadabra-05": None, 37 ("kadabra-05", None),
38 "kadabra-06": None, 38 ("kadabra-06", None),
39 "kadabra-07": None, 39 ("kadabra-07", None),
40 "kadabra-08": None, 40 ("kadabra-08", None),
41 "lugia-1": None, 41 ("lugia-1", None),
42 "lugia-2": None, 42 ("lugia-2", None),
43 "lugia-3": None, 43 ("lugia-3", None),
44 "lugia-4": None, 44 ("lugia-4", None),
45 "lugia-5": None, 45 ("lugia-5", None),
46 "magikarp-1": None, 46 ("magikarp-1", None),
47 "moltres-01": None, 47 ("moltres-01", None),
48 "moltres-02": None, 48 ("moltres-02", None),
49 "moltres-03": None, 49 ("moltres-03", None),
50 "moltres-04": None, 50 ("moltres-04", None),
51 "moltres-05": None, 51 ("moltres-05", None),
52 "moltres-06": None, 52 ("moltres-06", None),
53 "moltres-07": None, 53 ("moltres-07", None),
54 "moltres-08": None, 54 ("moltres-08", None),
55 "moltres-09": None, 55 ("moltres-09", None),
56 "moltres-10": None, 56 ("moltres-10", None),
57 "oddish-1": None, 57 ("oddish-1", None),
58 "psyduck-1": None, 58 ("psyduck-1", None),
59 "psyduck-2": None, 59 ("psyduck-2", None),
60 "psyduck-3": None, 60 ("psyduck-3", None),
61 "shelder-1": None, 61 ("shelder-1", None),
62 "squirtle-1": None, 62 ("squirtle-1", None),
63 "squirtle-2": None, 63 ("squirtle-2", None),
64 "squirtle-3": None, 64 ("squirtle-3", None),
65 "squirtle-4": None, 65 ("squirtle-4", None),
66 "staryu-1": None, 66 ("staryu-1", None),
67 "sudowoodo-1": None, 67 ("sudowoodo-1", None),
68 "vulpix-1": None, 68 ("vulpix-1", None),
69} 69]
70 70
71 71
72class LatencyMatrix: 72class LatencyMatrix:
@@ -105,29 +105,43 @@ class LatencyMatrix:
105@dataclass 105@dataclass
106class MachineConfiguration: 106class MachineConfiguration:
107 machine: str 107 machine: str
108 addresses: list[str]
108 nft_script: str 109 nft_script: str
109 tc_commands: list[str] 110 tc_commands: list[str]
110 ip_commands: list[str] 111 ip_commands: list[str]
111 112
112 113
113def machine_get_interface(machine: str) -> str: 114def machine_get_interface(machine: str) -> str:
114 interface = MACHINE_INTERFACES.get(machine, None) 115 for name, interface in MACHINE_INTERFACES:
115 assert interface is not None, f"machine interface not configured: {machine}" 116 if name == machine:
116 return interface 117 assert interface is not None, f"machine interface not configured: {machine}"
118 return interface
119 raise ValueError(f"Unknown machine: {machine}")
120
121
122def machine_get_index(machine: str) -> int:
123 for i, (name, _) in enumerate(MACHINE_INTERFACES):
124 if name == machine:
125 return i
126 raise ValueError(f"Unknown machine: {machine}")
117 127
118 128
119def machine_generate_configurations( 129def machine_generate_configurations(
120 machines: list[str], num_addresses: int, matrix: LatencyMatrix 130 machines: list[str], num_addresses_per_machine: int, matrix: LatencyMatrix
121) -> list[MachineConfiguration]: 131) -> list[MachineConfiguration]:
122 configurations = [] 132 configurations = []
123 133
124 machine_addr_idxs = defaultdict(list) 134 machine_addr_idxs = defaultdict(list)
125 135 addr_idx_to_machine_idx = defaultdict(int)
126 for i in range(num_addresses): 136 for machine_idx in range(len(machines)):
127 machine = machines[i % len(machines)] 137 for local_addr_idx in range(num_addresses_per_machine):
128 machine_addr_idxs[machine].append(i) 138 addr_idx = machine_idx * num_addresses_per_machine + local_addr_idx
139 machine_addr_idxs[machines[machine_idx]].append(addr_idx)
140 addr_idx_to_machine_idx[addr_idx] = machine_get_index(machines[machine_idx])
129 141
130 for machine in machines: 142 for machine in machines:
143 machine_ips = []
144 machine_index = machine_get_index(machine)
131 interface = machine_get_interface(machine) 145 interface = machine_get_interface(machine)
132 addr_idxs = machine_addr_idxs[machine] 146 addr_idxs = machine_addr_idxs[machine]
133 ip_commands = [] 147 ip_commands = []
@@ -135,14 +149,16 @@ def machine_generate_configurations(
135 149
136 ip_commands.append(f"route add 10.0.0.0/8 dev {interface}") 150 ip_commands.append(f"route add 10.0.0.0/8 dev {interface}")
137 for addr_idx in addr_idxs: 151 for addr_idx in addr_idxs:
138 ip_commands.append( 152 addr = address_from_index(
139 f"addr add {address_from_index(addr_idx)}/32 dev {interface}" 153 machine_index, addr_idx % num_addresses_per_machine
140 ) 154 )
155 machine_ips.append(addr)
156 ip_commands.append(f"addr add {addr}/32 dev {interface}")
141 157
142 latencies_set = set() 158 latencies_set = set()
143 latencies_buckets = defaultdict(list) 159 latencies_buckets = defaultdict(list)
144 for addr_idx in addr_idxs: 160 for addr_idx in addr_idxs:
145 for i in range(num_addresses): 161 for i in range(num_addresses_per_machine):
146 if addr_idx == i: 162 if addr_idx == i:
147 continue 163 continue
148 latency = matrix.get_latency(addr_idx, i) 164 latency = matrix.get_latency(addr_idx, i)
@@ -180,8 +196,14 @@ def machine_generate_configurations(
180 nft_script += f" elements = {{\n" 196 nft_script += f" elements = {{\n"
181 for src_idx, dst_idx in latencies_buckets[latency]: 197 for src_idx, dst_idx in latencies_buckets[latency]:
182 assert src_idx != dst_idx 198 assert src_idx != dst_idx
183 src_addr = address_from_index(src_idx) 199 src_addr = address_from_index(
184 dst_addr = address_from_index(dst_idx) 200 addr_idx_to_machine_idx[src_idx],
201 src_idx % num_addresses_per_machine,
202 )
203 dst_addr = address_from_index(
204 addr_idx_to_machine_idx[dst_idx],
205 dst_idx % num_addresses_per_machine,
206 )
185 nft_script += f" {src_addr} . {dst_addr},\n" 207 nft_script += f" {src_addr} . {dst_addr},\n"
186 nft_script += f" }}\n" 208 nft_script += f" }}\n"
187 nft_script += f" }}\n\n" 209 nft_script += f" }}\n\n"
@@ -198,6 +220,7 @@ def machine_generate_configurations(
198 configurations.append( 220 configurations.append(
199 MachineConfiguration( 221 MachineConfiguration(
200 machine=machine, 222 machine=machine,
223 addresses=machine_ips,
201 nft_script=nft_script, 224 nft_script=nft_script,
202 tc_commands=tc_commands, 225 tc_commands=tc_commands,
203 ip_commands=ip_commands, 226 ip_commands=ip_commands,
@@ -322,14 +345,12 @@ set -e
322 345
323 346
324def machine_interface(name: str) -> str: 347def machine_interface(name: str) -> str:
325 if name not in MACHINE_INTERFACES: 348 for machine, interface in MACHINE_INTERFACES:
326 raise ValueError(f"Unknown machine: {name}") 349 if machine == name:
327 350 if interface is None:
328 interface = MACHINE_INTERFACES[name] 351 raise ValueError(f"No interface configured for machine: {name}")
329 if interface is None: 352 return interface
330 raise ValueError(f"No interface configured for machine: {name}") 353 raise ValueError(f"Unknown machine: {name}")
331
332 return interface
333 354
334 355
335async def machine_cleanup_interface(job_id: int, machine: str): 356async def machine_cleanup_interface(job_id: int, machine: str):
@@ -425,7 +446,7 @@ async def clean_command(job_id: int):
425 # Clean up all machines in parallel, but don't fail fast 446 # Clean up all machines in parallel, but don't fail fast
426 tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] 447 tasks = [machine_cleanup_interface(job_id, machine) for machine in machines]
427 results = await asyncio.gather(*tasks, return_exceptions=True) 448 results = await asyncio.gather(*tasks, return_exceptions=True)
428 449
429 # Check for failures after all tasks complete 450 # Check for failures after all tasks complete
430 failures = [] 451 failures = []
431 for machine, result in zip(machines, results): 452 for machine, result in zip(machines, results):
@@ -434,11 +455,13 @@ async def clean_command(job_id: int):
434 print(f"ERROR: Cleanup failed on {machine}: {result}") 455 print(f"ERROR: Cleanup failed on {machine}: {result}")
435 else: 456 else:
436 print(f"Cleanup completed successfully on {machine}") 457 print(f"Cleanup completed successfully on {machine}")
437 458
438 if failures: 459 if failures:
439 failed_machines = [machine for machine, _ in failures] 460 failed_machines = [machine for machine, _ in failures]
440 raise Exception(f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}") 461 raise Exception(
441 462 f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}"
463 )
464
442 print("Cleanup completed successfully on all machines") 465 print("Cleanup completed successfully on all machines")
443 466
444 467
@@ -462,6 +485,10 @@ async def configurations_command(job_id: int, addresses: int, latency_matrix_pat
462 print(f"Machine: {config.machine}") 485 print(f"Machine: {config.machine}")
463 print("-" * 40) 486 print("-" * 40)
464 487
488 print("\nAddresses:")
489 for addr in config.addresses:
490 print(addr)
491
465 print("NFT Script:") 492 print("NFT Script:")
466 print(config.nft_script) 493 print(config.nft_script)
467 494
@@ -480,7 +507,7 @@ async def main():
480 507
481 # Setup command 508 # Setup command
482 setup_parser = subparsers.add_parser( 509 setup_parser = subparsers.add_parser(
483 "setup", help="Setup network interfaces and latencies" 510 "up", help="Setup network interfaces and latencies"
484 ) 511 )
485 setup_parser.add_argument("job_id", type=int, help="OAR job ID") 512 setup_parser.add_argument("job_id", type=int, help="OAR job ID")
486 setup_parser.add_argument( 513 setup_parser.add_argument(
@@ -491,7 +518,7 @@ async def main():
491 ) 518 )
492 519
493 # Clean command 520 # Clean command
494 clean_parser = subparsers.add_parser("clean", help="Clean up network interfaces") 521 clean_parser = subparsers.add_parser("down", help="Clean up network interfaces")
495 clean_parser.add_argument("job_id", type=int, help="OAR job ID") 522 clean_parser.add_argument("job_id", type=int, help="OAR job ID")
496 523
497 # Configurations command 524 # Configurations command
@@ -500,7 +527,7 @@ async def main():
500 ) 527 )
501 config_parser.add_argument("job_id", type=int, help="OAR job ID") 528 config_parser.add_argument("job_id", type=int, help="OAR job ID")
502 config_parser.add_argument( 529 config_parser.add_argument(
503 "addresses", type=int, help="Number of addresses to allocate" 530 "addresses", type=int, help="Number of addresses to allocate per machine"
504 ) 531 )
505 config_parser.add_argument( 532 config_parser.add_argument(
506 "latency_matrix", type=str, help="Path to latency matrix file" 533 "latency_matrix", type=str, help="Path to latency matrix file"
@@ -508,9 +535,9 @@ async def main():
508 535
509 args = parser.parse_args() 536 args = parser.parse_args()
510 537
511 if args.command == "setup": 538 if args.command == "up":
512 await setup_command(args.job_id, args.addresses, args.latency_matrix) 539 await setup_command(args.job_id, args.addresses, args.latency_matrix)
513 elif args.command == "clean": 540 elif args.command == "down":
514 await clean_command(args.job_id) 541 await clean_command(args.job_id)
515 elif args.command == "configurations": 542 elif args.command == "configurations":
516 await configurations_command(args.job_id, args.addresses, args.latency_matrix) 543 await configurations_command(args.job_id, args.addresses, args.latency_matrix)
@@ -518,11 +545,11 @@ async def main():
518 parser.print_help() 545 parser.print_help()
519 546
520 547
521def address_from_index(index: int) -> str: 548def address_from_index(machine_index: int, addr_index: int) -> str:
522 d = index % 254 549 d = addr_index % 254
523 c = (index // 254) % 254 550 c = (addr_index // 254) % 254
524 assert c <= 254 551 assert c <= 254
525 return f"10.0.{c}.{d+1}" 552 return f"10.{machine_index}.{c}.{d+1}"
526 553
527 554
528if __name__ == "__main__": 555if __name__ == "__main__":