Skip to content

Commit 5775d60

Browse files
committed
WIP
1 parent 3ceea77 commit 5775d60

File tree

13 files changed

+646
-218
lines changed

13 files changed

+646
-218
lines changed

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66
"**/node_modules/**": true,
77
"packages/cli-v3/e2e": true
88
},
9-
"vitest.disableWorkspaceWarning": true
9+
"vitest.disableWorkspaceWarning": true,
10+
"typescript.experimental.useTsgo": true
1011
}

apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { LoaderFunctionArgs } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { requireUser } from "~/services/session.server";
44
import { v3RunParamsSchema } from "~/utils/pathBuilder";
5-
import { RunPreparedEvent, eventRepository } from "~/v3/eventRepository.server";
5+
import { eventRepository } from "~/v3/eventRepository.server";
6+
import type { RunPreparedEvent } from "~/v3/eventRepository.types";
67
import { createGzip } from "zlib";
78
import { Readable } from "stream";
89
import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations";

apps/webapp/app/utils/taskEvent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import type {
1919
SpanLink,
2020
SpanSummary,
2121
TraceSummary,
22-
} from "~/v3/eventRepository.server";
22+
} from "~/v3/eventRepository.types";
2323

2424
export type TraceSpan = NonNullable<ReturnType<typeof createSpanFromEvents>>;
2525

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 58 additions & 209 deletions
Original file line numberDiff line numberDiff line change
@@ -33,220 +33,38 @@ import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis
3333
import { logger } from "~/services/logger.server";
3434
import { singleton } from "~/utils/singleton";
3535
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
36+
import type {
37+
IEventRepository,
38+
CreatableEvent,
39+
CreatableEventKind,
40+
CreatableEventStatus,
41+
CreatableEventEnvironmentType,
42+
CompleteableTaskRun,
43+
TraceAttributes,
44+
SetAttribute,
45+
TraceEventOptions,
46+
EventBuilder,
47+
EventRepoConfig,
48+
QueryOptions,
49+
TaskEventRecord,
50+
QueriedEvent,
51+
PreparedEvent,
52+
PreparedDetailedEvent,
53+
RunPreparedEvent,
54+
SpanLink,
55+
SpanSummary,
56+
TraceSummary,
57+
SpanDetailedSummary,
58+
TraceDetailedSummary,
59+
UpdateEventOptions,
60+
} from "./eventRepository.types";
3661
import { DetailedTraceEvent, TaskEventStore, TaskEventStoreTable } from "./taskEventStore.server";
3762
import { startActiveSpan } from "./tracer.server";
3863
import { startSpan } from "./tracing.server";
3964

4065
const MAX_FLUSH_DEPTH = 5;
4166

