Skip to content

Commit 0ffd481

Browse files
committed
Add createdAt filter to realtime subscribing with tags
1 parent e617c14 commit 0ffd481

File tree

9 files changed

+700
-43
lines changed

9 files changed

+700
-43
lines changed

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const SearchParamsSchema = z.object({
99
.transform((value) => {
1010
return value ? value.split(",") : undefined;
1111
}),
12+
createdAt: z.string().optional(),
1213
});
1314

1415
export const loader = createLoaderApiRoute(

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import { json } from "@remix-run/server-runtime";
2-
import Redis, { Callback, Result, type RedisOptions } from "ioredis";
2+
import { tryCatch } from "@trigger.dev/core/utils";
3+
import { safeParseNaturalLanguageDurationAgo } from "@trigger.dev/core/v3/isomorphic";
4+
import { Callback, Result } from "ioredis";
35
import { randomUUID } from "node:crypto";
6+
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
47
import { longPollingFetch } from "~/utils/longPollingFetch";
58
import { logger } from "./logger.server";
6-
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
79

810
export interface CachedLimitProvider {
911
getCachedLimit: (organizationId: string, defaultValue: number) => Promise<number | undefined>;
1012
}
1113

14+
const MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS = 7 * 24 * 60 * 60 * 1000;
15+
1216
export type RealtimeClientOptions = {
1317
electricOrigin: string;
1418
redis: RedisWithClusterOptions;
@@ -24,6 +28,7 @@ export type RealtimeEnvironment = {
2428

2529
export type RealtimeRunsParams = {
2630
tags?: string[];
31+
createdAt?: string;
2732
};
2833

2934
export class RealtimeClient {
@@ -92,9 +97,91 @@ export class RealtimeClient {
9297
whereClauses.push(`"runTags" @> ARRAY[${params.tags.map((t) => `'${t}'`).join(",")}]`);
9398
}
9499

100+
const createdAtFilter = await this.#calculateCreatedAtFilter(url, params.createdAt);
101+
102+
if (createdAtFilter) {
103+
whereClauses.push(`"createdAt" > '${createdAtFilter.toISOString()}'`);
104+
}
105+
95106
const whereClause = whereClauses.join(" AND ");
96107

97-
return this.#streamRunsWhere(url, environment, whereClause, clientVersion);
108+
const response = await this.#streamRunsWhere(url, environment, whereClause, clientVersion);
109+
110+
if (createdAtFilter) {
111+
const [setCreatedAtFilterError] = await tryCatch(
112+
this.#setCreatedAtFilterFromResponse(response, createdAtFilter)
113+
);
114+
115+
if (setCreatedAtFilterError) {
116+
logger.error("[realtimeClient] Failed to set createdAt filter", {
117+
error: setCreatedAtFilterError,
118+
createdAtFilter,
119+
responseHeaders: Object.fromEntries(response.headers.entries()),
120+
responseStatus: response.status,
121+
});
122+
}
123+
}
124+
125+
return response;
126+
}
127+
128+
async #calculateCreatedAtFilter(url: URL | string, createdAt?: string) {
129+
const duration = createdAt ?? "24h";
130+
const $url = new URL(url.toString());
131+
const shapeId = extractShapeId($url);
132+
133+
if (!shapeId) {
134+
// This means we need to calculate the createdAt filter and store it in redis after we get back the response
135+
const createdAtFilter = safeParseNaturalLanguageDurationAgo(duration);
136+
137+
// Validate that the createdAt filter is in the past, and not more than 1 week in the past.
138+
// if it's more than 1 week in the past, just return 1 week ago Date
139+
if (
140+
createdAtFilter &&
141+
createdAtFilter < new Date(Date.now() - MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS)
142+
) {
143+
return new Date(Date.now() - MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS);
144+
}
145+
146+
return createdAtFilter;
147+
} else {
148+
// We need to get the createdAt filter value from redis, if there is none we need to return undefined
149+
const [createdAtFilterError, createdAtFilter] = await tryCatch(
150+
this.#getCreatedAtFilter(shapeId)
151+
);
152+
153+
if (createdAtFilterError) {
154+
logger.error("[realtimeClient] Failed to get createdAt filter", {
155+
shapeId,
156+
error: createdAtFilterError,
157+
});
158+
159+
return;
160+
}
161+
162+
return createdAtFilter;
163+
}
164+
}
165+
166+
async #getCreatedAtFilter(shapeId: string) {
167+
// TODO: replace this with unkey cache so we can use in memory
168+
const createdAtFilterRawValue = await this.redis.get(`shapes:${shapeId}:filters:createdAt`);
169+
170+
if (!createdAtFilterRawValue) {
171+
return;
172+
}
173+
174+
return new Date(createdAtFilterRawValue);
175+
}
176+
177+
async #setCreatedAtFilterFromResponse(response: Response, createdAtFilter: Date) {
178+
const shapeId = extractShapeIdFromResponse(response);
179+
180+
if (!shapeId) {
181+
return;
182+
}
183+
184+
await this.redis.set(`shapes:${shapeId}:filters:createdAt`, createdAtFilter.toISOString());
98185
}
99186

