Skip to content

Commit 8c90908

Browse files
committed
Fix CodeRabbit suggestions
1 parent 4564540 commit 8c90908

File tree

9 files changed

+33
-70
lines changed

9 files changed

+33
-70
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ const EnvironmentSchema = z.object({
472472
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
473473
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
474474
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
475+
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
475476

476477
RUN_ENGINE_WORKER_REDIS_HOST: z
477478
.string()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ function createRunEngine() {
6161
},
6262
shardCount: env.RUN_ENGINE_RUN_QUEUE_SHARD_COUNT,
6363
processWorkerQueueDebounceMs: env.RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS,
64+
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
6465
},
6566
runLock: {
6667
redis: {

apps/webapp/app/v3/tracer.server.ts

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -344,21 +344,15 @@ function configurePrismaMetrics(meter: Meter) {
344344
return { total, busy, free };
345345
}
346346

347-
// Register callbacks (one scrape == one DB call) --------------------------
348-
totalGauge.addCallback(async (res) => {
349-
const { total } = await readPoolCounters();
350-
res.observe(total);
351-
});
352-
353-
busyGauge.addCallback(async (res) => {
354-
const { busy } = await readPoolCounters();
355-
res.observe(busy);
356-
});
357-
358-
freeGauge.addCallback(async (res) => {
359-
const { free } = await readPoolCounters();
360-
res.observe(free);
361-
});
347+
meter.addBatchObservableCallback(
348+
async (res) => {
349+
const { total, busy, free } = await readPoolCounters();
350+
res.observe(totalGauge, total);
351+
res.observe(busyGauge, busy);
352+
res.observe(freeGauge, free);
353+
},
354+
[totalGauge, busyGauge, freeGauge]
355+
);
362356
}
363357

364358
const SemanticEnvResources = {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export class RunEngine {
125125
shardCount: options.queue?.shardCount,
126126
masterQueueConsumersDisabled: options.worker.disabled,
127127
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
128+
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
128129
meter: options.meter,
129130
});
130131

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
7979
private batchSize: number;
8080
private maxRetries: number;
8181
private backoff: NonNullable<Required<ReleaseConcurrencyQueueRetryOptions["backoff"]>>;
82-
private _lastReleasingsLength: number = 0;
83-
private _lastMasterQueueLength: number = 0;
8482

8583
constructor(private readonly options: ReleaseConcurrencyQueueOptions<T>) {
8684
this.redis = createRedisClient(options.redis);
@@ -129,7 +127,6 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
129127

130128
if (!options.disableConsumers) {
131129
this.#startConsumers();
132-
this.#startMetricsProducer();
133130
this.#startReleasingsSweeper();
134131
}
135132
}
@@ -140,11 +137,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
140137
}
141138

142139
async #updateReleasingsLength(observableResult: ObservableResult<Attributes>) {
143-
observableResult.observe(this._lastReleasingsLength);
140+
const releasingsLength = await this.redis.zcard(this.#releasingsKey());
141+
observableResult.observe(releasingsLength);
144142
}
145143

146144
async #updateMasterQueueLength(observableResult: ObservableResult<Attributes>) {
147-
observableResult.observe(this._lastMasterQueueLength);
145+
const masterQueueLength = await this.redis.zcard(this.masterQueuesKey);
146+
observableResult.observe(masterQueueLength);
148147
}
149148

