Skip to content

Commit 61fab3d

Browse files
committed
WIP priority queues
1 parent bb53fc5 commit 61fab3d

File tree

9 files changed

+1000
-244
lines changed

9 files changed

+1000
-244
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { EnvQueues, MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types";
2+
3+
export type EnvPriorityDequeuingStrategyOptions = {
4+
keys: MarQSKeyProducer;
5+
delegate: MarQSFairDequeueStrategy;
6+
};
7+
8+
export class EnvPriorityDequeuingStrategy implements MarQSFairDequeueStrategy {
9+
private _delegate: MarQSFairDequeueStrategy;
10+
11+
constructor(private options: EnvPriorityDequeuingStrategyOptions) {
12+
this._delegate = options.delegate;
13+
}
14+
15+
async distributeFairQueuesFromParentQueue(
16+
parentQueue: string,
17+
consumerId: string
18+
): Promise<Array<EnvQueues>> {
19+
const envQueues = await this._delegate.distributeFairQueuesFromParentQueue(
20+
parentQueue,
21+
consumerId
22+
);
23+
24+
return this.#sortQueuesInEnvironmentsByPriority(envQueues);
25+
}
26+
27+
#sortQueuesInEnvironmentsByPriority(envs: EnvQueues[]): EnvQueues[] {
28+
return envs.map((env) => {
29+
return this.#sortQueuesInEnvironmentByPriority(env);
30+
});
31+
}
32+
33+
// Sorts the queues by priority. A higher priority means the queue should be dequeued first.
34+
// All the queues with the same priority should keep the order they were in the original list.
35+
// So that means if all the queues have the same priority, the order should be preserved.
36+
#sortQueuesInEnvironmentByPriority(env: EnvQueues): EnvQueues {
37+
const queues = env.queues;
38+
39+
const sortedQueues = [...queues].sort((a, b) => {
40+
const aPriority = this.#getQueuePriority(a);
41+
const bPriority = this.#getQueuePriority(b);
42+
43+
if (aPriority === bPriority) {
44+
return 0;
45+
}
46+
47+
return aPriority > bPriority ? -1 : 1;
48+
});
49+
50+
return { envId: env.envId, queues: sortedQueues };
51+
}
52+
53+
#getQueuePriority(queue: string): number {
54+
const queueRecord = this.options.keys.queueDescriptorFromQueue(queue);
55+
56+
return queueRecord.priority ?? 0;
57+
}
58+
}

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } f
33
import { MemoryStore } from "@unkey/cache/stores";
44
import { randomUUID } from "crypto";
55
import { Redis } from "ioredis";
6-
import { MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types";
6+
import { EnvQueues, MarQSFairDequeueStrategy, MarQSKeyProducer } from "./types";
77
import seedrandom from "seedrandom";
88
import { Tracer } from "@opentelemetry/api";
99
import { startSpan } from "../tracing.server";
@@ -111,7 +111,7 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
111111
async distributeFairQueuesFromParentQueue(
112112
parentQueue: string,
113113
consumerId: string
114-
): Promise<Array<string>> {
114+
): Promise<Array<EnvQueues>> {
115115
return await startSpan(
116116
this.options.tracer,
117117
"distributeFairQueuesFromParentQueue",
@@ -132,21 +132,27 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
132132
return [];
133133
}
134134

135-
const shuffledQueues = this.#shuffleQueuesByEnv(snapshot);
135+
const envQueues = this.#shuffleQueuesByEnv(snapshot);
136136

137-
span.setAttribute("shuffled_queue_count", shuffledQueues.length);
137+
span.setAttribute(
138+
"shuffled_queue_count",
139+
envQueues.reduce((sum, env) => sum + env.queues.length, 0)
140+
);
138141

139-
if (shuffledQueues[0]) {
140-
span.setAttribute("winning_env", this.options.keys.envIdFromQueue(shuffledQueues[0]));
141-
span.setAttribute("winning_org", this.options.keys.orgIdFromQueue(shuffledQueues[0]));
142+
if (envQueues[0]?.queues[0]) {
143+
span.setAttribute("winning_env", envQueues[0].envId);
144+
span.setAttribute(
145+
"winning_org",
146+
this.options.keys.orgIdFromQueue(envQueues[0].queues[0])
147+
);
142148
}
143149

144-
return shuffledQueues;
150+
return envQueues;
145151
}
146152
);
147153
}
148154

149-
#shuffleQueuesByEnv(snapshot: FairQueueSnapshot): Array<string> {
155+
#shuffleQueuesByEnv(snapshot: FairQueueSnapshot): Array<EnvQueues> {
150156
const envs = Object.keys(snapshot.envs);
151157
const biases = this.options.biases ?? defaultBiases;
152158

@@ -212,7 +218,8 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
212218
}
213219

214220
// Helper method to maintain DRY principle
215-
#orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array<string> {
221+
// Update return type
222+
#orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array<EnvQueues> {
216223
const queuesByEnv = snapshot.queues.reduce((acc, queue) => {
217224
if (!acc[queue.env]) {
218225
acc[queue.env] = [];
@@ -221,15 +228,20 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
221228
return acc;
222229
}, {} as Record<string, Array<FairQueue>>);
223230

224-
const queues = envs.reduce((acc, envId) => {
231+
return envs.reduce((acc, envId) => {
225232
if (queuesByEnv[envId]) {
226-
// Instead of sorting by age, use weighted random selection
227-
acc.push(...this.#weightedRandomQueueOrder(queuesByEnv[envId]));
233+
// Get ordered queues for this env
234+
const orderedQueues = this.#weightedRandomQueueOrder(queuesByEnv[envId]);
235+
// Only add the env if it has queues
236+
if (orderedQueues.length > 0) {
237+
acc.push({
238+
envId,
239+
queues: orderedQueues.map((queue) => queue.id),
240+
});
241+
}
228242
}
229243
return acc;
230-
}, [] as Array<FairQueue>);
231-
232-
return queues.map((queue) => queue.id);
244+
}, [] as Array<EnvQueues>);
233245
}
234246

235247
#weightedRandomQueueOrder(queues: FairQueue[]): FairQueue[] {
@@ -558,7 +570,7 @@ export class NoopFairDequeuingStrategy implements MarQSFairDequeueStrategy {
558570
async distributeFairQueuesFromParentQueue(
559571
parentQueue: string,
560572
consumerId: string
561-
): Promise<Array<string>> {
573+
): Promise<Array<EnvQueues>> {
562574
return [];
563575
}
564576
}

0 commit comments

Comments
 (0)