Skip to content

Commit 972ef0f

Browse files
committed
Added org and global concurrency limits
1 parent 6e3313f commit 972ef0f

File tree

5 files changed

+281
-58
lines changed

5 files changed

+281
-58
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,10 @@ const EnvironmentSchema = z
11821182
QUERY_CLICKHOUSE_MAX_EXPANDED_AST_ELEMENTS: z.coerce.number().int().default(4_000_000),
11831183
QUERY_CLICKHOUSE_MAX_BYTES_BEFORE_EXTERNAL_GROUP_BY: z.coerce.number().int().default(0),
11841184

1185+
// Query page concurrency limits
1186+
QUERY_DEFAULT_ORG_CONCURRENCY_LIMIT: z.coerce.number().int().default(3),
1187+
QUERY_GLOBAL_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
1188+
11851189
EVENTS_CLICKHOUSE_URL: z
11861190
.string()
11871191
.optional()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { env } from "~/env.server";
2+
import { singleton } from "~/utils/singleton";
3+
import { RedisConcurrencyLimiter } from "./redisConcurrencyLimiter.server";
4+
5+
function initializeQueryConcurrencyLimiter() {
6+
return new RedisConcurrencyLimiter({
7+
keyPrefix: "query:concurrency",
8+
redis: {
9+
port: env.RATE_LIMIT_REDIS_PORT,
10+
host: env.RATE_LIMIT_REDIS_HOST,
11+
username: env.RATE_LIMIT_REDIS_USERNAME,
12+
password: env.RATE_LIMIT_REDIS_PASSWORD,
13+
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
14+
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
15+
},
16+
});
17+
}
18+
19+
export const queryConcurrencyLimiter = singleton(
20+
"queryConcurrencyLimiter",
21+
initializeQueryConcurrencyLimiter
22+
);
23+
24+
/** Default per-org concurrency limit from environment */
25+
export const DEFAULT_ORG_CONCURRENCY_LIMIT = env.QUERY_DEFAULT_ORG_CONCURRENCY_LIMIT;
26+
27+
/** Global concurrency limit from environment */
28+
export const GLOBAL_CONCURRENCY_LIMIT = env.QUERY_GLOBAL_CONCURRENCY_LIMIT;
29+
Lines changed: 102 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
executeTSQL,
3+
QueryError,
34
type ClickHouseSettings,
45
type ExecuteTSQLOptions,
56
type FieldMappings,
@@ -11,6 +12,11 @@ import { type z } from "zod";
1112
import { prisma } from "~/db.server";
1213
import { env } from "~/env.server";
1314
import { clickhouseClient } from "./clickhouseInstance.server";
15+
import {
16+
queryConcurrencyLimiter,
17+
DEFAULT_ORG_CONCURRENCY_LIMIT,
18+
GLOBAL_CONCURRENCY_LIMIT,
19+
} from "./queryConcurrencyLimiter.server";
1420

1521
export type { TableSchema, TSQLQueryResult };
1622

@@ -69,6 +75,8 @@ export type ExecuteQueryOptions<TOut extends z.ZodSchema> = Omit<
6975
/** User ID (optional, null for API calls) */
7076
userId?: string | null;
7177
};
78+
/** Custom per-org concurrency limit (overrides default) */
79+
customOrgConcurrencyLimit?: number;
7280
};
7381