42-
export type CreatableEvent = Omit<
43-
Prisma.TaskEventCreateInput,
44-
"id" | "createdAt" | "properties" | "metadata" | "style" | "output" | "payload"
45-
> & {
46-
properties: Attributes;
47-
metadata: Attributes | undefined;
48-
style: Attributes | undefined;
49-
output: Attributes | string | boolean | number | undefined;
50-
payload: Attributes | string | boolean | number | undefined;
51-
};
52-
53-
export type CreatableEventKind = TaskEventKind;
54-
export type CreatableEventStatus = TaskEventStatus;
55-
export type CreatableEventEnvironmentType = CreatableEvent["environmentType"];
56-
57-
export type CompleteableTaskRun = Pick<
58-
TaskRun,
59-
| "friendlyId"
60-
| "traceId"
61-
| "spanId"
62-
| "parentSpanId"
63-
| "createdAt"
64-
| "completedAt"
65-
| "taskIdentifier"
66-
| "projectId"
67-
| "runtimeEnvironmentId"
68-
| "organizationId"
69-
| "environmentType"
70-
| "isTest"
71-
>;
72-
73-
export type TraceAttributes = Partial<
74-
Pick<
75-
CreatableEvent,
76-
| "attemptId"
77-
| "isError"
78-
| "isCancelled"
79-
| "isDebug"
80-
| "runId"
81-
| "runIsTest"
82-
| "output"
83-
| "outputType"
84-
| "metadata"
85-
| "properties"
86-
| "style"
87-
| "queueId"
88-
| "queueName"
89-
| "batchId"
90-
| "payload"
91-
| "payloadType"
92-
| "idempotencyKey"
93-
>
94-
>;
95-
96-
export type SetAttribute<T extends TraceAttributes> = (key: keyof T, value: T[keyof T]) => void;
97-
98-
export type TraceEventOptions = {
99-
kind?: CreatableEventKind;
100-
context?: Record<string, unknown>;
101-
spanParentAsLink?: boolean;
102-
parentAsLinkType?: "trigger" | "replay";
103-
spanIdSeed?: string;
104-
attributes: TraceAttributes;
105-
environment: TaskEventEnvironment;
106-
taskSlug: string;
107-
startTime?: bigint;
108-
endTime?: Date;
109-
immediate?: boolean;
110-
};
111-
112-
export type EventBuilder = {
113-
traceId: string;
114-
spanId: string;
115-
setAttribute: SetAttribute<TraceAttributes>;
116-
stop: () => void;
117-
failWithError: (error: TaskRunError) => void;
118-
};
119-
120-
export type EventRepoConfig = {
121-
batchSize: number;
122-
batchInterval: number;
123-
redis: RedisWithClusterOptions;
124-
retentionInDays: number;
125-
partitioningEnabled: boolean;
126-
tracer?: Tracer;
127-
minConcurrency?: number;
128-
maxConcurrency?: number;
129-
maxBatchSize?: number;
130-
memoryPressureThreshold?: number;
131-
loadSheddingThreshold?: number;
132-
loadSheddingEnabled?: boolean;
133-
};
134-
135-
export type QueryOptions = Prisma.TaskEventWhereInput;
136-
137-
export type TaskEventRecord = TaskEvent;
138-
139-
export type QueriedEvent = Prisma.TaskEventGetPayload<{
140-
select: {
141-
spanId: true;
142-
parentId: true;
143-
runId: true;
144-
idempotencyKey: true;
145-
message: true;
146-
style: true;
147-
startTime: true;
148-
duration: true;
149-
isError: true;
150-
isPartial: true;
151-
isCancelled: true;
152-
level: true;
153-
events: true;
154-
environmentType: true;
155-
kind: true;
156-
attemptNumber: true;
157-
};
158-
}>;
159-
160-
export type PreparedEvent = Omit<QueriedEvent, "events" | "style" | "duration"> & {
161-
duration: number;
162-
events: SpanEvents;
163-
style: TaskEventStyle;
164-
};
165-
166-
export type PreparedDetailedEvent = Omit<DetailedTraceEvent, "events" | "style" | "duration"> & {
167-
duration: number;
168-
events: SpanEvents;
169-
style: TaskEventStyle;
170-
};
171-
172-
export type RunPreparedEvent = PreparedEvent & {
173-
taskSlug?: string;
174-
};
175-
176-
export type SpanLink =
177-
| {
178-
type: "run";
179-
icon?: string;
180-
title: string;
181-
runId: string;
182-
}
183-
| {
184-
type: "span";
185-
icon?: string;
186-
title: string;
187-
traceId: string;
188-
spanId: string;
189-
};
190-
191-
export type SpanSummary = {
192-
id: string;
193-
parentId: string | undefined;
194-
runId: string;
195-
data: {
196-
message: string;
197-
style: TaskEventStyle;
198-
events: SpanEvents;
199-
startTime: Date;
200-
duration: number;
201-
isError: boolean;
202-
isPartial: boolean;
203-
isCancelled: boolean;
204-
isDebug: boolean;
205-
level: NonNullable<CreatableEvent["level"]>;
206-
environmentType: CreatableEventEnvironmentType;
207-
};
208-
};
209-
210-
export type TraceSummary = { rootSpan: SpanSummary; spans: Array<SpanSummary> };
211-
212-
export type SpanDetailedSummary = {
213-
id: string;
214-
parentId: string | undefined;
215-
message: string;
216-
data: {
217-
runId: string;
218-
taskSlug?: string;
219-
taskPath?: string;
220-
events: SpanEvents;
221-
startTime: Date;
222-
duration: number;
223-
isError: boolean;
224-
isPartial: boolean;
225-
isCancelled: boolean;
226-
level: NonNullable<CreatableEvent["level"]>;
227-
environmentType: CreatableEventEnvironmentType;
228-
workerVersion?: string;
229-
queueName?: string;
230-
machinePreset?: string;
231-
properties?: Attributes;
232-
output?: Attributes;
233-
};
234-
children: Array<SpanDetailedSummary>;
235-
};
236-
237-
export type TraceDetailedSummary = {
238-
traceId: string;
239-
rootSpan: SpanDetailedSummary;
240-
};
241-
242-
export type UpdateEventOptions = {
243-
attributes: TraceAttributes;
244-
endTime?: Date;
245-
immediate?: boolean;
246-
events?: SpanEvents;
247-
};
248-
249-
export class EventRepository {
67+
export class EventRepository implements IEventRepository {
25068
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
25169
private _randomIdGenerator = new RandomIdGenerator();
25270
private _redisPublishClient: RedisClient;
@@ -304,7 +122,7 @@ export class EventRepository {
304122
}
305123

306124
async insertManyImmediate(events: CreatableEvent[]) {
307-
return await this.#flushBatch(nanoid(), events);
125+
return await this.#flushBatchWithReturn(nanoid(), events);
308126
}
309127

310128
async completeSuccessfulRunEvent({ run, endTime }: { run: CompleteableTaskRun; endTime?: Date }) {
@@ -1637,6 +1455,35 @@ export class EventRepository {
16371455
}
16381456

16391457
async #flushBatch(flushId: string, batch: CreatableEvent[]) {
1458+
await startSpan(this._tracer, "flushBatch", async (span) => {
1459+
const events = excludePartialEventsWithCorrespondingFullEvent(batch);
1460+
1461+
span.setAttribute("flush_id", flushId);
1462+
span.setAttribute("event_count", events.length);
1463+
span.setAttribute("partial_event_count", batch.length - events.length);
1464+
span.setAttribute(
1465+
"last_flush_in_ms",
1466+
this._lastFlushedAt ? new Date().getTime() - this._lastFlushedAt.getTime() : 0
1467+
);
1468+
1469+
const flushedEvents = await this.#doFlushBatch(flushId, events);
1470+
1471+
this._lastFlushedAt = new Date();
1472+
1473+
if (flushedEvents.length !== events.length) {
1474+
logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
1475+
attemptCount: events.length,
1476+
successCount: flushedEvents.length,
1477+
});
1478+
1479+
span.setAttribute("failed_event_count", events.length - flushedEvents.length);
1480+
}
1481+
1482+
this.#publishToRedis(flushedEvents);
1483+
});
1484+
}
1485+
1486+
async #flushBatchWithReturn(flushId: string, batch: CreatableEvent[]): Promise<CreatableEvent[]> {
16401487
return await startSpan(this._tracer, "flushBatch", async (span) => {
16411488
const events = excludePartialEventsWithCorrespondingFullEvent(batch);
16421489

@@ -1662,6 +1509,8 @@ export class EventRepository {
16621509
}
16631510

16641511
this.#publishToRedis(flushedEvents);
1512+
1513+
return flushedEvents;
16651514
});
16661515
}
16671516

0 commit comments

Comments
 (0)