Skip to content

Commit 777db7c

Browse files
committed
made accessing run and payload fields more type safe
1 parent 4de8899 commit 777db7c

File tree

5 files changed

+129
-44
lines changed

5 files changed

+129
-44
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
2-
import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
2+
import { getTaskRunField, getPayloadField } from "@internal/clickhouse";
33
import { type RedisOptions } from "@internal/redis";
44
import {
55
LogicalReplicationClient,
@@ -580,28 +580,28 @@ export class RunsReplicationService {
580580
// batch inserts in clickhouse are more performant if the items
581581
// are pre-sorted by the primary key
582582
.sort((a, b) => {
583-
const aOrgId = a[TASK_RUN_INDEX.organization_id] as string;
584-
const bOrgId = b[TASK_RUN_INDEX.organization_id] as string;
583+
const aOrgId = getTaskRunField(a, "organization_id");
584+
const bOrgId = getTaskRunField(b, "organization_id");
585585
if (aOrgId !== bOrgId) {
586586
return aOrgId < bOrgId ? -1 : 1;
587587
}
588-
const aProjId = a[TASK_RUN_INDEX.project_id] as string;
589-
const bProjId = b[TASK_RUN_INDEX.project_id] as string;
588+
const aProjId = getTaskRunField(a, "project_id");
589+
const bProjId = getTaskRunField(b, "project_id");
590590
if (aProjId !== bProjId) {
591591
return aProjId < bProjId ? -1 : 1;
592592
}
593-
const aEnvId = a[TASK_RUN_INDEX.environment_id] as string;
594-
const bEnvId = b[TASK_RUN_INDEX.environment_id] as string;
593+
const aEnvId = getTaskRunField(a, "environment_id");
594+
const bEnvId = getTaskRunField(b, "environment_id");
595595
if (aEnvId !== bEnvId) {
596596
return aEnvId < bEnvId ? -1 : 1;
597597
}
598-
const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number;
599-
const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number;
598+
const aCreatedAt = getTaskRunField(a, "created_at");
599+
const bCreatedAt = getTaskRunField(b, "created_at");
600600
if (aCreatedAt !== bCreatedAt) {
601601
return aCreatedAt - bCreatedAt;
602602
}
603-
const aRunId = a[TASK_RUN_INDEX.run_id] as string;
604-
const bRunId = b[TASK_RUN_INDEX.run_id] as string;
603+
const aRunId = getTaskRunField(a, "run_id");
604+
const bRunId = getTaskRunField(b, "run_id");
605605
if (aRunId === bRunId) return 0;
606606
return aRunId < bRunId ? -1 : 1;
607607
});
@@ -612,8 +612,8 @@ export class RunsReplicationService {
612612
// batch inserts in clickhouse are more performant if the items
613613
// are pre-sorted by the primary key
614614
.sort((a, b) => {
615-
const aRunId = a[PAYLOAD_INDEX.run_id] as string;
616-
const bRunId = b[PAYLOAD_INDEX.run_id] as string;
615+
const aRunId = getPayloadField(a, "run_id");
616+
const bRunId = getPayloadField(b, "run_id");
617617
if (aRunId === bRunId) return 0;
618618
return aRunId < bRunId ? -1 : 1;
619619
});

apps/webapp/test/runsReplicationService.part2.test.ts

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ClickHouse, TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
1+
import { ClickHouse, getTaskRunField, getPayloadField } from "@internal/clickhouse";
22
import { containerTest } from "@internal/testcontainers";
33
import { Logger } from "@trigger.dev/core/logger";
44
import { readFile } from "node:fs/promises";
@@ -889,13 +889,13 @@ describe("RunsReplicationService (part 2/2)", () => {
889889
await setTimeout(1000);
890890

891891
expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
892-
// Use TASK_RUN_INDEX for type-safe array access
893-
expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.run_id]).toEqual(run.id);
894-
expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.status]).toEqual(
892+
// Use getTaskRunField for type-safe array access
893+
expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[0], "run_id")).toEqual(run.id);
894+
expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[0], "status")).toEqual(
895895
"PENDING_VERSION"
896896
);
897-
expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.run_id]).toEqual(run.id);
898-
expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.status]).toEqual(
897+
expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[1], "run_id")).toEqual(run.id);
898+
expect(getTaskRunField(batchFlushedEvents![0].taskRunInserts[1], "status")).toEqual(
899899
"COMPLETED_SUCCESSFULLY"
900900
);
901901

