Skip to content

Commit f018a8f

Browse files
committed
v4 now working with the new worker queues, and added the legacy master queue migration stuff
1 parent 91e7657 commit f018a8f

File tree

13 files changed

+314
-195
lines changed

13 files changed

+314
-195
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ export class SpanPresenter extends BasePresenter {
145145
},
146146
},
147147
engine: true,
148-
masterQueue: true,
149-
secondaryMasterQueue: true,
148+
workerQueue: true,
150149
error: true,
151150
output: true,
152151
outputType: true,
@@ -364,8 +363,7 @@ export class SpanPresenter extends BasePresenter {
364363
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
365364
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
366365
engine: run.engine,
367-
masterQueue: run.masterQueue,
368-
secondaryMasterQueue: run.secondaryMasterQueue,
366+
workerQueue: run.workerQueue,
369367
spanId: run.spanId,
370368
isCached: !!span.originalRun,
371369
};
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { engine } from "~/v3/runEngine.server";
5+
6+
export async function action({ request }: ActionFunctionArgs) {
7+
// Next authenticate the request
8+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
9+
10+
if (!authenticationResult) {
11+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
12+
}
13+
14+
const user = await prisma.user.findUnique({
15+
where: {
16+
id: authenticationResult.userId,
17+
},
18+
});
19+
20+
if (!user) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
if (!user.admin) {
25+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
26+
}
27+
28+
try {
29+
await engine.migrateLegacyMasterQueues();
30+
31+
return json({
32+
success: true,
33+
});
34+
} catch (error) {
35+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
36+
}
37+
}
Lines changed: 7 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,22 @@
11
import { json } from "@remix-run/server-runtime";
2-
import { DequeuedMessage, DevDequeueRequestBody, MachineResources } from "@trigger.dev/core/v3";
3-
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
4-
import { env } from "~/env.server";
2+
import { DevDequeueRequestBody } from "@trigger.dev/core/v3";
53
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
64
import { engine } from "~/v3/runEngine.server";
75

