77} from "@trigger.dev/core/v3" ;
88import {
99 BatchTaskRun ,
10+ isPrismaRetriableError ,
1011 isUniqueConstraintError ,
1112 Prisma ,
1213 TaskRunAttempt ,
@@ -20,6 +21,7 @@ import { logger } from "~/services/logger.server";
2021import { getEntitlement } from "~/services/platform.v3.server" ;
2122import { workerQueue } from "~/services/worker.server" ;
2223import { generateFriendlyId } from "../friendlyIdentifiers" ;
24+ import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server" ;
2325import { marqs } from "../marqs/index.server" ;
2426import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server" ;
2527import { downloadPacketFromObjectStore , uploadPacketToObjectStore } from "../r2.server" ;
@@ -923,71 +925,122 @@ export async function completeBatchTaskRunItemV3(
923925 batchTaskRunId : string ,
924926 tx : PrismaClientOrTransaction ,
925927 scheduleResumeOnComplete = false ,
926- taskRunAttemptId ?: string
928+ taskRunAttemptId ?: string ,
929+ retryAttempt ?: number
927930) {
928- await $transaction (
929- tx ,
930- "completeBatchTaskRunItemV3" ,
931- async ( tx , span ) => {
932- span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
933-
934- // Update the item to complete
935- const updated = await tx . batchTaskRunItem . updateMany ( {
936- where : {
937- id : itemId ,
938- status : "PENDING" ,
939- } ,
940- data : {
941- status : "COMPLETED" ,
942- taskRunAttemptId,
943- } ,
944- } ) ;
931+ const isRetry = retryAttempt !== undefined ;
932+
933+ if ( isRetry ) {
934+ logger . debug ( "completeBatchTaskRunItemV3 retrying" , {
935+ itemId,
936+ batchTaskRunId,
937+ scheduleResumeOnComplete,
938+ taskRunAttemptId,
939+ retryAttempt,
940+ } ) ;
941+ }
945942
946- if ( updated . count === 0 ) {
947- return ;
948- }
943+ try {
944+ await $transaction (
945+ tx ,
946+ "completeBatchTaskRunItemV3" ,
947+ async ( tx , span ) => {
948+ span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
949949
950- const updatedBatchRun = await tx . batchTaskRun . update ( {
951- where : {
952- id : batchTaskRunId ,
953- } ,
954- data : {
955- completedCount : {
956- increment : 1 ,
950+ // Update the item to complete
951+ const updated = await tx . batchTaskRunItem . updateMany ( {
952+ where : {
953+ id : itemId ,
954+ status : "PENDING" ,
957955 } ,
958- } ,
959- select : {
960- sealed : true ,
961- status : true ,
962- completedCount : true ,
963- expectedCount : true ,
964- dependentTaskAttemptId : true ,
965- } ,
966- } ) ;
956+ data : {
957+ status : "COMPLETED" ,
958+ taskRunAttemptId,
959+ } ,
960+ } ) ;
967961
968- if (
969- updatedBatchRun . status === "PENDING" &&
970- updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
971- updatedBatchRun . sealed
972- ) {
973- await tx . batchTaskRun . update ( {
962+ if ( updated . count === 0 ) {
963+ return ;
964+ }
965+
966+ const updatedBatchRun = await tx . batchTaskRun . update ( {
974967 where : {
975968 id : batchTaskRunId ,
976969 } ,
977970 data : {
978- status : "COMPLETED" ,
979- completedAt : new Date ( ) ,
971+ completedCount : {
972+ increment : 1 ,
973+ } ,
974+ } ,
975+ select : {
976+ sealed : true ,
977+ status : true ,
978+ completedCount : true ,
979+ expectedCount : true ,
980+ dependentTaskAttemptId : true ,
980981 } ,
981982 } ) ;
982983
983- // We only need to resume the batch if it has a dependent task attempt ID
984- if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
985- await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
984+ if (
985+ updatedBatchRun . status === "PENDING" &&
986+ updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
987+ updatedBatchRun . sealed
988+ ) {
989+ await tx . batchTaskRun . update ( {
990+ where : {
991+ id : batchTaskRunId ,
992+ } ,
993+ data : {
994+ status : "COMPLETED" ,
995+ completedAt : new Date ( ) ,
996+ } ,
997+ } ) ;
998+
999+ // We only need to resume the batch if it has a dependent task attempt ID
1000+ if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
1001+ await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
1002+ }
9861003 }
1004+ } ,
1005+ {
1006+ timeout : 10000 ,
9871007 }
988- } ,
989- {
990- timeout : 10000 ,
1008+ ) ;
1009+ } catch ( error ) {
1010+ if ( isPrismaRetriableError ( error ) ) {
1011+ logger . error ( "completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry" , {
1012+ itemId,
1013+ batchTaskRunId,
1014+ error,
1015+ retryAttempt,
1016+ isRetry,
1017+ } ) ;
1018+
1019+ if ( isRetry ) {
1020+ //throwing this error will cause the Redis worker to retry the job
1021+ throw error ;
1022+ } else {
1023+ //schedule a retry
1024+ await legacyRunEngineWorker . enqueue ( {
1025+ id : `completeBatchTaskRunItem:${ itemId } ` ,
1026+ job : "completeBatchTaskRunItem" ,
1027+ payload : {
1028+ itemId,
1029+ batchTaskRunId,
1030+ scheduleResumeOnComplete,
1031+ taskRunAttemptId,
1032+ } ,
1033+ availableAt : new Date ( Date . now ( ) + 2_000 ) ,
1034+ } ) ;
1035+ }
1036+ } else {
1037+ logger . error ( "completeBatchTaskRunItemV3 failed with a non-retriable error" , {
1038+ itemId,
1039+ batchTaskRunId,
1040+ error,
1041+ retryAttempt,
1042+ isRetry,
1043+ } ) ;
9911044 }
992- ) ;
1045+ }
9931046}
0 commit comments