diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index a224e5a86b..4d154546ec 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -31,7 +31,15 @@ export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot { type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{ include: { checkpoint: true; - completedWaitpoints: true; + completedWaitpoints: { + include: { + completedByTaskRun: { + select: { + taskIdentifier: true; + }; + }; + }; + }; }; }>; @@ -57,7 +65,9 @@ function enhanceExecutionSnapshot( */ function enhanceExecutionSnapshotWithWaitpoints( snapshot: ExecutionSnapshotWithCheckpoint, - waitpoints: Waitpoint[], + waitpoints: (Waitpoint & { + completedByTaskRun: { taskIdentifier: string | null } | null; + })[], completedWaitpointOrder: string[] ): EnhancedExecutionSnapshot { return { @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints( w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined, completedByTaskRun: w.completedByTaskRunId ? { - id: w.completedByTaskRunId, - friendlyId: RunId.toFriendlyId(w.completedByTaskRunId), - batch: snapshot.batchId - ? { - id: snapshot.batchId, - friendlyId: BatchId.toFriendlyId(snapshot.batchId), - } - : undefined, - } + id: w.completedByTaskRunId, + friendlyId: RunId.toFriendlyId(w.completedByTaskRunId), + batch: snapshot.batchId + ? { + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } + : undefined, + taskIdentifier: w.completedByTaskRun?.taskIdentifier ?? undefined, + } : undefined, completedAfter: w.completedAfter ?? undefined, completedByBatch: w.completedByBatchId ? { - id: w.completedByBatchId, - friendlyId: BatchId.toFriendlyId(w.completedByBatchId), - } + id: w.completedByBatchId, + friendlyId: BatchId.toFriendlyId(w.completedByBatchId), + } : undefined, output: w.output ?? undefined, outputType: w.outputType, @@ -137,14 +148,23 @@ async function getSnapshotWaitpointIds( async function fetchWaitpointsInChunks( prisma: PrismaClientOrTransaction, waitpointIds: string[] -): Promise { +): Promise<(Waitpoint & { completedByTaskRun: { taskIdentifier: string | null } | null })[]> { if (waitpointIds.length === 0) return []; - const allWaitpoints: Waitpoint[] = []; + const allWaitpoints: (Waitpoint & { + completedByTaskRun: { taskIdentifier: string | null } | null; + })[] = []; for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) { const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE); const waitpoints = await prisma.waitpoint.findMany({ where: { id: { in: chunk } }, + include: { + completedByTaskRun: { + select: { + taskIdentifier: true, + }, + }, + }, }); allWaitpoints.push(...waitpoints); } @@ -159,7 +179,15 @@ export async function getLatestExecutionSnapshot( const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({ where: { runId, isValid: true }, include: { - completedWaitpoints: true, + completedWaitpoints: { + include: { + completedByTaskRun: { + select: { + taskIdentifier: true, + }, + }, + }, + }, checkpoint: true, }, orderBy: { createdAt: "desc" }, @@ -179,7 +207,15 @@ export async function getExecutionSnapshotCompletedWaitpoints( const waitpoints = await prisma.taskRunExecutionSnapshot.findFirst({ where: { id: snapshotId }, include: { - completedWaitpoints: true, + completedWaitpoints: { + include: { + completedByTaskRun: { + select: { + taskIdentifier: true, + }, + }, + }, + }, }, }); @@ -233,19 +269,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): }, batch: snapshot.batchId ? { - id: snapshot.batchId, - friendlyId: BatchId.toFriendlyId(snapshot.batchId), - } + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } : undefined, checkpoint: snapshot.checkpoint ? { - id: snapshot.checkpoint.id, - friendlyId: snapshot.checkpoint.friendlyId, - type: snapshot.checkpoint.type, - location: snapshot.checkpoint.location, - imageRef: snapshot.checkpoint.imageRef, - reason: snapshot.checkpoint.reason ?? undefined, - } + id: snapshot.checkpoint.id, + friendlyId: snapshot.checkpoint.friendlyId, + type: snapshot.checkpoint.type, + location: snapshot.checkpoint.location, + imageRef: snapshot.checkpoint.imageRef, + reason: snapshot.checkpoint.reason ?? undefined, + } : undefined, completedWaitpoints: snapshot.completedWaitpoints, }; diff --git a/packages/core/src/v3/runtime/sharedRuntimeManager.test.ts b/packages/core/src/v3/runtime/sharedRuntimeManager.test.ts new file mode 100644 index 0000000000..3dc3b4d5b8 --- /dev/null +++ b/packages/core/src/v3/runtime/sharedRuntimeManager.test.ts @@ -0,0 +1,121 @@ +import { describe, expect, it } from "vitest"; +import { SharedRuntimeManager } from "./sharedRuntimeManager.js"; +import { CompletedWaitpoint } from "../schemas/index.js"; + +describe("SharedRuntimeManager", () => { + const mockIpc = { + send: () => { }, + } as any; + + const manager = new SharedRuntimeManager(mockIpc, false); + + // Access private method + const waitpointToResult = (manager as any).waitpointToTaskRunExecutionResult.bind(manager); + + describe("waitpointToTaskRunExecutionResult", () => { + it("should use the taskIdentifier from the waitpoint if present (success)", () => { + const waitpoint: CompletedWaitpoint = { + id: "wp_1", + friendlyId: "wp_1", + type: "RUN", + completedAt: new Date(), + outputIsError: false, + output: JSON.stringify({ foo: "bar" }), + outputType: "application/json", + completedByTaskRun: { + id: "run_1", + friendlyId: "run_1", + taskIdentifier: "my-task", + }, + }; + + const result = waitpointToResult(waitpoint); + + expect(result).toEqual({ + ok: true, + id: "run_1", + taskIdentifier: "my-task", + output: JSON.stringify({ foo: "bar" }), + outputType: "application/json", + }); + }); + + it("should default taskIdentifier to 'unknown' if missing (success)", () => { + const waitpoint: CompletedWaitpoint = { + id: "wp_2", + friendlyId: "wp_2", + type: "RUN", + completedAt: new Date(), + outputIsError: false, + output: JSON.stringify({ foo: "bar" }), + outputType: "application/json", + completedByTaskRun: { + id: "run_2", + friendlyId: "run_2", + // database/legacy object missing taskIdentifier + } as any, + }; + + const result = waitpointToResult(waitpoint); + + expect(result).toEqual({ + ok: true, + id: "run_2", + taskIdentifier: "unknown", + output: JSON.stringify({ foo: "bar" }), + outputType: "application/json", + }); + }); + + it("should use the taskIdentifier from the waitpoint if present (failure)", () => { + const waitpoint: CompletedWaitpoint = { + id: "wp_3", + friendlyId: "wp_3", + type: "RUN", + completedAt: new Date(), + outputIsError: true, + output: JSON.stringify({ message: "Boom" }), + outputType: "application/json", + completedByTaskRun: { + id: "run_3", + friendlyId: "run_3", + taskIdentifier: "my-failed-task", + }, + }; + + const result = waitpointToResult(waitpoint); + + expect(result).toEqual({ + ok: false, + id: "run_3", + taskIdentifier: "my-failed-task", + error: { message: "Boom" }, + }); + }); + + it("should default taskIdentifier to 'unknown' if missing (failure)", () => { + const waitpoint: CompletedWaitpoint = { + id: "wp_4", + friendlyId: "wp_4", + type: "RUN", + completedAt: new Date(), + outputIsError: true, + output: JSON.stringify({ message: "Boom" }), + outputType: "application/json", + completedByTaskRun: { + id: "run_4", + friendlyId: "run_4", + } as any, + }; + + const result = waitpointToResult(waitpoint); + + expect(result).toEqual({ + ok: false, + id: "run_4", + taskIdentifier: "unknown", + error: { message: "Boom" }, + }); + }); + }); +}); diff --git a/packages/core/src/v3/runtime/sharedRuntimeManager.ts b/packages/core/src/v3/runtime/sharedRuntimeManager.ts index 09c718c1f6..2cfb08072e 100644 --- a/packages/core/src/v3/runtime/sharedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/sharedRuntimeManager.ts @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager { return { ok: false, id: waitpoint.completedByTaskRun.friendlyId, + taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown", error: waitpoint.output ? JSON.parse(waitpoint.output) : { - type: "STRING_ERROR", - message: "Missing error output", - }, + type: "STRING_ERROR", + message: "Missing error output", + }, } satisfies TaskRunFailedExecutionResult; } else { return { ok: true, id: waitpoint.completedByTaskRun.friendlyId, + taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier ?? "unknown", output: waitpoint.output, outputType: waitpoint.outputType ?? "application/json", } satisfies TaskRunSuccessfulExecutionResult; diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index e4057e7ca6..3e01ee7dc4 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -81,6 +81,7 @@ export const CompletedWaitpoint = z.object({ .object({ id: z.string(), friendlyId: z.string(), + taskIdentifier: z.string().optional(), /** If the run has an associated batch */ batch: z .object({