Skip to content

Commit bb53fc5

Browse files
committed
Some fixes from badge conflict resolution
1 parent 67f2f8b commit bb53fc5

File tree

2 files changed

+67
-40
lines changed

2 files changed

+67
-40
lines changed

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 65 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
11
import {
22
IOPacket,
3+
packetRequiresOffloading,
34
QueueOptions,
45
SemanticInternalAttributes,
6+
taskRunErrorEnhancer,
7+
taskRunErrorToString,
58
TriggerTaskRequestBody,
6-
packetRequiresOffloading,
79
} from "@trigger.dev/core/v3";
10+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
11+
import { Prisma, TaskRun } from "@trigger.dev/database";
812
import { env } from "~/env.server";
13+
import { sanitizeQueueName } from "~/models/taskQueue.server";
14+
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
915
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1016
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
11-
import { workerQueue } from "~/services/worker.server";
17+
import { logger } from "~/services/logger.server";
18+
import { getEntitlement } from "~/services/platform.v3.server";
19+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
20+
import { handleMetadataPacket } from "~/utils/packets";
1221
import { marqs } from "~/v3/marqs/index.server";
1322
import { eventRepository } from "../eventRepository.server";
1423
import { generateFriendlyId } from "../friendlyIdentifiers";
15-
import { uploadPacketToObjectStore } from "../r2.server";
16-
import { startActiveSpan } from "../tracer.server";
17-
import { getEntitlement } from "~/services/platform.v3.server";
18-
import { BaseService, ServiceValidationError } from "./baseService.server";
19-
import { logger } from "~/services/logger.server";
20-
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
21-
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2224
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
23-
import { handleMetadataPacket } from "~/utils/packets";
24-
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
25-
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
2625
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
26+
import { uploadPacketToObjectStore } from "../r2.server";
27+
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
28+
import { startActiveSpan } from "../tracer.server";
2729
import { clampMaxDuration } from "../utils/maxDuration";
28-
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
29-
import { Prisma, TaskRun } from "@trigger.dev/database";
30-
import { sanitizeQueueName } from "~/models/taskQueue.server";
30+
import { BaseService, ServiceValidationError } from "./baseService.server";
3131
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
32+
import { enqueueRun } from "./enqueueRun.server";
33+
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
3234
import { getTaskEventStore } from "../taskEventStore.server";
3335

3436
export type TriggerTaskServiceOptions = {
@@ -187,6 +189,7 @@ export class TriggerTaskService extends BaseService {
187189
rootTaskRunId: true,
188190
depth: true,
189191
queueTimestamp: true,
192+
queue: true,
190193
},
191194
},
192195
},
@@ -244,6 +247,7 @@ export class TriggerTaskService extends BaseService {
244247
rootTaskRunId: true,
245248
depth: true,
246249
queueTimestamp: true,
250+
queue: true,
247251
},
248252
},
249253
},
@@ -296,7 +300,7 @@ export class TriggerTaskService extends BaseService {
296300
: undefined;
297301

298302
try {
299-
return await eventRepository.traceEvent(
303+
const result = await eventRepository.traceEvent(
300304
taskId,
301305
{
302306
context: options.traceContext,
@@ -556,40 +560,61 @@ export class TriggerTaskService extends BaseService {
556560
this._prisma
557561
);
558562

559-
// If this is a triggerAndWait or batchTriggerAndWait,
560-
// we need to add the parent run to the reserve concurrency set
561-
// to free up concurrency for the children to run
562-
if (dependentAttempt) {
563-
await marqs?.reserveConcurrency(dependentAttempt.taskRun.id);
564-
} else if (dependentBatchRun?.dependentTaskAttempt) {
565-
await marqs?.reserveConcurrency(dependentBatchRun.dependentTaskAttempt.taskRun.id);
566-
}
567-
568563
if (!run) {
569564
return;
570565
}
571566

572-
// We need to enqueue the task run into the appropriate queue. This is done after the tx completes to prevent a race condition where the task run hasn't been created yet by the time we dequeue.
567+
// Now enqueue the run if it's not delayed
573568
if (run.status === "PENDING") {
574-
await marqs?.enqueueMessage(
575-
environment,
576-
run.queue,
577-
run.id,
578-
{
579-
type: "EXECUTE",
580-
taskIdentifier: taskId,
581-
projectId: environment.projectId,
582-
environmentId: environment.id,
583-
environmentType: environment.type,
584-
},
585-
body.options?.concurrencyKey,
586-
run.queueTimestamp ?? undefined
587-
);
569+
const enqueueResult = await enqueueRun({
570+
env: environment,
571+
run,
572+
dependentRun:
573+
dependentAttempt?.taskRun ?? dependentBatchRun?.dependentTaskAttempt?.taskRun,
574+
});
575+
576+
if (!enqueueResult.ok) {
577+
// Now we need to fail the run with enqueueResult.error and make sure and
578+
// set the traced event to failed as well
579+
await this._prisma.taskRun.update({
580+
where: { id: run.id },
581+
data: {
582+
status: "SYSTEM_FAILURE",
583+
completedAt: new Date(),
584+
error: enqueueResult.error,
585+
},
586+
});
587+
588+
event.failWithError(enqueueResult.error);
589+
590+
return {
591+
run,
592+
isCached: false,
593+
error: enqueueResult.error,
594+
};
595+
}
588596
}
589597

590598
return { run, isCached: false };
591599
}
592600
);
601+
602+
if (result?.error) {
603+
throw new ServiceValidationError(
604+
taskRunErrorToString(taskRunErrorEnhancer(result.error))
605+
);
606+
}
607+
608+
const run = result?.run;
609+
610+
if (!run) {
611+
return;
612+
}
613+
614+
return {
615+
run,
616+
isCached: result?.isCached,
617+
};
593618
} catch (error) {
594619
// Detect a prisma transaction Unique constraint violation
595620
if (error instanceof Prisma.PrismaClientKnownRequestError) {

internal-packages/database/prisma/schema.prisma

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1731,6 +1731,8 @@ model TaskRun {
17311731
17321732
taskEventStore String @default("taskEvent")
17331733
1734+
queueTimestamp DateTime?
1735+
17341736
batchItems BatchTaskRunItem[]
17351737
dependency TaskRunDependency?
17361738
CheckpointRestoreEvent CheckpointRestoreEvent[]

0 commit comments

Comments
 (0)