aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-06-27 16:37:14 +0100
committerdiogo464 <[email protected]>2025-06-27 16:37:14 +0100
commitd48e1bc85526a090204ed59e1a985aef50e56af2 (patch)
treec36b18d26381bdb1205b74c1ab6a7d102ceb626d
parent0848f5e5be88789c3160df76e5ba4f55e9feb4b8 (diff)
Refactor P2P network configuration with modular design and NFT integration
- Add configuration generation and application system
- Implement machine_apply_configuration for parallel execution
- Integrate nftables for packet marking with latency-based sets
- Add configurations subcommand for preview and debugging
- Improve cleanup with parallel TC/NFT operations
- Filter self-traffic and empty latency buckets
- Update container to include nftables package
- Replace old interface/latency functions with unified approach

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
-rw-r--r--Containerfile3
-rw-r--r--oar-p2p.py490
2 files changed, 337 insertions, 156 deletions
diff --git a/Containerfile b/Containerfile
index 896a128..086b7d9 100644
--- a/Containerfile
+++ b/Containerfile
@@ -22,7 +22,8 @@ FROM docker.io/alpine:3.22
22# Install runtime dependencies 22# Install runtime dependencies
23RUN apk add --no-cache \ 23RUN apk add --no-cache \
24 iproute2 \ 24 iproute2 \
25 iproute2-tc 25 iproute2-tc \
26 nftables
26 27
27# Copy the binary from builder stage 28# Copy the binary from builder stage
28COPY --from=builder /app/target/release/oar-p2p /usr/local/bin/oar-p2p 29COPY --from=builder /app/target/release/oar-p2p /usr/local/bin/oar-p2p
diff --git a/oar-p2p.py b/oar-p2p.py
index 9c10d0d..b6be8ba 100644
--- a/oar-p2p.py
+++ b/oar-p2p.py
@@ -1,6 +1,12 @@
1import json
2import math
1import asyncio 3import asyncio
2import argparse 4import argparse
3import json 5
6from collections import defaultdict
7from dataclasses import dataclass
8
9NFT_TABLE_NAME = "oar-p2p"
4 10
5MACHINE_INTERFACES = { 11MACHINE_INTERFACES = {
6 "alakazam-01": None, 12 "alakazam-01": None,
@@ -70,35 +76,172 @@ class LatencyMatrix:
70 # Assert matrix is square 76 # Assert matrix is square
71 size = len(matrix) 77 size = len(matrix)
72 for row in matrix: 78 for row in matrix:
73 assert len(row) == size, f"Matrix must be square: expected {size} columns, got {len(row)}" 79 assert (
80 len(row) == size
81 ), f"Matrix must be square: expected {size} columns, got {len(row)}"
74 self.matrix = matrix 82 self.matrix = matrix
75 83
76 @staticmethod 84 @staticmethod
77 def read_from_file(file_path: str) -> 'LatencyMatrix': 85 def read_from_file(file_path: str) -> "LatencyMatrix":
78 """Read a latency matrix from a file and return as LatencyMatrix instance.""" 86 """Read a latency matrix from a file and return as LatencyMatrix instance."""
79 matrix = [] 87 matrix = []
80 with open(file_path, 'r') as f: 88 with open(file_path, "r") as f:
81 for line in f: 89 for line in f:
82 line = line.strip() 90 line = line.strip()
83 if line and not line.startswith('#'): # Skip empty lines and comments 91 if line and not line.startswith("#"): # Skip empty lines and comments
84 row = [float(x) for x in line.split()] 92 row = [float(x) for x in line.split()]
85 matrix.append(row) 93 matrix.append(row)
86 return LatencyMatrix(matrix) 94 return LatencyMatrix(matrix)
87 95
88 def get_latency(self, src_idx: int, dst_idx: int) -> float: 96 def get_latency(self, src_idx: int, dst_idx: int) -> float:
89 """Get the latency value from source index to destination index.""" 97 """Get the latency value from source index to destination index."""
90 return self.matrix[src_idx][dst_idx] 98 return self.matrix[src_idx][dst_idx]
91 99
92 def size(self) -> int: 100 def size(self) -> int:
93 """Get the size of the square matrix.""" 101 """Get the size of the square matrix."""
94 return len(self.matrix) 102 return len(self.matrix)
95 103
96 104
97def address_from_index(index: int) -> str: 105@dataclass
98 d = index % 254 106class MachineConfiguration:
99 c = (index // 254) % 254 107 machine: str
100 assert c <= 254 108 nft_script: str
101 return f"10.0.{c}.{d+1}" 109 tc_commands: list[str]
110 ip_commands: list[str]
111
112
113def machine_get_interface(machine: str) -> str:
114 interface = MACHINE_INTERFACES.get(machine, None)
115 assert interface is not None, f"machine interface not configured: {machine}"
116 return interface
117
118
119def machine_generate_configurations(
120 machines: list[str], num_addresses: int, matrix: LatencyMatrix
121) -> list[MachineConfiguration]:
122 configurations = []
123
124 machine_addr_idxs = defaultdict(list)
125
126 for i in range(num_addresses):
127 machine = machines[i % len(machines)]
128 machine_addr_idxs[machine].append(i)
129
130 for machine in machines:
131 interface = machine_get_interface(machine)
132 addr_idxs = machine_addr_idxs[machine]
133 ip_commands = []
134 tc_commands = []
135
136 ip_commands.append(f"route add 10.0.0.0/8 dev {interface}")
137 for addr_idx in addr_idxs:
138 ip_commands.append(
139 f"addr add {address_from_index(addr_idx)}/32 dev {interface}"
140 )
141
142 latencies_set = set()
143 latencies_buckets = defaultdict(list)
144 for addr_idx in addr_idxs:
145 for i in range(num_addresses):
146 if addr_idx == i:
147 continue
148 latency = matrix.get_latency(addr_idx, i)
149 latency_rounded = math.ceil(latency) // 1
150 latencies_set.add(latency_rounded)
151 latencies_buckets[latency_rounded].append((addr_idx, i))
152
153 latencies = list(sorted(latencies_set))
154
155 tc_commands.append(f"qdisc add dev {interface} root handle 1: htb default 9999")
156 tc_commands.append(
157 f"class add dev {interface} parent 1: classid 1:9999 htb rate 10gbit"
158 )
159 for idx, latency in enumerate(latencies):
160 # tc class for latency at idx X is X + 1
161 tc_commands.append(
162 f"class add dev {interface} parent 1: classid 1:{idx+1} htb rate 10gbit"
163 )
164 tc_commands.append(
165 f"qdisc add dev {interface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms"
166 )
167 # mark for latency at idx X is X + 1
168 tc_commands.append(
169 f"filter add dev {interface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}"
170 )
171
172 nft_script = ""
173 nft_script += "table ip oar-p2p {" + "\n"
174 for latency_idx, latency in enumerate(latencies):
175 if len(latencies_buckets[latency]) == 0:
176 continue
177 nft_script += f" set mark_{latency_idx}_pairs {{\n"
178 nft_script += f" type ipv4_addr . ipv4_addr\n"
179 nft_script += f" flags interval\n"
180 nft_script += f" elements = {{\n"
181 for src_idx, dst_idx in latencies_buckets[latency]:
182 assert src_idx != dst_idx
183 src_addr = address_from_index(src_idx)
184 dst_addr = address_from_index(dst_idx)
185 nft_script += f" {src_addr} . {dst_addr},\n"
186 nft_script += f" }}\n"
187 nft_script += f" }}\n\n"
188
189 nft_script += " chain postrouting {\n"
190 nft_script += " type filter hook postrouting priority mangle - 1\n"
191 nft_script += " policy accept\n"
192 nft_script += "\n"
193 for latency_idx in range(len(latencies)):
194 nft_script += f" ip saddr . ip daddr @mark_{latency_idx}_pairs meta mark set {latency_idx+1}\n"
195 nft_script += " }" + "\n"
196 nft_script += "}" + "\n"
197
198 configurations.append(
199 MachineConfiguration(
200 machine=machine,
201 nft_script=nft_script,
202 tc_commands=tc_commands,
203 ip_commands=ip_commands,
204 )
205 )
206
207 return configurations
208
209
210async def machine_apply_configuration(job_id: int, config: MachineConfiguration):
211 """Apply a machine configuration by executing IP commands, TC commands, and NFT script."""
212 machine = config.machine
213
214 print(f"Applying configuration to {machine}...")
215
216 # Prepare tasks for parallel execution
217 tasks = []
218
219 # IP commands task
220 if config.ip_commands:
221 ip_batch = "\n".join(config.ip_commands)
222 print(f"Executing {len(config.ip_commands)} IP commands on {machine}")
223 tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch))
224
225 # TC commands task
226 if config.tc_commands:
227 tc_batch = "\n".join(config.tc_commands)
228 print(f"Executing {len(config.tc_commands)} TC commands on {machine}")
229 tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch))
230
231 # NFT script task
232 if config.nft_script:
233 print(f"Applying NFT script on {machine}")
234 tasks.append(
235 run_script_in_docker(job_id, machine, "nft -f -", config.nft_script)
236 )
237
238 # Execute all tasks in parallel
239 if tasks:
240 try:
241 await asyncio.gather(*tasks)
242 except Exception as e:
243 print(f"ERROR applying configuration to {machine}: {e}")
244 raise
102 245
103 246
104async def oar_job_list_machines(job_id: int) -> list[str]: 247async def oar_job_list_machines(job_id: int) -> list[str]:
@@ -121,14 +264,14 @@ async def oar_job_list_machines(job_id: int) -> list[str]:
121 return data[str(job_id)]["assigned_network_address"] 264 return data[str(job_id)]["assigned_network_address"]
122 265
123 266
124async def run_script_in_docker(job_id: int, machine: str, script: str, stdin_data: str = None) -> str: 267async def run_script_in_docker(
125 # Prepare the full script with package installation 268 job_id: int, machine: str, script: str, stdin_data: str | None = None
269) -> str:
270 # Prepare the script (no package installation needed with custom image)
126 if stdin_data: 271 if stdin_data:
127 # If stdin_data is provided, create a script that pipes it to the command 272 # If stdin_data is provided, create a script that pipes it to the command
128 full_script = f"""#!/bin/bash 273 full_script = f"""#!/bin/bash
129set -e 274set -e
130apk update >/dev/null 2>&1
131apk add iproute2 iproute2-tc >/dev/null 2>&1
132cat << 'STDIN_EOF' | {script} 275cat << 'STDIN_EOF' | {script}
133{stdin_data} 276{stdin_data}
134STDIN_EOF 277STDIN_EOF
@@ -136,27 +279,45 @@ STDIN_EOF
136 else: 279 else:
137 full_script = f"""#!/bin/bash 280 full_script = f"""#!/bin/bash
138set -e 281set -e
139apk update >/dev/null 2>&1
140apk add iproute2 iproute2-tc >/dev/null 2>&1
141{script} 282{script}
142""" 283"""
143 284
144 # Run the script in an Alpine Docker container via SSH 285 # Run the script in our custom networking Docker container via SSH
145 proc = await asyncio.create_subprocess_exec( 286 proc = await asyncio.create_subprocess_exec(
146 "oarsh", machine, 287 "oarsh",
147 "docker", "run", "--rm", "--privileged", "--net=host", 288 machine,
148 "-i", "alpine:latest", "sh", 289 "docker",
290 "run",
291 "--rm",
292 "--privileged",
293 "--pull=always",
294 "--net=host",
295 "-i",
296 "ghcr.io/diogo464/oar-p2p-networking:latest",
149 env={"OAR_JOB_ID": str(job_id)}, 297 env={"OAR_JOB_ID": str(job_id)},
150 stdin=asyncio.subprocess.PIPE, 298 stdin=asyncio.subprocess.PIPE,
151 stdout=asyncio.subprocess.PIPE, 299 stdout=asyncio.subprocess.PIPE,
152 stderr=asyncio.subprocess.PIPE, 300 stderr=asyncio.subprocess.PIPE,
153 ) 301 )
154 302
155 stdout, stderr = await proc.communicate(input=full_script.encode()) 303 stdout, stderr = await proc.communicate(input=full_script.encode())
156 304
157 if proc.returncode != 0: 305 if proc.returncode != 0:
158 raise Exception(f"Script execution failed on {machine}: {stderr.decode()}") 306 cmd_args = [
159 307 "oarsh",
308 machine,
309 "docker",
310 "run",
311 "--rm",
312 "--privileged",
313 "--net=host",
314 "-i",
315 "ghcr.io/diogo464/oar-p2p-networking:latest",
316 ]
317 raise Exception(
318 f"Script execution failed on {machine}\nCommand: {' '.join(cmd_args)}\nScript: {script}\nStderr: {stderr.decode()}"
319 )
320
160 return stdout.decode() 321 return stdout.decode()
161 322
162 323
@@ -170,168 +331,187 @@ def machine_interface(name: str) -> str:
170 331
171 return interface 332 return interface
172 333
173async def machine_prepare_interface(job_id: int, machine: str): 334
335async def machine_cleanup_interface(job_id: int, machine: str):
174 interface = machine_interface(machine) 336 interface = machine_interface(machine)
175 337
176 # Get interface information 338 # Get interface information
177 get_addr_script = f"ip -j addr show {interface}" 339 get_addr_script = f"ip -j addr show {interface}"
178 stdout = await run_script_in_docker(job_id, machine, get_addr_script) 340 stdout = await run_script_in_docker(job_id, machine, get_addr_script)
179 341
180 if not stdout.strip(): 342 if not stdout.strip():
181 print(f"No interface info for {machine}, skipping prepare") 343 print(f"No interface info for {machine}, skipping cleanup")
182 return 344 return
183 345
184 # Parse JSON output 346 # Parse JSON output
185 interface_data = json.loads(stdout) 347 interface_data = json.loads(stdout)
186 348
187 # Extract addresses that start with '10.' 349 # Extract addresses that start with '10.'
188 commands = [] 350 commands = []
189 for iface in interface_data: 351 for iface in interface_data:
190 if "addr_info" in iface: 352 if "addr_info" in iface:
191 for addr in iface["addr_info"]: 353 for addr in iface["addr_info"]:
192 if addr.get("family") == "inet" and addr.get("local", "").startswith("10."): 354 if addr.get("family") == "inet" and addr.get("local", "").startswith(
355 "10."
356 ):
193 ip = addr["local"] 357 ip = addr["local"]
194 commands.append(f"ip addr del {ip}/32 dev {interface}") 358 commands.append(f"ip addr del {ip}/32 dev {interface}")
195 359
196 # Remove 10.0.0.0/8 route if it exists 360 # Remove 10.0.0.0/8 route if it exists
197 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") 361 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true")
198 362
199 if len(commands) == 1: # Only the route command 363 if len(commands) == 1: # Only the route command
200 print(f"No 10.x addresses to remove from {machine}, only cleaning up route") 364 print(f"No 10.x addresses to remove from {machine}, only cleaning up route")
201 else: 365 else:
202 print(f"Removing {len(commands)-1} addresses and route from {machine}") 366 print(f"Removing {len(commands)-1} addresses and route from {machine}")
203 367
204 # Execute batch commands 368 # Execute batch commands and clean TC state and NFT table in parallel
205 remove_script = "\n".join(commands) 369 remove_script = "\n".join(commands)
206 await run_script_in_docker(job_id, machine, remove_script) 370 tasks = [
207 371 run_script_in_docker(job_id, machine, remove_script),
208 # Remove existing tc qdiscs separately (ignore errors) 372 run_script_in_docker(
209 await run_script_in_docker(job_id, machine, f"tc qdisc del dev {interface} root 2>/dev/null || true") 373 job_id, machine, f"tc qdisc del dev {interface} root 2>/dev/null || true"
210 374 ),
375 run_script_in_docker(
376 job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true"
377 ),
378 run_script_in_docker(
379 job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true"
380 ),
381 ]
382 await asyncio.gather(*tasks)
383
211 # Small delay to ensure cleanup is complete 384 # Small delay to ensure cleanup is complete
212 await asyncio.sleep(0.1) 385 await asyncio.sleep(0.2)
213 386
214 387
215async def machine_configure_interface(job_id: int, machine: str, address_indices: list[int]): 388async def setup_command(job_id: int, addresses: int, latency_matrix_path: str):
216 interface = machine_interface(machine) 389 # Load latency matrix
217 390 latency_matrix = LatencyMatrix.read_from_file(latency_matrix_path)
218 if not address_indices:
219 return # No addresses to add
220
221 # Generate IP addresses from indices
222 ip_addresses = [address_from_index(idx) for idx in address_indices]
223
224 # Prepare ip commands without the 'ip' prefix for batch execution
225 commands = []
226 for ip in ip_addresses:
227 commands.append(f"addr add {ip}/32 dev {interface}")
228
229 # Add route for 10.0.0.0/8
230 commands.append(f"route add 10.0.0.0/8 dev {interface}")
231
232 print(f"Adding {len(ip_addresses)} addresses and route to {machine}")
233
234 # Execute batch commands using ip -b -
235 commands_data = "\n".join(commands)
236 await run_script_in_docker(job_id, machine, "ip -b -", commands_data)
237
238
239async def machine_configure_latencies(job_id: int, machine: str, address_indices: list[int], latency_matrix: LatencyMatrix):
240 interface = machine_interface(machine)
241
242 if not address_indices:
243 return # No addresses to configure
244
245 # Generate tc commands for latency configuration (without 'tc' prefix)
246 commands = []
247
248 # Create root qdisc with enough bands for our rules
249 max_bands = min(len(address_indices) * latency_matrix.size(), 16) # prio qdisc supports max 16 bands
250 commands.append(f"qdisc add dev {interface} root handle 1: prio bands {max_bands}")
251
252 # For each src->dst pair, create a simple netem rule
253 filter_counter = 1
254 band_counter = 1
255 for src_idx in address_indices:
256 src_ip = address_from_index(src_idx)
257
258 for dst_idx in range(latency_matrix.size()):
259 if src_idx != dst_idx and src_idx < latency_matrix.size() and dst_idx < latency_matrix.size(): # Skip self-to-self and out-of-bounds
260 dst_ip = address_from_index(dst_idx)
261 latency = latency_matrix.get_latency(src_idx, dst_idx)
262
263 # Use bands cyclically since prio has limited bands
264 band = (band_counter % max_bands) + 1
265
266 # Create a unique handle for this rule (must be different from root handle)
267 handle = f"{filter_counter + 100}:"
268
269 # Add netem qdisc to the appropriate band
270 commands.append(f"qdisc add dev {interface} parent 1:{band} handle {handle} netem delay {latency}ms")
271
272 # Add filter to match traffic from src_ip to dst_ip
273 commands.append(f"filter add dev {interface} protocol ip parent 1: prio {filter_counter} u32 match ip src {src_ip} match ip dst {dst_ip} flowid 1:{band}")
274
275 filter_counter += 1
276 band_counter += 1
277
278 if not commands:
279 print(f"No latency configuration needed for {machine}")
280 return
281
282 print(f"Configuring latencies with {filter_counter-1} rules on {machine}")
283
284 # Execute batch tc commands using tc -b -
285 tc_commands = "\n".join(commands)
286 await run_script_in_docker(job_id, machine, "tc -b -", tc_commands)
287 391
392 # Get machines from job
393 machines = await oar_job_list_machines(job_id)
288 394
395 print(f"Machines: {machines}")
396 print(f"Total addresses: {addresses}")
289 397
290async def main(): 398 # Generate configurations for all machines
291 parser = argparse.ArgumentParser(description="OAR P2P network setup") 399 configurations = machine_generate_configurations(
292 parser.add_argument("job_id", type=int, help="OAR job ID") 400 machines, addresses, latency_matrix
293 parser.add_argument("addresses", type=int, help="Number of addresses to allocate") 401 )
294 parser.add_argument("latency_matrix", type=str, help="Path to latency matrix file")
295 402
296 args = parser.parse_args() 403 # Apply configurations to each machine in parallel
404 async def setup_machine(config: MachineConfiguration):
405 if config.machine == "charmander-2":
406 return
407 print(f"Setting up {config.machine}...")
297 408
298 # Load latency matrix 409 # First cleanup the interface
299 latency_matrix = LatencyMatrix.read_from_file(args.latency_matrix) 410 await machine_cleanup_interface(job_id, config.machine)
300 411
301 machines = await oar_job_list_machines(args.job_id) 412 # Then apply the new configuration
302 addresses_per_machine = (args.addresses + len(machines) - 1) // len(machines) 413 await machine_apply_configuration(job_id, config)
303
304 machine_indices = []
305 for machine_idx, machine in enumerate(machines):
306 indices = []
307 for addr_idx in range(addresses_per_machine):
308 index = machine_idx * addresses_per_machine + addr_idx
309 indices.append(index)
310 machine_indices.append(indices)
311 414
312 print(f"Machines: {machines}")
313 print(f"Addresses per machine: {addresses_per_machine}")
314 print(f"Machine indices: {machine_indices}")
315
316 # Prepare and configure interfaces for each machine in parallel
317 async def prepare_and_configure_machine(machine_idx: int, machine: str):
318 print(f"Preparing interface for {machine}...")
319 await machine_prepare_interface(args.job_id, machine)
320
321 print(f"Configuring interface for {machine}...")
322 await machine_configure_interface(args.job_id, machine, machine_indices[machine_idx])
323
324 print(f"Configuring latencies for {machine}...")
325 await machine_configure_latencies(args.job_id, machine, machine_indices[machine_idx], latency_matrix)
326
327 # Run all machines in parallel 415 # Run all machines in parallel
328 tasks = [ 416 tasks = [setup_machine(config) for config in configurations]
329 prepare_and_configure_machine(machine_idx, machine)
330 for machine_idx, machine in enumerate(machines)
331 ]
332 await asyncio.gather(*tasks) 417 await asyncio.gather(*tasks)
333 418
334 419
420async def clean_command(job_id: int):
421 machines = await oar_job_list_machines(job_id)
422
423 print(f"Cleaning up {len(machines)} machines...")
424
425 # Clean up all machines in parallel
426 tasks = [machine_cleanup_interface(job_id, machine) for machine in machines]
427 await asyncio.gather(*tasks)
428
429 print("Cleanup completed for all machines")
430
431
432async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str):
433 # Load latency matrix
434 latency_matrix = LatencyMatrix.read_from_file(latency_matrix_path)
435
436 # Get machines from job
437 machines = await oar_job_list_machines(job_id)
438
439 # Generate configurations
440 configurations = machine_generate_configurations(
441 machines, addresses, latency_matrix
442 )
443
444 # Print configurations with markers between each machine
445 for i, config in enumerate(configurations):
446 if i > 0:
447 print("\n" + "=" * 80 + "\n")
448
449 print(f"Machine: {config.machine}")
450 print("-" * 40)
451
452 print("NFT Script:")
453 print(config.nft_script)
454
455 print("\nTC Commands:")
456 for cmd in config.tc_commands:
457 print(f"tc {cmd}")
458
459 print("\nIP Commands:")
460 for cmd in config.ip_commands:
461 print(cmd)
462
463
464async def main():
465 parser = argparse.ArgumentParser(description="OAR P2P network management")
466 subparsers = parser.add_subparsers(dest="command", help="Available commands")
467
468 # Setup command
469 setup_parser = subparsers.add_parser(
470 "setup", help="Setup network interfaces and latencies"
471 )
472 setup_parser.add_argument("job_id", type=int, help="OAR job ID")
473 setup_parser.add_argument(
474 "addresses", type=int, help="Number of addresses to allocate"
475 )
476 setup_parser.add_argument(
477 "latency_matrix", type=str, help="Path to latency matrix file"
478 )
479
480 # Clean command
481 clean_parser = subparsers.add_parser("clean", help="Clean up network interfaces")
482 clean_parser.add_argument("job_id", type=int, help="OAR job ID")
483
484 # Configurations command
485 config_parser = subparsers.add_parser(
486 "configurations", help="Generate and print machine configurations"
487 )
488 config_parser.add_argument("job_id", type=int, help="OAR job ID")
489 config_parser.add_argument(
490 "addresses", type=int, help="Number of addresses to allocate"
491 )
492 config_parser.add_argument(
493 "latency_matrix", type=str, help="Path to latency matrix file"
494 )
495
496 args = parser.parse_args()
497
498 if args.command == "setup":
499 await setup_command(args.job_id, args.addresses, args.latency_matrix)
500 elif args.command == "clean":
501 await clean_command(args.job_id)
502 elif args.command == "configurations":
503 await configurations_command(args.job_id, args.addresses, args.latency_matrix)
504 else:
505 parser.print_help()
506
507
508def address_from_index(index: int) -> str:
509 d = index % 254
510 c = (index // 254) % 254
511 assert c <= 254
512 return f"10.0.{c}.{d+1}"
513
514
335if __name__ == "__main__": 515if __name__ == "__main__":
336 asyncio.run(main()) 516 asyncio.run(main())
337 517