Skip to content

Commit 08bad34

Browse files
committed
🤖 Eliminate race conditions in init state with promise-based waiting
Replace event-based waiting with a promise-based state map in InitStateManager. This eliminates race conditions between timeout and event firing, simplifies cleanup, and handles workspace deletion correctly.

Changes:
- Add initPromises map to store completion promises for running inits
- startInit() creates promise, endInit() resolves it
- waitForInit() uses Promise.race() for clean timeout handling
- clearInMemoryState() rejects orphaned promises on workspace deletion
- No event listener cleanup needed (promises auto-resolve)

Extended integration tests:
- Verify tools wait for init completion before executing
- Send second message after init completes to verify state persistence
- Use failed init (exit code 1) to verify tools proceed regardless of status
- Both messages succeed, second is faster (no init wait)

Benefits:
- Multiple tools share same promise (no duplicate listeners)
- Promise.race() handles timeout cleanly (no manual cleanup)
- Workspace deletion properly cancels pending waits
- Simpler code, fewer edge cases, easier to reason about

Tests: All 7 integration tests pass (19s), 111 tool tests pass (3.5s)
1 parent f36f7a4 commit 08bad34

20 files changed

+632
-63
lines changed

src/debug/agentSessionCli.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import { parseArgs } from "util";
77
import { Config } from "@/config";
88
import { HistoryService } from "@/services/historyService";
99
import { PartialService } from "@/services/partialService";
10-
import { AIService } from "@/services/aiService";
1110
import { InitStateManager } from "@/services/initStateManager";
11+
import { AIService } from "@/services/aiService";
1212
import { AgentSession, type AgentSessionChatEvent } from "@/services/agentSession";
1313
import {
1414
isCaughtUpMessage,
@@ -209,8 +209,8 @@ async function main(): Promise<void> {
209209

210210
const historyService = new HistoryService(config);
211211
const partialService = new PartialService(config, historyService);
212-
const aiService = new AIService(config, historyService, partialService);
213212
const initStateManager = new InitStateManager(config);
213+
const aiService = new AIService(config, historyService, partialService, initStateManager);
214214
ensureProvidersConfig(config);
215215

216216
const session = new AgentSession({

src/debug/replay-history.ts

100755100644
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { parseArgs } from "util";
1717
import { defaultConfig } from "@/config";
1818
import type { CmuxMessage } from "@/types/message";
1919
import { createCmuxMessage } from "@/types/message";
20+
import { InitStateManager } from "@/services/initStateManager";
2021
import { AIService } from "@/services/aiService";
2122
import { HistoryService } from "@/services/historyService";
2223
import { PartialService } from "@/services/partialService";
@@ -123,7 +124,8 @@ async function main() {
123124
const config = defaultConfig;
124125
const historyService = new HistoryService(config);
125126
const partialService = new PartialService(config, historyService);
126-
const aiService = new AIService(config, historyService, partialService);
127+
const initStateManager = new InitStateManager(config);
128+
const aiService = new AIService(config, historyService, partialService, initStateManager);
127129

128130
const modelString = values.model ?? "openai:gpt-5-codex";
129131
const thinkingLevel = (values.thinking ?? "high") as "low" | "medium" | "high";

src/services/aiService.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { describe, it, expect, beforeEach } from "bun:test";
66
import { AIService } from "./aiService";
77
import { HistoryService } from "./historyService";
88
import { PartialService } from "./partialService";
9+
import { InitStateManager } from "./initStateManager";
910
import { Config } from "@/config";
1011

1112
describe("AIService", () => {
@@ -15,7 +16,8 @@ describe("AIService", () => {
1516
const config = new Config();
1617
const historyService = new HistoryService(config);
1718
const partialService = new PartialService(config, historyService);
18-
service = new AIService(config, historyService, partialService);
19+
const initStateManager = new InitStateManager(config);
20+
service = new AIService(config, historyService, partialService, initStateManager);
1921
});
2022

2123
// Note: These tests are placeholders as Bun doesn't support Jest mocking

src/services/aiService.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type { CmuxMessage, CmuxTextPart } from "@/types/message";
1212
import { createCmuxMessage } from "@/types/message";
1313
import type { Config } from "@/config";
1414
import { StreamManager } from "./streamManager";
15+
import { InitStateManager } from "./initStateManager";
1516
import type { SendMessageError } from "@/types/errors";
1617
import { getToolsForModel } from "@/utils/tools/tools";
1718
import { createRuntime } from "@/runtime/runtimeFactory";
@@ -108,17 +109,24 @@ export class AIService extends EventEmitter {
108109
private readonly historyService: HistoryService;
109110
private readonly partialService: PartialService;
110111
private readonly config: Config;
112+
private readonly initStateManager: InitStateManager;
111113
private readonly mockModeEnabled: boolean;
112114
private readonly mockScenarioPlayer?: MockScenarioPlayer;
113115

114-
constructor(config: Config, historyService: HistoryService, partialService: PartialService) {
116+
constructor(
117+
config: Config,
118+
historyService: HistoryService,
119+
partialService: PartialService,
120+
initStateManager: InitStateManager
121+
) {
115122
super();
116123
// Increase max listeners to accommodate multiple concurrent workspace listeners
117124
// Each workspace subscribes to stream events, and we expect >10 concurrent workspaces
118125
this.setMaxListeners(50);
119126
this.config = config;
120127
this.historyService = historyService;
121128
this.partialService = partialService;
129+
this.initStateManager = initStateManager;
122130
this.streamManager = new StreamManager(historyService, partialService);
123131
void this.ensureSessionsDir();
124132
this.setupStreamEventForwarding();
@@ -427,6 +435,8 @@ export class AIService extends EventEmitter {
427435
cwd: process.cwd(),
428436
runtime: earlyRuntime,
429437
runtimeTempDir: os.tmpdir(),
438+
workspaceId: "", // Empty workspace ID for early stub config
439+
initStateManager: this.initStateManager,
430440
secrets: {},
431441
});
432442
const earlyTools = applyToolPolicy(earlyAllTools, toolPolicy);
@@ -536,6 +546,8 @@ export class AIService extends EventEmitter {
536546
const allTools = await getToolsForModel(modelString, {
537547
cwd: workspacePath,
538548
runtime,
549+
workspaceId,
550+
initStateManager: this.initStateManager,
539551
secrets: secretsToRecord(projectSecrets),
540552
runtimeTempDir,
541553
});

src/services/initStateManager.ts

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,29 @@ type InitHookState = InitStatus;
4747
* - Permanent persistence (init logs kept forever as workspace metadata)
4848
*
4949
* Lifecycle:
50-
* 1. startInit() - Create in-memory state, emit init-start
50+
* 1. startInit() - Create in-memory state, emit init-start, create completion promise
5151
* 2. appendOutput() - Accumulate lines, emit init-output
52-
* 3. endInit() - Finalize state, write to disk, emit init-end
52+
* 3. endInit() - Finalize state, write to disk, emit init-end, resolve promise
5353
* 4. State remains in memory until cleared or process restart
5454
* 5. replayInit() - Re-emit events from in-memory or disk state (via EventStore)
55+
*
56+
* Waiting: Tools use waitForInit() which returns a promise that resolves when
57+
* init completes. This promise is stored in initPromises map and resolved by
58+
* endInit(). No event listeners needed, eliminating race conditions.
5559
*/
5660
export class InitStateManager extends EventEmitter {
5761
private readonly store: EventStore<InitHookState, WorkspaceInitEvent & { workspaceId: string }>;
5862

63+
/**
64+
* Promise-based completion tracking for running inits.
65+
* Each running init has a promise that resolves when endInit() is called.
66+
* Multiple tools can await the same promise without race conditions.
67+
*/
68+
private readonly initPromises: Map<
69+
string,
70+
{ promise: Promise<void>; resolve: () => void; reject: (error: Error) => void }
71+
> = new Map();
72+
5973
constructor(config: Config) {
6074
super();
6175
this.store = new EventStore(
@@ -111,7 +125,7 @@ export class InitStateManager extends EventEmitter {
111125

112126
/**
113127
* Start tracking a new init hook execution.
114-
* Creates in-memory state and emits init-start event.
128+
* Creates in-memory state, completion promise, and emits init-start event.
115129
*/
116130
startInit(workspaceId: string, hookPath: string): void {
117131
const startTime = Date.now();
@@ -127,6 +141,21 @@ export class InitStateManager extends EventEmitter {
127141

128142
this.store.setState(workspaceId, state);
129143

144+
// Create completion promise for this init
145+
// This allows multiple tools to await the same init without event listeners
146+
let resolve: () => void;
147+
let reject: (error: Error) => void;
148+
const promise = new Promise<void>((res, rej) => {
149+
resolve = res;
150+
reject = rej;
151+
});
152+
153+
this.initPromises.set(workspaceId, {
154+
promise,
155+
resolve: resolve!,
156+
reject: reject!,
157+
});
158+
130159
log.debug(`Init hook started for workspace ${workspaceId}: ${hookPath}`);
131160

132161
// Emit init-start event
@@ -167,7 +196,7 @@ export class InitStateManager extends EventEmitter {
167196

168197
/**
169198
* Finalize init hook execution.
170-
* Updates state, persists to disk, and emits init-end event.
199+
* Updates state, persists to disk, emits init-end event, and resolves completion promise.
171200
*/
172201
async endInit(workspaceId: string, exitCode: number): Promise<void> {
173202
const state = this.store.getState(workspaceId);
@@ -197,6 +226,13 @@ export class InitStateManager extends EventEmitter {
197226
timestamp: endTime,
198227
} satisfies WorkspaceInitEvent & { workspaceId: string });
199228

229+
// Resolve completion promise for waiting tools
230+
const promiseEntry = this.initPromises.get(workspaceId);
231+
if (promiseEntry) {
232+
promiseEntry.resolve();
233+
this.initPromises.delete(workspaceId);
234+
}
235+
200236
// Keep state in memory for replay (unlike streams which delete immediately)
201237
}
202238

@@ -244,8 +280,68 @@ export class InitStateManager extends EventEmitter {
244280
* Clear in-memory state for a workspace.
245281
* Useful for testing or cleanup after workspace deletion.
246282
* Does NOT delete disk file (use deleteInitStatus for that).
283+
*
284+
* Also cancels any running init promises to prevent orphaned waiters.
247285
*/
248286
clearInMemoryState(workspaceId: string): void {
  this.store.deleteState(workspaceId);

  // Cancel any running init promise for this workspace
  const promiseEntry = this.initPromises.get(workspaceId);
  if (promiseEntry) {
    // The completion promise may have no waiters at this point (no tool has
    // called waitForInit() yet). Rejecting a promise that has no handlers
    // raises an unhandled rejection, so attach a no-op handler first.
    // Callers that ARE awaiting the promise still observe the rejection
    // through their own chain.
    void promiseEntry.promise.catch(() => undefined);

    promiseEntry.reject(new Error(`Workspace ${workspaceId} was deleted`));
    this.initPromises.delete(workspaceId);
  }
}
296+
297+
/**
 * Wait for workspace initialization to complete.
 * Used by tools (bash, file_*) to ensure files are ready before executing.
 *
 * Behavior:
 * - No init state: Returns immediately (init not needed or backwards compat)
 * - Init succeeded/failed: Returns immediately (let tool proceed/fail naturally)
 * - Init running: Waits for completion promise (up to 5 minutes)
 *
 * Promise-based approach eliminates race conditions:
 * - Multiple tools share the same promise (no duplicate listeners)
 * - No event cleanup needed (promise auto-resolves once)
 * - Timeout races handled by Promise.race()
 *
 * The timeout timer is cleared as soon as the race settles, so a finished
 * wait never leaves a pending 5-minute timer behind.
 *
 * @param workspaceId Workspace ID to wait for
 * @throws Error if init times out after 5 minutes
 * @throws Error if the completion promise is rejected while waiting
 *         (clearInMemoryState() rejects it when the workspace is deleted)
 */
async waitForInit(workspaceId: string): Promise<void> {
  const state = this.getInitState(workspaceId);

  // No init state - proceed immediately (backwards compat or init not needed)
  if (!state) {
    return;
  }

  // Init already completed (success or failure) - proceed immediately
  if (state.status !== "running") {
    return;
  }

  // Init is running - wait for completion promise with timeout
  const promiseEntry = this.initPromises.get(workspaceId);

  if (!promiseEntry) {
    // State says running but no promise exists (shouldn't happen, but handle gracefully)
    log.error(`Init state is running for ${workspaceId} but no promise found, proceeding`);
    return;
  }

  const INIT_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes

  // The Promise executor runs synchronously, so the handle is assigned
  // before the race below starts.
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutHandle = setTimeout(() => {
      reject(new Error("Workspace initialization timed out after 5 minutes"));
    }, INIT_TIMEOUT_MS);
  });

  try {
    // Race between completion and timeout
    // If timeout wins, promise rejection propagates to caller
    await Promise.race([promiseEntry.promise, timeoutPromise]);
  } finally {
    // Always release the timer: without this, every completed wait would
    // leave a live timer that keeps the event loop alive for up to 5 minutes.
    clearTimeout(timeoutHandle);
  }
}
251347
}

src/services/ipcMain.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,14 @@ export class IpcMain {
134134
this.config = config;
135135
this.historyService = new HistoryService(config);
136136
this.partialService = new PartialService(config, this.historyService);
137-
this.aiService = new AIService(config, this.historyService, this.partialService);
138-
this.bashService = new BashExecutionService();
139137
this.initStateManager = new InitStateManager(config);
138+
this.aiService = new AIService(
139+
config,
140+
this.historyService,
141+
this.partialService,
142+
this.initStateManager
143+
);
144+
this.bashService = new BashExecutionService();
140145
}
141146

142147
private getOrCreateSession(workspaceId: string): AgentSession {
@@ -922,6 +927,8 @@ export class IpcMain {
922927
const bashTool = createBashTool({
923928
cwd: workspacePath, // Bash executes in the workspace directory
924929
runtime,
930+
workspaceId,
931+
initStateManager: this.initStateManager,
925932
secrets: secretsToRecord(projectSecrets),
926933
niceness: options?.niceness,
927934
runtimeTempDir: tempDir.path,

0 commit comments

Comments
 (0)