about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: diogo464 <[email protected]> 2025-06-29 15:25:20 +0100
committer: diogo464 <[email protected]> 2025-06-29 15:25:20 +0100
commit: d1ff88e25cfdc52621a291034fa0dcbb390033c8 (patch)
tree: 4a9e97194fb1f083c937c0babbea2e5e63e3c714
parent: 0509970c261d3c04822246c80e6f05dcd246b8a5 (diff)
Replace print statements with logging module and add IP output
Replace status/error print statements with logging module to ensure they go to stderr instead of stdout. Configure logging with timestamps and appropriate levels. Add machine-IP output at end of 'up' command for easy parsing.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
-rw-r--r-- oar_p2p_net.py 43
1 file changed, 28 insertions, 15 deletions
diff --git a/oar_p2p_net.py b/oar_p2p_net.py
index dcffa2b..d01cb21 100644
--- a/oar_p2p_net.py
+++ b/oar_p2p_net.py
@@ -2,6 +2,7 @@ import json
2import math 2import math
3import asyncio 3import asyncio
4import argparse 4import argparse
5import logging
5 6
6from collections import defaultdict 7from collections import defaultdict
7from dataclasses import dataclass 8from dataclasses import dataclass
@@ -234,7 +235,7 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration)
234 """Apply a machine configuration by executing IP commands, TC commands, and NFT script.""" 235 """Apply a machine configuration by executing IP commands, TC commands, and NFT script."""
235 machine = config.machine 236 machine = config.machine
236 237
237 print(f"Applying configuration to {machine}...") 238 logging.info(f"Applying configuration to {machine}...")
238 239
239 # Prepare tasks for parallel execution 240 # Prepare tasks for parallel execution
240 tasks = [] 241 tasks = []
@@ -242,18 +243,18 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration)
242 # IP commands task 243 # IP commands task
243 if config.ip_commands: 244 if config.ip_commands:
244 ip_batch = "\n".join(config.ip_commands) 245 ip_batch = "\n".join(config.ip_commands)
245 print(f"Executing {len(config.ip_commands)} IP commands on {machine}") 246 logging.info(f"Executing {len(config.ip_commands)} IP commands on {machine}")
246 tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch)) 247 tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch))
247 248
248 # TC commands task 249 # TC commands task
249 if config.tc_commands: 250 if config.tc_commands:
250 tc_batch = "\n".join(config.tc_commands) 251 tc_batch = "\n".join(config.tc_commands)
251 print(f"Executing {len(config.tc_commands)} TC commands on {machine}") 252 logging.info(f"Executing {len(config.tc_commands)} TC commands on {machine}")
252 tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch)) 253 tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch))
253 254
254 # NFT script task 255 # NFT script task
255 if config.nft_script: 256 if config.nft_script:
256 print(f"Applying NFT script on {machine}") 257 logging.info(f"Applying NFT script on {machine}")
257 tasks.append( 258 tasks.append(
258 run_script_in_docker(job_id, machine, "nft -f -", config.nft_script) 259 run_script_in_docker(job_id, machine, "nft -f -", config.nft_script)
259 ) 260 )
@@ -263,7 +264,7 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration)
263 try: 264 try:
264 await asyncio.gather(*tasks) 265 await asyncio.gather(*tasks)
265 except Exception as e: 266 except Exception as e:
266 print(f"ERROR applying configuration to {machine}: {e}") 267 logging.error(f"ERROR applying configuration to {machine}: {e}")
267 raise 268 raise
268 269
269 270
@@ -361,7 +362,7 @@ async def machine_cleanup_interface(job_id: int, machine: str):
361 stdout = await run_script_in_docker(job_id, machine, get_addr_script) 362 stdout = await run_script_in_docker(job_id, machine, get_addr_script)
362 363
363 if not stdout.strip(): 364 if not stdout.strip():
364 print(f"No interface info for {machine}, skipping cleanup") 365 logging.info(f"No interface info for {machine}, skipping cleanup")
365 return 366 return
366 367
367 # Parse JSON output 368 # Parse JSON output
@@ -382,9 +383,9 @@ async def machine_cleanup_interface(job_id: int, machine: str):
382 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") 383 commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true")
383 384
384 if len(commands) == 1: # Only the route command 385 if len(commands) == 1: # Only the route command
385 print(f"No 10.x addresses to remove from {machine}, only cleaning up route") 386 logging.info(f"No 10.x addresses to remove from {machine}, only cleaning up route")
386 else: 387 else:
387 print(f"Removing {len(commands)-1} addresses and route from {machine}") 388 logging.info(f"Removing {len(commands)-1} addresses and route from {machine}")
388 389
389 # Execute batch commands and clean TC state and NFT table in parallel 390 # Execute batch commands and clean TC state and NFT table in parallel
390 remove_script = "\n".join(commands) 391 remove_script = "\n".join(commands)
@@ -413,8 +414,8 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str):
413 # Get machines from job 414 # Get machines from job
414 machines = await oar_job_list_machines(job_id) 415 machines = await oar_job_list_machines(job_id)
415 416
416 print(f"Machines: {machines}") 417 logging.info(f"Machines: {machines}")
417 print(f"Total addresses: {addresses}") 418 logging.info(f"Total addresses: {addresses}")
418 419
419 # Generate configurations for all machines 420 # Generate configurations for all machines
420 configurations = machine_generate_configurations( 421 configurations = machine_generate_configurations(
@@ -425,7 +426,7 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str):
425 async def setup_machine(config: MachineConfiguration): 426 async def setup_machine(config: MachineConfiguration):
426 if config.machine == "charmander-2": 427 if config.machine == "charmander-2":
427 return 428 return
428 print(f"Setting up {config.machine}...") 429 logging.info(f"Setting up {config.machine}...")
429 430
430 # First cleanup the interface 431 # First cleanup the interface
431 await machine_cleanup_interface(job_id, config.machine) 432 await machine_cleanup_interface(job_id, config.machine)
@@ -436,12 +437,17 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str):
436 # Run all machines in parallel 437 # Run all machines in parallel
437 tasks = [setup_machine(config) for config in configurations] 438 tasks = [setup_machine(config) for config in configurations]
438 await asyncio.gather(*tasks) 439 await asyncio.gather(*tasks)
440
441 # Print machine IP pairs to stdout
442 for config in configurations:
443 for ip in config.addresses:
444 print(f"{config.machine} {ip}")
439 445
440 446
441async def clean_command(job_id: int): 447async def clean_command(job_id: int):
442 machines = await oar_job_list_machines(job_id) 448 machines = await oar_job_list_machines(job_id)
443 449
444 print(f"Cleaning up {len(machines)} machines...") 450 logging.info(f"Cleaning up {len(machines)} machines...")
445 451
446 # Clean up all machines in parallel, but don't fail fast 452 # Clean up all machines in parallel, but don't fail fast
447 tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] 453 tasks = [machine_cleanup_interface(job_id, machine) for machine in machines]
@@ -452,9 +458,9 @@ async def clean_command(job_id: int):
452 for machine, result in zip(machines, results): 458 for machine, result in zip(machines, results):
453 if isinstance(result, Exception): 459 if isinstance(result, Exception):
454 failures.append((machine, result)) 460 failures.append((machine, result))
455 print(f"ERROR: Cleanup failed on {machine}: {result}") 461 logging.error(f"ERROR: Cleanup failed on {machine}: {result}")
456 else: 462 else:
457 print(f"Cleanup completed successfully on {machine}") 463 logging.info(f"Cleanup completed successfully on {machine}")
458 464
459 if failures: 465 if failures:
460 failed_machines = [machine for machine, _ in failures] 466 failed_machines = [machine for machine, _ in failures]
@@ -462,7 +468,7 @@ async def clean_command(job_id: int):
462 f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}" 468 f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}"
463 ) 469 )
464 470
465 print("Cleanup completed successfully on all machines") 471 logging.info("Cleanup completed successfully on all machines")
466 472
467 473
468async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str): 474async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str):
@@ -502,6 +508,13 @@ async def configurations_command(job_id: int, addresses: int, latency_matrix_pat
502 508
503 509
504async def main(): 510async def main():
511 # Configure logging to write to stderr
512 logging.basicConfig(
513 level=logging.INFO,
514 format='%(asctime)s - %(levelname)s - %(message)s',
515 handlers=[logging.StreamHandler()]
516 )
517
505 parser = argparse.ArgumentParser(description="OAR P2P network management") 518 parser = argparse.ArgumentParser(description="OAR P2P network management")
506 subparsers = parser.add_subparsers(dest="command", help="Available commands") 519 subparsers = parser.add_subparsers(dest="command", help="Available commands")
507 520