86
const { action } = createActionApiRoute(
97
{
10-
body: DevDequeueRequestBody,
8+
body: DevDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
119
maxContentLength: 1024 * 10, // 10KB
1210
method: "POST",
1311
},
14-
async ({ authentication, body }) => {
15-
//we won't return more runs than this in one API call
16-
let maxDequeueCount = env.DEV_DEQUEUE_MAX_RUNS_PER_PULL;
17-
18-
//we can't use more than the max resources
19-
const availableResources = body.maxResources ?? {
20-
cpu: 8,
21-
memory: 16,
22-
};
23-
24-
let dequeuedMessages: DequeuedMessage[] = [];
25-
26-
//we need to check the current worker, because a run might have been locked to it
27-
const workers = body.oldWorkers.concat(body.currentWorker);
28-
29-
//first we want to clear out old runs
30-
for (const worker of workers) {
31-
//dequeue
32-
const latestResult = await engine.dequeueFromBackgroundWorkerMasterQueue({
33-
consumerId: authentication.environment.id,
34-
//specific version
35-
backgroundWorkerId: BackgroundWorkerId.toId(worker),
36-
maxRunCount: maxDequeueCount,
37-
maxResources: availableResources,
38-
});
39-
40-
//add runs to the array
41-
dequeuedMessages.push(...latestResult);
42-
43-
//update availableResources
44-
const consumedResources = latestResult.reduce(
45-
(acc, r) => {
46-
return {
47-
cpu: acc.cpu + r.run.machine.cpu,
48-
memory: acc.memory + r.run.machine.memory,
49-
};
50-
},
51-
{ cpu: 0, memory: 0 }
52-
);
53-
updateAvailableResources(availableResources, consumedResources);
54-
55-
//update maxDequeueCount
56-
maxDequeueCount -= latestResult.length;
57-
58-
//if we have no resources left, we exit the loop
59-
if (!hasAvailableResources(availableResources)) break;
60-
//we've already dequeued the max number of runs
61-
if (maxDequeueCount <= 0) break;
62-
}
63-
64-
//dequeue from the current version if we still have space
65-
if (hasAvailableResources(availableResources) && maxDequeueCount > 0) {
66-
const latestResult = await engine.dequeueFromEnvironmentMasterQueue({
67-
consumerId: authentication.environment.id,
68-
//current dev version (no specific version specified)
69-
environmentId: authentication.environment.id,
70-
maxRunCount: maxDequeueCount,
71-
maxResources: availableResources,
72-
});
73-
dequeuedMessages.push(...latestResult);
74-
}
12+
async ({ authentication }) => {
13+
const dequeuedMessages = await engine.dequeueFromEnvironmentWorkerQueue({
14+
consumerId: authentication.environment.id,
15+
environmentId: authentication.environment.id,
16+
});
7517

7618
return json({ dequeuedMessages }, { status: 200 });
7719
}
7820
);
7921

80-
function updateAvailableResources(
81-
availableResources: MachineResources,
82-
resources: MachineResources
83-
) {
84-
availableResources.cpu -= resources.cpu;
85-
availableResources.memory -= resources.memory;
86-
}
87-
88-
function hasAvailableResources(availableResources: MachineResources) {
89-
return availableResources.cpu > 0 && availableResources.memory > 0;
90-
}
91-
9222
export { action };
Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
32
import { WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
43
import { z } from "zod";
5-
import { $replica, prisma } from "~/db.server";
64
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
75

6+
// Keep this route for backwards compatibility
87
export const loader = createLoaderWorkerApiRoute(
98
{
109
params: z.object({
@@ -14,55 +13,7 @@ export const loader = createLoaderWorkerApiRoute(
1413
maxRunCount: z.coerce.number().optional(),
1514
}),
1615
},
17-
async ({
18-
authenticatedWorker,
19-
params,
20-
searchParams,
21-
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
22-
const deployment = await $replica.workerDeployment.findUnique({
23-
where: {
24-
friendlyId: params.deploymentFriendlyId,
25-
},
26-
include: {
27-
worker: true,
28-
},
29-
});
30-
31-
if (!deployment) {
32-
throw new Error("Deployment not found");
33-
}
34-
35-
if (!deployment.worker) {
36-
throw new Error("Worker not found");
37-
}
38-
39-
const dequeuedMessages = (await isCurrentDeployment(deployment.id, deployment.environmentId))
40-
? await authenticatedWorker.dequeueFromEnvironment(
41-
deployment.worker.id,
42-
deployment.environmentId
43-
)
44-
: await authenticatedWorker.dequeueFromVersion(
45-
deployment.worker.id,
46-
searchParams.maxRunCount
47-
);
48-
49-
return json(dequeuedMessages);
16+
async (): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
17+
return json([]);
5018
}
5119
);
52-
53-
async function isCurrentDeployment(deploymentId: string, environmentId: string): Promise<boolean> {
54-
const promotion = await prisma.workerDeploymentPromotion.findUnique({
55-
where: {
56-
environmentId_label: {
57-
environmentId,
58-
label: CURRENT_DEPLOYMENT_LABEL,
59-
},
60-
},
61-
});
62-
63-
if (!promotion) {
64-
return false;
65-
}
66-
67-
return promotion.deploymentId === deploymentId;
68-
}

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,9 @@ import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.
77

88
export const action = createActionWorkerApiRoute(
99
{
10-
body: WorkerApiDequeueRequestBody,
10+
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
1111
},
12-
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
13-
return json(
14-
await authenticatedWorker.dequeue({
15-
maxResources: body.maxResources,
16-
maxRunCount: body.maxRunCount,
17-
})
18-
);
12+
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
13+
return json(await authenticatedWorker.dequeue());
1914
}
2015
);

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -700,12 +700,8 @@ function RunBody({
700700
{isAdmin && (
701701
<>
702702
<Property.Item>
703-
<Property.Label>Primary master queue</Property.Label>
704-
<Property.Value>{run.masterQueue}</Property.Value>
705-
</Property.Item>
706-
<Property.Item>
707-
<Property.Label>Secondary master queue</Property.Label>
708-
<Property.Value>{run.secondaryMasterQueue ?? "–"}</Property.Value>
703+
<Property.Label>Worker queue</Property.Label>
704+
<Property.Value>{run.workerQueue}</Property.Value>
709705
</Property.Item>
710706
</>
711707
)}

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ export class DefaultQueueManager implements QueueManager {
198198

199199
async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
200200
if (environment.type === "DEVELOPMENT") {
201-
return;
201+
return environment.id;
202202
}
203203

204204
const workerGroupService = new WorkerGroupService({

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { customAlphabet } from "nanoid";
2-
import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
2+
import { ServiceValidationError, WithRunEngine, WithRunEngineOptions } from "../baseService.server";
33
import { createHash, timingSafeEqual } from "crypto";
44
import { logger } from "~/services/logger.server";
55
import {
@@ -537,19 +537,11 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
537537
});
538538
}
539539

540-
async dequeue({
541-
maxRunCount = 10,
542-
maxResources,
543-
}: {
544-
maxRunCount?: number;
545-
maxResources?: MachineResources;
546-
} = {}): Promise<DequeuedMessage[]> {
540+
async dequeue(): Promise<DequeuedMessage[]> {
547541
if (this.type === WorkerInstanceGroupType.MANAGED) {
548-
return await this._engine.dequeueFromMasterQueue({
542+
return await this._engine.dequeueFromWorkerQueue({
549543
consumerId: this.workerInstanceId,
550-
masterQueue: this.masterQueue,
551-
maxRunCount,
552-
maxResources,
544+
workerQueue: this.masterQueue,
553545
workerId: this.workerInstanceId,
554546
runnerId: this.runnerId,
555547
});
@@ -572,51 +564,21 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
572564
});
573565

574566
if (this.isLatestDeployment) {
575-
return await this._engine.dequeueFromEnvironmentMasterQueue({
567+
return await this._engine.dequeueFromEnvironmentWorkerQueue({
576568
consumerId: this.workerInstanceId,
577569
environmentId: this.environment.id,
578-
maxRunCount,
579-
backgroundWorkerId: this.backgroundWorkerId,
580570
workerId: this.workerInstanceId,
581571
runnerId: this.runnerId,
582572
});
583573
}
584574

585-
return await this._engine.dequeueFromBackgroundWorkerMasterQueue({
586-
consumerId: this.workerInstanceId,
587-
backgroundWorkerId: this.backgroundWorkerId,
588-
maxRunCount,
589-
workerId: this.workerInstanceId,
590-
runnerId: this.runnerId,
591-
});
592-
}
593-
594-
/** Allows managed workers to dequeue from a specific version */
595-
async dequeueFromVersion(
596-
backgroundWorkerId: string,
597-
maxRunCount = 1
598-
): Promise<DequeuedMessage[]> {
599-
if (this.type !== WorkerInstanceGroupType.MANAGED) {
600-
logger.error("[AuthenticatedWorkerInstance] Worker instance is not managed", {
601-
...this.toJSON(),
602-
});
603-
return [];
604-
}
605-
606-
return await this._engine.dequeueFromBackgroundWorkerMasterQueue({
607-
consumerId: this.workerInstanceId,
608-
backgroundWorkerId,
609-
maxRunCount,
610-
workerId: this.workerInstanceId,
611-
runnerId: this.runnerId,
612-
});
575+
throw new ServiceValidationError("Unmanaged workers cannot dequeue from a specific version");
613576
}
614577

615578
/** Allows managed workers to dequeue from a specific environment */
616579
async dequeueFromEnvironment(
617580
backgroundWorkerId: string,
618-
environmentId: string,
619-
maxRunCount = 1
581+
environmentId: string
620582
): Promise<DequeuedMessage[]> {
621583
if (this.type !== WorkerInstanceGroupType.MANAGED) {
622584
logger.error("[AuthenticatedWorkerInstance] Worker instance is not managed", {
@@ -625,11 +587,10 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
625587
return [];
626588
}
627589

628-
return await this._engine.dequeueFromEnvironmentMasterQueue({
590+
return await this._engine.dequeueFromEnvironmentWorkerQueue({
629591
consumerId: this.workerInstanceId,
630592
backgroundWorkerId,
631593
environmentId,
632-
maxRunCount,
633594
workerId: this.workerInstanceId,
634595
runnerId: this.runnerId,
635596
});

0 commit comments

Comments
 (0)