@@ -251,6 +251,123 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
251251 ) ;
252252 }
253253
254+ const lockedToBackgroundWorker = body . options ?. lockToVersion
255+ ? await this . _prisma . backgroundWorker . findFirst ( {
256+ where : {
257+ projectId : environment . projectId ,
258+ runtimeEnvironmentId : environment . id ,
259+ version : body . options ?. lockToVersion ,
260+ } ,
261+ select : {
262+ id : true ,
263+ version : true ,
264+ sdkVersion : true ,
265+ cliVersion : true ,
266+ } ,
267+ } )
268+ : undefined ;
269+
270+ let queueName : string ;
271+ let lockedQueueId : string | undefined ;
272+
273+ // Determine queue name based on lockToVersion and provided options
274+ if ( lockedToBackgroundWorker ) {
275+ // Task is locked to a specific worker version
276+ if ( body . options ?. queue ?. name ) {
277+ const specifiedQueueName = body . options . queue . name ;
278+ // A specific queue name is provided
279+ const specifiedQueue = await this . _prisma . taskQueue . findFirst ( {
280+ // Validate it exists for the locked worker
281+ where : {
282+ name : specifiedQueueName ,
283+ workers : { some : { id : lockedToBackgroundWorker . id } } , // Ensure the queue is associated with any task of the locked worker
284+ } ,
285+ } ) ;
286+
287+ if ( ! specifiedQueue ) {
288+ throw new ServiceValidationError (
289+ `Specified queue '${ specifiedQueueName } ' not found or not associated with locked worker version '${
290+ body . options ?. lockToVersion ?? "<unknown>"
291+ } '.`
292+ ) ;
293+ }
294+ // Use the validated queue name directly
295+ queueName = specifiedQueue . name ;
296+ lockedQueueId = specifiedQueue . id ;
297+ } else {
298+ // No specific queue name provided, use the default queue for the task on the locked worker
299+ const lockedTask = await this . _prisma . backgroundWorkerTask . findFirst ( {
300+ where : {
301+ workerId : lockedToBackgroundWorker . id ,
302+ slug : taskId ,
303+ } ,
304+ include : {
305+ queue : true ,
306+ } ,
307+ } ) ;
308+
309+ if ( ! lockedTask ) {
310+ throw new ServiceValidationError (
311+ `Task '${ taskId } ' not found on locked worker version '${
312+ body . options ?. lockToVersion ?? "<unknown>"
313+ } '.`
314+ ) ;
315+ }
316+
317+ if ( ! lockedTask . queue ) {
318+ // This case should ideally be prevented by earlier checks or schema constraints,
319+ // but handle it defensively.
320+ logger . error ( "Task found on locked worker, but has no associated queue record" , {
321+ taskId,
322+ workerId : lockedToBackgroundWorker . id ,
323+ version : lockedToBackgroundWorker . version ,
324+ } ) ;
325+ throw new ServiceValidationError (
326+ `Default queue configuration for task '${ taskId } ' missing on locked worker version '${
327+ body . options ?. lockToVersion ?? "<unknown>"
328+ } '.`
329+ ) ;
330+ }
331+ // Use the task's default queue name
332+ queueName = lockedTask . queue . name ;
333+ lockedQueueId = lockedTask . queue . id ;
334+ }
335+ } else {
336+ // Task is not locked to a specific version, use regular logic
337+ if ( body . options ?. lockToVersion ) {
338+ // This should only happen if the findFirst failed, indicating the version doesn't exist
339+ throw new ServiceValidationError (
340+ `Task locked to version '${ body . options . lockToVersion } ', but no worker found with that version.`
341+ ) ;
342+ }
343+
344+ // Get queue name using the helper for non-locked case (handles provided name or finds default)
345+ queueName = await this . #getQueueName( taskId , environment , body . options ?. queue ?. name ) ;
346+ }
347+
348+ // Sanitize the final determined queue name once
349+ const sanitizedQueueName = sanitizeQueueName ( queueName ) ;
350+
351+ // Check that the queuename is not an empty string
352+ if ( ! sanitizedQueueName ) {
353+ queueName = sanitizeQueueName ( `task/${ taskId } ` ) ; // Fallback if sanitization results in empty
354+ } else {
355+ queueName = sanitizedQueueName ;
356+ }
357+
358+ //upsert tags
359+ const tags = await createTags (
360+ {
361+ tags : body . options ?. tags ,
362+ projectId : environment . projectId ,
363+ } ,
364+ this . _prisma
365+ ) ;
366+
367+ const depth = parentRun ? parentRun . depth + 1 : 0 ;
368+
369+ const masterQueue = await this . #getMasterQueueForEnvironment( environment ) ;
370+
254371 try {
255372 return await eventRepository . traceEvent (
256373 taskId ,
@@ -279,50 +396,11 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
279396 const result = await autoIncrementCounter . incrementInTransaction (
280397 `v3-run:${ environment . id } :${ taskId } ` ,
281398 async ( num , tx ) => {
282- const lockedToBackgroundWorker = body . options ?. lockToVersion
283- ? await tx . backgroundWorker . findFirst ( {
284- where : {
285- projectId : environment . projectId ,
286- runtimeEnvironmentId : environment . id ,
287- version : body . options ?. lockToVersion ,
288- } ,
289- select : {
290- id : true ,
291- version : true ,
292- sdkVersion : true ,
293- cliVersion : true ,
294- } ,
295- } )
296- : undefined ;
297-
298- let queueName = sanitizeQueueName (
299- await this . #getQueueName( taskId , environment , body . options ?. queue ?. name )
300- ) ;
301-
302- // Check that the queuename is not an empty string
303- if ( ! queueName ) {
304- queueName = sanitizeQueueName ( `task/${ taskId } ` ) ;
305- }
306-
307399 event . setAttribute ( "queueName" , queueName ) ;
308400 span . setAttribute ( "queueName" , queueName ) ;
309-
310- //upsert tags
311- const tags = await createTags (
312- {
313- tags : body . options ?. tags ,
314- projectId : environment . projectId ,
315- } ,
316- this . _prisma
317- ) ;
318-
319- const depth = parentRun ? parentRun . depth + 1 : 0 ;
320-
321401 event . setAttribute ( "runId" , runFriendlyId ) ;
322402 span . setAttribute ( "runId" , runFriendlyId ) ;
323403
324- const masterQueue = await this . #getMasterQueueForEnvironment( environment ) ;
325-
326404 const taskRun = await this . _engine . trigger (
327405 {
328406 number : num ,
@@ -345,6 +423,7 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
345423 cliVersion : lockedToBackgroundWorker ?. cliVersion ,
346424 concurrencyKey : body . options ?. concurrencyKey ,
347425 queue : queueName ,
426+ lockedQueueId,
348427 masterQueue : masterQueue ,
349428 isTest : body . options ?. test ?? false ,
350429 delayUntil,
@@ -473,13 +552,15 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
473552 return workerGroup . masterQueue ;
474553 }
475554
555+ // Gets the queue name when the task is NOT locked to a specific version
476556 async #getQueueName( taskId : string , environment : AuthenticatedEnvironment , queueName ?: string ) {
477557 if ( queueName ) {
478558 return queueName ;
479559 }
480560
481561 const defaultQueueName = `task/${ taskId } ` ;
482562
563+ // Find the current worker for the environment
483564 const worker = await findCurrentWorkerFromEnvironment ( environment ) ;
484565
485566 if ( ! worker ) {
0 commit comments