Skip to content

Commit e8d84fa

Browse files
fix(webapp): reconcile trace with run lifecycle to handle clickhouse lag
1 parent bfda16b commit e8d84fa

File tree

2 files changed

+237
-35
lines changed

2 files changed

+237
-35
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 96 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { getUsername } from "~/utils/username";
66
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
77
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
88
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
9-
import { isFinalRunStatus } from "~/v3/taskStatus";
9+
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1010
import { env } from "~/env.server";
1111

1212
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
@@ -211,49 +211,47 @@ export class RunPresenter {
211211
let totalDuration = tree?.data.duration ?? 0;
212212
const events = tree
213213
? flattenTree(tree).map((n) => {
214-
const offset = millisecondsToNanoseconds(
215-
n.data.startTime.getTime() - treeRootStartTimeMs
216-
);
217-
//only let non-debug events extend the total duration
218-
if (!n.data.isDebug) {
219-
totalDuration = Math.max(totalDuration, offset + n.data.duration);
220-
}
221-
return {
222-
...n,
223-
data: {
224-
...n.data,
225-
timelineEvents: createTimelineSpanEventsFromSpanEvents(
226-
n.data.events,
227-
user?.admin ?? false,
228-
treeRootStartTimeMs
229-
),
230-
//set partial nodes to null duration
231-
duration: n.data.isPartial ? null : n.data.duration,
232-
offset,
233-
isRoot: n.id === traceSummary.rootSpan.id,
234-
},
235-
};
236-
})
214+
const offset = millisecondsToNanoseconds(
215+
n.data.startTime.getTime() - treeRootStartTimeMs
216+
);
217+
//only let non-debug events extend the total duration
218+
if (!n.data.isDebug) {
219+
totalDuration = Math.max(totalDuration, offset + n.data.duration);
220+
}
221+
return {
222+
...n,
223+
data: {
224+
...n.data,
225+
timelineEvents: createTimelineSpanEventsFromSpanEvents(
226+
n.data.events,
227+
user?.admin ?? false,
228+
treeRootStartTimeMs
229+
),
230+
//set partial nodes to null duration
231+
duration: n.data.isPartial ? null : n.data.duration,
232+
offset,
233+
isRoot: n.id === traceSummary.rootSpan.id,
234+
},
235+
};
236+
})
237237
: [];
238238

239239
//total duration should be a minimum of 1ms
240240
totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1));
241241

242-
let rootSpanStatus: "executing" | "completed" | "failed" = "executing";
243-
if (events[0]) {
244-
if (events[0].data.isError) {
245-
rootSpanStatus = "failed";
246-
} else if (!events[0].data.isPartial) {
247-
rootSpanStatus = "completed";
248-
}
249-
}
242+
const reconciled = reconcileTraceWithRunLifecycle(
243+
runData,
244+
traceSummary.rootSpan.id,
245+
events,
246+
totalDuration
247+
);
250248

