From 2ce33db5e6180fd9660bcd64910b53112eb7be4c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 12 Dec 2025 14:56:26 +0900 Subject: [PATCH 1/2] in_emitter: Add backpressure limitation for filesystem storage Signed-off-by: Hiroshi Hatake --- plugins/in_emitter/emitter.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 5d1bd44e876..af957ecb451 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -318,6 +319,29 @@ static int cb_emitter_init(struct flb_input_instance *in, return -1; } + /* + * The emitter is used internally by filters such as rewrite_tag. When the + * downstream outputs experience backpressure, the emitter needs to pause + * its upstream senders to avoid holding an arbitrary number of "up" + * chunks in memory. Without pausing on the filesystem storage limit, the + * emitter can continue to accumulate in-memory chunks (for example, in a + * rewrite_tag pipeline) even though storage.max_chunks_up intends to cap + * usage. Enable pausing on the storage chunks limit by default when + * filesystem storage is in use so the configured storage.max_chunks_up + * limit is honored. + */ + if (in->storage_type == FLB_STORAGE_FS && + in->storage_pause_on_chunks_overlimit == FLB_FALSE) { + in->storage_pause_on_chunks_overlimit = FLB_TRUE; + flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter"); + } + else if (in->storage_type != FLB_STORAGE_FS && + in->storage_pause_on_chunks_overlimit == FLB_TRUE) { + flb_plg_debug(in, + "storage.pause_on_chunks_overlimit reset: storage.type is not filesystem"); + in->storage_pause_on_chunks_overlimit = FLB_FALSE; + } + if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) { ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY; flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)", From f979501d31708e5a44d3906549023fb0bead1d49 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 12 Dec 2025 15:25:07 +0900 Subject: [PATCH 2/2] in_emitter: Address coderabbitai comments Signed-off-by: Hiroshi Hatake --- plugins/in_emitter/emitter.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index af957ecb451..92803b69e78 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -298,6 +298,7 @@ static int cb_emitter_init(struct flb_input_instance *in, { struct flb_sched *scheduler; struct flb_emitter *ctx; + char *pause_prop = NULL; int ret; scheduler = flb_sched_ctx_get(); @@ -330,16 +331,13 @@ static int cb_emitter_init(struct flb_input_instance *in, * filesystem storage is in use so the configured storage.max_chunks_up * limit is honored. */ - if (in->storage_type == FLB_STORAGE_FS && - in->storage_pause_on_chunks_overlimit == FLB_FALSE) { - in->storage_pause_on_chunks_overlimit = FLB_TRUE; - flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter"); - } - else if (in->storage_type != FLB_STORAGE_FS && - in->storage_pause_on_chunks_overlimit == FLB_TRUE) { - flb_plg_debug(in, - "storage.pause_on_chunks_overlimit reset: storage.type is not filesystem"); - in->storage_pause_on_chunks_overlimit = FLB_FALSE; + pause_prop = flb_input_get_property("storage.pause_on_chunks_overlimit", in); + if (pause_prop == NULL) { + if (in->storage_type == FLB_STORAGE_FS && + in->storage_pause_on_chunks_overlimit == FLB_FALSE) { + in->storage_pause_on_chunks_overlimit = FLB_TRUE; + flb_plg_debug(in, "enable pause on storage chunks overlimit for emitter"); + } } if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) {