Skip to content

Commit 5e049cd

Browse files
authored
fix(run-engine): avoid NAPI string overflow in getExecutionSnapshotsSince by only fetching waitpoints for latest snapshot (#2972)
1 parent 72c3571 commit 5e049cd

File tree

4 files changed

+1354
-11
lines changed

4 files changed

+1354
-11
lines changed

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 105 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ import {
88
TaskRunExecutionSnapshot,
99
TaskRunExecutionStatus,
1010
TaskRunStatus,
11+
Waitpoint,
1112
} from "@trigger.dev/database";
1213
import { HeartbeatTimeouts } from "../types.js";
1314
import { SystemResources } from "./systems.js";
1415

16+
/** Chunk size for fetching waitpoints to avoid NAPI string conversion limits */
17+
const WAITPOINT_CHUNK_SIZE = 100;
18+
1519
export type ExecutionSnapshotSystemOptions = {
1620
resources: SystemResources;
1721
heartbeatTimeouts: HeartbeatTimeouts;
@@ -31,19 +35,41 @@ type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGe
3135
};
3236
}>;
3337

38+
type ExecutionSnapshotWithCheckpoint = Prisma.TaskRunExecutionSnapshotGetPayload<{
39+
include: {
40+
checkpoint: true;
41+
};
42+
}>;
43+
3444
function enhanceExecutionSnapshot(
3545
snapshot: ExecutionSnapshotWithCheckAndWaitpoints
46+
): EnhancedExecutionSnapshot {
47+
return enhanceExecutionSnapshotWithWaitpoints(
48+
snapshot,
49+
snapshot.completedWaitpoints,
50+
snapshot.completedWaitpointOrder
51+
);
52+
}
53+
54+
/**
55+
* Transforms a snapshot (with checkpoint but without waitpoints) into an EnhancedExecutionSnapshot
56+
* by combining it with pre-fetched waitpoints.
57+
*/
58+
function enhanceExecutionSnapshotWithWaitpoints(
59+
snapshot: ExecutionSnapshotWithCheckpoint,
60+
waitpoints: Waitpoint[],
61+
completedWaitpointOrder: string[]
3662
): EnhancedExecutionSnapshot {
3763
return {
3864
...snapshot,
3965
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
4066
runFriendlyId: RunId.toFriendlyId(snapshot.runId),
41-
completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => {
42-
//get all indexes of the waitpoint in the completedWaitpointOrder
43-
//we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
67+
completedWaitpoints: waitpoints.flatMap((w) => {
68+
// Get all indexes of the waitpoint in the completedWaitpointOrder
69+
// We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
4470
let indexes: (number | undefined)[] = [];
45-
for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) {
46-
if (snapshot.completedWaitpointOrder[i] === w.id) {
71+
for (let i = 0; i < completedWaitpointOrder.length; i++) {
72+
if (completedWaitpointOrder[i] === w.id) {
4773
indexes.push(i);
4874
}
4975
}
@@ -60,9 +86,7 @@ function enhanceExecutionSnapshot(
6086
type: w.type,
6187
completedAt: w.completedAt ?? new Date(),
6288
idempotencyKey:
63-
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey
64-
? w.idempotencyKey
65-
: undefined,
89+
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
6690
completedByTaskRun: w.completedByTaskRunId
6791
? {
6892
id: w.completedByTaskRunId,
@@ -91,6 +115,42 @@ function enhanceExecutionSnapshot(
91115
};
92116
}
93117

118+
/**
119+
* Gets the waitpoint IDs linked to a snapshot via the _completedWaitpoints join table.
120+
* Uses raw SQL to avoid fetching full waitpoint data.
121+
*/
122+
async function getSnapshotWaitpointIds(
123+
prisma: PrismaClientOrTransaction,
124+
snapshotId: string
125+
): Promise<string[]> {
126+
const result = await prisma.$queryRaw<{ B: string }[]>`
127+
SELECT "B" FROM "_completedWaitpoints" WHERE "A" = ${snapshotId}
128+
`;
129+
return result.map((r) => r.B);
130+
}
131+
132+
/**
133+
* Fetches waitpoints in chunks to avoid NAPI string conversion limits.
134+
* This is necessary because waitpoints can have large outputs (100KB+),
135+
* and fetching many at once can exceed Node.js string limits.
136+
*/
137+
async function fetchWaitpointsInChunks(
138+
prisma: PrismaClientOrTransaction,
139+
waitpointIds: string[]
140+
): Promise<Waitpoint[]> {
141+
if (waitpointIds.length === 0) return [];
142+
143+
const allWaitpoints: Waitpoint[] = [];
144+
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
145+
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
146+
const waitpoints = await prisma.waitpoint.findMany({
147+
where: { id: { in: chunk } },
148+
});
149+
allWaitpoints.push(...waitpoints);
150+
}
151+
return allWaitpoints;
152+
}
153+
94154
/* Gets the most recent valid snapshot for a run */
95155
export async function getLatestExecutionSnapshot(
96156
prisma: PrismaClientOrTransaction,
@@ -191,12 +251,27 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
191251
};
192252
}
193253

254+
/**
255+
* Gets execution snapshots created after the specified snapshot.
256+
*
257+
* IMPORTANT: This function is optimized to avoid N×M data explosion when runs have many
258+
* completed waitpoints. Due to the many-to-many relation, once waitpoints complete,
259+
* all subsequent snapshots have the same waitpoints linked. For a run with 24 snapshots
260+
* and 236 waitpoints with 100KB outputs each, fetching all waitpoints for all snapshots
261+
* would result in ~570MB of data, causing "Failed to convert rust String into napi string" errors.
262+
*
263+
* Solution: Only the LATEST snapshot's waitpoints are fetched and included. The runner's
264+
* SnapshotManager only processes completedWaitpoints from the latest snapshot anyway -
265+
* intermediate snapshots' waitpoints are ignored. This reduces data from N×M to just M.
266+
*
267+
* Waitpoints are fetched in chunks (100 at a time) to handle batches up to 1000 items.
268+
*/
194269
export async function getExecutionSnapshotsSince(
195270
prisma: PrismaClientOrTransaction,
196271
runId: string,
197272
sinceSnapshotId: string
198273
): Promise<EnhancedExecutionSnapshot[]> {
199-
// Find the createdAt of the sinceSnapshotId
274+
// Step 1: Find the createdAt of the sinceSnapshotId
200275
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
201276
where: { id: sinceSnapshotId },
202277
select: { createdAt: true },
@@ -206,21 +281,40 @@ export async function getExecutionSnapshotsSince(
206281
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
207282
}
208283

284+
// Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
209285
const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
210286
where: {
211287
runId,
212288
isValid: true,
213289
createdAt: { gt: sinceSnapshot.createdAt },
214290
},
215291
include: {
216-
completedWaitpoints: true,
217292
checkpoint: true,
293+
// DO NOT include completedWaitpoints here - this causes the N×M explosion
218294
},
219295
orderBy: { createdAt: "desc" },
220296
take: 50,
221297
});
222298

223-
return snapshots.reverse().map(enhanceExecutionSnapshot);
299+
if (snapshots.length === 0) return [];
300+
301+
// Step 3: Get waitpoint IDs for the LATEST snapshot only (first in desc order)
302+
const latestSnapshot = snapshots[0];
303+
const waitpointIds = await getSnapshotWaitpointIds(prisma, latestSnapshot.id);
304+
305+
// Step 4: Fetch waitpoints in chunks to avoid NAPI string conversion limits
306+
const waitpoints = await fetchWaitpointsInChunks(prisma, waitpointIds);
307+
308+
// Step 5: Build enhanced snapshots - only latest gets waitpoints, others get empty arrays
309+
// The runner only uses completedWaitpoints from the latest snapshot anyway
310+
return snapshots.reverse().map((snapshot) => {
311+
const isLatest = snapshot.id === latestSnapshot.id;
312+
return enhanceExecutionSnapshotWithWaitpoints(
313+
snapshot,
314+
isLatest ? waitpoints : [],
315+
latestSnapshot.completedWaitpointOrder
316+
);
317+
});
224318
}
225319

226320
export class ExecutionSnapshotSystem {

0 commit comments

Comments
 (0)