251249
return {
252250
run: runData,
253251
trace: {
254-
rootSpanStatus,
255-
events: events,
256-
duration: totalDuration,
252+
rootSpanStatus: reconciled.rootSpanStatus,
253+
events: reconciled.events,
254+
duration: reconciled.totalDuration,
257255
rootStartedAt: tree?.data.startTime,
258256
startedAt: run.startedAt,
259257
queuedDuration: run.startedAt
@@ -265,3 +263,66 @@ export class RunPresenter {
265263
};
266264
}
267265
}
266+
267+
// NOTE: Clickhouse trace ingestion is eventually consistent.
268+
// When a run is marked finished in Postgres, we reconcile the
269+
// root span to reflect completion even if telemetry is still partial.
270+
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
271+
// run states in the dashboard.
272+
export function reconcileTraceWithRunLifecycle(
273+
runData: {
274+
isFinished: boolean;
275+
status: Run["status"];
276+
createdAt: Date;
277+
completedAt: Date | null;
278+
rootTaskRun: { createdAt: Date } | null;
279+
},
280+
rootSpanId: string,
281+
events: RunEvent[],
282+
totalDuration: number
283+
): {
284+
events: RunEvent[];
285+
totalDuration: number;
286+
rootSpanStatus: "executing" | "completed" | "failed";
287+
} {
288+
const currentStatus: "executing" | "completed" | "failed" = events[0]
289+
? events[0].data.isError
290+
? "failed"
291+
: !events[0].data.isPartial
292+
? "completed"
293+
: "executing"
294+
: "executing";
295+
296+
if (!runData.isFinished) {
297+
return { events, totalDuration, rootSpanStatus: currentStatus };
298+
}
299+
300+
const postgresRunDuration = runData.completedAt
301+
? millisecondsToNanoseconds(
302+
runData.completedAt.getTime() - (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime()
303+
)
304+
: 0;
305+
306+
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
307+
308+
const updatedEvents = events.map((e) => {
309+
if (e.id === rootSpanId && e.data.isPartial) {
310+
return {
311+
...e,
312+
data: {
313+
...e.data,
314+
isPartial: false,
315+
duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
316+
isError: isFailedRunStatus(runData.status),
317+
},
318+
};
319+
}
320+
return e;
321+
});
322+
323+
return {
324+
events: updatedEvents,
325+
totalDuration: updatedTotalDuration,
326+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
327+
};
328+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { vi, describe, it, expect } from "vitest";
2+
3+
vi.mock("../app/env.server", () => ({
4+
env: {
5+
MAXIMUM_LIVE_RELOADING_EVENTS: 1000,
6+
},
7+
}));
8+
9+
vi.mock("../app/db.server", () => ({
10+
prisma: {},
11+
$replica: {},
12+
$transaction: vi.fn(),
13+
}));
14+
15+
vi.mock("../app/v3/eventRepository/index.server", () => ({
16+
resolveEventRepositoryForStore: vi.fn(),
17+
}));
18+
19+
vi.mock("../app/v3/taskEventStore.server", () => ({
20+
getTaskEventStoreTableForRun: vi.fn(),
21+
}));
22+
23+
vi.mock("../app/utils/username", () => ({
24+
getUsername: vi.fn(),
25+
}));
26+
27+
import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/RunPresenter.server";
28+
import { millisecondsToNanoseconds } from "@trigger.dev/core/v3";
29+
30+
describe("reconcileTraceWithRunLifecycle", () => {
31+
const rootSpanId = "root-span-id";
32+
const createdAt = new Date("2024-01-01T00:00:00Z");
33+
const completedAt = new Date("2024-01-01T00:00:05Z");
34+
35+
const runData: any = {
36+
isFinished: true,
37+
status: "COMPLETED_SUCCESSFULLY",
38+
createdAt,
39+
completedAt,
40+
rootTaskRun: null,
41+
};
42+
43+
const initialEvents = [
44+
{
45+
id: rootSpanId,
46+
data: {
47+
isPartial: true,
48+
duration: millisecondsToNanoseconds(1000), // 1s, less than the 5s run duration
49+
isError: false,
50+
},
51+
},
52+
{
53+
id: "child-span-id",
54+
data: {
55+
isPartial: false,
56+
duration: millisecondsToNanoseconds(500),
57+
isError: false,
58+
},
59+
},
60+
];
61+
62+
it("should reconcile a finished run with lagging partial telemetry", () => {
63+
const totalDuration = millisecondsToNanoseconds(1000);
64+
const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, initialEvents as any, totalDuration);
65+
66+
expect(result.rootSpanStatus).toBe("completed");
67+
68+
const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
69+
expect(rootEvent?.data.isPartial).toBe(false);
70+
// 5s duration = 5000ms
71+
expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000));
72+
expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000));
73+
});
74+
75+
it("should not override duration if Clickhouse already has a longer finished duration", () => {
76+
const longDuration = millisecondsToNanoseconds(10000);
77+
const finishedEvents = [
78+
{
79+
id: rootSpanId,
80+
data: {
81+
isPartial: false,
82+
duration: longDuration,
83+
isError: false,
84+
},
85+
},
86+
];
87+
88+
const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, finishedEvents as any, longDuration);
89+
90+
const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
91+
expect(rootEvent?.data.duration).toBe(longDuration);
92+
expect(rootEvent?.data.isPartial).toBe(false);
93+
expect(result.totalDuration).toBe(longDuration);
94+
});
95+
96+
it("should handle unfinished runs without modification", () => {
97+
const unfinishedRun = { ...runData, isFinished: false, completedAt: null };
98+
const totalDuration = millisecondsToNanoseconds(1000);
99+
const result = reconcileTraceWithRunLifecycle(unfinishedRun, rootSpanId, initialEvents as any, totalDuration);
100+
101+
expect(result.rootSpanStatus).toBe("executing");
102+
103+
const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
104+
expect(rootEvent?.data.isPartial).toBe(true);
105+
expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000));
106+
});
107+
108+
it("should reconcile failed runs correctly", () => {
109+
const failedRun = { ...runData, status: "COMPLETED_WITH_ERRORS" };
110+
const result = reconcileTraceWithRunLifecycle(failedRun, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000));
111+
112+
expect(result.rootSpanStatus).toBe("failed");
113+
const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
114+
expect(rootEvent?.data.isError).toBe(true);
115+
expect(rootEvent?.data.isPartial).toBe(false);
116+
});
117+
118+
it("should use rootTaskRun createdAt if available for duration calculation", () => {
119+
const rootTaskCreatedAt = new Date("2023-12-31T23:59:50Z"); // 10s before run.createdAt
120+
const runDataWithRoot: any = {
121+
...runData,
122+
rootTaskRun: { createdAt: rootTaskCreatedAt },
123+
};
124+
125+
const result = reconcileTraceWithRunLifecycle(runDataWithRoot, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000));
126+
127+
// Duration should be from 23:59:50 to 00:00:05 = 15s
128+
const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
129+
expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000));
130+
expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000));
131+
});
132+
133+
it("should handle missing root span gracefully", () => {
134+
const result = reconcileTraceWithRunLifecycle(runData, "non-existent-id", initialEvents as any, millisecondsToNanoseconds(1000));
135+
136+
expect(result.rootSpanStatus).toBe("completed");
137+
expect(result.events).toEqual(initialEvents);
138+
// totalDuration should still be updated to postgres duration even if root span is missing from events list
139+
expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000));
140+
});
141+
});

0 commit comments

Comments
 (0)