Skip to content

Commit f62cd8f

Browse files
committed
requeuing should clear concurrency sets
1 parent 30cb631 commit f62cd8f

File tree

5 files changed

+183
-38
lines changed

5 files changed

+183
-38
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
export const MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET = 31_556_952 * 1000; // 1 year
22
export const MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET = 15_778_476 * 1000; // 6 months
3+
export const MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS = 500;
4+
export const MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS = 500;

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 133 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ import {
3737
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";
3838
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
3939
import {
40+
MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
4041
MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
4142
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
43+
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
4244
} from "./constants.server";
4345

4446
const KEY_PREFIX = "marqs:";
@@ -210,20 +212,24 @@ export class MarQS {
210212

211213
propagation.inject(context.active(), messageData);
212214

215+
const $timestamp =
216+
typeof timestamp === "undefined"
217+
? Date.now()
218+
: typeof timestamp === "number"
219+
? timestamp
220+
: timestamp.getTime();
221+
213222
const messagePayload: MessagePayload = {
214223
version: "1",
215224
data: messageData,
216225
queue: messageQueue,
217226
concurrencyKey,
218-
timestamp:
219-
typeof timestamp === "undefined"
220-
? Date.now()
221-
: typeof timestamp === "number"
222-
? timestamp
223-
: timestamp.getTime(),
227+
timestamp: $timestamp,
224228
messageId,
225229
parentQueue,
226230
priority,
231+
availableAt: Date.now(),
232+
enqueueMethod: "enqueue",
227233
};
228234

229235
span.setAttributes({
@@ -294,6 +300,7 @@ export class MarQS {
294300
messageId,
295301
parentQueue: oldMessage.parentQueue,
296302
priority: oldMessage.priority,
303+
enqueueMethod: "replace",
297304
};
298305

299306
await this.#saveMessageIfExists(newMessage);
@@ -336,6 +343,8 @@ export class MarQS {
336343
tracestate: oldMessage.data.tracestate,
337344
};
338345

346+
const $timestamp = timestamp ?? Date.now();
347+
339348
const newMessage: MessagePayload = {
340349
version: "1",
341350
// preserve original trace context
@@ -347,32 +356,21 @@ export class MarQS {
347356
},
348357
queue: oldMessage.queue,
349358
concurrencyKey: oldMessage.concurrencyKey,
350-
timestamp: timestamp ?? Date.now(),
359+
timestamp: $timestamp,
351360
messageId,
352361
parentQueue: oldMessage.parentQueue,
353362
priority: priority ?? oldMessage.priority,
363+
availableAt: $timestamp,
364+
enqueueMethod: "requeue",
354365
};
355366

356367
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
357-
await this.#saveMessage(newMessage);
358368

359369
// If the message timestamp is enough in the future (e.g. more than 500ms from now),
360370
// we will schedule it to be requeued in the future using the legacy run engine redis worker
361371
// If not, we just requeue it immediately
362-
if (timestamp && timestamp > Date.now() + 500) {
363-
logger.debug(`Scheduling requeue for message`, {
364-
timestamp,
365-
service: this.name,
366-
newMessage,
367-
});
368-
369-
// Schedule the requeue in the future
370-
await legacyRunEngineWorker.enqueue({
371-
id: `marqs-requeue-${messageId}`,
372-
job: "scheduleRequeueMessage",
373-
payload: { messageId },
374-
availableAt: new Date(timestamp - 150), // 150ms before the timestamp
375-
});
372+
if ($timestamp > Date.now() + MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS) {
373+
await this.#callDelayedRequeueMessage(newMessage);
376374
} else {
377375
await this.#callRequeueMessage(newMessage);
378376
}
@@ -478,7 +476,11 @@ export class MarQS {
478476
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
479477
attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success
480478
message_timestamp: message.timestamp,
481-
message_age: Date.now() - message.timestamp,
479+
message_age: this.#calculateMessageAge(message),
480+
message_priority: message.priority,
481+
message_enqueue_method: message.enqueueMethod,
482+
message_available_at: message.availableAt,
483+
...flattenAttributes(message.data, "message.data"),
482484
});
483485

484486
await this.options.subscriber?.messageDequeued(message);
@@ -568,20 +570,20 @@ export class MarQS {
568570
const message = await this.readMessage(messageData.messageId);
569571

570572
if (message) {
571-
const ageOfMessageInMs = Date.now() - message.timestamp;
572-
573573
span.setAttributes({
574574
[SEMATTRS_MESSAGE_ID]: message.messageId,
575575
[SemanticAttributes.QUEUE]: message.queue,
576576
[SemanticAttributes.MESSAGE_ID]: message.messageId,
577577
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
578578
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
579-
age_in_seconds: ageOfMessageInMs / 1000,
580579
attempted_queues: attemptedQueues, // How many queues we tried before success
581580
attempted_envs: attemptedEnvs, // How many environments we tried before success
582581
message_timestamp: message.timestamp,
583-
message_age: Date.now() - message.timestamp,
584-
...flattenAttributes(message.data, "message"),
582+
message_age: this.#calculateMessageAge(message),
583+
message_priority: message.priority,
584+
message_enqueue_method: message.enqueueMethod,
585+
message_available_at: message.availableAt,
586+
...flattenAttributes(message.data, "message.data"),
585587
});
586588

587589
await this.options.subscriber?.messageDequeued(message);
@@ -806,6 +808,12 @@ export class MarQS {
806808
}
807809
}
808810

811+
#calculateMessageAge(message: MessagePayload) {
812+
const $timestamp = message.availableAt ?? message.timestamp;
813+
814+
return Date.now() - $timestamp;
815+
}
816+
809817
async #getNackCount(messageId: string): Promise<number> {
810818
const result = await this.redis.get(this.keys.nackCounterKey(messageId));
811819

@@ -1348,6 +1356,67 @@ export class MarQS {
13481356
return true;
13491357
}
13501358

1359+
async #callDelayedRequeueMessage(message: MessagePayload) {
1360+
const messageKey = this.keys.messageKey(message.messageId);
1361+
const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue);
1362+
const queueReserveConcurrencyKey = this.keys.queueReserveConcurrencyKeyFromQueue(message.queue);
1363+
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
1364+
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue);
1365+
1366+
const messageId = message.messageId;
1367+
const messageData = JSON.stringify(message);
1368+
1369+
logger.debug("Calling delayedRequeueMessage", {
1370+
service: this.name,
1371+
messageKey,
1372+
queueCurrentConcurrencyKey,
1373+
queueReserveConcurrencyKey,
1374+
envCurrentConcurrencyKey,
1375+
envReserveConcurrencyKey,
1376+
messageId,
1377+
messageData,
1378+
});
1379+
1380+
const result = await this.redis.delayedRequeueMessage(
1381+
messageKey,
1382+
queueCurrentConcurrencyKey,
1383+
queueReserveConcurrencyKey,
1384+
envCurrentConcurrencyKey,
1385+
envReserveConcurrencyKey,
1386+
messageId,
1387+
messageData
1388+
);
1389+
1390+
logger.debug("delayedRequeueMessage result", {
1391+
service: this.name,
1392+
messageKey,
1393+
queueCurrentConcurrencyKey,
1394+
queueReserveConcurrencyKey,
1395+
envCurrentConcurrencyKey,
1396+
envReserveConcurrencyKey,
1397+
messageId,
1398+
messageData,
1399+
result,
1400+
});
1401+
1402+
logger.debug("Enqueuing scheduleRequeueMessage in LRE worker", {
1403+
service: this.name,
1404+
message,
1405+
});
1406+
1407+
// Schedule the requeue in the future
1408+
await legacyRunEngineWorker.enqueue({
1409+
id: `marqs-requeue-${messageId}`,
1410+
job: "scheduleRequeueMessage",
1411+
payload: { messageId },
1412+
availableAt: new Date(
1413+
message.timestamp - MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS
1414+
),
1415+
});
1416+
1417+
return true;
1418+
}
1419+
13511420
async #callAcknowledgeMessage({
13521421
parentQueue,
13531422
messageQueue,
@@ -1791,6 +1860,31 @@ redis.call('SREM', queueReserveConcurrencyKey, messageId)
17911860
redis.call('SREM', envCurrentConcurrencyKey, messageId)
17921861
redis.call('SREM', envReserveConcurrencyKey, messageId)
17931862
1863+
return true
1864+
`,
1865+
});
1866+
1867+
this.redis.defineCommand("delayedRequeueMessage", {
1868+
numberOfKeys: 5,
1869+
lua: `
1870+
local messageKey = KEYS[1]
1871+
local queueCurrentConcurrencyKey = KEYS[2]
1872+
local queueReserveConcurrencyKey = KEYS[3]
1873+
local envCurrentConcurrencyKey = KEYS[4]
1874+
local envReserveConcurrencyKey = KEYS[5]
1875+
1876+
local messageId = ARGV[1]
1877+
local messageData = ARGV[2]
1878+
1879+
-- Write the new message data
1880+
redis.call('SET', messageKey, messageData)
1881+
1882+
-- Clear all concurrency sets
1883+
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
1884+
redis.call('SREM', queueReserveConcurrencyKey, messageId)
1885+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1886+
redis.call('SREM', envReserveConcurrencyKey, messageId)
1887+
17941888
return true
17951889
`,
17961890
});
@@ -1967,6 +2061,17 @@ declare module "ioredis" {
19672061
callback?: Callback<string>
19682062
): Result<string, Context>;
19692063

2064+
delayedRequeueMessage(
2065+
messageKey: string,
2066+
queueCurrentConcurrencyKey: string,
2067+
queueReserveConcurrencyKey: string,
2068+
envCurrentConcurrencyKey: string,
2069+
envReserveConcurrencyKey: string,
2070+
messageId: string,
2071+
messageData: string,
2072+
callback?: Callback<string>
2073+
): Result<string, Context>;
2074+
19702075
acknowledgeMessage(
19712076
parentQueue: string,
19722077
messageKey: string,

apps/webapp/app/v3/marqs/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ export const MessagePayload = z.object({
8383
parentQueue: z.string(),
8484
concurrencyKey: z.string().optional(),
8585
priority: MarQSPriorityLevel.optional(),
86+
availableAt: z.number().optional(),
87+
enqueueMethod: z.enum(["enqueue", "requeue", "replace"]).default("enqueue"),
8688
});
8789

8890
export type MessagePayload = z.infer<typeof MessagePayload>;

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,8 +614,7 @@ export class CompleteAttemptService extends BaseService {
614614
});
615615

616616
if (environment.type === "DEVELOPMENT") {
617-
// TODO: move the scheduling of this into the LRE redis worker
618-
marqs.requeueMessage(taskRunAttempt.taskRunId, {}, executionRetry.timestamp, "retry");
617+
await marqs.requeueMessage(taskRunAttempt.taskRunId, {}, executionRetry.timestamp, "retry");
619618

620619
return "RETRIED";
621620
}

0 commit comments

Comments
 (0)