From ed48963011ab42b91dac699173601f1119c3487f Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 30 Dec 2025 19:50:26 +1100
Subject: [PATCH 1/2] add metrics collection for logs, traces and metrics
 separately

add the following metrics -
1. total logs collected by date
2. total size of logs collected by date
3. total metrics collected by date
4. total size of metrics collected by date
5. total traces collected by date
6. total size of traces collected by date

add all of these to pbilling
---
 src/event/format/json.rs                      |   6 +-
 src/event/format/mod.rs                       |   2 +
 src/event/mod.rs                              |   4 +-
 src/handlers/http/cluster/mod.rs              |  70 ++++++++++
 src/handlers/http/ingest.rs                   |  37 +++++-
 src/handlers/http/modal/utils/ingest_utils.rs |  10 +-
 src/metrics/mod.rs                            | 123 +++++++++++++++++-
 src/otel/logs.rs                              |   4 +
 src/otel/metrics.rs                           |   5 +-
 src/otel/traces.rs                            |   5 +
 src/storage/field_stats.rs                    |   1 +
 11 files changed, 251 insertions(+), 16 deletions(-)

diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 5cd862b21..76b857f0e 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -31,7 +31,9 @@ use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use super::EventFormat;
-use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
+use crate::{
+    handlers::TelemetryType, metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field,
+};
 
 pub struct Event {
     pub json: Value,
@@ -147,6 +149,7 @@ impl EventFormat for Event {
         schema_version: SchemaVersion,
         stream_type: StreamType,
         p_custom_fields: &HashMap<String, String>,
+        telemetry_type: TelemetryType,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -179,6 +182,7 @@ impl EventFormat for Event {
             time_partition: None,
             custom_partition_values,
             stream_type,
+            telemetry_type,
         })
     }
 }
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 9ed9e6052..56d3a676d 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{
+    handlers::TelemetryType,
     metadata::SchemaVersion,
     storage::StreamType,
     utils::arrow::{add_parseable_fields, get_field},
@@ -220,6 +221,7 @@ pub trait EventFormat: Sized {
         schema_version: SchemaVersion,
         stream_type: StreamType,
         p_custom_fields: &HashMap<String, String>,
+        telemetry_type: TelemetryType,
     ) -> Result<Event, anyhow::Error>;
 }
diff --git a/src/event/mod.rs b/src/event/mod.rs
index aed646927..110ce2828 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 use crate::{
     LOCK_EXPECT,
+    handlers::TelemetryType,
     metadata::update_stats,
     metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
     parseable::{PARSEABLE, StagingError},
@@ -52,6 +53,7 @@ pub struct Event {
     pub time_partition: Option<String>,
     pub custom_partition_values: HashMap<String, String>,
     pub stream_type: StreamType,
+    pub telemetry_type: TelemetryType,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -92,7 +94,7 @@ impl Event {
         // Track billing metrics for event ingestion
         let date_string = self.parsed_timestamp.date().to_string();
         increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
-        increment_events_ingested_size_by_date(self.origin_size, &date_string);
+        increment_events_ingested_size_by_date(self.origin_size, &date_string, self.telemetry_type);
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
 
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 332b4c42d..29527c734 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -100,6 +100,11 @@ struct BillingMetricsCollector {
     pub total_input_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>, // provider -> model -> date -> count
     pub total_output_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>,
     pub total_metrics_collected_by_date: HashMap<String, u64>,
+    pub total_metrics_collected_size_by_date: HashMap<String, u64>,
+    pub total_logs_collected_by_date: HashMap<String, u64>,
+    pub total_logs_collected_size_by_date: HashMap<String, u64>,
+    pub total_traces_collected_by_date: HashMap<String, u64>,
+    pub total_traces_collected_size_by_date: HashMap<String, u64>,
     pub event_time: chrono::NaiveDateTime,
 }
 
@@ -202,6 +207,41 @@ impl BillingMetricsCollector {
                 &self.total_metrics_collected_by_date,
             );
         }
+        if !self.total_metrics_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_metrics_collected_size",
+                &self.total_metrics_collected_size_by_date,
+            );
+        }
+        if !self.total_logs_collected_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_logs_collected",
+                &self.total_logs_collected_by_date,
+            );
+        }
+        if !self.total_logs_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_logs_collected_size",
+                &self.total_logs_collected_size_by_date,
+            );
+        }
+        if !self.total_traces_collected_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_traces_collected",
+                &self.total_traces_collected_by_date,
+            );
+        }
+        if !self.total_traces_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_traces_collected_size",
+                &self.total_traces_collected_size_by_date,
+            );
+        }
     }
 
     /// Add object store metrics (method-based) to the events vector
@@ -1273,6 +1313,11 @@ fn is_simple_metric(metric: &str) -> bool {
             | "parseable_total_files_scanned_in_query_by_date"
             | "parseable_total_bytes_scanned_in_query_by_date"
             | "parseable_total_metrics_collected_by_date"
+            | "parseable_total_metrics_collected_size_by_date"
+            | "parseable_total_logs_collected_by_date"
+            | "parseable_total_logs_collected_size_by_date"
+            | "parseable_total_traces_collected_by_date"
+            | "parseable_total_traces_collected_size_by_date"
     )
 }
 
@@ -1344,6 +1389,31 @@ fn process_simple_metric(
                 .total_metrics_collected_by_date
                 .insert(date.to_string(), value);
         }
+        "parseable_total_metrics_collected_size_by_date" => {
+            collector
+                .total_metrics_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_logs_collected_by_date" => {
+            collector
+                .total_logs_collected_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_logs_collected_size_by_date" => {
+            collector
+                .total_logs_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_traces_collected_by_date" => {
+            collector
+                .total_traces_collected_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_traces_collected_size_by_date" => {
+            collector
+                .total_traces_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
         _ => {}
     }
 }
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 254224eb1..39d11f42e 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -125,7 +125,15 @@ pub async fn ingest(
         .add_update_log_source(&stream_name, log_source_entry)
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        None,
+        telemetry_type,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -149,6 +157,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<HttpResponse, PostError> {
         SchemaVersion::V0,
         StreamType::Internal,
         &p_custom_fields,
+        TelemetryType::Logs,
     )?
     .process()?;
 
@@ -235,6 +244,7 @@ async fn process_otel_content(
     body: web::Bytes,
     stream_name: &str,
     log_source: &LogSource,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     let p_custom_fields = get_custom_fields_from_header(req);
 
@@ -251,6 +261,7 @@ async fn process_otel_content(
             log_source,
             &p_custom_fields,
             None,
+            telemetry_type,
         )
         .await?;
     } else if content_type == CONTENT_TYPE_PROTOBUF {
@@ -289,7 +300,7 @@ pub async fn handle_otel_logs_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -309,7 +320,14 @@ pub async fn handle_otel_metrics_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(
+        &req,
+        body,
+        &stream_name,
+        &log_source,
+        TelemetryType::Metrics,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -329,7 +347,7 @@ pub async fn handle_otel_traces_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -396,7 +414,15 @@ pub async fn post_event(
     //return error if the stream log source is otel traces or otel metrics
     validate_stream_for_ingestion(&stream_name)?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        None,
+        TelemetryType::Logs,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -415,6 +441,7 @@ pub async fn push_logs_unchecked(
         is_first_event: true,                    // NOTE: Maybe should be false
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
         stream_type: StreamType::UserDefined,
+        telemetry_type: TelemetryType::Logs,
     };
     unchecked_event.process_unchecked()?;
 
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index d544bece4..f9c5be680 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -32,7 +32,7 @@ use crate::{
         format::{EventFormat, LogSource, json},
     },
     handlers::{
-        EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
+        EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
         http::{
             ingest::PostError,
             kinesis::{Message, flatten_kinesis_logs},
@@ -54,6 +54,7 @@ pub async fn flatten_and_push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
     time_partition: Option<String>,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     // Verify the dataset fields count
     verify_dataset_fields_count(stream_name)?;
@@ -70,6 +71,7 @@ pub async fn flatten_and_push_logs(
             log_source,
             p_custom_fields,
             time_partition,
+            telemetry_type,
         )
         .await?;
     }
@@ -83,6 +85,7 @@ pub async fn flatten_and_push_logs(
             log_source,
             p_custom_fields,
             time_partition.clone(),
+            telemetry_type,
         )
         .await?;
     }
@@ -97,6 +100,7 @@ pub async fn flatten_and_push_logs(
             log_source,
             p_custom_fields,
             time_partition.clone(),
+            telemetry_type,
         )
         .await?;
     }
@@ -111,6 +115,7 @@ pub async fn flatten_and_push_logs(
             log_source,
             p_custom_fields,
             time_partition.clone(),
+            telemetry_type,
         )
         .await?;
     }
@@ -122,6 +127,7 @@ pub async fn flatten_and_push_logs(
             log_source,
             p_custom_fields,
             time_partition,
+            telemetry_type,
         )
         .await?
     }
@@ -136,6 +142,7 @@ pub async fn push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
     time_partition: Option<String>,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition_limit = PARSEABLE
@@ -169,6 +176,7 @@ pub async fn push_logs(
             schema_version,
             StreamType::UserDefined,
             p_custom_fields,
+            telemetry_type,
         )?
         .process()?;
     }
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 784df157c..2b3200f67 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -17,7 +17,10 @@
  */
 
 pub mod prom_utils;
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use crate::{
+    handlers::{TelemetryType, http::metrics_path},
+    stats::FullStats,
+};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
@@ -380,7 +383,67 @@ pub static TOTAL_METRICS_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
             "Total metrics collected by date",
         )
         .namespace(METRICS_NAMESPACE),
-        &["date"],
+        &["team", "date"],
     )
     .expect("metric can be created")
 });
+
+pub static TOTAL_METRICS_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_metrics_collected_size_by_date",
+            "Total metrics collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_LOGS_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_logs_collected_by_date",
+            "Total logs collected by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_LOGS_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_logs_collected_size_by_date",
+            "Total logs collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_TRACES_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_traces_collected_by_date",
+            "Total traces collected by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_TRACES_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_traces_collected_size_by_date",
+            "Total traces collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
@@ -487,6 +550,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(TOTAL_METRICS_COLLECTED_BY_DATE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_METRICS_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_LOGS_COLLECTED_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_LOGS_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_TRACES_COLLECTED_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_TRACES_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -553,10 +631,31 @@ pub fn increment_events_ingested_by_date(count: u64, date: &str) {
         .inc_by(count);
 }
 
-pub fn increment_events_ingested_size_by_date(size: u64, date: &str) {
+pub fn increment_events_ingested_size_by_date(
+    size: u64,
+    date: &str,
+    telemetry_type: TelemetryType,
+) {
     TOTAL_EVENTS_INGESTED_SIZE_BY_DATE
         .with_label_values(&[date])
         .inc_by(size);
+    match telemetry_type {
+        TelemetryType::Logs | TelemetryType::Events => {
+            TOTAL_LOGS_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+        TelemetryType::Metrics => {
+            TOTAL_METRICS_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+        TelemetryType::Traces => {
+            TOTAL_TRACES_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+    }
 }
 
 pub fn increment_parquets_stored_by_date(date: &str) {
@@ -634,10 +733,22 @@ pub fn increment_reasoning_llm_tokens_by_date(
         .inc_by(tokens);
 }
 
-pub fn increment_metrics_collected_by_date(date: &str) {
+pub fn increment_metrics_collected_by_date(count: u64, date: &str) {
     TOTAL_METRICS_COLLECTED_BY_DATE
-        .with_label_values(&[date])
-        .inc();
+        .with_label_values(&["all", date])
+        .inc_by(count);
+}
+
+pub fn increment_logs_collected_by_date(count: u64, date: &str) {
+    TOTAL_LOGS_COLLECTED_BY_DATE
+        .with_label_values(&["all", date])
+        .inc_by(count);
+}
+
+pub fn increment_traces_collected_by_date(count: u64, date: &str) {
+    TOTAL_TRACES_COLLECTED_BY_DATE
+        .with_label_values(&["all", date])
+        .inc_by(count);
 }
 
 use actix_web::HttpResponse;
diff --git a/src/otel/logs.rs b/src/otel/logs.rs
index a370c4a9e..ae1abeb4e 100644
--- a/src/otel/logs.rs
+++ b/src/otel/logs.rs
@@ -18,6 +18,7 @@
 use super::otel_utils::collect_json_from_values;
 use super::otel_utils::convert_epoch_nano_to_timestamp;
 use super::otel_utils::insert_attributes;
+use crate::metrics::increment_logs_collected_by_date;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
@@ -146,6 +147,9 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
         vec_scope_log_json.push(combined_json);
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_logs_collected_by_date(scope_log.log_records.len() as u64, &date);
+
     vec_scope_log_json
 }
 
diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs
index ac2e82083..810baa54d 100644
--- a/src/otel/metrics.rs
+++ b/src/otel/metrics.rs
@@ -502,8 +502,6 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<Map<String, Value>> {
         data_points_json.push(metric_json);
     }
 
-    let current_date = chrono::Utc::now().date_naive().to_string();
-    increment_metrics_collected_by_date(&current_date);
     data_points_json
 }
 
@@ -544,6 +542,9 @@ fn process_resource_metrics(
         vec_scope_metrics_json.extend(flatten_metrics_record(get_metric(metric)));
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_metrics_collected_by_date(metrics.len() as u64, &date);
+
     if let Some(scope) = get_scope(scope_metric) {
         scope_metrics_json
             .insert("scope_name".to_string(), Value::String(scope.name.clone()));
diff --git a/src/otel/traces.rs b/src/otel/traces.rs
index 3ed89c3f8..34eaed13d 100644
--- a/src/otel/traces.rs
+++ b/src/otel/traces.rs
@@ -25,6 +25,7 @@ use opentelemetry_proto::tonic::trace::v1::span::Event;
 use opentelemetry_proto::tonic::trace::v1::span::Link;
 use serde_json::{Map, Value};
 
+use crate::metrics::increment_traces_collected_by_date;
 use crate::otel::otel_utils::flatten_attributes;
 
 use super::otel_utils::convert_epoch_nano_to_timestamp;
@@ -74,6 +75,9 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<Map<String, Value>> {
         vec_scope_span_json.extend(span_record_json);
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_traces_collected_by_date(scope_span.spans.len() as u64, &date);
+
     if let Some(scope) = &scope_span.scope {
         scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
         scope_span_json.insert(
@@ -404,6 +408,7 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
             }
         }
     }
+
     span_records_json
 }
 
diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs
index 521baa626..817a66cd3 100644
--- a/src/storage/field_stats.rs
+++ b/src/storage/field_stats.rs
@@ -161,6 +161,7 @@ pub async fn calculate_field_stats(
         SchemaVersion::V1,
         StreamType::Internal,
         &p_custom_fields,
+        TelemetryType::Logs,
     )?
     .process()?;
 }

From 18e455ec6a9cb198e0c4cd2c28ad863466bb0554 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 30 Dec 2025 20:46:24 +1100
Subject: [PATCH 2/2] fix in kafka processor

---
 src/connectors/kafka/processor.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 7727fc1fe..0ff9496c8 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -91,6 +91,7 @@ impl ParseableSinkProcessor {
             schema_version,
             StreamType::UserDefined,
             &p_custom_fields,
+            TelemetryType::Logs,
         )?;
 
         Ok(p_event)
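For reference, the counter pattern these patches introduce can be exercised in
isolation with the prometheus crate. The sketch below is illustrative, not
Parseable source: the "parseable" namespace string, the trimmed-down
TelemetryType enum, and showing only the logs-size counter are assumptions made
for brevity (the real patch defines six counters and registers them all in
custom_metrics). It assumes the once_cell and prometheus crates as
dependencies.

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

// Trimmed-down stand-in for Parseable's TelemetryType (assumption).
#[derive(Clone, Copy)]
enum TelemetryType {
    Logs,
    Metrics,
    Traces,
}

// One counter per signal, keyed by ("team", "date"); only logs-size shown here.
static TOTAL_LOGS_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            "total_logs_collected_size_by_date",
            "Total logs collected size in bytes by date",
        )
        .namespace("parseable"),
        &["team", "date"],
    )
    .expect("metric can be created")
});

// Mirrors increment_events_ingested_size_by_date: route the byte count to the
// counter matching the signal, with the team label pinned to "all" for now.
fn record_ingested_size(size: u64, date: &str, telemetry_type: TelemetryType) {
    match telemetry_type {
        TelemetryType::Logs => {
            TOTAL_LOGS_COLLECTED_SIZE_BY_DATE
                .with_label_values(&["all", date])
                .inc_by(size);
        }
        // Metrics and Traces would hit their own counters the same way.
        TelemetryType::Metrics | TelemetryType::Traces => {}
    }
}

fn main() {
    // Register the counter so it shows up in scrapes, then record one ingest.
    let registry = Registry::new();
    registry
        .register(Box::new(TOTAL_LOGS_COLLECTED_SIZE_BY_DATE.clone()))
        .expect("metric can be registered");

    record_ingested_size(2048, "2025-12-30", TelemetryType::Logs);
    assert_eq!(
        TOTAL_LOGS_COLLECTED_SIZE_BY_DATE
            .with_label_values(&["all", "2025-12-30"])
            .get(),
        2048
    );
}

The ("team", "date") label pair leaves room for per-team attribution later;
until then every sample carries team="all", which is why the cluster-side
collector can still treat these as simple date-keyed metrics.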