Empty file added .gitmodules
1 change: 1 addition & 0 deletions Cargo.toml
@@ -28,3 +28,4 @@ tar = "0.4.43"
bytes = "1.8.0"
subprocess = "0.2.9"
clap = { version = "4.5.21", features = ["derive"] }
csv = "1.3.1"
1 change: 1 addition & 0 deletions README.md
@@ -14,6 +14,7 @@ This repository contains the source code for generating and running benchmarks f
- The status of each submitted job is tracked until it completes (either successfully or in a failed state).
- Upon completion, **rsync** is used to retrieve the result files locally. If the retrieval fails, the job’s state is marked as `UnknownState` for manual review.
6. **Store Results**: Once all filtered nodes have completed their benchmark jobs, the benchmarking process concludes, and all result files are stored in the `/results.d` directory.
7. **Process Results**: Once a job reaches a terminal state (typically `Terminated` or `Failed`), all of its result files are aggregated into proper CSVs (see the sketch below). The formats, and the structures that define them, can be found in [the results source code](./src/results).
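
As a rough sketch of what this aggregation step can look like with the newly added `csv` crate — the `BenchRow` record, its columns, and the helper name are illustrative assumptions, not the actual format defined in `src/results`:

```rust
use serde::Serialize;

// Hypothetical record type; the real column layout lives in src/results.
#[derive(Serialize)]
struct BenchRow {
    node_uid: String,
    cores: u32,
    runtime_s: f64,
}

// Writes a header row derived from the struct's field names, then one CSV
// line per record.
fn write_results_csv(rows: &[BenchRow], out_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut wtr = csv::Writer::from_path(out_path)?;
    for row in rows {
        wtr.serialize(row)?;
    }
    wtr.flush()?;
    Ok(())
}
```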

## Why it exists.

52 changes: 36 additions & 16 deletions src/inventories.rs
@@ -18,6 +18,8 @@ pub enum InventoryError {
JsonParse(#[from] serde_json::Error),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("The requested resource is blacklisted.")]
Blacklisted,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
@@ -27,6 +29,7 @@ pub struct Node {
pub exotic: bool,
pub processor: Processor,
pub architecture: Architecture,
pub operating_system: Option<OperatingSystem>,
pub supported_job_types: SupportedJobTypes,
}

@@ -116,6 +119,16 @@ pub struct Processor {
pub version: StrOrFloat, // Sometimes arrives as another type (e.g. f64), which would otherwise cause a panic
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct OperatingSystem {
cstate_driver: String,
cstate_governor: String,
pstate_driver: String,
pstate_governor: String,
turboboost_enabled: bool
}


#[derive(Deserialize, Debug)]
struct Cluster {
uid: String,
@@ -147,23 +160,25 @@ async fn fetch_clusters(
Ok(clusters)
}

async fn fetch_nodes(
pub async fn fetch_nodes(
client: &Client,
base_url: &str,
site_uid: &str,
cluster_uid: &str,
) -> Result<Vec<Node>, InventoryError> {
let response = get_api_call(
client,
&format!(
"{}/sites/{}/clusters/{}/nodes",
base_url, site_uid, cluster_uid
),
)
.await
.unwrap();
let nodes: Vec<Node> = serde_json::from_value(response.get("items").unwrap().clone())?;
Ok(nodes)
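// NOTE: any failure from get_api_call is surfaced as InventoryError::Blacklisted;
// the underlying HTTP error is discarded here.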
if let Ok(response) = get_api_call(
client,
&format!(
"{}/sites/{}/clusters/{}/nodes",
base_url, site_uid, cluster_uid
),
)
.await {
let nodes: Vec<Node> = serde_json::from_value(response.get("items").unwrap().clone())?;
Ok(nodes)
} else {
Err(InventoryError::Blacklisted)
}
}

pub async fn get_api_call(
@@ -180,12 +195,17 @@
.get(endpoint)
.basic_auth(username, Some(password))
.send()
.await
.unwrap()
.json()
.await;
let response_json = match response {
Ok(response_body) => {
response_body
.json()
.await
},
Err(e) => Err(e)
};

match response {
match response_json {
Ok(json) => Ok(json),
Err(e) => Err(InventoryError::HttpRequest(e)),
}
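Since `fetch_nodes` is now `pub` and collapses every failed API call into `InventoryError::Blacklisted`, a caller can skip unavailable clusters without aborting the whole inventory pass. A minimal sketch; the site and cluster names are placeholders:

```rust
use crate::inventories::{self, InventoryError};

// Hypothetical call site: "grenoble"/"dahu" are placeholder identifiers.
async fn log_cluster_nodes(client: &reqwest::Client, base_url: &str) {
    match inventories::fetch_nodes(client, base_url, "grenoble", "dahu").await {
        Ok(nodes) => log::info!("fetched {} nodes", nodes.len()),
        // This arm covers genuine blacklisting but also any transport error,
        // since fetch_nodes maps every failure to this variant.
        Err(InventoryError::Blacklisted) => log::warn!("cluster unavailable or blacklisted, skipping"),
        Err(e) => log::error!("inventory error: {}", e),
    }
}
```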
98 changes: 86 additions & 12 deletions src/jobs.rs
@@ -3,16 +3,19 @@ use crate::configs;
use crate::inventories::{self, Node};
use crate::scripts;
use crate::ssh;
use log::{debug, error, info};
use crate::results;
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use serde_yaml::{self};
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::str::{self};
use std::time::Duration;
use std::{env, fs};
use std::path::{Path, PathBuf};
use subprocess::{Popen, PopenConfig, Redirection};
use thiserror::Error;
use std::process::Command;

const MAX_CONCURRENT_JOBS: usize = 20;

@@ -111,29 +114,29 @@ pub struct Job {
}

impl Job {
fn build_script_file_path(node: &Node, site: &str) -> String {
fn build_script_file_path(node: &Node, site: &str, root_scripts_dir: &str) -> String {
format!(
"{}/{}/{}/{}.sh",
super::SCRIPTS_DIRECTORY,
root_scripts_dir,
site,
node.cluster.as_ref().unwrap(),
node.uid
)
}

fn build_results_dir_path(node: &Node, site: &str) -> String {
fn build_results_dir_path(node: &Node, site: &str, root_results_dir: &str) -> String {
format!(
"{}/{}/{}/{}",
super::RESULTS_DIRECTORY,
root_results_dir,
site,
node.cluster.as_ref().unwrap(),
node.uid
)
}

fn new(id: usize, node: Node, core_values: Vec<u32>, site: String) -> Self {
let script_file = Job::build_script_file_path(&node, &site);
let results_dir = Job::build_results_dir_path(&node, &site);
fn new(id: usize, node: Node, core_values: Vec<u32>, site: String, root_scripts_dir: &str, root_results_dir: &str) -> Self {
let script_file = Job::build_script_file_path(&node, &site, root_scripts_dir);
let results_dir = Job::build_results_dir_path(&node, &site, root_results_dir);

Job {
id,
@@ -217,6 +220,27 @@ impl Job {
&self.node.uid,
) {
self.state = OARState::UnknownState;
} else if let Err(e) = extract_tar_xz(&self.results_dir) {
warn!("Could not extract tar: {}", e);
} else {
results::process_results(&self.results_dir)?;
}
Ok(())
}

pub async fn update_node(&mut self, client: &reqwest::Client, base_url: &str) -> JobResult {
let cluster = self.node.cluster.clone().unwrap();
if let Ok(nodes) = inventories::fetch_nodes(client, base_url, &self.site, &cluster).await {
let node: Node = nodes.into_iter().find(|node| node.uid == self.node.uid).unwrap();

debug!("Cluster : {} ; Node : {} ; os : {:?}", cluster, node.uid, node.operating_system);
self.node = node;
} else {
warn!("Could not gather nodes");
}
Ok(())
}
@@ -296,7 +320,7 @@ impl Jobs {
let core_values =
configs::generate_core_values(5, node.architecture.nb_cores);
let mut job =
Job::new(self.jobs.len(), node.clone(), core_values, site.to_string());
Job::new(self.jobs.len(), node.clone(), core_values, site.to_string(), scripts_dir, results_dir);
fs::create_dir_all(
std::path::Path::new(&job.script_file).parent().unwrap(),
)?;
@@ -307,16 +331,16 @@
job.submit_job().await?;
self.jobs.push(job);
info!("Job submitted for {} node", node_uid);
info!("Wait 1 secondes before another submission");
tokio::time::sleep(Duration::from_secs(1)).await;
info!("Wait 100 ms before another submission");
tokio::time::sleep(Duration::from_millis(100)).await;

self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file)
.await?;

// Throttling based on the maximum allowed concurrent jobs
} else {
info!("Job already listed on {} node, skipping", node_uid);
tokio::time::sleep(Duration::from_millis(300)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
@@ -345,6 +369,7 @@ impl Jobs {
job.oar_job_id, job.state
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}

self.dump_to_file(file_to_dump_to)?;
@@ -424,3 +449,52 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str) -> JobResult {

Ok(())
}

fn extract_tar_xz(dir_path: &str) -> Result<(), String> {
let dir = Path::new(dir_path);

let tar_xz_name = match dir.file_name() {
Some(name) => {
let mut archive_name = PathBuf::from(name);
archive_name.set_extension("tar.xz");
archive_name
}
None => return Err("Failed to compute archive name from directory path.".to_string()),
};

let archive_path = dir.parent().unwrap_or_else(|| Path::new(".")).join(&tar_xz_name);

if !archive_path.exists() {
return Err(format!("Archive not found: {:?}", archive_path));
}

let output_5 = Command::new("tar")
.arg("-xf")
.arg(&archive_path)
.arg("--strip-components=5") // Strips the leading directory components
.arg("-C")
.arg(dir.parent().unwrap_or_else(|| Path::new(".")))
.output()
.map_err(|e| format!("Failed to execute tar command stripping 5: {}", e))?;

if !output_5.status.success() {
let output_3 = Command::new("tar")
.arg("-xf")
.arg(&archive_path)
.arg("--strip-components=3") // Strips the leading directory components
.arg("-C")
.arg(dir.parent().unwrap_or_else(|| Path::new(".")))
.output()
.map_err(|e| format!("Failed to execute tar command stripping 3: {}", e))?;

if !output_3.status.success() {
return Err(format!(
"tar command failed with error: {}",
String::from_utf8_lossy(&output_3.stderr)
));
}
}

Ok(())
}
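An alternative worth noting: since `tar = "0.4.43"` is already in Cargo.toml, the extraction could be done in-process instead of shelling out. A minimal sketch, assuming `xz2` were added as a dependency (it currently is not); the `tar` crate has no direct `--strip-components` equivalent, so entries keep their full paths:

```rust
use std::fs::File;
use std::path::Path;
use tar::Archive;          // already a dependency
use xz2::read::XzDecoder;  // assumed new dependency for the .xz layer

// Unpacks archive_path into dest without invoking the external tar binary.
// Unlike the Command-based version above, leading path components are kept.
fn extract_tar_xz_native(archive_path: &Path, dest: &Path) -> std::io::Result<()> {
    let file = File::open(archive_path)?;
    let mut archive = Archive::new(XzDecoder::new(file));
    archive.unpack(dest)
}
```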