Skip to content

Commit d489709

Browse files
committed
Use clickhouse in task list aggregation queries instead of pg (keep pg for self-hosters)
1 parent 211d116 commit d489709

File tree

5 files changed

+437
-175
lines changed

5 files changed

+437
-175
lines changed

apps/webapp/app/env.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,14 @@ const EnvironmentSchema = z.object({
741741
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
742742
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
743743

744+
// Clickhouse
745+
CLICKHOUSE_URL: z.string().optional(),
746+
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
747+
CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
748+
CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
749+
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
750+
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
751+
744752
// Bootstrap
745753
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
746754
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),
Lines changed: 45 additions & 173 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
2+
import { PrismaClientOrTransaction, type TaskTriggerSource } from "@trigger.dev/database";
3+
import { $replica, sqlDatabaseSchema } from "~/db.server";
4+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
25
import {
3-
Prisma,
4-
type TaskRunStatus as DBTaskRunStatus,
5-
type TaskRunStatus as TaskRunStatusType,
6-
type TaskTriggerSource,
7-
} from "@trigger.dev/database";
8-
import { QUEUED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
9-
import { TaskRunStatus } from "~/database-types";
10-
import { sqlDatabaseSchema } from "~/db.server";
11-
import { logger } from "~/services/logger.server";
12-
import { BasePresenter } from "./basePresenter.server";
6+
ClickHouseEnvironmentMetricsRepository,
7+
DailyTaskActivity,
8+
EnvironmentMetricsRepository,
9+
PostgrestEnvironmentMetricsRepository,
10+
} from "~/services/environmentMetricsRepository.server";
11+
import { singleton } from "~/utils/singleton";
1312

1413
export type TaskListItem = {
1514
slug: string;
@@ -18,11 +17,14 @@ export type TaskListItem = {
1817
triggerSource: TaskTriggerSource;
1918
};
2019

21-
type Return = Awaited<ReturnType<TaskListPresenter["call"]>>;
20+
export type TaskActivity = DailyTaskActivity[string];
2221

23-
export type TaskActivity = Awaited<Return["activity"]>[string];
22+
export class TaskListPresenter {
23+
constructor(
24+
private readonly environmentMetricsRepository: EnvironmentMetricsRepository,
25+
private readonly _replica: PrismaClientOrTransaction
26+
) {}
2427

25-
export class TaskListPresenter extends BasePresenter {
2628
public async call({ environmentId, projectId }: { environmentId: string; projectId: string }) {
2729
const tasks = await this._replica.$queryRaw<
2830
{
@@ -56,187 +58,57 @@ export class TaskListPresenter extends BasePresenter {
5658
//then get the activity for each task
5759
const activity = this.#getActivity(
5860
tasks.map((t) => t.slug),
59-
projectId,
6061
environmentId
6162
);
6263

6364
const runningStats = this.#getRunningStats(
6465
tasks.map((t) => t.slug),
65-
projectId,
6666
environmentId
6767
);
6868

6969
const durations = this.#getAverageDurations(
7070
tasks.map((t) => t.slug),
71-
projectId,
7271
environmentId
7372
);
7473

7574
return { tasks, activity, runningStats, durations };
7675
}
7776

78-
async #getActivity(tasks: string[], projectId: string, environmentId: string) {
79-
if (tasks.length === 0) {
80-
return {};
81-
}
82-
83-
const activity = await this._replica.$queryRaw<
84-
{
85-
taskIdentifier: string;
86-
status: TaskRunStatusType;
87-
day: Date;
88-
count: BigInt;
89-
}[]
90-
>`
91-
SELECT
92-
tr."taskIdentifier",
93-
tr."status",
94-
DATE(tr."createdAt") as day,
95-
COUNT(*)
96-
FROM
97-
${sqlDatabaseSchema}."TaskRun" as tr
98-
WHERE
99-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
100-
AND tr."projectId" = ${projectId}
101-
AND tr."runtimeEnvironmentId" = ${environmentId}
102-
AND tr."createdAt" >= (current_date - interval '6 days')
103-
GROUP BY
104-
tr."taskIdentifier",
105-
tr."status",
106-
day
107-
ORDER BY
108-
tr."taskIdentifier" ASC,
109-
day ASC,
110-
tr."status" ASC;`;
111-
112-
//today with no time
113-
const today = new Date();
114-
today.setUTCHours(0, 0, 0, 0);
115-
116-
return activity.reduce((acc, a) => {
117-
let existingTask = acc[a.taskIdentifier];
118-
119-
if (!existingTask) {
120-
existingTask = [];
121-
//populate the array with the past 7 days
122-
for (let i = 6; i >= 0; i--) {
123-
const day = new Date(today);
124-
day.setUTCDate(today.getDate() - i);
125-
day.setUTCHours(0, 0, 0, 0);
126-
127-
existingTask.push({
128-
day: day.toISOString(),
129-
[TaskRunStatus.COMPLETED_SUCCESSFULLY]: 0,
130-
} as { day: string } & Record<TaskRunStatusType, number>);
131-
}
132-
133-
acc[a.taskIdentifier] = existingTask;
134-
}
135-
136-
const dayString = a.day.toISOString();
137-
const day = existingTask.find((d) => d.day === dayString);
138-
139-
if (!day) {
140-
logger.warn(`Day not found for TaskRun`, {
141-
day: dayString,
142-
taskIdentifier: a.taskIdentifier,
143-
existingTask,
144-
});
145-
return acc;
146-
}
147-
148-
day[a.status] = Number(a.count);
149-
150-
return acc;
151-
}, {} as Record<string, ({ day: string } & Record<TaskRunStatusType, number>)[]>);
77+
async #getActivity(tasks: string[], environmentId: string) {
78+
return this.environmentMetricsRepository.getDailyTaskActivity({
79+
environmentId,
80+
days: 6,
81+
tasks,
82+
});
15283
}
15384

154-
async #getRunningStats(tasks: string[], projectId: string, environmentId: string) {
155-
if (tasks.length === 0) {
156-
return {};
157-
}
158-
159-
const stats = await this._replica.$queryRaw<
160-
{
161-
taskIdentifier: string;
162-
status: DBTaskRunStatus;
163-
count: BigInt;
164-
}[]
165-
>`
166-
SELECT
167-
tr."taskIdentifier",
168-
tr.status,
169-
COUNT(*)
170-
FROM
171-
${sqlDatabaseSchema}."TaskRun" as tr
172-
WHERE
173-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
174-
AND tr."projectId" = ${projectId}
175-
AND tr."runtimeEnvironmentId" = ${environmentId}
176-
AND tr."status" = ANY(ARRAY[${Prisma.join([
177-
...QUEUED_STATUSES,
178-
"EXECUTING",
179-
])}]::\"TaskRunStatus\"[])
180-
GROUP BY
181-
tr."taskIdentifier",
182-
tr.status
183-
ORDER BY
184-
tr."taskIdentifier" ASC`;
185-
186-
//create an object combining the queued and concurrency counts
187-
const result: Record<string, { queued: number; running: number }> = {};
188-
for (const task of tasks) {
189-
const queued = stats.filter(
190-
(q) => q.taskIdentifier === task && QUEUED_STATUSES.includes(q.status)
191-
);
192-
const queuedCount =
193-
queued.length === 0
194-
? 0
195-
: queued.reduce((acc, q) => {
196-
return acc + Number(q.count);
197-
}, 0);
198-
199-
const running = stats.filter((r) => r.taskIdentifier === task && r.status === "EXECUTING");
200-
const runningCount =
201-
running.length === 0
202-
? 0
203-
: running.reduce((acc, r) => {
204-
return acc + Number(r.count);
205-
}, 0);
85+
async #getRunningStats(tasks: string[], environmentId: string) {
86+
return this.environmentMetricsRepository.getCurrentRunningStats({
87+
environmentId,
88+
days: 6,
89+
tasks,
90+
});
91+
}
20692

207-
result[task] = {
208-
queued: queuedCount,
209-
running: runningCount,
210-
};
211-
}
212-
return result;
93+
async #getAverageDurations(tasks: string[], environmentId: string) {
94+
return this.environmentMetricsRepository.getAverageDurations({
95+
environmentId,
96+
days: 6,
97+
tasks,
98+
});
21399
}
100+
}
214101

