diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 041460992c5..cf5b784f4a1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -409,6 +409,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con struct azure_kusto_file *chunk; struct mk_list *tmp; struct mk_list *head; + struct mk_list *f_tmp; struct mk_list *f_head; struct flb_fstore_file *fsf; struct flb_fstore_stream *fs_stream; @@ -427,7 +428,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con continue; } - mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c index 6bf8499ba13..9258cadb57c 100644 --- a/tests/runtime/out_azure_kusto.c +++ b/tests/runtime/out_azure_kusto.c @@ -1,5 +1,9 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 700 +#endif + /* Fluent Bit * ========== * Copyright (C) 2019-2022 The Fluent Bit Authors @@ -20,8 +24,30 @@ #include #include "flb_tests_runtime.h" +#include +#include +#include +#include +#include /* Test data */ + +static int flb_kusto_unlink_cb(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) +{ + return remove(fpath); +} + +static void flb_kusto_rm_rf(const char *path) +{ + struct stat st; + + if (stat(path, &st) != 0) { + return; + } + + nftw(path, flb_kusto_unlink_cb, 64, FTW_DEPTH | FTW_PHYS); +} + #include "data/common/json_invalid.h" /* JSON_INVALID */ /* Test functions */ @@ -30,6 +56,7 @@ void flb_test_azure_kusto_managed_identity_system(void); void flb_test_azure_kusto_managed_identity_user(void); void flb_test_azure_kusto_service_principal(void); void flb_test_azure_kusto_workload_identity(void); +void flb_test_azure_kusto_buffering_backlog(void); /* Test list */ TEST_LIST = { @@ -38,6 +65,7 @@ TEST_LIST = { {"managed_identity_user", flb_test_azure_kusto_managed_identity_user}, {"service_principal", flb_test_azure_kusto_service_principal}, {"workload_identity", flb_test_azure_kusto_workload_identity}, + {"buffering_backlog", flb_test_azure_kusto_buffering_backlog}, {NULL, NULL} }; @@ -210,4 +238,93 @@ void flb_test_azure_kusto_workload_identity(void) flb_stop(ctx); flb_destroy(ctx); +} + +/* Regression: exercise buffering-enabled backlog processing on restart to validate nested mk_list_foreach_safe fix */ +void flb_test_azure_kusto_buffering_backlog(void) +{ + int i; + int ret; + int bytes; + char sample[] = "{\"k\":\"v\"}"; + size_t sample_size = sizeof(sample) - 1; + char buffer_dir[PATH_MAX]; + pid_t pid; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + pid = getpid(); + snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-test-%d", (int) pid); + + /* Ensure a clean buffer directory before starting */ + flb_kusto_rm_rf(buffer_dir); + ret = mkdir(buffer_dir, 0700); + if (ret != 0) { + perror("mkdir(buffer_dir)"); + TEST_CHECK(ret == 0); + return; + } + + /* First run: enable buffering and write data to disk */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + for (i = 0; i < 5; i++) { + bytes = flb_lib_push(ctx, in_ffd, sample, sample_size); + TEST_CHECK(bytes == (int) sample_size); + } + + sleep(1); /* allow flush to write buffered chunks */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Second run: restart to process backlog from buffer_dir */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(1); /* ingest_all_chunks runs on startup for buffered backlog */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup buffer directory after test */ + flb_kusto_rm_rf(buffer_dir); } \ No newline at end of file