Skip to content

Commit bf97d4b

Browse files
committed
more clickhouse events stuff
1 parent eaeaa1e commit bf97d4b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1609
-653
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
33
import { prisma, type PrismaClient } from "~/db.server";
44
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
55
import { getUsername } from "~/utils/username";
6-
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { resolveEventRepositoryForStore } from "~/v3/eventRepository";
77
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
88
import { isFinalRunStatus } from "~/v3/taskStatus";
99

@@ -140,9 +140,12 @@ export class RunPresenter {
140140
};
141141
}
142142

143+
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
144+
143145
// get the events
144146
const traceSummary = await eventRepository.getTraceSummary(
145147
getTaskEventStoreTableForRun(run),
148+
run.runtimeEnvironment.id,
146149
run.traceId,
147150
run.rootTaskRun?.createdAt ?? run.createdAt,
148151
run.completedAt ?? undefined,

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { eventStream } from "remix-utils/sse/server";
33
import { PrismaClient, prisma } from "~/db.server";
44
import { logger } from "~/services/logger.server";
55
import { throttle } from "~/utils/throttle";
6-
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { tracePubSub } from "~/v3/services/tracePubSub.server";
77

88
const pingInterval = 1000;
99

@@ -41,7 +41,7 @@ export class RunStreamPresenter {
4141

4242
let pinger: NodeJS.Timeout | undefined = undefined;
4343

44-
const { unsubscribe, eventEmitter } = await eventRepository.subscribeToTrace(run.traceId);
44+
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
4545

4646
return eventStream(request.signal, (send, close) => {
4747
const safeSend = (args: { event?: string; data: string }) => {

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,23 @@ import {
1010
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
1111
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
1212
import { logger } from "~/services/logger.server";
13-
import { eventRepository, rehydrateAttribute } from "~/v3/eventRepository.server";
13+
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
1414
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1515
import { getTaskEventStoreTableForRun, type TaskEventStoreTable } from "~/v3/taskEventStore.server";
1616
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1717
import { BasePresenter } from "./basePresenter.server";
1818
import { WaitpointPresenter } from "./WaitpointPresenter.server";
1919
import { engine } from "~/v3/runEngine.server";
20+
import { resolveEventRepositoryForStore } from "~/v3/eventRepository";
21+
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
2022

2123
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
2224
export type Span = NonNullable<NonNullable<Result>["span"]>;
2325
export type SpanRun = NonNullable<NonNullable<Result>["run"]>;
2426
type FindRunResult = NonNullable<
2527
Awaited<ReturnType<InstanceType<typeof SpanPresenter>["findRun"]>>
2628
>;
27-
type GetSpanResult = NonNullable<Awaited<ReturnType<(typeof eventRepository)["getSpan"]>>>;
29+
type GetSpanResult = SpanDetail;
2830

2931
export class SpanPresenter extends BasePresenter {
3032
public async call({
@@ -74,14 +76,20 @@ export class SpanPresenter extends BasePresenter {
7476
return;
7577
}
7678

79+
const { traceId } = parentRun;
80+
81+
const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);
82+
7783
const eventStore = getTaskEventStoreTableForRun(parentRun);
7884

7985
const run = await this.getRun({
8086
eventStore,
81-
environmentId: parentRun.runtimeEnvironmentId,
87+
traceId,
88+
eventRepository,
8289
spanId,
8390
createdAt: parentRun.createdAt,
8491
completedAt: parentRun.completedAt,
92+
environmentId: parentRun.runtimeEnvironmentId,
8593
});
8694
if (run) {
8795
return {
@@ -93,10 +101,12 @@ export class SpanPresenter extends BasePresenter {
93101
const span = await this.#getSpan({
94102
eventStore,
95103
spanId,
104+
traceId,
96105
environmentId: parentRun.runtimeEnvironmentId,
97106
projectId: parentRun.projectId,
98107
createdAt: parentRun.createdAt,
99108
completedAt: parentRun.completedAt,
109+
eventRepository,
100110
});
101111

102112
if (!span) {
@@ -112,29 +122,30 @@ export class SpanPresenter extends BasePresenter {
112122
async getRun({
113123
eventStore,
114124
environmentId,
125+
traceId,
126+
eventRepository,
115127
spanId,
116128
createdAt,
117129
completedAt,
118130
}: {
119131
eventStore: TaskEventStoreTable;
120132
environmentId: string;
133+
traceId: string;
134+
eventRepository: IEventRepository;
121135
spanId: string;
122136
createdAt: Date;
123137
completedAt: Date | null;
124138
}) {
125-
const span = await eventRepository.getSpan({
126-
storeTable: eventStore,
139+
const originalRunId = await eventRepository.getSpanOriginalRunId(
140+
eventStore,
141+
environmentId,
127142
spanId,
128143
environmentId,
129-
startCreatedAt: createdAt,
130-
endCreatedAt: completedAt ?? undefined,
131-
});
132-
133-
if (!span) {
134-
return;
135-
}
144+
createdAt,
145+
completedAt ?? undefined
146+
);
136147

137-
const run = await this.findRun({ span, spanId });
148+
const run = await this.findRun({ originalRunId, spanId, environmentId });
138149

139150
if (!run) {
140151
return;
@@ -259,7 +270,7 @@ export class SpanPresenter extends BasePresenter {
259270
workerQueue: run.workerQueue,
260271
traceId: run.traceId,
261272
spanId: run.spanId,
262-
isCached: !!span.originalRun,
273+
isCached: !!originalRunId,
263274
machinePreset: machine?.name,
264275
externalTraceId,
265276
};
@@ -294,7 +305,15 @@ export class SpanPresenter extends BasePresenter {
294305
};
295306
}
296307

297-
async findRun({ span, spanId }: { span: GetSpanResult; spanId: string }) {
308+
async findRun({
309+
originalRunId,
310+
spanId,
311+
environmentId,
312+
}: {
313+
originalRunId?: string;
314+
spanId: string;
315+
environmentId: string;
316+
}) {
298317
const run = await this._replica.taskRun.findFirst({
299318
select: {
300319
id: true,
@@ -404,12 +423,14 @@ export class SpanPresenter extends BasePresenter {
404423
},
405424
},
406425
},
407-
where: span.originalRun
426+
where: originalRunId
408427
? {
409-
friendlyId: span.originalRun,
428+
friendlyId: originalRunId,
429+
runtimeEnvironmentId: environmentId,
410430
}
411431
: {
412432
spanId,
433+
runtimeEnvironmentId: environmentId,
413434
},
414435
});
415436

@@ -418,27 +439,32 @@ export class SpanPresenter extends BasePresenter {
418439

419440
async #getSpan({
420441
eventStore,
442+
eventRepository,
443+
traceId,
421444
spanId,
422445
environmentId,
423446
projectId,
424447
createdAt,
425448
completedAt,
426449
}: {
450+
eventRepository: IEventRepository;
451+
traceId: string;
427452
spanId: string;
428453
environmentId: string;
429454
projectId: string;
430455
eventStore: TaskEventStoreTable;
431456
createdAt: Date;
432457
completedAt: Date | null;
433458
}) {
434-
const span = await eventRepository.getSpan({
435-
storeTable: eventStore,
459+
const span = await eventRepository.getSpan(
460+
eventStore,
461+
environmentId,
436462
spanId,
437463
environmentId,
438-
startCreatedAt: createdAt,
439-
endCreatedAt: completedAt ?? undefined,
440-
options: { includeDebugLogs: true },
441-
});
464+
createdAt,
465+
completedAt ?? undefined,
466+
{ includeDebugLogs: true }
467+
);
442468

443469
if (!span) {
444470
return;

apps/webapp/app/routes/api.v1.runs.$runId.events.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
44
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5-
import { eventRepository } from "~/v3/eventRepository.server";
5+
import { eventRepository } from "~/v3/eventRepository/eventRepository.server";
66
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
77

88
const ParamsSchema = z.object({
@@ -30,9 +30,10 @@ export const loader = createLoaderApiRoute(
3030
superScopes: ["read:runs", "read:all", "admin"],
3131
},
3232
},
33-
async ({ resource: run }) => {
33+
async ({ resource: run, authentication }) => {
3434
const runEvents = await eventRepository.getRunEvents(
3535
getTaskEventStoreTableForRun(run),
36+
authentication.environment.id,
3637
run.friendlyId,
3738
run.createdAt,
3839
run.completedAt ?? undefined

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { BatchId } from "@trigger.dev/core/v3/isomorphic";
33
import { z } from "zod";
44
import { $replica } from "~/db.server";
55
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6-
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { eventRepository } from "~/v3/eventRepository/eventRepository.server";
77
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
88

99
const ParamsSchema = z.object({
@@ -35,9 +35,10 @@ export const loader = createLoaderApiRoute(
3535
superScopes: ["read:runs", "read:all", "admin"],
3636
},
3737
},
38-
async ({ resource: run }) => {
38+
async ({ resource: run, authentication }) => {
3939
const traceSummary = await eventRepository.getTraceDetailedSummary(
4040
getTaskEventStoreTableForRun(run),
41+
authentication.environment.id,
4142
run.traceId,
4243
run.createdAt,
4344
run.completedAt ?? undefined

apps/webapp/app/routes/engine.v1.dev.runs.$runFriendlyId.logs.debug.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { z } from "zod";
99
import { prisma } from "~/db.server";
1010
import { logger } from "~/services/logger.server";
1111
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
12-
import { recordRunDebugLog } from "~/v3/eventRepository.server";
12+
import { recordRunDebugLog } from "~/v3/eventRepository/eventRepository.server";
1313

1414
// const { action } = createActionApiRoute(
1515
// {

apps/webapp/app/routes/engine.v1.dev.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import { MachinePreset } from "@trigger.dev/core/v3";
2+
import { MachinePreset, SemanticInternalAttributes } from "@trigger.dev/core/v3";
33
import { RunId, SnapshotId } from "@trigger.dev/core/v3/isomorphic";
44
import {
55
WorkerApiRunAttemptStartRequestBody,
@@ -57,7 +57,8 @@ const { action } = createActionApiRoute(
5757
const envVars = await getEnvVars(
5858
authentication.environment,
5959
engineResult.run.id,
60-
engineResult.execution.machine ?? defaultMachinePreset
60+
engineResult.execution.machine ?? defaultMachinePreset,
61+
engineResult.run.taskEventStore
6162
);
6263

6364
return json({
@@ -77,7 +78,8 @@ const { action } = createActionApiRoute(
7778
async function getEnvVars(
7879
environment: RuntimeEnvironment,
7980
runId: string,
80-
machinePreset: MachinePreset
81+
machinePreset: MachinePreset,
82+
taskEventStore?: string
8183
): Promise<Record<string, string>> {
8284
const variables = await resolveVariablesForEnvironment(environment);
8385

@@ -94,6 +96,19 @@ async function getEnvVars(
9496
]
9597
);
9698

99+
if (taskEventStore) {
100+
const resourceAttributes = JSON.stringify({
101+
[SemanticInternalAttributes.TASK_EVENT_STORE]: taskEventStore,
102+
});
103+
104+
variables.push(
105+
...[
106+
{ key: "OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
107+
{ key: "TRIGGER_OTEL_RESOURCE_ATTRIBUTES", value: resourceAttributes },
108+
]
109+
);
110+
}
111+
97112
return variables.reduce((acc: Record<string, string>, curr) => {
98113
acc[curr.key] = curr.value;
99114
return acc;

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.logs.debug.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
33
import { WorkerApiDebugLogBody } from "@trigger.dev/core/v3/runEngineWorker";
44
import { z } from "zod";
55
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6-
import { recordRunDebugLog } from "~/v3/eventRepository.server";
6+
import { recordRunDebugLog } from "~/v3/eventRepository/eventRepository.server";
77

88
export const action = createActionWorkerApiRoute(
99
{

apps/webapp/app/routes/otel.v1.logs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export async function action({ request }: ActionFunctionArgs) {
99
if (contentType === "application/json") {
1010
const body = await request.json();
1111

12-
const exportResponse = await otlpExporter.exportLogs(body as ExportLogsServiceRequest, false);
12+
const exportResponse = await otlpExporter.exportLogs(body as ExportLogsServiceRequest);
1313

1414
return json(exportResponse, { status: 200 });
1515
} else if (contentType === "application/x-protobuf") {

apps/webapp/app/routes/otel.v1.traces.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ export async function action({ request }: ActionFunctionArgs) {
99
if (contentType === "application/json") {
1010
const body = await request.json();
1111

12-
const exportResponse = await otlpExporter.exportTraces(
13-
body as ExportTraceServiceRequest,
14-
false
15-
);
12+
const exportResponse = await otlpExporter.exportTraces(body as ExportTraceServiceRequest);
1613

1714
return json(exportResponse, { status: 200 });
1815
} else if (contentType === "application/x-protobuf") {

0 commit comments

Comments
 (0)