Skip to content

Commit 30813f1

Browse files
committed
WIP clickhouse powered runs list
stuff
1 parent d489709 commit 30813f1

File tree

12 files changed

+2177
-4
lines changed

12 files changed

+2177
-4
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { Tracer } from "@internal/tracing";
3+
import { Logger, LogLevel } from "@trigger.dev/core/logger";
4+
import { TaskRunStatus } from "@trigger.dev/database";
5+
import { PrismaClient } from "~/db.server";
6+
7+
export type RunsRepositorySOptions = {
8+
clickhouse: ClickHouse;
9+
prisma: PrismaClient;
10+
logger?: Logger;
11+
logLevel?: LogLevel;
12+
tracer?: Tracer;
13+
};
14+
15+
export type ListRunsOptions = {
16+
projectId: string;
17+
environmentId: string;
18+
//filters
19+
tasks?: string[];
20+
versions?: string[];
21+
statuses?: TaskRunStatus[];
22+
tags?: string[];
23+
scheduleId?: string;
24+
period?: string;
25+
from?: number;
26+
to?: number;
27+
isTest?: boolean;
28+
rootOnly?: boolean;
29+
batchId?: string;
30+
runFriendlyIds?: string[];
31+
runIds?: string[];
32+
//pagination
33+
page: {
34+
size: number;
35+
cursor?: string;
36+
direction?: "forward" | "backward";
37+
};
38+
};
39+
40+
export class RunsRepository {
41+
constructor(private readonly options: RunsRepositorySOptions) {}
42+
43+
async listRuns(options: ListRunsOptions) {
44+
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
45+
queryBuilder
46+
.where("environment_id = {environmentId: String}", {
47+
environmentId: options.environmentId,
48+
})
49+
.where("project_id = {projectId: String}", {
50+
projectId: options.projectId,
51+
});
52+
53+
if (options.tasks) {
54+
queryBuilder.where("task_identifier IN {tasks: Array(String)}", { tasks: options.tasks });
55+
}
56+
57+
if (options.versions) {
58+
queryBuilder.where("task_version IN {versions: Array(String)}", {
59+
versions: options.versions,
60+
});
61+
}
62+
63+
if (options.statuses) {
64+
queryBuilder.where("status IN {statuses: Array(String)}", { statuses: options.statuses });
65+
}
66+
67+
if (options.tags) {
68+
queryBuilder.where("hasAny(tags, {tags: Array(String)})", { tags: options.tags });
69+
}
70+
71+
if (options.scheduleId) {
72+
queryBuilder.where("schedule_id = {scheduleId: String}", { scheduleId: options.scheduleId });
73+
}
74+
75+
if (options.period) {
76+
queryBuilder.where("period = {period: String}", { period: options.period });
77+
}
78+
79+
if (options.from) {
80+
queryBuilder.where("created_at >= {from: DateTime}", { from: options.from });
81+
}
82+
83+
if (options.to) {
84+
queryBuilder.where("created_at <= {to: DateTime}", { to: options.to });
85+
}
86+
87+
if (typeof options.isTest === "boolean") {
88+
queryBuilder.where("is_test = {isTest: Boolean}", { isTest: options.isTest });
89+
}
90+
91+
if (options.rootOnly) {
92+
queryBuilder.where("root_run_id = ''");
93+
}
94+
95+
if (options.batchId) {
96+
queryBuilder.where("batch_id = {batchId: String}", { batchId: options.batchId });
97+
}
98+
99+
if (options.runFriendlyIds) {
100+
queryBuilder.where("friendly_id IN {runFriendlyIds: Array(String)}", {
101+
runFriendlyIds: options.runFriendlyIds,
102+
});
103+
}
104+
105+
if (options.runIds) {
106+
queryBuilder.where("run_id IN {runIds: Array(String)}", { runIds: options.runIds });
107+
}
108+
109+
if (options.page.cursor) {
110+
if (options.page.direction === "forward") {
111+
queryBuilder
112+
.where("run_id > {runId: String}", { runId: options.page.cursor })
113+
.orderBy("run_id DESC")
114+
.limit(options.page.size + 1);
115+
} else {
116+
queryBuilder
117+
.where("run_id < {runId: String}", { runId: options.page.cursor })
118+
.orderBy("run_id ASC")
119+
.limit(options.page.size + 1);
120+
}
121+
}
122+
123+
const [queryError, result] = await queryBuilder.execute();
124+
125+
if (queryError) {
126+
throw queryError;
127+
}
128+
129+
const runIds = result.map((row) => row.run_id);
130+
131+
// If there are more runs than the page size, we need to fetch the next page
132+
const hasMore = runIds.length > options.page.size;
133+
134+
let nextCursor: string | null = null;
135+
let previousCursor: string | null = null;
136+
137+
//get cursors for next and previous pages
138+
switch (options.page.direction) {
139+
case "forward":
140+
previousCursor = options.page.cursor ? runIds.at(0) ?? null : null;
141+
if (hasMore) {
142+
nextCursor = runIds[options.page.size];
143+
}
144+
break;
145+
case "backward":
146+
runIds.reverse();
147+
if (hasMore) {
148+
previousCursor = runIds[1];
149+
nextCursor = runIds[options.page.size];
150+
} else {
151+
nextCursor = runIds[options.page.size - 1];
152+
}
153+
break;
154+
}
155+
156+
const runIdsToReturn = hasMore ? runIds.slice(0, -1) : runIds;
157+
158+
const runs = await this.options.prisma.taskRun.findMany({
159+
where: {
160+
id: {
161+
in: runIdsToReturn,
162+
},
163+
},
164+
orderBy: {
165+
id: "desc",
166+
},
167+
select: {
168+
id: true,
169+
friendlyId: true,
170+
taskIdentifier: true,
171+
taskVersion: true,
172+
runtimeEnvironmentId: true,
173+
status: true,
174+
createdAt: true,
175+
startedAt: true,
176+
lockedAt: true,
177+
delayUntil: true,
178+
updatedAt: true,
179+
completedAt: true,
180+
isTest: true,
181+
spanId: true,
182+
idempotencyKey: true,
183+
ttl: true,
184+
expiredAt: true,
185+
costInCents: true,
186+
baseCostInCents: true,
187+
usageDurationMs: true,
188+
runTags: true,
189+
depth: true,
190+
rootTaskRunId: true,
191+
batchId: true,
192+
metadata: true,
193+
metadataType: true,
194+
},
195+
});
196+
197+
return {
198+
runs,
199+
pagination: {
200+
nextCursor,
201+
previousCursor,
202+
},
203+
};
204+
}
205+
}

0 commit comments

Comments
 (0)