@@ -1063,22 +1063,22 @@ describe("RunsReplicationService (part 2/2)", () => {
10631063

10641064
// Verify sorting order: organization_id, project_id, environment_id, created_at, run_id
10651065
for (let i = 1; i < batchFlushedEvents[0]?.taskRunInserts.length; i++) {
1066-
const prev = batchFlushedEvents[0]?.taskRunInserts[i - 1];
1067-
const curr = batchFlushedEvents[0]?.taskRunInserts[i];
1066+
const prev = batchFlushedEvents[0]!.taskRunInserts[i - 1];
1067+
const curr = batchFlushedEvents[0]!.taskRunInserts[i];
10681068

10691069
const prevKey = [
1070-
prev[TASK_RUN_INDEX.organization_id],
1071-
prev[TASK_RUN_INDEX.project_id],
1072-
prev[TASK_RUN_INDEX.environment_id],
1073-
prev[TASK_RUN_INDEX.created_at],
1074-
prev[TASK_RUN_INDEX.run_id],
1070+
getTaskRunField(prev, "organization_id"),
1071+
getTaskRunField(prev, "project_id"),
1072+
getTaskRunField(prev, "environment_id"),
1073+
getTaskRunField(prev, "created_at"),
1074+
getTaskRunField(prev, "run_id"),
10751075
];
10761076
const currKey = [
1077-
curr[TASK_RUN_INDEX.organization_id],
1078-
curr[TASK_RUN_INDEX.project_id],
1079-
curr[TASK_RUN_INDEX.environment_id],
1080-
curr[TASK_RUN_INDEX.created_at],
1081-
curr[TASK_RUN_INDEX.run_id],
1077+
getTaskRunField(curr, "organization_id"),
1078+
getTaskRunField(curr, "project_id"),
1079+
getTaskRunField(curr, "environment_id"),
1080+
getTaskRunField(curr, "created_at"),
1081+
getTaskRunField(curr, "run_id"),
10821082
];
10831083

10841084
const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]);
@@ -1106,9 +1106,9 @@ describe("RunsReplicationService (part 2/2)", () => {
11061106

11071107
// Verify payloadInserts are also sorted by run_id
11081108
for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) {
1109-
const prev = batchFlushedEvents[0]?.payloadInserts[i - 1];
1110-
const curr = batchFlushedEvents[0]?.payloadInserts[i];
1111-
expect(prev[PAYLOAD_INDEX.run_id] <= curr[PAYLOAD_INDEX.run_id]).toBeTruthy();
1109+
const prev = batchFlushedEvents[0]!.payloadInserts[i - 1];
1110+
const curr = batchFlushedEvents[0]!.payloadInserts[i];
1111+
expect(getPayloadField(prev, "run_id") <= getPayloadField(curr, "run_id")).toBeTruthy();
11121112
}
11131113

11141114
await runsReplicationService.stop();

internal-packages/clickhouse/src/client/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
708708
table: req.table,
709709
});
710710

711-
recordSpanError(span, clickhouseError);
711+
recordClickhouseError(span, clickhouseError);
712712
return [new InsertError(clickhouseError.message), null];
713713
}
714714

internal-packages/clickhouse/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ export type * from "./taskRuns.js";
3131
export type * from "./taskEvents.js";
3232
export type * from "./client/queryBuilder.js";
3333

34-
// Re-export column constants and indices for type-safe array access
34+
// Re-export column constants, indices, and type-safe accessors
3535
export {
3636
TASK_RUN_COLUMNS,
3737
TASK_RUN_INDEX,
3838
PAYLOAD_COLUMNS,
3939
PAYLOAD_INDEX,
40+
getTaskRunField,
41+
getPayloadField,
4042
} from "./taskRuns.js";
4143

4244
// TSQL query execution

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,73 @@ export const TASK_RUN_INDEX = Object.fromEntries(
109109
TASK_RUN_COLUMNS.map((col, idx) => [col, idx])
110110
) as { readonly [K in TaskRunColumnName]: number };
111111