7482
/**
@@ -78,71 +86,107 @@ export type ExecuteQueryOptions<TOut extends z.ZodSchema> = Omit<
7886
export async function executeQuery<TOut extends z.ZodSchema>(
7987
options: ExecuteQueryOptions<TOut>
8088
): Promise<TSQLQueryResult<z.output<TOut>>> {
81-
const { scope, organizationId, projectId, environmentId, history, ...baseOptions } = options;
82-
83-
// Build tenant IDs based on scope
84-
const tenantOptions: {
85-
organizationId: string;
86-
projectId?: string;
87-
environmentId?: string;
88-
} = {
89+
const {
90+
scope,
8991
organizationId,
90-
};
91-
92-
if (scope === "project" || scope === "environment") {
93-
tenantOptions.projectId = projectId;
94-
}
95-
96-
if (scope === "environment") {
97-
tenantOptions.environmentId = environmentId;
98-
}
99-
100-
// Build field mappings for project_ref → project_id and environment_id → slug translation
101-
const projects = await prisma.project.findMany({
102-
where: { organizationId },
103-
select: { id: true, externalRef: true },
92+
projectId,
93+
environmentId,
94+
history,
95+
customOrgConcurrencyLimit,
96+
...baseOptions
97+
} = options;
98+
99+
// Generate unique request ID for concurrency tracking
100+
const requestId = crypto.randomUUID();
101+
const orgLimit = customOrgConcurrencyLimit ?? DEFAULT_ORG_CONCURRENCY_LIMIT;
102+
103+
// Acquire concurrency slot
104+
const acquireResult = await queryConcurrencyLimiter.acquire({
105+
key: organizationId,
106+
requestId,
107+
keyLimit: orgLimit,
108+
globalLimit: GLOBAL_CONCURRENCY_LIMIT,
104109
});
105110

106-
const environments = await prisma.runtimeEnvironment.findMany({
107-
where: { project: { organizationId } },
108-
select: { id: true, slug: true },
109-
});
111+
if (!acquireResult.success) {
112+
const errorMessage =
113+
acquireResult.reason === "key_limit"
114+
? `You've exceeded your query concurrency of ${orgLimit} for this organization. Please try again later.`
115+
: "We're experiencing a lot of queries at the moment. Please try again later.";
116+
return [new QueryError(errorMessage, { query: options.query }), null];
117+
}
110118

111-
const fieldMappings: FieldMappings = {
112-
project: Object.fromEntries(projects.map((p) => [p.id, p.externalRef])),
113-
environment: Object.fromEntries(environments.map((e) => [e.id, e.slug])),
114-
};
119+
try {
120+
// Build tenant IDs based on scope
121+
const tenantOptions: {
122+
organizationId: string;
123+
projectId?: string;
124+
environmentId?: string;
125+
} = {
126+
organizationId,
127+
};
128+
129+
if (scope === "project" || scope === "environment") {
130+
tenantOptions.projectId = projectId;
131+
}
132+
133+
if (scope === "environment") {
134+
tenantOptions.environmentId = environmentId;
135+
}
136+
137+
// Build field mappings for project_ref → project_id and environment_id → slug translation
138+
const projects = await prisma.project.findMany({
139+
where: { organizationId },
140+
select: { id: true, externalRef: true },
141+
});
115142

116-
const result = await executeTSQL(clickhouseClient.reader, {
117-
...baseOptions,
118-
...tenantOptions,
119-
fieldMappings,
120-
clickhouseSettings: {
121-
...getDefaultClickhouseSettings(),
122-
...baseOptions.clickhouseSettings, // Allow caller overrides if needed
123-
},
124-
});
143+
const environments = await prisma.runtimeEnvironment.findMany({
144+
where: { project: { organizationId } },
145+
select: { id: true, slug: true },
146+
});
125147

126-
// If query succeeded and history options provided, save to history
127-
if (result[0] === null && history) {
128-
const stats = result[1].stats;
129-
const byteSeconds = parseFloat(stats.byte_seconds) || 0;
130-
const costInCents = byteSeconds * env.CENTS_PER_QUERY_BYTE_SECOND;
131-
132-
await prisma.customerQuery.create({
133-
data: {
134-
query: options.query,
135-
scope: scopeToEnum[scope],
136-
stats: { ...stats },
137-
costInCents,
138-
source: history.source,
139-
organizationId,
140-
projectId: scope === "project" || scope === "environment" ? projectId : null,
141-
environmentId: scope === "environment" ? environmentId : null,
142-
userId: history.userId ?? null,
148+
const fieldMappings: FieldMappings = {
149+
project: Object.fromEntries(projects.map((p) => [p.id, p.externalRef])),
150+
environment: Object.fromEntries(environments.map((e) => [e.id, e.slug])),
151+
};
152+
153+
const result = await executeTSQL(clickhouseClient.reader, {
154+
...baseOptions,
155+
...tenantOptions,
156+
fieldMappings,
157+
clickhouseSettings: {
158+
...getDefaultClickhouseSettings(),
159+
...baseOptions.clickhouseSettings, // Allow caller overrides if needed
143160
},
144161
});
145-
}
146162

147-
return result;
163+
// If query succeeded and history options provided, save to history
164+
if (result[0] === null && history) {
165+
const stats = result[1].stats;
166+
const byteSeconds = parseFloat(stats.byte_seconds) || 0;
167+
const costInCents = byteSeconds * env.CENTS_PER_QUERY_BYTE_SECOND;
168+
169+
await prisma.customerQuery.create({
170+
data: {
171+
query: options.query,
172+
scope: scopeToEnum[scope],
173+
stats: { ...stats },
174+
costInCents,
175+
source: history.source,
176+
organizationId,
177+
projectId: scope === "project" || scope === "environment" ? projectId : null,
178+
environmentId: scope === "environment" ? environmentId : null,
179+
userId: history.userId ?? null,
180+
},
181+
});
182+
}
183+
184+
return result;
185+
} finally {
186+
// Always release the concurrency slot
187+
await queryConcurrencyLimiter.release({
188+
key: organizationId,
189+
requestId,
190+
});
191+
}
148192
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import { createRedisClient, type RedisClient, type RedisWithClusterOptions } from "~/redis.server";
2+
3+
export type RedisConcurrencyLimiterOptions = {
4+
redis: RedisWithClusterOptions;
5+
/** Prefix for Redis keys */
6+
keyPrefix: string;
7+
/** Auto-expire stale entries after this many seconds (default: 300 = 5 minutes) */
8+
expiryTimeInSeconds?: number;
9+
};
10+
11+
export type AcquireResult =
12+
| { success: true }
13+
| { success: false; reason: "key_limit" | "global_limit" };
14+
15+
/**
16+
* A generic Redis-based concurrency limiter that supports two-level limiting:
17+
* - Key-level limit (e.g., per organization)
18+
* - Global limit (across all keys)
19+
*
20+
* Uses Redis sorted sets with timestamps to track active requests,
21+
* with automatic expiry of stale entries as a safety net.
22+
*/
23+
export class RedisConcurrencyLimiter {
24+
private redis: RedisClient;
25+
private keyPrefix: string;
26+
private expiryTimeInSeconds: number;
27+
28+
constructor(options: RedisConcurrencyLimiterOptions) {
29+
this.redis = createRedisClient(`${options.keyPrefix}:limiter`, options.redis);
30+
this.keyPrefix = options.keyPrefix;
31+
this.expiryTimeInSeconds = options.expiryTimeInSeconds ?? 300; // 5 minutes default
32+
this.#registerCommands();
33+
}
34+
35+
/**
36+
* Acquire a concurrency slot atomically checking both key and global limits.
37+
*
38+
* @param options.key - The key to limit (e.g., organizationId)
39+
* @param options.requestId - A unique identifier for this request
40+
* @param options.keyLimit - The maximum concurrent requests for this key
41+
* @param options.globalLimit - The maximum concurrent requests globally
42+
* @returns Success or failure with reason
43+
*/
44+
async acquire(options: {
45+
key: string;
46+
requestId: string;
47+
keyLimit: number;
48+
globalLimit: number;
49+
}): Promise<AcquireResult> {
50+
const { key, requestId, keyLimit, globalLimit } = options;
51+
const keyKey = this.#getKeyKey(key);
52+
const globalKey = this.#getGlobalKey();
53+
const now = Date.now();
54+
const cutoffTime = now - this.expiryTimeInSeconds * 1000;
55+
56+
// @ts-expect-error - Custom command defined via defineCommand
57+
const result = await this.redis.acquireConcurrency(
58+
keyKey,
59+
globalKey,
60+
now.toString(),
61+
requestId,
62+
this.expiryTimeInSeconds.toString(),
63+
cutoffTime.toString(),
64+
keyLimit.toString(),
65+
globalLimit.toString()
66+
);
67+
68+
// Result: 1 = success, 0 = key limit exceeded, -1 = global limit exceeded
69+
if (result === 1) {
70+
return { success: true };
71+
} else if (result === 0) {
72+
return { success: false, reason: "key_limit" };
73+
} else {
74+
return { success: false, reason: "global_limit" };
75+
}
76+
}
77+
78+
/**
79+
* Release a concurrency slot.
80+
*
81+
* @param options.key - The key that was used to acquire
82+
* @param options.requestId - The request identifier used to acquire
83+
*/
84+
async release(options: { key: string; requestId: string }): Promise<void> {
85+
const { key, requestId } = options;
86+
const keyKey = this.#getKeyKey(key);
87+
const globalKey = this.#getGlobalKey();
88+
89+
// Remove from both sets in a single round trip
90+
await this.redis.pipeline().zrem(keyKey, requestId).zrem(globalKey, requestId).exec();
91+
}
92+
93+
#getKeyKey(key: string): string {
94+
return `${this.keyPrefix}:key:${key}`;
95+
}
96+
97+
#getGlobalKey(): string {
98+
return `${this.keyPrefix}:global`;
99+
}
100+
101+
#registerCommands() {
102+
this.redis.defineCommand("acquireConcurrency", {
103+
numberOfKeys: 2,
104+
lua: /* lua */ `
105+
local keyKey = KEYS[1]
106+
local globalKey = KEYS[2]
107+
108+
local timestamp = tonumber(ARGV[1])
109+
local requestId = ARGV[2]
110+
local expiryTime = tonumber(ARGV[3])
111+
local cutoffTime = tonumber(ARGV[4])
112+
local keyLimit = tonumber(ARGV[5])
113+
local globalLimit = tonumber(ARGV[6])
114+
115+
-- Remove expired entries from both sets
116+
redis.call('ZREMRANGEBYSCORE', keyKey, '-inf', cutoffTime)
117+
redis.call('ZREMRANGEBYSCORE', globalKey, '-inf', cutoffTime)
118+
119+
-- Check global limit first (more restrictive check)
120+
local globalCount = redis.call('ZCARD', globalKey)
121+
if globalCount >= globalLimit then
122+
return -1 -- Global limit exceeded
123+
end
124+
125+
-- Check key-specific limit
126+
local keyCount = redis.call('ZCARD', keyKey)
127+
if keyCount >= keyLimit then
128+
return 0 -- Key limit exceeded
129+
end
130+
131+
-- Add the request to both sorted sets
132+
redis.call('ZADD', keyKey, timestamp, requestId)
133+
redis.call('ZADD', globalKey, timestamp, requestId)
134+
135+
-- Set expiry on both keys
136+
redis.call('EXPIRE', keyKey, expiryTime)
137+
redis.call('EXPIRE', globalKey, expiryTime)
138+
139+
return 1 -- Success
140+
`,
141+
});
142+
}
143+
}

internal-packages/clickhouse/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ export {
4545
} from "./client/tsql.js";
4646
export type { OutputColumnMetadata } from "@internal/tsql";
4747

48+
// Errors
49+
export { QueryError } from "./client/errors.js";
50+
4851
export type ClickhouseCommonConfig = {
4952
keepAlive?: {
5053
enabled?: boolean;

0 commit comments

Comments
 (0)