From 0df2a2decf65d4d8d939cf3ff408c2ad4fab6013 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 8 Jan 2026 21:23:06 +1100 Subject: [PATCH 1/3] normalise field name: change prefix from @ to _ All fields prefixed with `@` will be renamed to have a `_` prefix. This makes those fields queryable. --- src/event/format/json.rs | 28 +++++++++++++++++++++++++++- src/event/format/mod.rs | 14 +++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 76b857f0e..a7ec53961 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -118,6 +118,9 @@ impl EventFormat for Event { )); } + // Rename JSON keys starting with '@' to '_' to match the schema + let value_arr = rename_json_keys(value_arr); + Ok((value_arr, schema, is_first)) } @@ -257,6 +260,27 @@ fn collect_keys<'a>(values: impl Iterator) -> Result) -> Vec { + values + .into_iter() + .map(|value| { + if let Value::Object(map) = value { + let new_map: serde_json::Map = map + .into_iter() + .map(|(key, val)| { + let new_key = super::normalize_field_name(&key); + (new_key, val) + }) + .collect(); + Value::Object(new_map) + } else { + value + } + }) + .collect() +} + fn fields_mismatch( schema: &[Arc], body: &Value, @@ -267,7 +291,9 @@ fn fields_mismatch( if val.is_null() { continue; } - let Some(field) = get_field(schema, name) else { + // Normalize field name to match schema transformation + let lookup_name = super::normalize_field_name(name); + let Some(field) = get_field(schema, &lookup_name) else { return true; }; if !valid_type(field, val, schema_version, static_schema_flag) { diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 56d3a676d..d5ba9568b 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -57,6 +57,17 @@ static TIME_FIELD_NAME_PARTS: [&str; 11] = [ ]; type EventSchema = Vec>; +/// Normalizes a field name by replacing leading '@' with '_'. 
+/// Fields starting with '@' are renamed to start with '_'. +#[inline] +pub fn normalize_field_name(name: &str) -> String { + if let Some(stripped) = name.strip_prefix('@') { + format!("_{}", stripped) + } else { + name.to_string() + } +} + /// Source of the logs, used to perform special processing for certain sources #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum LogSource { @@ -335,7 +346,8 @@ pub fn override_data_type( .fields() .iter() .map(|field| { - let field_name = field.name().as_str(); + // Normalize field names - replace '@' prefix with '_' + let field_name = normalize_field_name(field.name()); match (schema_version, map.get(field.name())) { // in V1 for new fields in json named "time"/"date" or such and having inferred // type string, that can be parsed as timestamp, use the timestamp type. From 8a2a412411c9f1145a77bead98b4758ada197c73 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 8 Jan 2026 23:04:59 +1100 Subject: [PATCH 2/3] avoid extra allocation by mutating the var --- src/event/format/json.rs | 9 +++++---- src/event/format/mod.rs | 9 ++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index a7ec53961..b782d1f3c 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -268,9 +268,9 @@ fn rename_json_keys(values: Vec) -> Vec { if let Value::Object(map) = value { let new_map: serde_json::Map = map .into_iter() - .map(|(key, val)| { - let new_key = super::normalize_field_name(&key); - (new_key, val) + .map(|(mut key, val)| { + super::normalize_field_name(&mut key); + (key, val) }) .collect(); Value::Object(new_map) @@ -292,7 +292,8 @@ fn fields_mismatch( continue; } // Normalize field name to match schema transformation - let lookup_name = super::normalize_field_name(name); + let mut lookup_name = name.to_string(); + super::normalize_field_name(&mut lookup_name); let Some(field) = get_field(schema, &lookup_name) else 
{ return true; }; diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index d5ba9568b..4157627b6 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -60,11 +60,9 @@ type EventSchema = Vec>; /// Normalizes a field name by replacing leading '@' with '_'. /// Fields starting with '@' are renamed to start with '_'. #[inline] -pub fn normalize_field_name(name: &str) -> String { +pub fn normalize_field_name(name: &mut String) { if let Some(stripped) = name.strip_prefix('@') { - format!("_{}", stripped) - } else { - name.to_string() + *name = format!("_{}", stripped); } } @@ -347,7 +345,8 @@ pub fn override_data_type( .iter() .map(|field| { // Normalize field names - replace '@' prefix with '_' - let field_name = normalize_field_name(field.name()); + let mut field_name = field.name().to_string(); + normalize_field_name(&mut field_name); match (schema_version, map.get(field.name())) { // in V1 for new fields in json named "time"/"date" or such and having inferred // type string, that can be parsed as timestamp, use the timestamp type. 
From 0e0a8f89b4adb5f9aeafe6936491915865bedd58 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 14 Jan 2026 21:02:38 +1100 Subject: [PATCH 3/3] reject event on key collision, log error on ingestion failure --- src/event/format/json.rs | 49 +++++++++++++++++++++++++++---------- src/handlers/http/ingest.rs | 2 ++ 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index b782d1f3c..b0628197a 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -27,7 +27,10 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use itertools::Itertools; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::error; use super::EventFormat; @@ -74,6 +77,10 @@ impl EventFormat for Event { _ => unreachable!("flatten would have failed beforehand"), }; + // Rename JSON keys starting with '@' to '_' to match the schema + // Reject event if renaming would cause a key collision + let value_arr = rename_json_keys(value_arr)?; + // collect all the keys from all the json objects in the request body let fields = collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); @@ -118,9 +125,6 @@ impl EventFormat for Event { )); } - // Rename JSON keys starting with '@' to '_' to match the schema - let value_arr = rename_json_keys(value_arr); - Ok((value_arr, schema, is_first)) } @@ -260,22 +264,44 @@ fn collect_keys<'a>(values: impl Iterator) -> Result) -> Vec { +/// Renames JSON keys to match the schema transformation using normalize_field_name. +/// Returns an error if renaming would cause a key collision. 
+fn rename_json_keys(values: Vec) -> Result, anyhow::Error> { values .into_iter() .map(|value| { if let Value::Object(map) = value { + // Collect original keys to check for collisions + let original_keys: HashSet = map.keys().cloned().collect(); + + // Check for collisions before renaming + for key in map.keys() { + if key.starts_with('@') { + let mut normalized_key = key.clone(); + super::normalize_field_name(&mut normalized_key); + if original_keys.contains(&normalized_key) { + return Err(anyhow!( + "Key collision detected: '{}' and '{}' would both map to '{}'", + key, + normalized_key, + normalized_key + )); + } + } + } + let new_map: serde_json::Map = map .into_iter() .map(|(mut key, val)| { - super::normalize_field_name(&mut key); + if key.starts_with('@') { + super::normalize_field_name(&mut key); + } (key, val) }) .collect(); - Value::Object(new_map) + Ok(Value::Object(new_map)) } else { - value + Ok(value) } }) .collect() @@ -291,10 +317,7 @@ fn fields_mismatch( if val.is_null() { continue; } - // Normalize field name to match schema transformation - let mut lookup_name = name.to_string(); - super::normalize_field_name(&mut lookup_name); - let Some(field) = get_field(schema, &lookup_name) else { + let Some(field) = get_field(schema, name) else { return true; }; if !valid_type(field, val, schema_version, static_schema_flag) { diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index f00221643..5c46744ec 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,6 +24,7 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType}; use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; +use tracing::error; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; @@ -543,6 +544,7 @@ impl actix_web::ResponseError for PostError { } fn error_response(&self) -> actix_web::HttpResponse { + error!("{self}"); match self { 
PostError::MetastoreError(metastore_error) => { actix_web::HttpResponse::build(metastore_error.status_code())