@@ -71,6 +71,7 @@ export class RunEngine {
7171 private tracer : Tracer ;
7272 private meter : Meter ;
7373 private heartbeatTimeouts : HeartbeatTimeouts ;
74+ private repairSnapshotTimeoutMs : number ;
7475
7576 prisma : PrismaClient ;
7677 readOnlyPrisma : PrismaReplicaClient ;
@@ -191,6 +192,9 @@ export class RunEngine {
191192 heartbeatSnapshot : async ( { payload } ) => {
192193 await this . #handleStalledSnapshot( payload ) ;
193194 } ,
195+ repairSnapshot : async ( { payload } ) => {
196+ await this . #handleRepairSnapshot( payload ) ;
197+ } ,
194198 expireRun : async ( { payload } ) => {
195199 await this . ttlSystem . expireRun ( { runId : payload . runId } ) ;
196200 } ,
@@ -241,6 +245,8 @@ export class RunEngine {
241245 ...( options . heartbeatTimeoutsMs ?? { } ) ,
242246 } ;
243247
248+ this . repairSnapshotTimeoutMs = options . repairSnapshotTimeoutMs ?? 60_000 ;
249+
244250 const resources : SystemResources = {
245251 prisma : this . prisma ,
246252 worker : this . worker ,
@@ -1174,81 +1180,77 @@ export class RunEngine {
11741180 async repairEnvironment ( environment : AuthenticatedEnvironment , dryRun : boolean ) {
11751181 const runIds = await this . runQueue . getCurrentConcurrencyOfEnvironment ( environment ) ;
11761182
1177- const completedRuns = await this . #concurrencySweeperCallback( runIds , 5000 ) ;
1183+ return this . #repairRuns( runIds , dryRun ) ;
1184+ }
11781185
1179- if ( dryRun ) {
1180- return {
1181- runIds,
1182- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1183- dryRun,
1184- } ;
1185- }
1186+ async repairQueue (
1187+ environment : AuthenticatedEnvironment ,
1188+ queue : string ,
1189+ dryRun : boolean ,
1190+ ignoreRunIds : string [ ]
1191+ ) {
1192+ const runIds = await this . runQueue . getCurrentConcurrencyOfQueue ( environment , queue ) ;
1193+
1194+ const runIdsToRepair = runIds . filter ( ( runId ) => ! ignoreRunIds . includes ( runId ) ) ;
1195+
1196+ return this . #repairRuns( runIdsToRepair , dryRun ) ;
1197+ }
11861198
1187- if ( completedRuns . length === 0 ) {
1199+ async #repairRuns( runIds : string [ ] , dryRun : boolean ) {
1200+ if ( runIds . length === 0 ) {
11881201 return {
11891202 runIds,
1190- completedRunIds : [ ] ,
1203+ repairs : [ ] ,
11911204 dryRun,
11921205 } ;
11931206 }
11941207
1195- await pMap (
1196- completedRuns ,
1197- async ( run ) => {
1198- await this . runQueue . acknowledgeMessage ( run . orgId , run . id , {
1199- skipDequeueProcessing : true ,
1200- removeFromWorkerQueue : false ,
1201- } ) ;
1208+ const repairs = await pMap (
1209+ runIds ,
1210+ async ( runId ) => {
1211+ return this . #repairRun( runId , dryRun ) ;
12021212 } ,
12031213 { concurrency : 5 }
12041214 ) ;
12051215
12061216 return {
12071217 runIds,
1208- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1218+ repairs ,
12091219 dryRun,
12101220 } ;
12111221 }
12121222
1213- async repairQueue ( environment : AuthenticatedEnvironment , queue : string , dryRun : boolean ) {
1214- const runIds = await this . runQueue . getCurrentConcurrencyOfQueue ( environment , queue ) ;
1215-
1216- const completedRuns = await this . #concurrencySweeperCallback( runIds , 5000 ) ;
1217-
1218- if ( dryRun ) {
1219- return {
1220- queue,
1221- runIds,
1222- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1223- dryRun,
1224- } ;
1225- }
1223+ async #repairRun( runId : string , dryRun : boolean ) {
1224+ const snapshot = await getLatestExecutionSnapshot ( this . prisma , runId ) ;
1225+
1226+ if (
1227+ snapshot . executionStatus === "QUEUED" ||
1228+ snapshot . executionStatus === "SUSPENDED" ||
1229+ snapshot . executionStatus === "FINISHED"
1230+ ) {
1231+ if ( ! dryRun ) {
1232+ // Schedule the repair job
1233+ await this . worker . enqueueOnce ( {
1234+ id : `repair-in-progress-run:${ runId } ` ,
1235+ job : "repairSnapshot" ,
1236+ payload : { runId, snapshotId : snapshot . id , executionStatus : snapshot . executionStatus } ,
1237+ availableAt : new Date ( Date . now ( ) + this . repairSnapshotTimeoutMs ) ,
1238+ } ) ;
1239+ }
12261240
1227- if ( completedRuns . length === 0 ) {
12281241 return {
1229- queue ,
1230- runIds ,
1231- completedRunIds : [ ] ,
1232- dryRun ,
1242+ action : "repairSnapshot" ,
1243+ runId ,
1244+ snapshotStatus : snapshot . executionStatus ,
1245+ snapshotId : snapshot . id ,
12331246 } ;
12341247 }
12351248
1236- await pMap (
1237- completedRuns ,
1238- async ( run ) => {
1239- await this . runQueue . acknowledgeMessage ( run . orgId , run . id , {
1240- skipDequeueProcessing : true ,
1241- removeFromWorkerQueue : false ,
1242- } ) ;
1243- } ,
1244- { concurrency : 5 }
1245- ) ;
1246-
12471249 return {
1248- queue ,
1249- runIds ,
1250- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1251- dryRun ,
1250+ action : "ignore" ,
1251+ runId ,
1252+ snapshotStatus : snapshot . executionStatus ,
1253+ snapshotId : snapshot . id ,
12521254 } ;
12531255 }
12541256
@@ -1650,6 +1652,117 @@ export class RunEngine {
16501652 } ) ;
16511653 }
16521654
1655+ async #handleRepairSnapshot( {
1656+ runId,
1657+ snapshotId,
1658+ executionStatus,
1659+ } : {
1660+ runId : string ;
1661+ snapshotId : string ;
1662+ executionStatus : string ;
1663+ } ) {
1664+ return await this . runLock . lock ( "handleRepairSnapshot" , [ runId ] , async ( ) => {
1665+ const latestSnapshot = await getLatestExecutionSnapshot ( this . prisma , runId ) ;
1666+
1667+ if ( latestSnapshot . id !== snapshotId ) {
1668+ this . logger . log (
1669+ "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair." ,
1670+ {
1671+ runId,
1672+ snapshotId,
1673+ latestSnapshotExecutionStatus : latestSnapshot . executionStatus ,
1674+ repairExecutionStatus : executionStatus ,
1675+ }
1676+ ) ;
1677+
1678+ return ;
1679+ }
1680+
1681+ // Okay, so this means we haven't transitioned to a new status yet, so we need to do something
1682+ switch ( latestSnapshot . executionStatus ) {
1683+ case "EXECUTING" :
1684+ case "EXECUTING_WITH_WAITPOINTS" :
1685+ case "PENDING_CANCEL" :
1686+ case "PENDING_EXECUTING" :
1687+ case "QUEUED_EXECUTING" :
1688+ case "RUN_CREATED" : {
1689+ // Do nothing;
1690+ return ;
1691+ }
1692+ case "QUEUED" : {
1693+ this . logger . log ( "RunEngine.handleRepairSnapshot QUEUED" , {
1694+ runId,
1695+ snapshotId,
1696+ } ) ;
1697+
1698+ //it will automatically be requeued X times depending on the queue retry settings
1699+ const gotRequeued = await this . runQueue . nackMessage ( {
1700+ orgId : latestSnapshot . organizationId ,
1701+ messageId : runId ,
1702+ } ) ;
1703+
1704+ if ( ! gotRequeued ) {
1705+ this . logger . error ( "RunEngine.handleRepairSnapshot QUEUED repair failed" , {
1706+ runId,
1707+ snapshot : latestSnapshot ,
1708+ } ) ;
1709+ } else {
1710+ this . logger . log ( "RunEngine.handleRepairSnapshot QUEUED repair successful" , {
1711+ runId,
1712+ snapshot : latestSnapshot ,
1713+ } ) ;
1714+ }
1715+
1716+ break ;
1717+ }
1718+ case "FINISHED" :
1719+ case "SUSPENDED" : {
1720+ this . logger . log ( "RunEngine.handleRepairSnapshot SUSPENDED/FINISHED" , {
1721+ runId,
1722+ snapshotId,
1723+ } ) ;
1724+
1725+ const taskRun = await this . prisma . taskRun . findFirst ( {
1726+ where : { id : runId } ,
1727+ select : {
1728+ queue : true ,
1729+ } ,
1730+ } ) ;
1731+
1732+ if ( ! taskRun ) {
1733+ this . logger . error ( "RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found" , {
1734+ runId,
1735+ snapshotId,
1736+ } ) ;
1737+ return ;
1738+ }
1739+
1740+ // We need to clear this run from the current concurrency sets
1741+ await this . runQueue . clearMessageFromConcurrencySets ( {
1742+ runId,
1743+ orgId : latestSnapshot . organizationId ,
1744+ queue : taskRun . queue ,
1745+ env : {
1746+ id : latestSnapshot . environmentId ,
1747+ type : latestSnapshot . environmentType ,
1748+ project : {
1749+ id : latestSnapshot . projectId ,
1750+ } ,
1751+ organization : {
1752+ id : latestSnapshot . organizationId ,
1753+ } ,
1754+ } ,
1755+ } ) ;
1756+
1757+ break ;
1758+ }
1759+ default : {
1760+ assertNever ( latestSnapshot . executionStatus ) ;
1761+ }
1762+ }
1763+ } ) ;
1764+ }
1765+
16531766 async #concurrencySweeperCallback(
16541767 runIds : string [ ] ,
16551768 completedAtOffsetMs : number = 1000 * 60 * 10
0 commit comments