diff options
| author | diogo464 <[email protected]> | 2025-07-01 20:28:49 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-01 20:28:49 +0100 |
| commit | 8018cd7a378baee5c5e1fab85bba2592c9244c72 (patch) | |
| tree | b4ba4567b7d3b14644f2513099dcb694ae7f8ef6 | |
| parent | ecea5bbe25e584b47aece7860bf1511e97eb3494 (diff) | |
Optimize netfilter configuration with map-based packet marking
Replace sequential set-based lookups with a single hash map for
better performance. Also apply traffic control to the loopback interface
and add proper cleanup for the lo device.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
| -rw-r--r-- | oar_p2p_net.py | 90 |
1 file changed, 56 insertions, 34 deletions
diff --git a/oar_p2p_net.py b/oar_p2p_net.py index 6ae134b..12c9809 100644 --- a/oar_p2p_net.py +++ b/oar_p2p_net.py | |||
| @@ -169,32 +169,32 @@ def machine_generate_configurations( | |||
| 169 | 169 | ||
| 170 | latencies = list(sorted(latencies_set)) | 170 | latencies = list(sorted(latencies_set)) |
| 171 | 171 | ||
| 172 | tc_commands.append(f"qdisc add dev {interface} root handle 1: htb default 9999") | 172 | for iface in ["lo", interface]: |
| 173 | tc_commands.append( | 173 | tc_commands.append(f"qdisc add dev {iface} root handle 1: htb default 9999") |
| 174 | f"class add dev {interface} parent 1: classid 1:9999 htb rate 10gbit" | ||
| 175 | ) | ||
| 176 | for idx, latency in enumerate(latencies): | ||
| 177 | # tc class for latency at idx X is X + 1 | ||
| 178 | tc_commands.append( | ||
| 179 | f"class add dev {interface} parent 1: classid 1:{idx+1} htb rate 10gbit" | ||
| 180 | ) | ||
| 181 | tc_commands.append( | ||
| 182 | f"qdisc add dev {interface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms" | ||
| 183 | ) | ||
| 184 | # mark for latency at idx X is X + 1 | ||
| 185 | tc_commands.append( | 174 | tc_commands.append( |
| 186 | f"filter add dev {interface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}" | 175 | f"class add dev {iface} parent 1: classid 1:9999 htb rate 10gbit" |
| 187 | ) | 176 | ) |
| 177 | for idx, latency in enumerate(latencies): | ||
| 178 | # tc class for latency at idx X is X + 1 | ||
| 179 | tc_commands.append( | ||
| 180 | f"class add dev {iface} parent 1: classid 1:{idx+1} htb rate 10gbit" | ||
| 181 | ) | ||
| 182 | tc_commands.append( | ||
| 183 | f"qdisc add dev {iface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms" | ||
| 184 | ) | ||
| 185 | # mark for latency at idx X is X + 1 | ||
| 186 | tc_commands.append( | ||
| 187 | f"filter add dev {iface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}" | ||
| 188 | ) | ||
| 188 | 189 | ||
| 189 | nft_script = "" | 190 | nft_script = "" |
| 190 | nft_script += "table ip oar-p2p {" + "\n" | 191 | nft_script += "table ip oar-p2p {" + "\n" |
| 192 | nft_script += f" map mark_pairs {{\n" | ||
| 193 | nft_script += f" type ipv4_addr . ipv4_addr : mark\n" | ||
| 194 | nft_script += f" elements = {{\n" | ||
| 191 | for latency_idx, latency in enumerate(latencies): | 195 | for latency_idx, latency in enumerate(latencies): |
| 192 | if len(latencies_buckets[latency]) == 0: | 196 | if len(latencies_buckets[latency]) == 0: |
| 193 | continue | 197 | continue |
| 194 | nft_script += f" set mark_{latency_idx}_pairs {{\n" | ||
| 195 | nft_script += f" type ipv4_addr . ipv4_addr\n" | ||
| 196 | nft_script += f" flags interval\n" | ||
| 197 | nft_script += f" elements = {{\n" | ||
| 198 | for src_idx, dst_idx in latencies_buckets[latency]: | 198 | for src_idx, dst_idx in latencies_buckets[latency]: |
| 199 | assert src_idx != dst_idx | 199 | assert src_idx != dst_idx |
| 200 | src_addr = address_from_index( | 200 | src_addr = address_from_index( |
| @@ -205,16 +205,16 @@ def machine_generate_configurations( | |||
| 205 | addr_idx_to_machine_idx[dst_idx], | 205 | addr_idx_to_machine_idx[dst_idx], |
| 206 | dst_idx % num_addresses_per_machine, | 206 | dst_idx % num_addresses_per_machine, |
| 207 | ) | 207 | ) |
| 208 | nft_script += f" {src_addr} . {dst_addr},\n" | 208 | nft_script += f" {src_addr} . {dst_addr} : {latency_idx+1},\n" |
| 209 | nft_script += f" }}\n" | 209 | nft_script += f" }}\n" |
| 210 | nft_script += f" }}\n\n" | 210 | nft_script += f" }}\n\n" |
| 211 | 211 | ||
| 212 | nft_script += " chain postrouting {\n" | 212 | nft_script += " chain postrouting {\n" |
| 213 | nft_script += " type filter hook postrouting priority mangle - 1\n" | 213 | nft_script += " type filter hook postrouting priority mangle - 1\n" |
| 214 | nft_script += " policy accept\n" | 214 | nft_script += " policy accept\n" |
| 215 | nft_script += "\n" | 215 | nft_script += ( |
| 216 | for latency_idx in range(len(latencies)): | 216 | " meta mark set ip saddr . ip daddr map @mark_pairs counter\n" |
| 217 | nft_script += f" ip saddr . ip daddr @mark_{latency_idx}_pairs meta mark set {latency_idx+1}\n" | 217 | ) |
| 218 | nft_script += " }" + "\n" | 218 | nft_script += " }" + "\n" |
| 219 | nft_script += "}" + "\n" | 219 | nft_script += "}" + "\n" |
| 220 | 220 | ||
| @@ -383,7 +383,9 @@ async def machine_cleanup_interface(job_id: int, machine: str): | |||
| 383 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") | 383 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") |
| 384 | 384 | ||
| 385 | if len(commands) == 1: # Only the route command | 385 | if len(commands) == 1: # Only the route command |
| 386 | logging.info(f"No 10.x addresses to remove from {machine}, only cleaning up route") | 386 | logging.info( |
| 387 | f"No 10.x addresses to remove from {machine}, only cleaning up route" | ||
| 388 | ) | ||
| 387 | else: | 389 | else: |
| 388 | logging.info(f"Removing {len(commands)-1} addresses and route from {machine}") | 390 | logging.info(f"Removing {len(commands)-1} addresses and route from {machine}") |
| 389 | 391 | ||
| @@ -398,6 +400,12 @@ async def machine_cleanup_interface(job_id: int, machine: str): | |||
| 398 | job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true" | 400 | job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true" |
| 399 | ), | 401 | ), |
| 400 | run_script_in_docker( | 402 | run_script_in_docker( |
| 403 | job_id, machine, f"tc qdisc del dev lo root 2>/dev/null || true" | ||
| 404 | ), | ||
| 405 | run_script_in_docker( | ||
| 406 | job_id, machine, f"tc qdisc del dev lo ingress 2>/dev/null || true" | ||
| 407 | ), | ||
| 408 | run_script_in_docker( | ||
| 401 | job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true" | 409 | job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true" |
| 402 | ), | 410 | ), |
| 403 | ] | 411 | ] |
| @@ -437,7 +445,7 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str): | |||
| 437 | # Run all machines in parallel | 445 | # Run all machines in parallel |
| 438 | tasks = [setup_machine(config) for config in configurations] | 446 | tasks = [setup_machine(config) for config in configurations] |
| 439 | await asyncio.gather(*tasks) | 447 | await asyncio.gather(*tasks) |
| 440 | 448 | ||
| 441 | # Print machine IP pairs to stdout | 449 | # Print machine IP pairs to stdout |
| 442 | for config in configurations: | 450 | for config in configurations: |
| 443 | for ip in config.addresses: | 451 | for ip in config.addresses: |
| @@ -511,10 +519,10 @@ async def main(): | |||
| 511 | # Configure logging to write to stderr | 519 | # Configure logging to write to stderr |
| 512 | logging.basicConfig( | 520 | logging.basicConfig( |
| 513 | level=logging.INFO, | 521 | level=logging.INFO, |
| 514 | format='%(asctime)s - %(levelname)s - %(message)s', | 522 | format="%(asctime)s - %(levelname)s - %(message)s", |
| 515 | handlers=[logging.StreamHandler()] | 523 | handlers=[logging.StreamHandler()], |
| 516 | ) | 524 | ) |
| 517 | 525 | ||
| 518 | parser = argparse.ArgumentParser(description="OAR P2P network management") | 526 | parser = argparse.ArgumentParser(description="OAR P2P network management") |
| 519 | subparsers = parser.add_subparsers(dest="command", help="Available commands") | 527 | subparsers = parser.add_subparsers(dest="command", help="Available commands") |
| 520 | 528 | ||
| @@ -524,7 +532,10 @@ async def main(): | |||
| 524 | ) | 532 | ) |
| 525 | setup_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") | 533 | setup_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") |
| 526 | setup_parser.add_argument( | 534 | setup_parser.add_argument( |
| 527 | "--num-addresses", type=int, required=True, help="Number of addresses to allocate" | 535 | "--num-addresses", |
| 536 | type=int, | ||
| 537 | required=True, | ||
| 538 | help="Number of addresses to allocate", | ||
| 528 | ) | 539 | ) |
| 529 | setup_parser.add_argument( | 540 | setup_parser.add_argument( |
| 530 | "--latency-matrix", type=str, required=True, help="Path to latency matrix file" | 541 | "--latency-matrix", type=str, required=True, help="Path to latency matrix file" |
| @@ -540,7 +551,10 @@ async def main(): | |||
| 540 | ) | 551 | ) |
| 541 | config_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") | 552 | config_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") |
| 542 | config_parser.add_argument( | 553 | config_parser.add_argument( |
| 543 | "--num-addresses", type=int, required=True, help="Number of addresses to allocate per machine" | 554 | "--num-addresses", |
| 555 | type=int, | ||
| 556 | required=True, | ||
| 557 | help="Number of addresses to allocate per machine", | ||
| 544 | ) | 558 | ) |
| 545 | config_parser.add_argument( | 559 | config_parser.add_argument( |
| 546 | "--latency-matrix", type=str, required=True, help="Path to latency matrix file" | 560 | "--latency-matrix", type=str, required=True, help="Path to latency matrix file" |
| @@ -549,11 +563,19 @@ async def main(): | |||
| 549 | args = parser.parse_args() | 563 | args = parser.parse_args() |
| 550 | 564 | ||
| 551 | if args.command == "up": | 565 | if args.command == "up": |
| 552 | await setup_command(getattr(args, 'job_id'), getattr(args, 'num_addresses'), getattr(args, 'latency_matrix')) | 566 | await setup_command( |
| 567 | getattr(args, "job_id"), | ||
| 568 | getattr(args, "num_addresses"), | ||
| 569 | getattr(args, "latency_matrix"), | ||
| 570 | ) | ||
| 553 | elif args.command == "down": | 571 | elif args.command == "down": |
| 554 | await clean_command(getattr(args, 'job_id')) | 572 | await clean_command(getattr(args, "job_id")) |
| 555 | elif args.command == "configurations": | 573 | elif args.command == "configurations": |
| 556 | await configurations_command(getattr(args, 'job_id'), getattr(args, 'num_addresses'), getattr(args, 'latency_matrix')) | 574 | await configurations_command( |
| 575 | getattr(args, "job_id"), | ||
| 576 | getattr(args, "num_addresses"), | ||
| 577 | getattr(args, "latency_matrix"), | ||
| 578 | ) | ||
| 557 | else: | 579 | else: |
| 558 | parser.print_help() | 580 | parser.print_help() |
| 559 | 581 | ||
