@@ -278,6 +278,119 @@ describe("RunEngine attempt failures", () => {
278278 }
279279 } ) ;
280280
281+ containerTest (
282+ "Fail (not a retriable error)" ,
283+ { timeout : 15_000 } ,
284+ async ( { prisma, redisOptions } ) => {
285+ //create environment
286+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
287+
288+ const engine = new RunEngine ( {
289+ prisma,
290+ worker : {
291+ redis : redisOptions ,
292+ workers : 1 ,
293+ tasksPerWorker : 10 ,
294+ pollIntervalMs : 100 ,
295+ } ,
296+ queue : {
297+ redis : redisOptions ,
298+ } ,
299+ runLock : {
300+ redis : redisOptions ,
301+ } ,
302+ machines : {
303+ defaultMachine : "small-1x" ,
304+ machines : {
305+ "small-1x" : {
306+ name : "small-1x" as const ,
307+ cpu : 0.5 ,
308+ memory : 0.5 ,
309+ centsPerMs : 0.0001 ,
310+ } ,
311+ } ,
312+ baseCostInCents : 0.0001 ,
313+ } ,
314+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
315+ } ) ;
316+
317+ try {
318+ const taskIdentifier = "test-task" ;
319+
320+ //create background worker
321+ await setupBackgroundWorker ( prisma , authenticatedEnvironment , taskIdentifier , undefined , {
322+ maxAttempts : 1 ,
323+ } ) ;
324+
325+ //trigger the run
326+ const run = await engine . trigger (
327+ {
328+ number : 1 ,
329+ friendlyId : "run_1234" ,
330+ environment : authenticatedEnvironment ,
331+ taskIdentifier,
332+ payload : "{}" ,
333+ payloadType : "application/json" ,
334+ context : { } ,
335+ traceContext : { } ,
336+ traceId : "t12345" ,
337+ spanId : "s12345" ,
338+ masterQueue : "main" ,
339+ queueName : "task/test-task" ,
340+ isTest : false ,
341+ tags : [ ] ,
342+ } ,
343+ prisma
344+ ) ;
345+
346+ //dequeue the run
347+ const dequeued = await engine . dequeueFromMasterQueue ( {
348+ consumerId : "test_12345" ,
349+ masterQueue : run . masterQueue ,
350+ maxRunCount : 10 ,
351+ } ) ;
352+
353+ //create an attempt
354+ const attemptResult = await engine . startRunAttempt ( {
355+ runId : dequeued [ 0 ] . run . id ,
356+ snapshotId : dequeued [ 0 ] . snapshot . id ,
357+ } ) ;
358+
359+ //fail the attempt with an unretriable error
360+ const error = {
361+ type : "INTERNAL_ERROR" as const ,
362+ code : "DISK_SPACE_EXCEEDED" as const ,
363+ } ;
364+ const result = await engine . completeRunAttempt ( {
365+ runId : dequeued [ 0 ] . run . id ,
366+ snapshotId : attemptResult . snapshot . id ,
367+ completion : {
368+ ok : false ,
369+ id : dequeued [ 0 ] . run . id ,
370+ error,
371+ retry : {
372+ timestamp : Date . now ( ) ,
373+ delay : 0 ,
374+ } ,
375+ } ,
376+ } ) ;
377+ expect ( result . attemptStatus ) . toBe ( "RUN_FINISHED" ) ;
378+ expect ( result . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
379+ expect ( result . run . status ) . toBe ( "CRASHED" ) ;
380+
381+ //state should be pending
382+ const executionData3 = await engine . getRunExecutionData ( { runId : run . id } ) ;
383+ assertNonNullable ( executionData3 ) ;
384+ expect ( executionData3 . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
385+ //only when the new attempt is created, should the attempt be increased
386+ expect ( executionData3 . run . attemptNumber ) . toBe ( 1 ) ;
387+ expect ( executionData3 . run . status ) . toBe ( "CRASHED" ) ;
388+ } finally {
389+ engine . quit ( ) ;
390+ }
391+ }
392+ ) ;
393+
281394 containerTest ( "OOM fail" , { timeout : 15_000 } , async ( { prisma, redisOptions } ) => {
282395 //create environment
283396 const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
@@ -553,4 +666,170 @@ describe("RunEngine attempt failures", () => {
553666 }
554667 }
555668 ) ;
669+
670+ containerTest (
671+ "OOM fails after retrying on larger machine" ,
672+ { timeout : 15_000 } ,
673+ async ( { prisma, redisOptions } ) => {
674+ //create environment
675+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
676+
677+ const engine = new RunEngine ( {
678+ prisma,
679+ worker : {
680+ redis : redisOptions ,
681+ workers : 1 ,
682+ tasksPerWorker : 10 ,
683+ pollIntervalMs : 100 ,
684+ } ,
685+ queue : {
686+ redis : redisOptions ,
687+ } ,
688+ runLock : {
689+ redis : redisOptions ,
690+ } ,
691+ machines : {
692+ defaultMachine : "small-1x" ,
693+ machines : {
694+ "small-1x" : {
695+ name : "small-1x" as const ,
696+ cpu : 0.5 ,
697+ memory : 0.5 ,
698+ centsPerMs : 0.0001 ,
699+ } ,
700+ "small-2x" : {
701+ name : "small-2x" as const ,
702+ cpu : 1 ,
703+ memory : 1 ,
704+ centsPerMs : 0.0002 ,
705+ } ,
706+ } ,
707+ baseCostInCents : 0.0001 ,
708+ } ,
709+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
710+ } ) ;
711+
712+ try {
713+ const taskIdentifier = "test-task" ;
714+
715+ //create background worker
716+ await setupBackgroundWorker ( prisma , authenticatedEnvironment , taskIdentifier , undefined , {
717+ outOfMemory : {
718+ machine : "small-2x" ,
719+ } ,
720+ } ) ;
721+
722+ //trigger the run
723+ const run = await engine . trigger (
724+ {
725+ number : 1 ,
726+ friendlyId : "run_1234" ,
727+ environment : authenticatedEnvironment ,
728+ taskIdentifier,
729+ payload : "{}" ,
730+ payloadType : "application/json" ,
731+ context : { } ,
732+ traceContext : { } ,
733+ traceId : "t12345" ,
734+ spanId : "s12345" ,
735+ masterQueue : "main" ,
736+ queueName : "task/test-task" ,
737+ isTest : false ,
738+ tags : [ ] ,
739+ } ,
740+ prisma
741+ ) ;
742+
743+ //dequeue the run
744+ const dequeued = await engine . dequeueFromMasterQueue ( {
745+ consumerId : "test_12345" ,
746+ masterQueue : run . masterQueue ,
747+ maxRunCount : 10 ,
748+ } ) ;
749+
750+ //create first attempt
751+ const attemptResult = await engine . startRunAttempt ( {
752+ runId : dequeued [ 0 ] . run . id ,
753+ snapshotId : dequeued [ 0 ] . snapshot . id ,
754+ } ) ;
755+
756+ //fail the first attempt with an OOM error
757+ const error = {
758+ type : "INTERNAL_ERROR" as const ,
759+ code : "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const ,
760+ message : "Process exited with code -1 after signal SIGKILL." ,
761+ stackTrace : "JavaScript heap out of memory" ,
762+ } ;
763+
764+ const result = await engine . completeRunAttempt ( {
765+ runId : dequeued [ 0 ] . run . id ,
766+ snapshotId : attemptResult . snapshot . id ,
767+ completion : {
768+ ok : false ,
769+ id : dequeued [ 0 ] . run . id ,
770+ error,
771+ retry : {
772+ timestamp : Date . now ( ) ,
773+ delay : 0 ,
774+ } ,
775+ } ,
776+ } ) ;
777+
778+ // The run should be retried with a larger machine
779+ expect ( result . attemptStatus ) . toBe ( "RETRY_QUEUED" ) ;
780+ expect ( result . snapshot . executionStatus ) . toBe ( "QUEUED" ) ;
781+ expect ( result . run . status ) . toBe ( "RETRYING_AFTER_FAILURE" ) ;
782+
783+ //state should be queued
784+ const executionData = await engine . getRunExecutionData ( { runId : run . id } ) ;
785+ assertNonNullable ( executionData ) ;
786+ expect ( executionData . snapshot . executionStatus ) . toBe ( "QUEUED" ) ;
787+ expect ( executionData . run . attemptNumber ) . toBe ( 1 ) ;
788+ expect ( executionData . run . status ) . toBe ( "RETRYING_AFTER_FAILURE" ) ;
789+
790+ //dequeue again
791+ const dequeued2 = await engine . dequeueFromMasterQueue ( {
792+ consumerId : "test_12345" ,
793+ masterQueue : run . masterQueue ,
794+ maxRunCount : 10 ,
795+ } ) ;
796+
797+ //create second attempt
798+ const attemptResult2 = await engine . startRunAttempt ( {
799+ runId : dequeued2 [ 0 ] . run . id ,
800+ snapshotId : dequeued2 [ 0 ] . snapshot . id ,
801+ } ) ;
802+ expect ( attemptResult2 . run . attemptNumber ) . toBe ( 2 ) ;
803+
804+ //fail the second attempt with the same OOM error
805+ const result2 = await engine . completeRunAttempt ( {
806+ runId : dequeued2 [ 0 ] . run . id ,
807+ snapshotId : attemptResult2 . snapshot . id ,
808+ completion : {
809+ ok : false ,
810+ id : dequeued2 [ 0 ] . run . id ,
811+ error,
812+ retry : {
813+ timestamp : Date . now ( ) ,
814+ delay : 0 ,
815+ } ,
816+ } ,
817+ } ) ;
818+
819+ // The run should fail after the second OOM
820+ expect ( result2 . attemptStatus ) . toBe ( "RUN_FINISHED" ) ;
821+ expect ( result2 . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
822+ expect ( result2 . run . status ) . toBe ( "CRASHED" ) ;
823+
824+ //final state should be crashed
825+ const finalExecutionData = await engine . getRunExecutionData ( { runId : run . id } ) ;
826+ assertNonNullable ( finalExecutionData ) ;
827+ expect ( finalExecutionData . snapshot . executionStatus ) . toBe ( "FINISHED" ) ;
828+ expect ( finalExecutionData . run . attemptNumber ) . toBe ( 2 ) ;
829+ expect ( finalExecutionData . run . status ) . toBe ( "CRASHED" ) ;
830+ } finally {
831+ engine . quit ( ) ;
832+ }
833+ }
834+ ) ;
556835} ) ;
0 commit comments