Skip to content

Commit d39db91

Browse files
committed
Filter realtime colums and expose ability to skip some columns
1 parent 0ffd481 commit d39db91

File tree

11 files changed

+208
-47
lines changed

11 files changed

+208
-47
lines changed

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,37 @@ export interface CachedLimitProvider {
1313

1414
const MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS = 7 * 24 * 60 * 60 * 1000;
1515

16+
const DEFAULT_ELECTRIC_COLUMNS = [
17+
"id",
18+
"taskIdentifier",
19+
"createdAt",
20+
"updatedAt",
21+
"startedAt",
22+
"delayUntil",
23+
"queuedAt",
24+
"expiredAt",
25+
"completedAt",
26+
"friendlyId",
27+
"number",
28+
"isTest",
29+
"status",
30+
"usageDurationMs",
31+
"costInCents",
32+
"baseCostInCents",
33+
"ttl",
34+
"payload",
35+
"payloadType",
36+
"metadata",
37+
"metadataType",
38+
"output",
39+
"outputType",
40+
"runTags",
41+
"error",
42+
];
43+
44+
const RESERVED_COLUMNS = ["id", "taskIdentifier", "friendlyId", "status", "createdAt"];
45+
const RESERVED_SEARCH_PARAMS = ["createdAt", "tags", "skipColumns"];
46+
1647
export type RealtimeClientOptions = {
1748
electricOrigin: string;
1849
redis: RedisWithClusterOptions;
@@ -202,6 +233,10 @@ export class RealtimeClient {
202233

203234
// Copy over all the url search params to the electric url
204235
$url.searchParams.forEach((value, key) => {
236+
if (RESERVED_SEARCH_PARAMS.includes(key)) {
237+
return;
238+
}
239+
205240
electricUrl.searchParams.set(key, value);
206241
});
207242

@@ -214,6 +249,27 @@ export class RealtimeClient {
214249
electricUrl.searchParams.set("handle", electricUrl.searchParams.get("shape_id") ?? "");
215250
}
216251

252+
const skipColumnsRaw = $url.searchParams.get("skipColumns");
253+
254+
if (skipColumnsRaw) {
255+
const skipColumns = skipColumnsRaw
256+
.split(",")
257+
.map((c) => c.trim())
258+
.filter((c) => c !== "" && !RESERVED_COLUMNS.includes(c));
259+
260+
electricUrl.searchParams.set(
261+
"columns",
262+
DEFAULT_ELECTRIC_COLUMNS.filter((c) => !skipColumns.includes(c))
263+
.map((c) => `"${c}"`)
264+
.join(",")
265+
);
266+
} else {
267+
electricUrl.searchParams.set(
268+
"columns",
269+
DEFAULT_ELECTRIC_COLUMNS.map((c) => `"${c}"`).join(",")
270+
);
271+
}
272+
217273
return electricUrl;
218274
}
219275

packages/core/src/v3/apiClient/index.ts

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import {
6666
SSEStreamSubscriptionFactory,
6767
TaskRunShape,
6868
runShapeStream,
69+
RealtimeRunSkipColumns,
6970
} from "./runStream.js";
7071
import {
7172
CreateEnvironmentVariableParams,
@@ -88,6 +89,7 @@ export type {
8889
ImportEnvironmentVariablesParams,
8990
SubscribeToRunsQueryParams,
9091
UpdateEnvironmentVariableParams,
92+
RealtimeRunSkipColumns,
9193
};
9294

9395
export type ClientTriggerOptions = {
@@ -890,26 +892,37 @@ export class ApiClient {
890892
signal?: AbortSignal;
891893
closeOnComplete?: boolean;
892894
onFetchError?: (error: Error) => void;
895+
skipColumns?: string[];
893896
}
894897
) {
895-
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
896-
closeOnComplete:
897-
typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
898-
headers: this.#getRealtimeHeaders(),
899-
client: this,
900-
signal: options?.signal,
901-
onFetchError: options?.onFetchError,
902-
});
898+
const queryParams = new URLSearchParams();
899+
900+
if (options?.skipColumns) {
901+
queryParams.append("skipColumns", options.skipColumns.join(","));
902+
}
903+
904+
return runShapeStream<TRunTypes>(
905+
`${this.baseUrl}/realtime/v1/runs/${runId}${queryParams ? `?${queryParams}` : ""}`,
906+
{
907+
closeOnComplete:
908+
typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
909+
headers: this.#getRealtimeHeaders(),
910+
client: this,
911+
signal: options?.signal,
912+
onFetchError: options?.onFetchError,
913+
}
914+
);
903915
}
904916

905917
subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(
906918
tag: string | string[],
907-
filters?: { createdAt?: string },
919+
filters?: { createdAt?: string; skipColumns?: string[] },
908920
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
909921
) {
910922
const searchParams = createSearchQueryForSubscribeToRuns({
911923
tags: tag,
912924
...(filters ? { createdAt: filters.createdAt } : {}),
925+
...(filters?.skipColumns ? { skipColumns: filters.skipColumns } : {}),
913926
});
914927

915928
return runShapeStream<TRunTypes>(
@@ -926,15 +939,28 @@ export class ApiClient {
926939

927940
subscribeToBatch<TRunTypes extends AnyRunTypes>(
928941
batchId: string,
929-
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
942+
options?: {
943+
signal?: AbortSignal;
944+
onFetchError?: (error: Error) => void;
945+
skipColumns?: string[];
946+
}
930947
) {
931-
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
932-
closeOnComplete: false,
933-
headers: this.#getRealtimeHeaders(),
934-
client: this,
935-
signal: options?.signal,
936-
onFetchError: options?.onFetchError,
937-
});
948+
const queryParams = new URLSearchParams();
949+
950+
if (options?.skipColumns) {
951+
queryParams.append("skipColumns", options.skipColumns.join(","));
952+
}
953+
954+
return runShapeStream<TRunTypes>(
955+
`${this.baseUrl}/realtime/v1/batches/${batchId}${queryParams ? `?${queryParams}` : ""}`,
956+
{
957+
closeOnComplete: false,
958+
headers: this.#getRealtimeHeaders(),
959+
client: this,
960+
signal: options?.signal,
961+
onFetchError: options?.onFetchError,
962+
}
963+
);
938964
}
939965

940966
async fetchStream<T>(
@@ -1049,6 +1075,10 @@ function createSearchQueryForSubscribeToRuns(query?: SubscribeToRunsQueryParams)
10491075
if (query.createdAt) {
10501076
searchParams.append("createdAt", query.createdAt);
10511077
}
1078+
1079+
if (query.skipColumns) {
1080+
searchParams.append("skipColumns", query.skipColumns.join(","));
1081+
}
10521082
}
10531083

10541084
return searchParams;

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,27 @@ export type TaskRunShape<TTask extends AnyTask> = RunShape<InferRunTypes<TTask>>
5555
export type RealtimeRun<TTask extends AnyTask> = TaskRunShape<TTask>;
5656
export type AnyRealtimeRun = RealtimeRun<AnyTask>;
5757

58+
export type RealtimeRunSkipColumns = Array<
59+
| "startedAt"
60+
| "delayUntil"
61+
| "queuedAt"
62+
| "expiredAt"
63+
| "completedAt"
64+
| "number"
65+
| "isTest"
66+
| "usageDurationMs"
67+
| "costInCents"
68+
| "baseCostInCents"
69+
| "ttl"
70+
| "payload"
71+
| "payloadType"
72+
| "metadata"
73+
| "output"
74+
| "outputType"
75+
| "runTags"
76+
| "error"
77+
>;
78+
5879
export type RunStreamCallback<TRunTypes extends AnyRunTypes> = (
5980
run: RunShape<TRunTypes>
6081
) => void | Promise<void>;
@@ -395,16 +416,16 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
395416

396417
return {
397418
id: row.friendlyId,
398-
payload,
399-
output,
400419
createdAt: row.createdAt,
401420
updatedAt: row.updatedAt,
402421
taskIdentifier: row.taskIdentifier,
403-
number: row.number,
404422
status: apiStatusFromRunStatus(row.status),
405-
durationMs: row.usageDurationMs,
406-
costInCents: row.costInCents,
407-
baseCostInCents: row.baseCostInCents,
423+
payload,
424+
output,
425+
number: row.number ?? 0,
426+
durationMs: row.usageDurationMs ?? 0,
427+
costInCents: row.costInCents ?? 0,
428+
baseCostInCents: row.baseCostInCents ?? 0,
408429
tags: row.runTags ?? [],
409430
idempotencyKey: row.idempotencyKey ?? undefined,
410431
expiredAt: row.expiredAt ?? undefined,
@@ -413,7 +434,7 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
413434
delayedUntil: row.delayUntil ?? undefined,
414435
queuedAt: row.queuedAt ?? undefined,
415436
error: row.error ? createJsonErrorObject(row.error) : undefined,
416-
isTest: row.isTest,
437+
isTest: row.isTest ?? false,
417438
metadata,
418439
} as RunShape<TRunTypes>;
419440
}

packages/core/src/v3/apiClient/stream.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,16 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
102102
// Create the source stream that will receive messages
103103
const source = new ReadableStream<Message<T>[]>({
104104
start: (controller) => {
105-
this.#unsubscribe = this.#stream.subscribe(
106-
(messages) => {
107-
if (!this.#isStreamClosed) {
108-
controller.enqueue(messages);
109-
}
110-
},
111-
this.#handleError.bind(this)
112-
);
105+
this.#unsubscribe = this.#stream.subscribe((messages) => {
106+
if (!this.#isStreamClosed) {
107+
controller.enqueue(messages);
108+
}
109+
}, this.#handleError.bind(this));
113110
},
114111
cancel: () => {
115112
this.#isStreamClosed = true;
116113
this.#unsubscribe?.();
117-
}
114+
},
118115
});
119116

