Skip to content

Commit a76ef98

Browse files
committed
add pre-dequeue callback to determine max resources
1 parent 1d6fb48 commit a76ef98

File tree

6 files changed

+56
-10
lines changed

6 files changed

+56
-10
lines changed
Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import { WorkerApiDequeueResponseBody } from "@trigger.dev/worker";
3-
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
2+
import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/worker";
3+
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
44

5-
export const loader = createLoaderWorkerApiRoute(
6-
{},
7-
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
8-
return json(await authenticatedWorker.dequeue());
5+
export const action = createActionWorkerApiRoute(
6+
{
7+
body: WorkerApiDequeueRequestBody,
8+
},
9+
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
10+
return json(
11+
await authenticatedWorker.dequeue({
12+
maxResources: body.maxResources,
13+
})
14+
);
915
}
1016
);

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
ExecutionResult,
1919
MachinePreset,
2020
WaitForDurationResult,
21+
MachineResources,
2122
} from "@trigger.dev/core/v3";
2223
import { env } from "~/env.server";
2324
import { $transaction } from "~/db.server";
@@ -525,12 +526,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
525526
});
526527
}
527528

528-
async dequeue(maxRunCount = 10): Promise<DequeuedMessage[]> {
529+
async dequeue({
530+
maxRunCount = 10,
531+
maxResources,
532+
}: {
533+
maxRunCount?: number;
534+
maxResources?: MachineResources;
535+
} = {}): Promise<DequeuedMessage[]> {
529536
if (this.type === WorkerInstanceGroupType.MANAGED) {
530537
return await this._engine.dequeueFromMasterQueue({
531538
consumerId: this.workerInstanceId,
532539
masterQueue: this.masterQueue,
533540
maxRunCount,
541+
maxResources,
534542
});
535543
}
536544

packages/worker/src/supervisor/http.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { zodfetch, ApiError } from "@trigger.dev/core/v3/zodfetch";
33
import {
44
WorkerApiConnectRequestBody,
55
WorkerApiConnectResponseBody,
6+
WorkerApiDequeueRequestBody,
67
WorkerApiDequeueResponseBody,
78
WorkerApiHeartbeatRequestBody,
89
WorkerApiHeartbeatResponseBody,
@@ -61,14 +62,17 @@ export class SupervisorHttpClient {
6162
);
6263
}
6364

64-
async dequeue() {
65+
async dequeue(body: WorkerApiDequeueRequestBody) {
6566
return wrapZodFetch(
6667
WorkerApiDequeueResponseBody,
6768
`${this.apiUrl}/api/v1/worker-actions/dequeue`,
6869
{
70+
method: "POST",
6971
headers: {
7072
...this.defaultHeaders,
73+
"Content-Type": "application/json",
7174
},
75+
body: JSON.stringify(body),
7276
}
7377
);
7478
}

packages/worker/src/supervisor/queueConsumer.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import { MachineResources } from "@trigger.dev/core/v3";
12
import { SupervisorHttpClient } from "./http.js";
23
import { WorkerApiDequeueResponseBody } from "./schemas.js";
34

45
type RunQueueConsumerOptions = {
56
client: SupervisorHttpClient;
67
intervalMs?: number;
8+
preDequeue?: () => Promise<{
9+
maxResources?: MachineResources;
10+
}>;
711
onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
812
};
913

1014
export class RunQueueConsumer {
1115
private readonly client: SupervisorHttpClient;
16+
private readonly preDequeue?: () => Promise<{
17+
maxResources?: MachineResources;
18+
}>;
1219
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
1320

1421
private intervalMs: number;
@@ -17,6 +24,7 @@ export class RunQueueConsumer {
1724
constructor(opts: RunQueueConsumerOptions) {
1825
this.isEnabled = false;
1926
this.intervalMs = opts.intervalMs ?? 5_000;
27+
this.preDequeue = opts.preDequeue;
2028
this.onDequeue = opts.onDequeue;
2129
this.client = opts.client;
2230
}
@@ -46,8 +54,18 @@ export class RunQueueConsumer {
4654
return;
4755
}
4856

57+
let maxResources: MachineResources | undefined;
58+
if (this.preDequeue) {
59+
try {
60+
const preDequeueResult = await this.preDequeue();
61+
maxResources = preDequeueResult.maxResources;
62+
} catch (preDequeueError) {
63+
console.error("[RunQueueConsumer] preDequeue error", { error: preDequeueError });
64+
}
65+
}
66+
4967
try {
50-
const response = await this.client.dequeue();
68+
const response = await this.client.dequeue({ maxResources });
5169

5270
if (!response.success) {
5371
console.error("[RunQueueConsumer] Failed to dequeue", { error: response.error });

packages/worker/src/supervisor/schemas.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { z } from "zod";
22
import {
33
CompleteRunAttemptResult,
44
DequeuedMessage,
5+
MachineResources,
56
RunExecutionData,
67
StartRunAttemptResult,
78
TaskRunExecutionResult,
@@ -40,6 +41,11 @@ export const WorkerApiConnectResponseBody = z.object({
4041
});
4142
export type WorkerApiConnectResponseBody = z.infer<typeof WorkerApiConnectResponseBody>;
4243

44+
export const WorkerApiDequeueRequestBody = z.object({
45+
maxResources: MachineResources.optional(),
46+
});
47+
export type WorkerApiDequeueRequestBody = z.infer<typeof WorkerApiDequeueRequestBody>;
48+
4349
export const WorkerApiDequeueResponseBody = DequeuedMessage.array();
4450
export type WorkerApiDequeueResponseBody = z.infer<typeof WorkerApiDequeueResponseBody>;
4551

packages/worker/src/supervisor/session.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { HeartbeatService } from "@trigger.dev/core/v3";
1+
import { HeartbeatService, MachineResources } from "@trigger.dev/core/v3";
22
import { SupervisorHttpClient } from "./http.js";
33
import { SupervisorClientCommonOptions } from "./types.js";
44
import { WorkerApiDequeueResponseBody, WorkerApiHeartbeatRequestBody } from "./schemas.js";
@@ -13,6 +13,9 @@ import { getDefaultWorkerHeaders } from "./util.js";
1313
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1414
heartbeatIntervalSeconds?: number;
1515
dequeueIntervalMs?: number;
16+
preDequeue?: () => Promise<{
17+
maxResources?: MachineResources;
18+
}>;
1619
};
1720

1821
export class SupervisorSession extends EventEmitter<WorkerEvents> {
@@ -30,6 +33,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
3033
this.httpClient = new SupervisorHttpClient(opts);
3134
this.queueConsumer = new RunQueueConsumer({
3235
client: this.httpClient,
36+
preDequeue: opts.preDequeue,
3337
onDequeue: this.onDequeue.bind(this),
3438
intervalMs: opts.dequeueIntervalMs,
3539
});

0 commit comments

Comments
 (0)