aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-01 20:28:49 +0100
committerdiogo464 <[email protected]>2025-07-01 20:28:49 +0100
commit8018cd7a378baee5c5e1fab85bba2592c9244c72 (patch)
treeb4ba4567b7d3b14644f2513099dcb694ae7f8ef6
parentecea5bbe25e584b47aece7860bf1511e97eb3494 (diff)
Optimize netfilter configuration with map-based packet marking
Replace sequential set-based lookups with a single hash map for better performance. Also apply traffic control to loopback interface and add proper cleanup for lo device. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
-rw-r--r--oar_p2p_net.py90
1 files changed, 56 insertions, 34 deletions
diff --git a/oar_p2p_net.py b/oar_p2p_net.py
index 6ae134b..12c9809 100644
--- a/oar_p2p_net.py
+++ b/oar_p2p_net.py
@@ -169,32 +169,32 @@ def machine_generate_configurations(
169 169
170 latencies = list(sorted(latencies_set)) 170 latencies = list(sorted(latencies_set))
171 171
172 tc_commands.append(f"qdisc add dev {interface} root handle 1: htb default 9999") 172 for iface in ["lo", interface]:
173 tc_commands.append( 173 tc_commands.append(f"qdisc add dev {iface} root handle 1: htb default 9999")
174 f"class add dev {interface} parent 1: classid 1:9999 htb rate 10gbit"
175 )
176 for idx, latency in enumerate(latencies):
177 # tc class for latency at idx X is X + 1
178 tc_commands.append(
179 f"class add dev {interface} parent 1: classid 1:{idx+1} htb rate 10gbit"
180 )
181 tc_commands.append(
182 f"qdisc add dev {interface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms"
183 )
184 # mark for latency at idx X is X + 1
185 tc_commands.append( 174 tc_commands.append(
186 f"filter add dev {interface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}" 175 f"class add dev {iface} parent 1: classid 1:9999 htb rate 10gbit"
187 ) 176 )
177 for idx, latency in enumerate(latencies):
178 # tc class for latency at idx X is X + 1
179 tc_commands.append(
180 f"class add dev {iface} parent 1: classid 1:{idx+1} htb rate 10gbit"
181 )
182 tc_commands.append(
183 f"qdisc add dev {iface} parent 1:{idx+1} handle {idx+2}: netem delay {latency}ms"
184 )
185 # mark for latency at idx X is X + 1
186 tc_commands.append(
187 f"filter add dev {iface} parent 1:0 prio 1 handle {idx+1} fw flowid 1:{idx+1}"
188 )
188 189
189 nft_script = "" 190 nft_script = ""
190 nft_script += "table ip oar-p2p {" + "\n" 191 nft_script += "table ip oar-p2p {" + "\n"
192 nft_script += f" map mark_pairs {{\n"
193 nft_script += f" type ipv4_addr . ipv4_addr : mark\n"
194 nft_script += f" elements = {{\n"
191 for latency_idx, latency in enumerate(latencies): 195 for latency_idx, latency in enumerate(latencies):
192 if len(latencies_buckets[latency]) == 0: 196 if len(latencies_buckets[latency]) == 0:
193 continue 197 continue
194 nft_script += f" set mark_{latency_idx}_pairs {{\n"
195 nft_script += f" type ipv4_addr . ipv4_addr\n"
196 nft_script += f" flags interval\n"
197 nft_script += f" elements = {{\n"
198 for src_idx, dst_idx in latencies_buckets[latency]: 198 for src_idx, dst_idx in latencies_buckets[latency]:
199 assert src_idx != dst_idx 199 assert src_idx != dst_idx
200 src_addr = address_from_index( 200 src_addr = address_from_index(
@@ -205,16 +205,16 @@ def machine_generate_configurations(
205 addr_idx_to_machine_idx[dst_idx], 205 addr_idx_to_machine_idx[dst_idx],
206 dst_idx % num_addresses_per_machine, 206 dst_idx % num_addresses_per_machine,
207 ) 207 )
208 nft_script += f" {src_addr} . {dst_addr},\n" 208 nft_script += f" {src_addr} . {dst_addr} : {latency_idx+1},\n"
209 nft_script += f" }}\n" 209 nft_script += f" }}\n"
210 nft_script += f" }}\n\n" 210 nft_script += f" }}\n\n"
211 211
212 nft_script += " chain postrouting {\n" 212 nft_script += " chain postrouting {\n"
213 nft_script += " type filter hook postrouting priority mangle - 1\n" 213 nft_script += " type filter hook postrouting priority mangle - 1\n"
214 nft_script += " policy accept\n" 214 nft_script += " policy accept\n"
215 nft_script += "\n" 215 nft_script += (
216 for latency_idx in range(len(latencies)): 216 " meta mark set ip saddr . ip daddr map @mark_pairs counter\n"
217 nft_script += f" ip saddr . ip daddr @mark_{latency_idx}_pairs meta mark set {latency_idx+1}\n" 217 )
218 nft_script += " }" + "\n" 218 nft_script += " }" + "\n"
219 nft_script += "}" + "\n" 219 nft_script += "}" + "\n"
220 220
@@ -383,7 +383,9 @@ async def machine_cleanup_interface(job_id: int, machine: str):
383 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") 383 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true")
384 384
385 if len(commands) == 1: # Only the route command 385 if len(commands) == 1: # Only the route command
386 logging.info(f"No 10.x addresses to remove from {machine}, only cleaning up route") 386 logging.info(
387 f"No 10.x addresses to remove from {machine}, only cleaning up route"
388 )
387 else: 389 else:
388 logging.info(f"Removing {len(commands)-1} addresses and route from {machine}") 390 logging.info(f"Removing {len(commands)-1} addresses and route from {machine}")
389 391
@@ -398,6 +400,12 @@ async def machine_cleanup_interface(job_id: int, machine: str):
398 job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true" 400 job_id, machine, f"tc qdisc del dev {interface} ingress 2>/dev/null || true"
399 ), 401 ),
400 run_script_in_docker( 402 run_script_in_docker(
403 job_id, machine, f"tc qdisc del dev lo root 2>/dev/null || true"
404 ),
405 run_script_in_docker(
406 job_id, machine, f"tc qdisc del dev lo ingress 2>/dev/null || true"
407 ),
408 run_script_in_docker(
401 job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true" 409 job_id, machine, f"nft delete table {NFT_TABLE_NAME} 2>/dev/null || true"
402 ), 410 ),
403 ] 411 ]
@@ -437,7 +445,7 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str):
437 # Run all machines in parallel 445 # Run all machines in parallel
438 tasks = [setup_machine(config) for config in configurations] 446 tasks = [setup_machine(config) for config in configurations]
439 await asyncio.gather(*tasks) 447 await asyncio.gather(*tasks)
440 448
441 # Print machine IP pairs to stdout 449 # Print machine IP pairs to stdout
442 for config in configurations: 450 for config in configurations:
443 for ip in config.addresses: 451 for ip in config.addresses:
@@ -511,10 +519,10 @@ async def main():
511 # Configure logging to write to stderr 519 # Configure logging to write to stderr
512 logging.basicConfig( 520 logging.basicConfig(
513 level=logging.INFO, 521 level=logging.INFO,
514 format='%(asctime)s - %(levelname)s - %(message)s', 522 format="%(asctime)s - %(levelname)s - %(message)s",
515 handlers=[logging.StreamHandler()] 523 handlers=[logging.StreamHandler()],
516 ) 524 )
517 525
518 parser = argparse.ArgumentParser(description="OAR P2P network management") 526 parser = argparse.ArgumentParser(description="OAR P2P network management")
519 subparsers = parser.add_subparsers(dest="command", help="Available commands") 527 subparsers = parser.add_subparsers(dest="command", help="Available commands")
520 528
@@ -524,7 +532,10 @@ async def main():
524 ) 532 )
525 setup_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") 533 setup_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID")
526 setup_parser.add_argument( 534 setup_parser.add_argument(
527 "--num-addresses", type=int, required=True, help="Number of addresses to allocate" 535 "--num-addresses",
536 type=int,
537 required=True,
538 help="Number of addresses to allocate",
528 ) 539 )
529 setup_parser.add_argument( 540 setup_parser.add_argument(
530 "--latency-matrix", type=str, required=True, help="Path to latency matrix file" 541 "--latency-matrix", type=str, required=True, help="Path to latency matrix file"
@@ -540,7 +551,10 @@ async def main():
540 ) 551 )
541 config_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID") 552 config_parser.add_argument("--job-id", type=int, required=True, help="OAR job ID")
542 config_parser.add_argument( 553 config_parser.add_argument(
543 "--num-addresses", type=int, required=True, help="Number of addresses to allocate per machine" 554 "--num-addresses",
555 type=int,
556 required=True,
557 help="Number of addresses to allocate per machine",
544 ) 558 )
545 config_parser.add_argument( 559 config_parser.add_argument(
546 "--latency-matrix", type=str, required=True, help="Path to latency matrix file" 560 "--latency-matrix", type=str, required=True, help="Path to latency matrix file"
@@ -549,11 +563,19 @@ async def main():
549 args = parser.parse_args() 563 args = parser.parse_args()
550 564
551 if args.command == "up": 565 if args.command == "up":
552 await setup_command(getattr(args, 'job_id'), getattr(args, 'num_addresses'), getattr(args, 'latency_matrix')) 566 await setup_command(
567 getattr(args, "job_id"),
568 getattr(args, "num_addresses"),
569 getattr(args, "latency_matrix"),
570 )
553 elif args.command == "down": 571 elif args.command == "down":
554 await clean_command(getattr(args, 'job_id')) 572 await clean_command(getattr(args, "job_id"))
555 elif args.command == "configurations": 573 elif args.command == "configurations":
556 await configurations_command(getattr(args, 'job_id'), getattr(args, 'num_addresses'), getattr(args, 'latency_matrix')) 574 await configurations_command(
575 getattr(args, "job_id"),
576 getattr(args, "num_addresses"),
577 getattr(args, "latency_matrix"),
578 )
557 else: 579 else:
558 parser.print_help() 580 parser.print_help()
559 581