Skip to content

Commit 91e7657

Browse files
committed
Fix the triggerTask tests in the webapp
1 parent 59324b4 commit 91e7657

File tree

2 files changed

+39
-23
lines changed

2 files changed

+39
-23
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {
234234
lockedQueueId,
235235
});
236236

237-
const masterQueue = await this.queueConcern.getWorkerQueue(environment);
237+
const workerQueue = await this.queueConcern.getWorkerQueue(environment);
238238

239239
try {
240240
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
@@ -271,7 +271,7 @@ export class RunEngineTriggerTaskService {
271271
concurrencyKey: body.options?.concurrencyKey,
272272
queue: queueName,
273273
lockedQueueId,
274-
workerQueue: masterQueue,
274+
workerQueue,
275275
isTest: body.options?.test ?? false,
276276
delayUntil,
277277
queuedAt: delayUntil ? undefined : new Date(),

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
ValidationResult,
3333
} from "~/runEngine/types";
3434
import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server";
35+
import { setTimeout } from "node:timers/promises";
3536

3637
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
3738

@@ -490,6 +491,8 @@ describe("RunEngineTriggerTaskService", () => {
490491
},
491492
queue: {
492493
redis: redisOptions,
494+
masterQueueConsumersDisabled: true,
495+
processWorkerQueueDebounceMs: 100,
493496
},
494497
runLock: {
495498
redis: redisOptions,
@@ -569,11 +572,12 @@ describe("RunEngineTriggerTaskService", () => {
569572
);
570573
expect(queueLength).toBe(1);
571574

575+
await setTimeout(500);
576+
572577
// Now we need to dequeue the run so so we can trigger a subtask
573-
const dequeued = await engine.dequeueFromMasterQueue({
578+
const dequeued = await engine.dequeueFromWorkerQueue({
574579
consumerId: "test_12345",
575-
masterQueue: result?.run.masterQueue!,
576-
maxRunCount: 1,
580+
workerQueue: result?.run.workerQueue!,
577581
});
578582

579583
expect(dequeued.length).toBe(1);
@@ -606,11 +610,12 @@ describe("RunEngineTriggerTaskService", () => {
606610
},
607611
});
608612

613+
await setTimeout(500);
614+
609615
// Okay, now lets dequeue the subtask
610-
const dequeuedSubtask = await engine.dequeueFromMasterQueue({
616+
const dequeuedSubtask = await engine.dequeueFromWorkerQueue({
611617
consumerId: "test_12345",
612-
masterQueue: subtaskResult?.run.masterQueue!,
613-
maxRunCount: 1,
618+
workerQueue: subtaskResult?.run.workerQueue!,
614619
});
615620

616621
expect(dequeuedSubtask.length).toBe(1);
@@ -649,6 +654,8 @@ describe("RunEngineTriggerTaskService", () => {
649654
},
650655
queue: {
651656
redis: redisOptions,
657+
masterQueueConsumersDisabled: true,
658+
processWorkerQueueDebounceMs: 100,
652659
},
653660
runLock: {
654661
redis: redisOptions,
@@ -722,11 +729,12 @@ describe("RunEngineTriggerTaskService", () => {
722729
expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`);
723730
expect(parentResult?.run.lockedQueueId).toBeDefined();
724731

732+
await setTimeout(500);
733+
725734
// Dequeue the parent run to simulate it running
726-
const dequeued = await engine.dequeueFromMasterQueue({
735+
const dequeued = await engine.dequeueFromWorkerQueue({
727736
consumerId: "test_12345",
728-
masterQueue: parentResult?.run.masterQueue!,
729-
maxRunCount: 1,
737+
workerQueue: parentResult?.run.workerQueue!,
730738
});
731739

732740
expect(dequeued.length).toBe(1);
@@ -923,11 +931,12 @@ describe("RunEngineTriggerTaskService", () => {
923931
expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`);
924932
expect(parentResult?.run.lockedQueueId).toBeDefined();
925933

934+
await setTimeout(500);
935+
926936
// Dequeue the parent run to simulate it running
927-
const dequeued = await engine.dequeueFromMasterQueue({
937+
const dequeued = await engine.dequeueFromWorkerQueue({
928938
consumerId: "test_12345",
929-
masterQueue: parentResult?.run.masterQueue!,
930-
maxRunCount: 1,
939+
workerQueue: parentResult?.run.workerQueue!,
931940
});
932941

933942
expect(dequeued.length).toBe(1);
@@ -980,6 +989,8 @@ describe("RunEngineTriggerTaskService", () => {
980989
},
981990
queue: {
982991
redis: redisOptions,
992+
masterQueueConsumersDisabled: true,
993+
processWorkerQueueDebounceMs: 100,
983994
},
984995
runLock: {
985996
redis: redisOptions,
@@ -1053,11 +1064,12 @@ describe("RunEngineTriggerTaskService", () => {
10531064
expect(parentResult?.run.queue).toBe(`task/${taskIdentifier1}`);
10541065
expect(parentResult?.run.lockedQueueId).toBeDefined();
10551066

1067+
await setTimeout(500);
1068+
10561069
// Dequeue the parent run to simulate it running
1057-
const dequeued = await engine.dequeueFromMasterQueue({
1070+
const dequeued = await engine.dequeueFromWorkerQueue({
10581071
consumerId: "test_12345",
1059-
masterQueue: parentResult?.run.masterQueue!,
1060-
maxRunCount: 1,
1072+
workerQueue: parentResult?.run.workerQueue!,
10611073
});
10621074

10631075
expect(dequeued.length).toBe(1);
@@ -1109,6 +1121,8 @@ describe("RunEngineTriggerTaskService", () => {
11091121
},
11101122
queue: {
11111123
redis: redisOptions,
1124+
masterQueueConsumersDisabled: true,
1125+
processWorkerQueueDebounceMs: 100,
11121126
},
11131127
runLock: {
11141128
redis: redisOptions,
@@ -1186,11 +1200,12 @@ describe("RunEngineTriggerTaskService", () => {
11861200
);
11871201
expect(queueLength).toBe(1);
11881202

1203+
await setTimeout(500);
1204+
11891205
// Now we need to dequeue the run so so we can trigger a subtask
1190-
const dequeued = await engine.dequeueFromMasterQueue({
1206+
const dequeued = await engine.dequeueFromWorkerQueue({
11911207
consumerId: "test_12345",
1192-
masterQueue: result?.run.masterQueue!,
1193-
maxRunCount: 1,
1208+
workerQueue: result?.run.workerQueue!,
11941209
});
11951210

11961211
expect(dequeued.length).toBe(1);
@@ -1223,11 +1238,12 @@ describe("RunEngineTriggerTaskService", () => {
12231238
},
12241239
});
12251240

1241+
await setTimeout(500);
1242+
12261243
// Okay, now lets dequeue the subtask
1227-
const dequeuedSubtask = await engine.dequeueFromMasterQueue({
1244+
const dequeuedSubtask = await engine.dequeueFromWorkerQueue({
12281245
consumerId: "test_12345",
1229-
masterQueue: subtaskResult?.run.masterQueue!,
1230-
maxRunCount: 1,
1246+
workerQueue: subtaskResult?.run.workerQueue!,
12311247
});
12321248

12331249
expect(dequeuedSubtask.length).toBe(1);

0 commit comments

Comments
 (0)