6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
@@ -584,7 +584,11 @@ jobs:
- name: Rust Tests (Windows)
if: matrix.os == 'windows-x64'
run: |
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda --exclude vortex-nvcomp --exclude vortex-cub --exclude vortex-test-e2e-cuda --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench --exclude xtask
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench `
--exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda `
--exclude vortex-nvcomp --exclude vortex-cub --exclude vortex-test-e2e-cuda --exclude duckdb-bench `
--exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench `
--exclude xtask --exclude vortex-datafusion
- name: Rust Tests (Other)
if: matrix.os != 'windows-x64'
run: cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude xtask
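Note on the wrapping above: the windows-x64 job runs under PowerShell (the default shell on GitHub's Windows runners), where a trailing backtick continues a command onto the next line, so the split form executes the same single `cargo nextest` invocation as before, now with the added `--exclude vortex-datafusion`. A minimal, hypothetical step showing the same pattern — the step name and excluded crate below are illustrative, not part of this change:

# Illustrative sketch only. A trailing backtick (with nothing after it) is
# PowerShell's line-continuation character, so this is one command.
- name: Example wrapped command (Windows)
  if: matrix.os == 'windows-x64'
  run: |
    cargo nextest run --locked --workspace `
      --all-features --no-fail-fast `
      --exclude some-crate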
35 changes: 1 addition & 34 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -233,7 +233,6 @@ url = "2.5.7"
uuid = { version = "1.19", features = ["js"] }
walkdir = "2.5.0"
wasm-bindgen-futures = "0.4.39"
witchcraft-metrics = "1.0.1"
xshell = "0.2.6"
xz2 = "0.1.7"
zigzag = "0.1.0"
6 changes: 3 additions & 3 deletions benchmarks/datafusion-bench/src/main.rs
@@ -320,9 +320,9 @@ fn print_metrics(plans: &[(usize, Format, Arc<dyn ExecutionPlan>)]) {

eprintln!("metrics for query={query_idx}, {format}:");
for (scan_idx, metrics_set) in metric_sets.iter().enumerate() {
eprintln!(" scan[{scan_idx}]:");
for metric in metrics_set.clone().aggregate().sorted_for_display().iter() {
eprintln!(" {metric}");
eprintln!("\tscan[{scan_idx}]:");
for metric in metrics_set.aggregate().sorted_for_display().iter() {
eprintln!("\t\t{metric}");
}
}
}
202 changes: 146 additions & 56 deletions vortex-datafusion/src/persistent/metrics.rs
@@ -3,6 +3,7 @@

//! Vortex table provider metrics.
use std::sync::Arc;
use std::time::Duration;

use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
@@ -15,13 +16,16 @@ use datafusion_physical_plan::metrics::Gauge;
use datafusion_physical_plan::metrics::Label as DatafusionLabel;
use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::metrics::Time;
use vortex::error::VortexExpect;
use vortex::metrics::Label;
use vortex::metrics::Metric;
use vortex::metrics::MetricId;
use vortex::metrics::Tags;
use vortex::metrics::MetricValue;

use crate::persistent::source::VortexSource;

pub(crate) static PARTITION_LABEL: &str = "partition";
pub(crate) static PATH_LABEL: &str = "file_path";

/// Extracts datafusion metrics from all VortexExec instances in
/// a given physical plan.
@@ -43,9 +47,8 @@ impl ExecutionPlanVisitor for VortexMetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
if let Some(metrics) = exec.metrics() {
self.0.push(metrics);
}
// Start with exec metrics or create a new set
let mut set = exec.metrics().unwrap_or_default();

// Include our own metrics from VortexSource
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
@@ -54,92 +57,99 @@
.as_any()
.downcast_ref::<VortexSource>()
{
let mut set = MetricsSet::new();
for metric in scan
.vx_metrics()
.metrics_registry()
.snapshot()
.iter()
.flat_map(|(id, metric)| metric_to_datafusion(id, metric))
.flat_map(metric_to_datafusion)
{
set.push(Arc::new(metric));
}

self.0.push(set);
}
}

Ok(true)
self.0.push(set);

Ok(false)
} else {
Ok(true)
}
}
}

fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
let (partition, labels) = tags_to_datafusion(id.tags());
metric_value_to_datafusion(id.name(), metric)
fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
let (partition, labels) = labels_to_datafusion(metric.labels());
metric_value_to_datafusion(metric.name(), metric.value())
.into_iter()
.map(move |metric_value| {
DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
})
}

fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
tags.iter()
.fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
if k == PARTITION_LABEL {
partition = v.parse().ok();
.fold((None, Vec::new()), |(mut partition, mut labels), metric| {
if metric.key() == PARTITION_LABEL {
partition = metric.value().parse().ok();
} else {
labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
labels.push(DatafusionLabel::new(
metric.key().to_string(),
metric.value().to_string(),
));
}
(partition, labels)
})
}

fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
match metric {
Metric::Counter(counter) => counter
.count()
MetricValue::Counter(counter) => counter
.value()
.try_into()
.into_iter()
.map(|count| df_counter(name.to_string(), count))
.collect(),
Metric::Histogram(hist) => {
MetricValue::Histogram(hist) => {
let mut res = Vec::new();
if let Ok(count) = hist.count().try_into() {
res.push(df_counter(format!("{name}_count"), count));
}
let snapshot = hist.snapshot();
if let Ok(max) = snapshot.max().try_into() {
res.push(df_gauge(format!("{name}_max"), max));
}
if let Ok(min) = snapshot.min().try_into() {
res.push(df_gauge(format!("{name}_min"), min));
}
if let Some(p90) = f_to_u(snapshot.value(0.90)) {
res.push(df_gauge(format!("{name}_p95"), p90));
}
if let Some(p99) = f_to_u(snapshot.value(0.99)) {
res.push(df_gauge(format!("{name}_p99"), p99));

res.push(df_counter(format!("{name}_count"), hist.count()));

if !hist.is_empty() {
if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
res.push(df_gauge(format!("{name}_max"), max));
}

if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
res.push(df_gauge(format!("{name}_min"), min));
}

if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
res.push(df_gauge(format!("{name}_p95"), p95));
}
if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
res.push(df_gauge(format!("{name}_p99"), p99));
}
}

res
}
Metric::Timer(timer) => {
MetricValue::Timer(timer) => {
let mut res = Vec::new();
if let Ok(count) = timer.count().try_into() {
res.push(df_counter(format!("{name}_count"), count));
}
let snapshot = timer.snapshot();
if let Ok(max) = snapshot.max().try_into() {
// NOTE(os): unlike Time metrics, gauges allow custom aggregation
res.push(df_gauge(format!("{name}_max"), max));
}
if let Ok(min) = snapshot.min().try_into() {
res.push(df_gauge(format!("{name}_min"), min));
}
if let Some(p95) = f_to_u(snapshot.value(0.95)) {
res.push(df_gauge(format!("{name}_p95"), p95));
}
if let Some(p99) = f_to_u(snapshot.value(0.95)) {
res.push(df_gauge(format!("{name}_p99"), p99));
res.push(df_counter(format!("{name}_count"), timer.count()));

if !timer.is_empty() {
let max = timer.quantile(1.0).vortex_expect("must not be empty");
res.push(df_timer(format!("{name}_max"), max));

let min = timer.quantile(0.0).vortex_expect("must not be empty");
res.push(df_timer(format!("{name}_min"), min));

let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
res.push(df_timer(format!("{name}_p95"), p95));

let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
res.push(df_timer(format!("{name}_p99"), p99));
}

res
}
// TODO(os): add more metric types when added to VortexMetrics
@@ -165,6 +175,15 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
}
}

fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
let time = Time::new();
time.add_duration(value);
DatafusionMetricValue::Time {
name: name.into(),
time,
}
}

#[expect(
clippy::cast_possible_truncation,
reason = "truncation is checked before cast"
@@ -174,3 +193,74 @@ fn f_to_u(f: f64) -> Option<usize> {
// After the range check, truncation is guaranteed to keep the value in usize bounds.
f.trunc() as usize)
}

#[cfg(test)]
mod tests {

use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlanVisitor;
use datafusion_physical_plan::accept;

use super::VortexMetricsFinder;
use crate::common_tests::TestSessionContext;

/// Counts the number of DataSourceExec nodes in a plan.
struct DataSourceExecCounter(usize);

impl ExecutionPlanVisitor for DataSourceExecCounter {
type Error = std::convert::Infallible;
fn pre_visit(
&mut self,
plan: &dyn datafusion_physical_plan::ExecutionPlan,
) -> Result<bool, Self::Error> {
if plan.as_any().downcast_ref::<DataSourceExec>().is_some() {
self.0 += 1;
Ok(false)
} else {
Ok(true)
}
}
}

#[tokio::test]
async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
let ctx = TestSessionContext::default();

ctx.session
.sql(
"CREATE EXTERNAL TABLE my_tbl \
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex \
LOCATION 'files/'",
)
.await?;

ctx.session
.sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
.await?
.collect()
.await?;

let df = ctx.session.sql("SELECT * FROM my_tbl").await?;
let (state, plan) = df.into_parts();
let physical_plan = state.create_physical_plan(&plan).await?;

// Count DataSourceExec nodes
let mut counter = DataSourceExecCounter(0);
accept(physical_plan.as_ref(), &mut counter)?;

// Get metrics sets
let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());

assert!(!metrics_sets.is_empty());
assert_eq!(
metrics_sets.len(),
counter.0,
"Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
metrics_sets.len(),
counter.0
);

Ok(())
}
}
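For reviewers, a sketch of how the per-scan metric sets collected by this visitor might be consumed downstream, mirroring the printing loop in benchmarks/datafusion-bench/src/main.rs. This is illustrative only: the import path and visibility of `VortexMetricsFinder` are assumptions, and `find_all` is used exactly as in the test above (one `MetricsSet` per `DataSourceExec`).

// Sketch only (not part of this change): path/visibility of VortexMetricsFinder
// is assumed; the aggregate/sorted_for_display chain matches datafusion-bench.
use datafusion_physical_plan::ExecutionPlan;
use vortex_datafusion::persistent::metrics::VortexMetricsFinder;

/// Prints one aggregated metrics block per DataSourceExec found in an executed plan.
fn print_vortex_scan_metrics(plan: &dyn ExecutionPlan) {
    let metric_sets = VortexMetricsFinder::find_all(plan);
    for (scan_idx, metrics_set) in metric_sets.iter().enumerate() {
        eprintln!("scan[{scan_idx}]:");
        for metric in metrics_set.aggregate().sorted_for_display().iter() {
            eprintln!("\t{metric}");
        }
    }
}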