Skip to content

Commit a7c08a2

Browse files
committed
fix: avoid scheduler deadlock for foreground awaits
Change-Id: I64ae7ece054cefa9d4862beabe565bc32a47a921
Signed-off-by: Thomas Kosiewski <tk@coder.com>
1 parent 67ebc01 commit a7c08a2

File tree

4 files changed

+166
-1
lines changed

4 files changed

+166
-1
lines changed

src/node/services/taskService.test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,107 @@ describe("TaskService", () => {
375375
expect(started?.taskStatus).toBe("running");
376376
}, 20_000);
377377

378+
// Regression test for the foreground-await scheduler deadlock: with maxParallelAgentTasks=1, a
// parent task that is blocked waiting on its queued child must not occupy the only slot.
test("does not count foreground-awaiting tasks towards maxParallelAgentTasks", async () => {
  const config = await createTestConfig(rootDir);
  stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"], "dddddddddd");

  const projectPath = await createTestProject(rootDir);

  // The AI service mock reports exactly one workspace as streaming: whichever ID is stored here.
  let streamingWorkspaceId: string | null = null;
  const { aiService } = createAIServiceMocks(config, {
    isStreaming: mock((workspaceId: string) => workspaceId === streamingWorkspaceId),
  });

  const worktreeRuntimeConfig = { type: "worktree" as const, srcBaseDir: config.srcDir };
  const runtime = createRuntime(worktreeRuntimeConfig, { projectPath });
  const nullLogger = createNullInitLogger();

  // Create the root workspace on disk, then register it in the config with a parallelism limit of 1.
  const rootBranch = "root";
  await runtime.createWorkspace({
    projectPath,
    branchName: rootBranch,
    trunkBranch: "main",
    directoryName: rootBranch,
    initLogger: nullLogger,
  });

  const rootId = "root-111";
  await config.saveConfig({
    projects: new Map([
      [
        projectPath,
        {
          workspaces: [
            {
              path: runtime.getWorkspacePath(projectPath, rootBranch),
              id: rootId,
              name: rootBranch,
              createdAt: new Date().toISOString(),
              runtimeConfig: worktreeRuntimeConfig,
            },
          ],
        },
      ],
    ]),
    taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 },
  });

  const { workspaceService, sendMessage } = createWorkspaceServiceMocks();
  const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService });

  const parent = await taskService.create({
    parentWorkspaceId: rootId,
    kind: "agent",
    agentType: "explore",
    prompt: "parent task",
  });
  expect(parent.success).toBe(true);
  if (!parent.success) return;
  const parentTaskId = parent.data.taskId;
  streamingWorkspaceId = parentTaskId;

  // With maxParallelAgentTasks=1, nested tasks will be created as queued.
  const child = await taskService.create({
    parentWorkspaceId: parentTaskId,
    kind: "agent",
    agentType: "explore",
    prompt: "child task",
  });
  expect(child.success).toBe(true);
  if (!child.success) return;
  expect(child.data.status).toBe("queued");
  const childTaskId = child.data.taskId;

  // Simulate a foreground await from the parent task workspace. This should allow the queued child
  // to start despite maxParallelAgentTasks=1, avoiding a scheduler deadlock.
  const pendingReport = taskService.waitForAgentReport(childTaskId, {
    timeoutMs: 10_000,
    requestingWorkspaceId: parentTaskId,
  });

  // Reach into the service's private methods so the scheduler pass and the report delivery can be
  // driven directly from the test.
  const privateApi = taskService as unknown as {
    maybeStartQueuedTasks: () => Promise<void>;
    resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void;
  };

  await privateApi.maybeStartQueuedTasks();

  expect(sendMessage).toHaveBeenCalledWith(
    childTaskId,
    "child task",
    expect.anything(),
    expect.objectContaining({ allowQueuedAgentTask: true })
  );

  // The persisted config must now show the child as running rather than queued.
  const reloaded = config.loadConfigOrDefault();
  const childEntry = Array.from(reloaded.projects.values(), (project) => project.workspaces)
    .flat()
    .find((workspace) => workspace.id === childTaskId);
  expect(childEntry?.taskStatus).toBe("running");

  // Deliver a report so the pending foreground wait settles before the test ends.
  privateApi.resolveWaiters(childTaskId, { reportMarkdown: "ok" });
  const report = await pendingReport;
  expect(report.reportMarkdown).toBe("ok");
}, 20_000);
478+
378479
test("does not run init hooks for queued tasks until they start", async () => {
379480
const config = await createTestConfig(rootDir);
380481
stubStableIds(config, ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"], "dddddddddd");

src/node/services/taskService.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ export class TaskService {
162162
private readonly mutex = new AsyncMutex();
163163
private readonly pendingWaitersByTaskId = new Map<string, PendingTaskWaiter[]>();
164164
private readonly pendingStartWaitersByTaskId = new Map<string, PendingTaskStartWaiter[]>();
165+
// Tracks workspaces currently blocked in a foreground wait (e.g. a task tool call awaiting
166+
// agent_report). Used to avoid scheduler deadlocks when maxParallelAgentTasks is low and tasks
167+
// spawn nested tasks in the foreground.
168+
private readonly foregroundAwaitCountByWorkspaceId = new Map<string, number>();
165169
// Cache completed reports so callers can retrieve them even after the task workspace is removed.
166170
// Bounded by TTL + max entries (see COMPLETED_REPORT_CACHE_*).
167171
private readonly completedReportsByTaskId = new Map<
@@ -682,9 +686,39 @@ export class TaskService {
682686
}
683687
}
684688

689+
/**
 * Reports whether the given workspace currently has at least one outstanding foreground wait.
 *
 * @param workspaceId - ID of the workspace to look up in the foreground-await counter map.
 * @returns `true` when a counter is recorded for the workspace and it is greater than zero.
 */
private isForegroundAwaiting(workspaceId: string): boolean {
  // A missing entry is treated the same as a zero count.
  return (this.foregroundAwaitCountByWorkspaceId.get(workspaceId) ?? 0) > 0;
}
693+
694+
/**
 * Records that `workspaceId` has entered a foreground wait and returns a function that ends it.
 *
 * Waits are reference-counted per workspace: each call increments the workspace's counter, and
 * each returned release function decrements it, removing the map entry once it reaches zero.
 *
 * @param workspaceId - Non-empty ID of the workspace that is blocking.
 * @returns A one-shot release function. Only its first invocation changes the counter; later
 *   invocations are no-ops, so a repeated cleanup can never consume a count that belongs to a
 *   different, still-pending wait on the same workspace.
 * @throws If `workspaceId` is empty or the stored counter is not a valid integer (assertion).
 */
private startForegroundAwait(workspaceId: string): () => void {
  assert(workspaceId.length > 0, "startForegroundAwait: workspaceId must be non-empty");

  const countBefore = this.foregroundAwaitCountByWorkspaceId.get(workspaceId) ?? 0;
  assert(
    Number.isInteger(countBefore) && countBefore >= 0,
    "startForegroundAwait: expected non-negative integer counter"
  );

  this.foregroundAwaitCountByWorkspaceId.set(workspaceId, countBefore + 1);

  let released = false;
  return () => {
    // Each registration may be released exactly once. Without this guard a second call would
    // decrement a count owned by another concurrent wait on the same workspace.
    if (released) {
      return;
    }
    released = true;

    const countNow = this.foregroundAwaitCountByWorkspaceId.get(workspaceId) ?? 0;
    assert(
      Number.isInteger(countNow) && countNow > 0,
      "startForegroundAwait cleanup: expected positive integer counter"
    );
    if (countNow <= 1) {
      // Last outstanding wait: drop the entry so the map does not accumulate zero counters.
      this.foregroundAwaitCountByWorkspaceId.delete(workspaceId);
    } else {
      this.foregroundAwaitCountByWorkspaceId.set(workspaceId, countNow - 1);
    }
  };
}
718+
685719
waitForAgentReport(
686720
taskId: string,
687-
options?: { timeoutMs?: number; abortSignal?: AbortSignal }
721+
options?: { timeoutMs?: number; abortSignal?: AbortSignal; requestingWorkspaceId?: string }
688722
): Promise<{ reportMarkdown: string; title?: string }> {
689723
assert(taskId.length > 0, "waitForAgentReport: taskId must be non-empty");
690724

@@ -700,6 +734,8 @@ export class TaskService {
700734
const timeoutMs = options?.timeoutMs ?? 10 * 60 * 1000; // 10 minutes
701735
assert(Number.isFinite(timeoutMs) && timeoutMs > 0, "waitForAgentReport: timeoutMs invalid");
702736

737+
const requestingWorkspaceId = coerceNonEmptyString(options?.requestingWorkspaceId);
738+
703739
return new Promise<{ reportMarkdown: string; title?: string }>((resolve, reject) => {
704740
// Validate existence early to avoid waiting on never-resolving task IDs.
705741
const cfg = this.config.loadConfigOrDefault();
@@ -712,6 +748,9 @@ export class TaskService {
712748
let timeout: ReturnType<typeof setTimeout> | null = null;
713749
let startWaiter: PendingTaskStartWaiter | null = null;
714750
let abortListener: (() => void) | null = null;
751+
let stopBlockingRequester: (() => void) | null = requestingWorkspaceId
752+
? this.startForegroundAwait(requestingWorkspaceId)
753+
: null;
715754

716755
const startReportTimeout = () => {
717756
if (timeout) return;
@@ -759,6 +798,14 @@ export class TaskService {
759798
options.abortSignal.removeEventListener("abort", abortListener);
760799
abortListener = null;
761800
}
801+
802+
if (stopBlockingRequester) {
803+
try {
804+
stopBlockingRequester();
805+
} finally {
806+
stopBlockingRequester = null;
807+
}
808+
}
762809
},
763810
};
764811

@@ -798,6 +845,13 @@ export class TaskService {
798845
cleanupStartWaiter();
799846
startReportTimeout();
800847
}
848+
849+
// If the awaited task is queued and the caller is blocked in the foreground, ensure the
850+
// scheduler runs after the waiter is registered. This avoids deadlocks when
851+
// maxParallelAgentTasks is low.
852+
if (requestingWorkspaceId) {
853+
void this.maybeStartQueuedTasks();
854+
}
801855
} else {
802856
startReportTimeout();
803857
}
@@ -1040,6 +1094,14 @@ export class TaskService {
10401094
let activeCount = 0;
10411095
for (const task of this.listAgentTaskWorkspaces(config)) {
10421096
const status: AgentTaskStatus = task.taskStatus ?? "running";
1097+
// If this task workspace is blocked in a foreground wait, do not count it towards parallelism.
1098+
// This prevents deadlocks where a task spawns a nested task in the foreground while
1099+
// maxParallelAgentTasks is low (e.g. 1).
1100+
// Note: StreamManager can still report isStreaming() while a tool call is executing, so
1101+
// isStreaming is not a reliable signal for "actively doing work" here.
1102+
if (status === "running" && task.id && this.isForegroundAwaiting(task.id)) {
1103+
continue;
1104+
}
10431105
if (status === "running" || status === "awaiting_report") {
10441106
activeCount += 1;
10451107
continue;

src/node/services/tools/task.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export const createTaskTool: ToolFactory = (config: ToolConfiguration) => {
4444

4545
const report = await taskService.waitForAgentReport(created.data.taskId, {
4646
abortSignal,
47+
requestingWorkspaceId: workspaceId,
4748
});
4849

4950
return parseToolResult(

src/node/services/tools/task_await.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => {
5858
const report = await taskService.waitForAgentReport(taskId, {
5959
timeoutMs,
6060
abortSignal,
61+
requestingWorkspaceId: workspaceId,
6162
});
6263
return {
6364
status: "completed" as const,

0 commit comments

Comments
 (0)