Merged
4 changes: 2 additions & 2 deletions src/event/format/json.rs
@@ -29,7 +29,7 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, Metadata, Tags};
use super::{EventFormat, LogSource, Metadata, Tags};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
@@ -52,7 +52,7 @@ impl EventFormat for Event {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &str,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(
self.data,
36 changes: 34 additions & 2 deletions src/event/format/mod.rs
@@ -43,6 +43,38 @@ type Tags = String;
type Metadata = String;
type EventSchema = Vec<Arc<Field>>;

/// Source of the logs, used to perform special processing for certain sources
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub enum LogSource {
// AWS Kinesis sends logs in the format of a JSON array
Kinesis,
// OpenTelemetry sends logs according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
OtelLogs,
// OpenTelemetry sends metrics according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
OtelMetrics,
// OpenTelemetry sends traces according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
OtelTraces,
// Json object or array
#[default]
Json,
Custom(String),
}

impl From<&str> for LogSource {
fn from(s: &str) -> Self {
match s {
"kinesis" => LogSource::Kinesis,
"otel-logs" => LogSource::OtelLogs,
"otel-metrics" => LogSource::OtelMetrics,
"otel-traces" => LogSource::OtelTraces,
custom => LogSource::Custom(custom.to_owned()),
}
}
}

// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
@@ -54,7 +86,7 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &str,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -65,7 +97,7 @@
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &str,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema,
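Since the new enum and its `From<&str>` impl carry most of the behavioral weight of this change, here is a minimal standalone sketch of the mapping (the enum is reproduced from the diff above; the `main` and its asserts are illustrative additions, not code from this PR):

```rust
// Standalone reproduction of the LogSource type added in src/event/format/mod.rs.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub enum LogSource {
    Kinesis,
    OtelLogs,
    OtelMetrics,
    OtelTraces,
    #[default]
    Json,
    Custom(String),
}

impl From<&str> for LogSource {
    fn from(s: &str) -> Self {
        match s {
            "kinesis" => LogSource::Kinesis,
            "otel-logs" => LogSource::OtelLogs,
            "otel-metrics" => LogSource::OtelMetrics,
            "otel-traces" => LogSource::OtelTraces,
            custom => LogSource::Custom(custom.to_owned()),
        }
    }
}

fn main() {
    // Known header values map to dedicated variants.
    assert_eq!(LogSource::from("otel-logs"), LogSource::OtelLogs);
    // Any other value is preserved as a custom source, including "".
    assert_eq!(
        LogSource::from("syslog"),
        LogSource::Custom("syslog".to_owned())
    );
    // Plain JSON ingestion is the default and needs no special processing.
    assert_eq!(LogSource::default(), LogSource::Json);
}
```

One consequence worth noting: only the listed literals get dedicated variants, so an empty header value becomes `LogSource::Custom("")` rather than the `Json` default. Call sites that previously passed `""` now pass `&LogSource::default()` explicitly, as the `ingest_internal_stream` and test hunks below show.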
100 changes: 70 additions & 30 deletions src/handlers/http/ingest.rs
@@ -20,16 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use crate::event::format::LogSource;
use crate::event::{
self,
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::{
LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES,
STREAM_NAME_HEADER_KEY,
};
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{SchemaVersion, STREAM_INFO};
use crate::option::{Mode, CONFIG};
@@ -95,7 +93,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
metadata: String::default(),
};
// For internal streams, use old schema
event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")?
event.into_recordbatch(&schema, None, None, SchemaVersion::V0, &LogSource::default())?
};
event::Event {
rb,
@@ -127,8 +125,8 @@ pub async fn handle_otel_logs_ingestion(
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
let log_source = log_source.to_str().unwrap();
if log_source != LOG_SOURCE_OTEL_LOGS {
let log_source = LogSource::from(log_source.to_str().unwrap());
if log_source != LogSource::OtelLogs {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-logs for ingesting otel logs"
)));
@@ -142,7 +140,7 @@
let mut json = flatten_otel_logs(&logs);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body, log_source).await?;
push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
@@ -161,8 +159,8 @@
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
let log_source = log_source.to_str().unwrap();
if log_source != LOG_SOURCE_OTEL_METRICS {
let log_source = LogSource::from(log_source.to_str().unwrap());
if log_source != LogSource::OtelMetrics {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-metrics for ingesting otel metrics"
)));
@@ -175,7 +173,7 @@
let mut json = flatten_otel_metrics(metrics);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body, log_source).await?;
push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
@@ -195,8 +193,8 @@
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
let log_source = log_source.to_str().unwrap();
if log_source != LOG_SOURCE_OTEL_TRACES {
let log_source = LogSource::from(log_source.to_str().unwrap());
if log_source != LogSource::OtelTraces {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-traces for ingesting otel traces"
)));
@@ -209,7 +207,7 @@
let mut json = flatten_otel_traces(&traces);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body, log_source).await?;
push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
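All three otel ingestion handlers now follow the same shape: parse the `x-p-log-source` header into a `LogSource` and compare enum variants instead of raw strings. A simplified sketch of that shared check, building on the `LogSource` sketch above (hypothetical helper; the real handlers inline this logic and return the crate's `PostError` rather than a `String`):

```rust
// Hypothetical helper illustrating the shared validation pattern.
fn validate_log_source(header_value: &str, expected: LogSource) -> Result<LogSource, String> {
    // Unknown values become LogSource::Custom(...), so a typo'd header
    // can no longer equal the expected variant by accident.
    let log_source = LogSource::from(header_value);
    if log_source != expected {
        return Err(format!("unexpected log source, expected {expected:?}"));
    }
    Ok(log_source)
}
```

Comparing variants keeps the set of recognized sources in one place (`From<&str>`) instead of scattering string literals across handlers.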
@@ -371,7 +369,7 @@ mod tests {
use serde_json::json;

use crate::{
event,
event::{self, format::LogSource},
handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
metadata::SchemaVersion,
};
@@ -420,7 +418,7 @@
None,
None,
SchemaVersion::V0,
"",
&LogSource::default(),
)
.unwrap();

@@ -471,7 +469,7 @@
None,
None,
SchemaVersion::V0,
"",
&LogSource::default(),
)
.unwrap();

@@ -505,8 +503,16 @@

let req = TestRequest::default().to_http_request();

let (rb, _) =
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
let (rb, _) = into_event_batch(
&req,
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
@@ -538,7 +544,16 @@

let req = TestRequest::default().to_http_request();

assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
assert!(into_event_batch(
&req,
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
}

#[test]
@@ -556,8 +571,16 @@

let req = TestRequest::default().to_http_request();

let (rb, _) =
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
let (rb, _) = into_event_batch(
&req,
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -576,7 +599,7 @@
None,
None,
SchemaVersion::V0,
""
&LogSource::default()
)
.is_err())
}
@@ -608,7 +631,7 @@
None,
None,
SchemaVersion::V0,
"",
&LogSource::default(),
)
.unwrap();

@@ -665,7 +688,7 @@
None,
None,
SchemaVersion::V0,
"",
&LogSource::default(),
)
.unwrap();

@@ -715,8 +738,16 @@
);
let req = TestRequest::default().to_http_request();

let (rb, _) =
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
let (rb, _) = into_event_batch(
&req,
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -765,7 +796,16 @@
.into_iter(),
);

assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
assert!(into_event_batch(
&req,
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
}

#[test]
@@ -800,7 +840,7 @@
None,
None,
SchemaVersion::V0,
"",
&LogSource::default(),
)
.unwrap();

@@ -881,7 +921,7 @@
None,
None,
SchemaVersion::V1,
"",
&LogSource::default(),
)
.unwrap();
