Skip to content

Commit 3d7305f

Browse files
authored
🤖 refactor: default SSH acquireConnection to wait (#1155)
### Summary - Make SSH backoff non-catastrophic by default: `SSHConnectionPool.acquireConnection()` now waits through backoff (bounded) instead of throwing immediately. - Preserve herd protection semantics (singleflight probe + backoff schedule) while improving user-facing reliability. - Remove SSH-only wait knobs from `ExecOptions` to avoid runtime API disparity; SSHRuntime always goes through the pool. ### Behavior - Default: `acquireConnection(config)` waits through backoff for up to 2 minutes. - Opt-in fail-fast: `acquireConnection(config, { maxWaitMs: 0 })`. ### Notes - Workspace init/sync still emits progress via `onWait` for the preflight step, but the waiting policy now lives in the pool. --- _Generated with `mux` • Model: `openai:gpt-5.2` • Thinking: `xhigh`_
1 parent 1776d88 commit 3d7305f

File tree

3 files changed

+222
-52
lines changed

3 files changed

+222
-52
lines changed

src/node/runtime/SSHRuntime.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ const shescape = {
4141
},
4242
};
4343

44+
function logSSHBackoffWait(initLogger: InitLogger, waitMs: number): void {
45+
const secs = Math.max(1, Math.ceil(waitMs / 1000));
46+
initLogger.logStep(`SSH unavailable; retrying in ${secs}s...`);
47+
}
48+
4449
// Re-export SSHRuntimeConfig from connection pool (defined there to avoid circular deps)
4550
export type { SSHRuntimeConfig } from "./sshConnectionPool";
4651

@@ -122,9 +127,11 @@ export class SSHRuntime implements Runtime {
122127
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
123128
}
124129

125-
// Ensure connection is healthy before executing
126-
// This provides backoff protection and singleflighting for concurrent requests
127-
await sshConnectionPool.acquireConnection(this.config);
130+
// Ensure connection is healthy before executing.
131+
// This provides backoff protection and singleflighting for concurrent requests.
132+
await sshConnectionPool.acquireConnection(this.config, {
133+
abortSignal: options.abortSignal,
134+
});
128135

129136
// Build command parts
130137
const parts: string[] = [];
@@ -437,7 +444,7 @@ export class SSHRuntime implements Runtime {
437444
*/
438445
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
439446
// Ensure connection is healthy before executing
440-
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
447+
await sshConnectionPool.acquireConnection(this.config, { timeoutMs });
441448

442449
const sshArgs = this.buildSSHArgs();
443450
sshArgs.push(this.config.host, command);
@@ -623,7 +630,13 @@ export class SSHRuntime implements Runtime {
623630
initLogger.logStderr(`Could not get origin URL: ${getErrorMessage(error)}`);
624631
}
625632

626-
// Step 2: Create bundle locally and pipe to remote file via SSH
633+
// Step 2: Ensure the SSH host is reachable before doing expensive local work
634+
await sshConnectionPool.acquireConnection(this.config, {
635+
abortSignal,
636+
onWait: (waitMs) => logSSHBackoffWait(initLogger, waitMs),
637+
});
638+
639+
// Step 3: Create bundle locally and pipe to remote file via SSH
627640
initLogger.logStep(`Creating git bundle...`);
628641

629642
// Ensure SSH connection is established before starting the bundle transfer
@@ -676,7 +689,7 @@ export class SSHRuntime implements Runtime {
676689
});
677690
});
678691

679-
// Step 3: Clone from bundle on remote using this.exec
692+
// Step 4: Clone from bundle on remote using this.exec
680693
initLogger.logStep(`Cloning repository on remote...`);
681694

682695
// Expand tilde in destination path for git clone
@@ -699,7 +712,7 @@ export class SSHRuntime implements Runtime {
699712
throw new Error(`Failed to clone repository: ${cloneStderr || cloneStdout}`);
700713
}
701714

