Skip to content

Commit 1f2d3b4

Browse files
fix: use PayloadConfig for OTEL ingest, make payload size configurable (#1499)
Fix OTEL endpoints to use PayloadConfig instead of JsonConfig so large OTEL payloads are handled correctly replace const MAX_EVENT_PAYLOAD_SIZE with configurable value add validation to keep this size > 100 bytes and less than 100 MBs
1 parent 4e67fc6 commit 1f2d3b4

File tree

5 files changed

+46
-9
lines changed

5 files changed

+46
-9
lines changed

src/cli.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,15 @@ pub struct Options {
433433
)]
434434
pub max_field_statistics: usize,
435435

436+
#[arg(
437+
long,
438+
env = "P_MAX_EVENT_PAYLOAD_SIZE",
439+
default_value = "10485760",
440+
value_parser = validation::validate_payload_size,
441+
help = "Maximum allowed event payload size in bytes for ingest endpoints"
442+
)]
443+
pub max_event_payload_size: usize,
444+
436445
// collect dataset stats
437446
#[arg(
438447
long,

src/handlers/http/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ pub mod resource_check;
4949
pub mod role;
5050
pub mod targets;
5151
pub mod users;
52-
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
5352
pub const API_BASE_PATH: &str = "api";
5453
pub const API_VERSION: &str = "v1";
5554
pub const PRISM_BASE_PATH: &str = "prism";
@@ -137,3 +136,7 @@ pub const CACHING_NOTICE: &str = "Caching as a feature has been deprecated";
137136
pub async fn caching_removed() -> impl Responder {
138137
(CACHING_NOTICE, StatusCode::GONE)
139138
}
139+
140+
pub fn max_event_payload_size() -> usize {
141+
PARSEABLE.options.max_event_payload_size
142+
}

src/handlers/http/modal/query_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ use std::thread;
2121

2222
use crate::handlers::airplane;
2323
use crate::handlers::http::cluster;
24+
use crate::handlers::http::logstream;
25+
use crate::handlers::http::max_event_payload_size;
2426
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
2527
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
26-
use crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream};
2728
use crate::handlers::http::{base_path, prism_base_path, resource_check};
2829
use crate::handlers::http::{rbac, role};
2930
use crate::hottier::HotTierManager;
@@ -287,7 +288,7 @@ impl QueryServer {
287288
.to(querier_logstream::delete)
288289
.authorize_for_resource(Action::DeleteStream),
289290
)
290-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
291+
.app_data(web::JsonConfig::default().limit(max_event_payload_size())),
291292
)
292293
.service(
293294
// GET "/logstream/{logstream}/info" ==> Get info for given log stream

src/handlers/http/modal/server.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
2525
use crate::handlers::http::base_path;
2626
use crate::handlers::http::demo_data::get_demo_data;
2727
use crate::handlers::http::health_check;
28+
use crate::handlers::http::max_event_payload_size;
2829
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
2930
use crate::handlers::http::prism_base_path;
3031
use crate::handlers::http::query;
@@ -52,7 +53,7 @@ use tokio::sync::oneshot;
5253

5354
use crate::{
5455
handlers::http::{
55-
self, MAX_EVENT_PAYLOAD_SIZE, ingest, llm, logstream,
56+
self, ingest, llm, logstream,
5657
middleware::{DisAllowRootUser, RouteExt},
5758
oidc, role,
5859
},
@@ -462,7 +463,7 @@ impl Server {
462463
.to(logstream::delete)
463464
.authorize_for_resource(Action::DeleteStream),
464465
)
465-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
466+
.app_data(web::JsonConfig::default().limit(max_event_payload_size())),
466467
)
467468
.service(
468469
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
@@ -533,7 +534,7 @@ impl Server {
533534
.to(ingest::ingest)
534535
.authorize_for_resource(Action::Ingest),
535536
)
536-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
537+
.app_data(web::JsonConfig::default().limit(max_event_payload_size()))
537538
}
538539

539540
// /v1/logs endpoint to be used for OTEL log ingestion only
@@ -546,7 +547,7 @@ impl Server {
546547
.to(ingest::handle_otel_logs_ingestion)
547548
.authorize_for_resource(Action::Ingest),
548549
)
549-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
550+
.app_data(web::PayloadConfig::default().limit(max_event_payload_size())),
550551
)
551552
.service(
552553
web::resource("/metrics")
@@ -555,7 +556,7 @@ impl Server {
555556
.to(ingest::handle_otel_metrics_ingestion)
556557
.authorize_for_resource(Action::Ingest),
557558
)
558-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
559+
.app_data(web::PayloadConfig::default().limit(max_event_payload_size())),
559560
)
560561
.service(
561562
web::resource("/traces")
@@ -564,7 +565,7 @@ impl Server {
564565
.to(ingest::handle_otel_traces_ingestion)
565566
.authorize_for_resource(Action::Ingest),
566567
)
567-
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
568+
.app_data(web::PayloadConfig::default().limit(max_event_payload_size())),
568569
)
569570
}
570571

src/option.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,4 +209,27 @@ pub mod validation {
209209
Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string())
210210
}
211211
}
212+
213+
pub fn validate_payload_size(s: &str) -> Result<usize, String> {
214+
const MIN_SIZE: usize = 100; // 100 bytes
215+
const MAX_SIZE: usize = 100 * 1024 * 1024; // 100 MB
216+
217+
if let Ok(size) = s.parse::<usize>() {
218+
if size < MIN_SIZE {
219+
Err(format!(
220+
"Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. It must be at least {} bytes",
221+
MIN_SIZE
222+
))
223+
} else if size > MAX_SIZE {
224+
Err(format!(
225+
"Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. It must be at most {} bytes",
226+
MAX_SIZE
227+
))
228+
} else {
229+
Ok(size)
230+
}
231+
} else {
232+
Err("Invalid value for P_MAX_EVENT_PAYLOAD_SIZE. It should be given as integer number of bytes".to_string())
233+
}
234+
}
212235
}

0 commit comments

Comments
 (0)