diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 76b857f0e..b0628197a 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -27,7 +27,10 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use itertools::Itertools; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::error; use super::EventFormat; @@ -74,6 +77,10 @@ impl EventFormat for Event { _ => unreachable!("flatten would have failed beforehand"), }; + // Rename JSON keys starting with '@' to '_' to match the schema + // Reject event if renaming would cause a key collision + let value_arr = rename_json_keys(value_arr)?; + // collect all the keys from all the json objects in the request body let fields = collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); @@ -257,6 +264,49 @@ fn collect_keys<'a>(values: impl Iterator) -> Result) -> Result, anyhow::Error> { + values + .into_iter() + .map(|value| { + if let Value::Object(map) = value { + // Collect original keys to check for collisions + let original_keys: HashSet = map.keys().cloned().collect(); + + // Check for collisions before renaming + for key in map.keys() { + if key.starts_with('@') { + let mut normalized_key = key.clone(); + super::normalize_field_name(&mut normalized_key); + if original_keys.contains(&normalized_key) { + return Err(anyhow!( + "Key collision detected: '{}' and '{}' would both map to '{}'", + key, + normalized_key, + normalized_key + )); + } + } + } + + let new_map: serde_json::Map = map + .into_iter() + .map(|(mut key, val)| { + if key.starts_with('@') { + super::normalize_field_name(&mut key); + } + (key, val) + }) + .collect(); + Ok(Value::Object(new_map)) + } else { + Ok(value) + } + }) + .collect() +} + fn fields_mismatch( schema: &[Arc], body: &Value, diff --git a/src/event/format/mod.rs 
/// Rewrites a leading `@` in `name` to `_`, modifying the string in place.
///
/// Only the first character is considered: `@timestamp` becomes `_timestamp`
/// and `@@x` becomes `_@x`. Names that do not begin with `@`, including the
/// empty string, are left unchanged.
#[inline]
pub fn normalize_field_name(name: &mut String) {
    if name.starts_with('@') {
        // '@' is a single-byte ASCII character, so the range `..1` covers
        // exactly that character and always ends on a char boundary.
        name.replace_range(..1, "_");
    }
}