Skip to content

Commit aad7840

Browse files
committed
fix(runner): improve restore detection
1 parent 9c08764 commit aad7840

File tree

3 files changed

+161
-45
lines changed

3 files changed

+161
-45
lines changed

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -461,19 +461,6 @@ export class ManagedRunController {
461461
runId: this.runFriendlyId,
462462
message: "Socket connected to supervisor",
463463
});
464-
465-
// This should handle the case where we reconnect after being restored
466-
if (
467-
this.runFriendlyId &&
468-
this.snapshotFriendlyId &&
469-
this.runFriendlyId !== this.env.TRIGGER_RUN_ID
470-
) {
471-
this.sendDebugLog({
472-
runId: this.runFriendlyId,
473-
message: "Subscribing to notifications for in-progress run",
474-
});
475-
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
476-
}
477464
});
478465

479466
socket.on("connect_error", (error) => {
@@ -514,7 +501,7 @@ export class ManagedRunController {
514501
supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL,
515502
};
516503

517-
await this.currentExecution.processEnvOverrides("socket disconnected", true);
504+
const result = await this.currentExecution.processEnvOverrides("socket disconnected", true);
518505

519506
const newEnv = {
520507
workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME,
@@ -528,6 +515,43 @@ export class ManagedRunController {
528515
properties: { reason, ...parseDescription(), currentEnv, newEnv },
529516
});
530517

518+
if (!result) {
519+
return;
520+
}
521+
522+
// If runner ID changed, we detected a restore
523+
if (result.runnerIdChanged) {
524+
this.sendDebugLog({
525+
runId: this.runFriendlyId,
526+
message: "Runner ID changed - restore detected",
527+
properties: {
528+
supervisorChanged: result.supervisorChanged,
529+
},
530+
});
531+
532+
if (!result.supervisorChanged) {
533+
return;
534+
}
535+
536+
// Only reconnect WebSocket if supervisor URL actually changed
537+
this.sendDebugLog({
538+
runId: this.runFriendlyId,
539+
message: "Supervisor URL changed - creating new socket connection",
540+
});
541+
542+
// First disconnect the old socket to avoid conflicts
543+
socket.removeAllListeners();
544+
socket.disconnect();
545+
546+
// Create a new socket with the updated URL and headers
547+
this.socket = this.createSupervisorSocket();
548+
549+
// Re-subscribe to notifications if we have an active execution
550+
if (this.runFriendlyId && this.snapshotFriendlyId) {
551+
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
552+
}
553+
}
554+
531555
return;
532556
}
533557

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,23 @@ export class RunExecution {
873873
);
874874

875875
if (!continuationResult.success) {
876-
throw new Error(continuationResult.error);
876+
// Check if we need to refresh metadata due to connection error
877+
if (continuationResult.isConnectionError) {
878+
this.sendDebugLog("restore: connection error detected, refreshing metadata");
879+
await this.processEnvOverrides("restore connection error");
880+
881+
// Retry the continuation after refreshing metadata
882+
const retryResult = await this.httpClient.continueRunExecution(
883+
this.runFriendlyId,
884+
this.snapshotManager.snapshotId
885+
);
886+
887+
if (!retryResult.success) {
888+
throw new Error(retryResult.error);
889+
}
890+
} else {
891+
throw new Error(continuationResult.error);
892+
}
877893
}
878894

879895
// Track restore count
@@ -899,11 +915,18 @@ export class RunExecution {
899915
public async processEnvOverrides(
900916
reason?: string,
901917
shouldPollForSnapshotChanges?: boolean
902-
): Promise<{ overrides: Metadata } | null> {
918+
): Promise<{
919+
overrides: Metadata;
920+
runnerIdChanged?: boolean;
921+
supervisorChanged?: boolean;
922+
} | null> {
903923
if (!this.metadataClient) {
904924
return null;
905925
}
906926

927+
const previousRunnerId = this.env.TRIGGER_RUNNER_ID;
928+
const previousSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;
929+
907930
const [error, overrides] = await this.metadataClient.getEnvOverrides();
908931

909932
if (error) {
@@ -931,6 +954,14 @@ export class RunExecution {
931954
// Override the env with the new values
932955
this.env.override(overrides);
933956

957+
// Check if runner ID changed
958+
const newRunnerId = this.env.TRIGGER_RUNNER_ID;
959+
const runnerIdChanged = previousRunnerId !== newRunnerId;
960+
961+
// Check if supervisor URL changed
962+
const newSupervisorUrl = this.env.TRIGGER_SUPERVISOR_API_URL;
963+
const supervisorChanged = previousSupervisorUrl !== newSupervisorUrl;
964+
934965
// Update services with new values
935966
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
936967
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
@@ -954,6 +985,8 @@ export class RunExecution {
954985

955986
return {
956987
overrides,
988+
runnerIdChanged,
989+
supervisorChanged,
957990
};
958991
}
959992

@@ -977,6 +1010,12 @@ export class RunExecution {
9771010

9781011
if (!response.success) {
9791012
this.sendDebugLog("heartbeat: failed", { error: response.error });
1013+
1014+
// Check if we need to refresh metadata due to connection error
1015+
if (response.isConnectionError) {
1016+
this.sendDebugLog("heartbeat: connection error detected, refreshing metadata");
1017+
await this.processEnvOverrides("heartbeat connection error");
1018+
}
9801019
}
9811020

9821021
this.lastHeartbeat = new Date();
@@ -1192,6 +1231,14 @@ export class RunExecution {
11921231
error: response.error,
11931232
});
11941233

1234+
if (response.isConnectionError) {
1235+
// Log this separately to make it more visible
1236+
this.sendDebugLog(
1237+
"fetchAndProcessSnapshotChanges: connection error detected, refreshing metadata"
1238+
);
1239+
}
1240+
1241+
// Always trigger metadata refresh on snapshot fetch errors
11951242
await this.processEnvOverrides("snapshots since error");
11961243
return;
11971244
}

packages/core/src/v3/runEngineWorker/workload/http.ts

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,58 @@ export class WorkloadHttpClient {
5252
});
5353
}
5454

55+
private isConnectionError(error: string): boolean {
56+
const connectionErrors = [
57+
"Connection error",
58+
"ECONNREFUSED",
59+
"ETIMEDOUT",
60+
"ENOTFOUND",
61+
"ECONNRESET",
62+
"EHOSTUNREACH",
63+
"ENETUNREACH",
64+
"EPIPE",
65+
"ECONNABORTED",
66+
];
67+
return connectionErrors.some((errType) => error.includes(errType));
68+
}
69+
70+
private async withConnectionErrorDetection<T>(
71+
operation: () => Promise<{ success: true; data: T } | { success: false; error: string }>
72+
): Promise<
73+
{ success: true; data: T } | { success: false; error: string; isConnectionError?: boolean }
74+
> {
75+
const result = await operation();
76+
77+
if (result.success) {
78+
return result;
79+
}
80+
81+
// Check if this is a connection error
82+
if (this.isConnectionError(result.error)) {
83+
return {
84+
...result,
85+
isConnectionError: true,
86+
};
87+
}
88+
89+
return result;
90+
}
91+
5592
async heartbeatRun(runId: string, snapshotId: string, body?: WorkloadHeartbeatRequestBody) {
56-
return wrapZodFetch(
57-
WorkloadHeartbeatResponseBody,
58-
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/heartbeat`,
59-
{
60-
method: "POST",
61-
headers: {
62-
...this.defaultHeaders(),
63-
"Content-Type": "application/json",
64-
},
65-
body: JSON.stringify(body ?? {}),
66-
}
93+
return this.withConnectionErrorDetection(() =>
94+
wrapZodFetch(
95+
WorkloadHeartbeatResponseBody,
96+
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/heartbeat`,
97+
{
98+
method: "POST",
99+
headers: {
100+
...this.defaultHeaders(),
101+
"Content-Type": "application/json",
102+
},
103+
body: JSON.stringify(body ?? {}),
104+
signal: AbortSignal.timeout(10_000), // 10 second timeout
105+
}
106+
)
67107
);
68108
}
69109

@@ -81,15 +121,17 @@ export class WorkloadHttpClient {
81121
}
82122

83123
async continueRunExecution(runId: string, snapshotId: string) {
84-
return wrapZodFetch(
85-
WorkloadContinueRunExecutionResponseBody,
86-
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/continue`,
87-
{
88-
method: "GET",
89-
headers: {
90-
...this.defaultHeaders(),
91-
},
92-
}
124+
return this.withConnectionErrorDetection(() =>
125+
wrapZodFetch(
126+
WorkloadContinueRunExecutionResponseBody,
127+
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/${snapshotId}/continue`,
128+
{
129+
method: "GET",
130+
headers: {
131+
...this.defaultHeaders(),
132+
},
133+
}
134+
)
93135
);
94136
}
95137

@@ -130,15 +172,18 @@ export class WorkloadHttpClient {
130172
}
131173

132174
async getSnapshotsSince(runId: string, snapshotId: string) {
133-
return wrapZodFetch(
134-
WorkloadRunSnapshotsSinceResponseBody,
135-
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/since/${snapshotId}`,
136-
{
137-
method: "GET",
138-
headers: {
139-
...this.defaultHeaders(),
140-
},
141-
}
175+
return this.withConnectionErrorDetection(() =>
176+
wrapZodFetch(
177+
WorkloadRunSnapshotsSinceResponseBody,
178+
`${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/since/${snapshotId}`,
179+
{
180+
method: "GET",
181+
headers: {
182+
...this.defaultHeaders(),
183+
},
184+
signal: AbortSignal.timeout(10_000), // 10 second timeout
185+
}
186+
)
142187
);
143188
}
144189

0 commit comments

Comments
 (0)