215-
async #getAverageDurations(tasks: string[], projectId: string, environmentId: string) {
216-
if (tasks.length === 0) {
217-
return {};
218-
}
102+
export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter);
219103

220-
const durations = await this._replica.$queryRaw<
221-
{
222-
taskIdentifier: string;
223-
duration: Number;
224-
}[]
225-
>`
226-
SELECT
227-
tr."taskIdentifier",
228-
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - COALESCE(tr."startedAt", tr."lockedAt")))) as duration
229-
FROM
230-
${sqlDatabaseSchema}."TaskRun" as tr
231-
WHERE
232-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
233-
AND tr."projectId" = ${projectId}
234-
AND tr."runtimeEnvironmentId" = ${environmentId}
235-
AND tr."createdAt" >= (current_date - interval '6 days')
236-
AND tr."status" IN ('COMPLETED_SUCCESSFULLY', 'COMPLETED_WITH_ERRORS')
237-
GROUP BY
238-
tr."taskIdentifier";`;
104+
function setupTaskListPresenter() {
105+
const environmentMetricsRepository = clickhouseClient
106+
? new ClickHouseEnvironmentMetricsRepository({
107+
clickhouse: clickhouseClient,
108+
})
109+
: new PostgrestEnvironmentMetricsRepository({
110+
prisma: $replica,
111+
});
239112

240-
return Object.fromEntries(durations.map((s) => [s.taskIdentifier, Number(s.duration)]));
241-
}
113+
return new TaskListPresenter(environmentMetricsRepository, $replica);
242114
}

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam._index/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
7575
import {
7676
type TaskActivity,
7777
type TaskListItem,
78+
taskListPresenter,
7879
TaskListPresenter,
7980
} from "~/presenters/v3/TaskListPresenter.server";
8081
import {
@@ -123,8 +124,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
123124
}
124125

125126
try {
126-
const presenter = new TaskListPresenter();
127-
const { tasks, activity, runningStats, durations } = await presenter.call({
127+
const { tasks, activity, runningStats, durations } = await taskListPresenter.call({
128128
environmentId: environment.id,
129129
projectId: project.id,
130130
});
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { env } from "~/env.server";
3+
import { singleton } from "~/utils/singleton";
4+
5+
export const clickhouseClient = singleton("clickhouseClient", initializeClickhouseClient);
6+
7+
function initializeClickhouseClient() {
8+
if (!env.CLICKHOUSE_URL) {
9+
console.log("🗃️ Clickhouse service not enabled");
10+
return;
11+
}
12+
13+
console.log("🗃️ Clickhouse service enabled");
14+
15+
const clickhouse = new ClickHouse({
16+
url: env.CLICKHOUSE_URL,
17+
name: "clickhouse-instance",
18+
keepAlive: {
19+
enabled: env.CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
20+
idleSocketTtl: env.CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
21+
},
22+
logLevel: env.CLICKHOUSE_LOG_LEVEL,
23+
compression: {
24+
request: true,
25+
},
26+
maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS,
27+
});
28+
29+
return clickhouse;
30+
}

0 commit comments

Comments
 (0)