Skip to content

Commit 91df7cb

Browse files
committed
feat(metrics): add support for otel metrics (WIP)
1 parent b4e08bd commit 91df7cb

File tree

19 files changed

+608
-14
lines changed

19 files changed

+608
-14
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ const EnvironmentSchema = z
372372

373373
// Development OTEL environment variables
374374
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
375+
DEV_OTEL_METRICS_ENDPOINT: z.string().optional(),
375376
// If this is set to 1, then the below variables are used to configure the batch processor for spans and logs
376377
DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
377378
DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
@@ -382,6 +383,7 @@ const EnvironmentSchema = z
382383
DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
383384
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
384385
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
386+
DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
385387

386388
PROD_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
387389
PROD_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
@@ -392,6 +394,7 @@ const EnvironmentSchema = z
392394
PROD_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
393395
PROD_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
394396
PROD_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
397+
PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
395398

396399
TRIGGER_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),
397400
TRIGGER_OTEL_LOG_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import {
3+
ExportMetricsServiceRequest,
4+
ExportMetricsServiceResponse,
5+
} from "@trigger.dev/otlp-importer";
6+
import { otlpExporter } from "~/v3/otlpExporter.server";
7+
8+
export async function action({ request }: ActionFunctionArgs) {
9+
try {
10+
const contentType = request.headers.get("content-type")?.toLowerCase() ?? "";
11+
12+
if (contentType.startsWith("application/json")) {
13+
const body = await request.json();
14+
15+
const exportResponse = await otlpExporter.exportMetrics(
16+
body as ExportMetricsServiceRequest
17+
);
18+
19+
return json(exportResponse, { status: 200 });
20+
} else if (contentType.startsWith("application/x-protobuf")) {
21+
const buffer = await request.arrayBuffer();
22+
23+
const exportRequest = ExportMetricsServiceRequest.decode(new Uint8Array(buffer));
24+
25+
const exportResponse = await otlpExporter.exportMetrics(exportRequest);
26+
27+
return new Response(ExportMetricsServiceResponse.encode(exportResponse).finish(), {
28+
status: 200,
29+
});
30+
} else {
31+
return new Response(
32+
"Unsupported content type. Must be either application/x-protobuf or application/json",
33+
{ status: 400 }
34+
);
35+
}
36+
} catch (error) {
37+
console.error(error);
38+
39+
return new Response("Internal Server Error", { status: 500 });
40+
}
41+
}

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,26 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
956956
},
957957
];
958958

