Skip to content

Commit 4b0af15

Browse files
committed
supervisor uses new idle interval after empty dequeues and errors
1 parent 8b0e4f2 commit 4b0af15

File tree

4 files changed

+21
-9
lines changed

4 files changed

+21
-9
lines changed

apps/supervisor/src/env.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ const Env = z.object({
3131

3232
// Dequeue settings (provider mode)
3333
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
34-
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
34+
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
35+
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
3536
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
3637
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),
3738

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class ManagedSupervisor {
116116
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
117117
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
118118
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
119+
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
119120
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
120121
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
121122
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { PreDequeueFn, PreSkipFn } from "./types.js";
44

55
type RunQueueConsumerOptions = {
66
client: SupervisorHttpClient;
7-
intervalMs?: number;
7+
intervalMs: number;
8+
idleIntervalMs: number;
89
preDequeue?: PreDequeueFn;
910
preSkip?: PreSkipFn;
1011
maxRunCount?: number;
@@ -19,11 +20,13 @@ export class RunQueueConsumer {
1920
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
2021

2122
private intervalMs: number;
23+
private idleIntervalMs: number;
2224
private isEnabled: boolean;
2325

2426
constructor(opts: RunQueueConsumerOptions) {
2527
this.isEnabled = false;
26-
this.intervalMs = opts.intervalMs ?? 5_000;
28+
this.intervalMs = opts.intervalMs;
29+
this.idleIntervalMs = opts.idleIntervalMs;
2730
this.preDequeue = opts.preDequeue;
2831
this.preSkip = opts.preSkip;
2932
this.maxRunCount = opts.maxRunCount;
@@ -84,9 +87,11 @@ export class RunQueueConsumer {
8487
}
8588
}
8689

87-
return this.scheduleNextDequeue();
90+
return this.scheduleNextDequeue(this.idleIntervalMs);
8891
}
8992

93+
let nextIntervalMs = this.idleIntervalMs;
94+
9095
try {
9196
const response = await this.client.dequeue({
9297
maxResources: preDequeueResult?.maxResources,
@@ -98,6 +103,10 @@ export class RunQueueConsumer {
98103
} else {
99104
try {
100105
await this.onDequeue(response.data);
106+
107+
if (response.data.length > 0) {
108+
nextIntervalMs = this.intervalMs;
109+
}
101110
} catch (handlerError) {
102111
console.error("[RunQueueConsumer] onDequeue error", { error: handlerError });
103112
}
@@ -106,10 +115,10 @@ export class RunQueueConsumer {
106115
console.error("[RunQueueConsumer] client.dequeue error", { error: clientError });
107116
}
108117

109-
this.scheduleNextDequeue();
118+
this.scheduleNextDequeue(nextIntervalMs);
110119
}
111120

112-
scheduleNextDequeue(delay: number = this.intervalMs) {
113-
setTimeout(this.dequeue.bind(this), delay);
121+
scheduleNextDequeue(delayMs: number) {
122+
setTimeout(this.dequeue.bind(this), delayMs);
114123
}
115124
}

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1414
queueConsumerEnabled?: boolean;
1515
runNotificationsEnabled?: boolean;
1616
heartbeatIntervalSeconds?: number;
17-
dequeueIntervalMs?: number;
17+
dequeueIntervalMs: number;
18+
dequeueIdleIntervalMs: number;
1819
preDequeue?: PreDequeueFn;
1920
preSkip?: PreSkipFn;
2021
maxRunCount?: number;
@@ -47,11 +48,11 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
4748
preSkip: opts.preSkip,
4849
onDequeue: this.onDequeue.bind(this),
4950
intervalMs: opts.dequeueIntervalMs,
51+
idleIntervalMs: opts.dequeueIdleIntervalMs,
5052
maxRunCount: opts.maxRunCount,
5153
});
5254
});
5355

54-
// TODO: This should be dynamic and set by (or at least overridden by) the platform
5556
this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30;
5657
this.heartbeat = new IntervalService({
5758
onInterval: async () => {

0 commit comments

Comments
 (0)