diff options
| author | diogo464 <[email protected]> | 2025-06-29 15:25:20 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-06-29 15:25:20 +0100 |
| commit | d1ff88e25cfdc52621a291034fa0dcbb390033c8 (patch) | |
| tree | 4a9e97194fb1f083c937c0babbea2e5e63e3c714 | |
| parent | 0509970c261d3c04822246c80e6f05dcd246b8a5 (diff) | |
Replace print statements with logging module and add IP output
Replace status/error print statements with logging module to ensure they go to stderr instead of stdout. Configure logging with timestamps and appropriate levels. Add machine-IP output at end of 'up' command for easy parsing.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
| -rw-r--r-- | oar_p2p_net.py | 43 |
1 file changed, 28 insertions, 15 deletions
diff --git a/oar_p2p_net.py b/oar_p2p_net.py index dcffa2b..d01cb21 100644 --- a/oar_p2p_net.py +++ b/oar_p2p_net.py | |||
| @@ -2,6 +2,7 @@ import json | |||
| 2 | import math | 2 | import math |
| 3 | import asyncio | 3 | import asyncio |
| 4 | import argparse | 4 | import argparse |
| 5 | import logging | ||
| 5 | 6 | ||
| 6 | from collections import defaultdict | 7 | from collections import defaultdict |
| 7 | from dataclasses import dataclass | 8 | from dataclasses import dataclass |
| @@ -234,7 +235,7 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration) | |||
| 234 | """Apply a machine configuration by executing IP commands, TC commands, and NFT script.""" | 235 | """Apply a machine configuration by executing IP commands, TC commands, and NFT script.""" |
| 235 | machine = config.machine | 236 | machine = config.machine |
| 236 | 237 | ||
| 237 | print(f"Applying configuration to {machine}...") | 238 | logging.info(f"Applying configuration to {machine}...") |
| 238 | 239 | ||
| 239 | # Prepare tasks for parallel execution | 240 | # Prepare tasks for parallel execution |
| 240 | tasks = [] | 241 | tasks = [] |
| @@ -242,18 +243,18 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration) | |||
| 242 | # IP commands task | 243 | # IP commands task |
| 243 | if config.ip_commands: | 244 | if config.ip_commands: |
| 244 | ip_batch = "\n".join(config.ip_commands) | 245 | ip_batch = "\n".join(config.ip_commands) |
| 245 | print(f"Executing {len(config.ip_commands)} IP commands on {machine}") | 246 | logging.info(f"Executing {len(config.ip_commands)} IP commands on {machine}") |
| 246 | tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch)) | 247 | tasks.append(run_script_in_docker(job_id, machine, "ip -b -", ip_batch)) |
| 247 | 248 | ||
| 248 | # TC commands task | 249 | # TC commands task |
| 249 | if config.tc_commands: | 250 | if config.tc_commands: |
| 250 | tc_batch = "\n".join(config.tc_commands) | 251 | tc_batch = "\n".join(config.tc_commands) |
| 251 | print(f"Executing {len(config.tc_commands)} TC commands on {machine}") | 252 | logging.info(f"Executing {len(config.tc_commands)} TC commands on {machine}") |
| 252 | tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch)) | 253 | tasks.append(run_script_in_docker(job_id, machine, "tc -b -", tc_batch)) |
| 253 | 254 | ||
| 254 | # NFT script task | 255 | # NFT script task |
| 255 | if config.nft_script: | 256 | if config.nft_script: |
| 256 | print(f"Applying NFT script on {machine}") | 257 | logging.info(f"Applying NFT script on {machine}") |
| 257 | tasks.append( | 258 | tasks.append( |
| 258 | run_script_in_docker(job_id, machine, "nft -f -", config.nft_script) | 259 | run_script_in_docker(job_id, machine, "nft -f -", config.nft_script) |
| 259 | ) | 260 | ) |
| @@ -263,7 +264,7 @@ async def machine_apply_configuration(job_id: int, config: MachineConfiguration) | |||
| 263 | try: | 264 | try: |
| 264 | await asyncio.gather(*tasks) | 265 | await asyncio.gather(*tasks) |
| 265 | except Exception as e: | 266 | except Exception as e: |
| 266 | print(f"ERROR applying configuration to {machine}: {e}") | 267 | logging.error(f"ERROR applying configuration to {machine}: {e}") |
| 267 | raise | 268 | raise |
| 268 | 269 | ||
| 269 | 270 | ||
| @@ -361,7 +362,7 @@ async def machine_cleanup_interface(job_id: int, machine: str): | |||
| 361 | stdout = await run_script_in_docker(job_id, machine, get_addr_script) | 362 | stdout = await run_script_in_docker(job_id, machine, get_addr_script) |
| 362 | 363 | ||
| 363 | if not stdout.strip(): | 364 | if not stdout.strip(): |
| 364 | print(f"No interface info for {machine}, skipping cleanup") | 365 | logging.info(f"No interface info for {machine}, skipping cleanup") |
| 365 | return | 366 | return |
| 366 | 367 | ||
| 367 | # Parse JSON output | 368 | # Parse JSON output |
| @@ -382,9 +383,9 @@ async def machine_cleanup_interface(job_id: int, machine: str): | |||
| 382 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") | 383 | commands.append(f"ip route del 10.0.0.0/8 dev {interface} 2>/dev/null || true") |
| 383 | 384 | ||
| 384 | if len(commands) == 1: # Only the route command | 385 | if len(commands) == 1: # Only the route command |
| 385 | print(f"No 10.x addresses to remove from {machine}, only cleaning up route") | 386 | logging.info(f"No 10.x addresses to remove from {machine}, only cleaning up route") |
| 386 | else: | 387 | else: |
| 387 | print(f"Removing {len(commands)-1} addresses and route from {machine}") | 388 | logging.info(f"Removing {len(commands)-1} addresses and route from {machine}") |
| 388 | 389 | ||
| 389 | # Execute batch commands and clean TC state and NFT table in parallel | 390 | # Execute batch commands and clean TC state and NFT table in parallel |
| 390 | remove_script = "\n".join(commands) | 391 | remove_script = "\n".join(commands) |
| @@ -413,8 +414,8 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str): | |||
| 413 | # Get machines from job | 414 | # Get machines from job |
| 414 | machines = await oar_job_list_machines(job_id) | 415 | machines = await oar_job_list_machines(job_id) |
| 415 | 416 | ||
| 416 | print(f"Machines: {machines}") | 417 | logging.info(f"Machines: {machines}") |
| 417 | print(f"Total addresses: {addresses}") | 418 | logging.info(f"Total addresses: {addresses}") |
| 418 | 419 | ||
| 419 | # Generate configurations for all machines | 420 | # Generate configurations for all machines |
| 420 | configurations = machine_generate_configurations( | 421 | configurations = machine_generate_configurations( |
| @@ -425,7 +426,7 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str): | |||
| 425 | async def setup_machine(config: MachineConfiguration): | 426 | async def setup_machine(config: MachineConfiguration): |
| 426 | if config.machine == "charmander-2": | 427 | if config.machine == "charmander-2": |
| 427 | return | 428 | return |
| 428 | print(f"Setting up {config.machine}...") | 429 | logging.info(f"Setting up {config.machine}...") |
| 429 | 430 | ||
| 430 | # First cleanup the interface | 431 | # First cleanup the interface |
| 431 | await machine_cleanup_interface(job_id, config.machine) | 432 | await machine_cleanup_interface(job_id, config.machine) |
| @@ -436,12 +437,17 @@ async def setup_command(job_id: int, addresses: int, latency_matrix_path: str): | |||
| 436 | # Run all machines in parallel | 437 | # Run all machines in parallel |
| 437 | tasks = [setup_machine(config) for config in configurations] | 438 | tasks = [setup_machine(config) for config in configurations] |
| 438 | await asyncio.gather(*tasks) | 439 | await asyncio.gather(*tasks) |
| 440 | |||
| 441 | # Print machine IP pairs to stdout | ||
| 442 | for config in configurations: | ||
| 443 | for ip in config.addresses: | ||
| 444 | print(f"{config.machine} {ip}") | ||
| 439 | 445 | ||
| 440 | 446 | ||
| 441 | async def clean_command(job_id: int): | 447 | async def clean_command(job_id: int): |
| 442 | machines = await oar_job_list_machines(job_id) | 448 | machines = await oar_job_list_machines(job_id) |
| 443 | 449 | ||
| 444 | print(f"Cleaning up {len(machines)} machines...") | 450 | logging.info(f"Cleaning up {len(machines)} machines...") |
| 445 | 451 | ||
| 446 | # Clean up all machines in parallel, but don't fail fast | 452 | # Clean up all machines in parallel, but don't fail fast |
| 447 | tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] | 453 | tasks = [machine_cleanup_interface(job_id, machine) for machine in machines] |
| @@ -452,9 +458,9 @@ async def clean_command(job_id: int): | |||
| 452 | for machine, result in zip(machines, results): | 458 | for machine, result in zip(machines, results): |
| 453 | if isinstance(result, Exception): | 459 | if isinstance(result, Exception): |
| 454 | failures.append((machine, result)) | 460 | failures.append((machine, result)) |
| 455 | print(f"ERROR: Cleanup failed on {machine}: {result}") | 461 | logging.error(f"ERROR: Cleanup failed on {machine}: {result}") |
| 456 | else: | 462 | else: |
| 457 | print(f"Cleanup completed successfully on {machine}") | 463 | logging.info(f"Cleanup completed successfully on {machine}") |
| 458 | 464 | ||
| 459 | if failures: | 465 | if failures: |
| 460 | failed_machines = [machine for machine, _ in failures] | 466 | failed_machines = [machine for machine, _ in failures] |
| @@ -462,7 +468,7 @@ async def clean_command(job_id: int): | |||
| 462 | f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}" | 468 | f"Cleanup failed on {len(failures)} machines: {', '.join(failed_machines)}" |
| 463 | ) | 469 | ) |
| 464 | 470 | ||
| 465 | print("Cleanup completed successfully on all machines") | 471 | logging.info("Cleanup completed successfully on all machines") |
| 466 | 472 | ||
| 467 | 473 | ||
| 468 | async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str): | 474 | async def configurations_command(job_id: int, addresses: int, latency_matrix_path: str): |
| @@ -502,6 +508,13 @@ async def configurations_command(job_id: int, addresses: int, latency_matrix_pat | |||
| 502 | 508 | ||
| 503 | 509 | ||
| 504 | async def main(): | 510 | async def main(): |
| 511 | # Configure logging to write to stderr | ||
| 512 | logging.basicConfig( | ||
| 513 | level=logging.INFO, | ||
| 514 | format='%(asctime)s - %(levelname)s - %(message)s', | ||
| 515 | handlers=[logging.StreamHandler()] | ||
| 516 | ) | ||
| 517 | |||
| 505 | parser = argparse.ArgumentParser(description="OAR P2P network management") | 518 | parser = argparse.ArgumentParser(description="OAR P2P network management") |
| 506 | subparsers = parser.add_subparsers(dest="command", help="Available commands") | 519 | subparsers = parser.add_subparsers(dest="command", help="Available commands") |
| 507 | 520 | ||
