@@ -8,10 +8,14 @@ import {
88 TaskRunExecutionSnapshot ,
99 TaskRunExecutionStatus ,
1010 TaskRunStatus ,
11+ Waitpoint ,
1112} from "@trigger.dev/database" ;
1213import { HeartbeatTimeouts } from "../types.js" ;
1314import { SystemResources } from "./systems.js" ;
1415
/**
 * Maximum number of waitpoint IDs requested per query when waitpoints are
 * fetched in chunks. Chunking is used to avoid NAPI string conversion limits.
 */
const WAITPOINT_CHUNK_SIZE = 100;
18+
1519export type ExecutionSnapshotSystemOptions = {
1620 resources : SystemResources ;
1721 heartbeatTimeouts : HeartbeatTimeouts ;
@@ -31,19 +35,41 @@ type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGe
3135 } ;
3236} > ;
3337
/**
 * Payload type of a `TaskRunExecutionSnapshot` loaded with its `checkpoint`
 * relation included. No waitpoint relation is part of this shape.
 */
type ExecutionSnapshotWithCheckpoint = Prisma.TaskRunExecutionSnapshotGetPayload<{
  include: {
    checkpoint: true;
  };
}>;
43+
/**
 * Builds an `EnhancedExecutionSnapshot` from a snapshot that was loaded together
 * with its completed waitpoints.
 *
 * Delegates to `enhanceExecutionSnapshotWithWaitpoints`, passing the snapshot's own
 * `completedWaitpoints` and `completedWaitpointOrder`.
 *
 * @param snapshot - Snapshot that carries its completed waitpoints and their order.
 * @returns The enhanced snapshot produced by the delegate.
 */
