Skip to content

Commit 4e9f8ca

Browse files
committed
Pass batch ids through to the run engine when triggering
1 parent 2d4c67c commit 4e9f8ca

File tree

1 file changed

+38
-26
lines changed

1 file changed

+38
-26
lines changed

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,35 @@
1+
import { RunEngine } from "@internal/run-engine";
2+
import { RunDuplicateIdempotencyKeyError } from "@internal/run-engine/engine";
13
import {
24
IOPacket,
5+
packetRequiresOffloading,
36
QueueOptions,
47
SemanticInternalAttributes,
58
TriggerTaskRequestBody,
6-
packetRequiresOffloading,
79
} from "@trigger.dev/core/v3";
10+
import { BatchId, RunId, stringifyDuration } from "@trigger.dev/core/v3/apps";
11+
import { Prisma, TaskRun } from "@trigger.dev/database";
812
import { env } from "~/env.server";
13+
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
914
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1015
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
16+
import { logger } from "~/services/logger.server";
17+
import { getEntitlement } from "~/services/platform.v3.server";
18+
import { parseDelay } from "~/utils/delays";
19+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
20+
import { handleMetadataPacket } from "~/utils/packets";
1121
import { sanitizeQueueName } from "~/v3/marqs/index.server";
12-
import {
13-
CreatableEvent,
14-
eventRepository,
15-
extractContextFromCarrier,
16-
} from "../eventRepository.server";
22+
import { eventRepository } from "../eventRepository.server";
23+
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
1724
import { uploadPacketToObjectStore } from "../r2.server";
25+
import { isFinalRunStatus } from "../taskStatus";
1826
import { startActiveSpan } from "../tracer.server";
19-
import { getEntitlement } from "~/services/platform.v3.server";
27+
import { clampMaxDuration } from "../utils/maxDuration";
2028
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
21-
import { logger } from "~/services/logger.server";
22-
import { isFinalRunStatus } from "../taskStatus";
23-
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
24-
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
25-
import { handleMetadataPacket } from "~/utils/packets";
26-
import { WorkerGroupService } from "./worker/workerGroupService.server";
27-
import { parseDelay } from "~/utils/delays";
28-
import { RunId, stringifyDuration } from "@trigger.dev/core/v3/apps";
2929
import { OutOfEntitlementError, TriggerTaskServiceOptions } from "./triggerTask.server";
30-
import { Prisma } from "@trigger.dev/database";
31-
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
32-
import { clampMaxDuration } from "../utils/maxDuration";
33-
import { RunEngine } from "@internal/run-engine";
34-
import { Attributes } from "@opentelemetry/api";
35-
import { safeJsonParse } from "~/utils/json";
36-
import { getNowInNanoseconds } from "~/utils/taskEvent";
30+
import { WorkerGroupService } from "./worker/workerGroupService.server";
31+
32+
type Result = TaskRun & { isCached: boolean };
3733

3834
/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */
3935
export class TriggerTaskServiceV2 extends WithRunEngine {
@@ -47,7 +43,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
4743
environment: AuthenticatedEnvironment;
4844
body: TriggerTaskRequestBody;
4945
options?: TriggerTaskServiceOptions;
50-
}) {
46+
}): Promise<Result | undefined> {
5147
return await this.traceWithEnv("call()", environment, async (span) => {
5248
span.setAttribute("taskId", taskId);
5349

@@ -119,7 +115,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
119115
icon: "task-cached",
120116
},
121117
runIsTest: body.options?.test ?? false,
122-
batchId: options.batchId,
118+
batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined,
123119
idempotencyKey,
124120
runId: existingRun.friendlyId,
125121
},
@@ -146,6 +142,12 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
146142
runId: RunId.fromFriendlyId(body.options!.parentRunId!),
147143
waitpoints: existingRun.associatedWaitpoint!.id,
148144
spanIdToComplete: event.spanId,
145+
batch: options?.batchId
146+
? {
147+
id: options.batchId,
148+
index: options.batchIndex ?? 0,
149+
}
150+
: undefined,
149151
environmentId: environment.id,
150152
projectId: environment.projectId,
151153
tx: this._prisma,
@@ -246,7 +248,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
246248
icon: options.customIcon ?? "task",
247249
},
248250
runIsTest: body.options?.test ?? false,
249-
batchId: options.batchId,
251+
batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined,
250252
idempotencyKey,
251253
},
252254
incomplete: true,
@@ -350,7 +352,12 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
350352
oneTimeUseToken: options.oneTimeUseToken,
351353
parentTaskRunId: parentRun?.id,
352354
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
353-
batchId: body.options?.parentBatch ?? undefined,
355+
batch: options?.batchId
356+
? {
357+
id: options.batchId,
358+
index: options.batchIndex ?? 0,
359+
}
360+
: undefined,
354361
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
355362
depth,
356363
metadata: metadataPacket?.data,
@@ -386,6 +393,11 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
386393
}
387394
);
388395
} catch (error) {
396+
if (error instanceof RunDuplicateIdempotencyKeyError) {
397+
//retry calling this function, because this time it will return the idempotent run
398+
return await this.call({ taskId, environment, body, options });
399+
}
400+
389401
// Detect a prisma transaction Unique constraint violation
390402
if (error instanceof Prisma.PrismaClientKnownRequestError) {
391403
logger.debug("TriggerTask: Prisma transaction error", {

0 commit comments

Comments
 (0)