From 89be7566bc4ec3cea0a850ce9ddb4857ba0ef1b8 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 25 Nov 2024 17:13:26 +0100 Subject: [PATCH 01/11] feat(main) : Add options for directories --- src/main.rs | 132 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 96 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index 07325fc..cc8de0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,41 @@ const CONFIG_FILE: &str = "config/events_by_vendor.json"; struct BenchmarkArgs { /// Skip the scrapping against Grid5000 API refreshing node configurations #[arg(short, long)] - skip_inventory: bool, + inventory_skip: bool, + + /// Skip the jobs generation/submission step + #[arg(short, long)] + jobs_skip: bool, + + /// Skip the results processing step + #[arg(short, long)] + results_process_skip: bool, + + /// Directory to store logs + #[arg(long, default_value = LOGS_DIRECTORY)] + logs_directory: String, + + /// Directory to store nodes metadata + #[arg(long, default_value = INVENTORIES_DIRECTORY)] + inventories_directory: String, + + /// File to store OAR jobs info + #[arg(long, default_value = JOBS_FILE)] + jobs_file: String, + + /// Directory to store generated scripts + #[arg(long, default_value = SCRIPTS_DIRECTORY)] + scripts_directory: String, + + /// Directory to store results / retrieve results to process + #[arg(long, default_value = RESULTS_DIRECTORY)] + results_directory: String, + + /// File to find events / process for hwpc and perf + #[arg(long, default_value = CONFIG_FILE)] + config_file: String, + + } @@ -65,6 +99,14 @@ pub struct HwpcEvents { core: Vec, } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +enum PerfEvent { + PowerEnergyPkg, + PowerEnergyDram, + PowerEnergyPsys, + PowerEnergyCores, +} + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct PerfEvents(Vec); impl Display for PerfEvents { @@ -161,12 +203,12 @@ impl EventsByVendor { } // Creates all directories if not already existing -fn init_directories() -> BenchmarkResult { +fn init_directories(logs_directory: &str, inventories_directory: &str, scripts_directory: &str, results_directory: &str) -> BenchmarkResult { let directories = [ - LOGS_DIRECTORY, - INVENTORIES_DIRECTORY, - SCRIPTS_DIRECTORY, - RESULTS_DIRECTORY, + logs_directory, + inventories_directory, + scripts_directory, + results_directory, ]; for dir in directories { @@ -200,16 +242,16 @@ fn build_logger(log_level: &str) -> Result<(), log::SetLoggerError> { .try_init() } -fn load_events_config() -> Result { - let content = fs::read_to_string(CONFIG_FILE)?; +fn load_events_config(config_file: &str) -> Result { + let content = fs::read_to_string(config_file)?; let events: EventsByVendor = serde_json::from_str(&content)?; Ok(events) } -fn load_or_init_jobs() -> Result { - if std::path::Path::new(JOBS_FILE).exists() { +fn load_or_init_jobs(jobs_file: &str) -> Result { + if std::path::Path::new(jobs_file).exists() { info!("Found jobs.yaml file, processing with existing jobs"); - let content = fs::read_to_string(JOBS_FILE)?; + let content = fs::read_to_string(jobs_file)?; Ok(serde_yaml::from_str(&content)?) 
} else { info!("No jobs.yaml file found, starting with an empty job list"); @@ -229,38 +271,56 @@ async fn main() -> Result<(), BenchmarkError> { - init_directories()?; + init_directories( + &benchmark_args.logs_directory, + &benchmark_args.inventories_directory, + &benchmark_args.scripts_directory, + &benchmark_args.results_directory, + )?; - let events_by_vendor = load_events_config()?; - let mut jobs: Jobs = load_or_init_jobs()?; + let events_by_vendor = load_events_config(&benchmark_args.config_file)?; + let mut jobs: Jobs = load_or_init_jobs(&benchmark_args.jobs_file)?; - if ! benchmark_args.skip_inventory { + if ! benchmark_args.inventory_skip { + info!("Processing inventory step"); inventories::generate_inventory(INVENTORIES_DIRECTORY).await?; + } else { + info!("Skipping inventory scrapping as requested"); } - // If we loaded existing jobs, check their status - if jobs.jobs.len() != 0 { + + if ! benchmark_args.jobs_skip { + info!("Processing jobs step"); + // If we loaded existing jobs, check their status + if jobs.jobs.len() != 0 { + let client = reqwest::Client::builder().build()?; + jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) + .await?; + } + + jobs.generate_jobs( + JOBS_FILE, + INVENTORIES_DIRECTORY, + SCRIPTS_DIRECTORY, + RESULTS_DIRECTORY, + &events_by_vendor, + ) + .await?; + let client = reqwest::Client::builder().build()?; - jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) - .await?; - } - jobs.generate_jobs( - JOBS_FILE, - INVENTORIES_DIRECTORY, - SCRIPTS_DIRECTORY, - RESULTS_DIRECTORY, - &events_by_vendor, - ) - .await?; - - let client = reqwest::Client::builder().build()?; - - while !jobs.job_is_done() { - debug!("Job not done!"); - jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) - .await?; - tokio::time::sleep(Duration::from_secs(SLEEP_CHECK_TIME_IN_SECONDES)).await; + + while !jobs.job_is_done() { + debug!("Job not done!"); + jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) + .await?; + tokio::time::sleep(Duration::from_secs(SLEEP_CHECK_TIME_IN_SECONDES)).await; + } + } else { + info!("Skipping jobs generation and submission as requested"); } + + results::process_results()?; + Ok(()) } From 2bb9a8254b48d4baaafe552752def8b96b568c38 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 25 Nov 2024 17:14:21 +0100 Subject: [PATCH 02/11] feat(jobs) : Reduce round-robin timestep 300ms -> 100ms --- src/jobs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jobs.rs b/src/jobs.rs index ed54673..403dacf 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -316,7 +316,7 @@ impl Jobs { // Throttling based on the maximum allowed concurrent jobs } else { info!("Job already listed on {} node, skipping", node_uid); - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } } } From 56638ff158feba401d1d0634312f44910330b657 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 25 Nov 2024 17:18:01 +0100 Subject: [PATCH 03/11] feat(results) : Add perf aggregation from file and hwpc from main dir --- src/results.rs | 246 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 241 insertions(+), 5 deletions(-) diff --git a/src/results.rs b/src/results.rs index 01c6cde..43854ef 100644 --- a/src/results.rs +++ b/src/results.rs @@ -1,10 +1,246 @@ use thiserror::Error; +use log::{debug, info, warn}; +use serde::{Serialize, Deserialize}; +use csv::{Reader}; +use std::path::Path; +use std::fs::File; +use std::io::{self, BufRead, BufReader, Write}; +use std::fs; +use 
std::collections::HashMap; +use plotters::prelude::*; #[derive(Error, Debug)] -pub enum ResultError {} +pub enum ResultError { + #[error("Could not parse CSV File : {0}")] + Csv(#[from] csv::Error), +} -#[derive()] -pub struct HwpcReport {} +#[derive(Debug, Deserialize, Serialize, PartialEq)] +struct HwpcRowRaw { + timestamp: i64, + sensor: String, + target: String, + socket: i32, + cpu: i32, + RAPL_ENERGY_PKG: Option, + RAPL_ENERGY_DRAM: Option, + RAPL_ENERGY_CORES: Option, + time_enabled: i64, + time_running: i64, +} + + + + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +struct HwpcRow { + timestamp: i64, + sensor: String, + target: String, + socket: i32, + cpu: i32, + rapl_energy_pkg: Option, + rapl_energy_dram: Option, + rapl_energy_cores: Option, + time_enabled: i64, + time_running: i64, + nb_core: i32, + nb_ops_per_core: i32, + iteration: usize +} + +impl HwpcRow { + fn from_raw_record(raw_record: HwpcRowRaw, nb_core: i32, nb_ops_per_core: i32, iteration: usize) -> Self { + HwpcRow { + timestamp: raw_record.timestamp, + sensor: raw_record.sensor, + target: raw_record.target, + socket: raw_record.socket, + cpu: raw_record.cpu, + rapl_energy_pkg: raw_record.RAPL_ENERGY_PKG, + rapl_energy_dram: raw_record.RAPL_ENERGY_DRAM, + rapl_energy_cores: raw_record.RAPL_ENERGY_CORES, + time_enabled: raw_record.time_enabled, + time_running: raw_record.time_running, + nb_core, + nb_ops_per_core, + iteration + } + } +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +struct PerfRow { + power_energy_pkg: Option, + power_energy_ram: Option, + power_energy_cores: Option, + time_elapsed: f64, + nb_core: i32, + nb_ops_per_core: i32, + iteration: usize, +} + +#[derive(Debug)] +struct Statistics { + nb_core: i32, + nb_ops_per_core: i32, + task: String, + mean: f64, + variance: f64, + std_dev: f64, + coeff_variation: f64, +} + +/// Creates an aggregation of perf___ into corresponding perf_alone__.csv file +fn aggregate_perf(raw_perf_results_file: &str) -> io::Result<()> { + debug!("Processing perf file '{}'", raw_perf_results_file); + + let output_path = &format!("{}.csv", raw_perf_results_file); + fs::File::create(output_path)?; + let mut output_writer = csv::Writer::from_path(output_path)?; + + if let Some((nb_core, nb_ops_per_core)) = parse_perf_metadata(raw_perf_results_file) { + let raw_perf_results_file = File::open(raw_perf_results_file)?; + let reader = BufReader::new(raw_perf_results_file); + let mut iteration = 1; + let mut cores_joules = None; + let mut pkg_joules = None; + let mut ram_joules = None; + let mut time_elapsed = None; + + for line in reader.lines() { + let line = line?; + if line.contains("power/energy-cores/") { + if let Some(value) = line.trim().split_whitespace().next() { + cores_joules = Some(value.parse::().unwrap_or_default()); + } + } else if line.contains("power/energy-pkg/") { + if let Some(value) = line.trim().split_whitespace().next() { + pkg_joules = Some(value.parse::().unwrap_or_default()); + } + } else if line.contains("power/energy-ram/") { + if let Some(value) = line.trim().split_whitespace().next() { + ram_joules = Some(value.parse::().unwrap_or_default()); + } + } else if line.contains("seconds time elapsed") { + if let Some(value) = line.trim().split_whitespace().next() { + time_elapsed = Some(value.parse::().unwrap_or_default()); + } + let perf_row = PerfRow { + power_energy_pkg: pkg_joules, + power_energy_ram: ram_joules, + power_energy_cores: cores_joules, + time_elapsed: time_elapsed.unwrap(), + nb_core: nb_core.parse::().unwrap(), + 
nb_ops_per_core: nb_ops_per_core.parse::().unwrap(), + iteration + }; + output_writer.serialize(perf_row)?; + iteration += 1; + cores_joules = None; + pkg_joules = None; + ram_joules = None; + time_elapsed = None; // Reset for the next iteration + } + } + } else { + warn!("Could not parse metadata from file name: {:?}", raw_perf_results_file); + } + + Ok(()) +} + +fn parse_perf_metadata(file_name: &str) -> Option<(String, String)> { + if let Some(file_name) = Path::new(file_name).file_name().and_then(|os_str| os_str.to_str()) { + let parts: Vec<&str> = file_name.split('_').collect(); + if parts.len() == 4 { + if let (Ok(nb_core), Ok(nb_ops_per_core)) = (parts[2].parse::(), parts[3].parse::()) { + return Some((nb_core.to_string(), nb_ops_per_core.to_string())); + } + } else if parts.len() == 5 { + if let (Ok(nb_core), Ok(nb_ops_per_core)) = (parts[3].parse::(), parts[4].parse::()) { + return Some((nb_core.to_string(), nb_ops_per_core.to_string())); + } + } + } else { + warn!("Could not parse filename {} to get metadata", file_name); + } + None +} + +fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> { + if let Some(dir_name) = Path::new(dir_name).file_name().and_then(|os_str| os_str.to_str()) { + let parts: Vec<&str> = dir_name.split('_').collect(); + if parts.len() == 5 { + if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[2].parse::(), parts[3].parse::(), parts[4].parse::()) { + return Some((nb_core, nb_ops_per_core, iteration)); + } + } else if parts.len() == 6 { + if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[3].parse::(), parts[4].parse::(), parts[5].parse::()) { + return Some((nb_core, nb_ops_per_core, iteration)); + } + } + } else { + warn!("Could not parse filename {} to get metadata", dir_name); + } + None +} + +fn aggregate_hwpc_file(raw_rapl_file: &Path, output_path: &str, nb_core: i32, nb_ops_per_core: i32, iteration: usize) -> io::Result<()> { + let mut output_writer = csv::Writer::from_path(output_path)?; + let mut reader = csv::Reader::from_path(raw_rapl_file)?; + let iter = reader.deserialize::(); + + for hwpc_row_raw in iter { + let hwpc_raw = HwpcRow::from_raw_record(hwpc_row_raw?, nb_core, nb_ops_per_core, iteration); + output_writer.serialize(hwpc_raw)?; + } + Ok(()) +} + +fn aggregate_hwpc_subdir(subdir: &fs::DirEntry, output_path: &str) -> io::Result<()> { + + let raw_rapl_file = subdir.path().join("rapl.csv"); + info!("Processing hwpc file {:?}", raw_rapl_file); + if let Some((nb_core, nb_ops_per_core, iteration)) = parse_hwpc_metadata(subdir.path().to_str().unwrap()) { + aggregate_hwpc_file(&raw_rapl_file, output_path, nb_core, nb_ops_per_core, iteration)?; + } else { + warn!("Could not parse metadata from directory name: {:?}", subdir); + } + Ok(()) +} + +/// Creates an aggregation of hwpc___ into corresponding hwpc___.csv file +fn aggregate_hwpc( + raw_results_dir: &str, +) -> io::Result<()> { + + let raw_results_dir_path = Path::new(raw_results_dir); + let (output_parent, output_basename) = (raw_results_dir_path.parent().unwrap(), raw_results_dir_path.file_name().unwrap()); + let output_path = &format!("{}/{}.csv", output_parent.to_str().unwrap(), output_basename.to_str().unwrap()); + + + let mut raw_results_subdirs = Vec::new(); + + if let Ok(entries) = fs::read_dir(raw_results_dir) { + raw_results_subdirs = entries.filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_dir()).collect(); + } else { + warn!("Could not find subdirectories in {} directory", raw_results_dir); + } + + debug!("Found {:?} 
subdirs", raw_results_subdirs); + assert!(raw_results_subdirs.iter().map(|subdir| aggregate_hwpc_subdir(subdir.as_ref().unwrap(), output_path)).all(|result| result.is_ok())); + + Ok(()) +} + +pub fn process_results() -> io::Result<()> { + let perf_raw_results_file = "test_results/lille/chiclet/chiclet-3/perf_alone_18_25"; + aggregate_perf(perf_raw_results_file)?; + + let hwpc_raw_results_dir = "test_results/lille/chiclet/chiclet-3/hwpc_alone_18_25"; + aggregate_hwpc(hwpc_raw_results_dir)?; + + Ok(()) +} -#[derive()] -pub struct PerfReport {} From 6d5a5e7018b34126a9d36d87c457e9ab318272fd Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Mon, 25 Nov 2024 17:34:22 +0100 Subject: [PATCH 04/11] fix(directories) : Fix direct reference to super:: --- src/jobs.rs | 16 ++++++++-------- src/main.rs | 14 +++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 403dacf..1d0305a 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -111,29 +111,29 @@ pub struct Job { } impl Job { - fn build_script_file_path(node: &Node, site: &str) -> String { + fn build_script_file_path(node: &Node, site: &str, root_scripts_dir: &str) -> String { format!( "{}/{}/{}/{}.sh", - super::SCRIPTS_DIRECTORY, + root_scripts_dir, site, node.cluster.as_ref().unwrap(), node.uid ) } - fn build_results_dir_path(node: &Node, site: &str) -> String { + fn build_results_dir_path(node: &Node, site: &str, root_results_dir: &str) -> String { format!( "{}/{}/{}/{}", - super::RESULTS_DIRECTORY, + root_results_dir, site, node.cluster.as_ref().unwrap(), node.uid ) } - fn new(id: usize, node: Node, core_values: Vec, site: String) -> Self { - let script_file = Job::build_script_file_path(&node, &site); - let results_dir = Job::build_results_dir_path(&node, &site); + fn new(id: usize, node: Node, core_values: Vec, site: String, root_scripts_dir: &str, root_results_dir: &str) -> Self { + let script_file = Job::build_script_file_path(&node, &site, root_scripts_dir); + let results_dir = Job::build_results_dir_path(&node, &site, root_results_dir); Job { id, @@ -296,7 +296,7 @@ impl Jobs { let core_values = configs::generate_core_values(5, node.architecture.nb_cores); let mut job = - Job::new(self.jobs.len(), node.clone(), core_values, site.to_string()); + Job::new(self.jobs.len(), node.clone(), core_values, site.to_string(), scripts_dir, results_dir); fs::create_dir_all( std::path::Path::new(&job.script_file).parent().unwrap(), )?; diff --git a/src/main.rs b/src/main.rs index cc8de0e..f662b52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -283,7 +283,7 @@ async fn main() -> Result<(), BenchmarkError> { if ! 
benchmark_args.inventory_skip { info!("Processing inventory step"); - inventories::generate_inventory(INVENTORIES_DIRECTORY).await?; + inventories::generate_inventory(&benchmark_args.inventories_directory).await?; } else { info!("Skipping inventory scrapping as requested"); } @@ -293,15 +293,15 @@ async fn main() -> Result<(), BenchmarkError> { // If we loaded existing jobs, check their status if jobs.jobs.len() != 0 { let client = reqwest::Client::builder().build()?; - jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) + jobs.check_unfinished_jobs(&client, BASE_URL, &benchmark_args.jobs_file) .await?; } jobs.generate_jobs( - JOBS_FILE, - INVENTORIES_DIRECTORY, - SCRIPTS_DIRECTORY, - RESULTS_DIRECTORY, + &benchmark_args.jobs_file, + &benchmark_args.inventories_directory, + &benchmark_args.scripts_directory, + &benchmark_args.results_directory, &events_by_vendor, ) .await?; @@ -310,7 +310,7 @@ async fn main() -> Result<(), BenchmarkError> { while !jobs.job_is_done() { debug!("Job not done!"); - jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE) + jobs.check_unfinished_jobs(&client, BASE_URL, &benchmark_args.jobs_file) .await?; tokio::time::sleep(Duration::from_secs(SLEEP_CHECK_TIME_IN_SECONDES)).await; } From 58a0147200536c1c3eb972b48b9479d0faea2b00 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Wed, 4 Dec 2024 10:50:41 +0100 Subject: [PATCH 05/11] feat(results) : Process each Job results once it transitions to Terminating (Terminated/Finishing/Failed) state --- .gitmodules | 3 + 674ebc02e125e194eeda1bb9 | 1 + src/jobs.rs | 61 ++++++++++++++++++- src/results.rs | 123 ++++++++++++++++++++++++++++----------- 4 files changed, 153 insertions(+), 35 deletions(-) create mode 100644 .gitmodules create mode 160000 674ebc02e125e194eeda1bb9 diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..352957d --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "674ebc02e125e194eeda1bb9"] + path = 674ebc02e125e194eeda1bb9 + url = https://git@git.overleaf.com/674ebc02e125e194eeda1bb9 diff --git a/674ebc02e125e194eeda1bb9 b/674ebc02e125e194eeda1bb9 new file mode 160000 index 0000000..969ad3c --- /dev/null +++ b/674ebc02e125e194eeda1bb9 @@ -0,0 +1 @@ +Subproject commit 969ad3c9498d7a84dce895208d09a8c597e97db2 diff --git a/src/jobs.rs b/src/jobs.rs index 1d0305a..a834ffb 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -3,7 +3,8 @@ use crate::configs; use crate::inventories::{self, Node}; use crate::scripts; use crate::ssh; -use log::{debug, error, info}; +use crate::results; +use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use serde_yaml::{self}; use std::collections::HashMap; @@ -11,8 +12,10 @@ use std::fmt::{self, Display}; use std::str::{self}; use std::time::Duration; use std::{env, fs}; +use std::path::{Path, PathBuf}; use subprocess::{Popen, PopenConfig, Redirection}; use thiserror::Error; +use std::process::Command; const MAX_CONCURRENT_JOBS: usize = 20; @@ -217,6 +220,12 @@ impl Job { &self.node.uid, ) { self.state = OARState::UnknownState; + } else { + if let Ok(_extracted) = extract_tar_xz(&self.results_dir) { + results::process_results(&self.results_dir)?; + } else { + warn!("Could not extract tar"); + } } Ok(()) } @@ -345,6 +354,7 @@ impl Jobs { job.oar_job_id, job.state ); } + tokio::time::sleep(Duration::from_secs(5)).await; } self.dump_to_file(file_to_dump_to)?; @@ -424,3 +434,52 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str) -> JobResult { Ok(()) } + +fn extract_tar_xz(dir_path: &str) -> Result <(), String> { + let 
dir = Path::new(dir_path); + + let tar_xz_name = match dir.file_name() { + Some(name) => { + let mut archive_name = PathBuf::from(name); + archive_name.set_extension("tar.xz"); + archive_name + } + None => return Err("Failed to compute archive name from directory path.".to_string()), + }; + + let archive_path = dir.parent().unwrap_or_else(|| Path::new(".")).join(&tar_xz_name); + + if !archive_path.exists() { + return Err(format!("Archive not found: {:?}", archive_path)); + } + + let output_5 = Command::new("tar") + .arg("-xf") + .arg(&archive_path) + .arg("--strip-components=5") // Strips the leading directory components + .arg("-C") + .arg(dir.parent().unwrap_or_else(|| Path::new("."))) + .output() + .map_err(|e| format!("Failed to execute tar command stripping 5: {}", e))?; + + if !output_5.status.success() { + let output_3 = Command::new("tar") + .arg("-xf") + .arg(&archive_path) + .arg("--strip-components=3") // Strips the leading directory components + .arg("-C") + .arg(dir.parent().unwrap_or_else(|| Path::new("."))) + .output() + .map_err(|e| format!("Failed to execute tar command stripping 3: {}", e))?; + + if !output_3.status.success() { + + return Err(format!( + "tar command failed with error: {}", + String::from_utf8_lossy(&output_3.stderr) + )); + } + } + + Ok(()) +} diff --git a/src/results.rs b/src/results.rs index 43854ef..75b22fc 100644 --- a/src/results.rs +++ b/src/results.rs @@ -1,8 +1,9 @@ use thiserror::Error; use log::{debug, info, warn}; +use std::cmp::Ordering; use serde::{Serialize, Deserialize}; use csv::{Reader}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::fs::File; use std::io::{self, BufRead, BufReader, Write}; use std::fs; @@ -80,26 +81,17 @@ struct PerfRow { iteration: usize, } -#[derive(Debug)] -struct Statistics { - nb_core: i32, - nb_ops_per_core: i32, - task: String, - mean: f64, - variance: f64, - std_dev: f64, - coeff_variation: f64, -} /// Creates an aggregation of perf___ into corresponding perf_alone__.csv file -fn aggregate_perf(raw_perf_results_file: &str) -> io::Result<()> { - debug!("Processing perf file '{}'", raw_perf_results_file); +fn aggregate_perf(raw_perf_results_file: PathBuf) -> io::Result<()> { + debug!("Processing perf file '{}'", raw_perf_results_file.display()); - let output_path = &format!("{}.csv", raw_perf_results_file); + let output_path = &format!("{}.csv", raw_perf_results_file.display()); fs::File::create(output_path)?; let mut output_writer = csv::Writer::from_path(output_path)?; - if let Some((nb_core, nb_ops_per_core)) = parse_perf_metadata(raw_perf_results_file) { + if let Some((nb_core, nb_ops_per_core)) = parse_perf_metadata(raw_perf_results_file.file_name().unwrap().to_str().unwrap()) { + debug!("{} metadata : nb_core {} ; nb_ops_per_core {}", raw_perf_results_file.display(), nb_core, nb_ops_per_core); let raw_perf_results_file = File::open(raw_perf_results_file)?; let reader = BufReader::new(raw_perf_results_file); let mut iteration = 1; @@ -112,15 +104,16 @@ fn aggregate_perf(raw_perf_results_file: &str) -> io::Result<()> { let line = line?; if line.contains("power/energy-cores/") { if let Some(value) = line.trim().split_whitespace().next() { - cores_joules = Some(value.parse::().unwrap_or_default()); + cores_joules = Some(value.replace(',', "").parse::().unwrap_or_default()); } } else if line.contains("power/energy-pkg/") { if let Some(value) = line.trim().split_whitespace().next() { - pkg_joules = Some(value.parse::().unwrap_or_default()); + pkg_joules = Some(value.replace(',', 
"").parse::().unwrap_or_default()); } } else if line.contains("power/energy-ram/") { if let Some(value) = line.trim().split_whitespace().next() { - ram_joules = Some(value.parse::().unwrap_or_default()); + + ram_joules = Some(value.replace(',', "").parse::().unwrap_or_default()); } } else if line.contains("seconds time elapsed") { if let Some(value) = line.trim().split_whitespace().next() { @@ -170,12 +163,16 @@ fn parse_perf_metadata(file_name: &str) -> Option<(String, String)> { fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> { if let Some(dir_name) = Path::new(dir_name).file_name().and_then(|os_str| os_str.to_str()) { + debug!("Filename to parse is : {}", dir_name); let parts: Vec<&str> = dir_name.split('_').collect(); if parts.len() == 5 { + debug!("Is hwpc alone"); if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[2].parse::(), parts[3].parse::(), parts[4].parse::()) { + debug!("{:?}", Some((nb_core, nb_ops_per_core, iteration))); return Some((nb_core, nb_ops_per_core, iteration)); } } else if parts.len() == 6 { + debug!("Is hwpc with perf"); if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[3].parse::(), parts[4].parse::(), parts[5].parse::()) { return Some((nb_core, nb_ops_per_core, iteration)); } @@ -187,13 +184,26 @@ fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> { } fn aggregate_hwpc_file(raw_rapl_file: &Path, output_path: &str, nb_core: i32, nb_ops_per_core: i32, iteration: usize) -> io::Result<()> { + debug!("Writing to hwpc aggregation file {:?}", output_path); let mut output_writer = csv::Writer::from_path(output_path)?; - let mut reader = csv::Reader::from_path(raw_rapl_file)?; - let iter = reader.deserialize::(); + debug!("Processing hwpc raw file {:?}", raw_rapl_file); + if let Ok(mut reader) = csv::Reader::from_path(raw_rapl_file) { + let iter = reader.deserialize::(); - for hwpc_row_raw in iter { - let hwpc_raw = HwpcRow::from_raw_record(hwpc_row_raw?, nb_core, nb_ops_per_core, iteration); - output_writer.serialize(hwpc_raw)?; + + for hwpc_row_raw in iter { + match hwpc_row_raw { + Ok(row_raw) => { + let hwpc_raw = HwpcRow::from_raw_record(row_raw, nb_core, nb_ops_per_core, iteration); + output_writer.serialize(hwpc_raw)?; + }, + Err(e) => { + warn!("Raw row malformed : {}", e); + } + } + } + } else { + warn!("Could not open {}", output_path); } Ok(()) } @@ -201,8 +211,8 @@ fn aggregate_hwpc_file(raw_rapl_file: &Path, output_path: &str, nb_core: i32, nb fn aggregate_hwpc_subdir(subdir: &fs::DirEntry, output_path: &str) -> io::Result<()> { let raw_rapl_file = subdir.path().join("rapl.csv"); - info!("Processing hwpc file {:?}", raw_rapl_file); - if let Some((nb_core, nb_ops_per_core, iteration)) = parse_hwpc_metadata(subdir.path().to_str().unwrap()) { + debug!("Processing hwpc aggregation file {:?}", raw_rapl_file); + if let Some((nb_core, nb_ops_per_core, iteration)) = parse_hwpc_metadata(subdir.file_name().to_str().unwrap()) { aggregate_hwpc_file(&raw_rapl_file, output_path, nb_core, nb_ops_per_core, iteration)?; } else { warn!("Could not parse metadata from directory name: {:?}", subdir); @@ -212,20 +222,20 @@ fn aggregate_hwpc_subdir(subdir: &fs::DirEntry, output_path: &str) -> io::Result /// Creates an aggregation of hwpc___ into corresponding hwpc___.csv file fn aggregate_hwpc( - raw_results_dir: &str, + raw_results_dir_path: PathBuf, ) -> io::Result<()> { - let raw_results_dir_path = Path::new(raw_results_dir); let (output_parent, output_basename) = (raw_results_dir_path.parent().unwrap(), 
raw_results_dir_path.file_name().unwrap()); let output_path = &format!("{}/{}.csv", output_parent.to_str().unwrap(), output_basename.to_str().unwrap()); + debug!("Output path : {}", output_path); let mut raw_results_subdirs = Vec::new(); - if let Ok(entries) = fs::read_dir(raw_results_dir) { + if let Ok(entries) = fs::read_dir(&raw_results_dir_path) { raw_results_subdirs = entries.filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_dir()).collect(); } else { - warn!("Could not find subdirectories in {} directory", raw_results_dir); + warn!("Could not find subdirectories in {} directory", output_parent.to_str().unwrap()); } debug!("Found {:?} subdirs", raw_results_subdirs); @@ -234,12 +244,57 @@ fn aggregate_hwpc( Ok(()) } -pub fn process_results() -> io::Result<()> { - let perf_raw_results_file = "test_results/lille/chiclet/chiclet-3/perf_alone_18_25"; - aggregate_perf(perf_raw_results_file)?; - let hwpc_raw_results_dir = "test_results/lille/chiclet/chiclet-3/hwpc_alone_18_25"; - aggregate_hwpc(hwpc_raw_results_dir)?; +fn filter_hwpc_dirs(directory: &str) -> Vec { + let mut filtered_files = Vec::new(); + + if let Ok(entries) = fs::read_dir(directory) { + for entry in entries { + if let Ok(entry) = entry { + let path = entry.path(); + if path.is_dir() { + if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) { + if file_name.starts_with("hwpc") { + filtered_files.push(path); + } + } + } + } + } + } + + filtered_files +} + +fn filter_perf_files(directory: &str) -> Vec { + let mut filtered_files = Vec::new(); + + if let Ok(entries) = fs::read_dir(directory) { + for entry in entries { + if let Ok(entry) = entry { + let path = entry.path(); + if path.is_file() { + if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) { + if file_name.starts_with("perf_") && !file_name.ends_with(".csv") { + filtered_files.push(path); + } + } + } + } + } + } + + filtered_files +} +pub fn process_results(results_directory: &str) -> io::Result<()> { + let perf_raw_files = filter_perf_files(results_directory); + debug!("Found perf files {:?} in {} directory", perf_raw_files, results_directory); + assert!(perf_raw_files.iter().map(|perf_raw_file| aggregate_perf(perf_raw_file.to_path_buf())).all(|result| result.is_ok())); + + let hwpc_raw_dirs = filter_hwpc_dirs(results_directory); + debug!("Found hwpc subdirs {:?} in {} directory", hwpc_raw_dirs, results_directory); + assert!(hwpc_raw_dirs.iter().map(|hwpc_raw_results_dir| aggregate_hwpc(hwpc_raw_results_dir.to_path_buf())).all(|result| result.is_ok())); + Ok(()) } From 6c566f0d21fa4345a6a5ddcff2de6a3d2cf3045a Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Wed, 4 Dec 2024 10:51:36 +0100 Subject: [PATCH 06/11] fix(Cargo.toml) : Add csv crate --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 06fc9c3..2958992 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ tar = "0.4.43" bytes = "1.8.0" subprocess = "0.2.9" clap = { version = "4.5.21", features = ["derive"] } +csv = "1.3.1" From 118880c105bb7d3de8bb152c73478a4e74681e1e Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Wed, 4 Dec 2024 10:59:11 +0100 Subject: [PATCH 07/11] fix(main) : Remove process_results function as it is on job level now --- src/main.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index f662b52..ab23006 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ use std::{fmt, fs, time::Duration}; use thiserror::Error; const SUPPORTED_PROCESSOR_VENDOR: 
&[&str; 3] = &["Intel", "AMD", "Cavium"]; -const SLEEP_CHECK_TIME_IN_SECONDES: u64 = 300; +const SLEEP_CHECK_TIME_IN_SECONDES: u64 = 900; const BASE_URL: &str = "https://api.grid5000.fr/stable"; // URL de base de l'API const LOGS_DIRECTORY: &str = "logs.d"; const INVENTORIES_DIRECTORY: &str = "inventories.d"; @@ -250,7 +250,7 @@ fn load_events_config(config_file: &str) -> Result Result { if std::path::Path::new(jobs_file).exists() { - info!("Found jobs.yaml file, processing with existing jobs"); + info!("Found {} file, processing with existing jobs", jobs_file); let content = fs::read_to_string(jobs_file)?; Ok(serde_yaml::from_str(&content)?) } else { @@ -319,8 +319,6 @@ async fn main() -> Result<(), BenchmarkError> { } - results::process_results()?; - Ok(()) } From 01823e7601257b00fa4956dd57565062af0c230c Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Fri, 13 Dec 2024 15:19:51 +0100 Subject: [PATCH 08/11] chores(inventories) : Better error handling in case of request blocked in case of spam --- src/inventories.rs | 52 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/src/inventories.rs b/src/inventories.rs index 3092697..f07e6a8 100644 --- a/src/inventories.rs +++ b/src/inventories.rs @@ -18,6 +18,8 @@ pub enum InventoryError { JsonParse(#[from] serde_json::Error), #[error("I/O error: {0}")] Io(#[from] std::io::Error), + #[error("The requested resource is blacklisted.")] + Blacklisted, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -27,6 +29,7 @@ pub struct Node { pub exotic: bool, pub processor: Processor, pub architecture: Architecture, + pub operating_system: Option, pub supported_job_types: SupportedJobTypes, } @@ -116,6 +119,16 @@ pub struct Processor { pub version: StrOrFloat, // Is sometimes another type, like f64, and then panic } +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct OperatingSystem { + cstate_driver: String, + cstate_governor: String, + pstate_driver: String, + pstate_governor: String, + turboboost_enabled: bool +} + + #[derive(Deserialize, Debug)] struct Cluster { uid: String, @@ -147,23 +160,25 @@ async fn fetch_clusters( Ok(clusters) } -async fn fetch_nodes( +pub async fn fetch_nodes( client: &Client, base_url: &str, site_uid: &str, cluster_uid: &str, ) -> Result, InventoryError> { - let response = get_api_call( - client, - &format!( - "{}/sites/{}/clusters/{}/nodes", - base_url, site_uid, cluster_uid - ), - ) - .await - .unwrap(); - let nodes: Vec = serde_json::from_value(response.get("items").unwrap().clone())?; - Ok(nodes) + if let Ok(response) = get_api_call( + client, + &format!( + "{}/sites/{}/clusters/{}/nodes", + base_url, site_uid, cluster_uid + ), + ) + .await { + let nodes: Vec = serde_json::from_value(response.get("items").unwrap().clone())?; + Ok(nodes) + } else { + Err(InventoryError::Blacklisted) + } } pub async fn get_api_call( @@ -180,12 +195,17 @@ pub async fn get_api_call( .get(endpoint) .basic_auth(username, Some(password)) .send() - .await - .unwrap() - .json() .await; + let response_json = match response { + Ok(response_body) => { + response_body + .json() + .await + }, + Err(e) => Err(e) + }; - match response { + match response_json { Ok(json) => Ok(json), Err(e) => Err(InventoryError::HttpRequest(e)), } From 6528401629965cfc518c2378fbff79cb327c5d0d Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Fri, 13 Dec 2024 15:22:17 +0100 Subject: [PATCH 09/11] chores(main) : Add draft of a method to update statefull data --- src/main.rs | 13 +++++++++++++ 1 file changed, 
13 insertions(+) diff --git a/src/main.rs b/src/main.rs index ab23006..265d270 100644 --- a/src/main.rs +++ b/src/main.rs @@ -284,6 +284,19 @@ async fn main() -> Result<(), BenchmarkError> { if ! benchmark_args.inventory_skip { info!("Processing inventory step"); inventories::generate_inventory(&benchmark_args.inventories_directory).await?; + /* + * This code shall be extracted into a proper utilitary function + let client = reqwest::Client::builder().build()?; + for job in jobs.jobs.iter_mut() { + debug!("Updating node info for node {} on job {}", job.node.uid, job.id); + if let Err(_job_update_status) = job.update_node(&client, BASE_URL).await { + break + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + jobs.dump_to_file(JOBS_FILE)?; + */ + } else { info!("Skipping inventory scrapping as requested"); } From e4dd3708e1f8145013a2b28cc752cd584a99721f Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Fri, 13 Dec 2024 15:23:17 +0100 Subject: [PATCH 10/11] fix(script) : Delete a repo for g5k-docker-setup as it is broken --- src/jobs.rs | 21 ++++++++++++++++++--- src/results.rs | 1 - templates/install_packages.sh | 2 ++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index a834ffb..dcdc330 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -230,6 +230,21 @@ impl Job { Ok(()) } + pub async fn update_node(&mut self, client: &reqwest::Client, base_url: &str) -> JobResult { + + let cluster = self.node.cluster.clone().unwrap(); + if let Ok(nodes) = inventories::fetch_nodes(&client, base_url, &self.site, &cluster).await { + + let node: Node = nodes.into_iter().find(|node| node.uid == self.node.uid).unwrap(); + + debug!("Cluster : {} ; Node : {} ; os : {:?}", cluster, node.uid, node.operating_system); + self.node = node; + } else { + warn!("Could not gather nodes"); + } + Ok(()) + } + } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -316,8 +331,8 @@ impl Jobs { job.submit_job().await?; self.jobs.push(job); info!("Job submitted for {} node", node_uid); - info!("Wait 1 secondes before another submission"); - tokio::time::sleep(Duration::from_secs(1)).await; + info!("Wait 100 ms before another submission"); + tokio::time::sleep(Duration::from_millis(100)).await; self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file) .await?; @@ -354,7 +369,7 @@ impl Jobs { job.oar_job_id, job.state ); } - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } self.dump_to_file(file_to_dump_to)?; diff --git a/src/results.rs b/src/results.rs index 75b22fc..5057e81 100644 --- a/src/results.rs +++ b/src/results.rs @@ -8,7 +8,6 @@ use std::fs::File; use std::io::{self, BufRead, BufReader, Write}; use std::fs; use std::collections::HashMap; -use plotters::prelude::*; #[derive(Error, Debug)] pub enum ResultError { diff --git a/templates/install_packages.sh b/templates/install_packages.sh index 0b3f622..ea4d85c 100644 --- a/templates/install_packages.sh +++ b/templates/install_packages.sh @@ -1,4 +1,6 @@ sudo-g5k apt-get install -y stress-ng + +sudo rm -f /etc/apt/sources.list.d/repo.radeon.com-amdgpu.list g5k-setup-docker docker login -u {{ docker_hub_username }} -p {{ docker_hub_token }} docker run --rm -d --name mongo -p 27017:27017 mongo:latest From 90853272608ef09c27ec700f35500069802f9172 Mon Sep 17 00:00:00 2001 From: Inkedstinct Date: Fri, 13 Dec 2024 15:27:01 +0100 Subject: [PATCH 11/11] doc(main) : Add results processing description --- .gitmodules | 3 --- 674ebc02e125e194eeda1bb9 | 1 - README.md | 1 + 3 
files changed, 1 insertion(+), 4 deletions(-)
 delete mode 160000 674ebc02e125e194eeda1bb9

diff --git a/.gitmodules b/.gitmodules
index 352957d..e69de29 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +0,0 @@
-[submodule "674ebc02e125e194eeda1bb9"]
-	path = 674ebc02e125e194eeda1bb9
-	url = https://git@git.overleaf.com/674ebc02e125e194eeda1bb9
diff --git a/674ebc02e125e194eeda1bb9 b/674ebc02e125e194eeda1bb9
deleted file mode 160000
index 969ad3c..0000000
--- a/674ebc02e125e194eeda1bb9
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 969ad3c9498d7a84dce895208d09a8c597e97db2
diff --git a/README.md b/README.md
index 68d60a8..933526c 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ This repository contains the source code for generating and running benchmarks f
   - The status of each submitted job is tracked until it completes (either successfully or in a failed state).
   - Upon completion, **rsync** is used to retrieve the results files locally. If the retrieval fails, the job’s state is marked as `UnknownState` for manual review.
 6. **Store Results**: Once all filtered nodes have completed their benchmark jobs, the benchmarking process concludes, and all result files are stored in the `/results.d` directory.
+7. **Process Results**: Once a job reaches a terminal state (Terminated, Finishing, or Failed), its result files are aggregated into proper CSV files; the formats are defined by the structs in [the results source code](./src/results).
 
 ## Why it exists.
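The following is a minimal, self-contained sketch of how the directory options added in PATCH 01/11 behave under clap's derive conventions (field `results_directory` maps to `--results-directory`, `inventory_skip` to `-i`/`--inventory-skip`). The binary name, the `results.d` default, and the argument values below are illustrative placeholders, and only two of the new arguments are reproduced:

```rust
// Sketch of the options introduced in PATCH 01/11, assuming clap 4 with the
// "derive" feature (as declared in Cargo.toml). Binary name and values are
// placeholders.
use clap::Parser;

#[derive(Parser, Debug)]
struct BenchmarkArgs {
    /// Skip the scraping against the Grid5000 API refreshing node configurations
    #[arg(short, long)]
    inventory_skip: bool,

    /// Directory to store results / retrieve results to process
    #[arg(long, default_value = "results.d")] // assumed default, see README's /results.d
    results_directory: String,
}

fn main() {
    // Equivalent to: benchmark --inventory-skip --results-directory other_results.d
    let args = BenchmarkArgs::parse_from([
        "benchmark",
        "--inventory-skip",
        "--results-directory",
        "other_results.d",
    ]);
    assert!(args.inventory_skip);
    assert_eq!(args.results_directory, "other_results.d");
}
```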
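Step 7 above refers to the aggregated CSV formats defined in src/results.rs (PATCH 03/11 and 05/11). The sketch below writes a single row in the `PerfRow` layout to illustrate that shape; it assumes the `csv` and `serde` (derive) dependencies from Cargo.toml, assumes the optional energy fields are `f64` joule readings, and uses made-up placeholder values rather than real measurements:

```rust
// Sketch of the aggregated perf CSV layout (PerfRow in src/results.rs).
// Field types for the energy columns are assumed to be f64; all values below
// are hypothetical placeholders.
use serde::Serialize;

#[derive(Serialize)]
struct PerfRow {
    power_energy_pkg: Option<f64>,
    power_energy_ram: Option<f64>,
    power_energy_cores: Option<f64>,
    time_elapsed: f64,
    nb_core: i32,
    nb_ops_per_core: i32,
    iteration: usize,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Writing to stdout prints the header row followed by one record,
    // i.e. the shape the aggregation step produces per perf run.
    let mut writer = csv::Writer::from_writer(std::io::stdout());
    writer.serialize(PerfRow {
        power_energy_pkg: Some(103.2),  // joules (placeholder)
        power_energy_ram: Some(12.7),   // joules (placeholder)
        power_energy_cores: Some(88.9), // joules (placeholder)
        time_elapsed: 30.1,             // seconds (placeholder)
        nb_core: 18,
        nb_ops_per_core: 25,
        iteration: 1,
    })?;
    writer.flush()?;
    Ok(())
}
```

The hwpc aggregation follows the same pattern, copying each raw RAPL row and appending the `nb_core`, `nb_ops_per_core`, and `iteration` columns parsed from the result directory name.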