Skip to content

Commit f4a7369

Browse files
committed
Add sharding support for electric
1 parent d39db91 commit f4a7369

File tree

3 files changed

+32
-5
lines changed

3 files changed

+32
-5
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const EnvironmentSchema = z.object({
3535
API_ORIGIN: z.string().optional(),
3636
STREAM_ORIGIN: z.string().optional(),
3737
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
38+
// A comma separated list of electric origins to shard into different electric instances by environmentId
39+
// example: "http://localhost:3060,http://localhost:3061,http://localhost:3062"
40+
ELECTRIC_ORIGIN_SHARDS: z.string().optional(),
3841
APP_ENV: z.string().default(process.env.NODE_ENV),
3942
SERVICE_NAME: z.string().default("trigger.dev webapp"),
4043
POSTHOG_PROJECT_KEY: z.string().default("phc_LFH7kJiGhdIlnO22hTAKgHpaKhpM8gkzWAFvHmf5vfS"),

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { randomUUID } from "node:crypto";
66
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
77
import { longPollingFetch } from "~/utils/longPollingFetch";
88
import { logger } from "./logger.server";
9+
import { jumpHash } from "@trigger.dev/core/v3/serverOnly";
910

1011
export interface CachedLimitProvider {
1112
getCachedLimit: (organizationId: string, defaultValue: number) => Promise<number | undefined>;
@@ -45,7 +46,7 @@ const RESERVED_COLUMNS = ["id", "taskIdentifier", "friendlyId", "status", "creat
4546
const RESERVED_SEARCH_PARAMS = ["createdAt", "tags", "skipColumns"];
4647

4748
export type RealtimeClientOptions = {
48-
electricOrigin: string;
49+
electricOrigin: string | string[];
4950
redis: RedisWithClusterOptions;
5051
cachedLimitProvider: CachedLimitProvider;
5152
keyPrefix: string;
@@ -221,15 +222,26 @@ export class RealtimeClient {
221222
whereClause: string,
222223
clientVersion?: string
223224
) {
224-
const electricUrl = this.#constructRunsElectricUrl(url, whereClause, clientVersion);
225+
const electricUrl = this.#constructRunsElectricUrl(
226+
url,
227+
environment,
228+
whereClause,
229+
clientVersion
230+
);
225231

226232
return this.#performElectricRequest(electricUrl, environment, undefined, clientVersion);
227233
}
228234

229-
#constructRunsElectricUrl(url: URL | string, whereClause: string, clientVersion?: string): URL {
235+
#constructRunsElectricUrl(
236+
url: URL | string,
237+
environment: RealtimeEnvironment,
238+
whereClause: string,
239+
clientVersion?: string
240+
): URL {
230241
const $url = new URL(url.toString());
231242

232-
const electricUrl = new URL(`${this.options.electricOrigin}/v1/shape`);
243+
const electricOrigin = this.#resolveElectricOrigin(environment.id);
244+
const electricUrl = new URL(`${electricOrigin}/v1/shape`);
233245

234246
// Copy over all the url search params to the electric url
235247
$url.searchParams.forEach((value, key) => {
@@ -428,6 +440,16 @@ export class RealtimeClient {
428440
return `${this.options.keyPrefix}:${environmentId}`;
429441
}
430442

443+
#resolveElectricOrigin(environmentId: string) {
444+
if (typeof this.options.electricOrigin === "string") {
445+
return this.options.electricOrigin;
446+
}
447+
448+
const index = jumpHash(environmentId, this.options.electricOrigin.length);
449+
450+
return this.options.electricOrigin[index] ?? this.options.electricOrigin[0];
451+
}
452+
431453
#registerCommands() {
432454
this.redis.defineCommand("incrementAndCheckConcurrency", {
433455
numberOfKeys: 1,

apps/webapp/app/services/realtimeClientGlobal.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import { RealtimeClient } from "./realtimeClient.server";
44
import { getCachedLimit } from "./platform.v3.server";
55

66
function initializeRealtimeClient() {
7+
const electricOrigin = env.ELECTRIC_ORIGIN_SHARDS?.split(",") ?? env.ELECTRIC_ORIGIN;
8+
79
return new RealtimeClient({
8-
electricOrigin: env.ELECTRIC_ORIGIN,
10+
electricOrigin: electricOrigin,
911
keyPrefix: "tr:realtime:concurrency",
1012
redis: {
1113
port: env.RATE_LIMIT_REDIS_PORT,

0 commit comments

Comments
 (0)