Skip to content

Commit fdaa397

Browse files
committed
feat(rate-limit): gcra and queues logic
1 parent 5d6be21 commit fdaa397

File tree

9 files changed

+778
-18
lines changed

9 files changed

+778
-18
lines changed

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ function createRunEngine() {
8080
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
8181
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
8282
},
83+
disableRateLimits: env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1",
8384
},
8485
runLock: {
8586
redis: {

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ async function createWorkerTask(
255255
let queue = queues.find((queue) => queue.name === task.queue?.name);
256256

257257
if (!queue) {
258-
// Create a TaskQueue with rate limit config if provided
258+
// Create a TaskQueue
259259
queue = await createWorkerQueue(
260260
{
261261
name: task.queue?.name ?? `task/${task.id}`,
@@ -370,7 +370,6 @@ async function createWorkerQueue(
370370
? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0)
371371
: queue.concurrencyLimit;
372372

373-
// Parse rate limit config if provided
374373
let rateLimitConfig: QueueRateLimitConfig | null = null;
375374
if (queue.rateLimit) {
376375
try {
@@ -401,7 +400,6 @@ async function createWorkerQueue(
401400
const newConcurrencyLimit = taskQueue.concurrencyLimit;
402401

403402
if (!taskQueue.paused) {
404-
// Handle concurrency limit sync
405403
if (typeof newConcurrencyLimit === "number") {
406404
logger.debug("createWorkerQueue: updating concurrency limit", {
407405
workerId: worker.id,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ export class RunEngine {
179179
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
180180
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
181181
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
182+
disableRateLimits: options.queue?.disableRateLimits,
182183
meter: options.meter,
183184
});
184185

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export type RunEngineOptions = {
6363
scanJitterInMs?: number;
6464
processMarkedJitterInMs?: number;
6565
};
66+
disableRateLimits?: boolean;
6667
};
6768
runLock: {
6869
redis: RedisOptions;

internal-packages/run-engine/src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,13 @@ export type {
2020
ProcessBatchItemCallback,
2121
BatchCompletionCallback,
2222
} from "./batch-queue/types.js";
23+
24+
// Rate limiter exports
25+
export { GCRARateLimiter, configToGCRAParams, parseDurationToMs } from "./rate-limiter/index.js";
26+
export type {
27+
GCRARateLimiterOptions,
28+
GCRAParams,
29+
QueueRateLimitConfig,
30+
StoredQueueRateLimitConfig,
31+
RateLimitResult,
32+
} from "./rate-limiter/index.js";
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
import type { Redis } from "@internal/redis";
2+
3+
/**
4+
* Configuration for queue rate limiting (input format).
5+
*/
6+
export interface QueueRateLimitConfig {
7+
/** Maximum number of requests allowed within the period */
8+
limit: number;
9+
/** Time period in milliseconds */
10+
periodMs: number;
11+
/** Optional burst capacity (defaults to 1) */
12+
burst?: number;
13+
}
14+
15+
/**
16+
* Stored configuration for queue rate limiting (includes pre-calculated GCRA params).
17+
* This is what gets stored in Redis and read by the Lua dequeue script.
18+
*/
19+
export interface StoredQueueRateLimitConfig extends QueueRateLimitConfig, GCRAParams {}
20+
21+
/**
22+
* GCRA parameters calculated from QueueRateLimitConfig.
23+
* These are stored in Redis for use by the Lua dequeue script.
24+
*/
25+
export interface GCRAParams {
26+
/** The minimum interval between requests in milliseconds */
27+
emissionInterval: number;
28+
/** The burst tolerance in milliseconds */
29+
burstTolerance: number;
30+
/** Key expiration in milliseconds */
31+
keyExpiration: number;
32+
}
33+
34+
/**
35+
* Options for configuring the RateLimiter.
36+
*/
37+
export interface GCRARateLimiterOptions {
38+
/** An instance of Redis. */
39+
redis: Redis;
40+
/**
41+
* A string prefix to namespace keys in Redis.
42+
* Defaults to "ratelimit:".
43+
*/
44+
keyPrefix?: string;
45+
/**
46+
* The minimum interval between requests (the emission interval) in milliseconds.
47+
* For example, 1000 ms for one request per second.
48+
*/
49+
emissionInterval: number;
50+
/**
51+
* The burst tolerance in milliseconds. This represents how much "credit" can be
52+
* accumulated to allow short bursts beyond the average rate.
53+
* For example, if you want to allow 3 requests in a burst with an emission interval of 1000 ms,
54+
* you might set this to 3000.
55+
*/
56+
burstTolerance: number;
57+
/**
58+
* Expiration for the Redis key in milliseconds.
59+
* Defaults to the larger of 60 seconds or (emissionInterval + burstTolerance).
60+
*/
61+
keyExpiration?: number;
62+
}
63+
64+
/**
65+
* The result of a rate limit check.
66+
*/
67+
export interface RateLimitResult {
68+
/** Whether the request is allowed. */
69+
allowed: boolean;
70+
/**
71+
* If not allowed, this is the number of milliseconds the caller should wait
72+
* before retrying.
73+
*/
74+
retryAfter?: number;
75+
}
76+
77+
/**
78+
* Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds.
79+
* @throws Error if the duration string is invalid
80+
*/
81+
export function parseDurationToMs(duration: string): number {
82+
const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/);
83+
84+
if (!match) {
85+
throw new Error(
86+
`Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)`
87+
);
88+
}
89+
90+
const [, value, unit] = match;
91+
const numValue = parseFloat(value);
92+
93+
switch (unit) {
94+
case "ms":
95+
return Math.round(numValue);
96+
case "s":
97+
return Math.round(numValue * 1000);
98+
case "m":
99+
return Math.round(numValue * 60 * 1000);
100+
case "h":
101+
return Math.round(numValue * 60 * 60 * 1000);
102+
case "d":
103+
return Math.round(numValue * 24 * 60 * 60 * 1000);
104+
default:
105+
throw new Error(`Unknown duration unit: ${unit}`);
106+
}
107+
}
108+
109+
/**
110+
* Convert a QueueRateLimitConfig to GCRA parameters.
111+
*
112+
* @example
113+
* // 10 requests per minute with burst of 3
114+
* configToGCRAParams({ limit: 10, periodMs: 60000, burst: 3 })
115+
* // => { emissionInterval: 6000, burstTolerance: 12000, keyExpiration: 60000 }
116+
*/
117+
export function configToGCRAParams(config: QueueRateLimitConfig): GCRAParams {
118+
const emissionInterval = Math.ceil(config.periodMs / config.limit);
119+
const burst = config.burst ?? 1;
120+
// burst-1 because GCRA allows 1 request immediately, then burst-1 more within tolerance
121+
const burstTolerance = (burst - 1) * emissionInterval;
122+
const keyExpiration = Math.max(60_000, emissionInterval + burstTolerance);
123+
return { emissionInterval, burstTolerance, keyExpiration };
124+
}
125+
126+
/**
127+
* A rate limiter using Redis and the Generic Cell Rate Algorithm (GCRA).
128+
*
129+
* The GCRA is implemented using a Lua script that runs atomically in Redis.
130+
*
131+
* When a request comes in, the algorithm:
132+
* - Retrieves the current "Theoretical Arrival Time" (TAT) from Redis (or initializes it if missing).
133+
* - If the current time is greater than or equal to the TAT, the request is allowed and the TAT is updated to now + emissionInterval.
134+
* - Otherwise, if the current time plus the burst tolerance is at least the TAT, the request is allowed and the TAT is incremented.
135+
* - If neither condition is met, the request is rejected and a Retry-After value is returned.
136+
*/
137+
export class GCRARateLimiter {
138+
private redis: Redis;
139+
private keyPrefix: string;
140+
private emissionInterval: number;
141+
private burstTolerance: number;
142+
private keyExpiration: number;
143+
144+
constructor(options: GCRARateLimiterOptions) {
145+
this.redis = options.redis;
146+
this.keyPrefix = options.keyPrefix || "gcra:ratelimit:";
147+
this.emissionInterval = options.emissionInterval;
148+
this.burstTolerance = options.burstTolerance;
149+
// Default expiration: at least 60 seconds or the sum of emissionInterval and burstTolerance
150+
this.keyExpiration =
151+
options.keyExpiration || Math.max(60_000, this.emissionInterval + this.burstTolerance);
152+
153+
// Define a custom Redis command 'gcra' that implements the GCRA algorithm.
154+
// Using defineCommand ensures the Lua script is loaded once and run atomically.
155+
this.redis.defineCommand("gcra", {
156+
numberOfKeys: 1,
157+
lua: `
158+
--[[
159+
GCRA Lua script
160+
KEYS[1] - The rate limit key (e.g. "ratelimit:<identifier>")
161+
ARGV[1] - Current time in ms (number)
162+
ARGV[2] - Emission interval in ms (number)
163+
ARGV[3] - Burst tolerance in ms (number)
164+
ARGV[4] - Key expiration in ms (number)
165+
166+
Returns: { allowedFlag, value }
167+
allowedFlag: 1 if allowed, 0 if rate-limited.
168+
value: 0 when allowed; if not allowed, the number of ms to wait.
169+
]]--
170+
171+
local key = KEYS[1]
172+
local now = tonumber(ARGV[1])
173+
local emission_interval = tonumber(ARGV[2])
174+
local burst_tolerance = tonumber(ARGV[3])
175+
local expire = tonumber(ARGV[4])
176+
177+
-- Get the stored Theoretical Arrival Time (TAT) or default to 0.
178+
local tat = tonumber(redis.call("GET", key) or 0)
179+
if tat == 0 then
180+
tat = now
181+
end
182+
183+
local allowed, new_tat, retry_after
184+
185+
if now >= tat then
186+
-- No delay: request is on schedule.
187+
new_tat = now + emission_interval
188+
allowed = true
189+
elseif (now + burst_tolerance) >= tat then
190+
-- Within burst capacity: allow request.
191+
new_tat = tat + emission_interval
192+
allowed = true
193+
else
194+
-- Request exceeds the allowed burst; calculate wait time.
195+
allowed = false
196+
retry_after = tat - (now + burst_tolerance)
197+
end
198+
199+
if allowed then
200+
redis.call("SET", key, new_tat, "PX", expire)
201+
return {1, 0}
202+
else
203+
return {0, retry_after}
204+
end
205+
`,
206+
});
207+
}
208+
209+
/**
210+
* Checks whether a request associated with the given identifier is allowed.
211+
*
212+
* @param identifier A unique string identifying the subject of rate limiting (e.g. user ID, IP address, or domain).
213+
* @returns A promise that resolves to a RateLimitResult.
214+
*
215+
* @example
216+
* const result = await rateLimiter.check('user:12345');
217+
* if (!result.allowed) {
218+
* // Tell the client to retry after result.retryAfter milliseconds.
219+
* }
220+
*/
221+
async check(identifier: string): Promise<RateLimitResult> {
222+
const key = `${this.keyPrefix}${identifier}`;
223+
const now = Date.now();
224+
225+
try {
226+
// Call the custom 'gcra' command.
227+
// The script returns an array: [allowedFlag, value]
228+
// - allowedFlag: 1 if allowed; 0 if rejected.
229+
// - value: 0 when allowed; if rejected, the number of ms to wait before retrying.
230+
const result: [number, number] = await (this.redis as any).gcra(
231+
key,
232+
now,
233+
this.emissionInterval,
234+
this.burstTolerance,
235+
this.keyExpiration
236+
);
237+
const allowed = result[0] === 1;
238+
if (allowed) {
239+
return { allowed: true };
240+
} else {
241+
return { allowed: false, retryAfter: result[1] };
242+
}
243+
} catch (error) {
244+
// In a production system you might log the error and either
245+
// allow the request (fail open) or deny it (fail closed).
246+
// Here we choose to propagate the error.
247+
throw error;
248+
}
249+
}
250+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
export {
2+
GCRARateLimiter,
3+
configToGCRAParams,
4+
parseDurationToMs,
5+
type GCRARateLimiterOptions,
6+
type GCRAParams,
7+
type QueueRateLimitConfig,
8+
type StoredQueueRateLimitConfig,
9+
type RateLimitResult,
10+
} from "./gcra.js";

0 commit comments

Comments
 (0)