Skip to content

Commit 2760b83

Browse files
committed
🤖 Pass abort controllers through runtime stack
Add abort signal support to all long-running runtime operations to prevent indefinite hangs. Operations with timeouts <10s are excluded per design to avoid noise. Changes: - Runtime interface: Add optional abortSignal to readFile/writeFile/stat and workspace operations - LocalRuntime: Accept and check abort signals in file operations and workspace lifecycle - SSHRuntime: Pass abort signals through to exec() calls for long-running operations: - File I/O (300s timeout) - Git clone operations (300s timeout) - Init hook execution (3600s timeout) - Workspace rename/delete (30s timeout) - Helper functions: Accept and propagate abort signals through readFileString/writeFileString - Bash tool: Pass abort signal to runtime.writeFile() for overflow files All abort signals are optional parameters for backward compatibility. Testing: - 794 unit tests pass - 57 integration tests pass (executeBash, runtimeExecuteBash, runtimeFileEditing, createWorkspace, renameWorkspace, removeWorkspace) - Typecheck and lint clean
1 parent 86323fa commit 2760b83

File tree

5 files changed

+112
-26
lines changed

5 files changed

+112
-26
lines changed

src/runtime/LocalRuntime.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,28 @@ export class LocalRuntime implements Runtime {
205205
return { stdout, stderr, stdin, exitCode, duration };
206206
}
207207

