Skip to content

Commit 2af62e2

Browse files
committed
run engine v1: orgs are no longer considered for concurrency
1 parent d6c869e commit 2af62e2

File tree

10 files changed

+94
-509
lines changed

10 files changed

+94
-509
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ const EnvironmentSchema = z.object({
357357
MARQS_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
358358
MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
359359
MARQS_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
360-
MARQS_MAXIMUM_ORG_COUNT: z.coerce.number().int().optional(),
360+
MARQS_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
361361

362362
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
363363

apps/webapp/app/routes/projects.v3.$projectRef.metrics/registerProjectMetrics.server.ts

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,50 +25,6 @@ export async function registerProjectMetrics(
2525
},
2626
});
2727

28-
const firstEnv = allEnvironments[0];
29-
30-
if (firstEnv) {
31-
new Gauge({
32-
name: sanitizeMetricName(`trigger_org_queue_concurrency`),
33-
help: `The number of tasks currently being executed in the org environment queue`,
34-
registers: [registry],
35-
async collect() {
36-
const length = await marqs?.currentConcurrencyOfOrg(firstEnv);
37-
38-
if (length) {
39-
this.set(length);
40-
}
41-
},
42-
});
43-
44-
new Gauge({
45-
name: sanitizeMetricName(`trigger_org_queue_concurrency_limit`),
46-
help: `The concurrency limit for the org queue`,
47-
registers: [registry],
48-
async collect() {
49-
const length = await marqs?.getOrgConcurrencyLimit(firstEnv);
50-
51-
if (length) {
52-
this.set(length);
53-
}
54-
},
55-
});
56-
57-
new Gauge({
58-
name: sanitizeMetricName(`trigger_org_queue_capacity`),
59-
help: "The capacity of the org queue",
60-
registers: [registry],
61-
async collect() {
62-
const concurrencyLimit = await marqs?.getOrgConcurrencyLimit(firstEnv);
63-
const currentConcurrency = await marqs?.currentConcurrencyOfOrg(firstEnv);
64-
65-
if (typeof concurrencyLimit === "number" && typeof currentConcurrency === "number") {
66-
this.set(concurrencyLimit - currentConcurrency);
67-
}
68-
},
69-
});
70-
}
71-
7228
for (const env of allEnvironments) {
7329
if (env.type === "DEVELOPMENT" && env.orgMember?.userId === userId) {
7430
await registerEnvironmentMetrics(env, registry);

apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,16 @@ export class MarqsConcurrencyMonitor {
103103

104104
async #processKey(key: string, redis: Redis) {
105105
key = this.keys.stripKeyPrefix(key);
106-
const orgKey = this.keys.orgCurrentConcurrencyKeyFromQueue(key);
107106
const envKey = this.keys.envCurrentConcurrencyKeyFromQueue(key);
108107

109108
let runIds: string[] = [];
110109

111110
try {
112111
// Next, we need to get all the items from the key, and any parent keys (org, env, queue) using sunion.
113-
runIds = await redis.sunion(orgKey, envKey, key);
112+
runIds = await redis.sunion(envKey, key);
114113
} catch (e) {
115114
this._logger.error("[MarqsConcurrencyMonitor] error during sunion", {
116115
key,
117-
orgKey,
118116
envKey,
119117
runIds,
120118
error: e,
@@ -136,7 +134,6 @@ export class MarqsConcurrencyMonitor {
136134
if (completedRunIds.length === 0) {
137135
this._logger.debug("[MarqsConcurrencyMonitor] no completed runs found", {
138136
key,
139-
orgKey,
140137
envKey,
141138
runIds,
142139
durationMs,
@@ -147,7 +144,6 @@ export class MarqsConcurrencyMonitor {
147144

148145
this._logger.debug("[MarqsConcurrencyMonitor] removing completed runs from queue", {
149146
key,
150-
orgKey,
151147
envKey,
152148
completedRunIds,
153149
durationMs,
@@ -160,15 +156,13 @@ export class MarqsConcurrencyMonitor {
160156
const pipeline = redis.pipeline();
161157

162158
pipeline.srem(key, ...completedRunIds);
163-
pipeline.srem(orgKey, ...completedRunIds);
164159
pipeline.srem(envKey, ...completedRunIds);
165160

166161
try {
167162
await pipeline.exec();
168163
} catch (e) {
169164
this._logger.error("[MarqsConcurrencyMonitor] error removing completed runs from queue", {
170165
key,
171-
orgKey,
172166
envKey,
173167
completedRunIds,
174168
error: e,

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 37 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ export type FairDequeuingStrategyBiases = {
3333
export type FairDequeuingStrategyOptions = {
3434
redis: Redis;
3535
keys: MarQSKeyProducer;
36-
defaultOrgConcurrency: number;
3736
defaultEnvConcurrency: number;
3837
parentQueueLimit: number;
3938
tracer: Tracer;
@@ -44,7 +43,7 @@ export type FairDequeuingStrategyOptions = {
4443
*/
4544
biases?: FairDequeuingStrategyBiases;
4645
reuseSnapshotCount?: number;
47-
maximumOrgCount?: number;
46+
maximumEnvCount?: number;
4847
};
4948

5049
type FairQueueConcurrency = {
@@ -56,7 +55,6 @@ type FairQueue = { id: string; age: number; org: string; env: string };
5655

5756
type FairQueueSnapshot = {
5857
id: string;
59-
orgs: Record<string, { concurrency: FairQueueConcurrency }>;
6058
envs: Record<string, { concurrency: FairQueueConcurrency }>;
6159
queues: Array<FairQueue>;
6260
};
@@ -73,7 +71,6 @@ type WeightedQueue = {
7371

7472
const emptyFairQueueSnapshot: FairQueueSnapshot = {
7573
id: "empty",
76-
orgs: {},
7774
envs: {},
7875
queues: [],
7976
};
@@ -124,7 +121,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
124121
const snapshot = await this.#createQueueSnapshot(parentQueue, consumerId);
125122

126123
span.setAttributes({
127-
snapshot_org_count: Object.keys(snapshot.orgs).length,
128124
snapshot_env_count: Object.keys(snapshot.envs).length,
129125
snapshot_queue_count: snapshot.queues.length,
130126
});
@@ -344,69 +340,27 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
344340
return emptyFairQueueSnapshot;
345341
}
346342

347-
// Apply org selection if maximumOrgCount is specified
348-
let selectedOrgIds: Set<string>;
349-
if (this.options.maximumOrgCount && this.options.maximumOrgCount > 0) {
350-
selectedOrgIds = this.#selectTopOrgs(queues, this.options.maximumOrgCount);
351-
// Filter queues to only include selected orgs
352-
queues = queues.filter((queue) => selectedOrgIds.has(queue.org));
343+
// Apply env selection if maximumEnvCount is specified
344+
let selectedEnvIds: Set<string>;
345+
if (this.options.maximumEnvCount && this.options.maximumEnvCount > 0) {
346+
selectedEnvIds = this.#selectTopEnvs(queues, this.options.maximumEnvCount);
347+
// Filter queues to only include selected envs
348+
queues = queues.filter((queue) => selectedEnvIds.has(queue.env));
353349

354-
span.setAttribute("selected_org_count", selectedOrgIds.size);
350+
span.setAttribute("selected_env_count", selectedEnvIds.size);
355351
}
356352

357353
span.setAttribute("selected_queue_count", queues.length);
358354

359-
const orgIds = new Set<string>();
360355
const envIds = new Set<string>();
361-
const envIdToOrgId = new Map<string, string>();
362356

363357
for (const queue of queues) {
364-
orgIds.add(queue.org);
365358
envIds.add(queue.env);
366-
367-
envIdToOrgId.set(queue.env, queue.org);
368-
}
369-
370-
const orgs = await Promise.all(
371-
Array.from(orgIds).map(async (orgId) => {
372-
return { id: orgId, concurrency: await this.#getOrgConcurrency(orgId) };
373-
})
374-
);
375-
376-
const orgsAtFullConcurrency = orgs.filter(
377-
(org) => org.concurrency.current >= org.concurrency.limit
378-
);
379-
380-
const orgIdsAtFullConcurrency = new Set(orgsAtFullConcurrency.map((org) => org.id));
381-
382-
const orgsSnapshot = orgs.reduce((acc, org) => {
383-
if (!orgIdsAtFullConcurrency.has(org.id)) {
384-
acc[org.id] = org;
385-
}
386-
387-
return acc;
388-
}, {} as Record<string, { concurrency: FairQueueConcurrency }>);
389-
390-
span.setAttributes({
391-
org_count: orgs.length,
392-
orgs_at_full_concurrency_count: orgsAtFullConcurrency.length,
393-
orgs_snapshot_count: Object.keys(orgsSnapshot).length,
394-
});
395-
396-
if (Object.keys(orgsSnapshot).length === 0) {
397-
return emptyFairQueueSnapshot;
398359
}
399360

400-
const envsWithoutFullOrgs = Array.from(envIds).filter(
401-
(envId) => !orgIdsAtFullConcurrency.has(envIdToOrgId.get(envId)!)
402-
);
403-
404361
const envs = await Promise.all(
405-
envsWithoutFullOrgs.map(async (envId) => {
406-
return {
407-
id: envId,
408-
concurrency: await this.#getEnvConcurrency(envId, envIdToOrgId.get(envId)!),
409-
};
362+
Array.from(envIds).map(async (envId) => {
363+
return { id: envId, concurrency: await this.#getEnvConcurrency(envId) };
410364
})
411365
);
412366

@@ -420,7 +374,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
420374
if (!envIdsAtFullConcurrency.has(env.id)) {
421375
acc[env.id] = env;
422376
}
423-
424377
return acc;
425378
}, {} as Record<string, { concurrency: FairQueueConcurrency }>);
426379

@@ -429,14 +382,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
429382
envs_at_full_concurrency_count: envsAtFullConcurrency.length,
430383
});
431384

432-
const queuesSnapshot = queues.filter(
433-
(queue) =>
434-
!orgIdsAtFullConcurrency.has(queue.org) && !envIdsAtFullConcurrency.has(queue.env)
435-
);
385+
const queuesSnapshot = queues.filter((queue) => !envIdsAtFullConcurrency.has(queue.env));
436386

437387
const snapshot = {
438388
id: randomUUID(),
439-
orgs: orgsSnapshot,
440389
envs: envsSnapshot,
441390
queues: queuesSnapshot,
442391
};
@@ -455,71 +404,54 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
455404
});
456405
}
457406

458-
#selectTopOrgs(queues: FairQueue[], maximumOrgCount: number): Set<string> {
459-
// Group queues by org
460-
const queuesByOrg = queues.reduce((acc, queue) => {
461-
if (!acc[queue.org]) {
462-
acc[queue.org] = [];
407+
#selectTopEnvs(queues: FairQueue[], maximumEnvCount: number): Set<string> {
408+
// Group queues by env
409+
const queuesByEnv = queues.reduce((acc, queue) => {
410+
if (!acc[queue.env]) {
411+
acc[queue.env] = [];
463412
}
464-
acc[queue.org].push(queue);
413+
acc[queue.env].push(queue);
465414
return acc;
466415
}, {} as Record<string, FairQueue[]>);
467416

468-
// Calculate average age for each org
469-
const orgAverageAges = Object.entries(queuesByOrg).map(([orgId, orgQueues]) => {
470-
const averageAge = orgQueues.reduce((sum, q) => sum + q.age, 0) / orgQueues.length;
471-
return { orgId, averageAge };
417+
// Calculate average age for each env
418+
const envAverageAges = Object.entries(queuesByEnv).map(([envId, envQueues]) => {
419+
const averageAge = envQueues.reduce((sum, q) => sum + q.age, 0) / envQueues.length;
420+
return { envId, averageAge };
472421
});
473422

474423
// Perform weighted shuffle based on average ages
475-
const maxAge = Math.max(...orgAverageAges.map((o) => o.averageAge));
476-
const weightedOrgs = orgAverageAges.map((org) => ({
477-
orgId: org.orgId,
478-
weight: org.averageAge / maxAge, // Normalize weights
424+
const maxAge = Math.max(...envAverageAges.map((e) => e.averageAge));
425+
const weightedEnvs = envAverageAges.map((env) => ({
426+
envId: env.envId,
427+
weight: env.averageAge / maxAge, // Normalize weights
479428
}));
480429

481-
// Select top N orgs using weighted shuffle
482-
const selectedOrgs = new Set<string>();
483-
let remainingOrgs = [...weightedOrgs];
484-
let totalWeight = remainingOrgs.reduce((sum, org) => sum + org.weight, 0);
430+
// Select top N envs using weighted shuffle
431+
const selectedEnvs = new Set<string>();
432+
let remainingEnvs = [...weightedEnvs];
433+
let totalWeight = remainingEnvs.reduce((sum, env) => sum + env.weight, 0);
485434

486-
while (selectedOrgs.size < maximumOrgCount && remainingOrgs.length > 0) {
435+
while (selectedEnvs.size < maximumEnvCount && remainingEnvs.length > 0) {
487436
let random = this._rng() * totalWeight;
488437
let index = 0;
489438

490-
while (random > 0 && index < remainingOrgs.length) {
491-
random -= remainingOrgs[index].weight;
439+
while (random > 0 && index < remainingEnvs.length) {
440+
random -= remainingEnvs[index].weight;
492441
index++;
493442
}
494443
index = Math.max(0, index - 1);
495444

496-
selectedOrgs.add(remainingOrgs[index].orgId);
497-
totalWeight -= remainingOrgs[index].weight;
498-
remainingOrgs.splice(index, 1);
445+
selectedEnvs.add(remainingEnvs[index].envId);
446+
totalWeight -= remainingEnvs[index].weight;
447+
remainingEnvs.splice(index, 1);
499448
}
500449

501-
return selectedOrgs;
502-
}
503-
504-
async #getOrgConcurrency(orgId: string): Promise<FairQueueConcurrency> {
505-
return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => {
506-
span.setAttribute("org_id", orgId);
507-
508-
const [currentValue, limitValue] = await Promise.all([
509-
this.#getOrgCurrentConcurrency(orgId),
510-
this.#getOrgConcurrencyLimit(orgId),
511-
]);
512-
513-
span.setAttribute("current_value", currentValue);
514-
span.setAttribute("limit_value", limitValue);
515-
516-
return { current: currentValue, limit: limitValue };
517-
});
450+
return selectedEnvs;
518451
}
519452

520-
async #getEnvConcurrency(envId: string, orgId: string): Promise<FairQueueConcurrency> {
453+
async #getEnvConcurrency(envId: string): Promise<FairQueueConcurrency> {
521454
return await startSpan(this.options.tracer, "getEnvConcurrency", async (span) => {
522-
span.setAttribute("org_id", orgId);
523455
span.setAttribute("env_id", envId);
524456

525457
const [currentValue, limitValue] = await Promise.all([
@@ -570,40 +502,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
570502
});
571503
}
572504

573-
async #getOrgConcurrencyLimit(orgId: string) {
574-
return await startSpan(this.options.tracer, "getOrgConcurrencyLimit", async (span) => {
575-
span.setAttribute("org_id", orgId);
576-
577-
const key = this.options.keys.orgConcurrencyLimitKey(orgId);
578-
579-
const result = await this._cache.concurrencyLimit.swr(key, async () => {
580-
const value = await this.options.redis.get(key);
581-
582-
if (!value) {
583-
return this.options.defaultOrgConcurrency;
584-
}
585-
586-
return Number(value);
587-
});
588-
589-
return result.val ?? this.options.defaultOrgConcurrency;
590-
});
591-
}
592-
593-
async #getOrgCurrentConcurrency(orgId: string) {
594-
return await startSpan(this.options.tracer, "getOrgCurrentConcurrency", async (span) => {
595-
span.setAttribute("org_id", orgId);
596-
597-
const key = this.options.keys.orgCurrentConcurrencyKey(orgId);
598-
599-
const result = await this.options.redis.scard(key);
600-
601-
span.setAttribute("current_value", result);
602-
603-
return result;
604-
});
605-
}
606-
607505
async #getEnvConcurrencyLimit(envId: string) {
608506
return await startSpan(this.options.tracer, "getEnvConcurrencyLimit", async (span) => {
609507
span.setAttribute("env_id", envId);

0 commit comments

Comments
 (0)