Skip to content

Commit 2964cf6

Browse files
committed
Keep sending OTel metrics between runs, make dev behave more like prod, and decouple metric flushing from the metric bucket (collection) interval
1 parent bf178b2 commit 2964cf6

File tree

6 files changed

+217
-19
lines changed

6 files changed

+217
-19
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ const EnvironmentSchema = z
384384
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
385385
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
386386
DEV_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
387+
DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(),
387388

388389
PROD_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
389390
PROD_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
@@ -395,6 +396,7 @@ const EnvironmentSchema = z
395396
PROD_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
396397
PROD_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
397398
PROD_OTEL_METRICS_EXPORT_INTERVAL_MILLIS: z.string().optional(),
399+
PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS: z.string().optional(),
398400

399401
TRIGGER_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),
400402
TRIGGER_OTEL_LOG_ATTRIBUTE_COUNT_LIMIT: z.string().default("1024"),

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -976,6 +976,13 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
976976
);
977977
}
978978

979+
if (env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) {
980+
result.push({
981+
key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS",
982+
value: env.DEV_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS,
983+
});
984+
}
985+
979986
if (env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1") {
980987
result = result.concat([
981988
{
@@ -1120,6 +1127,13 @@ async function resolveBuiltInProdVariables(
11201127
);
11211128
}
11221129

1130+
if (env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS) {
1131+
result.push({
1132+
key: "TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS",
1133+
value: env.PROD_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS,
1134+
});
1135+
}
1136+
11231137
if (env.PROD_OTEL_BATCH_PROCESSING_ENABLED === "1") {
11241138
result = result.concat([
11251139
{

packages/cli-v3/src/dev/taskRunProcessPool.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ export class TaskRunProcessPool {
2424
private readonly maxExecutionsPerProcess: number;
2525
private readonly executionCountsPerProcess: Map<number, number> = new Map();
2626
private readonly deprecatedVersions: Set<string> = new Set();
27+
private readonly idleTimers: Map<TaskRunProcess, NodeJS.Timeout> = new Map();
28+
private static readonly IDLE_TIMEOUT_MS = 30_000;
2729

2830
constructor(options: TaskRunProcessPoolOptions) {
2931
this.options = options;
@@ -39,6 +41,7 @@ export class TaskRunProcessPool {
3941
const versionProcesses = this.availableProcessesByVersion.get(version) || [];
4042

4143
const processesToKill = versionProcesses.filter((process) => !process.isExecuting());
44+
processesToKill.forEach((process) => this.clearIdleTimer(process));
4245
Promise.all(processesToKill.map((process) => this.killProcess(process))).then(() => {
4346
this.availableProcessesByVersion.delete(version);
4447
});
@@ -72,6 +75,7 @@ export class TaskRunProcessPool {
7275
version,
7376
availableProcesses.filter((p) => p !== reusableProcess)
7477
);
78+
this.clearIdleTimer(reusableProcess);
7579

7680
if (!this.busyProcessesByVersion.has(version)) {
7781
this.busyProcessesByVersion.set(version, new Set());
@@ -156,6 +160,7 @@ export class TaskRunProcessPool {
156160
this.availableProcessesByVersion.set(version, []);
157161
}
158162
this.availableProcessesByVersion.get(version)!.push(process);
163+
this.startIdleTimer(process, version);
159164
} catch (error) {
160165
logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", {
161166
error,
@@ -215,7 +220,42 @@ export class TaskRunProcessPool {
215220
return process.isHealthy;
216221
}
217222

223+
/**
 * Schedules `process` to be killed once it has stayed in the available pool
 * for `TaskRunProcessPool.IDLE_TIMEOUT_MS` without being picked up again.
 *
 * Any timer already registered for the same process is cancelled first, so
 * calling this again restarts the countdown.
 *
 * @param process - The pooled process to expire.
 * @param version - Key under which the process is stored in `availableProcessesByVersion`.
 */
private startIdleTimer(process: TaskRunProcess, version: string): void {
  this.clearIdleTimer(process);

  const timer = setTimeout(() => {
    // Synchronously remove from available pool before async kill to prevent race with getProcess()
    const available = this.availableProcessesByVersion.get(version);
    if (available) {
      const index = available.indexOf(process);
      if (index !== -1) {
        available.splice(index, 1);
      }
    }
    this.idleTimers.delete(process);

    logger.debug("[TaskRunProcessPool] Idle timeout reached, killing process", {
      pid: process.pid,
      version,
    });

    // killProcess() is async and nothing awaits a timer callback, so a rejection
    // here would otherwise become an unhandled promise rejection. Log it instead.
    this.killProcess(process).catch((error: unknown) => {
      logger.debug("[TaskRunProcessPool] Failed to kill idle process", {
        pid: process.pid,
        version,
        error,
      });
    });
  }, TaskRunProcessPool.IDLE_TIMEOUT_MS);

  this.idleTimers.set(process, timer);
}
247+
248+
/**
 * Cancels the pending idle timer for `process` and removes its entry from
 * `idleTimers`. Does nothing when no timer is registered for the process.
 *
 * @param process - The pooled process whose idle countdown should stop.
 */
private clearIdleTimer(process: TaskRunProcess): void {
  const pending = this.idleTimers.get(process);
  if (!pending) {
    return;
  }

  clearTimeout(pending);
  this.idleTimers.delete(process);
}
255+
218256
private async killProcess(process: TaskRunProcess): Promise<void> {
257+
this.clearIdleTimer(process);
258+
219259
if (!process.isHealthy) {
220260
logger.debug("[TaskRunProcessPool] Process is not healthy, skipping cleanup", {
221261
processId: process.pid,
@@ -247,6 +287,12 @@ export class TaskRunProcessPool {
247287
versions: Array.from(this.availableProcessesByVersion.keys()),
248288
});
249289

290+
// Clear all idle timers
291+
for (const timer of this.idleTimers.values()) {
292+
clearTimeout(timer);
293+
}
294+
this.idleTimers.clear();
295+
250296
// Kill all available processes across all versions
251297
const allAvailableProcesses = Array.from(this.availableProcessesByVersion.values()).flat();
252298
await Promise.all(allAvailableProcesses.map((process) => this.killProcess(process)));

packages/core/src/v3/otel/tracingSDK.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import {
5555
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
5656
import { taskContext } from "../task-context-api.js";
5757
import {
58+
BufferingMetricExporter,
5859
TaskContextLogProcessor,
5960
TaskContextMetricExporter,
6061
TaskContextSpanProcessor,
@@ -285,14 +286,22 @@ export class TracingSDK {
285286
url: metricsUrl,
286287
timeoutMillis: config.forceFlushTimeoutMillis,
287288
});
288-
const metricExporter = new TaskContextMetricExporter(rawMetricExporter);
289+
290+
const collectionIntervalMs = parseInt(
291+
getEnvVar("TRIGGER_OTEL_METRICS_COLLECTION_INTERVAL_MILLIS") ?? "10000"
292+
);
293+
const exportIntervalMs = parseInt(
294+
getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "30000"
295+
);
296+
297+
// Chain: PeriodicReader (collection interval, default 10s) → TaskContextMetricExporter → BufferingMetricExporter (export interval, default 30s) → OTLP
298+
const bufferingExporter = new BufferingMetricExporter(rawMetricExporter, exportIntervalMs);
299+
const metricExporter = new TaskContextMetricExporter(bufferingExporter);
289300

290301
const metricReaders: MetricReader[] = [
291302
new PeriodicExportingMetricReader({
292303
exporter: metricExporter,
293-
exportIntervalMillis: parseInt(
294-
getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_INTERVAL_MILLIS") ?? "60000"
295-
),
304+
exportIntervalMillis: collectionIntervalMs,
296305
exportTimeoutMillis: parseInt(
297306
getEnvVar("TRIGGER_OTEL_METRICS_EXPORT_TIMEOUT_MILLIS") ?? "30000"
298307
),

packages/core/src/v3/taskContext/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { Attributes } from "@opentelemetry/api";
22
import { ServerBackgroundWorker, TaskRunContext } from "../schemas/index.js";
33
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
4-
import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js";
4+
import { getGlobal, registerGlobal } from "../utils/globals.js";
55
import { TaskContext } from "./types.js";
66

77
const API_NAME = "task-context";
88

99
export class TaskContextAPI {
1010
private static _instance?: TaskContextAPI;
11+
private _runDisabled = false;
1112

1213
private constructor() {}
1314

@@ -23,6 +24,10 @@ export class TaskContextAPI {
2324
return this.#getTaskContext() !== undefined;
2425
}
2526

27+
get isRunDisabled(): boolean {
28+
return this._runDisabled;
29+
}
30+
2631
get ctx(): TaskRunContext | undefined {
2732
return this.#getTaskContext()?.ctx;
2833
}
@@ -98,11 +103,12 @@ export class TaskContextAPI {
98103
}
99104

100105
public disable() {
101-
unregisterGlobal(API_NAME);
106+
this._runDisabled = true;
102107
}
103108

104109
public setGlobalTaskContext(taskContext: TaskContext): boolean {
105-
return registerGlobal(API_NAME, taskContext);
110+
this._runDisabled = false;
111+
return registerGlobal(API_NAME, taskContext, true);
106112
}
107113

108114
#getTaskContext(): TaskContext | undefined {

packages/core/src/v3/taskContext/otelProcessors.ts

Lines changed: 133 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
MetricData,
99
PushMetricExporter,
1010
ResourceMetrics,
11+
ScopeMetrics,
1112
} from "@opentelemetry/sdk-metrics";
1213
import { Span, SpanProcessor } from "@opentelemetry/sdk-trace-base";
1314
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
@@ -130,29 +131,44 @@ export class TaskContextMetricExporter implements PushMetricExporter {
130131

131132
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
132133
if (!taskContext.ctx) {
133-
// No active run — drop metrics (between-run noise)
134+
// No context at all — drop metrics
134135
resultCallback({ code: ExportResultCode.SUCCESS });
135136
return;
136137
}
137138

138139
const ctx = taskContext.ctx;
139-
const contextAttrs: Attributes = {
140-
[SemanticInternalAttributes.RUN_ID]: ctx.run.id,
141-
[SemanticInternalAttributes.TASK_SLUG]: ctx.task.id,
142-
[SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number,
143-
[SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id,
144-
[SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id,
145-
[SemanticInternalAttributes.PROJECT_ID]: ctx.project.id,
146-
[SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name,
147-
[SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
148-
};
140+
141+
let contextAttrs: Attributes;
142+
143+
if (taskContext.isRunDisabled) {
144+
// Between runs: keep environment/project/org/machine attrs, strip run-specific ones
145+
contextAttrs = {
146+
[SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id,
147+
[SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
148+
[SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id,
149+
[SemanticInternalAttributes.PROJECT_ID]: ctx.project.id,
150+
[SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name,
151+
};
152+
} else {
153+
// During a run: full context attrs
154+
contextAttrs = {
155+
[SemanticInternalAttributes.RUN_ID]: ctx.run.id,
156+
[SemanticInternalAttributes.TASK_SLUG]: ctx.task.id,
157+
[SemanticInternalAttributes.ATTEMPT_NUMBER]: ctx.attempt.number,
158+
[SemanticInternalAttributes.ENVIRONMENT_ID]: ctx.environment.id,
159+
[SemanticInternalAttributes.ORGANIZATION_ID]: ctx.organization.id,
160+
[SemanticInternalAttributes.PROJECT_ID]: ctx.project.id,
161+
[SemanticInternalAttributes.MACHINE_PRESET_NAME]: ctx.machine?.name,
162+
[SemanticInternalAttributes.ENVIRONMENT_TYPE]: ctx.environment.type,
163+
};
164+
}
149165

150166
if (taskContext.worker) {
151167
contextAttrs[SemanticInternalAttributes.WORKER_ID] = taskContext.worker.id;
152168
contextAttrs[SemanticInternalAttributes.WORKER_VERSION] = taskContext.worker.version;
153169
}
154170

155-
if (ctx.run.tags?.length) {
171+
if (!taskContext.isRunDisabled && ctx.run.tags?.length) {
156172
contextAttrs[SemanticInternalAttributes.RUN_TAGS] = ctx.run.tags;
157173
}
158174

@@ -184,3 +200,108 @@ export class TaskContextMetricExporter implements PushMetricExporter {
184200
return this._innerExporter.shutdown();
185201
}
186202
}
203+
204+
/**
 * A {@link PushMetricExporter} decorator that accumulates the collections it is
 * handed and forwards them to the wrapped exporter as one merged
 * `ResourceMetrics`, at most once per flush interval.
 *
 * Calls to `export()` that arrive before the interval has elapsed are only
 * buffered and are reported to the caller as successful. `forceFlush()` and
 * `shutdown()` push out whatever is buffered regardless of the interval.
 *
 * Aggregation selectors are delegated to the wrapped exporter when it defines them.
 */
export class BufferingMetricExporter implements PushMetricExporter {
  /** Interval used when the configured flush interval is not a usable number. */
  private static readonly DEFAULT_FLUSH_INTERVAL_MS = 30000;

  selectAggregationTemporality?: (instrumentType: InstrumentType) => AggregationTemporality;
  selectAggregation?: (instrumentType: InstrumentType) => AggregationOption;

  private _buffer: ResourceMetrics[] = [];
  private _lastFlushTime = Date.now();

  /**
   * @param _innerExporter - Exporter that receives the merged batches.
   * @param _flushIntervalMs - Minimum time between forwards to the inner exporter.
   *   Non-finite (e.g. `NaN` from a failed `parseInt`) or negative values fall back
   *   to `DEFAULT_FLUSH_INTERVAL_MS`.
   */
  constructor(
    private _innerExporter: PushMetricExporter,
    private _flushIntervalMs: number
  ) {
    // `elapsed >= NaN` is always false, so an unparsable interval would mean
    // export() never flushes and the buffer grows without bound.
    if (!Number.isFinite(_flushIntervalMs) || _flushIntervalMs < 0) {
      this._flushIntervalMs = BufferingMetricExporter.DEFAULT_FLUSH_INTERVAL_MS;
    }

    if (_innerExporter.selectAggregationTemporality) {
      this.selectAggregationTemporality =
        _innerExporter.selectAggregationTemporality.bind(_innerExporter);
    }
    if (_innerExporter.selectAggregation) {
      this.selectAggregation = _innerExporter.selectAggregation.bind(_innerExporter);
    }
  }

  /**
   * Buffers `metrics`; if the flush interval has elapsed since the last flush,
   * forwards everything buffered so far to the inner exporter.
   *
   * @param metrics - One collection to buffer.
   * @param resultCallback - Receives the inner exporter's result when a flush
   *   happens, otherwise an immediate SUCCESS.
   */
  export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
    this._buffer.push(metrics);

    const now = Date.now();
    if (now - this._lastFlushTime >= this._flushIntervalMs) {
      this._lastFlushTime = now;
      // _mergeBuffer() empties the buffer up front, so a failed inner export is
      // reported through resultCallback but the batch is not retried.
      const merged = this._mergeBuffer();
      this._innerExporter.export(merged, resultCallback);
    } else {
      resultCallback({ code: ExportResultCode.SUCCESS });
    }
  }

  /**
   * Forwards any buffered collections immediately, then flushes the inner exporter.
   *
   * @returns A promise that rejects with the export error (or a generic
   *   `Error("Export failed")`) when the inner export does not succeed.
   */
  forceFlush(): Promise<void> {
    if (this._buffer.length > 0) {
      this._lastFlushTime = Date.now();
      const merged = this._mergeBuffer();
      return new Promise<void>((resolve, reject) => {
        this._innerExporter.export(merged, (result) => {
          if (result.code === ExportResultCode.SUCCESS) {
            resolve();
          } else {
            reject(result.error ?? new Error("Export failed"));
          }
        });
      }).then(() => this._innerExporter.forceFlush());
    }
    return this._innerExporter.forceFlush();
  }

  /** Flushes buffered collections, then shuts down the inner exporter. */
  shutdown(): Promise<void> {
    return this.forceFlush().then(() => this._innerExporter.shutdown());
  }

  /**
   * Drains the buffer and combines it into a single `ResourceMetrics`.
   *
   * Scopes are grouped by (name, version, schemaUrl); within a scope, metrics
   * that share a descriptor name have their data points concatenated in
   * arrival order. The resource of the first buffered collection is used.
   * Callers must ensure the buffer is non-empty.
   */
  private _mergeBuffer(): ResourceMetrics {
    const batch = this._buffer;
    this._buffer = [];

    if (batch.length === 1) {
      return batch[0]!;
    }

    const base = batch[0]!;

    const scopeMap = new Map<
      string,
      { scope: ScopeMetrics["scope"]; metricsMap: Map<string, MetricData> }
    >();

    for (const rm of batch) {
      for (const sm of rm.scopeMetrics) {
        // Key on the full scope identity: two scopes that share a name but differ
        // in version or schema URL must not be folded into one.
        const scopeKey = JSON.stringify([
          sm.scope.name,
          sm.scope.version ?? "",
          sm.scope.schemaUrl ?? "",
        ]);
        let scopeEntry = scopeMap.get(scopeKey);
        if (!scopeEntry) {
          scopeEntry = { scope: sm.scope, metricsMap: new Map() };
          scopeMap.set(scopeKey, scopeEntry);
        }

        for (const metric of sm.metrics) {
          const metricKey = metric.descriptor.name;
          const existing = scopeEntry.metricsMap.get(metricKey);
          if (existing) {
            // Append data points from this collection to the existing metric
            scopeEntry.metricsMap.set(metricKey, {
              ...existing,
              dataPoints: [...existing.dataPoints, ...metric.dataPoints],
            } as MetricData);
          } else {
            // Copy the array so later appends never alias the source collection.
            scopeEntry.metricsMap.set(metricKey, {
              ...metric,
              dataPoints: [...metric.dataPoints],
            } as MetricData);
          }
        }
      }
    }

    return {
      resource: base.resource,
      scopeMetrics: Array.from(scopeMap.values()).map(({ scope, metricsMap }) => ({
        scope,
        metrics: Array.from(metricsMap.values()),
      })),
    };
  }
}

0 commit comments

Comments
 (0)