diff --git a/.changeset/fix-batch-taskidentifier-unknown.md b/.changeset/fix-batch-taskidentifier-unknown.md new file mode 100644 index 0000000000..e0dfc47741 --- /dev/null +++ b/.changeset/fix-batch-taskidentifier-unknown.md @@ -0,0 +1,15 @@ +--- +"@trigger.dev/core": patch +"run-engine": patch +--- + +fix: taskIdentifier shows "unknown" in batch.triggerAndWait and batch.triggerByTaskAndWait results + +When using `batch.triggerAndWait()` or `batch.triggerByTaskAndWait()`, the `run.taskIdentifier` in the results was showing as "unknown" instead of the actual task identifier (e.g., "generate-audio" or "generate-scene"). + +This fix: +- Adds `taskIdentifier` field to the `completedByTaskRun` schema in `runEngine.ts` +- Updates `executionSnapshotSystem.ts` to include `taskIdentifier` when fetching waitpoints by joining with the TaskRun table +- Updates `sharedRuntimeManager.ts` to pass through `taskIdentifier` in the execution result + +Fixes #2942 diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index a224e5a86b..94de281062 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -16,6 +16,11 @@ import { SystemResources } from "./systems.js"; /** Chunk size for fetching waitpoints to avoid NAPI string conversion limits */ const WAITPOINT_CHUNK_SIZE = 100; +/** Waitpoint with optional completedByTaskRun info including taskIdentifier */ +type WaitpointWithTaskRun = Waitpoint & { + completedByTaskRun: { taskIdentifier: string } | null; +}; + export type ExecutionSnapshotSystemOptions = { resources: SystemResources; heartbeatTimeouts: HeartbeatTimeouts; @@ -57,7 +62,7 @@ function enhanceExecutionSnapshot( */ function enhanceExecutionSnapshotWithWaitpoints( snapshot: ExecutionSnapshotWithCheckpoint, - waitpoints: Waitpoint[], + waitpoints: WaitpointWithTaskRun[] | Waitpoint[], completedWaitpointOrder: string[] ): EnhancedExecutionSnapshot { return { @@ -78,6 +83,11 @@ function enhanceExecutionSnapshotWithWaitpoints( indexes.push(undefined); } + // Extract taskIdentifier from completedByTaskRun if available + const taskIdentifier = 'completedByTaskRun' in w && w.completedByTaskRun + ? w.completedByTaskRun.taskIdentifier + : undefined; + return indexes.map((index) => { return { id: w.id, @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints( w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined, completedByTaskRun: w.completedByTaskRunId ? { - id: w.completedByTaskRunId, - friendlyId: RunId.toFriendlyId(w.completedByTaskRunId), - batch: snapshot.batchId - ? { - id: snapshot.batchId, - friendlyId: BatchId.toFriendlyId(snapshot.batchId), - } - : undefined, - } + id: w.completedByTaskRunId, + friendlyId: RunId.toFriendlyId(w.completedByTaskRunId), + taskIdentifier, + batch: snapshot.batchId + ? { + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } + : undefined, + } : undefined, completedAfter: w.completedAfter ?? undefined, completedByBatch: w.completedByBatchId ? { - id: w.completedByBatchId, - friendlyId: BatchId.toFriendlyId(w.completedByBatchId), - } + id: w.completedByBatchId, + friendlyId: BatchId.toFriendlyId(w.completedByBatchId), + } : undefined, output: w.output ?? undefined, outputType: w.outputType, @@ -137,14 +148,19 @@ async function getSnapshotWaitpointIds( async function fetchWaitpointsInChunks( prisma: PrismaClientOrTransaction, waitpointIds: string[] -): Promise { +): Promise { if (waitpointIds.length === 0) return []; - const allWaitpoints: Waitpoint[] = []; + const allWaitpoints: WaitpointWithTaskRun[] = []; for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) { const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE); const waitpoints = await prisma.waitpoint.findMany({ where: { id: { in: chunk } }, + include: { + completedByTaskRun: { + select: { taskIdentifier: true }, + }, + }, }); allWaitpoints.push(...waitpoints); } @@ -233,19 +249,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): }, batch: snapshot.batchId ? { - id: snapshot.batchId, - friendlyId: BatchId.toFriendlyId(snapshot.batchId), - } + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } : undefined, checkpoint: snapshot.checkpoint ? { - id: snapshot.checkpoint.id, - friendlyId: snapshot.checkpoint.friendlyId, - type: snapshot.checkpoint.type, - location: snapshot.checkpoint.location, - imageRef: snapshot.checkpoint.imageRef, - reason: snapshot.checkpoint.reason ?? undefined, - } + id: snapshot.checkpoint.id, + friendlyId: snapshot.checkpoint.friendlyId, + type: snapshot.checkpoint.type, + location: snapshot.checkpoint.location, + imageRef: snapshot.checkpoint.imageRef, + reason: snapshot.checkpoint.reason ?? undefined, + } : undefined, completedWaitpoints: snapshot.completedWaitpoints, }; diff --git a/packages/core/src/v3/runtime/sharedRuntimeManager.ts b/packages/core/src/v3/runtime/sharedRuntimeManager.ts index 09c718c1f6..f9b1cbf164 100644 --- a/packages/core/src/v3/runtime/sharedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/sharedRuntimeManager.ts @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager { return { ok: false, id: waitpoint.completedByTaskRun.friendlyId, + taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier, error: waitpoint.output ? JSON.parse(waitpoint.output) : { - type: "STRING_ERROR", - message: "Missing error output", - }, + type: "STRING_ERROR", + message: "Missing error output", + }, } satisfies TaskRunFailedExecutionResult; } else { return { ok: true, id: waitpoint.completedByTaskRun.friendlyId, + taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier, output: waitpoint.output, outputType: waitpoint.outputType ?? "application/json", } satisfies TaskRunSuccessfulExecutionResult; diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index e4057e7ca6..1697b94781 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -81,6 +81,8 @@ export const CompletedWaitpoint = z.object({ .object({ id: z.string(), friendlyId: z.string(), + /** The task identifier of the completing run */ + taskIdentifier: z.string().optional(), /** If the run has an associated batch */ batch: z .object({