Skip to content

Commit a4421ef

Browse files
committed
🤖 Pass abort controllers through runtime stack
Add abort signal support to all long-running runtime operations to prevent indefinite hangs. Operations with timeouts <10s are excluded per design to avoid noise. Changes: - Runtime interface: Add optional abortSignal to readFile/writeFile/stat and workspace operations - LocalRuntime: Accept and check abort signals in file operations and workspace lifecycle - SSHRuntime: Pass abort signals through to exec() calls for long-running operations: - File I/O (300s timeout) - Git clone operations (300s timeout) - Init hook execution (3600s timeout) - Workspace rename/delete (30s timeout) - Helper functions: Accept and propagate abort signals through readFileString/writeFileString - Bash tool: Pass abort signal to runtime.writeFile() for overflow files All abort signals are optional parameters for backward compatibility. Testing: - 794 unit tests pass - 57 integration tests pass (executeBash, runtimeExecuteBash, runtimeFileEditing, createWorkspace, renameWorkspace, removeWorkspace) - Typecheck and lint clean
1 parent 71e965c commit a4421ef

File tree

5 files changed

+112
-26
lines changed

5 files changed

+112
-26
lines changed

src/runtime/LocalRuntime.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,28 @@ export class LocalRuntime implements Runtime {
209209
return { stdout, stderr, stdin, exitCode, duration };
210210
}
211211

