aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-13 11:03:05 +0100
committerdiogo464 <[email protected]>2025-07-13 11:03:07 +0100
commit7e756e1c4845e2f663cc04863045b971a8eda116 (patch)
tree3b75b0e9996823b7cadfec8bbf6a5254d75c2f58 /src
parent7ac96fc5d1249aec37ecc7ecb2a3fa9261d002a2 (diff)
added custom signals to run subcommand
Diffstat (limited to 'src')
-rw-r--r--src/main.rs41
-rw-r--r--src/signal.rs261
2 files changed, 288 insertions, 14 deletions
diff --git a/src/main.rs b/src/main.rs
index e2d356e..55a96cd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@ use std::{
4 net::Ipv4Addr, 4 net::Ipv4Addr,
5 path::{Path, PathBuf}, 5 path::{Path, PathBuf},
6 process::Output, 6 process::Output,
7 time::Duration, 7 time::{Duration, Instant},
8}; 8};
9 9
10use clap::{Args, Parser, Subcommand}; 10use clap::{Args, Parser, Subcommand};
@@ -22,6 +22,7 @@ use crate::{
22 address_allocation_policy::AddressAllocationPolicy, 22 address_allocation_policy::AddressAllocationPolicy,
23 context::{Context, ExecutionNode}, 23 context::{Context, ExecutionNode},
24 latency_matrix::LatencyMatrix, 24 latency_matrix::LatencyMatrix,
25 signal::{Signal, SignalSpec},
25}; 26};
26 27
27pub mod address_allocation_policy; 28pub mod address_allocation_policy;
@@ -29,6 +30,7 @@ pub mod context;
29pub mod latency_matrix; 30pub mod latency_matrix;
30pub mod machine; 31pub mod machine;
31pub mod oar; 32pub mod oar;
33pub mod signal;
32 34
33const CONTAINER_IMAGE_NAME: &str = "local/oar-p2p-networking"; 35const CONTAINER_IMAGE_NAME: &str = "local/oar-p2p-networking";
34 36
@@ -109,8 +111,8 @@ struct RunArgs {
109 #[clap(long)] 111 #[clap(long)]
110 output_dir: PathBuf, 112 output_dir: PathBuf,
111 113
112 #[clap(long, default_value = "10")] 114 #[clap(long)]
113 signal_delay: u64, 115 signal: Vec<SignalSpec>,
114 116
115 schedule: Option<PathBuf>, 117 schedule: Option<PathBuf>,
116} 118}
@@ -311,16 +313,27 @@ async fn cmd_run(args: RunArgs) -> Result<()> {
311 ) 313 )
312 .await?; 314 .await?;
313 315
314 tracing::info!("waiting {} seconds before signaling", args.signal_delay); 316 let signal_start_instant = Instant::now();
315 tokio::time::sleep(Duration::from_secs(args.signal_delay)).await; 317 let signal_specs = {
318 let mut specs = args.signal.clone();
319 specs.sort_by_key(|s| s.delay);
320 specs
321 };
316 322
317 machine::for_each( 323 for spec in signal_specs {
318 machines 324 tracing::info!("waiting to trigger signal {}", spec.signal);
319 .iter() 325 let expire = signal_start_instant + spec.delay;
320 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)), 326 tokio::time::sleep_until(expire.into()).await;
321 |machine| machine_signal_containers(&ctx, machine), 327
322 ) 328 tracing::info!("triggering signal {}", spec.signal);
323 .await?; 329 machine::for_each(
330 machines
331 .iter()
332 .filter(|&machine| containers.iter().any(|c| c.machine == *machine)),
333 |machine| machine_signal_containers(&ctx, machine, &spec.signal),
334 )
335 .await?;
336 }
324 337
325 tracing::info!("waiting for all containers to exit"); 338 tracing::info!("waiting for all containers to exit");
326 machine::for_each(&machines, |machine| { 339 machine::for_each(&machines, |machine| {
@@ -427,9 +440,9 @@ async fn machine_start_containers(ctx: &Context, machine: Machine) -> Result<()>
427} 440}
428 441
429#[tracing::instrument(ret, err, skip(ctx))] 442#[tracing::instrument(ret, err, skip(ctx))]
430async fn machine_signal_containers(ctx: &Context, machine: Machine) -> Result<()> { 443async fn machine_signal_containers(ctx: &Context, machine: Machine, signal: &Signal) -> Result<()> {
431 tracing::info!("signaling containers"); 444 tracing::info!("signaling containers");
432 machine_run_script(ctx, machine, "touch /tmp/oar-p2p-signal/start").await?; 445 machine_run_script(ctx, machine, &format!("touch /tmp/oar-p2p-signal/{signal}")).await?;
433 tracing::info!("containers signaled"); 446 tracing::info!("containers signaled");
434 Ok(()) 447 Ok(())
435} 448}
diff --git a/src/signal.rs b/src/signal.rs
new file mode 100644
index 0000000..8076349
--- /dev/null
+++ b/src/signal.rs
@@ -0,0 +1,261 @@
1use std::{str::FromStr, time::Duration};
2
3const SIGNAL_MIN_LEN: usize = 1;
4const SIGNAL_MAX_LEN: usize = 64;
5
6#[derive(Debug, Clone, PartialEq, Eq, Hash)]
7pub struct Signal(String);
8
9impl std::fmt::Display for Signal {
10 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
11 f.write_str(&self.0)
12 }
13}
14
15impl Signal {
16 pub fn as_str(&self) -> &str {
17 &self.0
18 }
19}
20
21#[derive(Debug)]
22pub struct InvalidSignal(String);
23
24impl std::fmt::Display for InvalidSignal {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(
27 f,
28 "invalid signal '{}'. a signal must be composed of alphanumeric, '-' or '_' and be between 1 and 64 characters long",
29 self.0
30 )
31 }
32}
33
34impl std::error::Error for InvalidSignal {}
35
36impl FromStr for Signal {
37 type Err = InvalidSignal;
38
39 fn from_str(s: &str) -> Result<Self, Self::Err> {
40 if s.len() < SIGNAL_MIN_LEN
41 || s.len() > SIGNAL_MAX_LEN
42 || !s.chars().all(is_valid_signal_char)
43 {
44 Err(InvalidSignal(s.to_string()))
45 } else {
46 Ok(Self(s.to_string()))
47 }
48 }
49}
50
51fn is_valid_signal_char(c: char) -> bool {
52 c.is_alphanumeric() || c == '_' || c == '-'
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct SignalSpec {
57 pub signal: Signal,
58 pub delay: Duration,
59}
60
61#[derive(Debug)]
62pub struct InvalidSignalSpec(String);
63
64impl std::fmt::Display for InvalidSignalSpec {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(
67 f,
68 "invalid signal spec '{}'. signal spec must be in format <signal>:<seconds>",
69 self.0
70 )
71 }
72}
73
74impl std::error::Error for InvalidSignalSpec {}
75
76impl FromStr for SignalSpec {
77 type Err = InvalidSignalSpec;
78
79 fn from_str(s: &str) -> Result<Self, Self::Err> {
80 let err_fn = || InvalidSignalSpec(s.to_string());
81 let (lhs, rhs) = s.split_once(':').ok_or_else(err_fn)?;
82 let signal = lhs.parse().ok().ok_or_else(err_fn)?;
83 let delay = Duration::from_secs(rhs.parse().ok().ok_or_else(err_fn)?);
84 Ok(Self { signal, delay })
85 }
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91 use std::time::Duration;
92
93 #[test]
94 fn test_signal_valid() {
95 let max_length_signal = "a".repeat(64);
96 let valid_signals = vec![
97 "a",
98 "test",
99 "test-signal",
100 "test_signal",
101 "123",
102 "abc123",
103 &max_length_signal, // max length
104 ];
105
106 for signal_str in valid_signals {
107 let signal: Result<Signal, _> = signal_str.parse();
108 assert!(signal.is_ok(), "Signal '{}' should be valid", signal_str);
109 assert_eq!(signal.unwrap().as_str(), signal_str);
110 }
111 }
112
113 #[test]
114 fn test_signal_invalid_empty() {
115 let signal: Result<Signal, _> = "".parse();
116 assert!(signal.is_err());
117 }
118
119 #[test]
120 fn test_signal_invalid_too_long() {
121 let long_signal = "a".repeat(65); // max + 1
122 let signal: Result<Signal, _> = long_signal.parse();
123 assert!(signal.is_err());
124 }
125
126 #[test]
127 fn test_signal_invalid_characters() {
128 let invalid_signals = vec![
129 "test signal", // space
130 "test@signal", // @
131 "test.signal", // .
132 "test/signal", // /
133 "test\\signal", // \
134 "test!signal", // !
135 "test#signal", // #
136 "test$signal", // $
137 ];
138
139 for signal_str in invalid_signals {
140 let signal: Result<Signal, _> = signal_str.parse();
141 assert!(signal.is_err(), "Signal '{}' should be invalid", signal_str);
142 }
143 }
144
145 #[test]
146 fn test_signal_clone_and_equality() {
147 let signal1: Signal = "test-signal".parse().unwrap();
148 let signal2 = signal1.clone();
149 assert_eq!(signal1, signal2);
150 }
151
152 #[test]
153 fn test_signal_debug() {
154 let signal: Signal = "test".parse().unwrap();
155 let debug_str = format!("{:?}", signal);
156 assert!(debug_str.contains("Signal"));
157 assert!(debug_str.contains("test"));
158 }
159
160 #[test]
161 fn test_signal_spec_valid() {
162 let valid_specs = vec![
163 ("test:5", "test", 5),
164 ("signal-name:10", "signal-name", 10),
165 ("a:0", "a", 0),
166 ("long_signal_name:3600", "long_signal_name", 3600),
167 ];
168
169 for (spec_str, expected_signal, expected_seconds) in valid_specs {
170 let spec: Result<SignalSpec, _> = spec_str.parse();
171 assert!(spec.is_ok(), "SignalSpec '{}' should be valid", spec_str);
172
173 let spec = spec.unwrap();
174 assert_eq!(spec.signal.as_str(), expected_signal);
175 assert_eq!(spec.delay, Duration::from_secs(expected_seconds));
176 }
177 }
178
179 #[test]
180 fn test_signal_spec_invalid_no_colon() {
181 let spec: Result<SignalSpec, _> = "test5".parse();
182 assert!(spec.is_err());
183 }
184
185 #[test]
186 fn test_signal_spec_invalid_signal() {
187 let spec: Result<SignalSpec, _> = "bad@signal:5".parse();
188 assert!(spec.is_err());
189 }
190
191 #[test]
192 fn test_signal_spec_invalid_delay() {
193 let invalid_delays = vec![
194 "test:abc", // non-numeric
195 "test:-5", // negative
196 "test:5.5", // float
197 "test:", // empty delay
198 ];
199
200 for spec_str in invalid_delays {
201 let spec: Result<SignalSpec, _> = spec_str.parse();
202 assert!(spec.is_err(), "SignalSpec '{}' should be invalid", spec_str);
203 }
204 }
205
206 #[test]
207 fn test_signal_spec_clone_and_equality() {
208 let spec1: SignalSpec = "test:5".parse().unwrap();
209 let spec2 = spec1.clone();
210 assert_eq!(spec1, spec2);
211 }
212
213 #[test]
214 fn test_signal_spec_debug() {
215 let spec: SignalSpec = "test:5".parse().unwrap();
216 let debug_str = format!("{:?}", spec);
217 assert!(debug_str.contains("SignalSpec"));
218 }
219
220 #[test]
221 fn test_signal_boundary_lengths() {
222 // Test minimum length (1 character)
223 let min_signal: Signal = "a".parse().unwrap();
224 assert_eq!(min_signal.as_str(), "a");
225
226 // Test maximum length (64 characters)
227 let max_signal_str = "a".repeat(64);
228 let max_signal: Signal = max_signal_str.parse().unwrap();
229 assert_eq!(max_signal.as_str(), max_signal_str);
230 }
231
232 #[test]
233 fn test_is_valid_signal_char() {
234 // Valid characters
235 assert!(is_valid_signal_char('a'));
236 assert!(is_valid_signal_char('Z'));
237 assert!(is_valid_signal_char('0'));
238 assert!(is_valid_signal_char('9'));
239 assert!(is_valid_signal_char('_'));
240 assert!(is_valid_signal_char('-'));
241
242 // Invalid characters
243 assert!(!is_valid_signal_char(' '));
244 assert!(!is_valid_signal_char('@'));
245 assert!(!is_valid_signal_char('.'));
246 assert!(!is_valid_signal_char('/'));
247 assert!(!is_valid_signal_char('!'));
248 }
249
250 #[test]
251 fn test_signal_spec_zero_delay() {
252 let spec: SignalSpec = "test:0".parse().unwrap();
253 assert_eq!(spec.delay, Duration::from_secs(0));
254 }
255
256 #[test]
257 fn test_signal_spec_large_delay() {
258 let spec: SignalSpec = "test:86400".parse().unwrap(); // 24 hours
259 assert_eq!(spec.delay, Duration::from_secs(86400));
260 }
261}