From a9b7e04e466422549b5d7aee29ab5e9fb4ea5407 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 29 Dec 2025 18:20:00 +1100 Subject: [PATCH] fix: use PayloadConfig for OTEL ingest, make payload size configurable Fix OTEL endpoints to use PayloadConfig instead of JsonConfig so large OTEL payloads are handled correctly. Replace const MAX_EVENT_PAYLOAD_SIZE with a configurable value (P_MAX_EVENT_PAYLOAD_SIZE). Add validation to keep this size between 100 bytes and 100 MiB, inclusive. --- src/cli.rs | 9 +++++++++ src/handlers/http/mod.rs | 5 ++++- src/handlers/http/modal/query_server.rs | 5 +++-- src/handlers/http/modal/server.rs | 13 +++++++------ src/option.rs | 23 +++++++++++++++++++++++ 5 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index b06284359..61b6fa5b7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -433,6 +433,15 @@ pub struct Options { )] pub max_field_statistics: usize, + #[arg( + long, + env = "P_MAX_EVENT_PAYLOAD_SIZE", + default_value = "10485760", + value_parser = validation::validate_payload_size, + help = "Maximum allowed event payload size in bytes for ingest endpoints" + )] + pub max_event_payload_size: usize, + // collect dataset stats #[arg( long, diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index b84d7dc43..b49bee74e 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -49,7 +49,6 @@ pub mod resource_check; pub mod role; pub mod targets; pub mod users; -pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; pub const PRISM_BASE_PATH: &str = "prism"; @@ -137,3 +136,7 @@ pub const CACHING_NOTICE: &str = "Caching as a feature has been deprecated"; pub async fn caching_removed() -> impl Responder { (CACHING_NOTICE, StatusCode::GONE) } + +pub fn max_event_payload_size() -> usize { + PARSEABLE.options.max_event_payload_size +} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index
577da542f..de7a55f88 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -21,9 +21,10 @@ use std::thread; use crate::handlers::airplane; use crate::handlers::http::cluster; +use crate::handlers::http::logstream; +use crate::handlers::http::max_event_payload_size; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; -use crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream}; use crate::handlers::http::{base_path, prism_base_path, resource_check}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; @@ -287,7 +288,7 @@ impl QueryServer { .to(querier_logstream::delete) .authorize_for_resource(Action::DeleteStream), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::JsonConfig::default().limit(max_event_payload_size())), ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 11408db8a..c288e9d5d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,6 +25,7 @@ use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::demo_data::get_demo_data; use crate::handlers::http::health_check; +use crate::handlers::http::max_event_payload_size; use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; @@ -52,7 +53,7 @@ use tokio::sync::oneshot; use crate::{ handlers::http::{ - self, MAX_EVENT_PAYLOAD_SIZE, ingest, llm, logstream, + self, ingest, llm, logstream, middleware::{DisAllowRootUser, RouteExt}, oidc, role, }, @@ -462,7 +463,7 @@ impl Server { .to(logstream::delete) .authorize_for_resource(Action::DeleteStream), ) - 
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::JsonConfig::default().limit(max_event_payload_size())), ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream @@ -533,7 +534,7 @@ impl Server { .to(ingest::ingest) .authorize_for_resource(Action::Ingest), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + .app_data(web::JsonConfig::default().limit(max_event_payload_size())) } // /v1/logs endpoint to be used for OTEL log ingestion only @@ -546,7 +547,7 @@ impl Server { .to(ingest::handle_otel_logs_ingestion) .authorize_for_resource(Action::Ingest), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::PayloadConfig::default().limit(max_event_payload_size())), ) .service( web::resource("/metrics") @@ -555,7 +556,7 @@ impl Server { .to(ingest::handle_otel_metrics_ingestion) .authorize_for_resource(Action::Ingest), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::PayloadConfig::default().limit(max_event_payload_size())), ) .service( web::resource("/traces") @@ -564,7 +565,7 @@ impl Server { .to(ingest::handle_otel_traces_ingestion) .authorize_for_resource(Action::Ingest), ) - .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + .app_data(web::PayloadConfig::default().limit(max_event_payload_size())), ) } diff --git a/src/option.rs b/src/option.rs index 8d8b6bccc..fbd682d17 100644 --- a/src/option.rs +++ b/src/option.rs @@ -209,4 +209,27 @@ pub mod validation { Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string()) } } + + pub fn validate_payload_size(s: &str) -> Result<usize, String> { + const MIN_SIZE: usize = 100; // 100 bytes + const MAX_SIZE: usize = 100 * 1024 * 1024; // 100 MB + + if let Ok(size) = s.parse::<usize>() { + if size < MIN_SIZE { + Err(format!( + "Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. 
It must be at least {} bytes", + MIN_SIZE + )) + } else if size > MAX_SIZE { + Err(format!( + "Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. It must be at most {} bytes", + MAX_SIZE + )) + } else { + Ok(size) + } + } else { + Err("Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. It should be given as integer number of bytes".to_string()) + } + } }