Skip to content

Commit 5d6be21

Browse files
committed
feat(rate-limit): added infraestructure for rate-limit logic
1 parent ff80742 commit 5d6be21

File tree

23 files changed

+815
-14
lines changed

23 files changed

+815
-14
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,12 @@ const EnvironmentSchema = z
759759
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
760760
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),
761761

762+
/**
763+
* Disable queue rate limiting (useful for development and testing).
764+
* When set to "1", rate limit checks on queues will be bypassed.
765+
*/
766+
TRIGGER_DISABLE_QUEUE_RATE_LIMITS: z.string().default("0"),
767+
762768
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
763769
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
764770
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ export class RunEngineTriggerTaskService {
299299
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
300300
cliVersion: lockedToBackgroundWorker?.cliVersion,
301301
concurrencyKey: body.options?.concurrencyKey,
302+
rateLimitKey: body.options?.rateLimitKey,
302303
queue: queueName,
303304
lockedQueueId,
304305
workerQueue,

apps/webapp/app/v3/runQueue.server.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,21 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
22
import { marqs } from "./marqs/index.server";
33
import { engine } from "./runEngine.server";
44

5+
// Re-export pure utility function from durations.ts (testable without env deps)
6+
export { parseDurationToMs } from "./utils/durations";
7+
58
//This allows us to update MARQS and the RunQueue
69

10+
/** Rate limit configuration for a queue */
11+
export type QueueRateLimitConfig = {
12+
/** Maximum number of requests allowed in the period */
13+
limit: number;
14+
/** Time window in milliseconds */
15+
periodMs: number;
16+
/** Optional burst allowance (defaults to limit) */
17+
burst?: number;
18+
};
19+
720
/** Updates MARQS and the RunQueue limits */
821
export async function updateEnvConcurrencyLimits(
922
environment: AuthenticatedEnvironment,
@@ -42,3 +55,20 @@ export async function removeQueueConcurrencyLimits(
4255
engine.runQueue.removeQueueConcurrencyLimits(environment, queueName),
4356
]);
4457
}
58+
59+
/** Updates the rate limit configuration for a queue in Redis */
60+
export async function updateQueueRateLimitConfig(
61+
environment: AuthenticatedEnvironment,
62+
queueName: string,
63+
config: QueueRateLimitConfig
64+
) {
65+
await engine.runQueue.setQueueRateLimitConfig(environment, queueName, config);
66+
}
67+
68+
/** Removes the rate limit configuration for a queue from Redis */
69+
export async function removeQueueRateLimitConfig(
70+
environment: AuthenticatedEnvironment,
71+
queueName: string
72+
) {
73+
await engine.runQueue.removeQueueRateLimitConfig(environment, queueName);
74+
}

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@ import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
99
import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database";
1010
import cronstrue from "cronstrue";
1111
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
12+
import { env } from "~/env.server";
1213
import { sanitizeQueueName } from "~/models/taskQueue.server";
1314
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1415
import { logger } from "~/services/logger.server";
1516
import { generateFriendlyId } from "../friendlyIdentifiers";
1617
import {
18+
parseDurationToMs,
1719
removeQueueConcurrencyLimits,
20+
removeQueueRateLimitConfig,
1821
updateEnvConcurrencyLimits,
1922
updateQueueConcurrencyLimits,
23+
updateQueueRateLimitConfig,
24+
type QueueRateLimitConfig,
2025
} from "../runQueue.server";
2126
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
2227
import { clampMaxDuration } from "../utils/maxDuration";
@@ -250,11 +255,12 @@ async function createWorkerTask(
250255
let queue = queues.find((queue) => queue.name === task.queue?.name);
251256

252257
if (!queue) {
253-
// Create a TaskQueue
258+
// Create a TaskQueue with rate limit config if provided
254259
queue = await createWorkerQueue(
255260
{
256261
name: task.queue?.name ?? `task/${task.id}`,
257262
concurrencyLimit: task.queue?.concurrencyLimit,
263+
rateLimit: task.queue?.rateLimit,
258264
},
259265
task.id,
260266
task.queue?.name ? "NAMED" : "VIRTUAL",
@@ -364,9 +370,28 @@ async function createWorkerQueue(
364370
? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0)
365371
: queue.concurrencyLimit;
366372

373+
// Parse rate limit config if provided
374+
let rateLimitConfig: QueueRateLimitConfig | null = null;
375+
if (queue.rateLimit) {
376+
try {
377+
rateLimitConfig = {
378+
limit: queue.rateLimit.limit,
379+
periodMs: parseDurationToMs(queue.rateLimit.period),
380+
burst: queue.rateLimit.burst,
381+
};
382+
} catch (error) {
383+
logger.error("createWorkerQueue: invalid rate limit period format", {
384+
queueName,
385+
rateLimit: queue.rateLimit,
386+
error,
387+
});
388+
}
389+
}
390+
367391
const taskQueue = await upsertWorkerQueueRecord(
368392
queueName,
369393
baseConcurrencyLimit ?? null,
394+
rateLimitConfig,
370395
orderableName,
371396
queueType,
372397
worker,
@@ -376,6 +401,7 @@ async function createWorkerQueue(
376401
const newConcurrencyLimit = taskQueue.concurrencyLimit;
377402

378403
if (!taskQueue.paused) {
404+
// Handle concurrency limit sync
379405
if (typeof newConcurrencyLimit === "number") {
380406
logger.debug("createWorkerQueue: updating concurrency limit", {
381407
workerId: worker.id,
@@ -397,8 +423,36 @@ async function createWorkerQueue(
397423
});
398424
await removeQueueConcurrencyLimits(environment, taskQueue.name);
399425
}
426+
427+
// Handle rate limit config sync to Redis
428+
if (env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1") {
429+
// Rate limiting disabled: remove any existing config from Redis
430+
// This ensures clean state when toggling the flag
431+
logger.debug("createWorkerQueue: rate limiting disabled by env flag, removing config", {
432+
workerId: worker.id,
433+
taskQueue: taskQueue.name,
434+
orgId: environment.organizationId,
435+
projectId: environment.projectId,
436+
environmentId: environment.id,
437+
});
438+
await removeQueueRateLimitConfig(environment, taskQueue.name);
439+
} else if (rateLimitConfig) {
440+
// Rate limiting enabled and config exists: sync to Redis
441+
logger.debug("createWorkerQueue: updating rate limit config", {
442+
workerId: worker.id,
443+
taskQueue: taskQueue.name,
444+
orgId: environment.organizationId,
445+
projectId: environment.projectId,
446+
environmentId: environment.id,
447+
rateLimitConfig,
448+
});
449+
await updateQueueRateLimitConfig(environment, taskQueue.name, rateLimitConfig);
450+
} else {
451+
// Rate limiting enabled but no config: remove any stale config
452+
await removeQueueRateLimitConfig(environment, taskQueue.name);
453+
}
400454
} else {
401-
logger.debug("createWorkerQueue: queue is paused, not updating concurrency limit", {
455+
logger.debug("createWorkerQueue: queue is paused, not updating limits", {
402456
workerId: worker.id,
403457
taskQueue,
404458
orgId: environment.organizationId,
@@ -413,6 +467,7 @@ async function createWorkerQueue(
413467
async function upsertWorkerQueueRecord(
414468
queueName: string,
415469
concurrencyLimit: number | null,
470+
rateLimitConfig: QueueRateLimitConfig | null,
416471
orderableName: string,
417472
queueType: TaskQueueType,
418473
worker: BackgroundWorker,
@@ -431,6 +486,15 @@ async function upsertWorkerQueueRecord(
431486
},
432487
});
433488

489+
// Serialize rate limit config for storage (or null to clear)
490+
const rateLimitData = rateLimitConfig
491+
? {
492+
limit: rateLimitConfig.limit,
493+
periodMs: rateLimitConfig.periodMs,
494+
burst: rateLimitConfig.burst,
495+
}
496+
: Prisma.JsonNull;
497+
434498
if (!taskQueue) {
435499
taskQueue = await prisma.taskQueue.create({
436500
data: {
@@ -439,6 +503,7 @@ async function upsertWorkerQueueRecord(
439503
name: queueName,
440504
orderableName,
441505
concurrencyLimit,
506+
rateLimit: rateLimitData,
442507
runtimeEnvironmentId: worker.runtimeEnvironmentId,
443508
projectId: worker.projectId,
444509
type: queueType,
@@ -463,6 +528,8 @@ async function upsertWorkerQueueRecord(
463528
// If overridden, keep current limit and update base; otherwise update limit normally
464529
concurrencyLimit: hasOverride ? undefined : concurrencyLimit,
465530
concurrencyLimitBase: hasOverride ? concurrencyLimit : undefined,
531+
// Always update rate limit config (not overrideable for now)
532+
rateLimit: rateLimitData,
466533
},
467534
});
468535
}
@@ -474,6 +541,7 @@ async function upsertWorkerQueueRecord(
474541
return await upsertWorkerQueueRecord(
475542
queueName,
476543
concurrencyLimit,
544+
rateLimitConfig,
477545
orderableName,
478546
queueType,
479547
worker,

apps/webapp/app/v3/services/enqueueRun.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export type EnqueueRunOptions = {
77
env: AuthenticatedEnvironment;
88
run: TaskRun;
99
dependentRun?: { queue: string; id: string };
10+
rateLimitKey?: string;
1011
};
1112

1213
export type EnqueueRunResult =
@@ -22,6 +23,7 @@ export async function enqueueRun({
2223
env,
2324
run,
2425
dependentRun,
26+
rateLimitKey,
2527
}: EnqueueRunOptions): Promise<EnqueueRunResult> {
2628
// If this is a triggerAndWait or batchTriggerAndWait,
2729
// we need to add the parent run to the reserve concurrency set
@@ -39,6 +41,8 @@ export async function enqueueRun({
3941
projectId: env.projectId,
4042
environmentId: env.id,
4143
environmentType: env.type,
44+
// Include rateLimitKey in message payload for dequeue-time checks
45+
rateLimitKey,
4246
},
4347
run.concurrencyKey ?? undefined,
4448
run.queueTimestamp ?? undefined,
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds.
3+
* @throws Error if the duration string is invalid
4+
*/
5+
export function parseDurationToMs(duration: string): number {
6+
const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/);
7+
8+
if (!match) {
9+
throw new Error(
10+
`Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)`
11+
);
12+
}
13+
14+
const [, value, unit] = match;
15+
const numValue = parseFloat(value);
16+
17+
switch (unit) {
18+
case "ms":
19+
return Math.round(numValue);
20+
case "s":
21+
return Math.round(numValue * 1000);
22+
case "m":
23+
return Math.round(numValue * 60 * 1000);
24+
case "h":
25+
return Math.round(numValue * 60 * 60 * 1000);
26+
case "d":
27+
return Math.round(numValue * 24 * 60 * 60 * 1000);
28+
default:
29+
throw new Error(`Unknown duration unit: ${unit}`);
30+
}
31+
}
32+
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { describe, it, expect } from "vitest";
2+
import { parseDurationToMs } from "~/v3/utils/durations";
3+
4+
describe("parseDurationToMs", () => {
5+
it("parses milliseconds", () => {
6+
expect(parseDurationToMs("100ms")).toBe(100);
7+
expect(parseDurationToMs("1500ms")).toBe(1500);
8+
expect(parseDurationToMs("0ms")).toBe(0);
9+
});
10+
11+
it("parses seconds", () => {
12+
expect(parseDurationToMs("1s")).toBe(1000);
13+
expect(parseDurationToMs("30s")).toBe(30000);
14+
expect(parseDurationToMs("1.5s")).toBe(1500);
15+
expect(parseDurationToMs("0.5s")).toBe(500);
16+
});
17+
18+
it("parses minutes", () => {
19+
expect(parseDurationToMs("1m")).toBe(60000);
20+
expect(parseDurationToMs("5m")).toBe(300000);
21+
expect(parseDurationToMs("0.5m")).toBe(30000);
22+
});
23+
24+
it("parses hours", () => {
25+
expect(parseDurationToMs("1h")).toBe(3600000);
26+
expect(parseDurationToMs("24h")).toBe(86400000);
27+
expect(parseDurationToMs("0.5h")).toBe(1800000);
28+
});
29+
30+
it("parses days", () => {
31+
expect(parseDurationToMs("1d")).toBe(86400000);
32+
expect(parseDurationToMs("7d")).toBe(604800000);
33+
});
34+
35+
it("throws on invalid format", () => {
36+
expect(() => parseDurationToMs("invalid")).toThrow();
37+
expect(() => parseDurationToMs("1x")).toThrow();
38+
expect(() => parseDurationToMs("")).toThrow();
39+
expect(() => parseDurationToMs("ms")).toThrow();
40+
expect(() => parseDurationToMs("10")).toThrow();
41+
});
42+
43+
it("throws on negative values (invalid regex)", () => {
44+
expect(() => parseDurationToMs("-1s")).toThrow();
45+
});
46+
});
47+

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ model TaskRun {
656656
priorityMs Int @default(0)
657657
658658
concurrencyKey String?
659+
rateLimitKey String?
659660
660661
delayUntil DateTime?
661662
queuedAt DateTime?

internal-packages/run-engine/src/engine/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ export class RunEngine {
395395
sdkVersion,
396396
cliVersion,
397397
concurrencyKey,
398+
rateLimitKey,
398399
workerQueue,
399400
queue,
400401
lockedQueueId,
@@ -471,6 +472,7 @@ export class RunEngine {
471472
sdkVersion,
472473
cliVersion,
473474
concurrencyKey,
475+
rateLimitKey,
474476
queue,
475477
lockedQueueId,
476478
workerQueue,
@@ -578,6 +580,7 @@ export class RunEngine {
578580

579581
if (taskRun.delayUntil) {
580582
// Schedule the run to be enqueued at the delayUntil time
583+
// Note: rateLimitKey is not passed for delayed runs - it will need to be stored on the run if needed
581584
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
582585
runId: taskRun.id,
583586
delayUntil: taskRun.delayUntil,
@@ -594,6 +597,7 @@ export class RunEngine {
594597
runnerId,
595598
tx: prisma,
596599
skipRunLock: true,
600+
rateLimitKey,
597601
});
598602
}
599603

0 commit comments

Comments
 (0)