diff options
| author | diogo464 <[email protected]> | 2025-07-13 11:03:05 +0100 |
|---|---|---|
| committer | diogo464 <[email protected]> | 2025-07-13 11:03:07 +0100 |
| commit | 7e756e1c4845e2f663cc04863045b971a8eda116 (patch) | |
| tree | 3b75b0e9996823b7cadfec8bbf6a5254d75c2f58 /src | |
| parent | 7ac96fc5d1249aec37ecc7ecb2a3fa9261d002a2 (diff) | |
added custom signals to run subcommand
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 41 | ||||
| -rw-r--r-- | src/signal.rs | 261 |
2 files changed, 288 insertions, 14 deletions
diff --git a/src/main.rs b/src/main.rs index e2d356e..55a96cd 100644 --- a/src/main.rs +++ b/src/main.rs | |||
| @@ -4,7 +4,7 @@ use std::{ | |||
| 4 | net::Ipv4Addr, | 4 | net::Ipv4Addr, |
| 5 | path::{Path, PathBuf}, | 5 | path::{Path, PathBuf}, |
| 6 | process::Output, | 6 | process::Output, |
| 7 | time::Duration, | 7 | time::{Duration, Instant}, |
| 8 | }; | 8 | }; |
| 9 | 9 | ||
| 10 | use clap::{Args, Parser, Subcommand}; | 10 | use clap::{Args, Parser, Subcommand}; |
| @@ -22,6 +22,7 @@ use crate::{ | |||
| 22 | address_allocation_policy::AddressAllocationPolicy, | 22 | address_allocation_policy::AddressAllocationPolicy, |
| 23 | context::{Context, ExecutionNode}, | 23 | context::{Context, ExecutionNode}, |
| 24 | latency_matrix::LatencyMatrix, | 24 | latency_matrix::LatencyMatrix, |
| 25 | signal::{Signal, SignalSpec}, | ||
| 25 | }; | 26 | }; |
| 26 | 27 | ||
| 27 | pub mod address_allocation_policy; | 28 | pub mod address_allocation_policy; |
| @@ -29,6 +30,7 @@ pub mod context; | |||
| 29 | pub mod latency_matrix; | 30 | pub mod latency_matrix; |
| 30 | pub mod machine; | 31 | pub mod machine; |
| 31 | pub mod oar; | 32 | pub mod oar; |
| 33 | pub mod signal; | ||
| 32 | 34 | ||
| 33 | const CONTAINER_IMAGE_NAME: &str = "local/oar-p2p-networking"; | 35 | const CONTAINER_IMAGE_NAME: &str = "local/oar-p2p-networking"; |
| 34 | 36 | ||
| @@ -109,8 +111,8 @@ struct RunArgs { | |||
| 109 | #[clap(long)] | 111 | #[clap(long)] |
| 110 | output_dir: PathBuf, | 112 | output_dir: PathBuf, |
| 111 | 113 | ||
| 112 | #[clap(long, default_value = "10")] | 114 | #[clap(long)] |
| 113 | signal_delay: u64, | 115 | signal: Vec<SignalSpec>, |
| 114 | 116 | ||
| 115 | schedule: Option<PathBuf>, | 117 | schedule: Option<PathBuf>, |
| 116 | } | 118 | } |
| @@ -311,16 +313,27 @@ async fn cmd_run(args: RunArgs) -> Result<()> { | |||
| 311 | ) | 313 | ) |
| 312 | .await?; | 314 | .await?; |
| 313 | 315 | ||
| 314 | tracing::info!("waiting {} seconds before signaling", args.signal_delay); | 316 | let signal_start_instant = Instant::now(); |
| 315 | tokio::time::sleep(Duration::from_secs(args.signal_delay)).await; | 317 | let signal_specs = { |
| 318 | let mut specs = args.signal.clone(); | ||
| 319 | specs.sort_by_key(|s| s.delay); | ||
| 320 | specs | ||
| 321 | }; | ||
| 316 | 322 | ||
| 317 | machine::for_each( | 323 | for spec in signal_specs { |
| 318 | machines | 324 | tracing::info!("waiting to trigger signal {}", spec.signal); |
| 319 | .iter() | 325 | let expire = signal_start_instant + spec.delay; |
| 320 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | 326 | tokio::time::sleep_until(expire.into()).await; |
| 321 | |machine| machine_signal_containers(&ctx, machine), | 327 | |
| 322 | ) | 328 | tracing::info!("triggering signal {}", spec.signal); |
| 323 | .await?; | 329 | machine::for_each( |
| 330 | machines | ||
| 331 | .iter() | ||
| 332 | .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), | ||
| 333 | |machine| machine_signal_containers(&ctx, machine, &spec.signal), | ||
| 334 | ) | ||
| 335 | .await?; | ||
| 336 | } | ||
| 324 | 337 | ||
| 325 | tracing::info!("waiting for all containers to exit"); | 338 | tracing::info!("waiting for all containers to exit"); |
| 326 | machine::for_each(&machines, |machine| { | 339 | machine::for_each(&machines, |machine| { |
| @@ -427,9 +440,9 @@ async fn machine_start_containers(ctx: &Context, machine: Machine) -> Result<()> | |||
| 427 | } | 440 | } |
| 428 | 441 | ||
| 429 | #[tracing::instrument(ret, err, skip(ctx))] | 442 | #[tracing::instrument(ret, err, skip(ctx))] |
| 430 | async fn machine_signal_containers(ctx: &Context, machine: Machine) -> Result<()> { | 443 | async fn machine_signal_containers(ctx: &Context, machine: Machine, signal: &Signal) -> Result<()> { |
| 431 | tracing::info!("signaling containers"); | 444 | tracing::info!("signaling containers"); |
| 432 | machine_run_script(ctx, machine, "touch /tmp/oar-p2p-signal/start").await?; | 445 | machine_run_script(ctx, machine, &format!("touch /tmp/oar-p2p-signal/{signal}")).await?; |
| 433 | tracing::info!("containers signaled"); | 446 | tracing::info!("containers signaled"); |
| 434 | Ok(()) | 447 | Ok(()) |
| 435 | } | 448 | } |
diff --git a/src/signal.rs b/src/signal.rs new file mode 100644 index 0000000..8076349 --- /dev/null +++ b/src/signal.rs | |||
| @@ -0,0 +1,261 @@ | |||
| 1 | use std::{str::FromStr, time::Duration}; | ||
| 2 | |||
| 3 | const SIGNAL_MIN_LEN: usize = 1; | ||
| 4 | const SIGNAL_MAX_LEN: usize = 64; | ||
| 5 | |||
| 6 | #[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||
| 7 | pub struct Signal(String); | ||
| 8 | |||
| 9 | impl std::fmt::Display for Signal { | ||
| 10 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 11 | f.write_str(&self.0) | ||
| 12 | } | ||
| 13 | } | ||
| 14 | |||
| 15 | impl Signal { | ||
| 16 | pub fn as_str(&self) -> &str { | ||
| 17 | &self.0 | ||
| 18 | } | ||
| 19 | } | ||
| 20 | |||
| 21 | #[derive(Debug)] | ||
| 22 | pub struct InvalidSignal(String); | ||
| 23 | |||
| 24 | impl std::fmt::Display for InvalidSignal { | ||
| 25 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 26 | write!( | ||
| 27 | f, | ||
| 28 | "invalid signal '{}'. a signal must be composed of alphanumeric, '-' or '_' and be between 1 and 64 characters long", | ||
| 29 | self.0 | ||
| 30 | ) | ||
| 31 | } | ||
| 32 | } | ||
| 33 | |||
| 34 | impl std::error::Error for InvalidSignal {} | ||
| 35 | |||
| 36 | impl FromStr for Signal { | ||
| 37 | type Err = InvalidSignal; | ||
| 38 | |||
| 39 | fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
| 40 | if s.len() < SIGNAL_MIN_LEN | ||
| 41 | || s.len() > SIGNAL_MAX_LEN | ||
| 42 | || !s.chars().all(is_valid_signal_char) | ||
| 43 | { | ||
| 44 | Err(InvalidSignal(s.to_string())) | ||
| 45 | } else { | ||
| 46 | Ok(Self(s.to_string())) | ||
| 47 | } | ||
| 48 | } | ||
| 49 | } | ||
| 50 | |||
| 51 | fn is_valid_signal_char(c: char) -> bool { | ||
| 52 | c.is_alphanumeric() || c == '_' || c == '-' | ||
| 53 | } | ||
| 54 | |||
| 55 | #[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||
| 56 | pub struct SignalSpec { | ||
| 57 | pub signal: Signal, | ||
| 58 | pub delay: Duration, | ||
| 59 | } | ||
| 60 | |||
| 61 | #[derive(Debug)] | ||
| 62 | pub struct InvalidSignalSpec(String); | ||
| 63 | |||
| 64 | impl std::fmt::Display for InvalidSignalSpec { | ||
| 65 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| 66 | write!( | ||
| 67 | f, | ||
| 68 | "invalid signal spec '{}'. signal spec must be in format <signal>:<seconds>", | ||
| 69 | self.0 | ||
| 70 | ) | ||
| 71 | } | ||
| 72 | } | ||
| 73 | |||
| 74 | impl std::error::Error for InvalidSignalSpec {} | ||
| 75 | |||
| 76 | impl FromStr for SignalSpec { | ||
| 77 | type Err = InvalidSignalSpec; | ||
| 78 | |||
| 79 | fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
| 80 | let err_fn = || InvalidSignalSpec(s.to_string()); | ||
| 81 | let (lhs, rhs) = s.split_once(':').ok_or_else(err_fn)?; | ||
| 82 | let signal = lhs.parse().ok().ok_or_else(err_fn)?; | ||
| 83 | let delay = Duration::from_secs(rhs.parse().ok().ok_or_else(err_fn)?); | ||
| 84 | Ok(Self { signal, delay }) | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | #[cfg(test)] | ||
| 89 | mod tests { | ||
| 90 | use super::*; | ||
| 91 | use std::time::Duration; | ||
| 92 | |||
| 93 | #[test] | ||
| 94 | fn test_signal_valid() { | ||
| 95 | let max_length_signal = "a".repeat(64); | ||
| 96 | let valid_signals = vec![ | ||
| 97 | "a", | ||
| 98 | "test", | ||
| 99 | "test-signal", | ||
| 100 | "test_signal", | ||
| 101 | "123", | ||
| 102 | "abc123", | ||
| 103 | &max_length_signal, // max length | ||
| 104 | ]; | ||
| 105 | |||
| 106 | for signal_str in valid_signals { | ||
| 107 | let signal: Result<Signal, _> = signal_str.parse(); | ||
| 108 | assert!(signal.is_ok(), "Signal '{}' should be valid", signal_str); | ||
| 109 | assert_eq!(signal.unwrap().as_str(), signal_str); | ||
| 110 | } | ||
| 111 | } | ||
| 112 | |||
| 113 | #[test] | ||
| 114 | fn test_signal_invalid_empty() { | ||
| 115 | let signal: Result<Signal, _> = "".parse(); | ||
| 116 | assert!(signal.is_err()); | ||
| 117 | } | ||
| 118 | |||
| 119 | #[test] | ||
| 120 | fn test_signal_invalid_too_long() { | ||
| 121 | let long_signal = "a".repeat(65); // max + 1 | ||
| 122 | let signal: Result<Signal, _> = long_signal.parse(); | ||
| 123 | assert!(signal.is_err()); | ||
| 124 | } | ||
| 125 | |||
| 126 | #[test] | ||
| 127 | fn test_signal_invalid_characters() { | ||
| 128 | let invalid_signals = vec![ | ||
| 129 | "test signal", // space | ||
| 130 | "test@signal", // @ | ||
| 131 | "test.signal", // . | ||
| 132 | "test/signal", // / | ||
| 133 | "test\\signal", // \ | ||
| 134 | "test!signal", // ! | ||
| 135 | "test#signal", // # | ||
| 136 | "test$signal", // $ | ||
| 137 | ]; | ||
| 138 | |||
| 139 | for signal_str in invalid_signals { | ||
| 140 | let signal: Result<Signal, _> = signal_str.parse(); | ||
| 141 | assert!(signal.is_err(), "Signal '{}' should be invalid", signal_str); | ||
| 142 | } | ||
| 143 | } | ||
| 144 | |||
| 145 | #[test] | ||
| 146 | fn test_signal_clone_and_equality() { | ||
| 147 | let signal1: Signal = "test-signal".parse().unwrap(); | ||
| 148 | let signal2 = signal1.clone(); | ||
| 149 | assert_eq!(signal1, signal2); | ||
| 150 | } | ||
| 151 | |||
| 152 | #[test] | ||
| 153 | fn test_signal_debug() { | ||
| 154 | let signal: Signal = "test".parse().unwrap(); | ||
| 155 | let debug_str = format!("{:?}", signal); | ||
| 156 | assert!(debug_str.contains("Signal")); | ||
| 157 | assert!(debug_str.contains("test")); | ||
| 158 | } | ||
| 159 | |||
| 160 | #[test] | ||
| 161 | fn test_signal_spec_valid() { | ||
| 162 | let valid_specs = vec![ | ||
| 163 | ("test:5", "test", 5), | ||
| 164 | ("signal-name:10", "signal-name", 10), | ||
| 165 | ("a:0", "a", 0), | ||
| 166 | ("long_signal_name:3600", "long_signal_name", 3600), | ||
| 167 | ]; | ||
| 168 | |||
| 169 | for (spec_str, expected_signal, expected_seconds) in valid_specs { | ||
| 170 | let spec: Result<SignalSpec, _> = spec_str.parse(); | ||
| 171 | assert!(spec.is_ok(), "SignalSpec '{}' should be valid", spec_str); | ||
| 172 | |||
| 173 | let spec = spec.unwrap(); | ||
| 174 | assert_eq!(spec.signal.as_str(), expected_signal); | ||
| 175 | assert_eq!(spec.delay, Duration::from_secs(expected_seconds)); | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | #[test] | ||
| 180 | fn test_signal_spec_invalid_no_colon() { | ||
| 181 | let spec: Result<SignalSpec, _> = "test5".parse(); | ||
| 182 | assert!(spec.is_err()); | ||
| 183 | } | ||
| 184 | |||
| 185 | #[test] | ||
| 186 | fn test_signal_spec_invalid_signal() { | ||
| 187 | let spec: Result<SignalSpec, _> = "bad@signal:5".parse(); | ||
| 188 | assert!(spec.is_err()); | ||
| 189 | } | ||
| 190 | |||
| 191 | #[test] | ||
| 192 | fn test_signal_spec_invalid_delay() { | ||
| 193 | let invalid_delays = vec![ | ||
| 194 | "test:abc", // non-numeric | ||
| 195 | "test:-5", // negative | ||
| 196 | "test:5.5", // float | ||
| 197 | "test:", // empty delay | ||
| 198 | ]; | ||
| 199 | |||
| 200 | for spec_str in invalid_delays { | ||
| 201 | let spec: Result<SignalSpec, _> = spec_str.parse(); | ||
| 202 | assert!(spec.is_err(), "SignalSpec '{}' should be invalid", spec_str); | ||
| 203 | } | ||
| 204 | } | ||
| 205 | |||
| 206 | #[test] | ||
| 207 | fn test_signal_spec_clone_and_equality() { | ||
| 208 | let spec1: SignalSpec = "test:5".parse().unwrap(); | ||
| 209 | let spec2 = spec1.clone(); | ||
| 210 | assert_eq!(spec1, spec2); | ||
| 211 | } | ||
| 212 | |||
| 213 | #[test] | ||
| 214 | fn test_signal_spec_debug() { | ||
| 215 | let spec: SignalSpec = "test:5".parse().unwrap(); | ||
| 216 | let debug_str = format!("{:?}", spec); | ||
| 217 | assert!(debug_str.contains("SignalSpec")); | ||
| 218 | } | ||
| 219 | |||
| 220 | #[test] | ||
| 221 | fn test_signal_boundary_lengths() { | ||
| 222 | // Test minimum length (1 character) | ||
| 223 | let min_signal: Signal = "a".parse().unwrap(); | ||
| 224 | assert_eq!(min_signal.as_str(), "a"); | ||
| 225 | |||
| 226 | // Test maximum length (64 characters) | ||
| 227 | let max_signal_str = "a".repeat(64); | ||
| 228 | let max_signal: Signal = max_signal_str.parse().unwrap(); | ||
| 229 | assert_eq!(max_signal.as_str(), max_signal_str); | ||
| 230 | } | ||
| 231 | |||
| 232 | #[test] | ||
| 233 | fn test_is_valid_signal_char() { | ||
| 234 | // Valid characters | ||
| 235 | assert!(is_valid_signal_char('a')); | ||
| 236 | assert!(is_valid_signal_char('Z')); | ||
| 237 | assert!(is_valid_signal_char('0')); | ||
| 238 | assert!(is_valid_signal_char('9')); | ||
| 239 | assert!(is_valid_signal_char('_')); | ||
| 240 | assert!(is_valid_signal_char('-')); | ||
| 241 | |||
| 242 | // Invalid characters | ||
| 243 | assert!(!is_valid_signal_char(' ')); | ||
| 244 | assert!(!is_valid_signal_char('@')); | ||
| 245 | assert!(!is_valid_signal_char('.')); | ||
| 246 | assert!(!is_valid_signal_char('/')); | ||
| 247 | assert!(!is_valid_signal_char('!')); | ||
| 248 | } | ||
| 249 | |||
| 250 | #[test] | ||
| 251 | fn test_signal_spec_zero_delay() { | ||
| 252 | let spec: SignalSpec = "test:0".parse().unwrap(); | ||
| 253 | assert_eq!(spec.delay, Duration::from_secs(0)); | ||
| 254 | } | ||
| 255 | |||
| 256 | #[test] | ||
| 257 | fn test_signal_spec_large_delay() { | ||
| 258 | let spec: SignalSpec = "test:86400".parse().unwrap(); // 24 hours | ||
| 259 | assert_eq!(spec.delay, Duration::from_secs(86400)); | ||
| 260 | } | ||
| 261 | } | ||
