diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index b6f31bcffc..a224e5a86b 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -8,10 +8,14 @@ import { TaskRunExecutionSnapshot, TaskRunExecutionStatus, TaskRunStatus, + Waitpoint, } from "@trigger.dev/database"; import { HeartbeatTimeouts } from "../types.js"; import { SystemResources } from "./systems.js"; +/** Chunk size for fetching waitpoints to avoid NAPI string conversion limits */ +const WAITPOINT_CHUNK_SIZE = 100; + export type ExecutionSnapshotSystemOptions = { resources: SystemResources; heartbeatTimeouts: HeartbeatTimeouts; @@ -31,19 +35,41 @@ type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGe }; }>; +type ExecutionSnapshotWithCheckpoint = Prisma.TaskRunExecutionSnapshotGetPayload<{ + include: { + checkpoint: true; + }; +}>; + function enhanceExecutionSnapshot( snapshot: ExecutionSnapshotWithCheckAndWaitpoints +): EnhancedExecutionSnapshot { + return enhanceExecutionSnapshotWithWaitpoints( + snapshot, + snapshot.completedWaitpoints, + snapshot.completedWaitpointOrder + ); +} + +/** + * Transforms a snapshot (with checkpoint but without waitpoints) into an EnhancedExecutionSnapshot + * by combining it with pre-fetched waitpoints. + */ +function enhanceExecutionSnapshotWithWaitpoints( + snapshot: ExecutionSnapshotWithCheckpoint, + waitpoints: Waitpoint[], + completedWaitpointOrder: string[] ): EnhancedExecutionSnapshot { return { ...snapshot, friendlyId: SnapshotId.toFriendlyId(snapshot.id), runFriendlyId: RunId.toFriendlyId(snapshot.runId), - completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => { - //get all indexes of the waitpoint in the completedWaitpointOrder - //we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey) + completedWaitpoints: waitpoints.flatMap((w) => { + // Get all indexes of the waitpoint in the completedWaitpointOrder + // We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey) let indexes: (number | undefined)[] = []; - for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) { - if (snapshot.completedWaitpointOrder[i] === w.id) { + for (let i = 0; i < completedWaitpointOrder.length; i++) { + if (completedWaitpointOrder[i] === w.id) { indexes.push(i); } } @@ -60,9 +86,7 @@ function enhanceExecutionSnapshot( type: w.type, completedAt: w.completedAt ?? new Date(), idempotencyKey: - w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey - ? w.idempotencyKey - : undefined, + w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined, completedByTaskRun: w.completedByTaskRunId ? { id: w.completedByTaskRunId, @@ -91,6 +115,42 @@ function enhanceExecutionSnapshot( }; } +/** + * Gets the waitpoint IDs linked to a snapshot via the _completedWaitpoints join table. + * Uses raw SQL to avoid fetching full waitpoint data. + */ +async function getSnapshotWaitpointIds( + prisma: PrismaClientOrTransaction, + snapshotId: string +): Promise { + const result = await prisma.$queryRaw<{ B: string }[]>` + SELECT "B" FROM "_completedWaitpoints" WHERE "A" = ${snapshotId} + `; + return result.map((r) => r.B); +} + +/** + * Fetches waitpoints in chunks to avoid NAPI string conversion limits. + * This is necessary because waitpoints can have large outputs (100KB+), + * and fetching many at once can exceed Node.js string limits. + */ +async function fetchWaitpointsInChunks( + prisma: PrismaClientOrTransaction, + waitpointIds: string[] +): Promise { + if (waitpointIds.length === 0) return []; + + const allWaitpoints: Waitpoint[] = []; + for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) { + const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE); + const waitpoints = await prisma.waitpoint.findMany({ + where: { id: { in: chunk } }, + }); + allWaitpoints.push(...waitpoints); + } + return allWaitpoints; +} + /* Gets the most recent valid snapshot for a run */ export async function getLatestExecutionSnapshot( prisma: PrismaClientOrTransaction, @@ -191,12 +251,27 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): }; } +/** + * Gets execution snapshots created after the specified snapshot. + * + * IMPORTANT: This function is optimized to avoid N×M data explosion when runs have many + * completed waitpoints. Due to the many-to-many relation, once waitpoints complete, + * all subsequent snapshots have the same waitpoints linked. For a run with 24 snapshots + * and 236 waitpoints with 100KB outputs each, fetching all waitpoints for all snapshots + * would result in ~570MB of data, causing "Failed to convert rust String into napi string" errors. + * + * Solution: Only the LATEST snapshot's waitpoints are fetched and included. The runner's + * SnapshotManager only processes completedWaitpoints from the latest snapshot anyway - + * intermediate snapshots' waitpoints are ignored. This reduces data from N×M to just M. + * + * Waitpoints are fetched in chunks (100 at a time) to handle batches up to 1000 items. + */ export async function getExecutionSnapshotsSince( prisma: PrismaClientOrTransaction, runId: string, sinceSnapshotId: string ): Promise { - // Find the createdAt of the sinceSnapshotId + // Step 1: Find the createdAt of the sinceSnapshotId const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({ where: { id: sinceSnapshotId }, select: { createdAt: true }, @@ -206,6 +281,7 @@ export async function getExecutionSnapshotsSince( throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`); } + // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ where: { runId, @@ -213,14 +289,32 @@ export async function getExecutionSnapshotsSince( createdAt: { gt: sinceSnapshot.createdAt }, }, include: { - completedWaitpoints: true, checkpoint: true, + // DO NOT include completedWaitpoints here - this causes the N×M explosion }, orderBy: { createdAt: "desc" }, take: 50, }); - return snapshots.reverse().map(enhanceExecutionSnapshot); + if (snapshots.length === 0) return []; + + // Step 3: Get waitpoint IDs for the LATEST snapshot only (first in desc order) + const latestSnapshot = snapshots[0]; + const waitpointIds = await getSnapshotWaitpointIds(prisma, latestSnapshot.id); + + // Step 4: Fetch waitpoints in chunks to avoid NAPI string conversion limits + const waitpoints = await fetchWaitpointsInChunks(prisma, waitpointIds); + + // Step 5: Build enhanced snapshots - only latest gets waitpoints, others get empty arrays + // The runner only uses completedWaitpoints from the latest snapshot anyway + return snapshots.reverse().map((snapshot) => { + const isLatest = snapshot.id === latestSnapshot.id; + return enhanceExecutionSnapshotWithWaitpoints( + snapshot, + isLatest ? waitpoints : [], + latestSnapshot.completedWaitpointOrder + ); + }); } export class ExecutionSnapshotSystem { diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts new file mode 100644 index 0000000000..4352e72686 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -0,0 +1,670 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect, describe } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { setTimeout } from "node:timers/promises"; +import { + generateTestScenarios, + type SnapshotTestScenario, +} from "./helpers/executionStateMachine.js"; +import { + createWaitpointsWithOutput, + setupTestScenario, + generateLargeOutput, +} from "./helpers/snapshotTestHelpers.js"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; + +vi.setConfig({ testTimeout: 120_000 }); + +describe("RunEngine getSnapshotsSince", () => { + containerTest( + "returns empty array when querying from latest snapshot", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_empty", + spanId: "s_empty", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_empty", + workerQueue: "main", + }); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(allSnapshots.length).toBeGreaterThan(0); + + // Query from the last snapshot + const lastSnapshot = allSnapshots[allSnapshots.length - 1]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: lastSnapshot.id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "returns snapshots after the specified one with waitpoints only on latest", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_wp", + spanId: "s_wp", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_wp", + workerQueue: "main", + }); + + // Start attempt + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // Create and block with a waitpoint + const { waitpoint } = await engine.createDateTimeWaitpoint({ + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + completedAfter: new Date(Date.now() + 50), + }); + + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: [waitpoint.id], + projectId: authenticatedEnvironment.project.id, + organizationId: authenticatedEnvironment.organization.id, + }); + + // Wait for waitpoint completion + await setTimeout(200); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(allSnapshots.length).toBeGreaterThanOrEqual(3); + + // Query from the first snapshot + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBeGreaterThanOrEqual(2); + + // The latest snapshot should have completedWaitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBeGreaterThan(0); + + // Earlier snapshots should have empty waitpoints (optimization) + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "handles multiple waitpoints correctly - only latest has them", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_mwp", + spanId: "s_mwp", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_mwp", + workerQueue: "main", + }); + + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // Create multiple waitpoints + const waitpointCount = 5; + const waitpointPromises = Array.from({ length: waitpointCount }).map(() => + engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }) + ); + const waitpoints = await Promise.all(waitpointPromises); + + // Block the run with all waitpoints + for (const { waitpoint } of waitpoints) { + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + } + + // Complete all waitpoints + for (const { waitpoint } of waitpoints) { + await engine.completeWaitpoint({ id: waitpoint.id }); + } + + await setTimeout(500); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + // Query from early in the sequence + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBeGreaterThan(0); + + // Only the latest should have waitpoints + const latest = result![result!.length - 1]; + + // Earlier snapshots must have empty completedWaitpoints + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest("returns null for invalid snapshot ID", async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_invalid", + spanId: "s_invalid", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + // Query with invalid snapshot ID + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: "invalid-snapshot-id", + }); + + // Should return null (caught by getSnapshotsSince error handler) + expect(result).toBeNull(); + } finally { + await engine.quit(); + } + }); + + // Direct database tests for the core function + containerTest( + "direct test: large waitpoint scenario - 100 waitpoints with 10KB outputs", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // Create scenario directly in database + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 100, + outputSizeKB: 10, + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + }); + + // Query from early snapshot + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[2].id, // After PENDING_EXECUTING + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(6); // EXECUTING through FINISHED + + // Latest should have all 100 waitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(100); + + // Verify all earlier snapshots have empty waitpoints + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "direct test: zombie run scenario - 236 waitpoints with 100KB outputs, 24 snapshots", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // This scenario matches the exact conditions that caused the NAPI error + // 24 snapshots × 236 waitpoints × 100KB = ~570MB if not optimized + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 236, + outputSizeKB: 100, + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 200 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + }); + + expect(scenario.snapshots.length).toBe(24); + expect(scenario.waitpoints.length).toBe(236); + + // Query from the 6th snapshot (after waitpoints start completing) + const queryFromIndex = 5; + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[queryFromIndex].id, + }); + + expect(result).not.toBeNull(); + // Should return snapshots after index 5, which is 24 - 6 = 18 snapshots + expect(result!.length).toBe(24 - queryFromIndex - 1); + + // Latest should have all 236 waitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(236); + + // All other snapshots should have 0 waitpoints (optimization) + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + + // Verify the outputs are present and correct size + for (const wp of latest.completedWaitpoints) { + expect(wp.output).toBeDefined(); + // ~100KB output as JSON string + expect(typeof wp.output).toBe("string"); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "direct test: verifies chunked fetching works with 500+ waitpoints", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // 500 waitpoints requires 5 chunks (100 per chunk) + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 500, + outputSizeKB: 10, // Smaller outputs for faster test + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + ], + }); + + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(4); + + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(500); + + // All other snapshots should be empty + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts b/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts new file mode 100644 index 0000000000..4dd92cdbd4 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts @@ -0,0 +1,257 @@ +import { TaskRunExecutionStatus } from "@trigger.dev/database"; + +/** + * Defines valid execution status transitions for the Run Engine 2.0. + * This is a model of the state machine that governs run execution. + */ +export const EXECUTION_STATUS_TRANSITIONS: Record< + TaskRunExecutionStatus, + TaskRunExecutionStatus[] +> = { + RUN_CREATED: ["QUEUED", "DELAYED"], + DELAYED: ["QUEUED"], + QUEUED: ["PENDING_EXECUTING", "QUEUED_EXECUTING"], + QUEUED_EXECUTING: ["PENDING_EXECUTING", "QUEUED"], + PENDING_EXECUTING: ["EXECUTING", "PENDING_CANCEL", "FINISHED", "QUEUED"], + EXECUTING: ["EXECUTING_WITH_WAITPOINTS", "FINISHED", "PENDING_CANCEL", "QUEUED"], + EXECUTING_WITH_WAITPOINTS: ["EXECUTING", "SUSPENDED", "FINISHED", "PENDING_CANCEL"], + SUSPENDED: ["QUEUED", "PENDING_CANCEL", "FINISHED"], + PENDING_CANCEL: ["FINISHED"], + FINISHED: ["QUEUED"], // Retry case +}; + +/** + * Validates if a transition from one status to another is valid. + */ +export function isValidTransition( + from: TaskRunExecutionStatus, + to: TaskRunExecutionStatus +): boolean { + return EXECUTION_STATUS_TRANSITIONS[from]?.includes(to) ?? false; +} + +/** + * Configuration for a snapshot in a test scenario. + */ +export interface SnapshotConfig { + /** The execution status for this snapshot */ + status: TaskRunExecutionStatus; + /** Number of waitpoints completed at this snapshot (cumulative) */ + completedWaitpointCount: number; + /** Whether this snapshot has a checkpoint */ + hasCheckpoint?: boolean; + /** Description for the snapshot */ + description?: string; +} + +/** + * A test scenario for getSnapshotsSince testing. + */ +export interface SnapshotTestScenario { + /** Unique name for the scenario */ + name: string; + /** Description of what this scenario tests */ + description: string; + /** Total number of waitpoints to create */ + totalWaitpoints: number; + /** Size of each waitpoint's output in KB */ + outputSizeKB: number; + /** Configuration for each snapshot to create */ + snapshots: SnapshotConfig[]; + /** Which snapshot index to query "since" (0-based) */ + queryFromIndex: number; + /** Expected number of waitpoints on the latest snapshot returned */ + expectedWaitpointsOnLatest: number; +} + +/** + * Generates test scenarios for comprehensive getSnapshotsSince testing. + * These scenarios cover various edge cases and stress tests. + */ +export function generateTestScenarios(): SnapshotTestScenario[] { + return [ + { + name: "simple_no_waitpoints", + description: "Basic run without any waitpoints", + totalWaitpoints: 0, + outputSizeKB: 0, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "FINISHED", completedWaitpointCount: 0 }, + ], + queryFromIndex: 0, + expectedWaitpointsOnLatest: 0, + }, + { + name: "single_small_waitpoint", + description: "Single waitpoint with small output", + totalWaitpoints: 1, + outputSizeKB: 1, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 1 }, + ], + queryFromIndex: 2, + expectedWaitpointsOnLatest: 1, + }, + { + name: "batch_100_medium", + description: "Medium batch with 100 waitpoints and medium outputs", + totalWaitpoints: 100, + outputSizeKB: 10, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "SUSPENDED", completedWaitpointCount: 100, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + queryFromIndex: 3, + expectedWaitpointsOnLatest: 100, + }, + { + name: "batch_236_large_zombie_scenario", + description: + "Matches the zombie run scenario: 24 snapshots, 236 waitpoints, 100KB outputs each", + totalWaitpoints: 236, + outputSizeKB: 100, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 150 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 200 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + queryFromIndex: 6, + expectedWaitpointsOnLatest: 236, + }, + { + name: "batch_500_large", + description: "Large batch requiring chunked fetching", + totalWaitpoints: 500, + outputSizeKB: 50, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 250 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 400 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "SUSPENDED", completedWaitpointCount: 500, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "SUSPENDED", completedWaitpointCount: 500, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + ], + queryFromIndex: 5, + expectedWaitpointsOnLatest: 500, + }, + { + name: "system_failure_finished", + description: "Latest snapshot is FINISHED status with completed waitpoints", + totalWaitpoints: 100, + outputSizeKB: 50, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + queryFromIndex: 3, + expectedWaitpointsOnLatest: 100, + }, + { + name: "query_from_latest", + description: "Querying from the latest snapshot should return empty array", + totalWaitpoints: 10, + outputSizeKB: 10, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 10 }, + ], + queryFromIndex: 4, // The last snapshot + expectedWaitpointsOnLatest: 0, // No snapshots returned, so no waitpoints + }, + { + name: "requeue_loop", + description: "Multiple QUEUED->PENDING_EXECUTING cycles with waitpoints", + totalWaitpoints: 236, + outputSizeKB: 100, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued again + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued again + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + queryFromIndex: 7, + expectedWaitpointsOnLatest: 236, + }, + ]; +} diff --git a/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts new file mode 100644 index 0000000000..f981f35145 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts @@ -0,0 +1,322 @@ +import { generateFriendlyId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; +import { + PrismaClient, + TaskRunExecutionSnapshot, + TaskRunExecutionStatus, + Waitpoint, + WaitpointStatus, +} from "@trigger.dev/database"; +import type { AuthenticatedEnvironment } from "../setup.js"; + +/** + * Generates a large output string of the specified size in KB. + * The output is a valid JSON string to simulate realistic waitpoint output. + */ +export function generateLargeOutput(sizeKB: number): string { + if (sizeKB <= 0) return JSON.stringify({ data: "" }); + + // Create a string that's approximately the target size + // Account for JSON wrapper overhead + const targetBytes = sizeKB * 1024; + const overhead = JSON.stringify({ data: "" }).length; + const payloadSize = Math.max(0, targetBytes - overhead); + + // Generate a payload of repeating 'x' characters + const payload = "x".repeat(payloadSize); + return JSON.stringify({ data: payload }); +} + +/** + * Creates waitpoints with specified output sizes for testing. + */ +export async function createWaitpointsWithOutput( + prisma: PrismaClient, + count: number, + outputSizeKB: number, + environmentId: string, + projectId: string +): Promise { + if (count === 0) return []; + + const output = generateLargeOutput(outputSizeKB); + const waitpoints: Waitpoint[] = []; + + // Create waitpoints in batches to avoid overwhelming the database + const batchSize = 50; + for (let i = 0; i < count; i += batchSize) { + const batchCount = Math.min(batchSize, count - i); + const batch = await Promise.all( + Array.from({ length: batchCount }).map(async (_, j) => { + const waitpointIds = WaitpointId.generate(); + return prisma.waitpoint.create({ + data: { + id: waitpointIds.id, + friendlyId: waitpointIds.friendlyId, + type: "MANUAL", + status: "COMPLETED" as WaitpointStatus, + idempotencyKey: `test-idempotency-${waitpointIds.id}`, + userProvidedIdempotencyKey: false, + completedAt: new Date(), + output, + outputType: "application/json", + outputIsError: false, + environmentId, + projectId, + }, + }); + }) + ); + waitpoints.push(...batch); + } + + return waitpoints; +} + +/** + * Creates a snapshot directly in the database for testing purposes. + * This bypasses the normal engine flow to allow creating specific test scenarios. + */ +export async function createTestSnapshot( + prisma: PrismaClient, + { + runId, + status, + environmentId, + environmentType, + projectId, + organizationId, + completedWaitpointIds, + checkpointId, + previousSnapshotId, + batchId, + workerId, + runnerId, + attemptNumber, + }: { + runId: string; + status: TaskRunExecutionStatus; + environmentId: string; + environmentType: "PRODUCTION" | "STAGING" | "DEVELOPMENT" | "PREVIEW"; + projectId: string; + organizationId: string; + completedWaitpointIds?: string[]; + checkpointId?: string; + previousSnapshotId?: string; + batchId?: string; + workerId?: string; + runnerId?: string; + attemptNumber?: number; + } +): Promise { + // Determine run status based on execution status + const runStatus = getRunStatusFromExecutionStatus(status); + + const snapshot = await prisma.taskRunExecutionSnapshot.create({ + data: { + engine: "V2", + executionStatus: status, + description: `Test snapshot: ${status}`, + previousSnapshotId, + runId, + runStatus, + attemptNumber, + batchId, + environmentId, + environmentType, + projectId, + organizationId, + checkpointId, + workerId, + runnerId, + isValid: true, + completedWaitpoints: completedWaitpointIds + ? { + connect: completedWaitpointIds.map((id) => ({ id })), + } + : undefined, + completedWaitpointOrder: completedWaitpointIds ?? [], + }, + }); + + // Small delay to ensure different createdAt timestamps + await new Promise((resolve) => setTimeout(resolve, 5)); + + return snapshot; +} + +/** + * Maps execution status to run status for test snapshot creation. + */ +function getRunStatusFromExecutionStatus( + status: TaskRunExecutionStatus +): "PENDING" | "EXECUTING" | "WAITING_FOR_DEPLOY" | "COMPLETED_SUCCESSFULLY" | "SYSTEM_FAILURE" { + switch (status) { + case "RUN_CREATED": + case "QUEUED": + case "QUEUED_EXECUTING": + case "PENDING_EXECUTING": + case "DELAYED": + return "PENDING"; + case "EXECUTING": + case "EXECUTING_WITH_WAITPOINTS": + case "SUSPENDED": + case "PENDING_CANCEL": + return "EXECUTING"; + case "FINISHED": + return "COMPLETED_SUCCESSFULLY"; + default: + return "PENDING"; + } +} + +/** + * Creates a checkpoint for testing suspended snapshots. + */ +export async function createTestCheckpoint( + prisma: PrismaClient, + { + runId, + environmentId, + projectId, + }: { + runId: string; + environmentId: string; + projectId: string; + } +) { + return prisma.taskRunCheckpoint.create({ + data: { + friendlyId: generateFriendlyId("checkpoint"), + type: "DOCKER", + location: `s3://test-bucket/checkpoints/${runId}`, + imageRef: `test-image:${runId}`, + reason: "WAIT_FOR_DURATION", + runtimeEnvironment: { + connect: { id: environmentId }, + }, + project: { + connect: { id: projectId }, + }, + }, + }); +} + +/** + * Interface for a complete test scenario setup result. + */ +export interface TestScenarioResult { + run: { + id: string; + friendlyId: string; + }; + snapshots: TaskRunExecutionSnapshot[]; + waitpoints: Waitpoint[]; + checkpoints: Array<{ id: string }>; +} + +/** + * Sets up a complete test scenario with run, snapshots, waitpoints, and checkpoints. + * This creates the full database state needed for testing getSnapshotsSince. + */ +export async function setupTestScenario( + prisma: PrismaClient, + environment: AuthenticatedEnvironment, + { + totalWaitpoints, + outputSizeKB, + snapshotConfigs, + }: { + totalWaitpoints: number; + outputSizeKB: number; + snapshotConfigs: Array<{ + status: TaskRunExecutionStatus; + completedWaitpointCount: number; + hasCheckpoint?: boolean; + }>; + } +): Promise { + // Create waitpoints first + const waitpoints = await createWaitpointsWithOutput( + prisma, + totalWaitpoints, + outputSizeKB, + environment.id, + environment.project.id + ); + + // Create the run + const runFriendlyId = generateFriendlyId("run"); + const run = await prisma.taskRun.create({ + data: { + friendlyId: runFriendlyId, + engine: "V2", + status: "PENDING", + runtimeEnvironmentId: environment.id, + environmentType: environment.type, + organizationId: environment.organization.id, + projectId: environment.project.id, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + traceId: `trace_${runFriendlyId}`, + spanId: `span_${runFriendlyId}`, + context: {}, + traceContext: {}, + isTest: false, + queue: "task/test-task", + workerQueue: "main", + }, + }); + + // Create snapshots in order + const snapshots: TaskRunExecutionSnapshot[] = []; + const checkpoints: Array<{ id: string }> = []; + let previousSnapshotId: string | undefined; + let attemptNumber = 0; + + for (const config of snapshotConfigs) { + // Create checkpoint if needed + let checkpointId: string | undefined; + if (config.hasCheckpoint) { + const checkpoint = await createTestCheckpoint(prisma, { + runId: run.id, + environmentId: environment.id, + projectId: environment.project.id, + }); + checkpointId = checkpoint.id; + checkpoints.push({ id: checkpoint.id }); + } + + // Increment attempt number when entering a new execution attempt + // PENDING_EXECUTING is the entry point - EXECUTING follows within the same attempt + if (config.status === "PENDING_EXECUTING") { + attemptNumber++; + } + + // Get the waitpoint IDs that should be "completed" at this snapshot + const completedWaitpointIds = waitpoints.slice(0, config.completedWaitpointCount).map((w) => w.id); + + const snapshot = await createTestSnapshot(prisma, { + runId: run.id, + status: config.status, + environmentId: environment.id, + environmentType: environment.type, + projectId: environment.project.id, + organizationId: environment.organization.id, + completedWaitpointIds, + checkpointId, + previousSnapshotId, + attemptNumber, + }); + + snapshots.push(snapshot); + previousSnapshotId = snapshot.id; + } + + return { + run: { id: run.id, friendlyId: runFriendlyId }, + snapshots, + waitpoints, + checkpoints, + }; +}