diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f9ed52e131..7f44923a4fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -584,7 +584,11 @@ jobs: - name: Rust Tests (Windows) if: matrix.os == 'windows-x64' run: | - cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda --exclude vortex-nvcomp --exclude vortex-cub --exclude vortex-test-e2e-cuda --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench --exclude xtask + cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench ` + --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda ` + --exclude vortex-nvcomp --exclude vortex-cub --exclude vortex-test-e2e-cuda --exclude duckdb-bench ` + --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench ` + --exclude xtask --exclude vortex-datafusion - name: Rust Tests (Other) if: matrix.os != 'windows-x64' run: cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude xtask diff --git a/Cargo.lock b/Cargo.lock index de836acc39c..20822526491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3953,16 +3953,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "exponential-decay-histogram" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7962d7e9baab6ea05af175491fa6a8441f3bf461558037622142573d98dd6d6" -dependencies = [ - "ordered-float 5.1.0", - "rand 0.9.2", -] - [[package]] name = "ext-trait" version = "1.0.1" @@ -8592,16 +8582,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-value" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" -dependencies = [ - "ordered-float 2.10.1", - "serde", -] - [[package]] name = "serde_core" version = "1.0.228" @@ -10864,8 +10844,8 @@ version = "0.1.0" dependencies = [ "getrandom 0.3.4", "parking_lot", + "sketches-ddsketch", "vortex-session", - "witchcraft-metrics", ] [[package]] @@ -11826,19 +11806,6 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" -[[package]] -name = "witchcraft-metrics" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "867a74dec702d742179279ab0b5bcab72ca5858c0c3ccf870bdb5c99f54d675b" -dependencies = [ - "exponential-decay-histogram", - "once_cell", - "parking_lot", - "serde", - "serde-value", -] - [[package]] name = "writeable" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index abce82d3762..f25e858917f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,7 +233,6 @@ url = "2.5.7" uuid = { version = "1.19", features = ["js"] } walkdir = "2.5.0" wasm-bindgen-futures = "0.4.39" -witchcraft-metrics = "1.0.1" xshell = "0.2.6" xz2 = "0.1.7" zigzag = "0.1.0" diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index 6be64e5bbad..0a8e26fde42 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -320,9 +320,9 @@ fn print_metrics(plans: &[(usize, Format, Arc)]) { eprintln!("metrics for query={query_idx}, {format}:"); for (scan_idx, metrics_set) in metric_sets.iter().enumerate() { - eprintln!(" scan[{scan_idx}]:"); - for metric in metrics_set.clone().aggregate().sorted_for_display().iter() { - eprintln!(" {metric}"); + eprintln!("\tscan[{scan_idx}]:"); + for metric in metrics_set.aggregate().sorted_for_display().iter() { + eprintln!("\t\t{metric}"); } } } diff --git a/vortex-datafusion/src/persistent/metrics.rs b/vortex-datafusion/src/persistent/metrics.rs index 58c68d82d58..7e99430568f 100644 --- a/vortex-datafusion/src/persistent/metrics.rs +++ b/vortex-datafusion/src/persistent/metrics.rs @@ -3,6 +3,7 @@ //! Vortex table provider metrics. use std::sync::Arc; +use std::time::Duration; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::source::DataSourceExec; @@ -15,13 +16,16 @@ use datafusion_physical_plan::metrics::Gauge; use datafusion_physical_plan::metrics::Label as DatafusionLabel; use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue; use datafusion_physical_plan::metrics::MetricsSet; +use datafusion_physical_plan::metrics::Time; +use vortex::error::VortexExpect; +use vortex::metrics::Label; use vortex::metrics::Metric; -use vortex::metrics::MetricId; -use vortex::metrics::Tags; +use vortex::metrics::MetricValue; use crate::persistent::source::VortexSource; pub(crate) static PARTITION_LABEL: &str = "partition"; +pub(crate) static PATH_LABEL: &str = "file_path"; /// Extracts datafusion metrics from all VortexExec instances in /// a given physical plan. @@ -43,9 +47,8 @@ impl ExecutionPlanVisitor for VortexMetricsFinder { type Error = std::convert::Infallible; fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { if let Some(exec) = plan.as_any().downcast_ref::() { - if let Some(metrics) = exec.metrics() { - self.0.push(metrics); - } + // Start with exec metrics or create a new set + let mut set = exec.metrics().unwrap_or_default(); // Include our own metrics from VortexSource if let Some(file_scan) = exec.data_source().as_any().downcast_ref::() @@ -54,92 +57,99 @@ impl ExecutionPlanVisitor for VortexMetricsFinder { .as_any() .downcast_ref::() { - let mut set = MetricsSet::new(); for metric in scan - .vx_metrics() + .metrics_registry() .snapshot() .iter() - .flat_map(|(id, metric)| metric_to_datafusion(id, metric)) + .flat_map(metric_to_datafusion) { set.push(Arc::new(metric)); } - - self.0.push(set); } - } - Ok(true) + self.0.push(set); + + Ok(false) + } else { + Ok(true) + } } } -fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator { - let (partition, labels) = tags_to_datafusion(id.tags()); - metric_value_to_datafusion(id.name(), metric) +fn metric_to_datafusion(metric: &Metric) -> impl Iterator { + let (partition, labels) = labels_to_datafusion(metric.labels()); + metric_value_to_datafusion(metric.name(), metric.value()) .into_iter() .map(move |metric_value| { DatafusionMetric::new_with_labels(metric_value, partition, labels.clone()) }) } -fn tags_to_datafusion(tags: &Tags) -> (Option, Vec) { +fn labels_to_datafusion(tags: &[Label]) -> (Option, Vec) { tags.iter() - .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| { - if k == PARTITION_LABEL { - partition = v.parse().ok(); + .fold((None, Vec::new()), |(mut partition, mut labels), metric| { + if metric.key() == PARTITION_LABEL { + partition = metric.value().parse().ok(); } else { - labels.push(DatafusionLabel::new(k.to_string(), v.to_string())); + labels.push(DatafusionLabel::new( + metric.key().to_string(), + metric.value().to_string(), + )); } (partition, labels) }) } -fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec { +fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec { match metric { - Metric::Counter(counter) => counter - .count() + MetricValue::Counter(counter) => counter + .value() .try_into() .into_iter() .map(|count| df_counter(name.to_string(), count)) .collect(), - Metric::Histogram(hist) => { + MetricValue::Histogram(hist) => { let mut res = Vec::new(); - if let Ok(count) = hist.count().try_into() { - res.push(df_counter(format!("{name}_count"), count)); - } - let snapshot = hist.snapshot(); - if let Ok(max) = snapshot.max().try_into() { - res.push(df_gauge(format!("{name}_max"), max)); - } - if let Ok(min) = snapshot.min().try_into() { - res.push(df_gauge(format!("{name}_min"), min)); - } - if let Some(p90) = f_to_u(snapshot.value(0.90)) { - res.push(df_gauge(format!("{name}_p95"), p90)); - } - if let Some(p99) = f_to_u(snapshot.value(0.99)) { - res.push(df_gauge(format!("{name}_p99"), p99)); + + res.push(df_counter(format!("{name}_count"), hist.count())); + + if !hist.is_empty() { + if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_max"), max)); + } + + if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_min"), min)); + } + + if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_p95"), p95)); + } + if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_p99"), p99)); + } } + res } - Metric::Timer(timer) => { + MetricValue::Timer(timer) => { let mut res = Vec::new(); - if let Ok(count) = timer.count().try_into() { - res.push(df_counter(format!("{name}_count"), count)); - } - let snapshot = timer.snapshot(); - if let Ok(max) = snapshot.max().try_into() { - // NOTE(os): unlike Time metrics, gauges allow custom aggregation - res.push(df_gauge(format!("{name}_max"), max)); - } - if let Ok(min) = snapshot.min().try_into() { - res.push(df_gauge(format!("{name}_min"), min)); - } - if let Some(p95) = f_to_u(snapshot.value(0.95)) { - res.push(df_gauge(format!("{name}_p95"), p95)); - } - if let Some(p99) = f_to_u(snapshot.value(0.95)) { - res.push(df_gauge(format!("{name}_p99"), p99)); + res.push(df_counter(format!("{name}_count"), timer.count())); + + if !timer.is_empty() { + let max = timer.quantile(1.0).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_max"), max)); + + let min = timer.quantile(0.0).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_min"), min)); + + let p95 = timer.quantile(0.95).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_p95"), p95)); + + let p99 = timer.quantile(0.99).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_p99"), p99)); } + res } // TODO(os): add more metric types when added to VortexMetrics @@ -165,6 +175,15 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue { } } +fn df_timer(name: String, value: Duration) -> DatafusionMetricValue { + let time = Time::new(); + time.add_duration(value); + DatafusionMetricValue::Time { + name: name.into(), + time, + } +} + #[expect( clippy::cast_possible_truncation, reason = "truncation is checked before cast" @@ -174,3 +193,74 @@ fn f_to_u(f: f64) -> Option { // After the range check, truncation is guaranteed to keep the value in usize bounds. f.trunc() as usize) } + +#[cfg(test)] +mod tests { + + use datafusion_datasource::source::DataSourceExec; + use datafusion_physical_plan::ExecutionPlanVisitor; + use datafusion_physical_plan::accept; + + use super::VortexMetricsFinder; + use crate::common_tests::TestSessionContext; + + /// Counts the number of DataSourceExec nodes in a plan. + struct DataSourceExecCounter(usize); + + impl ExecutionPlanVisitor for DataSourceExecCounter { + type Error = std::convert::Infallible; + fn pre_visit( + &mut self, + plan: &dyn datafusion_physical_plan::ExecutionPlan, + ) -> Result { + if plan.as_any().downcast_ref::().is_some() { + self.0 += 1; + Ok(false) + } else { + Ok(true) + } + } + } + + #[tokio::test] + async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> { + let ctx = TestSessionContext::default(); + + ctx.session + .sql( + "CREATE EXTERNAL TABLE my_tbl \ + (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \ + STORED AS vortex \ + LOCATION 'files/'", + ) + .await?; + + ctx.session + .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)") + .await? + .collect() + .await?; + + let df = ctx.session.sql("SELECT * FROM my_tbl").await?; + let (state, plan) = df.into_parts(); + let physical_plan = state.create_physical_plan(&plan).await?; + + // Count DataSourceExec nodes + let mut counter = DataSourceExecCounter(0); + accept(physical_plan.as_ref(), &mut counter)?; + + // Get metrics sets + let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref()); + + assert!(!metrics_sets.is_empty()); + assert_eq!( + metrics_sets.len(), + counter.0, + "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes", + metrics_sets.len(), + counter.0 + ); + + Ok(()) + } +} diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 45cbba7871c..15a2a81887a 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -38,7 +38,8 @@ use vortex::error::VortexError; use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; -use vortex::metrics::VortexMetrics; +use vortex::metrics::Label; +use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; @@ -50,11 +51,15 @@ use crate::convert::exprs::ExpressionConvertor; use crate::convert::exprs::ProcessedProjection; use crate::convert::exprs::make_vortex_predicate; use crate::convert::schema::calculate_physical_schema; +use crate::metrics::PARTITION_LABEL; +use crate::metrics::PATH_LABEL; use crate::persistent::cache::CachedVortexMetadata; use crate::persistent::stream::PrunableStream; #[derive(Clone)] pub(crate) struct VortexOpener { + /// The partition this opener is assigned to. Only used for labeling metrics. + pub partition: usize, pub session: VortexSession, pub vortex_reader_factory: Arc, /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is @@ -76,7 +81,7 @@ pub(crate) struct VortexOpener { /// If provided, the scan will not return more than this many rows. pub limit: Option, /// A metrics object for tracking performance of the scan. - pub metrics: VortexMetrics, + pub metrics_registry: Arc, /// A shared cache of file readers. /// /// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache @@ -92,9 +97,11 @@ pub(crate) struct VortexOpener { impl FileOpener for VortexOpener { fn open(&self, file: PartitionedFile) -> DFResult { let session = self.session.clone(); - let metrics = self - .metrics - .child_with_tags([("file_path", file.path().to_string())]); + let metrics_registry = self.metrics_registry.clone(); + let labels = vec![ + Label::new(PATH_LABEL, file.path().to_string()), + Label::new(PARTITION_LABEL, self.partition.to_string()), + ]; let mut projection = self.projection.clone(); let mut filter = self.filter.clone(); @@ -103,7 +110,8 @@ impl FileOpener for VortexOpener { .vortex_reader_factory .create_reader(file.path().as_ref(), &session)?; - let reader = InstrumentedReadAt::new(reader, &metrics); + let reader = + InstrumentedReadAt::new_with_labels(reader, metrics_registry.as_ref(), labels.clone()); let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = self.expr_adapter_factory.clone(); @@ -169,7 +177,8 @@ impl FileOpener for VortexOpener { let mut open_opts = session .open_options() .with_file_size(file.object_meta.size) - .with_metrics(metrics.clone()); + .with_metrics_registry(metrics_registry.clone()) + .with_labels(labels); if let Some(file_metadata_cache) = file_metadata_cache && let Some(file_metadata) = file_metadata_cache.get(&file.object_meta) @@ -318,7 +327,7 @@ impl FileOpener for VortexOpener { } let stream = scan_builder - .with_metrics(metrics) + .with_metrics_registry(metrics_registry) .with_projection(scan_projection) .with_some_filter(filter) .with_ordered(has_output_ordering) @@ -440,6 +449,7 @@ mod tests { use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; + use vortex::metrics::DefaultMetricsRegistry; use vortex::scan::Selection; use vortex::session::VortexSession; @@ -516,6 +526,7 @@ mod tests { filter: Option, ) -> VortexOpener { VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), @@ -525,7 +536,7 @@ mod tests { table_schema, batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -607,6 +618,7 @@ mod tests { )]))); let make_opener = |filter| VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), @@ -616,7 +628,7 @@ mod tests { table_schema: table_schema.clone(), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -690,6 +702,7 @@ mod tests { ])); let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema), @@ -699,7 +712,7 @@ mod tests { table_schema: TableSchema::from_file_schema(table_schema.clone()), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -839,6 +852,7 @@ mod tests { let projection = vec![0, 2]; let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices( @@ -851,7 +865,7 @@ mod tests { table_schema: table_schema.clone(), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -898,6 +912,7 @@ mod tests { projection: ProjectionExprs, ) -> VortexOpener { VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection, @@ -907,7 +922,7 @@ mod tests { table_schema: TableSchema::from_file_schema(schema), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -1096,6 +1111,7 @@ mod tests { )]); let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection, @@ -1105,7 +1121,7 @@ mod tests { table_schema, batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 5e46f61c5e4..53bdd639b38 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -29,11 +29,11 @@ use object_store::path::Path; use vortex::error::VortexExpect as _; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; -use vortex::metrics::VortexMetrics; +use vortex::metrics::DefaultMetricsRegistry; +use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; -use super::metrics::PARTITION_LABEL; use super::opener::VortexOpener; use crate::DefaultVortexReaderFactory; use crate::VortexReaderFactory; @@ -62,7 +62,7 @@ pub struct VortexSource { layout_readers: Arc>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, - vx_metrics: VortexMetrics, + vx_metrics_registry: Arc, file_metadata_cache: Option>, } @@ -83,7 +83,7 @@ impl VortexSource { layout_readers: Arc::new(DashMap::default()), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), vortex_reader_factory: None, - vx_metrics: VortexMetrics::default(), + vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, } } @@ -108,9 +108,9 @@ impl VortexSource { self } - /// The metrics instance attached to this source. - pub fn vx_metrics(&self) -> &VortexMetrics { - &self.vx_metrics + /// Returns the [`MetricsRegistry`] attached to this source. + pub fn metrics_registry(&self) -> &Arc { + &self.vx_metrics_registry } /// Override the file metadata cache @@ -130,10 +130,6 @@ impl FileSource for VortexSource { base_config: &FileScanConfig, partition: usize, ) -> DFResult> { - let partition_metrics = self - .vx_metrics() - .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter()); - let batch_size = self .batch_size .vortex_expect("batch_size must be supplied to VortexSource"); @@ -149,6 +145,7 @@ impl FileSource for VortexSource { .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store))); let opener = VortexOpener { + partition, session: self.session.clone(), vortex_reader_factory, projection: self.projection.clone(), @@ -158,7 +155,7 @@ impl FileSource for VortexSource { table_schema: self.table_schema.clone(), batch_size, limit: base_config.limit.map(|l| l as u64), - metrics: partition_metrics, + metrics_registry: self.vx_metrics_registry.clone(), layout_readers: self.layout_readers.clone(), has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 3e2d73aee40..b45e3d1bdc8 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -14,14 +14,16 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; +use vortex_layout::segments::InstrumentedSegmentCache; use vortex_layout::segments::NoOpSegmentCache; use vortex_layout::segments::SegmentCache; -use vortex_layout::segments::SegmentCacheMetrics; use vortex_layout::segments::SegmentCacheSourceAdapter; use vortex_layout::segments::SegmentId; use vortex_layout::segments::SharedSegmentSource; use vortex_layout::session::LayoutSessionExt; -use vortex_metrics::VortexMetrics; +use vortex_metrics::DefaultMetricsRegistry; +use vortex_metrics::Label; +use vortex_metrics::MetricsRegistry; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; @@ -32,6 +34,7 @@ use crate::VortexFile; use crate::footer::Footer; use crate::segments::FileSegmentSource; use crate::segments::InitialReadSegmentCache; +use crate::segments::RequestMetrics; const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE; @@ -40,7 +43,7 @@ pub struct VortexOpenOptions { /// The session to use for opening the file. session: VortexSession, /// Cache to use for file segments. - segment_cache: Arc, + segment_cache: Option>, /// The number of bytes to read when parsing the footer. initial_read_size: usize, /// An optional, externally provided, file size. @@ -52,7 +55,9 @@ pub struct VortexOpenOptions { /// The segments read during the initial read. initial_read_segments: RwLock>, /// A metrics registry for the file. - metrics: Option, + metrics_registry: Option>, + /// Default labels applied to all the file's metrics + labels: Vec