From 4c9ac5ae0af76990bf4ae3f4d4a4a81d50196d5d Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Tue, 23 Dec 2025 17:35:23 +0100 Subject: [PATCH] feat: add mux acp stdio bridge subcommand Change-Id: I50480960bbcd75e811f496421dd45b87c9d58820 Signed-off-by: Thomas Kosiewski --- bun.lock | 3 + docs/acp.mdx | 84 +++++ docs/cli.mdx | 10 + docs/docs.json | 1 + package.json | 1 + src/cli/acp.ts | 682 ++++++++++++++++++++++++++++++++++++++ src/cli/acpUtils.test.ts | 77 +++++ src/cli/acpUtils.ts | 70 ++++ src/cli/api.ts | 40 +-- src/cli/discoverServer.ts | 76 +++++ src/cli/index.ts | 5 + 11 files changed, 1011 insertions(+), 38 deletions(-) create mode 100644 docs/acp.mdx create mode 100644 src/cli/acp.ts create mode 100644 src/cli/acpUtils.test.ts create mode 100644 src/cli/acpUtils.ts create mode 100644 src/cli/discoverServer.ts diff --git a/bun.lock b/bun.lock index 099307c4d4..19ebf58ef0 100644 --- a/bun.lock +++ b/bun.lock @@ -5,6 +5,7 @@ "": { "name": "mux", "dependencies": { + "@agentclientprotocol/sdk": "^0.12.0", "@ai-sdk/amazon-bedrock": "^3.0.61", "@ai-sdk/anthropic": "^2.0.47", "@ai-sdk/deepseek": "^1.0.31", @@ -177,6 +178,8 @@ "@adobe/css-tools": ["@adobe/css-tools@4.4.4", "", {}, "sha512-Elp+iwUx5rN5+Y8xLt5/GRoG20WGoDCQ/1Fb+1LiGtvwbDavuSk0jhD/eZdckHAuzcDzccnkv+rEjyWfRx18gg=="], + "@agentclientprotocol/sdk": ["@agentclientprotocol/sdk@0.12.0", "", { "peerDependencies": { "zod": "^3.25.0 || ^4.0.0" } }, "sha512-V8uH/KK1t7utqyJmTA7y7DzKu6+jKFIXM+ZVouz8E55j8Ej2RV42rEvPKn3/PpBJlliI5crcGk1qQhZ7VwaepA=="], + "@ai-sdk/amazon-bedrock": ["@ai-sdk/amazon-bedrock@3.0.65", "", { "dependencies": { "@ai-sdk/anthropic": "2.0.53", "@ai-sdk/provider": "2.0.0", "@ai-sdk/provider-utils": "3.0.18", "@smithy/eventstream-codec": "^4.0.1", "@smithy/util-utf8": "^4.0.0", "aws4fetch": "^1.0.20" }, "peerDependencies": { "zod": "^3.25.76 || ^4.1.8" } }, "sha512-E5KJv9OvLJitwPo6GnTgYdssTjEbwVW08TXqaQE2C6hfpg6XdwMXc7BJvQ97eXogGETAyFSS0irDYsbA90rB+g=="], "@ai-sdk/anthropic": ["@ai-sdk/anthropic@2.0.53", "", { "dependencies": { "@ai-sdk/provider": "2.0.0", "@ai-sdk/provider-utils": "3.0.18" }, "peerDependencies": { "zod": "^3.25.76 || ^4.1.8" } }, "sha512-ih7NV+OFSNWZCF+tYYD7ovvvM+gv7TRKQblpVohg2ipIwC9Y0TirzocJVREzZa/v9luxUwFbsPji++DUDWWxsg=="], diff --git a/docs/acp.mdx b/docs/acp.mdx new file mode 100644 index 0000000000..d8b2465e6e --- /dev/null +++ b/docs/acp.mdx @@ -0,0 +1,84 @@ +--- +title: Agent Client Protocol (ACP) +description: Use mux as an ACP agent in editors like Zed +--- + +`mux acp` starts an **Agent Client Protocol (ACP)** agent over **stdio**. + +It acts as a thin bridge to a running mux **HTTP/WebSocket API server**, so editors that support ACP (like Zed) can drive mux workspaces. + +## Prerequisites + +You need a mux API server running locally: + +- **Mux Desktop**: the API server is typically started automatically (unless disabled). +- **Standalone**: run `mux server`. + +Server discovery order: + +1. `--server-url` / `--server-token` +2. `MUX_SERVER_URL` / `MUX_SERVER_AUTH_TOKEN` +3. `~/.mux/server.lock` + +If none are available, `mux acp` will exit with an error. + +## Zed setup + +Zed can run any ACP agent as a subprocess via `agent_servers`. + +Add this to your Zed `settings.json`: + +```json +{ + "agent_servers": { + "Mux": { + "command": "mux", + "args": ["acp"], + "env": {} + } + } +} +``` + +Then in Zed, create a new external agent thread and select **Mux**. + +### Using a specific mux server + +If you want to connect to a specific server instance, add args/env: + +```json +{ + "agent_servers": { + "Mux (custom server)": { + "command": "mux", + "args": ["acp", "--server-url", "http://127.0.0.1:3000", "--server-token", ""], + "env": {} + } + } +} +``` + +## Notes + +- `mux acp` reserves **stdout** for the ACP protocol. Logs go to **stderr**. +- By default, mux uses the **local** runtime (project directory directly). + - For `--runtime worktree|ssh`, you must also pass `--trunk-branch`. + +## Troubleshooting + +### "No running mux API server found" + +- Start mux desktop (ensure API server is enabled), or run: + +```bash +mux server +``` + +- Or explicitly set `MUX_SERVER_URL` / `MUX_SERVER_AUTH_TOKEN`. + +### Zed shows no output / agent immediately disconnects + +Open Zed’s ACP debug logs (Command Palette: `dev: open acp logs`) and look for: + +- JSON parsing errors (often means something wrote to stdout) +- Connection/auth errors to the mux server diff --git a/docs/cli.mdx b/docs/cli.mdx index 1b774601f8..ee80948fb1 100644 --- a/docs/cli.mdx +++ b/docs/cli.mdx @@ -84,6 +84,16 @@ Options: - `--auth-token ` - Optional bearer token for authentication - `--add-project ` - Add and open project at the specified path +## `mux acp` + +Start an ACP (Agent Client Protocol) stdio agent for editor integrations (e.g. Zed): + +```bash +mux acp +``` + +For setup details, see the **Agent Client Protocol (ACP)** docs. + ## `mux desktop` Launch the desktop app. This is automatically invoked when running the packaged app or via `electron .`: diff --git a/docs/docs.json b/docs/docs.json index a283c62e78..c9ba6c52ac 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -48,6 +48,7 @@ }, "plan-mode", "vscode-extension", + "acp", "models", { "group": "Keyboard Shortcuts", diff --git a/package.json b/package.json index 8842d50374..b08078f50c 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "postinstall": "sh scripts/postinstall.sh" }, "dependencies": { + "@agentclientprotocol/sdk": "^0.12.0", "@ai-sdk/amazon-bedrock": "^3.0.61", "@ai-sdk/anthropic": "^2.0.47", "@ai-sdk/deepseek": "^1.0.31", diff --git a/src/cli/acp.ts b/src/cli/acp.ts new file mode 100644 index 0000000000..8faa1a6ad3 --- /dev/null +++ b/src/cli/acp.ts @@ -0,0 +1,682 @@ +import { Command } from "commander"; +import * as path from "path"; +import * as fs from "fs/promises"; +import * as crypto from "crypto"; +import { Readable, Writable } from "stream"; +import { WebSocket } from "ws"; + +import { + AgentSideConnection, + ndJsonStream, + type Agent, + type AgentCapabilities, + type AuthenticateRequest, + type InitializeRequest, + type InitializeResponse, + type NewSessionRequest, + type NewSessionResponse, + type PromptRequest, + type PromptResponse, + type StopReason, + type CancelNotification, + type SessionNotification, +} from "@agentclientprotocol/sdk"; + +import { RPCLink as HTTPRPCLink } from "@orpc/client/fetch"; +import { RPCLink as WebSocketRPCLink } from "@orpc/client/websocket"; +import { createORPCClient } from "@orpc/client"; +import type { RouterClient } from "@orpc/server"; + +import type { AppRouter } from "@/node/orpc/router"; +import { discoverServer } from "./discoverServer"; +import { getMuxSrcDir } from "@/common/constants/paths"; +import type { RuntimeConfig } from "@/common/types/runtime"; +import assert from "@/common/utils/assert"; +import { + isCaughtUpMessage, + isStreamAbort, + isStreamDelta, + isStreamEnd, + isStreamError, + type WorkspaceChatMessage, +} from "@/common/orpc/types"; +import { contentBlocksToText, muxChatMessageToSessionUpdate } from "./acpUtils"; +import { getErrorMessage } from "@/common/utils/errors"; +import { getParseOptions } from "./argv"; +import { VERSION } from "@/version"; + +type RuntimeFlag = "local" | "worktree" | "ssh"; +type LogLevel = "error" | "warn" | "info" | "debug"; + +interface CliOptions { + serverUrl?: string; + serverToken?: string; + project?: string; + workspace?: string; + runtime: RuntimeFlag; + trunkBranch?: string; + srcBaseDir?: string; + sshRuntimeHost?: string; + sshRuntimePort?: string; + sshIdentityFile?: string; + logLevel: LogLevel; +} + +const LOG_LEVEL_ORDER: Record = { + error: 0, + warn: 1, + info: 2, + debug: 3, +}; + +function stderrLog(level: LogLevel, current: LogLevel, ...args: unknown[]): void { + if (LOG_LEVEL_ORDER[level] > LOG_LEVEL_ORDER[current]) { + return; + } + + const prefix = `[mux acp] ${level}:`; + // Always write to stderr to avoid contaminating ACP stdout. + console.error(prefix, ...args); +} + +function joinUrlPath(baseUrl: string, suffix: string): string { + const url = new URL(baseUrl); + const basePath = url.pathname.replace(/\/+$/, ""); + const addPath = suffix.startsWith("/") ? suffix : `/${suffix}`; + url.pathname = `${basePath}${addPath}`; + return url.toString(); +} + +function toWebSocketUrl(baseUrl: string): string { + const url = new URL(baseUrl); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + return url.toString(); +} + +function parseLogLevel(value: string | undefined): LogLevel { + const normalized = value?.trim().toLowerCase(); + if (normalized === undefined || normalized === "") { + return "error"; + } + + if ( + normalized === "error" || + normalized === "warn" || + normalized === "info" || + normalized === "debug" + ) { + return normalized; + } + + throw new Error( + `Invalid --log-level: ${value ?? "undefined"}. Expected: error, warn, info, debug` + ); +} + +function parseRuntimeFlag(value: string | undefined): RuntimeFlag { + const normalized = value?.trim().toLowerCase(); + if (!normalized) { + return "local"; + } + + if (normalized === "local" || normalized === "worktree" || normalized === "ssh") { + return normalized; + } + + throw new Error(`Invalid --runtime: ${value ?? "undefined"}. Expected: local, worktree, ssh`); +} + +function generateWorkspaceName(): string { + return `acp-${crypto.randomBytes(4).toString("hex")}`; +} + +function toStopReason(value: StopReason): StopReason { + return value; +} + +function normalizeToken(token: string | undefined): string | undefined { + const trimmed = token?.trim(); + if (!trimmed) { + return undefined; + } + return trimmed; +} + +function isErrorEvent( + msg: WorkspaceChatMessage +): msg is Extract { + return (msg as { type?: string }).type === "error"; +} + +class MuxOrpcClient { + readonly http: RouterClient; + readonly ws: RouterClient; + private readonly websocket: WebSocket; + + private constructor(opts: { + http: RouterClient; + ws: RouterClient; + websocket: WebSocket; + }) { + this.http = opts.http; + this.ws = opts.ws; + this.websocket = opts.websocket; + } + + static async connect(opts: { + baseUrl: string; + authToken: string | undefined; + }): Promise { + const baseUrl = opts.baseUrl.replace(/\/+$/, ""); + + const headers = opts.authToken ? { Authorization: `Bearer ${opts.authToken}` } : undefined; + + const httpLink = new HTTPRPCLink({ + url: joinUrlPath(baseUrl, "/orpc"), + headers, + }); + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion -- needed for tsgo typecheck + const http = createORPCClient(httpLink) as RouterClient; + + const wsUrl = joinUrlPath(toWebSocketUrl(baseUrl), "/orpc/ws"); + const websocket = new WebSocket(wsUrl, { + headers, + }); + + await new Promise((resolve, reject) => { + websocket.on("open", () => resolve()); + websocket.on("error", (err) => reject(err)); + }); + + const wsLink = new WebSocketRPCLink({ + websocket: websocket as unknown as globalThis.WebSocket, + }); + // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion -- needed for tsgo typecheck + const ws = createORPCClient(wsLink) as RouterClient; + + return new MuxOrpcClient({ http, ws, websocket }); + } + + close(): void { + try { + this.websocket.close(); + } catch { + // best effort + } + } +} + +interface PendingPrompt { + cancelled: boolean; + resolve: (stopReason: StopReason) => void; + reject: (err: Error) => void; + promise: Promise; +} + +interface SessionState { + workspaceId: string; + caughtUp: boolean; + caughtUpPromise: Promise; + resolveCaughtUp: () => void; + pendingPrompt: PendingPrompt | null; + streamTask: Promise; +} + +class MuxAcpAgent implements Agent { + private readonly sessions = new Map(); + private pinnedWorkspaceId: string | null = null; + + constructor( + private readonly conn: AgentSideConnection, + private readonly mux: MuxOrpcClient, + private readonly opts: CliOptions + ) { + // Ensure we don't leak any logs onto stdout after the protocol starts. + console.log = (...args: unknown[]) => console.error(...args); + console.info = (...args: unknown[]) => console.error(...args); + } + + initialize(params: InitializeRequest): Promise { + stderrLog("debug", this.opts.logLevel, "initialize", params); + + const supportedProtocolVersion = 1; + + const agentCapabilities: AgentCapabilities = { + promptCapabilities: { + embeddedContext: true, + image: false, + audio: false, + }, + // We don't implement session/load yet. + loadSession: false, + }; + + const versionRecord = VERSION as Record; + const gitDescribe = + typeof versionRecord.git_describe === "string" ? versionRecord.git_describe : "unknown"; + + return Promise.resolve({ + protocolVersion: supportedProtocolVersion, + agentInfo: { + name: "mux", + title: "Mux", + version: gitDescribe, + }, + agentCapabilities, + authMethods: [], + }); + } + + authenticate(_params: AuthenticateRequest): Promise { + // mux server auth is handled via bearer token passed to this bridge. + return Promise.resolve(); + } + + async newSession(params: NewSessionRequest): Promise { + stderrLog("debug", this.opts.logLevel, "newSession", params); + + const requestedCwd = params.cwd?.trim(); + const cwdFromRequest = requestedCwd && requestedCwd.length > 0 ? requestedCwd : undefined; + + const projectFallback = this.opts.project?.trim(); + const cwdFromFlag = projectFallback && projectFallback.length > 0 ? projectFallback : undefined; + + const cwd = cwdFromRequest ?? cwdFromFlag ?? process.cwd(); + const projectPath = path.resolve(cwd); + + await this.assertDirectoryExists(projectPath); + + const sessionId = await this.ensureWorkspaceForSession(projectPath); + const state = this.ensureSessionState(sessionId); + + // Wait for catch-up so subsequent prompt calls see only live events. + await this.waitForCaughtUp(state); + + return { + sessionId, + }; + } + + async prompt(params: PromptRequest): Promise { + const sessionId = params.sessionId; + stderrLog("debug", this.opts.logLevel, "prompt", { + sessionId, + promptBlocks: params.prompt.length, + }); + + const state = this.sessions.get(sessionId); + if (!state) { + throw new Error(`Unknown sessionId: ${sessionId}`); + } + + if (state.pendingPrompt) { + throw new Error(`A prompt is already running for sessionId: ${sessionId}`); + } + + const message = contentBlocksToText(params.prompt); + if (!message) { + throw new Error("Prompt was empty after converting ACP content blocks"); + } + + const pending = this.createPendingPrompt(); + state.pendingPrompt = pending; + + try { + const sendResult = await this.mux.http.workspace.sendMessage({ + workspaceId: sessionId, + message, + }); + + if (!sendResult.success) { + const rendered = getErrorMessage(sendResult.error); + throw new Error(`workspace.sendMessage failed: ${rendered}`); + } + + const stopReason = await pending.promise; + return { stopReason: toStopReason(stopReason) }; + } finally { + // If the prompt completed (or failed), clear pending state. + if (state.pendingPrompt === pending) { + state.pendingPrompt = null; + } + } + } + + async cancel(params: CancelNotification): Promise { + const sessionId = params.sessionId; + stderrLog("debug", this.opts.logLevel, "cancel", { sessionId }); + + const state = this.sessions.get(sessionId); + if (state?.pendingPrompt) { + state.pendingPrompt.cancelled = true; + } + + try { + const result = await this.mux.http.workspace.interruptStream({ workspaceId: sessionId }); + if (!result.success) { + stderrLog("warn", this.opts.logLevel, `workspace.interruptStream failed: ${result.error}`); + } + } catch (error) { + stderrLog("warn", this.opts.logLevel, "workspace.interruptStream threw", error); + } + } + + private createPendingPrompt(): PendingPrompt { + let resolve: ((reason: StopReason) => void) | null = null; + let reject: ((err: Error) => void) | null = null; + + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + assert(resolve, "resolve must be set"); + assert(reject, "reject must be set"); + + return { + cancelled: false, + resolve, + reject, + promise, + }; + } + + private async assertDirectoryExists(dirPath: string): Promise { + try { + const stat = await fs.stat(dirPath); + if (!stat.isDirectory()) { + throw new Error(`Not a directory: ${dirPath}`); + } + } catch (error) { + const rendered = getErrorMessage(error); + throw new Error(`Invalid project directory: ${dirPath} (${rendered})`); + } + } + + private async ensureWorkspaceForSession(projectPath: string): Promise { + const pinned = this.opts.workspace?.trim(); + if (pinned) { + this.pinnedWorkspaceId ??= await this.resolveWorkspaceId(pinned); + assert(this.pinnedWorkspaceId, "pinnedWorkspaceId must be set"); + return this.pinnedWorkspaceId; + } + + const runtimeConfig = this.buildRuntimeConfig(); + const trunkBranch = this.opts.trunkBranch?.trim(); + + if (runtimeConfig.type !== "local" && !trunkBranch) { + throw new Error("--trunk-branch is required for --runtime worktree|ssh"); + } + + const createResult = await this.mux.http.workspace.create({ + projectPath, + branchName: generateWorkspaceName(), + trunkBranch: runtimeConfig.type === "local" ? undefined : trunkBranch, + runtimeConfig, + title: "ACP", + }); + + if (!createResult.success) { + throw new Error(`workspace.create failed: ${createResult.error}`); + } + + return createResult.metadata.id; + } + + private buildRuntimeConfig(): RuntimeConfig { + const runtime = this.opts.runtime; + if (runtime === "local") { + return { type: "local" }; + } + + if (runtime === "worktree") { + const explicitSrcBaseDir = this.opts.srcBaseDir?.trim(); + const srcBaseDir = + (explicitSrcBaseDir && explicitSrcBaseDir.length > 0 ? explicitSrcBaseDir : undefined) ?? + getMuxSrcDir(); + return { type: "worktree", srcBaseDir }; + } + + assert(runtime === "ssh", "runtime must be ssh"); + + const host = this.opts.sshRuntimeHost?.trim(); + if (!host) { + throw new Error("--ssh-runtime-host is required for --runtime ssh"); + } + + const srcBaseDir = this.opts.srcBaseDir?.trim(); + if (!srcBaseDir) { + throw new Error("--src-base-dir is required for --runtime ssh (remote base directory)"); + } + + const portRaw = this.opts.sshRuntimePort?.trim(); + const port = portRaw ? Number.parseInt(portRaw, 10) : undefined; + if (portRaw && (!Number.isFinite(port) || port! <= 0)) { + throw new Error(`Invalid --ssh-runtime-port: ${portRaw}`); + } + + const identityFileTrimmed = this.opts.sshIdentityFile?.trim(); + const identityFile = + identityFileTrimmed && identityFileTrimmed.length > 0 ? identityFileTrimmed : undefined; + + return { + type: "ssh", + host, + srcBaseDir, + identityFile, + port, + }; + } + + private async resolveWorkspaceId(value: string): Promise { + const trimmed = value.trim(); + if (!trimmed) { + throw new Error("--workspace must not be empty"); + } + + const byId = await this.mux.http.workspace.getInfo({ workspaceId: trimmed }); + if (byId) { + return byId.id; + } + + const active = await this.mux.http.workspace.list(); + const archived = await this.mux.http.workspace.list({ archived: true }); + const combined = [...active, ...archived]; + const byName = combined.find((w) => w.name === trimmed); + if (byName) { + return byName.id; + } + + throw new Error(`Workspace not found: ${trimmed}`); + } + + private ensureSessionState(workspaceId: string): SessionState { + const existing = this.sessions.get(workspaceId); + if (existing) { + return existing; + } + + let resolveCaughtUp: (() => void) | null = null; + const caughtUpPromise = new Promise((resolve) => { + resolveCaughtUp = resolve; + }); + + assert(resolveCaughtUp, "resolveCaughtUp must be set"); + + const state: SessionState = { + workspaceId, + caughtUp: false, + caughtUpPromise, + resolveCaughtUp, + pendingPrompt: null, + streamTask: Promise.resolve(), + }; + + state.streamTask = this.consumeChatStream(state).catch((error) => { + stderrLog("warn", this.opts.logLevel, `onChat stream ended for ${workspaceId}:`, error); + }); + + this.sessions.set(workspaceId, state); + return state; + } + + private async waitForCaughtUp(state: SessionState): Promise { + if (state.caughtUp) return; + + const timeoutMs = 30_000; + await Promise.race([ + state.caughtUpPromise, + new Promise((_resolve, reject) => + setTimeout( + () => reject(new Error(`Timed out waiting for caught-up (${timeoutMs}ms)`)), + timeoutMs + ) + ), + ]); + } + + private async consumeChatStream(state: SessionState): Promise { + const stream = await this.mux.ws.workspace.onChat({ workspaceId: state.workspaceId }); + + for await (const msg of stream) { + if (this.conn.signal.aborted) { + break; + } + + if (!state.caughtUp) { + if (isCaughtUpMessage(msg)) { + state.caughtUp = true; + state.resolveCaughtUp(); + } + continue; + } + + await this.handleLiveChatMessage(state, msg); + } + } + + private async handleLiveChatMessage( + state: SessionState, + msg: WorkspaceChatMessage + ): Promise { + if (isStreamDelta(msg) || (msg as { type?: string }).type === "reasoning-delta") { + const update = muxChatMessageToSessionUpdate(msg); + if (update) { + await this.safeSessionUpdate({ + sessionId: state.workspaceId, + update, + }); + } + return; + } + + if (isStreamEnd(msg)) { + const pending = state.pendingPrompt; + if (pending) { + pending.resolve(pending.cancelled ? "cancelled" : "end_turn"); + } + return; + } + + if (isStreamAbort(msg)) { + const pending = state.pendingPrompt; + if (pending) { + pending.resolve("cancelled"); + } + return; + } + + if (isStreamError(msg) || isErrorEvent(msg)) { + const pending = state.pendingPrompt; + if (pending) { + const rendered = isStreamError(msg) + ? msg.error + : isErrorEvent(msg) + ? msg.error + : "Unknown stream error"; + pending.reject(new Error(rendered)); + } + return; + } + + // Ignore tool events, history replay, init logs, etc. + } + + private async safeSessionUpdate(params: SessionNotification): Promise { + try { + await this.conn.sessionUpdate(params); + } catch (error) { + if (this.conn.signal.aborted) { + return; + } + throw error; + } + } +} + +const program = new Command(); +program + .name("mux acp") + .description("Start an Agent Client Protocol (ACP) stdio bridge for mux") + .option("--server-url ", "mux server base URL (overrides lockfile and env)") + .option("--server-token ", "mux server bearer token (overrides lockfile and env)") + .option("--project ", "default project directory (used if ACP request omits cwd)") + .option("--workspace ", "attach to an existing mux workspace (by id or name)") + .option("--runtime ", "runtime type: local, worktree, ssh", "local") + .option("--trunk-branch ", "trunk branch (required for worktree/ssh runtimes)") + .option( + "--src-base-dir ", + "worktree/ssh base directory (defaults to ~/.mux/src for worktree)" + ) + .option("--ssh-runtime-host ", "SSH host for ssh runtime (e.g., user@host)") + .option("--ssh-runtime-port ", "SSH port for ssh runtime") + .option("--ssh-identity-file ", "SSH identity file for ssh runtime") + .option("--log-level ", "log level: error, warn, info, debug", "error") + .parse(process.argv, getParseOptions()); + +const rawOpts = program.opts(); +const opts: CliOptions = { + serverUrl: rawOpts.serverUrl as string | undefined, + serverToken: rawOpts.serverToken as string | undefined, + project: rawOpts.project as string | undefined, + workspace: rawOpts.workspace as string | undefined, + runtime: parseRuntimeFlag(rawOpts.runtime as string | undefined), + trunkBranch: rawOpts.trunkBranch as string | undefined, + srcBaseDir: rawOpts.srcBaseDir as string | undefined, + sshRuntimeHost: rawOpts.sshRuntimeHost as string | undefined, + sshRuntimePort: rawOpts.sshRuntimePort as string | undefined, + sshIdentityFile: rawOpts.sshIdentityFile as string | undefined, + logLevel: parseLogLevel(rawOpts.logLevel as string | undefined), +}; + +(async () => { + const { baseUrl, authToken } = await discoverServer({ + baseUrl: opts.serverUrl, + authToken: normalizeToken(opts.serverToken), + }); + + stderrLog("info", opts.logLevel, `Connecting to mux server at ${baseUrl}`); + + const mux = await MuxOrpcClient.connect({ + baseUrl, + authToken: normalizeToken(authToken), + }); + + const stdoutStream = Writable.toWeb(process.stdout) as unknown as WritableStream< + Uint8Array + >; + const stdinStream = Readable.toWeb(process.stdin) as unknown as ReadableStream< + Uint8Array + >; + + const stream = ndJsonStream(stdoutStream, stdinStream); + + const connection = new AgentSideConnection((conn) => new MuxAcpAgent(conn, mux, opts), stream); + + await connection.closed; + mux.close(); +})().catch((error) => { + stderrLog("error", opts.logLevel, "mux acp failed:", error); + process.exit(1); +}); diff --git a/src/cli/acpUtils.test.ts b/src/cli/acpUtils.test.ts new file mode 100644 index 0000000000..4cd42cef14 --- /dev/null +++ b/src/cli/acpUtils.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, test } from "bun:test"; +import type { ContentBlock } from "@agentclientprotocol/sdk"; +import type { WorkspaceChatMessage } from "@/common/orpc/types"; +import { contentBlocksToText, muxChatMessageToSessionUpdate } from "./acpUtils"; + +describe("contentBlocksToText", () => { + test("joins text blocks", () => { + const blocks: ContentBlock[] = [ + { type: "text", text: "hello" }, + { type: "text", text: "world" }, + ]; + + expect(contentBlocksToText(blocks)).toBe("hello\n\nworld"); + }); + + test("includes resource links", () => { + const blocks: ContentBlock[] = [ + { type: "text", text: "see" }, + { + type: "resource_link", + name: "file", + uri: "file:///tmp/a.txt", + title: "a.txt", + }, + ]; + + expect(contentBlocksToText(blocks)).toBe("see\n\n[resource] file:///tmp/a.txt (a.txt)"); + }); +}); + +describe("muxChatMessageToSessionUpdate", () => { + test("maps stream deltas to agent_message_chunk", () => { + const msg: WorkspaceChatMessage = { + type: "stream-delta", + workspaceId: "w", + messageId: "m", + delta: "hi", + tokens: 1, + timestamp: 0, + }; + + expect(muxChatMessageToSessionUpdate(msg)).toEqual({ + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "hi", + }, + }); + }); + + test("maps reasoning deltas to agent_thought_chunk", () => { + const msg: WorkspaceChatMessage = { + type: "reasoning-delta", + workspaceId: "w", + messageId: "m", + delta: "thinking", + tokens: 1, + timestamp: 0, + }; + + expect(muxChatMessageToSessionUpdate(msg)).toEqual({ + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking", + }, + }); + }); + + test("returns null for unhandled message types", () => { + const msg: WorkspaceChatMessage = { + type: "caught-up", + }; + + expect(muxChatMessageToSessionUpdate(msg)).toBeNull(); + }); +}); diff --git a/src/cli/acpUtils.ts b/src/cli/acpUtils.ts new file mode 100644 index 0000000000..dbe9860060 --- /dev/null +++ b/src/cli/acpUtils.ts @@ -0,0 +1,70 @@ +import type { ContentBlock, SessionUpdate } from "@agentclientprotocol/sdk"; +import { isReasoningDelta, isStreamDelta, type WorkspaceChatMessage } from "@/common/orpc/types"; + +export function contentBlocksToText(blocks: ContentBlock[]): string { + const parts: string[] = []; + + for (const block of blocks) { + if (block.type === "text") { + const text = block.text.trimEnd(); + if (text.length > 0) { + parts.push(text); + } + continue; + } + + if (block.type === "resource_link") { + const title = block.title?.trim() ? ` (${block.title.trim()})` : ""; + parts.push(`[resource] ${block.uri}${title}`); + continue; + } + + if (block.type === "resource") { + // If the client provided embedded context, include it verbatim when it's text-based. + // This keeps the bridge simple while still surfacing useful context to mux. + const resource = block.resource; + if ("text" in resource) { + const uri = resource.uri.trim() ? ` ${resource.uri}` : ""; + parts.push(`[resource${uri}]\n${resource.text}`); + } else { + parts.push(`[resource] ${resource.uri}`); + } + continue; + } + + // Unsupported by mux ACP bridge (image/audio). The client should only send these when enabled. + parts.push(`[${block.type}]`); + } + + return parts.join("\n\n").trim(); +} + +export function muxChatMessageToSessionUpdate(msg: WorkspaceChatMessage): SessionUpdate | null { + if (isStreamDelta(msg)) { + if (!msg.delta) { + return null; + } + return { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: msg.delta, + }, + }; + } + + if (isReasoningDelta(msg)) { + if (!msg.delta) { + return null; + } + return { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: msg.delta, + }, + }; + } + + return null; +} diff --git a/src/cli/api.ts b/src/cli/api.ts index 5cc25aec8a..19556778e6 100644 --- a/src/cli/api.ts +++ b/src/cli/api.ts @@ -13,52 +13,16 @@ import { createCli } from "trpc-cli"; import { router } from "@/node/orpc/router"; import { proxifyOrpc } from "./proxifyOrpc"; -import { ServerLockfile } from "@/node/services/serverLockfile"; -import { getMuxHome } from "@/common/constants/paths"; import { getArgsAfterSplice } from "./argv"; +import { discoverServer } from "./discoverServer"; // index.ts already splices "api" from argv before importing this module, // so we just need to get the remaining args after the splice point. const args = getArgsAfterSplice(); -interface ServerDiscovery { - baseUrl: string; - authToken: string | undefined; -} - -async function discoverServer(): Promise { - // Priority 1: Explicit env vars override everything - if (process.env.MUX_SERVER_URL) { - return { - baseUrl: process.env.MUX_SERVER_URL, - authToken: process.env.MUX_SERVER_AUTH_TOKEN, - }; - } - - // Priority 2: Try lockfile discovery (running Electron or mux server) - try { - const lockfile = new ServerLockfile(getMuxHome()); - const data = await lockfile.read(); - if (data) { - return { - baseUrl: data.baseUrl, - authToken: data.token, - }; - } - } catch { - // Ignore lockfile errors - } - - // Priority 3: Default fallback (standalone server on default port) - return { - baseUrl: "http://localhost:3000", - authToken: process.env.MUX_SERVER_AUTH_TOKEN, - }; -} - // Run async discovery then start CLI (async () => { - const { baseUrl, authToken } = await discoverServer(); + const { baseUrl, authToken } = await discoverServer({ fallbackBaseUrl: "http://localhost:3000" }); const proxiedRouter = proxifyOrpc(router(), { baseUrl, authToken }); diff --git a/src/cli/discoverServer.ts b/src/cli/discoverServer.ts new file mode 100644 index 0000000000..91fd140f7c --- /dev/null +++ b/src/cli/discoverServer.ts @@ -0,0 +1,76 @@ +import { getMuxHome, migrateLegacyMuxHome } from "@/common/constants/paths"; +import { ServerLockfile } from "@/node/services/serverLockfile"; + +export interface ServerDiscovery { + baseUrl: string; + authToken: string | undefined; +} + +function normalizeAuthToken(token: string | undefined): string | undefined { + const trimmed = token?.trim(); + if (!trimmed) { + return undefined; + } + return trimmed; +} + +export interface DiscoverServerOptions { + /** Explicit server URL override (highest priority). */ + baseUrl?: string; + /** Explicit auth token override (highest priority). */ + authToken?: string; + /** + * Optional fallback base URL if discovery fails. + * Used by `mux api` for backward compatibility. + */ + fallbackBaseUrl?: string; +} + +export async function discoverServer(options?: DiscoverServerOptions): Promise { + migrateLegacyMuxHome(); + + const explicitBaseUrl = options?.baseUrl?.trim(); + const explicitAuthToken = normalizeAuthToken(options?.authToken); + if (explicitBaseUrl) { + return { + baseUrl: explicitBaseUrl, + authToken: normalizeAuthToken(explicitAuthToken ?? process.env.MUX_SERVER_AUTH_TOKEN), + }; + } + + const envBaseUrl = process.env.MUX_SERVER_URL?.trim(); + if (envBaseUrl) { + return { + baseUrl: envBaseUrl, + authToken: normalizeAuthToken(explicitAuthToken ?? process.env.MUX_SERVER_AUTH_TOKEN), + }; + } + + try { + const lockfile = new ServerLockfile(getMuxHome()); + const data = await lockfile.read(); + if (data) { + return { + baseUrl: data.baseUrl, + authToken: normalizeAuthToken( + explicitAuthToken ?? process.env.MUX_SERVER_AUTH_TOKEN ?? data.token + ), + }; + } + } catch { + // Ignore lockfile errors + } + + const fallbackBaseUrl = options?.fallbackBaseUrl?.trim(); + if (fallbackBaseUrl) { + return { + baseUrl: fallbackBaseUrl, + authToken: normalizeAuthToken(explicitAuthToken ?? process.env.MUX_SERVER_AUTH_TOKEN), + }; + } + + throw new Error( + "No running mux API server found. Start mux desktop (API server enabled) or run `mux server`. " + + "You can also set MUX_SERVER_URL / MUX_SERVER_AUTH_TOKEN." + ); +} diff --git a/src/cli/index.ts b/src/cli/index.ts index 7798f2d49a..66455adfb1 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -54,6 +54,10 @@ if (subcommand === "run") { process.argv.splice(env.firstArgIndex, 1); // eslint-disable-next-line @typescript-eslint/no-require-imports require("./server"); +} else if (subcommand === "acp") { + process.argv.splice(env.firstArgIndex, 1); + // eslint-disable-next-line @typescript-eslint/no-require-imports + require("./acp"); } else if (subcommand === "api") { process.argv.splice(env.firstArgIndex, 1); // Must use native import() to load ESM module - trpc-cli requires ESM with top-level await. @@ -103,6 +107,7 @@ if (subcommand === "run") { program.command("run").description("Run a one-off agent task"); } program.command("server").description("Start the HTTP/WebSocket ORPC server"); + program.command("acp").description("Start an ACP (Agent Client Protocol) stdio bridge"); program.command("api").description("Interact with the mux API via a running server"); program .command("desktop")