From d87b4dbd9822c2f2b68088ece3fd40b2ef3126da Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 16 Dec 2024 10:38:14 +0100 Subject: [PATCH 01/15] feat(main) : Add os_flavor argument --- src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 265d270..2340993 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,7 +69,9 @@ struct BenchmarkArgs { #[arg(long, default_value = CONFIG_FILE)] config_file: String, - + /// OS version to deploy first on nodes thanks to kadeploy3 + #[arg(long, default_value = "debian11-nfs")] + os_flavor: String } From 4a34e5e7dd5b82e205502ebec050520fc05ac57b Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 16 Dec 2024 10:39:24 +0100 Subject: [PATCH 02/15] fix(scripts) : Remove -t night directive, jobs can be run anytime --- templates/oar_directives.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/templates/oar_directives.sh b/templates/oar_directives.sh index ecb217e..231e4fd 100644 --- a/templates/oar_directives.sh +++ b/templates/oar_directives.sh @@ -1,7 +1,6 @@ #OAR -q {{ queue_type }} #OAR -p {{ node_uid }} #OAR -l host=1,walltime={{ walltime }} -#OAR -t night {% if exotic_node %} #OAR -t exotic {% endif %} From 0ffbf98c09b25e9a9ebb56caf3afbc042262c455 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:03:34 +0100 Subject: [PATCH 03/15] feat(configs) : Add first condition for cgroup_basepath decision --- src/configs.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/configs.rs b/src/configs.rs index 58d7d55..6d492ef 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -121,11 +121,17 @@ fn build_hwpc_system(hwpc_events: &HwpcEvents) -> HwpcSystem { } } -fn build_hwpc_config(name: String, system: HwpcSystem) -> HwpcConfig { +fn build_hwpc_config(name: String, system: HwpcSystem, os_flavor: &str) -> HwpcConfig { + let cgroup_basepath; + if os_flavor == "ubuntu2404-nfs" { + cgroup_basepath = "/sys/fs/cgroup"; + } else { + 
cgroup_basepath = "/sys/fs/cgroup/perf_event"; + } HwpcConfig { name, verbose: true, - cgroup_basepath: "/sys/fs/cgroup/perf_event".to_owned(), + cgroup_basepath: cgroup_basepath.to_owned(), frequency: 1000, output: HwpcOutput { r#type: "csv".to_owned(), @@ -138,13 +144,14 @@ pub fn generate_hwpc_configs( hwpc_events: &HwpcEvents, core_values: &[u32], prefix: &str, + os_flavor: &str, ) -> HashMap { let hwpc_system = build_hwpc_system(hwpc_events); core_values .iter() .map(|&core_value| { let name = format!("{}_sensor_{}", prefix, core_value); - (core_value, build_hwpc_config(name, hwpc_system.clone())) + (core_value, build_hwpc_config(name, hwpc_system.clone(), os_flavor)) }) .collect() } From 9edbbe040f9b800328152f8e2d2ac6a65ba855a9 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:04:35 +0100 Subject: [PATCH 04/15] feat(inventories) : Add cluster information ASAP && duplicate GET function as POST one --- src/inventories.rs | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/inventories.rs b/src/inventories.rs index f07e6a8..c7bb916 100644 --- a/src/inventories.rs +++ b/src/inventories.rs @@ -189,7 +189,7 @@ pub async fn get_api_call( let username = env::var("G5K_USERNAME").expect("G5K_USERNAME must be set"); let password = env::var("G5K_PASSWORD").expect("G5K_PASSWORD must be set"); - debug!("Scraping {}", endpoint); + debug!("GET request to {}", endpoint); let response = client .get(endpoint) @@ -211,6 +211,39 @@ pub async fn get_api_call( } } +pub async fn post_api_call( + client: &Client, + endpoint: &str, + data: &serde_json::Value +) -> Result, InventoryError> { + dotenv::dotenv().ok(); + let username = env::var("G5K_USERNAME").expect("G5K_USERNAME must be set"); + let password = env::var("G5K_PASSWORD").expect("G5K_PASSWORD must be set"); + + debug!("POST request to {}", endpoint); + debug!("with data {:?}", data); + + let response = client + .post(endpoint) + .json(&data) + 
.basic_auth(username, Some(password)) + .send() + .await; + let response_json = match response { + Ok(response_body) => { + response_body + .json() + .await + }, + Err(e) => Err(e) + }; + + match response_json { + Ok(json) => Ok(json), + Err(e) => Err(InventoryError::HttpRequest(e)), + } +} + pub async fn generate_inventory(inventories_dir: &str) -> Result<(), InventoryError> { dotenv::dotenv().ok(); // Charger les variables d'environnement // @@ -245,8 +278,8 @@ pub async fn generate_inventory(inventories_dir: &str) -> Result<(), InventoryEr .await .unwrap(); for node in nodes.iter_mut() { + node.cluster = Some(cluster.uid.clone().to_string()); if node.is_to_be_deployed() { - node.cluster = Some(cluster.uid.clone().to_string()); let node_specs_file_path = format!("{}/{}.json", cluster_dir, &node.uid); if !Path::new(&node_specs_file_path).exists() { From df0c59c0545450e11e47797825a2f8950e44cd41 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:05:13 +0100 Subject: [PATCH 05/15] feat(main) : Add default os_flavor and os_flavor as arg --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index 2340993..9b293b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,7 @@ const JOBS_FILE: &str = "jobs.yaml"; const SCRIPTS_DIRECTORY: &str = "scripts.d"; const RESULTS_DIRECTORY: &str = "results.d"; const CONFIG_FILE: &str = "config/events_by_vendor.json"; +const DEFAULT_OS_FLAVOR: &str = "debian11-nfs"; #[derive(Parser, Debug)] #[command(version, about = "Benchmark tool for PowerAPI Framework")] @@ -318,6 +319,7 @@ async fn main() -> Result<(), BenchmarkError> { &benchmark_args.scripts_directory, &benchmark_args.results_directory, &events_by_vendor, + benchmark_args.os_flavor.clone() ) .await?; From c9a7bcafa8d32daafca84889db4f0de52ed270f8 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:06:29 +0100 Subject: [PATCH 06/15] feat(scripts) : Add os_flavor information for HwpcConfigs generation --- 
src/scripts.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/scripts.rs b/src/scripts.rs index ea85f23..10b4837 100644 --- a/src/scripts.rs +++ b/src/scripts.rs @@ -38,6 +38,8 @@ struct BenchmarkTemplate { core_values: Vec, perf_events: PerfEvents, cpu_ops_per_core_list: Vec, + os_flavor: String + } impl BenchmarkTemplate { @@ -59,6 +61,7 @@ impl BenchmarkTemplate { core_values: Vec, perf_events: PerfEvents, cpu_ops_per_core_list: &[u32], + os_flavor: String, ) -> Self { Self { nb_iterations, @@ -78,6 +81,7 @@ impl BenchmarkTemplate { core_values, perf_events, cpu_ops_per_core_list: cpu_ops_per_core_list.into(), + os_flavor } } } @@ -105,9 +109,9 @@ pub fn generate_script_file( &job.node.processor.version, ); let hwpc_alone_configs = - configs::generate_hwpc_configs(&hwpc_events, &job.core_values, "hwpc_alone"); + configs::generate_hwpc_configs(&hwpc_events, &job.core_values, "hwpc_alone", &job.os_flavor); let hwpc_and_perf_configs = - configs::generate_hwpc_configs(&hwpc_events, &job.core_values, "hwpc_and_perf"); + configs::generate_hwpc_configs(&hwpc_events, &job.core_values, "hwpc_and_perf", &job.os_flavor); let benchmark = BenchmarkTemplate::new( NB_ITERATIONS, true, @@ -126,6 +130,7 @@ pub fn generate_script_file( job.core_values.clone(), perf_events, CPU_OPS_PER_CORE_LIST, + job.os_flavor.clone() ); let benchmark = benchmark.render().unwrap(); file.write_all(benchmark.as_bytes())?; From c2dfc4bf4c1751976f60468c74ec1f6703aa6e9e Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:07:19 +0100 Subject: [PATCH 07/15] feat(ssh) : Add function for remote script start once none-default env has been deployed --- src/ssh.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/ssh.rs b/src/ssh.rs index 641588e..23483c5 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -60,7 +60,28 @@ pub async fn make_script_executable(session: &Session, script_file: &str) -> Ssh Ok(()) } -pub async 
fn run_oarsub(session: &Session, script_file: &str) -> Result, SshError> { +pub async fn run_script(session: &Session, host:&str, script_file: &str) -> SshResult { + let ssh_command = session + .command("ssh") + .arg(&format!("root@{}", host)) + .arg(&format!("cd /home/nleblond && (nohup bash {} 1> out1 2> out2 &)", script_file)) + .output() + .await; + match ssh_command { + Ok(ssh_output) => { + if ssh_output.status.success() { + info!("Script successsfully started"); + } else { + error!("Job submission failed: {:?}", ssh_output.stderr); + } + }, + Err(e) => error!("Job command failed: {:?}", e) + } + Ok(()) +} + + +pub async fn run_oarsub(session: &Session, script_file: &str) -> Result, SshError> { let oarsub_output = session .command("oarsub") .arg("-S") @@ -72,7 +93,7 @@ pub async fn run_oarsub(session: &Session, script_file: &str) -> Result()?; + let job_id = captures.get(1).unwrap().as_str().parse::()?; info!("Job successfully submitted with OAR_JOB_ID: {}", job_id); Ok(Some(job_id)) } else { From 4862e4543b4d326df806ff671899d2df738dac78 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:08:52 +0100 Subject: [PATCH 08/15] feat(templates) : Add manual docker install in case of none-default env + avoid sudo as user is sudo --- templates/hwpc_alone.sh | 2 +- templates/hwpc_and_perf.sh | 6 +++--- templates/install_packages.sh | 20 +++++++++++++++++--- templates/oar_directives.sh | 12 +++++++----- templates/perf_alone.sh | 4 ++-- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/templates/hwpc_alone.sh b/templates/hwpc_alone.sh index 6b96787..7807ac1 100644 --- a/templates/hwpc_alone.sh +++ b/templates/hwpc_alone.sh @@ -16,7 +16,7 @@ for i in {1..{{ nb_iterations }}}; do while ! 
[[ -e "{{ results_directory }}/hwpc_alone_{{ core_value }}_{{ cpu_ops_per_core }}/hwpc_alone_{{ core_value }}_{{ cpu_ops_per_core }}_$i/rapl.csv" ]]; do sleep 0.02s ; done stress-ng --cpu {{ core_value }} --cpu-ops {{ core_value * cpu_ops_per_core }} -q sleep 1s - docker rm -f {{ hwpc_alone_configs.get(core_value).unwrap().name }}_{{ cpu_ops_per_core }}_$i + docker stop {{ hwpc_alone_configs.get(core_value).unwrap().name }}_{{ cpu_ops_per_core }}_$i sleep 15 done diff --git a/templates/hwpc_and_perf.sh b/templates/hwpc_and_perf.sh index c1d4373..9105a26 100644 --- a/templates/hwpc_and_perf.sh +++ b/templates/hwpc_and_perf.sh @@ -15,15 +15,15 @@ for i in {1..{{ nb_iterations }}}; do -r {{ hwpc_and_perf_configs.get(core_value).unwrap().output.type }} -U {{ hwpc_home_directory }}/{{ results_directory }}/hwpc_and_perf_{{ core_value }}_{{ cpu_ops_per_core }}/hwpc_and_perf_{{ core_value }}_{{ cpu_ops_per_core }}_$i \ {% if hwpc_alone_configs.get(core_value).unwrap().system.rapl.events.len() > 0 %} -s "rapl" {%~ for event in hwpc_alone_configs.get(core_value).unwrap().system.rapl.events %}-e "{{ event }}" {% endfor %}{% endif %} {% if hwpc_alone_configs.get(core_value).unwrap().system.msr.events.len() > 0 %} -s "msr" {%~ for event in hwpc_alone_configs.get(core_value).unwrap().system.msr.events %}-e "{{ event }}" {% endfor %} {% endif %} {% if hwpc_alone_configs.get(core_value).unwrap().system.core.events.len() > 0 %} -c "core" {%~ for event in hwpc_alone_configs.get(core_value).unwrap().system.core.events %}-e "{{ event }}" {% endfor %} {% endif %} - sudo-g5k bash -c "perf stat -a -o /tmp/perf_and_hwpc_{{ core_value }}_{{ cpu_ops_per_core }}_$i {% for perf_event in perf_events.iter() %}-e {{ perf_event }} {% endfor %} & echo \$!" > /tmp/perf_pid_$i + ${SUDO_CMD}bash -c "perf stat -a -o /tmp/perf_and_hwpc_{{ core_value }}_{{ cpu_ops_per_core }}_$i {% for perf_event in perf_events.iter() %}-e {{ perf_event }} {% endfor %} & echo \$!" 
> /tmp/perf_pid_$i PERF_PID=$(cat /tmp/perf_pid_$i) while ! [[ -e "{{ results_directory }}/hwpc_and_perf_{{ core_value }}_{{ cpu_ops_per_core }}/hwpc_and_perf_{{ core_value }}_{{ cpu_ops_per_core }}_$i/rapl.csv" ]]; do sleep 0.02s ; done ### PERF with {{ core_value }} CPU * {{ cpu_ops_per_core }} OPS stress-ng --cpu {{ core_value }} --cpu-ops {{ core_value * cpu_ops_per_core }} -q sleep 1s - sudo-g5k kill -2 $PERF_PID - docker rm -f {{ hwpc_and_perf_configs.get(core_value).unwrap().name }}_{{ cpu_ops_per_core }}_$i + ${SUDO_CMD}kill -2 $PERF_PID + docker stop {{ hwpc_and_perf_configs.get(core_value).unwrap().name }}_{{ cpu_ops_per_core }}_$i cat /tmp/perf_and_hwpc_{{ core_value }}_{{ cpu_ops_per_core }}_$i >> {{ results_directory }}/perf_and_hwpc_{{ core_value }}_{{ cpu_ops_per_core }} sleep 15 done diff --git a/templates/install_packages.sh b/templates/install_packages.sh index ea4d85c..dfce85e 100644 --- a/templates/install_packages.sh +++ b/templates/install_packages.sh @@ -1,7 +1,21 @@ -sudo-g5k apt-get install -y stress-ng +{% if os_flavor != super::DEFAULT_OS_FLAVOR %} + curl -sSL https://get.docker.com/ | sh + sudo curl -sSL "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + sudo mkdir -p /etc/docker + echo "{ \"registry-mirrors\": [\"http://docker-cache.grid5000.fr\"] }" | sudo tee /etc/docker/daemon.json + sudo systemctl restart docker + sudo chmod o+rw /var/run/docker.sock + SUDO_CMD="" +{% else %} + g5k-setup-docker + SUDO_CMD="sudo-g5k " +{% endif %} -sudo rm -f /etc/apt/sources.list.d/repo.radeon.com-amdgpu.list -g5k-setup-docker + +${SUDO_CMD}apt-get install -y stress-ng + +${SUDO_CMD}rm -f /etc/apt/sources.list.d/repo.radeon.com-amdgpu.list docker login -u {{ docker_hub_username }} -p {{ docker_hub_token }} docker run --rm -d --name mongo -p 27017:27017 mongo:latest sleep 30 diff --git a/templates/oar_directives.sh 
b/templates/oar_directives.sh index 231e4fd..b474101 100644 --- a/templates/oar_directives.sh +++ b/templates/oar_directives.sh @@ -1,6 +1,8 @@ -#OAR -q {{ queue_type }} -#OAR -p {{ node_uid }} -#OAR -l host=1,walltime={{ walltime }} -{% if exotic_node %} -#OAR -t exotic +{% if os_flavor == super::DEFAULT_OS_FLAVOR %} + #OAR -q {{ queue_type }} + #OAR -p {{ node_uid }} + #OAR -l host=1,walltime={{ walltime }} + {% if exotic_node %} + #OAR -t exotic + {% endif %} {% endif %} diff --git a/templates/perf_alone.sh b/templates/perf_alone.sh index a39f372..457e65b 100644 --- a/templates/perf_alone.sh +++ b/templates/perf_alone.sh @@ -3,11 +3,11 @@ touch {{ results_directory }}/perf_alone_{{ core_value }}_{{ cpu_ops_per_core }} for i in {1..{{ nb_iterations }}}; do ### PERF with {{ core_value }} CPU * {{ cpu_ops_per_core }} OPS - sudo-g5k bash -c "perf stat -a -o /tmp/perf_alone_{{ core_value }}_{{ cpu_ops_per_core }}_$i {% for perf_event in perf_events.iter() %}-e {{ perf_event }} {% endfor %} & echo \$!" > /tmp/perf_pid_$i + ${SUDO_CMD}bash -c "perf stat -a -o /tmp/perf_alone_{{ core_value }}_{{ cpu_ops_per_core }}_$i {% for perf_event in perf_events.iter() %}-e {{ perf_event }} {% endfor %} & echo \$!" 
> /tmp/perf_pid_$i PERF_PID=$(cat /tmp/perf_pid_$i) stress-ng --cpu {{ core_value }} --cpu-ops {{ core_value * cpu_ops_per_core }} -q sleep 1s - sudo-g5k kill -2 $PERF_PID + ${SUDO_CMD}kill -2 $PERF_PID cat /tmp/perf_alone_{{ core_value }}_{{ cpu_ops_per_core }}_$i >> {{ results_directory }}/perf_alone_{{ core_value }}_{{ cpu_ops_per_core }} done {% endfor %} From 3bde26c087ce38a003385b38712e6a22ab4d0a2e Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:09:52 +0100 Subject: [PATCH 09/15] feat(jobs) : Add scheduling operations in case of none-default environment --- src/jobs.rs | 174 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 143 insertions(+), 31 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index dcdc330..20d0285 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -17,7 +17,7 @@ use subprocess::{Popen, PopenConfig, Redirection}; use thiserror::Error; use std::process::Command; -const MAX_CONCURRENT_JOBS: usize = 20; +const MAX_CONCURRENT_JOBS: usize = 30; #[derive(Error, Debug)] pub enum JobError { @@ -57,6 +57,9 @@ pub enum OARState { Finishing, Failed, UnknownState, + Processing, + Deployed, + WaitingToBeDeployed } impl Display for OARState { @@ -76,6 +79,9 @@ impl OARState { OARState::Finishing => "Finishing", OARState::Failed => "Failed", OARState::UnknownState => "UnknownState", + OARState::Processing => "Processing", + OARState::Deployed => "Deployed", + OARState::WaitingToBeDeployed => "WaitingToBeDeployed" } } @@ -96,6 +102,9 @@ impl TryFrom<&str> for OARState { "hold" => Ok(OARState::Hold), "finishing" => Ok(OARState::Finishing), "not_submitted" => Ok(OARState::NotSubmitted), + "processing" => Ok(OARState::Processing), + "deployed" => Ok(OARState::Deployed), + "waiting_to_be_deployed" => Ok(OARState::WaitingToBeDeployed), unknown => Err(JobError::UnknownState(unknown.to_string())), } } @@ -105,12 +114,14 @@ impl TryFrom<&str> for OARState { pub struct Job { pub id: usize, pub node: Node, - pub oar_job_id: 
Option, + pub oar_job_id: Option, pub state: OARState, pub core_values: Vec, pub script_file: String, pub results_dir: String, pub site: String, + pub deployment_id: Option, + pub os_flavor: String } impl Job { @@ -134,7 +145,7 @@ impl Job { ) } - fn new(id: usize, node: Node, core_values: Vec, site: String, root_scripts_dir: &str, root_results_dir: &str) -> Self { + fn new(id: usize, node: Node, core_values: Vec, site: String, root_scripts_dir: &str, root_results_dir: &str, os_flavor: String) -> Self { let script_file = Job::build_script_file_path(&node, &site, root_scripts_dir); let results_dir = Job::build_results_dir_path(&node, &site, root_results_dir); @@ -147,6 +158,8 @@ impl Job { script_file, results_dir, site, + deployment_id: None, + os_flavor } } @@ -160,16 +173,33 @@ impl Job { ssh::sftp_upload(&session, &self.script_file, &self.script_file).await?; ssh::make_script_executable(&session, &self.script_file).await?; - let oar_job_id = ssh::run_oarsub(&session, &self.script_file).await; - - if let Ok(Some(job_id)) = oar_job_id { - self.oar_job_id = Some(job_id); - self.state = OARState::Waiting; + if self.os_flavor == super::DEFAULT_OS_FLAVOR { + let oar_job_id = ssh::run_oarsub(&session, &self.script_file).await; + if let Ok(Some(job_id)) = oar_job_id { + self.oar_job_id = Some(job_id); + self.state = OARState::Waiting; + } else { + self.state = OARState::Failed; + } } else { - self.state = OARState::Failed; + let client = reqwest::Client::builder().build()?; + let endpoint = format!("{}/sites/{}/jobs", super::BASE_URL, self.site); + let data = serde_json::json!({"properties": format!("host={}",self.node.uid), "resources": "walltime=5", "types": ["deploy"], "command": "sleep 14500"}); + + if let Ok(response) = inventories::post_api_call(&client, &endpoint, &data).await { + debug!("Job has been posted on deploy mode"); + self.state = OARState::WaitingToBeDeployed; + let job_id = response.get("uid").unwrap(); + self.oar_job_id = job_id.as_u64(); + } else { 
+ debug!("Job has failed to be posted on deploy mode"); + self.state = OARState::Failed; + } } + session.close().await?; + Ok(()) } @@ -178,19 +208,43 @@ impl Job { client: &reqwest::Client, base_url: &str, ) -> JobResult { - let response: HashMap = crate::inventories::get_api_call( - client, - &format!( - "{}/sites/{}/jobs/{}", - base_url, - &self.site, - &self.oar_job_id.unwrap() - ), - ) - .await - .unwrap(); - let state: String = serde_json::from_value(response.get("state").unwrap().clone())?; - let state = OARState::try_from(state.as_str())?; + + let mut state: OARState; + if self.state == OARState::Processing { + let endpoint = format!("{}/sites/{}/deployments/{}", base_url, self.site, self.deployment_id.clone().unwrap()); + if let Ok(response) = inventories::get_api_call(&client, &endpoint).await { + let str_state = response.get("status").unwrap().as_str().unwrap(); + if str_state == "terminated" { + state = OARState::Deployed; + } else if str_state == "processing" { + state = OARState::Processing; + } else { + state = OARState::Failed; + } + } else { + state = OARState::Failed; + } + + } else { + let response: HashMap = crate::inventories::get_api_call( + client, + &format!( + "{}/sites/{}/jobs/{}", + base_url, + &self.site, + &self.oar_job_id.unwrap() + ), + ) + .await + .unwrap(); + let str_state = response.get("state").unwrap().as_str(); + if str_state == Some("waiting") && self.state == OARState::WaitingToBeDeployed { + state = OARState::WaitingToBeDeployed; + } else { + state = OARState::try_from(str_state.unwrap()).unwrap(); + } + } + if state != self.state { self.state_transition(state).await?; } @@ -206,6 +260,8 @@ impl Job { match new_state { OARState::Terminated | OARState::Failed => self.job_terminated().await, + OARState::Running => self.job_running().await, + OARState::Deployed => self.job_os_deployed().await, _ => { error!("Unhandled state transition to {}", new_state); Ok(()) @@ -213,8 +269,61 @@ impl Job { } } + pub async fn job_running(&mut 
self) -> JobResult { + if self.os_flavor == super::DEFAULT_OS_FLAVOR { + return Ok(()) + } + // CURL KADEPLOY + let client = reqwest::Client::builder().build()?; + let endpoint = format!("{}/sites/{}/deployments", super::BASE_URL, self.site); + let pub_key_content = fs::read_to_string(".ssh_g5k.pub") + .map_err(|e| format!("Failed to read the SSH public key: {}", e)).unwrap(); + let pub_key_content = pub_key_content.trim(); + + let data = serde_json::json!({ + "nodes": [&format!("{}.{}.grid5000.fr",self.node.uid, self.site)], + "environment": self.os_flavor, + "key": pub_key_content + }); + + match inventories::post_api_call(&client, &endpoint, &data).await { + Ok(response) => { + debug!("Job os_flavor is being deployed"); + self.state = OARState::Processing; + let deployment_id = response.get("uid").unwrap(); + self.deployment_id = Some(deployment_id.as_str().unwrap().to_owned()); + } + Err(e) => { + debug!("Job os_flavor has failed to be deployed : {:?}", e); + self.state = OARState::Failed; + } + } + + Ok(()) + } + + pub async fn job_os_deployed(&mut self) -> JobResult { + + let session = ssh::ssh_connect(&self.site).await?; + let host = format!("{}.{}.grid5000.fr", self.node.uid, self.site); + if let Ok(script_result) = ssh::run_script(&session, &host, &self.script_file).await { + self.state = OARState::Running; + } else { + self.state = OARState::Failed; + } + Ok(()) + } + pub async fn job_terminated(&mut self) -> JobResult { + let script_dir = Path::new(&self.script_file) + .components() // Break the path into components + .filter_map(|comp| match comp { + std::path::Component::Normal(name) => name.to_str(), + _ => None, + }) // Keep only "normal" components (skip root, prefix, etc.) 
+ .next(); if let Err(rsync_result) = rsync_results( + script_dir.unwrap(), &self.site, self.node.cluster.as_deref().unwrap(), &self.node.uid, @@ -261,6 +370,7 @@ impl Jobs { scripts_dir: &str, results_dir: &str, events_by_vendor: &EventsByVendor, + os_flavor: String, ) -> Result<(), JobError> { let sites = inventories::get_inventory_sites(inventories_dir)?; let mut clusters_nodes: Vec> = Vec::new(); @@ -320,27 +430,28 @@ impl Jobs { let core_values = configs::generate_core_values(5, node.architecture.nb_cores); let mut job = - Job::new(self.jobs.len(), node.clone(), core_values, site.to_string(), scripts_dir, results_dir); + Job::new(self.jobs.len(), node.clone(), core_values, site.to_string(), scripts_dir, results_dir, os_flavor.clone()); fs::create_dir_all( std::path::Path::new(&job.script_file).parent().unwrap(), )?; fs::create_dir_all(results_dir)?; - let client = reqwest::Client::builder().build()?; scripts::generate_script_file(&job, events_by_vendor)?; + job.submit_job().await?; self.jobs.push(job); info!("Job submitted for {} node", node_uid); - info!("Wait 100 ms before another submission"); - tokio::time::sleep(Duration::from_millis(100)).await; + info!("Wait 300 ms before another submission"); + tokio::time::sleep(Duration::from_millis(300)).await; + let client = reqwest::Client::builder().build()?; self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file) .await?; // Throttling based on the maximum allowed concurrent jobs } else { info!("Job already listed on {} node, skipping", node_uid); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(300)).await; } } } @@ -361,6 +472,7 @@ impl Jobs { base_url: &str, file_to_dump_to: &str, ) -> Result<(), JobError> { + info!("Checking unfinished job"); for job in self.jobs.iter_mut().filter(|j| !j.finished()) { job.update_job_state(client, base_url).await?; if !job.finished() { @@ -369,7 +481,7 @@ impl Jobs { job.oar_job_id, job.state ); } - 
tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(300)).await; } self.dump_to_file(file_to_dump_to)?; @@ -403,7 +515,7 @@ impl Jobs { } } -pub fn rsync_results(site: &str, cluster: &str, node: &str) -> JobResult { +pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -> JobResult { let remote_directory = format!("{}:/home/nleblond/results.d", site); let mut p = Popen::create( &["rsync", "-avzP", &remote_directory, "."], @@ -425,7 +537,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str) -> JobResult { } else { p.terminate()?; } - let checksum_file = format!("results.d/{}/{}/{}.tar.xz.md5", site, cluster, node); + let checksum_file = format!("{}/{}/{}/{}.tar.xz.md5", results_dir, site, cluster, node); let mut p = Popen::create( &["md5sum", "-c", &checksum_file], PopenConfig { @@ -475,7 +587,7 @@ fn extract_tar_xz(dir_path: &str) -> Result <(), String> { .arg("-C") .arg(dir.parent().unwrap_or_else(|| Path::new("."))) .output() - .map_err(|e| format!("Failed to execute tar command stripping 5: {}", e))?; + .map_err(|e| format!("Failed to execute tar command stripping 5: {}", e)).unwrap(); if !output_5.status.success() { let output_3 = Command::new("tar") From 8fa12cb2257836b3d428aa70ea256cfaed978c38 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Tue, 17 Dec 2024 15:10:55 +0100 Subject: [PATCH 10/15] chores(lint) : Fix lint errors --- src/jobs.rs | 2 +- src/results.rs | 5 +---- src/ssh.rs | 3 --- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 20d0285..59868dd 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -209,7 +209,7 @@ impl Job { base_url: &str, ) -> JobResult { - let mut state: OARState; + let state: OARState; if self.state == OARState::Processing { let endpoint = format!("{}/sites/{}/deployments/{}", base_url, self.site, self.deployment_id.clone().unwrap()); if let Ok(response) = inventories::get_api_call(&client, &endpoint).await { 
diff --git a/src/results.rs b/src/results.rs index 5057e81..316a774 100644 --- a/src/results.rs +++ b/src/results.rs @@ -1,13 +1,10 @@ use thiserror::Error; -use log::{debug, info, warn}; -use std::cmp::Ordering; +use log::{debug, warn}; use serde::{Serialize, Deserialize}; -use csv::{Reader}; use std::path::{Path, PathBuf}; use std::fs::File; use std::io::{self, BufRead, BufReader, Write}; use std::fs; -use std::collections::HashMap; #[derive(Error, Debug)] pub enum ResultError { diff --git a/src/ssh.rs b/src/ssh.rs index 23483c5..b87fefc 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -1,12 +1,9 @@ -use bytes::BytesMut; use log::{debug, error, info}; use openssh::{KnownHosts, Session, Stdio}; use openssh_sftp_client::Sftp; use regex::Regex; -use std::path::Path; use std::str::{self}; use thiserror::Error; -use tokio::fs::File; use tokio::io::AsyncWriteExt; #[derive(Error, Debug)] From 766428fa99b9070ac7b8cf98693ef4a1136ebc70 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Thu, 9 Jan 2025 15:14:36 +0100 Subject: [PATCH 11/15] feat(jobs) : Submit job if it can be treated before the next day/night slot, fix Rsync mecanism --- .gitignore | 1 + src/jobs.rs | 96 +++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index 16a2086..a72cecf 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ /test_results/**/* /jobs.yaml /menage.sh +/resources # Added by cargo diff --git a/src/jobs.rs b/src/jobs.rs index 59868dd..b62b25c 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -10,14 +10,16 @@ use serde_yaml::{self}; use std::collections::HashMap; use std::fmt::{self, Display}; use std::str::{self}; -use std::time::Duration; use std::{env, fs}; use std::path::{Path, PathBuf}; use subprocess::{Popen, PopenConfig, Redirection}; use thiserror::Error; use std::process::Command; +use chrono::{Local, Timelike, Duration}; const MAX_CONCURRENT_JOBS: usize = 30; +const G5K_DAY_BOTTOM_BOUNDARY: i64 = 
9; +const G5K_DAY_UP_BOUNDARY: i64 = 19; #[derive(Error, Debug)] pub enum JobError { @@ -168,6 +170,7 @@ impl Job { } pub async fn submit_job(&mut self) -> JobResult { + info!("Submitting job on {}", &self.node.uid); let session = ssh::ssh_connect(&self.site).await?; ssh::create_remote_directory(&session, &self.script_file).await?; ssh::sftp_upload(&session, &self.script_file, &self.script_file).await?; @@ -184,7 +187,7 @@ impl Job { } else { let client = reqwest::Client::builder().build()?; let endpoint = format!("{}/sites/{}/jobs", super::BASE_URL, self.site); - let data = serde_json::json!({"properties": format!("host={}",self.node.uid), "resources": "walltime=5", "types": ["deploy"], "command": "sleep 14500"}); + let data = serde_json::json!({"properties": format!("host={}",self.node.uid), "resources": format!("walltime={}", scripts::WALLTIME), "types": ["deploy"], "command": "sleep 14500"}); if let Ok(response) = inventories::post_api_call(&client, &endpoint, &data).await { debug!("Job has been posted on deploy mode"); @@ -192,7 +195,7 @@ impl Job { let job_id = response.get("uid").unwrap(); self.oar_job_id = job_id.as_u64(); } else { - debug!("Job has failed to be posted on deploy mode"); + error!("Job has failed to be posted on deploy mode"); self.state = OARState::Failed; } } @@ -240,7 +243,9 @@ impl Job { let str_state = response.get("state").unwrap().as_str(); if str_state == Some("waiting") && self.state == OARState::WaitingToBeDeployed { state = OARState::WaitingToBeDeployed; - } else { + } else if str_state == Some("launching") || str_state == Some("to_launch") { + state = self.state.clone(); + } else { state = OARState::try_from(str_state.unwrap()).unwrap(); } } @@ -271,8 +276,10 @@ impl Job { pub async fn job_running(&mut self) -> JobResult { if self.os_flavor == super::DEFAULT_OS_FLAVOR { + info!("Starting script on {}", &self.node.uid); return Ok(()) } + info!("Deploying new environement on {}", &self.node.uid); // CURL KADEPLOY let client = 
reqwest::Client::builder().build()?; let endpoint = format!("{}/sites/{}/deployments", super::BASE_URL, self.site); @@ -294,7 +301,7 @@ impl Job { self.deployment_id = Some(deployment_id.as_str().unwrap().to_owned()); } Err(e) => { - debug!("Job os_flavor has failed to be deployed : {:?}", e); + error!("Job os_flavor has failed to be deployed : {:?}", e); self.state = OARState::Failed; } } @@ -303,6 +310,7 @@ impl Job { } pub async fn job_os_deployed(&mut self) -> JobResult { + info!("Running script on {}", &self.node.uid); let session = ssh::ssh_connect(&self.site).await?; let host = format!("{}.{}.grid5000.fr", self.node.uid, self.site); @@ -315,18 +323,18 @@ impl Job { } pub async fn job_terminated(&mut self) -> JobResult { - let script_dir = Path::new(&self.script_file) - .components() // Break the path into components + info!("Downloading and processing results from {}", &self.node.uid); + let root_results_dir = Path::new(&self.results_dir) + .components() .filter_map(|comp| match comp { std::path::Component::Normal(name) => name.to_str(), _ => None, - }) // Keep only "normal" components (skip root, prefix, etc.) 
+ }) .next(); if let Err(rsync_result) = rsync_results( - script_dir.unwrap(), &self.site, - self.node.cluster.as_deref().unwrap(), - &self.node.uid, + &self.results_dir, + root_results_dir.unwrap(), ) { self.state = OARState::UnknownState; } else { @@ -426,6 +434,20 @@ impl Jobs { self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file) .await?; } + while !within_time_window(scripts::WALLTIME) { + info!( + "Too close of day|night boundaries for {} WALLTIME", + scripts::WALLTIME + ); + tokio::time::sleep(std::time::Duration::from_secs( + super::SLEEP_CHECK_TIME_IN_SECONDES, + )) + .await; + + let client = reqwest::Client::builder().build()?; + self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file) + .await?; + } // Job creation and submission let core_values = configs::generate_core_values(5, node.architecture.nb_cores); @@ -441,8 +463,8 @@ impl Jobs { job.submit_job().await?; self.jobs.push(job); info!("Job submitted for {} node", node_uid); - info!("Wait 300 ms before another submission"); - tokio::time::sleep(Duration::from_millis(300)).await; + debug!("Wait 300 ms before another submission"); + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; let client = reqwest::Client::builder().build()?; self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file) @@ -451,7 +473,7 @@ impl Jobs { // Throttling based on the maximum allowed concurrent jobs } else { info!("Job already listed on {} node, skipping", node_uid); - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } } } @@ -476,12 +498,12 @@ impl Jobs { for job in self.jobs.iter_mut().filter(|j| !j.finished()) { job.update_job_state(client, base_url).await?; if !job.finished() { - debug!( + info!( "Job {:?} is still in '{}' state.", job.oar_job_id, job.state ); } - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; } 
self.dump_to_file(file_to_dump_to)?; @@ -515,8 +537,8 @@ impl Jobs { } } -pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -> JobResult { - let remote_directory = format!("{}:/home/nleblond/results.d", site); +pub fn rsync_results(site: &str, results_dir: &str, root_results_dir: &str) -> JobResult { + let remote_directory = format!("{}:/home/nleblond/{}", site, root_results_dir); let mut p = Popen::create( &["rsync", "-avzP", &remote_directory, "."], PopenConfig { @@ -527,7 +549,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) - let (out, err) = p.communicate(None)?; - if let Ok(Some(exit_status)) = p.wait_timeout(Duration::from_secs(120)) { + if let Ok(Some(exit_status)) = p.wait_timeout(std::time::Duration::from_secs(120)) { if exit_status.success() { debug!("Rsync with site {} done.\n{:?}", site, out); } else { @@ -537,7 +559,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) - } else { p.terminate()?; } - let checksum_file = format!("{}/{}/{}/{}.tar.xz.md5", results_dir, site, cluster, node); + let checksum_file = format!("{}.tar.xz.md5", results_dir); let mut p = Popen::create( &["md5sum", "-c", &checksum_file], PopenConfig { @@ -548,7 +570,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) - let (out, err) = p.communicate(None)?; - if let Ok(Some(exit_status)) = p.wait_timeout(Duration::from_secs(120)) { + if let Ok(Some(exit_status)) = p.wait_timeout(std::time::Duration::from_secs(120)) { if exit_status.success() { debug!("Checksum success.\n{:?}", out); } else { @@ -610,3 +632,35 @@ fn extract_tar_xz(dir_path: &str) -> Result <(), String> { Ok(()) } + +fn parse_walltime(walltime: &str) -> Option { + let parts: Vec<&str> = walltime.split(':').collect(); + match parts.len() { + 1 => parts[0].parse::().ok().map(|h| Duration::hours(h)), + 2 => { + let hours = parts[0].parse::().ok()?; + let minutes = parts[1].parse::().ok()?; + 
Some(Duration::hours(hours) + Duration::minutes(minutes)) + } + 3 => { + let hours = parts[0].parse::().ok()?; + let minutes = parts[1].parse::().ok()?; + let seconds = parts[2].parse::().ok()?; + Some(Duration::hours(hours) + Duration::minutes(minutes) + Duration::seconds(seconds)) + } + _ => None, + } +} + +fn within_time_window(walltime: &str) -> bool { + let now = Local::now(); + let current_hour = now.hour() as i64; + let walltime_duration = parse_walltime(walltime).unwrap_or_else(|| Duration::hours(0)); + let adjusted_hour = current_hour + walltime_duration.num_hours(); + + if (G5K_DAY_BOTTOM_BOUNDARY..G5K_DAY_UP_BOUNDARY).contains(&current_hour) { + adjusted_hour < G5K_DAY_UP_BOUNDARY + } else { + adjusted_hour < G5K_DAY_BOTTOM_BOUNDARY || adjusted_hour >= 24 + } +} From 8ef85f89a9a110ab3e54abe60d74bf469323edda Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Thu, 9 Jan 2025 15:16:21 +0100 Subject: [PATCH 12/15] feat(main) : Process each results once done, even if it was not zipped (fails before) --- src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9b293b2..932ec69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -220,10 +220,6 @@ fn init_directories(logs_directory: &str, inventories_directory: &str, scripts_d eprintln!("Failed to create directory {}: {}", dir, e); e })?; - debug!( - "Successfully created or confirmed existing directory: {}", - dir - ); } Ok(()) } @@ -335,6 +331,10 @@ async fn main() -> Result<(), BenchmarkError> { info!("Skipping jobs generation and submission as requested"); } + for job in jobs.jobs { + results::process_results(&job.results_dir)?; + } + Ok(()) } From 138c270cb0cef624cb63c2ce82a2d2bcc57f4ad9 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Thu, 9 Jan 2025 15:17:33 +0100 Subject: [PATCH 13/15] feat(results) : Process each results once done, even if it was not zipped (fails before) --- src/results.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14
deletions(-) diff --git a/src/results.rs b/src/results.rs index 316a774..8ddf25a 100644 --- a/src/results.rs +++ b/src/results.rs @@ -80,14 +80,12 @@ struct PerfRow { /// Creates an aggregation of perf___ into corresponding perf_alone__.csv file fn aggregate_perf(raw_perf_results_file: PathBuf) -> io::Result<()> { - debug!("Processing perf file '{}'", raw_perf_results_file.display()); let output_path = &format!("{}.csv", raw_perf_results_file.display()); fs::File::create(output_path)?; let mut output_writer = csv::Writer::from_path(output_path)?; if let Some((nb_core, nb_ops_per_core)) = parse_perf_metadata(raw_perf_results_file.file_name().unwrap().to_str().unwrap()) { - debug!("{} metadata : nb_core {} ; nb_ops_per_core {}", raw_perf_results_file.display(), nb_core, nb_ops_per_core); let raw_perf_results_file = File::open(raw_perf_results_file)?; let reader = BufReader::new(raw_perf_results_file); let mut iteration = 1; @@ -159,16 +157,12 @@ fn parse_perf_metadata(file_name: &str) -> Option<(String, String)> { fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> { if let Some(dir_name) = Path::new(dir_name).file_name().and_then(|os_str| os_str.to_str()) { - debug!("Filename to parse is : {}", dir_name); let parts: Vec<&str> = dir_name.split('_').collect(); if parts.len() == 5 { - debug!("Is hwpc alone"); if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[2].parse::(), parts[3].parse::(), parts[4].parse::()) { - debug!("{:?}", Some((nb_core, nb_ops_per_core, iteration))); return Some((nb_core, nb_ops_per_core, iteration)); } } else if parts.len() == 6 { - debug!("Is hwpc with perf"); if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[3].parse::(), parts[4].parse::(), parts[5].parse::()) { return Some((nb_core, nb_ops_per_core, iteration)); } @@ -180,9 +174,16 @@ fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> { } fn aggregate_hwpc_file(raw_rapl_file: &Path, output_path: &str, nb_core: i32, 
nb_ops_per_core: i32, iteration: usize) -> io::Result<()> { - debug!("Writing to hwpc aggregation file {:?}", output_path); - let mut output_writer = csv::Writer::from_path(output_path)?; - debug!("Processing hwpc raw file {:?}", raw_rapl_file); + let file_exists = std::fs::metadata(output_path).is_ok(); + let file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .append(true) + .open(output_path)?; + + let mut output_writer = csv::WriterBuilder::new().has_headers(!file_exists).from_writer(file); + + if let Ok(mut reader) = csv::Reader::from_path(raw_rapl_file) { let iter = reader.deserialize::(); @@ -207,7 +208,6 @@ fn aggregate_hwpc_file(raw_rapl_file: &Path, output_path: &str, nb_core: i32, nb fn aggregate_hwpc_subdir(subdir: &fs::DirEntry, output_path: &str) -> io::Result<()> { let raw_rapl_file = subdir.path().join("rapl.csv"); - debug!("Processing hwpc aggregation file {:?}", raw_rapl_file); if let Some((nb_core, nb_ops_per_core, iteration)) = parse_hwpc_metadata(subdir.file_name().to_str().unwrap()) { aggregate_hwpc_file(&raw_rapl_file, output_path, nb_core, nb_ops_per_core, iteration)?; } else { @@ -223,7 +223,6 @@ fn aggregate_hwpc( let (output_parent, output_basename) = (raw_results_dir_path.parent().unwrap(), raw_results_dir_path.file_name().unwrap()); let output_path = &format!("{}/{}.csv", output_parent.to_str().unwrap(), output_basename.to_str().unwrap()); - debug!("Output path : {}", output_path); let mut raw_results_subdirs = Vec::new(); @@ -234,7 +233,6 @@ fn aggregate_hwpc( warn!("Could not find subdirectories in {} directory", output_parent.to_str().unwrap()); } - debug!("Found {:?} subdirs", raw_results_subdirs); assert!(raw_results_subdirs.iter().map(|subdir| aggregate_hwpc_subdir(subdir.as_ref().unwrap(), output_path)).all(|result| result.is_ok())); Ok(()) @@ -284,11 +282,9 @@ fn filter_perf_files(directory: &str) -> Vec { } pub fn process_results(results_directory: &str) -> io::Result<()> { let perf_raw_files = 
filter_perf_files(results_directory); - debug!("Found perf files {:?} in {} directory", perf_raw_files, results_directory); assert!(perf_raw_files.iter().map(|perf_raw_file| aggregate_perf(perf_raw_file.to_path_buf())).all(|result| result.is_ok())); let hwpc_raw_dirs = filter_hwpc_dirs(results_directory); - debug!("Found hwpc subdirs {:?} in {} directory", hwpc_raw_dirs, results_directory); assert!(hwpc_raw_dirs.iter().map(|hwpc_raw_results_dir| aggregate_hwpc(hwpc_raw_results_dir.to_path_buf())).all(|result| result.is_ok())); From 917a5c3bb64809560068fd950dcbe1cff427b536 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Thu, 9 Jan 2025 15:19:27 +0100 Subject: [PATCH 14/15] fix(scripts) : Fix tar command, log level, walltime default --- src/scripts.rs | 2 +- src/ssh.rs | 2 +- templates/zip_results.sh | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/scripts.rs b/src/scripts.rs index 10b4837..5167f70 100644 --- a/src/scripts.rs +++ b/src/scripts.rs @@ -12,7 +12,7 @@ use std::fs::File; use std::io::Write; use thiserror::Error; -const WALLTIME: &str = "4:00:00"; +pub const WALLTIME: &str = "5:00:00"; const QUEUE_TYPE: &str = "default"; const CPU_OPS_PER_CORE_LIST: &[u32] = &[25, 250, 2_500, 25_000]; const NB_ITERATIONS: usize = 10; diff --git a/src/ssh.rs b/src/ssh.rs index b87fefc..6a5ed53 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -67,7 +67,7 @@ pub async fn run_script(session: &Session, host:&str, script_file: &str) -> SshR match ssh_command { Ok(ssh_output) => { if ssh_output.status.success() { - info!("Script successsfully started"); + debug!("Script successsfully started"); } else { error!("Job submission failed: {:?}", ssh_output.stderr); } diff --git a/templates/zip_results.sh b/templates/zip_results.sh index 69ee7a3..bcc4b64 100644 --- a/templates/zip_results.sh +++ b/templates/zip_results.sh @@ -1,5 +1,5 @@ cd {{ results_directory }}/.. 
-tar -cJf ~/{{ results_directory }}.tar.xz ~/{{ results_directory }} -cd ~ +tar -cJf /home/nleblond/{{ results_directory }}.tar.xz /home/nleblond/{{ results_directory }} +cd /home/nleblond rm -rf {{ results_directory }} md5sum {{ results_directory }}.tar.xz > {{ results_directory }}.tar.xz.md5 From 33be2afa8365d195107bbf8748a4c1e6d1df676f Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Thu, 9 Jan 2025 15:24:04 +0100 Subject: [PATCH 15/15] chores(git): Ignore local fs --- .gitignore | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index a72cecf..37ca6ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,17 +1,20 @@ -/logs.d/**/* +/logs*.d/**/* .env /Cargo.lock /tasks.d/**/* /jobs.d/**/* -/results.d/**/* -/scripts.d/**/* -/inventories.d/**/* +/results*.d/**/* +/scripts*.d/**/* +/inventories*.d/**/* /backup/**/* /test_results/**/* -/jobs.yaml +/jobs*.yaml /menage.sh /resources - +/*.tar.* +/*ipynb* +/batches/** +/.ssh_g5k.pub # Added by cargo