Skip to content

Commit 966e42e

Browse files
committed
add shutdown timeout to redis-worker
1 parent 4395062 commit 966e42e

File tree

7 files changed

+31
-7
lines changed

7 files changed

+31
-7
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ const EnvironmentSchema = z.object({
424424
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
425425
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
426426
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
427+
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
427428

428429
RUN_ENGINE_WORKER_REDIS_HOST: z
429430
.string()
@@ -585,6 +586,7 @@ const EnvironmentSchema = z.object({
585586
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
586587
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
587588
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
589+
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
588590

589591
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
590592
.string()
@@ -627,6 +629,7 @@ const EnvironmentSchema = z.object({
627629
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
628630
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
629631
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
632+
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
630633

631634
COMMON_WORKER_REDIS_HOST: z
632635
.string()

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ function initializeWorker() {
8080
},
8181
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
8282
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
83+
shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
8384
logger: new Logger("CommonWorker", "debug"),
8485
jobs: {
8586
"v3.deliverAlert": async ({ payload }) => {

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ function initializeWorker() {
6767
},
6868
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
6969
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
70+
shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
7071
logger: new Logger("LegacyRunEngineWorker", "debug"),
7172
jobs: {
7273
runHeartbeat: async ({ payload }) => {

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ function createRunEngine() {
1717
workers: env.RUN_ENGINE_WORKER_COUNT,
1818
tasksPerWorker: env.RUN_ENGINE_TASKS_PER_WORKER,
1919
pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL,
20+
shutdownTimeoutMs: env.RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
2021
redis: {
2122
keyPrefix: "engine:",
2223
port: env.RUN_ENGINE_WORKER_REDIS_PORT ?? undefined,

internal-packages/redis-worker/src/worker.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { AnyQueueItem, SimpleQueue } from "./queue.js";
88
import { nanoid } from "nanoid";
99
import pLimit from "p-limit";
1010
import { createRedisClient } from "@internal/redis";
11+
import { shutdownManager } from "@trigger.dev/core/v3/serverOnly";
1112

1213
export type WorkerCatalog = {
1314
[key: string]: {
@@ -44,6 +45,7 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
4445
concurrency?: WorkerConcurrencyOptions;
4546
pollIntervalMs?: number;
4647
immediatePollIntervalMs?: number;
48+
shutdownTimeoutMs?: number;
4749
logger?: Logger;
4850
tracer?: Tracer;
4951
};
@@ -69,6 +71,7 @@ class Worker<TCatalog extends WorkerCatalog> {
6971
private workerLoops: Promise<void>[] = [];
7072
private isShuttingDown = false;
7173
private concurrency: Required<NonNullable<WorkerOptions<TCatalog>["concurrency"]>>;
74+
private shutdownTimeoutMs: number;
7275

7376
// The p-limit limiter to control overall concurrency.
7477
private limiter: ReturnType<typeof pLimit>;
@@ -77,6 +80,8 @@ class Worker<TCatalog extends WorkerCatalog> {
7780
this.logger = options.logger ?? new Logger("Worker", "debug");
7881
this.tracer = options.tracer ?? trace.getTracer(options.name);
7982

83+
this.shutdownTimeoutMs = options.shutdownTimeoutMs ?? 60_000;
84+
8085
const schema: QueueCatalogFromWorkerCatalog<TCatalog> = Object.fromEntries(
8186
Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema])
8287
) as QueueCatalogFromWorkerCatalog<TCatalog>;
@@ -386,22 +391,33 @@ class Worker<TCatalog extends WorkerCatalog> {
386391
}
387392

388393
private setupShutdownHandlers() {
389-
process.on("SIGTERM", this.shutdown.bind(this));
390-
process.on("SIGINT", this.shutdown.bind(this));
394+
shutdownManager.register("redis-worker", this.shutdown.bind(this));
391395
}
392396

393-
private async shutdown() {
394-
if (this.isShuttingDown) return;
397+
private async shutdown(signal?: NodeJS.Signals) {
398+
if (this.isShuttingDown) {
399+
this.logger.log("Worker already shutting down", { signal });
400+
return;
401+
}
402+
395403
this.isShuttingDown = true;
396-
this.logger.log("Shutting down worker loops...");
404+
this.logger.log("Shutting down worker loops...", { signal });
397405

398406
// Wait for all worker loops to finish.
399-
await Promise.all(this.workerLoops);
407+
await Promise.race([
408+
Promise.all(this.workerLoops),
409+
Worker.delay(this.shutdownTimeoutMs).then(() => {
410+
this.logger.error("Worker shutdown timed out", {
411+
signal,
412+
shutdownTimeoutMs: this.shutdownTimeoutMs,
413+
});
414+
}),
415+
]);
400416

401417
await this.subscriber?.unsubscribe();
402418
await this.subscriber?.quit();
403419
await this.queue.close();
404-
this.logger.log("All workers and subscribers shut down.");
420+
this.logger.log("All workers and subscribers shut down.", { signal });
405421
}
406422

407423
public async stop() {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ export class RunEngine {
180180
concurrency: options.worker,
181181
pollIntervalMs: options.worker.pollIntervalMs,
182182
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
183+
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
183184
logger: new Logger("RunEngineWorker", "debug"),
184185
jobs: {
185186
finishWaitpoint: async ({ payload }) => {

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export type RunEngineOptions = {
1212
redis: RedisOptions;
1313
pollIntervalMs?: number;
1414
immediatePollIntervalMs?: number;
15+
shutdownTimeoutMs?: number;
1516
};
1617
machines: {
1718
defaultMachine: MachinePresetName;

0 commit comments

Comments
 (0)