Skip to content

Commit 593a2d9

Browse files
committed
convert closing cached run spans to new system
1 parent f8a7b66 commit 593a2d9

File tree

6 files changed

+264
-45
lines changed

6 files changed

+264
-45
lines changed

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,18 @@ export class IdempotencyKeyConcern {
9090
isError: associatedWaitpoint.outputIsError,
9191
},
9292
async (event) => {
93+
const spanId =
94+
request.options?.parentAsLinkType === "replay"
95+
? event.spanId
96+
: event.traceparent?.spanId
97+
? `${event.traceparent.spanId}:${event.spanId}`
98+
: event.spanId;
99+
93100
//block run with waitpoint
94101
await this.engine.blockRunWithWaitpoint({
95102
runId: RunId.fromFriendlyId(parentRunId),
96103
waitpoints: associatedWaitpoint.id,
97-
spanIdToComplete: event.spanId,
104+
spanIdToComplete: spanId,
98105
batch: request.options?.batchId
99106
? {
100107
id: request.options.batchId,

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 141 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,57 @@ export class EventRepository {
411411
});
412412
}
413413

414+
async completeCachedRunEvent({
415+
run,
416+
blockedRun,
417+
endTime,
418+
spanId,
419+
parentSpanId,
420+
spanCreatedAt,
421+
isError,
422+
}: {
423+
run: CompleteableTaskRun;
424+
blockedRun: CompleteableTaskRun;
425+
spanId: string;
426+
parentSpanId: string;
427+
spanCreatedAt: Date;
428+
isError: boolean;
429+
endTime?: Date;
430+
}) {
431+
const startTime = convertDateToNanoseconds(spanCreatedAt);
432+
433+
await this.insertImmediate({
434+
message: run.taskIdentifier,
435+
serviceName: "api server",
436+
serviceNamespace: "trigger.dev",
437+
level: "TRACE",
438+
kind: "SERVER",
439+
traceId: blockedRun.traceId,
440+
spanId: spanId,
441+
parentId: parentSpanId,
442+
runId: blockedRun.friendlyId,
443+
taskSlug: run.taskIdentifier,
444+
projectRef: "",
445+
projectId: run.projectId,
446+
environmentId: run.runtimeEnvironmentId,
447+
environmentType: run.environmentType ?? "DEVELOPMENT",
448+
organizationId: run.organizationId ?? "",
449+
isPartial: false,
450+
isError,
451+
isCancelled: false,
452+
status: "OK",
453+
runIsTest: run.isTest,
454+
startTime,
455+
properties: {},
456+
metadata: undefined,
457+
style: undefined,
458+
duration: calculateDurationFromStart(startTime, endTime ?? new Date()),
459+
output: undefined,
460+
payload: undefined,
461+
payloadType: undefined,
462+
});
463+
}
464+
414465
async completeFailedRunEvent({
415466
run,
416467
endTime,
@@ -1007,6 +1058,8 @@ export class EventRepository {
10071058
endCreatedAt
10081059
);
10091060

1061+
logger.debug("[getSpan] span", { span });
1062+
10101063
const output = rehydrateJson(spanEvent.output);
10111064
const payload = rehydrateJson(spanEvent.payload);
10121065

@@ -1052,11 +1105,13 @@ export class EventRepository {
10521105
}
10531106

10541107
const spanEvents = transformEvents(
1055-
preparedEvent.events,
1108+
span.data.events,
10561109
spanEvent.metadata as Attributes,
10571110
spanEvent.environmentType === "DEVELOPMENT"
10581111
);
10591112

1113+
logger.debug("[getSpan] spanEvents", { spanEvents });
1114+
10601115
const originalRun = rehydrateAttribute<string>(
10611116
spanEvent.properties,
10621117
SemanticInternalAttributes.ORIGINAL_RUN_ID
@@ -1092,8 +1147,7 @@ export class EventRepository {
10921147
endCreatedAt?: Date
10931148
) {
10941149
return await startActiveSpan("createSpanFromEvent", async (s) => {
1095-
let ancestorCancelled = false;
1096-
let duration = event.duration;
1150+
let overrides: AncestorOverrides | undefined;
10971151

10981152
if (!event.isCancelled && event.isPartial) {
10991153
await this.#walkSpanAncestors(
@@ -1107,25 +1161,70 @@ export class EventRepository {
11071161
}
11081162

11091163
if (ancestorEvent.isCancelled) {
1110-
ancestorCancelled = true;
1164+
overrides = {
1165+
isCancelled: true,
1166+
};
11111167

11121168
// We need to get the cancellation time from the cancellation span event
11131169
const cancellationEvent = ancestorEvent.events.find(
11141170
(event) => event.name === "cancellation"
11151171
);
11161172

11171173
if (cancellationEvent) {
1118-
duration = calculateDurationFromStart(event.startTime, cancellationEvent.time);
1174+
overrides.duration = calculateDurationFromStart(
1175+
event.startTime,
1176+
cancellationEvent.time
1177+
);
11191178
}
11201179

11211180
return { stop: true };
11221181
}
11231182

1183+
const attemptFailedEvent = (ancestorEvent.events ?? []).find(
1184+
(spanEvent) =>
1185+
spanEvent.name === "attempt_failed" &&
1186+
spanEvent.properties.attemptNumber === event.attemptNumber
1187+
);
1188+
1189+
if (!attemptFailedEvent) {
1190+
return { stop: false };
1191+
}
1192+
1193+
overrides = {
1194+
isError: true,
1195+
events: [
1196+
{
1197+
name: "exception",
1198+
time: attemptFailedEvent.time,
1199+
properties: {
1200+
exception: (attemptFailedEvent as AttemptFailedSpanEvent).properties.exception,
1201+
},
1202+
},
1203+
],
1204+
duration: calculateDurationFromStart(event.startTime, attemptFailedEvent.time),
1205+
};
1206+
11241207
return { stop: false };
11251208
}
11261209
);
11271210
}
11281211

1212+
if (overrides) {
1213+
logger.debug("[#createSpanFromEvent] overrides", { overrides, event });
1214+
}
1215+
1216+
const ancestorCancelled = overrides?.isCancelled ?? false;
1217+
const ancestorIsError = overrides?.isError ?? false;
1218+
const duration = overrides?.duration ?? event.duration;
1219+
const events = [...(overrides?.events ?? []), ...(event.events ?? [])];
1220+
const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial;
1221+
const isCancelled = event.isCancelled === true ? true : event.isPartial && ancestorCancelled;
1222+
const isError = isCancelled
1223+
? false
1224+
: typeof overrides?.isError === "boolean"
1225+
? overrides.isError
1226+
: event.isError;
1227+
11291228
const span = {
11301229
id: event.spanId,
11311230
parentId: event.parentId ?? undefined,
@@ -1135,12 +1234,12 @@ export class EventRepository {
11351234
message: event.message,
11361235
style: event.style,
11371236
duration,
1138-
isError: event.isError,
1139-
isPartial: ancestorCancelled ? false : event.isPartial,
1140-
isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled,
1237+
isError,
1238+
isPartial,
1239+
isCancelled,
11411240
startTime: getDateFromNanoseconds(event.startTime),
11421241
level: event.level,
1143-
events: event.events,
1242+
events,
11441243
environmentType: event.environmentType,
11451244
},
11461245
};
@@ -1216,10 +1315,16 @@ export class EventRepository {
12161315
);
12171316

12181317
let finalEvent: TaskEvent | undefined;
1318+
let overrideEvents: TaskEvent[] = [];
12191319
let partialEvent: TaskEvent | undefined;
12201320

12211321
// Separate partial and final events
12221322
for (const event of events) {
1323+
if (event.kind === "UNSPECIFIED") {
1324+
overrideEvents.push(event);
1325+
continue;
1326+
}
1327+
12231328
if (event.isPartial) {
12241329
// Take the first partial event (earliest)
12251330
if (!partialEvent) {
@@ -1233,11 +1338,14 @@ export class EventRepository {
12331338

12341339
// If we have both partial and final events, merge them intelligently
12351340
if (finalEvent && partialEvent) {
1236-
return this.#mergePartialWithFinalEvent(partialEvent, finalEvent);
1341+
return this.#mergeOverrides(
1342+
this.#mergePartialWithFinalEvent(partialEvent, finalEvent),
1343+
overrideEvents
1344+
);
12371345
}
12381346

12391347
// Return whichever event we have
1240-
return finalEvent ?? partialEvent;
1348+
return this.#mergeOverrides(finalEvent ?? partialEvent, overrideEvents);
12411349
});
12421350
}
12431351

@@ -1260,6 +1368,27 @@ export class EventRepository {
12601368
return merged;
12611369
}
12621370

1371+
#mergeOverrides(
1372+
event: TaskEvent | undefined,
1373+
overrideEvents: TaskEvent[]
1374+
): TaskEvent | undefined {
1375+
function extractEventsFromEvent(event: TaskEvent): SpanEvent[] {
1376+
return (event.events ?? []) as unknown as SpanEvent[];
1377+
}
1378+
1379+
if (!event) {
1380+
return;
1381+
}
1382+
1383+
return {
1384+
...event,
1385+
events: [
1386+
...extractEventsFromEvent(event),
1387+
...overrideEvents.flatMap(extractEventsFromEvent),
1388+
] as unknown as Prisma.JsonValue,
1389+
};
1390+
}
1391+
12631392
public async recordEvent(
12641393
message: string,
12651394
options: TraceEventOptions & { duration?: number; parentId?: string }
@@ -2034,7 +2163,7 @@ function findAttemptFailedAncestor(
20342163
return;
20352164
}
20362165

2037-
const attemptFailedEvent = ancestorSpan.events.find(
2166+
const attemptFailedEvent = (ancestorSpan.events ?? []).find(
20382167
(event) =>
20392168
event.name === "attempt_failed" &&
20402169
event.properties.attemptNumber === originalSpan.attemptNumber

0 commit comments

Comments
 (0)