Skip to content

Commit abbdb4f

Browse files
committed
Separate Redis clients for the run engine worker/queue/runlock
1 parent 8fc9304 commit abbdb4f

18 files changed

+394
-246
lines changed

apps/webapp/app/env.server.ts

Lines changed: 124 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -92,33 +92,6 @@ const EnvironmentSchema = z.object({
9292
REDIS_PASSWORD: z.string().optional(),
9393
REDIS_TLS_DISABLED: z.string().optional(),
9494

95-
// Valkey options (used in Run Engine 2.0+)
96-
VALKEY_HOST: z
97-
.string()
98-
.nullish()
99-
.default(process.env.REDIS_HOST ?? null),
100-
VALKEY_READER_HOST: z
101-
.string()
102-
.nullish()
103-
.default(process.env.REDIS_READER_HOST ?? null),
104-
VALKEY_READER_PORT: z.coerce
105-
.number()
106-
.nullish()
107-
.default(process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : null),
108-
VALKEY_PORT: z.coerce
109-
.number()
110-
.nullish()
111-
.default(process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : null),
112-
VALKEY_USERNAME: z
113-
.string()
114-
.nullish()
115-
.default(process.env.REDIS_USERNAME ?? null),
116-
VALKEY_PASSWORD: z
117-
.string()
118-
.nullish()
119-
.default(process.env.REDIS_PASSWORD ?? null),
120-
VALKEY_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
121-
12295
RATE_LIMIT_REDIS_HOST: z
12396
.string()
12497
.optional()
@@ -409,6 +382,130 @@ const EnvironmentSchema = z.object({
409382
RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
410383
RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false),
411384

385+
RUN_ENGINE_WORKER_REDIS_HOST: z
386+
.string()
387+
.optional()
388+
.transform((v) => v ?? process.env.REDIS_HOST),
389+
RUN_ENGINE_WORKER_REDIS_READER_HOST: z
390+
.string()
391+
.optional()
392+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
393+
RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce
394+
.number()
395+
.optional()
396+
.transform(
397+
(v) =>
398+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
399+
),
400+
RUN_ENGINE_WORKER_REDIS_PORT: z.coerce
401+
.number()
402+
.optional()
403+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
404+
RUN_ENGINE_WORKER_REDIS_USERNAME: z
405+
.string()
406+
.optional()
407+
.transform((v) => v ?? process.env.REDIS_USERNAME),
408+
RUN_ENGINE_WORKER_REDIS_PASSWORD: z
409+
.string()
410+
.optional()
411+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
412+
RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z
413+
.string()
414+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
415+
416+
RUN_ENGINE_RUN_QUEUE_REDIS_HOST: z
417+
.string()
418+
.optional()
419+
.transform((v) => v ?? process.env.REDIS_HOST),
420+
RUN_ENGINE_RUN_QUEUE_REDIS_READER_HOST: z
421+
.string()
422+
.optional()
423+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
424+
RUN_ENGINE_RUN_QUEUE_REDIS_READER_PORT: z.coerce
425+
.number()
426+
.optional()
427+
.transform(
428+
(v) =>
429+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
430+
),
431+
RUN_ENGINE_RUN_QUEUE_REDIS_PORT: z.coerce
432+
.number()
433+
.optional()
434+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
435+
RUN_ENGINE_RUN_QUEUE_REDIS_USERNAME: z
436+
.string()
437+
.optional()
438+
.transform((v) => v ?? process.env.REDIS_USERNAME),
439+
RUN_ENGINE_RUN_QUEUE_REDIS_PASSWORD: z
440+
.string()
441+
.optional()
442+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
443+
RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED: z
444+
.string()
445+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
446+
447+
RUN_ENGINE_RUN_LOCK_REDIS_HOST: z
448+
.string()
449+
.optional()
450+
.transform((v) => v ?? process.env.REDIS_HOST),
451+
RUN_ENGINE_RUN_LOCK_REDIS_READER_HOST: z
452+
.string()
453+
.optional()
454+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
455+
RUN_ENGINE_RUN_LOCK_REDIS_READER_PORT: z.coerce
456+
.number()
457+
.optional()
458+
.transform(
459+
(v) =>
460+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
461+
),
462+
RUN_ENGINE_RUN_LOCK_REDIS_PORT: z.coerce
463+
.number()
464+
.optional()
465+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
466+
RUN_ENGINE_RUN_LOCK_REDIS_USERNAME: z
467+
.string()
468+
.optional()
469+
.transform((v) => v ?? process.env.REDIS_USERNAME),
470+
RUN_ENGINE_RUN_LOCK_REDIS_PASSWORD: z
471+
.string()
472+
.optional()
473+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
474+
RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED: z
475+
.string()
476+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
477+
478+
RUN_ENGINE_DEV_PRESENCE_REDIS_HOST: z
479+
.string()
480+
.optional()
481+
.transform((v) => v ?? process.env.REDIS_HOST),
482+
RUN_ENGINE_DEV_PRESENCE_REDIS_READER_HOST: z
483+
.string()
484+
.optional()
485+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
486+
RUN_ENGINE_DEV_PRESENCE_REDIS_READER_PORT: z.coerce
487+
.number()
488+
.optional()
489+
.transform(
490+
(v) =>
491+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
492+
),
493+
RUN_ENGINE_DEV_PRESENCE_REDIS_PORT: z.coerce
494+
.number()
495+
.optional()
496+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
497+
RUN_ENGINE_DEV_PRESENCE_REDIS_USERNAME: z
498+
.string()
499+
.optional()
500+
.transform((v) => v ?? process.env.REDIS_USERNAME),
501+
RUN_ENGINE_DEV_PRESENCE_REDIS_PASSWORD: z
502+
.string()
503+
.optional()
504+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
505+
RUN_ENGINE_DEV_PRESENCE_REDIS_TLS_DISABLED: z
506+
.string()
507+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
508+
412509
/** How long should the presence ttl last */
413510
DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(30_000),
414511
DEV_PRESENCE_POLL_INTERVAL_MS: z.coerce.number().int().default(5_000),

apps/webapp/app/routes/api.v1.dev.presence.ts

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import { logger } from "~/services/logger.server";
77
import { createSSELoader } from "~/utils/sse";
88

99
const redis = new Redis({
10-
port: env.VALKEY_PORT ?? undefined,
11-
host: env.VALKEY_HOST ?? undefined,
12-
username: env.VALKEY_USERNAME ?? undefined,
13-
password: env.VALKEY_PASSWORD ?? undefined,
10+
port: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PORT ?? undefined,
11+
host: env.RUN_ENGINE_DEV_PRESENCE_REDIS_HOST ?? undefined,
12+
username: env.RUN_ENGINE_DEV_PRESENCE_REDIS_USERNAME ?? undefined,
13+
password: env.RUN_ENGINE_DEV_PRESENCE_REDIS_PASSWORD ?? undefined,
1414
enableAutoPipelining: true,
15-
...(env.VALKEY_TLS_DISABLED === "true" ? {} : { tls: {} }),
15+
...(env.RUN_ENGINE_DEV_PRESENCE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
1616
});
1717

1818
export const loader = createSSELoader({
@@ -44,11 +44,7 @@ export const loader = createSSELoader({
4444
//won't need multi
4545

4646
// Set initial presence with more context
47-
await redis.setex(
48-
presenceKey,
49-
env.DEV_PRESENCE_TTL_MS / 1000,
50-
Date.now().toString()
51-
);
47+
await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, Date.now().toString());
5248

5349
// Publish presence update
5450
await redis.publish(
@@ -63,11 +59,7 @@ export const loader = createSSELoader({
6359
send({ event: "start", data: `Started ${id}` });
6460
},
6561
iterator: async ({ send, date }) => {
66-
await redis.setex(
67-
presenceKey,
68-
env.DEV_PRESENCE_TTL_MS / 1000,
69-
date.toISOString()
70-
);
62+
await redis.setex(presenceKey, env.DEV_PRESENCE_TTL_MS / 1000, date.toISOString());
7163

7264
send({ event: "time", data: new Date().toISOString() });
7365
},

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ export type { RunEngine };
1313
function createRunEngine() {
1414
const engine = new RunEngine({
1515
prisma,
16-
redis: {
17-
keyPrefix: "engine:",
18-
port: env.VALKEY_PORT ?? undefined,
19-
host: env.VALKEY_HOST ?? undefined,
20-
username: env.VALKEY_USERNAME ?? undefined,
21-
password: env.VALKEY_PASSWORD ?? undefined,
22-
enableAutoPipelining: true,
23-
...(env.VALKEY_TLS_DISABLED === "true" ? {} : { tls: {} }),
24-
},
2516
worker: {
2617
workers: env.RUN_ENGINE_WORKER_COUNT,
2718
tasksPerWorker: env.RUN_ENGINE_TASKS_PER_WORKER,
2819
pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL,
20+
redis: {
21+
keyPrefix: "engine:",
22+
port: env.RUN_ENGINE_WORKER_REDIS_PORT ?? undefined,
23+
host: env.RUN_ENGINE_WORKER_REDIS_HOST ?? undefined,
24+
username: env.RUN_ENGINE_WORKER_REDIS_USERNAME ?? undefined,
25+
password: env.RUN_ENGINE_WORKER_REDIS_PASSWORD ?? undefined,
26+
enableAutoPipelining: true,
27+
...(env.RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
28+
},
2929
},
3030
machines: {
3131
defaultMachine,
@@ -34,6 +34,26 @@ function createRunEngine() {
3434
},
3535
queue: {
3636
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
37+
redis: {
38+
keyPrefix: "engine:",
39+
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
40+
host: env.RUN_ENGINE_RUN_QUEUE_REDIS_HOST ?? undefined,
41+
username: env.RUN_ENGINE_RUN_QUEUE_REDIS_USERNAME ?? undefined,
42+
password: env.RUN_ENGINE_RUN_QUEUE_REDIS_PASSWORD ?? undefined,
43+
enableAutoPipelining: true,
44+
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
45+
},
46+
},
47+
runLock: {
48+
redis: {
49+
keyPrefix: "engine:",
50+
port: env.RUN_ENGINE_RUN_LOCK_REDIS_PORT ?? undefined,
51+
host: env.RUN_ENGINE_RUN_LOCK_REDIS_HOST ?? undefined,
52+
username: env.RUN_ENGINE_RUN_LOCK_REDIS_USERNAME ?? undefined,
53+
password: env.RUN_ENGINE_RUN_LOCK_REDIS_PASSWORD ?? undefined,
54+
enableAutoPipelining: true,
55+
...(env.RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
56+
},
3757
},
3858
tracer,
3959
heartbeatTimeoutsMs: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ export class RunEngine {
136136
constructor(private readonly options: RunEngineOptions) {
137137
this.prisma = options.prisma;
138138
this.runLockRedis = new Redis({
139-
...options.redis,
140-
keyPrefix: `${options.redis.keyPrefix}runlock:`,
139+
...options.runLock.redis,
140+
keyPrefix: `${options.runLock.redis.keyPrefix}runlock:`,
141141
});
142142
this.runLock = new RunLocker({ redis: this.runLockRedis });
143143

@@ -147,14 +147,17 @@ export class RunEngine {
147147
queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 36 }),
148148
envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 12 }),
149149
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
150-
logger: new Logger("RunQueue", "warn"),
151-
redis: { ...options.redis, keyPrefix: `${options.redis.keyPrefix}runqueue:` },
150+
logger: new Logger("RunQueue", "debug"),
151+
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
152152
retryOptions: options.queue?.retryOptions,
153153
});
154154

155155
this.worker = new Worker({
156156
name: "worker",
157-
redisOptions: { ...options.redis, keyPrefix: `${options.redis.keyPrefix}worker:` },
157+
redisOptions: {
158+
...options.worker.redis,
159+
keyPrefix: `${options.worker.redis.keyPrefix}worker:`,
160+
},
158161
catalog: workerCatalog,
159162
concurrency: options.worker,
160163
pollIntervalMs: options.worker.pollIntervalMs,
@@ -1639,8 +1642,6 @@ export class RunEngine {
16391642
}
16401643
}
16411644

1642-
await this.#finalizeRun(run);
1643-
16441645
return executionResultFromSnapshot(newSnapshot);
16451646
});
16461647
});
@@ -2336,7 +2337,7 @@ export class RunEngine {
23362337
runnerId,
23372338
});
23382339

2339-
await this.worker.ack(`heartbeatSnapshot.${snapshotId}`);
2340+
await this.worker.ack(`heartbeatSnapshot.${runId}`);
23402341
return executionResultFromSnapshot(latestSnapshot);
23412342
}
23422343

@@ -3629,7 +3630,7 @@ export class RunEngine {
36293630
}
36303631

36313632
await this.worker.enqueue({
3632-
id: `heartbeatSnapshot.${snapshotId}`,
3633+
id: `heartbeatSnapshot.${runId}`,
36333634
job: "heartbeatSnapshot",
36343635
payload: { snapshotId, runId },
36353636
availableAt: new Date(Date.now() + intervalMs),
@@ -3658,7 +3659,7 @@ export class RunEngine {
36583659
}
36593660
);
36603661

3661-
await this.worker.ack(`heartbeatSnapshot.${snapshotId}`);
3662+
await this.worker.ack(`heartbeatSnapshot.${runId}`);
36623663
return;
36633664
}
36643665

@@ -3823,6 +3824,9 @@ export class RunEngine {
38233824
if (batchId) {
38243825
await this.tryCompleteBatch({ batchId });
38253826
}
3827+
3828+
//cancel the heartbeats
3829+
await this.worker.ack(`heartbeatSnapshot.${id}`);
38263830
}
38273831

38283832
/**

internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,24 @@ describe("RunEngine batchTrigger", () => {
1313
containerTest(
1414
"Batch trigger shares a batch",
1515
{ timeout: 15_000 },
16-
async ({ prisma, redisContainer }) => {
16+
async ({ prisma, redisOptions }) => {
1717
//create environment
1818
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
1919

2020
const engine = new RunEngine({
2121
prisma,
22-
redis: {
23-
host: redisContainer.getHost(),
24-
port: redisContainer.getPort(),
25-
password: redisContainer.getPassword(),
26-
enableAutoPipelining: true,
27-
},
2822
worker: {
23+
redis: redisOptions,
2924
workers: 1,
3025
tasksPerWorker: 10,
3126
pollIntervalMs: 100,
3227
},
28+
queue: {
29+
redis: redisOptions,
30+
},
31+
runLock: {
32+
redis: redisOptions,
33+
},
3334
machines: {
3435
defaultMachine: "small-1x",
3536
machines: {

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@ describe("RunEngine batchTriggerAndWait", () => {
2020

2121
const engine = new RunEngine({
2222
prisma,
23-
redis: {
24-
...redisOptions,
25-
},
2623
worker: {
24+
redis: redisOptions,
2725
workers: 1,
2826
tasksPerWorker: 10,
2927
pollIntervalMs: 20,
3028
},
29+
queue: {
30+
redis: redisOptions,
31+
},
32+
runLock: {
33+
redis: redisOptions,
34+
},
3135
machines: {
3236
defaultMachine: "small-1x",
3337
machines: {

0 commit comments

Comments
 (0)