Skip to content

Commit 01f62df

Browse files
committed
Remove message from worker queue in certain circumstances when acking
1 parent 5b78b5d commit 01f62df

File tree

4 files changed

+146
-8
lines changed

4 files changed

+146
-8
lines changed

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,9 @@ export class RunAttemptSystem {
10501050
}
10511051

10521052
//remove it from the queue (and from the worker queue, if it was already moved there) and release concurrency
1053-
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
1053+
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
1054+
removeFromWorkerQueue: true,
1055+
});
10541056

10551057
await this.releaseConcurrencySystem.refillTokensForSnapshot(latestSnapshot);
10561058

@@ -1233,7 +1235,9 @@ export class RunAttemptSystem {
12331235
throw new ServiceValidationError("No associated waitpoint found", 400);
12341236
}
12351237

1236-
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
1238+
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId, {
1239+
removeFromWorkerQueue: true,
1240+
});
12371241

12381242
await this.waitpointSystem.completeWaitpoint({
12391243
id: run.associatedWaitpoint.id,

internal-packages/run-engine/src/engine/systems/ttlSystem.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,13 @@ export class TtlSystem {
106106
},
107107
});
108108

109-
await this.$.runQueue.acknowledgeMessage(updatedRun.runtimeEnvironment.organizationId, runId);
109+
await this.$.runQueue.acknowledgeMessage(
110+
updatedRun.runtimeEnvironment.organizationId,
111+
runId,
112+
{
113+
removeFromWorkerQueue: true,
114+
}
115+
);
110116