function enhanceExecutionSnapshot(
  snapshot: ExecutionSnapshotWithCheckAndWaitpoints
): EnhancedExecutionSnapshot {
  return enhanceExecutionSnapshotWithWaitpoints(
    snapshot,
    snapshot.completedWaitpoints,
    snapshot.completedWaitpointOrder
  );
}
53+
54+ /**
55+ * Transforms a snapshot (with checkpoint but without waitpoints) into an EnhancedExecutionSnapshot
56+ * by combining it with pre-fetched waitpoints.
57+ */
58+ function enhanceExecutionSnapshotWithWaitpoints (
59+ snapshot : ExecutionSnapshotWithCheckpoint ,
60+ waitpoints : Waitpoint [ ] ,
61+ completedWaitpointOrder : string [ ]
3662) : EnhancedExecutionSnapshot {
3763 return {
3864 ...snapshot ,
3965 friendlyId : SnapshotId . toFriendlyId ( snapshot . id ) ,
4066 runFriendlyId : RunId . toFriendlyId ( snapshot . runId ) ,
41- completedWaitpoints : snapshot . completedWaitpoints . flatMap ( ( w ) => {
42- //get all indexes of the waitpoint in the completedWaitpointOrder
43- //we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
67+ completedWaitpoints : waitpoints . flatMap ( ( w ) => {
68+ // Get all indexes of the waitpoint in the completedWaitpointOrder
69+ // We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
4470 let indexes : ( number | undefined ) [ ] = [ ] ;
45- for ( let i = 0 ; i < snapshot . completedWaitpointOrder . length ; i ++ ) {
46- if ( snapshot . completedWaitpointOrder [ i ] === w . id ) {
71+ for ( let i = 0 ; i < completedWaitpointOrder . length ; i ++ ) {
72+ if ( completedWaitpointOrder [ i ] === w . id ) {
4773 indexes . push ( i ) ;
4874 }
4975 }
@@ -60,9 +86,7 @@ function enhanceExecutionSnapshot(
6086 type : w . type ,
6187 completedAt : w . completedAt ?? new Date ( ) ,
6288 idempotencyKey :
63- w . userProvidedIdempotencyKey && ! w . inactiveIdempotencyKey
64- ? w . idempotencyKey
65- : undefined ,
89+ w . userProvidedIdempotencyKey && ! w . inactiveIdempotencyKey ? w . idempotencyKey : undefined ,
6690 completedByTaskRun : w . completedByTaskRunId
6791 ? {
6892 id : w . completedByTaskRunId ,
@@ -91,6 +115,42 @@ function enhanceExecutionSnapshot(
91115 } ;
92116}
93117
/**
 * Gets the waitpoint IDs linked to a snapshot via the `_completedWaitpoints` join table.
 *
 * Uses raw SQL so that only the IDs are read instead of full waitpoint rows.
 * The query filters on column "A" (compared against the snapshot ID) and returns
 * column "B" (the linked waitpoint ID).
 *
 * @param prisma - Client or transaction used to run the query.
 * @param snapshotId - ID of the snapshot whose linked waitpoints are wanted.
 * @returns The linked waitpoint IDs, in the order the query returns them
 *   (the query has no ORDER BY); empty when the snapshot has no linked waitpoints.
 */
async function getSnapshotWaitpointIds(
  prisma: PrismaClientOrTransaction,
  snapshotId: string
): Promise<string[]> {
  // Tagged-template call: `snapshotId` is handed to $queryRaw as a template value
  // rather than being concatenated into the SQL text.
  const rows = await prisma.$queryRaw<{ B: string }[]>`
    SELECT "B" FROM "_completedWaitpoints" WHERE "A" = ${snapshotId}
  `;
  return rows.map((row) => row.B);
}
131+
/**
 * Fetches waitpoints by ID in sequential chunks to avoid NAPI string conversion limits.
 *
 * Waitpoints can have large outputs (100KB+), and fetching many at once can exceed
 * Node.js string limits, so each query asks for at most `chunkSize` IDs and the next
 * query is only issued after the previous one has resolved.
 *
 * @param prisma - Client or transaction used to run the queries.
 * @param waitpointIds - IDs of the waitpoints to load. Repeated IDs are queried once.
 * @param chunkSize - Maximum number of IDs per query. Defaults to `WAITPOINT_CHUNK_SIZE`.
 * @returns Every waitpoint returned by the queries, appended chunk by chunk in whatever
 *   order each query yields its rows. IDs with no matching row are simply absent.
 * @throws RangeError if `chunkSize` is not a positive integer.
 */
async function fetchWaitpointsInChunks(
  prisma: PrismaClientOrTransaction,
  waitpointIds: string[],
  chunkSize: number = WAITPOINT_CHUNK_SIZE
): Promise<Waitpoint[]> {
  // A zero, negative or fractional step would make the loop below never terminate
  // (or slice unevenly), so reject it up front.
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }

  // De-duplicate so an ID repeated across a chunk boundary cannot yield the same row twice,
  // matching what a single `in` query would return.
  const uniqueIds = [...new Set(waitpointIds)];
  if (uniqueIds.length === 0) return [];

  const allWaitpoints: Waitpoint[] = [];
  for (let start = 0; start < uniqueIds.length; start += chunkSize) {
    const chunk = uniqueIds.slice(start, start + chunkSize);
    // Deliberately awaited inside the loop: only one chunk's payload is in flight at a time.
    const waitpoints = await prisma.waitpoint.findMany({
      where: { id: { in: chunk } },
    });
    allWaitpoints.push(...waitpoints);
  }
  return allWaitpoints;
}
153+
94154/* Gets the most recent valid snapshot for a run */
95155export async function getLatestExecutionSnapshot (
96156 prisma : PrismaClientOrTransaction ,
@@ -191,12 +251,27 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
191251 } ;
192252}
193253
254+ /**
255+ * Gets execution snapshots created after the specified snapshot.
256+ *
257+ * IMPORTANT: This function is optimized to avoid N×M data explosion when runs have many
258+ * completed waitpoints. Due to the many-to-many relation, once waitpoints complete,
259+ * all subsequent snapshots have the same waitpoints linked. For a run with 24 snapshots
260+ * and 236 waitpoints with 100KB outputs each, fetching all waitpoints for all snapshots
261+ * would result in ~570MB of data, causing "Failed to convert rust String into napi string" errors.
262+ *
263+ * Solution: Only the LATEST snapshot's waitpoints are fetched and included. The runner's
264+ * SnapshotManager only processes completedWaitpoints from the latest snapshot anyway -
265+ * intermediate snapshots' waitpoints are ignored. This reduces data from N×M to just M.
266+ *
267+ * Waitpoints are fetched in chunks (100 at a time) to handle batches up to 1000 items.
268+ */
194269export async function getExecutionSnapshotsSince (
195270 prisma : PrismaClientOrTransaction ,
196271 runId : string ,
197272 sinceSnapshotId : string
198273) : Promise < EnhancedExecutionSnapshot [ ] > {
199- // Find the createdAt of the sinceSnapshotId
274+ // Step 1: Find the createdAt of the sinceSnapshotId
200275 const sinceSnapshot = await prisma . taskRunExecutionSnapshot . findFirst ( {
201276 where : { id : sinceSnapshotId } ,
202277 select : { createdAt : true } ,
@@ -206,21 +281,40 @@ export async function getExecutionSnapshotsSince(
206281 throw new Error ( `No execution snapshot found for id ${ sinceSnapshotId } ` ) ;
207282 }
208283
284+ // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
209285 const snapshots = await prisma . taskRunExecutionSnapshot . findMany ( {
210286 where : {
211287 runId,
212288 isValid : true ,
213289 createdAt : { gt : sinceSnapshot . createdAt } ,
214290 } ,
215291 include : {
216- completedWaitpoints : true ,
217292 checkpoint : true ,
293+ // DO NOT include completedWaitpoints here - this causes the N×M explosion
218294 } ,
219295 orderBy : { createdAt : "desc" } ,
220296 take : 50 ,
221297 } ) ;
222298
223- return snapshots . reverse ( ) . map ( enhanceExecutionSnapshot ) ;
299+ if ( snapshots . length === 0 ) return [ ] ;
300+
301+ // Step 3: Get waitpoint IDs for the LATEST snapshot only (first in desc order)
302+ const latestSnapshot = snapshots [ 0 ] ;
303+ const waitpointIds = await getSnapshotWaitpointIds ( prisma , latestSnapshot . id ) ;
304+
305+ // Step 4: Fetch waitpoints in chunks to avoid NAPI string conversion limits
306+ const waitpoints = await fetchWaitpointsInChunks ( prisma , waitpointIds ) ;
307+
308+ // Step 5: Build enhanced snapshots - only latest gets waitpoints, others get empty arrays
309+ // The runner only uses completedWaitpoints from the latest snapshot anyway
310+ return snapshots . reverse ( ) . map ( ( snapshot ) => {
311+ const isLatest = snapshot . id === latestSnapshot . id ;
312+ return enhanceExecutionSnapshotWithWaitpoints (
313+ snapshot ,
314+ isLatest ? waitpoints : [ ] ,
315+ latestSnapshot . completedWaitpointOrder
316+ ) ;
317+ } ) ;
224318}
225319
226320export class ExecutionSnapshotSystem {
0 commit comments