Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/fix-batch-taskidentifier-unknown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"@trigger.dev/core": patch
"run-engine": patch
---

fix: taskIdentifier shows "unknown" in batch.triggerAndWait and batch.triggerByTaskAndWait results

When using `batch.triggerAndWait()` or `batch.triggerByTaskAndWait()`, the `run.taskIdentifier` in the results was showing as "unknown" instead of the actual task identifier (e.g., "generate-audio" or "generate-scene").

This fix:
- Adds `taskIdentifier` field to the `completedByTaskRun` schema in `runEngine.ts`
- Updates `executionSnapshotSystem.ts` to include `taskIdentifier` when fetching waitpoints by joining with the TaskRun table
- Updates `sharedRuntimeManager.ts` to pass through `taskIdentifier` in the execution result

Fixes #2942

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Incomplete fix: getLatestExecutionSnapshot doesn't include completedByTaskRun relation

The PR aims to fix taskIdentifier showing as "unknown" but only partially addresses the issue. The fetchWaitpointsInChunks function (used by getExecutionSnapshotsSince) was updated to include the completedByTaskRun relation, but getLatestExecutionSnapshot was not.

Click to expand

Root Cause

At executionSnapshotSystem.ts:175-180, the query uses completedWaitpoints: true which doesn't include the nested completedByTaskRun relation:

const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
  where: { runId, isValid: true },
  include: {
    completedWaitpoints: true,  // Missing completedByTaskRun include
    checkpoint: true,
  },
  ...
});

Impact

The getLatestExecutionSnapshot function is used extensively throughout the codebase including:

  • getRunExecutionData (index.ts:1340) - used by engine.v1.dev.runs.$runFriendlyId.snapshots.latest.ts and workerGroupTokenService.server.ts
  • dequeueSystem.ts:170 - which passes completedWaitpoints to DequeuedMessage
  • Many other critical paths

When these code paths are used, taskIdentifier will be undefined because the completedByTaskRun relation data is never fetched.

Actual vs Expected

  • Expected: taskIdentifier should contain the actual task name (e.g., "generate-audio")
  • Actual: taskIdentifier will be undefined when using getLatestExecutionSnapshot

(Refers to line 178)

Recommendation: Update the query to include the completedByTaskRun relation:

include: {
  completedWaitpoints: {
    include: {
      completedByTaskRun: {
        select: { taskIdentifier: true },
      },
    },
  },
  checkpoint: true,
},

Also update the ExecutionSnapshotWithCheckAndWaitpoints type accordingly.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import { SystemResources } from "./systems.js";
/** Chunk size for fetching waitpoints to avoid NAPI string conversion limits */
const WAITPOINT_CHUNK_SIZE = 100;

/** Waitpoint with optional completedByTaskRun info including taskIdentifier */
type WaitpointWithTaskRun = Waitpoint & {
completedByTaskRun: { taskIdentifier: string } | null;
};

export type ExecutionSnapshotSystemOptions = {
resources: SystemResources;
heartbeatTimeouts: HeartbeatTimeouts;
Expand Down Expand Up @@ -57,7 +62,7 @@ function enhanceExecutionSnapshot(
*/
function enhanceExecutionSnapshotWithWaitpoints(
snapshot: ExecutionSnapshotWithCheckpoint,
waitpoints: Waitpoint[],
waitpoints: WaitpointWithTaskRun[] | Waitpoint[],
completedWaitpointOrder: string[]
): EnhancedExecutionSnapshot {
return {
Expand All @@ -78,6 +83,11 @@ function enhanceExecutionSnapshotWithWaitpoints(
indexes.push(undefined);
}

// Extract taskIdentifier from completedByTaskRun if available
const taskIdentifier = 'completedByTaskRun' in w && w.completedByTaskRun
? w.completedByTaskRun.taskIdentifier
: undefined;

return indexes.map((index) => {
return {
id: w.id,
Expand All @@ -89,22 +99,23 @@ function enhanceExecutionSnapshotWithWaitpoints(
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
completedByTaskRun: w.completedByTaskRunId
? {
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
id: w.completedByTaskRunId,
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
taskIdentifier,
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
}
: undefined,
completedAfter: w.completedAfter ?? undefined,
completedByBatch: w.completedByBatchId
? {
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
id: w.completedByBatchId,
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
}
: undefined,
output: w.output ?? undefined,
outputType: w.outputType,
Expand Down Expand Up @@ -137,14 +148,19 @@ async function getSnapshotWaitpointIds(
async function fetchWaitpointsInChunks(
prisma: PrismaClientOrTransaction,
waitpointIds: string[]
): Promise<Waitpoint[]> {
): Promise<WaitpointWithTaskRun[]> {
if (waitpointIds.length === 0) return [];

const allWaitpoints: Waitpoint[] = [];
const allWaitpoints: WaitpointWithTaskRun[] = [];
for (let i = 0; i < waitpointIds.length; i += WAITPOINT_CHUNK_SIZE) {
const chunk = waitpointIds.slice(i, i + WAITPOINT_CHUNK_SIZE);
const waitpoints = await prisma.waitpoint.findMany({
where: { id: { in: chunk } },
include: {
completedByTaskRun: {
select: { taskIdentifier: true },
},
},
});
allWaitpoints.push(...waitpoints);
}
Expand Down Expand Up @@ -233,19 +249,19 @@ export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot):
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
Expand Down
8 changes: 5 additions & 3 deletions packages/core/src/v3/runtime/sharedRuntimeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,19 @@ export class SharedRuntimeManager implements RuntimeManager {
return {
ok: false,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier,
error: waitpoint.output
? JSON.parse(waitpoint.output)
: {
type: "STRING_ERROR",
message: "Missing error output",
},
type: "STRING_ERROR",
message: "Missing error output",
},
} satisfies TaskRunFailedExecutionResult;
} else {
return {
ok: true,
id: waitpoint.completedByTaskRun.friendlyId,
taskIdentifier: waitpoint.completedByTaskRun.taskIdentifier,
output: waitpoint.output,
outputType: waitpoint.outputType ?? "application/json",
} satisfies TaskRunSuccessfulExecutionResult;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export const CompletedWaitpoint = z.object({
.object({
id: z.string(),
friendlyId: z.string(),
/** The task identifier of the completing run */
taskIdentifier: z.string().optional(),
/** If the run has an associated batch */
batch: z
.object({
Expand Down