Skip to content

Commit ce50f68

Browse files
committed
WIP
1 parent eee0fb2 commit ce50f68

File tree

16 files changed

+1061
-732
lines changed

16 files changed

+1061
-732
lines changed

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ export class DefaultQueueManager implements QueueManager {
196196
};
197197
}
198198

199-
async getMasterQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
199+
async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
200200
if (environment.type === "DEVELOPMENT") {
201201
return;
202202
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {
234234
lockedQueueId,
235235
});
236236

237-
const masterQueue = await this.queueConcern.getMasterQueue(environment);
237+
const masterQueue = await this.queueConcern.getWorkerQueue(environment);
238238

239239
try {
240240
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
@@ -271,7 +271,7 @@ export class RunEngineTriggerTaskService {
271271
concurrencyKey: body.options?.concurrencyKey,
272272
queue: queueName,
273273
lockedQueueId,
274-
masterQueue: masterQueue,
274+
workerQueue: masterQueue,
275275
isTest: body.options?.test ?? false,
276276
delayUntil,
277277
queuedAt: delayUntil ? undefined : new Date(),

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export interface QueueManager {
6666
): Promise<QueueProperties>;
6767
getQueueName(request: TriggerTaskRequest): Promise<string>;
6868
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
69-
getMasterQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
69+
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
7070
}
7171

7272
export interface PayloadProcessor {

internal-packages/database/prisma/schema.prisma

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1764,7 +1764,8 @@ model TaskRun {
17641764
lockedQueueId String?
17651765
17661766
/// The main queue that this run is part of
1767-
masterQueue String @default("main")
1767+
workerQueue String @default("main") @map("masterQueue")
1768+
/// @deprecated
17681769
secondaryMasterQueue String?
17691770
17701771
/// From engine v2+ this will be defined after a run has been dequeued (starting at 1)

internal-packages/run-engine/src/engine/index.ts

Lines changed: 21 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ import {
77
CreateCheckpointResult,
88
DequeuedMessage,
99
ExecutionResult,
10-
MachineResources,
1110
RunExecutionData,
1211
StartRunAttemptResult,
1312
TaskRunExecutionResult,
1413
} from "@trigger.dev/core/v3";
15-
import { BatchId, RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic";
14+
import { RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic";
1615
import {
1716
Prisma,
1817
PrismaClient,
@@ -37,20 +36,20 @@ import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
3736
import { DequeueSystem } from "./systems/dequeueSystem.js";
3837
import { EnqueueSystem } from "./systems/enqueueSystem.js";
3938
import {
39+
executionDataFromSnapshot,
4040
ExecutionSnapshotSystem,
41-
getLatestExecutionSnapshot,
4241
getExecutionSnapshotsSince,
43-
executionDataFromSnapshot,
42+
getLatestExecutionSnapshot,
4443
} from "./systems/executionSnapshotSystem.js";
4544
import { PendingVersionSystem } from "./systems/pendingVersionSystem.js";
45+
import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js";
4646
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";
4747
import { RunAttemptSystem } from "./systems/runAttemptSystem.js";
4848
import { SystemResources } from "./systems/systems.js";
4949
import { TtlSystem } from "./systems/ttlSystem.js";
5050
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5151
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5252
import { workerCatalog } from "./workerCatalog.js";
53-
import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js";
5453

5554
export class RunEngine {
5655
private runLockRedis: Redis;
@@ -318,7 +317,7 @@ export class RunEngine {
318317
sdkVersion,
319318
cliVersion,
320319
concurrencyKey,
321-
masterQueue,
320+
workerQueue,
322321
queue,
323322
lockedQueueId,
324323
isTest,
@@ -359,22 +358,6 @@ export class RunEngine {
359358
async (span) => {
360359
const status = delayUntil ? "DELAYED" : "PENDING";
361360

362-
let secondaryMasterQueue: string | undefined = undefined;
363-
364-
if (environment.type === "DEVELOPMENT") {
365-
// In dev we use the environment id as the master queue, or the locked worker id
366-
masterQueue = this.#environmentMasterQueueKey(environment.id);
367-
if (lockedToVersionId) {
368-
masterQueue = this.#backgroundWorkerQueueKey(lockedToVersionId);
369-
}
370-
} else {
371-
// For deployed runs, we add the env/worker id as the secondary master queue
372-
secondaryMasterQueue = this.#environmentMasterQueueKey(environment.id);
373-
if (lockedToVersionId) {
374-
secondaryMasterQueue = this.#backgroundWorkerQueueKey(lockedToVersionId);
375-
}
376-
}
377-
378361
//create run
379362
let taskRun: TaskRun;
380363
try {
@@ -406,8 +389,7 @@ export class RunEngine {
406389
concurrencyKey,
407390
queue,
408391
lockedQueueId,
409-
masterQueue,
410-
secondaryMasterQueue,
392+
workerQueue,
411393
isTest,
412394
delayUntil,
413395
queuedAt,
@@ -557,93 +539,58 @@ export class RunEngine {
557539
/**
558540
* Gets a fairly selected run from the specified master queue, returning the information required to run it.
559541
* @param consumerId: The consumer that is pulling, allows multiple consumers to pull from the same queue
560-
* @param masterQueue: The shared queue to pull from, can be an individual environment (for dev)
542+
* @param workerQueue: The worker queue to pull from, can be an individual environment (for dev)
561543
* @returns
562544
*/
563-
async dequeueFromMasterQueue({
545+
async dequeueFromWorkerQueue({
564546
consumerId,
565-
masterQueue,
566-
maxRunCount,
567-
maxResources,
547+
workerQueue,
568548
backgroundWorkerId,
569549
workerId,
570550
runnerId,
571551
tx,
572552
}: {
573553
consumerId: string;
574-
masterQueue: string;
575-
maxRunCount: number;
576-
maxResources?: MachineResources;
554+
workerQueue: string;
577555
backgroundWorkerId?: string;
578556
workerId?: string;
579557
runnerId?: string;
580558
tx?: PrismaClientOrTransaction;
581559
}): Promise<DequeuedMessage[]> {
582-
return this.dequeueSystem.dequeueFromMasterQueue({
560+
const dequeuedMessage = await this.dequeueSystem.dequeueFromWorkerQueue({
583561
consumerId,
584-
masterQueue,
585-
maxRunCount,
586-
maxResources,
562+
workerQueue,
587563
backgroundWorkerId,
588564
workerId,
589565
runnerId,
590566
tx,
591567
});
568+
569+
if (!dequeuedMessage) {
570+
return [];
571+
}
572+
573+
return [dequeuedMessage];
592574
}
593575

594-
async dequeueFromEnvironmentMasterQueue({
576+
async dequeueFromEnvironmentWorkerQueue({
595577
consumerId,
596578
environmentId,
597-
maxRunCount,
598-
maxResources,
599579
backgroundWorkerId,
600580
workerId,
601581
runnerId,
602582
tx,
603583
}: {
604584
consumerId: string;
605585
environmentId: string;
606-
maxRunCount: number;
607-
maxResources?: MachineResources;
608586
backgroundWorkerId?: string;
609587
workerId?: string;
610588
runnerId?: string;
611589
tx?: PrismaClientOrTransaction;
612590
}): Promise<DequeuedMessage[]> {
613-
return this.dequeueFromMasterQueue({
614-
consumerId,
615-
masterQueue: this.#environmentMasterQueueKey(environmentId),
616-
maxRunCount,
617-
maxResources,
618-
backgroundWorkerId,
619-
workerId,
620-
runnerId,
621-
tx,
622-
});
623-
}
624-
625-
async dequeueFromBackgroundWorkerMasterQueue({
626-
consumerId,
627-
backgroundWorkerId,
628-
maxRunCount,
629-
maxResources,
630-
workerId,
631-
runnerId,
632-
tx,
633-
}: {
634-
consumerId: string;
635-
backgroundWorkerId: string;
636-
maxRunCount: number;
637-
maxResources?: MachineResources;
638-
workerId?: string;
639-
runnerId?: string;
640-
tx?: PrismaClientOrTransaction;
641-
}): Promise<DequeuedMessage[]> {
642-
return this.dequeueFromMasterQueue({
591+
return this.dequeueFromWorkerQueue({
643592
consumerId,
644-
masterQueue: this.#backgroundWorkerQueueKey(backgroundWorkerId),
645-
maxRunCount,
646-
maxResources,
593+
workerQueue: environmentId,
647594
backgroundWorkerId,
648595
workerId,
649596
runnerId,
@@ -1318,12 +1265,4 @@ export class RunEngine {
13181265
}
13191266
});
13201267
}
1321-
1322-
#environmentMasterQueueKey(environmentId: string) {
1323-
return `master-env:${environmentId}`;
1324-
}
1325-
1326-
#backgroundWorkerQueueKey(backgroundWorkerId: string) {
1327-
return `master-background-worker:${backgroundWorkerId}`;
1328-
}
13291268
}

0 commit comments

Comments
 (0)