Skip to content

Commit f3e2643

Browse files
committed
Update the usage task list to use clickhouse
1 parent be70f25 commit f3e2643

File tree

3 files changed

+112
-26
lines changed

3 files changed

+112
-26
lines changed

apps/webapp/app/presenters/v3/UsagePresenter.server.ts

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import { sqlDatabaseSchema } from "~/db.server";
1+
import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";
22
import { env } from "~/env.server";
33
import { getUsage, getUsageSeries } from "~/services/platform.v3.server";
44
import { createTimeSeriesData } from "~/utils/graphs";
55
import { BasePresenter } from "./basePresenter.server";
66
import { DataPoint, linear } from "regression";
7+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
78

89
type Options = {
910
organizationId: string;
@@ -103,27 +104,69 @@ export class UsagePresenter extends BasePresenter {
103104
});
104105

105106
//usage by task
106-
const tasks = this._replica.$queryRaw<TaskUsageItem[]>`
107+
const tasks = await getTaskUsageByOrganization(
108+
organizationId,
109+
startOfMonth,
110+
endOfMonth,
111+
this._replica
112+
);
113+
114+
return {
115+
usage,
116+
tasks,
117+
};
118+
}
119+
}
120+
121+
async function getTaskUsageByOrganization(
122+
organizationId: string,
123+
startOfMonth: Date,
124+
endOfMonth: Date,
125+
replica: PrismaClientOrTransaction
126+
) {
127+
if (clickhouseClient) {
128+
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
129+
startTime: startOfMonth.getTime(),
130+
endTime: endOfMonth.getTime(),
131+
organizationId,
132+
});
133+
134+
if (queryError) {
135+
throw queryError;
136+
}
137+
138+
return tasks
139+
.map((task) => ({
140+
taskIdentifier: task.task_identifier,
141+
runCount: Number(task.run_count),
142+
averageDuration: Number(task.average_duration),
143+
averageCost: Number(task.average_cost) + env.CENTS_PER_RUN / 100,
144+
totalDuration: Number(task.total_duration),
145+
totalCost: Number(task.total_cost) + Number(task.total_base_cost),
146+
}))
147+
.sort((a, b) => b.totalCost - a.totalCost);
148+
} else {
149+
return replica.$queryRaw<TaskUsageItem[]>`
107150
SELECT
108-
tr."taskIdentifier",
109-
COUNT(*) AS "runCount",
110-
AVG(tr."usageDurationMs") AS "averageDuration",
111-
SUM(tr."usageDurationMs") AS "totalDuration",
112-
AVG(tr."costInCents") / 100.0 AS "averageCost",
113-
SUM(tr."costInCents") / 100.0 AS "totalCost",
114-
SUM(tr."baseCostInCents") / 100.0 AS "totalBaseCost"
115-
FROM
116-
${sqlDatabaseSchema}."TaskRun" tr
117-
JOIN ${sqlDatabaseSchema}."Project" pr ON pr.id = tr."projectId"
118-
JOIN ${sqlDatabaseSchema}."Organization" org ON org.id = pr."organizationId"
119-
JOIN ${sqlDatabaseSchema}."RuntimeEnvironment" env ON env."id" = tr."runtimeEnvironmentId"
120-
WHERE
121-
env.type <> 'DEVELOPMENT'
122-
AND tr."createdAt" > ${startOfMonth}
123-
AND tr."createdAt" < ${endOfMonth}
124-
AND org.id = ${organizationId}
125-
GROUP BY
126-
tr."taskIdentifier";
151+
tr."taskIdentifier",
152+
COUNT(*) AS "runCount",
153+
AVG(tr."usageDurationMs") AS "averageDuration",
154+
SUM(tr."usageDurationMs") AS "totalDuration",
155+
AVG(tr."costInCents") / 100.0 AS "averageCost",
156+
SUM(tr."costInCents") / 100.0 AS "totalCost",
157+
SUM(tr."baseCostInCents") / 100.0 AS "totalBaseCost"
158+
FROM
159+
${sqlDatabaseSchema}."TaskRun" tr
160+
JOIN ${sqlDatabaseSchema}."Project" pr ON pr.id = tr."projectId"
161+
JOIN ${sqlDatabaseSchema}."Organization" org ON org.id = pr."organizationId"
162+
JOIN ${sqlDatabaseSchema}."RuntimeEnvironment" env ON env."id" = tr."runtimeEnvironmentId"
163+
WHERE
164+
env.type <> 'DEVELOPMENT'
165+
AND tr."createdAt" > ${startOfMonth}
166+
AND tr."createdAt" < ${endOfMonth}
167+
AND org.id = ${organizationId}
168+
GROUP BY
169+
tr."taskIdentifier";
127170
`.then((data) => {
128171
return data
129172
.map((item) => ({
@@ -136,10 +179,5 @@ export class UsagePresenter extends BasePresenter {
136179
}))
137180
.sort((a, b) => b.totalCost - a.totalCost);
138181
});
139-
140-
return {
141-
usage,
142-
tasks,
143-
};
144182
}
145183
}

internal-packages/clickhouse/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
getTaskActivityQueryBuilder,
1010
getCurrentRunningStats,
1111
getAverageDurations,
12+
getTaskUsageByOrganization,
1213
} from "./taskRuns.js";
1314
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
1415
import type { Agent as HttpAgent } from "http";
@@ -146,6 +147,7 @@ export class ClickHouse {
146147
getTaskActivity: getTaskActivityQueryBuilder(this.reader),
147148
getCurrentRunningStats: getCurrentRunningStats(this.reader),
148149
getAverageDurations: getAverageDurations(this.reader),
150+
getTaskUsageByOrganization: getTaskUsageByOrganization(this.reader),
149151
};
150152
}
151153
}

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,3 +220,49 @@ export function getAverageDurations(ch: ClickhouseReader, settings?: ClickHouseS
220220
settings,
221221
});
222222
}
223+
224+
export const TaskUsageByOrganizationQueryResult = z.object({
225+
task_identifier: z.string(),
226+
run_count: z.number(),
227+
average_duration: z.number(),
228+
total_duration: z.number(),
229+
average_cost: z.number(),
230+
total_cost: z.number(),
231+
total_base_cost: z.number(),
232+
});
233+
234+
export const TaskUsageByOrganizationQueryParams = z.object({
235+
startTime: z.number().int(),
236+
endTime: z.number().int(),
237+
organizationId: z.string(),
238+
});
239+
240+
export function getTaskUsageByOrganization(ch: ClickhouseReader, settings?: ClickHouseSettings) {
241+
return ch.query({
242+
name: "getTaskUsageByOrganization",
243+
query: `
244+
SELECT
245+
task_identifier,
246+
count() AS run_count,
247+
avg(usage_duration_ms) AS average_duration,
248+
sum(usage_duration_ms) AS total_duration,
249+
avg(cost_in_cents) / 100.0 AS average_cost,
250+
sum(cost_in_cents) / 100.0 AS total_cost,
251+
sum(base_cost_in_cents) / 100.0 AS total_base_cost
252+
FROM trigger_dev.task_runs_v2 FINAL
253+
WHERE
254+
environment_type != 'DEVELOPMENT'
255+
AND created_at >= fromUnixTimestamp64Milli({startTime: Int64})
256+
AND created_at < fromUnixTimestamp64Milli({endTime: Int64})
257+
AND organization_id = {organizationId: String}
258+
AND _is_deleted = 0
259+
GROUP BY
260+
task_identifier
261+
ORDER BY
262+
total_cost DESC
263+
`,
264+
schema: TaskUsageByOrganizationQueryResult,
265+
params: TaskUsageByOrganizationQueryParams,
266+
settings,
267+
});
268+
}

0 commit comments

Comments
 (0)