212-
readFile(filePath: string): ReadableStream<Uint8Array> {
212+
readFile(filePath: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array> {
213213
const nodeStream = fs.createReadStream(filePath);
214214

215215
// Handle errors by wrapping in a transform
216216
const webStream = Readable.toWeb(nodeStream) as unknown as ReadableStream<Uint8Array>;
217217

218218
return new ReadableStream<Uint8Array>({
219219
async start(controller: ReadableStreamDefaultController<Uint8Array>) {
220+
// Check if already aborted
221+
if (abortSignal?.aborted) {
222+
controller.error(new Error("Read operation aborted"));
223+
nodeStream.destroy();
224+
return;
225+
}
226+
227+
// Set up abort listener
228+
const abortHandler = () => {
229+
controller.error(new Error("Read operation aborted"));
230+
nodeStream.destroy();
231+
};
232+
abortSignal?.addEventListener("abort", abortHandler);
233+
220234
try {
221235
const reader = webStream.getReader();
222236
while (true) {
@@ -233,17 +247,24 @@ export class LocalRuntime implements Runtime {
233247
err instanceof Error ? err : undefined
234248
)
235249
);
250+
} finally {
251+
abortSignal?.removeEventListener("abort", abortHandler);
236252
}
237253
},
238254
});
239255
}
240256

241-
writeFile(filePath: string): WritableStream<Uint8Array> {
257+
writeFile(filePath: string, abortSignal?: AbortSignal): WritableStream<Uint8Array> {
242258
let tempPath: string;
243259
let writer: WritableStreamDefaultWriter<Uint8Array>;
244260

245261
return new WritableStream<Uint8Array>({
246262
async start() {
263+
// Check if already aborted
264+
if (abortSignal?.aborted) {
265+
throw new Error("Write operation aborted");
266+
}
267+
247268
// Create parent directories if they don't exist
248269
const parentDir = path.dirname(filePath);
249270
await fsPromises.mkdir(parentDir, { recursive: true });
@@ -253,8 +274,19 @@ export class LocalRuntime implements Runtime {
253274
const nodeStream = fs.createWriteStream(tempPath);
254275
const webStream = Writable.toWeb(nodeStream) as WritableStream<Uint8Array>;
255276
writer = webStream.getWriter();
277+
278+
// Set up abort listener
279+
const abortHandler = () => {
280+
writer.abort("Write operation aborted").catch(() => {
281+
// Ignore errors during abort
282+
});
283+
};
284+
abortSignal?.addEventListener("abort", abortHandler);
256285
},
257286
async write(chunk: Uint8Array) {
287+
if (abortSignal?.aborted) {
288+
throw new Error("Write operation aborted");
289+
}
258290
await writer.write(chunk);
259291
},
260292
async close() {
@@ -286,7 +318,12 @@ export class LocalRuntime implements Runtime {
286318
});
287319
}
288320

289-
async stat(filePath: string): Promise<FileStat> {
321+
async stat(filePath: string, abortSignal?: AbortSignal): Promise<FileStat> {
322+
// Check if already aborted
323+
if (abortSignal?.aborted) {
324+
throw new Error("Stat operation aborted");
325+
}
326+
290327
try {
291328
const stats = await fsPromises.stat(filePath);
292329
return {
@@ -462,10 +499,15 @@ export class LocalRuntime implements Runtime {
462499
async renameWorkspace(
463500
projectPath: string,
464501
oldName: string,
465-
newName: string
502+
newName: string,
503+
abortSignal?: AbortSignal
466504
): Promise<
467505
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
468506
> {
507+
// Check if already aborted
508+
if (abortSignal?.aborted) {
509+
return { success: false, error: "Rename operation aborted" };
510+
}
469511
// Compute workspace paths using canonical method
470512
const oldPath = this.getWorkspacePath(projectPath, oldName);
471513
const newPath = this.getWorkspacePath(projectPath, newName);
@@ -485,8 +527,14 @@ export class LocalRuntime implements Runtime {
485527
async deleteWorkspace(
486528
projectPath: string,
487529
workspaceName: string,
488-
force: boolean
530+
force: boolean,
531+
abortSignal?: AbortSignal
489532
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> {
533+
// Check if already aborted
534+
if (abortSignal?.aborted) {
535+
return { success: false, error: "Delete operation aborted" };
536+
}
537+
490538
// Compute workspace path using the canonical method
491539
const deletedPath = this.getWorkspacePath(projectPath, workspaceName);
492540

src/runtime/Runtime.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ export interface WorkspaceCreationParams {
119119
directoryName: string;
120120
/** Logger for streaming creation progress and init hook output */
121121
initLogger: InitLogger;
122+
/** Optional abort signal for cancellation */
123+
abortSignal?: AbortSignal;
122124
}
123125

124126
/**
@@ -145,6 +147,8 @@ export interface WorkspaceInitParams {
145147
workspacePath: string;
146148
/** Logger for streaming initialization progress and output */
147149
initLogger: InitLogger;
150+
/** Optional abort signal for cancellation */
151+
abortSignal?: AbortSignal;
148152
}
149153

150154
/**
@@ -208,26 +212,29 @@ export interface Runtime {
208212
/**
209213
* Read file contents as a stream
210214
* @param path Absolute or relative path to file
215+
* @param abortSignal Optional abort signal for cancellation
211216
* @returns Readable stream of file contents
212217
* @throws RuntimeError if file cannot be read
213218
*/
214-
readFile(path: string): ReadableStream<Uint8Array>;
219+
readFile(path: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array>;
215220

216221
/**
217222
* Write file contents atomically from a stream
218223
* @param path Absolute or relative path to file
224+
* @param abortSignal Optional abort signal for cancellation
219225
* @returns Writable stream for file contents
220226
* @throws RuntimeError if file cannot be written
221227
*/
222-
writeFile(path: string): WritableStream<Uint8Array>;
228+
writeFile(path: string, abortSignal?: AbortSignal): WritableStream<Uint8Array>;
223229

224230
/**
225231
* Get file statistics
226232
* @param path Absolute or relative path to file/directory
233+
* @param abortSignal Optional abort signal for cancellation
227234
* @returns File statistics
228235
* @throws RuntimeError if path does not exist or cannot be accessed
229236
*/
230-
stat(path: string): Promise<FileStat>;
237+
stat(path: string, abortSignal?: AbortSignal): Promise<FileStat>;
231238

232239
/**
233240
* Resolve a path to its absolute, canonical form (expanding tildes, resolving symlinks, etc.).
@@ -310,12 +317,14 @@ export interface Runtime {
310317
* @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name)
311318
* @param oldName Current workspace name
312319
* @param newName New workspace name
320+
* @param abortSignal Optional abort signal for cancellation
313321
* @returns Promise resolving to Result with old/new paths on success, or error message
314322
*/
315323
renameWorkspace(
316324
projectPath: string,
317325
oldName: string,
318-
newName: string
326+
newName: string,
327+
abortSignal?: AbortSignal
319328
): Promise<
320329
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
321330
>;
@@ -333,12 +342,14 @@ export interface Runtime {
333342
* @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name)
334343
* @param workspaceName Workspace name to delete
335344
* @param force If true, force deletion even with uncommitted changes or special conditions (submodules, etc.)
345+
* @param abortSignal Optional abort signal for cancellation
336346
* @returns Promise resolving to Result with deleted path on success, or error message
337347
*/
338348
deleteWorkspace(
339349
projectPath: string,
340350
workspaceName: string,
341-
force: boolean
351+
force: boolean,
352+
abortSignal?: AbortSignal
342353
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }>;
343354

344355
/**

src/runtime/SSHRuntime.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,15 @@ export class SSHRuntime implements Runtime {
192192
/**
193193
* Read file contents over SSH as a stream
194194
*/
195-
readFile(path: string): ReadableStream<Uint8Array> {
195+
readFile(path: string, abortSignal?: AbortSignal): ReadableStream<Uint8Array> {
196196
// Return stdout, but wrap to handle errors from exec() and exit code
197197
return new ReadableStream<Uint8Array>({
198198
start: async (controller: ReadableStreamDefaultController<Uint8Array>) => {
199199
try {
200200
const stream = await this.exec(`cat ${shescape.quote(path)}`, {
201201
cwd: this.config.srcBaseDir,
202202
timeout: 300, // 5 minutes - reasonable for large files
203+
abortSignal,
203204
});
204205

205206
const reader = stream.stdout.getReader();
@@ -240,7 +241,7 @@ export class SSHRuntime implements Runtime {
240241
/**
241242
* Write file contents over SSH atomically from a stream
242243
*/
243-
writeFile(path: string): WritableStream<Uint8Array> {
244+
writeFile(path: string, abortSignal?: AbortSignal): WritableStream<Uint8Array> {
244245
const tempPath = `${path}.tmp.${Date.now()}`;
245246
// Create parent directory if needed, then write file atomically
246247
// Use shescape.quote for safe path escaping
@@ -253,6 +254,7 @@ export class SSHRuntime implements Runtime {
253254
execPromise ??= this.exec(writeCommand, {
254255
cwd: this.config.srcBaseDir,
255256
timeout: 300, // 5 minutes - reasonable for large files
257+
abortSignal,
256258
});
257259
return execPromise;
258260
};
@@ -290,12 +292,13 @@ export class SSHRuntime implements Runtime {
290292
/**
291293
* Get file statistics over SSH
292294
*/
293-
async stat(path: string): Promise<FileStat> {
295+
async stat(path: string, _abortSignal?: AbortSignal): Promise<FileStat> {
294296
// Use stat with format string to get: size, mtime, type
295297
// %s = size, %Y = mtime (seconds since epoch), %F = file type
298+
// Note: timeout is <10s so no abort signal needed per requirement
296299
const stream = await this.exec(`stat -c '%s %Y %F' ${shescape.quote(path)}`, {
297300
cwd: this.config.srcBaseDir,
298-
timeout: 10, // 10 seconds - stat should be fast
301+
timeout: 10, // 10 seconds - stat should be fast (no abort needed per requirement)
299302
});
300303

301304
const [stdout, stderr, exitCode] = await Promise.all([
@@ -482,7 +485,8 @@ export class SSHRuntime implements Runtime {
482485
private async syncProjectToRemote(
483486
projectPath: string,
484487
workspacePath: string,
485-
initLogger: InitLogger
488+
initLogger: InitLogger,
489+
abortSignal?: AbortSignal
486490
): Promise<void> {
487491
// Use timestamp-based bundle path to avoid conflicts (simpler than $$)
488492
const timestamp = Date.now();
@@ -548,6 +552,7 @@ export class SSHRuntime implements Runtime {
548552
const cloneStream = await this.exec(`git clone --quiet ${bundleTempPath} ${cloneDestPath}`, {
549553
cwd: "~",
550554
timeout: 300, // 5 minutes for clone
555+
abortSignal,
551556
});
552557

553558
const [cloneStdout, cloneStderr, cloneExitCode] = await Promise.all([
@@ -569,6 +574,7 @@ export class SSHRuntime implements Runtime {
569574
{
570575
cwd: "~",
571576
timeout: 30,
577+
abortSignal,
572578
}
573579
);
574580
await createTrackingBranchesStream.exitCode;
@@ -639,7 +645,8 @@ export class SSHRuntime implements Runtime {
639645
private async runInitHook(
640646
projectPath: string,
641647
workspacePath: string,
642-
initLogger: InitLogger
648+
initLogger: InitLogger,
649+
abortSignal?: AbortSignal
643650
): Promise<void> {
644651
// Check if hook exists locally (we synced the project, so local check is sufficient)
645652
const hookExists = await checkInitHookExists(projectPath);
@@ -660,6 +667,7 @@ export class SSHRuntime implements Runtime {
660667
const hookStream = await this.exec(hookCommand, {
661668
cwd: workspacePath, // Run in the workspace directory
662669
timeout: 3600, // 1 hour - generous timeout for init hooks
670+
abortSignal,
663671
});
664672

665673
// Create line-buffered loggers
@@ -763,13 +771,13 @@ export class SSHRuntime implements Runtime {
763771
}
764772

765773
async initWorkspace(params: WorkspaceInitParams): Promise<WorkspaceInitResult> {
766-
const { projectPath, branchName, trunkBranch, workspacePath, initLogger } = params;
774+
const { projectPath, branchName, trunkBranch, workspacePath, initLogger, abortSignal } = params;
767775

768776
try {
769777
// 1. Sync project to remote (opportunistic rsync with scp fallback)
770778
initLogger.logStep("Syncing project files to remote...");
771779
try {
772-
await this.syncProjectToRemote(projectPath, workspacePath, initLogger);
780+
await this.syncProjectToRemote(projectPath, workspacePath, initLogger, abortSignal);
773781
} catch (error) {
774782
const errorMsg = getErrorMessage(error);
775783
initLogger.logStderr(`Failed to sync project: ${errorMsg}`);
@@ -793,6 +801,7 @@ export class SSHRuntime implements Runtime {
793801
const checkoutStream = await this.exec(checkoutCmd, {
794802
cwd: workspacePath, // Use the full workspace path for git operations
795803
timeout: 300, // 5 minutes for git checkout (can be slow on large repos)
804+
abortSignal,
796805
});
797806

798807
const [stdout, stderr, exitCode] = await Promise.all([
@@ -816,7 +825,7 @@ export class SSHRuntime implements Runtime {
816825
// Note: runInitHook calls logComplete() internally if hook exists
817826
const hookExists = await checkInitHookExists(projectPath);
818827
if (hookExists) {
819-
await this.runInitHook(projectPath, workspacePath, initLogger);
828+
await this.runInitHook(projectPath, workspacePath, initLogger, abortSignal);
820829
} else {
821830
// No hook - signal completion immediately
822831
initLogger.logComplete(0);
@@ -837,10 +846,15 @@ export class SSHRuntime implements Runtime {
837846
async renameWorkspace(
838847
projectPath: string,
839848
oldName: string,
840-
newName: string
849+
newName: string,
850+
abortSignal?: AbortSignal
841851
): Promise<
842852
{ success: true; oldPath: string; newPath: string } | { success: false; error: string }
843853
> {
854+
// Check if already aborted
855+
if (abortSignal?.aborted) {
856+
return { success: false, error: "Rename operation aborted" };
857+
}
844858
// Compute workspace paths using canonical method
845859
const oldPath = this.getWorkspacePath(projectPath, oldName);
846860
const newPath = this.getWorkspacePath(projectPath, newName);
@@ -858,6 +872,7 @@ export class SSHRuntime implements Runtime {
858872
const stream = await this.exec(moveCommand, {
859873
cwd: this.config.srcBaseDir,
860874
timeout: 30,
875+
abortSignal,
861876
});
862877

863878
await stream.stdin.close();
@@ -892,8 +907,14 @@ export class SSHRuntime implements Runtime {
892907
async deleteWorkspace(
893908
projectPath: string,
894909
workspaceName: string,
895-
force: boolean
910+
force: boolean,
911+
abortSignal?: AbortSignal
896912
): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> {
913+
// Check if already aborted
914+
if (abortSignal?.aborted) {
915+
return { success: false, error: "Delete operation aborted" };
916+
}
917+
897918
// Compute workspace path using canonical method
898919
const deletedPath = this.getWorkspacePath(projectPath, workspaceName);
899920

@@ -943,6 +964,7 @@ export class SSHRuntime implements Runtime {
943964
const stream = await this.exec(removeCommand, {
944965
cwd: this.config.srcBaseDir,
945966
timeout: 30,
967+
abortSignal,
946968
});
947969

948970
await stream.stdin.close();

src/services/tools/bash.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
425425
const fullOutput = lines.join("\n");
426426

427427
// Use runtime.writeFile() for SSH support
428-
const writer = config.runtime.writeFile(overflowPath);
428+
const writer = config.runtime.writeFile(overflowPath, abortSignal);
429429
const encoder = new TextEncoder();
430430
const writerInstance = writer.getWriter();
431431
await writerInstance.write(encoder.encode(fullOutput));

0 commit comments

Comments
 (0)