1 change: 1 addition & 0 deletions src/connectors/kafka/processor.rs
@@ -91,6 +91,7 @@ impl ParseableSinkProcessor {
schema_version,
StreamType::UserDefined,
&p_custom_fields,
TelemetryType::Logs,
)?;

Ok(p_event)
6 changes: 5 additions & 1 deletion src/event/format/json.rs
@@ -31,7 +31,9 @@ use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::EventFormat;
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
use crate::{
handlers::TelemetryType, metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field,
};

pub struct Event {
pub json: Value,
@@ -147,6 +149,7 @@ impl EventFormat for Event {
schema_version: SchemaVersion,
stream_type: StreamType,
p_custom_fields: &HashMap<String, String>,
telemetry_type: TelemetryType,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
@@ -179,6 +182,7 @@
time_partition: None,
custom_partition_values,
stream_type,
telemetry_type,
})
}
}
2 changes: 2 additions & 0 deletions src/event/format/mod.rs
@@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{
handlers::TelemetryType,
metadata::SchemaVersion,
storage::StreamType,
utils::arrow::{add_parseable_fields, get_field},
@@ -220,6 +221,7 @@ pub trait EventFormat: Sized {
schema_version: SchemaVersion,
stream_type: StreamType,
p_custom_fields: &HashMap<String, String>,
telemetry_type: TelemetryType,
) -> Result<Event, AnyError>;
}

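Every `EventFormat::into_event` implementation now receives a `TelemetryType`, but the enum itself lives in `crate::handlers` and is not part of this diff. A minimal sketch of what such a type could look like; the derives, the serde attribute, and the `Logs` default are assumptions rather than the actual Parseable definition:

```rust
use serde::{Deserialize, Serialize};

/// Sketch only: the real definition lives in src/handlers and may differ.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TelemetryType {
    #[default]
    Logs,    // default for non-OTEL paths (Kafka connector, internal streams, post_event)
    Metrics, // OTEL metrics endpoint
    Traces,  // OTEL traces endpoint
}
```

A `Copy` enum keeps the extra argument cheap to thread by value through the many call sites below.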
4 changes: 3 additions & 1 deletion src/event/mod.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use self::error::EventError;
use crate::{
LOCK_EXPECT,
handlers::TelemetryType,
metadata::update_stats,
metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
parseable::{PARSEABLE, StagingError},
@@ -52,6 +53,7 @@ pub struct Event {
pub time_partition: Option<String>,
pub custom_partition_values: HashMap<String, String>,
pub stream_type: StreamType,
pub telemetry_type: TelemetryType,
}

// Events holds the schema related to a each event for a single log stream
@@ -92,7 +94,7 @@ impl Event {
// Track billing metrics for event ingestion
let date_string = self.parsed_timestamp.date().to_string();
increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
increment_events_ingested_size_by_date(self.origin_size, &date_string);
increment_events_ingested_size_by_date(self.origin_size, &date_string, self.telemetry_type);

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

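`increment_events_ingested_size_by_date` now takes the telemetry type as a third argument; its body is not part of this diff. A hedged sketch of what that dispatch could look like, reusing the size-by-date metric names the billing collector scrapes below (how the counters are actually registered in `src/metrics` is an assumption):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

#[derive(Clone, Copy)]
pub enum TelemetryType { Logs, Metrics, Traces }

// One by-date counter per signal; the names mirror the entries added to
// is_simple_metric below, but the registration details are invented here.
static LOGS_SIZE: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!("parseable_total_logs_collected_size_by_date", "log bytes per date", &["date"]).unwrap()
});
static METRICS_SIZE: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!("parseable_total_metrics_collected_size_by_date", "metric bytes per date", &["date"]).unwrap()
});
static TRACES_SIZE: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!("parseable_total_traces_collected_size_by_date", "trace bytes per date", &["date"]).unwrap()
});

pub fn increment_events_ingested_size_by_date(size: u64, date: &str, telemetry_type: TelemetryType) {
    // Route the ingested byte count to the counter for the event's signal.
    let counter = match telemetry_type {
        TelemetryType::Logs => &LOGS_SIZE,
        TelemetryType::Metrics => &METRICS_SIZE,
        TelemetryType::Traces => &TRACES_SIZE,
    };
    counter.with_label_values(&[date]).inc_by(size);
}
```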
70 changes: 70 additions & 0 deletions src/handlers/http/cluster/mod.rs
@@ -100,6 +100,11 @@ struct BillingMetricsCollector {
pub total_input_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>, // provider -> model -> date -> count
pub total_output_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>,
pub total_metrics_collected_by_date: HashMap<String, u64>,
pub total_metrics_collected_size_by_date: HashMap<String, u64>,
pub total_logs_collected_by_date: HashMap<String, u64>,
pub total_logs_collected_size_by_date: HashMap<String, u64>,
pub total_traces_collected_by_date: HashMap<String, u64>,
pub total_traces_collected_size_by_date: HashMap<String, u64>,
pub event_time: chrono::NaiveDateTime,
}

@@ -202,6 +207,41 @@ impl BillingMetricsCollector {
&self.total_metrics_collected_by_date,
);
}
if !self.total_metrics_collected_size_by_date.is_empty() {
add_simple_metric(
events,
"total_metrics_collected_size",
&self.total_metrics_collected_size_by_date,
);
}
if !self.total_logs_collected_by_date.is_empty() {
add_simple_metric(
events,
"total_logs_collected",
&self.total_logs_collected_by_date,
);
}
if !self.total_logs_collected_size_by_date.is_empty() {
add_simple_metric(
events,
"total_logs_collected_size",
&self.total_logs_collected_size_by_date,
);
}
if !self.total_traces_collected_by_date.is_empty() {
add_simple_metric(
events,
"total_traces_collected",
&self.total_traces_collected_by_date,
);
}
if !self.total_traces_collected_size_by_date.is_empty() {
add_simple_metric(
events,
"total_traces_collected_size",
&self.total_traces_collected_size_by_date,
);
}
}

/// Add object store metrics (method-based) to the events vector
@@ -1273,6 +1313,11 @@ fn is_simple_metric(metric: &str) -> bool {
| "parseable_total_files_scanned_in_query_by_date"
| "parseable_total_bytes_scanned_in_query_by_date"
| "parseable_total_metrics_collected_by_date"
| "parseable_total_metrics_collected_size_by_date"
| "parseable_total_logs_collected_by_date"
| "parseable_total_logs_collected_size_by_date"
| "parseable_total_traces_collected_by_date"
| "parseable_total_traces_collected_size_by_date"
)
}

@@ -1344,6 +1389,31 @@ fn process_simple_metric(
.total_metrics_collected_by_date
.insert(date.to_string(), value);
}
"parseable_total_metrics_collected_size_by_date" => {
collector
.total_metrics_collected_size_by_date
.insert(date.to_string(), value);
}
"parseable_total_logs_collected_by_date" => {
collector
.total_logs_collected_by_date
.insert(date.to_string(), value);
}
"parseable_total_logs_collected_size_by_date" => {
collector
.total_logs_collected_size_by_date
.insert(date.to_string(), value);
}
"parseable_total_traces_collected_by_date" => {
collector
.total_traces_collected_by_date
.insert(date.to_string(), value);
}
"parseable_total_traces_collected_size_by_date" => {
collector
.total_traces_collected_size_by_date
.insert(date.to_string(), value);
}
_ => {}
}
}
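All of the new branches funnel into `add_simple_metric`, which predates this change and is not shown. A plausible shape for it, assuming it appends one JSON event per (metric, date) pair onto the shared events vector; the field names are invented for this sketch:

```rust
use std::collections::HashMap;
use serde_json::{json, Value};

// Hypothetical stand-in for the existing helper in cluster/mod.rs.
fn add_simple_metric(events: &mut Vec<Value>, event_type: &str, by_date: &HashMap<String, u64>) {
    for (date, value) in by_date {
        events.push(json!({
            "event_type": event_type,
            "date": date,
            "value": value,
        }));
    }
}
```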
37 changes: 32 additions & 5 deletions src/handlers/http/ingest.rs
@@ -125,7 +125,15 @@ pub async fn ingest(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
flatten_and_push_logs(
json,
&stream_name,
&log_source,
&p_custom_fields,
None,
telemetry_type,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
@@ -149,6 +157,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
SchemaVersion::V0,
StreamType::Internal,
&p_custom_fields,
TelemetryType::Logs,
)?
.process()?;

@@ -235,6 +244,7 @@ async fn process_otel_content(
body: web::Bytes,
stream_name: &str,
log_source: &LogSource,
telemetry_type: TelemetryType,
) -> Result<(), PostError> {
let p_custom_fields = get_custom_fields_from_header(req);

@@ -251,6 +261,7 @@
log_source,
&p_custom_fields,
None,
telemetry_type,
)
.await?;
} else if content_type == CONTENT_TYPE_PROTOBUF {
@@ -289,7 +300,7 @@ pub async fn handle_otel_logs_ingestion(
)
.await?;

process_otel_content(&req, body, &stream_name, &log_source).await?;
process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -309,7 +320,14 @@ pub async fn handle_otel_metrics_ingestion(
)
.await?;

process_otel_content(&req, body, &stream_name, &log_source).await?;
process_otel_content(
&req,
body,
&stream_name,
&log_source,
TelemetryType::Metrics,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
@@ -329,7 +347,7 @@ pub async fn handle_otel_traces_ingestion(
)
.await?;

process_otel_content(&req, body, &stream_name, &log_source).await?;
process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -396,7 +414,15 @@ pub async fn post_event(
//return error if the stream log source is otel traces or otel metrics
validate_stream_for_ingestion(&stream_name)?;

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
flatten_and_push_logs(
json,
&stream_name,
&log_source,
&p_custom_fields,
None,
TelemetryType::Logs,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
@@ -415,6 +441,7 @@ pub async fn push_logs_unchecked(
is_first_event: true, // NOTE: Maybe should be false
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
stream_type: StreamType::UserDefined,
telemetry_type: TelemetryType::Logs,
};
unchecked_event.process_unchecked()?;

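Each OTEL handler now hard-codes the signal it serves, `post_event`, internal streams, and unchecked pushes default to `TelemetryType::Logs`, and the generic `ingest` handler forwards a `telemetry_type` it resolved earlier (not shown in this hunk). The OTEL side is equivalent to mapping the log source to a telemetry type; a self-contained illustration in which both enums are stand-ins and the `LogSource` variant names are assumed:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum LogSource { Json, Kinesis, OtelLogs, OtelMetrics, OtelTraces }

#[derive(Clone, Copy, Debug, PartialEq)]
enum TelemetryType { Logs, Metrics, Traces }

fn telemetry_type_for(log_source: LogSource) -> TelemetryType {
    match log_source {
        LogSource::OtelMetrics => TelemetryType::Metrics,
        LogSource::OtelTraces => TelemetryType::Traces,
        // OTEL logs, plain JSON ingest, Kinesis, and internal streams all bill as logs.
        _ => TelemetryType::Logs,
    }
}

fn main() {
    assert_eq!(telemetry_type_for(LogSource::OtelTraces), TelemetryType::Traces);
    assert_eq!(telemetry_type_for(LogSource::Json), TelemetryType::Logs);
}
```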
10 changes: 9 additions & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
@@ -32,7 +32,7 @@ use crate::{
format::{EventFormat, LogSource, json},
},
handlers::{
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
http::{
ingest::PostError,
kinesis::{Message, flatten_kinesis_logs},
@@ -54,6 +54,7 @@ pub async fn flatten_and_push_logs(
log_source: &LogSource,
p_custom_fields: &HashMap<String, String>,
time_partition: Option<String>,
telemetry_type: TelemetryType,
) -> Result<(), PostError> {
// Verify the dataset fields count
verify_dataset_fields_count(stream_name)?;
@@ -70,6 +71,7 @@
log_source,
p_custom_fields,
time_partition,
telemetry_type,
)
.await?;
}
@@ -83,6 +85,7 @@
log_source,
p_custom_fields,
time_partition.clone(),
telemetry_type,
)
.await?;
}
@@ -97,6 +100,7 @@
log_source,
p_custom_fields,
time_partition.clone(),
telemetry_type,
)
.await?;
}
@@ -111,6 +115,7 @@
log_source,
p_custom_fields,
time_partition.clone(),
telemetry_type,
)
.await?;
}
@@ -122,6 +127,7 @@
log_source,
p_custom_fields,
time_partition,
telemetry_type,
)
.await?
}
@@ -136,6 +142,7 @@
log_source: &LogSource,
p_custom_fields: &HashMap<String, String>,
time_partition: Option<String>,
telemetry_type: TelemetryType,
) -> Result<(), PostError> {
let stream = PARSEABLE.get_stream(stream_name)?;
let time_partition_limit = PARSEABLE
@@ -169,6 +176,7 @@
schema_version,
StreamType::UserDefined,
p_custom_fields,
telemetry_type,
)?
.process()?;
}
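With the telemetry type threaded from the handlers through `flatten_and_push_logs` and `push_logs` into every `Event`, downstream consumers can attribute ingested bytes per signal and per date. A self-contained illustration of that rollup; the struct and field names are invented, and the real `Event` carries many more fields:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum TelemetryType { Logs, Metrics, Traces }

// Minimal stand-in for an ingested event.
struct Ingested {
    telemetry_type: TelemetryType,
    origin_size: u64,
    date: String,
}

// Sum ingested bytes per (signal, date), the same split the new billing metrics expose.
fn rollup(events: &[Ingested]) -> HashMap<(TelemetryType, String), u64> {
    let mut totals = HashMap::new();
    for e in events {
        *totals.entry((e.telemetry_type, e.date.clone())).or_insert(0u64) += e.origin_size;
    }
    totals
}

fn main() {
    let events = vec![
        Ingested { telemetry_type: TelemetryType::Logs, origin_size: 512, date: "2024-06-01".into() },
        Ingested { telemetry_type: TelemetryType::Traces, origin_size: 128, date: "2024-06-01".into() },
        Ingested { telemetry_type: TelemetryType::Logs, origin_size: 256, date: "2024-06-01".into() },
    ];
    let totals = rollup(&events);
    assert_eq!(totals[&(TelemetryType::Logs, "2024-06-01".to_string())], 768);
}
```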