959+
if (env.DEV_OTEL_METRICS_ENDPOINT) {
960+
result.push({
961+
key: "TRIGGER_OTEL_METRICS_ENDPOINT",
962+
value: env.DEV_OTEL_METRICS_ENDPOINT,
963+
});
964+
}
965+
966+
if (env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) {
967+
result.push(
968+
{
969+
key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS",
970+
value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS,
971+
},
972+
{
973+
key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS",
974+
value: env.DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS,
975+
}
976+
);
977+
}
978+
959979
if (env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1") {
960980
result = result.concat([
961981
{
@@ -1087,6 +1107,19 @@ async function resolveBuiltInProdVariables(
10871107
]);
10881108
}
10891109

1110+
if (env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS) {
1111+
result.push(
1112+
{
1113+
key: "TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS",
1114+
value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS,
1115+
},
1116+
{
1117+
key: "TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS",
1118+
value: env.PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS,
1119+
}
1120+
);
1121+
}
1122+
10901123
if (env.PROD_OTEL_BATCH_PROCESSING_ENABLED === "1") {
10911124
result = result.concat([
10921125
{

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,23 @@ import {
44
AnyValue,
55
ExportLogsServiceRequest,
66
ExportLogsServiceResponse,
7+
ExportMetricsServiceRequest,
8+
ExportMetricsServiceResponse,
79
ExportTraceServiceRequest,
810
ExportTraceServiceResponse,
911
KeyValue,
1012
ResourceLogs,
13+
ResourceMetrics,
1114
ResourceSpans,
1215
SeverityNumber,
1316
Span,
1417
Span_Event,
1518
Span_SpanKind,
1619
Status_StatusCode,
1720
} from "@trigger.dev/otlp-importer";
21+
import type { MetricsV1Input } from "@internal/clickhouse";
1822
import { logger } from "~/services/logger.server";
23+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
1924
import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server";
2025
import {
2126
clickhouseEventRepository,
@@ -66,6 +71,29 @@ class OTLPExporter {
6671
});
6772
}
6873

74+
async exportMetrics(
75+
request: ExportMetricsServiceRequest
76+
): Promise<ExportMetricsServiceResponse> {
77+
return await startSpan(this._tracer, "exportMetrics", async (span) => {
78+
const rows = this.#filterResourceMetrics(request.resourceMetrics).flatMap(
79+
(resourceMetrics) => {
80+
return convertMetricsToClickhouseRows(
81+
resourceMetrics,
82+
this._spanAttributeValueLengthLimit
83+
);
84+
}
85+
);
86+
87+
span.setAttribute("metric_row_count", rows.length);
88+
89+
if (rows.length > 0) {
90+
await clickhouseClient.metrics.insert(rows);
91+
}
92+
93+
return ExportMetricsServiceResponse.create();
94+
});
95+
}
96+
6997
async exportLogs(request: ExportLogsServiceRequest): Promise<ExportLogsServiceResponse> {
7098
return await startSpan(this._tracer, "exportLogs", async (span) => {
7199
this.#logExportLogsVerbose(request);
@@ -202,6 +230,18 @@ class OTLPExporter {
202230
return isBoolValue(attribute.value) ? attribute.value.boolValue : false;
203231
});
204232
}
233+
234+
#filterResourceMetrics(resourceMetrics: ResourceMetrics[]): ResourceMetrics[] {
235+
return resourceMetrics.filter((rm) => {
236+
const triggerAttribute = rm.resource?.attributes.find(
237+
(attribute) => attribute.key === SemanticInternalAttributes.TRIGGER
238+
);
239+
240+
if (!triggerAttribute) return false;
241+
242+
return isBoolValue(triggerAttribute.value) ? triggerAttribute.value.boolValue : false;
243+
});
244+
}
205245
}
206246

207247
function convertLogsToCreateableEvents(
@@ -410,6 +450,208 @@ function convertSpansToCreateableEvents(
410450
return { events, taskEventStore };
411451
}
412452

453+
function floorToTenSecondBucket(timeUnixNano: bigint | number): string {
454+
const epochMs = Number(BigInt(timeUnixNano) / BigInt(1_000_000));
455+
const flooredMs = Math.floor(epochMs / 10_000) * 10_000;
456+
const date = new Date(flooredMs);
457+
// Format as ClickHouse DateTime: YYYY-MM-DD HH:MM:SS
458+
return date.toISOString().replace("T", " ").replace(/\.\d{3}Z$/, "");
459+
}
460+
461+
function convertMetricsToClickhouseRows(
462+
resourceMetrics: ResourceMetrics,
463+
spanAttributeValueLengthLimit: number
464+
): MetricsV1Input[] {
465+
const resourceAttributes = resourceMetrics.resource?.attributes ?? [];
466+
const resourceProperties = extractEventProperties(resourceAttributes);
467+
468+
const organizationId = resourceProperties.organizationId ?? "unknown";
469+
const projectId = resourceProperties.projectId ?? "unknown";
470+
const environmentId = resourceProperties.environmentId ?? "unknown";
471+
const resourceCtx = {
472+
taskSlug: resourceProperties.taskSlug,
473+
runId: resourceProperties.runId,
474+
attemptNumber: resourceProperties.attemptNumber,
475+
machineId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.MACHINE_ID),
476+
workerId: extractStringAttribute(resourceAttributes, SemanticInternalAttributes.WORKER_ID),
477+
workerVersion: extractStringAttribute(
478+
resourceAttributes,
479+
SemanticInternalAttributes.WORKER_VERSION
480+
),
481+
};
482+
483+
const rows: MetricsV1Input[] = [];
484+
485+
for (const scopeMetrics of resourceMetrics.scopeMetrics) {
486+
for (const metric of scopeMetrics.metrics) {
487+
const metricName = metric.name;
488+
489+
// Process gauge data points
490+
if (metric.gauge) {
491+
for (const dp of metric.gauge.dataPoints) {
492+
const value =
493+
dp.asDouble !== 0 ? dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0;
494+
const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx);
495+
496+
rows.push({
497+
organization_id: organizationId,
498+
project_id: projectId,
499+
environment_id: environmentId,
500+
metric_name: metricName,
501+
metric_type: "gauge",
502+
metric_subject: resolved.machineId ?? "unknown",
503+
bucket_start: floorToTenSecondBucket(dp.timeUnixNano),
504+
count: 0,
505+
sum_value: 0,
506+
max_value: value,
507+
min_value: value,
508+
last_value: value,
509+
attributes: resolved.attributes,
510+
});
511+
}
512+
}
513+
514+
// Process sum data points
515+
if (metric.sum) {
516+
for (const dp of metric.sum.dataPoints) {
517+
const value =
518+
dp.asDouble !== 0 ? dp.asDouble : dp.asInt !== BigInt(0) ? Number(dp.asInt) : 0;
519+
const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx);
520+
521+
rows.push({
522+
organization_id: organizationId,
523+
project_id: projectId,
524+
environment_id: environmentId,
525+
metric_name: metricName,
526+
metric_type: "sum",
527+
metric_subject: resolved.machineId ?? "unknown",
528+
bucket_start: floorToTenSecondBucket(dp.timeUnixNano),
529+
count: 1,
530+
sum_value: value,
531+
max_value: value,
532+
min_value: value,
533+
last_value: value,
534+
attributes: resolved.attributes,
535+
});
536+
}
537+
}
538+
539+
// Process histogram data points
540+
if (metric.histogram) {
541+
for (const dp of metric.histogram.dataPoints) {
542+
const resolved = resolveDataPointContext(dp.attributes ?? [], resourceCtx);
543+
const count = Number(dp.count);
544+
const sum = dp.sum ?? 0;
545+
const max = dp.max ?? 0;
546+
const min = dp.min ?? 0;
547+
548+
rows.push({
549+
organization_id: organizationId,
550+
project_id: projectId,
551+
environment_id: environmentId,
552+
metric_name: metricName,
553+
metric_type: "histogram",
554+
metric_subject: resolved.machineId ?? "unknown",
555+
bucket_start: floorToTenSecondBucket(dp.timeUnixNano),
556+
count,
557+
sum_value: sum,
558+
max_value: max,
559+
min_value: min,
560+
last_value: count > 0 ? sum / count : 0,
561+
attributes: resolved.attributes,
562+
});
563+
}
564+
}
565+
}
566+
}
567+
568+
return rows;
569+
}
570+
571+
// Prefixes injected by TaskContextMetricExporter — these are extracted into
572+
// the nested `trigger` key and should not appear as top-level user attributes.
573+
const INTERNAL_METRIC_ATTRIBUTE_PREFIXES = ["ctx.", "worker."];
574+
575+
interface ResourceContext {
576+
taskSlug: string | undefined;
577+
runId: string | undefined;
578+
attemptNumber: number | undefined;
579+
machineId: string | undefined;
580+
workerId: string | undefined;
581+
workerVersion: string | undefined;
582+
}
583+
584+
function resolveDataPointContext(
585+
dpAttributes: KeyValue[],
586+
resourceCtx: ResourceContext
587+
): {
588+
machineId: string | undefined;
589+
attributes: Record<string, unknown>;
590+
} {
591+
const runId =
592+
resourceCtx.runId ??
593+
extractStringAttribute(dpAttributes, SemanticInternalAttributes.RUN_ID);
594+
const taskSlug =
595+
resourceCtx.taskSlug ??
596+
extractStringAttribute(dpAttributes, SemanticInternalAttributes.TASK_SLUG);
597+
const attemptNumber =
598+
resourceCtx.attemptNumber ??
599+
extractNumberAttribute(dpAttributes, SemanticInternalAttributes.ATTEMPT_NUMBER);
600+
const machineId =
601+
resourceCtx.machineId ??
602+
extractStringAttribute(dpAttributes, SemanticInternalAttributes.MACHINE_ID);
603+
const workerId =
604+
resourceCtx.workerId ??
605+
extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_ID);
606+
const workerVersion =
607+
resourceCtx.workerVersion ??
608+
extractStringAttribute(dpAttributes, SemanticInternalAttributes.WORKER_VERSION);
609+
const machineName = extractStringAttribute(
610+
dpAttributes,
611+
SemanticInternalAttributes.MACHINE_PRESET_NAME
612+
);
613+
const environmentType = extractStringAttribute(
614+
dpAttributes,
615+
SemanticInternalAttributes.ENVIRONMENT_TYPE
616+
);
617+
618+
// Build the trigger context object with only defined values
619+
const trigger: Record<string, string | number> = {};
620+
if (runId) trigger.run_id = runId;
621+
if (taskSlug) trigger.task_slug = taskSlug;
622+
if (attemptNumber !== undefined) trigger.attempt_number = attemptNumber;
623+
if (machineId) trigger.machine_id = machineId;
624+
if (machineName) trigger.machine_name = machineName;
625+
if (workerId) trigger.worker_id = workerId;
626+
if (workerVersion) trigger.worker_version = workerVersion;
627+
if (environmentType) trigger.environment_type = environmentType;
628+
629+
// Build user attributes, filtering out internal ctx/worker keys
630+
const result: Record<string, unknown> = {};
631+
632+
if (Object.keys(trigger).length > 0) {
633+
result.trigger = trigger;
634+
}
635+
636+
for (const attr of dpAttributes) {
637+
if (INTERNAL_METRIC_ATTRIBUTE_PREFIXES.some((prefix) => attr.key.startsWith(prefix))) {
638+
continue;
639+
}
640+
641+
if (isStringValue(attr.value)) {
642+
result[attr.key] = attr.value.stringValue;
643+
} else if (isIntValue(attr.value)) {
644+
result[attr.key] = Number(attr.value.intValue);
645+
} else if (isDoubleValue(attr.value)) {
646+
result[attr.key] = attr.value.doubleValue;
647+
} else if (isBoolValue(attr.value)) {
648+
result[attr.key] = attr.value.boolValue;
649+
}
650+
}
651+
652+
return { machineId, attributes: result };
653+
}
654+
413655
function extractEventProperties(attributes: KeyValue[], prefix?: string) {
414656
return {
415657
metadata: convertSelectedKeyValueItemsToMap(attributes, [SemanticInternalAttributes.METADATA]),

0 commit comments

Comments
 (0)