From 6f14c3806602579c55b6cfdaeb71ad9392497f5d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 27 Aug 2025 16:51:57 +0100 Subject: [PATCH 1/4] more eager dequeuing, queue cooloff periods, return workerQueueLength when dequeueing --- .../run-engine/src/engine/index.ts | 8 + .../src/engine/systems/dequeueSystem.ts | 11 +- .../run-engine/src/run-queue/index.ts | 262 ++++++++++++++---- .../dequeueMessageFromWorkerQueue.test.ts | 87 ++++++ packages/core/src/v3/schemas/runEngine.ts | 1 + 5 files changed, 314 insertions(+), 55 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 6ee122c418..3e13b56d16 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -566,6 +566,8 @@ export class RunEngine { runnerId, tx, skipObserving, + blockingPop, + blockingPopTimeoutSeconds, }: { consumerId: string; workerQueue: string; @@ -574,6 +576,8 @@ export class RunEngine { runnerId?: string; tx?: PrismaClientOrTransaction; skipObserving?: boolean; + blockingPop?: boolean; + blockingPopTimeoutSeconds?: number; }): Promise { if (!skipObserving) { // We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues @@ -587,6 +591,8 @@ export class RunEngine { workerId, runnerId, tx, + blockingPop, + blockingPopTimeoutSeconds, }); if (!dequeuedMessage) { @@ -619,6 +625,8 @@ export class RunEngine { runnerId, tx, skipObserving: true, + blockingPop: true, + blockingPopTimeoutSeconds: 10, }); } diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 3935802a5c..912dfff335 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -46,6 +46,8 @@ export class DequeueSystem { workerId, runnerId, tx, + blockingPop, + blockingPopTimeoutSeconds, }: { consumerId: string; workerQueue: string; @@ -53,6 +55,8 @@ export class DequeueSystem { workerId?: string; runnerId?: string; tx?: PrismaClientOrTransaction; + blockingPop?: boolean; + blockingPopTimeoutSeconds?: number; }): Promise { const prisma = tx ?? this.$.prisma; @@ -63,7 +67,11 @@ export class DequeueSystem { //gets multiple runs from the queue const message = await this.$.runQueue.dequeueMessageFromWorkerQueue( consumerId, - workerQueue + workerQueue, + { + blockingPop, + blockingPopTimeoutSeconds, + } ); if (!message) { return; @@ -452,6 +460,7 @@ export class DequeueSystem { return { version: "1" as const, dequeuedAt: new Date(), + workerQueueLength: message.workerQueueLength, snapshot: { id: newSnapshot.id, friendlyId: newSnapshot.friendlyId, diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 1afd7b41b1..fa6603a503 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -35,7 +35,6 @@ import { attributesFromAuthenticatedEnv, MinimalAuthenticatedEnvironment, } from "../shared/index.js"; -import { MessageNotFoundError } from "./errors.js"; import { InputPayload, OutputPayload, @@ -71,6 +70,9 @@ export type RunQueueOptions = { shardCount?: number; masterQueueConsumersDisabled?: boolean; masterQueueConsumersIntervalMs?: number; + masterQueueCooloffPeriodMs?: number; + masterQueueCooloffCountThreshold?: number; + masterQueueConsumerDequeueCount?: number; processWorkerQueueDebounceMs?: number; workerOptions?: { pollIntervalMs?: number; @@ -98,6 +100,7 @@ type DequeuedMessage = { messageId: string; messageScore: string; message: OutputPayload; + workerQueueLength?: number; }; type MarkedRun = { @@ -142,6 +145,16 @@ const workerCatalog = { }, }; +type QueueCooloffState = + | { + _tag: "normal"; + consecutiveFailures: number; + } + | { + _tag: "cooloff"; + cooloffExpiresAt: number; + }; + /** * RunQueue – the queue that's used to process runs */ @@ -158,6 +171,7 @@ export class RunQueue { private worker: Worker; private _observableWorkerQueues: Set = new Set(); private _meter: Meter; + private _queueCooloffStates: Map = new Map(); constructor(public readonly options: RunQueueOptions) { this.shardCount = options.shardCount ?? 2; @@ -602,13 +616,20 @@ export class RunQueue { */ public async dequeueMessageFromWorkerQueue( consumerId: string, - workerQueue: string + workerQueue: string, + options?: { + blockingPop?: boolean; + blockingPopTimeoutSeconds?: number; + } ): Promise { return this.#trace( "dequeueMessageFromWorkerQueue", async (span) => { const dequeuedMessage = await this.#callDequeueMessageFromWorkerQueue({ workerQueue, + blockingPop: options?.blockingPop ?? true, + blockingPopTimeoutSeconds: + options?.blockingPopTimeoutSeconds ?? this.options.dequeueBlockingTimeoutSeconds ?? 10, }); if (!dequeuedMessage) { @@ -1078,7 +1099,13 @@ export class RunQueue { const now = performance.now(); - const [error, results] = await tryCatch(this.#processMasterQueueShard(shard, consumerId)); + const [error, results] = await tryCatch( + this.#processMasterQueueShard( + shard, + consumerId, + this.options.masterQueueConsumerDequeueCount ?? 10 + ) + ); if (error) { this.logger.error(`Failed to process master queue shard ${shard}`, { @@ -1190,7 +1217,9 @@ export class RunQueue { consumerId ); + span.setAttribute("master_queue_key", masterQueueKey); span.setAttribute("environment_count", envQueues.length); + span.setAttribute("max_count", maxCount); if (envQueues.length === 0) { return []; @@ -1198,11 +1227,27 @@ export class RunQueue { let attemptedEnvs = 0; let attemptedQueues = 0; + let messagesDequeued = 0; + let cooloffPeriodCount = 0; for (const env of envQueues) { attemptedEnvs++; for (const queue of env.queues) { + const cooloffState = this._queueCooloffStates.get(queue) ?? { + _tag: "normal", + consecutiveFailures: 0, + }; + + if (cooloffState._tag === "cooloff") { + if (cooloffState.cooloffExpiresAt > Date.now()) { + cooloffPeriodCount++; + continue; + } else { + this._queueCooloffStates.delete(queue); + } + } + attemptedQueues++; // Attempt to dequeue from this queue @@ -1217,9 +1262,10 @@ export class RunQueue { if (error) { this.logger.error( - `[processMasterQueueShard][${this.name}] Failed to dequeue from queue ${queue}`, + `[processMasterQueueShard][${this.name}] Failed to dequeue from queue`, { error, + queue, } ); @@ -1227,12 +1273,55 @@ export class RunQueue { } if (messages.length === 0) { + if (cooloffState._tag === "normal") { + const cooloffCountThreshold = Math.max( + 1, + this.options.masterQueueCooloffCountThreshold ?? 10 + ); // Don't let the cooloff count be less than 1 + + this.logger.debug("Evaluating cooloff state", { + queue, + cooloffState, + cooloffCountThreshold, + consecutiveFailures: cooloffState.consecutiveFailures, + }); + + if (cooloffState.consecutiveFailures >= cooloffCountThreshold) { + this.logger.debug("Setting cooloff state", { + queue, + cooloffState, + cooloffCountThreshold, + consecutiveFailures: cooloffState.consecutiveFailures, + cooloffExpiresAt: new Date( + Date.now() + (this.options.masterQueueCooloffPeriodMs ?? 10_000) + ), + }); + + this._queueCooloffStates.set(queue, { + _tag: "cooloff", + cooloffExpiresAt: + Date.now() + (this.options.masterQueueCooloffPeriodMs ?? 10_000), + }); + } else { + this._queueCooloffStates.set(queue, { + _tag: "normal", + consecutiveFailures: cooloffState.consecutiveFailures + 1, + }); + } + } + continue; } + messagesDequeued += messages.length; + await this.#enqueueMessagesToWorkerQueues(messages); } } + + span.setAttribute("attempted_envs", attemptedEnvs); + span.setAttribute("attempted_queues", attemptedQueues); + span.setAttribute("messages_dequeued", messagesDequeued); }, { kind: SpanKind.CONSUMER, @@ -1259,7 +1348,15 @@ export class RunQueue { maxCount: 10, }); + if (messages.length === 0) { + return; + } + await this.#enqueueMessagesToWorkerQueues(messages); + + if (messages.length === 10) { + await this.#processQueueForWorkerQueue(queueKey, environmentId); + } } async #enqueueMessagesToWorkerQueues(messages: DequeuedMessage[]) { @@ -1444,80 +1541,112 @@ export class RunQueue { async #callDequeueMessageFromWorkerQueue({ workerQueue, + blockingPop, + blockingPopTimeoutSeconds, }: { workerQueue: string; + blockingPop: boolean; + blockingPopTimeoutSeconds: number; }): Promise { const workerQueueKey = this.keys.workerQueueKey(workerQueue); - this.logger.debug("#callDequeueMessageFromWorkerQueue", { - workerQueue, - workerQueueKey, - }); + if (blockingPop) { + this.logger.debug("#callDequeueMessageFromWorkerQueue blocking pop", { + workerQueue, + workerQueueKey, + blockingPopTimeoutSeconds, + }); - if (this.abortController.signal.aborted) { - return; - } + if (this.abortController.signal.aborted) { + return; + } - const blockingClient = this.#createBlockingDequeueClient(); + const blockingClient = this.#createBlockingDequeueClient(); - async function cleanup() { - await blockingClient.quit(); - } + async function cleanup() { + await blockingClient.quit(); + } - this.abortController.signal.addEventListener("abort", cleanup); + this.abortController.signal.addEventListener("abort", cleanup); - const result = await blockingClient.blpop( - workerQueueKey, - this.options.dequeueBlockingTimeoutSeconds ?? 10 - ); + const result = await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds); - this.abortController.signal.removeEventListener("abort", cleanup); + this.abortController.signal.removeEventListener("abort", cleanup); - cleanup().then(() => { - this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { - service: this.name, + cleanup().then(() => { + this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { + service: this.name, + }); }); - }); - - if (!result) { - return; - } - this.logger.debug("dequeueMessageFromWorkerQueue raw result", { - result, - service: this.name, - }); + if (!result) { + return; + } - if (result.length !== 2) { - this.logger.error("Invalid dequeue message from worker queue result", { + this.logger.debug("dequeueMessageFromWorkerQueue raw result", { result, service: this.name, }); - return; - } - // Make sure they are both strings - if (typeof result[0] !== "string" || typeof result[1] !== "string") { - this.logger.error("Invalid dequeue message from worker queue result", { - result, - service: this.name, + if (result.length !== 2) { + this.logger.error("Invalid dequeue message from worker queue result", { + result, + service: this.name, + }); + return; + } + + // Make sure they are both strings + if (typeof result[0] !== "string" || typeof result[1] !== "string") { + this.logger.error("Invalid dequeue message from worker queue result", { + result, + service: this.name, + }); + return; + } + + const [, messageKey] = result; + + const workerQueueLength = await this.redis.llen(workerQueueKey); + const message = await this.#dequeueMessageFromKey(messageKey); + + if (!message) { + return; + } + + return { + messageId: message.runId, + messageScore: String(message.timestamp), + message, + workerQueueLength, + }; + } else { + this.logger.debug("#callDequeueMessageFromWorkerQueue non-blocking pop", { + workerQueue, + workerQueueKey, }); - return; - } - const [, messageKey] = result; + const result = await this.redis.dequeueMessageFromWorkerQueueNonBlocking(workerQueueKey); + + if (!result) { + return; + } + + const [messageKey, workerQueueLength] = result; - const message = await this.#dequeueMessageFromKey(messageKey); + const message = await this.#dequeueMessageFromKey(messageKey); - if (!message) { - return; - } + if (!message) { + return; + } - return { - messageId: message.runId, - messageScore: String(message.timestamp), - message, - }; + return { + messageId: message.runId, + messageScore: String(message.timestamp), + message, + workerQueueLength: Number(workerQueueLength), + }; + } } async #callAcknowledgeMessage({ @@ -2129,6 +2258,26 @@ return results `, }); + this.redis.defineCommand("dequeueMessageFromWorkerQueueNonBlocking", { + numberOfKeys: 1, + lua: ` +local workerQueueKey = KEYS[1] + +-- lpop the first message from the worker queue +local messageId = redis.call('LPOP', workerQueueKey) + +-- if there is no messageId, return nil +if not messageId then + return nil +end + +-- get the length of the worker queue +local queueLength = tonumber(redis.call('LLEN', workerQueueKey) or '0') + +return {messageId, queueLength} -- Return message details + `, + }); + this.redis.defineCommand("dequeueMessageFromKey", { numberOfKeys: 1, lua: ` @@ -2442,6 +2591,11 @@ declare module "@internal/redis" { callback?: Callback ): Result; + dequeueMessageFromWorkerQueueNonBlocking( + workerQueueKey: string, + callback?: Callback<[string, string] | undefined> + ): Result<[string, string] | undefined, Context>; + dequeueMessageFromKey( // keys messageKey: string, diff --git a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts index 08cea00cc1..cc7485483a 100644 --- a/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromWorkerQueue.test.ts @@ -468,4 +468,91 @@ describe("RunQueue.dequeueMessageFromWorkerQueue", () => { } } ); + + redisTest( + "should put the queue in a cooloff state when it fails to dequeue after a certain number of consecutive failures", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + logLevel: "debug", + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + masterQueueCooloffPeriodMs: 10_000, + masterQueueCooloffCountThreshold: 1, + masterQueueConsumersIntervalMs: 500, + masterQueueConsumerDequeueCount: 1, + dequeueBlockingTimeoutSeconds: 1, + }); + + try { + // Set queue concurrency limit to 1 + await queue.updateQueueConcurrencyLimits(authenticatedEnvDev, messageDev.queue, 1); + + // Enqueue two messages + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: "main", + }); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: { ...messageDev, runId: "r4322" }, + workerQueue: "main", + }); + + await setTimeout(1000); + + // Dequeue first message + const dequeued1 = await queue.dequeueMessageFromWorkerQueue("test_12345", "main"); + expect(dequeued1).toBeDefined(); + assertNonNullable(dequeued1); + + // Try to dequeue second message + const dequeued2 = await queue.dequeueMessageFromWorkerQueue("test_12345", "main"); + expect(dequeued2).toBeUndefined(); + + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Now lets ack the first message (and skip dequeue processing so we can test the cooloff state) + await queue.acknowledgeMessage(dequeued1.message.orgId, dequeued1.messageId, { + skipDequeueProcessing: true, + }); + + await setTimeout(1000); + + // Try and dequeue the second message again (should still be in the cooloff state) + const dequeued3 = await queue.dequeueMessageFromWorkerQueue("test_12345", "main"); + expect(dequeued3).toBeUndefined(); + + // Wait for the cooloff period to end + await setTimeout(10_000); + + // Try and dequeue the second message again (should be dequeued now) + const dequeued4 = await queue.dequeueMessageFromWorkerQueue("test_12345", "main"); + expect(dequeued4).toBeDefined(); + assertNonNullable(dequeued4); + expect(dequeued4.messageId).toEqual("r4322"); + expect(dequeued4.message.orgId).toEqual(messageDev.orgId); + expect(dequeued4.message.version).toEqual("2"); + } finally { + await queue.quit(); + } + } + ); }); diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index bf84edde3b..fe59e712ce 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -235,6 +235,7 @@ export const DequeuedMessage = z.object({ version: z.literal("1"), snapshot: ExecutionSnapshot, dequeuedAt: z.coerce.date(), + workerQueueLength: z.number().optional(), image: z.string().optional(), checkpoint: DequeueMessageCheckpoint.optional(), completedWaitpoints: z.array(CompletedWaitpoint), From 61a71982c75543c506cf0859469bb1004eff4be2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 27 Aug 2025 17:53:20 +0100 Subject: [PATCH 2/4] Cache worker group authentication and remove old self-hosted worker code (only managed is currently supported) --- .../engine.v1.worker-actions.dequeue.ts | 7 +- ...s.$snapshotFriendlyId.attempts.complete.ts | 2 + ...hots.$snapshotFriendlyId.attempts.start.ts | 2 + ....snapshots.$snapshotFriendlyId.continue.ts | 2 + ...snapshots.$snapshotFriendlyId.heartbeat.ts | 2 + ...d.snapshots.$snapshotFriendlyId.suspend.ts | 2 + .../routeBuilders/apiBuilder.server.ts | 9 + .../worker/workerGroupTokenService.server.ts | 553 ++++++------------ apps/webapp/package.json | 1 + pnpm-lock.yaml | 3 + 10 files changed, 192 insertions(+), 391 deletions(-) diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts b/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts index bdc6456236..2b41cd23c2 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts @@ -9,7 +9,10 @@ export const action = createActionWorkerApiRoute( { body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility }, - async ({ authenticatedWorker }): Promise> => { - return json(await authenticatedWorker.dequeue()); + async ({ + authenticatedWorker, + runnerId, + }): Promise> => { + return json(await authenticatedWorker.dequeue({ runnerId })); } ); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts index 81f53280e1..0ea7c8f3c7 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts @@ -18,6 +18,7 @@ export const action = createActionWorkerApiRoute( authenticatedWorker, body, params, + runnerId, }): Promise> => { const { completion } = body; const { runFriendlyId, snapshotFriendlyId } = params; @@ -26,6 +27,7 @@ export const action = createActionWorkerApiRoute( runFriendlyId, snapshotFriendlyId, completion, + runnerId, }); return json({ result: completeResult }); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts index f7a8e874ec..de0290f2bb 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts @@ -18,6 +18,7 @@ export const action = createActionWorkerApiRoute( authenticatedWorker, body, params, + runnerId, }): Promise> => { const { runFriendlyId, snapshotFriendlyId } = params; @@ -25,6 +26,7 @@ export const action = createActionWorkerApiRoute( runFriendlyId, snapshotFriendlyId, isWarmStart: body.isWarmStart, + runnerId, }); return json(runExecutionData); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts index 5a436b6575..bdeb76ca8a 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts @@ -14,6 +14,7 @@ export const loader = createLoaderWorkerApiRoute( async ({ authenticatedWorker, params, + runnerId, }): Promise> => { const { runFriendlyId, snapshotFriendlyId } = params; @@ -23,6 +24,7 @@ export const loader = createLoaderWorkerApiRoute( const continuationResult = await authenticatedWorker.continueRunExecution({ runFriendlyId, snapshotFriendlyId, + runnerId, }); return json(continuationResult); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts index 0942bcde8b..cd2f63bd1e 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts @@ -13,12 +13,14 @@ export const action = createActionWorkerApiRoute( async ({ authenticatedWorker, params, + runnerId, }): Promise> => { const { runFriendlyId, snapshotFriendlyId } = params; await authenticatedWorker.heartbeatRun({ runFriendlyId, snapshotFriendlyId, + runnerId, }); return json({ ok: true }); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts index 2d745ed94a..ee1b30863f 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts @@ -19,6 +19,7 @@ export const action = createActionWorkerApiRoute( authenticatedWorker, params, body, + runnerId, }): Promise> => { const { runFriendlyId, snapshotFriendlyId } = params; @@ -39,6 +40,7 @@ export const action = createActionWorkerApiRoute( runFriendlyId, snapshotFriendlyId, checkpoint: body.checkpoint, + runnerId, }); return json({ ok: true }); diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index 9d06d3345c..25783f8610 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -22,6 +22,7 @@ import { WorkerGroupTokenService, } from "~/v3/services/worker/workerGroupTokenService.server"; import { API_VERSIONS, getApiVersion } from "~/api/versions"; +import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker"; type AnyZodSchema = z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; @@ -795,6 +796,7 @@ type WorkerLoaderHandlerFunction< headers: THeadersSchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion ? z.infer : undefined; + runnerId?: string; }) => Promise; export function createLoaderWorkerApiRoute< @@ -858,12 +860,15 @@ export function createLoaderWorkerApiRoute< parsedHeaders = headers.data; } + const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined; + const result = await handler({ params: parsedParams, searchParams: parsedSearchParams, authenticatedWorker: authenticationResult, request, headers: parsedHeaders, + runnerId, }); return result; } catch (error) { @@ -924,6 +929,7 @@ type WorkerActionHandlerFunction< body: TBodySchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion ? z.infer : undefined; + runnerId?: string; }) => Promise; export function createActionWorkerApiRoute< @@ -1021,6 +1027,8 @@ export function createActionWorkerApiRoute< parsedBody = parsed.data; } + const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined; + const result = await handler({ params: parsedParams, searchParams: parsedSearchParams, @@ -1028,6 +1036,7 @@ export function createActionWorkerApiRoute< request, body: parsedBody, headers: parsedHeaders, + runnerId, }); return result; } catch (error) { diff --git a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts index 910d599050..be60250344 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts @@ -1,35 +1,50 @@ -import { customAlphabet } from "nanoid"; -import { ServiceValidationError, WithRunEngine, WithRunEngineOptions } from "../baseService.server"; -import { createHash, timingSafeEqual } from "crypto"; -import { logger } from "~/services/logger.server"; +import { createCache, DefaultStatefulContext, MemoryStore, Namespace } from "@internal/cache"; +import { + CheckpointInput, + CompleteRunAttemptResult, + DequeuedMessage, + ExecutionResult, + MachinePreset, + StartRunAttemptResult, + TaskRunExecutionResult, +} from "@trigger.dev/core/v3"; +import { fromFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import { WORKER_HEADERS } from "@trigger.dev/core/v3/workers"; import { Prisma, RuntimeEnvironment, WorkerInstanceGroup, WorkerInstanceGroupType, } from "@trigger.dev/database"; +import { createHash, timingSafeEqual } from "crypto"; +import { customAlphabet } from "nanoid"; import { z } from "zod"; -import { WORKER_HEADERS } from "@trigger.dev/core/v3/workers"; -import { - TaskRunExecutionResult, - DequeuedMessage, - CompleteRunAttemptResult, - StartRunAttemptResult, - ExecutionResult, - MachinePreset, - MachineResources, - CheckpointInput, -} from "@trigger.dev/core/v3"; import { env } from "~/env.server"; -import { $transaction } from "~/db.server"; -import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server"; import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; -import { - CURRENT_UNMANAGED_DEPLOYMENT_LABEL, - fromFriendlyId, -} from "@trigger.dev/core/v3/isomorphic"; -import { machinePresetFromName } from "~/v3/machinePresets.server"; +import { logger } from "~/services/logger.server"; import { defaultMachine } from "~/services/platform.v3.server"; +import { singleton } from "~/utils/singleton"; +import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server"; +import { machinePresetFromName } from "~/v3/machinePresets.server"; +import { WithRunEngine, WithRunEngineOptions } from "../baseService.server"; + +const authenticatedWorkerInstanceCache = singleton( + "authenticatedWorkerInstanceCache", + createAuthenticatedWorkerInstanceCache +); + +function createAuthenticatedWorkerInstanceCache() { + return createCache({ + authenticatedWorkerInstance: new Namespace( + new DefaultStatefulContext(), + { + stores: [new MemoryStore({ persistentMap: new Map() })], + fresh: 60_000 * 10, // 10 minutes + stale: 60_000 * 11, // 11 minutes + } + ), + }); +} export class WorkerGroupTokenService extends WithRunEngine { private readonly tokenPrefix = "tr_wgt_"; @@ -148,321 +163,148 @@ export class WorkerGroupTokenService extends WithRunEngine { return; } - const workerGroup = await this.findWorkerGroup({ token }); + const managedWorkerSecret = request.headers.get(WORKER_HEADERS.MANAGED_SECRET); - if (!workerGroup) { - logger.warn("[WorkerGroupTokenService] Worker group not found", { token }); + if (!managedWorkerSecret) { + logger.error("[WorkerGroupTokenService] Managed secret not found in request", { + headers: this.sanitizeHeaders(request), + }); return; } - if (workerGroup.type === WorkerInstanceGroupType.MANAGED) { - const managedWorkerSecret = request.headers.get(WORKER_HEADERS.MANAGED_SECRET); - - if (!managedWorkerSecret) { - logger.error("[WorkerGroupTokenService] Managed secret not found in request", { - headers: this.sanitizeHeaders(request), - }); - return; - } - - const encoder = new TextEncoder(); - - const a = encoder.encode(managedWorkerSecret); - const b = encoder.encode(env.MANAGED_WORKER_SECRET); + const encoder = new TextEncoder(); - if (a.byteLength !== b.byteLength) { - logger.error("[WorkerGroupTokenService] Managed secret length mismatch", { - managedWorkerSecret, - headers: this.sanitizeHeaders(request), - }); - return; - } - - if (!timingSafeEqual(a, b)) { - logger.error("[WorkerGroupTokenService] Managed secret mismatch", { - managedWorkerSecret, - headers: this.sanitizeHeaders(request), - }); - return; - } - } + const a = encoder.encode(managedWorkerSecret); + const b = encoder.encode(env.MANAGED_WORKER_SECRET); - const workerInstance = await this.getOrCreateWorkerInstance({ - workerGroup, - instanceName, - deploymentId: request.headers.get(WORKER_HEADERS.DEPLOYMENT_ID) ?? undefined, - }); - - if (!workerInstance) { - logger.error("[WorkerGroupTokenService] Unable to get or create worker instance", { - workerGroup, - instanceName, + if (a.byteLength !== b.byteLength) { + logger.error("[WorkerGroupTokenService] Managed secret length mismatch", { + managedWorkerSecret, + headers: this.sanitizeHeaders(request), }); return; } - if (workerGroup.type === WorkerInstanceGroupType.MANAGED) { - return new AuthenticatedWorkerInstance({ - prisma: this._prisma, - engine: this._engine, - type: WorkerInstanceGroupType.MANAGED, - name: workerGroup.name, - workerGroupId: workerGroup.id, - workerInstanceId: workerInstance.id, - masterQueue: workerGroup.masterQueue, - environment: null, - runnerId: request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined, + if (!timingSafeEqual(a, b)) { + logger.error("[WorkerGroupTokenService] Managed secret mismatch", { + managedWorkerSecret, + headers: this.sanitizeHeaders(request), }); - } - - if (!workerInstance.environment) { - logger.error( - "[WorkerGroupTokenService] Unmanaged worker instance not linked to environment", - { workerGroup, workerInstance } - ); return; } - if (!workerInstance.deployment) { - logger.error("[WorkerGroupTokenService] Unmanaged worker instance not linked to deployment", { - workerGroup, - workerInstance, - }); - return; - } + const result = await authenticatedWorkerInstanceCache.authenticatedWorkerInstance.swr( + `worker-group-token-${token}`, + async () => { + const workerGroup = await this.findWorkerGroup({ token }); + + if (!workerGroup) { + logger.warn("[WorkerGroupTokenService] Worker group not found", { token }); + return; + } + + const workerInstance = await this.getOrCreateWorkerInstance({ + workerGroup, + instanceName, + }); - if (!workerInstance.deployment.workerId) { - logger.error( - "[WorkerGroupTokenService] Unmanaged worker instance deployment not linked to background worker", - { workerGroup, workerInstance } - ); + if (!workerInstance) { + logger.error("[WorkerGroupTokenService] Unable to get or create worker instance", { + workerGroup, + instanceName, + }); + return; + } + + return new AuthenticatedWorkerInstance({ + prisma: this._prisma, + engine: this._engine, + type: WorkerInstanceGroupType.MANAGED, + name: workerGroup.name, + workerGroupId: workerGroup.id, + workerInstanceId: workerInstance.id, + masterQueue: workerGroup.masterQueue, + }); + } + ); + + if (result.err) { + logger.error("[WorkerGroupTokenService] Failed to authenticate worker instance", { + error: result.err, + }); return; } - return new AuthenticatedWorkerInstance({ - prisma: this._prisma, - engine: this._engine, - type: WorkerInstanceGroupType.UNMANAGED, - name: workerGroup.name, - workerGroupId: workerGroup.id, - workerInstanceId: workerInstance.id, - masterQueue: workerGroup.masterQueue, - environmentId: workerInstance.environment.id, - deploymentId: workerInstance.deployment.id, - backgroundWorkerId: workerInstance.deployment.workerId, - environment: workerInstance.environment, - }); + return result.val; } private async getOrCreateWorkerInstance({ workerGroup, instanceName, - deploymentId, }: { workerGroup: WorkerInstanceGroup; instanceName: string; - deploymentId?: string; }) { - return await $transaction(this._prisma, async (tx) => { - const resourceIdentifier = deploymentId ? `${deploymentId}:${instanceName}` : instanceName; + const resourceIdentifier = instanceName; + + const workerInstance = await this._prisma.workerInstance.findFirst({ + where: { + workerGroupId: workerGroup.id, + resourceIdentifier, + }, + include: { + deployment: true, + environment: true, + }, + }); - const workerInstance = await tx.workerInstance.findFirst({ - where: { + if (workerInstance) { + return workerInstance; + } + + try { + const newWorkerInstance = await this._prisma.workerInstance.create({ + data: { workerGroupId: workerGroup.id, + name: instanceName, resourceIdentifier, }, include: { + // This will always be empty for shared worker instances, but required for types deployment: true, environment: true, }, }); - if (workerInstance) { - return workerInstance; - } + return newWorkerInstance; + } catch (error) { + // Gracefully handle race conditions when connecting for the first time + if (error instanceof Prisma.PrismaClientKnownRequestError) { + // Unique constraint violation + if (error.code === "P2002") { + try { + const existingWorkerInstance = await this._prisma.workerInstance.findFirst({ + where: { + workerGroupId: workerGroup.id, + resourceIdentifier, + }, + include: { + deployment: true, + environment: true, + }, + }); - if (workerGroup.type === WorkerInstanceGroupType.MANAGED) { - if (deploymentId) { - logger.warn( - "[WorkerGroupTokenService] Shared worker group instances should not authenticate with a deployment ID", - { + return existingWorkerInstance; + } catch (error) { + logger.error("[WorkerGroupTokenService] Failed to find worker instance", { workerGroup, workerInstance, - deploymentId, - } - ); - } - - try { - const newWorkerInstance = await tx.workerInstance.create({ - data: { - workerGroupId: workerGroup.id, - name: instanceName, - resourceIdentifier, - }, - include: { - // This will always be empty for shared worker instances, but required for types - deployment: true, - environment: true, - }, - }); - return newWorkerInstance; - } catch (error) { - // Gracefully handle race conditions when connecting for the first time - if (error instanceof Prisma.PrismaClientKnownRequestError) { - // Unique constraint violation - if (error.code === "P2002") { - try { - const existingWorkerInstance = await tx.workerInstance.findFirst({ - where: { - workerGroupId: workerGroup.id, - resourceIdentifier, - }, - include: { - deployment: true, - environment: true, - }, - }); - return existingWorkerInstance; - } catch (error) { - logger.error("[WorkerGroupTokenService] Failed to find worker instance", { - workerGroup, - workerInstance, - deploymentId, - }); - return; - } - } + }); + return; } } } - - if (!workerGroup.projectId || !workerGroup.organizationId) { - logger.error( - "[WorkerGroupTokenService] Unmanaged worker group missing project or organization", - { - workerGroup, - workerInstance, - deploymentId, - } - ); - return; - } - - if (!deploymentId) { - logger.error("[WorkerGroupTokenService] Unmanaged worker group required deployment ID", { - workerGroup, - workerInstance, - }); - return; - } - - // Unmanaged workers instances are locked to a specific deployment version - - const deployment = await tx.workerDeployment.findFirst({ - where: { - ...(deploymentId.startsWith("deployment_") - ? { - friendlyId: deploymentId, - } - : { - id: deploymentId, - }), - }, - }); - - if (!deployment) { - logger.error("[WorkerGroupTokenService] Deployment not found", { - workerGroup, - workerInstance, - deploymentId, - }); - return; - } - - if (deployment.projectId !== workerGroup.projectId) { - logger.error("[WorkerGroupTokenService] Deployment does not match worker group project", { - deployment, - workerGroup, - workerInstance, - }); - return; - } - - if (deployment.status === "DEPLOYING") { - // This is the first instance to be created for this deployment, so mark it as deployed - await tx.workerDeployment.update({ - where: { - id: deployment.id, - }, - data: { - status: "DEPLOYED", - deployedAt: new Date(), - }, - }); - - // Check if the deployment should be promoted - const workerPromotion = await tx.workerDeploymentPromotion.findFirst({ - where: { - label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL, - environmentId: deployment.environmentId, - }, - include: { - deployment: true, - }, - }); - - const shouldPromote = - !workerPromotion || deployment.createdAt > workerPromotion.deployment.createdAt; - - if (shouldPromote) { - // Promote the deployment - await tx.workerDeploymentPromotion.upsert({ - where: { - environmentId_label: { - environmentId: deployment.environmentId, - label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL, - }, - }, - create: { - deploymentId: deployment.id, - environmentId: deployment.environmentId, - label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL, - }, - update: { - deploymentId: deployment.id, - }, - }); - } - } else if (deployment.status !== "DEPLOYED") { - logger.error("[WorkerGroupTokenService] Deployment not deploying or deployed", { - deployment, - workerGroup, - workerInstance, - }); - return; - } - - const nonSharedWorkerInstance = tx.workerInstance.create({ - data: { - workerGroupId: workerGroup.id, - name: instanceName, - resourceIdentifier, - environmentId: deployment.environmentId, - deploymentId: deployment.id, - }, - include: { - deployment: true, - environment: { - include: { - parentEnvironment: true, - }, - }, - }, - }); - - return nonSharedWorkerInstance; - }); + } } private sanitizeHeaders(request: Request, skipHeaders = ["authorization"]) { @@ -491,11 +333,6 @@ export type AuthenticatedWorkerInstanceOptions = WithRunEngineOptions<{ workerGroupId: string; workerInstanceId: string; masterQueue: string; - environmentId?: string; - deploymentId?: string; - backgroundWorkerId?: string; - runnerId?: string; - environment: EnvironmentWithParent | null; }>; export class AuthenticatedWorkerInstance extends WithRunEngine { @@ -503,11 +340,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { readonly name: string; readonly workerGroupId: string; readonly workerInstanceId: string; - readonly runnerId?: string; readonly masterQueue: string; - readonly environment: EnvironmentWithParent | null; - readonly deploymentId?: string; - readonly backgroundWorkerId?: string; // FIXME: Required for unmanaged workers readonly isLatestDeployment = true; @@ -520,10 +353,6 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { this.workerGroupId = opts.workerGroupId; this.workerInstanceId = opts.workerInstanceId; this.masterQueue = opts.masterQueue; - this.environment = opts.environment; - this.deploymentId = opts.deploymentId; - this.backgroundWorkerId = opts.backgroundWorkerId; - this.runnerId = opts.runnerId; } async connect(metadata: Record): Promise { @@ -537,62 +366,12 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { }); } - async dequeue(): Promise { - if (this.type === WorkerInstanceGroupType.MANAGED) { - return await this._engine.dequeueFromWorkerQueue({ - consumerId: this.workerInstanceId, - workerQueue: this.masterQueue, - workerId: this.workerInstanceId, - runnerId: this.runnerId, - }); - } - - if (!this.environment || !this.deploymentId || !this.backgroundWorkerId) { - logger.error("[AuthenticatedWorkerInstance] Missing environment or deployment", { - ...this.toJSON(), - }); - return []; - } - - await this._prisma.workerInstance.update({ - where: { - id: this.workerInstanceId, - }, - data: { - lastDequeueAt: new Date(), - }, - }); - - if (this.isLatestDeployment) { - return await this._engine.dequeueFromEnvironmentWorkerQueue({ - consumerId: this.workerInstanceId, - environmentId: this.environment.id, - workerId: this.workerInstanceId, - runnerId: this.runnerId, - }); - } - - throw new ServiceValidationError("Unmanaged workers cannot dequeue from a specific version"); - } - - /** Allows managed workers to dequeue from a specific environment */ - async dequeueFromEnvironment( - backgroundWorkerId: string, - environmentId: string - ): Promise { - if (this.type !== WorkerInstanceGroupType.MANAGED) { - logger.error("[AuthenticatedWorkerInstance] Worker instance is not managed", { - ...this.toJSON(), - }); - return []; - } - - return await this._engine.dequeueFromEnvironmentWorkerQueue({ + async dequeue({ runnerId }: { runnerId?: string }): Promise { + return await this._engine.dequeueFromWorkerQueue({ consumerId: this.workerInstanceId, - backgroundWorkerId, - environmentId, + workerQueue: this.masterQueue, workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); } @@ -610,15 +389,17 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { async heartbeatRun({ runFriendlyId, snapshotFriendlyId, + runnerId, }: { runFriendlyId: string; snapshotFriendlyId: string; + runnerId?: string; }): Promise { return await this._engine.heartbeatRun({ runId: fromFriendlyId(runFriendlyId), snapshotId: fromFriendlyId(snapshotFriendlyId), workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); } @@ -626,10 +407,12 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { runFriendlyId, snapshotFriendlyId, isWarmStart, + runnerId, }: { runFriendlyId: string; snapshotFriendlyId: string; isWarmStart?: boolean; + runnerId?: string; }): Promise< StartRunAttemptResult & { envVars: Record; @@ -640,21 +423,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { snapshotId: fromFriendlyId(snapshotFriendlyId), isWarmStart, workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); const defaultMachinePreset = machinePresetFromName(defaultMachine); - const environment = - this.environment ?? - (await this._prisma.runtimeEnvironment.findFirst({ - where: { - id: engineResult.execution.environment.id, - }, - include: { - parentEnvironment: true, - }, - })); + const environment = await this._prisma.runtimeEnvironment.findFirst({ + where: { + id: engineResult.execution.environment.id, + }, + include: { + parentEnvironment: true, + }, + }); const envVars = environment ? await this.getEnvVars( @@ -675,17 +456,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { runFriendlyId, snapshotFriendlyId, completion, + runnerId, }: { runFriendlyId: string; snapshotFriendlyId: string; completion: TaskRunExecutionResult; + runnerId?: string; }): Promise { return await this._engine.completeRunAttempt({ runId: fromFriendlyId(runFriendlyId), snapshotId: fromFriendlyId(snapshotFriendlyId), completion, workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); } @@ -699,32 +482,36 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { runFriendlyId, snapshotFriendlyId, checkpoint, + runnerId, }: { runFriendlyId: string; snapshotFriendlyId: string; checkpoint: CheckpointInput; + runnerId?: string; }) { return await this._engine.createCheckpoint({ runId: fromFriendlyId(runFriendlyId), snapshotId: fromFriendlyId(snapshotFriendlyId), checkpoint, workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); } async continueRunExecution({ runFriendlyId, snapshotFriendlyId, + runnerId, }: { runFriendlyId: string; snapshotFriendlyId: string; + runnerId?: string; }) { return await this._engine.continueRunExecution({ runId: fromFriendlyId(runFriendlyId), snapshotId: fromFriendlyId(snapshotFriendlyId), workerId: this.workerInstanceId, - runnerId: this.runnerId, + runnerId, }); } @@ -742,24 +529,12 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { } toJSON(): WorkerGroupTokenAuthenticationResponse { - if (this.type === WorkerInstanceGroupType.MANAGED) { - return { - type: WorkerInstanceGroupType.MANAGED, - name: this.name, - workerGroupId: this.workerGroupId, - workerInstanceId: this.workerInstanceId, - masterQueue: this.masterQueue, - }; - } - return { - type: WorkerInstanceGroupType.UNMANAGED, + type: WorkerInstanceGroupType.MANAGED, name: this.name, workerGroupId: this.workerGroupId, workerInstanceId: this.workerInstanceId, masterQueue: this.masterQueue, - environmentId: this.environment?.id!, - deploymentId: this.deploymentId!, }; } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index b6341aca5f..6b2ecdc265 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -54,6 +54,7 @@ "@headlessui/react": "^1.7.8", "@heroicons/react": "^2.0.12", "@internal/redis": "workspace:*", + "@internal/cache": "workspace:*", "@internal/run-engine": "workspace:*", "@internal/schedule-engine": "workspace:*", "@internal/tracing": "workspace:*", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 35e5c83469..78a2821c39 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -266,6 +266,9 @@ importers: '@heroicons/react': specifier: ^2.0.12 version: 2.0.13(react@18.2.0) + '@internal/cache': + specifier: workspace:* + version: link:../../internal-packages/cache '@internal/redis': specifier: workspace:* version: link:../../internal-packages/redis From 347819435935a940645f10d1d2e053373a2d222a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 27 Aug 2025 18:18:08 +0100 Subject: [PATCH 3/4] add additional spans during dequeue --- .../run-engine/src/engine/db/worker.ts | 342 --------------- .../src/engine/systems/dequeueSystem.ts | 411 +++++++++++++++++- 2 files changed, 407 insertions(+), 346 deletions(-) delete mode 100644 internal-packages/run-engine/src/engine/db/worker.ts diff --git a/internal-packages/run-engine/src/engine/db/worker.ts b/internal-packages/run-engine/src/engine/db/worker.ts deleted file mode 100644 index 6b89eacbaf..0000000000 --- a/internal-packages/run-engine/src/engine/db/worker.ts +++ /dev/null @@ -1,342 +0,0 @@ -import { - BackgroundWorker, - BackgroundWorkerTask, - Prisma, - PrismaClientOrTransaction, - TaskQueue, - WorkerDeployment, -} from "@trigger.dev/database"; -import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; - -type RunWithMininimalEnvironment = Prisma.TaskRunGetPayload<{ - include: { - runtimeEnvironment: { - select: { - id: true; - type: true; - }; - }; - }; -}>; - -type RunWithBackgroundWorkerTasksResult = - | { - success: false; - code: "NO_RUN"; - message: string; - } - | { - success: false; - code: - | "NO_WORKER" - | "TASK_NOT_IN_LATEST" - | "TASK_NEVER_REGISTERED" - | "BACKGROUND_WORKER_MISMATCH" - | "QUEUE_NOT_FOUND" - | "RUN_ENVIRONMENT_ARCHIVED"; - message: string; - run: RunWithMininimalEnvironment; - } - | { - success: false; - code: "BACKGROUND_WORKER_MISMATCH"; - message: string; - backgroundWorker: { - expected: string; - received: string; - }; - run: RunWithMininimalEnvironment; - } - | { - success: true; - run: RunWithMininimalEnvironment; - worker: BackgroundWorker; - task: BackgroundWorkerTask; - queue: TaskQueue; - deployment: WorkerDeployment | null; - }; - -export async function getRunWithBackgroundWorkerTasks( - prisma: PrismaClientOrTransaction, - runId: string, - backgroundWorkerId?: string -): Promise { - const run = await prisma.taskRun.findFirst({ - where: { - id: runId, - }, - include: { - runtimeEnvironment: { - select: { - id: true, - type: true, - archivedAt: true, - }, - }, - lockedToVersion: { - include: { - deployment: true, - tasks: true, - }, - }, - }, - }); - - if (!run) { - return { - success: false as const, - code: "NO_RUN", - message: `No run found with id: ${runId}`, - }; - } - - if (run.runtimeEnvironment.archivedAt) { - return { - success: false as const, - code: "RUN_ENVIRONMENT_ARCHIVED", - message: `Run is on an archived environment: ${run.id}`, - run, - }; - } - - const workerId = run.lockedToVersionId ?? backgroundWorkerId; - - //get the relevant BackgroundWorker with tasks and deployment (if not DEV) - let workerWithTasks: WorkerDeploymentWithWorkerTasks | null = null; - - if (run.runtimeEnvironment.type === "DEVELOPMENT") { - workerWithTasks = workerId - ? await getWorkerById(prisma, workerId) - : await getMostRecentWorker(prisma, run.runtimeEnvironmentId); - } else { - workerWithTasks = workerId - ? await getWorkerDeploymentFromWorker(prisma, workerId) - : await getManagedWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId); - } - - if (!workerWithTasks) { - return { - success: false as const, - code: "NO_WORKER", - message: `No worker found for run: ${run.id}`, - run, - }; - } - - if (backgroundWorkerId) { - if (backgroundWorkerId !== workerWithTasks.worker.id) { - return { - success: false as const, - code: "BACKGROUND_WORKER_MISMATCH", - message: `Background worker mismatch for run: ${run.id}`, - backgroundWorker: { - expected: backgroundWorkerId, - received: workerWithTasks.worker.id, - }, - run, - }; - } - } - - const backgroundTask = workerWithTasks.tasks.find((task) => task.slug === run.taskIdentifier); - - if (!backgroundTask) { - const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({ - where: { - slug: run.taskIdentifier, - projectId: run.projectId, - runtimeEnvironmentId: run.runtimeEnvironmentId, - }, - include: { - worker: true, - }, - orderBy: { - createdAt: "desc", - }, - }); - - if (nonCurrentTask) { - return { - success: false as const, - code: "TASK_NOT_IN_LATEST", - message: `Task not found in latest version: ${run.taskIdentifier}. Found in ${nonCurrentTask.worker.version}`, - run, - }; - } else { - return { - success: false as const, - code: "TASK_NEVER_REGISTERED", - message: `Task has never been registered (in dev or deployed): ${run.taskIdentifier}`, - run, - }; - } - } - - const queue = workerWithTasks.queues.find((queue) => - run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue - ); - - if (!queue) { - return { - success: false as const, - code: "QUEUE_NOT_FOUND", - message: `Queue not found for run: ${run.id}`, - run, - }; - } - - return { - success: true as const, - run, - worker: workerWithTasks.worker, - task: backgroundTask, - queue, - deployment: workerWithTasks.deployment, - }; -} - -type WorkerDeploymentWithWorkerTasks = { - worker: BackgroundWorker; - tasks: BackgroundWorkerTask[]; - queues: TaskQueue[]; - deployment: WorkerDeployment | null; -}; - -export async function getWorkerDeploymentFromWorker( - prisma: PrismaClientOrTransaction, - workerId: string -): Promise { - const worker = await prisma.backgroundWorker.findFirst({ - where: { - id: workerId, - }, - include: { - deployment: true, - tasks: true, - queues: true, - }, - }); - - if (!worker) { - return null; - } - - return { worker, tasks: worker.tasks, queues: worker.queues, deployment: worker.deployment }; -} - -export async function getMostRecentWorker( - prisma: PrismaClientOrTransaction, - environmentId: string -): Promise { - const worker = await prisma.backgroundWorker.findFirst({ - where: { - runtimeEnvironmentId: environmentId, - }, - include: { - tasks: true, - queues: true, - }, - orderBy: { - id: "desc", - }, - }); - - if (!worker) { - return null; - } - - return { worker, tasks: worker.tasks, queues: worker.queues, deployment: null }; -} - -export async function getWorkerById( - prisma: PrismaClientOrTransaction, - workerId: string -): Promise { - const worker = await prisma.backgroundWorker.findFirst({ - where: { - id: workerId, - }, - include: { - deployment: true, - tasks: true, - queues: true, - }, - orderBy: { - id: "desc", - }, - }); - - if (!worker) { - return null; - } - - return { worker, tasks: worker.tasks, queues: worker.queues, deployment: worker.deployment }; -} - -export async function getManagedWorkerFromCurrentlyPromotedDeployment( - prisma: PrismaClientOrTransaction, - environmentId: string -): Promise { - const promotion = await prisma.workerDeploymentPromotion.findFirst({ - where: { - environmentId, - label: CURRENT_DEPLOYMENT_LABEL, - }, - include: { - deployment: { - include: { - worker: { - include: { - tasks: true, - queues: true, - }, - }, - }, - }, - }, - }); - - if (!promotion || !promotion.deployment.worker) { - return null; - } - - if (promotion.deployment.type === "MANAGED") { - // This is a run engine v2 deployment, so return it - return { - worker: promotion.deployment.worker, - tasks: promotion.deployment.worker.tasks, - queues: promotion.deployment.worker.queues, - deployment: promotion.deployment, - }; - } - - // We need to get the latest run engine v2 deployment - const latestV2Deployment = await prisma.workerDeployment.findFirst({ - where: { - environmentId, - type: "MANAGED", - }, - orderBy: { - id: "desc", - }, - include: { - worker: { - include: { - tasks: true, - queues: true, - }, - }, - }, - }); - - if (!latestV2Deployment?.worker) { - return null; - } - - return { - worker: latestV2Deployment.worker, - tasks: latestV2Deployment.worker.tasks, - queues: latestV2Deployment.worker.queues, - deployment: latestV2Deployment, - }; -} diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 912dfff335..922aef4e45 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -4,8 +4,16 @@ import { assertExhaustive } from "@trigger.dev/core"; import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3"; import { placementTag } from "@trigger.dev/core/v3/serverOnly"; import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; -import { PrismaClientOrTransaction } from "@trigger.dev/database"; -import { getRunWithBackgroundWorkerTasks } from "../db/worker.js"; +import { + BackgroundWorker, + BackgroundWorkerTask, + Prisma, + PrismaClientOrTransaction, + TaskQueue, + WorkerDeployment, +} from "@trigger.dev/database"; +import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; + import { sendNotificationToWorker } from "../eventBus.js"; import { getMachinePreset } from "../machinePresets.js"; import { isDequeueableExecutionStatus, isExecuting } from "../statuses.js"; @@ -22,6 +30,61 @@ export type DequeueSystemOptions = { billingCache: BillingCache; }; +type RunWithMininimalEnvironment = Prisma.TaskRunGetPayload<{ + include: { + runtimeEnvironment: { + select: { + id: true; + type: true; + }; + }; + }; +}>; + +type RunWithBackgroundWorkerTasksResult = + | { + success: false; + code: "NO_RUN"; + message: string; + } + | { + success: false; + code: + | "NO_WORKER" + | "TASK_NOT_IN_LATEST" + | "TASK_NEVER_REGISTERED" + | "BACKGROUND_WORKER_MISMATCH" + | "QUEUE_NOT_FOUND" + | "RUN_ENVIRONMENT_ARCHIVED"; + message: string; + run: RunWithMininimalEnvironment; + } + | { + success: false; + code: "BACKGROUND_WORKER_MISMATCH"; + message: string; + backgroundWorker: { + expected: string; + received: string; + }; + run: RunWithMininimalEnvironment; + } + | { + success: true; + run: RunWithMininimalEnvironment; + worker: BackgroundWorker; + task: BackgroundWorkerTask; + queue: TaskQueue; + deployment: WorkerDeployment | null; + }; + +type WorkerDeploymentWithWorkerTasks = { + worker: BackgroundWorker; + tasks: BackgroundWorkerTask[]; + queues: TaskQueue[]; + deployment: WorkerDeployment | null; +}; + export class DequeueSystem { private readonly $: SystemResources; private readonly executionSnapshotSystem: ExecutionSnapshotSystem; @@ -80,7 +143,14 @@ export class DequeueSystem { const orgId = message.message.orgId; const runId = message.messageId; - span.setAttribute("runId", runId); + span.setAttribute("run_id", runId); + span.setAttribute("org_id", orgId); + span.setAttribute("environment_id", message.message.environmentId); + span.setAttribute("environment_type", message.message.environmentType); + span.setAttribute("worker_queue_length", message.workerQueueLength ?? 0); + span.setAttribute("consumer_id", consumerId); + span.setAttribute("worker_queue", workerQueue); + span.setAttribute("blocking_pop", blockingPop ?? true); //lock the run so nothing else can modify it try { @@ -199,7 +269,7 @@ export class DequeueSystem { return; } - const result = await getRunWithBackgroundWorkerTasks( + const result = await this.#getRunWithBackgroundWorkerTasks( prisma, runId, backgroundWorkerId @@ -666,4 +736,337 @@ export class DequeueSystem { }); }); } + + async #getRunWithBackgroundWorkerTasks( + prisma: PrismaClientOrTransaction, + runId: string, + backgroundWorkerId?: string + ): Promise { + return startSpan(this.$.tracer, "getRunWithBackgroundWorkerTasks", async (span) => { + span.setAttribute("run_id", runId); + + const run = await prisma.taskRun.findFirst({ + where: { + id: runId, + }, + include: { + runtimeEnvironment: { + select: { + id: true, + type: true, + archivedAt: true, + }, + }, + lockedToVersion: { + include: { + deployment: true, + tasks: true, + }, + }, + }, + }); + + if (!run) { + span.setAttribute("result", "NO_RUN"); + return { + success: false as const, + code: "NO_RUN", + message: `No run found with id: ${runId}`, + }; + } + + span.setAttribute("environment_type", run.runtimeEnvironment.type); + + if (run.runtimeEnvironment.archivedAt) { + span.setAttribute("result", "RUN_ENVIRONMENT_ARCHIVED"); + return { + success: false as const, + code: "RUN_ENVIRONMENT_ARCHIVED", + message: `Run is on an archived environment: ${run.id}`, + run, + }; + } + + const workerId = run.lockedToVersionId ?? backgroundWorkerId; + + //get the relevant BackgroundWorker with tasks and deployment (if not DEV) + let workerWithTasks: WorkerDeploymentWithWorkerTasks | null = null; + + if (run.runtimeEnvironment.type === "DEVELOPMENT") { + workerWithTasks = workerId + ? await this.#getWorkerById(prisma, workerId) + : await this.#getMostRecentWorker(prisma, run.runtimeEnvironmentId); + } else { + workerWithTasks = workerId + ? await this.#getWorkerDeploymentFromWorker(prisma, workerId) + : await this.#getManagedWorkerFromCurrentlyPromotedDeployment( + prisma, + run.runtimeEnvironmentId + ); + } + + if (!workerWithTasks) { + span.setAttribute("result", "NO_WORKER"); + return { + success: false as const, + code: "NO_WORKER", + message: `No worker found for run: ${run.id}`, + run, + }; + } + + if (backgroundWorkerId) { + if (backgroundWorkerId !== workerWithTasks.worker.id) { + span.setAttribute("result", "BACKGROUND_WORKER_MISMATCH"); + return { + success: false as const, + code: "BACKGROUND_WORKER_MISMATCH", + message: `Background worker mismatch for run: ${run.id}`, + backgroundWorker: { + expected: backgroundWorkerId, + received: workerWithTasks.worker.id, + }, + run, + }; + } + } + + const backgroundTask = workerWithTasks.tasks.find((task) => task.slug === run.taskIdentifier); + + if (!backgroundTask) { + const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({ + where: { + slug: run.taskIdentifier, + projectId: run.projectId, + runtimeEnvironmentId: run.runtimeEnvironmentId, + }, + include: { + worker: true, + }, + orderBy: { + createdAt: "desc", + }, + }); + + if (nonCurrentTask) { + span.setAttribute("result", "TASK_NOT_IN_LATEST"); + return { + success: false as const, + code: "TASK_NOT_IN_LATEST", + message: `Task not found in latest version: ${run.taskIdentifier}. Found in ${nonCurrentTask.worker.version}`, + run, + }; + } else { + span.setAttribute("result", "TASK_NEVER_REGISTERED"); + return { + success: false as const, + code: "TASK_NEVER_REGISTERED", + message: `Task has never been registered (in dev or deployed): ${run.taskIdentifier}`, + run, + }; + } + } + + const queue = workerWithTasks.queues.find((queue) => + run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue + ); + + if (!queue) { + span.setAttribute("result", "QUEUE_NOT_FOUND"); + return { + success: false as const, + code: "QUEUE_NOT_FOUND", + message: `Queue not found for run: ${run.id}`, + run, + }; + } + + span.setAttribute("result", "SUCCESS"); + + return { + success: true as const, + run, + worker: workerWithTasks.worker, + task: backgroundTask, + queue, + deployment: workerWithTasks.deployment, + }; + }); + } + + async #getWorkerDeploymentFromWorker( + prisma: PrismaClientOrTransaction, + workerId: string + ): Promise { + return startSpan(this.$.tracer, "getWorkerDeploymentFromWorker", async (span) => { + const worker = await prisma.backgroundWorker.findFirst({ + where: { + id: workerId, + }, + include: { + deployment: true, + tasks: true, + queues: true, + }, + }); + + if (!worker) { + span.setAttribute("result", "NOT_FOUND"); + return null; + } + + span.setAttribute("result", "SUCCESS"); + + return { + worker, + tasks: worker.tasks, + queues: worker.queues, + deployment: worker.deployment, + }; + }); + } + + async #getMostRecentWorker( + prisma: PrismaClientOrTransaction, + environmentId: string + ): Promise { + return startSpan(this.$.tracer, "getMostRecentWorker", async (span) => { + const worker = await prisma.backgroundWorker.findFirst({ + where: { + runtimeEnvironmentId: environmentId, + }, + include: { + tasks: true, + queues: true, + }, + orderBy: { + id: "desc", + }, + }); + + if (!worker) { + span.setAttribute("result", "NOT_FOUND"); + return null; + } + + span.setAttribute("result", "SUCCESS"); + + return { worker, tasks: worker.tasks, queues: worker.queues, deployment: null }; + }); + } + + async #getWorkerById( + prisma: PrismaClientOrTransaction, + workerId: string + ): Promise { + return startSpan(this.$.tracer, "getWorkerById", async (span) => { + const worker = await prisma.backgroundWorker.findFirst({ + where: { + id: workerId, + }, + include: { + deployment: true, + tasks: true, + queues: true, + }, + orderBy: { + id: "desc", + }, + }); + + if (!worker) { + span.setAttribute("result", "NOT_FOUND"); + return null; + } + + span.setAttribute("result", "SUCCESS"); + + return { + worker, + tasks: worker.tasks, + queues: worker.queues, + deployment: worker.deployment, + }; + }); + } + + async #getManagedWorkerFromCurrentlyPromotedDeployment( + prisma: PrismaClientOrTransaction, + environmentId: string + ): Promise { + return startSpan( + this.$.tracer, + "getManagedWorkerFromCurrentlyPromotedDeployment", + async (span) => { + const promotion = await prisma.workerDeploymentPromotion.findFirst({ + where: { + environmentId, + label: CURRENT_DEPLOYMENT_LABEL, + }, + include: { + deployment: { + include: { + worker: { + include: { + tasks: true, + queues: true, + }, + }, + }, + }, + }, + }); + + if (!promotion || !promotion.deployment.worker) { + span.setAttribute("result", "NO_PROMOTION_OR_WORKER"); + return null; + } + + if (promotion.deployment.type === "MANAGED") { + // This is a run engine v2 deployment, so return it + span.setAttribute("result", "SUCCESS_CURRENT_MANAGED"); + + return { + worker: promotion.deployment.worker, + tasks: promotion.deployment.worker.tasks, + queues: promotion.deployment.worker.queues, + deployment: promotion.deployment, + }; + } + + // We need to get the latest run engine v2 deployment + const latestV2Deployment = await prisma.workerDeployment.findFirst({ + where: { + environmentId, + type: "MANAGED", + }, + orderBy: { + id: "desc", + }, + include: { + worker: { + include: { + tasks: true, + queues: true, + }, + }, + }, + }); + + if (!latestV2Deployment?.worker) { + span.setAttribute("result", "NO_V2_DEPLOYMENT"); + return null; + } + + span.setAttribute("result", "SUCCESS_LATEST_V2"); + + return { + worker: latestV2Deployment.worker, + tasks: latestV2Deployment.worker.tasks, + queues: latestV2Deployment.worker.queues, + deployment: latestV2Deployment, + }; + } + ); + } } From 3f96ae09c55e69544540375d1d4f75ecc2b296d2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 27 Aug 2025 18:44:57 +0100 Subject: [PATCH 4/4] Add env vars and additional spans --- apps/webapp/app/env.server.ts | 5 +- apps/webapp/app/v3/runEngine.server.ts | 3 + .../worker/workerGroupTokenService.server.ts | 4 +- .../run-engine/src/engine/locking.ts | 21 +- .../run-engine/src/engine/types.ts | 3 + .../run-engine/src/run-queue/index.ts | 243 +++++++++++------- 6 files changed, 181 insertions(+), 98 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index fdd343c90b..a8ea87e79a 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -496,7 +496,10 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000), RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10), - RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500), + RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000), + RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000), + RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10), + RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10), RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(), RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(), RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 730e156e0d..c3ddc89b92 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -67,6 +67,9 @@ function createRunEngine() { dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS, masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS, masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0", + masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS, + masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD, + masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT, concurrencySweeper: { scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE, processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE, diff --git a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts index be60250344..37aab78e62 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts @@ -193,8 +193,10 @@ export class WorkerGroupTokenService extends WithRunEngine { return; } + const cacheKey = ["worker-group-token", token, instanceName]; + const result = await authenticatedWorkerInstanceCache.authenticatedWorkerInstance.swr( - `worker-group-token-${token}`, + cacheKey.join("-"), async () => { const workerGroup = await this.findWorkerGroup({ token }); diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index a58c0dcfdc..6921db28a3 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -217,7 +217,16 @@ export class RunLocker { let lastError: Error | undefined; for (let attempt = 0; attempt <= maxAttempts; attempt++) { - const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration)); + const [error, acquiredLock] = await tryCatch( + startSpan(this.tracer, "RunLocker.acquireLock", async (span) => { + span.setAttributes({ + resources: joinedResources, + attempt, + totalWaitTime, + }); + return await this.redlock.acquire(sortedResources, duration); + }) + ); if (!error && acquiredLock) { lock = acquiredLock; @@ -390,7 +399,15 @@ export class RunLocker { this.#cleanupExtension(manualContext); // Release the lock using tryCatch - const [releaseError] = await tryCatch(lock.release()); + const [releaseError] = await tryCatch( + startSpan(this.tracer, "RunLocker.releaseLock", async (span) => { + span.setAttributes({ + resources: joinedResources, + lockValue: lock.value, + }); + return await lock.release(); + }) + ); if (releaseError) { this.logger.warn("[RunLocker] Error releasing lock", { error: releaseError, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 9abf2acfa2..502a9b318f 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -39,6 +39,9 @@ export type RunEngineOptions = { masterQueueConsumersDisabled?: boolean; processWorkerQueueDebounceMs?: number; masterQueueConsumersIntervalMs?: number; + masterQueueCooloffPeriodMs?: number; + masterQueueCooloffCountThreshold?: number; + masterQueueConsumerDequeueCount?: number; workerOptions?: WorkerConcurrencyOptions; retryOptions?: RetryOptions; defaultEnvConcurrency?: number; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index fa6603a503..ec12c19f1d 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -1451,92 +1451,107 @@ export class RunQueue { shard: number; maxCount: number; }): Promise { - const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(messageQueue); - const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(messageQueue); - const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(messageQueue); - const envConcurrencyLimitBurstFactorKey = - this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(messageQueue); - const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue); - const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(messageQueue); - const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue); - const masterQueueKey = this.keys.masterQueueKeyForShard(shard); - - this.logger.debug("#callDequeueMessagesFromQueue", { - messageQueue, - queueConcurrencyLimitKey, - envConcurrencyLimitKey, - envConcurrencyLimitBurstFactorKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - messageKeyPrefix, - envQueueKey, - masterQueueKey, - shard, - maxCount, - }); + return this.#trace("callDequeueMessagesFromQueue", async (span) => { + span.setAttributes({ + messageQueue, + shard, + maxCount, + }); - const result = await this.redis.dequeueMessagesFromQueue( - //keys - messageQueue, - queueConcurrencyLimitKey, - envConcurrencyLimitKey, - envConcurrencyLimitBurstFactorKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - messageKeyPrefix, - envQueueKey, - masterQueueKey, - //args - messageQueue, - String(Date.now()), - String(this.options.defaultEnvConcurrency), - String(this.options.defaultEnvConcurrencyBurstFactor ?? 1), - this.options.redis.keyPrefix ?? "", - String(maxCount) - ); + const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(messageQueue); + const queueCurrentConcurrencyKey = + this.keys.queueCurrentConcurrencyKeyFromQueue(messageQueue); + const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(messageQueue); + const envConcurrencyLimitBurstFactorKey = + this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(messageQueue); + const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue); + const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(messageQueue); + const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue); + const masterQueueKey = this.keys.masterQueueKeyForShard(shard); - if (!result) { - return []; - } + this.logger.debug("#callDequeueMessagesFromQueue", { + messageQueue, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + messageKeyPrefix, + envQueueKey, + masterQueueKey, + shard, + maxCount, + }); - this.logger.debug("dequeueMessagesFromQueue raw result", { - result, - service: this.name, - }); + const result = await this.redis.dequeueMessagesFromQueue( + //keys + messageQueue, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + messageKeyPrefix, + envQueueKey, + masterQueueKey, + //args + messageQueue, + String(Date.now()), + String(this.options.defaultEnvConcurrency), + String(this.options.defaultEnvConcurrencyBurstFactor ?? 1), + this.options.redis.keyPrefix ?? "", + String(maxCount) + ); - const messages = []; - for (let i = 0; i < result.length; i += 3) { - const messageId = result[i]; - const messageScore = result[i + 1]; - const rawMessage = result[i + 2]; + if (!result) { + span.setAttribute("message_count", 0); - //read message - const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); - if (!parsedMessage.success) { - this.logger.error(`[${this.name}] Failed to parse message`, { + return []; + } + + this.logger.debug("dequeueMessagesFromQueue raw result", { + result, + service: this.name, + }); + + const messages = []; + for (let i = 0; i < result.length; i += 3) { + const messageId = result[i]; + const messageScore = result[i + 1]; + const rawMessage = result[i + 2]; + + //read message + const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); + if (!parsedMessage.success) { + this.logger.error(`[${this.name}] Failed to parse message`, { + messageId, + error: parsedMessage.error, + service: this.name, + }); + + continue; + } + + const message = parsedMessage.data; + + messages.push({ messageId, - error: parsedMessage.error, - service: this.name, + messageScore, + message, }); - - continue; } - const message = parsedMessage.data; - - messages.push({ - messageId, - messageScore, - message, + this.logger.debug("dequeueMessagesFromQueue parsed result", { + messages, + service: this.name, }); - } - this.logger.debug("dequeueMessagesFromQueue parsed result", { - messages, - service: this.name, - }); + const filteredMessages = messages.filter(Boolean) as DequeuedMessage[]; + + span.setAttribute("message_count", filteredMessages.length); - return messages.filter(Boolean) as DequeuedMessage[]; + return filteredMessages; + }); } async #callDequeueMessageFromWorkerQueue({ @@ -1569,7 +1584,16 @@ export class RunQueue { this.abortController.signal.addEventListener("abort", cleanup); - const result = await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds); + const result = await this.#trace("popMessageFromWorkerQueue", async (span) => { + span.setAttributes({ + workerQueue, + workerQueueKey, + blockingPopTimeoutSeconds, + blocking: true, + }); + + return await blockingClient.blpop(workerQueueKey, blockingPopTimeoutSeconds); + }); this.abortController.signal.removeEventListener("abort", cleanup); @@ -1607,7 +1631,15 @@ export class RunQueue { const [, messageKey] = result; - const workerQueueLength = await this.redis.llen(workerQueueKey); + const workerQueueLength = await this.#trace("getWorkerQueueLength", async (span) => { + span.setAttributes({ + workerQueue, + workerQueueKey, + }); + + return await this.redis.llen(workerQueueKey); + }); + const message = await this.#dequeueMessageFromKey(messageKey); if (!message) { @@ -1626,7 +1658,15 @@ export class RunQueue { workerQueueKey, }); - const result = await this.redis.dequeueMessageFromWorkerQueueNonBlocking(workerQueueKey); + const result = await this.#trace("popMessageFromWorkerQueue", async (span) => { + span.setAttributes({ + workerQueue, + workerQueueKey, + blocking: false, + }); + + return await this.redis.dequeueMessageFromWorkerQueueNonBlocking(workerQueueKey); + }); if (!result) { return; @@ -2070,27 +2110,42 @@ export class RunQueue { } async #dequeueMessageFromKey(messageKey: string) { - const rawMessage = await this.redis.dequeueMessageFromKey( - messageKey, - this.options.redis.keyPrefix ?? "" - ); + return this.#trace("dequeueMessageFromKey", async (span) => { + span.setAttributes({ + messageKey, + }); - if (!rawMessage) { - return; - } + const rawMessage = await this.redis.dequeueMessageFromKey( + messageKey, + this.options.redis.keyPrefix ?? "" + ); - const [error, message] = parseRawMessage(rawMessage); + if (!rawMessage) { + span.setAttribute("result", "NO_MESSAGE"); - if (error) { - this.logger.error(`[${this.name}] Failed to parse message`, { - messageKey, - error, - service: this.name, - message: message ?? rawMessage, - }); - } + return; + } - return message; + const [error, message] = parseRawMessage(rawMessage); + + if (error) { + this.logger.error(`[${this.name}] Failed to parse message`, { + messageKey, + error, + service: this.name, + message: message ?? rawMessage, + }); + } + + if (message) { + span.setAttribute("result", "SUCCESS"); + span.setAttribute("messageId", message.runId); + } else { + span.setAttribute("result", "NO_MESSAGE"); + } + + return message; + }); } #registerCommands() {