120117
// Create the transformed stream that processes messages and emits complete rows

packages/core/src/v3/apiClient/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export interface SubscribeToRunsQueryParams {
4242
tasks?: Array<string> | string;
4343
tags?: Array<string> | string;
4444
createdAt?: string;
45+
skipColumns?: string[];
4546
}
4647

4748
export interface ListWaitpointTokensQueryParams extends CursorPageParams {

packages/core/src/v3/schemas/api.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -870,22 +870,22 @@ const RawOptionalShapeDate = z
870870

871871
export const SubscribeRunRawShape = z.object({
872872
id: z.string(),
873-
idempotencyKey: z.string().nullish(),
873+
taskIdentifier: z.string(),
874+
friendlyId: z.string(),
875+
status: z.string(),
874876
createdAt: RawShapeDate,
875877
updatedAt: RawShapeDate,
876878
startedAt: RawOptionalShapeDate,
877879
delayUntil: RawOptionalShapeDate,
878880
queuedAt: RawOptionalShapeDate,
879881
expiredAt: RawOptionalShapeDate,
880882
completedAt: RawOptionalShapeDate,
881-
taskIdentifier: z.string(),
882-
friendlyId: z.string(),
883-
number: z.number(),
884-
isTest: z.boolean(),
885-
status: z.string(),
886-
usageDurationMs: z.number(),
887-
costInCents: z.number(),
888-
baseCostInCents: z.number(),
883+
idempotencyKey: z.string().nullish(),
884+
number: z.number().default(0),
885+
isTest: z.boolean().default(false),
886+
usageDurationMs: z.number().default(0),
887+
costInCents: z.number().default(0),
888+
baseCostInCents: z.number().default(0),
889889
ttl: z.string().nullish(),
890890
payload: z.string().nullish(),
891891
payloadType: z.string().nullish(),

0 commit comments

Comments
 (0)