Skip to content

Commit 903dd43

Browse files
committed
Remove the runlock from the trigger path
1 parent 20a8e7c commit 903dd43

File tree

3 files changed

+59
-47
lines changed

3 files changed

+59
-47
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -486,56 +486,52 @@ export class RunEngine {
486486

487487
span.setAttribute("runId", taskRun.id);
488488

489-
await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => {
490-
//create associated waitpoint (this completes when the run completes)
491-
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
492-
prisma,
493-
{
494-
projectId: environment.project.id,
495-
environmentId: environment.id,
496-
completedByTaskRunId: taskRun.id,
497-
}
498-
);
499-
500-
//triggerAndWait or batchTriggerAndWait
501-
if (resumeParentOnCompletion && parentTaskRunId) {
502-
//this will block the parent run from continuing until this waitpoint is completed (and removed)
503-
await this.waitpointSystem.blockRunWithWaitpoint({
504-
runId: parentTaskRunId,
505-
waitpoints: associatedWaitpoint.id,
506-
projectId: associatedWaitpoint.projectId,
507-
organizationId: environment.organization.id,
508-
batch,
509-
workerId,
510-
runnerId,
511-
tx: prisma,
512-
releaseConcurrency,
513-
});
489+
//create associated waitpoint (this completes when the run completes)
490+
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
491+
prisma,
492+
{
493+
projectId: environment.project.id,
494+
environmentId: environment.id,
495+
completedByTaskRunId: taskRun.id,
514496
}
497+
);
515498

516-
//Make sure lock extension succeeded
517-
signal.throwIfAborted();
518-
519-
if (taskRun.delayUntil) {
520-
// Schedule the run to be enqueued at the delayUntil time
521-
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
522-
runId: taskRun.id,
523-
delayUntil: taskRun.delayUntil,
524-
});
525-
} else {
526-
await this.enqueueSystem.enqueueRun({
527-
run: taskRun,
528-
env: environment,
529-
workerId,
530-
runnerId,
531-
tx: prisma,
532-
});
499+
//triggerAndWait or batchTriggerAndWait
500+
if (resumeParentOnCompletion && parentTaskRunId) {
501+
//this will block the parent run from continuing until this waitpoint is completed (and removed)
502+
await this.waitpointSystem.blockRunWithWaitpoint({
503+
runId: parentTaskRunId,
504+
waitpoints: associatedWaitpoint.id,
505+
projectId: associatedWaitpoint.projectId,
506+
organizationId: environment.organization.id,
507+
batch,
508+
workerId,
509+
runnerId,
510+
tx: prisma,
511+
releaseConcurrency,
512+
});
513+
}
533514

534-
if (taskRun.ttl) {
535-
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
536-
}
515+
if (taskRun.delayUntil) {
516+
// Schedule the run to be enqueued at the delayUntil time
517+
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
518+
runId: taskRun.id,
519+
delayUntil: taskRun.delayUntil,
520+
});
521+
} else {
522+
if (taskRun.ttl) {
523+
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
537524
}
538-
});
525+
526+
await this.enqueueSystem.enqueueRun({
527+
run: taskRun,
528+
env: environment,
529+
workerId,
530+
runnerId,
531+
tx: prisma,
532+
skipRunLock: true,
533+
});
534+
}
539535

540536
this.eventBus.emit("runCreated", {
541537
time: new Date(),

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,20 @@ export class RunLocker {
165165
);
166166
}
167167

168+
async lockIf<T>(
169+
condition: boolean,
170+
name: string,
171+
resources: string[],
172+
duration: number,
173+
routine: () => Promise<T>
174+
): Promise<T> {
175+
if (condition) {
176+
return this.lock(name, resources, duration, routine);
177+
} else {
178+
return routine();
179+
}
180+
}
181+
168182
isInsideLock(): boolean {
169183
return !!this.asyncLocalStorage.getStore();
170184
}

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export class EnqueueSystem {
3333
completedWaitpoints,
3434
workerId,
3535
runnerId,
36+
skipRunLock,
3637
}: {
3738
run: TaskRun;
3839
env: MinimalAuthenticatedEnvironment;
@@ -51,10 +52,11 @@ export class EnqueueSystem {
5152
}[];
5253
workerId?: string;
5354
runnerId?: string;
55+
skipRunLock?: boolean;
5456
}) {
5557
const prisma = tx ?? this.$.prisma;
5658

57-
return await this.$.runLock.lock("enqueueRun", [run.id], 5000, async () => {
59+
return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], 5000, async () => {
5860
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
5961
run: run,
6062
snapshot: {

0 commit comments

Comments
 (0)