100187
async #streamRunsWhere(
@@ -172,6 +259,8 @@ export class RealtimeClient {
172259
) {
173260
const shapeId = extractShapeId(url);
174261

262+
url = await this.#handleCreatedAtFilter(url, shapeId);
263+
175264
logger.debug("[realtimeClient] request", {
176265
url: url.toString(),
177266
});
@@ -232,6 +321,10 @@ export class RealtimeClient {
232321
// ... (rest of your existing code for the long polling request)
233322
const response = await longPollingFetch(url.toString(), { signal }, rewriteResponseHeaders);
234323

324+
// If this is the initial request, the response.headers['electric-handle'] will be the shapeId
325+
// And we may need to set the "createdAt" filter timestamp keyed by the shapeId
326+
// Then in the next request, we will get the createdAt timestamp value via the shapeId and use it to filter the results
327+
235328
// Decrement the counter after the long polling request is complete
236329
await this.#decrementConcurrency(environment.id, requestId);
237330

@@ -244,6 +337,10 @@ export class RealtimeClient {
244337
}
245338
}
246339

340+
async #handleCreatedAtFilter(url: URL, shapeId?: string | null) {
341+
return url;
342+
}
343+
247344
async #incrementAndCheck(environmentId: string, requestId: string, limit: number) {
248345
const key = this.#getKey(environmentId);
249346
const now = Date.now();
@@ -314,7 +411,11 @@ export class RealtimeClient {
314411
}
315412

316413
function extractShapeId(url: URL) {
317-
return url.searchParams.get("handle");
414+
return url.searchParams.get("handle") ?? url.searchParams.get("shape_id");
415+
}
416+
417+
function extractShapeIdFromResponse(response: Response) {
418+
return response.headers.get("electric-handle");
318419
}
319420

