Skip to content

Commit b4b8ba1

Browse files
committed
better machine ID and don't send metrics in between runs in a warm process
1 parent 9abf598 commit b4b8ba1

File tree

10 files changed

+33
-10
lines changed

10 files changed

+33
-10
lines changed

packages/cli-v3/src/dev/taskRunProcessPool.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
MachinePresetResources,
33
ServerBackgroundWorker,
44
WorkerManifest,
5+
generateFriendlyId,
56
} from "@trigger.dev/core/v3";
67
import { TaskRunProcess } from "../executions/taskRunProcess.js";
78
import { logger } from "../utilities/logger.js";
@@ -106,6 +107,7 @@ export class TaskRunProcessPool {
106107
env: {
107108
...this.options.env,
108109
...env,
110+
TRIGGER_MACHINE_ID: generateFriendlyId("machine"),
109111
},
110112
serverWorker,
111113
machineResources,

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ async function doBootstrap() {
213213
forceFlushTimeoutMillis: 30_000,
214214
resource: config.telemetry?.resource,
215215
hostMetrics: true,
216+
// Drop per-CPU per-state system.cpu metrics in dev to reduce noise
217+
droppedMetrics: ["system.cpu.*"],
216218
});
217219

218220
const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION);
@@ -622,8 +624,11 @@ const zodIpc = new ZodIpcConnection({
622624
}
623625
await flushAll(timeoutInMs);
624626
},
625-
FLUSH: async ({ timeoutInMs }) => {
627+
FLUSH: async ({ timeoutInMs, disableContext }) => {
626628
await flushAll(timeoutInMs);
629+
if (disableContext) {
630+
taskContext.disable();
631+
}
627632
},
628633
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
629634
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,8 +610,11 @@ const zodIpc = new ZodIpcConnection({
610610
}
611611
await flushAll(timeoutInMs);
612612
},
613-
FLUSH: async ({ timeoutInMs }) => {
613+
FLUSH: async ({ timeoutInMs, disableContext }) => {
614614
await flushAll(timeoutInMs);
615+
if (disableContext) {
616+
taskContext.disable();
617+
}
615618
},
616619
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
617620
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);

packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkerManifest } from "@trigger.dev/core/v3";
1+
import { WorkerManifest, generateFriendlyId } from "@trigger.dev/core/v3";
22
import { TaskRunProcess } from "../../executions/taskRunProcess.js";
33
import { RunnerEnv } from "./env.js";
44
import { RunLogger, SendDebugLogOptions } from "./logger.js";
@@ -22,6 +22,7 @@ export class TaskRunProcessProvider {
2222
private readonly logger: RunLogger;
2323
private readonly processKeepAliveEnabled: boolean;
2424
private readonly processKeepAliveMaxExecutionCount: number;
25+
private readonly machineId = generateFriendlyId("machine");
2526

2627
// Process keep-alive state
2728
private persistentProcess: TaskRunProcess | null = null;
@@ -269,6 +270,7 @@ export class TaskRunProcessProvider {
269270
return {
270271
...taskRunEnv,
271272
...this.env.gatherProcessEnv(),
273+
TRIGGER_MACHINE_ID: this.machineId,
272274
HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000),
273275
};
274276
}

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ export class TaskRunProcess {
126126
return;
127127
}
128128

129-
await tryCatch(this.#flush());
129+
await tryCatch(this.#flush({ disableContext: !kill }));
130130

131131
if (kill) {
132132
await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
@@ -240,10 +240,10 @@ export class TaskRunProcess {
240240
return this;
241241
}
242242

243-
async #flush(timeoutInMs: number = 5_000) {
243+
async #flush({ timeoutInMs = 5_000, disableContext = false } = {}) {
244244
logger.debug("flushing task run process", { pid: this.pid });
245245

246-
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
246+
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs, disableContext }, timeoutInMs + 1_000);
247247
}
248248

249249
async #cancel(timeoutInMs: number = 30_000) {

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export {
4949
NULL_SENTINEL,
5050
} from "./utils/flattenAttributes.js";
5151
export { omit } from "./utils/omit.js";
52+
export { generateFriendlyId, fromFriendlyId } from "./isomorphic/friendlyId.js";
5253
export {
5354
calculateNextRetryDelay,
5455
calculateResetAt,
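(File path header missing from this capture. This hunk belongs to a separate file — the module that exports `machineId` — not to packages/core/src/v3/index.ts above; presumably it lives in a subdirectory of packages/core/src/v3/, given its "../isomorphic/" and "../utils/" imports.)

Lines changed: 3 additions & 2 deletions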
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
import { randomUUID } from "crypto";
1+
import { generateFriendlyId } from "../isomorphic/friendlyId.js";
2+
import { getEnvVar } from "../utils/getEnv.js";
23

3-
export const machineId = randomUUID();
4+
export const machineId = getEnvVar("TRIGGER_MACHINE_ID") ?? generateFriendlyId("machine");

packages/core/src/v3/otel/tracingSDK.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
SimpleLogRecordProcessor,
2929
} from "@opentelemetry/sdk-logs";
3030
import {
31+
AggregationType,
3132
MeterProvider,
3233
PeriodicExportingMetricReader,
3334
type MetricReader,
@@ -82,6 +83,8 @@ export type TracingSDKConfig = {
8283
diagLogLevel?: TracingDiagnosticLogLevel;
8384
resource?: Resource;
8485
hostMetrics?: boolean;
86+
/** Metric instrument name patterns to drop (supports wildcards, e.g. "system.cpu.*") */
87+
droppedMetrics?: string[];
8588
};
8689

8790
const idGenerator = new RandomIdGenerator();
@@ -300,6 +303,10 @@ export class TracingSDK {
300303
const meterProvider = new MeterProvider({
301304
resource: commonResources,
302305
readers: metricReaders,
306+
views: (config.droppedMetrics ?? []).map((pattern) => ({
307+
instrumentName: pattern,
308+
aggregation: { type: AggregationType.DROP },
309+
})),
303310
});
304311

305312
this._meterProvider = meterProvider;

packages/core/src/v3/schemas/messages.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ export const WorkerToExecutorMessageCatalog = {
213213
FLUSH: {
214214
message: z.object({
215215
timeoutInMs: z.number(),
216+
disableContext: z.boolean().optional(),
216217
}),
217218
callback: z.void(),
218219
},

packages/core/src/v3/taskContext/otelProcessors.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Attributes, Context, trace, Tracer } from "@opentelemetry/api";
2-
import { ExportResult } from "@opentelemetry/core";
2+
import { ExportResult, ExportResultCode } from "@opentelemetry/core";
33
import { LogRecordProcessor, SdkLogRecord } from "@opentelemetry/sdk-logs";
44
import type {
55
AggregationOption,
@@ -130,7 +130,8 @@ export class TaskContextMetricExporter implements PushMetricExporter {
130130

131131
export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
132132
if (!taskContext.ctx) {
133-
this._innerExporter.export(metrics, resultCallback);
133+
// No active run — drop metrics (between-run noise)
134+
resultCallback({ code: ExportResultCode.SUCCESS });
134135
return;
135136
}
136137

0 commit comments

Comments
 (0)