Skip to content

Commit 5abfd2e

Browse files
committed
Reset execution environment between runs
1 parent 21a3e31 commit 5abfd2e

File tree

9 files changed

+111
-10
lines changed

9 files changed

+111
-10
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
runMetadata,
2020
runtime,
2121
runTimelineMetrics,
22+
taskContext,
2223
TaskRunErrorCodes,
2324
TaskRunExecution,
2425
timeout,
@@ -58,6 +59,7 @@ import sourceMapSupport from "source-map-support";
5859
import { env } from "std-env";
5960
import { normalizeImportPath } from "../utilities/normalizeImportPath.js";
6061
import { VERSION } from "../version.js";
62+
import { promiseWithResolvers } from "@trigger.dev/core/utils";
6163

6264
sourceMapSupport.install({
6365
handleUncaughtExceptions: false,
@@ -112,8 +114,12 @@ runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
112114

113115
const devUsageManager = new DevUsageManager();
114116
usage.setGlobalUsageManager(devUsageManager);
115-
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
116-
resourceCatalog.setGlobalResourceCatalog(new StandardResourceCatalog());
117+
118+
const usageTimeoutManager = new UsageTimeoutManager(devUsageManager);
119+
timeout.setGlobalManager(usageTimeoutManager);
120+
121+
const standardResourceCatalog = new StandardResourceCatalog();
122+
resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog);
117123

118124
const durableClock = new DurableClock();
119125
clock.setGlobalClock(durableClock);
@@ -238,7 +244,29 @@ let _isRunning = false;
238244
let _isCancelled = false;
239245
let _tracingSDK: TracingSDK | undefined;
240246
let _executionMeasurement: UsageMeasurement | undefined;
241-
const cancelController = new AbortController();
247+
let _cancelController = new AbortController();
248+
let _lastFlushPromise: Promise<void> | undefined;
249+
250+
function resetExecutionEnvironment() {
251+
_execution = undefined;
252+
_isRunning = false;
253+
_isCancelled = false;
254+
_executionMeasurement = undefined;
255+
_cancelController = new AbortController();
256+
257+
standardLocalsManager.reset();
258+
standardLifecycleHooksManager.reset();
259+
standardRunTimelineMetricsManager.reset();
260+
devUsageManager.reset();
261+
usageTimeoutManager.reset();
262+
runMetadataManager.reset();
263+
waitUntilManager.reset();
264+
sharedWorkerRuntime.reset();
265+
durableClock.reset();
266+
taskContext.disable();
267+
268+
log(`[${new Date().toISOString()}] Reset execution environment`);
269+
}
242270

243271
const zodIpc = new ZodIpcConnection({
244272
listenSchema: WorkerToExecutorMessageCatalog,
@@ -254,6 +282,18 @@ const zodIpc = new ZodIpcConnection({
254282

255283
log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
256284

285+
if (_lastFlushPromise) {
286+
const now = performance.now();
287+
288+
await _lastFlushPromise;
289+
290+
const duration = performance.now() - now;
291+
292+
log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`);
293+
}
294+
295+
resetExecutionEnvironment();
296+
257297
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);
258298

259299
if (_isRunning) {
@@ -365,8 +405,6 @@ const zodIpc = new ZodIpcConnection({
365405
return;
366406
}
367407

368-
process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`;
369-
370408
// Import the task module
371409
const task = resourceCatalog.getTask(execution.task.id);
372410

@@ -413,7 +451,7 @@ const zodIpc = new ZodIpcConnection({
413451

414452
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
415453

416-
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
454+
const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]);
417455

418456
const { result } = await executor.execute(execution, metadata, traceContext, signal);
419457

@@ -432,8 +470,7 @@ const zodIpc = new ZodIpcConnection({
432470
});
433471
}
434472
} finally {
435-
_execution = undefined;
436-
_isRunning = false;
473+
log(`[${new Date().toISOString()}] Task run completed`);
437474
}
438475
} catch (err) {
439476
logError("Failed to execute task", err);
@@ -459,7 +496,7 @@ const zodIpc = new ZodIpcConnection({
459496
},
460497
CANCEL: async ({ timeoutInMs }) => {
461498
_isCancelled = true;
462-
cancelController.abort("run cancelled");
499+
_cancelController.abort("run cancelled");
463500
await callCancelHooks(timeoutInMs);
464501
if (_executionMeasurement) {
465502
usage.stop(_executionMeasurement);
@@ -490,6 +527,10 @@ async function callCancelHooks(timeoutInMs: number = 10_000) {
490527
async function flushAll(timeoutInMs: number = 10_000) {
491528
const now = performance.now();
492529

530+
const { promise, resolve } = promiseWithResolvers<void>();
531+
532+
_lastFlushPromise = promise;
533+
493534
const results = await Promise.allSettled([
494535
flushTracingSDK(timeoutInMs),
495536
flushMetadata(timeoutInMs),
@@ -522,6 +563,9 @@ async function flushAll(timeoutInMs: number = 10_000) {
522563
const duration = performance.now() - now;
523564

524565
log(`Flushed all in ${duration}ms`);
566+
567+
// Resolve the last flush promise
568+
resolve();
525569
}
526570

527571
async function flushTracingSDK(timeoutInMs: number = 10_000) {

packages/core/src/v3/lifecycleHooks/manager.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,23 @@ export class StandardLifecycleHooksManager implements LifecycleHooksManager {
6868
private taskCancelHooks: Map<string, RegisteredHookFunction<AnyOnCancelHookFunction>> = new Map();
6969
private onCancelHookListeners: (() => Promise<void>)[] = [];
7070

71+
reset(): void {
72+
this.taskInitHooks.clear();
73+
this.taskStartHooks.clear();
74+
this.taskFailureHooks.clear();
75+
this.taskSuccessHooks.clear();
76+
this.taskCompleteHooks.clear();
77+
this.taskResumeHooks.clear();
78+
this.taskCatchErrorHooks.clear();
79+
this.taskMiddlewareHooks.clear();
80+
this.taskCleanupHooks.clear();
81+
this.taskWaitHooks.clear();
82+
this.taskCancelHooks.clear();
83+
this.onCancelHookListeners = [];
84+
this.onWaitHookListeners = [];
85+
this.onResumeHookListeners = [];
86+
}
87+
7188
registerOnCancelHookListener(listener: () => Promise<void>): void {
7289
this.onCancelHookListeners.push(listener);
7390
}

packages/core/src/v3/locals/manager.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,8 @@ export class StandardLocalsManager implements LocalsManager {
3333
setLocal<T>(key: LocalsKey<T>, value: T): void {
3434
this.store.set(key.__type, value);
3535
}
36+
37+
reset(): void {
38+
this.store.clear();
39+
}
3640
}
37-
0;

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ export class StandardMetadataManager implements RunMetadataManager {
3030
private streamsVersion: "v1" | "v2" = "v1"
3131
) {}
3232

33+
reset(): void {
34+
this.queuedOperations.clear();
35+
this.queuedParentOperations.clear();
36+
this.queuedRootOperations.clear();
37+
this.activeStreams.clear();
38+
this.store = undefined;
39+
this.runId = undefined;
40+
this.flushTimeoutId = null;
41+
this.isFlushing = false;
42+
}
43+
3344
get parent(): RunMetadataUpdater {
3445
// Store a reference to 'this' to ensure proper context
3546
const self = this;

packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana
3030
}
3131
}
3232

33+
reset(): void {
34+
this._metrics = [];
35+
}
36+
37+
// TODO: handle this when processKeepAlive is enabled
3338
#seedMetricsFromEnvironment() {
3439
const forkStartTime = getEnvVar("TRIGGER_PROCESS_FORK_START_TIME");
3540
const warmStart = getEnvVar("TRIGGER_WARM_START");

packages/core/src/v3/runtime/sharedRuntimeManager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ export class SharedRuntimeManager implements RuntimeManager {
4141
}, 300_000);
4242
}
4343

44+
reset(): void {
45+
this.resolversById.clear();
46+
this.waitpointsByResolverId.clear();
47+
}
48+
4449
disable(): void {
4550
// do nothing
4651
}

packages/core/src/v3/timeout/usageTimeoutManager.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ export class UsageTimeoutManager implements TimeoutManager {
1414
return this._abortSignal;
1515
}
1616

17+
reset(): void {
18+
this._abortController = new AbortController();
19+
this._abortSignal = undefined;
20+
this._intervalId = undefined;
21+
}
22+
1723
abortAfterTimeout(timeoutInSeconds?: number): AbortController {
1824
this._abortSignal = this._abortController.signal;
1925

packages/core/src/v3/usage/devUsageManager.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ export class DevUsageManager implements UsageManager {
5050

5151
async flush(): Promise<void> {}
5252

53+
reset(): void {
54+
this._firstMeasurement = undefined;
55+
this._currentMeasurements.clear();
56+
this._pauses.clear();
57+
}
58+
5359
sample(): UsageSample | undefined {
5460
return this._firstMeasurement?.sample();
5561
}

packages/core/src/v3/waitUntil/manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ import { MaybeDeferredPromise, WaitUntilManager } from "./types.js";
33
export class StandardWaitUntilManager implements WaitUntilManager {
44
private maybeDeferredPromises: Set<MaybeDeferredPromise> = new Set();
55

6+
reset(): void {
7+
this.maybeDeferredPromises.clear();
8+
}
9+
610
register(promise: MaybeDeferredPromise): void {
711
this.maybeDeferredPromises.add(promise);
812
}

0 commit comments

Comments
 (0)