320421
function isLiveRequestUrl(url: URL) {

packages/core/src/v3/apiClient/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,10 +904,12 @@ export class ApiClient {
904904

905905
subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(
906906
tag: string | string[],
907+
filters?: { createdAt?: string },
907908
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
908909
) {
909910
const searchParams = createSearchQueryForSubscribeToRuns({
910911
tags: tag,
912+
...(filters ? { createdAt: filters.createdAt } : {}),
911913
});
912914

913915
return runShapeStream<TRunTypes>(
@@ -1043,6 +1045,10 @@ function createSearchQueryForSubscribeToRuns(query?: SubscribeToRunsQueryParams)
10431045
if (query.tags) {
10441046
searchParams.append("tags", Array.isArray(query.tags) ? query.tags.join(",") : query.tags);
10451047
}
1048+
1049+
if (query.createdAt) {
1050+
searchParams.append("createdAt", query.createdAt);
1051+
}
10461052
}
10471053

10481054
return searchParams;

packages/core/src/v3/apiClient/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQu
4141
export interface SubscribeToRunsQueryParams {
4242
tasks?: Array<string> | string;
4343
tags?: Array<string> | string;
44+
createdAt?: string;
4445
}
4546

4647
export interface ListWaitpointTokensQueryParams extends CursorPageParams {

packages/core/src/v3/isomorphic/duration.ts

Lines changed: 129 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,149 @@
11
export function parseNaturalLanguageDuration(duration: string): Date | undefined {
2-
const regexPattern = /^(\d+w)?(\d+d)?(\d+h)?(\d+m)?(\d+s)?$/;
2+
// More flexible regex that captures all units individually regardless of order
3+
const weekMatch = duration.match(/(\d+)w/);
4+
const dayMatch = duration.match(/(\d+)d/);
5+
const hourMatch = duration.match(/(\d+)(?:hr|h)/);
6+
const minuteMatch = duration.match(/(\d+)m/);
7+
const secondMatch = duration.match(/(\d+)s/);
38

4-
const result: Date = new Date();
9+
// Check if the entire string consists only of valid duration units
10+
const validPattern = /^(\d+(?:w|d|hr|h|m|s))+$/;
11+
if (!validPattern.test(duration)) {
12+
return undefined;
13+
}
14+
15+
let totalMilliseconds = 0;
516
let hasMatch = false;
617

7-
const elements = duration.match(regexPattern);
8-
if (elements) {
9-
if (elements[1]) {
10-
const weeks = Number(elements[1].slice(0, -1));
11-
if (weeks >= 0) {
12-
result.setDate(result.getDate() + 7 * weeks);
13-
hasMatch = true;
14-
}
18+
if (weekMatch) {
19+
const weeks = Number(weekMatch[1]);
20+
if (weeks >= 0) {
21+
totalMilliseconds += weeks * 7 * 24 * 60 * 60 * 1000;
22+
hasMatch = true;
1523
}
16-
if (elements[2]) {
17-
const days = Number(elements[2].slice(0, -1));
18-
if (days >= 0) {
19-
result.setDate(result.getDate() + days);
20-
hasMatch = true;
21-
}
24+
}
25+
26+
if (dayMatch) {
27+
const days = Number(dayMatch[1]);
28+
if (days >= 0) {
29+
totalMilliseconds += days * 24 * 60 * 60 * 1000;
30+
hasMatch = true;
2231
}
23-
if (elements[3]) {
24-
const hours = Number(elements[3].slice(0, -1));
25-
if (hours >= 0) {
26-
result.setHours(result.getHours() + hours);
27-
hasMatch = true;
28-
}
32+
}
33+
34+
if (hourMatch) {
35+
const hours = Number(hourMatch[1]);
36+
if (hours >= 0) {
37+
totalMilliseconds += hours * 60 * 60 * 1000;
38+
hasMatch = true;
2939
}
30-
if (elements[4]) {
31-
const minutes = Number(elements[4].slice(0, -1));
32-
if (minutes >= 0) {
33-
result.setMinutes(result.getMinutes() + minutes);
34-
hasMatch = true;
35-
}
40+
}
41+
42+
if (minuteMatch) {
43+
const minutes = Number(minuteMatch[1]);
44+
if (minutes >= 0) {
45+
totalMilliseconds += minutes * 60 * 1000;
46+
hasMatch = true;
3647
}
37-
if (elements[5]) {
38-
const seconds = Number(elements[5].slice(0, -1));
39-
if (seconds >= 0) {
40-
result.setSeconds(result.getSeconds() + seconds);
41-
hasMatch = true;
42-
}
48+
}
49+
50+
if (secondMatch) {
51+
const seconds = Number(secondMatch[1]);
52+
if (seconds >= 0) {
53+
totalMilliseconds += seconds * 1000;
54+
hasMatch = true;
4355
}
4456
}
4557

4658
if (hasMatch) {
47-
return result;
59+
return new Date(Date.now() + totalMilliseconds);
4860
}
4961

5062
return undefined;
5163
}
5264

65+
export function safeParseNaturalLanguageDuration(duration: string): Date | undefined {
66+
try {
67+
return parseNaturalLanguageDuration(duration);
68+
} catch (error) {
69+
return undefined;
70+
}
71+
}
72+
73+
// ... existing code ...
74+
75+
export function parseNaturalLanguageDurationAgo(duration: string): Date | undefined {
76+
// More flexible regex that captures all units individually regardless of order
77+
const weekMatch = duration.match(/(\d+)w/);
78+
const dayMatch = duration.match(/(\d+)d/);
79+
const hourMatch = duration.match(/(\d+)(?:hr|h)/);
80+
const minuteMatch = duration.match(/(\d+)m/);
81+
const secondMatch = duration.match(/(\d+)s/);
82+
83+
// Check if the entire string consists only of valid duration units
84+
const validPattern = /^(\d+(?:w|d|hr|h|m|s))+$/;
85+
if (!validPattern.test(duration)) {
86+
return undefined;
87+
}
88+
89+
let totalMilliseconds = 0;
90+
let hasMatch = false;
91+
92+
if (weekMatch) {
93+
const weeks = Number(weekMatch[1]);
94+
if (weeks >= 0) {
95+
totalMilliseconds += weeks * 7 * 24 * 60 * 60 * 1000;
96+
hasMatch = true;
97+
}
98+
}
99+
100+
if (dayMatch) {
101+
const days = Number(dayMatch[1]);
102+
if (days >= 0) {
103+
totalMilliseconds += days * 24 * 60 * 60 * 1000;
104+
hasMatch = true;
105+
}
106+
}
107+
108+
if (hourMatch) {
109+
const hours = Number(hourMatch[1]);
110+
if (hours >= 0) {
111+
totalMilliseconds += hours * 60 * 60 * 1000;
112+
hasMatch = true;
113+
}
114+
}
115+
116+
if (minuteMatch) {
117+
const minutes = Number(minuteMatch[1]);
118+
if (minutes >= 0) {
119+
totalMilliseconds += minutes * 60 * 1000;
120+
hasMatch = true;
121+
}
122+
}
123+
124+
if (secondMatch) {
125+
const seconds = Number(secondMatch[1]);
126+
if (seconds >= 0) {
127+
totalMilliseconds += seconds * 1000;
128+
hasMatch = true;
129+
}
130+
}
131+
132+
if (hasMatch) {
133+
return new Date(Date.now() - totalMilliseconds);
134+
}
135+
136+
return undefined;
137+
}
138+
139+
export function safeParseNaturalLanguageDurationAgo(duration: string): Date | undefined {
140+
try {
141+
return parseNaturalLanguageDurationAgo(duration);
142+
} catch (error) {
143+
return undefined;
144+
}
145+
}
146+
53147
export function stringifyDuration(seconds: number): string | undefined {
54148
if (seconds <= 0) {
55149
return;

0 commit comments

Comments
 (0)