Skip to content

Commit eaeaa1e

Browse files
committed
use feature flags to determine the run taskEvent store
1 parent 3295f35 commit eaeaa1e

File tree

11 files changed

+142
-40
lines changed

11 files changed

+142
-40
lines changed

.vscode/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
"packages/cli-v3/e2e": true
88
},
99
"vitest.disableWorkspaceWarning": true,
10-
"typescript.experimental.useTsgo": true
10+
"typescript.experimental.useTsgo": false
1111
}

apps/webapp/app/env.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,19 @@ const EnvironmentSchema = z
11091109
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
11101110
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
11111111

1112+
EVENTS_CLICKHOUSE_URL: z
1113+
.string()
1114+
.optional()
1115+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1116+
EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1117+
EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1118+
EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1119+
EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
1120+
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1121+
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
1122+
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
1123+
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
1124+
11121125
// Bootstrap
11131126
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
11141127
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,48 @@
11
import { EventRepository } from "~/v3/eventRepository.server";
22
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
33
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
4-
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
54
import { TaskRun } from "@trigger.dev/database";
5+
import { getTaskEventStore } from "~/v3/taskEventStore.server";
6+
import { ClickhouseEventRepository } from "~/v3/clickhouseEventRepository.server";
7+
import { IEventRepository } from "~/v3/eventRepository.types";
8+
import { FEATURE_FLAG, flags } from "~/v3/featureFlags.server";
9+
import { env } from "~/env.server";
610

