From bee6b882c30b6dc9b680c6da4095f38addd6b24e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 12 Jun 2025 17:06:34 +0100 Subject: [PATCH 01/14] Remove the runlock from the trigger path --- .../run-engine/src/engine/index.ts | 88 +++++++++---------- .../run-engine/src/engine/locking.ts | 14 +++ .../src/engine/systems/enqueueSystem.ts | 4 +- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 620eac3f64..ac29abb749 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -486,56 +486,52 @@ export class RunEngine { span.setAttribute("runId", taskRun.id); - await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => { - //create associated waitpoint (this completes when the run completes) - const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint( - prisma, - { - projectId: environment.project.id, - environmentId: environment.id, - completedByTaskRunId: taskRun.id, - } - ); - - //triggerAndWait or batchTriggerAndWait - if (resumeParentOnCompletion && parentTaskRunId) { - //this will block the parent run from continuing until this waitpoint is completed (and removed) - await this.waitpointSystem.blockRunWithWaitpoint({ - runId: parentTaskRunId, - waitpoints: associatedWaitpoint.id, - projectId: associatedWaitpoint.projectId, - organizationId: environment.organization.id, - batch, - workerId, - runnerId, - tx: prisma, - releaseConcurrency, - }); + //create associated waitpoint (this completes when the run completes) + const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint( + prisma, + { + projectId: environment.project.id, + environmentId: environment.id, + completedByTaskRunId: taskRun.id, } + ); - //Make sure lock extension succeeded - signal.throwIfAborted(); - - if (taskRun.delayUntil) { - // Schedule the run to be enqueued at the delayUntil time - await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ - runId: taskRun.id, - delayUntil: taskRun.delayUntil, - }); - } else { - await this.enqueueSystem.enqueueRun({ - run: taskRun, - env: environment, - workerId, - runnerId, - tx: prisma, - }); + //triggerAndWait or batchTriggerAndWait + if (resumeParentOnCompletion && parentTaskRunId) { + //this will block the parent run from continuing until this waitpoint is completed (and removed) + await this.waitpointSystem.blockRunWithWaitpoint({ + runId: parentTaskRunId, + waitpoints: associatedWaitpoint.id, + projectId: associatedWaitpoint.projectId, + organizationId: environment.organization.id, + batch, + workerId, + runnerId, + tx: prisma, + releaseConcurrency, + }); + } - if (taskRun.ttl) { - await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); - } + if (taskRun.delayUntil) { + // Schedule the run to be enqueued at the delayUntil time + await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ + runId: taskRun.id, + delayUntil: taskRun.delayUntil, + }); + } else { + if (taskRun.ttl) { + await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); } - }); + + await this.enqueueSystem.enqueueRun({ + run: taskRun, + env: environment, + workerId, + runnerId, + tx: prisma, + skipRunLock: true, + }); + } this.eventBus.emit("runCreated", { time: new Date(), diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 1e120a3feb..9eff9aa89b 
100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -165,6 +165,20 @@ export class RunLocker { ); } + async lockIf( + condition: boolean, + name: string, + resources: string[], + duration: number, + routine: () => Promise + ): Promise { + if (condition) { + return this.lock(name, resources, duration, routine); + } else { + return routine(); + } + } + isInsideLock(): boolean { return !!this.asyncLocalStorage.getStore(); } diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 1f5383fe0a..c43f93bcf7 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -33,6 +33,7 @@ export class EnqueueSystem { completedWaitpoints, workerId, runnerId, + skipRunLock, }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; @@ -51,10 +52,11 @@ export class EnqueueSystem { }[]; workerId?: string; runnerId?: string; + skipRunLock?: boolean; }) { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock("enqueueRun", [run.id], 5000, async () => { + return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], 5000, async () => { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: run, snapshot: { From a7659fee747f6d5cc828769f55b6963264603c50 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 12 Jun 2025 18:07:47 +0100 Subject: [PATCH 02/14] WIP runlock rewrite --- .../run-engine/src/engine/locking.ts | 189 +++++++++++++++++- 1 file changed, 188 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 9eff9aa89b..d868bb3ed4 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -165,12 +165,199 @@ export class RunLocker { ); } + /** Manual lock acquisition with custom retry logic */ + async #acquireAndExecute( + name: string, + resources: string[], + duration: number, + routine: (signal: redlock.RedlockAbortSignal) => Promise, + lockId: string, + lockStartTime: number + ): Promise { + const joinedResources = resources.sort().join(","); + + // Custom retry settings + const maxRetries = 10; + const baseDelay = 200; + const jitter = 200; + + // Retry the lock acquisition specifically using tryCatch + let lock: redlock.Lock; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + const [error, acquiredLock] = await tryCatch(this.redlock.acquire(resources, duration)); + + if (!error && acquiredLock) { + lock = acquiredLock; + break; + } + + // If this is the last attempt, throw the error + if (attempt === maxRetries) { + throw error || new Error("Failed to acquire lock after maximum retries"); + } + + // If it's a ResourceLockedError, we should retry + if (error && error.name === "ResourceLockedError") { + // Calculate delay with jitter + const delay = baseDelay + Math.floor((Math.random() * 2 - 1) * jitter); + await new Promise((resolve) => setTimeout(resolve, Math.max(0, delay))); + continue; + } + + // For other errors, throw immediately + throw error || new Error("Unknown error during lock acquisition"); + } + + // Create an AbortController for our signal + const controller = new AbortController(); + const signal = controller.signal as redlock.RedlockAbortSignal; + + const manualContext: ManualLockContext = { + lock: 
lock!, + timeout: undefined, + extension: undefined, + }; + + // Set up auto-extension starting from when lock was actually acquired + this.#setupAutoExtension(manualContext, duration, signal, controller); + + try { + const newContext: LockContext = { + resources: joinedResources, + signal, + lockType: name, + }; + + // Track active lock + this.activeLocks.set(lockId, { + lockType: name, + resources: resources, + }); + + let lockSuccess = true; + try { + const result = await this.asyncLocalStorage.run(newContext, async () => { + return routine(signal); + }); + + return result; + } catch (lockError) { + lockSuccess = false; + throw lockError; + } finally { + // Record lock duration + const lockDuration = performance.now() - lockStartTime; + this.lockDurationHistogram.record(lockDuration, { + [SemanticAttributes.LOCK_TYPE]: name, + [SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(), + }); + + // Remove from active locks when done + this.activeLocks.delete(lockId); + } + } finally { + // Clean up extension mechanism - this ensures auto extension stops after routine finishes + this.#cleanupExtension(manualContext); + + // Release the lock using tryCatch + const [releaseError] = await tryCatch(lock!.release()); + if (releaseError) { + this.logger.warn("[RunLocker] Error releasing lock", { + error: releaseError, + resources, + lockValue: lock!.value, + }); + } + } + } + + /** Set up automatic lock extension */ + #setupAutoExtension( + context: ManualLockContext, + duration: number, + signal: redlock.RedlockAbortSignal, + controller: AbortController + ): void { + const automaticExtensionThreshold = 500; // Same as redlock default + + if (automaticExtensionThreshold > duration - 100) { + // Don't set up auto-extension if duration is too short + return; + } + + const scheduleExtension = (): void => { + const timeUntilExtension = context.lock.expiration - Date.now() - automaticExtensionThreshold; + + if (timeUntilExtension > 0) { + context.timeout = setTimeout(() => { + context.extension = this.#extendLock( + context, + duration, + signal, + controller, + scheduleExtension + ); + }, timeUntilExtension); + } + }; + + scheduleExtension(); + } + + /** Extend a lock */ + async #extendLock( + context: ManualLockContext, + duration: number, + signal: redlock.RedlockAbortSignal, + controller: AbortController, + scheduleNext: () => void + ): Promise { + context.timeout = undefined; + + const [error, newLock] = await tryCatch(context.lock.extend(duration)); + + if (!error && newLock) { + context.lock = newLock; + // Only schedule next extension if we haven't been cleaned up + if (context.timeout !== null) { + scheduleNext(); + } + } else { + if (context.lock.expiration > Date.now()) { + // If lock hasn't expired yet, try again (but only if not cleaned up) + if (context.timeout !== null) { + return this.#extendLock(context, duration, signal, controller, scheduleNext); + } + } else { + // Lock has expired, abort the signal + signal.error = error instanceof Error ? 
error : new Error(String(error)); + controller.abort(); + } + } + } + + /** Clean up extension mechanism */ + #cleanupExtension(context: ManualLockContext): void { + // Signal that we're cleaning up by setting timeout to null + if (context.timeout) { + clearTimeout(context.timeout); + } + context.timeout = null; + + // Wait for any in-flight extension to complete + if (context.extension) { + context.extension.catch(() => { + // Ignore errors during cleanup + }); + } + } + async lockIf( condition: boolean, name: string, resources: string[], duration: number, - routine: () => Promise + routine: (signal?: redlock.RedlockAbortSignal) => Promise ): Promise { if (condition) { return this.lock(name, resources, duration, routine); From 5cd4499529f5b3d464ebb00544e7dad18b6dc7e9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Jun 2025 15:15:50 +0100 Subject: [PATCH 03/14] Use our own manual locking instead of redlock.using --- .../run-engine/src/engine/locking.ts | 65 +++++++++---------- .../src/engine/tests/locking.test.ts | 64 +++++++++--------- 2 files changed, 64 insertions(+), 65 deletions(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index d868bb3ed4..b06e5c9ffd 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -28,6 +28,12 @@ interface LockContext { lockType: string; } +interface ManualLockContext { + lock: redlock.Lock; + timeout: NodeJS.Timeout | null | undefined; + extension: Promise | undefined; +} + export class RunLocker { private redlock: InstanceType; private asyncLocalStorage: AsyncLocalStorage; @@ -35,6 +41,7 @@ export class RunLocker { private tracer: Tracer; private meter: Meter; private activeLocks: Map = new Map(); + private activeManualContexts: Map = new Map(); private lockDurationHistogram: Histogram; constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) { @@ -110,39 +117,7 @@ export class RunLocker { const lockStartTime = performance.now(); const [error, result] = await tryCatch( - this.redlock.using(resources, duration, async (signal) => { - const newContext: LockContext = { - resources: joinedResources, - signal, - lockType: name, - }; - - // Track active lock - this.activeLocks.set(lockId, { - lockType: name, - resources: resources, - }); - - let lockSuccess = true; - try { - return this.asyncLocalStorage.run(newContext, async () => { - return routine(signal); - }); - } catch (lockError) { - lockSuccess = false; - throw lockError; - } finally { - // Record lock duration - const lockDuration = performance.now() - lockStartTime; - this.lockDurationHistogram.record(lockDuration, { - [SemanticAttributes.LOCK_TYPE]: name, - [SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(), - }); - - // Remove from active locks when done - this.activeLocks.delete(lockId); - } - }) + this.#acquireAndExecute(name, resources, duration, routine, lockId, lockStartTime) ); if (error) { @@ -218,6 +193,9 @@ export class RunLocker { extension: undefined, }; + // Track the manual context for cleanup + this.activeManualContexts.set(lockId, manualContext); + // Set up auto-extension starting from when lock was actually acquired this.#setupAutoExtension(manualContext, duration, signal, controller); @@ -256,6 +234,9 @@ export class RunLocker { this.activeLocks.delete(lockId); } } finally { + // Remove from active manual contexts + this.activeManualContexts.delete(lockId); + // Clean up extension mechanism - this 
ensures auto extension stops after routine finishes this.#cleanupExtension(manualContext); @@ -375,6 +356,24 @@ export class RunLocker { } async quit() { + // Clean up all active manual contexts + for (const [lockId, context] of this.activeManualContexts) { + this.#cleanupExtension(context); + + // Try to release any remaining locks + const [releaseError] = await tryCatch(context.lock.release()); + if (releaseError) { + this.logger.warn("[RunLocker] Error releasing lock during quit", { + error: releaseError, + lockId, + lockValue: context.lock.value, + }); + } + } + + this.activeManualContexts.clear(); + this.activeLocks.clear(); + await this.redlock.quit(); } } diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index bdb5f61242..8607664647 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -8,14 +8,14 @@ import { Logger } from "@trigger.dev/core/logger"; describe("RunLocker", () => { redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ - redis, - logger, - tracer: trace.getTracer("RunLockTest"), - }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + try { expect(runLock.isInsideLock()).toBe(false); await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { @@ -25,16 +25,16 @@ describe("RunLocker", () => { expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { @@ -50,7 +50,7 @@ describe("RunLocker", () => { expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); @@ -59,10 +59,10 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); await expect( @@ -74,7 +74,7 @@ describe("RunLocker", () => { // Verify the lock was released expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); @@ -84,10 +84,10 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new 
Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); await expect( @@ -105,17 +105,17 @@ describe("RunLocker", () => { // Verify all locks were released expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); redisTest("Test lock throws when it times out", { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { // First, ensure we can acquire the lock normally let firstLockAcquired = false; await runLock.lock("test-lock", ["test-1"], 5000, async () => { @@ -147,7 +147,7 @@ describe("RunLocker", () => { // Verify final state expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); @@ -156,10 +156,10 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { await runLock.lock("test-lock", ["test-1"], 5000, async () => { // First lock acquired expect(runLock.isInsideLock()).toBe(true); @@ -176,7 +176,7 @@ describe("RunLocker", () => { // Verify final state expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); @@ -186,10 +186,10 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { // First verify we can acquire the lock normally let firstLockAcquired = false; await runLock.lock("test-lock", ["test-1"], 5000, async () => { @@ -225,7 +225,7 @@ describe("RunLocker", () => { expect(innerLockExecuted).toBe(true); expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); From a2680815f246c78f35230d8440e6560f29ddfeff Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 09:42:24 +0100 Subject: [PATCH 04/14] Better retries and more tests --- .../run-engine/src/engine/locking.ts | 180 ++++- .../src/engine/tests/locking.test.ts | 673 +++++++++++++++++- 2 files changed, 824 insertions(+), 29 deletions(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index b06e5c9ffd..859f20de3f 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -22,6 +22,23 @@ const SemanticAttributes = { LOCK_SUCCESS: "run_engine.lock.success", }; +export class LockAcquisitionTimeoutError extends Error { + constructor( + public readonly resources: string[], + public 
readonly totalWaitTime: number, + public readonly attempts: number, + message?: string + ) { + super( + message || + `Failed to acquire lock on resources [${resources.join( + ", " + )}] after ${totalWaitTime}ms and ${attempts} attempts` + ); + this.name = "LockAcquisitionTimeoutError"; + } +} + interface LockContext { resources: string; signal: redlock.RedlockAbortSignal; @@ -34,6 +51,21 @@ interface ManualLockContext { extension: Promise | undefined; } +interface RetryConfig { + /** Maximum number of retry attempts (default: 10) */ + maxRetries?: number; + /** Initial delay in milliseconds (default: 200) */ + baseDelay?: number; + /** Maximum delay cap in milliseconds (default: 5000) */ + maxDelay?: number; + /** Exponential backoff multiplier (default: 1.5) */ + backoffMultiplier?: number; + /** Jitter factor as percentage (default: 0.1 for 10%) */ + jitterFactor?: number; + /** Maximum total wait time in milliseconds (default: 30000) */ + maxTotalWaitTime?: number; +} + export class RunLocker { private redlock: InstanceType; private asyncLocalStorage: AsyncLocalStorage; @@ -43,8 +75,15 @@ export class RunLocker { private activeLocks: Map = new Map(); private activeManualContexts: Map = new Map(); private lockDurationHistogram: Histogram; - - constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) { + private retryConfig: Required; + + constructor(options: { + redis: Redis; + logger: Logger; + tracer: Tracer; + meter?: Meter; + retryConfig?: RetryConfig; + }) { this.redlock = new Redlock([options.redis], { driftFactor: 0.01, retryCount: 10, @@ -57,6 +96,16 @@ export class RunLocker { this.tracer = options.tracer; this.meter = options.meter ?? getMeter("run-engine"); + // Initialize retry configuration with defaults + this.retryConfig = { + maxRetries: options.retryConfig?.maxRetries ?? 10, + baseDelay: options.retryConfig?.baseDelay ?? 200, + maxDelay: options.retryConfig?.maxDelay ?? 5000, + backoffMultiplier: options.retryConfig?.backoffMultiplier ?? 1.5, + jitterFactor: options.retryConfig?.jitterFactor ?? 0.1, + maxTotalWaitTime: options.retryConfig?.maxTotalWaitTime ?? 
30000, + }; + const activeLocksObservableGauge = this.meter.createObservableGauge("run_engine.locks.active", { description: "The number of active locks by type", unit: "locks", @@ -140,7 +189,7 @@ export class RunLocker { ); } - /** Manual lock acquisition with custom retry logic */ + /** Manual lock acquisition with exponential backoff retry logic */ async #acquireAndExecute( name: string, resources: string[], @@ -151,36 +200,115 @@ export class RunLocker { ): Promise { const joinedResources = resources.sort().join(","); - // Custom retry settings - const maxRetries = 10; - const baseDelay = 200; - const jitter = 200; + // Use configured retry settings with exponential backoff + const { maxRetries, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } = + this.retryConfig; - // Retry the lock acquisition specifically using tryCatch + // Track timing for total wait time limit + const retryStartTime = performance.now(); + let totalWaitTime = 0; + + // Retry the lock acquisition with exponential backoff let lock: redlock.Lock; + let lastError: Error | undefined; + for (let attempt = 0; attempt <= maxRetries; attempt++) { const [error, acquiredLock] = await tryCatch(this.redlock.acquire(resources, duration)); if (!error && acquiredLock) { lock = acquiredLock; + if (attempt > 0) { + this.logger.debug("[RunLocker] Lock acquired after retries", { + name, + resources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + }); + } break; } - // If this is the last attempt, throw the error + lastError = error instanceof Error ? error : new Error(String(error)); + + // Check if we've exceeded total wait time limit + if (totalWaitTime >= maxTotalWaitTime) { + this.logger.warn("[RunLocker] Lock acquisition exceeded total wait time limit", { + name, + resources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + maxTotalWaitTime, + }); + throw new LockAcquisitionTimeoutError( + resources, + Math.round(totalWaitTime), + attempt + 1, + `Lock acquisition on resources [${resources.join( + ", " + )}] exceeded total wait time limit of ${maxTotalWaitTime}ms` + ); + } + + // If this is the last attempt, throw timeout error if (attempt === maxRetries) { - throw error || new Error("Failed to acquire lock after maximum retries"); + this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", { + name, + resources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + lastError: lastError.message, + }); + throw new LockAcquisitionTimeoutError( + resources, + Math.round(totalWaitTime), + attempt + 1, + `Lock acquisition on resources [${resources.join(", ")}] failed after ${ + attempt + 1 + } attempts` + ); } - // If it's a ResourceLockedError, we should retry - if (error && error.name === "ResourceLockedError") { - // Calculate delay with jitter - const delay = baseDelay + Math.floor((Math.random() * 2 - 1) * jitter); - await new Promise((resolve) => setTimeout(resolve, Math.max(0, delay))); + // Check if it's a retryable error (lock contention) + // ExecutionError: General redlock failure (including lock contention) + // ResourceLockedError: Specific lock contention error (if thrown) + const isRetryableError = + error && (error.name === "ResourceLockedError" || error.name === "ExecutionError"); + + if (isRetryableError) { + // Calculate exponential backoff delay with jitter and cap + const exponentialDelay = Math.min( + baseDelay * Math.pow(backoffMultiplier, attempt), + maxDelay + ); + const jitter = exponentialDelay * jitterFactor 
* (Math.random() * 2 - 1); // ±jitterFactor% jitter + const delay = Math.max(0, Math.round(exponentialDelay + jitter)); + + // Update total wait time before delay + totalWaitTime += delay; + + this.logger.debug("[RunLocker] Lock acquisition failed, retrying with backoff", { + name, + resources, + attempt: attempt + 1, + delay, + totalWaitTime: Math.round(totalWaitTime), + error: error.message, + errorName: error.name, + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); continue; } - // For other errors, throw immediately - throw error || new Error("Unknown error during lock acquisition"); + // For other errors (non-retryable), throw immediately + this.logger.error("[RunLocker] Lock acquisition failed with non-retryable error", { + name, + resources, + attempt: attempt + 1, + error: lastError.message, + errorName: lastError.name, + }); + throw lastError; } // Create an AbortController for our signal @@ -305,9 +433,19 @@ export class RunLocker { } } else { if (context.lock.expiration > Date.now()) { - // If lock hasn't expired yet, try again (but only if not cleaned up) + // If lock hasn't expired yet, schedule a retry instead of recursing + // This prevents stack overflow from repeated extension failures if (context.timeout !== null) { - return this.#extendLock(context, duration, signal, controller, scheduleNext); + const retryDelay = 100; // Short delay before retry + context.timeout = setTimeout(() => { + context.extension = this.#extendLock( + context, + duration, + signal, + controller, + scheduleNext + ); + }, retryDelay); } } else { // Lock has expired, abort the signal @@ -355,6 +493,10 @@ export class RunLocker { return this.asyncLocalStorage.getStore()?.resources; } + getRetryConfig(): Readonly> { + return { ...this.retryConfig }; + } + async quit() { // Clean up all active manual contexts for (const [lockId, context] of this.activeManualContexts) { diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index 8607664647..2cad3c428a 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -1,9 +1,10 @@ import { createRedisClient } from "@internal/redis"; import { redisTest } from "@internal/testcontainers"; import { expect } from "vitest"; -import { RunLocker } from "../locking.js"; +import { RunLocker, LockAcquisitionTimeoutError } from "../locking.js"; import { trace } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; +import * as redlock from "redlock"; describe("RunLocker", () => { redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => { @@ -110,10 +111,19 @@ describe("RunLocker", () => { } ); - redisTest("Test lock throws when it times out", { timeout: 15_000 }, async ({ redisOptions }) => { + redisTest("Test lock throws when it times out", { timeout: 45_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 3, + baseDelay: 100, + maxTotalWaitTime: 2000, // 2 second timeout for faster test + }, + }); try { // First, ensure we can acquire the lock normally @@ -128,18 +138,17 @@ describe("RunLocker", () => { // Now create a long-running 
lock const lockPromise1 = runLock.lock("test-lock", ["test-1"], 5000, async () => { - // Hold the lock longer than all possible retry attempts - // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max) - await new Promise((resolve) => setTimeout(resolve, 5000)); + // Hold the lock longer than the retry timeout + await new Promise((resolve) => setTimeout(resolve, 10000)); }); - // Try to acquire same lock immediately + // Try to acquire same lock immediately - should timeout with LockAcquisitionTimeoutError await expect( runLock.lock("test-lock", ["test-1"], 5000, async () => { // This should never execute expect(true).toBe(false); }) - ).rejects.toThrow("unable to achieve a quorum"); + ).rejects.toThrow(LockAcquisitionTimeoutError); // Complete the first lock await lockPromise1; @@ -214,8 +223,8 @@ describe("RunLocker", () => { expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-1"); - // Sleep longer than retry attempts would take - // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max) + // Sleep longer than retry attempts would normally take + // This proves the nested lock doesn't go through the retry logic await new Promise((resolve) => setTimeout(resolve, 5000)); }); }); @@ -229,4 +238,648 @@ describe("RunLocker", () => { } } ); + + redisTest( + "Test configurable retry settings work", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 2, + baseDelay: 50, + maxDelay: 200, + backoffMultiplier: 2.0, + jitterFactor: 0.1, + maxTotalWaitTime: 1000, + }, + }); + + try { + // Verify configuration is set correctly + const config = runLock.getRetryConfig(); + expect(config.maxRetries).toBe(2); + expect(config.baseDelay).toBe(50); + expect(config.maxDelay).toBe(200); + expect(config.backoffMultiplier).toBe(2.0); + expect(config.jitterFactor).toBe(0.1); + expect(config.maxTotalWaitTime).toBe(1000); + + // Test that the lock still works normally + await runLock.lock("test-lock", ["test-config"], 5000, async (signal) => { + expect(signal).toBeDefined(); + expect(runLock.isInsideLock()).toBe(true); + }); + + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test LockAcquisitionTimeoutError contains correct information", + { timeout: 25_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 2, + baseDelay: 50, + maxTotalWaitTime: 500, // Shorter timeout to ensure failure + }, + }); + + try { + // Create a long-running lock that will definitely outlast the retry timeout + const lockPromise = runLock.lock("test-lock", ["test-error"], 10000, async () => { + await new Promise((resolve) => setTimeout(resolve, 15000)); // Hold for 15 seconds + }); + + // Wait a bit to ensure the first lock is acquired + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Try to acquire same lock and capture the timeout error + try { + await runLock.lock("test-lock", ["test-error"], 5000, async () => { + expect(true).toBe(false); // Should never execute + }); + expect(true).toBe(false); // Should not reach here + } catch (error) { + 
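// Descriptive note (added for clarity): the failed acquisition is expected to surface the typed
// LockAcquisitionTimeoutError, carrying the resource list, attempt count, and total wait time,
// rather than redlock's generic quorum error — the assertions below check exactly that.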
expect(error).toBeInstanceOf(LockAcquisitionTimeoutError); + + if (error instanceof LockAcquisitionTimeoutError) { + expect(error.resources).toEqual(["test-error"]); + expect(error.attempts).toBeGreaterThan(0); + expect(error.attempts).toBeLessThanOrEqual(3); // maxRetries + 1 + expect(error.totalWaitTime).toBeGreaterThan(0); + expect(error.totalWaitTime).toBeLessThanOrEqual(800); // Some tolerance + expect(error.name).toBe("LockAcquisitionTimeoutError"); + expect(error.message).toContain("test-error"); + expect(error.message).toContain(`${error.attempts} attempts`); + } + } + + // Complete the first lock + await lockPromise; + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test default configuration values", { timeout: 15_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + // No retryConfig provided - should use defaults + }); + + try { + const config = runLock.getRetryConfig(); + expect(config.maxRetries).toBe(10); + expect(config.baseDelay).toBe(200); + expect(config.maxDelay).toBe(5000); + expect(config.backoffMultiplier).toBe(1.5); + expect(config.jitterFactor).toBe(0.1); + expect(config.maxTotalWaitTime).toBe(30000); + + // Test that it still works + await runLock.lock("test-lock", ["test-default"], 5000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test partial configuration override", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 5, + maxTotalWaitTime: 10000, + // Other values should use defaults + }, + }); + + try { + const config = runLock.getRetryConfig(); + expect(config.maxRetries).toBe(5); // Overridden + expect(config.maxTotalWaitTime).toBe(10000); // Overridden + expect(config.baseDelay).toBe(200); // Default + expect(config.maxDelay).toBe(5000); // Default + expect(config.backoffMultiplier).toBe(1.5); // Default + expect(config.jitterFactor).toBe(0.1); // Default + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test lockIf functionality", { timeout: 15_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + let executedWithLock = false; + let executedWithoutLock = false; + let signalReceived: any = null; + + // Test with condition = true (should acquire lock) + await runLock.lockIf(true, "test-lock", ["test-lockif"], 5000, async (signal) => { + executedWithLock = true; + signalReceived = signal; + expect(runLock.isInsideLock()).toBe(true); + expect(runLock.getCurrentResources()).toBe("test-lockif"); + }); + + expect(executedWithLock).toBe(true); + expect(signalReceived).toBeDefined(); + expect(runLock.isInsideLock()).toBe(false); + + // Test with condition = false (should not acquire lock) + await runLock.lockIf(false, "test-lock", ["test-lockif"], 5000, async (signal) => { + executedWithoutLock = true; + expect(runLock.isInsideLock()).toBe(false); + expect(runLock.getCurrentResources()).toBeUndefined(); + expect(signal).toBeUndefined(); // No signal 
when not locked + }); + + expect(executedWithoutLock).toBe(true); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test concurrent locks on different resources", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + const results: string[] = []; + + // Start multiple concurrent locks on different resources + const lock1Promise = runLock.lock("test-lock", ["resource-1"], 5000, async () => { + results.push("lock1-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock1-end"); + return "result1"; + }); + + const lock2Promise = runLock.lock("test-lock", ["resource-2"], 5000, async () => { + results.push("lock2-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock2-end"); + return "result2"; + }); + + const lock3Promise = runLock.lock("test-lock", ["resource-3"], 5000, async () => { + results.push("lock3-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock3-end"); + return "result3"; + }); + + const [result1, result2, result3] = await Promise.all([ + lock1Promise, + lock2Promise, + lock3Promise, + ]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + expect(result3).toBe("result3"); + + // All locks should have started (concurrent execution) + expect(results).toContain("lock1-start"); + expect(results).toContain("lock2-start"); + expect(results).toContain("lock3-start"); + expect(results).toContain("lock1-end"); + expect(results).toContain("lock2-end"); + expect(results).toContain("lock3-end"); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test multiple resources in single lock", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + await runLock.lock( + "test-lock", + ["resource-a", "resource-b", "resource-c"], + 5000, + async (signal) => { + expect(signal).toBeDefined(); + expect(runLock.isInsideLock()).toBe(true); + // Resources should be sorted and joined + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + } + ); + + // Test that resource order doesn't matter (should be normalized) + await runLock.lock( + "test-lock", + ["resource-c", "resource-a", "resource-b"], + 5000, + async () => { + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + } + ); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test different lock names on same resources don't interfere", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + const results: string[] = []; + + // These should be able to run concurrently despite same resources + // because they have different lock names + const promise1 = runLock.lock("lock-type-1", ["shared-resource"], 5000, async () => { + results.push("type1-start"); + await new Promise((resolve) => setTimeout(resolve, 200)); + results.push("type1-end"); + }); + + const promise2 = 
runLock.lock("lock-type-2", ["shared-resource"], 5000, async () => { + results.push("type2-start"); + await new Promise((resolve) => setTimeout(resolve, 200)); + results.push("type2-end"); + }); + + await Promise.all([promise1, promise2]); + + // Both should have executed (different lock names don't block each other) + expect(results).toContain("type1-start"); + expect(results).toContain("type1-end"); + expect(results).toContain("type2-start"); + expect(results).toContain("type2-end"); + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test configuration edge cases", { timeout: 15_000 }, async ({ redisOptions }) => { + const logger = new Logger("RunLockTest", "debug"); + + // Test with maxRetries = 0 + const redis1 = createRedisClient(redisOptions); + const runLock1 = new RunLocker({ + redis: redis1, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 0, + baseDelay: 100, + maxTotalWaitTime: 1000, + }, + }); + + try { + const config = runLock1.getRetryConfig(); + expect(config.maxRetries).toBe(0); + + // Should work for successful acquisitions + await runLock1.lock("test-lock", ["test-edge"], 5000, async () => { + expect(runLock1.isInsideLock()).toBe(true); + }); + } finally { + await runLock1.quit(); + } + + // Test with very small delays + const redis2 = createRedisClient(redisOptions); + const runLock2 = new RunLocker({ + redis: redis2, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 2, + baseDelay: 1, + maxDelay: 10, + backoffMultiplier: 2.0, + jitterFactor: 0.5, + maxTotalWaitTime: 100, + }, + }); + + try { + const config = runLock2.getRetryConfig(); + expect(config.baseDelay).toBe(1); + expect(config.maxDelay).toBe(10); + expect(config.jitterFactor).toBe(0.5); + + await runLock2.lock("test-lock", ["test-small"], 5000, async () => { + expect(runLock2.isInsideLock()).toBe(true); + }); + } finally { + await runLock2.quit(); + } + }); + + redisTest("Test total wait time configuration", { timeout: 10_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 100, // High retry count + baseDelay: 100, + maxTotalWaitTime: 500, // But low total wait time + }, + }); + + try { + // Test that total wait time configuration is properly applied + const config = runLock.getRetryConfig(); + expect(config.maxRetries).toBe(100); + expect(config.maxTotalWaitTime).toBe(500); + expect(config.baseDelay).toBe(100); + + // Basic functionality test with the configuration + await runLock.lock("test-lock", ["test-timing-config"], 5000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test quit functionality and cleanup", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + // Acquire some locks to create state + await runLock.lock("test-lock", ["quit-test-1"], 5000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // Verify we can still acquire locks + await runLock.lock("test-lock", ["quit-test-2"], 5000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // 
Now quit should clean everything up + await runLock.quit(); + + // After quit, should be able to create new instance and acquire locks + const newRedis = createRedisClient(redisOptions); + const newRunLock = new RunLocker({ + redis: newRedis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + await newRunLock.lock("test-lock", ["quit-test-1"], 5000, async () => { + expect(newRunLock.isInsideLock()).toBe(true); + }); + } finally { + await newRunLock.quit(); + } + } + ); + + redisTest( + "Test lock extension during long operations", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + let lockExtended = false; + const startTime = Date.now(); + + // Acquire lock with short duration but long operation + await runLock.lock("test-lock", ["extension-test"], 1000, async (signal) => { + expect(signal).toBeDefined(); + + // Operation longer than lock duration - should trigger extension + await new Promise((resolve) => setTimeout(resolve, 2500)); + + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(2000); + + // If we get here, extension must have worked + lockExtended = true; + }); + + expect(lockExtended).toBe(true); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test getCurrentResources in various states", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + // Outside any lock + expect(runLock.getCurrentResources()).toBeUndefined(); + expect(runLock.isInsideLock()).toBe(false); + + await runLock.lock("test-lock", ["resource-x", "resource-y"], 5000, async () => { + // Inside lock + expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); + expect(runLock.isInsideLock()).toBe(true); + + await runLock.lock("test-lock", ["resource-x", "resource-y"], 5000, async () => { + // Nested lock with same resources + expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); + expect(runLock.isInsideLock()).toBe(true); + }); + }); + + // Outside lock again + expect(runLock.getCurrentResources()).toBeUndefined(); + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test signal properties and behavior", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + let signalReceived: redlock.RedlockAbortSignal | undefined; + + await runLock.lock("test-lock", ["signal-test"], 5000, async (signal) => { + signalReceived = signal; + + expect(signal).toBeDefined(); + expect(typeof signal.aborted).toBe("boolean"); + expect(signal.aborted).toBe(false); + + // Signal should have event listener capabilities + expect(typeof signal.addEventListener).toBe("function"); + expect(typeof signal.removeEventListener).toBe("function"); + }); + + expect(signalReceived).toBeDefined(); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test retry behavior with exact timing", + { timeout: 25_000 }, + async ({ redisOptions }) => { + const 
redis1 = createRedisClient(redisOptions); + const redis2 = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + const runLock1 = new RunLocker({ + redis: redis1, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + const runLock2 = new RunLocker({ + redis: redis2, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxRetries: 3, + baseDelay: 100, + maxDelay: 500, + backoffMultiplier: 2.0, + jitterFactor: 0, // No jitter for predictable timing + maxTotalWaitTime: 10000, + }, + }); + + try { + // Create blocking lock with first instance - make it last much longer than retry logic + const blockingPromise = runLock1.lock("test-lock", ["timing-test"], 30000, async () => { + await new Promise((resolve) => setTimeout(resolve, 15000)); + }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const startTime = Date.now(); + try { + await runLock2.lock("test-lock", ["timing-test"], 5000, async () => { + expect(true).toBe(false); + }); + expect(true).toBe(false); // Should not reach here + } catch (error) { + const elapsed = Date.now() - startTime; + expect(error).toBeInstanceOf(LockAcquisitionTimeoutError); + + if (error instanceof LockAcquisitionTimeoutError) { + expect(error.attempts).toBe(4); // 0 + 3 retries + // With backoff: 100ms + 200ms + 400ms = 700ms total wait time + expect(error.totalWaitTime).toBeGreaterThan(600); + expect(error.totalWaitTime).toBeLessThan(800); + expect(elapsed).toBeGreaterThan(600); + } + } + + await blockingPromise; + } finally { + await runLock1.quit(); + await runLock2.quit(); + } + } + ); }); From 04b290ae081f15328304f98d9452f3b8c8c6dd7a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 10:10:42 +0100 Subject: [PATCH 05/14] Further RunLock improvements --- apps/webapp/app/env.server.ts | 9 + apps/webapp/app/v3/runEngine.server.ts | 10 ++ .../run-engine/src/engine/index.ts | 11 ++ .../run-engine/src/engine/locking.ts | 134 +++++++++++--- .../src/engine/tests/locking.test.ts | 170 ++++++++++++++++++ .../run-engine/src/engine/types.ts | 4 + 6 files changed, 318 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 3297616866..5a9deb7ae4 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -429,6 +429,15 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500), + RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000), + RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000), + RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10), + RUN_ENGINE_RUN_LOCK_BASE_DELAY: z.coerce.number().int().default(100), + RUN_ENGINE_RUN_LOCK_MAX_DELAY: z.coerce.number().int().default(3000), + RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER: z.coerce.number().default(1.8), + RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15), + RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000), + RUN_ENGINE_WORKER_REDIS_HOST: z .string() .optional() diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index de83c92d20..acfaa57719 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -75,6 +75,16 @@ function createRunEngine() { enableAutoPipelining: true, 
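// Descriptive note (added for clarity): TLS is applied by default here and only skipped when
// RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED is explicitly set to "true" via the spread below.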
...(env.RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, + duration: env.RUN_ENGINE_RUN_LOCK_DURATION, + automaticExtensionThreshold: env.RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD, + retryConfig: { + maxRetries: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES, + baseDelay: env.RUN_ENGINE_RUN_LOCK_BASE_DELAY, + maxDelay: env.RUN_ENGINE_RUN_LOCK_MAX_DELAY, + backoffMultiplier: env.RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER, + jitterFactor: env.RUN_ENGINE_RUN_LOCK_JITTER_FACTOR, + maxTotalWaitTime: env.RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME, + }, }, tracer, meter, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index ac29abb749..ae8e494c57 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -98,6 +98,17 @@ export class RunEngine { logger: this.logger, tracer: trace.getTracer("RunLocker"), meter: options.meter, + defaultDuration: options.runLock.duration ?? 5000, + automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000, + retryConfig: { + maxRetries: 10, + baseDelay: 100, + maxDelay: 3000, + backoffMultiplier: 1.8, + jitterFactor: 0.15, + maxTotalWaitTime: 15000, + ...options.runLock.retryConfig, + }, }); const keys = new RunQueueFullKeyProducer(); diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 859f20de3f..f1fc2799dc 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -51,7 +51,7 @@ interface ManualLockContext { extension: Promise | undefined; } -interface RetryConfig { +export interface LockRetryConfig { /** Maximum number of retry attempts (default: 10) */ maxRetries?: number; /** Initial delay in milliseconds (default: 200) */ @@ -66,6 +66,15 @@ interface RetryConfig { maxTotalWaitTime?: number; } +interface LockOptions { + /** Default lock duration in milliseconds (default: 5000) */ + defaultDuration?: number; + /** Automatic extension threshold in milliseconds - how early to extend locks before expiration (default: 500) */ + automaticExtensionThreshold?: number; + /** Retry configuration for lock acquisition */ + retryConfig?: LockRetryConfig; +} + export class RunLocker { private redlock: InstanceType; private asyncLocalStorage: AsyncLocalStorage; @@ -75,21 +84,29 @@ export class RunLocker { private activeLocks: Map = new Map(); private activeManualContexts: Map = new Map(); private lockDurationHistogram: Histogram; - private retryConfig: Required; + private retryConfig: Required; + private defaultDuration: number; + private automaticExtensionThreshold: number; constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter; - retryConfig?: RetryConfig; + defaultDuration?: number; + automaticExtensionThreshold?: number; + retryConfig?: LockRetryConfig; }) { + // Initialize configuration values + this.defaultDuration = options.defaultDuration ?? 5000; + this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 
500; + this.redlock = new Redlock([options.redis], { driftFactor: 0.01, - retryCount: 10, - retryDelay: 200, // time in ms - retryJitter: 200, // time in ms - automaticExtensionThreshold: 500, // time in ms + retryCount: 0, // Disable Redlock's internal retrying - we handle retries ourselves + retryDelay: 200, // Not used since retryCount = 0 + retryJitter: 200, // Not used since retryCount = 0 + automaticExtensionThreshold: this.automaticExtensionThreshold, }); this.asyncLocalStorage = new AsyncLocalStorage(); this.logger = options.logger; @@ -143,12 +160,37 @@ export class RunLocker { async lock( name: string, resources: string[], - duration: number, + duration: number | undefined, routine: (signal: redlock.RedlockAbortSignal) => Promise + ): Promise; + async lock( + name: string, + resources: string[], + routine: (signal: redlock.RedlockAbortSignal) => Promise + ): Promise; + async lock( + name: string, + resources: string[], + durationOrRoutine: number | undefined | ((signal: redlock.RedlockAbortSignal) => Promise), + routine?: (signal: redlock.RedlockAbortSignal) => Promise ): Promise { const currentContext = this.asyncLocalStorage.getStore(); const joinedResources = resources.sort().join(","); + // Handle overloaded parameters + let actualDuration: number; + let actualRoutine: (signal: redlock.RedlockAbortSignal) => Promise; + + if (typeof durationOrRoutine === "function") { + // Called as lock(name, resources, routine) - use default duration + actualDuration = this.defaultDuration; + actualRoutine = durationOrRoutine; + } else { + // Called as lock(name, resources, duration, routine) - use provided duration + actualDuration = durationOrRoutine ?? this.defaultDuration; + actualRoutine = routine!; + } + return startSpan( this.tracer, "RunLocker.lock", @@ -156,7 +198,7 @@ export class RunLocker { if (currentContext && currentContext.resources === joinedResources) { span.setAttribute("nested", true); // We're already inside a lock with the same resources, just run the routine - return routine(currentContext.signal); + return actualRoutine(currentContext.signal); } span.setAttribute("nested", false); @@ -166,7 +208,14 @@ export class RunLocker { const lockStartTime = performance.now(); const [error, result] = await tryCatch( - this.#acquireAndExecute(name, resources, duration, routine, lockId, lockStartTime) + this.#acquireAndExecute( + name, + resources, + actualDuration, + actualRoutine, + lockId, + lockStartTime + ) ); if (error) { @@ -177,14 +226,18 @@ export class RunLocker { [SemanticAttributes.LOCK_SUCCESS]: "false", }); - this.logger.error("[RunLocker] Error locking resources", { error, resources, duration }); + this.logger.error("[RunLocker] Error locking resources", { + error, + resources, + duration: actualDuration, + }); throw error; } return result; }, { - attributes: { name, resources, timeout: duration }, + attributes: { name, resources, timeout: actualDuration }, } ); } @@ -387,15 +440,14 @@ export class RunLocker { signal: redlock.RedlockAbortSignal, controller: AbortController ): void { - const automaticExtensionThreshold = 500; // Same as redlock default - - if (automaticExtensionThreshold > duration - 100) { + if (this.automaticExtensionThreshold > duration - 100) { // Don't set up auto-extension if duration is too short return; } const scheduleExtension = (): void => { - const timeUntilExtension = context.lock.expiration - Date.now() - automaticExtensionThreshold; + const timeUntilExtension = + context.lock.expiration - Date.now() - this.automaticExtensionThreshold; 
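// Descriptive note (added for clarity), with assumed example values: given a 5000ms lock duration
// and the default 500ms threshold, expiration ≈ now + 5000ms, so the first extension is scheduled
// roughly 4500ms after acquisition; each successful extension then reschedules from the new expiration.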
if (timeUntilExtension > 0) { context.timeout = setTimeout(() => { @@ -475,13 +527,47 @@ export class RunLocker { condition: boolean, name: string, resources: string[], - duration: number, + duration: number | undefined, + routine: (signal?: redlock.RedlockAbortSignal) => Promise + ): Promise; + async lockIf( + condition: boolean, + name: string, + resources: string[], routine: (signal?: redlock.RedlockAbortSignal) => Promise + ): Promise; + async lockIf( + condition: boolean, + name: string, + resources: string[], + durationOrRoutine: number | undefined | ((signal?: redlock.RedlockAbortSignal) => Promise), + routine?: (signal?: redlock.RedlockAbortSignal) => Promise ): Promise { if (condition) { - return this.lock(name, resources, duration, routine); + // Handle overloaded parameters + if (typeof durationOrRoutine === "function") { + // Called as lockIf(condition, name, resources, routine) - use default duration + return this.lock( + name, + resources, + durationOrRoutine as (signal: redlock.RedlockAbortSignal) => Promise + ); + } else { + // Called as lockIf(condition, name, resources, duration, routine) - use provided duration + return this.lock( + name, + resources, + durationOrRoutine, + routine! as (signal: redlock.RedlockAbortSignal) => Promise + ); + } } else { - return routine(); + // Handle overloaded parameters for non-lock case + if (typeof durationOrRoutine === "function") { + return durationOrRoutine(); + } else { + return routine!(); + } } } @@ -493,10 +579,18 @@ export class RunLocker { return this.asyncLocalStorage.getStore()?.resources; } - getRetryConfig(): Readonly> { + getRetryConfig(): Readonly> { return { ...this.retryConfig }; } + getDefaultDuration(): number { + return this.defaultDuration; + } + + getAutomaticExtensionThreshold(): number { + return this.automaticExtensionThreshold; + } + async quit() { // Clean up all active manual contexts for (const [lockId, context] of this.activeManualContexts) { diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index 2cad3c428a..5d20dd0c0f 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -578,6 +578,176 @@ describe("RunLocker", () => { } ); + redisTest( + "Test default duration configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with custom default duration + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + defaultDuration: 8000, + }); + + try { + // Test that the default duration is set correctly + expect(runLock.getDefaultDuration()).toBe(8000); + + // Test lock without specifying duration (should use default) + const startTime = Date.now(); + await runLock.lock("test-lock", ["default-duration-test"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Sleep for a bit to ensure the lock is working + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(90); // Should have completed successfully + + // Test lock with explicit duration (should override default) + await runLock.lock("test-lock", ["explicit-duration-test"], 2000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // Test lockIf without duration (should use default) + await 
runLock.lockIf(true, "test-lock", ["lockif-default"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // Test lockIf with explicit duration + await runLock.lockIf(true, "test-lock", ["lockif-explicit"], 3000, async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test automatic extension threshold configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with custom automatic extension threshold + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + automaticExtensionThreshold: 200, // Custom threshold + }); + + try { + // Test that the threshold is set correctly + expect(runLock.getAutomaticExtensionThreshold()).toBe(200); + expect(runLock.getDefaultDuration()).toBe(5000); // Should use default + + // Test lock extension with custom threshold + // Use a short lock duration but longer operation to trigger extension + await runLock.lock("test-lock", ["extension-threshold-test"], 800, async () => { + expect(runLock.isInsideLock()).toBe(true); + // Sleep longer than lock duration to ensure extension works + await new Promise((resolve) => setTimeout(resolve, 1200)); + }); + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test Redlock retry configuration", { timeout: 10_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test that we can configure all settings + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + defaultDuration: 3000, + automaticExtensionThreshold: 300, + retryConfig: { + maxRetries: 5, + baseDelay: 150, + }, + }); + + try { + // Verify all configurations are set + expect(runLock.getDefaultDuration()).toBe(3000); + expect(runLock.getAutomaticExtensionThreshold()).toBe(300); + + const retryConfig = runLock.getRetryConfig(); + expect(retryConfig.maxRetries).toBe(5); + expect(retryConfig.baseDelay).toBe(150); + + // Test basic functionality with all custom configs + await runLock.lock("test-lock", ["all-config-test"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test production-optimized configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with production-optimized settings (similar to RunEngine) + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + defaultDuration: 10000, + automaticExtensionThreshold: 2000, + retryConfig: { + maxRetries: 15, + baseDelay: 100, + maxDelay: 3000, + backoffMultiplier: 1.8, + jitterFactor: 0.15, + maxTotalWaitTime: 25000, + }, + }); + + try { + // Verify production configuration + expect(runLock.getDefaultDuration()).toBe(10000); + expect(runLock.getAutomaticExtensionThreshold()).toBe(2000); + + const retryConfig = runLock.getRetryConfig(); + expect(retryConfig.maxRetries).toBe(15); + expect(retryConfig.baseDelay).toBe(100); + expect(retryConfig.maxDelay).toBe(3000); + expect(retryConfig.backoffMultiplier).toBe(1.8); + expect(retryConfig.jitterFactor).toBe(0.15); + expect(retryConfig.maxTotalWaitTime).toBe(25000); + + // Test lock with default duration (should use 10 seconds) + 
const startTime = Date.now(); + await runLock.lock("test-lock", ["production-config"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Simulate a typical operation duration + await new Promise((resolve) => setTimeout(resolve, 200)); + }); + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(190); + expect(elapsed).toBeLessThan(1000); // Should complete quickly for successful operation + } finally { + await runLock.quit(); + } + } + ); + redisTest("Test configuration edge cases", { timeout: 15_000 }, async ({ redisOptions }) => { const logger = new Logger("RunLockTest", "debug"); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index f07dd703ab..50e7c8da4e 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -13,6 +13,7 @@ import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelecti import { MinimalAuthenticatedEnvironment } from "../shared/index.js"; import { workerCatalog } from "./workerCatalog.js"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import { LockRetryConfig } from "./locking.js"; export type RunEngineOptions = { prisma: PrismaClient; @@ -45,6 +46,9 @@ export type RunEngineOptions = { }; runLock: { redis: RedisOptions; + duration?: number; + automaticExtensionThreshold?: number; + retryConfig?: LockRetryConfig; }; /** If not set then checkpoints won't ever be used */ retryWarmStartThresholdMs?: number; From d75f7fd427e0ae1959367283c9a1318eeca28590 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 13:47:54 +0100 Subject: [PATCH 06/14] A few more improvements --- .../run-engine/src/engine/locking.ts | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index f1fc2799dc..f7b0b47d77 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -175,7 +175,7 @@ export class RunLocker { routine?: (signal: redlock.RedlockAbortSignal) => Promise ): Promise { const currentContext = this.asyncLocalStorage.getStore(); - const joinedResources = resources.sort().join(","); + const joinedResources = [...resources].sort().join(","); // Handle overloaded parameters let actualDuration: number; @@ -251,7 +251,9 @@ export class RunLocker { lockId: string, lockStartTime: number ): Promise { - const joinedResources = resources.sort().join(","); + // Sort resources to ensure consistent lock acquisition order and prevent deadlocks + const sortedResources = [...resources].sort(); + const joinedResources = sortedResources.join(","); // Use configured retry settings with exponential backoff const { maxRetries, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } = @@ -266,14 +268,14 @@ export class RunLocker { let lastError: Error | undefined; for (let attempt = 0; attempt <= maxRetries; attempt++) { - const [error, acquiredLock] = await tryCatch(this.redlock.acquire(resources, duration)); + const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration)); if (!error && acquiredLock) { lock = acquiredLock; if (attempt > 0) { this.logger.debug("[RunLocker] Lock acquired after retries", { name, - resources, + resources: sortedResources, attempts: attempt + 1, totalWaitTime: Math.round(totalWaitTime), }); @@ -287,16 +289,16 @@ export class 
RunLocker { if (totalWaitTime >= maxTotalWaitTime) { this.logger.warn("[RunLocker] Lock acquisition exceeded total wait time limit", { name, - resources, + resources: sortedResources, attempts: attempt + 1, totalWaitTime: Math.round(totalWaitTime), maxTotalWaitTime, }); throw new LockAcquisitionTimeoutError( - resources, + sortedResources, Math.round(totalWaitTime), attempt + 1, - `Lock acquisition on resources [${resources.join( + `Lock acquisition on resources [${sortedResources.join( ", " )}] exceeded total wait time limit of ${maxTotalWaitTime}ms` ); @@ -306,16 +308,16 @@ export class RunLocker { if (attempt === maxRetries) { this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", { name, - resources, + resources: sortedResources, attempts: attempt + 1, totalWaitTime: Math.round(totalWaitTime), lastError: lastError.message, }); throw new LockAcquisitionTimeoutError( - resources, + sortedResources, Math.round(totalWaitTime), attempt + 1, - `Lock acquisition on resources [${resources.join(", ")}] failed after ${ + `Lock acquisition on resources [${sortedResources.join(", ")}] failed after ${ attempt + 1 } attempts` ); @@ -334,14 +336,14 @@ export class RunLocker { maxDelay ); const jitter = exponentialDelay * jitterFactor * (Math.random() * 2 - 1); // ±jitterFactor% jitter - const delay = Math.max(0, Math.round(exponentialDelay + jitter)); + const delay = Math.min(maxDelay, Math.max(0, Math.round(exponentialDelay + jitter))); // Update total wait time before delay totalWaitTime += delay; this.logger.debug("[RunLocker] Lock acquisition failed, retrying with backoff", { name, - resources, + resources: sortedResources, attempt: attempt + 1, delay, totalWaitTime: Math.round(totalWaitTime), @@ -356,7 +358,7 @@ export class RunLocker { // For other errors (non-retryable), throw immediately this.logger.error("[RunLocker] Lock acquisition failed with non-retryable error", { name, - resources, + resources: sortedResources, attempt: attempt + 1, error: lastError.message, errorName: lastError.name, @@ -390,7 +392,7 @@ export class RunLocker { // Track active lock this.activeLocks.set(lockId, { lockType: name, - resources: resources, + resources: sortedResources, }); let lockSuccess = true; @@ -426,7 +428,7 @@ export class RunLocker { if (releaseError) { this.logger.warn("[RunLocker] Error releasing lock", { error: releaseError, - resources, + resources: sortedResources, lockValue: lock!.value, }); } From 8874e1711996dfaace15ea3561fc42bb0e9b3cc5 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 14:26:44 +0100 Subject: [PATCH 07/14] Remove the per lock duration (duration same for all locks in a single RunLock instance) --- .../run-engine/src/engine/index.ts | 4 +- .../run-engine/src/engine/locking.ts | 112 ++---------- .../src/engine/systems/checkpointSystem.ts | 4 +- .../src/engine/systems/delayedRunSystem.ts | 2 +- .../src/engine/systems/dequeueSystem.ts | 5 +- .../src/engine/systems/enqueueSystem.ts | 2 +- .../systems/releaseConcurrencySystem.ts | 1 - .../src/engine/systems/runAttemptSystem.ts | 10 +- .../src/engine/systems/ttlSystem.ts | 2 +- .../src/engine/systems/waitpointSystem.ts | 4 +- .../src/engine/tests/locking.test.ts | 173 ++++++------------ 11 files changed, 86 insertions(+), 233 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index ae8e494c57..39c862d848 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -98,7 
+98,7 @@ export class RunEngine { logger: this.logger, tracer: trace.getTracer("RunLocker"), meter: options.meter, - defaultDuration: options.runLock.duration ?? 5000, + duration: options.runLock.duration ?? 5000, automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000, retryConfig: { maxRetries: 10, @@ -1162,7 +1162,7 @@ export class RunEngine { tx?: PrismaClientOrTransaction; }) { const prisma = tx ?? this.prisma; - return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => { + return await this.runLock.lock("handleStalledSnapshot", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { this.logger.log( diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index f7b0b47d77..a91debc98a 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -66,15 +66,6 @@ export interface LockRetryConfig { maxTotalWaitTime?: number; } -interface LockOptions { - /** Default lock duration in milliseconds (default: 5000) */ - defaultDuration?: number; - /** Automatic extension threshold in milliseconds - how early to extend locks before expiration (default: 500) */ - automaticExtensionThreshold?: number; - /** Retry configuration for lock acquisition */ - retryConfig?: LockRetryConfig; -} - export class RunLocker { private redlock: InstanceType; private asyncLocalStorage: AsyncLocalStorage; @@ -85,7 +76,7 @@ export class RunLocker { private activeManualContexts: Map = new Map(); private lockDurationHistogram: Histogram; private retryConfig: Required; - private defaultDuration: number; + private duration: number; private automaticExtensionThreshold: number; constructor(options: { @@ -93,12 +84,12 @@ export class RunLocker { logger: Logger; tracer: Tracer; meter?: Meter; - defaultDuration?: number; + duration?: number; automaticExtensionThreshold?: number; retryConfig?: LockRetryConfig; }) { // Initialize configuration values - this.defaultDuration = options.defaultDuration ?? 5000; + this.duration = options.duration ?? 5000; this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 500; this.redlock = new Redlock([options.redis], { @@ -157,40 +148,10 @@ export class RunLocker { } /** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. 
*/ - async lock( - name: string, - resources: string[], - duration: number | undefined, - routine: (signal: redlock.RedlockAbortSignal) => Promise - ): Promise; - async lock( - name: string, - resources: string[], - routine: (signal: redlock.RedlockAbortSignal) => Promise - ): Promise; - async lock( - name: string, - resources: string[], - durationOrRoutine: number | undefined | ((signal: redlock.RedlockAbortSignal) => Promise), - routine?: (signal: redlock.RedlockAbortSignal) => Promise - ): Promise { + async lock(name: string, resources: string[], routine: () => Promise): Promise { const currentContext = this.asyncLocalStorage.getStore(); const joinedResources = [...resources].sort().join(","); - // Handle overloaded parameters - let actualDuration: number; - let actualRoutine: (signal: redlock.RedlockAbortSignal) => Promise; - - if (typeof durationOrRoutine === "function") { - // Called as lock(name, resources, routine) - use default duration - actualDuration = this.defaultDuration; - actualRoutine = durationOrRoutine; - } else { - // Called as lock(name, resources, duration, routine) - use provided duration - actualDuration = durationOrRoutine ?? this.defaultDuration; - actualRoutine = routine!; - } - return startSpan( this.tracer, "RunLocker.lock", @@ -198,7 +159,7 @@ export class RunLocker { if (currentContext && currentContext.resources === joinedResources) { span.setAttribute("nested", true); // We're already inside a lock with the same resources, just run the routine - return actualRoutine(currentContext.signal); + return routine(); } span.setAttribute("nested", false); @@ -208,14 +169,7 @@ export class RunLocker { const lockStartTime = performance.now(); const [error, result] = await tryCatch( - this.#acquireAndExecute( - name, - resources, - actualDuration, - actualRoutine, - lockId, - lockStartTime - ) + this.#acquireAndExecute(name, resources, this.duration, routine, lockId, lockStartTime) ); if (error) { @@ -229,7 +183,7 @@ export class RunLocker { this.logger.error("[RunLocker] Error locking resources", { error, resources, - duration: actualDuration, + duration: this.duration, }); throw error; } @@ -237,7 +191,7 @@ export class RunLocker { return result; }, { - attributes: { name, resources, timeout: actualDuration }, + attributes: { name, resources, timeout: this.duration }, } ); } @@ -247,7 +201,7 @@ export class RunLocker { name: string, resources: string[], duration: number, - routine: (signal: redlock.RedlockAbortSignal) => Promise, + routine: () => Promise, lockId: string, lockStartTime: number ): Promise { @@ -260,7 +214,6 @@ export class RunLocker { this.retryConfig; // Track timing for total wait time limit - const retryStartTime = performance.now(); let totalWaitTime = 0; // Retry the lock acquisition with exponential backoff @@ -398,7 +351,7 @@ export class RunLocker { let lockSuccess = true; try { const result = await this.asyncLocalStorage.run(newContext, async () => { - return routine(signal); + return routine(); }); return result; @@ -529,47 +482,12 @@ export class RunLocker { condition: boolean, name: string, resources: string[], - duration: number | undefined, - routine: (signal?: redlock.RedlockAbortSignal) => Promise - ): Promise; - async lockIf( - condition: boolean, - name: string, - resources: string[], - routine: (signal?: redlock.RedlockAbortSignal) => Promise - ): Promise; - async lockIf( - condition: boolean, - name: string, - resources: string[], - durationOrRoutine: number | undefined | ((signal?: redlock.RedlockAbortSignal) => Promise), - 
routine?: (signal?: redlock.RedlockAbortSignal) => Promise + routine: () => Promise ): Promise { if (condition) { - // Handle overloaded parameters - if (typeof durationOrRoutine === "function") { - // Called as lockIf(condition, name, resources, routine) - use default duration - return this.lock( - name, - resources, - durationOrRoutine as (signal: redlock.RedlockAbortSignal) => Promise - ); - } else { - // Called as lockIf(condition, name, resources, duration, routine) - use provided duration - return this.lock( - name, - resources, - durationOrRoutine, - routine! as (signal: redlock.RedlockAbortSignal) => Promise - ); - } + return this.lock(name, resources, routine); } else { - // Handle overloaded parameters for non-lock case - if (typeof durationOrRoutine === "function") { - return durationOrRoutine(); - } else { - return routine!(); - } + return routine(); } } @@ -585,8 +503,8 @@ export class RunLocker { return { ...this.retryConfig }; } - getDefaultDuration(): number { - return this.defaultDuration; + getDuration(): number { + return this.duration; } getAutomaticExtensionThreshold(): number { diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index 3fe96ea238..1220ce43d2 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -53,7 +53,7 @@ export class CheckpointSystem { }): Promise { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => { + return await this.$.runLock.lock("createCheckpoint", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); const isValidSnapshot = @@ -267,7 +267,7 @@ export class CheckpointSystem { }): Promise { const prisma = tx ?? 
this.$.prisma; - return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => { + return await this.$.runLock.lock("continueRunExecution", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (snapshot.id !== snapshotId) { diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 9a45977442..e3dca4b544 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -37,7 +37,7 @@ export class DelayedRunSystem { this.$.tracer, "rescheduleDelayedRun", async () => { - return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => { + return await this.$.runLock.lock("rescheduleDelayedRun", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if the run isn't just created then we can't reschedule it diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 0bfcf44c62..bff9ec9cd6 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -80,8 +80,7 @@ export class DequeueSystem { const dequeuedRun = await this.$.runLock.lock( "dequeueFromWorkerQueue", [runId], - 5000, - async (signal) => { + async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (!isDequeueableExecutionStatus(snapshot.executionStatus)) { @@ -548,7 +547,7 @@ export class DequeueSystem { statusReason, }); - return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("pendingVersion", [runId], async () => { this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version lock acquired", { runId, reason, diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index c43f93bcf7..395e44727c 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -56,7 +56,7 @@ export class EnqueueSystem { }) { const prisma = tx ?? 
this.$.prisma; - return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], 5000, async () => { + return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], async () => { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: run, snapshot: { diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index 0f9583df4f..c89c1fe709 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -263,7 +263,6 @@ export class ReleaseConcurrencySystem { return await this.$.runLock.lock( "executeReleaseConcurrencyForSnapshot", [snapshot.runId], - 5_000, async () => { const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId); diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index e7aec0202d..1a45c4108f 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -85,7 +85,7 @@ export class RunAttemptSystem { this.$.tracer, "startRunAttempt", async (span) => { - return this.$.runLock.lock("startRunAttempt", [runId], 5000, async () => { + return this.$.runLock.lock("startRunAttempt", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -441,7 +441,7 @@ export class RunAttemptSystem { this.$.tracer, "#completeRunAttemptSuccess", async (span) => { - return this.$.runLock.lock("attemptSucceeded", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptSucceeded", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -594,7 +594,7 @@ export class RunAttemptSystem { this.$.tracer, "completeRunAttemptFailure", async (span) => { - return this.$.runLock.lock("attemptFailed", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptFailed", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -905,7 +905,7 @@ export class RunAttemptSystem { }): Promise<{ wasRequeued: boolean } & ExecutionResult> { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock("tryNackAndRequeue", [run.id], 5000, async (signal) => { + return await this.$.runLock.lock("tryNackAndRequeue", [run.id], async () => { //we nack the message, this allows another work to pick up the run const gotRequeued = await this.$.runQueue.nackMessage({ orgId, @@ -982,7 +982,7 @@ export class RunAttemptSystem { reason = reason ?? 
"Cancelled by user"; return startSpan(this.$.tracer, "cancelRun", async (span) => { - return this.$.runLock.lock("cancelRun", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("cancelRun", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); //already finished, do nothing diff --git a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts index 5ab957c989..cbed7b98ad 100644 --- a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts @@ -23,7 +23,7 @@ export class TtlSystem { async expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) { const prisma = tx ?? this.$.prisma; - await this.$.runLock.lock("expireRun", [runId], 5_000, async () => { + await this.$.runLock.lock("expireRun", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if we're executing then we won't expire the run diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 92f0885c07..52a8094e4b 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -380,7 +380,7 @@ export class WaitpointSystem { let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; - return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => { + return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); //block the run with the waitpoints, returning how many waitpoints are pending @@ -549,7 +549,7 @@ export class WaitpointSystem { } //4. 
Continue the run whether it's executing or not - await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => { + await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); if (isFinishedOrPendingFinished(snapshot.executionStatus)) { diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index 5d20dd0c0f..e289e36fe2 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -4,7 +4,6 @@ import { expect } from "vitest"; import { RunLocker, LockAcquisitionTimeoutError } from "../locking.js"; import { trace } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; -import * as redlock from "redlock"; describe("RunLocker", () => { redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => { @@ -19,8 +18,7 @@ describe("RunLocker", () => { try { expect(runLock.isInsideLock()).toBe(false); - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); }); @@ -38,13 +36,11 @@ describe("RunLocker", () => { try { expect(runLock.isInsideLock()).toBe(false); - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); //should be able to "lock it again" - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); }); }); @@ -67,7 +63,7 @@ describe("RunLocker", () => { expect(runLock.isInsideLock()).toBe(false); await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + runLock.lock("test-lock", ["test-1"], async () => { throw new Error("Test error"); }) ).rejects.toThrow("Test error"); @@ -92,11 +88,11 @@ describe("RunLocker", () => { expect(runLock.isInsideLock()).toBe(false); await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); // Nested lock with same resource - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); throw new Error("Inner lock error"); }); @@ -128,7 +124,7 @@ describe("RunLocker", () => { try { // First, ensure we can acquire the lock normally let firstLockAcquired = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { firstLockAcquired = true; }); //wait for 20ms @@ -137,14 +133,14 @@ describe("RunLocker", () => { expect(firstLockAcquired).toBe(true); // Now create a long-running lock - const lockPromise1 = runLock.lock("test-lock", ["test-1"], 5000, async () => { + const lockPromise1 = runLock.lock("test-lock", ["test-1"], async () => { // Hold the lock longer than the retry timeout await new Promise((resolve) => setTimeout(resolve, 10000)); }); // Try to acquire same lock immediately - should timeout with LockAcquisitionTimeoutError await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + 
runLock.lock("test-lock", ["test-1"], async () => { // This should never execute expect(true).toBe(false); }) @@ -169,13 +165,13 @@ describe("RunLocker", () => { const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); try { - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { // First lock acquired expect(runLock.isInsideLock()).toBe(true); // Try to acquire the same resource with a very short timeout // This should work because we already hold the lock - await runLock.lock("test-lock", ["test-1"], 100, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); // Wait longer than the timeout to prove it doesn't matter await new Promise((resolve) => setTimeout(resolve, 500)); @@ -201,7 +197,7 @@ describe("RunLocker", () => { try { // First verify we can acquire the lock normally let firstLockAcquired = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { firstLockAcquired = true; }); expect(firstLockAcquired).toBe(true); @@ -210,7 +206,7 @@ describe("RunLocker", () => { let outerLockExecuted = false; let innerLockExecuted = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { outerLockExecuted = true; expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-1"); @@ -218,7 +214,7 @@ describe("RunLocker", () => { // Try to acquire the same resource in a nested lock // This should work immediately without any retries // because we already hold the lock - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { innerLockExecuted = true; expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-1"); @@ -270,8 +266,7 @@ describe("RunLocker", () => { expect(config.maxTotalWaitTime).toBe(1000); // Test that the lock still works normally - await runLock.lock("test-lock", ["test-config"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-config"], async () => { expect(runLock.isInsideLock()).toBe(true); }); @@ -301,7 +296,7 @@ describe("RunLocker", () => { try { // Create a long-running lock that will definitely outlast the retry timeout - const lockPromise = runLock.lock("test-lock", ["test-error"], 10000, async () => { + const lockPromise = runLock.lock("test-lock", ["test-error"], async () => { await new Promise((resolve) => setTimeout(resolve, 15000)); // Hold for 15 seconds }); @@ -310,7 +305,7 @@ describe("RunLocker", () => { // Try to acquire same lock and capture the timeout error try { - await runLock.lock("test-lock", ["test-error"], 5000, async () => { + await runLock.lock("test-lock", ["test-error"], async () => { expect(true).toBe(false); // Should never execute }); expect(true).toBe(false); // Should not reach here @@ -357,7 +352,7 @@ describe("RunLocker", () => { expect(config.maxTotalWaitTime).toBe(30000); // Test that it still works - await runLock.lock("test-lock", ["test-default"], 5000, async () => { + await runLock.lock("test-lock", ["test-default"], async () => { expect(runLock.isInsideLock()).toBe(true); }); } finally { @@ -408,26 +403,22 @@ describe("RunLocker", () => { try { let executedWithLock = false; let executedWithoutLock = false; - let signalReceived: any = null; // 
Test with condition = true (should acquire lock) - await runLock.lockIf(true, "test-lock", ["test-lockif"], 5000, async (signal) => { + await runLock.lockIf(true, "test-lock", ["test-lockif"], async () => { executedWithLock = true; - signalReceived = signal; expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-lockif"); }); expect(executedWithLock).toBe(true); - expect(signalReceived).toBeDefined(); expect(runLock.isInsideLock()).toBe(false); // Test with condition = false (should not acquire lock) - await runLock.lockIf(false, "test-lock", ["test-lockif"], 5000, async (signal) => { + await runLock.lockIf(false, "test-lock", ["test-lockif"], async () => { executedWithoutLock = true; expect(runLock.isInsideLock()).toBe(false); expect(runLock.getCurrentResources()).toBeUndefined(); - expect(signal).toBeUndefined(); // No signal when not locked }); expect(executedWithoutLock).toBe(true); @@ -452,21 +443,21 @@ describe("RunLocker", () => { const results: string[] = []; // Start multiple concurrent locks on different resources - const lock1Promise = runLock.lock("test-lock", ["resource-1"], 5000, async () => { + const lock1Promise = runLock.lock("test-lock", ["resource-1"], async () => { results.push("lock1-start"); await new Promise((resolve) => setTimeout(resolve, 100)); results.push("lock1-end"); return "result1"; }); - const lock2Promise = runLock.lock("test-lock", ["resource-2"], 5000, async () => { + const lock2Promise = runLock.lock("test-lock", ["resource-2"], async () => { results.push("lock2-start"); await new Promise((resolve) => setTimeout(resolve, 100)); results.push("lock2-end"); return "result2"; }); - const lock3Promise = runLock.lock("test-lock", ["resource-3"], 5000, async () => { + const lock3Promise = runLock.lock("test-lock", ["resource-3"], async () => { results.push("lock3-start"); await new Promise((resolve) => setTimeout(resolve, 100)); results.push("lock3-end"); @@ -509,27 +500,16 @@ describe("RunLocker", () => { }); try { - await runLock.lock( - "test-lock", - ["resource-a", "resource-b", "resource-c"], - 5000, - async (signal) => { - expect(signal).toBeDefined(); - expect(runLock.isInsideLock()).toBe(true); - // Resources should be sorted and joined - expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); - } - ); + await runLock.lock("test-lock", ["resource-a", "resource-b", "resource-c"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Resources should be sorted and joined + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + }); // Test that resource order doesn't matter (should be normalized) - await runLock.lock( - "test-lock", - ["resource-c", "resource-a", "resource-b"], - 5000, - async () => { - expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); - } - ); + await runLock.lock("test-lock", ["resource-c", "resource-a", "resource-b"], async () => { + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + }); } finally { await runLock.quit(); } @@ -553,13 +533,13 @@ describe("RunLocker", () => { // These should be able to run concurrently despite same resources // because they have different lock names - const promise1 = runLock.lock("lock-type-1", ["shared-resource"], 5000, async () => { + const promise1 = runLock.lock("lock-type-1", ["shared-resource"], async () => { results.push("type1-start"); await new Promise((resolve) => setTimeout(resolve, 200)); results.push("type1-end"); }); - const promise2 
= runLock.lock("lock-type-2", ["shared-resource"], 5000, async () => { + const promise2 = runLock.lock("lock-type-2", ["shared-resource"], async () => { results.push("type2-start"); await new Promise((resolve) => setTimeout(resolve, 200)); results.push("type2-end"); @@ -590,12 +570,12 @@ describe("RunLocker", () => { redis, logger, tracer: trace.getTracer("RunLockTest"), - defaultDuration: 8000, + duration: 8000, }); try { // Test that the default duration is set correctly - expect(runLock.getDefaultDuration()).toBe(8000); + expect(runLock.getDuration()).toBe(8000); // Test lock without specifying duration (should use default) const startTime = Date.now(); @@ -607,20 +587,10 @@ describe("RunLocker", () => { const elapsed = Date.now() - startTime; expect(elapsed).toBeGreaterThan(90); // Should have completed successfully - // Test lock with explicit duration (should override default) - await runLock.lock("test-lock", ["explicit-duration-test"], 2000, async () => { - expect(runLock.isInsideLock()).toBe(true); - }); - // Test lockIf without duration (should use default) await runLock.lockIf(true, "test-lock", ["lockif-default"], async () => { expect(runLock.isInsideLock()).toBe(true); }); - - // Test lockIf with explicit duration - await runLock.lockIf(true, "test-lock", ["lockif-explicit"], 3000, async () => { - expect(runLock.isInsideLock()).toBe(true); - }); } finally { await runLock.quit(); } @@ -640,16 +610,17 @@ describe("RunLocker", () => { logger, tracer: trace.getTracer("RunLockTest"), automaticExtensionThreshold: 200, // Custom threshold + duration: 800, }); try { // Test that the threshold is set correctly expect(runLock.getAutomaticExtensionThreshold()).toBe(200); - expect(runLock.getDefaultDuration()).toBe(5000); // Should use default + expect(runLock.getDuration()).toBe(800); // Should use configured value // Test lock extension with custom threshold // Use a short lock duration but longer operation to trigger extension - await runLock.lock("test-lock", ["extension-threshold-test"], 800, async () => { + await runLock.lock("test-lock", ["extension-threshold-test"], async () => { expect(runLock.isInsideLock()).toBe(true); // Sleep longer than lock duration to ensure extension works await new Promise((resolve) => setTimeout(resolve, 1200)); @@ -669,7 +640,7 @@ describe("RunLocker", () => { redis, logger, tracer: trace.getTracer("RunLockTest"), - defaultDuration: 3000, + duration: 3000, automaticExtensionThreshold: 300, retryConfig: { maxRetries: 5, @@ -679,7 +650,7 @@ describe("RunLocker", () => { try { // Verify all configurations are set - expect(runLock.getDefaultDuration()).toBe(3000); + expect(runLock.getDuration()).toBe(3000); expect(runLock.getAutomaticExtensionThreshold()).toBe(300); const retryConfig = runLock.getRetryConfig(); @@ -707,7 +678,7 @@ describe("RunLocker", () => { redis, logger, tracer: trace.getTracer("RunLockTest"), - defaultDuration: 10000, + duration: 10000, automaticExtensionThreshold: 2000, retryConfig: { maxRetries: 15, @@ -721,7 +692,7 @@ describe("RunLocker", () => { try { // Verify production configuration - expect(runLock.getDefaultDuration()).toBe(10000); + expect(runLock.getDuration()).toBe(10000); expect(runLock.getAutomaticExtensionThreshold()).toBe(2000); const retryConfig = runLock.getRetryConfig(); @@ -769,7 +740,7 @@ describe("RunLocker", () => { expect(config.maxRetries).toBe(0); // Should work for successful acquisitions - await runLock1.lock("test-lock", ["test-edge"], 5000, async () => { + await runLock1.lock("test-lock", ["test-edge"], 
async () => { expect(runLock1.isInsideLock()).toBe(true); }); } finally { @@ -798,7 +769,7 @@ describe("RunLocker", () => { expect(config.maxDelay).toBe(10); expect(config.jitterFactor).toBe(0.5); - await runLock2.lock("test-lock", ["test-small"], 5000, async () => { + await runLock2.lock("test-lock", ["test-small"], async () => { expect(runLock2.isInsideLock()).toBe(true); }); } finally { @@ -828,7 +799,7 @@ describe("RunLocker", () => { expect(config.baseDelay).toBe(100); // Basic functionality test with the configuration - await runLock.lock("test-lock", ["test-timing-config"], 5000, async () => { + await runLock.lock("test-lock", ["test-timing-config"], async () => { expect(runLock.isInsideLock()).toBe(true); }); @@ -851,12 +822,12 @@ describe("RunLocker", () => { }); // Acquire some locks to create state - await runLock.lock("test-lock", ["quit-test-1"], 5000, async () => { + await runLock.lock("test-lock", ["quit-test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); }); // Verify we can still acquire locks - await runLock.lock("test-lock", ["quit-test-2"], 5000, async () => { + await runLock.lock("test-lock", ["quit-test-2"], async () => { expect(runLock.isInsideLock()).toBe(true); }); @@ -872,7 +843,7 @@ describe("RunLocker", () => { }); try { - await newRunLock.lock("test-lock", ["quit-test-1"], 5000, async () => { + await newRunLock.lock("test-lock", ["quit-test-1"], async () => { expect(newRunLock.isInsideLock()).toBe(true); }); } finally { @@ -891,6 +862,7 @@ describe("RunLocker", () => { redis, logger, tracer: trace.getTracer("RunLockTest"), + duration: 1000, }); try { @@ -898,9 +870,7 @@ describe("RunLocker", () => { const startTime = Date.now(); // Acquire lock with short duration but long operation - await runLock.lock("test-lock", ["extension-test"], 1000, async (signal) => { - expect(signal).toBeDefined(); - + await runLock.lock("test-lock", ["extension-test"], async () => { // Operation longer than lock duration - should trigger extension await new Promise((resolve) => setTimeout(resolve, 2500)); @@ -935,12 +905,12 @@ describe("RunLocker", () => { expect(runLock.getCurrentResources()).toBeUndefined(); expect(runLock.isInsideLock()).toBe(false); - await runLock.lock("test-lock", ["resource-x", "resource-y"], 5000, async () => { + await runLock.lock("test-lock", ["resource-x", "resource-y"], async () => { // Inside lock expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); expect(runLock.isInsideLock()).toBe(true); - await runLock.lock("test-lock", ["resource-x", "resource-y"], 5000, async () => { + await runLock.lock("test-lock", ["resource-x", "resource-y"], async () => { // Nested lock with same resources expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); expect(runLock.isInsideLock()).toBe(true); @@ -956,40 +926,6 @@ describe("RunLocker", () => { } ); - redisTest( - "Test signal properties and behavior", - { timeout: 15_000 }, - async ({ redisOptions }) => { - const redis = createRedisClient(redisOptions); - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ - redis, - logger, - tracer: trace.getTracer("RunLockTest"), - }); - - try { - let signalReceived: redlock.RedlockAbortSignal | undefined; - - await runLock.lock("test-lock", ["signal-test"], 5000, async (signal) => { - signalReceived = signal; - - expect(signal).toBeDefined(); - expect(typeof signal.aborted).toBe("boolean"); - expect(signal.aborted).toBe(false); - - // Signal should have event listener capabilities - expect(typeof 
signal.addEventListener).toBe("function"); - expect(typeof signal.removeEventListener).toBe("function"); - }); - - expect(signalReceived).toBeDefined(); - } finally { - await runLock.quit(); - } - } - ); - redisTest( "Test retry behavior with exact timing", { timeout: 25_000 }, @@ -1008,6 +944,7 @@ describe("RunLocker", () => { redis: redis2, logger, tracer: trace.getTracer("RunLockTest"), + duration: 30000, retryConfig: { maxRetries: 3, baseDelay: 100, @@ -1020,7 +957,7 @@ describe("RunLocker", () => { try { // Create blocking lock with first instance - make it last much longer than retry logic - const blockingPromise = runLock1.lock("test-lock", ["timing-test"], 30000, async () => { + const blockingPromise = runLock1.lock("test-lock", ["timing-test"], async () => { await new Promise((resolve) => setTimeout(resolve, 15000)); }); @@ -1028,7 +965,7 @@ describe("RunLocker", () => { const startTime = Date.now(); try { - await runLock2.lock("test-lock", ["timing-test"], 5000, async () => { + await runLock2.lock("test-lock", ["timing-test"], async () => { expect(true).toBe(false); }); expect(true).toBe(false); // Should not reach here From 6674c58e3ee0db68c225bd2bedc3ce680041ceab Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 21:23:06 +0100 Subject: [PATCH 08/14] Fixed race condition --- internal-packages/run-engine/src/engine/locking.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index a91debc98a..fefe367fec 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -93,11 +93,7 @@ export class RunLocker { this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 
500; this.redlock = new Redlock([options.redis], { - driftFactor: 0.01, retryCount: 0, // Disable Redlock's internal retrying - we handle retries ourselves - retryDelay: 200, // Not used since retryCount = 0 - retryJitter: 200, // Not used since retryCount = 0 - automaticExtensionThreshold: this.automaticExtensionThreshold, }); this.asyncLocalStorage = new AsyncLocalStorage(); this.logger = options.logger; @@ -428,6 +424,11 @@ export class RunLocker { controller: AbortController, scheduleNext: () => void ): Promise { + // Check if cleanup has started before proceeding + if (context.timeout === null) { + return; + } + context.timeout = undefined; const [error, newLock] = await tryCatch(context.lock.extend(duration)); From 2034c570eccb24811f84e52d2f59ec595444c683 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 21:34:56 +0100 Subject: [PATCH 09/14] Handle undefined lock edge case --- .../run-engine/src/engine/locking.ts | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index fefe367fec..86675b7dd6 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -213,7 +213,7 @@ export class RunLocker { let totalWaitTime = 0; // Retry the lock acquisition with exponential backoff - let lock: redlock.Lock; + let lock: redlock.Lock | undefined; let lastError: Error | undefined; for (let attempt = 0; attempt <= maxRetries; attempt++) { @@ -315,6 +315,23 @@ export class RunLocker { throw lastError; } + // Safety guard: ensure lock was successfully acquired + if (!lock) { + this.logger.error("[RunLocker] Lock was not acquired despite completing retry loop", { + name, + resources: sortedResources, + maxRetries, + totalWaitTime: Math.round(totalWaitTime), + lastError: lastError?.message, + }); + throw new LockAcquisitionTimeoutError( + sortedResources, + Math.round(totalWaitTime), + maxRetries + 1, + `Lock acquisition on resources [${sortedResources.join(", ")}] failed unexpectedly` + ); + } + // Create an AbortController for our signal const controller = new AbortController(); const signal = controller.signal as redlock.RedlockAbortSignal; From 707cdf6a31d6b5aac04215bd2966948c7ef790a2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 21:41:22 +0100 Subject: [PATCH 10/14] Fixed reschedule race condition --- .../run-engine/src/engine/locking.ts | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 86675b7dd6..7d2ba5759a 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -418,15 +418,18 @@ export class RunLocker { context.lock.expiration - Date.now() - this.automaticExtensionThreshold; if (timeUntilExtension > 0) { - context.timeout = setTimeout(() => { - context.extension = this.#extendLock( - context, - duration, - signal, - controller, - scheduleExtension - ); - }, timeUntilExtension); + // Check for cleanup immediately before scheduling to prevent race condition + if (context.timeout !== null) { + context.timeout = setTimeout(() => { + context.extension = this.#extendLock( + context, + duration, + signal, + controller, + scheduleExtension + ); + }, timeUntilExtension); + } } }; @@ -452,10 +455,8 @@ export class RunLocker { if (!error && newLock) { 
context.lock = newLock; - // Only schedule next extension if we haven't been cleaned up - if (context.timeout !== null) { - scheduleNext(); - } + // Schedule next extension (cleanup check is now inside scheduleNext) + scheduleNext(); } else { if (context.lock.expiration > Date.now()) { // If lock hasn't expired yet, schedule a retry instead of recursing From abd30cb400a39f17eed91b4ebf0713e03ea95f20 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 14 Jun 2025 22:25:55 +0100 Subject: [PATCH 11/14] rename maxRetries to maxAttempts --- apps/webapp/app/v3/runEngine.server.ts | 2 +- .../run-engine/src/engine/index.ts | 2 +- .../run-engine/src/engine/locking.ts | 16 ++++---- .../src/engine/tests/locking.test.ts | 38 +++++++++---------- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index acfaa57719..559c363988 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -78,7 +78,7 @@ function createRunEngine() { duration: env.RUN_ENGINE_RUN_LOCK_DURATION, automaticExtensionThreshold: env.RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD, retryConfig: { - maxRetries: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES, + maxAttempts: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES, baseDelay: env.RUN_ENGINE_RUN_LOCK_BASE_DELAY, maxDelay: env.RUN_ENGINE_RUN_LOCK_MAX_DELAY, backoffMultiplier: env.RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 39c862d848..f5fcdca179 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -101,7 +101,7 @@ export class RunEngine { duration: options.runLock.duration ?? 5000, automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000, retryConfig: { - maxRetries: 10, + maxAttempts: 10, baseDelay: 100, maxDelay: 3000, backoffMultiplier: 1.8, diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 7d2ba5759a..5e2e7432bd 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -52,8 +52,8 @@ interface ManualLockContext { } export interface LockRetryConfig { - /** Maximum number of retry attempts (default: 10) */ - maxRetries?: number; + /** Maximum number of locking attempts (default: 10) */ + maxAttempts?: number; /** Initial delay in milliseconds (default: 200) */ baseDelay?: number; /** Maximum delay cap in milliseconds (default: 5000) */ @@ -102,7 +102,7 @@ export class RunLocker { // Initialize retry configuration with defaults this.retryConfig = { - maxRetries: options.retryConfig?.maxRetries ?? 10, + maxAttempts: options.retryConfig?.maxAttempts ?? 10, baseDelay: options.retryConfig?.baseDelay ?? 200, maxDelay: options.retryConfig?.maxDelay ?? 5000, backoffMultiplier: options.retryConfig?.backoffMultiplier ?? 
1.5, @@ -206,7 +206,7 @@ export class RunLocker { const joinedResources = sortedResources.join(","); // Use configured retry settings with exponential backoff - const { maxRetries, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } = + const { maxAttempts, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } = this.retryConfig; // Track timing for total wait time limit @@ -216,7 +216,7 @@ export class RunLocker { let lock: redlock.Lock | undefined; let lastError: Error | undefined; - for (let attempt = 0; attempt <= maxRetries; attempt++) { + for (let attempt = 0; attempt <= maxAttempts; attempt++) { const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration)); if (!error && acquiredLock) { @@ -254,7 +254,7 @@ export class RunLocker { } // If this is the last attempt, throw timeout error - if (attempt === maxRetries) { + if (attempt === maxAttempts) { this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", { name, resources: sortedResources, @@ -320,14 +320,14 @@ export class RunLocker { this.logger.error("[RunLocker] Lock was not acquired despite completing retry loop", { name, resources: sortedResources, - maxRetries, + maxAttempts, totalWaitTime: Math.round(totalWaitTime), lastError: lastError?.message, }); throw new LockAcquisitionTimeoutError( sortedResources, Math.round(totalWaitTime), - maxRetries + 1, + maxAttempts + 1, `Lock acquisition on resources [${sortedResources.join(", ")}] failed unexpectedly` ); } diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index e289e36fe2..04b43fd19a 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -115,7 +115,7 @@ describe("RunLocker", () => { logger, tracer: trace.getTracer("RunLockTest"), retryConfig: { - maxRetries: 3, + maxAttempts: 3, baseDelay: 100, maxTotalWaitTime: 2000, // 2 second timeout for faster test }, @@ -246,7 +246,7 @@ describe("RunLocker", () => { logger, tracer: trace.getTracer("RunLockTest"), retryConfig: { - maxRetries: 2, + maxAttempts: 2, baseDelay: 50, maxDelay: 200, backoffMultiplier: 2.0, @@ -258,7 +258,7 @@ describe("RunLocker", () => { try { // Verify configuration is set correctly const config = runLock.getRetryConfig(); - expect(config.maxRetries).toBe(2); + expect(config.maxAttempts).toBe(2); expect(config.baseDelay).toBe(50); expect(config.maxDelay).toBe(200); expect(config.backoffMultiplier).toBe(2.0); @@ -288,7 +288,7 @@ describe("RunLocker", () => { logger, tracer: trace.getTracer("RunLockTest"), retryConfig: { - maxRetries: 2, + maxAttempts: 2, baseDelay: 50, maxTotalWaitTime: 500, // Shorter timeout to ensure failure }, @@ -315,7 +315,7 @@ describe("RunLocker", () => { if (error instanceof LockAcquisitionTimeoutError) { expect(error.resources).toEqual(["test-error"]); expect(error.attempts).toBeGreaterThan(0); - expect(error.attempts).toBeLessThanOrEqual(3); // maxRetries + 1 + expect(error.attempts).toBeLessThanOrEqual(3); // maxAttempts + 1 expect(error.totalWaitTime).toBeGreaterThan(0); expect(error.totalWaitTime).toBeLessThanOrEqual(800); // Some tolerance expect(error.name).toBe("LockAcquisitionTimeoutError"); @@ -344,7 +344,7 @@ describe("RunLocker", () => { try { const config = runLock.getRetryConfig(); - expect(config.maxRetries).toBe(10); + expect(config.maxAttempts).toBe(10); expect(config.baseDelay).toBe(200); 
       expect(config.maxDelay).toBe(5000);
       expect(config.backoffMultiplier).toBe(1.5);
@@ -371,7 +371,7 @@ describe("RunLocker", () => {
       logger,
       tracer: trace.getTracer("RunLockTest"),
       retryConfig: {
-        maxRetries: 5,
+        maxAttempts: 5,
         maxTotalWaitTime: 10000,
         // Other values should use defaults
       },
@@ -379,7 +379,7 @@ describe("RunLocker", () => {
 
     try {
       const config = runLock.getRetryConfig();
-      expect(config.maxRetries).toBe(5); // Overridden
+      expect(config.maxAttempts).toBe(5); // Overridden
       expect(config.maxTotalWaitTime).toBe(10000); // Overridden
       expect(config.baseDelay).toBe(200); // Default
       expect(config.maxDelay).toBe(5000); // Default
@@ -643,7 +643,7 @@ describe("RunLocker", () => {
       duration: 3000,
       automaticExtensionThreshold: 300,
       retryConfig: {
-        maxRetries: 5,
+        maxAttempts: 5,
         baseDelay: 150,
       },
     });
@@ -654,7 +654,7 @@ describe("RunLocker", () => {
     expect(runLock.getAutomaticExtensionThreshold()).toBe(300);
 
     const retryConfig = runLock.getRetryConfig();
-    expect(retryConfig.maxRetries).toBe(5);
+    expect(retryConfig.maxAttempts).toBe(5);
     expect(retryConfig.baseDelay).toBe(150);
 
     // Test basic functionality with all custom configs
@@ -681,7 +681,7 @@ describe("RunLocker", () => {
       duration: 10000,
       automaticExtensionThreshold: 2000,
       retryConfig: {
-        maxRetries: 15,
+        maxAttempts: 15,
         baseDelay: 100,
         maxDelay: 3000,
         backoffMultiplier: 1.8,
@@ -696,7 +696,7 @@ describe("RunLocker", () => {
     expect(runLock.getAutomaticExtensionThreshold()).toBe(2000);
 
     const retryConfig = runLock.getRetryConfig();
-    expect(retryConfig.maxRetries).toBe(15);
+    expect(retryConfig.maxAttempts).toBe(15);
     expect(retryConfig.baseDelay).toBe(100);
     expect(retryConfig.maxDelay).toBe(3000);
     expect(retryConfig.backoffMultiplier).toBe(1.8);
@@ -722,14 +722,14 @@ describe("RunLocker", () => {
   redisTest("Test configuration edge cases", { timeout: 15_000 }, async ({ redisOptions }) => {
     const logger = new Logger("RunLockTest", "debug");
 
-    // Test with maxRetries = 0
+    // Test with maxAttempts = 0
     const redis1 = createRedisClient(redisOptions);
     const runLock1 = new RunLocker({
       redis: redis1,
       logger,
       tracer: trace.getTracer("RunLockTest"),
       retryConfig: {
-        maxRetries: 0,
+        maxAttempts: 0,
         baseDelay: 100,
         maxTotalWaitTime: 1000,
       },
@@ -737,7 +737,7 @@ describe("RunLocker", () => {
 
     try {
       const config = runLock1.getRetryConfig();
-      expect(config.maxRetries).toBe(0);
+      expect(config.maxAttempts).toBe(0);
 
       // Should work for successful acquisitions
       await runLock1.lock("test-lock", ["test-edge"], async () => {
@@ -754,7 +754,7 @@ describe("RunLocker", () => {
       logger,
       tracer: trace.getTracer("RunLockTest"),
       retryConfig: {
-        maxRetries: 2,
+        maxAttempts: 2,
         baseDelay: 1,
         maxDelay: 10,
         backoffMultiplier: 2.0,
@@ -785,7 +785,7 @@ describe("RunLocker", () => {
       logger,
       tracer: trace.getTracer("RunLockTest"),
       retryConfig: {
-        maxRetries: 100, // High retry count
+        maxAttempts: 100, // High retry count
         baseDelay: 100,
         maxTotalWaitTime: 500, // But low total wait time
       },
@@ -794,7 +794,7 @@ describe("RunLocker", () => {
     try {
       // Test that total wait time configuration is properly applied
       const config = runLock.getRetryConfig();
-      expect(config.maxRetries).toBe(100);
+      expect(config.maxAttempts).toBe(100);
       expect(config.maxTotalWaitTime).toBe(500);
       expect(config.baseDelay).toBe(100);
@@ -946,7 +946,7 @@ describe("RunLocker", () => {
       tracer: trace.getTracer("RunLockTest"),
       duration: 30000,
       retryConfig: {
-        maxRetries: 3,
+        maxAttempts: 3,
         baseDelay: 100,
         maxDelay: 500,
         backoffMultiplier: 2.0,
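
Note on the renamed settings: the sketch below shows one conventional way fields like those in LockRetryConfig (maxAttempts, baseDelay, maxDelay, backoffMultiplier, jitterFactor) combine into a per-attempt delay. It is illustrative only and assumes a standard capped-exponential-backoff formula; computeBackoffDelay is a hypothetical helper, not the implementation in locking.ts.

// Illustrative sketch only - assumes capped exponential backoff with symmetric jitter.
interface RetryConfig {
  maxAttempts: number;       // total acquisition attempts allowed
  baseDelay: number;         // delay (ms) after the first failed attempt
  maxDelay: number;          // cap for any single delay
  backoffMultiplier: number; // growth factor per attempt
  jitterFactor: number;      // fraction (0..1) of random spread around the delay
}

function computeBackoffDelay(attempt: number, cfg: RetryConfig): number {
  // Grow the delay exponentially with the attempt index, but never past maxDelay.
  const capped = Math.min(cfg.baseDelay * Math.pow(cfg.backoffMultiplier, attempt), cfg.maxDelay);
  // Spread the delay by +/- jitterFactor so concurrent waiters do not retry in lockstep.
  const jitter = capped * cfg.jitterFactor * (Math.random() * 2 - 1);
  return Math.max(0, Math.round(capped + jitter));
}

// With the webapp defaults patched above (base 100ms, multiplier 1.8, cap 3000ms),
// the nominal delays before jitter would be roughly 100, 180, 324, 583, 1050, 1890, 3000, ... ms.
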
From f56f1bfce433f65b692138f572112be2a2aaa23a Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Sun, 15 Jun 2025 07:30:10 +0100
Subject: [PATCH 12/14] Fix off by one error

---
 internal-packages/run-engine/src/engine/locking.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts
index 5e2e7432bd..ca4261e625 100644
--- a/internal-packages/run-engine/src/engine/locking.ts
+++ b/internal-packages/run-engine/src/engine/locking.ts
@@ -216,7 +216,7 @@ export class RunLocker {
     let lock: redlock.Lock | undefined;
     let lastError: Error | undefined;
 
-    for (let attempt = 0; attempt <= maxAttempts; attempt++) {
+    for (let attempt = 0; attempt < maxAttempts; attempt++) {
       const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration));
 
       if (!error && acquiredLock) {

From 6d1366aa0c85737d20523d25a66963838feda635 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Sun, 15 Jun 2025 20:02:07 +0100
Subject: [PATCH 13/14] Go back to original loop condition

---
 internal-packages/run-engine/src/engine/locking.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts
index ca4261e625..5e2e7432bd 100644
--- a/internal-packages/run-engine/src/engine/locking.ts
+++ b/internal-packages/run-engine/src/engine/locking.ts
@@ -216,7 +216,7 @@ export class RunLocker {
     let lock: redlock.Lock | undefined;
     let lastError: Error | undefined;
 
-    for (let attempt = 0; attempt < maxAttempts; attempt++) {
+    for (let attempt = 0; attempt <= maxAttempts; attempt++) {
       const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration));
 
       if (!error && acquiredLock) {

From e4584d7c091c756e6d3386c3dcc26576c8f2c91d Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Mon, 16 Jun 2025 16:02:41 +0100
Subject: [PATCH 14/14] Remove unnecessary locks

---
 internal-packages/run-engine/src/engine/locking.ts | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts
index 5e2e7432bd..a58c0dcfdc 100644
--- a/internal-packages/run-engine/src/engine/locking.ts
+++ b/internal-packages/run-engine/src/engine/locking.ts
@@ -337,7 +337,7 @@ export class RunLocker {
     const signal = controller.signal as redlock.RedlockAbortSignal;
 
     const manualContext: ManualLockContext = {
-      lock: lock!,
+      lock,
       timeout: undefined,
       extension: undefined,
     };
@@ -390,12 +390,12 @@ export class RunLocker {
       this.#cleanupExtension(manualContext);
 
       // Release the lock using tryCatch
-      const [releaseError] = await tryCatch(lock!.release());
+      const [releaseError] = await tryCatch(lock.release());
       if (releaseError) {
         this.logger.warn("[RunLocker] Error releasing lock", {
           error: releaseError,
           resources: sortedResources,
-          lockValue: lock!.value,
+          lockValue: lock.value,
         });
       }
     }
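
Side note on patches 12 and 13: the loop condition decides how many acquisition attempts actually run. The standalone sketch below makes the off-by-one visible; countTries is illustrative only, not code from locking.ts.

// Standalone illustration of the loop condition that patch 12 changed and patch 13 reverted.
function countTries(maxAttempts: number, inclusive: boolean): number {
  let tries = 0;
  for (let attempt = 0; inclusive ? attempt <= maxAttempts : attempt < maxAttempts; attempt++) {
    tries++; // one redlock.acquire() call per iteration
  }
  return tries;
}

console.log(countTries(10, true));  // 11 -> `<=` means one initial try plus maxAttempts retries
console.log(countTries(10, false)); // 10 -> `<` would make maxAttempts the total number of tries

Keeping `<=` preserves the behaviour the surrounding code already assumes, such as the `maxAttempts + 1` attempt count passed to LockAcquisitionTimeoutError. Patch 14 then drops the `lock!` non-null assertions; by that point the retry loop has either assigned `lock` or thrown, so the assertions add nothing.
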