Skip to content

Commit 5cf7c52

Browse files
committed
optionally skip dequeue
1 parent a76ef98 commit 5cf7c52

File tree

3 files changed

+25
-15
lines changed

3 files changed

+25
-15
lines changed

packages/worker/src/supervisor/queueConsumer.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
1-
import { MachineResources } from "@trigger.dev/core/v3";
21
import { SupervisorHttpClient } from "./http.js";
32
import { WorkerApiDequeueResponseBody } from "./schemas.js";
3+
import { PreDequeueFn } from "./types.js";
44

55
type RunQueueConsumerOptions = {
66
client: SupervisorHttpClient;
77
intervalMs?: number;
8-
preDequeue?: () => Promise<{
9-
maxResources?: MachineResources;
10-
}>;
8+
preDequeue?: PreDequeueFn;
119
onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
1210
};
1311

1412
export class RunQueueConsumer {
1513
private readonly client: SupervisorHttpClient;
16-
private readonly preDequeue?: () => Promise<{
17-
maxResources?: MachineResources;
18-
}>;
14+
private readonly preDequeue?: PreDequeueFn;
1915
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
2016

2117
private intervalMs: number;
@@ -54,18 +50,27 @@ export class RunQueueConsumer {
5450
return;
5551
}
5652

57-
let maxResources: MachineResources | undefined;
53+
let preDequeueResult: Awaited<ReturnType<PreDequeueFn>> | undefined;
5854
if (this.preDequeue) {
5955
try {
60-
const preDequeueResult = await this.preDequeue();
61-
maxResources = preDequeueResult.maxResources;
56+
preDequeueResult = await this.preDequeue();
6257
} catch (preDequeueError) {
6358
console.error("[RunQueueConsumer] preDequeue error", { error: preDequeueError });
6459
}
6560
}
6661

62+
if (
63+
preDequeueResult?.skipDequeue ||
64+
preDequeueResult?.maxResources?.cpu === 0 ||
65+
preDequeueResult?.maxResources?.memory === 0
66+
) {
67+
return this.scheduleNextDequeue();
68+
}
69+
6770
try {
68-
const response = await this.client.dequeue({ maxResources });
71+
const response = await this.client.dequeue({
72+
maxResources: preDequeueResult?.maxResources,
73+
});
6974

7075
if (!response.success) {
7176
console.error("[RunQueueConsumer] Failed to dequeue", { error: response.error });

packages/worker/src/supervisor/session.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { HeartbeatService, MachineResources } from "@trigger.dev/core/v3";
22
import { SupervisorHttpClient } from "./http.js";
3-
import { SupervisorClientCommonOptions } from "./types.js";
3+
import { PreDequeueFn, SupervisorClientCommonOptions } from "./types.js";
44
import { WorkerApiDequeueResponseBody, WorkerApiHeartbeatRequestBody } from "./schemas.js";
55
import { RunQueueConsumer } from "./queueConsumer.js";
66
import { WorkerEvents } from "./events.js";
@@ -13,9 +13,7 @@ import { getDefaultWorkerHeaders } from "./util.js";
1313
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1414
heartbeatIntervalSeconds?: number;
1515
dequeueIntervalMs?: number;
16-
preDequeue?: () => Promise<{
17-
maxResources?: MachineResources;
18-
}>;
16+
preDequeue?: PreDequeueFn;
1917
};
2018

2119
export class SupervisorSession extends EventEmitter<WorkerEvents> {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
import type { MachineResources } from "@trigger.dev/core/v3";
2+
13
export type SupervisorClientCommonOptions = {
24
apiUrl: string;
35
workerToken: string;
46
instanceName: string;
57
deploymentId?: string;
68
managedWorkerSecret?: string;
79
};
10+
11+
export type PreDequeueFn = () => Promise<{
12+
maxResources?: MachineResources;
13+
skipDequeue?: boolean;
14+
}>;

0 commit comments

Comments
 (0)