711
export class DefaultTraceEventsConcern implements TraceEventConcern {
812
private readonly eventRepository: EventRepository;
13+
private readonly clickhouseEventRepository: ClickhouseEventRepository;
914

10-
constructor(eventRepository: EventRepository) {
15+
constructor(
16+
eventRepository: EventRepository,
17+
clickhouseEventRepository: ClickhouseEventRepository
18+
) {
1119
this.eventRepository = eventRepository;
20+
this.clickhouseEventRepository = clickhouseEventRepository;
21+
}
22+
23+
async #getEventRepository(
24+
request: TriggerTaskRequest
25+
): Promise<{ repository: IEventRepository; store: string }> {
26+
const taskEventRepository = await flags({
27+
key: FEATURE_FLAG.taskEventRepository,
28+
defaultValue: env.EVENT_REPOSITORY_DEFAULT_STORE,
29+
overrides: request.environment.organization.featureFlags as Record<string, unknown>,
30+
});
31+
32+
if (taskEventRepository === "clickhouse") {
33+
return { repository: this.clickhouseEventRepository, store: "clickhouse" };
34+
}
35+
36+
return { repository: this.eventRepository, store: getTaskEventStore() };
1237
}
1338

1439
async traceRun<T>(
1540
request: TriggerTaskRequest,
16-
callback: (span: TracedEventSpan) => Promise<T>
41+
callback: (span: TracedEventSpan, store: string) => Promise<T>
1742
): Promise<T> {
18-
return await this.eventRepository.traceEvent(
43+
const { repository, store } = await this.#getEventRepository(request);
44+
45+
return await repository.traceEvent(
1946
request.taskId,
2047
{
2148
context: request.options?.traceContext,
@@ -38,14 +65,17 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
3865
: undefined,
3966
},
4067
async (event, traceContext, traceparent) => {
41-
return await callback({
42-
traceId: event.traceId,
43-
spanId: event.spanId,
44-
traceContext,
45-
traceparent,
46-
setAttribute: (key, value) => event.setAttribute(key as any, value),
47-
failWithError: event.failWithError.bind(event),
48-
});
68+
return await callback(
69+
{
70+
traceId: event.traceId,
71+
spanId: event.spanId,
72+
traceContext,
73+
traceparent,
74+
setAttribute: (key, value) => event.setAttribute(key as any, value),
75+
failWithError: event.failWithError.bind(event),
76+
},
77+
store
78+
);
4979
}
5080
);
5181
}
@@ -58,11 +88,12 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
5888
incomplete: boolean;
5989
isError: boolean;
6090
},
61-
callback: (span: TracedEventSpan) => Promise<T>
91+
callback: (span: TracedEventSpan, store: string) => Promise<T>
6292
): Promise<T> {
6393
const { existingRun, idempotencyKey, incomplete, isError } = options;
94+
const { repository, store } = await this.#getEventRepository(request);
6495

65-
return await this.eventRepository.traceEvent(
96+
return await repository.traceEvent(
6697
`${request.taskId} (cached)`,
6798
{
6899
context: request.options?.traceContext,
@@ -99,14 +130,17 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
99130
}
100131
);
101132

102-
return await callback({
103-
traceId: event.traceId,
104-
spanId: event.spanId,
105-
traceContext,
106-
traceparent,
107-
setAttribute: (key, value) => event.setAttribute(key as any, value),
108-
failWithError: event.failWithError.bind(event),
109-
});
133+
return await callback(
134+
{
135+
traceId: event.traceId,
136+
spanId: event.spanId,
137+
traceContext,
138+
traceparent,
139+
setAttribute: (key, value) => event.setAttribute(key as any, value),
140+
failWithError: event.failWithError.bind(event),
141+
},
142+
store
143+
);
110144
}
111145
);
112146
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import type {
2929
TriggerTaskServiceOptions,
3030
TriggerTaskServiceResult,
3131
} from "../../v3/services/triggerTask.server";
32-
import { getTaskEventStore } from "../../v3/taskEventStore.server";
3332
import { clampMaxDuration } from "../../v3/utils/maxDuration";
3433
import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server";
3534
import type {
@@ -267,7 +266,7 @@ export class RunEngineTriggerTaskService {
267266
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
268267

269268
try {
270-
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
269+
return await this.traceEventConcern.traceRun(triggerRequest, async (event, store) => {
271270
const result = await this.runNumberIncrementer.incrementRunNumber(
272271
triggerRequest,
273272
async (num) => {
@@ -311,7 +310,7 @@ export class RunEngineTriggerTaskService {
311310
delayUntil,
312311
queuedAt: delayUntil ? undefined : new Date(),
313312
maxAttempts: body.options?.maxAttempts,
314-
taskEventStore: getTaskEventStore(),
313+
taskEventStore: store,
315314
ttl,
316315
tags,
317316
oneTimeUseToken: options.oneTimeUseToken,

apps/webapp/app/runEngine/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ export type TracedEventSpan = {
143143
export interface TraceEventConcern {
144144
traceRun<T>(
145145
request: TriggerTaskRequest,
146-
callback: (span: TracedEventSpan) => Promise<T>
146+
callback: (span: TracedEventSpan, store: string) => Promise<T>
147147
): Promise<T>;
148148
traceIdempotentRun<T>(
149149
request: TriggerTaskRequest,
@@ -153,7 +153,7 @@ export interface TraceEventConcern {
153153
incomplete: boolean;
154154
isError: boolean;
155155
},
156-
callback: (span: TracedEventSpan) => Promise<T>
156+
callback: (span: TracedEventSpan, store: string) => Promise<T>
157157
): Promise<T>;
158158
}
159159

apps/webapp/app/v3/clickhouseEventRepository.server.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,3 @@ export class ClickhouseEventRepository implements IEventRepository {
197197
throw new Error("ClickhouseEventRepository.generateSpanId not implemented");
198198
}
199199
}
200-
201-
/**
202-
* Factory function to create a ClickhouseEventRepository instance.
203-
* This can be used as an alternative to the PostgreSQL-based EventRepository.
204-
*/
205-
export function createClickhouseEventRepository(
206-
config: ClickhouseEventRepositoryConfig
207-
): IEventRepository {
208-
return new ClickhouseEventRepository(config);
209-
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { env } from "~/env.server";
3+
import { singleton } from "~/utils/singleton";
4+
import { ClickhouseEventRepository } from "./clickhouseEventRepository.server";
5+
6+
export const clickhouseEventRepository = singleton(
7+
"clickhouseEventRepository",
8+
initializeClickhouseRepository
9+
);
10+
11+
function initializeClickhouseRepository() {
12+
if (!env.EVENTS_CLICKHOUSE_URL) {
13+
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
14+
}
15+
16+
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
17+
url.searchParams.delete("secure");
18+
19+
const safeUrl = new URL(url.toString());
20+
safeUrl.password = "redacted";
21+
22+
console.log("🗃️ Initializing Clickhouse event repository", { url: safeUrl.toString() });
23+
24+
const clickhouse = new ClickHouse({
25+
url: url.toString(),
26+
name: "task-events",
27+
keepAlive: {
28+
enabled: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
29+
idleSocketTtl: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
30+
},
31+
logLevel: env.EVENTS_CLICKHOUSE_LOG_LEVEL,
32+
compression: {
33+
request: true,
34+
},
35+
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
36+
});
37+
38+
const repository = new ClickhouseEventRepository({
39+
clickhouse: clickhouse,
40+
batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE,
41+
flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS,
42+
});
43+
44+
return repository;
45+
}

apps/webapp/app/v3/featureFlags.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@ import { prisma, type PrismaClientOrTransaction } from "~/db.server";
44
export const FEATURE_FLAG = {
55
defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId",
66
runsListRepository: "runsListRepository",
7+
taskEventRepository: "taskEventRepository",
78
} as const;
89

910
const FeatureFlagCatalog = {
1011
[FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(),
1112
[FEATURE_FLAG.runsListRepository]: z.enum(["clickhouse", "postgres"]),
13+
[FEATURE_FLAG.taskEventRepository]: z.enum(["clickhouse", "postgres"]),
1214
};
1315

1416
type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
1517

1618
export type FlagsOptions<T extends FeatureFlagKey> = {
1719
key: T;
1820
defaultValue?: z.infer<(typeof FeatureFlagCatalog)[T]>;
21+
overrides?: Record<string, unknown>;
1922
};
2023

2124
export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) {
@@ -34,7 +37,17 @@ export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) {
3437
},
3538
});
3639

37-
const parsed = FeatureFlagCatalog[opts.key].safeParse(value?.value);
40+
const flagSchema = FeatureFlagCatalog[opts.key];
41+
42+
if (opts.overrides?.[opts.key]) {
43+
const parsed = flagSchema.safeParse(opts.overrides[opts.key]);
44+
45+
if (parsed.success) {
46+
return parsed.data;
47+
}
48+
}
49+
50+
const parsed = flagSchema.safeParse(value?.value);
3851

3952
if (!parsed.success) {
4053
return opts.defaultValue;

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { tracer } from "../tracer.server";
1414
import { WithRunEngine } from "./baseService.server";
1515
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
1616
import { env } from "~/env.server";
17+
import { clickhouseEventRepository } from "../clickhouseEventRepositoryInstance.server";
1718

1819
export type TriggerTaskServiceOptions = {
1920
idempotencyKey?: string;
@@ -93,7 +94,10 @@ export class TriggerTaskService extends WithRunEngine {
9394
body: TriggerTaskRequestBody,
9495
options: TriggerTaskServiceOptions = {}
9596
): Promise<TriggerTaskServiceResult | undefined> {
96-
const traceEventConcern = new DefaultTraceEventsConcern(eventRepository);
97+
const traceEventConcern = new DefaultTraceEventsConcern(
98+
eventRepository,
99+
clickhouseEventRepository
100+
);
97101

98102
const service = new RunEngineTriggerTaskService({
99103
prisma: this._prisma,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."Organization" ADD COLUMN "featureFlags" JSONB;

0 commit comments

Comments
 (0)