Skip to content

Commit be82a67

Browse files
committed
Ensure master queue consumers cannot stop from a processing error, and make the consumer interval configurable via an env var
1 parent 95a522b commit be82a67

File tree

5 files changed

+20
-2
lines changed

5 files changed

+20
-2
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ const EnvironmentSchema = z.object({
469469
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
470470
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
471471
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
472+
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
472473

473474
RUN_ENGINE_WORKER_REDIS_HOST: z
474475
.string()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ function createRunEngine() {
6262
shardCount: env.RUN_ENGINE_RUN_QUEUE_SHARD_COUNT,
6363
processWorkerQueueDebounceMs: env.RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS,
6464
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
65+
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6566
},
6667
runLock: {
6768
redis: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export class RunEngine {
124124
},
125125
shardCount: options.queue?.shardCount,
126126
masterQueueConsumersDisabled: options.worker.disabled,
127+
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
127128
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
128129
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
129130
meter: options.meter,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export type RunEngineOptions = {
3232
redis: RedisOptions;
3333
shardCount?: number;
3434
processWorkerQueueDebounceMs?: number;
35+
masterQueueConsumersIntervalMs?: number;
3536
workerOptions?: WorkerConcurrencyOptions;
3637
retryOptions?: RetryOptions;
3738
defaultEnvConcurrency?: number;

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ export type RunQueueOptions = {
6767
retryOptions?: RetryOptions;
6868
shardCount?: number;
6969
masterQueueConsumersDisabled?: boolean;
70+
masterQueueConsumersIntervalMs?: number;
7071
processWorkerQueueDebounceMs?: number;
7172
workerOptions?: {
7273
pollIntervalMs?: number;
@@ -1001,7 +1002,9 @@ export class RunQueue {
10011002
const consumerId = nanoid();
10021003

10031004
try {
1004-
for await (const _ of setInterval(500, null, { signal: this.abortController.signal })) {
1005+
for await (const _ of setInterval(this.options.masterQueueConsumersIntervalMs ?? 500, null, {
1006+
signal: this.abortController.signal,
1007+
})) {
10051008
this.logger.verbose(`Processing master queue shard ${shard}`, {
10061009
processedCount,
10071010
lastProcessedAt,
@@ -1012,7 +1015,18 @@ export class RunQueue {
10121015

10131016
const now = performance.now();
10141017

1015-
const results = await this.#processMasterQueueShard(shard, consumerId);
1018+
const [error, results] = await tryCatch(this.#processMasterQueueShard(shard, consumerId));
1019+
1020+
if (error) {
1021+
this.logger.error(`Failed to process master queue shard ${shard}`, {
1022+
error,
1023+
service: this.name,
1024+
shard,
1025+
consumerId,
1026+
});
1027+
1028+
continue;
1029+
}
10161030

10171031
const duration = performance.now() - now;
10181032

0 commit comments

Comments
 (0)