@@ -29,6 +29,7 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
2929import { Prisma } from "@trigger.dev/database" ;
3030import { parseDelay } from "~/utils/delays" ;
3131import { OutOfEntitlementError , TriggerTaskServiceOptions } from "./triggerTask.server" ;
32+ import { removeQueueConcurrencyLimits , updateQueueConcurrencyLimits } from "../runQueue.server" ;
3233
3334export class TriggerTaskServiceV1 extends BaseService {
3435 public async call (
@@ -421,9 +422,16 @@ export class TriggerTaskServiceV1 extends BaseService {
421422
422423 if ( body . options ?. queue ) {
423424 const concurrencyLimit =
424- typeof body . options . queue . concurrencyLimit === "number"
425- ? Math . max ( 0 , body . options . queue . concurrencyLimit )
426- : undefined ;
425+ typeof body . options . queue ?. concurrencyLimit === "number"
426+ ? Math . max (
427+ Math . min (
428+ body . options . queue . concurrencyLimit ,
429+ environment . maximumConcurrencyLimit ,
430+ environment . organization . maximumConcurrencyLimit
431+ ) ,
432+ 0
433+ )
434+ : null ;
427435
428436 let taskQueue = await tx . taskQueue . findFirst ( {
429437 where : {
@@ -450,13 +458,33 @@ export class TriggerTaskServiceV1 extends BaseService {
450458 } ) ;
451459
452460 if ( typeof taskQueue . concurrencyLimit === "number" ) {
453- await marqs ?. updateQueueConcurrencyLimits (
461+ logger . debug ( "TriggerTaskService: updating concurrency limit" , {
462+ runId : taskRun . id ,
463+ friendlyId : taskRun . friendlyId ,
464+ taskQueue,
465+ orgId : environment . organizationId ,
466+ projectId : environment . projectId ,
467+ existingConcurrencyLimit,
468+ concurrencyLimit,
469+ queueOptions : body . options ?. queue ,
470+ } ) ;
471+ await updateQueueConcurrencyLimits (
454472 environment ,
455473 taskQueue . name ,
456474 taskQueue . concurrencyLimit
457475 ) ;
458476 } else {
459- await marqs ?. removeQueueConcurrencyLimits ( environment , taskQueue . name ) ;
477+ logger . debug ( "TriggerTaskService: removing concurrency limit" , {
478+ runId : taskRun . id ,
479+ friendlyId : taskRun . friendlyId ,
480+ taskQueue,
481+ orgId : environment . organizationId ,
482+ projectId : environment . projectId ,
483+ existingConcurrencyLimit,
484+ concurrencyLimit,
485+ queueOptions : body . options ?. queue ,
486+ } ) ;
487+ await removeQueueConcurrencyLimits ( environment , taskQueue . name ) ;
460488 }
461489 }
462490 } else {
0 commit comments