Skip to content

Commit 21a3e31

Browse files
committed
Initial dev process keep-alive. Moved resource attributes to span attributes because resource attributes are global and immutable.
1 parent 124435e commit 21a3e31

File tree

9 files changed

+442
-104
lines changed

9 files changed

+442
-104
lines changed

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 185 additions & 76 deletions
Large diffs are not rendered by default.

packages/cli-v3/src/dev/devSupervisor.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
import pLimit from "p-limit";
2626
import { resolveLocalEnvVars } from "../utilities/localEnvVars.js";
2727
import type { Metafile } from "esbuild";
28+
import { TaskRunProcessPool } from "./taskRunProcessPool.js";
2829

2930
export type WorkerRuntimeOptions = {
3031
name: string | undefined;
@@ -67,6 +68,7 @@ class DevSupervisor implements WorkerRuntime {
6768
private socketConnections = new Set<string>();
6869

6970
private runLimiter?: ReturnType<typeof pLimit>;
71+
private taskRunProcessPool?: TaskRunProcessPool;
7072

7173
constructor(public readonly options: WorkerRuntimeOptions) {}
7274

@@ -95,6 +97,31 @@ class DevSupervisor implements WorkerRuntime {
9597

9698
this.runLimiter = pLimit(maxConcurrentRuns);
9799

100+
// Initialize the task run process pool
101+
const env = await this.#getEnvVars();
102+
103+
const enableProcessReuse =
104+
typeof this.options.config.experimental_processKeepAlive === "boolean"
105+
? this.options.config.experimental_processKeepAlive
106+
: false;
107+
108+
if (enableProcessReuse) {
109+
logger.debug("[DevSupervisor] Enabling process reuse", {
110+
enableProcessReuse,
111+
});
112+
}
113+
114+
this.taskRunProcessPool = new TaskRunProcessPool({
115+
env,
116+
cwd: this.options.config.workingDir,
117+
enableProcessReuse:
118+
typeof this.options.config.experimental_processKeepAlive === "boolean"
119+
? this.options.config.experimental_processKeepAlive
120+
: false,
121+
maxPoolSize: 3,
122+
maxExecutionsPerProcess: 50,
123+
});
124+
98125
this.socket = this.#createSocket();
99126

100127
//start an SSE connection for presence
@@ -111,6 +138,11 @@ class DevSupervisor implements WorkerRuntime {
111138
} catch (error) {
112139
logger.debug("[DevSupervisor] shutdown, socket failed to close", { error });
113140
}
141+
142+
// Shutdown the task run process pool
143+
if (this.taskRunProcessPool) {
144+
await this.taskRunProcessPool.shutdown();
145+
}
114146
}
115147

116148
async initializeWorker(
@@ -293,12 +325,21 @@ class DevSupervisor implements WorkerRuntime {
293325
continue;
294326
}
295327

328+
if (!this.taskRunProcessPool) {
329+
logger.debug(`[DevSupervisor] dequeueRuns. No task run process pool`, {
330+
run: message.run.friendlyId,
331+
worker,
332+
});
333+
continue;
334+
}
335+
296336
//new run
297337
runController = new DevRunController({
298338
runFriendlyId: message.run.friendlyId,
299339
worker: worker,
300340
httpClient: this.options.client,
301341
logLevel: this.options.args.logLevel,
342+
taskRunProcessPool: this.taskRunProcessPool,
302343
onFinished: () => {
303344
logger.debug("[DevSupervisor] Run finished", { runId: message.run.friendlyId });
304345

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import {
2+
MachinePresetResources,
3+
ServerBackgroundWorker,
4+
WorkerManifest,
5+
} from "@trigger.dev/core/v3";
6+
import { TaskRunProcess } from "../executions/taskRunProcess.js";
7+
import { logger } from "../utilities/logger.js";
8+
9+
/**
 * Configuration for {@link TaskRunProcessPool}.
 */
export type TaskRunProcessPoolOptions = {
  /** Base environment for every spawned process; the per-call `env` given to `getProcess` is merged over it. */
  env: Record<string, string>;
  /** Working directory handed to each spawned process. */
  cwd: string;
  /** When false, nothing is pooled: every process is killed as soon as it is returned. */
  enableProcessReuse: boolean;
  /** Maximum number of idle processes kept for reuse. Defaults to 3. */
  maxPoolSize?: number;
  /** How many times one process may be handed out before it is retired. Defaults to 50. */
  maxExecutionsPerProcess?: number;
};

/**
 * Hands out `TaskRunProcess` instances and, when process reuse is enabled,
 * keeps returned processes idle so a later run can skip process start-up.
 *
 * A process is only reused for the worker build (content hash) it was created
 * for, and is retired after `maxExecutionsPerProcess` hand-outs.
 */
export class TaskRunProcessPool {
  private availableProcesses: TaskRunProcess[] = [];
  private busyProcesses: Set<TaskRunProcess> = new Set();
  // Per-process bookkeeping: which worker build the process was created for
  // and how many times it has been handed out so far.
  private readonly processInfo = new Map<
    TaskRunProcess,
    { contentHash: ServerBackgroundWorker["contentHash"]; executions: number }
  >();
  private readonly options: TaskRunProcessPoolOptions;
  private readonly maxPoolSize: number;
  private readonly maxExecutionsPerProcess: number;

  constructor(options: TaskRunProcessPoolOptions) {
    this.options = options;
    this.maxPoolSize = options.maxPoolSize ?? 3;
    this.maxExecutionsPerProcess = options.maxExecutionsPerProcess ?? 50;
  }

  /**
   * Returns a process for the given worker, reusing an idle one when possible.
   *
   * @param workerManifest - Manifest passed to a newly created process.
   * @param serverWorker - Worker the run belongs to; its `contentHash` decides which idle processes are eligible.
   * @param machineResources - Machine resources passed to a newly created process.
   * @param env - Extra environment merged over the pool-level `env`. Only applied when a new process is created.
   * @returns A process that is tracked as busy until it is passed to `returnProcess`.
   */
  async getProcess(
    workerManifest: WorkerManifest,
    serverWorker: ServerBackgroundWorker,
    machineResources: MachinePresetResources,
    env?: Record<string, string>
  ): Promise<TaskRunProcess> {
    const contentHash = serverWorker.contentHash;

    // Try to reuse an existing process if enabled
    if (this.options.enableProcessReuse) {
      // Drop idle processes that can never serve this request so they stop
      // occupying pool slots.
      this.evictUnusableProcesses(contentHash);

      const reusableProcess = this.findReusableProcess();
      if (reusableProcess) {
        logger.debug("[TaskRunProcessPool] Reusing existing process", {
          availableCount: this.availableProcesses.length,
          busyCount: this.busyProcesses.size,
        });

        this.availableProcesses = this.availableProcesses.filter((p) => p !== reusableProcess);
        this.busyProcesses.add(reusableProcess);

        const info = this.processInfo.get(reusableProcess);
        if (info) {
          info.executions += 1;
        } else {
          this.processInfo.set(reusableProcess, { contentHash, executions: 1 });
        }

        return reusableProcess;
      } else {
        logger.debug("[TaskRunProcessPool] No reusable process found", {
          availableCount: this.availableProcesses.length,
          busyCount: this.busyProcesses.size,
        });
      }
    }

    // Create new process
    logger.debug("[TaskRunProcessPool] Creating new process", {
      availableCount: this.availableProcesses.length,
      busyCount: this.busyProcesses.size,
    });

    const newProcess = new TaskRunProcess({
      workerManifest,
      env: {
        ...this.options.env,
        ...env,
      },
      serverWorker,
      machineResources,
      cwd: this.options.cwd,
    }).initialize();

    this.busyProcesses.add(newProcess);
    this.processInfo.set(newProcess, { contentHash, executions: 1 });
    return newProcess;
  }

  /**
   * Gives a process back to the pool. It is kept idle for reuse when it is
   * eligible (see `shouldReuseProcess`), otherwise it is killed.
   *
   * Returning a process that is already idle in the pool is a no-op, so the
   * same process can never be queued (and later handed out) twice.
   */
  async returnProcess(process: TaskRunProcess): Promise<void> {
    this.busyProcesses.delete(process);

    if (this.availableProcesses.includes(process)) {
      logger.debug("[TaskRunProcessPool] Process already returned to pool, ignoring", {
        availableCount: this.availableProcesses.length,
        busyCount: this.busyProcesses.size,
      });
      return;
    }

    if (this.shouldReuseProcess(process)) {
      logger.debug("[TaskRunProcessPool] Returning process to pool", {
        availableCount: this.availableProcesses.length,
        busyCount: this.busyProcesses.size,
      });

      // Clean up but don't kill the process
      try {
        await process.cleanup(false);

        // Another return of the same process may have completed while we were
        // awaiting cleanup; never queue it twice.
        if (!this.availableProcesses.includes(process)) {
          this.availableProcesses.push(process);
        }
      } catch (error) {
        logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", {
          error,
        });
        await this.killProcess(process);
      }
    } else {
      logger.debug("[TaskRunProcessPool] Killing process", {
        availableCount: this.availableProcesses.length,
        busyCount: this.busyProcesses.size,
      });
      await this.killProcess(process);
    }
  }

  /**
   * Removes (and kills) idle processes that are unhealthy or were created for
   * a different worker build than `contentHash`. A process keeps the manifest
   * and environment it was created with, so it must not serve another build.
   */
  private evictUnusableProcesses(contentHash: ServerBackgroundWorker["contentHash"]): void {
    const usable: TaskRunProcess[] = [];

    for (const idle of this.availableProcesses) {
      const matchesBuild = this.processInfo.get(idle)?.contentHash === contentHash;

      if (matchesBuild && this.isProcessHealthy(idle)) {
        usable.push(idle);
      } else {
        // killProcess logs and swallows its own errors, so it is safe to not await it.
        void this.killProcess(idle);
      }
    }

    this.availableProcesses = usable;
  }

  /** First idle process that passes the health check, if any. */
  private findReusableProcess(): TaskRunProcess | undefined {
    return this.availableProcesses.find((process) => this.isProcessHealthy(process));
  }

  /**
   * A returned process is kept only when reuse is enabled, the process is
   * healthy, it has not reached `maxExecutionsPerProcess`, and the idle pool
   * still has room.
   */
  private shouldReuseProcess(process: TaskRunProcess): boolean {
    const isHealthy = this.isProcessHealthy(process);
    const isBeingKilled = process.isBeingKilled;
    const pid = process.pid;
    const executions = this.processInfo.get(process)?.executions ?? 0;

    logger.debug("[TaskRunProcessPool] Checking if process should be reused", {
      isHealthy,
      isBeingKilled,
      pid,
      executions,
      availableCount: this.availableProcesses.length,
      busyCount: this.busyProcesses.size,
      maxPoolSize: this.maxPoolSize,
      maxExecutionsPerProcess: this.maxExecutionsPerProcess,
    });

    return (
      this.options.enableProcessReuse &&
      isHealthy &&
      executions < this.maxExecutionsPerProcess &&
      this.availableProcesses.length < this.maxPoolSize
    );
  }

  private isProcessHealthy(process: TaskRunProcess): boolean {
    // Basic health checks - we can expand this later
    return !process.isBeingKilled && process.pid !== undefined;
  }

  /** Kills a process and forgets its bookkeeping. Errors are logged, never thrown. */
  private async killProcess(process: TaskRunProcess): Promise<void> {
    this.processInfo.delete(process);

    try {
      await process.cleanup(true);
    } catch (error) {
      logger.debug("[TaskRunProcessPool] Error killing process", { error });
    }
  }

  /** Kills every idle and busy process and empties the pool. */
  async shutdown(): Promise<void> {
    logger.debug("[TaskRunProcessPool] Shutting down pool", {
      availableCount: this.availableProcesses.length,
      busyCount: this.busyProcesses.size,
    });

    // Kill all available processes
    await Promise.all(this.availableProcesses.map((process) => this.killProcess(process)));
    this.availableProcesses = [];

    // Kill all busy processes
    await Promise.all(Array.from(this.busyProcesses).map((process) => this.killProcess(process)));
    this.busyProcesses.clear();

    this.processInfo.clear();
  }

  /** Snapshot of how many processes are idle, busy, and tracked in total. */
  getStats() {
    return {
      availableCount: this.availableProcesses.length,
      busyCount: this.busyProcesses.size,
      totalCount: this.availableProcesses.length + this.busyProcesses.size,
    };
  }
}

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import { sanitizeEnvVars } from "../utilities/sanitizeEnvVars.js";
1919
import { join } from "node:path";
2020
import { BackgroundWorker } from "../dev/backgroundWorker.js";
2121
import { eventBus } from "../utilities/eventBus.js";
22+
import { TaskRunProcessPool } from "../dev/taskRunProcessPool.js";
2223

2324
type DevRunControllerOptions = {
2425
runFriendlyId: string;
2526
worker: BackgroundWorker;
2627
httpClient: CliApiClient;
2728
logLevel: LogLevel;
2829
heartbeatIntervalSeconds?: number;
30+
taskRunProcessPool: TaskRunProcessPool;
2931
onSubscribeToRunNotifications: (run: Run, snapshot: Snapshot) => void;
3032
onUnsubscribeFromRunNotifications: (run: Run, snapshot: Snapshot) => void;
3133
onFinished: () => void;
@@ -605,25 +607,25 @@ export class DevRunController {
605607

606608
this.snapshotPoller.start();
607609

608-
this.taskRunProcess = new TaskRunProcess({
609-
workerManifest: this.opts.worker.manifest,
610-
env: {
611-
...sanitizeEnvVars(envVars ?? {}),
612-
...sanitizeEnvVars(this.opts.worker.params.env),
613-
TRIGGER_WORKER_MANIFEST_PATH: join(this.opts.worker.build.outputPath, "index.json"),
614-
RUN_WORKER_SHOW_LOGS: this.opts.logLevel === "debug" ? "true" : "false",
615-
TRIGGER_PROJECT_REF: execution.project.ref,
616-
},
617-
serverWorker: {
610+
// Get process from pool instead of creating new one
611+
this.taskRunProcess = await this.opts.taskRunProcessPool.getProcess(
612+
this.opts.worker.manifest,
613+
{
618614
id: "unmanaged",
619615
contentHash: this.opts.worker.build.contentHash,
620616
version: this.opts.worker.serverWorker?.version,
621617
engine: "V2",
622618
},
623-
machineResources: execution.machine,
624-
}).initialize();
619+
execution.machine,
620+
{
621+
TRIGGER_WORKER_MANIFEST_PATH: join(this.opts.worker.build.outputPath, "index.json"),
622+
RUN_WORKER_SHOW_LOGS: this.opts.logLevel === "debug" ? "true" : "false",
623+
}
624+
);
625625

626-
logger.debug("executing task run process", {
626+
// Update the process environment for this specific run
627+
// Note: We may need to enhance TaskRunProcess to support updating env vars
628+
logger.debug("executing task run process from pool", {
627629
attemptNumber: execution.attempt.number,
628630
runId: execution.run.id,
629631
});
@@ -635,15 +637,21 @@ export class DevRunController {
635637
metrics,
636638
},
637639
messageId: run.friendlyId,
640+
env: {
641+
...sanitizeEnvVars(envVars ?? {}),
642+
...sanitizeEnvVars(this.opts.worker.params.env),
643+
TRIGGER_PROJECT_REF: execution.project.ref,
644+
},
638645
});
639646

640647
logger.debug("Completed run", completion);
641648

649+
// Return process to pool instead of killing it
642650
try {
643-
await this.taskRunProcess.cleanup(true);
651+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
644652
this.taskRunProcess = undefined;
645653
} catch (error) {
646-
logger.debug("Failed to cleanup task run process, submitting completion anyway", {
654+
logger.debug("Failed to return task run process to pool, submitting completion anyway", {
647655
error,
648656
});
649657
}
@@ -758,11 +766,14 @@ export class DevRunController {
758766
}
759767

760768
private async runFinished() {
761-
// Kill the run process
762-
try {
763-
await this.taskRunProcess?.kill("SIGKILL");
764-
} catch (error) {
765-
logger.debug("Failed to kill task run process", { error });
769+
// Return the process to the pool instead of killing it directly
770+
if (this.taskRunProcess) {
771+
try {
772+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
773+
this.taskRunProcess = undefined;
774+
} catch (error) {
775+
logger.debug("Failed to return task run process to pool during runFinished", { error });
776+
}
766777
}
767778

768779
this.runHeartbeat.stop();
@@ -794,9 +805,10 @@ export class DevRunController {
794805

795806
if (this.taskRunProcess && !this.taskRunProcess.isBeingKilled) {
796807
try {
797-
await this.taskRunProcess.cleanup(true);
808+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
809+
this.taskRunProcess = undefined;
798810
} catch (error) {
799-
logger.debug("Failed to cleanup task run process", { error });
811+
logger.debug("Failed to return task run process to pool during stop", { error });
800812
}
801813
}
802814

packages/core/src/v3/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,14 @@ export type TriggerConfig = {
234234
env?: Record<string, string>;
235235
};
236236

237+
/**
238+
* @default false
239+
* @description Keep the process alive after the task has finished running so the next task doesn't have to wait for the process to start up again.
240+
*
241+
* Note that the process could be killed at any time, and we don't make any guarantees about the process being alive for a certain amount of time
242+
*/
243+
experimental_processKeepAlive?: boolean;
244+
237245
/**
238246
* @deprecated Use `dirs` instead
239247
*/

packages/core/src/v3/otel/tracingSDK.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ export class TracingSDK {
114114
: {};
115115

116116
const commonResources = detectResourcesSync({
117-
detectors: [this.asyncResourceDetector, processDetectorSync],
117+
detectors: [processDetectorSync],
118118
})
119119
.merge(
120120
new Resource({
@@ -123,6 +123,8 @@ export class TracingSDK {
123123
getEnvVar("OTEL_SERVICE_NAME") ?? "trigger.dev",
124124
[SemanticInternalAttributes.TRIGGER]: true,
125125
[SemanticInternalAttributes.CLI_VERSION]: VERSION,
126+
[SemanticInternalAttributes.SDK_VERSION]: VERSION,
127+
[SemanticInternalAttributes.SDK_LANGUAGE]: "typescript",
126128
})
127129
)
128130
.merge(config.resource ?? new Resource({}))

0 commit comments

Comments
 (0)