Skip to content

Commit ac8fa75

Browse files
committed
Add reserve concurrency concept to allow waiting to resume parent tasks to release concurrency at the env level for child tasks to use (or else there is a deadlock). WIP recursive tasks
1 parent 511a97c commit ac8fa75

File tree

8 files changed

+248
-90
lines changed

8 files changed

+248
-90
lines changed

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export type FairDequeuingStrategyOptions = {
4949
type FairQueueConcurrency = {
5050
current: number;
5151
limit: number;
52+
reserve: number;
5253
};
5354

5455
type FairQueue = { id: string; age: number; org: string; env: string };
@@ -365,7 +366,7 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
365366
);
366367

367368
const envsAtFullConcurrency = envs.filter(
368-
(env) => env.concurrency.current >= env.concurrency.limit
369+
(env) => env.concurrency.current >= env.concurrency.limit + env.concurrency.reserve
369370
);
370371

371372
const envIdsAtFullConcurrency = new Set(envsAtFullConcurrency.map((env) => env.id));
@@ -454,15 +455,17 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
454455
return await startSpan(this.options.tracer, "getEnvConcurrency", async (span) => {
455456
span.setAttribute("env_id", envId);
456457

457-
const [currentValue, limitValue] = await Promise.all([
458+
const [currentValue, limitValue, reserveValue] = await Promise.all([
458459
this.#getEnvCurrentConcurrency(envId),
459460
this.#getEnvConcurrencyLimit(envId),
461+
this.#getEnvReserveConcurrency(envId),
460462
]);
461463

462464
span.setAttribute("current_value", currentValue);
463465
span.setAttribute("limit_value", limitValue);
466+
span.setAttribute("reserve_value", reserveValue);
464467

465-
return { current: currentValue, limit: limitValue };
468+
return { current: currentValue, limit: limitValue, reserve: reserveValue };
466469
});
467470
}
468471

@@ -535,6 +538,20 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
535538
return result;
536539
});
537540
}
541+
542+
async #getEnvReserveConcurrency(envId: string) {
543+
return await startSpan(this.options.tracer, "getEnvReserveConcurrency", async (span) => {
544+
span.setAttribute("env_id", envId);
545+
546+
const key = this.options.keys.envReserveConcurrencyKey(envId);
547+
548+
const result = await this.options.redis.scard(key);
549+
550+
span.setAttribute("current_value", result);
551+
552+
return result;
553+
});
554+
}
538555
}
539556

540557
export class NoopFairDequeuingStrategy implements MarQSFairDequeueStrategy {

0 commit comments

Comments
 (0)