From 6f172cd7c273a74639c043a4dda76cfe5538ac0a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 29 Jan 2026 17:00:50 +0000 Subject: [PATCH 1/2] fix(run-engine): avoid NAPI string overflow in getExecutionSnapshotsSince by only fetching waitpoints for latest snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix Prisma NAPI errors ("Failed to convert rust String into napi string") when runs have many completed waitpoints - Only fetch waitpoints for the latest snapshot instead of all snapshots, avoiding N×M data explosion - Add chunked waitpoint fetching to prevent memory spikes ## Problem The `getExecutionSnapshotsSince` query was causing data explosion when runs had many completed waitpoints: - 24 snapshots × 236 waitpoints × ~100KB outputs = ~570MB - This exceeded Node.js/NAPI string conversion limits ## Solution Modified the query to only attach waitpoints to the **latest** snapshot. This is safe because the runner's `SnapshotManager` only processes waitpoints from the latest snapshot - intermediate snapshots are only checked for deprecated status. Changes: 1. Fetch snapshots without waitpoints first 2. Use raw SQL on `_completedWaitpoints` join table to get waitpoint IDs for latest snapshot only 3. Fetch waitpoints in chunks of 100 to avoid memory spikes 4. Build response with waitpoints only on latest snapshot (earlier snapshots get empty arrays) ## Test plan - [x] Added 7 new tests covering empty results, single snapshot, multiple snapshots, ordering, and waitpoint attachment - [x] Verified runner assumption by tracing `SnapshotManager` code path - [x] Created diagnostic scripts to validate against production data --- .../engine/systems/executionSnapshotSystem.ts | 116 ++- .../engine/tests/getSnapshotsSince.test.ts | 670 ++++++++++++++++++ .../tests/helpers/executionStateMachine.ts | 257 +++++++ .../tests/helpers/snapshotTestHelpers.ts | 321 +++++++++ 4 files changed, 1353 insertions(+), 11 deletions(-) create mode 100644 internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts create mode 100644 internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts create mode 100644 internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index b6f31bcffc..a224e5a86b 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -8,10 +8,14 @@ import { TaskRunExecutionSnapshot, TaskRunExecutionStatus, TaskRunStatus, + Waitpoint, } from "@trigger.dev/database"; import { HeartbeatTimeouts } from "../types.js"; import { SystemResources } from "./systems.js"; +/** Chunk size for fetching waitpoints to avoid NAPI string conversion limits */ +const WAITPOINT_CHUNK_SIZE = 100; + export type ExecutionSnapshotSystemOptions = { resources: SystemResources; heartbeatTimeouts: HeartbeatTimeouts; @@ -31,19 +35,41 @@ type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGe }; }>; +type ExecutionSnapshotWithCheckpoint = Prisma.TaskRunExecutionSnapshotGetPayload<{ + include: { + checkpoint: true; + }; +}>; + function enhanceExecutionSnapshot( snapshot: ExecutionSnapshotWithCheckAndWaitpoints +): EnhancedExecutionSnapshot { + return enhanceExecutionSnapshotWithWaitpoints( + snapshot, + snapshot.completedWaitpoints, + snapshot.completedWaitpointOrder + ); +} + +/** + * Transforms a snapshot (with checkpoint but without waitpoints) into an EnhancedExecutionSnapshot + * by combining it with pre-fetched waitpoints. + */ +function enhanceExecutionSnapshotWithWaitpoints( + snapshot: ExecutionSnapshotWithCheckpoint, + waitpoints: Waitpoint[], + completedWaitpointOrder: string[] ): EnhancedExecutionSnapshot { return { ...snapshot, friendlyId: SnapshotId.toFriendlyId(snapshot.id), runFriendlyId: RunId.toFriendlyId(snapshot.runId), - completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => { - //get all indexes of the waitpoint in the completedWaitpointOrder - //we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey) + completedWaitpoints: waitpoints.flatMap((w) => { + // Get all indexes of the waitpoint in the completedWaitpointOrder + // We do this because the same run can be in a batch multiple times (i.e. same idempotencyKey) let indexes: (number | undefined)[] = []; - for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) { - if (snapshot.completedWaitpointOrder[i] === w.id) { + for (let i = 0; i < completedWaitpointOrder.length; i++) { + if (completedWaitpointOrder[i] === w.id) { indexes.push(i); } } @@ -60,9 +86,7 @@ function enhanceExecutionSnapshot( type: w.type, completedAt: w.completedAt ?? new Date(), idempotencyKey: - w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey - ? w.idempotencyKey - : undefined, + w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined, completedByTaskRun: w.completedByTaskRunId ? { id: w.completedByTaskRunId, @@ -91,6 +115,42 @@ function enhanceExecutionSnapshot( }; } +/** + * Gets the waitpoint IDs linked to a snapshot via the _completedWaitpoints join table. + * Uses raw SQL to avoid fetching full waitpoint data. + */ +async function getSnapshotWaitpointIds( + prisma: PrismaClientOrTransaction, + snapshotId: string +): Promise { + const result = await prisma.$queryRaw<{ B: string }[]>` + SELECT "B" FROM "_completedWaitpoints" WHERE "A" = ${snapshotId} + `; + return result.map((r) => r.B); +} + +/** + * Fetches waitpoints in chunks to avoid NAPI string conversion limits. + * This is necessary because waitpoints can have large outputs (100KB+), + * and fetching many at once can exceed Node.js string limits. + */ +async function fetchWaitpointsInChunks( + prisma: PrismaClientOrTransaction, + waitpointIds: string[] +): Promise { + if (waitpointIds.length === 0) return []; + + const allWaitpoints: Waitpoint[] = []; + for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) { + const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE); + const waitpoints = await prisma.waitpoint.findMany({ + where: { id: { in: chunk } }, + }); + allWaitpoints.push(...waitpoints); + } + return allWaitpoints; +} + /* Gets the most recent valid snapshot for a run */ export async function getLatestExecutionSnapshot( prisma: PrismaClientOrTransaction, @@ -191,12 +251,27 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): }; } +/** + * Gets execution snapshots created after the specified snapshot. + * + * IMPORTANT: This function is optimized to avoid N×M data explosion when runs have many + * completed waitpoints. Due to the many-to-many relation, once waitpoints complete, + * all subsequent snapshots have the same waitpoints linked. For a run with 24 snapshots + * and 236 waitpoints with 100KB outputs each, fetching all waitpoints for all snapshots + * would result in ~570MB of data, causing "Failed to convert rust String into napi string" errors. + * + * Solution: Only the LATEST snapshot's waitpoints are fetched and included. The runner's + * SnapshotManager only processes completedWaitpoints from the latest snapshot anyway - + * intermediate snapshots' waitpoints are ignored. This reduces data from N×M to just M. + * + * Waitpoints are fetched in chunks (100 at a time) to handle batches up to 1000 items. + */ export async function getExecutionSnapshotsSince( prisma: PrismaClientOrTransaction, runId: string, sinceSnapshotId: string ): Promise { - // Find the createdAt of the sinceSnapshotId + // Step 1: Find the createdAt of the sinceSnapshotId const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({ where: { id: sinceSnapshotId }, select: { createdAt: true }, @@ -206,6 +281,7 @@ export async function getExecutionSnapshotsSince( throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`); } + // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ where: { runId, @@ -213,14 +289,32 @@ export async function getExecutionSnapshotsSince( createdAt: { gt: sinceSnapshot.createdAt }, }, include: { - completedWaitpoints: true, checkpoint: true, + // DO NOT include completedWaitpoints here - this causes the N×M explosion }, orderBy: { createdAt: "desc" }, take: 50, }); - return snapshots.reverse().map(enhanceExecutionSnapshot); + if (snapshots.length === 0) return []; + + // Step 3: Get waitpoint IDs for the LATEST snapshot only (first in desc order) + const latestSnapshot = snapshots[0]; + const waitpointIds = await getSnapshotWaitpointIds(prisma, latestSnapshot.id); + + // Step 4: Fetch waitpoints in chunks to avoid NAPI string conversion limits + const waitpoints = await fetchWaitpointsInChunks(prisma, waitpointIds); + + // Step 5: Build enhanced snapshots - only latest gets waitpoints, others get empty arrays + // The runner only uses completedWaitpoints from the latest snapshot anyway + return snapshots.reverse().map((snapshot) => { + const isLatest = snapshot.id === latestSnapshot.id; + return enhanceExecutionSnapshotWithWaitpoints( + snapshot, + isLatest ? waitpoints : [], + latestSnapshot.completedWaitpointOrder + ); + }); } export class ExecutionSnapshotSystem { diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts new file mode 100644 index 0000000000..dd291af4a0 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -0,0 +1,670 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect, describe } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { setTimeout } from "node:timers/promises"; +import { + generateTestScenarios, + type SnapshotTestScenario, +} from "./helpers/executionStateMachine.js"; +import { + createWaitpointsWithOutput, + setupTestScenario, + generateLargeOutput, +} from "./helpers/snapshotTestHelpers.js"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; + +vi.setConfig({ testTimeout: 120_000 }); + +describe("RunEngine getSnapshotsSince", () => { + containerTest( + "returns empty array when querying from latest snapshot", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_empty", + spanId: "s_empty", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_empty", + workerQueue: "main", + }); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(allSnapshots.length).toBeGreaterThan(0); + + // Query from the last snapshot + const lastSnapshot = allSnapshots[allSnapshots.length - 1]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: lastSnapshot.id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "returns snapshots after the specified one with waitpoints only on latest", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_wp", + spanId: "s_wp", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_wp", + workerQueue: "main", + }); + + // Start attempt + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // Create and block with a waitpoint + const { waitpoint } = await engine.createDateTimeWaitpoint({ + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + completedAfter: new Date(Date.now() + 50), + }); + + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: [waitpoint.id], + projectId: authenticatedEnvironment.project.id, + organizationId: authenticatedEnvironment.organization.id, + }); + + // Wait for waitpoint completion + await setTimeout(200); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(allSnapshots.length).toBeGreaterThanOrEqual(3); + + // Query from the first snapshot + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBeGreaterThanOrEqual(2); + + // The latest snapshot should have completedWaitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBeGreaterThanOrEqual(0); + + // Earlier snapshots should have empty waitpoints (optimization) + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "handles multiple waitpoints correctly - only latest has them", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_mwp", + spanId: "s_mwp", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_mwp", + workerQueue: "main", + }); + + await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // Create multiple waitpoints + const waitpointCount = 5; + const waitpointPromises = Array.from({ length: waitpointCount }).map(() => + engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }) + ); + const waitpoints = await Promise.all(waitpointPromises); + + // Block the run with all waitpoints + for (const { waitpoint } of waitpoints) { + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + } + + // Complete all waitpoints + for (const { waitpoint } of waitpoints) { + await engine.completeWaitpoint({ id: waitpoint.id }); + } + + await setTimeout(500); + + // Get all snapshots + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + // Query from early in the sequence + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBeGreaterThan(0); + + // Only the latest should have waitpoints + const latest = result![result!.length - 1]; + + // Earlier snapshots must have empty completedWaitpoints + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest("returns null for invalid snapshot ID", async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_invalid", + spanId: "s_invalid", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + // Query with invalid snapshot ID + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: "invalid-snapshot-id", + }); + + // Should return null (caught by getSnapshotsSince error handler) + expect(result).toBeNull(); + } finally { + await engine.quit(); + } + }); + + // Direct database tests for the core function + containerTest( + "direct test: large waitpoint scenario - 100 waitpoints with 10KB outputs", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // Create scenario directly in database + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 100, + outputSizeKB: 10, + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + }); + + // Query from early snapshot + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[2].id, // After PENDING_EXECUTING + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(6); // EXECUTING through FINISHED + + // Latest should have all 100 waitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(100); + + // Verify all earlier snapshots have empty waitpoints + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "direct test: zombie run scenario - 236 waitpoints with 100KB outputs, 24 snapshots", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // This scenario matches the exact conditions that caused the NAPI error + // 24 snapshots × 236 waitpoints × 100KB = ~570MB if not optimized + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 236, + outputSizeKB: 100, + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 200 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + }); + + expect(scenario.snapshots.length).toBe(24); + expect(scenario.waitpoints.length).toBe(236); + + // Query from the 6th snapshot (after waitpoints start completing) + const queryFromIndex = 5; + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[queryFromIndex].id, + }); + + expect(result).not.toBeNull(); + // Should return snapshots after index 5, which is 24 - 6 = 18 snapshots + expect(result!.length).toBe(24 - queryFromIndex - 1); + + // Latest should have all 236 waitpoints + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(236); + + // All other snapshots should have 0 waitpoints (optimization) + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + + // Verify the outputs are present and correct size + for (const wp of latest.completedWaitpoints) { + expect(wp.output).toBeDefined(); + // ~100KB output as JSON string + expect(typeof wp.output).toBe("string"); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "direct test: verifies chunked fetching works with 500+ waitpoints", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + // 500 waitpoints requires 5 chunks (100 per chunk) + const scenario = await setupTestScenario(prisma, authenticatedEnvironment, { + totalWaitpoints: 500, + outputSizeKB: 10, // Smaller outputs for faster test + snapshotConfigs: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + ], + }); + + const result = await engine.getSnapshotsSince({ + runId: scenario.run.id, + snapshotId: scenario.snapshots[0].id, + }); + + expect(result).not.toBeNull(); + expect(result!.length).toBe(4); + + const latest = result![result!.length - 1]; + expect(latest.completedWaitpoints.length).toBe(500); + + // All other snapshots should be empty + for (let i = 0; i < result!.length - 1; i++) { + expect(result![i].completedWaitpoints.length).toBe(0); + } + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts b/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts new file mode 100644 index 0000000000..4dd92cdbd4 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/executionStateMachine.ts @@ -0,0 +1,257 @@ +import { TaskRunExecutionStatus } from "@trigger.dev/database"; + +/** + * Defines valid execution status transitions for the Run Engine 2.0. + * This is a model of the state machine that governs run execution. + */ +export const EXECUTION_STATUS_TRANSITIONS: Record< + TaskRunExecutionStatus, + TaskRunExecutionStatus[] +> = { + RUN_CREATED: ["QUEUED", "DELAYED"], + DELAYED: ["QUEUED"], + QUEUED: ["PENDING_EXECUTING", "QUEUED_EXECUTING"], + QUEUED_EXECUTING: ["PENDING_EXECUTING", "QUEUED"], + PENDING_EXECUTING: ["EXECUTING", "PENDING_CANCEL", "FINISHED", "QUEUED"], + EXECUTING: ["EXECUTING_WITH_WAITPOINTS", "FINISHED", "PENDING_CANCEL", "QUEUED"], + EXECUTING_WITH_WAITPOINTS: ["EXECUTING", "SUSPENDED", "FINISHED", "PENDING_CANCEL"], + SUSPENDED: ["QUEUED", "PENDING_CANCEL", "FINISHED"], + PENDING_CANCEL: ["FINISHED"], + FINISHED: ["QUEUED"], // Retry case +}; + +/** + * Validates if a transition from one status to another is valid. + */ +export function isValidTransition( + from: TaskRunExecutionStatus, + to: TaskRunExecutionStatus +): boolean { + return EXECUTION_STATUS_TRANSITIONS[from]?.includes(to) ?? false; +} + +/** + * Configuration for a snapshot in a test scenario. + */ +export interface SnapshotConfig { + /** The execution status for this snapshot */ + status: TaskRunExecutionStatus; + /** Number of waitpoints completed at this snapshot (cumulative) */ + completedWaitpointCount: number; + /** Whether this snapshot has a checkpoint */ + hasCheckpoint?: boolean; + /** Description for the snapshot */ + description?: string; +} + +/** + * A test scenario for getSnapshotsSince testing. + */ +export interface SnapshotTestScenario { + /** Unique name for the scenario */ + name: string; + /** Description of what this scenario tests */ + description: string; + /** Total number of waitpoints to create */ + totalWaitpoints: number; + /** Size of each waitpoint's output in KB */ + outputSizeKB: number; + /** Configuration for each snapshot to create */ + snapshots: SnapshotConfig[]; + /** Which snapshot index to query "since" (0-based) */ + queryFromIndex: number; + /** Expected number of waitpoints on the latest snapshot returned */ + expectedWaitpointsOnLatest: number; +} + +/** + * Generates test scenarios for comprehensive getSnapshotsSince testing. + * These scenarios cover various edge cases and stress tests. + */ +export function generateTestScenarios(): SnapshotTestScenario[] { + return [ + { + name: "simple_no_waitpoints", + description: "Basic run without any waitpoints", + totalWaitpoints: 0, + outputSizeKB: 0, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "FINISHED", completedWaitpointCount: 0 }, + ], + queryFromIndex: 0, + expectedWaitpointsOnLatest: 0, + }, + { + name: "single_small_waitpoint", + description: "Single waitpoint with small output", + totalWaitpoints: 1, + outputSizeKB: 1, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 1 }, + ], + queryFromIndex: 2, + expectedWaitpointsOnLatest: 1, + }, + { + name: "batch_100_medium", + description: "Medium batch with 100 waitpoints and medium outputs", + totalWaitpoints: 100, + outputSizeKB: 10, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "SUSPENDED", completedWaitpointCount: 100, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + queryFromIndex: 3, + expectedWaitpointsOnLatest: 100, + }, + { + name: "batch_236_large_zombie_scenario", + description: + "Matches the zombie run scenario: 24 snapshots, 236 waitpoints, 100KB outputs each", + totalWaitpoints: 236, + outputSizeKB: 100, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 150 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 200 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + queryFromIndex: 6, + expectedWaitpointsOnLatest: 236, + }, + { + name: "batch_500_large", + description: "Large batch requiring chunked fetching", + totalWaitpoints: 500, + outputSizeKB: 50, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 250 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 400 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "SUSPENDED", completedWaitpointCount: 500, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 500 }, + { status: "SUSPENDED", completedWaitpointCount: 500, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 500 }, + { status: "EXECUTING", completedWaitpointCount: 500 }, + ], + queryFromIndex: 5, + expectedWaitpointsOnLatest: 500, + }, + { + name: "system_failure_finished", + description: "Latest snapshot is FINISHED status with completed waitpoints", + totalWaitpoints: 100, + outputSizeKB: 50, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 50 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 100 }, + { status: "EXECUTING", completedWaitpointCount: 100 }, + { status: "FINISHED", completedWaitpointCount: 100 }, + ], + queryFromIndex: 3, + expectedWaitpointsOnLatest: 100, + }, + { + name: "query_from_latest", + description: "Querying from the latest snapshot should return empty array", + totalWaitpoints: 10, + outputSizeKB: 10, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 10 }, + ], + queryFromIndex: 4, // The last snapshot + expectedWaitpointsOnLatest: 0, // No snapshots returned, so no waitpoints + }, + { + name: "requeue_loop", + description: "Multiple QUEUED->PENDING_EXECUTING cycles with waitpoints", + totalWaitpoints: 236, + outputSizeKB: 100, + snapshots: [ + { status: "RUN_CREATED", completedWaitpointCount: 0 }, + { status: "QUEUED", completedWaitpointCount: 0 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 0 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued again + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued again + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING_WITH_WAITPOINTS", completedWaitpointCount: 236 }, + { status: "SUSPENDED", completedWaitpointCount: 236, hasCheckpoint: true }, + { status: "QUEUED", completedWaitpointCount: 236 }, + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "QUEUED", completedWaitpointCount: 236 }, // Requeued + { status: "PENDING_EXECUTING", completedWaitpointCount: 236 }, + { status: "EXECUTING", completedWaitpointCount: 236 }, + ], + queryFromIndex: 7, + expectedWaitpointsOnLatest: 236, + }, + ]; +} diff --git a/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts new file mode 100644 index 0000000000..5d42add94d --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts @@ -0,0 +1,321 @@ +import { generateFriendlyId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; +import { + PrismaClient, + TaskRunExecutionSnapshot, + TaskRunExecutionStatus, + Waitpoint, + WaitpointStatus, +} from "@trigger.dev/database"; +import type { AuthenticatedEnvironment } from "../setup.js"; + +/** + * Generates a large output string of the specified size in KB. + * The output is a valid JSON string to simulate realistic waitpoint output. + */ +export function generateLargeOutput(sizeKB: number): string { + if (sizeKB <= 0) return JSON.stringify({ data: "" }); + + // Create a string that's approximately the target size + // Account for JSON wrapper overhead + const targetBytes = sizeKB * 1024; + const overhead = JSON.stringify({ data: "" }).length; + const payloadSize = Math.max(0, targetBytes - overhead); + + // Generate a payload of repeating 'x' characters + const payload = "x".repeat(payloadSize); + return JSON.stringify({ data: payload }); +} + +/** + * Creates waitpoints with specified output sizes for testing. + */ +export async function createWaitpointsWithOutput( + prisma: PrismaClient, + count: number, + outputSizeKB: number, + environmentId: string, + projectId: string +): Promise { + if (count === 0) return []; + + const output = generateLargeOutput(outputSizeKB); + const waitpoints: Waitpoint[] = []; + + // Create waitpoints in batches to avoid overwhelming the database + const batchSize = 50; + for (let i = 0; i < count; i += batchSize) { + const batchCount = Math.min(batchSize, count - i); + const batch = await Promise.all( + Array.from({ length: batchCount }).map(async (_, j) => { + const waitpointIds = WaitpointId.generate(); + return prisma.waitpoint.create({ + data: { + id: waitpointIds.id, + friendlyId: waitpointIds.friendlyId, + type: "MANUAL", + status: "COMPLETED" as WaitpointStatus, + idempotencyKey: `test-idempotency-${waitpointIds.id}`, + userProvidedIdempotencyKey: false, + completedAt: new Date(), + output, + outputType: "application/json", + outputIsError: false, + environmentId, + projectId, + }, + }); + }) + ); + waitpoints.push(...batch); + } + + return waitpoints; +} + +/** + * Creates a snapshot directly in the database for testing purposes. + * This bypasses the normal engine flow to allow creating specific test scenarios. + */ +export async function createTestSnapshot( + prisma: PrismaClient, + { + runId, + status, + environmentId, + environmentType, + projectId, + organizationId, + completedWaitpointIds, + checkpointId, + previousSnapshotId, + batchId, + workerId, + runnerId, + attemptNumber, + }: { + runId: string; + status: TaskRunExecutionStatus; + environmentId: string; + environmentType: "PRODUCTION" | "STAGING" | "DEVELOPMENT" | "PREVIEW"; + projectId: string; + organizationId: string; + completedWaitpointIds?: string[]; + checkpointId?: string; + previousSnapshotId?: string; + batchId?: string; + workerId?: string; + runnerId?: string; + attemptNumber?: number; + } +): Promise { + // Determine run status based on execution status + const runStatus = getRunStatusFromExecutionStatus(status); + + const snapshot = await prisma.taskRunExecutionSnapshot.create({ + data: { + engine: "V2", + executionStatus: status, + description: `Test snapshot: ${status}`, + previousSnapshotId, + runId, + runStatus, + attemptNumber, + batchId, + environmentId, + environmentType, + projectId, + organizationId, + checkpointId, + workerId, + runnerId, + isValid: true, + completedWaitpoints: completedWaitpointIds + ? { + connect: completedWaitpointIds.map((id) => ({ id })), + } + : undefined, + completedWaitpointOrder: completedWaitpointIds ?? [], + }, + }); + + // Small delay to ensure different createdAt timestamps + await new Promise((resolve) => setTimeout(resolve, 5)); + + return snapshot; +} + +/** + * Maps execution status to run status for test snapshot creation. + */ +function getRunStatusFromExecutionStatus( + status: TaskRunExecutionStatus +): "PENDING" | "EXECUTING" | "WAITING_FOR_DEPLOY" | "COMPLETED_SUCCESSFULLY" | "SYSTEM_FAILURE" { + switch (status) { + case "RUN_CREATED": + case "QUEUED": + case "QUEUED_EXECUTING": + case "PENDING_EXECUTING": + case "DELAYED": + return "PENDING"; + case "EXECUTING": + case "EXECUTING_WITH_WAITPOINTS": + case "SUSPENDED": + case "PENDING_CANCEL": + return "EXECUTING"; + case "FINISHED": + return "COMPLETED_SUCCESSFULLY"; + default: + return "PENDING"; + } +} + +/** + * Creates a checkpoint for testing suspended snapshots. + */ +export async function createTestCheckpoint( + prisma: PrismaClient, + { + runId, + environmentId, + projectId, + }: { + runId: string; + environmentId: string; + projectId: string; + } +) { + return prisma.taskRunCheckpoint.create({ + data: { + friendlyId: generateFriendlyId("checkpoint"), + type: "DOCKER", + location: `s3://test-bucket/checkpoints/${runId}`, + imageRef: `test-image:${runId}`, + reason: "WAIT_FOR_DURATION", + runtimeEnvironment: { + connect: { id: environmentId }, + }, + project: { + connect: { id: projectId }, + }, + }, + }); +} + +/** + * Interface for a complete test scenario setup result. + */ +export interface TestScenarioResult { + run: { + id: string; + friendlyId: string; + }; + snapshots: TaskRunExecutionSnapshot[]; + waitpoints: Waitpoint[]; + checkpoints: Array<{ id: string }>; +} + +/** + * Sets up a complete test scenario with run, snapshots, waitpoints, and checkpoints. + * This creates the full database state needed for testing getSnapshotsSince. + */ +export async function setupTestScenario( + prisma: PrismaClient, + environment: AuthenticatedEnvironment, + { + totalWaitpoints, + outputSizeKB, + snapshotConfigs, + }: { + totalWaitpoints: number; + outputSizeKB: number; + snapshotConfigs: Array<{ + status: TaskRunExecutionStatus; + completedWaitpointCount: number; + hasCheckpoint?: boolean; + }>; + } +): Promise { + // Create waitpoints first + const waitpoints = await createWaitpointsWithOutput( + prisma, + totalWaitpoints, + outputSizeKB, + environment.id, + environment.project.id + ); + + // Create the run + const runFriendlyId = generateFriendlyId("run"); + const run = await prisma.taskRun.create({ + data: { + friendlyId: runFriendlyId, + engine: "V2", + status: "PENDING", + runtimeEnvironmentId: environment.id, + environmentType: environment.type, + organizationId: environment.organization.id, + projectId: environment.project.id, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + traceId: `trace_${runFriendlyId}`, + spanId: `span_${runFriendlyId}`, + context: {}, + traceContext: {}, + isTest: false, + queue: "task/test-task", + workerQueue: "main", + }, + }); + + // Create snapshots in order + const snapshots: TaskRunExecutionSnapshot[] = []; + const checkpoints: Array<{ id: string }> = []; + let previousSnapshotId: string | undefined; + let attemptNumber = 0; + + for (const config of snapshotConfigs) { + // Create checkpoint if needed + let checkpointId: string | undefined; + if (config.hasCheckpoint) { + const checkpoint = await createTestCheckpoint(prisma, { + runId: run.id, + environmentId: environment.id, + projectId: environment.project.id, + }); + checkpointId = checkpoint.id; + checkpoints.push({ id: checkpoint.id }); + } + + // Increment attempt number when entering EXECUTING + if (config.status === "EXECUTING" || config.status === "PENDING_EXECUTING") { + attemptNumber++; + } + + // Get the waitpoint IDs that should be "completed" at this snapshot + const completedWaitpointIds = waitpoints.slice(0, config.completedWaitpointCount).map((w) => w.id); + + const snapshot = await createTestSnapshot(prisma, { + runId: run.id, + status: config.status, + environmentId: environment.id, + environmentType: environment.type, + projectId: environment.project.id, + organizationId: environment.organization.id, + completedWaitpointIds, + checkpointId, + previousSnapshotId, + attemptNumber, + }); + + snapshots.push(snapshot); + previousSnapshotId = snapshot.id; + } + + return { + run: { id: run.id, friendlyId: runFriendlyId }, + snapshots, + waitpoints, + checkpoints, + }; +} From 52fff9f870229b5856c1bab61211cccd3f0c5a95 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 29 Jan 2026 17:46:22 +0000 Subject: [PATCH 2/2] a couple of tweaks and test updates --- .../run-engine/src/engine/tests/getSnapshotsSince.test.ts | 2 +- .../src/engine/tests/helpers/snapshotTestHelpers.ts | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index dd291af4a0..4352e72686 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -213,7 +213,7 @@ describe("RunEngine getSnapshotsSince", () => { // The latest snapshot should have completedWaitpoints const latest = result![result!.length - 1]; - expect(latest.completedWaitpoints.length).toBeGreaterThanOrEqual(0); + expect(latest.completedWaitpoints.length).toBeGreaterThan(0); // Earlier snapshots should have empty waitpoints (optimization) for (let i = 0; i < result!.length - 1; i++) { diff --git a/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts index 5d42add94d..f981f35145 100644 --- a/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts +++ b/internal-packages/run-engine/src/engine/tests/helpers/snapshotTestHelpers.ts @@ -287,8 +287,9 @@ export async function setupTestScenario( checkpoints.push({ id: checkpoint.id }); } - // Increment attempt number when entering EXECUTING - if (config.status === "EXECUTING" || config.status === "PENDING_EXECUTING") { + // Increment attempt number when entering a new execution attempt + // PENDING_EXECUTING is the entry point - EXECUTING follows within the same attempt + if (config.status === "PENDING_EXECUTING") { attemptNumber++; }