@@ -18,7 +18,6 @@ import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.
1818import { startActiveSpan } from "../tracer.server" ;
1919import { ServiceValidationError , WithRunEngine } from "./baseService.server" ;
2020import { OutOfEntitlementError , TriggerTaskService } from "./triggerTask.server" ;
21- import { guardQueueSizeLimitsForEnv } from "./triggerTaskV2.server" ;
2221
2322const PROCESSING_BATCH_SIZE = 50 ;
2423const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20 ;
@@ -40,8 +39,6 @@ export const BatchProcessingOptions = z.object({
4039export type BatchProcessingOptions = z . infer < typeof BatchProcessingOptions > ;
4140
4241export type BatchTriggerTaskServiceOptions = {
43- idempotencyKey ?: string ;
44- idempotencyKeyExpiresAt ?: Date ;
4542 triggerVersion ?: string ;
4643 traceContext ?: Record < string , string | undefined > ;
4744 spanParentAsLink ?: boolean ;
@@ -73,61 +70,6 @@ export class BatchTriggerV3Service extends WithRunEngine {
7370 "call()" ,
7471 environment ,
7572 async ( span ) => {
76- const existingBatch = options . idempotencyKey
77- ? await this . _prisma . batchTaskRun . findUnique ( {
78- where : {
79- runtimeEnvironmentId_idempotencyKey : {
80- runtimeEnvironmentId : environment . id ,
81- idempotencyKey : options . idempotencyKey ,
82- } ,
83- } ,
84- } )
85- : undefined ;
86-
87- if ( existingBatch ) {
88- if (
89- existingBatch . idempotencyKeyExpiresAt &&
90- existingBatch . idempotencyKeyExpiresAt < new Date ( )
91- ) {
92- logger . debug ( "[BatchTriggerV3][call] Idempotency key has expired" , {
93- idempotencyKey : options . idempotencyKey ,
94- batch : {
95- id : existingBatch . id ,
96- friendlyId : existingBatch . friendlyId ,
97- runCount : existingBatch . runCount ,
98- idempotencyKeyExpiresAt : existingBatch . idempotencyKeyExpiresAt ,
99- idempotencyKey : existingBatch . idempotencyKey ,
100- } ,
101- } ) ;
102-
103- // Update the existing batch to remove the idempotency key
104- await this . _prisma . batchTaskRun . update ( {
105- where : { id : existingBatch . id } ,
106- data : { idempotencyKey : null } ,
107- } ) ;
108-
109- // Don't return, just continue with the batch trigger
110- } else {
111- span . setAttribute ( "batchId" , existingBatch . friendlyId ) ;
112-
113- //block the parent with all of the children
114- if ( body . resumeParentOnCompletion && body . parentRunId ) {
115- await this . #blockParentRun( {
116- parentRunId : body . parentRunId ,
117- childFriendlyIds : existingBatch . runIds ,
118- environment,
119- } ) ;
120- }
121-
122- return {
123- id : existingBatch . friendlyId ,
124- idempotencyKey : existingBatch . idempotencyKey ?? undefined ,
125- isCached : true ,
126- runCount : existingBatch . runCount ,
127- } ;
128- }
129- }
130-
13173 const { id, friendlyId } = BatchId . generate ( ) ;
13274
13375 span . setAttribute ( "batchId" , friendlyId ) ;
@@ -212,8 +154,6 @@ export class BatchTriggerV3Service extends WithRunEngine {
212154 id : BatchId . fromFriendlyId ( batchId ) ,
213155 friendlyId : batchId ,
214156 runtimeEnvironmentId : environment . id ,
215- idempotencyKey : options . idempotencyKey ,
216- idempotencyKeyExpiresAt : options . idempotencyKeyExpiresAt ,
217157 runCount : body . items . length ,
218158 runIds : [ ] ,
219159 payload : payloadPacket . data ,
@@ -305,8 +245,6 @@ export class BatchTriggerV3Service extends WithRunEngine {
305245 id : BatchId . fromFriendlyId ( batchId ) ,
306246 friendlyId : batchId ,
307247 runtimeEnvironmentId : environment . id ,
308- idempotencyKey : options . idempotencyKey ,
309- idempotencyKeyExpiresAt : options . idempotencyKeyExpiresAt ,
310248 runCount : body . items . length ,
311249 runIds : [ ] ,
312250 payload : payloadPacket . data ,
@@ -728,40 +666,4 @@ export class BatchTriggerV3Service extends WithRunEngine {
728666 } ;
729667 } ) ;
730668 }
731-
732- //todo what if the idempotent batch hasn't finished creating all the runs yet?!
733- async #blockParentRun( {
734- parentRunId,
735- childFriendlyIds,
736- environment,
737- } : {
738- parentRunId : string ;
739- childFriendlyIds : string [ ] ;
740- environment : AuthenticatedEnvironment ;
741- } ) {
742- const runsWithAssociatedWaitpoints = await this . _prisma . taskRun . findMany ( {
743- where : {
744- id : {
745- in : childFriendlyIds . map ( ( r ) => RunId . fromFriendlyId ( r ) ) ,
746- } ,
747- } ,
748- select : {
749- associatedWaitpoint : {
750- select : {
751- id : true ,
752- } ,
753- } ,
754- } ,
755- } ) ;
756-
757- await this . _engine . blockRunWithWaitpoint ( {
758- runId : RunId . fromFriendlyId ( parentRunId ) ,
759- waitpoints : runsWithAssociatedWaitpoints . flatMap ( ( r ) => {
760- if ( ! r . associatedWaitpoint ) return [ ] ;
761- return [ r . associatedWaitpoint . id ] ;
762- } ) ,
763- environmentId : environment . id ,
764- projectId : environment . projectId ,
765- } ) ;
766- }
767669}
0 commit comments