Skip to content

Commit b86629a

Browse files
add metrics collection for logs, traces and metrics separately
add below metrics - 1. total logs collected by date 2. total size of logs collected by date 3. total metrics collected by date 4. total size of metrics collected by date 5. total traces collected by date 6. total size of traces collected by date add all of these to pbilling
1 parent cab9fc0 commit b86629a

File tree

11 files changed

+251
-16
lines changed

11 files changed

+251
-16
lines changed

src/event/format/json.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use std::{collections::HashMap, sync::Arc};
3131
use tracing::error;
3232

3333
use super::EventFormat;
34-
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
34+
use crate::{
35+
handlers::TelemetryType, metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field,
36+
};
3537

3638
pub struct Event {
3739
pub json: Value,
@@ -147,6 +149,7 @@ impl EventFormat for Event {
147149
schema_version: SchemaVersion,
148150
stream_type: StreamType,
149151
p_custom_fields: &HashMap<String, String>,
152+
telemetry_type: TelemetryType,
150153
) -> Result<super::Event, anyhow::Error> {
151154
let custom_partition_values = match custom_partitions.as_ref() {
152155
Some(custom_partition) => {
@@ -179,6 +182,7 @@ impl EventFormat for Event {
179182
time_partition: None,
180183
custom_partition_values,
181184
stream_type,
185+
telemetry_type,
182186
})
183187
}
184188
}

src/event/format/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};
3131
use serde_json::Value;
3232

3333
use crate::{
34+
handlers::TelemetryType,
3435
metadata::SchemaVersion,
3536
storage::StreamType,
3637
utils::arrow::{add_parseable_fields, get_field},
@@ -220,6 +221,7 @@ pub trait EventFormat: Sized {
220221
schema_version: SchemaVersion,
221222
stream_type: StreamType,
222223
p_custom_fields: &HashMap<String, String>,
224+
telemetry_type: TelemetryType,
223225
) -> Result<Event, AnyError>;
224226
}
225227

src/event/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::sync::Arc;
2727
use self::error::EventError;
2828
use crate::{
2929
LOCK_EXPECT,
30+
handlers::TelemetryType,
3031
metadata::update_stats,
3132
metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
3233
parseable::{PARSEABLE, StagingError},
@@ -52,6 +53,7 @@ pub struct Event {
5253
pub time_partition: Option<String>,
5354
pub custom_partition_values: HashMap<String, String>,
5455
pub stream_type: StreamType,
56+
pub telemetry_type: TelemetryType,
5557
}
5658

5759
// Events holds the schema related to a each event for a single log stream
@@ -92,7 +94,7 @@ impl Event {
9294
// Track billing metrics for event ingestion
9395
let date_string = self.parsed_timestamp.date().to_string();
9496
increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
95-
increment_events_ingested_size_by_date(self.origin_size, &date_string);
97+
increment_events_ingested_size_by_date(self.origin_size, &date_string, self.telemetry_type);
9698

9799
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
98100

src/handlers/http/cluster/mod.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ struct BillingMetricsCollector {
100100
pub total_input_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>, // provider -> model -> date -> count
101101
pub total_output_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>,
102102
pub total_metrics_collected_by_date: HashMap<String, u64>,
103+
pub total_metrics_collected_size_by_date: HashMap<String, u64>,
104+
pub total_logs_collected_by_date: HashMap<String, u64>,
105+
pub total_logs_collected_size_by_date: HashMap<String, u64>,
106+
pub total_traces_collected_by_date: HashMap<String, u64>,
107+
pub total_traces_collected_size_by_date: HashMap<String, u64>,
103108
pub event_time: chrono::NaiveDateTime,
104109
}
105110

@@ -202,6 +207,41 @@ impl BillingMetricsCollector {
202207
&self.total_metrics_collected_by_date,
203208
);
204209
}
210+
if !self.total_metrics_collected_size_by_date.is_empty() {
211+
add_simple_metric(
212+
events,
213+
"total_metrics_collected_size",
214+
&self.total_metrics_collected_size_by_date,
215+
);
216+
}
217+
if !self.total_logs_collected_by_date.is_empty() {
218+
add_simple_metric(
219+
events,
220+
"total_logs_collected",
221+
&self.total_logs_collected_by_date,
222+
);
223+
}
224+
if !self.total_logs_collected_size_by_date.is_empty() {
225+
add_simple_metric(
226+
events,
227+
"total_logs_collected_size",
228+
&self.total_logs_collected_size_by_date,
229+
);
230+
}
231+
if !self.total_traces_collected_by_date.is_empty() {
232+
add_simple_metric(
233+
events,
234+
"total_traces_collected",
235+
&self.total_traces_collected_by_date,
236+
);
237+
}
238+
if !self.total_traces_collected_size_by_date.is_empty() {
239+
add_simple_metric(
240+
events,
241+
"total_traces_collected_size",
242+
&self.total_traces_collected_size_by_date,
243+
);
244+
}
205245
}
206246