150149
/**
@@ -532,36 +531,6 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
532531
}
533532
}
534533

535-
async #startMetricsProducer() {
536-
try {
537-
// Produce metrics every 60 seconds, using a tracer span
538-
for await (const _ of setInterval(60_000)) {
539-
const metrics = await this.getQueueMetrics();
540-
this.logger.info("Queue metrics:", { metrics });
541-
542-
// Update cached values for OpenTelemetry observable gauges
543-
this._lastReleasingsLength = await this.redis.zcard(this.#releasingsKey());
544-
this._lastMasterQueueLength = await this.redis.zcard(this.masterQueuesKey);
545-
546-
await startSpan(
547-
this.options.tracer,
548-
"ReleaseConcurrencyTokenBucketQueue.metrics",
549-
async (span) => {},
550-
{
551-
attributes: {
552-
...flattenAttributes(metrics, "queues"),
553-
releasingsLength: this._lastReleasingsLength,
554-
masterQueueLength: this._lastMasterQueueLength,
555-
forceRecording: true,
556-
},
557-
}
558-
);
559-
}
560-
} catch (error) {
561-
this.logger.error("Error starting metrics producer:", { error });
562-
}
563-
}
564-
565534
#calculateBackoffScore(item: QueueItemMetadata): string {
566535
const delay = Math.min(
567536
this.backoff.maxDelay,

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class DequeueSystem {
7878
//lock the run so nothing else can modify it
7979
try {
8080
const dequeuedRun = await this.$.runLock.lock(
81-
"dequeueFromMasterQueue",
81+
"dequeueFromWorkerQueue",
8282
[runId],
8383
5000,
8484
async (signal) => {
@@ -135,7 +135,7 @@ export class DequeueSystem {
135135
tx: prisma,
136136
});
137137
this.$.logger.error(
138-
`RunEngine.dequeueFromMasterQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}`
138+
`RunEngine.dequeueFromWorkerQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}`
139139
);
140140

141141
return;
@@ -192,7 +192,7 @@ export class DequeueSystem {
192192
switch (result.code) {
193193
case "NO_RUN": {
194194
//this should not happen, the run is unrecoverable so we'll ack it
195-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No run found", {
195+
this.$.logger.error("RunEngine.dequeueFromWorkerQueue(): No run found", {
196196
runId,
197197
latestSnapshot: snapshot.id,
198198
});
@@ -202,7 +202,7 @@ export class DequeueSystem {
202202
case "RUN_ENVIRONMENT_ARCHIVED": {
203203
//this happens if the preview branch was archived
204204
this.$.logger.warn(
205-
"RunEngine.dequeueFromMasterQueue(): Run environment archived",
205+
"RunEngine.dequeueFromWorkerQueue(): Run environment archived",
206206
{
207207
runId,
208208
latestSnapshot: snapshot.id,
@@ -216,7 +216,7 @@ export class DequeueSystem {
216216
case "TASK_NEVER_REGISTERED":
217217
case "QUEUE_NOT_FOUND":
218218
case "TASK_NOT_IN_LATEST": {
219-
this.$.logger.warn(`RunEngine.dequeueFromMasterQueue(): ${result.code}`, {
219+
this.$.logger.warn(`RunEngine.dequeueFromWorkerQueue(): ${result.code}`, {
220220
runId,
221221
latestSnapshot: snapshot.id,
222222
result,
@@ -234,7 +234,7 @@ export class DequeueSystem {
234234
}
235235
case "BACKGROUND_WORKER_MISMATCH": {
236236
this.$.logger.warn(
237-
"RunEngine.dequeueFromMasterQueue(): Background worker mismatch",
237+
"RunEngine.dequeueFromWorkerQueue(): Background worker mismatch",
238238
{
239239
runId,
240240
latestSnapshot: snapshot.id,
@@ -256,7 +256,7 @@ export class DequeueSystem {
256256
//check for a valid deployment if it's not a development environment
257257
if (result.run.runtimeEnvironment.type !== "DEVELOPMENT") {
258258
if (!result.deployment || !result.deployment.imageReference) {
259-
this.$.logger.warn("RunEngine.dequeueFromMasterQueue(): No deployment found", {
259+
this.$.logger.warn("RunEngine.dequeueFromWorkerQueue(): No deployment found", {
260260
runId,
261261
latestSnapshot: snapshot.id,
262262
result,
@@ -289,7 +289,7 @@ export class DequeueSystem {
289289
const retryConfig = result.task.retryConfig;
290290

291291
this.$.logger.debug(
292-
"RunEngine.dequeueFromMasterQueue(): maxAttempts not set, using task's retry config",
292+
"RunEngine.dequeueFromWorkerQueue(): maxAttempts not set, using task's retry config",
293293
{
294294
runId,
295295
task: result.task.id,
@@ -300,7 +300,7 @@ export class DequeueSystem {
300300
const parsedConfig = RetryOptions.nullable().safeParse(retryConfig);
301301

302302
if (!parsedConfig.success) {
303-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Invalid retry config", {
303+
this.$.logger.error("RunEngine.dequeueFromWorkerQueue(): Invalid retry config", {
304304
runId,
305305
task: result.task.id,
306306
rawRetryConfig: retryConfig,
@@ -373,7 +373,7 @@ export class DequeueSystem {
373373
});
374374

375375
if (!lockedTaskRun) {
376-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Failed to lock task run", {
376+
this.$.logger.error("RunEngine.dequeueFromWorkerQueue(): Failed to lock task run", {
377377
taskRun: result.run.id,
378378
taskIdentifier: result.run.taskIdentifier,
379379
deployment: result.deployment?.id,
@@ -464,7 +464,7 @@ export class DequeueSystem {
464464
return dequeuedRun;
465465
} catch (error) {
466466
this.$.logger.error(
467-
"RunEngine.dequeueFromMasterQueue(): Thrown error while preparing run to be run",
467+
"RunEngine.dequeueFromWorkerQueue(): Thrown error while preparing run to be run",
468468
{
469469
error,
470470
runId,
@@ -481,7 +481,7 @@ export class DequeueSystem {
481481
if (!run) {
482482
//this isn't ideal because we're not creating a snapshot… but we can't do much else
483483
this.$.logger.error(
484-
"RunEngine.dequeueFromMasterQueue(): Thrown error, then run not found. Nacking.",
484+
"RunEngine.dequeueFromWorkerQueue(): Thrown error, then run not found. Nacking.",
485485
{
486486
runId,
487487
orgId,
@@ -507,7 +507,7 @@ export class DequeueSystem {
507507
});
508508

509509
if (!gotRequeued) {
510-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Failed to requeue run", {
510+
this.$.logger.error("RunEngine.dequeueFromWorkerQueue(): Failed to requeue run", {
511511
runId,
512512
orgId,
513513
});
@@ -570,7 +570,7 @@ export class DequeueSystem {
570570
},
571571
});
572572

573-
this.$.logger.debug("RunEngine.dequeueFromMasterQueue(): Pending version", {
573+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
574574
runId,
575575
run,
576576
});

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export type RunEngineOptions = {
3939
FairQueueSelectionStrategyOptions,
4040
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
4141
>;
42+
dequeueBlockingTimeoutSeconds?: number;
4243
};
4344
runLock: {
4445
redis: RedisOptions;

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export type RunQueueOptions = {
7676
disabled?: boolean;
7777
};
7878
meter?: Meter;
79+
dequeueBlockingTimeoutSeconds?: number;
7980
};
8081

8182
type DequeuedMessage = {
@@ -510,9 +511,6 @@ export class RunQueue {
510511
return this.#trace(
511512
"dequeueMessageFromWorkerQueue",
512513
async (span) => {
513-
// TODO: this is where we read from the worker queue, which is a list of message IDs
514-
// We'll perform a BLPOP on the worker list, and then read the message from the message list
515-
// We'll then return the message
516514
const dequeuedMessage = await this.#callDequeueMessageFromWorkerQueue({
517515
workerQueue,
518516
});
@@ -1376,8 +1374,7 @@ export class RunQueue {
13761374
workerQueueKey,
13771375
//args
13781376
this.options.redis.keyPrefix ?? "",
1379-
// TODO: make this configurable
1380-
String(10)
1377+
String(this.options.dequeueBlockingTimeoutSeconds ?? 10)
13811378
);
13821379

13831380
this.abortController.signal.removeEventListener("abort", cleanup);

src/run-queue/tests/nack.test.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)