1- import { createRedisClient , Redis , RedisOptions } from "@internal/redis" ;
1+ import {
2+ createRedisClient ,
3+ Redis ,
4+ RedisOptions ,
5+ type Callback ,
6+ type Result ,
7+ } from "@internal/redis" ;
28import { startSpan } from "@internal/tracing" ;
39import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic" ;
410import { PrismaClientOrTransaction , TaskRun , Waitpoint } from "@trigger.dev/database" ;
@@ -79,6 +85,30 @@ export class DebounceSystem {
7985 this . executionSnapshotSystem = options . executionSnapshotSystem ;
8086 this . delayedRunSystem = options . delayedRunSystem ;
8187 this . maxDebounceDurationMs = options . maxDebounceDurationMs ;
88+
89+ this . #registerCommands( ) ;
90+ }
91+
92+ #registerCommands( ) {
93+ // Atomically deletes a key only if its value starts with "pending:".
94+ // Returns [1, nil] if deleted (was pending or didn't exist)
95+ // Returns [0, value] if not deleted (has a run ID)
96+ // This prevents the race condition where between checking "still pending?"
97+ // and calling DEL, the original server could complete and register a valid run ID.
98+ this . redis . defineCommand ( "conditionallyDeletePendingKey" , {
99+ numberOfKeys : 1 ,
100+ lua : `
101+ local value = redis.call('GET', KEYS[1])
102+ if not value then
103+ return { 1, nil }
104+ end
105+ if string.sub(value, 1, 8) == 'pending:' then
106+ redis.call('DEL', KEYS[1])
107+ return { 1, nil }
108+ end
109+ return { 0, value }
110+ ` ,
111+ } ) ;
82112 }
83113
84114 /**
@@ -89,6 +119,34 @@ export class DebounceSystem {
89119 return `${ envId } :${ taskId } :${ debounceKey } ` ;
90120 }
91121
122+ /**
123+ * Atomically deletes a key only if its value still starts with "pending:".
124+ * This prevents the race condition where between the final GET check and DEL,
125+ * the original server could complete and register a valid run ID.
126+ *
127+ * @returns { deleted: true } if the key was deleted or didn't exist
128+ * @returns { deleted: false, existingRunId: string } if the key has a valid run ID
129+ */
130+ private async conditionallyDeletePendingKey (
131+ redisKey : string
132+ ) : Promise < { deleted : true } | { deleted : false ; existingRunId : string } > {
133+ const result = await this . redis . conditionallyDeletePendingKey ( redisKey ) ;
134+
135+ if ( ! result ) {
136+ // Should not happen, but treat as deleted if no result
137+ return { deleted : true } ;
138+ }
139+
140+ const [ wasDeleted , currentValue ] = result ;
141+
142+ if ( wasDeleted === 1 ) {
143+ return { deleted : true } ;
144+ }
145+
146+ // Key exists with a valid run ID
147+ return { deleted : false , existingRunId : currentValue ! } ;
148+ }
149+
92150 /**
93151 * Atomically claims a debounce key using SET NX.
94152 * This prevents the race condition where two servers both check for an existing
@@ -215,16 +273,43 @@ export class DebounceSystem {
215273 }
216274
217275 // Timed out waiting - the other server may have failed
218- // Delete the stale pending key and return "new"
276+ // Conditionally delete the key only if it's still pending
277+ // This prevents the race where the original server completed between our last check and now
219278 this . $ . logger . warn (
220- "waitForExistingRun: timed out waiting for pending claim, deleting stale key " ,
279+ "waitForExistingRun: timed out waiting for pending claim, attempting conditional delete " ,
221280 {
222281 redisKey,
223282 debounceKey : debounce . key ,
224283 }
225284 ) ;
226- await this . redis . del ( redisKey ) ;
227- return { status : "new" } ;
285+
286+ const deleteResult = await this . conditionallyDeletePendingKey ( redisKey ) ;
287+
288+ if ( deleteResult . deleted ) {
289+ // Key was pending (or didn't exist) - safe to create new run
290+ this . $ . logger . debug ( "waitForExistingRun: stale pending key deleted, returning new" , {
291+ redisKey,
292+ debounceKey : debounce . key ,
293+ } ) ;
294+ return { status : "new" } ;
295+ }
296+
297+ // Key now has a valid run ID - the original server completed!
298+ // Handle the existing run instead of creating a duplicate
299+ this . $ . logger . debug (
300+ "waitForExistingRun: original server completed during timeout, handling existing run" ,
301+ {
302+ redisKey,
303+ debounceKey : debounce . key ,
304+ existingRunId : deleteResult . existingRunId ,
305+ }
306+ ) ;
307+ return await this . handleExistingRun ( {
308+ existingRunId : deleteResult . existingRunId ,
309+ redisKey,
310+ debounce,
311+ tx,
312+ } ) ;
228313 }
229314
230315 /**
@@ -558,3 +643,17 @@ export class DebounceSystem {
558643 await this . redis . quit ( ) ;
559644 }
560645}
646+
647+ declare module "@internal/redis" {
648+ interface RedisCommander < Context > {
649+ /**
650+ * Atomically deletes a key only if its value starts with "pending:".
651+ * @returns [1, nil] if deleted (was pending or didn't exist)
652+ * @returns [0, value] if not deleted (has a run ID)
653+ */
654+ conditionallyDeletePendingKey (
655+ key : string ,
656+ callback ?: Callback < [ number , string | null ] >
657+ ) : Result < [ number , string | null ] , Context > ;
658+ }
659+ }
0 commit comments