diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..e69de29
diff --git a/Cargo.toml b/Cargo.toml
index 06fc9c3..2958992 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,3 +28,4 @@ tar = "0.4.43"
 bytes = "1.8.0"
 subprocess = "0.2.9"
 clap = { version = "4.5.21", features = ["derive"] }
+csv = "1.3.1"
diff --git a/README.md b/README.md
index 68d60a8..933526c 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ This repository contains the source code for generating and running benchmarks f
    - The status of each submitted job is tracked until it completes (either successfully or in a failed state).
    - Upon completion, **rsync** is used to retrieve the results files locally. If the retrieval fails, the job’s state is marked as `UnknownState` for manual review.
 6. **Store Results**: Once all filtered nodes have completed their benchmark jobs, the benchmarking process concludes, and all result files are stored in the `/results.d` directory.
+7. **Process Results**: Once a job reaches a terminal state (typically `Terminated` or `Failed`), its result files are aggregated into proper CSV files. The output formats are defined by the structures in [the results source code](./src/results).
 
 ## Why it exists.
diff --git a/src/inventories.rs b/src/inventories.rs
index 3092697..f07e6a8 100644
--- a/src/inventories.rs
+++ b/src/inventories.rs
@@ -18,6 +18,8 @@ pub enum InventoryError {
     JsonParse(#[from] serde_json::Error),
     #[error("I/O error: {0}")]
     Io(#[from] std::io::Error),
+    #[error("The requested resource is blacklisted.")]
+    Blacklisted,
 }
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
@@ -27,6 +29,7 @@ pub struct Node {
     pub exotic: bool,
     pub processor: Processor,
     pub architecture: Architecture,
+    pub operating_system: Option<OperatingSystem>,
     pub supported_job_types: SupportedJobTypes,
 }
 
@@ -116,6 +119,16 @@ pub struct Processor {
     pub version: StrOrFloat, // Is sometimes another type, like f64, and then panic
 }
 
+/// Power-management attributes reported by the Grid5000 API for a node.
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct OperatingSystem {
+    cstate_driver: String,
+    cstate_governor: String,
+    pstate_driver: String,
+    pstate_governor: String,
+    turboboost_enabled: bool,
+}
+
 #[derive(Deserialize, Debug)]
 struct Cluster {
     uid: String,
@@ -147,23 +160,25 @@ async fn fetch_clusters(
     Ok(clusters)
 }
 
-async fn fetch_nodes(
+pub async fn fetch_nodes(
     client: &Client,
     base_url: &str,
     site_uid: &str,
     cluster_uid: &str,
 ) -> Result<Vec<Node>, InventoryError> {
-    let response = get_api_call(
-        client,
-        &format!(
-            "{}/sites/{}/clusters/{}/nodes",
-            base_url, site_uid, cluster_uid
-        ),
-    )
-    .await
-    .unwrap();
-    let nodes: Vec<Node> = serde_json::from_value(response.get("items").unwrap().clone())?;
-    Ok(nodes)
+    if let Ok(response) = get_api_call(
+        client,
+        &format!(
+            "{}/sites/{}/clusters/{}/nodes",
+            base_url, site_uid, cluster_uid
+        ),
+    )
+    .await {
+        let nodes: Vec<Node> = serde_json::from_value(response.get("items").unwrap().clone())?;
+        Ok(nodes)
+    } else {
+        Err(InventoryError::Blacklisted)
+    }
 }
 
 pub async fn get_api_call(
@@ -180,12 +195,17 @@
         .get(endpoint)
         .basic_auth(username, Some(password))
         .send()
-        .await
-        .unwrap()
-        .json()
         .await;
+    let response_json = match response {
+        Ok(response_body) => {
+            response_body
+                .json()
+                .await
+        },
+        Err(e) => Err(e)
+    };
 
-    match response {
+    match response_json {
         Ok(json) => Ok(json),
         Err(e) => Err(InventoryError::HttpRequest(e)),
     }
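The new optional `operating_system` field on `Node` mirrors the power-management attributes exposed by the Grid5000 reference API. For orientation, here is a minimal, self-contained sketch of how such a payload deserializes; it is not part of the patch, and the struct copy, JSON values, and driver/governor names are illustrative assumptions:

```rust
use serde::Deserialize;

// Pared-down copies of the structs from src/inventories.rs, for this sketch only.
#[derive(Deserialize, Debug)]
struct OperatingSystem {
    cstate_driver: String,
    cstate_governor: String,
    pstate_driver: String,
    pstate_governor: String,
    turboboost_enabled: bool,
}

#[derive(Deserialize, Debug)]
struct NodeSketch {
    uid: String,
    // A missing or null "operating_system" entry deserializes to None.
    operating_system: Option<OperatingSystem>,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical payload; real values come from the reference API.
    let payload = r#"{
        "uid": "example-node-1",
        "operating_system": {
            "cstate_driver": "intel_idle",
            "cstate_governor": "menu",
            "pstate_driver": "intel_pstate",
            "pstate_governor": "performance",
            "turboboost_enabled": true
        }
    }"#;

    let node: NodeSketch = serde_json::from_str(payload)?;
    println!("{} -> {:?}", node.uid, node.operating_system);
    Ok(())
}
```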
diff --git a/src/jobs.rs b/src/jobs.rs
index ed54673..dcdc330 100644
--- a/src/jobs.rs
+++ b/src/jobs.rs
@@ -3,7 +3,8 @@ use crate::configs;
 use crate::inventories::{self, Node};
 use crate::scripts;
 use crate::ssh;
-use log::{debug, error, info};
+use crate::results;
+use log::{debug, error, info, warn};
 use serde::{Deserialize, Serialize};
 use serde_yaml::{self};
 use std::collections::HashMap;
@@ -11,8 +12,10 @@ use std::fmt::{self, Display};
 use std::str::{self};
 use std::time::Duration;
 use std::{env, fs};
+use std::path::{Path, PathBuf};
 use subprocess::{Popen, PopenConfig, Redirection};
 use thiserror::Error;
+use std::process::Command;
 
 const MAX_CONCURRENT_JOBS: usize = 20;
 
@@ -111,29 +114,29 @@ pub struct Job {
 }
 
 impl Job {
-    fn build_script_file_path(node: &Node, site: &str) -> String {
+    fn build_script_file_path(node: &Node, site: &str, root_scripts_dir: &str) -> String {
         format!(
             "{}/{}/{}/{}.sh",
-            super::SCRIPTS_DIRECTORY,
+            root_scripts_dir,
             site,
             node.cluster.as_ref().unwrap(),
             node.uid
         )
     }
 
-    fn build_results_dir_path(node: &Node, site: &str) -> String {
+    fn build_results_dir_path(node: &Node, site: &str, root_results_dir: &str) -> String {
         format!(
             "{}/{}/{}/{}",
-            super::RESULTS_DIRECTORY,
+            root_results_dir,
             site,
             node.cluster.as_ref().unwrap(),
             node.uid
         )
     }
 
-    fn new(id: usize, node: Node, core_values: Vec<usize>, site: String) -> Self {
-        let script_file = Job::build_script_file_path(&node, &site);
-        let results_dir = Job::build_results_dir_path(&node, &site);
+    fn new(id: usize, node: Node, core_values: Vec<usize>, site: String, root_scripts_dir: &str, root_results_dir: &str) -> Self {
+        let script_file = Job::build_script_file_path(&node, &site, root_scripts_dir);
+        let results_dir = Job::build_results_dir_path(&node, &site, root_results_dir);
 
         Job {
             id,
@@ -217,6 +220,27 @@ impl Job {
             &self.node.uid,
         ) {
             self.state = OARState::UnknownState;
+        } else {
+            if let Ok(_extracted) = extract_tar_xz(&self.results_dir) {
+                results::process_results(&self.results_dir)?;
+            } else {
+                warn!("Could not extract tar");
+            }
         }
         Ok(())
     }
+
+    /// Refreshes this job's node metadata (including the new `operating_system` field)
+    /// from the Grid5000 API.
+    pub async fn update_node(&mut self, client: &reqwest::Client, base_url: &str) -> JobResult {
+        let cluster = self.node.cluster.clone().unwrap();
+        if let Ok(nodes) = inventories::fetch_nodes(&client, base_url, &self.site, &cluster).await {
+            let node: Node = nodes.into_iter().find(|node| node.uid == self.node.uid).unwrap();
+
+            debug!("Cluster : {} ; Node : {} ; os : {:?}", cluster, node.uid, node.operating_system);
+            self.node = node;
+        } else {
+            warn!("Could not gather nodes");
+        }
+        Ok(())
+    }
@@ -296,7 +320,7 @@ impl Jobs {
                 let core_values = configs::generate_core_values(5, node.architecture.nb_cores);
 
                 let mut job =
-                    Job::new(self.jobs.len(), node.clone(), core_values, site.to_string());
+                    Job::new(self.jobs.len(), node.clone(), core_values, site.to_string(), scripts_dir, results_dir);
                 fs::create_dir_all(
                     std::path::Path::new(&job.script_file).parent().unwrap(),
                 )?;
@@ -307,8 +331,8 @@
                 job.submit_job().await?;
                 self.jobs.push(job);
                 info!("Job submitted for {} node", node_uid);
-                info!("Wait 1 secondes before another submission");
-                tokio::time::sleep(Duration::from_secs(1)).await;
+                info!("Waiting 100 ms before the next submission");
+                tokio::time::sleep(Duration::from_millis(100)).await;
 
                 self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file)
                     .await?;
@@ -316,7 +340,7 @@
                 // Throttling based on the maximum allowed concurrent jobs
             } else {
                 info!("Job already listed on {} node, skipping", node_uid);
-                tokio::time::sleep(Duration::from_millis(300)).await;
+                tokio::time::sleep(Duration::from_millis(100)).await;
             }
         }
     }
@@ -345,6 +369,7 @@
                     job.oar_job_id, job.state
                 );
             }
+            tokio::time::sleep(Duration::from_millis(100)).await;
         }
 
         self.dump_to_file(file_to_dump_to)?;
@@ -424,3 +449,52 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str) -> JobResult {
 
     Ok(())
 }
+
+/// Extracts the `<results_dir>.tar.xz` archive sitting next to the given results directory,
+/// first with `--strip-components=5`, then retrying with `--strip-components=3` if that fails.
+fn extract_tar_xz(dir_path: &str) -> Result<(), String> {
+    let dir = Path::new(dir_path);
+
+    let tar_xz_name = match dir.file_name() {
+        Some(name) => {
+            let mut archive_name = PathBuf::from(name);
+            archive_name.set_extension("tar.xz");
+            archive_name
+        }
+        None => return Err("Failed to compute archive name from directory path.".to_string()),
+    };
+
+    let archive_path = dir.parent().unwrap_or_else(|| Path::new(".")).join(&tar_xz_name);
+
+    if !archive_path.exists() {
+        return Err(format!("Archive not found: {:?}", archive_path));
+    }
+
+    let output_5 = Command::new("tar")
+        .arg("-xf")
+        .arg(&archive_path)
+        .arg("--strip-components=5") // Strips the leading directory components
+        .arg("-C")
+        .arg(dir.parent().unwrap_or_else(|| Path::new(".")))
+        .output()
+        .map_err(|e| format!("Failed to execute tar command stripping 5: {}", e))?;
+
+    if !output_5.status.success() {
+        let output_3 = Command::new("tar")
+            .arg("-xf")
+            .arg(&archive_path)
+            .arg("--strip-components=3") // Strips the leading directory components
+            .arg("-C")
+            .arg(dir.parent().unwrap_or_else(|| Path::new(".")))
+            .output()
+            .map_err(|e| format!("Failed to execute tar command stripping 3: {}", e))?;
+
+        if !output_3.status.success() {
+            return Err(format!(
+                "tar command failed with error: {}",
+                String::from_utf8_lossy(&output_3.stderr)
+            ));
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
index 07325fc..265d270 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -21,7 +21,7 @@ use std::{fmt, fs, time::Duration};
 use thiserror::Error;
 
 const SUPPORTED_PROCESSOR_VENDOR: &[&str; 3] = &["Intel", "AMD", "Cavium"];
-const SLEEP_CHECK_TIME_IN_SECONDES: u64 = 300;
+const SLEEP_CHECK_TIME_IN_SECONDES: u64 = 900;
 const BASE_URL: &str = "https://api.grid5000.fr/stable"; // URL de base de l'API
 const LOGS_DIRECTORY: &str = "logs.d";
 const INVENTORIES_DIRECTORY: &str = "inventories.d";
@@ -35,7 +35,41 @@ const CONFIG_FILE: &str = "config/events_by_vendor.json";
 struct BenchmarkArgs {
     /// Skip the scrapping against Grid5000 API refreshing node configurations
     #[arg(short, long)]
-    skip_inventory: bool,
+    inventory_skip: bool,
+
+    /// Skip the jobs generation/submission step
+    #[arg(short, long)]
+    jobs_skip: bool,
+
+    /// Skip the results processing step
+    #[arg(short, long)]
+    results_process_skip: bool,
+
+    /// Directory to store logs
+    #[arg(long, default_value = LOGS_DIRECTORY)]
+    logs_directory: String,
+
+    /// Directory to store nodes metadata
+    #[arg(long, default_value = INVENTORIES_DIRECTORY)]
+    inventories_directory: String,
+
+    /// File to store OAR jobs info
+    #[arg(long, default_value = JOBS_FILE)]
+    jobs_file: String,
+
+    /// Directory to store generated scripts
+    #[arg(long, default_value = SCRIPTS_DIRECTORY)]
+    scripts_directory: String,
+
+    /// Directory to store results / retrieve results to process
+    #[arg(long, default_value = RESULTS_DIRECTORY)]
+    results_directory: String,
+
+    /// File defining the events to process for hwpc and perf
+    #[arg(long, default_value = CONFIG_FILE)]
+    config_file: String,
+
+
 }
 
@@ -65,6 +99,14 @@ pub struct HwpcEvents {
     core: Vec<String>,
 }
 
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+enum PerfEvent {
+    PowerEnergyPkg,
+    PowerEnergyDram,
+    PowerEnergyPsys,
+    PowerEnergyCores,
+}
+
 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct PerfEvents(Vec<String>);
 impl Display for PerfEvents {
@@ -161,12 +203,12 @@ impl EventsByVendor {
 }
 
 // Creates all directories if not already existing
-fn init_directories() -> BenchmarkResult {
+fn init_directories(logs_directory: &str, inventories_directory: &str, scripts_directory: &str, results_directory: &str) -> BenchmarkResult {
     let directories = [
-        LOGS_DIRECTORY,
-        INVENTORIES_DIRECTORY,
-        SCRIPTS_DIRECTORY,
-        RESULTS_DIRECTORY,
+        logs_directory,
+        inventories_directory,
+        scripts_directory,
+        results_directory,
     ];
 
     for dir in directories {
@@ -200,16 +242,16 @@ fn build_logger(log_level: &str) -> Result<(), log::SetLoggerError> {
         .try_init()
 }
 
-fn load_events_config() -> Result<EventsByVendor, BenchmarkError> {
-    let content = fs::read_to_string(CONFIG_FILE)?;
+fn load_events_config(config_file: &str) -> Result<EventsByVendor, BenchmarkError> {
+    let content = fs::read_to_string(config_file)?;
     let events: EventsByVendor = serde_json::from_str(&content)?;
     Ok(events)
 }
 
-fn load_or_init_jobs() -> Result<Jobs, BenchmarkError> {
-    if std::path::Path::new(JOBS_FILE).exists() {
-        info!("Found jobs.yaml file, processing with existing jobs");
-        let content = fs::read_to_string(JOBS_FILE)?;
+fn load_or_init_jobs(jobs_file: &str) -> Result<Jobs, BenchmarkError> {
+    if std::path::Path::new(jobs_file).exists() {
+        info!("Found {} file, proceeding with existing jobs", jobs_file);
+        let content = fs::read_to_string(jobs_file)?;
         Ok(serde_yaml::from_str(&content)?)
     } else {
         info!("No jobs.yaml file found, starting with an empty job list");
@@ -229,38 +271,67 @@ async fn main() -> Result<(), BenchmarkError> {
 
-    init_directories()?;
+    init_directories(
+        &benchmark_args.logs_directory,
+        &benchmark_args.inventories_directory,
+        &benchmark_args.scripts_directory,
+        &benchmark_args.results_directory,
+    )?;
 
-    let events_by_vendor = load_events_config()?;
-    let mut jobs: Jobs = load_or_init_jobs()?;
+    let events_by_vendor = load_events_config(&benchmark_args.config_file)?;
+    let mut jobs: Jobs = load_or_init_jobs(&benchmark_args.jobs_file)?;
 
-    if ! benchmark_args.skip_inventory {
-        inventories::generate_inventory(INVENTORIES_DIRECTORY).await?;
-    }
-    // If we loaded existing jobs, check their status
-    if jobs.jobs.len() != 0 {
+    if ! benchmark_args.inventory_skip {
+        info!("Processing inventory step");
+        inventories::generate_inventory(&benchmark_args.inventories_directory).await?;
+        /*
+         * This code should be extracted into a proper utility function
         let client = reqwest::Client::builder().build()?;
-        jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE)
-            .await?;
+        for job in jobs.jobs.iter_mut() {
+            debug!("Updating node info for node {} on job {}", job.node.uid, job.id);
+            if let Err(_job_update_status) = job.update_node(&client, BASE_URL).await {
+                break
+            }
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+        jobs.dump_to_file(JOBS_FILE)?;
+        */
+
+    } else {
+        info!("Skipping inventory scraping as requested");
     }
-    jobs.generate_jobs(
-        JOBS_FILE,
-        INVENTORIES_DIRECTORY,
-        SCRIPTS_DIRECTORY,
-        RESULTS_DIRECTORY,
-        &events_by_vendor,
-    )
-    .await?;
-
-    let client = reqwest::Client::builder().build()?;
-
-    while !jobs.job_is_done() {
-        debug!("Job not done!");
-        jobs.check_unfinished_jobs(&client, BASE_URL, JOBS_FILE)
-            .await?;
-        tokio::time::sleep(Duration::from_secs(SLEEP_CHECK_TIME_IN_SECONDES)).await;
+
+    if ! benchmark_args.jobs_skip {
+        info!("Processing jobs step");
+        // If we loaded existing jobs, check their status
+        if jobs.jobs.len() != 0 {
+            let client = reqwest::Client::builder().build()?;
+            jobs.check_unfinished_jobs(&client, BASE_URL, &benchmark_args.jobs_file)
+                .await?;
+        }
+
+        jobs.generate_jobs(
+            &benchmark_args.jobs_file,
+            &benchmark_args.inventories_directory,
+            &benchmark_args.scripts_directory,
+            &benchmark_args.results_directory,
+            &events_by_vendor,
+        )
+        .await?;
+
+        let client = reqwest::Client::builder().build()?;
+
+        while !jobs.job_is_done() {
+            debug!("Job not done!");
+            jobs.check_unfinished_jobs(&client, BASE_URL, &benchmark_args.jobs_file)
+                .await?;
+            tokio::time::sleep(Duration::from_secs(SLEEP_CHECK_TIME_IN_SECONDES)).await;
+        }
+    } else {
+        info!("Skipping jobs generation and submission as requested");
     }
+
     Ok(())
 }
diff --git a/src/results.rs b/src/results.rs
index 01c6cde..5057e81 100644
--- a/src/results.rs
+++ b/src/results.rs
@@ -1,10 +1,300 @@
 use thiserror::Error;
+use log::{debug, info, warn};
+use std::cmp::Ordering;
+use serde::{Serialize, Deserialize};
+use csv::{Reader};
+use std::path::{Path, PathBuf};
+use std::fs::File;
+use std::io::{self, BufRead, BufReader, Write};
+use std::fs;
+use std::collections::HashMap;
 
 #[derive(Error, Debug)]
-pub enum ResultError {}
+pub enum ResultError {
+    #[error("Could not parse CSV File : {0}")]
+    Csv(#[from] csv::Error),
+}
 
-#[derive()]
-pub struct HwpcReport {}
+/// One row of a raw hwpc-sensor `rapl.csv` file, as written by the sensor.
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+struct HwpcRowRaw {
+    timestamp: i64,
+    sensor: String,
+    target: String,
+    socket: i32,
+    cpu: i32,
+    RAPL_ENERGY_PKG: Option<f64>,
+    RAPL_ENERGY_DRAM: Option<f64>,
+    RAPL_ENERGY_CORES: Option<f64>,
+    time_enabled: i64,
+    time_running: i64,
+}
+
+/// A raw hwpc row enriched with the benchmark metadata parsed from its directory name.
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+struct HwpcRow {
+    timestamp: i64,
+    sensor: String,
+    target: String,
+    socket: i32,
+    cpu: i32,
+    rapl_energy_pkg: Option<f64>,
+    rapl_energy_dram: Option<f64>,
+    rapl_energy_cores: Option<f64>,
+    time_enabled: i64,
+    time_running: i64,
+    nb_core: i32,
+    nb_ops_per_core: i32,
+    iteration: usize
+}
+
+impl HwpcRow {
+    fn from_raw_record(raw_record: HwpcRowRaw, nb_core: i32, nb_ops_per_core: i32, iteration: usize) -> Self {
+        HwpcRow {
+            timestamp: raw_record.timestamp,
+            sensor: raw_record.sensor,
+            target: raw_record.target,
+            socket: raw_record.socket,
+            cpu: raw_record.cpu,
+            rapl_energy_pkg: raw_record.RAPL_ENERGY_PKG,
+            rapl_energy_dram: raw_record.RAPL_ENERGY_DRAM,
+            rapl_energy_cores: raw_record.RAPL_ENERGY_CORES,
+            time_enabled: raw_record.time_enabled,
+            time_running: raw_record.time_running,
+            nb_core,
+            nb_ops_per_core,
+            iteration
+        }
+    }
+}
+
+/// One aggregated perf measurement, one per benchmark iteration.
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+struct PerfRow {
+    power_energy_pkg: Option<f64>,
+    power_energy_ram: Option<f64>,
+    power_energy_cores: Option<f64>,
+    time_elapsed: f64,
+    nb_core: i32,
+    nb_ops_per_core: i32,
+    iteration: usize,
+}
+
+/// Creates an aggregation of a raw `perf_…_<nb_core>_<nb_ops_per_core>` output file into a
+/// corresponding `.csv` file, one `PerfRow` per iteration.
+fn aggregate_perf(raw_perf_results_file: PathBuf) -> io::Result<()> {
+    debug!("Processing perf file '{}'", raw_perf_results_file.display());
+
+    let output_path = &format!("{}.csv", raw_perf_results_file.display());
+    fs::File::create(output_path)?;
+    let mut output_writer = csv::Writer::from_path(output_path)?;
+
+    if let Some((nb_core, nb_ops_per_core)) = parse_perf_metadata(raw_perf_results_file.file_name().unwrap().to_str().unwrap()) {
+        debug!("{} metadata : nb_core {} ; nb_ops_per_core {}", raw_perf_results_file.display(), nb_core, nb_ops_per_core);
+        let raw_perf_results_file = File::open(raw_perf_results_file)?;
+        let reader = BufReader::new(raw_perf_results_file);
+        let mut iteration = 1;
+        let mut cores_joules = None;
+        let mut pkg_joules = None;
+        let mut ram_joules = None;
+        let mut time_elapsed = None;
+
+        for line in reader.lines() {
+            let line = line?;
+            if line.contains("power/energy-cores/") {
+                if let Some(value) = line.trim().split_whitespace().next() {
+                    cores_joules = Some(value.replace(',', "").parse::<f64>().unwrap_or_default());
+                }
+            } else if line.contains("power/energy-pkg/") {
+                if let Some(value) = line.trim().split_whitespace().next() {
+                    pkg_joules = Some(value.replace(',', "").parse::<f64>().unwrap_or_default());
+                }
+            } else if line.contains("power/energy-ram/") {
+                if let Some(value) = line.trim().split_whitespace().next() {
+                    ram_joules = Some(value.replace(',', "").parse::<f64>().unwrap_or_default());
+                }
+            } else if line.contains("seconds time elapsed") {
+                if let Some(value) = line.trim().split_whitespace().next() {
+                    time_elapsed = Some(value.parse::<f64>().unwrap_or_default());
+                }
+                let perf_row = PerfRow {
+                    power_energy_pkg: pkg_joules,
+                    power_energy_ram: ram_joules,
+                    power_energy_cores: cores_joules,
+                    time_elapsed: time_elapsed.unwrap(),
+                    nb_core: nb_core.parse::<i32>().unwrap(),
+                    nb_ops_per_core: nb_ops_per_core.parse::<i32>().unwrap(),
+                    iteration
+                };
+                output_writer.serialize(perf_row)?;
+                iteration += 1;
+                cores_joules = None;
+                pkg_joules = None;
+                ram_joules = None;
+                time_elapsed = None; // Reset for the next iteration
+            }
+        }
+    } else {
+        warn!("Could not parse metadata from file name: {:?}", raw_perf_results_file);
+    }
+
+    Ok(())
+}
+
+/// Parses `nb_core` and `nb_ops_per_core` from a raw perf file name made of
+/// 4 or 5 `_`-separated parts (the last two parts carry the values).
+fn parse_perf_metadata(file_name: &str) -> Option<(String, String)> {
+    if let Some(file_name) = Path::new(file_name).file_name().and_then(|os_str| os_str.to_str()) {
+        let parts: Vec<&str> = file_name.split('_').collect();
+        if parts.len() == 4 {
+            if let (Ok(nb_core), Ok(nb_ops_per_core)) = (parts[2].parse::<i32>(), parts[3].parse::<i32>()) {
+                return Some((nb_core.to_string(), nb_ops_per_core.to_string()));
+            }
+        } else if parts.len() == 5 {
+            if let (Ok(nb_core), Ok(nb_ops_per_core)) = (parts[3].parse::<i32>(), parts[4].parse::<i32>()) {
+                return Some((nb_core.to_string(), nb_ops_per_core.to_string()));
+            }
+        }
+    } else {
+        warn!("Could not parse filename {} to get metadata", file_name);
+    }
+    None
+}
+
+/// Parses `nb_core`, `nb_ops_per_core` and `iteration` from an hwpc result directory name
+/// made of 5 or 6 `_`-separated parts (the last three parts carry the values).
+fn parse_hwpc_metadata(dir_name: &str) -> Option<(i32, i32, usize)> {
+    if let Some(dir_name) = Path::new(dir_name).file_name().and_then(|os_str| os_str.to_str()) {
+        debug!("Filename to parse is : {}", dir_name);
+        let parts: Vec<&str> = dir_name.split('_').collect();
+        if parts.len() == 5 {
+            debug!("Is hwpc alone");
+            if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[2].parse::<i32>(), parts[3].parse::<i32>(), parts[4].parse::<usize>()) {
+                debug!("{:?}", Some((nb_core, nb_ops_per_core, iteration)));
+                return Some((nb_core, nb_ops_per_core, iteration));
+            }
+        } else if parts.len() == 6 {
+            debug!("Is hwpc with perf");
+            if let (Ok(nb_core), Ok(nb_ops_per_core), Ok(iteration)) = (parts[3].parse::<i32>(), parts[4].parse::<i32>(), parts[5].parse::<usize>()) {
+                return Some((nb_core, nb_ops_per_core, iteration));
+            }
+        }
+    } else {
+        warn!("Could not parse filename {} to get metadata", dir_name);
+    }
+    None
+}
+
+/// Appends the rows of one iteration's `rapl.csv` to the shared aggregation writer,
+/// tagging each row with the parsed benchmark metadata.
+fn aggregate_hwpc_file(
+    raw_rapl_file: &Path,
+    output_writer: &mut csv::Writer<File>,
+    nb_core: i32,
+    nb_ops_per_core: i32,
+    iteration: usize,
+) -> io::Result<()> {
+    debug!("Processing hwpc raw file {:?}", raw_rapl_file);
+    if let Ok(mut reader) = csv::Reader::from_path(raw_rapl_file) {
+        let iter = reader.deserialize::<HwpcRowRaw>();
+
+        for hwpc_row_raw in iter {
+            match hwpc_row_raw {
+                Ok(row_raw) => {
+                    let hwpc_row = HwpcRow::from_raw_record(row_raw, nb_core, nb_ops_per_core, iteration);
+                    output_writer.serialize(hwpc_row)?;
+                },
+                Err(e) => {
+                    warn!("Raw row malformed : {}", e);
+                }
+            }
+        }
+    } else {
+        warn!("Could not open {:?}", raw_rapl_file);
+    }
+    Ok(())
+}
+
+/// Aggregates one `hwpc_…` iteration subdirectory (its `rapl.csv`) into the shared writer.
+fn aggregate_hwpc_subdir(subdir: &fs::DirEntry, output_writer: &mut csv::Writer<File>) -> io::Result<()> {
+    let raw_rapl_file = subdir.path().join("rapl.csv");
+    debug!("Processing hwpc aggregation file {:?}", raw_rapl_file);
+    if let Some((nb_core, nb_ops_per_core, iteration)) = parse_hwpc_metadata(subdir.file_name().to_str().unwrap()) {
+        aggregate_hwpc_file(&raw_rapl_file, output_writer, nb_core, nb_ops_per_core, iteration)?;
+    } else {
+        warn!("Could not parse metadata from directory name: {:?}", subdir);
+    }
+    Ok(())
+}
+
+/// Creates an aggregation of the `rapl.csv` files found under an `hwpc_…` results directory
+/// into a corresponding `hwpc_….csv` file, one `HwpcRow` per raw sensor row.
+fn aggregate_hwpc(
+    raw_results_dir_path: PathBuf,
+) -> io::Result<()> {
+    let (output_parent, output_basename) = (raw_results_dir_path.parent().unwrap(), raw_results_dir_path.file_name().unwrap());
+    let output_path = &format!("{}/{}.csv", output_parent.to_str().unwrap(), output_basename.to_str().unwrap());
+    debug!("Output path : {}", output_path);
+
+    // A single writer shared by every subdirectory so that all iterations end up in one CSV.
+    let mut output_writer = csv::Writer::from_path(output_path)?;
+
+    let mut raw_results_subdirs = Vec::new();
+
+    if let Ok(entries) = fs::read_dir(&raw_results_dir_path) {
+        raw_results_subdirs = entries.filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_dir()).collect();
+    } else {
+        warn!("Could not find subdirectories in {} directory", output_parent.to_str().unwrap());
+    }
+
+    debug!("Found {:?} subdirs", raw_results_subdirs);
+    assert!(raw_results_subdirs
+        .iter()
+        .map(|subdir| aggregate_hwpc_subdir(subdir.as_ref().unwrap(), &mut output_writer))
+        .all(|result| result.is_ok()));
+
+    Ok(())
+}
+
+/// Returns the `hwpc*` result directories found directly under `directory`.
+fn filter_hwpc_dirs(directory: &str) -> Vec<PathBuf> {
+    let mut filtered_files = Vec::new();
+
+    if let Ok(entries) = fs::read_dir(directory) {
+        for entry in entries {
+            if let Ok(entry) = entry {
+                let path = entry.path();
+                if path.is_dir() {
+                    if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) {
+                        if file_name.starts_with("hwpc") {
+                            filtered_files.push(path);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    filtered_files
+}
+
+/// Returns the raw `perf_*` output files (not yet aggregated to `.csv`) found directly under `directory`.
+fn filter_perf_files(directory: &str) -> Vec<PathBuf> {
+    let mut filtered_files = Vec::new();
+
+    if let Ok(entries) = fs::read_dir(directory) {
+        for entry in entries {
+            if let Ok(entry) = entry {
+                let path = entry.path();
+                if path.is_file() {
+                    if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) {
+                        if file_name.starts_with("perf_") && !file_name.ends_with(".csv") {
+                            filtered_files.push(path);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    filtered_files
+}
+
+/// Aggregates every raw perf file and every hwpc results directory found in
+/// `results_directory` into their corresponding CSV files.
+pub fn process_results(results_directory: &str) -> io::Result<()> {
+    let perf_raw_files = filter_perf_files(results_directory);
+    debug!("Found perf files {:?} in {} directory", perf_raw_files, results_directory);
+    assert!(perf_raw_files
+        .iter()
+        .map(|perf_raw_file| aggregate_perf(perf_raw_file.to_path_buf()))
+        .all(|result| result.is_ok()));
+
+    let hwpc_raw_dirs = filter_hwpc_dirs(results_directory);
+    debug!("Found hwpc subdirs {:?} in {} directory", hwpc_raw_dirs, results_directory);
+    assert!(hwpc_raw_dirs
+        .iter()
+        .map(|hwpc_raw_results_dir| aggregate_hwpc(hwpc_raw_results_dir.to_path_buf()))
+        .all(|result| result.is_ok()));
+
+    Ok(())
+}
 
-#[derive()]
-pub struct PerfReport {}
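As a usage illustration for the aggregation step (not part of the patch): the CSVs written by `process_results` can be read back with the same `csv` crate. The sketch below assumes a perf aggregate with the `PerfRow` column layout; the mirrored struct and the file path are hypothetical:

```rust
use serde::Deserialize;
use std::error::Error;

// Mirror of the PerfRow layout written by aggregate_perf, for this sketch only.
#[derive(Debug, Deserialize)]
struct PerfRecord {
    power_energy_pkg: Option<f64>,
    power_energy_ram: Option<f64>,
    power_energy_cores: Option<f64>,
    time_elapsed: f64,
    nb_core: i32,
    nb_ops_per_core: i32,
    iteration: usize,
}

fn main() -> Result<(), Box<dyn Error>> {
    // Hypothetical path; the real files sit under the configured results directory.
    let mut reader = csv::Reader::from_path("results.d/perf_alone_4_1000000.csv")?;
    for record in reader.deserialize::<PerfRecord>() {
        let row = record?;
        // Derive average package power (W) from the reported energy (J) and elapsed time (s).
        if let Some(pkg_joules) = row.power_energy_pkg {
            println!(
                "iteration {}: {:.2} W average package power on {} cores",
                row.iteration,
                pkg_joules / row.time_elapsed,
                row.nb_core
            );
        }
    }
    Ok(())
}
```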
diff --git a/templates/install_packages.sh b/templates/install_packages.sh
index 0b3f622..ea4d85c 100644
--- a/templates/install_packages.sh
+++ b/templates/install_packages.sh
@@ -1,4 +1,6 @@
 sudo-g5k apt-get install -y stress-ng
+
+sudo rm -f /etc/apt/sources.list.d/repo.radeon.com-amdgpu.list
 g5k-setup-docker
 docker login -u {{ docker_hub_username }} -p {{ docker_hub_token }}
 docker run --rm -d --name mongo -p 27017:27017 mongo:latest