From 17c794298e1a78610159c5adb6b9caca9fda6926 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 28 Jan 2026 14:44:45 +0000 Subject: [PATCH 1/6] vortex-metrics v2 Signed-off-by: Adam Gutglick --- Cargo.lock | 36 +- Cargo.toml | 2 +- vortex-datafusion/src/persistent/metrics.rs | 112 ++-- vortex-datafusion/src/persistent/opener.rs | 43 +- vortex-datafusion/src/persistent/source.rs | 17 +- vortex-file/src/open.rs | 19 +- vortex-file/src/read/driver.rs | 99 +-- vortex-file/src/segments/source.rs | 20 +- vortex-file/src/tests.rs | 2 - vortex-file/tests/test_write_table.rs | 2 - vortex-io/src/read.rs | 117 ++-- vortex-layout/src/segments/cache.rs | 25 +- vortex-layout/src/test.rs | 2 - vortex-metrics/Cargo.toml | 5 +- vortex-metrics/src/counter.rs | 30 + vortex-metrics/src/gauge.rs | 63 ++ vortex-metrics/src/histogram.rs | 56 ++ vortex-metrics/src/lib.rs | 420 ++++++------- vortex-metrics/src/timer.rs | 84 +++ vortex-scan/src/scan_builder.rs | 10 +- vortex-scan/src/test.rs | 2 - vortex/src/lib.rs | 2 - wasm-test/Cargo.lock | 637 ++++++++++++++------ 23 files changed, 1154 insertions(+), 651 deletions(-) create mode 100644 vortex-metrics/src/counter.rs create mode 100644 vortex-metrics/src/gauge.rs create mode 100644 vortex-metrics/src/histogram.rs create mode 100644 vortex-metrics/src/timer.rs diff --git a/Cargo.lock b/Cargo.lock index de836acc39c..d3fdb83fd57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3953,16 +3953,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "exponential-decay-histogram" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7962d7e9baab6ea05af175491fa6a8441f3bf461558037622142573d98dd6d6" -dependencies = [ - "ordered-float 5.1.0", - "rand 0.9.2", -] - [[package]] name = "ext-trait" version = "1.0.1" @@ -8592,16 +8582,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-value" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" -dependencies = [ - "ordered-float 2.10.1", - "serde", -] - [[package]] name = "serde_core" version = "1.0.228" @@ -10864,8 +10844,9 @@ version = "0.1.0" dependencies = [ "getrandom 0.3.4", "parking_lot", + "portable-atomic", + "sketches-ddsketch", "vortex-session", - "witchcraft-metrics", ] [[package]] @@ -11826,19 +11807,6 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" -[[package]] -name = "witchcraft-metrics" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "867a74dec702d742179279ab0b5bcab72ca5858c0c3ccf870bdb5c99f54d675b" -dependencies = [ - "exponential-decay-histogram", - "once_cell", - "parking_lot", - "serde", - "serde-value", -] - [[package]] name = "writeable" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index abce82d3762..abe7cde5bf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,6 +177,7 @@ parquet = "57.1" paste = "1.0.15" pco = "0.4.4" pin-project-lite = "0.2.15" +portable-atomic = "1.10" primitive-types = { version = "0.14.0" } proc-macro2 = "1.0.95" prost = "0.14" @@ -233,7 +234,6 @@ url = "2.5.7" uuid = { version = "1.19", features = ["js"] } walkdir = "2.5.0" wasm-bindgen-futures = "0.4.39" -witchcraft-metrics = "1.0.1" xshell = "0.2.6" xz2 = "0.1.7" zigzag = "0.1.0" diff --git a/vortex-datafusion/src/persistent/metrics.rs 
b/vortex-datafusion/src/persistent/metrics.rs index 58c68d82d58..968af4a280d 100644 --- a/vortex-datafusion/src/persistent/metrics.rs +++ b/vortex-datafusion/src/persistent/metrics.rs @@ -3,6 +3,7 @@ //! Vortex table provider metrics. use std::sync::Arc; +use std::time::Duration; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::source::DataSourceExec; @@ -15,9 +16,11 @@ use datafusion_physical_plan::metrics::Gauge; use datafusion_physical_plan::metrics::Label as DatafusionLabel; use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue; use datafusion_physical_plan::metrics::MetricsSet; +use datafusion_physical_plan::metrics::Time; +use vortex::error::VortexExpect; +use vortex::metrics::Label; use vortex::metrics::Metric; -use vortex::metrics::MetricId; -use vortex::metrics::Tags; +use vortex::metrics::MetricValue; use crate::persistent::source::VortexSource; @@ -59,7 +62,7 @@ impl ExecutionPlanVisitor for VortexMetricsFinder { .vx_metrics() .snapshot() .iter() - .flat_map(|(id, metric)| metric_to_datafusion(id, metric)) + .flat_map(metric_to_datafusion) { set.push(Arc::new(metric)); } @@ -72,74 +75,80 @@ impl ExecutionPlanVisitor for VortexMetricsFinder { } } -fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator { - let (partition, labels) = tags_to_datafusion(id.tags()); - metric_value_to_datafusion(id.name(), metric) +fn metric_to_datafusion(metric: &Metric) -> impl Iterator { + let (partition, labels) = labels_to_datafusion(metric.labels()); + metric_value_to_datafusion(metric.name(), metric.value()) .into_iter() .map(move |metric_value| { DatafusionMetric::new_with_labels(metric_value, partition, labels.clone()) }) } -fn tags_to_datafusion(tags: &Tags) -> (Option, Vec) { +fn labels_to_datafusion(tags: &[Label]) -> (Option, Vec) { tags.iter() - .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| { - if k == PARTITION_LABEL { - partition = v.parse().ok(); + .fold((None, Vec::new()), |(mut partition, mut labels), metric| { + if metric.key() == PARTITION_LABEL { + partition = metric.value().parse().ok(); } else { - labels.push(DatafusionLabel::new(k.to_string(), v.to_string())); + labels.push(DatafusionLabel::new( + metric.key().to_string(), + metric.value().to_string(), + )); } (partition, labels) }) } -fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec { +fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec { match metric { - Metric::Counter(counter) => counter - .count() + MetricValue::Counter(counter) => counter + .value() .try_into() .into_iter() .map(|count| df_counter(name.to_string(), count)) .collect(), - Metric::Histogram(hist) => { + MetricValue::Histogram(hist) => { let mut res = Vec::new(); - if let Ok(count) = hist.count().try_into() { - res.push(df_counter(format!("{name}_count"), count)); - } - let snapshot = hist.snapshot(); - if let Ok(max) = snapshot.max().try_into() { - res.push(df_gauge(format!("{name}_max"), max)); - } - if let Ok(min) = snapshot.min().try_into() { - res.push(df_gauge(format!("{name}_min"), min)); - } - if let Some(p90) = f_to_u(snapshot.value(0.90)) { - res.push(df_gauge(format!("{name}_p95"), p90)); - } - if let Some(p99) = f_to_u(snapshot.value(0.99)) { - res.push(df_gauge(format!("{name}_p99"), p99)); + + res.push(df_counter(format!("{name}_count"), hist.count())); + + if !hist.is_empty() { + if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_max"), max)); + } 
+ + if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_min"), min)); + } + + if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_p95"), p95)); + } + if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) { + res.push(df_gauge(format!("{name}_p99"), p99)); + } } + res } - Metric::Timer(timer) => { + MetricValue::Timer(timer) => { let mut res = Vec::new(); - if let Ok(count) = timer.count().try_into() { - res.push(df_counter(format!("{name}_count"), count)); - } - let snapshot = timer.snapshot(); - if let Ok(max) = snapshot.max().try_into() { - // NOTE(os): unlike Time metrics, gauges allow custom aggregation - res.push(df_gauge(format!("{name}_max"), max)); - } - if let Ok(min) = snapshot.min().try_into() { - res.push(df_gauge(format!("{name}_min"), min)); - } - if let Some(p95) = f_to_u(snapshot.value(0.95)) { - res.push(df_gauge(format!("{name}_p95"), p95)); - } - if let Some(p99) = f_to_u(snapshot.value(0.95)) { - res.push(df_gauge(format!("{name}_p99"), p99)); + res.push(df_counter(format!("{name}_count"), timer.count())); + + if !timer.is_empty() { + let max = timer.quantile(1.0).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_max"), max)); + + let min = timer.quantile(0.0).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_min"), min)); + + let p95 = timer.quantile(0.95).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_p95"), p95)); + + let p99 = timer.quantile(0.99).vortex_expect("must not be empty"); + res.push(df_timer(format!("{name}_p99"), p99)); } + res } // TODO(os): add more metric types when added to VortexMetrics @@ -165,6 +174,15 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue { } } +fn df_timer(name: String, value: Duration) -> DatafusionMetricValue { + let time = Time::new(); + time.add_duration(value); + DatafusionMetricValue::Time { + name: name.into(), + time, + } +} + #[expect( clippy::cast_possible_truncation, reason = "truncation is checked before cast" diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 45cbba7871c..ea9d1a89a63 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -38,7 +38,7 @@ use vortex::error::VortexError; use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; -use vortex::metrics::VortexMetrics; +use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; @@ -50,11 +50,14 @@ use crate::convert::exprs::ExpressionConvertor; use crate::convert::exprs::ProcessedProjection; use crate::convert::exprs::make_vortex_predicate; use crate::convert::schema::calculate_physical_schema; +use crate::metrics::PARTITION_LABEL; use crate::persistent::cache::CachedVortexMetadata; use crate::persistent::stream::PrunableStream; #[derive(Clone)] pub(crate) struct VortexOpener { + /// The partition this opener is assigned to. Only used for labeling metrics. + pub partition: usize, pub session: VortexSession, pub vortex_reader_factory: Arc, /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is @@ -76,7 +79,7 @@ pub(crate) struct VortexOpener { /// If provided, the scan will not return more than this many rows. 
pub limit: Option, /// A metrics object for tracking performance of the scan. - pub metrics: VortexMetrics, + pub metrics_registry: Arc, /// A shared cache of file readers. /// /// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache @@ -92,9 +95,7 @@ pub(crate) struct VortexOpener { impl FileOpener for VortexOpener { fn open(&self, file: PartitionedFile) -> DFResult { let session = self.session.clone(); - let metrics = self - .metrics - .child_with_tags([("file_path", file.path().to_string())]); + let metrics_registry = self.metrics_registry.clone(); let mut projection = self.projection.clone(); let mut filter = self.filter.clone(); @@ -103,7 +104,14 @@ impl FileOpener for VortexOpener { .vortex_reader_factory .create_reader(file.path().as_ref(), &session)?; - let reader = InstrumentedReadAt::new(reader, &metrics); + let reader = InstrumentedReadAt::new_with_labels( + reader, + metrics_registry.as_ref(), + [ + ("file_path", file.path().to_string()), + (PARTITION_LABEL, self.partition.to_string()), + ], + ); let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = self.expr_adapter_factory.clone(); @@ -169,7 +177,7 @@ impl FileOpener for VortexOpener { let mut open_opts = session .open_options() .with_file_size(file.object_meta.size) - .with_metrics(metrics.clone()); + .with_metrics(metrics_registry.clone()); if let Some(file_metadata_cache) = file_metadata_cache && let Some(file_metadata) = file_metadata_cache.get(&file.object_meta) @@ -318,7 +326,7 @@ impl FileOpener for VortexOpener { } let stream = scan_builder - .with_metrics(metrics) + .with_metrics(metrics_registry) .with_projection(scan_projection) .with_some_filter(filter) .with_ordered(has_output_ordering) @@ -440,6 +448,7 @@ mod tests { use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; + use vortex::metrics::DefaultMetricsRegistry; use vortex::scan::Selection; use vortex::session::VortexSession; @@ -516,6 +525,7 @@ mod tests { filter: Option, ) -> VortexOpener { VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), @@ -525,7 +535,7 @@ mod tests { table_schema, batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -607,6 +617,7 @@ mod tests { )]))); let make_opener = |filter| VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), @@ -616,7 +627,7 @@ mod tests { table_schema: table_schema.clone(), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -690,6 +701,7 @@ mod tests { ])); let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema), @@ -699,7 +711,7 @@ mod tests { table_schema: 
TableSchema::from_file_schema(table_schema.clone()), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -839,6 +851,7 @@ mod tests { let projection = vec![0, 2]; let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices( @@ -851,7 +864,7 @@ mod tests { table_schema: table_schema.clone(), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -898,6 +911,7 @@ mod tests { projection: ProjectionExprs, ) -> VortexOpener { VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection, @@ -907,7 +921,7 @@ mod tests { table_schema: TableSchema::from_file_schema(schema), batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -1096,6 +1110,7 @@ mod tests { )]); let opener = VortexOpener { + partition: 1, session: SESSION.clone(), vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection, @@ -1105,7 +1120,7 @@ mod tests { table_schema, batch_size: 100, limit: None, - metrics: Default::default(), + metrics_registry: Arc::new(DefaultMetricsRegistry::default()), layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 5e46f61c5e4..6aa3ae58afc 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -29,11 +29,11 @@ use object_store::path::Path; use vortex::error::VortexExpect as _; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; -use vortex::metrics::VortexMetrics; +use vortex::metrics::DefaultMetricsRegistry; +use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; -use super::metrics::PARTITION_LABEL; use super::opener::VortexOpener; use crate::DefaultVortexReaderFactory; use crate::VortexReaderFactory; @@ -62,7 +62,7 @@ pub struct VortexSource { layout_readers: Arc>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, - vx_metrics: VortexMetrics, + vx_metrics: Arc, file_metadata_cache: Option>, } @@ -83,7 +83,7 @@ impl VortexSource { layout_readers: Arc::new(DashMap::default()), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), vortex_reader_factory: None, - vx_metrics: VortexMetrics::default(), + vx_metrics: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, } } @@ -109,7 +109,7 @@ impl VortexSource { } /// The metrics instance attached to this source. 
- pub fn vx_metrics(&self) -> &VortexMetrics { + pub fn vx_metrics(&self) -> &Arc { &self.vx_metrics } @@ -130,10 +130,6 @@ impl FileSource for VortexSource { base_config: &FileScanConfig, partition: usize, ) -> DFResult> { - let partition_metrics = self - .vx_metrics() - .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter()); - let batch_size = self .batch_size .vortex_expect("batch_size must be supplied to VortexSource"); @@ -149,6 +145,7 @@ impl FileSource for VortexSource { .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store))); let opener = VortexOpener { + partition, session: self.session.clone(), vortex_reader_factory, projection: self.projection.clone(), @@ -158,7 +155,7 @@ impl FileSource for VortexSource { table_schema: self.table_schema.clone(), batch_size, limit: base_config.limit.map(|l| l as u64), - metrics: partition_metrics, + metrics_registry: self.vx_metrics.clone(), layout_readers: self.layout_readers.clone(), has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 3e2d73aee40..e6f258180ae 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -21,7 +21,8 @@ use vortex_layout::segments::SegmentCacheSourceAdapter; use vortex_layout::segments::SegmentId; use vortex_layout::segments::SharedSegmentSource; use vortex_layout::session::LayoutSessionExt; -use vortex_metrics::VortexMetrics; +use vortex_metrics::DefaultMetricsRegistry; +use vortex_metrics::MetricsRegistry; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; @@ -52,7 +53,7 @@ pub struct VortexOpenOptions { /// The segments read during the initial read. initial_read_segments: RwLock>, /// A metrics registry for the file. - metrics: Option, + metrics: Option>, } pub trait OpenOptionsSessionExt: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt { @@ -119,8 +120,8 @@ impl VortexOpenOptions { self } - /// Configure a custom [`VortexMetrics`]. - pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self { + /// Configure a custom [`MetricsRegistry`] implementation. + pub fn with_metrics(mut self, metrics: Arc) -> Self { self.metrics = Some(metrics); self } @@ -156,7 +157,10 @@ impl VortexOpenOptions { /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation. pub async fn open_read(self, reader: R) -> VortexResult { - let metrics = self.metrics.clone().unwrap_or_default(); + let metrics_registry = self + .metrics + .clone() + .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default())); let footer = if let Some(footer) = self.footer { footer } else { @@ -168,7 +172,7 @@ impl VortexOpenOptions { initial: self.initial_read_segments, fallback: self.segment_cache, }, - metrics.clone(), + metrics_registry.as_ref(), )); // Create a segment source backed by the VortexRead implementation. @@ -176,7 +180,7 @@ impl VortexOpenOptions { footer.segment_map().clone(), reader, self.session.handle(), - metrics.clone(), + metrics_registry.as_ref(), ))); // Wrap up the segment source to first resolve segments from the initial read cache. 
@@ -344,7 +348,6 @@ mod tests { // Create a large file (> 1MB) let mut buf = ByteBufferMut::empty(); let mut session = VortexSession::empty() - .with::() .with::() .with::() .with::() diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index e0aa59666fa..eb99d71ba01 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -4,7 +4,6 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use std::pin::Pin; -use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -15,7 +14,8 @@ use vortex_error::VortexExpect; use vortex_io::CoalesceConfig; use vortex_metrics::Counter; use vortex_metrics::Histogram; -use vortex_metrics::VortexMetrics; +use vortex_metrics::MetricBuilder; +use vortex_metrics::MetricsRegistry; use crate::read::ReadRequest; use crate::read::RequestId; @@ -49,7 +49,7 @@ impl IoRequestStream { events: S, coalesce_window: Option, coalesced_buffer_alignment: Alignment, - metrics: VortexMetrics, + metrics_registry: &dyn MetricsRegistry, ) -> Self where S: Stream + Unpin + Send + 'static, @@ -58,7 +58,7 @@ impl IoRequestStream { events, inner_done: false, coalesce_window, - state: State::new(metrics, coalesced_buffer_alignment), + state: State::new(metrics_registry, coalesced_buffer_alignment), } } } @@ -123,28 +123,31 @@ struct State { } struct StateMetrics { - individual_requests: Arc, - coalesced_requests: Arc, - num_requests_coalesced: Arc, + individual_requests: Counter, + coalesced_requests: Counter, + num_requests_coalesced: Histogram, } impl StateMetrics { - fn new(registry: VortexMetrics) -> Self { + fn new(metrics_registry: &dyn MetricsRegistry) -> Self { Self { - individual_requests: registry.counter("io.requests.individual"), - coalesced_requests: registry.counter("io.requests.coalesced"), - num_requests_coalesced: registry.histogram("io.requests.coalesced.num_coalesced"), + individual_requests: MetricBuilder::new(metrics_registry) + .counter("io.requests.individual"), + coalesced_requests: MetricBuilder::new(metrics_registry) + .counter("io.requests.coalesced"), + num_requests_coalesced: MetricBuilder::new(metrics_registry) + .histogram("io.requests.coalesced.num_coalesced"), } } } impl State { - fn new(metrics: VortexMetrics, coalesced_buffer_alignment: Alignment) -> Self { + fn new(metrics_registry: &dyn MetricsRegistry, coalesced_buffer_alignment: Alignment) -> Self { Self { requests: BTreeMap::new(), polled_requests: BTreeMap::new(), requests_by_offset: BTreeSet::new(), - metrics: StateMetrics::new(metrics), + metrics: StateMetrics::new(metrics_registry), coalesced_buffer_alignment, } } @@ -179,17 +182,17 @@ impl State { fn next(&mut self, coalesce_window: Option<&CoalesceConfig>) -> Option { match coalesce_window { None => self.next_uncoalesced().map(|request| { - self.metrics.individual_requests.inc(); + self.metrics.individual_requests.add(1); IoRequest::new_single(request) }), Some(window) => self.next_coalesced(window).map(|request| { match request.requests.len() { - 1 => self.metrics.individual_requests.inc(), + 1 => self.metrics.individual_requests.add(1), num_requests => { - self.metrics.coalesced_requests.inc(); + self.metrics.coalesced_requests.add(1); self.metrics .num_requests_coalesced - .update(num_requests as i64); + .update(num_requests as f64); } }; IoRequest::new_coalesced(request) @@ -328,6 +331,8 @@ mod tests { use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_error::VortexResult; + use vortex_metrics::DefaultMetricsRegistry; + use 
vortex_metrics::MetricValue; use super::*; use crate::read::request::IoRequestInner; @@ -363,12 +368,12 @@ mod tests { coalesced_buffer_alignment: Alignment, ) -> Vec { let event_stream = stream::iter(events); - let metrics = VortexMetrics::default(); + let metrics_registry = DefaultMetricsRegistry::default(); let io_stream = IoRequestStream::new( event_stream, coalesce_window, coalesced_buffer_alignment, - metrics, + &metrics_registry, ); io_stream.collect().await } @@ -681,7 +686,7 @@ mod tests { ]; let event_stream = stream::iter(events); - let metrics = VortexMetrics::default(); + let metrics_registry = DefaultMetricsRegistry::default(); let io_stream = IoRequestStream::new( event_stream, Some(CoalesceConfig { @@ -689,28 +694,28 @@ mod tests { max_size: 1024, }), Alignment::none(), - metrics.clone(), + &metrics_registry, ); let outputs: Vec = io_stream.collect().await; assert_eq!(outputs.len(), 2); - let snapshot = metrics.snapshot(); - let mut individual_count = 0i64; - let mut coalesced_operations = 0i64; - let mut coalesced_histogram_count = 0u64; - - for (metric_id, metric) in snapshot.iter() { - match metric { - vortex_metrics::Metric::Counter(counter) => { - if metric_id.name() == "io.requests.individual" { - individual_count = counter.count(); - } else if metric_id.name() == "io.requests.coalesced" { - coalesced_operations = counter.count(); + let snapshot = metrics_registry.snapshot(); + let mut individual_count = 0u64; + let mut coalesced_operations = 0u64; + let mut coalesced_histogram_count = 0usize; + + for metric in snapshot.iter() { + match metric.value() { + MetricValue::Counter(counter) => { + if metric.name() == "io.requests.individual" { + individual_count = counter.value(); + } else if metric.name() == "io.requests.coalesced" { + coalesced_operations = counter.value(); } } - vortex_metrics::Metric::Histogram(histogram) => { - if metric_id.name() == "io.requests.coalesced.num_coalesced" { + MetricValue::Histogram(histogram) => { + if metric.name() == "io.requests.coalesced.num_coalesced" { coalesced_histogram_count = histogram.count(); } } @@ -740,25 +745,25 @@ mod tests { ]; let event_stream = stream::iter(events); - let metrics = VortexMetrics::default(); + let metrics_registry = DefaultMetricsRegistry::default(); // No coalescing window - should be individual requests let io_stream = - IoRequestStream::new(event_stream, None, Alignment::none(), metrics.clone()); + IoRequestStream::new(event_stream, None, Alignment::none(), &metrics_registry); let outputs: Vec = io_stream.collect().await; assert_eq!(outputs.len(), 2); // Check metrics - let snapshot = metrics.snapshot(); - let mut individual_count = 0i64; - let mut coalesced_operations = 0i64; - - for (metric_id, metric) in snapshot.iter() { - if let vortex_metrics::Metric::Counter(counter) = metric { - if metric_id.name() == "io.requests.individual" { - individual_count = counter.count(); - } else if metric_id.name() == "io.requests.coalesced.num_coalesced" { - coalesced_operations = counter.count(); + let snapshot = metrics_registry.snapshot(); + let mut individual_count = 0_u64; + let mut coalesced_operations = 0_u64; + + for metric in snapshot.iter() { + if let MetricValue::Counter(counter) = metric.value() { + if metric.name() == "io.requests.individual" { + individual_count = counter.value(); + } else if metric.name() == "io.requests.coalesced.num_coalesced" { + coalesced_operations = counter.value(); } } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 
344f805f516..4eb320a6ec6 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -21,7 +21,7 @@ use vortex_io::runtime::Handle; use vortex_layout::segments::SegmentFuture; use vortex_layout::segments::SegmentId; use vortex_layout::segments::SegmentSource; -use vortex_metrics::VortexMetrics; +use vortex_metrics::MetricsRegistry; use crate::SegmentSpec; use crate::read::IoRequestStream; @@ -69,7 +69,7 @@ impl FileSegmentSource { segments: Arc<[SegmentSpec]>, reader: R, handle: Handle, - metrics: VortexMetrics, + metrics_registry: &dyn MetricsRegistry, ) -> Self { let (send, recv) = mpsc::unbounded(); @@ -87,15 +87,15 @@ impl FileSegmentSource { }); let concurrency = reader.concurrency(); - let drive_fut = async move { - let stream = IoRequestStream::new( - StreamExt::boxed(recv), - coalesce_config, - max_alignment, - metrics, - ) - .boxed(); + let stream = IoRequestStream::new( + StreamExt::boxed(recv), + coalesce_config, + max_alignment, + metrics_registry, + ) + .boxed(); + let drive_fut = async move { stream .map(move |req| { let reader = reader.clone(); diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index a3932c1756a..f8c4d6c171c 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -64,7 +64,6 @@ use vortex_dtype::datetime::TimestampOptions; use vortex_error::VortexResult; use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; -use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; use vortex_scan::ScanBuilder; use vortex_session::VortexSession; @@ -77,7 +76,6 @@ use crate::WriteOptionsSessionExt; static SESSION: LazyLock = LazyLock::new(|| { let mut session = VortexSession::empty() - .with::() .with::() .with::() .with::() diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index 4821e9962e6..360d07f4262 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -25,12 +25,10 @@ use vortex_layout::layouts::compressed::CompressingStrategy; use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::session::LayoutSession; -use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; static SESSION: LazyLock = LazyLock::new(|| { let mut session = VortexSession::empty() - .with::() .with::() .with::() .with::() diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index dbae9415e61..39c047146d3 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -13,8 +13,10 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_metrics::Counter; use vortex_metrics::Histogram; +use vortex_metrics::Label; +use vortex_metrics::MetricBuilder; +use vortex_metrics::MetricsRegistry; use vortex_metrics::Timer; -use vortex_metrics::VortexMetrics; /// Configuration for coalescing nearby I/O requests into single operations. 
 #[derive(Clone, Copy, Debug)]
@@ -136,12 +138,6 @@ impl VortexReadAt for Arc {
     ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
         self.as_ref().read_at(offset, length, alignment)
     }
-
-    // fn drive(self: Arc, requests: BoxStream<'static, IoRequest>) -> BoxFuture<'static, ()> {
-    //     // Delegate to the inner implementation's drive
-    //     let inner: Arc = Arc::clone(&self);
-    //     inner.drive(requests)
-    // }
 }
 
 impl VortexReadAt for ByteBuffer {
@@ -185,46 +181,90 @@ impl VortexReadAt for ByteBuffer {
 #[derive(Clone)]
 pub struct InstrumentedReadAt {
     read: T,
-    sizes: Arc<Histogram>,
-    total_size: Arc<Counter>,
-    durations: Arc<Timer>,
+    // Wrapped in an `Arc` so that clones of this reader share (and keep alive) the same
+    // underlying metric instances.
+    metrics: Arc<InnerMetrics>,
+}
+
+struct InnerMetrics {
+    sizes: Histogram,
+    total_size: Counter,
+    durations: Timer,
 }
 
 impl InstrumentedReadAt {
-    pub fn new(read: T, metrics: &VortexMetrics) -> Self {
+    pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
+        Self::new_with_labels(read, metrics_registry, Vec::