207247
/// Add object store metrics (method-based) to the events vector
@@ -1273,6 +1313,11 @@ fn is_simple_metric(metric: &str) -> bool {
12731313
| "parseable_total_files_scanned_in_query_by_date"
12741314
| "parseable_total_bytes_scanned_in_query_by_date"
12751315
| "parseable_total_metrics_collected_by_date"
1316+
| "parseable_total_metrics_collected_size_by_date"
1317+
| "parseable_total_logs_collected_by_date"
1318+
| "parseable_total_logs_collected_size_by_date"
1319+
| "parseable_total_traces_collected_by_date"
1320+
| "parseable_total_traces_collected_size_by_date"
12761321
)
12771322
}
12781323

@@ -1344,6 +1389,31 @@ fn process_simple_metric(
13441389
.total_metrics_collected_by_date
13451390
.insert(date.to_string(), value);
13461391
}
1392+
"parseable_total_metrics_collected_size_by_date" => {
1393+
collector
1394+
.total_metrics_collected_size_by_date
1395+
.insert(date.to_string(), value);
1396+
}
1397+
"parseable_total_logs_collected_by_date" => {
1398+
collector
1399+
.total_logs_collected_by_date
1400+
.insert(date.to_string(), value);
1401+
}
1402+
"parseable_total_logs_collected_size_by_date" => {
1403+
collector
1404+
.total_logs_collected_size_by_date
1405+
.insert(date.to_string(), value);
1406+
}
1407+
"parseable_total_traces_collected_by_date" => {
1408+
collector
1409+
.total_traces_collected_by_date
1410+
.insert(date.to_string(), value);
1411+
}
1412+
"parseable_total_traces_collected_size_by_date" => {
1413+
collector
1414+
.total_traces_collected_size_by_date
1415+
.insert(date.to_string(), value);
1416+
}
13471417
_ => {}
13481418
}
13491419
}

src/handlers/http/ingest.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,15 @@ pub async fn ingest(
125125
.add_update_log_source(&stream_name, log_source_entry)
126126
.await?;
127127

128-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
128+
flatten_and_push_logs(
129+
json,
130+
&stream_name,
131+
&log_source,
132+
&p_custom_fields,
133+
None,
134+
telemetry_type,
135+
)
136+
.await?;
129137

130138
Ok(HttpResponse::Ok().finish())
131139
}
@@ -149,6 +157,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
149157
SchemaVersion::V0,
150158
StreamType::Internal,
151159
&p_custom_fields,
160+
TelemetryType::Logs,
152161
)?
153162
.process()?;
154163

@@ -235,6 +244,7 @@ async fn process_otel_content(
235244
body: web::Bytes,
236245
stream_name: &str,
237246
log_source: &LogSource,
247+
telemetry_type: TelemetryType,
238248
) -> Result<(), PostError> {
239249
let p_custom_fields = get_custom_fields_from_header(req);
240250

@@ -251,6 +261,7 @@ async fn process_otel_content(
251261
log_source,
252262
&p_custom_fields,
253263
None,
264+
telemetry_type,
254265
)
255266
.await?;
256267
} else if content_type == CONTENT_TYPE_PROTOBUF {
@@ -289,7 +300,7 @@ pub async fn handle_otel_logs_ingestion(
289300
)
290301
.await?;
291302

292-
process_otel_content(&req, body, &stream_name, &log_source).await?;
303+
process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?;
293304

294305
Ok(HttpResponse::Ok().finish())
295306
}
@@ -309,7 +320,14 @@ pub async fn handle_otel_metrics_ingestion(
309320
)
310321
.await?;
311322

