diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 78cf9f0449..320362d384 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1122,6 +1122,10 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"), EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000), EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), + EVENTS_CLICKHOUSE_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), + EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT: z.string().default("1"), + EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760), + EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000), EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(), EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"), EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000), diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index e96455bb03..87755a4014 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -64,6 +64,10 @@ export type ClickhouseEventRepositoryConfig = { clickhouse: ClickHouse; batchSize?: number; flushInterval?: number; + insertStrategy?: "insert" | "insert_async"; + waitForAsyncInsert?: boolean; + asyncInsertMaxDataSize?: number; + asyncInsertBusyTimeoutMs?: number; tracer?: Tracer; maximumTraceSummaryViewCount?: number; maximumTraceDetailedSummaryViewCount?: number; @@ -119,7 +123,11 @@ export class ClickhouseEventRepository implements IEventRepository { }); } - const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events); + const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, { + params: { + clickhouse_settings: this.#getClickhouseInsertSettings(), + }, + }); if (insertError) { throw insertError; @@ -134,6 +142,19 @@ export class ClickhouseEventRepository implements IEventRepository { }); } + #getClickhouseInsertSettings() { + if (this._config.insertStrategy === "insert") { + return {}; + } else { + return { + async_insert: 1 as const, + async_insert_max_data_size: this._config.asyncInsertMaxDataSize?.toString() ?? "10485760", + async_insert_busy_timeout_ms: this._config.asyncInsertBusyTimeoutMs ?? 5000, + wait_for_async_insert: this._config.waitForAsyncInsert ? (1 as const) : (0 as const), + }; + } + } + async #publishToRedis(events: TaskEventV1Input[]) { if (events.length === 0) return; await tracePubSub.publish(events.map((e) => e.trace_id)); diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts index d80ab93e38..4c7945e804 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts @@ -43,6 +43,10 @@ function initializeClickhouseRepository() { maximumTraceDetailedSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, }); return repository;