Skip to content

Commit e601b71

Browse files
committed
report worker queue lengths via opentelemetry metrics
1 parent f018a8f commit e601b71

File tree

11 files changed

+253
-18
lines changed

11 files changed

+253
-18
lines changed

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ const EnvironmentSchema = z.object({
328328
INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),
329329

330330
INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
331+
INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
332+
INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS: z.string().optional(),
333+
INTERNAL_OTEL_METRIC_EXPORTER_DISABLED: z.string().default("0"),
334+
INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL: z.coerce.number().int().default(30_000),
331335

332336
ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
333337
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),
@@ -621,6 +625,7 @@ const EnvironmentSchema = z.object({
621625
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),
622626

623627
RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
628+
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
624629

625630
/** How long should the presence ttl last */
626631
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { prisma } from "~/db.server";
44
import { env } from "~/env.server";
55
import { singleton } from "~/utils/singleton";
66
import { allMachines } from "./machinePresets.server";
7-
import { tracer } from "./tracer.server";
7+
import { tracer, meter } from "./tracer.server";
88

99
export const engine = singleton("RunEngine", createRunEngine);
1010

@@ -13,6 +13,7 @@ export type { RunEngine };
1313
function createRunEngine() {
1414
const engine = new RunEngine({
1515
prisma,
16+
logLevel: env.RUN_ENGINE_WORKER_LOG_LEVEL,
1617
worker: {
1718
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
1819
workers: env.RUN_ENGINE_WORKER_COUNT,
@@ -71,6 +72,7 @@ function createRunEngine() {
7172
},
7273
},
7374
tracer,
75+
meter,
7476
heartbeatTimeoutsMs: {
7577
PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING,
7678
PENDING_CANCEL: env.RUN_ENGINE_TIMEOUT_PENDING_CANCEL,

apps/webapp/app/v3/tracer.server.ts

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import {
1111
Tracer,
1212
diag,
1313
trace,
14+
metrics,
15+
Meter,
1416
} from "@opentelemetry/api";
1517
import { logs, SeverityNumber } from "@opentelemetry/api-logs";
1618
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
@@ -19,6 +21,12 @@ import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs
1921
import { type Instrumentation, registerInstrumentations } from "@opentelemetry/instrumentation";
2022
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
2123
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
24+
import {
25+
MeterProvider,
26+
ConsoleMetricExporter,
27+
PeriodicExportingMetricReader,
28+
} from "@opentelemetry/sdk-metrics";
29+
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
2230
import { Resource } from "@opentelemetry/resources";
2331
import {
2432
BatchSpanProcessor,
@@ -30,17 +38,24 @@ import {
3038
TraceIdRatioBasedSampler,
3139
} from "@opentelemetry/sdk-trace-base";
3240
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
33-
import { SEMRESATTRS_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
41+
import {
42+
SEMRESATTRS_SERVICE_INSTANCE_ID,
43+
SEMRESATTRS_SERVICE_NAME,
44+
} from "@opentelemetry/semantic-conventions";
3445
import { PrismaInstrumentation } from "@prisma/instrumentation";
3546
import { env } from "~/env.server";
3647
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
3748
import { singleton } from "~/utils/singleton";
3849
import { LoggerSpanExporter } from "./telemetry/loggerExporter.server";
3950
import { logger } from "~/services/logger.server";
4051
import { flattenAttributes } from "@trigger.dev/core/v3";
52+
import { randomUUID } from "node:crypto";
53+
import { prisma } from "~/db.server";
4154

4255
export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";
4356

57+
const SERVICE_INSTANCE_ID = randomUUID();
58+
4459
class CustomWebappSampler implements Sampler {
4560
constructor(private readonly _baseSampler: Sampler) {}
4661

@@ -83,7 +98,12 @@ class CustomWebappSampler implements Sampler {
8398
}
8499
}
85100

86-
export const { tracer, logger: otelLogger, provider } = singleton("tracer", getTracer);
101+
export const {
102+
tracer,
103+
logger: otelLogger,
104+
provider,
105+
meter,
106+
} = singleton("opentelemetry", setupTelemetry);
87107

88108
export async function startActiveSpan<T>(
89109
name: string,
@@ -148,14 +168,15 @@ export async function emitWarnLog(message: string, params: Record<string, unknow
148168
});
149169
}
150170

151-
function getTracer() {
171+
function setupTelemetry() {
152172
if (env.INTERNAL_OTEL_TRACE_DISABLED === "1") {
153173
console.log(`🔦 Tracer disabled, returning a noop tracer`);
154174

155175
return {
156176
tracer: trace.getTracer("trigger.dev", "3.3.12"),
157177
logger: logs.getLogger("trigger.dev", "3.3.12"),
158178
provider: new NodeTracerProvider(),
179+
meter: setupMetrics(),
159180
};
160181
}
161182

@@ -167,6 +188,7 @@ function getTracer() {
167188
forceFlushTimeoutMillis: 15_000,
168189
resource: new Resource({
169190
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
191+
[SEMRESATTRS_SERVICE_INSTANCE_ID]: SERVICE_INSTANCE_ID,
170192
}),
171193
sampler: new ParentBasedSampler({
172194
root: new CustomWebappSampler(new TraceIdRatioBasedSampler(samplingRate)),
@@ -261,10 +283,90 @@ function getTracer() {
261283
return {
262284
tracer: provider.getTracer("trigger.dev", "3.3.12"),
263285
logger: logs.getLogger("trigger.dev", "3.3.12"),
286+
meter: setupMetrics(),
264287
provider,
265288
};
266289
}
267290

291+
function setupMetrics() {
292+
if (env.INTERNAL_OTEL_METRIC_EXPORTER_DISABLED === "1") {
293+
return metrics.getMeter("trigger.dev", "3.3.12");
294+
}
295+
296+
const exporter = env.INTERNAL_OTEL_METRIC_EXPORTER_URL
297+
? new OTLPMetricExporter({
298+
url: env.INTERNAL_OTEL_METRIC_EXPORTER_URL,
299+
timeoutMillis: 30_000,
300+
headers: parseInternalMetricsHeaders() ?? {},
301+
})
302+
: new ConsoleMetricExporter();
303+
304+
const meterProvider = new MeterProvider({
305+
resource: new Resource({
306+
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
307+
[SEMRESATTRS_SERVICE_INSTANCE_ID]: SERVICE_INSTANCE_ID,
308+
}),
309+
readers: [
310+
new PeriodicExportingMetricReader({
311+
exporter,
312+
exportIntervalMillis: env.INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL,
313+
exportTimeoutMillis: 30_000,
314+
}),
315+
],
316+
});
317+
318+
metrics.setGlobalMeterProvider(meterProvider);
319+
320+
const meter = meterProvider.getMeter("trigger.dev", "3.3.12");
321+
322+
configurePrismaMetrics(meter);
323+
324+
return meter;
325+
}
326+
327+
function configurePrismaMetrics(meter: Meter) {
328+
const totalGauge = meter.createObservableGauge("db.pool.connections.total", {
329+
description: "Open Prisma-pool connections",
330+
unit: "connections",
331+
});
332+
const busyGauge = meter.createObservableGauge("db.pool.connections.busy", {
333+
description: "Connections currently executing queries",
334+
unit: "connections",
335+
});
336+
const freeGauge = meter.createObservableGauge("db.pool.connections.free", {
337+
description: "Idle (free) connections in the pool",
338+
unit: "connections",
339+
});
340+
341+
// Single helper so we hit Prisma only once per scrape ---------------------
342+
async function readPoolCounters() {
343+
const { gauges } = await prisma.$metrics.json();
344+
345+
const busy = gauges.find((g) => g.key === "prisma_pool_connections_busy")?.value ?? 0;
346+
const free = gauges.find((g) => g.key === "prisma_pool_connections_idle")?.value ?? 0;
347+
const total =
348+
gauges.find((g) => g.key === "prisma_pool_connections_open")?.value ?? busy + free; // fallback compute
349+
350+
return { total, busy, free };
351+
}
352+
353+
// Register callbacks (one scrape == one DB call) --------------------------
354+
totalGauge.addCallback(async (res) => {
355+
const { total } = await readPoolCounters();
356+
res.observe(total);
357+
});
358+
359+
busyGauge.addCallback(async (res) => {
360+
const { busy } = await readPoolCounters();
361+
res.observe(busy);
362+
});
363+
364+
freeGauge.addCallback(async (res) => {
365+
const { free } = await readPoolCounters();
366+
res.observe(free);
367+
});
368+
}
369+
268370
const SemanticEnvResources = {
269371
ENV_ID: "$trigger.env.id",
270372
ENV_TYPE: "$trigger.env.type",
@@ -300,3 +402,13 @@ function parseInternalTraceHeaders(): Record<string, string> | undefined {
300402
return;
301403
}
302404
}
405+
406+
function parseInternalMetricsHeaders(): Record<string, string> | undefined {
407+
try {
408+
return env.INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS
409+
? (JSON.parse(env.INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS) as Record<string, string>)
410+
: undefined;
411+
} catch {
412+
return;
413+
}
414+
}

apps/webapp/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@
6161
"@opentelemetry/core": "1.25.1",
6262
"@opentelemetry/exporter-logs-otlp-http": "0.52.1",
6363
"@opentelemetry/exporter-trace-otlp-http": "0.52.1",
64+
"@opentelemetry/exporter-metrics-otlp-http": "0.52.1",
6465
"@opentelemetry/instrumentation": "0.52.1",
6566
"@opentelemetry/instrumentation-express": "^0.36.1",
6667
"@opentelemetry/instrumentation-http": "0.52.1",
6768
"@opentelemetry/resources": "1.25.1",
6869
"@opentelemetry/sdk-logs": "0.52.1",
6970
"@opentelemetry/sdk-node": "0.52.1",
71+
"@opentelemetry/sdk-metrics": "1.25.1",
7072
"@opentelemetry/sdk-trace-base": "1.25.1",
7173
"@opentelemetry/sdk-trace-node": "1.25.1",
7274
"@opentelemetry/semantic-conventions": "1.25.1",

internal-packages/run-engine/src/engine/index.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createRedisClient, Redis } from "@internal/redis";
2-
import { startSpan, trace, Tracer } from "@internal/tracing";
2+
import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
33
import { Logger } from "@trigger.dev/core/logger";
44
import {
55
CheckpointInput,
@@ -55,8 +55,9 @@ export class RunEngine {
5555
private runLockRedis: Redis;
5656
private runLock: RunLocker;
5757
private worker: EngineWorker;
58-
private logger = new Logger("RunEngine", "debug");
58+
private logger: Logger;
5959
private tracer: Tracer;
60+
private meter: Meter;
6061
private heartbeatTimeouts: HeartbeatTimeouts;
6162

6263
prisma: PrismaClient;
@@ -76,6 +77,7 @@ export class RunEngine {
7677
raceSimulationSystem: RaceSimulationSystem = new RaceSimulationSystem();
7778

7879
constructor(private readonly options: RunEngineOptions) {
80+
this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info");
7981
this.prisma = options.prisma;
8082
this.runLockRedis = createRedisClient(
8183
{
@@ -109,7 +111,7 @@ export class RunEngine {
109111
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
110112
}),
111113
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
112-
logger: new Logger("RunQueue", "debug"),
114+
logger: new Logger("RunQueue", this.options.logLevel ?? "info"),
113115
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
114116
retryOptions: options.queue?.retryOptions,
115117
workerOptions: {
@@ -121,6 +123,7 @@ export class RunEngine {
121123
},
122124
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
123125
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
126+
meter: options.meter,
124127
});
125128

126129
this.worker = new Worker({
@@ -184,6 +187,7 @@ export class RunEngine {
184187
}
185188

186189
this.tracer = options.tracer;
190+
this.meter = options.meter ?? getMeter("run-engine");
187191

188192
const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
189193
PENDING_EXECUTING: 60_000,
@@ -203,6 +207,7 @@ export class RunEngine {
203207
eventBus: this.eventBus,
204208
logger: this.logger,
205209
tracer: this.tracer,
210+
meter: this.meter,
206211
runLock: this.runLock,
207212
runQueue: this.runQueue,
208213
raceSimulationSystem: this.raceSimulationSystem,
@@ -566,6 +571,9 @@ export class RunEngine {
566571
runnerId?: string;
567572
tx?: PrismaClientOrTransaction;
568573
}): Promise<DequeuedMessage[]> {
574+
// We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues
575+
this.runQueue.registerObservableWorkerQueue(workerQueue);
576+
569577
const dequeuedMessage = await this.dequeueSystem.dequeueFromWorkerQueue({
570578
consumerId,
571579
workerQueue,

internal-packages/run-engine/src/engine/systems/systems.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Tracer } from "@internal/tracing";
1+
import { Meter, Tracer } from "@internal/tracing";
22
import { Logger } from "@trigger.dev/core/logger";
33
import { PrismaClient } from "@trigger.dev/database";
44
import { RunQueue } from "../../run-queue/index.js";
@@ -13,6 +13,7 @@ export type SystemResources = {
1313
eventBus: EventBus;
1414
logger: Logger;
1515
tracer: Tracer;
16+
meter: Meter;
1617
runLock: RunLocker;
1718
runQueue: RunQueue;
1819
raceSimulationSystem: RaceSimulationSystem;

internal-packages/run-engine/src/engine/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type RedisOptions } from "@internal/redis";
22
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
3-
import { Tracer } from "@internal/tracing";
3+
import { Meter, Tracer } from "@internal/tracing";
44
import {
55
MachinePreset,
66
MachinePresetName,
@@ -12,6 +12,7 @@ import { PrismaClient } from "@trigger.dev/database";
1212
import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js";
1313
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
1414
import { workerCatalog } from "./workerCatalog.js";
15+
import { Logger, LogLevel } from "@trigger.dev/core/logger";
1516

1617
export type RunEngineOptions = {
1718
prisma: PrismaClient;
@@ -47,6 +48,9 @@ export type RunEngineOptions = {
4748
heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
4849
queueRunsWaitingForWorkerBatchSize?: number;
4950
tracer: Tracer;
51+
meter?: Meter;
52+
logger?: Logger;
53+
logLevel?: LogLevel;
5054
releaseConcurrency?: {
5155
disabled?: boolean;
5256
maxTokensRatio?: number;

0 commit comments

Comments
 (0)