312-
process_otel_content(&req, body, &stream_name, &log_source).await?;
323+
process_otel_content(
324+
&req,
325+
body,
326+
&stream_name,
327+
&log_source,
328+
TelemetryType::Metrics,
329+
)
330+
.await?;
313331

314332
Ok(HttpResponse::Ok().finish())
315333
}
@@ -329,7 +347,7 @@ pub async fn handle_otel_traces_ingestion(
329347
)
330348
.await?;
331349

332-
process_otel_content(&req, body, &stream_name, &log_source).await?;
350+
process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?;
333351

334352
Ok(HttpResponse::Ok().finish())
335353
}
@@ -396,7 +414,15 @@ pub async fn post_event(
396414
//return error if the stream log source is otel traces or otel metrics
397415
validate_stream_for_ingestion(&stream_name)?;
398416

399-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
417+
flatten_and_push_logs(
418+
json,
419+
&stream_name,
420+
&log_source,
421+
&p_custom_fields,
422+
None,
423+
TelemetryType::Logs,
424+
)
425+
.await?;
400426

401427
Ok(HttpResponse::Ok().finish())
402428
}
@@ -415,6 +441,7 @@ pub async fn push_logs_unchecked(
415441
is_first_event: true, // NOTE: Maybe should be false
416442
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
417443
stream_type: StreamType::UserDefined,
444+
telemetry_type: TelemetryType::Logs,
418445
};
419446
unchecked_event.process_unchecked()?;
420447

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
format::{EventFormat, LogSource, json},
3333
},
3434
handlers::{
35-
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
35+
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
3636
http::{
3737
ingest::PostError,
3838
kinesis::{Message, flatten_kinesis_logs},
@@ -54,6 +54,7 @@ pub async fn flatten_and_push_logs(
5454
log_source: &LogSource,
5555
p_custom_fields: &HashMap<String, String>,
5656
time_partition: Option<String>,
57+
telemetry_type: TelemetryType,
5758
) -> Result<(), PostError> {
5859
// Verify the dataset fields count
5960
verify_dataset_fields_count(stream_name)?;
@@ -70,6 +71,7 @@ pub async fn flatten_and_push_logs(
7071
log_source,
7172
p_custom_fields,
7273
time_partition,
74+
telemetry_type,
7375
)
7476
.await?;
7577
}
@@ -83,6 +85,7 @@ pub async fn flatten_and_push_logs(
8385
log_source,
8486
p_custom_fields,
8587
time_partition.clone(),
88+
telemetry_type,
8689
)
8790
.await?;
8891
}
@@ -97,6 +100,7 @@ pub async fn flatten_and_push_logs(
97100
log_source,
98101
p_custom_fields,
99102
time_partition.clone(),
103+
telemetry_type,
100104
)
101105
.await?;
102106
}
@@ -111,6 +115,7 @@ pub async fn flatten_and_push_logs(
111115
log_source,
112116
p_custom_fields,
113117
time_partition.clone(),
118+
telemetry_type,
114119
)
115120
.await?;
116121
}
@@ -122,6 +127,7 @@ pub async fn flatten_and_push_logs(
122127
log_source,
123128
p_custom_fields,
124129
time_partition,
130+
telemetry_type,
125131
)
126132
.await?
127133
}
@@ -136,6 +142,7 @@ pub async fn push_logs(
136142
log_source: &LogSource,
137143
p_custom_fields: &HashMap<String, String>,
138144
time_partition: Option<String>,
145+
telemetry_type: TelemetryType,
139146
) -> Result<(), PostError> {
140147
let stream = PARSEABLE.get_stream(stream_name)?;
141148
let time_partition_limit = PARSEABLE
@@ -169,6 +176,7 @@ pub async fn push_logs(
169176
schema_version,
170177
StreamType::UserDefined,
171178
p_custom_fields,
179+
telemetry_type,
172180
)?
173181
.process()?;
174182
}

0 commit comments

Comments
 (0)