aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordiogo464 <[email protected]>2025-07-11 13:59:07 +0100
committerdiogo464 <[email protected]>2025-07-11 13:59:07 +0100
commit16320eed1185b911d53bfe5aa77e43b7af02b358 (patch)
treecddee7798d26743f76250c7cd3ad8ca905f74a34 /src
parent727fd77f5ce92a27417420919218f8b1449b246b (diff)
added addr allocation policy
Diffstat (limited to 'src')
-rw-r--r--src/address_allocation_policy.rs156
-rw-r--r--src/main.rs64
2 files changed, 204 insertions, 16 deletions
diff --git a/src/address_allocation_policy.rs b/src/address_allocation_policy.rs
new file mode 100644
index 0000000..758ab21
--- /dev/null
+++ b/src/address_allocation_policy.rs
@@ -0,0 +1,156 @@
1#[derive(Debug, Clone, PartialEq, Eq)]
2pub enum AddressAllocationPolicy {
3 PerCpu(u32),
4 PerMachine(u32),
5 Total(u32),
6}
7
8#[derive(Debug)]
9pub struct InvalidAddressAllocationPolicy(String);
10
11impl std::fmt::Display for InvalidAddressAllocationPolicy {
12 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
13 f.write_str("invalid address allocation policy: ")?;
14 f.write_str(&self.0)
15 }
16}
17
18impl std::error::Error for InvalidAddressAllocationPolicy {}
19
20impl From<std::num::ParseIntError> for InvalidAddressAllocationPolicy {
21 fn from(value: std::num::ParseIntError) -> Self {
22 Self(value.to_string())
23 }
24}
25
26impl std::str::FromStr for AddressAllocationPolicy {
27 type Err = InvalidAddressAllocationPolicy;
28
29 fn from_str(s: &str) -> Result<Self, Self::Err> {
30 if let Some(n) = s.strip_suffix("/cpu") {
31 Ok(Self::PerCpu(n.parse()?))
32 } else if let Some(n) = s.strip_suffix("/machine") {
33 Ok(Self::PerMachine(n.parse()?))
34 } else {
35 Ok(Self::Total(s.parse()?))
36 }
37 }
38}
39
40#[cfg(test)]
41mod tests {
42 use super::*;
43 use std::str::FromStr;
44
45 #[test]
46 fn test_per_cpu_parsing() {
47 assert_eq!(
48 AddressAllocationPolicy::from_str("10/cpu").unwrap(),
49 AddressAllocationPolicy::PerCpu(10)
50 );
51 assert_eq!(
52 AddressAllocationPolicy::from_str("1/cpu").unwrap(),
53 AddressAllocationPolicy::PerCpu(1)
54 );
55 assert_eq!(
56 AddressAllocationPolicy::from_str("1000/cpu").unwrap(),
57 AddressAllocationPolicy::PerCpu(1000)
58 );
59 }
60
61 #[test]
62 fn test_per_machine_parsing() {
63 assert_eq!(
64 AddressAllocationPolicy::from_str("20/machine").unwrap(),
65 AddressAllocationPolicy::PerMachine(20)
66 );
67 assert_eq!(
68 AddressAllocationPolicy::from_str("1/machine").unwrap(),
69 AddressAllocationPolicy::PerMachine(1)
70 );
71 assert_eq!(
72 AddressAllocationPolicy::from_str("500/machine").unwrap(),
73 AddressAllocationPolicy::PerMachine(500)
74 );
75 }
76
77 #[test]
78 fn test_total_parsing() {
79 assert_eq!(
80 AddressAllocationPolicy::from_str("100").unwrap(),
81 AddressAllocationPolicy::Total(100)
82 );
83 assert_eq!(
84 AddressAllocationPolicy::from_str("1").unwrap(),
85 AddressAllocationPolicy::Total(1)
86 );
87 assert_eq!(
88 AddressAllocationPolicy::from_str("9999").unwrap(),
89 AddressAllocationPolicy::Total(9999)
90 );
91 }
92
93 #[test]
94 fn test_invalid_number_formats() {
95 assert!(AddressAllocationPolicy::from_str("-5/cpu").is_err());
96 assert!(AddressAllocationPolicy::from_str("abc/cpu").is_err());
97 assert!(AddressAllocationPolicy::from_str("10.5/machine").is_err());
98 assert!(AddressAllocationPolicy::from_str("xyz").is_err());
99 assert!(AddressAllocationPolicy::from_str("").is_err());
100 }
101
102 #[test]
103 fn test_invalid_suffixes() {
104 assert!(AddressAllocationPolicy::from_str("10/node").is_err());
105 assert!(AddressAllocationPolicy::from_str("10/core").is_err());
106 assert!(AddressAllocationPolicy::from_str("10/").is_err());
107 }
108
109 #[test]
110 fn test_zero_values() {
111 assert_eq!(
112 AddressAllocationPolicy::from_str("0/cpu").unwrap(),
113 AddressAllocationPolicy::PerCpu(0)
114 );
115 assert_eq!(
116 AddressAllocationPolicy::from_str("0/machine").unwrap(),
117 AddressAllocationPolicy::PerMachine(0)
118 );
119 assert_eq!(
120 AddressAllocationPolicy::from_str("0").unwrap(),
121 AddressAllocationPolicy::Total(0)
122 );
123 }
124
125 #[test]
126 fn test_large_numbers() {
127 assert_eq!(
128 AddressAllocationPolicy::from_str("4294967295/cpu").unwrap(),
129 AddressAllocationPolicy::PerCpu(u32::MAX)
130 );
131 assert_eq!(
132 AddressAllocationPolicy::from_str("4294967295/machine").unwrap(),
133 AddressAllocationPolicy::PerMachine(u32::MAX)
134 );
135 assert_eq!(
136 AddressAllocationPolicy::from_str("4294967295").unwrap(),
137 AddressAllocationPolicy::Total(u32::MAX)
138 );
139 }
140
141 #[test]
142 fn test_overflow() {
143 assert!(AddressAllocationPolicy::from_str("4294967296/cpu").is_err());
144 assert!(AddressAllocationPolicy::from_str("9999999999999/machine").is_err());
145 assert!(AddressAllocationPolicy::from_str("18446744073709551616").is_err());
146 }
147
148 #[test]
149 fn test_whitespace_handling() {
150 assert!(AddressAllocationPolicy::from_str(" 10/cpu").is_err());
151 assert!(AddressAllocationPolicy::from_str("10/cpu ").is_err());
152 assert!(AddressAllocationPolicy::from_str("10 /cpu").is_err());
153 assert!(AddressAllocationPolicy::from_str("10/ cpu").is_err());
154 }
155}
156
diff --git a/src/main.rs b/src/main.rs
index 47b4493..f05dad9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,10 +18,12 @@ use tokio::{
18}; 18};
19 19
20use crate::{ 20use crate::{
21 address_allocation_policy::AddressAllocationPolicy,
21 context::{Context, ExecutionNode}, 22 context::{Context, ExecutionNode},
22 latency_matrix::LatencyMatrix, 23 latency_matrix::LatencyMatrix,
23}; 24};
24 25
26pub mod address_allocation_policy;
25pub mod context; 27pub mod context;
26pub mod latency_matrix; 28pub mod latency_matrix;
27pub mod machine; 29pub mod machine;
@@ -69,7 +71,7 @@ struct NetUpArgs {
69 #[clap(flatten)] 71 #[clap(flatten)]
70 common: Common, 72 common: Common,
71 #[clap(long)] 73 #[clap(long)]
72 addr_per_cpu: u32, 74 addresses: AddressAllocationPolicy,
73 #[clap(long)] 75 #[clap(long)]
74 latency_matrix: PathBuf, 76 latency_matrix: PathBuf,
75} 77}
@@ -92,7 +94,7 @@ struct NetPreviewArgs {
92 machine: Vec<Machine>, 94 machine: Vec<Machine>,
93 95
94 #[clap(long)] 96 #[clap(long)]
95 addr_per_cpu: u32, 97 addresses: AddressAllocationPolicy,
96 98
97 #[clap(long)] 99 #[clap(long)]
98 latency_matrix: PathBuf, 100 latency_matrix: PathBuf,
@@ -153,7 +155,7 @@ async fn cmd_net_up(args: NetUpArgs) -> Result<()> {
153 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) 155 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds)
154 .context("parsing latency matrix")?; 156 .context("parsing latency matrix")?;
155 let machines = oar::job_list_machines(&context).await?; 157 let machines = oar::job_list_machines(&context).await?;
156 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); 158 let configs = machine_generate_configs(&matrix, &machines, &args.addresses);
157 machines_net_container_build(&context, &machines).await?; 159 machines_net_container_build(&context, &machines).await?;
158 machines_clean(&context, &machines).await?; 160 machines_clean(&context, &machines).await?;
159 machines_configure(&context, &configs).await?; 161 machines_configure(&context, &configs).await?;
@@ -197,7 +199,7 @@ async fn cmd_net_preview(args: NetPreviewArgs) -> Result<()> {
197 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds) 199 let matrix = LatencyMatrix::parse(&matrix_content, latency_matrix::TimeUnit::Milliseconds)
198 .context("parsing latency matrix")?; 200 .context("parsing latency matrix")?;
199 let machines = args.machine; 201 let machines = args.machine;
200 let configs = machine_generate_configs(&matrix, &machines, args.addr_per_cpu); 202 let configs = machine_generate_configs(&matrix, &machines, &args.addresses);
201 203
202 for config in configs { 204 for config in configs {
203 (0..20).for_each(|_| print!("-")); 205 (0..20).for_each(|_| print!("-"));
@@ -757,38 +759,68 @@ fn machine_address_for_idx(machine: Machine, idx: u32) -> Ipv4Addr {
757fn machine_generate_configs( 759fn machine_generate_configs(
758 matrix: &LatencyMatrix, 760 matrix: &LatencyMatrix,
759 machines: &[Machine], 761 machines: &[Machine],
760 addr_per_cpu: u32, 762 addr_policy: &AddressAllocationPolicy,
761) -> Vec<MachineConfig> { 763) -> Vec<MachineConfig> {
762 let mut configs = Vec::default(); 764 let mut configs = Vec::default();
763 let mut addresses = Vec::default(); 765 let mut addresses = Vec::default();
764 let mut address_to_index = HashMap::<Ipv4Addr, usize>::default(); 766 let mut address_to_index = HashMap::<Ipv4Addr, usize>::default();
767 let mut addresses_per_machine = HashMap::<Machine, Vec<Ipv4Addr>>::default();
768 machines.iter().for_each(|&m| {
769 addresses_per_machine.insert(m, Default::default());
770 });
765 771
766 // gather all addresses across all machines 772 // gather all addresses across all machines
767 for &machine in machines { 773 match addr_policy {
768 for i in 0..(addr_per_cpu * machine.cpus()) { 774 AddressAllocationPolicy::PerCpu(n) => {
769 let address = machine_address_for_idx(machine, i); 775 for &machine in machines {
770 addresses.push(address); 776 for i in 0..(n * machine.cpus()) {
771 address_to_index.insert(address, addresses.len() - 1); 777 let address = machine_address_for_idx(machine, i);
778 addresses.push(address);
779 }
780 }
772 } 781 }
782 AddressAllocationPolicy::PerMachine(n) => {
783 for &machine in machines {
784 for i in 0..*n {
785 let address = machine_address_for_idx(machine, i);
786 addresses.push(address);
787 }
788 }
789 }
790 AddressAllocationPolicy::Total(n) => {
791 let mut counter = 0;
792 while counter < *n {
793 let machine = machines[(counter as usize) % machines.len()];
794 let address = machine_address_for_idx(machine, counter / (machines.len() as u32));
795 addresses.push(address);
796 counter += 1;
797 }
798 }
799 }
800 for (idx, &address) in addresses.iter().enumerate() {
801 let machine = machine_from_addr(address).expect("we should only generate valid addresses");
802 address_to_index.insert(address, idx);
803 addresses_per_machine
804 .entry(machine)
805 .or_default()
806 .push(address);
773 } 807 }
774 808
775 for &machine in machines { 809 for &machine in machines {
776 let mut machine_addresses = Vec::default(); 810 let machine_addresses = &addresses_per_machine[&machine];
777 let mut machine_ip_commands = Vec::default(); 811 let mut machine_ip_commands = Vec::default();
778 let mut machine_tc_commands = Vec::default(); 812 let mut machine_tc_commands = Vec::default();
779 let mut machine_nft_script = String::default(); 813 let mut machine_nft_script = String::default();
780 814
781 machine_ip_commands.push(format!("route add 10.0.0.0/8 dev {}", machine.interface())); 815 machine_ip_commands.push(format!("route add 10.0.0.0/8 dev {}", machine.interface()));
782 for i in 0..(addr_per_cpu * machine.cpus()) { 816 for address in machine_addresses.iter() {
783 let address = machine_address_for_idx(machine, i);
784 machine_addresses.push(address);
785 machine_ip_commands.push(format!("addr add {address}/32 dev {}", machine.interface())); 817 machine_ip_commands.push(format!("addr add {address}/32 dev {}", machine.interface()));
786 } 818 }
787 819
788 let mut latencies_set = HashSet::<u32>::default(); 820 let mut latencies_set = HashSet::<u32>::default();
789 let mut latencies_buckets = Vec::<u32>::default(); 821 let mut latencies_buckets = Vec::<u32>::default();
790 let mut latencies_addr_pairs = HashMap::<u32, Vec<(Ipv4Addr, Ipv4Addr)>>::default(); 822 let mut latencies_addr_pairs = HashMap::<u32, Vec<(Ipv4Addr, Ipv4Addr)>>::default();
791 for &addr in &machine_addresses { 823 for &addr in machine_addresses {
792 let addr_idx = address_to_index[&addr]; 824 let addr_idx = address_to_index[&addr];
793 for other_idx in (0..addresses.len()).filter(|i| *i != addr_idx) { 825 for other_idx in (0..addresses.len()).filter(|i| *i != addr_idx) {
794 let other = addresses[other_idx]; 826 let other = addresses[other_idx];
@@ -863,7 +895,7 @@ fn machine_generate_configs(
863 895
864 configs.push(MachineConfig { 896 configs.push(MachineConfig {
865 machine, 897 machine,
866 addresses: machine_addresses, 898 addresses: machine_addresses.clone(),
867 nft_script: machine_nft_script, 899 nft_script: machine_nft_script,
868 tc_commands: machine_tc_commands, 900 tc_commands: machine_tc_commands,
869 ip_commands: machine_ip_commands, 901 ip_commands: machine_ip_commands,