diff options
| -rw-r--r-- | Containerfile | 3 | ||||
| -rw-r--r-- | oar-p2p.py | 490 |
2 files changed, 337 insertions, 156 deletions
diff --git a/Containerfile b/Containerfile index 896a128..086b7d9 100644 --- a/Containerfile +++ b/Containerfile | |||
| @@ -22,7 +22,8 @@ FROM docker.io/alpine:3.22 | |||
| 22 | # Install runtime dependencies | 22 | # Install runtime dependencies |
| 23 | RUN apk add --no-cache \ | 23 | RUN apk add --no-cache \ |
| 24 | iproute2 \ | 24 | iproute2 \ |
| 25 | iproute2-tc | 25 | iproute2-tc \ |
| 26 | nftables | ||
| 26 | 27 | ||
| 27 | # Copy the binary from builder stage | 28 | # Copy the binary from builder stage |
| 28 | COPY --from=builder /app/target/release/oar-p2p /usr/local/bin/oar-p2p | 29 | COPY --from=builder /app/target/release/oar-p2p /usr/local/bin/oar-p2p |
| @@ -1,6 +1,12 @@ | |||
| 1 | import json | ||
| 2 | import math | ||
| 1 | import asyncio | 3 | import asyncio |
| 2 | import argparse | 4 | import argparse |
| 3 | import json | 5 | |
| 6 | from collections import defaultdict | ||
| 7 | from dataclasses import dataclass | ||
| 8 | |||
| 9 | NFT_TABLE_NAME = "oar-p2p" | ||
| 4 | 10 | ||
| 5 | MACHINE_INTERFACES = { | 11 | MACHINE_INTERFACES = { |
| 6 | "alakazam-01": None, | 12 | "alakazam-01": None, |
| @@ -70,35 +76,172 @@ class LatencyMatrix: | |||
| 70 | # Assert matrix is square | 76 | # Assert matrix is square |
| 71 | size = len(matrix) | 77 | size = len(matrix) |
| 72 | for row in matrix: | 78 | for row in matrix: |
| 73 | assert len(row) == size, f"Matrix must be square: expected {size} columns, got {len(row)}" | 79 | assert ( |
| 80 | len(row) == size | ||
| 81 | ), f"Matrix must be square: expected {size} columns, got {len(row)}" | ||
| 74 | self.matrix = matrix | 82 | self.matrix = matrix |
| 75 | 83 | ||
| 76 | @staticmethod | 84 | @staticmethod |
| 77 | def read_from_file(file_path: str) -> 'LatencyMatrix': | 85 | def read_from_file(file_path: str) -> "LatencyMatrix": |
| 78 | """Read a latency matrix from a file and return as LatencyMatrix instance.""" | 86 | """Read a latency matrix from a file and return as LatencyMatrix instance.""" |
| 79 | matrix = [] | 87 | matrix = [] |
| 80 | with open(file_path, 'r') as f: | 88 | with open(file_path, "r") as f: |
| 81 | for line in f: | 89 | for line in f: |
| 82 | line = line.strip() | 90 | line = line.strip() |
| 83 | if line and not line.startswith('#'): # Skip empty lines and comments | 91 | if line and not line.startswith("#"): # Skip empty lines and comments |
| 84 | row = [float(x) for x in line.split()] | 92 | row = [float(x) for x in line.split()] |
| 85 | matrix.append(row) | 93 | matrix.append(row) |
| 86 | return LatencyMatrix(matrix) | 94 | return LatencyMatrix(matrix) |
| 87 | 95 | ||
| 88 | def get_latency(self, src_idx: int, dst_idx: int) -> float: | 96 | def get_latency(self, src_idx: int, dst_idx: int) -> float: |
| 89 | """Get the latency value from source index to destination index.""" | 97 | """Get the latency value from source index to destination index.""" |
| 90 | return self.matrix[src_idx][dst_idx] | 98 | return self.matrix[src_idx][dst_idx] |
| 91 | 99 | ||
| 92 | def size(self) -> int: | 100 | def size(self) -> int: |
| 93 | """Get the size of the square matrix.""" | 101 | """Get the size of the square matrix.""" |
| 94 | return len(self.matrix) | 102 | return len(self.matrix) |
| 95 | 103 | ||
| 96 | 104 | ||
| 97 | def address_from_index(index: int) -> str: | 105 | @dataclass |
| 98 | d = index % 254 | 106 | class MachineConfiguration: |
| 99 | c = (index // 254) % 254 | 107 | machine: str |
| 100 | assert c <= 254 | 108 | nft_script: str |
| 101 | return f"10.0.{c}.{d+1}" | 109 | tc_commands: list[str] |
| 110 | ip_commands: list[str] | ||
| 111 | |||
| 112 | |||
| 113 | def machine_get_interface(machine: str) -> str: | ||
| 114 | interface = MACHINE_INTERFACES.get(machine, None) | ||
| 115 | assert interface is not None, f"machine interface not configured: {machine}" | ||
| 116 | return interface | ||
| 117 | |||
| 118 | |||
| 119 | def machine_generate_configurations( | ||
| 120 | machines: list[str], num_addresses: int, matrix: LatencyMatrix | ||
| 121 | ) -> list[MachineConfiguration]: | ||
| 122 | configurations = [] | ||
| 123 | |||
| 124 | machine_addr_idxs = defaultdict(list) | ||
| 125 | |||
| 126 | for i in range(num_addresses): | ||
| 127 | machine = machines[i % len(machines)] | ||
| 128 | machine_addr_idxs[machine].append(i) | ||
| 129 | |||
| 130 | for machine in machines: | ||
| 131 | interface = machine_get_interface(machine) | ||
| 132 | addr_idxs = machine_addr_idxs[machine] | ||
| 133 | ip_commands = [] | ||
| 134 | tc_commands = [] | ||
| 135 | |||
| 136 | ip_commands.append(f"route add 10.0.0.0/8 dev {interface}") | ||
| 137 | for addr_idx in addr_idxs: | ||
| 138 | ip_commands.append( | ||
| 139 | f"addr add {address_from_index(addr_idx)}/32 dev {interface}" | ||
| 140 | ) | ||
| 141 | |||
| 142 | latencies_set = set() | ||
| 143 | latencies_buckets = defaultdict(list) | ||
| 144 | for addr_idx in addr_idxs: | ||
| 145 | for i in range(num_addresses): | ||
| 146 | if addr_idx == i: | ||
| 147 | continue | ||
| 148 | latency = matrix.get_latency(addr_idx, i) | ||
| 149 | latency_rounded = math.ceil(latency) // 1 | ||
| 150 | latencies_set.add(latency_rounded) | ||
| 151 | latencies_buckets[latency_rounded].append((addr_idx, i)) | ||
| 152 | |||
| 153 | latencies = list(sorted(latencies_set)) | ||
| 154 | |||
| 155 | tc_commands.append(f"qdisc add dev {interface} root handle 1: htb default 9999") | ||
| 156 | tc_commands.append( | ||
| 157 | f"class add dev {interface} parent 1: classid 1:9999 htb rate 10gbit" | ||
| 158 | ) | ||
| 159 | for idx, latency in enumerate(latencies): | ||
| 160 | # tc class for latency at idx X is X + 1 | ||
| 161 | tc_commands.append( | ||
| 162 | f"class add dev {interface} parent 1: classid 1:{idx+1} htb rate 10gbit" | ||
| 163 | ) | ||
| 164 | tc_commands.append( | ||
| 165 | f"qdisc add dev {interface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms" | ||
| 166 | ) | ||
| 167 | # mark for latency at idx X is X + 1 | ||
| 168 | tc_commands.append( | ||
| 169 | f"filter add dev {interface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}" | ||
| 170 | ) | ||
| 171 | |||
| 172 | nft_script = "" | ||
| 173 | nft_script += "table ip oar-p2p {" + "\n" | ||
| 174 | for latency_idx, latency in enumerate(latencies): | ||
| 175 | if len(latencies_buckets[latency]) == 0: | ||
| 176 | continue | ||
| 177 | nft_script += f" set mark_{latency_idx}_pairs {{\n" | ||
| 178 | nft_script += f" type ipv4_addr . ipv4_addr\n" | ||
| 179 | nft_script += f" flags interval\n" | ||
| 180 | nft_script += f" elements = {{\n" | ||
| 181 | for src_idx, dst_idx in latencies_buckets[latency]: | ||
| 182 | assert src_idx != dst_idx | ||
| 183 | src_addr = address_from_index(src_idx) | ||
| 184 | dst_addr = address_from_index(dst_idx) | ||
| 185 | nft_script += f" {src_addr} . {dst_addr},\n" | ||
| 186 | nft_script += f" }}\n" | ||
| 187 | nft_script += f" }}\n\n" | ||
| 188 | |||
| 189 | nft_script += " chain postrouting {\n" | ||
| 190 | nft_script += " type filter hook postrouting priority mangle - 1\n" | ||
| 191 | nft_script += " policy accept\n" | ||
| 192 | nft_script += "\n" | ||
| 193 | for latency_idx in range(len(latencies)): | ||
| 194 | nft_script += f" ip saddr . ip daddr @mark_{latency_idx}_pairs meta mark set {latency_idx+1}\n" | ||
| 195 | nft_script += " }" + "\n" | ||
| 196 | nft_script += "}" + "\n" | ||
| 197 | |||
| 198 | configurations.append( | ||
| 199 | MachineConfiguration( | ||
| 200 | machine=machine, | ||
| 201 | nft_script=nft_script, | ||
| 202 | tc_commands=tc_commands, | ||
| 203 | ip_commands=ip_commands, | ||
| 204 | ) | ||
| 205 | ) | ||
| 206 | |||
| 207 | return configurations | ||
| 208 | |||
| 209 | |||
| 210 | async def machine_apply_configuration(job_id: int, config: MachineConfiguration): | ||
| 211 | """Apply a machine configuration by executing IP commands, TC commands, and NFT script.""" | ||
| 212 | machine = config.machine | ||
| 213 | |||
| 214 | print(f"Applying configuration to {machine}...") | ||
| 215 | |||
| 216 | # Prepare tasks for parallel execution | ||
| 217 | tasks = [] | ||
| 218 | |||
| 219 | # IP commands task | ||
| 220 | if config.ip_commands: | ||
| 221 | ip_batch = "\n".join(config.ip_commands) | ||
| 222 | print(f"Executing {len(config.ip_commands)} IP commands on {machine}") | ||
| 223 | tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch)) | ||
| 224 | |||
| 225 | # TC commands task | ||
| 226 | if config.tc_commands: | ||
| 227 | tc_batch = "\n".join(config.tc_commands) | ||
| 228 | print(f"Executing {len(config.tc_commands)} TC commands on {machine}") | ||
| 229 | tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch)) | ||
| 230 | |||
| 231 | # NFT script task | ||
| 232 | if config.nft_script: | ||
| 233 | print(f"Applying NFT script on {machine}") | ||
| 234 | tasks.append( | ||
| 235 | run_script_in_docker(job_id, machine, "nft -f -", config.nft_script) | ||
| 236 | ) | ||
| 237 | |||
| 238 | # Execute all tasks in parallel | ||
| 239 | if tasks: | ||
| 240 | try: | ||
| 241 | await asyncio.gather(*tasks) | ||
| 242 | except Exception as e: | ||
| 243 | print(f"ERROR applying configuration to {machine}: {e}") | ||
| 244 | raise | ||
| 102 | 245 | ||
| 103 | 246 | ||
| 104 | async def oar_job_list_machines(job_id: int) -> list[str]: | 247 | async def oar_job_list_machines(job_id: int) -> list[str]: |
| @@ -121,14 +264,14 @@ async def oar_job_list_machines(job_id: int) -> list[str]: | |||
| 121 | return data[str(job_id)]["assigned_network_address"] | 264 | return data[str(job_id)]["assigned_network_address"] |
| 122 | 265 | ||
| 123 | 266 | ||
| 124 | async def run_script_in_docker(job_id: int, machine: str, script: str, stdin_data: str = None) -> str: | 267 | async def run_script_in_docker( |
| 125 | # Prepare the full script with package installation | 268 | job_id: int, machine: str, script: str, stdin_data: str | None = None |
| 269 | ) -> str: | ||
| 270 | # Prepare the script (no package installation needed with custom image) | ||
| 126 | if stdin_data: | 271 | if stdin_data: |
| 127 | # If stdin_data is provided, create a script that pipes it to the command | 272 | # If stdin_data is provided, create a script that pipes it to the command |
| 128 | full_script = f"""#!/bin/bash | 273 | full_script = f"""#!/bin/bash |
| 129 | set -e | 274 | set -e |
| 130 | apk update >/dev/null 2>&1 | ||
| 131 | apk add iproute2 iproute2-tc >/dev/null 2>&1 | ||
| 132 | cat << 'STDIN_EOF' | {script} | 275 | cat << 'STDIN_EOF' | {script} |
| 133 | {stdin_data} | 276 | {stdin_data} |
| 134 | STDIN_EOF | 277 | STDIN_EOF |
| @@ -136,27 +279,45 @@ STDIN_EOF | |||
| 136 | else: | 279 | else: |
| 137 | full_script = f"""#!/bin/bash | 280 | full_script = f"""#!/bin/bash |
| 138 | set -e | 281 | set -e |
| 139 | apk update >/dev/null 2>&1 | ||
| 140 | apk add iproute2 iproute2-tc >/dev/null 2>&1 | ||
| 141 | {script} | 282 | {script} |
| 142 | """ | 283 | """ |
| 143 | 284 | ||
| 144 | # Run the script in an Alpine Docker container via SSH | 285 | # Run the script in our custom networking Docker container via SSH |
| 145 | proc = await asyncio.create_subprocess_exec( | 286 | proc = await asyncio.create_subprocess_exec( |
| 146 | "oarsh", machine, | 287 | "oarsh", |
| 147 | "docker", "run", "--rm", "--privileged", "--net=host", | 288 | machine, |
| 148 | "-i", "alpine:latest", "sh", | 289 | "docker", |
| 290 | "run", | ||
| 291 | "--rm", | ||
| 292 | "--privileged", | ||
| 293 | "--pull=always", | ||
| 294 | "--net=host", | ||
| 295 | "-i", | ||
| 296 | "ghcr.io/diogo464/oar-p2p-networking:latest", | ||
| 149 | env={"OAR_JOB_ID": str(job_id)}, | 297 | env={"OAR_JOB_ID": str(job_id)}, |
| 150 | stdin=asyncio.subprocess.PIPE, | 298 | stdin=asyncio.subprocess.PIPE, |
| 151 | stdout=asyncio.subprocess.PIPE, | 299 | stdout=asyncio.subprocess.PIPE, |
| 152 | stderr=asyncio.subprocess.PIPE, | 300 | stderr=asyncio.subprocess.PIPE, |
| 153 | ) | 301 | ) |
| 154 | 302 | ||
| 155 | stdout, stderr = await proc.communicate(input=full_script.encode()) | 303 | stdout, stderr = await proc.communicate(input=full_script.encode()) |
| 156 | 304 | ||
| 157 | if proc.returncode != 0: | 305 | if proc.returncode != 0: |
| 158 | raise Exception(f"Script execution failed on {machine}: {stderr.decode()}") | 306 | cmd_args = [ |
| 159 | 307 | "oarsh", | |
| 308 | machine, | ||
| 309 | "docker", | ||
| 310 | "run", | ||
| 311 | "--rm", | ||
| 312 | "--privileged", | ||
| 313 | "--net=host", | ||
| 314 | "-i", | ||
| 315 | "ghcr.io/diogo464/oar-p2p-networking:latest", | ||
| 316 | ] | ||
| 317 | raise Exception( | ||
| 318 | f"Script execution failed on {machine}\nCommand: {' '.join(cmd_args)}\nScript: {script}\nStderr: {stderr.decode()}" | ||
| 319 | ) | ||
| 320 | |||
| 160 | return stdout.decode() | 321 | return stdout.decode() |
| 161 | 322 | ||
| 162 | 323 | ||
| @@ -170,168 +331,187 @@ def machine_interface(name: str) -> str: | |||
| 170 | 331 | ||
| 171 | return interface | 332 | return interface |
| 172 | 333 | ||
| 173 | async def machine_prepare_interface(job_id: int, machine: str): | 334 | |
| 335 | async def machine_cleanup_interface(job_id: int, machine: str): | ||
| 174 | interface = machine_interface(machine) | 336 | interface = machine_interface(machine) |
| 175 | 337 | ||
| 176 | # Get interface information | 338 | # Get interface information |
| 177 | get_addr_script = f"ip -j addr show {interface}" | 339 | get_addr_script = f"ip -j addr show {interface}" |
| 178 | stdout = await run_script_in_docker(job_id, machine, get_addr_script) | 340 | stdout = await run_script_in_docker(job_id, machine, get_addr_script) |
| 179 | 341 | ||
| 180 | if not stdout.strip(): | 342 | if not stdout.strip(): |
| 181 | print(f"No interface info for {machine}, skipping prepare") | 343 | print(f"No interface info for {machine}, skipping cleanup") |
| 182 | return | 344 | return |
| 183 | 345 | ||
| 184 | # Parse JSON output | 346 | # Parse JSON output |
| 185 | interface_data = json.loads(stdout) | 347 | interface_data = json.loads(stdout) |
| 186 | 348 | ||
| 187 | # Extract addresses that start with '10.' | 349 | # Extract addresses that start with '10.' |
| 188 | commands = [] | 350 | commands = [] |
| 189 | for iface in interface_data: | 351 | for iface in interface_data: |
| 190 | if "addr_info" in iface: | 352 | if "addr_info" in iface: |
| 191 | for addr in iface["addr_info"]: | 353 | for addr in iface["addr_info"]: |
| 192 | if addr.get("family") == "inet" and addr.get("local", "").startswith("10."): | 354 | if addr.get("family") == "inet" and addr.get("local", "").startswith( |
| 355 | "10." | ||
| 356 | ): | ||
| 193 | ip = addr["local"] | 357 | ip = addr["local"] |
| 194 | commands.append(f"ip addr del {ip}/32 dev {interface}") | 358 | commands.append(f"ip addr del {ip}/32 dev {interface}") |
| 195 | 359 | ||
| 196 | # Remove 10.0.0.0/8 route if it exists | 360 | # Remove 10.0.0.0/8 route if it exists |
| 197 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") | 361 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") |
| 198 | 362 | ||
| 199 | if len(commands) == 1: # Only the route command | 363 | if len(commands) == 1: # Only the route command |
| 200 | print(f"No 10.x addresses to remove from {machine}, only cleaning up route") | 364 | print(f"No 10.x addresses to remove from {machine}, only cleaning up route") |
| 201 | else: | 365 | else: |
| 202 | print(f"Removing {len(commands)-1} addresses and route from {machine}") | 366 | print(f"Removing {len(commands)-1} addresses and route from {machine}") |
| 203 | 367 | ||
| 204 | # Execute batch commands | 368 | # Execute batch commands and clean TC state and NFT table in parallel |
| 205 | remove_script = "\n".join(commands) | 369 | remove_script = "\n".join(commands) |
| 206 | await run_script_in_docker(job_id, machine, remove_script) | 370 | tasks = [ |
| 207 | 371 | run_script_in_docker(job_id, machine, remove_script), | |
| 208 | # Remove existing tc qdiscs separately (ignore errors) | 372 | run_script_in_docker( |
| 209 | await run_script_in_docker(job_id, machine, f"tc qdisc del dev {interface} root 2>/dev/null || true") | 373 | job_id, machine, f"tc qdisc del dev {interface} root 2>/dev/null || true" |
| 210 | 374 | ), | |
| 375 | run_script_in_docker( | ||
| 376 | job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true" | ||
| 377 | ), | ||
| 378 | run_script_in_docker( | ||
| 379 | job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true" | ||
| 380 | ), | ||
| 381 | ] | ||
| 382 | await asyncio.gather(*tasks) | ||
| 383 | |||
| 211 | # Small delay to ensure cleanup is complete | 384 | # Small delay to ensure cleanup is complete |
| 212 | await asyncio.sleep(0.1) | 385 | await asyncio.sleep(0.2) |
| 213 | 386 | ||
| 214 | 387 | ||
| 215 | async def machine_configure_interface(job_id: int, machine: str, address_indices: list[int]): | 388 | async def setup_command(job_id: int, addresses: int, latency_matrix_path: str): |
| 216 | interface = machine_interface(machine) | 389 | # Load latency matrix |
| 217 | 390 | latency_matrix = LatencyMatrix.read_from_file(latency_matrix_path) | |
| 218 | if not address_indices: | ||
| 219 | return # No addresses to add | ||
| 220 | |||
| 221 | # Generate IP addresses from indices | ||
| 222 | ip_addresses = [address_from_index(idx) for idx in address_indices] | ||
| 223 | |||
| 224 | # Prepare ip commands without the 'ip' prefix for batch execution | ||
| 225 | commands = [] | ||
| 226 | for ip in ip_addresses: | ||
| 227 | commands.append(f"addr add {ip}/32 dev {interface}") | ||
| 228 | |||
| 229 | # Add route for 10.0.0.0/8 | ||
| 230 | commands.append(f"route add 10.0.0.0/8 dev {interface}") | ||
| 231 | |||
| 232 | print(f"Adding {len(ip_addresses)} addresses and route to {machine}") | ||
| 233 | |||
| 234 | # Execute batch commands using ip -b - | ||
| 235 | commands_data = "\n".join(commands) | ||
| 236 | await run_script_in_docker(job_id, machine, "ip -b -", commands_data) | ||
| 237 | |||
| 238 | |||
| 239 | async def machine_configure_latencies(job_id: int, machine: str, address_indices: list[int], latency_matrix: LatencyMatrix): | ||
| 240 | interface = machine_interface(machine) | ||
| 241 | |||
| 242 | if not address_indices: | ||
| 243 | return # No addresses to configure | ||
| 244 | |||
| 245 | # Generate tc commands for latency configuration (without 'tc' prefix) | ||
| 246 | commands = [] | ||
| 247 | |||
| 248 | # Create root qdisc with enough bands for our rules | ||
| 249 | max_bands = min(len(address_indices) * latency_matrix.size(), 16) # prio qdisc supports max 16 bands | ||
| 250 | commands.append(f"qdisc add dev {interface} root handle 1: prio bands {max_bands}") | ||
| 251 | |||
| 252 | # For each src->dst pair, create a simple netem rule | ||
| 253 | filter_counter = 1 | ||
| 254 | band_counter = 1 | ||
| 255 | for src_idx in address_indices: | ||
| 256 | src_ip = address_from_index(src_idx) | ||
| 257 | |||
| 258 | for dst_idx in range(latency_matrix.size()): | ||
| 259 | if src_idx != dst_idx and src_idx < latency_matrix.size() and dst_idx < latency_matrix.size(): # Skip self-to-self and out-of-bounds | ||
| 260 | dst_ip = address_from_index(dst_idx) | ||
| 261 | latency = latency_matrix.get_latency(src_idx, dst_idx) | ||
| 262 | |||
| 263 | # Use bands cyclically since prio has limited bands | ||
| 264 | band = (band_counter % max_bands) + 1 | ||
| 265 | |||
| 266 | # Create a unique handle for this rule (must be different from root handle) | ||
| 267 | handle = f"{filter_counter + 100}:" | ||
| 268 | |||
| 269 | # Add netem qdisc to the appropriate band | ||
| 270 | commands.append(f"qdisc add dev {interface} parent 1:{band} handle {handle} netem delay {latency}ms") | ||
| 271 | |||
| 272 | # Add filter to match traffic from src_ip to dst_ip | ||
| 273 | commands.append(f"filter add dev {interface} protocol ip parent 1: prio {filter_counter} u32 match ip src {src_ip} match ip dst {dst_ip} flowid 1:{band}") | ||
| 274 | |||
| 275 | filter_counter += 1 | ||
| 276 | band_counter += 1 | ||
| 277 | |||
| 278 | if not commands: | ||
| 279 | print(f"No latency configuration needed for {machine}") | ||
| 280 | return | ||
| 281 | |||
| 282 | print(f"Configuring latencies with {filter_counter-1} rules on {machine}") | ||
| 283 | |||
| 284 | # Execute batch tc commands using tc -b - | ||
| 285 | tc_commands = "\n".join(commands) | ||
| 286 | await run_script_in_docker(job_id, machine, "tc -b -", tc_commands) | ||
| 287 | 391 | ||
| 392 | # Get machines from job | ||
| 393 | machines = await oar_job_list_machines(job_id) | ||
| 288 | 394 | ||
| 395 | print(f"Machines: {machines}") | ||
| 396 | print(f"Total addresses: {addresses}") | ||
| 289 | 397 | ||
| 290 | async def main(): | 398 | # Generate configurations for all machines |
| 291 | parser = argparse.ArgumentParser(description="OAR P2P network setup") | 399 | configurations = machine_generate_configurations( |
| 292 | parser.add_argument("job_id", type=int, help="OAR job ID") | 400 | machines, addresses, latency_matrix |
| 293 | parser.add_argument("addresses", type=int, help="Number of addresses to allocate") | 401 | ) |
| 294 | parser.add_argument("latency_matrix", type=str, help="Path to latency matrix file") | ||
| 295 | 402 | ||
| 296 | args = parser.parse_args() | 403 | # Apply configurations to each machine in parallel |
| 404 | async def setup_machine(config: MachineConfiguration): | ||
| 405 | if config.machine == "charmander-2": | ||
| 406 | return | ||
| 407 | print(f"Setting up {config.machine}...") | ||
| 297 | 408 | ||
| 298 | # Load latency matrix | 409 | # First cleanup the interface |
| 299 | latency_matrix = LatencyMatrix.read_from_file(args.latency_matrix) | 410 | await machine_cleanup_interface(job_id, config.machine) |
| 300 | 411 | ||
| 301 | machines = await oar_job_list_machines(args.job_id) | 412 | # Then apply the new configuration |
| 302 | addresses_per_machine = (args.addresses + len(machines) - 1) // len(machines) | 413 | await machine_apply_configuration(job_id, config) |
| 303 | |||
| 304 | machine_indices = [] | ||
| 305 | for machine_idx, machine in enumerate(machines): | ||
| 306 | indices = [] | ||
| 307 | for addr_idx in range(addresses_per_machine): | ||
| 308 | index = machine_idx * addresses_per_machine + addr_idx | ||
| 309 | indices.append(index) | ||
| 310 | machine_indices.append(indices) | ||
| 311 | 414 | ||
| 312 | print(f"Machines: {machines}") | ||
| 313 | print(f"Addresses per machine: {addresses_per_machine}") | ||
| 314 | print(f"Machine indices: {machine_indices}") | ||
| 315 | |||
| 316 | # Prepare and configure interfaces for each machine in parallel | ||
| 317 | async def prepare_and_configure_machine(machine_idx: int, machine: str): | ||
| 318 | print(f"Preparing interface for {machine}...") | ||
| 319 | await machine_prepare_interface(args.job_id, machine) | ||
| 320 | |||
| 321 | print(f"Configuring interface for {machine}...") | ||
| 322 | await machine_configure_interface(args.job_id, machine, machine_indices[machine_idx]) | ||
| 323 | |||
| 324 | print(f"Configuring latencies for {machine}...") | ||
| 325 | await machine_configure_latencies(args.job_id, machine, machine_indices[machine_idx], latency_matrix) | ||
| 326 | |||
| 327 | # Run all machines in parallel | 415 | # Run all machines in parallel |
| 328 | tasks = [ | 416 | tasks = [setup_machine(config) for config in configurations] |
| 329 | prepare_and_configure_machine(machine_idx, machine) | ||
| 330 | for machine_idx, machine in enumerate(machines) | ||
| 331 | ] | ||
| 332 | await asyncio.gather(*tasks) | 417 | await asyncio.gather(*tasks) |
| 333 | 418 | ||
| 334 | 419 | ||
| 420 | async def clean_command(job_id: int): | ||
| 421 | machines = await oar_job_list_machines(job_id) | ||
| 422 | |||
| 423 | print(f"Cleaning up {len(machines)} machines...") | ||
| 424 | |||
| 425 | # Clean up all machines in parallel | ||
| 426 | tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] | ||
| 427 | await asyncio.gather(*tasks) | ||
| 428 | |||
| 429 | print("Cleanup completed for all machines") | ||
| 430 | |||
| 431 | |||
| 432 | async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str): | ||
| 433 | # Load latency matrix | ||
| 434 | latency_matrix = LatencyMatrix.read_from_file(latency_matrix_path) | ||
| 435 | |||
| 436 | # Get machines from job | ||
| 437 | machines = await oar_job_list_machines(job_id) | ||
| 438 | |||
| 439 | # Generate configurations | ||
| 440 | configurations = machine_generate_configurations( | ||
| 441 | machines, addresses, latency_matrix | ||
| 442 | ) | ||
| 443 | |||
| 444 | # Print configurations with markers between each machine | ||
| 445 | for i, config in enumerate(configurations): | ||
| 446 | if i > 0: | ||
| 447 | print("\n" + "=" * 80 + "\n") | ||
| 448 | |||
| 449 | print(f"Machine: {config.machine}") | ||
| 450 | print("-" * 40) | ||
| 451 | |||
| 452 | print("NFT Script:") | ||
| 453 | print(config.nft_script) | ||
| 454 | |||
| 455 | print("\nTC Commands:") | ||
| 456 | for cmd in config.tc_commands: | ||
| 457 | print(f"tc {cmd}") | ||
| 458 | |||
| 459 | print("\nIP Commands:") | ||
| 460 | for cmd in config.ip_commands: | ||
| 461 | print(cmd) | ||
| 462 | |||
| 463 | |||
| 464 | async def main(): | ||
| 465 | parser = argparse.ArgumentParser(description="OAR P2P network management") | ||
| 466 | subparsers = parser.add_subparsers(dest="command", help="Available commands") | ||
| 467 | |||
| 468 | # Setup command | ||
| 469 | setup_parser = subparsers.add_parser( | ||
| 470 | "setup", help="Setup network interfaces and latencies" | ||
| 471 | ) | ||
| 472 | setup_parser.add_argument("job_id", type=int, help="OAR job ID") | ||
| 473 | setup_parser.add_argument( | ||
| 474 | "addresses", type=int, help="Number of addresses to allocate" | ||
| 475 | ) | ||
| 476 | setup_parser.add_argument( | ||
| 477 | "latency_matrix", type=str, help="Path to latency matrix file" | ||
| 478 | ) | ||
| 479 | |||
| 480 | # Clean command | ||
| 481 | clean_parser = subparsers.add_parser("clean", help="Clean up network interfaces") | ||
| 482 | clean_parser.add_argument("job_id", type=int, help="OAR job ID") | ||
| 483 | |||
| 484 | # Configurations command | ||
| 485 | config_parser = subparsers.add_parser( | ||
| 486 | "configurations", help="Generate and print machine configurations" | ||
| 487 | ) | ||
| 488 | config_parser.add_argument("job_id", type=int, help="OAR job ID") | ||
| 489 | config_parser.add_argument( | ||
| 490 | "addresses", type=int, help="Number of addresses to allocate" | ||
| 491 | ) | ||
| 492 | config_parser.add_argument( | ||
| 493 | "latency_matrix", type=str, help="Path to latency matrix file" | ||
| 494 | ) | ||
| 495 | |||
| 496 | args = parser.parse_args() | ||
| 497 | |||
| 498 | if args.command == "setup": | ||
| 499 | await setup_command(args.job_id, args.addresses, args.latency_matrix) | ||
| 500 | elif args.command == "clean": | ||
| 501 | await clean_command(args.job_id) | ||
| 502 | elif args.command == "configurations": | ||
| 503 | await configurations_command(args.job_id, args.addresses, args.latency_matrix) | ||
| 504 | else: | ||
| 505 | parser.print_help() | ||
| 506 | |||
| 507 | |||
| 508 | def address_from_index(index: int) -> str: | ||
| 509 | d = index % 254 | ||
| 510 | c = (index // 254) % 254 | ||
| 511 | assert c <= 254 | ||
| 512 | return f"10.0.{c}.{d+1}" | ||
| 513 | |||
| 514 | |||
| 335 | if __name__ == "__main__": | 515 | if __name__ == "__main__": |
| 336 | asyncio.run(main()) | 516 | asyncio.run(main()) |
| 337 | 517 | ||
