Skip to content

Commit b383a73

Browse files
committed
fix race condition when the delayUntil is updated while the enqueueDelayedRun worker job is executing
1 parent cddc7b6 commit b383a73

File tree

2 files changed

+123
-1
lines changed

2 files changed

+123
-1
lines changed

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ export class DelayedRunSystem {
4141
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
4242

4343
// Check if the run is still in DELAYED status (or legacy RUN_CREATED for older runs)
44-
if (snapshot.executionStatus !== "DELAYED" && snapshot.executionStatus !== "RUN_CREATED") {
44+
if (
45+
snapshot.executionStatus !== "DELAYED" &&
46+
snapshot.executionStatus !== "RUN_CREATED"
47+
) {
4548
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
4649
}
4750

@@ -128,6 +131,19 @@ export class DelayedRunSystem {
128131
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);
129132
}
130133

134+
// Check if delayUntil has been rescheduled to the future (e.g., by debounce)
135+
// If so, don't enqueue - the rescheduled worker job will handle it
136+
if (run.delayUntil && run.delayUntil > new Date()) {
137+
this.$.logger.debug(
138+
"enqueueDelayedRun: delay was rescheduled to the future, skipping enqueue",
139+
{
140+
runId,
141+
delayUntil: run.delayUntil,
142+
}
143+
);
144+
return;
145+
}
146+
131147
// Now we need to enqueue the run into the RunQueue
132148
// Skip the lock in enqueueRun since we already hold it
133149
await this.enqueueSystem.enqueueRun({

internal-packages/run-engine/src/engine/tests/delays.test.ts

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,4 +401,110 @@ describe("RunEngine delays", () => {
401401
await engine.quit();
402402
}
403403
});
404+
405+
containerTest(
406+
"enqueueDelayedRun respects rescheduled delayUntil",
407+
async ({ prisma, redisOptions }) => {
408+
// This test verifies the race condition fix where if delayUntil is updated
409+
// (e.g., by debounce reschedule) while the worker job is executing,
410+
// the run should NOT be enqueued at the original time.
411+
//
412+
// The race condition occurs when:
413+
// 1. Worker job is scheduled for T1
414+
// 2. rescheduleDelayedRun updates delayUntil to T2 in DB
415+
// 3. worker.reschedule() tries to update the job, but it's already dequeued
416+
// 4. Original worker job fires and calls enqueueDelayedRun
417+
//
418+
// Without the fix: Run would be enqueued at T1 (wrong!)
419+
// With the fix: enqueueDelayedRun checks delayUntil > now and skips
420+
421+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
422+
423+
const engine = new RunEngine({
424+
prisma,
425+
worker: {
426+
redis: redisOptions,
427+
workers: 1,
428+
tasksPerWorker: 10,
429+
pollIntervalMs: 100,
430+
},
431+
queue: {
432+
redis: redisOptions,
433+
},
434+
runLock: {
435+
redis: redisOptions,
436+
},
437+
machines: {
438+
defaultMachine: "small-1x",
439+
machines: {
440+
"small-1x": {
441+
name: "small-1x" as const,
442+
cpu: 0.5,
443+
memory: 0.5,
444+
centsPerMs: 0.0001,
445+
},
446+
},
447+
baseCostInCents: 0.0001,
448+
},
449+
tracer: trace.getTracer("test", "0.0.0"),
450+
});
451+
452+
try {
453+
const taskIdentifier = "test-task";
454+
455+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
456+
457+
// Create a delayed run with a short delay (300ms)
458+
const run = await engine.trigger(
459+
{
460+
number: 1,
461+
friendlyId: "run_1235",
462+
environment: authenticatedEnvironment,
463+
taskIdentifier,
464+
payload: "{}",
465+
payloadType: "application/json",
466+
context: {},
467+
traceContext: {},
468+
traceId: "t12345",
469+
spanId: "s12345",
470+
workerQueue: "main",
471+
queue: "task/test-task",
472+
isTest: false,
473+
tags: [],
474+
delayUntil: new Date(Date.now() + 300),
475+
},
476+
prisma
477+
);
478+
479+
// Verify it's delayed
480+
const executionData = await engine.getRunExecutionData({ runId: run.id });
481+
assertNonNullable(executionData);
482+
expect(executionData.snapshot.executionStatus).toBe("DELAYED");
483+
484+
// Simulate race condition: directly update delayUntil in the database to a future time
485+
// This simulates what happens when rescheduleDelayedRun updates the DB but the
486+
// worker.reschedule() call doesn't affect the already-dequeued job
487+
const newDelayUntil = new Date(Date.now() + 10_000); // 10 seconds in the future
488+
await prisma.taskRun.update({
489+
where: { id: run.id },
490+
data: { delayUntil: newDelayUntil },
491+
});
492+
493+
// Wait past the original delay (500ms) so the worker job fires
494+
await setTimeout(500);
495+
496+
// KEY ASSERTION: The run should still be DELAYED because the fix checks delayUntil > now
497+
// Without the fix, the run would be QUEUED here (wrong!)
498+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
499+
assertNonNullable(executionData2);
500+
expect(executionData2.snapshot.executionStatus).toBe("DELAYED");
501+
502+
// Note: We don't test the run eventually becoming QUEUED here because we only
503+
// updated the DB (simulating the race). In the real scenario, rescheduleDelayedRun
504+
// would also reschedule the worker job to fire at the new delayUntil time.
505+
} finally {
506+
await engine.quit();
507+
}
508+
}
509+
);
404510
});

0 commit comments

Comments
 (0)