112+
/**
113+
* Type mapping from column name to its type in TaskRunInsertArray.
114+
* This enables type-safe field access without manual casting.
115+
*/
116+
export type TaskRunFieldTypes = {
117+
environment_id: string;
118+
organization_id: string;
119+
project_id: string;
120+
run_id: string;
121+
updated_at: number;
122+
created_at: number;
123+
status: string;
124+
environment_type: string;
125+
friendly_id: string;
126+
attempt: number;
127+
engine: string;
128+
task_identifier: string;
129+
queue: string;
130+
schedule_id: string;
131+
batch_id: string;
132+
completed_at: number | null;
133+
started_at: number | null;
134+
executed_at: number | null;
135+
delay_until: number | null;
136+
queued_at: number | null;
137+
expired_at: number | null;
138+
usage_duration_ms: number;
139+
cost_in_cents: number;
140+
base_cost_in_cents: number;
141+
output: { data: unknown };
142+
error: { data: unknown };
143+
tags: string[];
144+
task_version: string;
145+
sdk_version: string;
146+
cli_version: string;
147+
machine_preset: string;
148+
root_run_id: string;
149+
parent_run_id: string;
150+
depth: number;
151+
span_id: string;
152+
trace_id: string;
153+
idempotency_key: string;
154+
expiration_ttl: string;
155+
is_test: boolean;
156+
_version: string;
157+
_is_deleted: number;
158+
concurrency_key: string;
159+
bulk_action_group_ids: string[];
160+
worker_queue: string;
161+
max_duration_in_seconds: number | null;
162+
};
163+
164+
/**
165+
* Type-safe accessor for TaskRunInsertArray fields.
166+
* Returns the correct type for each field without manual casting.
167+
*
168+
* @example
169+
* const orgId = getTaskRunField(run, "organization_id"); // type: string
170+
* const createdAt = getTaskRunField(run, "created_at"); // type: number
171+
*/
172+
export function getTaskRunField<K extends TaskRunColumnName>(
173+
run: TaskRunInsertArray,
174+
field: K
175+
): TaskRunFieldTypes[K] {
176+
return run[TASK_RUN_INDEX[field]] as TaskRunFieldTypes[K];
177+
}
178+
112179
export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
113180
return ch.insertCompactRaw({
114181
name: "insertTaskRunsCompactArrays",
@@ -149,9 +216,29 @@ export const PAYLOAD_COLUMNS = ["run_id", "created_at", "payload"] as const;
149216
export type PayloadColumnName = (typeof PAYLOAD_COLUMNS)[number];
150217

151218
// Type-safe column indices generated from PAYLOAD_COLUMNS
152-
export const PAYLOAD_INDEX = Object.fromEntries(
153-
PAYLOAD_COLUMNS.map((col, idx) => [col, idx])
154-
) as { readonly [K in PayloadColumnName]: number };
219+
export const PAYLOAD_INDEX = Object.fromEntries(PAYLOAD_COLUMNS.map((col, idx) => [col, idx])) as {
220+
readonly [K in PayloadColumnName]: number;
221+
};
222+
223+
/**
224+
* Type mapping from column name to its type in PayloadInsertArray.
225+
*/
226+
export type PayloadFieldTypes = {
227+
run_id: string;
228+
created_at: number;
229+
payload: { data: unknown };
230+
};
231+
232+
/**
233+
* Type-safe accessor for PayloadInsertArray fields.
234+
* Returns the correct type for each field without manual casting.
235+
*/
236+
export function getPayloadField<K extends PayloadColumnName>(
237+
payload: PayloadInsertArray,
238+
field: K
239+
): PayloadFieldTypes[K] {
240+
return payload[PAYLOAD_INDEX[field]] as PayloadFieldTypes[K];
241+
}
155242

156243
/**
157244
* Type-safe tuple representing a task run insert array.
@@ -209,11 +296,7 @@ export type TaskRunInsertArray = [
209296
* Type-safe tuple representing a payload insert array.
210297
* Order matches PAYLOAD_COLUMNS exactly.
211298
*/
212-
export type PayloadInsertArray = [
213-
run_id: string,
214-
created_at: number,
215-
payload: { data: unknown },
216-
];
299+
export type PayloadInsertArray = [run_id: string, created_at: number, payload: { data: unknown }];
217300

218301
export function insertRawTaskRunPayloadsCompactArrays(
219302
ch: ClickhouseWriter,

0 commit comments

Comments
 (0)