208-
readFile(filePath: string): ReadableStream<Uint8Array> {
208+
readFile(filePath: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array> {
209209
const nodeStream = fs.createReadStream(filePath);
210210

211211
// Handle errors by wrapping in a transform
212212
const webStream = Readable.toWeb(nodeStream) as unknown as ReadableStream<Uint8Array>;
213213

214214
return new ReadableStream<Uint8Array>({
215215
async start(controller: ReadableStreamDefaultController<Uint8Array>) {
216+
// Check if already aborted
217+
if (abortSignal?.aborted) {
218+
controller.error(new Error("Read operation aborted"));
219+
nodeStream.destroy();
220+
return;
221+
}
222+
223+
// Set up abort listener
224+
const abortHandler = () => {
225+
controller.error(new Error("Read operation aborted"));
226+
nodeStream.destroy();
227+
};
228+
abortSignal?.addEventListener("abort", abortHandler);
229+
216230
try {
217231
const reader = webStream.getReader();
218232
while (true) {
@@ -229,17 +243,24 @@ export class LocalRuntime implements Runtime {
229243
err instanceof Error ? err : undefined
230244
)
231245
);
246+
} finally {
247+
abortSignal?.removeEventListener("abort", abortHandler);
232248
}
233249
},
234250
});
235251
}
236252

237-
writeFile(filePath: string): WritableStream<Uint8Array> {
253+
writeFile(filePath: string, abortSignal?: AbortSignal): WritableStream<Uint8Array> {
238254
let tempPath: string;
239255
let writer: WritableStreamDefaultWriter<Uint8Array>;
240256

241257
return new WritableStream<Uint8Array>({
242258
async start() {
259+
// Check if already aborted
260+
if (abortSignal?.aborted) {
261+
throw new Error("Write operation aborted");
262+
}
263+
243264
// Create parent directories if they don't exist
244265
const parentDir = path.dirname(filePath);
245266
await fsPromises.mkdir(parentDir, { recursive: true });
@@ -249,8 +270,19 @@ export class LocalRuntime implements Runtime {
249270
const nodeStream = fs.createWriteStream(tempPath);
250271
const webStream = Writable.toWeb(nodeStream) as WritableStream<Uint8Array>;
251272
writer = webStream.getWriter();
273+
274+
// Set up abort listener
275+
const abortHandler = () => {
276+
writer.abort("Write operation aborted").catch(() => {
277+
// Ignore errors during abort
278+
});
279+
};
280+
abortSignal?.addEventListener("abort", abortHandler);
252281
},
253282
async write(chunk: Uint8Array) {
283+
if (abortSignal?.aborted) {
284+
throw new Error("Write operation aborted");
285+
}
254286
await writer.write(chunk);
255287
},
256288
async close() {
@@ -282,7 +314,12 @@ export class LocalRuntime implements Runtime {
282314
});
283315
}
284316

285-
async stat(filePath: string): Promise<FileStat> {
317+
async stat(filePath: string, abortSignal?: AbortSignal): Promise<FileStat> {
318+
// Check if already aborted
319+
if (abortSignal?.aborted) {
320+
throw new Error("Stat operation aborted");
321+
}
322+
286323
try {
287324
const stats = await fsPromises.stat(filePath);
288325
return {
@@ -450,10 +487,15 @@ export class LocalRuntime implements Runtime {
450487
async renameWorkspace(
451488
projectPath: string,
452489
oldName: string,
453-
newName: string
490+
newName: string,
491+
abortSignal?: AbortSignal
454492
): Promise<
455493
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
456494
> {
495+
// Check if already aborted
496+
if (abortSignal?.aborted) {
497+
return { success: false, error: "Rename operation aborted" };
498+
}
457499
// Compute workspace paths using canonical method
458500
const oldPath = this.getWorkspacePath(projectPath, oldName);
459501
const newPath = this.getWorkspacePath(projectPath, newName);
@@ -473,8 +515,14 @@ export class LocalRuntime implements Runtime {
473515
async deleteWorkspace(
474516
projectPath: string,
475517
workspaceName: string,
476-
force: boolean
518+
force: boolean,
519+
abortSignal?: AbortSignal
477520
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> {
521+
// Check if already aborted
522+
if (abortSignal?.aborted) {
523+
return { success: false, error: "Delete operation aborted" };
524+
}
525+
478526
// Compute workspace path using the canonical method
479527
const deletedPath = this.getWorkspacePath(projectPath, workspaceName);
480528

src/runtime/Runtime.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ export interface WorkspaceCreationParams {
119119
directoryName: string;
120120
/** Logger for streaming creation progress and init hook output */
121121
initLogger: InitLogger;
122+
/** Optional abort signal for cancellation */
123+
abortSignal?: AbortSignal;
122124
}
123125

124126
/**
@@ -145,6 +147,8 @@ export interface WorkspaceInitParams {
145147
workspacePath: string;
146148
/** Logger for streaming initialization progress and output */
147149
initLogger: InitLogger;
150+
/** Optional abort signal for cancellation */
151+
abortSignal?: AbortSignal;
148152
}
149153

150154
/**
@@ -174,26 +178,29 @@ export interface Runtime {
174178
/**
175179
* Read file contents as a stream
176180
* @param path Absolute or relative path to file
181+
* @param abortSignal Optional abort signal for cancellation
177182
* @returns Readable stream of file contents
178183
* @throws RuntimeError if file cannot be read
179184
*/
180-
readFile(path: string): ReadableStream<Uint8Array>;
185+
readFile(path: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array>;
181186

182187
/**
183188
* Write file contents atomically from a stream
184189
* @param path Absolute or relative path to file
190+
* @param abortSignal Optional abort signal for cancellation
185191
* @returns Writable stream for file contents
186192
* @throws RuntimeError if file cannot be written
187193
*/
188-
writeFile(path: string): WritableStream<Uint8Array>;
194+
writeFile(path: string, abortSignal?: AbortSignal): WritableStream<Uint8Array>;
189195

190196
/**
191197
* Get file statistics
192198
* @param path Absolute or relative path to file/directory
199+
* @param abortSignal Optional abort signal for cancellation
193200
* @returns File statistics
194201
* @throws RuntimeError if path does not exist or cannot be accessed
195202
*/
196-
stat(path: string): Promise<FileStat>;
203+
stat(path: string, abortSignal?: AbortSignal): Promise<FileStat>;
197204

198205
/**
199206
* Normalize a path for comparison purposes within this runtime's context.
@@ -258,12 +265,14 @@ export interface Runtime {
258265
* @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name)
259266
* @param oldName Current workspace name
260267
* @param newName New workspace name
268+
* @param abortSignal Optional abort signal for cancellation
261269
* @returns Promise resolving to Result with old/new paths on success, or error message
262270
*/
263271
renameWorkspace(
264272
projectPath: string,
265273
oldName: string,
266-
newName: string
274+
newName: string,
275+
abortSignal?: AbortSignal
267276
): Promise<
268277
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
269278
>;
@@ -281,12 +290,14 @@ export interface Runtime {
281290
* @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name)
282291
* @param workspaceName Workspace name to delete
283292
* @param force If true, force deletion even with uncommitted changes or special conditions (submodules, etc.)
293+
* @param abortSignal Optional abort signal for cancellation
284294
* @returns Promise resolving to Result with deleted path on success, or error message
285295
*/
286296
deleteWorkspace(
287297
projectPath: string,
288298
workspaceName: string,
289-
force: boolean
299+
force: boolean,
300+
abortSignal?: AbortSignal
290301
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }>;
291302
}
292303

src/runtime/SSHRuntime.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,15 @@ export class SSHRuntime implements Runtime {
184184
/**
185185
* Read file contents over SSH as a stream
186186
*/
187-
readFile(path: string): ReadableStream<Uint8Array> {
187+
readFile(path: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array> {
188188
// Return stdout, but wrap to handle errors from exec() and exit code
189189
return new ReadableStream<Uint8Array>({
190190
start: async (controller: ReadableStreamDefaultController<Uint8Array>) => {
191191
try {
192192
const stream = await this.exec(`cat ${shescape.quote(path)}`, {
193193
cwd: this.config.srcBaseDir,
194194
timeout: 300, // 5 minutes - reasonable for large files
195+
abortSignal,
195196
});
196197

197198
const reader = stream.stdout.getReader();
@@ -232,7 +233,7 @@ export class SSHRuntime implements Runtime {
232233
/**
233234
* Write file contents over SSH atomically from a stream
234235
*/
235-
writeFile(path: string): WritableStream<Uint8Array> {
236+
writeFile(path: string, abortSignal?: AbortSignal): WritableStream<Uint8Array> {
236237
const tempPath = `${path}.tmp.${Date.now()}`;
237238
// Create parent directory if needed, then write file atomically
238239
// Use shescape.quote for safe path escaping
@@ -245,6 +246,7 @@ export class SSHRuntime implements Runtime {
245246
execPromise ??= this.exec(writeCommand, {
246247
cwd: this.config.srcBaseDir,
247248
timeout: 300, // 5 minutes - reasonable for large files
249+
abortSignal,
248250
});
249251
return execPromise;
250252
};
@@ -282,12 +284,13 @@ export class SSHRuntime implements Runtime {
282284
/**
283285
* Get file statistics over SSH
284286
*/
285-
async stat(path: string): Promise<FileStat> {
287+
async stat(path: string, _abortSignal?: AbortSignal): Promise<FileStat> {
286288
// Use stat with format string to get: size, mtime, type
287289
// %s = size, %Y = mtime (seconds since epoch), %F = file type
290+
// Note: timeout is <10s so no abort signal needed per requirement
288291
const stream = await this.exec(`stat -c '%s %Y %F' ${shescape.quote(path)}`, {
289292
cwd: this.config.srcBaseDir,
290-
timeout: 10, // 10 seconds - stat should be fast
293+
timeout: 10, // 10 seconds - stat should be fast (no abort needed per requirement)
291294
});
292295

293296
const [stdout, stderr, exitCode] = await Promise.all([
@@ -404,7 +407,8 @@ export class SSHRuntime implements Runtime {
404407
private async syncProjectToRemote(
405408
projectPath: string,
406409
workspacePath: string,
407-
initLogger: InitLogger
410+
initLogger: InitLogger,
411+
abortSignal?: AbortSignal
408412
): Promise<void> {
409413
// Use timestamp-based bundle path to avoid conflicts (simpler than $$)
410414
const timestamp = Date.now();
@@ -470,6 +474,7 @@ export class SSHRuntime implements Runtime {
470474
const cloneStream = await this.exec(`git clone --quiet ${bundleTempPath} ${cloneDestPath}`, {
471475
cwd: "~",
472476
timeout: 300, // 5 minutes for clone
477+
abortSignal,
473478
});
474479

475480
const [cloneStdout, cloneStderr, cloneExitCode] = await Promise.all([
@@ -491,6 +496,7 @@ export class SSHRuntime implements Runtime {
491496
{
492497
cwd: "~",
493498
timeout: 30,
499+
abortSignal,
494500
}
495501
);
496502
await createTrackingBranchesStream.exitCode;
@@ -561,7 +567,8 @@ export class SSHRuntime implements Runtime {
561567
private async runInitHook(
562568
projectPath: string,
563569
workspacePath: string,
564-
initLogger: InitLogger
570+
initLogger: InitLogger,
571+
abortSignal?: AbortSignal
565572
): Promise<void> {
566573
// Check if hook exists locally (we synced the project, so local check is sufficient)
567574
const hookExists = await checkInitHookExists(projectPath);
@@ -582,6 +589,7 @@ export class SSHRuntime implements Runtime {
582589
const hookStream = await this.exec(hookCommand, {
583590
cwd: workspacePath, // Run in the workspace directory
584591
timeout: 3600, // 1 hour - generous timeout for init hooks
592+
abortSignal,
585593
});
586594

587595
// Create line-buffered loggers
@@ -685,13 +693,13 @@ export class SSHRuntime implements Runtime {
685693
}
686694

687695
async initWorkspace(params: WorkspaceInitParams): Promise<WorkspaceInitResult> {
688-
const { projectPath, branchName, trunkBranch, workspacePath, initLogger } = params;
696+
const { projectPath, branchName, trunkBranch, workspacePath, initLogger, abortSignal } = params;
689697

690698
try {
691699
// 1. Sync project to remote (opportunistic rsync with scp fallback)
692700
initLogger.logStep("Syncing project files to remote...");
693701
try {
694-
await this.syncProjectToRemote(projectPath, workspacePath, initLogger);
702+
await this.syncProjectToRemote(projectPath, workspacePath, initLogger, abortSignal);
695703
} catch (error) {
696704
const errorMsg = getErrorMessage(error);
697705
initLogger.logStderr(`Failed to sync project: ${errorMsg}`);
@@ -715,6 +723,7 @@ export class SSHRuntime implements Runtime {
715723
const checkoutStream = await this.exec(checkoutCmd, {
716724
cwd: workspacePath, // Use the full workspace path for git operations
717725
timeout: 300, // 5 minutes for git checkout (can be slow on large repos)
726+
abortSignal,
718727
});
719728

720729
const [stdout, stderr, exitCode] = await Promise.all([
@@ -738,7 +747,7 @@ export class SSHRuntime implements Runtime {
738747
// Note: runInitHook calls logComplete() internally if hook exists
739748
const hookExists = await checkInitHookExists(projectPath);
740749
if (hookExists) {
741-
await this.runInitHook(projectPath, workspacePath, initLogger);
750+
await this.runInitHook(projectPath, workspacePath, initLogger, abortSignal);
742751
} else {
743752
// No hook - signal completion immediately
744753
initLogger.logComplete(0);
@@ -759,10 +768,15 @@ export class SSHRuntime implements Runtime {
759768
async renameWorkspace(
760769
projectPath: string,
761770
oldName: string,
762-
newName: string
771+
newName: string,
772+
abortSignal?: AbortSignal
763773
): Promise<
764774
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
765775
> {
776+
// Check if already aborted
777+
if (abortSignal?.aborted) {
778+
return { success: false, error: "Rename operation aborted" };
779+
}
766780
// Compute workspace paths using canonical method
767781
const oldPath = this.getWorkspacePath(projectPath, oldName);
768782
const newPath = this.getWorkspacePath(projectPath, newName);
@@ -780,6 +794,7 @@ export class SSHRuntime implements Runtime {
780794
const stream = await this.exec(moveCommand, {
781795
cwd: this.config.srcBaseDir,
782796
timeout: 30,
797+
abortSignal,
783798
});
784799

785800
await stream.stdin.close();
@@ -814,8 +829,14 @@ export class SSHRuntime implements Runtime {
814829
async deleteWorkspace(
815830
projectPath: string,
816831
workspaceName: string,
817-
force: boolean
832+
force: boolean,
833+
abortSignal?: AbortSignal
818834
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> {
835+
// Check if already aborted
836+
if (abortSignal?.aborted) {
837+
return { success: false, error: "Delete operation aborted" };
838+
}
839+
819840
// Compute workspace path using canonical method
820841
const deletedPath = this.getWorkspacePath(projectPath, workspaceName);
821842

@@ -865,6 +886,7 @@ export class SSHRuntime implements Runtime {
865886
const stream = await this.exec(removeCommand, {
866887
cwd: this.config.srcBaseDir,
867888
timeout: 30,
889+
abortSignal,
868890
});
869891

870892
await stream.stdin.close();

src/services/tools/bash.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
412412
const fullOutput = lines.join("\n");
413413

414414
// Use runtime.writeFile() for SSH support
415-
const writer = config.runtime.writeFile(overflowPath);
415+
const writer = config.runtime.writeFile(overflowPath, abortSignal);
416416
const encoder = new TextEncoder();
417417
const writerInstance = writer.getWriter();
418418
await writerInstance.write(encoder.encode(fullOutput));

0 commit comments

Comments
 (0)