111117
if (!updatedRun.associatedWaitpoint) {
112118
throw new ServiceValidationError("No associated waitpoint found", 400);

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ describe("RunQueue", () => {
482482
}
483483
);
484484

485-
redisTest("Enqueue/Dequeue a 8 shards", async ({ redisContainer }) => {
485+
redisTest("Enqueue/Dequeue with 8 shards", async ({ redisContainer }) => {
486486
const queue = new RunQueue({
487487
...testOptions,
488488
shardCount: 8,
@@ -716,6 +716,98 @@ describe("RunQueue", () => {
716716
}
717717
});
718718

719+
redisTest(
720+
"Ack after moving to workerQueue with removeFromWorkerQueue = undefined",
721+
{ timeout: 5_000 },
722+
async ({ redisContainer }) => {
723+
const queue = new RunQueue({
724+
...testOptions,
725+
queueSelectionStrategy: new FairQueueSelectionStrategy({
726+
redis: {
727+
keyPrefix: "runqueue:test:",
728+
host: redisContainer.getHost(),
729+
port: redisContainer.getPort(),
730+
},
731+
keys: testOptions.keys,
732+
}),
733+
redis: {
734+
keyPrefix: "runqueue:test:",
735+
host: redisContainer.getHost(),
736+
port: redisContainer.getPort(),
737+
},
738+
});
739+
740+
try {
741+
await queue.enqueueMessage({
742+
env: authenticatedEnvProd,
743+
message: messageProd,
744+
workerQueue: "main",
745+
});
746+
747+
const queueLength = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
748+
expect(queueLength).toBe(1);
749+
const envQueueLength = await queue.lengthOfEnvQueue(authenticatedEnvProd);
750+
expect(envQueueLength).toBe(1);
751+
752+
await setTimeout(1000);
753+
754+
await queue.acknowledgeMessage(messageProd.orgId, messageProd.runId);
755+
756+
const messages = await queue.peekAllOnWorkerQueue("main");
757+
expect(messages.length).toEqual(1);
758+
} finally {
759+
await queue.quit();
760+
}
761+
}
762+
);
763+
764+
redisTest(
765+
"Ack after moving to workerQueue with removeFromWorkerQueue = true",
766+
{ timeout: 5_000 },
767+
async ({ redisContainer }) => {
768+
const queue = new RunQueue({
769+
...testOptions,
770+
queueSelectionStrategy: new FairQueueSelectionStrategy({
771+
redis: {
772+
keyPrefix: "runqueue:test:",
773+
host: redisContainer.getHost(),
774+
port: redisContainer.getPort(),
775+
},
776+
keys: testOptions.keys,
777+
}),
778+
redis: {
779+
keyPrefix: "runqueue:test:",
780+
host: redisContainer.getHost(),
781+
port: redisContainer.getPort(),
782+
},
783+
});
784+
785+
try {
786+
await queue.enqueueMessage({
787+
env: authenticatedEnvProd,
788+
message: messageProd,
789+
workerQueue: "main",
790+
});
791+
792+
const queueLength = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
793+
expect(queueLength).toBe(1);
794+
const envQueueLength = await queue.lengthOfEnvQueue(authenticatedEnvProd);
795+
expect(envQueueLength).toBe(1);
796+
797+
await setTimeout(1000);
798+
799+
await queue.acknowledgeMessage(messageProd.orgId, messageProd.runId, {
800+
removeFromWorkerQueue: true,
801+
});
802+
803+
const messages = await queue.peekAllOnWorkerQueue("main");
804+
expect(messages.length).toEqual(0);
805+
} finally {
806+
await queue.quit();
807+
}
808+
}
809+
);
810+
719811
redisTest("Nacking", { timeout: 15_000 }, async ({ redisContainer, redisOptions }) => {
720812
const queue = new RunQueue({
721813
...testOptions,

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ export class RunQueue {
550550
public async acknowledgeMessage(
551551
orgId: string,
552552
messageId: string,
553-
options?: { skipDequeueProcessing?: boolean }
553+
options?: { skipDequeueProcessing?: boolean; removeFromWorkerQueue?: boolean }
554554
) {
555555
return this.#trace(
556556
"acknowledgeMessage",
@@ -586,6 +586,7 @@ export class RunQueue {
586586

587587
await this.#callAcknowledgeMessage({
588588
message,
589+
removeFromWorkerQueue: options?.removeFromWorkerQueue,
589590
});
590591
},
591592
{
@@ -836,6 +837,14 @@ export class RunQueue {
836837
await this.redis.quit();
837838
}
838839

840+
/**
841+
* Returns every entry currently on the given worker queue without removing any of them (LRANGE 0 -1); intended for tests and debugging
842+
*/
843+
async peekAllOnWorkerQueue(workerQueue: string) {
844+
const workerQueueKey = this.keys.workerQueueKey(workerQueue);
845+
return await this.redis.lrange(workerQueueKey, 0, -1);
846+
}
847+
839848
private async handleRedriveMessage(channel: string, message: string) {
840849
try {
841850
const { runId, envId, projectId, orgId } = JSON.parse(message) as any;
@@ -1430,7 +1439,13 @@ export class RunQueue {
14301439
};
14311440
}
14321441

1433-
async #callAcknowledgeMessage({ message }: { message: OutputPayload }) {
1442+
async #callAcknowledgeMessage({
1443+
message,
1444+
removeFromWorkerQueue,
1445+
}: {
1446+
message: OutputPayload;
1447+
removeFromWorkerQueue?: boolean;
1448+
}) {
14341449
const messageId = message.runId;
14351450
const messageKey = this.keys.messageKey(message.orgId, messageId);
14361451
const messageQueue = message.queue;
@@ -1441,6 +1456,9 @@ export class RunQueue {
14411456
message.environmentId,
14421457
this.shardCount
14431458
);
1459+
const workerQueue = this.#getWorkerQueueFromMessage(message);
1460+
const workerQueueKey = this.keys.workerQueueKey(workerQueue);
1461+
const messageKeyValue = this.keys.messageKey(message.orgId, messageId);
14441462

14451463
this.logger.debug("Calling acknowledgeMessage", {
14461464
messageKey,
@@ -1450,6 +1468,10 @@ export class RunQueue {
14501468
envQueueKey,
14511469
messageId,
14521470
masterQueueKey,
1471+
workerQueue,
1472+
workerQueueKey,
1473+
removeFromWorkerQueue,
1474+
messageKeyValue,
14531475
service: this.name,
14541476
});
14551477

@@ -1460,8 +1482,11 @@ export class RunQueue {
14601482
queueCurrentConcurrencyKey,
14611483
envCurrentConcurrencyKey,
14621484
envQueueKey,
1485+
workerQueueKey,
14631486
messageId,
1464-
messageQueue
1487+
messageQueue,
1488+
messageKeyValue,
1489+
removeFromWorkerQueue ? "1" : "0"
14651490
);
14661491
}
14671492

@@ -1767,7 +1792,7 @@ return results
17671792
});
17681793

17691794
this.redis.defineCommand("acknowledgeMessage", {
1770-
numberOfKeys: 6,
1795+
numberOfKeys: 7,
17711796
lua: `
17721797
-- Keys:
17731798
local masterQueueKey = KEYS[1]
@@ -1776,10 +1801,13 @@ local messageQueueKey = KEYS[3]
17761801
local queueCurrentConcurrencyKey = KEYS[4]
17771802
local envCurrentConcurrencyKey = KEYS[5]
17781803
local envQueueKey = KEYS[6]
1804+
local workerQueueKey = KEYS[7]
17791805
17801806
-- Args:
17811807
local messageId = ARGV[1]
17821808
local messageQueueName = ARGV[2]
1809+
local messageKeyValue = ARGV[3]
1810+
local removeFromWorkerQueue = ARGV[4]
17831811
17841812
-- Remove the message from the message key
17851813
redis.call('DEL', messageKey)
@@ -1799,6 +1827,11 @@ end
17991827
-- Update the concurrency keys
18001828
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
18011829
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1830+
1831+
-- Remove the message from the worker queue
1832+
if removeFromWorkerQueue == '1' then
1833+
redis.call('LREM', workerQueueKey, 0, messageKeyValue)
1834+
end
18021835
`,
18031836
});
18041837

@@ -2025,8 +2058,11 @@ declare module "@internal/redis" {
20252058
concurrencyKey: string,
20262059
envConcurrencyKey: string,
20272060
envQueueKey: string,
2061+
workerQueueKey: string,
20282062
messageId: string,
20292063
messageQueueName: string,
2064+
messageKeyValue: string,
2065+
removeFromWorkerQueue: string,
20302066
callback?: Callback<void>
20312067
): Result<void, Context>;
20322068

0 commit comments

Comments
 (0)