702-
// Step 4: Create local tracking branches for all remote branches
715+
// Step 5: Create local tracking branches for all remote branches
703716
// This ensures that branch names like "custom-trunk" can be used directly
704717
// in git checkout commands, rather than needing "origin/custom-trunk"
705718
initLogger.logStep(`Creating local tracking branches...`);
@@ -714,7 +727,7 @@ export class SSHRuntime implements Runtime {
714727
await createTrackingBranchesStream.exitCode;
715728
// Don't fail if this fails - some branches may already exist
716729

717-
// Step 5: Update origin remote if we have an origin URL
730+
// Step 6: Update origin remote if we have an origin URL
718731
if (originUrl) {
719732
initLogger.logStep(`Setting origin remote to ${originUrl}...`);
720733
const setOriginStream = await this.exec(
@@ -746,7 +759,7 @@ export class SSHRuntime implements Runtime {
746759
await removeOriginStream.exitCode;
747760
}
748761

749-
// Step 5: Remove bundle file
762+
// Step 7: Remove bundle file
750763
initLogger.logStep(`Cleaning up bundle file...`);
751764
const rmStream = await this.exec(`rm ${bundleTempPath}`, {
752765
cwd: "~",

src/node/runtime/sshConnectionPool.test.ts

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,9 @@ describe("SSHConnectionPool", () => {
193193
};
194194

195195
// Trigger a failure via acquireConnection (will fail to connect)
196-
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
196+
await expect(
197+
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 })
198+
).rejects.toThrow();
197199

198200
// Verify we're now in backoff
199201
const healthBefore = pool.getConnectionHealth(config);
@@ -231,6 +233,38 @@ describe("SSHConnectionPool", () => {
231233
expect(elapsed).toBeLessThan(50);
232234
});
233235

236+
test("waits through backoff (bounded) instead of throwing", async () => {
237+
const pool = new SSHConnectionPool();
238+
const config: SSHRuntimeConfig = {
239+
host: "test.example.com",
240+
srcBaseDir: "/work",
241+
};
242+
243+
// Put host into backoff without doing a real probe.
244+
pool.reportFailure(config, "Connection refused");
245+
expect(pool.getConnectionHealth(config)?.backoffUntil).toBeDefined();
246+
247+
const sleepCalls: number[] = [];
248+
const onWaitCalls: number[] = [];
249+
250+
await pool.acquireConnection(config, {
251+
onWait: (ms) => {
252+
onWaitCalls.push(ms);
253+
},
254+
sleep: (ms) => {
255+
sleepCalls.push(ms);
256+
// Simulate time passing / recovery.
257+
pool.markHealthy(config);
258+
return Promise.resolve();
259+
},
260+
});
261+
262+
expect(sleepCalls.length).toBe(1);
263+
expect(onWaitCalls.length).toBe(1);
264+
expect(sleepCalls[0]).toBeGreaterThan(0);
265+
expect(onWaitCalls[0]).toBe(sleepCalls[0]);
266+
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
267+
});
234268
test("throws immediately when in backoff", async () => {
235269
const pool = new SSHConnectionPool();
236270
const config: SSHRuntimeConfig = {
@@ -239,10 +273,12 @@ describe("SSHConnectionPool", () => {
239273
};
240274

241275
// Trigger a failure to put connection in backoff
242-
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
276+
await expect(
277+
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 })
278+
).rejects.toThrow();
243279

244280
// Second call should throw immediately with backoff message
245-
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
281+
await expect(pool.acquireConnection(config, { maxWaitMs: 0 })).rejects.toThrow(/in backoff/);
246282
});
247283

248284
test("getControlPath returns deterministic path", () => {
@@ -270,9 +306,9 @@ describe("SSHConnectionPool", () => {
270306

271307
// All concurrent calls should share the same probe and get same result
272308
const results = await Promise.allSettled([
273-
pool.acquireConnection(config, 1000),
274-
pool.acquireConnection(config, 1000),
275-
pool.acquireConnection(config, 1000),
309+
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
310+
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
311+
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
276312
]);
277313

278314
// All should be rejected (connection fails)

src/node/runtime/sshConnectionPool.ts

Lines changed: 158 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,65 @@ const BACKOFF_SCHEDULE = [1, 5, 10, 20, 40, 60];
6464
*/
6565
const HEALTHY_TTL_MS = 15 * 1000; // 15 seconds
6666

67+
const DEFAULT_PROBE_TIMEOUT_MS = 10_000;
68+
const DEFAULT_MAX_WAIT_MS = 2 * 60 * 1000; // 2 minutes
69+
70+
export interface AcquireConnectionOptions {
71+
/** Timeout for the health check probe. */
72+
timeoutMs?: number;
73+
74+
/**
75+
* Max time to wait (ms) for a host to become healthy (waits + probes).
76+
*
77+
* - Omit to use the default (waits through backoff).
78+
* - Set to 0 to fail fast.
79+
*/
80+
maxWaitMs?: number;
81+
82+
/** Optional abort signal to cancel any waiting. */
83+
abortSignal?: AbortSignal;
84+
85+
/**
86+
* Called when acquireConnection is waiting due to backoff.
87+
*
88+
* Useful for user-facing progress logs (e.g. workspace init).
89+
*/
90+
onWait?: (waitMs: number) => void;
91+
92+
/**
93+
* Test seam.
94+
*
95+
* If provided, this is used for sleeping between wait cycles.
96+
*/
97+
sleep?: (ms: number, abortSignal?: AbortSignal) => Promise<void>;
98+
}
99+
100+
async function sleepWithAbort(ms: number, abortSignal?: AbortSignal): Promise<void> {
101+
if (ms <= 0) return;
102+
if (abortSignal?.aborted) {
103+
throw new Error("Operation aborted");
104+
}
105+
106+
await new Promise<void>((resolve, reject) => {
107+
const timer = setTimeout(() => {
108+
cleanup();
109+
resolve();
110+
}, ms);
111+
112+
const onAbort = () => {
113+
cleanup();
114+
reject(new Error("Operation aborted"));
115+
};
116+
117+
const cleanup = () => {
118+
clearTimeout(timer);
119+
abortSignal?.removeEventListener("abort", onAbort);
120+
};
121+
122+
abortSignal?.addEventListener("abort", onAbort);
123+
});
124+
}
125+
67126
/**
68127
* SSH Connection Pool
69128
*
@@ -80,51 +139,113 @@ export class SSHConnectionPool {
80139
/**
81140
* Ensure connection is healthy before proceeding.
82141
*
83-
* @param config SSH configuration
84-
* @param timeoutMs Timeout for health check probe (default: 10s)
85-
* @throws Error if connection is in backoff or health check fails
142+
* By default, acquireConnection waits through backoff (bounded) so user-facing
143+
* actions don’t immediately fail during transient SSH outages.
144+
*
145+
* Callers can opt into fail-fast behavior by passing `{ maxWaitMs: 0 }`.
86146
*/
87-
async acquireConnection(config: SSHRuntimeConfig, timeoutMs = 10000): Promise<void> {
147+
async acquireConnection(config: SSHRuntimeConfig, timeoutMs?: number): Promise<void>;
148+
async acquireConnection(
149+
config: SSHRuntimeConfig,
150+
options?: AcquireConnectionOptions
151+
): Promise<void>;
152+
async acquireConnection(
153+
config: SSHRuntimeConfig,
154+
timeoutMsOrOptions: number | AcquireConnectionOptions = DEFAULT_PROBE_TIMEOUT_MS
155+
): Promise<void> {
156+
const options: AcquireConnectionOptions =
157+
typeof timeoutMsOrOptions === "number"
158+
? { timeoutMs: timeoutMsOrOptions }
159+
: (timeoutMsOrOptions ?? {});
160+
161+
const timeoutMs = options.timeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS;
162+
const sleep = options.sleep ?? sleepWithAbort;
163+
164+
const maxWaitMs = options.maxWaitMs ?? DEFAULT_MAX_WAIT_MS;
165+
const shouldWait = maxWaitMs > 0;
166+
88167
const key = makeConnectionKey(config);
89-
const health = this.health.get(key);
168+
const startTime = Date.now();
90169

91-
// Check if in backoff
92-
if (health?.backoffUntil && health.backoffUntil > new Date()) {
93-
const remainingSecs = Math.ceil((health.backoffUntil.getTime() - Date.now()) / 1000);
94-
throw new Error(
95-
`SSH connection to ${config.host} is in backoff for ${remainingSecs}s. ` +
96-
`Last error: ${health.lastError ?? "unknown"}`
97-
);
98-
}
170+
while (true) {
171+
if (options.abortSignal?.aborted) {
172+
throw new Error("Operation aborted");
173+
}
99174

100-
// Return immediately if known healthy and not stale
101-
if (health?.status === "healthy") {
102-
const age = Date.now() - (health.lastSuccess?.getTime() ?? 0);
103-
if (age < HEALTHY_TTL_MS) {
104-
log.debug(`SSH connection to ${config.host} is known healthy, skipping probe`);
105-
return;
175+
const health = this.health.get(key);
176+
177+
// If in backoff: either fail fast or wait (bounded).
178+
if (health?.backoffUntil && health.backoffUntil > new Date()) {
179+
const remainingMs = health.backoffUntil.getTime() - Date.now();
180+
const remainingSecs = Math.ceil(remainingMs / 1000);
181+
182+
if (!shouldWait) {
183+
throw new Error(
184+
`SSH connection to ${config.host} is in backoff for ${remainingSecs}s. ` +
185+
`Last error: ${health.lastError ?? "unknown"}`
186+
);
187+
}
188+
189+
const elapsedMs = Date.now() - startTime;
190+
const budgetMs = Math.max(0, maxWaitMs - elapsedMs);
191+
if (budgetMs <= 0) {
192+
throw new Error(
193+
`SSH connection to ${config.host} did not become healthy within ${maxWaitMs}ms. ` +
194+
`Last error: ${health.lastError ?? "unknown"}`
195+
);
196+
}
197+
198+
const waitMs = Math.min(remainingMs, budgetMs);
199+
options.onWait?.(waitMs);
200+
await sleep(waitMs, options.abortSignal);
201+
continue;
106202
}
107-
log.debug(
108-
`SSH connection to ${config.host} health is stale (${Math.round(age / 1000)}s), re-probing`
109-
);
110-
}
111203

112-
// Check for inflight probe - singleflighting
113-
const existing = this.inflight.get(key);
114-
if (existing) {
115-
log.debug(`SSH connection to ${config.host} has inflight probe, waiting...`);
116-
return existing;
117-
}
204+
// Return immediately if known healthy and not stale.
205+
if (health?.status === "healthy") {
206+
const age = Date.now() - (health.lastSuccess?.getTime() ?? 0);
207+
if (age < HEALTHY_TTL_MS) {
208+
log.debug(`SSH connection to ${config.host} is known healthy, skipping probe`);
209+
return;
210+
}
211+
log.debug(
212+
`SSH connection to ${config.host} health is stale (${Math.round(age / 1000)}s), re-probing`
213+
);
214+
}
118215

119-
// Start new probe
120-
log.debug(`SSH connection to ${config.host} needs probe, starting health check`);
121-
const probe = this.probeConnection(config, timeoutMs, key);
122-
this.inflight.set(key, probe);
216+
// Check for inflight probe - singleflighting.
217+
const existing = this.inflight.get(key);
218+
if (existing) {
219+
log.debug(`SSH connection to ${config.host} has inflight probe, waiting...`);
220+
try {
221+
await existing;
222+
return;
223+
} catch (error) {
224+
// Probe failed; if we're in wait mode we'll loop and sleep through the backoff.
225+
if (!shouldWait) {
226+
throw error;
227+
}
228+
continue;
229+
}
230+
}
231+
232+
// Start new probe.
233+
log.debug(`SSH connection to ${config.host} needs probe, starting health check`);
234+
const probe = this.probeConnection(config, timeoutMs, key);
235+
this.inflight.set(key, probe);
123236

124-
try {
125-
await probe;
126-
} finally {
127-
this.inflight.delete(key);
237+
try {
238+
await probe;
239+
return;
240+
} catch (error) {
241+
if (!shouldWait) {
242+
throw error;
243+
}
244+
// In wait mode: probeConnection() recorded backoff; loop and wait.
245+
continue;
246+
} finally {
247+
this.inflight.delete(key);
248+
}
128249
}
129250
}
130251

0 commit comments

Comments
 (0)