diff --git a/apps/webapp/app/v3/eventRepository/index.server.ts b/apps/webapp/app/v3/eventRepository/index.server.ts index 6424fb7c7c..9f06ec1bc3 100644 --- a/apps/webapp/app/v3/eventRepository/index.server.ts +++ b/apps/webapp/app/v3/eventRepository/index.server.ts @@ -38,6 +38,24 @@ export async function getEventRepository( return { repository: eventRepository, store: getTaskEventStore() }; } +export async function getV3EventRepository( + parentStore: string | undefined +): Promise<{ repository: IEventRepository; store: string }> { + if (typeof parentStore === "string") { + if (parentStore === "clickhouse") { + return { repository: clickhouseEventRepository, store: "clickhouse" }; + } else { + return { repository: eventRepository, store: getTaskEventStore() }; + } + } + + if (env.EVENT_REPOSITORY_DEFAULT_STORE === "clickhouse") { + return { repository: clickhouseEventRepository, store: "clickhouse" }; + } else { + return { repository: eventRepository, store: getTaskEventStore() }; + } +} + async function resolveTaskEventRepositoryFlag( featureFlags: Record | undefined ): Promise<"clickhouse" | "postgres"> { diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index 5e6ac7c9f1..9d414f5b43 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -22,7 +22,7 @@ import { parseDelay } from "~/utils/delays"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { handleMetadataPacket } from "~/utils/packets"; import { marqs } from "~/v3/marqs/index.server"; -import { getEventRepository } from "../eventRepository/index.server"; +import { getV3EventRepository } from "../eventRepository/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; @@ -291,8 +291,7 @@ export class TriggerTaskServiceV1 extends BaseService { }) : undefined; - const { repository, store } = await getEventRepository( - environment.organization.featureFlags as Record, + const { repository, store } = await getV3EventRepository( dependentAttempt?.taskRun.taskEventStore ?? parentAttempt?.taskRun.taskEventStore ?? dependentBatchRun?.dependentTaskAttempt?.taskRun.taskEventStore